11、CompletableFuture 2022-02-15 19:35 > 本文尚未完稿,以后会修改的。 ```java package com.example.demo.core.ForkJoin; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; import java.util.stream.Collectors; /** * @author: HanXu * on 2022/1/10 * Class description: CompletableFuture, 对Future的增强 */ public class CompletableFutureDemo { public static void main(String[] args) throws Exception { /*CompletableFuture completableFuture = new CompletableFuture(); new Thread( () -> { System.out.println("子线程开始干活"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } completableFuture.complete("success"); } ).start(); //主线程调用 get 方法阻塞 System.out.println("主线程调用 get 方法获取结果为: " + completableFuture.get()); System.out.println("主线程完成,阻塞结束!!!!!!");*/ // thenApply(); // test1(); // test4(); // test5(); // test6(); // test7(); test8(); } /** * 没有返回值的异步任务 * * @throws Exception */ public static void test00() throws Exception { System.out.println("主线程开始"); //运行一个没有返回值的异步任务 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { System.out.println("子线程启动干活"); Thread.sleep(5000); System.out.println("子线程完成"); } catch (Exception e) { e.printStackTrace(); } }); //主线程阻塞 future.get(); System.out.println("主线程结束"); } /** * 有返回值的异步任务 * * @throws Exception */ public static void test01() throws Exception { System.out.println("主线程开始"); //运行一个有返回值的异步任务 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { System.out.println("子线程开始任务"); Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } return "111111"; }); //主线程阻塞 String s = future.get(); System.out.println("主线程结束, 子线程的结果为:" + s); } private static Integer num = 10; /** * thenApply() 一个线程依赖另一个线程时,可以把他们串行化 * * @throws Exception */ public static void test1() throws Exception { System.out.println("主线程开始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { System.out.println("加 10 任务开始"); num += 10; } catch (Exception e) { e.printStackTrace(); } return num; }).thenApply(integer -> num * num); Integer integer = future.get(); System.out.println("主线程结束, 子线程的结果为:" + integer); } /** * thenAccept() 消费处理结果,无返回值 * * @throws Exception */ public static void test2() throws Exception { System.out.println("主线程开始"); CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { try { System.out.println("加 10 任务开始"); num += 10; } catch (Exception e) { e.printStackTrace(); } return num; }).thenApply(integer -> num * num).thenAccept(integer -> System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" + integer)); //阻塞等待全部执行 future.get(); } /** * 异常处理: exceptionally * * @throws Exception */ public static void test3() throws Exception { System.out.println("主线程开始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int i = 1 / 0; System.out.println("加 10 任务开始"); num += 10; return num; }).exceptionally(ex -> { System.out.println(ex.getMessage()); return -1; }); System.out.println(future.get()); } /** * handle 类似于thenAccept(),是最后一步处理调用,但是可以处理异常。 * * @throws Exception */ public static void test4() throws Exception { System.out.println("主线程开始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("加 10 任务开始"); num += 10; // int i = 10 / 0; return num; }).handle((i, ex) -> { System.out.println("进入 handle 方法"); if (ex != null) { System.out.println("发生了异常,内容为:" + ex.getMessage()); return -1; } else { System.out.println("正常完成,内容为: " + i); return i; } }); System.out.println(future.get()); } /** * thenCompose()合并两个有依赖关系的CompletableFutures的执行结果 * * @throws Exception */ public static void test5() throws Exception { System.out.println("主线程开始"); //第一步加 10 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("加 10 任务开始"); num += 10; return num; }); //合并 CompletableFuture<Integer> future1 = future.thenCompose(i -> //再来一个 CompletableFuture CompletableFuture.supplyAsync(() -> i + 1)); System.out.println(future.get()); System.out.println(future1.get()); } /** * thenCombine 合并两个没有依赖关系的 CompletableFutures 任务 * * @throws Exception */ public static void test6() throws Exception { System.out.println("主线程开始"); CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { System.out.println("加 10 任务开始"); num += 10; return num; }); CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { System.out.println("乘以 10 任务开始"); num = num * 10; return num; }); //合并两个结果 CompletableFuture<Object> future = job1.thenCombine(job2, (BiFunction<Integer, Integer, List<Integer>>) (a, b) -> { List<Integer> list = new ArrayList<>(); list.add(a); list.add(b); return list; }); System.out.println("合并结果为:" + future.get()); } /** * allOf() 一系列独立的 future 任务,等其所有的任务执行完后做一些事情 * * @throws Exception */ public static void test7() throws Exception { System.out.println("主线程开始"); List<CompletableFuture> list = new ArrayList<>(); CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { System.out.println("加 10 任务开始"); num += 10; return num; }); list.add(job1); CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { System.out.println("乘以 10 任务开始"); num = num * 10; return num; }); list.add(job2); CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> { System.out.println("减以 10 任务开始"); num = num * 10; return num; }); list.add(job3); CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> { System.out.println("除以 10 任务开始"); num = num * 10; return num; }); list.add(job4); //多任务合并 List<Integer> collect = list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList()); System.out.println(collect); } /** * anyOf: 只要在多个 future 里面有一个返回,整个任务就可以结束,而不需要等到每一个 future 结束 * * @throws Exception */ public static void test8() throws Exception { System.out.println("主线程开始"); CompletableFuture<Integer>[] futures = new CompletableFuture[4]; CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); System.out.println("加 10 任务开始"); num += 10; return num; } catch (Exception e) { return 0; } }); futures[0] = job1; CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); System.out.println("乘以 10 任务开始"); num = num * 10; return num; } catch (Exception e) { return 1; } }); futures[1] = job2; CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); System.out.println("减以 10 任务开始"); num = num * 10; return num; } catch (Exception e) { return 2; } }); futures[2] = job3; CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(4000); System.out.println("除以 10 任务开始"); num = num * 10; return num; } catch (Exception e) { return 3; } }); futures[3] = job4; CompletableFuture<Object> future = CompletableFuture.anyOf(futures); System.out.println(future.get()); } } ``` --END--
发表评论