所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果
java5之前
在java5之前,主要通过 Thread
或者实现 Runnable
来创建线程,可以通过 Thread
的一些方法来控制线程
1 | public class ThreadDemo { |
java5之前实现的局限性:
- 缺少线程管理的原生支持(没有线程池)
- 缺少”锁”的api(缺少Lock这样的api)
- 缺少执行完成的原生支持
- 执行结果获取困难
java5
线程池
java5增加了线程池,由 Doug Lea
编写
1 | public class ExecutorDemo { |
Future
增加了 Future
,提供了可以获取执行结果的方法(Callable是有返回值操作,相对于Runnable)
1 | public class FutureDemo { |
Future的限制:
- 无法手动完成
- 阻塞式结果返回 future.get()
- 无法链式调用多个Future,从 ExecutorService#invokeAll 方法中只能返回Future的集合
- 无法合并多个Future的结果,从 ExecutorService#invokeAll 方法中只能返回Future的集合
java7
Fork/Join
ForkJoin是Java7提供的原生多线程并行处理框架,其基本思想是将大人物分割成小任务,最后将小任务聚合起来得到结果。
它非常类似于HADOOP提供的MapReduce框架,只是MapReduce的任务可以针对集群内的所有计算节点,可以充分利用集群的能力完成计算任务。ForkJoin更加类似于单机版的MapReduce
Fork/Join使用两个类完成以上两件事情:
ForkJoinTask:
我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join的操作机制,通常我们不直接继承ForkjoinTask类,只需要直接继承其子类。
- RecursiveAction,用于没有返回结果的任务
- RecursiveTask,用于有返回值的任务
ForkJoinPool:
task要通过ForkJoinPool来执行,分割的子任务也会添加到当前工作线程的双端队列中,进入队列的头部。当一个工作线程中没有任务时,会从其他工作线程的队列尾部获取一个任务。
计算整数之和 DEMO:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65public class ForkJoinDemo {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
LongAccumulator accumulator = new LongAccumulator(((left, right) -> left + right), 0);
List<Long> params = new ArrayList<>();
for (long i = 0; i < 10000000; i++) {
params.add(i);
}
long start = System.currentTimeMillis();
forkJoinPool.invoke(new LongSumTask(params, accumulator));
long end = System.currentTimeMillis();
System.out.println(accumulator.get());
System.out.printf("消耗时间:%d %s\n", end - start, "ms");
forkJoinPool.shutdown();
}
static class LongSumTask extends RecursiveAction {
private final List<Long> elements;
private final LongAccumulator accumulator;
LongSumTask(List<Long> elements, LongAccumulator accumulator) {
this.elements = elements;
this.accumulator = accumulator;
}
public void compute() {
int size = elements.size();
int parts = size / 2;
// 使用简单的二分法,将计算平分,当元素只有一个的时候使用 LongAccumulator 进行累加计算
if (size > 1) {
List<Long> left = elements.subList(0, parts);
List<Long> right = elements.subList(parts, size);
new LongSumTask(left, accumulator).fork().join();
new LongSumTask(right, accumulator).fork().join();
} else {
if (elements.isEmpty()) {
return;
}
Long num = elements.get(0);
accumulator.accumulate(num);
}
}
}
}
java8
CompletableFuture
在Java8中,CompletableFuture
提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture
的方法
CompletableFuture
实现了 Future
和 CompletionStage
1
2
3public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
// ...
}
相关的操作可以查看官方API或者相关博客
DEMO:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 完成操作(可以被其它线程去做)
// CompletableFuture<String> completableFuture = new CompletableFuture<>();
// completableFuture.complete("Hello World");
// String v = completableFuture.get();
// System.out.println(v);
// 2. runAsync 异步执行,阻塞操作
// CompletableFuture asyncCompletableFuture = CompletableFuture.runAsync(() -> {
// System.out.printf("[Thread : %s]Hello World...\n", Thread.currentThread().getName());
// });
//
// // 这里仍然是阻塞的
// asyncCompletableFuture.get();
//
// System.out.println("Starting...");
// 3. supplyAsync 异步执行,阻塞操作
// CompletableFuture<String> asyncCompletableFuture = CompletableFuture.supplyAsync(() -> {
// // 获取数据操作,比如来自于数据库
// return String.format("[Thread : %s]Hello World...\n", Thread.currentThread().getName());
// });
//
// String v = asyncCompletableFuture.get();
// System.out.println(v);
// System.out.println("Starting...");
// 4. 合并操作
CompletableFuture<String> combinedCompletableFuture = CompletableFuture.supplyAsync(() -> {
// 获取数据操作,比如来自于数据库
return String.format("[Thread : %s]Hello World...", Thread.currentThread().getName());
}).thenApply(value -> {
System.out.printf("current thread : %s\n", Thread.currentThread().getName());
return value + " - 来自于数据库";
}).thenApplyAsync(value -> {
System.out.printf("current thread : %s\n", Thread.currentThread().getName());
return value + " at " + LocalDate.now();
}).exceptionally(e -> {
// 异常处理
e.printStackTrace();
return "";
});
while (!combinedCompletableFuture.isDone()) {
}
String v = combinedCompletableFuture.get();
System.out.println(v);
System.out.println("Starting...");
}
}
事实上,如果每个操作都很简单的话,没有必要用这种多线程异步的方式,因为创建线程还需要时间,还不如直接同步执行来得快。
事实证明,只有当每个操作很复杂需要花费相对很长的时间(比如,调用多个其它的系统的接口)的时候用 CompletableFuture
才合适,不然区别真的不大,还不如顺序同步执行。