CompletableFuture
从本质上说,Future表示一个异步计算的结果。它提供了isDone()
来检测计算是否已经完成,并且在计算结束后,可以通过get()
方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:
- 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
- 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
- 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
- 没有异常处理:Future接口中没有关于异常处理的方法;
CompletableFuture是Future接口的扩展和增强。CompletableFuture完整地继承了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
- 同步:使用当前线程运行任务;
- 异步:使用CompletableFuture线程池其他线程运行任务,异步方法的名字中带有
Async
.
runAsync
CompletableFuture.runAsync(
new Runnable() {
@Override
public void run() {
// 待运行的任务
}
});
CompletableFuture.runAsync(
() -> {
// 待运行的任务
});
supplyAsync、thenApply、thenAccept、thenRun
CompletableFuture<String> supplyAsync =
CompletableFuture.supplyAsync(
() -> {
// 有返回值
return "返回值";
});
CompletableFuture<String> thenApply =
supplyAsync.thenApply(
name -> {
// 接受值、有返回值
return name;
});
CompletableFuture<Void> thenAccept =
thenApply.thenAccept(
name -> {
// 接受值、返回值void
System.err.println(name);
});
CompletableFuture<Void> thenRun =
thenAccept.thenRun(
() -> {
// 没有接受值、返回值void
});
thenCompose、thenCombine
CompletableFuture<String> supplyAsync1 =
CompletableFuture.supplyAsync(
() -> {
return "返回值";
});
CompletableFuture<String> thenCompose =
supplyAsync1.thenCompose(
name -> {
// thenApply() 返回计算结果的原始类型
// thenCompose() 返回CompletableFuture类型
return CompletableFuture.supplyAsync(() -> name + "test");
});
// 3.2 组合两个独立的任务
CompletableFuture<Integer> roundsFuture = CompletableFuture.supplyAsync(() -> 500);
CompletableFuture<Integer> winRoundsFuture = CompletableFuture.supplyAsync(() -> 365);
// 在另外两个参数未就绪时,它将会处于等待状态。
CompletableFuture<Object> winRateFuture =
roundsFuture.thenCombine(
winRoundsFuture,
(rounds, winRounds) -> {
if (rounds == 0) {
return 0.0;
}
DecimalFormat df = new DecimalFormat("0.00");
return df.format((float) winRounds / rounds);
});
anyOf、allOf
CompletableFuture<Integer> roundsFuture1 =
CompletableFuture.supplyAsync(
() -> {
try {
Thread.sleep(200);
return 500;
} catch (InterruptedException e) {
return null;
}
});
CompletableFuture<Integer> winRoundsFuture1 =
CompletableFuture.supplyAsync(
() -> {
try {
Thread.sleep(100);
return 365;
} catch (InterruptedException e) {
return null;
}
});
// anyOf 等待任一任务执行结束后执行、返回执行结果
CompletableFuture<Object> completedFuture =
CompletableFuture.anyOf(winRoundsFuture1, roundsFuture1);
System.out.println(completedFuture.get()); // 返回365
// allOf 所有任务结束后执行特定的动作、返回Void
CompletableFuture<Void> completedFutures =
CompletableFuture.allOf(winRoundsFuture, roundsFuture);
异常处理 exceptionally handle
CompletableFuture.supplyAsync(
() -> {
int i = 1 / 0;
return "返回值";
})
.exceptionally(
// 捕获异常并返回错误情况下的默认值、仅在发生异常时才会调用
ex -> {
System.out.println("出错:" + ex.getMessage());
return "";
});
CompletableFuture.supplyAsync(
() -> {
int i = 1 / 0;
return "返回值";
})
.handle(
// 那么无论是否发生异常,都会调用它
// 需要通过 if (ex != null) 来判断是否发生了异常
(res, ex) -> {
if (ex != null) {
System.out.println("出错:" + ex.getMessage());
return "";
}
return res;
});
使用自定义线程池
Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<Integer> roundsFuture2 =
CompletableFuture.supplyAsync(
() -> {
try {
Thread.sleep(200);
return 500;
} catch (InterruptedException e) {
return null;
}
},
executor);