CompletableFuture

29

从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:

  • 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
  • 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
  • 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
  • 没有异常处理:Future接口中没有关于异常处理的方法;

CompletableFuture是Future接口的扩展和增强。CompletableFuture完整地继承了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。

  • 同步:使用当前线程运行任务;
  • 异步:使用CompletableFuture线程池其他线程运行任务,异步方法的名字中带有Async.

runAsync

    CompletableFuture.runAsync(
        new Runnable() {
          @Override
          public void run() {
            // 待运行的任务
          }
        });
    CompletableFuture.runAsync(
        () -> {
          // 待运行的任务
        });

supplyAsync、thenApply、thenAccept、thenRun

    CompletableFuture<String> supplyAsync =
        CompletableFuture.supplyAsync(
            () -> {
              // 有返回值
              return "返回值";
            });
    CompletableFuture<String> thenApply =
        supplyAsync.thenApply(
            name -> {
              // 接受值、有返回值
              return name;
            });
    CompletableFuture<Void> thenAccept =
        thenApply.thenAccept(
            name -> {
              // 接受值、返回值void
              System.err.println(name);
            });
    CompletableFuture<Void> thenRun =
        thenAccept.thenRun(
            () -> {
              // 没有接受值、返回值void
            });

thenCompose、thenCombine

    CompletableFuture<String> supplyAsync1 =
        CompletableFuture.supplyAsync(
            () -> {
              return "返回值";
            });
    CompletableFuture<String> thenCompose =
        supplyAsync1.thenCompose(
            name -> {
              // thenApply() 返回计算结果的原始类型
              // thenCompose() 返回CompletableFuture类型
              return CompletableFuture.supplyAsync(() -> name + "test");
            });
    // 3.2 组合两个独立的任务
    CompletableFuture<Integer> roundsFuture = CompletableFuture.supplyAsync(() -> 500);
    CompletableFuture<Integer> winRoundsFuture = CompletableFuture.supplyAsync(() -> 365);
    // 在另外两个参数未就绪时,它将会处于等待状态。
    CompletableFuture<Object> winRateFuture =
        roundsFuture.thenCombine(
            winRoundsFuture,
            (rounds, winRounds) -> {
              if (rounds == 0) {
                return 0.0;
              }
              DecimalFormat df = new DecimalFormat("0.00");
              return df.format((float) winRounds / rounds);
            });

anyOf、allOf

    CompletableFuture<Integer> roundsFuture1 =
        CompletableFuture.supplyAsync(
            () -> {
              try {
                Thread.sleep(200);
                return 500;
              } catch (InterruptedException e) {
                return null;
              }
            });
    CompletableFuture<Integer> winRoundsFuture1 =
        CompletableFuture.supplyAsync(
            () -> {
              try {
                Thread.sleep(100);
                return 365;
              } catch (InterruptedException e) {
                return null;
              }
            });
    // anyOf 等待任一任务执行结束后执行、返回执行结果
    CompletableFuture<Object> completedFuture =
        CompletableFuture.anyOf(winRoundsFuture1, roundsFuture1);
    System.out.println(completedFuture.get()); // 返回365
    // allOf 所有任务结束后执行特定的动作、返回Void
    CompletableFuture<Void> completedFutures =
        CompletableFuture.allOf(winRoundsFuture, roundsFuture);

异常处理 exceptionally handle

    CompletableFuture.supplyAsync(
            () -> {
              int i = 1 / 0;
              return "返回值";
            })
        .exceptionally(
            // 捕获异常并返回错误情况下的默认值、仅在发生异常时才会调用
            ex -> {
              System.out.println("出错:" + ex.getMessage());
              return "";
            });
    CompletableFuture.supplyAsync(
            () -> {
              int i = 1 / 0;
              return "返回值";
            })
        .handle(
            // 那么无论是否发生异常,都会调用它
            // 需要通过 if (ex != null) 来判断是否发生了异常
            (res, ex) -> {
              if (ex != null) {
                System.out.println("出错:" + ex.getMessage());
                return "";
              }
              return res;
            });

使用自定义线程池

    Executor executor = Executors.newFixedThreadPool(10);

    CompletableFuture<Integer> roundsFuture2 =
        CompletableFuture.supplyAsync(
            () -> {
              try {
                Thread.sleep(200);
                return 500;
              } catch (InterruptedException e) {
                return null;
              }
            },
            executor);