10、JUC辅助类 2022-02-15 19:34 ### CountDownLatch 设定一个初始值N,当一个线程调用countDown()时,N值减一。主线程调用await(),将会阻塞,当N值减到0时,await()将会通过。 N不可重复利用, 减到0之后就一直是0了 ```java package com.example.demo.core.JUCUtils; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; /** * @author: HanXu * on 2022/1/6 * Class description: 计数器 */ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { // simpleTest(); // blockNThread(); // blockOne(); CountDownLatch countDownLatch = new CountDownLatch(5); long count = countDownLatch.getCount(); System.out.println(count); //一直阻塞 countDownLatch.await(); //阻塞一定时间 //boolean await = countDownLatch.await(2, TimeUnit.SECONDS); //System.out.println(await); } private static void blockOne() throws InterruptedException { //一个阻塞,等待其他多个线程完成任务,最后由这一个线程汇总结果 CountDownLatch countDownLatch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { new Thread(() -> { try { Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); System.out.println(Thread.currentThread().getName() + ":finished"); }).start(); } countDownLatch.await(); System.out.println("主线程:所有任务完成后,收集结果并汇总"); } private static void blockNThread() throws InterruptedException { //多个线程阻塞,等待一个线程开启计时器。 //通过CountDownLatch的await()实现了让多个线程一起阻塞,然后主线程调用countDown()将计数器减为0,让所有线程一起往下执行。模拟(接近)并发请求。 CountDownLatch countDownLatch = new CountDownLatch(1); for (int i = 0; i < 5; i++) { new Thread(() -> { try { countDownLatch.await();//所有线程都阻塞在这里 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ":开始执行"); //doSomething }).start(); } System.out.println("裁判正在倒计时"); for (int i = 3; i > 0 ; i--) { System.out.println(i); Thread.sleep(1000); } System.out.println("比赛开始!"); countDownLatch.countDown(); } private static void simpleTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(10); //计数器不为0时,阻塞,为0时,会被唤醒 //countDownLatch.await(); //计数器减一 countDownLatch.countDown(); ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(() -> { System.out.println("我开始执行了"); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("我被唤醒了"); }); Thread.sleep(20); for (int i = 0; i < 10; i++) { //countDownLatch.countDown(); } System.out.println("结束了"); } } ``` 参考:[CountDownLatch的两种常用场景](https://zhuanlan.zhihu.com/p/148231820) ![](http://minio.riun.xyz/riun1/2022-01-06_1QRfHQgcZXFFxxVhz5.jpg) ### CyclicBarrier 设置初始值N,调用await()的线程会等待,等到等待的线程数量达到N,所有线程一起执行。 N可以重复利用 ```java package com.example.demo.core.JUCUtils; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; /** * @author: HanXu * on 2022/1/6 * Class description: CyclicBarrier:循环栅栏。 障碍点,所有线程都要在这个障碍点互相等待,到齐了才能继续往下。 * * 栅栏:他的主要方法就是await(),调用await()的线程会阻塞等待,知道阻塞等待的线程数量等于CyclicBarrier设置的值N。然后所有线程会一起执行。 * 循环:当所有线程一起执行后,CyclicBarrier内部的值又变回了N。可以重复利用 * * 循环主要是和CountDownLatch区别,CountDownLatch在countDown()减到0后,所有await()的线程一起执行。但是他内部的值不会恢复了,就变为0了。 * * CyclicBarrier还新增了回调属性,当任务完成后,会执行Runnable。 */ public class CyclicBarrierDemo { //阶段 static int peroid = 1; /** * 里程碑 * 每阶段完成后,会执行这里; */ static Runnable milestoneRunnable = new Runnable() { @Override public void run() { switch (peroid) { case 1: System.out.println("********第1阶段***************"); break; case 2: System.out.println("********第2阶段***************"); break; case 3: System.out.println("********第3阶段***************"); break; } peroid++; } }; static final int parties = 5; static final Random random = new Random(); //这种直接将任务传入的方式,执行回调时是最后一个await()的线程去执行的 // static final CyclicBarrier cyclicBarrier = new CyclicBarrier(parties, milestoneRunnable); //可以搞一个线程池去执行,这样回调就是异步执行的了 static ExecutorService executorService = Executors.newFixedThreadPool(1); static final CyclicBarrier cyclicBarrier = new CyclicBarrier(parties, () -> { executorService.execute(milestoneRunnable); }); public static void main(String[] args) { // 所有人,开始行动 for (int i = 0; i < parties; i++) { new StaffThread().start(); } } static class StaffThread extends Thread { @Override public void run() { try { String staff = "员工【" + Thread.currentThread().getName() + "】"; // 第一阶段:来公司集合 System.out.println(staff + "从家出发了……"); Thread.sleep(random.nextInt(5000)); System.out.println(staff + "到达公司"); // 协同,第一次等大家到齐 cyclicBarrier.await(); // 第二阶段:出发去公园 System.out.println(staff + "出发去公园玩"); Thread.sleep(random.nextInt(5000)); System.out.println(staff + "到达公园门口集合"); // 协同:第二次等大家到齐 cyclicBarrier.await(); // 第三阶段:去餐厅 System.out.println(staff + "出发去餐厅"); Thread.sleep(random.nextInt(5000)); System.out.println(staff + "到达餐厅"); // 协同:第三次等大家到齐 cyclicBarrier.await(); // 第四阶段:就餐 System.out.println(staff + "开始用餐"); Thread.sleep(random.nextInt(5000)); System.out.println(staff + "用餐结束,回家"); } catch (Exception e) { e.printStackTrace(); } } } public static void main1(String[] args) throws BrokenBarrierException, InterruptedException { CyclicBarrier cyclicBarrier = new CyclicBarrier(5); // cyclicBarrier.await(); // int numberWaiting = cyclicBarrier.getNumberWaiting(); // System.out.println(numberWaiting); test(cyclicBarrier); } private static void test(CyclicBarrier cyclicBarrier) { for (int i = 0; i < 5; i++) { new Thread(() -> { int randomInt = ThreadLocalRandom.current().nextInt(2000); try { Thread.sleep(randomInt); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ":从家到公司花费时间【" + randomInt + "】"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } //所有线程都阻塞在cyclicBarrier.await();,等到阻塞线程数量达到5后,一起向下执行 System.out.println(Thread.currentThread().getName() + ":到齐了:" + System.currentTimeMillis()); }).start(); } } } ``` 参考: - [CyclicBarrier多任务协同的利器](https://zhuanlan.zhihu.com/p/148521577) - [CountDownLatch和CyclicBarrier让多线程步调一致( 微信公众号:对账系统 )](https://mp.weixin.qq.com/s?__biz=MzAwNTE3MzgwNA==&mid=2247483867&idx=1&sn=e7e8684850f1fcf9734c3567beb06089&chksm=9b21e698ac566f8ec0691b365a32c9aaac2e004758bcbb461343eba316e55a86652dbb5b988b&scene=21#wechat_redirect) ![](http://minio.riun.xyz/riun1/2022-01-07_1Rd4ERc1xa2N4TVY0Y.jpg) ### Semaphore 给定一个初始值N代表N个信号量,调用acquire()会使用一个信号量,调用release()会释放一个信号量 N可以重复利用 ```java package com.example.demo.core.JUCUtils; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author: HanXu * on 2022/1/7 * Class description: Semaphore信号量。n个信号量,acquire()表示使用了一个,release()表示释放了一个。 */ @Slf4j public class SemaphoreDemo { public static void main(String[] args) throws InterruptedException { //10个人 int personNum = 10; //2个窗口 Semaphore semaphore = new Semaphore(2); CountDownLatch countDownLatch = new CountDownLatch(personNum); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < personNum; i++) { executorService.execute(() -> { try { semaphore.acquire(); doSomething(); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); //代表一个人办理完业务了 countDownLatch.countDown(); } }); } //这里是等10个人都办理完业务,就关闭程序。countDownLatch在这里只是辅助关闭程序的,不是必要 countDownLatch.await(); executorService.shutdown(); } private static void doSomething() { System.out.println(Thread.currentThread().getName() + ":正在办理业务"); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` ### 不同点: - 1、**CountDownLatch 主要用来解决一个线程等待多个线程的场景**,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点; 而 **CyclicBarrier 是一组线程之间互相等待**,更像是几个驴友之间不离不弃。 - 2、除此之外 CountDownLatch的计数器是**不能循环利用**的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。 但CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。 - 3、除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。 --END--
发表评论