Java 中的异步编程工具 CompletableFuture

Java 中的异步编程工具 CompletableFuture

文章目录

  !版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。


环境信息:

  • JDK 版本: 1.8

示例地址:


一、CompletableFuture 概述

1.1 什么是同步和异步

在介绍 CompletableFuture 之前,先简单说一下什么是同步和异步这两个概念,介绍这俩个概念最直接的方式就是使用经典的 烧水泡茶 过程进行描述,一般来讲我们烧水泡茶时会经过 洗茶壶->注水->烧水->洗茶杯->装茶叶->泡茶 几个步骤:

如果使用 同步编程,那么泡茶过程可以划分为:

如果使用 异步编程,那么泡茶过程可以划分为:

根据上面的例子,相信大家对同步和异步有了一个大概了解,同步就是按指定顺序执行任务,这种执行过程必须按顺序进行,一步步执行,所以执行速度比较缓慢。而 异步 则不需要按照指定的顺序执行任务,它可以多个任务并行执行,等待异步任务完成后进行反馈,然后按照反馈结果执行特定操作即可,所以执行速度非常快。

1.2 CompletableFuture 简介

在 JDK 8 版本中,新增了很多功能与工具,其中 CompletableFuture 就是其中一种用于构建异步编程的工具,该工具主要是对 Future 进行优化,使开发者可以很方便的创建串行或者并行任务,以及实现任务的 OR 或者 AND 这种组合方式,极大程度上简化了异步编程中的代码量。

1.3 为什么推出 CompletableFuture

在 JDK 8 之前的版本中,编写异步任务常使用 Future 实现,使用 Future 执行异步任务并且获得异步执行结果时,我们会通过两种方式,要么调用阻塞的 get() 方法,或者轮询调用 isDone() 方法获取任务状态,判断是否为 true。不过这两种方法在执行时都会使主线程被迫等待,对性能会产生一定影响,故而 Java 团队在 JDK 8 版本中新增了 CompletableFuture 工具,通过使用观察者模式进行设计,实现异步回调进行异步编程,一定程度上降低了编写异步任务的代码量和难度。

二、CompletableFuture 中的分类

在 CompletableFuture 中有很多方法,并且这些方法命名并不是很容易归类,比如 thenRun 和 thenRunAsync 方法,这些方法有的以 Async 结尾,而有的则不是,一般情况下我们会认为带 Async 的方法属于异步方法,不带 Async 的方法属于同步方法,不过这个看法其实是并不是完全正确,因为有的方法即使不以 Async 结尾,也是通过异步方式执行的。

其实,本人认为如果需要对 CompletableFuture 中的方法进行归类的话,可以按照使用过程进行分类,这样比较容易将不同的方法划分到不同的类别中。比如,使用过程可以划分为 任务开启任务处理任务结束查看任务状态设置任务结果 以及 任务异常处理 等几种方法,其中 任务处理 又可以分为 串行任务并行任务

注意: 下面要罗列的以 Async 结尾的方法中,执行时并不会直接从 commonPool 线程池中获取线程,而是判断环境中设置的变量 commonParallelism 并行度,或者应用所在服务器的 CPU 数量是否大于 1,如果满足条件则就会从 ForkJoinPool.commonPool() 线程池中获取一个线程执行任务,如果不满足条件就会每次执行任务时新建一个线程,使用新创建的线程去执行任务。在下面介绍线程池时,会提到这一点。

2.1 任务开启方法

任务开启方法一般指的是 CompletableFuture 中,可以直接调用的静态方法,使用这些方法可以直接开启一个新任务,该任务相当于任务链中要执行的第一个步骤,常常我们会在该步骤中执行获取待处理的数据等操作。

任务开启方法包含如下:

方法名称 有返回值 描述
runAsync     x     从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。
supplyAsync     √     从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,返回指定类型的返回值。

2.2 任务处理方法

任务处理方法一般指的是 CompletableFuture 中,用于以串行或并行的方式处理数据的静态或实例方法,使用这些方法可以实现 OR 或 AND 任务组合,是异步编程中最重要的一些方法。

任务处理方法包含如下:

任务处理-串行任务

方法名称 有返回值 描述
thenRun     x     串行执行任务。并且该任务方法执行结束后,没有返回值。
thenRunAsync     x     串行执行任务,从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。
thenApply     √     串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
thenApplyAsync     √     串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
thenAccept     x     串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且当前任务方法执行结束后,没有返回值。
thenAcceptAsync     x     串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。
handle     √     串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
handleAsync     √     串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
whenComplete     x     串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定函数。并且当前任务执行结束后,没有返回值。
whenCompleteAsync     x     串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。
thenCompose     √     串行执行任务,按顺序组合两个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。
thenComposeAsync     √     串行执行任务,按顺序组合俩个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。

任务处理-并行任务

方法名称 有返回值 描述
thenCombine     √     并行执行任务,从 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。
thenCombineAsync     √     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。
thenAcceptBoth     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
thenAcceptBothAsync     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
runAfterBoth     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务方法执行结束后,没有返回值。
runAfterBothAsync     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。
applyToEither     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
applyToEitherAsync     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前先执行结束的任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
runAfterEither     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。
runAfterEitherAsync     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。
acceptEither     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
acceptEitherAsync     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
allOf     √     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待全部任务方法都执行完成后结束。任务执行结束后,没有返回值。
anyOf     √     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待多个任务方法中任意一个执行完成后结束。任务执行结束后,返回第一个先执行完成任务的返回值。

2.3 任务结束方法

任务结束指的是调用 CompletableFuture 中的实例方法,获取执行结果或者取消任务等,结束现有的任务链。

任务结束包含的方法如下:

方法名称 有返回值 描述
get     √     获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(运行时)异常。
join     √     获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(未经检查)异常。
getNow     √     立即获取任务执行结果,如果任务没有完成则返回设定的默认值,如果任务正常完成则返回执行结果。
cancel     √     取消任务,如果任务尚未执行结束,调用该方法成功取消任务时返回 true,否则返回 false。并且任务取消成功后,通过 get/join 方法获取结果时,会抛出 CancellationException 异常。

2.4 查看任务状态方法

查看任务状态方法用于查看 CompletableFuture 任务执行状态,其中包含的方法如下:

方法名称 有返回值 描述
isDone     √     查看任务是否执行完成,如果当前阶段执行完成(无论是正常完成还是异常完成)则返回 true,否则返回 false。
isCancelled     √     查看当前阶段任务是否成功取消,如果此阶段任务在完成之前被取消则返回 true,否则返回 false。
isCompletedExceptionally     √     查看当前阶段任务是否以异常的方式执行完成。比如取消任务、突然终止任务或者执行过程出现异常等,都属于异常方式执行完成,如果是以异常方式完成则返回 true,否则返回 false。

2.5 设置任务结果方法

设置任务结果方法用于设置 CompletableFuture 任务结果,使其返回指定结果,其中包含的方法如下:

方法名称 有返回值 描述
obtrudeValue     x     设置(重置)调用 get/join 方法时返回指定值,无论任务是否执行完成。
obtrudeException     x     设置(重置)调用 get/join 方法时返回指定异常,无论任务是否执行完成。
complete     √     设置调用 get/join 方法时返回指定值。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。
completeException     √     设置调用 get/join 方法时返回指定异常。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。

2.6 任务异常处理方法

任务异常处理方法用于处理 CompletableFuture 执行过程中的异常,对异常进行处理,其中包含的方法如下:

方法名称 有返回值 描述
exceptionally     x     判断上一个任务执行时是否发生异常,如果是则就会执行 exceptionally 方法,并且将上一步异常作为当前方法的参数,然后对异常进行处理。当然,如果上一阶段执行过程中没有出现异常,则不会执行 exceptionally 方法。

三、CompletableFuture 中的线程池

3.1 CompletableFuture 默认的线程池

使用 CompletableFuture 执行异步任务时,会首先判断设置的 parallelism 数量,该参数一般跟当前服务器 CPU 数量相关:

  • 如果 CPU 数量等于 1,CompletableFuture 执行任务时,每次都会创建一个新的线程执行任务;
  • 如果 CPU 数量大于 1,CompletableFuture 执行任务时,将使用公共的 ForkJoinPool.commonPool() 线程池;

一般情况下在多核 CPU 服务器中运行应用,都会默认使用 ForkJoinPool.commonPool() 线程池,该线程池根据名称也可以大概了解到,是基于 Fork 和 Join 组合实现的,执行过程中可以将大的任务拆分为多个小任务并行执行,并且支持以窃取的方式,线程池中的线程在执行完自己工作队列中的任务后,可以窃取别的线程工作队列中没有执行完成的任务,协助其执行,尽可能使用并行的方式快速完成全部任务。所以 ForkJoinPool.commonPool() 线程池更适合执行计算密集型任务,而不太适合 IO 密集型任务

不过公共的 ForkJoinPool.commonPool() 线程池是 JVM 进程中所有 CompletableFuture 和 Stream 共享,如果全局上下文环境中存在大量使用 ForkJoinPool.commonPool() 线程池的任务,并且这些任务中包含大量的 IO 操作,那么该线程池性能将会受到很大影响。所以,一般情况下我们使用 CompletableFuture 时,需要避免使用公共线程池,而是使用自定义的线程池,并且设置不同的任务使用不同类型的线程池,以适用不同的任务场景。

3.2 CompletableFuture 如何使用自定义线程池

在 CompletableFuture 中的绝大部分方法,都存在接收不同类型的重载方法,如:

正常的 CompletableFuture 方法:

  • thenRun(Runnable action)
  • handle(BiFunction<? super T, Throwable, ? extends U> fn)

使用公共线程池 commonPool 的异步方法:

  • thenRunAsync(Runnable action)
  • handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)

使用自定义线程池的 CompletableFuture 方法:

  • thenRunAsync(Runnable action,Executor executor)
  • handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

大家根据上面的三种方法也能得知,在 CompletableFuture 中的方法,大部分都存在可以接收自定义线程池 Executor 参数的重载方法。因此,我们可以使用接收自定义线程池的 CompletableFuture 方法,将我们自定义的线程池作为参数,传入其中,使用该线程池中的线程执行任务。

如下,是一个使用 CompletableFuture 方法执行任务时使用自定义线程的示例:

 1import java.time.LocalDate;
 2import java.util.concurrent.*;
 3
 4public class SupplyExample {
 5
 6   /**
 7       * 创建自定义线程池
 8       *
 9       * @return 自定义线程池
10       */
11    public static ThreadPoolExecutor myThreadPool() {
12        // 核心线程数
13        int coreSize = 10;
14        // 最大线程数
15        int maxSize = 20;
16        // 空闲线程的回收时间和单位
17        TimeUnit timeUnit = TimeUnit.SECONDS;
18        // 空闲线程时间销毁
19        long keepAliveTime = 60L;
20        // 工作队列
21        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(200);
22        // 拒绝策略
23        RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
24
25        // 创建并返回自定义线程池
26        return new ThreadPoolExecutor(coreSize, maxSize, keepAliveTime, timeUnit, workQueue, handler);
27    }
28
29   /**
30       * 执行 CompletableFuture.supplyAsync 方法
31       */
32    public static void supplyAsyncExample() {
33        // 自定义线程池
34        ThreadPoolExecutor threadPool = myThreadPool();
35
36        // 执行 CompletableFuture 任务,将日期字符串转换为 LocalDate 日期对象
37        CompletableFuture<LocalDate> cf = CompletableFuture
38                .supplyAsync(() -> LocalDate.parse("2022-06-01"), threadPool);
39
40        // 调用 join 方法进入堵塞状态,直至获取任务执行结果输出到控制台
41        System.out.println(cf.join());
42    }
43
44   /**
45       * main() 方法
46       */
47    public static void main(String[] args) {
48        supplyAsyncExample();
49    }
50
51}

3.3 CPU 集型任务和 IO 密集型任务

常常我们执行的任务可以根据 CPU 的使用或者 IO 的使用进行划分,可以分为:

CPU 密集型任务

CPU 密集型任务也称为计算密集型任务,指的是任务执行过程需要进行大量计算,没有堵塞,且消耗大量的 CPU 资源。比如,计算圆周率,视频解码,类型转换等。

IO 密集型任务

IO 密集型任务,指的是需要进行大量磁盘IO读取,网络IO操作等,执行过程会造成堵塞,需要创建大量的线程执行任务。比如,数据库读写,文件读写,网络请求等。

3.4 不同类型任务对线程池的选择

在 Java 中常用的线程池按执行方式划分的话,其实可以划分为 ForkJoinPoolExecutorService 两种。

ForkJoinPool 线程池

关于 ForkJoinPool 线程池上面也提到过,这种线程池是基于分而治之的思想对任务进行拆解与合并,执行过程需要消耗大量 CPU 分解任务,然后进行计算,因此特别适合执行 计算密集型任务

ExecutorService 线程池

而 ExecutorService 线程池是传统方式的线程池,使用池化方式管理线程,提前将若干个可执行的线程放入池中,当我们需要时就从池中获取线程执行任务,执行过程不会对任务进行拆解,并且使用完毕后不需要销毁线程而是放回池中,方便下次再使用,从而减少创建和销毁线程的性能开销。所以,这种线程池更适合执行过程中需要进行大量 IO 操作的任务,即适合 IO 密集型任务

3.5 线程线程大小配置推荐

一般情况下,业务上使用的线程池都会设置线程池的线程数量大小,这个线程池线程数量大小设置为多少是一个值得考虑的问题。设置线程池过多会造成线程浪费,过少会造成处理任务的线程过少,造成任务堆积。所以,这个配置线程数量多大并不容易把控。最优的就是配置不同的线程数进行测试,然后判断应用设置线程池大小为多大性能最优。

这里可以参考网上的一套万能的推荐配置,跟操作系统的 CPU 数量相关,如下:

  • CPU 密集型任务: N + 1
  • IO 密集型任务: 2N + 1

还有一种针对 IO 密集型任务设置估算公式,按公式进行配置这种性能最优,公式如下:

  • 估算公式: = (线程等待时间 + 线程 CPU 时间 / 线程 CPU 时间) * CPU 核心数

如果是既有 IO 操作的步骤,也有比较消耗 CPU 的步骤,这种混合型任务可以进行拆分,将不同的任务使用不同的线程池。

四、CompletableFuture 任务开启方法示例

4.1 runAsync

方法描述

方法 有返回值 描述
runAsync        x        从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。

任务要求

使用异步的方式,模拟获取远程信息,然后将获取的远程信息输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.concurrent.CompletableFuture;
 2
 3public class RunAsyncExample {
 4
 5    public static void runAsyncExample() {
 6        // 调用 runAsync 方法,异步执行 runnable 中的代码逻辑,模拟获取远程信息
 7        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> System.out.println("模拟获取远程信息并输出到控制台"));
 8
 9        // 调用 join 方法进行等待,获取执行结果
10        cf.join();
11    }
12
13    public static void main(String[] args) {
14        runAsyncExample();
15    }
16
17}

4.2 supplyAsync

方法描述

方法 有返回值 描述
supplyAsync        √        从 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,返回指定类型的返回值。

任务要求

使用异步的方式,模拟获取远程信息,然后将获取的信息作为任务返回结果,之后将任务返回的结果输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.concurrent.CompletableFuture;
 2
 3public class SupplyAsyncExample {
 4
 5    public static void supplyAsyncExample() {
 6        // 调用 supplyAsync 方法,异步执行 runnable 中的代码逻辑,模拟获取远程信息,然后返回
 7        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
 8            System.out.println("模拟获取远程信息");
 9            return "远程信息";
10        });
11
12        // 调用 join 方法进行等待,获取执行结果
13        String result = cf.join();
14        System.out.println(result);
15    }
16
17    public static void main(String[] args) {
18        supplyAsyncExample();
19    }
20
21}

五、CompletableFuture 任务处理-串行任务方法示例

5.1 thenRun 和 thenRunAsync

方法描述

方法 有返回值 描述
thenRun        x        使用主线程或者执行上一步任务的子线程,串行执行任务。并且该任务方法执行结束后,没有返回值。
thenRunAsync        x        串行执行任务,从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。

任务要求

使用 CompletableFuture 工具,按照代码顺序执行任务,处理使用 "," 拼接的日期字符串,将其转换为 LocalDate 日期对象。

  • 第一步,先通过 , 拆分字符串,然后存入原子引用类型对象包裹的 List 集合中;
  • 第二步,遍历 List 集合,将拆分后的字符串转换为 LocalDate 对象;

任务执行完成后,将全部日期对象输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

thenRun

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.ArrayList;
 4import java.util.Arrays;
 5import java.util.List;
 6import java.util.concurrent.CompletableFuture;
 7import java.util.concurrent.atomic.AtomicReference;
 8
 9public class ThenRunExample {
10    
11    public static void thenRunExample() {
12        // 模拟的远程获取的日期字符串
13        String dateStr = "1995-05-10,2000-03-15";
14        // 原子引用类对象
15        AtomicReference<List<String>> dateStrList = new AtomicReference<>();
16        // LocalDate集合
17        List<LocalDate> dateList = new ArrayList<>();
18
19        // 执行 CompletableFuture 任务
20        // (1) 第一步,先通过 "," 拆分字符串,然后存入原子引用对象包裹的 List 集合中;
21        // (2) 第二步,遍历 List 集合,将拆分后的字符串转换为 LocalDate 对象;
22        CompletableFuture
23                .runAsync(() -> dateStrList.set(Arrays.asList(dateStr.split(","))))
24                .thenRun(() -> dateStrList.get().forEach(v -> dateList.add(LocalDate.parse(v, DateTimeFormatter.ofPattern("yyyy-MM-dd")))))
25                .join();
26
27        // 输出转换结果
28        for (LocalDate localDate : dateList) {
29            System.out.println(localDate.toString());
30        }
31    }
32
33    public static void main(String[] args) {
34        thenRunExample();
35    }
36
37}

thenRunAsync

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.ArrayList;
 4import java.util.Arrays;
 5import java.util.List;
 6import java.util.concurrent.CompletableFuture;
 7import java.util.concurrent.atomic.AtomicReference;
 8
 9public class ThenRunAsyncExample {
10
11    public static void thenRunAsyncExample() {
12        // 模拟的远程获取的日期字符粗
13        String dateStr = "1995-05-10,2000-03-15";
14        // 原子引用类对象
15        AtomicReference<List<String>> dateStrList = new AtomicReference<>();
16        // LocalDate集合
17        List<LocalDate> dateList = new ArrayList<>();
18
19        // 执行 CompletableFuture 任务
20        // (1) 第一步,先通过 "," 拆分字符串,然后存入原子引用对象包裹的 List 集合中;
21        // (2) 第二步,遍历 List 集合,将拆分后的字符串转换为 LocalDate 对象;
22        CompletableFuture
23                .runAsync(() -> dateStrList.set(Arrays.asList(dateStr.split(","))))
24                .thenRunAsync(() -> dateStrList.get().forEach(v -> dateList.add(LocalDate.parse(v, DateTimeFormatter.ofPattern("yyyy-MM-dd")))))
25                .join();
26
27        // 输出转换结果
28        for (LocalDate localDate : dateList) {
29            System.out.println(localDate.toString());
30        }
31    }
32
33    public static void main(String[] args) {
34        thenRunAsyncExample();
35    }
36
37}

5.2 thenApply 和 thenApplyAsync

方法描述

方法 有返回值 描述
thenApply        √        使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
thenApplyAsync        √        串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。

任务要求

使用 CompletableFuture 按照顺序执行任务,获取远程日期字符串,然后对日期进行逻辑计算,并返回计算后的结果。

  • 第一步,先模拟通过远程结果获取日期字符串;
  • 第二步,将日期字符串转换为 LocalDate 日期对象,便于进行逻辑计算;
  • 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;

任务执行完成后,将任务返回的日期字符串输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

thenApply

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.concurrent.CompletableFuture;
 4
 5public class ThenApplyExample {
 6
 7    public static void thenApplyExample() {
 8        // 执行 CompletableFuture 串行任务
 9        CompletableFuture<String> cf = CompletableFuture
10                // 第一步,先模拟通过远程结果获取日期字符串;
11                .supplyAsync(() -> "2022-06-01")
12                // 第二步,将日期字符串转换为 LocalDate 日期对象,便于进行逻辑计算;
13                .thenApply(param -> LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd")))
14                // 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;
15                .thenApply(param -> param.plusDays(10).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
16
17        // 调用 join 方法进入堵塞状态,直至获取任务执行结果
18        String result = cf.join();
19        System.out.println(result);
20    }
21
22    public static void main(String[] args) {
23        thenApplyExample();
24    }
25
26}

thenApplyAsync

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.concurrent.CompletableFuture;
 4
 5public class ThenApplyAsyncExample {
 6
 7    public static void thenApplyAsyncExample() {
 8        // 执行 CompletableFuture 串行任务
 9        CompletableFuture<String> cf = CompletableFuture
10                // 第一步,先模拟通过远程结果获取日期字符串;
11                .supplyAsync(() -> "2022-06-01")
12                // 第二步,将日期字符串转换为 LocalDate 日期对象,便于进行逻辑计算;
13                .thenApplyAsync(param -> LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd")))
14                // 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;
15                .thenApplyAsync(param -> param.plusDays(10).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
16
17        // 调用 join 方法进入堵塞状态,直至获取任务执行结果
18        String result = cf.join();
19        System.out.println(result);
20    }
21
22    public static void main(String[] args) {
23        thenApplyAsyncExample();
24    }
25
26}

5.3 thenAccept 和 thenAcceptAsync

方法描述

方法 有返回值 描述
thenAccept        x        使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且当前任务方法执行结束后,没有返回值。
thenAcceptAsync        x        串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。

任务要求

使用 CompletableFuture 按照顺序执行任务,处理使用 "," 拼接的日期字符串,将其转换为 LocalDate 日期对象。

  • 第一步,先通过 , 拆分字符串,转换为字符串数组,返回字符串数组;
  • 第二步,接收上一步中的日期字符串数组,遍历日期字符串数组,然后将数组中的日期字符串都转换为 LocalDate 日期对象;

任务执行完成后,将任务返回的日期字符串输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

thenAccept

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.List;
 4import java.util.concurrent.CompletableFuture;
 5import java.util.concurrent.CopyOnWriteArrayList;
 6
 7public class ThenAcceptExample {
 8
 9    public static void thenAcceptExample() {
10        // 模拟的远程获取的日期字符粗
11        String dateStr = "1995-05-10,2000-03-15";
12        // 存储 LocalDate 的集合
13        List<LocalDate> dateList = new CopyOnWriteArrayList<>();
14
15        // 执行 CompletableFuture 任务
16        CompletableFuture
17                .supplyAsync(() -> dateStr.split(","))
18                .thenAccept(dateArray -> {
19                    for (String s : dateArray) {
20                        dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
21                    }
22                })
23                .join();
24
25        // 输出转换结果
26        for (LocalDate localDate : dateList) {
27            System.out.println(localDate.toString());
28        }
29    }
30
31    public static void main(String[] args) {
32        thenAcceptExample();
33    }
34
35}

thenAcceptAsync

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.List;
 4import java.util.concurrent.CompletableFuture;
 5import java.util.concurrent.CopyOnWriteArrayList;
 6
 7public class ThenAcceptAsyncExample {
 8
 9    public static void thenAcceptAsyncExample() {
10        // 模拟的远程获取的日期字符粗
11        String dateStr = "1995-05-10,2000-03-15";
12        // 存储 LocalDate 的集合
13        List<LocalDate> dateList = new CopyOnWriteArrayList<>();
14
15        // 执行 CompletableFuture 任务
16        CompletableFuture
17                .supplyAsync(() -> dateStr.split(","))
18                .thenAcceptAsync(dateArray -> {
19                    for (String s : dateArray) {
20                        dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
21                    }
22                })
23                .join();
24
25        // 输出转换结果
26        for (LocalDate localDate : dateList) {
27            System.out.println(localDate.toString());
28        }
29    }
30
31    public static void main(String[] args) {
32        thenAcceptAsyncExample();
33    }
34
35}

5.4 handle 和 handleAsync

方法描述

方法 有返回值 描述
handle        √        使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
handleAsync        √        串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。

任务要求

使用 CompletableFuture 按照顺序执行任务,从远程接口获取日期字符串,将其转换为 LocalDate 日期对象。

  • 第一步,获取远程日期字符串,然后返回;
  • 第二步,接收上一步中的日期字符串,将其转换为 LocalDate 日期对象;

任务执行完成后,将任务返回的日期字符串输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

handle

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.Random;
 4import java.util.concurrent.CompletableFuture;
 5
 6public class HandleExample {
 7
 8    public static void handleExample() {
 9        // 执行 CompletableFuture 串行任务:
10        // 第一步,获取远程日期字符串,然后返回
11        // 第二步,接收上一步中的日期字符串,将其转换为 `LocalDate` 日期对象
12        CompletableFuture<LocalDate> cf = CompletableFuture
13                .supplyAsync(() -> {
14                    int random = new Random().nextInt(2);
15                    // 50% 概率发生异常
16                    if (random != 0) {
17                        throw new RuntimeException("模拟发生异常");
18                    }
19                    // 50% 概率返回正常值
20                    return "2022-06-01";
21                })
22                .handle((param, exception) -> {
23                    // 如果上一步结果为异常,则返回现在的日期,否则将上一步获取的日期字符串转换为日期对象
24                    if (exception != null) {
25                        return LocalDate.now();
26                    }
27                    return LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
28                });
29
30        // 调用 join 方法进入堵塞状态,直至获取任务执行结果
31        LocalDate result = cf.join();
32        System.out.println(result);
33    }
34
35    public static void main(String[] args) {
36        handleExample();
37    }
38
39}

handleAsync

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.Random;
 4import java.util.concurrent.CompletableFuture;
 5
 6public class HandleAsyncExample {
 7
 8    public static void handleAsyncExample() {
 9        // 执行 CompletableFuture 串行任务:
10        // 第一步,获取远程日期字符串,然后返回;
11        // 第二步,接收上一步中的日期字符串,将其转换为 LocalDate 日期对象;
12        CompletableFuture<LocalDate> cf = CompletableFuture
13                .supplyAsync(() -> {
14                    int random = new Random().nextInt(2);
15                    // 50% 概率发生异常
16                    if (random != 0) {
17                        throw new RuntimeException("模拟发生异常");
18                    }
19                    // 50% 概率返回正常值
20                    return "2022-06-01";
21                })
22                .handleAsync((param, exception) -> {
23                    // 如果上一步结果为异常,则返回现在的日期,否则将上一步获取的日期字符串转换为日期对象
24                    if (exception != null) {
25                        return LocalDate.now();
26                    }
27                    return LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
28                });
29
30        // 调用 join 方法进入堵塞状态,直至获取任务执行结果
31        LocalDate result = cf.join();
32        System.out.println(result);
33    }
34
35    public static void main(String[] args) {
36        handleAsyncExample();
37    }
38
39}

5.5 whenComplete 和 whenCompleteAsync

方法描述

方法 有返回值 描述
whenComplete        x        使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定函数。并且当前任务执行结束后,没有返回值。
whenCompleteAsync        x        串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。
  除此之外:
   如果上一阶段中正常执行结束,则该方法的结果参数不为 null;
   如果上一阶段中抛出异常,则该方法的异常参数不为 null;

任务要求

使用 CompletableFuture 按照顺序执行任务,从远程接口获取日期字符串,将其转换为 LocalDate 日期对象。

  • 第一步,获取远程日期字符串,然后返回;
  • 第二步,接收上一步中的日期字符串,将其转换为 LocalDate 日期对象,并且加入到集合;

任务执行完成后,将任务返回的日期字符串输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

whenComplete

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.concurrent.CopyOnWriteArrayList;
 4import java.util.List;
 5import java.util.Random;
 6import java.util.concurrent.CompletableFuture;
 7
 8public class WhenCompleteExample {
 9
10    public static void whenCompleteExample() {
11        // 模拟的远程获取的日期字符粗
12        String dateStr = "1995-05-10,2000-03-15";
13        // 存储 LocalDate 的集合
14        List<LocalDate> dateList = new CopyOnWriteArrayList<>();
15
16        // 执行 CompletableFuture 任务
17        CompletableFuture
18                .supplyAsync(() -> {
19                    int random = new Random().nextInt(2);
20                    if (random == 1){
21                        throw new RuntimeException("模拟发生异常");
22                    }
23                    return dateStr.split(",");
24                })
25                .whenComplete((dateArray, exception) -> {
26                    // 如果上一步执行过程没有发生异常,则将日期字符串转换为日期对象
27                    if (dateArray != null){
28                        for (String s : dateArray) {
29                            dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
30                        }
31                    }
32                })
33                .join();
34
35        // 输出转换结果
36        for (LocalDate localDate : dateList) {
37            System.out.println(localDate.toString());
38        }
39    }
40
41    public static void main(String[] args) {
42        whenCompleteExample();
43    }
44
45}

whenCompleteAsync

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.CopyOnWriteArrayList;
 4import java.util.List;
 5import java.util.Random;
 6import java.util.concurrent.CompletableFuture;
 7
 8public class WhenCompleteAsyncExample {
 9
10    public static void whenCompleteAsyncExample(){
11        // 模拟的远程获取的日期字符粗
12        String dateStr = "1995-05-10,2000-03-15";
13        // 存储 LocalDate 的集合
14        List<LocalDate> dateList = new CopyOnWriteArrayList<>();
15
16        // 执行 CompletableFuture 任务
17        CompletableFuture
18                .supplyAsync(() -> {
19                    int random = new Random().nextInt(2);
20                    if (random == 1){
21                        throw new RuntimeException("模拟发生异常");
22                    }
23                    return dateStr.split(",");
24                })
25                .whenCompleteAsync((dateArray, exception) -> {
26                    // 如果上一步执行过程没有发生异常,则将日期字符串转换为日期对象
27                    if (dateArray != null){
28                        for (String s : dateArray)