并发编程进阶-CompletableFuture
# CompletableFuture
# 1. Future (opens new window)
# 1.1 概述
Future是Java5
新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们就可以通future把这个任务放到异步线程中执行。主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。
Future接口定义了操作异步任务执行一些方法
,如何获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断执行是否完毕等。 API地址 (opens new window) java.util.concurrent
应用场景:
比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙其他事情或者先执行完,过了一会采取获取子任务的执行结果或者变更任务状态。
示例:老师上课买水问题。
总结:
Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。
# 1.2 为什们要用FutureTask类?😄
提示
Future接口常用实现类FutureTask异步任务。API地址 (opens new window) java.util.concurrent
特点:多线程/有返回/异步任务 (异步多线程任务执行且返回有结果)
UML类图
FutureTask 示例
public class FutureDemo01 {
/**
* 目的:异步多线程任务执行且返回有结果,三个特点:多线程/有返回/异步任务
*
* @param args
*/
public static void main(String[] args) {
try {
// 异步任务多线程 FutureTask(Callable<V> callable)
FutureTask<String> futureTask = new FutureTask<>(new MyThread2());
Thread t1 = new Thread(futureTask, "t1");
t1.start();
// 获取返回值
System.out.println(futureTask.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
class MyThread2 implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println("------ come in call() ------");
return "cnlxc.cn";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
结果:
点击查看
------ come in call() ------
cnlxc.cn
2
# 1.3 线程池方式提升代码性能 🚗
提示
future+线程池异步多线程任务配合,能显著提升程序的执行效率。
# 普通形式
/**
* 3个任务,目前只有一个线程main来处理,请问耗时多少?
*/
private static void task() {
long startTime = System.currentTimeMillis();
// 3个任务
try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {throw new RuntimeException(e);}
try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
long endTime = System.currentTimeMillis();
System.out.println("花费时间:" + (endTime-startTime) + " 毫秒");
System.out.println(Thread.currentThread().getName() + "\t ------end------");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
结果:
花费时间:1329 毫秒
main ------end------
2
# 使用多线程
public static void main(String[] args) {
try {
// 3个任务,目前开启多少个异步任务线程来处理,请问耗时多少?
long startTime = System.currentTimeMillis();
// 3个任务
FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return "task1 over";
});
FutureTask<String> futureTask2 = new FutureTask<String>(() -> {
TimeUnit.MILLISECONDS.sleep(300);
return "task2 over";
});
FutureTask<String> futureTask3 = new FutureTask<String>(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return "task3 over";
});
// 方式一:普通形式创建线程(每次新建对象,结束后必须进行垃圾回收)
// Thread t1 = new Thread(futureTask1, "t1");
// t1.start();
// 方式二:使用Executors创建线程池
// ExecutorService threadPool = Executors.newFixedThreadPool(3);
// 方式三:使用线程池管理线程,节约资源。进行线程复用。提升性能。
ExecutorService threadPool = new ThreadPoolExecutor( // 根据阿里巴巴代码规范 线程池此处使用ThreadPoolExecutor 不使用Executors创建
3, // 线程池的核心线程数
5, // 能容纳的最大线程数
2L, // 空闲线程存活时间
TimeUnit.SECONDS, // 存活的时间单位
new ArrayBlockingQueue<>(3), // 存放提交但未执行任务的队列
Executors.defaultThreadFactory(), // 创建线程的工厂类
new ThreadPoolExecutor.AbortPolicy() // 等待队列满后的拒绝策略
);
threadPool.submit(futureTask1);
threadPool.submit(futureTask2);
threadPool.submit(futureTask3);
// 获取返回值
System.out.println(futureTask1.get());
System.out.println(futureTask2.get());
System.out.println(futureTask3.get());
// 释放资源
threadPool.shutdown();
long endTime = System.currentTimeMillis();
System.out.println("花费时间:" + (endTime - startTime) + " 毫秒");
System.out.println(Thread.currentThread().getName() + "\t ------end------");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
结果:
task1 over
task2 over
task3 over
花费时间:534 毫秒
main ------end------
2
3
4
5
注意
问题:线程池执行 submit方法和 execute方法区别?
# 1.4 Future缺点 🚗
# get阻塞
提示
get容易导致阻塞
,一般放在程序后面。一旦调用没见到结果,非要等到结果才会离开,不管你时是否计算完成,容易程序堵塞。- get可以设置超时时间。
- 正常执行
public static void main(String[] args) {
try {
FutureTask<String> futureTask = new FutureTask<String>(() -> {
System.out.println(Thread.currentThread().getName() + " ------ come in ------ ");
TimeUnit.SECONDS.sleep(5);
return "task1 over";
});
Thread t1 = new Thread(futureTask, "t1");
t1.start();
System.out.println(Thread.currentThread().getName() + " ------ 忙其他任务去了 ------ ");
System.out.println(futureTask.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
main ------ 忙其他任务去了 ------
pool-1-thread-1 ------ come in ------
task1 over
Process finished with exit code 0
2
3
4
5
- 阻塞(不见不散):get放在程序执行的前面
public static void main(String[] args) {
try {
FutureTask<String> futureTask = new FutureTask<String>(() -> {
System.out.println(Thread.currentThread().getName() + " ------ come in ------ ");
TimeUnit.SECONDS.sleep(5);
return "task1 over";
});
Thread t1 = new Thread(futureTask, "t1");
t1.start();
System.out.println(futureTask.get());
System.out.println(Thread.currentThread().getName() + " ------ 忙其他任务去了 ------ ");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
get容易导致阻塞,一般放在程序后面。一旦调用没见到结果,非要等到结果才会离开,不管你时是否计算完成,容易程序堵塞。
- 过时不候
public static void main(String[] args) {
try {
FutureTask<String> futureTask = new FutureTask<String>(() -> {
System.out.println(Thread.currentThread().getName() + " ------ come in ------ ");
TimeUnit.SECONDS.sleep(5);
return "task1 over";
});
Thread t1 = new Thread(futureTask, "t1");
t1.start();
System.out.println(Thread.currentThread().getName() + " ------ 忙其他任务去了 ------ ");
// 设置get获取时间,过时不候
System.out.println(futureTask.get(3, TimeUnit.SECONDS));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
结果:
main ------ 忙其他任务去了 ------
t1 ------ come in ------
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.TimeoutException
at cn.cnlxc.future.FutureDemo03.main(FutureDemo03.java:32)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at cn.cnlxc.future.FutureDemo03.main(FutureDemo03.java:28)
Process finished with exit code 1
2
3
4
5
6
7
8
9
# 轮询耗费cpu(isDone)
提示
- 轮询的方式会耗费无谓的CPU资源,二期也见不得能及时获取计算结果。
- 如果想要异步获取结果,通常都会以轮询的方式获取结果,尽量不要阻塞。
public static void main(String[] args) {
try {
FutureTask<String> futureTask = new FutureTask<String>(() -> {
System.out.println(Thread.currentThread().getName() + " ------ come in ------ ");
TimeUnit.SECONDS.sleep(5);
return "task1 over";
});
Thread t1 = new Thread(futureTask, "t1");
t1.start();
System.out.println(Thread.currentThread().getName() + " ------ 忙其他任务去了 ------ ");
// isDone 轮询获取是否完成
while (true){
if (futureTask.isDone()){
System.out.println(futureTask.get());
break;
}else {
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("正在处理中,不要在催了,越催越慢,再催熄火");
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
结果:
main ------ 忙其他任务去了 ------
t1 ------ come in ------
正在处理中,不要在催了,越催越慢,再催熄火
正在处理中,不要在催了,越催越慢,再催熄火
正在处理中,不要在催了,越催越慢,再催熄火
正在处理中,不要在催了,越催越慢,再催熄火
正在处理中,不要在催了,越催越慢,再催熄火
正在处理中,不要在催了,越催越慢,再催熄火
正在处理中,不要在催了,越催越慢,再催熄火
正在处理中,不要在催了,越催越慢,再催熄火
正在处理中,不要在催了,越催越慢,再催熄火
正在处理中,不要在催了,越催越慢,再催熄火
task1 over
Process finished with exit code 0
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 总结
Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
# 1.5 应用场景
简单业务
- 对于简单的业务场景使用Future完全OK
回调通知
- 应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知
- 通过轮询的方式去判断任务是否完成这样
非常占CPU
并且代码也不优雅
创建异步任务
- Future+线程池提升性能
多个任务前后依赖可以组合处理
- 想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值
- 将两个或多个异步计算合成一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个处理的结果
对计算速度选择最快
# 2. CompletableFuture
# 2.1 CompletableFuture概述
为什么出现CompletableFuture?
get
方法在Future 计算完成之前会一直处在阻塞状态下,isDone
方法容易耗费CPU资源。对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。因此,JDK8设计出
CompletableFuture
。CompletableFuture
提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
CompletableFuture源码 (opens new window) java.util.concurrent
CompletableFuture简介
- 在Java8中,Completa leFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
- 它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ) ,它支持在计算完成以后触发一些团数或执行某些动作。
- 它实现了Future和CompletionStage接口
CompletionStage简介
CompletionStage
代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段一个阶段的计算执行可以是一个
Function
,Consumer
或者Runnable
比如: stage.thenApply(x -> square(x).thenAccept(x > System.out.print(x)).thenRun(() -> System.out.println0))
一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。
# 2.2 CompletableFuture (opens new window)创建 😈
提示
不推荐使用空参构造的形式创建 CompletableFuture
# 无返回值
# 返回一个新的CompletableFuture,它在运行给定操作后由运行在 ForkJoinPool.commonPool()中的任务 异步完成。
public static CompletableFuture<Void> runAsync(Runnable runnable)
# 返回一个新的CompletableFuture,它在运行给定操作之后由在给定执行程序中运行的任务异步完成。
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
2
3
4
public class CompletableFutureDemo01 {
/**
* 创建 CompletableFuture 无返回值
* <p>
* @param args
*/
public static void main(String[] args) {
try {
// 无返回值
// public static CompletableFuture<Void> runAsync(Runnable runnable)
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
System.out.println("无返回值-使用默认线程池: " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
System.out.println("无返回值-使用默认线程池: " + completableFuture1.get());
// public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 创建线程池
ExecutorService threadPool = new ThreadPoolExecutor( // 根据阿里巴巴代码规范 线程池此处使用ThreadPoolExecutor 不使用Executors创建
3, // 线程池的核心线程数
5, // 能容纳的最大线程数
2L, // 空闲线程存活时间
TimeUnit.SECONDS, // 存活的时间单位
new ArrayBlockingQueue<>(3), // 存放提交但未执行任务的队列
Executors.defaultThreadFactory(), // 创建线程的工厂类
new ThreadPoolExecutor.AbortPolicy() // 等待队列满后的拒绝策略
);
CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
System.out.println("无返回值-使用自定义线程池: " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, threadPool);
System.out.println("无返回值-使用自定义线程池: " + completableFuture2.get());
// 关闭线程池
threadPool.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
结果:
无返回值-使用默认线程池: ForkJoinPool.commonPool-worker-9
无返回值-使用默认线程池: null
无返回值-使用自定义线程池: pool-1-thread-1
无返回值-使用自定义线程池: null
2
3
4
# 有返回值 😈
# 返回一个新的CompletableFuture,通过运行在 ForkJoinPool.commonPool()中的任务与通过调用给定的供应商获得的值 异步完成。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
# 返回一个新的CompletableFuture,由给定执行器中运行的任务异步完成,并通过调用给定的供应商获得的值。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
2
3
4
public class CompletableFutureDemo02 {
/**
* 创建 CompletableFuture 有返回值
* <p>
* public static CompletableFuture<Void> runAsync(Runnable runnable) # 返回一个新的CompletableFuture,它在运行给定操作后由运行在 ForkJoinPool.commonPool()中的任务 异步完成。
* public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) # 返回一个新的CompletableFuture,它在运行给定操作之后由在给定执行程序中运行的任务异步完成。
* public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) # 返回一个新的CompletableFuture,通过运行在 ForkJoinPool.commonPool()中的任务与通过调用给定的供应商获得的值 异步完成。
* public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) # 返回一个新的CompletableFuture,由给定执行器中运行的任务异步完成,并通过调用给定的供应商获得的值。
*
* @param args
*/
public static void main(String[] args) {
try {
// 无返回值
// public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("有返回值-使用默认线程池: " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "cnlxc.cn - 使用默认线程池";
});
System.out.println("有返回值-使用默认线程池: " + completableFuture1.get());
//public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
// 创建线程池
ExecutorService threadPool = new ThreadPoolExecutor( // 根据阿里巴巴代码规范 线程池此处使用ThreadPoolExecutor 不使用Executors创建
3, // 线程池的核心线程数
5, // 能容纳的最大线程数
2L, // 空闲线程存活时间
TimeUnit.SECONDS, // 存活的时间单位
new ArrayBlockingQueue<>(3), // 存放提交但未执行任务的队列
Executors.defaultThreadFactory(), // 创建线程的工厂类
new ThreadPoolExecutor.AbortPolicy() // 等待队列满后的拒绝策略
);
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println("有返回值-使用自定义线程池: " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "cnlxc.cn - 使用自定义线程池";
}, threadPool);
System.out.println("有返回值-使用自定义线程池: " + completableFuture2.get());
// 关闭线程池
threadPool.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
结果:
有返回值-使用默认线程池: ForkJoinPool.commonPool-worker-9
有返回值-使用默认线程池: cnlxc.cn - 使用默认线程池
有返回值-使用自定义线程池: pool-1-thread-1
有返回值-使用自定义线程池: cnlxc.cn - 使用自定义线程池
2
3
4
# Executor说明
若果没有执行Executor的方法,直接使用默认的
ForkJoinPoll.commonPool()
作为它的线程池执行异步代码。如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码。
# 2.3 减少阻塞和轮询 😄
从Java8开始引入了CompletableFuture
,它是Future的功能增强版,减少阻塞和轮询
可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法.
# 示例一: 使用get
获取结果
public class CompletableFutureDemo03 {
public static void main(String[] args) {
try {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " ------ come in ------");
int result = ThreadLocalRandom.current().nextInt();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("------ 1秒钟后出结果 ------ ");
return result;
});
System.out.println(Thread.currentThread().getName()+ "线程先去忙其他任务");
System.out.println(completableFuture.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
结果:
main线程先去忙其他任务
ForkJoinPool.commonPool-worker-9 ------ come in ------
------ 1秒钟后出结果 ------
0
2
3
4
# 示例二: whenComplete替代get 😈
whenComplete (opens new window)
exceptionally (opens new window)
public class CompletableFutureDemo03 {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " ------ come in ------");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("------ 1秒钟后出结果 ------ " + result);
return result;
}).whenComplete((v, e) -> { // 无异常获取数据
if (e == null) {
System.out.println("------ 计算完成,更新系统updateValue:" + v);
}
}).exceptionally(e -> { // 有异常处理
e.printStackTrace();
System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
结果:
ForkJoinPool.commonPool-worker-9 ------ come in ------
main线程先去忙其他任务
2
mian线程执行完后,会关闭其他线程。看到结果没有打印出来:改进
// 主线程不要立刻结束,否completableFuture默认使用的线程池会立刻关闭: 暂停3秒钟线程 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
1
2
3
4
5
6
改进后代码:
public class CompletableFutureDemo03 {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " ------ come in ------");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("------ 1秒钟后出结果 ------ " + result);
return result;
}).whenComplete((v, e) -> { // 无异常获取数据
if (e == null) {
System.out.println("------ 计算完成,更新系统updateValue:" + v);
}
}).exceptionally(e -> { // 有异常处理
e.printStackTrace();
System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
// 主线程不要立刻结束,否completableFuture默认使用的线程池会立刻关闭: 暂停3秒钟线程
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
结果:
ForkJoinPool.commonPool-worker-9 ------ come in ------
main线程先去忙其他任务
------ 1秒钟后出结果 ------ 4
------ 计算完成,更新系统updateValue:4
2
3
4
应用场景:异步处理,可以在supplyAsync
处理一件事情的时候,通过whenComplete
去执行另一件事情
# 示例三:使用线程池改进 😄
改进示例二:不适用睡眠方式获取结果
whenComplete、exceptionally与try/catch/finally
public class CompletableFutureDemo03 {
public static void main(String[] args) {
// 创建线程池
ExecutorService threadPool = new ThreadPoolExecutor( // 根据阿里巴巴代码规范 线程池此处使用ThreadPoolExecutor 不使用Executors创建
3, // 线程池的核心线程数
5, // 能容纳的最大线程数
2L, // 空闲线程存活时间
TimeUnit.SECONDS, // 存活的时间单位
new ArrayBlockingQueue<>(3), // 存放提交但未执行任务的队列
Executors.defaultThreadFactory(), // 创建线程的工厂类
new ThreadPoolExecutor.AbortPolicy() // 等待队列满后的拒绝策略
);
try {
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " ------ come in ------");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("------ 1秒钟后出结果 ------ " + result);
// 制造异常进入 exceptionally
if (result > 5) {
int i = 10 / 0;
}
return result;
}, threadPool).whenComplete((v, e) -> { // 无异常获取数据
if (e == null) {
System.out.println("------ 计算完成,更新系统updateValue:" + v);
}
}).exceptionally(e -> { // 有异常处理
e.printStackTrace();
System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
正常进入whenComplete
结果:
pool-1-thread-1 ------ come in ------
main线程先去忙其他任务
------ 1秒钟后出结果 ------ 0
------ 计算完成,更新系统updateValue:0
2
3
4
异常进入exceptionally
结果:
pool-1-thread-1 ------ come in ------
main线程先去忙其他任务
------ 1秒钟后出结果 ------ 7
异常情况:java.lang.ArithmeticException: / by zero java.lang.ArithmeticException: / by zero
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArithmeticException: / by zero
at cn.cnlxc.completableFuture.CompletableFutureDemo03.lambda$main$0(CompletableFutureDemo03.java:34)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 3 more
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 2.4 优点
- 异步任务结束时,会自动回调某个对象的方法;【
whenComplete
方法】 - 主线程设置好回调后,不在关心异步任务的执行,异步任务之间可以顺序执行;【解决示例一】
- 异步任务出错时,会自动回调某个对象的方法。【异常处理
exceptionally
方法】
# 3. 链式语法和join方法
大厂面试题
点击查看
你怎么理解java多线程的?怎么处理并发?线程池有那几个核心参数?.2
Java加锁有哪几种锁? 我先说了syn,syn刚讲到偏向锁,他就不让我讲了,太自信了~
简单说说lock?
hashmap的实现原理? hash冲突怎么解决? 为什么使用红黑树?
spring里面都使用了那些设计模式? 循环依赖怎么解决?
项目中那个地方用了countdownlanch,怎么使用的?
JVM项目了解过吗? 说说都有什么?栈里面都放什么东西?
都用redis来做什么? aof和rdb都什么做持久化缓存的?
myaql的锁机制? mysql的索引是怎么实现的?
spring实现事务的几种方式?
zookeeper怎么实现分布式锁?
java8函数式编程用过吗?
算法:求链表倒数第K个元素?
Lambda表达式+Stream流式调用+Chain链式调用+Java8函数式编程
# 函数式接口
Runnable:无参数,无返回值
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
2
3
4
Function<T,V>:接收一个参数,并且有返回值
@FunctionalInterface
public interface Function<T, R> {
// 接收一个参数,并且有返回值
R apply(T t);
default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
Objects.requireNonNull(before);
return (V v) -> apply(before.apply(v));
}
default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
return (T t) -> after.apply(apply(t));
}
static <T> Function<T, T> identity() {
return t -> t;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Consumer:接收一个参数,没有返回值
@FunctionalInterface
public interface Consumer<T> {
// 接收一个参数,没有返回值
void accept(T t);
default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}
}
2
3
4
5
6
7
8
9
10
11
BiConsumer:接收两个参数,没有返回值
@FunctionalInterface
public interface BiConsumer<T, U> {
// 接收两个参数,没有返回值
void accept(T t, U u);
default BiConsumer<T, U> andThen(BiConsumer<? super T, ? super U> after) {
Objects.requireNonNull(after);
return (l, r) -> {
accept(l, r);
after.accept(l, r);
};
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CompletableFuture: whenComplete中使用。
Supplier:没有参数,有一个返回值
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
2
3
4
5
6
7
8
9
10
# chain链式编程 😄
参考博客:什么是链式编程 (opens new window)
Lombok:添加 @Accessors(chain = true)
开启链式编程
点击查看
/**
* Chain链式编程
*/
public class ChainDemo01 {
/**
* Chain链式编程
*
* @param args
*/
public static void main(String[] args) {
Student student = new Student();
// 普通形式
student.setId(1);
student.setStudentName("cnlxc");
student.setMajor("cn");
// Chain 链式编程
student.setId(2).setStudentName("com").setMajor("cx");
}
}
@AllArgsConstructor
@NoArgsConstructor
@Data
@Accessors(chain = true) // 开启链式编程
class Student {
private Integer id;
private String studentName;
private String major;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# join方法 😄
CompletableFuture使用
join
方法替换get
方法;原因:get方法必须抛出异常。
public class CompletableFutureDemo04 {
/**
* CompletableFuture join方法和get方法
* @param args
*/
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->{
return "cnlxc.cn";
});
// get方法
try {
System.out.println(completableFuture.get());
} catch (Exception e) {
e.printStackTrace();
}
// join方法
System.out.println(completableFuture.join());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 4. 案例
示例:电商比价
# 需求分析
功能=》性能
需求说明
- 同一款产品,同时搜索出同款产品在各大电商平台的售价;
- 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
输出返回: 出来结果希望是同款产品的在不同地方的价格清单列表,返回一个
List<String>
《mysql》 in jd price is 88.05
《mysql》 in dangdang price is 86.11
《mysal》 in taobao price is 90.43
解决方案,比对同一个商品在各个平台上的价格,要求获得一个清单列表
- step by step,按部就班,查完京东查淘宝,查完淘宝查天猫......
- all in,万箭齐发,一口气多线程异步任务同时查询。。。。。
# 普通方式实现
单个线程执行;价格:查完京东查淘宝,查完淘宝查天猫......
点击查看
public class CompletableFutureMallDemo01 {
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
List<String> bookList = getPrice(list, "mysql");
for (String price : bookList) {
System.out.println(price);
}
long endTime = System.currentTimeMillis();
System.out.println("----花费时间:"+ (endTime -startTime) + "毫秒");
}
/**
* 比价的商城列表
*/
static List<NetMall> list = Arrays.asList(
new NetMall("jd"),
new NetMall("dangdang"),
new NetMall("tuling"),
new NetMall("taobao")
);
/**
* 获取各个商城价格
* @param list 比价的商城列表
* @param productName 商品名称
* @return
*/
public static List<String> getPrice(List<NetMall> list, String productName) {
// 《mysql》 in dangdang price is 86.11
return list.stream()
.map(netMall ->
String.format("《" + productName + "》 in %s price is %.2f",
netMall.getNetMallName(),
netMall.calcPrice(productName)))
.collect(Collectors.toList());
}
}
/**
* 商城实体
*/
class NetMall {
@Getter
private String netMallName;
public NetMall(String netMallName) {
this.netMallName = netMallName;
}
/**
* 计算价格
*
* @param productName 产品名称
* @return
*/
public BigDecimal calcPrice(String productName) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
return BigDecimal.valueOf(ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
结果:
《mysql》 in jd price is 109.88
《mysql》 in dangdang price is 110.55
《mysql》 in tuling price is 109.85
《mysql》 in taobao price is 109.21
花费时间4067毫秒
2
3
4
5
# 异步方式实现 😈
多个线程同时执行:万箭齐发,一口气多线程异步任务同时查询。。。。。
public class CompletableFutureMallDemo02 {
/**
* 1. CompletableFuture异步形式
* 2. 函数式编程、连式编程、Stream历史计算
*/
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
List<String> bookList = getPriceByCompletableFuture(list, "mysql");
for (String price : bookList) {
System.out.println(price);
}
long endTime = System.currentTimeMillis();
System.out.println("----花费时间:" + (endTime - startTime) + "毫秒");
}
/**
* 比价的商城列表
*/
static List<NetMall> list = Arrays.asList(
new NetMall("jd"),
new NetMall("dangdang"),
new NetMall("tuling"),
new NetMall("taobao")
);
/**
* 通过CompletableFuture异步方式获取各个商城价格
*
* @param list
* @param productName
* @return
*/
public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName) {
// 创建线程池
ExecutorService threadPool = new ThreadPoolExecutor( // 根据阿里巴巴代码规范 线程池此处使用ThreadPoolExecutor 不使用Executors创建
5, // 线程池的核心线程数
10, // 能容纳的最大线程数
2L, // 空闲线程存活时间
TimeUnit.SECONDS, // 存活的时间单位
new ArrayBlockingQueue<>(5), // 存放提交但未执行任务的队列
Executors.defaultThreadFactory(), // 创建线程的工厂类
new ThreadPoolExecutor.AbortPolicy() // 等待队列满后的拒绝策略
);
// 此处不适用ArrayList避免线程不安全问题,使用CopyOnWriteArrayList
List<String> resultList = new CopyOnWriteArrayList<>();
try {
List<String> priceList = list.stream()
.map(netMall -> CompletableFuture.supplyAsync(() ->
// 《mysql》 in dangdang price is 86.11
String.format("《" + productName + "》 in %s price is %.2f",
netMall.getNetMallName(),
netMall.calcPrice(productName))
, threadPool))
// 此处 collect(Collectors.toList()) 返回的是List<CompletableFuture<String>> 需要转换为List<String>
.collect(Collectors.toList())
.stream()
// CompletableFuture 的 join 方法同 get方法一样,
.map(CompletableFuture::join)
.collect(Collectors.toList());
resultList.addAll(priceList);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放线程资源
threadPool.shutdown();
}
return resultList;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
结果:
点击查看
《mysql》 in jd price is 109.06
《mysql》 in dangdang price is 110.72
《mysql》 in tuling price is 109.88
《mysql》 in taobao price is 110.43
----花费时间:1065毫秒
2
3
4
5
注意点:
- 此处使用的自定义的线程池形式创建
CompletableFuture
, 需要设置线程池的执行线线程数量(决定花费时间快慢)。- 使用两次Stream流的方式输出结果:解决第一次Stream流输出的是
List<CompletableFuture<String>>
形式. 转换为List<String>
。- 此处使用
CompletableFuture
的join方法
获取结果。- 一定要记得自定义线程池要释放资源。
# 5. CompletableFuture (opens new window)常用方法
CompletableFuture实现Future、CompletionStage,大部分方法来自对CompletionStage
的实现
常用方法需要自己多用。多了解。
# 获取结果和触发计算
# 对计算结果进行处理
whenComplete、exceptionally与try/catch/finally
# 对计算结果进行消费
方法 | 概述含义 |
---|---|
thenAccept(Consumer<? super T> action) | 消费型函数式接口:接收任务的处理结果,并消费处理,无返回结果 |
代码实现:
点击查看
/**
* 对计算结果进行消费
*
* @author cnlxc
* @date 2023/4/1 18:45
*/
public class CompletableFutureDemo07 {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply((f) -> {
return f + 2;
}).thenApply((f) -> {
return f + 3;
}).thenAccept(System.out::println);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
结果:
6
# code任务之间的顺序执行 😄
方法 | 概述含义 |
---|---|
thenRun(Runnable action) | 任务A执行完执行 B,并且B 不需要A的结果 ,无返回值 |
thenAccept(Consumer<? super T> action) | 任务A执行完执行 B,B 需要 A的结果 ,但是任务 B 无返回值 |
thenApply(Function<? super T,? extends U> fn) | 任务 A执行完执行 B,B 需要 A的结果 ,同时任务 B 有返回值 |
代码实现:
点击查看
public class CompletableFutureDemo07 {
public static void main(String[] args) {
// thenRun(Runnable action)
System.out.println(CompletableFuture.supplyAsync(()-> "resultA").thenRun(()->{}).join());
// thenAccept(Consumer<? super T> action)
System.out.println(CompletableFuture.supplyAsync(()-> "resultA").thenAccept(System.out::println).join()); // 消费型函数式接口
// thenApply(Function<? super T,? extends U> fn)
System.out.println(CompletableFuture.supplyAsync(()-> "resultA").thenApply(f -> f + "resulBb").join());
}
}
2
3
4
5
6
7
8
9
10
11
12
结果:
null
resultA
null
resultAresulBb
2
3
4
# 线程池运行选择 😈
以thenRun
和thenRunAsync
为例:
代码实现:
点击查看
public class CompletableFutureDemo08 {
/**
* 线程池运行选择 以`thenRun`和`thenRunAsync`为例:
*
* @return void
* @author cnlxc
* @date 2023/4/1 19:54
*/
public static void main(String[] args) {
// 创建线程池
ExecutorService threadPool = new ThreadPoolExecutor( // 根据阿里巴巴代码规范 线程池此处使用ThreadPoolExecutor 不使用Executors创建
5, // 线程池的核心线程数
8, // 能容纳的最大线程数
2L, // 空闲线程存活时间
TimeUnit.SECONDS, // 存活的时间单位
new ArrayBlockingQueue<>(3), // 存放提交但未执行任务的队列
Executors.defaultThreadFactory(), // 创建线程的工厂类
new ThreadPoolExecutor.AbortPolicy() // 等待队列满后的拒绝策略
);
try {
// 1. 都只使用默认线程池, 调用thenRun: 使用的线程池都是默认的ForkJoinPool
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("111 " + Thread.currentThread().getName());
return "1";
}).thenRun(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("222 " + Thread.currentThread().getName());
}).thenRun(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("333 " + Thread.currentThread().getName());
}).thenRun(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("444 " + Thread.currentThread().getName());
});
System.out.println("========= 1 ============");
System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
/*
111 ForkJoinPool.commonPool-worker-9
222 ForkJoinPool.commonPool-worker-9
333 ForkJoinPool.commonPool-worker-9
444 ForkJoinPool.commonPool-worker-9
null
*/
// 2. 都只使用默认线程池, 调用thenRunAsync: 使用的线程池都是默认的ForkJoinPool
CompletableFuture<Void> completableFuture2 = CompletableFuture.supplyAsync(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("111 " + Thread.currentThread().getName());
return "1";
}).thenRunAsync(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("222 " + Thread.currentThread().getName());
}).thenRunAsync(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("333 " + Thread.currentThread().getName());
}).thenRunAsync(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("444 " + Thread.currentThread().getName());
});
System.out.println("========= 2 ============");
System.out.println(completableFuture2.get(2L, TimeUnit.SECONDS));
/*
111 ForkJoinPool.commonPool-worker-9
222 ForkJoinPool.commonPool-worker-9
333 ForkJoinPool.commonPool-worker-9
444 ForkJoinPool.commonPool-worker-9
null
*/
// 3. 都只使用自定义线程池, 调用thenRun: 使用的线程池都是自定义线程池pool
CompletableFuture<Void> completableFuture3 = CompletableFuture.supplyAsync(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("111 " + Thread.currentThread().getName());
return "1";
}, threadPool).thenRun(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("222 " + Thread.currentThread().getName());
}).thenRun(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("333 " + Thread.currentThread().getName());
}).thenRun(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("444 " + Thread.currentThread().getName());
});
System.out.println("========= 3 ============");
System.out.println(completableFuture3.get(2L, TimeUnit.SECONDS));
/*
111 pool-1-thread-1
222 pool-1-thread-1
333 pool-1-thread-1
444 pool-1-thread-1
null
*/
// 4. 都只使用自定义线程池, 调用thenRunAsync、thenRun:
CompletableFuture<Void> completableFuture4 = CompletableFuture.supplyAsync(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("111 " + Thread.currentThread().getName());
return "1";
}, threadPool).thenRunAsync(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("222 " + Thread.currentThread().getName());
}).thenRun(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("333 " + Thread.currentThread().getName());
}).thenRun(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (Exception e) {e.printStackTrace();}
System.out.println("444 " + Thread.currentThread().getName());
});
System.out.println("========= 4 ============");
System.out.println(completableFuture4.get(2L, TimeUnit.SECONDS));
/*
111 pool-1-thread-2
222 ForkJoinPool.commonPool-worker-9
333 ForkJoinPool.commonPool-worker-9
444 ForkJoinPool.commonPool-worker-9
null
*/
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
总结:
- 没有传入自定义线程池,都用默认线程池
ForkJoinPool
. - 传入了一个自定义线程池,如果你执行第一个任务的时候,传入了一个自定义线程池:
- 调用
thenRun
方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。 - 调用
thenRunAsync
执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。
- 调用
- 有可能处理太快,系统优化切换原则,直接使用main线程处理
其它如:
thenAccept
和thenAcceptAsync
,thenApply
和thenApplyAsync
等,它们之间的区别也是同理
# 对计算速度进行选用
方法 | 概述含义 |
---|---|
applyToEither | 比较两个谁先实现 |
代码实现:
点击查看
public class CompletableFutureDemo09 {
public static void main(String[] args) {
// applyToEither
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
System.out.println("A come in");
try {
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
}
return "A";
});
CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> {
System.out.println("B come in");
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
}
return "B";
});
// applyToEither比较两个结果谁快,谁先执行
CompletableFuture<String> result = a.applyToEither(b, f -> {
return f + " is winer";
});
System.out.println(Thread.currentThread().getName() + "\t" + result.join());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
结果:
A come in
B come in
main A is winer
2
3
# 对计算结果进行合并
两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理;先完成的先等着,等待其它分支任务
方法 | 概述含义 |
---|---|
thenCombine | 比较两个谁先实现 |
标准版代码实现:
点击查看
public class CompletableFutureDemo10 {
public static void main(String[] args) {
// thenCombine
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t --- 启动");
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
return 10;
});
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t --- 启动");
try {
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
}
return 20;
});
// thenCombine 合并两个结果
CompletableFuture<Integer> result = a.thenCombine(b, (x, y) -> {
System.out.println("-- 合并两个结果");
return x + y;
});
System.out.println(result.join());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
结果:
ForkJoinPool.commonPool-worker-9 --- 启动
ForkJoinPool.commonPool-worker-2 --- 启动
-- 合并两个结果
30
2
3
4
thenCombine
合并代码实现:
点击查看
public class CompletableFutureDemo10 {
public static void main(String[] args) {
// thenCombine
CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t --- 1");
return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t --- 2");
return 20;
}), (x, y) -> {
System.out.println(Thread.currentThread().getName() + "\t --- 3");
return x + y;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t --- 4");
return 30;
}), (a, b) -> {
System.out.println(Thread.currentThread().getName() + "\t --- 5");
return a + b;
});
System.out.println("-------主线程结束,END");
System.out.println(thenCombineResult.join());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
结果:
ForkJoinPool.commonPool-worker-9 --- 1
ForkJoinPool.commonPool-worker-2 --- 2
main --- 3
ForkJoinPool.commonPool-worker-2 --- 4
main --- 5
-------主线程结束,END
合并结果:60
2
3
4
5
6
7