Loading
Loading...

Java 中的异步编程工具 CompletableFuture

环境信息:

  • JDK 版本: 1.8

示例地址:


一、CompletableFuture 概述

1.1 什么是同步和异步

在介绍 CompletableFuture 之前,先简单说一下什么是同步和异步这两个概念,介绍这俩个概念最直接的方式就是使用经典的 烧水泡茶 过程进行描述,一般来讲我们烧水泡茶时会经过 洗茶壶->注水->烧水->洗茶杯->装茶叶->泡茶 几个步骤:

如果使用 同步编程,那么泡茶过程可以划分为:

如果使用 异步编程,那么泡茶过程可以划分为:

根据上面的例子,相信大家对同步和异步有了一个大概了解,同步就是按指定顺序执行任务,这种执行过程必须按顺序进行,一步步执行,所以执行速度比较缓慢。而 异步 则不需要按照指定的顺序执行任务,它可以多个任务并行执行,等待异步任务完成后进行反馈,然后按照反馈结果执行特定操作即可,所以执行速度非常快。

1.2 CompletableFuture 简介

在 JDK 8 版本中,新增了很多功能与工具,其中 CompletableFuture 就是其中一种用于构建异步编程的工具,该工具主要是对 Future 进行优化,使开发者可以很方便的创建串行或者并行任务,以及实现任务的 OR 或者 AND 这种组合方式,极大程度上简化了异步编程中的代码量。

1.3 为什么推出 CompletableFuture

在 JDK 8 之前的版本中,编写异步任务常使用 Future 实现,使用 Future 执行异步任务并且获得异步执行结果时,我们会通过两种方式,要么调用阻塞的 get() 方法,或者轮询调用 isDone() 方法获取任务状态,判断是否为 true。不过这两种方法在执行时都会使主线程被迫等待,对性能会产生一定影响,故而 Java 团队在 JDK 8 版本中新增了 CompletableFuture 工具,通过使用观察者模式进行设计,实现异步回调进行异步编程,一定程度上降低了编写异步任务的代码量和难度。

二、CompletableFuture 中的分类

在 CompletableFuture 中有很多方法,并且这些方法命名并不是很容易归类,比如 thenRun 和 thenRunAsync 方法,这些方法有的以 Async 结尾,而有的则不是,一般情况下我们会认为带 Async 的方法属于异步方法,不带 Async 的方法属于同步方法,不过这个看法其实是并不是完全正确,因为有的方法即使不以 Async 结尾,也是通过异步方式执行的。

其实,本人认为如果需要对 CompletableFuture 中的方法进行归类的话,可以按照使用过程进行分类,这样比较容易将不同的方法划分到不同的类别中。比如,使用过程可以划分为 任务开启任务处理任务结束查看任务状态设置任务结果 以及 任务异常处理 等几种方法,其中 任务处理 又可以分为 串行任务并行任务

注意: 下面要罗列的以 Async 结尾的方法中,执行时并不会直接从 commonPool 线程池中获取线程,而是判断环境中设置的变量 commonParallelism 并行度,或者应用所在服务器的 CPU 数量是否大于 1,如果满足条件则就会从 ForkJoinPool.commonPool() 线程池中获取一个线程执行任务,如果不满足条件就会每次执行任务时新建一个线程,使用新创建的线程去执行任务。在下面介绍线程池时,会提到这一点。

2.1 任务开启方法

任务开启方法一般指的是 CompletableFuture 中,可以直接调用的静态方法,使用这些方法可以直接开启一个新任务,该任务相当于任务链中要执行的第一个步骤,常常我们会在该步骤中执行获取待处理的数据等操作。

任务开启方法包含如下:

方法名称有返回值描述
runAsync    x    从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。
supplyAsync    √    从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,返回指定类型的返回值。

2.2 任务处理方法

任务处理方法一般指的是 CompletableFuture 中,用于以串行或并行的方式处理数据的静态或实例方法,使用这些方法可以实现 OR 或 AND 任务组合,是异步编程中最重要的一些方法。

任务处理方法包含如下:

任务处理-串行任务

方法名称有返回值描述
thenRun    x    串行执行任务。并且该任务方法执行结束后,没有返回值。
thenRunAsync    x    串行执行任务,从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。
thenApply    √    串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
thenApplyAsync    √    串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
thenAccept    x    串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且当前任务方法执行结束后,没有返回值。
thenAcceptAsync    x    串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。
handle    √    串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
handleAsync    √    串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
whenComplete    x    串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定函数。并且当前任务执行结束后,没有返回值。
whenCompleteAsync    x    串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。
thenCompose    √    串行执行任务,按顺序组合两个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。
thenComposeAsync    √    串行执行任务,按顺序组合俩个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。

任务处理-并行任务

方法名称有返回值描述
thenCombine    √    并行执行任务,从 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。
thenCombineAsync    √    并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。
thenAcceptBoth    x    并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
thenAcceptBothAsync    x    并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
runAfterBoth    x    并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务方法执行结束后,没有返回值。
runAfterBothAsync    x    并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。
applyToEither    x    并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
applyToEitherAsync    x    并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前先执行结束的任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
runAfterEither    x    并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。
runAfterEitherAsync    x    并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。
acceptEither    x    并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
acceptEitherAsync    x    并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
allOf    x    并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待全部任务方法都执行完成后结束。任务执行结束后,没有返回值。
anyOf    √    并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待多个任务方法中任意一个执行完成后结束。任务执行结束后,返回第一个先执行完成任务的返回值。

2.3 任务结束方法

任务结束指的是调用 CompletableFuture 中的实例方法,获取执行结果或者取消任务等,结束现有的任务链。

任务结束包含的方法如下:

方法名称有返回值描述
get    √    获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(运行时)异常。
join    √    获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(未经检查)异常。
getNow    √    立即获取任务执行结果,如果任务没有完成则返回设定的默认值,如果任务正常完成则返回执行结果。
cancel    √    取消任务,如果任务尚未执行结束,调用该方法成功取消任务时返回 true,否则返回 false。并且任务取消成功后,通过 get/join 方法获取结果时,会抛出 CancellationException 异常。

2.4 查看任务状态方法

查看任务状态方法用于查看 CompletableFuture 任务执行状态,其中包含的方法如下:

方法名称有返回值描述
isDone    √    查看任务是否执行完成,如果当前阶段执行完成(无论是正常完成还是异常完成)则返回 true,否则返回 false。
isCancelled    √    查看当前阶段任务是否成功取消,如果此阶段任务在完成之前被取消则返回 true,否则返回 false。
isCompletedExceptionally    √    查看当前阶段任务是否以异常的方式执行完成。比如取消任务、突然终止任务或者执行过程出现异常等,都属于异常方式执行完成,如果是以异常方式完成则返回 true,否则返回 false。

2.5 设置任务结果方法

设置任务结果方法用于设置 CompletableFuture 任务结果,使其返回指定结果,其中包含的方法如下:

方法名称有返回值描述
obtrudeValue    x    设置(重置)调用 get/join 方法时返回指定值,无论任务是否执行完成。
obtrudeException    x    设置(重置)调用 get/join 方法时返回指定异常,无论任务是否执行完成。
complete    √    设置调用 get/join 方法时返回指定值。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。
completeException    √    设置调用 get/join 方法时返回指定异常。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。

2.6 任务异常处理方法

任务异常处理方法用于处理 CompletableFuture 执行过程中的异常,对异常进行处理,其中包含的方法如下:

方法名称有返回值描述
exceptionally    x    判断上一个任务执行时是否发生异常,如果是则就会执行 exceptionally 方法,并且将上一步异常作为当前方法的参数,然后对异常进行处理。当然,如果上一阶段执行过程中没有出现异常,则不会执行 exceptionally 方法。

三、CompletableFuture 中的线程池

3.1 CompletableFuture 默认的线程池

使用 CompletableFuture 执行异步任务时,会首先判断设置的 parallelism 数量,该参数一般跟当前服务器 CPU 数量相关:

  • 如果 CPU 数量等于 1,CompletableFuture 执行任务时,每次都会创建一个新的线程执行任务;
  • 如果 CPU 数量大于 1,CompletableFuture 执行任务时,将使用公共的 ForkJoinPool.commonPool() 线程池;

一般情况下在多核 CPU 服务器中运行应用,都会默认使用 ForkJoinPool.commonPool() 线程池,该线程池根据名称也可以大概了解到,是基于 Fork 和 Join 组合实现的,执行过程中可以将大的任务拆分为多个小任务并行执行,并且支持以窃取的方式,线程池中的线程在执行完自己工作队列中的任务后,可以窃取别的线程工作队列中没有执行完成的任务,协助其执行,尽可能使用并行的方式快速完成全部任务。所以 ForkJoinPool.commonPool() 线程池更适合执行计算密集型任务,而不太适合 IO 密集型任务

不过公共的 ForkJoinPool.commonPool() 线程池是 JVM 进程中所有 CompletableFuture 和 Stream 共享,如果全局上下文环境中存在大量使用 ForkJoinPool.commonPool() 线程池的任务,并且这些任务中包含大量的 IO 操作,那么该线程池性能将会受到很大影响。所以,一般情况下我们使用 CompletableFuture 时,需要避免使用公共线程池,而是使用自定义的线程池,并且设置不同的任务使用不同类型的线程池,以适用不同的任务场景。

3.2 CompletableFuture 如何使用自定义线程池

在 CompletableFuture 中的绝大部分方法,都存在接收不同类型的重载方法,如:

正常的 CompletableFuture 方法:

  • thenRun(Runnable action)
  • handle(BiFunction<? super T, Throwable, ? extends U> fn)

使用公共线程池 commonPool 的异步方法:

  • thenRunAsync(Runnable action)
  • handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)

使用自定义线程池的 CompletableFuture 方法:

  • thenRunAsync(Runnable action,Executor executor)
  • handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

大家根据上面的三种方法也能得知,在 CompletableFuture 中的方法,大部分都存在可以接收自定义线程池 Executor 参数的重载方法。因此,我们可以使用接收自定义线程池的 CompletableFuture 方法,将我们自定义的线程池作为参数,传入其中,使用该线程池中的线程执行任务。

如下,是一个使用 CompletableFuture 方法执行任务时使用自定义线程的示例:

import java.time.LocalDate;
import java.util.concurrent.*;
public class SupplyExample {
/**
* 创建自定义线程池
*
* @return 自定义线程池
*/
public static ThreadPoolExecutor myThreadPool() {
// 核心线程数
int coreSize = 10;
// 最大线程数
int maxSize = 20;
// 空闲线程的回收时间和单位
TimeUnit timeUnit = TimeUnit.SECONDS;
// 空闲线程时间销毁
long keepAliveTime = 60L;
// 工作队列
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(200);
// 拒绝策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
// 创建并返回自定义线程池
return new ThreadPoolExecutor(coreSize, maxSize, keepAliveTime, timeUnit, workQueue, handler);
}
/**
* 执行 CompletableFuture.supplyAsync 方法
*/
public static void supplyAsyncExample() {
// 自定义线程池
ThreadPoolExecutor threadPool = myThreadPool();
// 执行 CompletableFuture 任务,将日期字符串转换为 LocalDate 日期对象
CompletableFuture<LocalDate> cf = CompletableFuture
.supplyAsync(() -> LocalDate.parse("2022-06-01"), threadPool);
// 调用 join 方法进入堵塞状态,直至获取任务执行结果输出到控制台
System.out.println(cf.join());
}
/**
* main() 方法
*/
public static void main(String[] args) {
supplyAsyncExample();
}
}

3.3 CPU 集型任务和 IO 密集型任务

常常我们执行的任务可以根据 CPU 的使用或者 IO 的使用进行划分,可以分为:

CPU 密集型任务

CPU 密集型任务也称为计算密集型任务,指的是任务执行过程需要进行大量计算,没有堵塞,且消耗大量的 CPU 资源。比如,计算圆周率,视频解码,类型转换等。

IO 密集型任务

IO 密集型任务,指的是需要进行大量磁盘IO读取,网络IO操作等,执行过程会造成堵塞,需要创建大量的线程执行任务。比如,数据库读写,文件读写,网络请求等。

3.4 不同类型任务对线程池的选择

在 Java 中常用的线程池按执行方式划分的话,其实可以划分为 ForkJoinPoolExecutorService 两种。

ForkJoinPool 线程池

关于 ForkJoinPool 线程池上面也提到过,这种线程池是基于分而治之的思想对任务进行拆解与合并,执行过程需要消耗大量 CPU 分解任务,然后进行计算,因此特别适合执行 计算密集型任务

ExecutorService 线程池

而 ExecutorService 线程池是传统方式的线程池,使用池化方式管理线程,提前将若干个可执行的线程放入池中,当我们需要时就从池中获取线程执行任务,执行过程不会对任务进行拆解,并且使用完毕后不需要销毁线程而是放回池中,方便下次再使用,从而减少创建和销毁线程的性能开销。所以,这种线程池更适合执行过程中需要进行大量 IO 操作的任务,即适合 IO 密集型任务

3.5 线程线程大小配置推荐

一般情况下,业务上使用的线程池都会设置线程池的线程数量大小,这个线程池线程数量大小设置为多少是一个值得考虑的问题。设置线程池过多会造成线程浪费,过少会造成处理任务的线程过少,造成任务堆积。所以,这个配置线程数量多大并不容易把控。最优的就是配置不同的线程数进行测试,然后判断应用设置线程池大小为多大性能最优。

这里可以参考网上的一套万能的推荐配置,跟操作系统的 CPU 数量相关,如下:

  • CPU 密集型任务: N + 1
  • IO 密集型任务: 2N + 1

还有一种针对 IO 密集型任务设置估算公式,按公式进行配置这种性能最优,公式如下:

  • 估算公式: = (线程等待时间 + 线程 CPU 时间 / 线程 CPU 时间) * CPU 核心数

如果是既有 IO 操作的步骤,也有比较消耗 CPU 的步骤,这种混合型任务可以进行拆分,将不同的任务使用不同的线程池。

四、CompletableFuture 任务开启方法示例

4.1 runAsync

方法描述

方法有返回值描述
runAsync       x       从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。

任务要求

使用异步的方式,模拟获取远程信息,然后将获取的远程信息输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.concurrent.CompletableFuture;
public class RunAsyncExample {
public static void runAsyncExample() {
// 调用 runAsync 方法,异步执行 runnable 中的代码逻辑,模拟获取远程信息
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> System.out.println("模拟获取远程信息并输出到控制台"));
// 调用 join 方法进行等待,获取执行结果
cf.join();
}
public static void main(String[] args) {
runAsyncExample();
}
}

4.2 supplyAsync

方法描述

方法有返回值描述
supplyAsync       √       从 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,返回指定类型的返回值。

任务要求

使用异步的方式,模拟获取远程信息,然后将获取的信息作为任务返回结果,之后将任务返回的结果输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.concurrent.CompletableFuture;
public class SupplyAsyncExample {
public static void supplyAsyncExample() {
// 调用 supplyAsync 方法,异步执行 runnable 中的代码逻辑,模拟获取远程信息,然后返回
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
System.out.println("模拟获取远程信息");
return "远程信息";
});
// 调用 join 方法进行等待,获取执行结果
String result = cf.join();
System.out.println(result);
}
public static void main(String[] args) {
supplyAsyncExample();
}
}

五、CompletableFuture 任务处理-串行任务方法示例

5.1 thenRun 和 thenRunAsync

方法描述

方法有返回值描述
thenRun       x       使用主线程或者执行上一步任务的子线程,串行执行任务。并且该任务方法执行结束后,没有返回值。
thenRunAsync       x       串行执行任务,从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。

任务要求

使用 CompletableFuture 工具,按照代码顺序执行任务,处理使用 ”,” 拼接的日期字符串,将其转换为 LocalDate 日期对象。

  • 第一步,先通过 , 拆分字符串,然后存入原子引用类型对象包裹的 List 集合中;
  • 第二步,遍历 List 集合,将拆分后的字符串转换为 LocalDate 对象;

任务执行完成后,将全部日期对象输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

thenRun

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
public class ThenRunExample {
public static void thenRunExample() {
// 模拟的远程获取的日期字符串
String dateStr = "1995-05-10,2000-03-15";
// 原子引用类对象
AtomicReference<List<String>> dateStrList = new AtomicReference<>();
// LocalDate集合
List<LocalDate> dateList = new ArrayList<>();
// 执行 CompletableFuture 任务
// (1) 第一步,先通过 "," 拆分字符串,然后存入原子引用对象包裹的 List 集合中;
// (2) 第二步,遍历 List 集合,将拆分后的字符串转换为 LocalDate 对象;
CompletableFuture
.runAsync(() -> dateStrList.set(Arrays.asList(dateStr.split(","))))
.thenRun(() -> dateStrList.get().forEach(v -> dateList.add(LocalDate.parse(v, DateTimeFormatter.ofPattern("yyyy-MM-dd")))))
.join();
// 输出转换结果
for (LocalDate localDate : dateList) {
System.out.println(localDate.toString());
}
}
public static void main(String[] args) {
thenRunExample();
}
}

thenRunAsync

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
public class ThenRunAsyncExample {
public static void thenRunAsyncExample() {
// 模拟的远程获取的日期字符粗
String dateStr = "1995-05-10,2000-03-15";
// 原子引用类对象
AtomicReference<List<String>> dateStrList = new AtomicReference<>();
// LocalDate集合
List<LocalDate> dateList = new ArrayList<>();
// 执行 CompletableFuture 任务
// (1) 第一步,先通过 "," 拆分字符串,然后存入原子引用对象包裹的 List 集合中;
// (2) 第二步,遍历 List 集合,将拆分后的字符串转换为 LocalDate 对象;
CompletableFuture
.runAsync(() -> dateStrList.set(Arrays.asList(dateStr.split(","))))
.thenRunAsync(() -> dateStrList.get().forEach(v -> dateList.add(LocalDate.parse(v, DateTimeFormatter.ofPattern("yyyy-MM-dd")))))
.join();
// 输出转换结果
for (LocalDate localDate : dateList) {
System.out.println(localDate.toString());
}
}
public static void main(String[] args) {
thenRunAsyncExample();
}
}

5.2 thenApply 和 thenApplyAsync

方法描述

方法有返回值描述
thenApply       √       使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
thenApplyAsync       √       串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。

任务要求

使用 CompletableFuture 按照顺序执行任务,获取远程日期字符串,然后对日期进行逻辑计算,并返回计算后的结果。

  • 第一步,先模拟通过远程结果获取日期字符串;
  • 第二步,将日期字符串转换为 LocalDate 日期对象,便于进行逻辑计算;
  • 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;

任务执行完成后,将任务返回的日期字符串输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

thenApply

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CompletableFuture;
public class ThenApplyExample {
public static void thenApplyExample() {
// 执行 CompletableFuture 串行任务
CompletableFuture<String> cf = CompletableFuture
// 第一步,先模拟通过远程结果获取日期字符串;
.supplyAsync(() -> "2022-06-01")
// 第二步,将日期字符串转换为 LocalDate 日期对象,便于进行逻辑计算;
.thenApply(param -> LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd")))
// 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;
.thenApply(param -> param.plusDays(10).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
// 调用 join 方法进入堵塞状态,直至获取任务执行结果
String result = cf.join();
System.out.println(result);
}
public static void main(String[] args) {
thenApplyExample();
}
}

thenApplyAsync

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CompletableFuture;
public class ThenApplyAsyncExample {
public static void thenApplyAsyncExample() {
// 执行 CompletableFuture 串行任务
CompletableFuture<String> cf = CompletableFuture
// 第一步,先模拟通过远程结果获取日期字符串;
.supplyAsync(() -> "2022-06-01")
// 第二步,将日期字符串转换为 LocalDate 日期对象,便于进行逻辑计算;
.thenApplyAsync(param -> LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd")))
// 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;
.thenApplyAsync(param -> param.plusDays(10).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
// 调用 join 方法进入堵塞状态,直至获取任务执行结果
String result = cf.join();
System.out.println(result);
}
public static void main(String[] args) {
thenApplyAsyncExample();
}
}

5.3 thenAccept 和 thenAcceptAsync

方法描述

方法有返回值描述
thenAccept       x       使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且当前任务方法执行结束后,没有返回值。
thenAcceptAsync       x       串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。

任务要求

使用 CompletableFuture 按照顺序执行任务,处理使用 ”,” 拼接的日期字符串,将其转换为 LocalDate 日期对象。

  • 第一步,先通过 , 拆分字符串,转换为字符串数组,返回字符串数组;
  • 第二步,接收上一步中的日期字符串数组,遍历日期字符串数组,然后将数组中的日期字符串都转换为 LocalDate 日期对象;

任务执行完成后,将任务返回的日期字符串输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

thenAccept

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
public class ThenAcceptExample {
public static void thenAcceptExample() {
// 模拟的远程获取的日期字符粗
String dateStr = "1995-05-10,2000-03-15";
// 存储 LocalDate 的集合
List<LocalDate> dateList = new CopyOnWriteArrayList<>();
// 执行 CompletableFuture 任务
CompletableFuture
.supplyAsync(() -> dateStr.split(","))
.thenAccept(dateArray -> {
for (String s : dateArray) {
dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
}
})
.join();
// 输出转换结果
for (LocalDate localDate : dateList) {
System.out.println(localDate.toString());
}
}
public static void main(String[] args) {
thenAcceptExample();
}
}

thenAcceptAsync

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
public class ThenAcceptAsyncExample {
public static void thenAcceptAsyncExample() {
// 模拟的远程获取的日期字符粗
String dateStr = "1995-05-10,2000-03-15";
// 存储 LocalDate 的集合
List<LocalDate> dateList = new CopyOnWriteArrayList<>();
// 执行 CompletableFuture 任务
CompletableFuture
.supplyAsync(() -> dateStr.split(","))
.thenAcceptAsync(dateArray -> {
for (String s : dateArray) {
dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
}
})
.join();
// 输出转换结果
for (LocalDate localDate : dateList) {
System.out.println(localDate.toString());
}
}
public static void main(String[] args) {
thenAcceptAsyncExample();
}
}

5.4 handle 和 handleAsync

方法描述

方法有返回值描述
handle       √       使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
handleAsync       √       串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。

任务要求

使用 CompletableFuture 按照顺序执行任务,从远程接口获取日期字符串,将其转换为 LocalDate 日期对象。

  • 第一步,获取远程日期字符串,然后返回;
  • 第二步,接收上一步中的日期字符串,将其转换为 LocalDate 日期对象;

任务执行完成后,将任务返回的日期字符串输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

handle

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class HandleExample {
public static void handleExample() {
// 执行 CompletableFuture 串行任务:
// 第一步,获取远程日期字符串,然后返回
// 第二步,接收上一步中的日期字符串,将其转换为 `LocalDate` 日期对象
CompletableFuture<LocalDate> cf = CompletableFuture
.supplyAsync(() -> {
int random = new Random().nextInt(2);
// 50% 概率发生异常
if (random != 0) {
throw new RuntimeException("模拟发生异常");
}
// 50% 概率返回正常值
return "2022-06-01";
})
.handle((param, exception) -> {
// 如果上一步结果为异常,则返回现在的日期,否则将上一步获取的日期字符串转换为日期对象
if (exception != null) {
return LocalDate.now();
}
return LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
});
// 调用 join 方法进入堵塞状态,直至获取任务执行结果
LocalDate result = cf.join();
System.out.println(result);
}
public static void main(String[] args) {
handleExample();
}
}

handleAsync

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class HandleAsyncExample {
public static void handleAsyncExample() {
// 执行 CompletableFuture 串行任务:
// 第一步,获取远程日期字符串,然后返回;
// 第二步,接收上一步中的日期字符串,将其转换为 LocalDate 日期对象;
CompletableFuture<LocalDate> cf = CompletableFuture
.supplyAsync(() -> {
int random = new Random().nextInt(2);
// 50% 概率发生异常
if (random != 0) {
throw new RuntimeException("模拟发生异常");
}
// 50% 概率返回正常值
return "2022-06-01";
})
.handleAsync((param, exception) -> {
// 如果上一步结果为异常,则返回现在的日期,否则将上一步获取的日期字符串转换为日期对象
if (exception != null) {
return LocalDate.now();
}
return LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
});
// 调用 join 方法进入堵塞状态,直至获取任务执行结果
LocalDate result = cf.join();
System.out.println(result);
}
public static void main(String[] args) {
handleAsyncExample();
}
}

5.5 whenComplete 和 whenCompleteAsync

方法描述

方法有返回值描述
whenComplete       x       使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定函数。并且当前任务执行结束后,没有返回值。
whenCompleteAsync       x       串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。
  除此之外:
   如果上一阶段中正常执行结束,则该方法的结果参数不为 null;
   如果上一阶段中抛出异常,则该方法的异常参数不为 null;

任务要求

使用 CompletableFuture 按照顺序执行任务,从远程接口获取日期字符串,将其转换为 LocalDate 日期对象。

  • 第一步,获取远程日期字符串,然后返回;
  • 第二步,接收上一步中的日期字符串,将其转换为 LocalDate 日期对象,并且加入到集合;

任务执行完成后,将任务返回的日期字符串输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

whenComplete

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class WhenCompleteExample {
public static void whenCompleteExample() {
// 模拟的远程获取的日期字符粗
String dateStr = "1995-05-10,2000-03-15";
// 存储 LocalDate 的集合
List<LocalDate> dateList = new CopyOnWriteArrayList<>();
// 执行 CompletableFuture 任务
CompletableFuture
.supplyAsync(() -> {
int random = new Random().nextInt(2);
if (random == 1){
throw new RuntimeException("模拟发生异常");
}
return dateStr.split(",");
})
.whenComplete((dateArray, exception) -> {
// 如果上一步执行过程没有发生异常,则将日期字符串转换为日期对象
if (dateArray != null){
for (String s : dateArray) {
dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
}
}
})
.join();
// 输出转换结果
for (LocalDate localDate : dateList) {
System.out.println(localDate.toString());
}
}
public static void main(String[] args) {
whenCompleteExample();
}
}

whenCompleteAsync

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.CopyOnWriteArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class WhenCompleteAsyncExample {
public static void whenCompleteAsyncExample(){
// 模拟的远程获取的日期字符粗
String dateStr = "1995-05-10,2000-03-15";
// 存储 LocalDate 的集合
List<LocalDate> dateList = new CopyOnWriteArrayList<>();
// 执行 CompletableFuture 任务
CompletableFuture
.supplyAsync(() -> {
int random = new Random().nextInt(2);
if (random == 1){
throw new RuntimeException("模拟发生异常");
}
return dateStr.split(",");
})
.whenCompleteAsync((dateArray, exception) -> {
// 如果上一步执行过程没有发生异常,则将日期字符串转换为日期对象
if (dateArray != null){
for (String s : dateArray) {
dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
}
}
})
.join();
// 输出转换结果
for (LocalDate localDate : dateList) {
System.out.println(localDate.toString());
}
}
public static void main(String[] args) {
whenCompleteAsyncExample();
}
}

5.6 thenCompose 和 thenComposeAsync

方法描述

方法有返回值描述
thenCompose       √       使用主线程或者执行上一步任务的线程,串行执行任务。按顺序组合两个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。
thenComposeAsync       √       串行执行任务,按顺序组合俩个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。

任务要求

使用 CompletableFuture 按照顺序执行任务,获取远程日期字符串,然后对日期进行逻辑计算,并返回计算后的结果。

  • 第一步,先模拟通过远程结果获取日期字符串;
  • 第二步,将日期字符串转换为 LocalDate 日期对象,便于进行逻辑计算;
  • 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;

任务执行完成后,将任务返回的日期字符串输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

thenCompose

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CompletableFuture;
public class ThenComposeExample {
public static void thenComposeExample() {
// 执行 CompletableFuture 任务
CompletableFuture<String> cf2 = CompletableFuture
// 模拟获取远程接口获取创建时间,然后得到结果 "2022-06-01"
.supplyAsync(() -> "2022-06-01")
// 实现将两个有依赖关系的任务组合在一起,一个用于转换字符串为日期类型,一个用于计算并转换为日期字符串
.thenCompose(param -> {
// 将时间字符串转换为 LocalDate 日期类型
LocalDate localDate = LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
// 最后创建一个新的任务,任务中将 LocalDate 日期+10天,并转换回字符串返回
return CompletableFuture.supplyAsync(() -> localDate
.plusDays(10)
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
});
// 进入堵塞状态,等待这些阶段执行完成
String result = cf2.join();
System.out.println("计算结果:" + result);
}
public static void main(String[] args) {
thenComposeExample();
}
}

thenComposeAsync

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CompletableFuture;
public class ThenComposeAsyncExample {
public static void thenComposeAsyncExample() {
// 执行 CompletableFuture 任务
CompletableFuture<String> cf2 = CompletableFuture
// 模拟获取远程接口获取创建时间,然后得到结果 "2022-06-01"
.supplyAsync(() -> "2022-06-01")
// 实现将两个有依赖关系的任务组合在一起,一个用于转换字符串为日期类型,一个用于计算并转换为日期字符串
.thenComposeAsync(param -> {
// 将时间字符串转换为 LocalDate 日期类型
LocalDate localDate = LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
// 最后创建一个新的任务,任务中将 LocalDate 日期+10天,并转换回字符串返回
return CompletableFuture.supplyAsync(() -> localDate
.plusDays(10)
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
});
// 进入堵塞状态,等待这些阶段执行完成
String result = cf2.join();
System.out.println("计算结果:" + result);
}
public static void main(String[] args) {
thenComposeAsyncExample();
}
}

六、CompletableFuture 任务处理-并行任务方法示例

6.1 thenCombine 和 thenCombineAsync

方法描述

方法有返回值描述
thenCombine       √       并行执行任务,从 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。
thenCombineAsync       √       并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。

不过这里需要注意,如果上一个阶段执为异常完成,则不会执行组合的下一个阶段,直接抛出异常信息。

任务要求

使用 CompletableFuture 并行执行任务,模拟获取远程端口的【姓名】和【岁数】:

  • 任务一,调用远程接口,获取姓名;
  • 任务二,调用远程接口,获取岁数;

等到任务一和任务二都执行完成后,将任务一和任务二中获取的【姓名】和【岁数】拼在一个字符串中,然后返回;

执行流程

任务执行流程如下图所示:

示例代码

thenCombine

import java.util.concurrent.CompletableFuture;
public class ThenCombineExample {
public static void thenCombineExample() {
// 模拟调用远程接口获取【姓名】
CompletableFuture<String> nameCf = CompletableFuture.supplyAsync(() -> "mydlq");
// 模拟调用远程接口获取【岁数】
CompletableFuture<Integer> ageCf = CompletableFuture.supplyAsync(() -> 18);
// 执行 CompletableFuture 任务
// 并行执行,模拟调用远程接口获取【姓名】,以及获取【岁数】
// 然后将获取的【姓名】和【岁数】拼在一起,之后返回新字符串
CompletableFuture<String> cf = nameCf.thenCombine(ageCf, (name, age) -> "姓名:" + name + ", 岁数:" + age);
// 进入堵塞状态,等待这些阶段执行完成后获取结果
String info = cf.join();
System.out.println(info);
}
public static void main(String[] args) {
thenCombineExample();
}
}

thenCombineAsync

import java.util.concurrent.CompletableFuture;
public class ThenCombineAsyncExample {
public static void thenCombineAsyncExample() {
// 模拟调用远程接口获取【姓名】
CompletableFuture<String> nameCf = CompletableFuture.supplyAsync(() -> "mydlq");
// 模拟调用远程接口获取【岁数】
CompletableFuture<Integer> ageCf = CompletableFuture.supplyAsync(() -> 18);
// 执行 CompletableFuture 任务
// 并行执行,模拟调用远程接口获取【姓名】,以及获取【岁数】
// 然后将获取的【姓名】和【岁数】拼在一起,之后返回新字符串
CompletableFuture<String> cf = nameCf.thenCombineAsync(ageCf, (name, age) -> "姓名:" + name + ", 岁数:" + age);
// 进入堵塞状态,等待这些阶段执行完成后获取结果
String info = cf.join();
System.out.println(info);
}
public static void main(String[] args) {
thenCombineAsyncExample();
}
}

6.2 thenAcceptBoth 和 thenAcceptBothAsync

方法描述

方法有返回值描述
thenAcceptBoth       x       并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
thenAcceptBothAsync       x       并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。

任务要求

使用 CompletableFuture 并行执行任务,模拟获取远程端口的【姓名】和【岁数】:

  • 任务一,调用远程接口,获取姓名;
  • 任务二,调用远程接口,获取岁数;

等到任务一和任务二都执行完成后,将任务一和任务二中获取的【姓名】和【岁数】拼在一个字符串中,然后输出到控制台;

执行流程

任务执行流程如下图所示:

示例代码

thenAcceptBoth

import java.util.concurrent.CompletableFuture;
public class ThenAcceptBothExample {
public static void thenAcceptBothExample(){
// 模拟调用远程接口获取【姓名】
CompletableFuture<String> nameCf = CompletableFuture.supplyAsync(() -> "mydlq");
// 模拟调用远程接口获取【岁数】
CompletableFuture<Integer> ageCf = CompletableFuture.supplyAsync(() -> 18);
// 执行 CompletableFuture 任务
// 并行执行,模拟调用远程接口获取【姓名】,以及获取【岁数】
// 然后将获取的【姓名】和【岁数】输出
CompletableFuture<Void> cf = nameCf.thenAcceptBoth(ageCf,
(name, age) -> System.out.println("姓名:" + name + ", 岁数:" + age));
// 进入堵塞状态等待各阶段执行完成
cf.join();
}
public static void main(String[] args) {
thenAcceptBothExample();
}
}

thenAcceptBothAsync

import java.util.concurrent.CompletableFuture;
public class ThenAcceptBothAsyncExample {
public static void thenAcceptBothAsyncExample() {
// 模拟调用远程接口获取【姓名】
CompletableFuture<String> nameCf = CompletableFuture.supplyAsync(() -> "mydlq");
// 模拟调用远程接口获取【岁数】
CompletableFuture<Integer> ageCf = CompletableFuture.supplyAsync(() -> 18);
// 执行 CompletableFuture 任务
// 并行执行,模拟调用远程接口获取【姓名】,以及获取【岁数】
// 然后将获取的【姓名】和【岁数】输出
CompletableFuture<Void> cf = nameCf.thenAcceptBothAsync(ageCf,
(name, age) -> System.out.println("姓名:" + name + ", 岁数:" + age));
// 进入堵塞状态等待各阶段执行完成
cf.join();
}
public static void main(String[] args) {
thenAcceptBothAsyncExample();
}
}

6.3 runAfterBoth 和 runAfterBothAsync

方法描述

方法有返回值描述
thenAcceptBoth       x       并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务方法执行结束后,没有返回值。
thenAcceptBothAsync       x       并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。

任务要求

使用 CompletableFuture 并行执行任务,执行步骤一和步骤二:

  • 任务一,执行步骤一;
  • 任务二,执行步骤二;

等到步骤一和步骤二都执行完成后,就执行最终步骤。

执行流程

任务执行流程如下图所示:

示例代码

runAfterBoth

import java.util.concurrent.CompletableFuture;
public class RunAfterBothExample {
public static void runAfterBothExample() {
// CompletableFuture 任务 - 步骤1
CompletableFuture<Void> step1 = CompletableFuture.runAsync(() -> System.out.println("阶段1"));
// CompletableFuture 任务 - 步骤2
CompletableFuture<Void> step2 = CompletableFuture.runAsync(() -> System.out.println("阶段2"));
// 当【阶段1】和【阶段2】并行执行完成后,则执行特定任务
step1.runAfterBoth(step2, ()-> System.out.println("全部阶段完成,执行特定任务"));
}
public static void main(String[] args) {
runAfterBothExample();
}
}

runAfterBothAsync

import java.util.concurrent.CompletableFuture;
public class RunAfterBothAsyncExample {
public static void runAfterBothAsyncExample() {
// CompletableFuture 任务 - 步骤1
CompletableFuture<Void> step1 = CompletableFuture.runAsync(() -> System.out.println("阶段1"));
// CompletableFuture 任务 - 步骤2
CompletableFuture<Void> step2 = CompletableFuture.runAsync(() -> System.out.println("阶段2"));
// 当【阶段1】和【阶段2】并行执行完成后,则执行特定任务
step1.runAfterBothAsync(step2, ()-> System.out.println("全部阶段完成,执行特定任务"));
}
public static void main(String[] args) {
runAfterBothAsyncExample();
}
}

6.4 applyToEither 和 applyToEitherAsync

方法描述

方法有返回值描述
applyToEither       x       并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
applyToEitherAsync       x       并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前先执行结束的任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。

任务要求

使用 CompletableFuture 并行执行任务,执行任务一、任务二和任务三:

  • 任务一,从通道1中获取信息;
  • 任务二,从通道2中获取信息;
  • 任务三,等到步骤一和步骤二两个任务任意一个先执行完成,就处理先执行完成任务中返回的结果;

几个任务执行完成后,将任务返回结果输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

applyToEither

import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class ApplyToEitherExample {
public static void applyToEitherExample(){
// 通道1 (模拟获取远程信息)
CompletableFuture<String> channel1 = CompletableFuture.supplyAsync(() -> {
randomSleep(1000);
return "阶段1-从通道1成功获取结果";
});
// 通道2 (模拟获取远程信息)
CompletableFuture<String> channel2 = CompletableFuture.supplyAsync(() -> {
randomSleep(1000);
return "阶段2-从通道2成功获取结果";
});
// 执行 CompletableFuture 任务
// 并行执行【任务1】和【任务2】,分别从【通道1】或者【通道2】中获取结果,
// 哪个通道先获取结果成功,就使用该结果作为任务的返回结果,也作为下一步任
// 务要执行的参数,然后执行下一步操作,执行完成后返回指定值。
CompletableFuture<String> channelTask = channel1.applyToEither(channel2, response -> "获取的结果: " + response);
// 进入堵塞状态,等待这些阶段执行完成后获取结果并输出
String result = channelTask.join();
System.out.println(result);
}
/**
* 指定规定时间内睡眠
* @param time 随机睡眠时间
*/
private static void randomSleep(int time) {
try {
Thread.sleep(new Random().nextInt(time));
} catch (InterruptedException e) {
}
}
public static void main(String[] args) {
applyToEitherExample();
}
}

applyToEitherAsync

import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class ApplyToEitherAsyncExample {
public static void applyToEitherAsyncExample1(){
// 通道1 (模拟获取远程信息)
CompletableFuture<String> channel1 = CompletableFuture.supplyAsync(() -> {
randomSleep(1000);
return "阶段1-从通道1成功获取结果";
});
// 通道2 (模拟获取远程信息)
CompletableFuture<String> channel2 = CompletableFuture.supplyAsync(() -> {
randomSleep(1000);
return "阶段2-从通道2成功获取结果";
});
// 执行 CompletableFuture 任务
// 并行执行【任务1】和【任务2】,分别从【通道1】或者【通道2】中获取结果,
// 哪个通道先获取结果成功,就使用该结果作为任务的返回结果,也作为下一步任
// 务要执行的参数,然后执行下一步操作,执行完成后返回指定值。
CompletableFuture<String> channelTask = channel1.applyToEitherAsync(channel2, response -> "获取的结果: " + response);
// 进入堵塞状态,等待这些阶段执行完成后获取结果并输出
String result = channelTask.join();
System.out.println(result);
}
/**
* 指定规定时间内睡眠
* @param time 随机睡眠时间
*/
private static void randomSleep(int time){
try {
int randomNumber = new Random().nextInt(time);
Thread.sleep(randomNumber);
} catch (InterruptedException e) {
}
}
public static void main(String[] args) {
applyToEitherAsyncExample1();
}
}

6.5 runAfterEither 和 runAfterEitherAsync

方法描述

方法有返回值描述
runAfterEither       x       并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。
runAfterEitherAsync       x       并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。

任务要求

使用 CompletableFuture 并行执行任务,执行任务一、任务二和任务三:

  • 任务一,执行步骤一;
  • 任务二,执行步骤二;
  • 任务三,等到步骤一和步骤二两个任务任意一个先执行完成,就执行特定动作。

执行流程

任务执行流程如下图所示:

示例代码

runAfterEither

import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class RunAfterEitherExample {
public static void runAfterEitherExample(){
// 通道1 (模拟向远程通道1写入数据)
CompletableFuture<Void> channel1 = CompletableFuture.runAsync(() -> {
randomSleep(1000);
System.out.println("阶段1-执行完成");
});
// 通道2 (模拟向远程通道2写入数据)
CompletableFuture<Void> channel2 = CompletableFuture.runAsync(() -> {
randomSleep(1000);
System.out.println("阶段2-执行完成");
});
// 执行 CompletableFuture 任务
// 并行执行【任务1】和【任务2】,无论哪个任务先执行完成,直接进行下一步任务,执行特定动作
channel1.runAfterEither(channel2, () -> System.out.println("任意任务执行完成-执行特定操作")).join();
}
/**
* 指定规定时间内睡眠
*
* @param time 随机睡眠时间
*/
private static void randomSleep(int time) {
try {
Thread.sleep(new Random().nextInt(time));
} catch (InterruptedException e) {
}
}
public static void main(String[] args) {
runAfterEitherExample();
}
}

runAfterEitherAsync

import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class RunAfterEitherAsyncExample {
public static void runAfterEitherAsyncExample(){
// 通道1 (模拟向远程通道1写入数据)
CompletableFuture<Void> channel1 = CompletableFuture.runAsync(() -> {
randomSleep(1000);
System.out.println("阶段1-执行完成");
});
// 通道2 (模拟向远程通道2写入数据)
CompletableFuture<Void> channel2 = CompletableFuture.runAsync(() -> {
randomSleep(1000);
System.out.println("阶段2-执行完成");
});
// 执行 CompletableFuture 任务
// 并行执行【任务1】和【任务2】,无论哪个任务先执行完成,直接进行下一步任务,执行特定动作
channel1.runAfterEitherAsync(channel2, () -> System.out.println("任意任务执行完成-执行特定操作")).join();
}
/**
* 指定规定时间内睡眠
*
* @param time 随机睡眠时间
*/
private static void randomSleep(int time) {
try {
Thread.sleep(new Random().nextInt(time));
} catch (InterruptedException e) {
}
}
public static void main(String[] args) {
runAfterEitherAsyncExample();
}
}

6.6 acceptEither 和 acceptEitherAsync

方法描述

方法有返回值描述
AcceptEither       x       并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
AcceptEitherAsync       x       并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。

任务要求

使用 CompletableFuture 并行执行任务,执行任务一、任务二和任务三:

  • 任务一,从通道1中获取信息;
  • 任务二,从通道2中获取信息;
  • 任务三,等到步骤一和步骤二两个任务任意一个先执行完成,就将先执行完成任务中返回的结果输出到控制台;

执行流程

任务执行流程如下图所示:

示例代码

AcceptEither

import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class AcceptEitherExample {
public static void acceptEitherExample() {
// 通道1 (模拟获取远程信息)
CompletableFuture<String> channel1 = CompletableFuture.supplyAsync(() -> {
randomSleep(1000);
return "阶段1-成功获取结果";
});
// 通道2 (模拟获取远程信息)
CompletableFuture<String> channel2 = CompletableFuture.supplyAsync(() -> {
randomSleep(1000);
return "阶段2-成功获取结果";
});
// 执行 CompletableFuture 任务
// 并行执行【任务1】和【任务2】,分别从【通道1】或者【通道2】中获取结果,
// 哪个通道先获取结果成功,就使用该结果作为任务的返回结果,也作为下一步任
// 务要执行的参数,然后执行下一步操作,执行完成后没有返回值。
channel1.acceptEither(channel2, result -> System.out.println("获取的结果: " + result)).join();
}
/**
* 指定规定时间内睡眠
*
* @param time 随机睡眠时间
*/
private static void randomSleep(int time) {
try {
Thread.sleep(new Random().nextInt(time));
} catch (InterruptedException e) {
}
}
public static void main(String[] args) {
acceptEitherExample();
}
}

AcceptEitherAsync

import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class AcceptEitherAsyncExample {
public static void acceptEitherAsyncExample(){
// 通道1 (模拟获取远程信息)
CompletableFuture<String> channel1 = CompletableFuture.supplyAsync(() -> {
randomSleep(1000);
return "阶段1-成功获取结果";
});
// 通道2 (模拟获取远程信息)
CompletableFuture<String> channel2 = CompletableFuture.supplyAsync(() -> {
randomSleep(1000);
return "阶段2-成功获取结果";
});
// 执行 CompletableFuture 任务
// 并行执行【任务1】和【任务2】,分别从【通道1】或者【通道2】中获取结果,
// 哪个通道先获取结果成功,就使用该结果作为任务的返回结果,也作为下一步任
// 务要执行的参数,然后执行下一步操作,执行完成后没有返回值。
channel1.acceptEitherAsync(channel2, result -> System.out.println("获取的结果: " + result)).join();
}
/**
* 指定规定时间内睡眠
*
* @param time 随机睡眠时间
*/
private static void randomSleep(int time){
try {
Thread.sleep(new Random().nextInt(time));
} catch (InterruptedException e) {
}
}
public static void main(String[] args) {
acceptEitherAsyncExample();
}
}

6.7 allOf

方法描述

方法有返回值描述
allOf       x       并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待全部任务方法都执行完成后结束。任务执行结束后,没有返回值。

不过这里需要注意: allOf 方法执行时,如果传入的 CompletableFuture<?> 中的其中一个阶段异常完成时,那么返回的 CompletableFuture<Void> 也异常完成,并将此异常作为异常原因。

任务要求

并行执行 CompletableFuture 任务,模拟从三个接口获取用户相关信息:

  • 任务一,调用接口1获取用户基本信息;
  • 任务二,调用接口2获取用户头像;
  • 任务三,调用接口3获取用用户余额;

等待三个任务并行执行完成,然后将获取的信息聚合在 Map 集合中。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
public class AllOfExample {
public static void allOfExample() {
// 创建聚合数据的 Map 集合
Map<String, String> userMap = new ConcurrentHashMap<>(3);
// 创建待执行的 Runnable 参数
Runnable runnable1 = () -> {
System.out.println("任务1-成功获取用户基本信息");
userMap.put("userInfo", "{name: mydlq, age: 18}");
};
Runnable runnable2 = () -> {
System.out.println("任务2-成功获取用户头像");
userMap.put("avatar", "http://www.xxx.com/avatar");
};
Runnable runnable3 = () -> {
System.out.println("任务3-成功获取用户余额");
userMap.put("balance", "1000");
};
// 执行多个 CompletableFuture,需要等待全部完成
CompletableFuture<Void> cf = CompletableFuture.allOf(
CompletableFuture.runAsync(runnable1),
CompletableFuture.runAsync(runnable2),
CompletableFuture.runAsync(runnable3)
);
// 进入堵塞状态,等待执行完成
cf.join();
// 输出用户信息
System.out.println("获取的用户信息:");
for (Map.Entry<String, String> entry : userMap.entrySet()) {
System.out.println(entry.getKey() + ": " + entry.getValue());
}
}
public static void main(String[] args) {
allOfExample();
}
}

6.8 anyOf

方法描述

方法有返回值描述
anyOf       √       并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待多个任务方法中任意一个执行完成后结束。任务执行结束后,返回第一个先执行完成任务的返回值。

不过这里需要注意: anyOf 方法执行时,如果传入的全部 CompletableFuture<?> 阶段都没有完成前,任意一个阶段执行过程出现异常并没有处理,也就是说该阶段执行过程异常完成,那么返回的 CompletableFuture<Object> 也异常完成,并将此异常作为异常原因。

任务要求

并行执行 CompletableFuture 任务,模拟从三个接口中获取相同的信息,只要有任意一个接口先行返回信息,就直接执行下一步骤,而无需等待。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
public class AnyOfExample {
public static void anyOfExample() {
Supplier<String> supplier1 = () -> {
System.out.println("通道1");
return "通道1-成功获取信息";
};
Supplier<String> supplier2 = () -> {
System.out.println("通道2");
return "通道2-成功获取信息";
};
Supplier<String> supplier3 = () -> {
System.out.println("通道3");
return "通道3-成功获取信息";
};
// 执行多个 CompletableFuture,只要任意一个执行完成就执行下一步
CompletableFuture<Object> cf = CompletableFuture.anyOf(
CompletableFuture.supplyAsync(supplier1),
CompletableFuture.supplyAsync(supplier2),
CompletableFuture.supplyAsync(supplier3)
);
// 进入堵塞状态,等待执行完成,输出获取的信息
Object result = cf.join();
System.out.println(result);
}
/**
* 随机睡眠指定时间
*
* @param time 睡眠时间
*/
public static void randomTimeSleep(int time){
try {
Thread.sleep(new Random().nextInt(time));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
anyOfExample();
}
}

七、任务结束方法

7.1 get

方法描述

方法有返回值描述
get       √       获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(运行时)异常。

任务要求

执行 CompletableFuture 任务,然后调用 get 方法进行等待,获取执行结果。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class GetExample {
public static void getExample() throws ExecutionException, InterruptedException {
// 执行 CompletableFuture 任务
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "执行结果");
// 调用 get 方法进行等待,获取执行结果
cf.get();
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
getExample();
}
}

7.2 join

方法描述

方法有返回值描述
join       √       获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(未经检查)异常。

任务要求

执行 CompletableFuture 任务,然后调用 join 方法进行等待,获取执行结果。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.concurrent.CompletableFuture;
public class JoinExample {
public static void joinExample() {
// 执行 CompletableFuture 任务
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "执行结果");
// 调用 join 方法进行等待,获取执行结果
cf.join();
}
public static void main(String[] args) {
joinExample();
}
}

7.3 getNow

方法描述

方法有返回值描述
getNow       √       立即获取任务执行结果,如果任务没有完成则返回设定的默认值,如果任务正常完成则返回执行结果。

任务要求

执行 CompletableFuture 任务,然后调用 getNow 方法获取任务执行结果,如果任务没有执行完成则返回默认值。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class GetNowExample {
public static void getNowExample() {
// 执行 CompletableFuture 任务
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
// 睡眠5毫秒
sleep(5);
return "示例-执行完成";
});
// 随机睡眠1~10毫秒
sleep(new Random().nextInt(10));
// 调用 getNow 方法获取执行结果,如果任务未执行完成则输出设置的默认值
String result = cf.getNow("默认值");
System.out.println(result);
}
/**
* 线程睡眠
*
* @param millis 睡眠时间(单位:毫秒)
*/
public static void sleep(long millis){
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
getNowExample();
}
}

7.4 cancel

方法描述

方法有返回值描述
cancel       √       取消任务,如果任务尚未执行结束,调用该方法成功取消任务时返回 true,否则返回 false。并且任务取消成功后,通过 get/join 方法获取结果时,会抛出 CancellationException 异常。

任务要求

执行 CompletableFuture 任务,然后调用 cancel 方法取消任务。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.Random;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CancelExample {
public static void cancelExample() throws ExecutionException, InterruptedException {
// 执行 CompletableFuture 任务
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
// 随机睡眠1~10毫秒
sleep(new Random().nextInt(10));
System.out.println("示例-执行任务完成");‘
});
// 随机睡眠1~10毫秒
sleep(new Random().nextInt(10));
// 调用 cancel 方法取消任务
boolean cancelResult = cf.cancel(true);
System.out.println("取消任务: " + cancelResult);
// 调用 get 方法获取执行结果,如果取消任务将抛出 CancellationException 异常,这里对该异常进行处理
try {
cf.get();
} catch (CancellationException e) {
System.out.println("获取任务失败,任务已经被取消");
}
}
/**
* 线程睡眠
*
* @param millis 睡眠时间(单位:毫秒)
*/
public static void sleep(long millis){
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
cancelExample();
}
}

八、CompletableFuture 查看任务状态方法示例

8.1 isDone

方法描述

查看任务是否执行完成,如果当前阶段执行完成 (无论是正常完成还是异常完成) 则返回 true,否则返回 false

方法有返回值描述
isDone       √       查看任务是否执行完成,如果当前阶段执行完成 (无论是正常完成还是异常完成) 则返回 true,否则返回 false。

任务要求

执行 CompletableFuture 任务,然后调用 isDone 方法查看任务是否执行完成。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.concurrent.CompletableFuture;
public class IsDoneExample {
public static void isDoneExample() throws InterruptedException {
// 执行 CompletableFuture 任务
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> System.out.println("任务执行中..."));
// 调用 isDone 方法查看任务是否执行完成
System.out.println("任务是否完成: " + cf.isDone());
// 等待1秒时间
Thread.sleep(1000L);
// 调用 isDone 方法再次查看任务是否执行完成
System.out.println("任务是否完成: " + cf.isDone());
}
public static void main(String[] args) throws InterruptedException {
isDoneExample();
}
}

8.2 isCancelled

方法描述

方法有返回值描述
isCancelled       √       查看当前阶段任务是否成功取消,如果此阶段任务在完成之前被取消则返回 true,否则返回 false。

任务要求

执行 CompletableFuture 任务,然后调用 cancel 方法取消任务,之后调用 isCancelled 方法观察任务是否成功取消。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.concurrent.CompletableFuture;
public class IsCancelledExample {
public static void isCancelledExample(){
// 执行 CompletableFuture 任务
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> System.out.println("执行 CompletableFuture 任务"));
// 调用 cancel 方法取消任务
cf.cancel(true);
// 调用 isCancelled 方法,查询任务是否成功被取消
System.out.println("是否取消任务: " + cf.isCancelled());
}
public static void main(String[] args) {
isCancelledExample();
}
}

8.3 isCompletedExceptionally

方法描述

方法有返回值描述
isCompletedExceptionally       √       查看当前阶段任务是否以异常的方式执行完成。比如取消任务、突然终止任务或者执行过程出现异常等,都属于异常方式执行完成,如果是以异常方式完成则返回 true,否则返回 false。

任务要求

执行 CompletableFuture 任务,然后调用 isCompletedExceptionally 方法查看任务是否异常执行完成。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.concurrent.CompletableFuture;
public class IsCompletedExceptionallyExample {
public static void isCompletedExceptionallyExample() throws InterruptedException {
// 执行 CompletableFuture 任务
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
System.out.println("执行中...");
// 模拟发生异常
System.out.println(1/0);
});
// 等待1秒确保任务执行完成
Thread.sleep(1000L);
// 调用 isCompletedExceptionally 方法验证当前阶段是否异常完成
System.out.println("是否异常完成: " + cf.isCompletedExceptionally());
}
public static void main(String[] args) throws InterruptedException {
isCompletedExceptionallyExample();
}
}

九、CompletableFuture 设置任务结果方法示例

9.1 obtrudeValue

方法描述

方法有返回值描述
obtrudeValue       x       设置(重置)调用 get/join 方法时返回指定值,无论任务是否执行完成。

任务要求

执行 CompletableFuture 任务,然后调用 obtrudeValue 方法强制设置任务执行结果为指定值,无论当前任务是否执行成功/失败。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ObtrudeValueExample {
public static void obtrudeValueExample() throws ExecutionException, InterruptedException {
// 执行 CompletableFuture 任务
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "示例-执行完成");
// 设置或重置 get 方法和与其相关方法的返回的值
cf.obtrudeValue("示例-强制设置返回值-无论成功失败");
// 调用 get 方法进行等待,获取执行结果并输出到控制台
String result = cf.get();
System.out.println(result);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
obtrudeValueExample();
}
}

9.2 obtrudeException

方法描述

方法有返回值描述
obtrudeException       x       设置(重置)调用 get/join 方法时返回指定异常,无论任务是否执行完成。

任务要求

执行 CompletableFuture 任务,然后调用 obtrudeException 方法强制设置任务执行结果为指定异常,无论当前任务是否执行成功/失败。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ObtrudeExceptionExample {
public static void obtrudeExceptionExample() throws ExecutionException, InterruptedException {
// 执行 CompletableFuture 任务
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "示例-执行完成");
// 设置 get 方法和与其相关的方法后续执行抛出指定异常
cf.obtrudeException(new Exception("未知异常"));
// 调用 get 方法进行等待,获取执行结果并输出
String result = cf.get();
System.out.println(result);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
obtrudeExceptionExample();
}
}

9.3 complete

方法描述

方法有返回值描述
complete       √       设置调用 get/join 方法时返回指定值。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。

任务要求

执行 CompletableFuture 任务,然后调用 complete 方法设置或重置任务执行结果,然后将最后任务执行结果输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompleteExample {
public static void completeExample() throws ExecutionException, InterruptedException {
// 执行 CompletableFuture 任务
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "示例-执行完成");
// 设置或重置 get 方法和与其相关方法的返回的值,任务没有执行完成返回 true,否则返回 false
boolean setResult = cf.complete("示例-任务没有完成-设置返回值");
System.out.println("设置返回值为执行结果: " + setResult);
// 调用 get 方法进行等待,获取执行结果并输出
String result = cf.get();
System.out.println(result);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
completeExample();
}
}

9.4 CompleteException

方法描述

方法有返回值描述
completeException       √       设置调用 get/join 方法时返回指定异常。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。

任务要求

执行 CompletableFuture 任务,然后调用 complete 方法设置或重置任务执行结果为指定异常,然后将最后任务执行结果或异常输出到控制台。

执行流程

任务执行流程如下图所示:

示例代码

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompleteExceptionallyExample {
public static void completeExceptionallyExample() throws ExecutionException, InterruptedException {
// 执行 CompletableFuture 任务
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "示例-执行完成");
// 设置或重置 get 方法和与其相关方法的返回的值,任务没有执行完成返回 true,否则返回 false
boolean setResult = cf.completeExceptionally(new Exception("未知异常"));
System.out.println("设置返回值为执行结果: " + setResult);
// 调用 get 方法进行等待,获取执行结果并输出
String result = cf.get();
System.out.println(result);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
completeExceptionallyExample();
}
}

十、任务异常处理方法

10.1 exceptionally

方法描述

方法有返回值描述
exceptionally        x        判断上一个任务执行时是否发生异常,如果是则就会执行 exceptionally 方法,并且将上一步异常作为当前方法的参数,然后对异常进行处理。当然,如果上一阶段执行过程中没有出现异常,则不会执行 exceptionally 方法。

任务要求

执行 CompletableFuture 任务,并且使用 exceptionally 方法:

  • 如果 exceptionally 方法的上一阶段执行过程中出现异常,则会执行 exceptionally 阶段;
  • 如果 exceptionally 方法的上一阶段执行过程中没有出现异常,则不会执行 exceptionally 阶段;

执行流程

任务执行流程如下图所示:

示例代码

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ExceptionallyExample {
public static void exceptionallyExample() throws ExecutionException, InterruptedException {
// 执行 CompletableFuture 串行任务,并且使用 exceptionally 方法:
// - 如果 exceptionally 方法的上一阶段执行过程中出现异常,则会执行 exceptionally 阶段;
// - 如果 exceptionally 方法的上一阶段执行过程中没有出现异常,则不会执行 exceptionally 阶段;
CompletableFuture<String> cf = CompletableFuture
// 执行任务,50%概率发生异常,50%概率返回正常值
.supplyAsync(() -> {
if (new Random().nextInt(2) != 0) {
throw new RuntimeException("模拟发生异常");
}
return "正常结束";
})
// 处理上一步中抛出的异常
.exceptionally(Throwable::getMessage);
// 调用 get 方法进行等待,获取执行结果
String result = cf.get();
System.out.println(result);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
exceptionallyExample();
}
}
---END---
如果本文对你有帮助,可以关注我的公众号 "小豆丁技术栈" 了解最新动态,顺便也请帮忙 Github 点颗星哦,感谢~

本文作者:超级小豆丁 @ 小豆丁技术栈

本文链接:http://www.mydlq.club/article/124/

本文标题:Java 中的异步编程工具 CompletableFuture

本文版权:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!