6、线程间通信 2022-02-15 19:32 > 以下基于JDK1.8 ### 线程间通信 使用**锁**来解决线程间彼此**干涉**的问题。线程间**通信**是解决线程间彼此**协调**的问题。 《Java编程思想》中提到的几种线程间通信的方式: - wait/notify/notifyAll - Lock和Condition - 管道(PipedReader、PipedWriter,跟阻塞队列差不多) - 阻塞队列 其实广义上来讲,线程只要不是孤立执行,就一定会涉及到线程间通信。 - join:一个线程让另一个线程加入执行 - LockSupport.park()/unpark() 阻塞/唤醒 - 线程工具类: CountDownLatch 等 - 线程中断,t.interrupt();:一个线程中对另一个线程发起中断请求 - [volatile](http://riun.xyz/work/55):保证内存可见性,也可以用来线程间通信 线程间通信的本质:共享内存,消息传递。 #### 1、synchronized 使用synchronized与wait()、notifyAll() 【共享内存,对象监视器】 ```java package com.example.demo.core.ThreadDemo.signal; /** * @author: HanXu * on 2022/1/13 * Class description: 线程间通信:synchronized与wait()、notifyAll() 【共享内存,对象监视器】 * 交替打印、奇偶数 */ public class SyncDemo { private static volatile int i = 0; private static int count = 10; public static void main(String[] args) { //对象监视器 Object monitor = new Object(); new Thread(() -> { synchronized (monitor) { while (i < count) { System.out.println(Thread.currentThread().getName() + ":" + i++); monitor.notifyAll(); try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "A").start(); new Thread(() -> { synchronized (monitor) { while (i < count) { System.out.println(Thread.currentThread().getName() + ":" + i++); monitor.notifyAll(); try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "B").start(); } } /* A:0 B:1 A:2 B:3 A:4 B:5 A:6 B:7 A:8 B:9 */ ``` #### 2、Lock Lock 与 await()、signalAll() 【JDK并发类库】 ```java package com.example.demo.core.ThreadDemo.signal; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author: HanXu * on 2022/1/13 * Class description: 线程间通信:Lock 与 await()、signalAll() 【JDK并发类库】 * 交替打印、奇偶数 */ public class LockDemo { private static final Lock lock = new ReentrantLock(); private static final Condition condition = lock.newCondition(); private static volatile int i = 0; private static int count = 10; public static void main(String[] args) { new Thread(() -> { lock.lock(); try { while (i < count) { System.out.println(Thread.currentThread().getName() + ":" + i++); condition.signalAll(); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } } finally { lock.unlock(); } }, "A").start(); new Thread(() -> { lock.lock(); try { while (i < count) { System.out.println(Thread.currentThread().getName() + ":" + i++); condition.signalAll(); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } } finally { lock.unlock(); } }, "B").start(); } } /* A:0 B:1 A:2 B:3 A:4 B:5 A:6 B:7 A:8 B:9 */ ``` #### 3、volatile volatile (AtomicInteger类似) ```java package com.example.demo.core.ThreadDemo.signal; /** * @author: HanXu * on 2022/1/13 * Class description: 线程间通信:volatile (AtomicInteger类似) * 交替打印、奇偶数 */ public class VolatileDemo { private static volatile int i = 0; private static int count = 10; /** * volatile保证内存的可见性 * 这里的先输出i,然后在i++。不能改为输出i++。因为合并成一句代码之后,执行时先执行的i++,会导致刚加完还没输出,被另一个线程抢占,然后输出他的内容。 * 最终导致的结果就是两个线程不是交替输出的。 */ public static void main(String[] args) { new Thread(() -> { while (i < count) { if ((i & 1) == 1) { //奇数 System.out.println(Thread.currentThread().getName() + ":" + i); i++; } } }, "A").start(); new Thread(() -> { while (i < count) { if ((i & 1) != 1) { //偶数 System.out.println(Thread.currentThread().getName() + ":" + i); i++; } } }, "B").start(); } } /* B:0 A:1 B:2 A:3 B:4 A:5 B:6 A:7 B:8 A:9 B:10 */ ``` #### 4、阻塞队列 ```java package com.example.demo.core.ThreadDemo.signal; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; /** * @author hanxu03 * 线程间通信:BlockingQueue * 写线程向阻塞队列添加字符,读线程从阻塞队列获取字符并打印到控制台,读取不到时阻塞。 */ public class SendReceiveBQ { public static void main(String[] args) throws InterruptedException { Sender sender = new Sender(); Receiver receiver = new Receiver(sender); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(sender); executorService.execute(receiver); TimeUnit.SECONDS.sleep(2); executorService.shutdownNow(); } } class Receiver implements Runnable { private LinkedBlockingQueue<Character> blockingQueue; Receiver(Sender sender) { //使用同一个阻塞队列 blockingQueue = sender.getQueue(); } @Override public void run() { try { while (true) { //阻塞,直到读取到字符 System.out.println("Receiver:" + blockingQueue.take()); } } catch (InterruptedException e) { System.out.println(e + " Receiver interrupted"); } } } class Sender implements Runnable { private Random random = new Random(47); private LinkedBlockingQueue<Character> blockingQueue = new LinkedBlockingQueue<>(); public LinkedBlockingQueue<Character> getQueue() { return blockingQueue; } @Override public void run() { try { while (true) { for (char c = 'A'; c <= 'z'; c++) { blockingQueue.put(c); TimeUnit.MILLISECONDS.sleep(random.nextInt(500)); } } } catch (InterruptedException e) { System.out.println(e + " Sender sleep interrupted"); } } } ``` 执行结果: ``` Receiver:A Receiver:B Receiver:C Receiver:D Receiver:E Receiver:F Receiver:G java.lang.InterruptedException Receiver interrupted java.lang.InterruptedException: sleep interrupted Sender sleep interrupted ``` ### 线程间定制化通信 交替加减。A线程对一个值加1,B线程对一个值减1 #### 使用synchronized实现: ```java package com.example.demo.core.ThreadDemo.signal.IncrDecr; /** * @author hanxu03 * * 线程间通信:交替加减。A线程对一个值加1,B线程对一个值减1 * 1、synchronized wait() notifyAll() * 2、Condition await() signal() */ public class IncrDecrClass1 { //加减对象 private volatile int number = 0; public static void main(String[] args) { IncrDecrClass1 incrDecrClass1 = new IncrDecrClass1(); new Thread(() -> { for (int i = 0; i < 5; i++) { incrDecrClass1.increment(); } }, "线程 A").start(); new Thread(() -> { for (int i = 0; i < 5; i++) { incrDecrClass1.decrement(); } }, "线程 B").start(); } /** * 加 1 */ public synchronized void increment() { try { while (number != 0) { this.wait(); } number++; System.out.println("--------" + Thread.currentThread().getName() + " 加一成功----------, 值为:" + number); this.notifyAll(); } catch (Exception e) { e.printStackTrace(); } } /** * 减一 */ public synchronized void decrement() { try { while (number == 0) { this.wait(); } number--; System.out.println("--------" + Thread.currentThread().getName() + " 减一成功----------, 值为:" + number); this.notifyAll(); } catch (Exception e) { e.printStackTrace(); } } } ``` 执行结果: ``` --------线程 A 加一成功----------, 值为:1 --------线程 B 减一成功----------, 值为:0 --------线程 A 加一成功----------, 值为:1 --------线程 B 减一成功----------, 值为:0 --------线程 A 加一成功----------, 值为:1 --------线程 B 减一成功----------, 值为:0 --------线程 A 加一成功----------, 值为:1 --------线程 B 减一成功----------, 值为:0 --------线程 A 加一成功----------, 值为:1 --------线程 B 减一成功----------, 值为:0 ``` #### 使用Lock实现: ```java package com.example.demo.core.ThreadDemo.signal.IncrDecr; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author hanxu03 * * 线程间通信:交替加减。A线程对一个值加1,B线程对一个值减1 * 1、synchronized wait() notifyAll() * 2、Condition await() signal() */ public class IncrDecrClass2 { //加减对象 private volatile int number = 0; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); public static void main(String[] args) { IncrDecrClass2 incrDecrClass1 = new IncrDecrClass2(); new Thread(() -> { for (int i = 0; i < 5; i++) { incrDecrClass1.increment(); } }, "线程 A").start(); new Thread(() -> { for (int i = 0; i < 5; i++) { incrDecrClass1.decrement(); } }, "线程 B").start(); } /** * 加 1 */ public void increment() { lock.lock(); try { while (number != 0) { condition.await(); } number++; System.out.println("--------" + Thread.currentThread().getName() + " 加一成功----------, 值为:" + number); condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } /** * 减一 */ public void decrement() { lock.lock(); try { while (number == 0) { condition.await(); } number--; System.out.println("--------" + Thread.currentThread().getName() + " 减一成功----------, 值为:" + number); condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } ``` 执行结果: ```java --------线程 A 加一成功----------, 值为:1 --------线程 B 减一成功----------, 值为:0 --------线程 A 加一成功----------, 值为:1 --------线程 B 减一成功----------, 值为:0 --------线程 A 加一成功----------, 值为:1 --------线程 B 减一成功----------, 值为:0 --------线程 A 加一成功----------, 值为:1 --------线程 B 减一成功----------, 值为:0 --------线程 A 加一成功----------, 值为:1 --------线程 B 减一成功----------, 值为:0 ``` #### 三个线程通信: 三个线程,A 线程打印 5 次 A,B 线程打印 10 次 B,C 线程打印 15 次 C。按照此顺序循环 10 轮 ##### 正确案例: ```java package com.example.demo.core.ThreadDemo.signal.ThreeSignal; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author: HanXu * on 2021/12/29 * Class description: 定制化通信 * A 线程打印 5 次 A,B 线程打印 10 次 B,C 线程打印 15 次 C, * 按照此顺序循环 10 轮 */ public class CustomerPrintABCClass { //多线程之间共享的内存信息:1、计数 2、标记 private volatile int number; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); public static void main(String[] args) { CustomerPrintABCClass customerPrintABCClass = new CustomerPrintABCClass(); new Thread( () -> { for (int i = 0; i < 10; i++) { customerPrintABCClass.printA(); } } , "线程A").start(); new Thread( () -> { for (int i = 0; i < 10; i++) { customerPrintABCClass.printB(); } } , "线程B").start(); new Thread( () -> { for (int i = 0; i < 10; i++) { customerPrintABCClass.printC(); } } , "线程C").start(); } public void printA() { lock.lock(); try { while (number != 0) { condition.await(); } for (int i = 0; i < 5; i++) { number++; System.out.println(Thread.currentThread().getName() + "打印第" + number + "次"); } condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB() { lock.lock(); try { while (number != 5) { condition.await(); } for (int i = 5; i < 15; i++) { number++; System.out.println(Thread.currentThread().getName() + "打印第" + (number - 5) + "次"); } condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC() { lock.lock(); try { while (number != 15) { condition.await(); } for (int i = 15; i < 30; i++) { number++; System.out.println(Thread.currentThread().getName() + "打印第" + (number - 15) + "次"); } number = 0; condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } ``` 执行结果: ``` 线程A打印第1次 线程A打印第2次 线程A打印第3次 线程A打印第4次 线程A打印第5次 线程B打印第1次 线程B打印第2次 线程B打印第3次 线程B打印第4次 线程B打印第5次 线程B打印第6次 线程B打印第7次 线程B打印第8次 线程B打印第9次 线程B打印第10次 线程C打印第1次 线程C打印第2次 线程C打印第3次 线程C打印第4次 线程C打印第5次 线程C打印第6次 线程C打印第7次 线程C打印第8次 线程C打印第9次 线程C打印第10次 线程C打印第11次 线程C打印第12次 线程C打印第13次 线程C打印第14次 线程C打印第15次 ... ``` ##### 错误案例: 还是上面这个题目,差不多的代码,只需要改一下就能出错: 改动点:把`printC()`里面的while改成if。 ```java package com.example.demo.core.ThreadDemo.signal.ThreeSignal; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author: HanXu * on 2021/12/29 * Class description: 定制化通信 * A 线程打印 5 次 A,B 线程打印 10 次 B,C 线程打印 15 次 C, * 按照此顺序循环 10 轮 */ public class CustomerPrintABCErrorClass { //多线程之间共享的内存信息:1、计数 2、标记 private volatile int number; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); public static void main(String[] args) { CustomerPrintABCErrorClass customerPrintABCClass = new CustomerPrintABCErrorClass(); new Thread( () -> { for (int i = 0; i < 10; i++) { customerPrintABCClass.printA(); } } , "线程A").start(); new Thread( () -> { for (int i = 0; i < 10; i++) { customerPrintABCClass.printB(); } } , "线程B").start(); new Thread( () -> { for (int i = 0; i < 10; i++) { customerPrintABCClass.printC(); } } , "线程C").start(); } public void printA() { lock.lock(); try { while (number != 0) { condition.await(); } for (int i = 0; i < 5; i++) { number++; System.out.println(Thread.currentThread().getName() + "打印第" + number + "次"); } condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB() { lock.lock(); try { while (number != 5) { condition.await(); } for (int i = 5; i < 15; i++) { number++; System.out.println(Thread.currentThread().getName() + "打印第" + (number - 5) + "次"); } condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC() { lock.lock(); try { //这里从while改为if if (number != 15) { condition.await(); } for (int i = 15; i < 30; i++) { number++; System.out.println(Thread.currentThread().getName() + "打印第" + (number - 15) + "次"); } number = 0; condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } ``` 执行结果: ``` 线程A打印第1次 线程A打印第2次 线程A打印第3次 线程A打印第4次 线程A打印第5次 线程B打印第1次 线程B打印第2次 线程B打印第3次 线程B打印第4次 线程B打印第5次 线程B打印第6次 线程B打印第7次 线程B打印第8次 线程B打印第9次 线程B打印第10次 线程C打印第1次 线程C打印第2次 线程C打印第3次 线程C打印第4次 线程C打印第5次 线程C打印第6次 线程C打印第7次 线程C打印第8次 线程C打印第9次 线程C打印第10次 线程C打印第11次 线程C打印第12次 线程C打印第13次 线程C打印第14次 线程C打印第15次 线程A打印第1次 线程A打印第2次 线程A打印第3次 线程A打印第4次 线程A打印第5次 线程C打印第-9次 线程C打印第-8次 线程C打印第-7次 线程C打印第-6次 线程C打印第-5次 线程C打印第-4次 ``` ##### 原因: 可以看到第一轮执行正常,第二轮执行时,线程C依旧出错了。他没有等待满足他的条件就执行了。 在多线程编程中,“如果不满足条件,就等待”,不能写成"if wait"这样;因为if只判断一次,线程等待时一旦被唤醒,就会立即向下执行,而不会判断条件满足与否。所以应该使用`while wait` 这样的句式。每次被唤醒,都应该判断是否满足条件。 ### 其他 #### join(): ```java package com.example.demo.core.ThreadDemo; /** * @author: HanXu * on 2022/1/6 * Class description: join */ public class JoinDemo { public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { try { Thread.sleep(1000); System.out.println("线程1执行完了"); } catch (InterruptedException e) { e.printStackTrace(); } }); thread1.start(); Thread thread2 = new Thread(() -> { try { Thread.sleep(1000); System.out.println("线程2执行完了"); } catch (InterruptedException e) { e.printStackTrace(); } }); thread2.start(); thread1.join(); thread2.join(); System.out.println("主线程执行"); } } /* 线程1执行完了 线程2执行完了 主线程执行 */ ``` #### LockSupport ```java package com.example.demo.core.Lock.LockSupport; import java.util.concurrent.locks.LockSupport; /** * @author: HanXu * on 2022/1/14 * Class description: 用两个线程,一个输出数字,一个输出字母,交替输出 1A2B3C4D...26Z */ public class Demo2 { public static void main(String[] args) { ThreadA threadA = new ThreadA(); ThreadB threadB = new ThreadB(); threadA.setAnotherThread(threadB); threadB.setAnotherThread(threadA); threadA.start(); threadB.start(); //1A2B3C4D5E6F7G8H9I10J11K12L13M14N15O16P17Q18R19S20T21U22V23W24X25Y26Z } } class ThreadA extends Thread { private Thread anotherThread; public void setAnotherThread(Thread anotherThread) { this.anotherThread = anotherThread; } @Override public void run() { for (int i = 1; i < 27; i++) { //因为是输出1A2B3C4D,所以1先输出,这里直接输出,零一个线程进来就需要阻塞 System.out.print(i); //输出后唤醒另一个线程,然后阻塞自己 LockSupport.unpark(anotherThread); LockSupport.park(); } } } class ThreadB extends Thread { private Thread anotherThread; public void setAnotherThread(Thread anotherThread) { this.anotherThread = anotherThread; } @Override public void run() { for (int i = 'A'; i < 'Z' + 1; i++) { LockSupport.park(); System.out.print((char) i); //输出后唤醒另一个线程 LockSupport.unpark(anotherThread); } } } ``` #### interrupt(): * t.interrupt()只是给了一个中断标记,如果收到中断标记,你还继续执行,那么这个标记就又会变为false。正确的业务做法是收到中断标记后,return * 调用线程的interrupt()设置这个标记,当一个线程通过Thread.interrupted()判断到自己被中断后,立即会将这个状态清空。也就是说判断到是true后如果不及时做出反应,下次再判断就是false了 * 其他线程中,通过某个线程.isInterrupted()判断这个线程是否被中断,不会将中断状态清空。 ```java package com.example.demo.core.ThreadDemo; import java.time.LocalDateTime; /** * @author hanxu03 * 主线程中断自定义线程 */ public class InterruptDemo { public static void main(String[] args) throws InterruptedException { Thread t = new Thread() { @Override public void run() { //预期循环10次 for (int i = 0; i < 10; i++) { try { Thread.sleep(2000); System.out.println("自定义线程:当前时间:" + LocalDateTime.now()); } catch (InterruptedException e) {//这个异常由sleep方法抛出 e.printStackTrace(); System.out.println("自定义线程:收到中断信号,总共循环了" + i + "次..."); //接受到中断信号时,停止运行 //这里如果没有return的话,即使收到中断信号,这个线程依旧会继续执行,一直执行完才会返回。 return; } } } }; t.start(); //主线程休眠5秒 Thread.sleep(5000); System.out.println("主线程:等待5秒后发送中断信号..."); t.interrupt(); } } /* 自定义线程:当前时间:2022-01-13T16:48:40.068 自定义线程:当前时间:2022-01-13T16:48:42.070 主线程:等待5秒后发送中断信号... java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at com.example.demo.core.ThreadDemo.InterruptDemo$1.run(InterruptDemo.java:14) 自定义线程:收到中断信号,总共循环了2次... */ ``` --END--
发表评论