CompletableFuture

简介

异步计算

所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。

JDK5新增了Future接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。

以前我们获取一个异步任务的结果可能是这样写的:

Future 接口的局限性

Future接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:

  • 将多个异步计算的结果合并成一个
  • 等待Future集合中的所有任务都完成
  • Future完成事件(即,任务完成以后触发执行动作)

CompletionStage

  • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
  • 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

CompletableFuture

  • 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法
  • 它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
  • 它实现了Future和CompletionStage接口

实例代码

基本的CompletableFuter使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class BaseComFuture {

public static void main(String[] args) throws InterruptedException, ExecutionException {


long start = System.nanoTime();
//
// System.out.println("get start");
// TimeUnit.SECONDS.sleep(3);
// TimeUnit.SECONDS.sleep(3);
// TimeUnit.SECONDS.sleep(3);
// System.out.println(456);
//
// System.out.println(System.nanoTime()-start);

/*
* 新建一个CompletableFuture对象
*/
start = System.nanoTime();
CompletableFuture<String> resultCompletableFuture = CompletableFuture.supplyAsync(()->{
try {
System.out.println("get start1");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}

return "Hello CompletableFuture";
});
// CompletableFuture<String> resultCompletableFuture1 = CompletableFuture.supplyAsync(()->{
// try {
// System.out.println("get start2");
// TimeUnit.SECONDS.sleep(3);
// System.out.println(Thread.currentThread().getName());
// } catch (Exception e) {
// e.printStackTrace();
// }
//
// return "Hello CompletableFuture";
// });
// CompletableFuture<String> resultCompletableFuture2 = CompletableFuture.supplyAsync(()->{
// try {
// System.out.println("get start3");
// TimeUnit.SECONDS.sleep(5);
// System.out.println(Thread.currentThread().getName());
// } catch (Exception e) {
// e.printStackTrace();
// }
//
// return "Hello CompletableFuture";
// });
TimeUnit.SECONDS.sleep(1);
System.out.println(123);
System.out.println(resultCompletableFuture.get());
System.out.println("aaaaaaa");
// System.out.println(resultCompletableFuture1.get());
System.out.println("aaaaaaa");
// System.out.println(resultCompletableFuture2.get());
System.out.println("aaaaaaa");
System.out.println(456);
System.out.println(System.nanoTime()-start);
/***
* ps : 首先会进入该进程,执行get方法 //所以输出结果中立马会输出get start 然后,sleep模拟长时间计算操作/或者其他情况的阻塞
* //所以这个时候屏幕会等三秒,一直停在那里 3秒后才会打印 // 三秒后,才会开始打印相关信息,按照顺序执行
* (不要被try语句迷惑了啊,那就是一个一瞬执行下来的逻辑) 打印完后该进程退出!
*
* 如果我用resultCompletableFuture的回调函数去处理这个会有什么现象呢? 请看BaseComFutureCallback类中的演示
*/

/**
* 上面那个打印语句会阻塞3秒,执行完后 ,才会执行这一句
*/

}

}

回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class BaseComFutureCallback {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
/*
* 新建一个CompletableFuture对象
*/
CompletableFuture<String> resultCompletableFuture = CompletableFuture.supplyAsync(()->{
try {
System.out.println("get start,will sleep 3s");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}

return "Hello CompletableFuture";
}, executor);

CompletableFuture<String> resultCompletableFuture2 = CompletableFuture.supplyAsync(()->{
try {
System.out.println("get start,will sleep 3s");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}

return "Hello CompletableFuture";
}, executor);

/**
* ps: 回调函数,顾名思义 那就是调用的对象中声明的方法正确执行完后,就会调用这个方法。
* 其中,accept中的参数就是get函数返回的参数
*/
System.out.println(resultCompletableFuture.thenAccept((t)->{
System.out.println("进入回调函数-" + t);
System.out.println(Thread.currentThread().getName());
}));
System.out.println(resultCompletableFuture2.thenAccept((t)->{
System.out.println("进入回调函数-" + t);
System.out.println(Thread.currentThread().getName());
}));

System.out.println("带有回调的print语句后面一句话");
System.out.println("");

/**
* it will shutdown 10's later
*/
System.out.println("it will shutdown 10's later");
TimeUnit.SECONDS.sleep(10);

executor.shutdown();
System.out.println("shutdown");
}
}
/**
* ps: 两点
* 1.这个时候你会看到 ,在带有回调的print语句后的一个print会先打印(因为那个回调在等着get函数里面sleep3秒钟),
* 但是,不会阻塞着等,而是会执行下面的语句,然后,上面的语句执行完后,再来执行这个带有回调的print
* 2.当打印线程的时候,我们可以看到,回调与get是统一个线程!
* 因为我们用的thenAccept去做回调,
* 那么,当我们尝试用带Async的回调方法去回调试试看
* 详见BaseComFutureCallbackAsync
*
*
* 补充:
* 这里与上一个不同,因为这个地方加了个executor,为什么?
* 由于回调,resultCompletableFuture中的get不知道会阻塞到什么时候
* (虽然现在写死的是3秒,但是生产环境中 ,哪个能保证3秒?)
* 但是,这个地方他又不会像上一个程序那样一顺执行,非得等阻塞完成再执行下一句
* 而是马上执行下面的print,当main中所有语句顺序执行完了之后,这个main线程就关闭了,
* 所以,你的回调永远不会执行,因为主线程都down掉了。你咋回来。(可惜啊,main不会等,等了不还是阻塞)
*
* 所以,这里我先起一个executor,用来开一个线程池,如果不显式关闭,
* 他就会一直挂在那,既然一直挂在那里,我显然,会等到我回调的那天(get必然会返回值,只是时间长短啊)
*
* 所以,哪怕你main语句执行完了,但是我只要不关闭这个线程池,你的main就一直跟我挂在那,因为还有一句话没有执行完嘛.
*
* BaseComFutureCallback2中对这个的演示会更明显
*
*/

异常通知的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class BaseComFutureExceptionally2 {

public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
/*
* 新建一个CompletableFuture对象
*/
CompletableFuture<String> resultCompletableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
System.out.println("get start,will sleep 3s");
TimeUnit.SECONDS.sleep(3);
throw new RuntimeException("错误");
// System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}

return "Hello CompletableFuture";
}
}, executor);

System.out.println(resultCompletableFuture.thenAccept(new Consumer<String>() {
@Override
public void accept(String t) {
System.out.println("进入回调函数-" + t);
throw new RuntimeException("aaaaa");
// System.out.println(Thread.currentThread().getName());
}

}).exceptionally(new Function<Throwable, Void>() {

/**
* ps
* 当出现异常的的时候,会执行这个function
* 这里就是对异常进行一些处理
*/
@Override
public Void apply(Throwable t) {
System.out.println(t.getMessage());
return null;
}
}));

/**
* ps: 我们返现,其余的流程都是一样的,就是回调函数不再执行了!
* 任务还是会执行完成(只要主线程等待足够时间再结束,或者不结束 )
* 但是,我们的回调函数,一般应用中肯定是希望执行的(既然是回调,我肯定希望在目标方法执行完后,进行一些处理工作),哪
* 怕是你报错,我也要指导你报错了,你完全不执行,显然是不合理的
*
* 但是我们实际应用中坑定不是通过
* resultCompletableFuture.completeExceptionally(new Exception("error"));
* 这个方法来抛出异常,肯定是在get中执行的时候出现异常然后抛出
* 这种情况详见BaseComFutureExceptionally2
*/
resultCompletableFuture.completeExceptionally(new RuntimeException("error"));

System.out.println("it will shutdown 10's later");
TimeUnit.SECONDS.sleep(10);

executor.shutdown();

System.out.println("time out ,main shutdown now");
}

}
/**
* 我们往往不仅需要通过回调的方式,让线程不阻塞
* 我们还需要将这些回调操作串起来
* 接着请看下面的转换
* 需要用到apply函数组
*
*
* public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
* public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
* public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
* 当原来的CompletableFuture计算完后,
* 将结果传递给函数fn,
* 将fn的结果作为新的CompletableFuture的参数去参与运算。
* 因此它的功能相当于将CompletableFuture<T>转换成CompletableFuture<U>
*
* 持续这么循环的调下去,有点像递归,当然,递归是本函数的持续调用(直到递归条件不满足)
*
*
*/

我们不仅仅回调,还可以将这些回调 操作串起来

需要用到apply函数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import java.util.concurrent.*;
import java.util.function.Function;

public class ConvertComFuture {

public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
// throw new RuntimeException("错误");
System.out.println("aaaaa");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "zero";
}, executor);

CompletableFuture<Integer> f2 = f1.thenApply(new Function<String, Integer>() {

@Override
public Integer apply(String t) {
System.out.println("进入f2的apply方法");
System.out.println("f1传进来的字符串-"+t);
System.out.println("返回该字符串的长度-"+Integer.valueOf(t.length()));

try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Integer.valueOf(t.length());
}
}).exceptionally(new Function<Throwable, Integer>() {

@Override
public Integer apply(Throwable t) {
System.out.println(t.getMessage());
return null;
}

});
System.out.println("bbbbb");

/**
* 此处,apply的是f1
*/

CompletableFuture<Double> f3 = f2.thenApply(self -> self * 2.0).thenApply(self -> self*3).exceptionally(new Function<Throwable, Double>() {
@Override
public Double apply(Throwable t) {
System.out.println(t.getMessage());
return null;
}
});//这个参数名字随便取,叫self最合适,因为他本来就是把f2自身的结果带到f3中去,参与f3的运算
/**
* 此处,apply的是f2
*
* 这个地方的self,不严格的说,就是一个的代名词,就是代表了f2的返回值
*
* 如果想要看的清楚一点,可以像f1一样显式声明一个function匿名对象,覆盖apply方法,然后写逻辑
* 其中,方括号前面的参数是传进来的参数类型,后面的参数类型是返回类型
*
* 不过,我们一般采用f2的方式,更简洁一点
*
* 这里需要注意的是,f3获得最终结果还真不会马上执行,也不会导致主进程阻塞
* 而是等着这里面所有的“回调”阶段一个接一个的完成后,再显示出来(或者说再进入f3的执行执行逻辑)。
*
* 但是,这个方法一旦有异常,就会抛出
*/
//System.out.println(f3.get());


System.out.println("shutdown in 3s");
TimeUnit.SECONDS.sleep(3);

System.out.println("shutdown");
executor.shutdown();
}

}
Jeff-Eric wechat