Java 中的异步编程工具 CompletableFuture

Java 中的异步编程工具 CompletableFuture

文章目录

  !版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。


环境信息:

  • JDK 版本: 1.8

示例地址:


一、CompletableFuture 概述

1.1 什么是同步和异步

在介绍 CompletableFuture 之前,先简单说一下什么是同步和异步这两个概念,介绍这俩个概念最直接的方式就是使用经典的 烧水泡茶 过程进行描述,一般来讲我们烧水泡茶时会经过 洗茶壶->注水->烧水->洗茶杯->装茶叶->泡茶 几个步骤:

如果使用 同步编程,那么泡茶过程可以划分为:

如果使用 异步编程,那么泡茶过程可以划分为:

根据上面的例子,相信大家对同步和异步有了一个大概了解,同步就是按指定顺序执行任务,这种执行过程必须按顺序进行,一步步执行,所以执行速度比较缓慢。而 异步 则不需要按照指定的顺序执行任务,它可以多个任务并行执行,等待异步任务完成后进行反馈,然后按照反馈结果执行特定操作即可,所以执行速度非常快。

1.2 CompletableFuture 简介

在 JDK 8 版本中,新增了很多功能与工具,其中 CompletableFuture 就是其中一种用于构建异步编程的工具,该工具主要是对 Future 进行优化,使开发者可以很方便的创建串行或者并行任务,以及实现任务的 OR 或者 AND 这种组合方式,极大程度上简化了异步编程中的代码量。

1.3 为什么推出 CompletableFuture

在 JDK 8 之前的版本中,编写异步任务常使用 Future 实现,使用 Future 执行异步任务并且获得异步执行结果时,我们会通过两种方式,要么调用阻塞的 get() 方法,或者轮询调用 isDone() 方法获取任务状态,判断是否为 true。不过这两种方法在执行时都会使主线程被迫等待,对性能会产生一定影响,故而 Java 团队在 JDK 8 版本中新增了 CompletableFuture 工具,通过使用观察者模式进行设计,实现异步回调进行异步编程,一定程度上降低了编写异步任务的代码量和难度。

二、CompletableFuture 中的分类

在 CompletableFuture 中有很多方法,并且这些方法命名并不是很容易归类,比如 thenRun 和 thenRunAsync 方法,这些方法有的以 Async 结尾,而有的则不是,一般情况下我们会认为带 Async 的方法属于异步方法,不带 Async 的方法属于同步方法,不过这个看法其实是并不是完全正确,因为有的方法即使不以 Async 结尾,也是通过异步方式执行的。

其实,本人认为如果需要对 CompletableFuture 中的方法进行归类的话,可以按照使用过程进行分类,这样比较容易将不同的方法划分到不同的类别中。比如,使用过程可以划分为 任务开启任务处理任务结束查看任务状态设置任务结果 以及 任务异常处理 等几种方法,其中 任务处理 又可以分为 串行任务并行任务

注意: 下面要罗列的以 Async 结尾的方法中,执行时并不会直接从 commonPool 线程池中获取线程,而是判断环境中设置的变量 commonParallelism 并行度,或者应用所在服务器的 CPU 数量是否大于 1,如果满足条件则就会从 ForkJoinPool.commonPool() 线程池中获取一个线程执行任务,如果不满足条件就会每次执行任务时新建一个线程,使用新创建的线程去执行任务。在下面介绍线程池时,会提到这一点。

2.1 任务开启方法

任务开启方法一般指的是 CompletableFuture 中,可以直接调用的静态方法,使用这些方法可以直接开启一个新任务,该任务相当于任务链中要执行的第一个步骤,常常我们会在该步骤中执行获取待处理的数据等操作。

任务开启方法包含如下:

方法名称 有返回值 描述
runAsync     x     从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。
supplyAsync     √     从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,返回指定类型的返回值。

2.2 任务处理方法

任务处理方法一般指的是 CompletableFuture 中,用于以串行或并行的方式处理数据的静态或实例方法,使用这些方法可以实现 OR 或 AND 任务组合,是异步编程中最重要的一些方法。

任务处理方法包含如下:

任务处理-串行任务

方法名称 有返回值 描述
thenRun     x     串行执行任务。并且该任务方法执行结束后,没有返回值。
thenRunAsync     x     串行执行任务,从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。
thenApply     √     串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
thenApplyAsync     √     串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
thenAccept     x     串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且当前任务方法执行结束后,没有返回值。
thenAcceptAsync     x     串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。
handle     √     串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
handleAsync     √     串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
whenComplete     x     串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定函数。并且当前任务执行结束后,没有返回值。
whenCompleteAsync     x     串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。
thenCompose     √     串行执行任务,按顺序组合两个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。
thenComposeAsync     √     串行执行任务,按顺序组合俩个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。

任务处理-并行任务

方法名称 有返回值 描述
thenCombine     √     并行执行任务,从 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。
thenCombineAsync     √     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。
thenAcceptBoth     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
thenAcceptBothAsync     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
runAfterBoth     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务方法执行结束后,没有返回值。
runAfterBothAsync     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。
applyToEither     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
applyToEitherAsync     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前先执行结束的任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
runAfterEither     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。
runAfterEitherAsync     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。
acceptEither     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
acceptEitherAsync     x     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
allOf     √     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待全部任务方法都执行完成后结束。任务执行结束后,没有返回值。
anyOf     √     并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待多个任务方法中任意一个执行完成后结束。任务执行结束后,返回第一个先执行完成任务的返回值。

2.3 任务结束方法

任务结束指的是调用 CompletableFuture 中的实例方法,获取执行结果或者取消任务等,结束现有的任务链。

任务结束包含的方法如下:

方法名称 有返回值 描述
get     √     获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(运行时)异常。
join     √     获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(未经检查)异常。
getNow     √     立即获取任务执行结果,如果任务没有完成则返回设定的默认值,如果任务正常完成则返回执行结果。
cancel     √     取消任务,如果任务尚未执行结束,调用该方法成功取消任务时返回 true,否则返回 false。并且任务取消成功后,通过 get/join 方法获取结果时,会抛出 CancellationException 异常。

2.4 查看任务状态方法

查看任务状态方法用于查看 CompletableFuture 任务执行状态,其中包含的方法如下:

方法名称 有返回值 描述
isDone     √     查看任务是否执行完成,如果当前阶段执行完成(无论是正常完成还是异常完成)则返回 true,否则返回 false。
isCancelled     √     查看当前阶段任务是否成功取消,如果此阶段任务在完成之前被取消则返回 true,否则返回 false。
isCompletedExceptionally     √     查看当前阶段任务是否以异常的方式执行完成。比如取消任务、突然终止任务或者执行过程出现异常等,都属于异常方式执行完成,如果是以异常方式完成则返回 true,否则返回 false。

2.5 设置任务结果方法

设置任务结果方法用于设置 CompletableFuture 任务结果,使其返回指定结果,其中包含的方法如下:

方法名称 有返回值 描述
obtrudeValue     x     设置(重置)调用 get/join 方法时返回指定值,无论任务是否执行完成。
obtrudeException     x     设置(重置)调用 get/join 方法时返回指定异常,无论任务是否执行完成。
complete     √     设置调用 get/join 方法时返回指定值。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。
completeException     √     设置调用 get/join 方法时返回指定异常。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。

2.6 任务异常处理方法

任务异常处理方法用于处理 CompletableFuture 执行过程中的异常,对异常进行处理,其中包含的方法如下:

方法名称 有返回值 描述
exceptionally     x     判断上一个任务执行时是否发生异常,如果是则就会执行 exceptionally 方法,并且将上一步异常作为当前方法的参数,然后对异常进行处理。当然,如果上一阶段执行过程中没有出现异常,则不会执行 exceptionally 方法。

三、CompletableFuture 中的线程池

3.1 CompletableFuture 默认的线程池

使用 CompletableFuture 执行异步任务时,会首先判断设置的 parallelism 数量,该参数一般跟当前服务器 CPU 数量相关:

  • 如果 CPU 数量等于 1,CompletableFuture 执行任务时,每次都会创建一个新的线程执行任务;
  • 如果 CPU 数量大于 1,CompletableFuture 执行任务时,将使用公共的 ForkJoinPool.commonPool() 线程池;

一般情况下在多核 CPU 服务器中运行应用,都会默认使用 ForkJoinPool.commonPool() 线程池,该线程池根据名称也可以大概了解到,是基于 Fork 和 Join 组合实现的,执行过程中可以将大的任务拆分为多个小任务并行执行,并且支持以窃取的方式,线程池中的线程在执行完自己工作队列中的任务后,可以窃取别的线程工作队列中没有执行完成的任务,协助其执行,尽可能使用并行的方式快速完成全部任务。所以 ForkJoinPool.commonPool() 线程池更适合执行计算密集型任务,而不太适合 IO 密集型任务

不过公共的 ForkJoinPool.commonPool() 线程池是 JVM 进程中所有 CompletableFuture 和 Stream 共享,如果全局上下文环境中存在大量使用 ForkJoinPool.commonPool() 线程池的任务,并且这些任务中包含大量的 IO 操作,那么该线程池性能将会受到很大影响。所以,一般情况下我们使用 CompletableFuture 时,需要避免使用公共线程池,而是使用自定义的线程池,并且设置不同的任务使用不同类型的线程池,以适用不同的任务场景。

3.2 CompletableFuture 如何使用自定义线程池

在 CompletableFuture 中的绝大部分方法,都存在接收不同类型的重载方法,如:

正常的 CompletableFuture 方法:

  • thenRun(Runnable action)
  • handle(BiFunction<? super T, Throwable, ? extends U> fn)

使用公共线程池 commonPool 的异步方法:

  • thenRunAsync(Runnable action)
  • handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)

使用自定义线程池的 CompletableFuture 方法:

  • thenRunAsync(Runnable action,Executor executor)
  • handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

大家根据上面的三种方法也能得知,在 CompletableFuture 中的方法,大部分都存在可以接收自定义线程池 Executor 参数的重载方法。因此,我们可以使用接收自定义线程池的 CompletableFuture 方法,将我们自定义的线程池作为参数,传入其中,使用该线程池中的线程执行任务。

如下,是一个使用 CompletableFuture 方法执行任务时使用自定义线程的示例:

 1import java.time.LocalDate;
 2import java.util.concurrent.*;
 3
 4public class SupplyExample {
 5
 6   /**
 7       * 创建自定义线程池
 8       *
 9       * @return 自定义线程池
10       */
11    public static ThreadPoolExecutor myThreadPool() {
12        // 核心线程数
13        int coreSize = 10;
14        // 最大线程数
15        int maxSize = 20;
16        // 空闲线程的回收时间和单位
17        TimeUnit timeUnit = TimeUnit.SECONDS;
18        // 空闲线程时间销毁
19        long keepAliveTime = 60L;
20        // 工作队列
21        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(200);
22        // 拒绝策略
23        RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
24
25        // 创建并返回自定义线程池
26        return new ThreadPoolExecutor(coreSize, maxSize, keepAliveTime, timeUnit, workQueue, handler);
27    }
28
29   /**
30       * 执行 CompletableFuture.supplyAsync 方法
31       */
32    public static void supplyAsyncExample() {
33        // 自定义线程池
34        ThreadPoolExecutor threadPool = myThreadPool();
35
36        // 执行 CompletableFuture 任务,将日期字符串转换为 LocalDate 日期对象
37        CompletableFuture<LocalDate> cf = CompletableFuture
38                .supplyAsync(() -> LocalDate.parse("2022-06-01"), threadPool);
39
40        // 调用 join 方法进入堵塞状态,直至获取任务执行结果输出到控制台
41        System.out.println(cf.join());
42    }
43
44   /**
45       * main() 方法
46       */
47    public static void main(String[] args) {
48        supplyAsyncExample();
49    }
50
51}

3.3 CPU 集型任务和 IO 密集型任务

常常我们执行的任务可以根据 CPU 的使用或者 IO 的使用进行划分,可以分为:

CPU 密集型任务

CPU 密集型任务也称为计算密集型任务,指的是任务执行过程需要进行大量计算,没有堵塞,且消耗大量的 CPU 资源。比如,计算圆周率,视频解码,类型转换等。

IO 密集型任务

IO 密集型任务,指的是需要进行大量磁盘IO读取,网络IO操作等,执行过程会造成堵塞,需要创建大量的线程执行任务。比如,数据库读写,文件读写,网络请求等。

3.4 不同类型任务对线程池的选择

在 Java 中常用的线程池按执行方式划分的话,其实可以划分为 ForkJoinPoolExecutorService 两种。

ForkJoinPool 线程池

关于 ForkJoinPool 线程池上面也提到过,这种线程池是基于分而治之的思想对任务进行拆解与合并,执行过程需要消耗大量 CPU 分解任务,然后进行计算,因此特别适合执行 计算密集型任务

ExecutorService 线程池

而 ExecutorService 线程池是传统方式的线程池,使用池化方式管理线程,提前将若干个可执行的线程放入池中,当我们需要时就从池中获取线程执行任务,执行过程不会对任务进行拆解,并且使用完毕后不需要销毁线程而是放回池中,方便下次再使用,从而减少创建和销毁线程的性能开销。所以,这种线程池更适合执行过程中需要进行大量 IO 操作的任务,即适合 IO 密集型任务

3.5 线程线程大小配置推荐

一般情况下,业务上使用的线程池都会设置线程池的线程数量大小,这个线程池线程数量大小设置为多少是一个值得考虑的问题。设置线程池过多会造成线程浪费,过少会造成处理任务的线程过少,造成任务堆积。所以,这个配置线程数量多大并不容易把控。最优的就是配置不同的线程数进行测试,然后判断应用设置线程池大小为多大性能最优。

这里可以参考网上的一套万能的推荐配置,跟操作系统的 CPU 数量相关,如下:

  • CPU 密集型任务: N + 1
  • IO 密集型任务: 2N + 1

还有一种针对 IO 密集型任务设置估算公式,按公式进行配置这种性能最优,公式如下:

  • 估算公式: = (线程等待时间 + 线程 CPU 时间 / 线程 CPU 时间) * CPU 核心数

如果是既有 IO 操作的步骤,也有比较消耗 CPU 的步骤,这种混合型任务可以进行拆分,将不同的任务使用不同的线程池。

四、CompletableFuture 任务开启方法示例

4.1 runAsync

方法描述

方法 有返回值 描述
runAsync        x        从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。

任务要求

使用异步的方式,模拟获取远程信息,然后将获取的远程信息输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.concurrent.CompletableFuture;
 2
 3public class RunAsyncExample {
 4
 5    public static void runAsyncExample() {
 6        // 调用 runAsync 方法,异步执行 runnable 中的代码逻辑,模拟获取远程信息
 7        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> System.out.println("模拟获取远程信息并输出到控制台"));
 8
 9        // 调用 join 方法进行等待,获取执行结果
10        cf.join();
11    }
12
13    public static void main(String[] args) {
14        runAsyncExample();
15    }
16
17}

4.2 supplyAsync

方法描述

方法 有返回值 描述
supplyAsync        √        从 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,返回指定类型的返回值。

任务要求

使用异步的方式,模拟获取远程信息,然后将获取的信息作为任务返回结果,之后将任务返回的结果输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.concurrent.CompletableFuture;
 2
 3public class SupplyAsyncExample {
 4
 5    public static void supplyAsyncExample() {
 6        // 调用 supplyAsync 方法,异步执行 runnable 中的代码逻辑,模拟获取远程信息,然后返回
 7        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
 8            System.out.println("模拟获取远程信息");
 9            return "远程信息";
10        });
11
12        // 调用 join 方法进行等待,获取执行结果
13        String result = cf.join();
14        System.out.println(result);
15    }
16
17    public static void main(String[] args) {
18        supplyAsyncExample();
19    }
20
21}

五、CompletableFuture 任务处理-串行任务方法示例

5.1 thenRun 和 thenRunAsync

方法描述

方法 有返回值 描述
thenRun        x        使用主线程或者执行上一步任务的子线程,串行执行任务。并且该任务方法执行结束后,没有返回值。
thenRunAsync        x        串行执行任务,从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。

任务要求

使用 CompletableFuture 工具,按照代码顺序执行任务,处理使用 "," 拼接的日期字符串,将其转换为 LocalDate 日期对象。

  • 第一步,先通过 , 拆分字符串,然后存入原子引用类型对象包裹的 List 集合中;
  • 第二步,遍历 List 集合,将拆分后的字符串转换为 LocalDate 对象;

任务执行完成后,将全部日期对象输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

thenRun

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.ArrayList;
 4import java.util.Arrays;
 5import java.util.List;
 6import java.util.concurrent.CompletableFuture;
 7import java.util.concurrent.atomic.AtomicReference;
 8
 9public class ThenRunExample {
10    
11    public static void thenRunExample() {
12        // 模拟的远程获取的日期字符串
13        String dateStr = "1995-05-10,2000-03-15";
14        // 原子引用类对象
15        AtomicReference<List<String>> dateStrList = new AtomicReference<>();
16        // LocalDate集合
17        List<LocalDate> dateList = new ArrayList<>();
18
19        // 执行 CompletableFuture 任务
20        // (1) 第一步,先通过 "," 拆分字符串,然后存入原子引用对象包裹的 List 集合中;
21        // (2) 第二步,遍历 List 集合,将拆分后的字符串转换为 LocalDate 对象;
22        CompletableFuture
23                .runAsync(() -> dateStrList.set(Arrays.asList(dateStr.split(","))))
24                .thenRun(() -> dateStrList.get().forEach(v -> dateList.add(LocalDate.parse(v, DateTimeFormatter.ofPattern("yyyy-MM-dd")))))
25                .join();
26
27        // 输出转换结果
28        for (LocalDate localDate : dateList) {
29            System.out.println(localDate.toString());
30        }
31    }
32
33    public static void main(String[] args) {
34        thenRunExample();
35    }
36
37}

thenRunAsync

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.ArrayList;
 4import java.util.Arrays;
 5import java.util.List;
 6import java.util.concurrent.CompletableFuture;
 7import java.util.concurrent.atomic.AtomicReference;
 8
 9public class ThenRunAsyncExample {
10
11    public static void thenRunAsyncExample() {
12        // 模拟的远程获取的日期字符粗
13        String dateStr = "1995-05-10,2000-03-15";
14        // 原子引用类对象
15        AtomicReference<List<String>> dateStrList = new AtomicReference<>();
16        // LocalDate集合
17        List<LocalDate> dateList = new ArrayList<>();
18
19        // 执行 CompletableFuture 任务
20        // (1) 第一步,先通过 "," 拆分字符串,然后存入原子引用对象包裹的 List 集合中;
21        // (2) 第二步,遍历 List 集合,将拆分后的字符串转换为 LocalDate 对象;
22        CompletableFuture
23                .runAsync(() -> dateStrList.set(Arrays.asList(dateStr.split(","))))
24                .thenRunAsync(() -> dateStrList.get().forEach(v -> dateList.add(LocalDate.parse(v, DateTimeFormatter.ofPattern("yyyy-MM-dd")))))
25                .join();
26
27        // 输出转换结果
28        for (LocalDate localDate : dateList) {
29            System.out.println(localDate.toString());
30        }
31    }
32
33    public static void main(String[] args) {
34        thenRunAsyncExample();
35    }
36
37}

5.2 thenApply 和 thenApplyAsync

方法描述

方法 有返回值 描述
thenApply        √        使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
thenApplyAsync        √        串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。

任务要求

使用 CompletableFuture 按照顺序执行任务,获取远程日期字符串,然后对日期进行逻辑计算,并返回计算后的结果。

  • 第一步,先模拟通过远程结果获取日期字符串;
  • 第二步,将日期字符串转换为 LocalDate 日期对象,便于进行逻辑计算;
  • 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;

任务执行完成后,将任务返回的日期字符串输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

thenApply

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.concurrent.CompletableFuture;
 4
 5public class ThenApplyExample {
 6
 7    public static void thenApplyExample() {
 8        // 执行 CompletableFuture 串行任务
 9        CompletableFuture<String> cf = CompletableFuture
10                // 第一步,先模拟通过远程结果获取日期字符串;
11                .supplyAsync(() -> "2022-06-01")
12                // 第二步,将日期字符串转换为 LocalDate 日期对象,便于进行逻辑计算;
13                .thenApply(param -> LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd")))
14                // 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;
15                .thenApply(param -> param.plusDays(10).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
16
17        // 调用 join 方法进入堵塞状态,直至获取任务执行结果
18        String result = cf.join();
19        System.out.println(result);
20    }
21
22    public static void main(String[] args) {
23        thenApplyExample();
24    }
25
26}

thenApplyAsync

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.concurrent.CompletableFuture;
 4
 5public class ThenApplyAsyncExample {
 6
 7    public static void thenApplyAsyncExample() {
 8        // 执行 CompletableFuture 串行任务
 9        CompletableFuture<String> cf = CompletableFuture
10                // 第一步,先模拟通过远程结果获取日期字符串;
11                .supplyAsync(() -> "2022-06-01")
12                // 第二步,将日期字符串转换为 LocalDate 日期对象,便于进行逻辑计算;
13                .thenApplyAsync(param -> LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd")))
14                // 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;
15                .thenApplyAsync(param -> param.plusDays(10).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
16
17        // 调用 join 方法进入堵塞状态,直至获取任务执行结果
18        String result = cf.join();
19        System.out.println(result);
20    }
21
22    public static void main(String[] args) {
23        thenApplyAsyncExample();
24    }
25
26}

5.3 thenAccept 和 thenAcceptAsync

方法描述

方法 有返回值 描述
thenAccept        x        使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且当前任务方法执行结束后,没有返回值。
thenAcceptAsync        x        串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。

任务要求

使用 CompletableFuture 按照顺序执行任务,处理使用 "," 拼接的日期字符串,将其转换为 LocalDate 日期对象。

  • 第一步,先通过 , 拆分字符串,转换为字符串数组,返回字符串数组;
  • 第二步,接收上一步中的日期字符串数组,遍历日期字符串数组,然后将数组中的日期字符串都转换为 LocalDate 日期对象;

任务执行完成后,将任务返回的日期字符串输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

thenAccept

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.List;
 4import java.util.concurrent.CompletableFuture;
 5import java.util.concurrent.CopyOnWriteArrayList;
 6
 7public class ThenAcceptExample {
 8
 9    public static void thenAcceptExample() {
10        // 模拟的远程获取的日期字符粗
11        String dateStr = "1995-05-10,2000-03-15";
12        // 存储 LocalDate 的集合
13        List<LocalDate> dateList = new CopyOnWriteArrayList<>();
14
15        // 执行 CompletableFuture 任务
16        CompletableFuture
17                .supplyAsync(() -> dateStr.split(","))
18                .thenAccept(dateArray -> {
19                    for (String s : dateArray) {
20                        dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
21                    }
22                })
23                .join();
24
25        // 输出转换结果
26        for (LocalDate localDate : dateList) {
27            System.out.println(localDate.toString());
28        }
29    }
30
31    public static void main(String[] args) {
32        thenAcceptExample();
33    }
34
35}

thenAcceptAsync

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.List;
 4import java.util.concurrent.CompletableFuture;
 5import java.util.concurrent.CopyOnWriteArrayList;
 6
 7public class ThenAcceptAsyncExample {
 8
 9    public static void thenAcceptAsyncExample() {
10        // 模拟的远程获取的日期字符粗
11        String dateStr = "1995-05-10,2000-03-15";
12        // 存储 LocalDate 的集合
13        List<LocalDate> dateList = new CopyOnWriteArrayList<>();
14
15        // 执行 CompletableFuture 任务
16        CompletableFuture
17                .supplyAsync(() -> dateStr.split(","))
18                .thenAcceptAsync(dateArray -> {
19                    for (String s : dateArray) {
20                        dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
21                    }
22                })
23                .join();
24
25        // 输出转换结果
26        for (LocalDate localDate : dateList) {
27            System.out.println(localDate.toString());
28        }
29    }
30
31    public static void main(String[] args) {
32        thenAcceptAsyncExample();
33    }
34
35}

5.4 handle 和 handleAsync

方法描述

方法 有返回值 描述
handle        √        使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
handleAsync        √        串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。

任务要求

使用 CompletableFuture 按照顺序执行任务,从远程接口获取日期字符串,将其转换为 LocalDate 日期对象。

  • 第一步,获取远程日期字符串,然后返回;
  • 第二步,接收上一步中的日期字符串,将其转换为 LocalDate 日期对象;

任务执行完成后,将任务返回的日期字符串输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

handle

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.Random;
 4import java.util.concurrent.CompletableFuture;
 5
 6public class HandleExample {
 7
 8    public static void handleExample() {
 9        // 执行 CompletableFuture 串行任务:
10        // 第一步,获取远程日期字符串,然后返回
11        // 第二步,接收上一步中的日期字符串,将其转换为 `LocalDate` 日期对象
12        CompletableFuture<LocalDate> cf = CompletableFuture
13                .supplyAsync(() -> {
14                    int random = new Random().nextInt(2);
15                    // 50% 概率发生异常
16                    if (random != 0) {
17                        throw new RuntimeException("模拟发生异常");
18                    }
19                    // 50% 概率返回正常值
20                    return "2022-06-01";
21                })
22                .handle((param, exception) -> {
23                    // 如果上一步结果为异常,则返回现在的日期,否则将上一步获取的日期字符串转换为日期对象
24                    if (exception != null) {
25                        return LocalDate.now();
26                    }
27                    return LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
28                });
29
30        // 调用 join 方法进入堵塞状态,直至获取任务执行结果
31        LocalDate result = cf.join();
32        System.out.println(result);
33    }
34
35    public static void main(String[] args) {
36        handleExample();
37    }
38
39}

handleAsync

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.Random;
 4import java.util.concurrent.CompletableFuture;
 5
 6public class HandleAsyncExample {
 7
 8    public static void handleAsyncExample() {
 9        // 执行 CompletableFuture 串行任务:
10        // 第一步,获取远程日期字符串,然后返回;
11        // 第二步,接收上一步中的日期字符串,将其转换为 LocalDate 日期对象;
12        CompletableFuture<LocalDate> cf = CompletableFuture
13                .supplyAsync(() -> {
14                    int random = new Random().nextInt(2);
15                    // 50% 概率发生异常
16                    if (random != 0) {
17                        throw new RuntimeException("模拟发生异常");
18                    }
19                    // 50% 概率返回正常值
20                    return "2022-06-01";
21                })
22                .handleAsync((param, exception) -> {
23                    // 如果上一步结果为异常,则返回现在的日期,否则将上一步获取的日期字符串转换为日期对象
24                    if (exception != null) {
25                        return LocalDate.now();
26                    }
27                    return LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
28                });
29
30        // 调用 join 方法进入堵塞状态,直至获取任务执行结果
31        LocalDate result = cf.join();
32        System.out.println(result);
33    }
34
35    public static void main(String[] args) {
36        handleAsyncExample();
37    }
38
39}

5.5 whenComplete 和 whenCompleteAsync

方法描述

方法 有返回值 描述
whenComplete        x        使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定函数。并且当前任务执行结束后,没有返回值。
whenCompleteAsync        x        串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。
  除此之外:
   如果上一阶段中正常执行结束,则该方法的结果参数不为 null;
   如果上一阶段中抛出异常,则该方法的异常参数不为 null;

任务要求

使用 CompletableFuture 按照顺序执行任务,从远程接口获取日期字符串,将其转换为 LocalDate 日期对象。

  • 第一步,获取远程日期字符串,然后返回;
  • 第二步,接收上一步中的日期字符串,将其转换为 LocalDate 日期对象,并且加入到集合;

任务执行完成后,将任务返回的日期字符串输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

whenComplete

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.concurrent.CopyOnWriteArrayList;
 4import java.util.List;
 5import java.util.Random;
 6import java.util.concurrent.CompletableFuture;
 7
 8public class WhenCompleteExample {
 9
10    public static void whenCompleteExample() {
11        // 模拟的远程获取的日期字符粗
12        String dateStr = "1995-05-10,2000-03-15";
13        // 存储 LocalDate 的集合
14        List<LocalDate> dateList = new CopyOnWriteArrayList<>();
15
16        // 执行 CompletableFuture 任务
17        CompletableFuture
18                .supplyAsync(() -> {
19                    int random = new Random().nextInt(2);
20                    if (random == 1){
21                        throw new RuntimeException("模拟发生异常");
22                    }
23                    return dateStr.split(",");
24                })
25                .whenComplete((dateArray, exception) -> {
26                    // 如果上一步执行过程没有发生异常,则将日期字符串转换为日期对象
27                    if (dateArray != null){
28                        for (String s : dateArray) {
29                            dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
30                        }
31                    }
32                })
33                .join();
34
35        // 输出转换结果
36        for (LocalDate localDate : dateList) {
37            System.out.println(localDate.toString());
38        }
39    }
40
41    public static void main(String[] args) {
42        whenCompleteExample();
43    }
44
45}

whenCompleteAsync

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.CopyOnWriteArrayList;
 4import java.util.List;
 5import java.util.Random;
 6import java.util.concurrent.CompletableFuture;
 7
 8public class WhenCompleteAsyncExample {
 9
10    public static void whenCompleteAsyncExample(){
11        // 模拟的远程获取的日期字符粗
12        String dateStr = "1995-05-10,2000-03-15";
13        // 存储 LocalDate 的集合
14        List<LocalDate> dateList = new CopyOnWriteArrayList<>();
15
16        // 执行 CompletableFuture 任务
17        CompletableFuture
18                .supplyAsync(() -> {
19                    int random = new Random().nextInt(2);
20                    if (random == 1){
21                        throw new RuntimeException("模拟发生异常");
22                    }
23                    return dateStr.split(",");
24                })
25                .whenCompleteAsync((dateArray, exception) -> {
26                    // 如果上一步执行过程没有发生异常,则将日期字符串转换为日期对象
27                    if (dateArray != null){
28                        for (String s : dateArray) {
29                            dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
30                        }
31                    }
32                })
33                .join();
34
35        // 输出转换结果
36        for (LocalDate localDate : dateList) {
37            System.out.println(localDate.toString());
38        }
39    }
40
41    public static void main(String[] args) {
42        whenCompleteAsyncExample();
43    }
44
45}

5.6 thenCompose 和 thenComposeAsync

方法描述

方法 有返回值 描述
thenCompose        √        使用主线程或者执行上一步任务的线程,串行执行任务。按顺序组合两个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。
thenComposeAsync        √        串行执行任务,按顺序组合俩个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。

任务要求

使用 CompletableFuture 按照顺序执行任务,获取远程日期字符串,然后对日期进行逻辑计算,并返回计算后的结果。

  • 第一步,先模拟通过远程结果获取日期字符串;
  • 第二步,将日期字符串转换为 LocalDate 日期对象,便于进行逻辑计算;
  • 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;

任务执行完成后,将任务返回的日期字符串输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

thenCompose

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.concurrent.CompletableFuture;
 4
 5public class ThenComposeExample {
 6
 7    public static void thenComposeExample() {
 8        // 执行 CompletableFuture 任务
 9        CompletableFuture<String> cf2 = CompletableFuture
10                // 模拟获取远程接口获取创建时间,然后得到结果 "2022-06-01"
11                .supplyAsync(() -> "2022-06-01")
12                // 实现将两个有依赖关系的任务组合在一起,一个用于转换字符串为日期类型,一个用于计算并转换为日期字符串
13                .thenCompose(param -> {
14                    // 将时间字符串转换为 LocalDate 日期类型
15                    LocalDate localDate = LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
16                    // 最后创建一个新的任务,任务中将 LocalDate 日期+10天,并转换回字符串返回
17                    return CompletableFuture.supplyAsync(() -> localDate
18                            .plusDays(10)
19                            .format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
20                });
21
22        // 进入堵塞状态,等待这些阶段执行完成
23        String result = cf2.join();
24        System.out.println("计算结果:" + result);
25    }
26
27    public static void main(String[] args) {
28        thenComposeExample();
29    }
30
31}

thenComposeAsync

 1import java.time.LocalDate;
 2import java.time.format.DateTimeFormatter;
 3import java.util.concurrent.CompletableFuture;
 4
 5public class ThenComposeAsyncExample {
 6
 7    public static void thenComposeAsyncExample() {
 8        // 执行 CompletableFuture 任务
 9        CompletableFuture<String> cf2 = CompletableFuture
10                // 模拟获取远程接口获取创建时间,然后得到结果 "2022-06-01"
11                .supplyAsync(() -> "2022-06-01")
12                // 实现将两个有依赖关系的任务组合在一起,一个用于转换字符串为日期类型,一个用于计算并转换为日期字符串
13                .thenComposeAsync(param -> {
14                    // 将时间字符串转换为 LocalDate 日期类型
15                    LocalDate localDate = LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
16                    // 最后创建一个新的任务,任务中将 LocalDate 日期+10天,并转换回字符串返回
17                    return CompletableFuture.supplyAsync(() -> localDate
18                            .plusDays(10)
19                            .format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
20                });
21
22        // 进入堵塞状态,等待这些阶段执行完成
23        String result = cf2.join();
24        System.out.println("计算结果:" + result);
25    }
26
27    public static void main(String[] args) {
28        thenComposeAsyncExample();
29    }
30
31}

六、CompletableFuture 任务处理-并行任务方法示例

6.1 thenCombine 和 thenCombineAsync

方法描述

方法 有返回值 描述
thenCombine        √        并行执行任务,从 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。
thenCombineAsync        √        并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。

不过这里需要注意,如果上一个阶段执为异常完成,则不会执行组合的下一个阶段,直接抛出异常信息。

任务要求

使用 CompletableFuture 并行执行任务,模拟获取远程端口的【姓名】和【岁数】:

  • 任务一,调用远程接口,获取姓名;
  • 任务二,调用远程接口,获取岁数;

等到任务一和任务二都执行完成后,将任务一和任务二中获取的【姓名】和【岁数】拼在一个字符串中,然后返回;

执行流程

任务执行流程如下图所示:

示例代码

thenCombine

 1import java.util.concurrent.CompletableFuture;
 2
 3public class ThenCombineExample {
 4
 5    public static void thenCombineExample() {
 6        // 模拟调用远程接口获取【姓名】
 7        CompletableFuture<String> nameCf = CompletableFuture.supplyAsync(() -> "mydlq");
 8        // 模拟调用远程接口获取【岁数】
 9        CompletableFuture<Integer> ageCf = CompletableFuture.supplyAsync(() -> 18);
10
11        // 执行 CompletableFuture 任务
12        // 并行执行,模拟调用远程接口获取【姓名】,以及获取【岁数】
13        // 然后将获取的【姓名】和【岁数】拼在一起,之后返回新字符串
14        CompletableFuture<String> cf = nameCf.thenCombine(ageCf, (name, age) -> "姓名:" + name + ", 岁数:" + age);
15
16        // 进入堵塞状态,等待这些阶段执行完成后获取结果
17        String info = cf.join();
18        System.out.println(info);
19    }
20
21    public static void main(String[] args) {
22        thenCombineExample();
23    }
24
25}

thenCombineAsync

 1import java.util.concurrent.CompletableFuture;
 2
 3public class ThenCombineAsyncExample {
 4
 5    public static void thenCombineAsyncExample() {
 6        // 模拟调用远程接口获取【姓名】
 7        CompletableFuture<String> nameCf = CompletableFuture.supplyAsync(() -> "mydlq");
 8        // 模拟调用远程接口获取【岁数】
 9        CompletableFuture<Integer> ageCf = CompletableFuture.supplyAsync(() -> 18);
10
11        // 执行 CompletableFuture 任务
12        // 并行执行,模拟调用远程接口获取【姓名】,以及获取【岁数】
13        // 然后将获取的【姓名】和【岁数】拼在一起,之后返回新字符串
14        CompletableFuture<String> cf = nameCf.thenCombineAsync(ageCf, (name, age) -> "姓名:" + name + ", 岁数:" + age);
15
16        // 进入堵塞状态,等待这些阶段执行完成后获取结果
17        String info = cf.join();
18        System.out.println(info);
19    }
20
21    public static void main(String[] args) {
22        thenCombineAsyncExample();
23    }
24
25}

6.2 thenAcceptBoth 和 thenAcceptBothAsync

方法描述

方法 有返回值 描述
thenAcceptBoth        x        并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
thenAcceptBothAsync        x        并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。

任务要求

使用 CompletableFuture 并行执行任务,模拟获取远程端口的【姓名】和【岁数】:

  • 任务一,调用远程接口,获取姓名;
  • 任务二,调用远程接口,获取岁数;

等到任务一和任务二都执行完成后,将任务一和任务二中获取的【姓名】和【岁数】拼在一个字符串中,然后输出到控制台;

执行流程

任务执行流程如下图所示:

示例代码

thenAcceptBoth

 1import java.util.concurrent.CompletableFuture;
 2
 3public class ThenAcceptBothExample {
 4
 5    public static void thenAcceptBothExample(){
 6        // 模拟调用远程接口获取【姓名】
 7        CompletableFuture<String> nameCf = CompletableFuture.supplyAsync(() -> "mydlq");
 8        // 模拟调用远程接口获取【岁数】
 9        CompletableFuture<Integer> ageCf = CompletableFuture.supplyAsync(() -> 18);
10
11        // 执行 CompletableFuture 任务
12        // 并行执行,模拟调用远程接口获取【姓名】,以及获取【岁数】
13        // 然后将获取的【姓名】和【岁数】输出
14        CompletableFuture<Void> cf = nameCf.thenAcceptBoth(ageCf,
15                (name, age) -> System.out.println("姓名:" + name + ", 岁数:" + age));
16
17        // 进入堵塞状态等待各阶段执行完成
18        cf.join();
19    }
20
21    public static void main(String[] args) {
22        thenAcceptBothExample();
23    }
24
25}

thenAcceptBothAsync

 1import java.util.concurrent.CompletableFuture;
 2
 3public class ThenAcceptBothAsyncExample {
 4
 5    public static void thenAcceptBothAsyncExample() {
 6        // 模拟调用远程接口获取【姓名】
 7        CompletableFuture<String> nameCf = CompletableFuture.supplyAsync(() -> "mydlq");
 8        // 模拟调用远程接口获取【岁数】
 9        CompletableFuture<Integer> ageCf = CompletableFuture.supplyAsync(() -> 18);
10
11        // 执行 CompletableFuture 任务
12        // 并行执行,模拟调用远程接口获取【姓名】,以及获取【岁数】
13        // 然后将获取的【姓名】和【岁数】输出
14        CompletableFuture<Void> cf = nameCf.thenAcceptBothAsync(ageCf,
15                (name, age) -> System.out.println("姓名:" + name + ", 岁数:" + age));
16
17        // 进入堵塞状态等待各阶段执行完成
18        cf.join();
19    }
20
21    public static void main(String[] args) {
22        thenAcceptBothAsyncExample();
23    }
24
25}

6.3 runAfterBoth 和 runAfterBothAsync

方法描述

方法 有返回值 描述
thenAcceptBoth        x        并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务方法执行结束后,没有返回值。
thenAcceptBothAsync        x        并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。

任务要求

使用 CompletableFuture 并行执行任务,执行步骤一和步骤二:

  • 任务一,执行步骤一;
  • 任务二,执行步骤二;

等到步骤一和步骤二都执行完成后,就执行最终步骤。

执行流程

任务执行流程如下图所示:

示例代码

runAfterBoth

 1import java.util.concurrent.CompletableFuture;
 2
 3public class RunAfterBothExample {
 4
 5    public static void runAfterBothExample() {
 6        // CompletableFuture 任务 - 步骤1
 7        CompletableFuture<Void> step1 = CompletableFuture.runAsync(() -> System.out.println("阶段1"));
 8        // CompletableFuture 任务 - 步骤2
 9        CompletableFuture<Void> step2 = CompletableFuture.runAsync(() -> System.out.println("阶段2"));
10
11        // 当【阶段1】和【阶段2】并行执行完成后,则执行特定任务
12        step1.runAfterBoth(step2, ()-> System.out.println("全部阶段完成,执行特定任务"));
13    }
14
15    public static void main(String[] args) {
16        runAfterBothExample();
17    }
18
19}

runAfterBothAsync

 1import java.util.concurrent.CompletableFuture;
 2
 3public class RunAfterBothAsyncExample {
 4
 5    public static void runAfterBothAsyncExample() {
 6        // CompletableFuture 任务 - 步骤1
 7        CompletableFuture<Void> step1 = CompletableFuture.runAsync(() -> System.out.println("阶段1"));
 8        // CompletableFuture 任务 - 步骤2
 9        CompletableFuture<Void> step2 = CompletableFuture.runAsync(() -> System.out.println("阶段2"));
10
11        // 当【阶段1】和【阶段2】并行执行完成后,则执行特定任务
12        step1.runAfterBothAsync(step2, ()-> System.out.println("全部阶段完成,执行特定任务"));
13    }
14
15    public static void main(String[] args) {
16        runAfterBothAsyncExample();
17    }
18
19}

6.4 applyToEither 和 applyToEitherAsync

方法描述

方法 有返回值 描述
applyToEither        x        并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
applyToEitherAsync        x        并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前先执行结束的任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。

任务要求

使用 CompletableFuture 并行执行任务,执行任务一、任务二和任务三:

  • 任务一,从通道1中获取信息;
  • 任务二,从通道2中获取信息;
  • 任务三,等到步骤一和步骤二两个任务任意一个先执行完成,就处理先执行完成任务中返回的结果;

几个任务执行完成后,将任务返回结果输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

applyToEither

 1import java.util.Random;
 2import java.util.concurrent.CompletableFuture;
 3
 4public class ApplyToEitherExample {
 5
 6    public static void applyToEitherExample(){
 7        // 通道1 (模拟获取远程信息)
 8        CompletableFuture<String> channel1 = CompletableFuture.supplyAsync(() -> {
 9            randomSleep(1000);
10            return "阶段1-从通道1成功获取结果";
11        });
12        // 通道2 (模拟获取远程信息)
13        CompletableFuture<String> channel2 = CompletableFuture.supplyAsync(() -> {
14            randomSleep(1000);
15            return "阶段2-从通道2成功获取结果";
16        });
17
18        // 执行 CompletableFuture 任务
19        // 并行执行【任务1】和【任务2】,分别从【通道1】或者【通道2】中获取结果,
20        // 哪个通道先获取结果成功,就使用该结果作为任务的返回结果,也作为下一步任
21        // 务要执行的参数,然后执行下一步操作,执行完成后返回指定值。
22        CompletableFuture<String> channelTask = channel1.applyToEither(channel2, response -> "获取的结果: " + response);
23
24        // 进入堵塞状态,等待这些阶段执行完成后获取结果并输出
25        String result = channelTask.join();
26        System.out.println(result);
27    }
28
29    /**
30     * 指定规定时间内睡眠
31     * @param time 随机睡眠时间
32     */
33    private static void randomSleep(int time) {
34        try {
35            Thread.sleep(new Random().nextInt(time));
36        } catch (InterruptedException e) {
37        }
38    }
39
40    public static void main(String[] args) {
41        applyToEitherExample();
42    }
43
44}

applyToEitherAsync

 1import java.util.Random;
 2import java.util.concurrent.CompletableFuture;
 3
 4public class ApplyToEitherAsyncExample {
 5
 6    public static void applyToEitherAsyncExample1(){
 7        // 通道1 (模拟获取远程信息)
 8        CompletableFuture<String> channel1 = CompletableFuture.supplyAsync(() -> {
 9            randomSleep(1000);
10            return "阶段1-从通道1成功获取结果";
11        });
12        // 通道2 (模拟获取远程信息)
13        CompletableFuture<String> channel2 = CompletableFuture.supplyAsync(() -> {
14            randomSleep(1000);
15            return "阶段2-从通道2成功获取结果";
16        });
17
18        // 执行 CompletableFuture 任务
19        // 并行执行【任务1】和【任务2】,分别从【通道1】或者【通道2】中获取结果,
20        // 哪个通道先获取结果成功,就使用该结果作为任务的返回结果,也作为下一步任
21        // 务要执行的参数,然后执行下一步操作,执行完成后返回指定值。
22        CompletableFuture<String> channelTask = channel1.applyToEitherAsync(channel2, response -> "获取的结果: " + response);
23
24        // 进入堵塞状态,等待这些阶段执行完成后获取结果并输出
25        String result = channelTask.join();
26        System.out.println(result);
27    }
28
29    /**
30     * 指定规定时间内睡眠
31     * @param time 随机睡眠时间
32     */
33    private static void randomSleep(int time){
34        try {
35            int randomNumber = new Random().nextInt(time);
36            Thread.sleep(randomNumber);
37        } catch (InterruptedException e) {
38        }
39    }
40
41    public static void main(String[] args) {
42        applyToEitherAsyncExample1();
43    }
44
45}

6.5 runAfterEither 和 runAfterEitherAsync

方法描述

方法 有返回值 描述
runAfterEither        x        并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。
runAfterEitherAsync        x        并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。

任务要求

使用 CompletableFuture 并行执行任务,执行任务一、任务二和任务三:

  • 任务一,执行步骤一;
  • 任务二,执行步骤二;
  • 任务三,等到步骤一和步骤二两个任务任意一个先执行完成,就执行特定动作。

执行流程

任务执行流程如下图所示:

示例代码

runAfterEither

 1import java.util.Random;
 2import java.util.concurrent.CompletableFuture;
 3
 4public class RunAfterEitherExample {
 5
 6    public static void runAfterEitherExample(){
 7        // 通道1 (模拟向远程通道1写入数据)
 8        CompletableFuture<Void> channel1 = CompletableFuture.runAsync(() -> {
 9            randomSleep(1000);
10            System.out.println("阶段1-执行完成");
11        });
12        // 通道2 (模拟向远程通道2写入数据)
13        CompletableFuture<Void> channel2 = CompletableFuture.runAsync(() -> {
14            randomSleep(1000);
15            System.out.println("阶段2-执行完成");
16        });
17
18        // 执行 CompletableFuture 任务
19        // 并行执行【任务1】和【任务2】,无论哪个任务先执行完成,直接进行下一步任务,执行特定动作
20        channel1.runAfterEither(channel2, () -> System.out.println("任意任务执行完成-执行特定操作")).join();
21    }
22
23    /**
24     * 指定规定时间内睡眠
25     *
26     * @param time 随机睡眠时间
27     */
28    private static void randomSleep(int time) {
29        try {
30            Thread.sleep(new Random().nextInt(time));
31        } catch (InterruptedException e) {
32        }
33    }
34
35    public static void main(String[] args) {
36        runAfterEitherExample();
37    }
38
39}

runAfterEitherAsync

 1import java.util.Random;
 2import java.util.concurrent.CompletableFuture;
 3
 4public class RunAfterEitherAsyncExample {
 5
 6    public static void runAfterEitherAsyncExample(){
 7        // 通道1 (模拟向远程通道1写入数据)
 8        CompletableFuture<Void> channel1 = CompletableFuture.runAsync(() -> {
 9            randomSleep(1000);
10            System.out.println("阶段1-执行完成");
11        });
12        // 通道2 (模拟向远程通道2写入数据)
13        CompletableFuture<Void> channel2 = CompletableFuture.runAsync(() -> {
14            randomSleep(1000);
15            System.out.println("阶段2-执行完成");
16        });
17
18        // 执行 CompletableFuture 任务
19        // 并行执行【任务1】和【任务2】,无论哪个任务先执行完成,直接进行下一步任务,执行特定动作
20        channel1.runAfterEitherAsync(channel2, () -> System.out.println("任意任务执行完成-执行特定操作")).join();
21    }
22
23
24    /**
25     * 指定规定时间内睡眠
26     *
27     * @param time 随机睡眠时间
28     */
29    private static void randomSleep(int time) {
30        try {
31            Thread.sleep(new Random().nextInt(time));
32        } catch (InterruptedException e) {
33        }
34    }
35
36    public static void main(String[] args) {
37        runAfterEitherAsyncExample();
38    }
39
40}

6.6 acceptEither 和 acceptEitherAsync

方法描述

方法 有返回值 描述
AcceptEither        x        并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
AcceptEitherAsync        x        并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。

任务要求

使用 CompletableFuture 并行执行任务,执行任务一、任务二和任务三:

  • 任务一,从通道1中获取信息;
  • 任务二,从通道2中获取信息;
  • 任务三,等到步骤一和步骤二两个任务任意一个先执行完成,就将先执行完成任务中返回的结果输出到控制台;

执行流程

任务执行流程如下图所示:

示例代码

AcceptEither

 1import java.util.Random;
 2import java.util.concurrent.CompletableFuture;
 3
 4public class AcceptEitherExample {
 5
 6    public static void acceptEitherExample() {
 7        // 通道1 (模拟获取远程信息)
 8        CompletableFuture<String> channel1 = CompletableFuture.supplyAsync(() -> {
 9            randomSleep(1000);
10            return "阶段1-成功获取结果";
11        });
12        // 通道2 (模拟获取远程信息)
13        CompletableFuture<String> channel2 = CompletableFuture.supplyAsync(() -> {
14            randomSleep(1000);
15            return "阶段2-成功获取结果";
16        });
17
18        // 执行 CompletableFuture 任务
19        // 并行执行【任务1】和【任务2】,分别从【通道1】或者【通道2】中获取结果,
20        // 哪个通道先获取结果成功,就使用该结果作为任务的返回结果,也作为下一步任
21        // 务要执行的参数,然后执行下一步操作,执行完成后没有返回值。
22        channel1.acceptEither(channel2, result -> System.out.println("获取的结果: " + result)).join();
23    }
24
25    /**
26     * 指定规定时间内睡眠
27     *
28     * @param time 随机睡眠时间
29     */
30    private static void randomSleep(int time) {
31        try {
32            Thread.sleep(new Random().nextInt(time));
33        } catch (InterruptedException e) {
34        }
35    }
36
37    public static void main(String[] args) {
38        acceptEitherExample();
39    }
40
41}

AcceptEitherAsync

 1import java.util.Random;
 2import java.util.concurrent.CompletableFuture;
 3
 4public class AcceptEitherAsyncExample {
 5
 6    public static void acceptEitherAsyncExample(){
 7        // 通道1 (模拟获取远程信息)
 8        CompletableFuture<String> channel1 = CompletableFuture.supplyAsync(() -> {
 9            randomSleep(1000);
10            return "阶段1-成功获取结果";
11        });
12        // 通道2 (模拟获取远程信息)
13        CompletableFuture<String> channel2 = CompletableFuture.supplyAsync(() -> {
14            randomSleep(1000);
15            return "阶段2-成功获取结果";
16        });
17
18        // 执行 CompletableFuture 任务
19        // 并行执行【任务1】和【任务2】,分别从【通道1】或者【通道2】中获取结果,
20        // 哪个通道先获取结果成功,就使用该结果作为任务的返回结果,也作为下一步任
21        // 务要执行的参数,然后执行下一步操作,执行完成后没有返回值。
22        channel1.acceptEitherAsync(channel2, result -> System.out.println("获取的结果: " + result)).join();
23    }
24    
25    /**
26     * 指定规定时间内睡眠
27     *
28     * @param time 随机睡眠时间
29     */
30    private static void randomSleep(int time){
31        try {
32            Thread.sleep(new Random().nextInt(time));
33        } catch (InterruptedException e) {
34        }
35    }
36
37    public static void main(String[] args) {
38        acceptEitherAsyncExample();
39    }
40
41}

6.7 allOf

方法描述

方法 有返回值 描述
allOf        x        并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待全部任务方法都执行完成后结束。任务执行结束后,没有返回值。

不过这里需要注意: allOf 方法执行时,如果传入的 CompletableFuture<?> 中的其中一个阶段异常完成时,那么返回的 CompletableFuture<Void> 也异常完成,并将此异常作为异常原因。

任务要求

并行执行 CompletableFuture 任务,模拟从三个接口获取用户相关信息:

  • 任务一,调用接口1获取用户基本信息;
  • 任务二,调用接口2获取用户头像;
  • 任务三,调用接口3获取用用户余额;

等待三个任务并行执行完成,然后将获取的信息聚合在 Map 集合中。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.Map;
 2import java.util.concurrent.CompletableFuture;
 3import java.util.concurrent.ConcurrentHashMap;
 4
 5public class AllOfExample {
 6
 7    public static void allOfExample() {
 8        // 创建聚合数据的 Map 集合
 9        Map<String, String> userMap = new ConcurrentHashMap<>(3);
10
11        // 创建待执行的 Runnable 参数
12        Runnable runnable1 = () -> {
13            System.out.println("任务1-成功获取用户基本信息");
14            userMap.put("userInfo", "{name: mydlq, age: 18}");
15        };
16        Runnable runnable2 = () -> {
17            System.out.println("任务2-成功获取用户头像");
18            userMap.put("avatar", "http://www.xxx.com/avatar");
19        };
20        Runnable runnable3 = () -> {
21            System.out.println("任务3-成功获取用户余额");
22            userMap.put("balance", "1000");
23        };
24
25        // 执行多个 CompletableFuture,需要等待全部完成
26        CompletableFuture<Void> cf = CompletableFuture.allOf(
27                CompletableFuture.runAsync(runnable1),
28                CompletableFuture.runAsync(runnable2),
29                CompletableFuture.runAsync(runnable3)
30        );
31
32        // 进入堵塞状态,等待执行完成
33        cf.join();
34
35        // 输出用户信息
36        System.out.println("获取的用户信息:");
37        for (Map.Entry<String, String> entry : userMap.entrySet()) {
38            System.out.println(entry.getKey() + ": " + entry.getValue());
39        }
40    }
41
42    public static void main(String[] args) {
43        allOfExample();
44    }
45
46}

6.8 anyOf

方法描述

方法 有返回值 描述
anyOf        √        并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待多个任务方法中任意一个执行完成后结束。任务执行结束后,返回第一个先执行完成任务的返回值。

不过这里需要注意: anyOf 方法执行时,如果传入的全部 CompletableFuture<?> 阶段都没有完成前,任意一个阶段执行过程出现异常并没有处理,也就是说该阶段执行过程异常完成,那么返回的 CompletableFuture<Object> 也异常完成,并将此异常作为异常原因。

任务要求

并行执行 CompletableFuture 任务,模拟从三个接口中获取相同的信息,只要有任意一个接口先行返回信息,就直接执行下一步骤,而无需等待。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.Random;
 2import java.util.concurrent.CompletableFuture;
 3import java.util.function.Supplier;
 4
 5public class AnyOfExample {
 6
 7    public static void anyOfExample() {
 8        Supplier<String> supplier1 = () -> {
 9            System.out.println("通道1");
10            return "通道1-成功获取信息";
11        };
12        Supplier<String> supplier2 = () -> {
13            System.out.println("通道2");
14            return "通道2-成功获取信息";
15        };
16        Supplier<String> supplier3 = () -> {
17            System.out.println("通道3");
18            return "通道3-成功获取信息";
19        };
20
21        // 执行多个 CompletableFuture,只要任意一个执行完成就执行下一步
22        CompletableFuture<Object> cf = CompletableFuture.anyOf(
23                CompletableFuture.supplyAsync(supplier1),
24                CompletableFuture.supplyAsync(supplier2),
25                CompletableFuture.supplyAsync(supplier3)
26        );
27
28        // 进入堵塞状态,等待执行完成,输出获取的信息
29        Object result = cf.join();
30        System.out.println(result);
31    }
32
33    /**
34     * 随机睡眠指定时间
35     *
36     * @param time 睡眠时间
37     */
38    public static void randomTimeSleep(int time){
39        try {
40            Thread.sleep(new Random().nextInt(time));
41        } catch (InterruptedException e) {
42            throw new RuntimeException(e);
43        }
44    }
45
46    public static void main(String[] args) {
47        anyOfExample();
48    }
49
50}

七、任务结束方法

7.1 get

方法描述

方法 有返回值 描述
get        √        获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(运行时)异常。

任务要求

执行 CompletableFuture 任务,然后调用 get 方法进行等待,获取执行结果。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.concurrent.CompletableFuture;
 2import java.util.concurrent.ExecutionException;
 3
 4public class GetExample {
 5
 6    public static void getExample() throws ExecutionException, InterruptedException {
 7        // 执行 CompletableFuture 任务
 8        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "执行结果");
 9
10        // 调用 get 方法进行等待,获取执行结果
11        cf.get();
12    }
13
14    public static void main(String[] args) throws ExecutionException, InterruptedException {
15        getExample();
16    }
17
18}

7.2 join

方法描述

方法 有返回值 描述
join        √        获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(未经检查)异常。

任务要求

执行 CompletableFuture 任务,然后调用 join 方法进行等待,获取执行结果。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.concurrent.CompletableFuture;
 2
 3public class JoinExample {
 4
 5    public static void joinExample() {
 6        // 执行 CompletableFuture 任务
 7        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "执行结果");
 8
 9        // 调用 join 方法进行等待,获取执行结果
10        cf.join();
11    }
12
13    public static void main(String[] args) {
14        joinExample();
15    }
16
17}

7.3 getNow

方法描述

方法 有返回值 描述
getNow        √        立即获取任务执行结果,如果任务没有完成则返回设定的默认值,如果任务正常完成则返回执行结果。

任务要求

执行 CompletableFuture 任务,然后调用 getNow 方法获取任务执行结果,如果任务没有执行完成则返回默认值。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.Random;
 2import java.util.concurrent.CompletableFuture;
 3
 4public class GetNowExample {
 5
 6    public static void getNowExample() {
 7        // 执行 CompletableFuture 任务
 8        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
 9            // 睡眠5毫秒
10            sleep(5);
11            return "示例-执行完成";
12        });
13
14        // 随机睡眠1~10毫秒
15        sleep(new Random().nextInt(10));
16
17        // 调用 getNow 方法获取执行结果,如果任务未执行完成则输出设置的默认值
18        String result = cf.getNow("默认值");
19        System.out.println(result);
20    }
21
22    /**
23     * 线程睡眠
24     *
25     * @param millis 睡眠时间(单位:毫秒)
26     */
27    public static void sleep(long millis){
28        try {
29            Thread.sleep(millis);
30        } catch (InterruptedException e) {
31            e.printStackTrace();
32        }
33    }
34
35    public static void main(String[] args) {
36        getNowExample();
37    }
38
39}

7.4 cancel

方法描述

方法 有返回值 描述
cancel        √        取消任务,如果任务尚未执行结束,调用该方法成功取消任务时返回 true,否则返回 false。并且任务取消成功后,通过 get/join 方法获取结果时,会抛出 CancellationException 异常。

任务要求

执行 CompletableFuture 任务,然后调用 cancel 方法取消任务。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.Random;
 2import java.util.concurrent.CancellationException;
 3import java.util.concurrent.CompletableFuture;
 4import java.util.concurrent.ExecutionException;
 5
 6public class CancelExample {
 7
 8    public static void cancelExample() throws ExecutionException, InterruptedException {
 9        // 执行 CompletableFuture 任务
10        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
11            // 随机睡眠1~10毫秒
12            sleep(new Random().nextInt(10));
13            System.out.println("示例-执行任务完成");
14        });
15
16        // 随机睡眠1~10毫秒
17        sleep(new Random().nextInt(10));
18
19        // 调用 cancel 方法取消任务
20        boolean cancelResult = cf.cancel(true);
21        System.out.println("取消任务: " + cancelResult);
22
23        // 调用 get 方法获取执行结果,如果取消任务将抛出 CancellationException 异常,这里对该异常进行处理
24        try {
25            cf.get();
26        } catch (CancellationException e) {
27            System.out.println("获取任务失败,任务已经被取消");
28        }
29    }
30
31    /**
32     * 线程睡眠
33     *
34     * @param millis 睡眠时间(单位:毫秒)
35     */
36    public static void sleep(long millis){
37        try {
38            Thread.sleep(millis);
39        } catch (InterruptedException e) {
40            e.printStackTrace();
41        }
42    }
43
44    public static void main(String[] args) throws ExecutionException, InterruptedException {
45        cancelExample();
46    }
47
48}

八、CompletableFuture 查看任务状态方法示例

8.1 isDone

方法描述

查看任务是否执行完成,如果当前阶段执行完成 (无论是正常完成还是异常完成) 则返回 true,否则返回 false

方法 有返回值 描述
isDone        √        查看任务是否执行完成,如果当前阶段执行完成 (无论是正常完成还是异常完成) 则返回 true,否则返回 false。

任务要求

执行 CompletableFuture 任务,然后调用 isDone 方法查看任务是否执行完成。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.concurrent.CompletableFuture;
 2
 3public class IsDoneExample {
 4
 5    public static void isDoneExample() throws InterruptedException {
 6        // 执行 CompletableFuture 任务
 7        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> System.out.println("任务执行中..."));
 8
 9        // 调用 isDone 方法查看任务是否执行完成
10        System.out.println("任务是否完成: " + cf.isDone());
11
12        // 等待1秒时间
13        Thread.sleep(1000L);
14
15        // 调用 isDone 方法再次查看任务是否执行完成
16        System.out.println("任务是否完成: " + cf.isDone());
17    }
18
19    public static void main(String[] args) throws InterruptedException {
20        isDoneExample();
21    }
22
23}

8.2 isCancelled

方法描述

方法 有返回值 描述
isCancelled        √        查看当前阶段任务是否成功取消,如果此阶段任务在完成之前被取消则返回 true,否则返回 false。

任务要求

执行 CompletableFuture 任务,然后调用 cancel 方法取消任务,之后调用 isCancelled 方法观察任务是否成功取消。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.concurrent.CompletableFuture;
 2
 3public class IsCancelledExample {
 4
 5    public static void isCancelledExample(){
 6        // 执行 CompletableFuture 任务
 7        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> System.out.println("执行 CompletableFuture 任务"));
 8
 9        // 调用 cancel 方法取消任务
10        cf.cancel(true);
11
12        // 调用 isCancelled 方法,查询任务是否成功被取消
13        System.out.println("是否取消任务: " + cf.isCancelled());
14    }
15
16    public static void main(String[] args) {
17        isCancelledExample();
18    }
19
20}

8.3 isCompletedExceptionally

方法描述

方法 有返回值 描述
isCompletedExceptionally        √        查看当前阶段任务是否以异常的方式执行完成。比如取消任务、突然终止任务或者执行过程出现异常等,都属于异常方式执行完成,如果是以异常方式完成则返回 true,否则返回 false。

任务要求

执行 CompletableFuture 任务,然后调用 isCompletedExceptionally 方法查看任务是否异常执行完成。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.concurrent.CompletableFuture;
 2
 3public class IsCompletedExceptionallyExample {
 4
 5    public static void isCompletedExceptionallyExample() throws InterruptedException {
 6        // 执行 CompletableFuture 任务
 7        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
 8            System.out.println("执行中...");
 9            // 模拟发生异常
10            System.out.println(1/0);
11        });
12
13        // 等待1秒确保任务执行完成
14        Thread.sleep(1000L);
15        
16        // 调用 isCompletedExceptionally 方法验证当前阶段是否异常完成
17        System.out.println("是否异常完成: " + cf.isCompletedExceptionally());
18    }
19
20    public static void main(String[] args) throws InterruptedException {
21        isCompletedExceptionallyExample();
22    }
23
24}

九、CompletableFuture 设置任务结果方法示例

9.1 obtrudeValue

方法描述

方法 有返回值 描述
obtrudeValue        x        设置(重置)调用 get/join 方法时返回指定值,无论任务是否执行完成。

任务要求

执行 CompletableFuture 任务,然后调用 obtrudeValue 方法强制设置任务执行结果为指定值,无论当前任务是否执行成功/失败。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.concurrent.CompletableFuture;
 2import java.util.concurrent.ExecutionException;
 3
 4public class ObtrudeValueExample {
 5
 6    public static void obtrudeValueExample() throws ExecutionException, InterruptedException {
 7        // 执行 CompletableFuture 任务
 8        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "示例-执行完成");
 9
10        // 设置或重置 get 方法和与其相关方法的返回的值
11        cf.obtrudeValue("示例-强制设置返回值-无论成功失败");
12
13        // 调用 get 方法进行等待,获取执行结果并输出到控制台
14        String result = cf.get();
15        System.out.println(result);
16    }
17
18    public static void main(String[] args) throws ExecutionException, InterruptedException {
19        obtrudeValueExample();
20    }
21
22}

9.2 obtrudeException

方法描述

方法 有返回值 描述
obtrudeException        x        设置(重置)调用 get/join 方法时返回指定异常,无论任务是否执行完成。

任务要求

执行 CompletableFuture 任务,然后调用 obtrudeException 方法强制设置任务执行结果为指定异常,无论当前任务是否执行成功/失败。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.concurrent.CompletableFuture;
 2import java.util.concurrent.ExecutionException;
 3
 4public class ObtrudeExceptionExample {
 5
 6    public static void obtrudeExceptionExample() throws ExecutionException, InterruptedException {
 7        // 执行 CompletableFuture 任务
 8        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "示例-执行完成");
 9
10        // 设置 get 方法和与其相关的方法后续执行抛出指定异常
11        cf.obtrudeException(new Exception("未知异常"));
12
13        // 调用 get 方法进行等待,获取执行结果并输出
14        String result = cf.get();
15        System.out.println(result);
16    }
17
18    public static void main(String[] args) throws ExecutionException, InterruptedException {
19        obtrudeExceptionExample();
20    }
21
22}

9.3 complete

方法描述

方法 有返回值 描述
complete        √        设置调用 get/join 方法时返回指定值。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。

任务要求

执行 CompletableFuture 任务,然后调用 complete 方法设置或重置任务执行结果,然后将最后任务执行结果输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.concurrent.CompletableFuture;
 2import java.util.concurrent.ExecutionException;
 3
 4public class CompleteExample {
 5
 6    public static void completeExample() throws ExecutionException, InterruptedException {
 7        // 执行 CompletableFuture 任务
 8        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "示例-执行完成");
 9
10        // 设置或重置 get 方法和与其相关方法的返回的值,任务没有执行完成返回 true,否则返回 false
11        boolean setResult = cf.complete("示例-任务没有完成-设置返回值");
12        System.out.println("设置返回值为执行结果: " + setResult);
13
14        // 调用 get 方法进行等待,获取执行结果并输出
15        String result = cf.get();
16        System.out.println(result);
17    }
18
19    public static void main(String[] args) throws ExecutionException, InterruptedException {
20        completeExample();
21    }
22
23}

9.4 CompleteException

方法描述

方法 有返回值 描述
completeException        √        设置调用 get/join 方法时返回指定异常。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。

任务要求

执行 CompletableFuture 任务,然后调用 complete 方法设置或重置任务执行结果为指定异常,然后将最后任务执行结果或异常输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.concurrent.CompletableFuture;
 2import java.util.concurrent.ExecutionException;
 3
 4public class CompleteExceptionallyExample {
 5
 6    public static void completeExceptionallyExample() throws ExecutionException, InterruptedException {
 7        // 执行 CompletableFuture 任务
 8        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "示例-执行完成");
 9
10        // 设置或重置 get 方法和与其相关方法的返回的值,任务没有执行完成返回 true,否则返回 false
11        boolean setResult = cf.completeExceptionally(new Exception("未知异常"));
12        System.out.println("设置返回值为执行结果: " + setResult);
13
14        // 调用 get 方法进行等待,获取执行结果并输出
15        String result = cf.get();
16        System.out.println(result);
17    }
18
19    public static void main(String[] args) throws ExecutionException, InterruptedException {
20        completeExceptionallyExample();
21    }
22
23}

十、任务异常处理方法

10.1 exceptionally

方法描述

方法 有返回值 描述
exceptionally         x         判断上一个任务执行时是否发生异常,如果是则就会执行 exceptionally 方法,并且将上一步异常作为当前方法的参数,然后对异常进行处理。当然,如果上一阶段执行过程中没有出现异常,则不会执行 exceptionally 方法。

任务要求

执行 CompletableFuture 任务,并且使用 exceptionally 方法:

  • 如果 exceptionally 方法的上一阶段执行过程中出现异常,则会执行 exceptionally 阶段;
  • 如果 exceptionally 方法的上一阶段执行过程中没有出现异常,则不会执行 exceptionally 阶段;

执行流程

任务执行流程如下图所示:

示例代码

 1import java.util.Random;
 2import java.util.concurrent.CompletableFuture;
 3import java.util.concurrent.ExecutionException;
 4
 5public class ExceptionallyExample {
 6
 7    public static void exceptionallyExample() throws ExecutionException, InterruptedException {
 8        // 执行 CompletableFuture 串行任务,并且使用 exceptionally 方法:
 9        // - 如果 exceptionally 方法的上一阶段执行过程中出现异常,则会执行 exceptionally 阶段;
10        // - 如果 exceptionally 方法的上一阶段执行过程中没有出现异常,则不会执行 exceptionally 阶段;
11        CompletableFuture<String> cf = CompletableFuture
12                // 执行任务,50%概率发生异常,50%概率返回正常值
13                .supplyAsync(() -> {
14                    if (new Random().nextInt(2) != 0) {
15                        throw new RuntimeException("模拟发生异常");
16                    }
17                    return "正常结束";
18                })
19                // 处理上一步中抛出的异常
20                .exceptionally(Throwable::getMessage);
21
22        // 调用 get 方法进行等待,获取执行结果
23        String result = cf.get();
24        System.out.println(result);
25    }
26
27    public static void main(String[] args) throws ExecutionException, InterruptedException {
28        exceptionallyExample();
29    }
30
31}

---END---

如果本文对你有帮助,可以关注我的公众号"小豆丁技术栈"了解最新动态,顺便也请帮忙 github 点颗星哦~感谢~


  !版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。