主页 > 人工智能  > 

Java中CompletableFuture异步工具类

Java中CompletableFuture异步工具类

参考:CompletableFuture 详解 | JavaGuide

实际项目中,一个接口可能需要同时获取多种不同的数据,然后再汇总返回,举个例子:用户请求获取订单信息,可能需要同时获取用户信息、商品详情、物流信息、等数据。

如果是串行(按顺序依次执行每个任务)执行的话,接口的响应速度会非常慢。

考虑到这些任务之间有大部分都是 无前后顺序关联 的,可以 并行执行 ,就比如说调用获取商品详情的时候,可以同时调用获取物流信息。通过并行执行多个任务的方式,接口的响应速度会得到大幅优化。

Future介绍

Future是Java5引入的接口,提供了基本的异步处理功能,但它的局限性在于只能通过 get()方法阻塞获取结果,无法链式调用多个任务,也缺 少异常处理机制。

CompletableFuture 是 Future 的增强版,提供了非阻塞的结果处理、任务组合和异常处理,使得异步编程更加灵活和强大。

在 Java 中,Future 只是一个泛型接口,位于 java.util.concurrent 包下,其中定义了 5 个方法,主要包括下面这 4 个功能:

取消任务;

判断任务是否被取消;

判断任务是否已经执行完成;

获取任务执行结果。

// V 代表了Future执行的任务返回值的类型 public interface Future<V> {    // 取消任务执行    // 成功取消返回 true,否则返回 false    boolean cancel(boolean mayInterruptIfRunning);    // 判断任务是否被取消    boolean isCancelled();    // 判断任务是否已经执行完成    boolean isDone();    // 获取任务执行结果    V get() throws InterruptedException, ExecutionException;    // 指定时间内没有返回计算结果就抛出 TimeOutException 异常    V get(long timeout, TimeUnit unit)        throws InterruptedException, ExecutionException, TimeoutExceptio }

CompletableFuture是 java 8引入的一个强大的异步工具类。允许非阻塞地处理异步任务,并且可以通过链式调用组合多个异步操作。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { }

核心特性如下:

异步执行

通过runAsync和supplyAsync方法,可以异步地执行任务。

任务完成回调

使用thenApply、thenAccept、thenRun等方法,可以在任务完成后执行回调。

任务组合

可以将多个 CompletableFuture 组合在一起,通过 thenCombine、thenCompose 等方法,处理多个异步任务之间的依赖关系。

异常处理

提供了exceptionally、handle等方法,可以在异步任务发生异常时进行处理。

并行处理

可以通过allOf和anyOf方法,并行地执行多个异步任务,并在所有任务完成或任意一个任务完成时执行回调。

扩展

创建异步任务

通过 new 关键字

CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>(); //获取异步计算的结果,调用 get() 方法的线程会 阻塞 直到 CompletableFuture 完成运算 rpcResponse = completableFuture.get();   // complete() 方法只能调用一次,后续调用将被忽略。 resultFuture plete(rpcResponse); ​ //如果你已经知道计算的结果的话,可以使用静态方法 completedFuture() 来创建 CompletableFuture CompletableFuture<String> future = CompletableFuture pletedFuture("hello!"); assertEquals("hello!", future.get());

runAsync: 创建异步任务,不返回结果

supplyAsync: 创建异步任务并返回结果

CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {  //前面的这个future1不是用来接收数据的,仅仅表示完成状态    // 异步任务 }); ​ //future2接收异步执行的结果,即里面返回出来的字符串,后续可以利用thenAccept等方法处理这个结果 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {    // 异步任务并返回结果    return "Hello, nihao !"; }); 任务完成回调

thenApply: 在任务完成后可以对任务结果进行转换返回

thenAccept: 在任务完成后对结果进行消费,但不返回新结果

thenRun: 在任务完成后执行一个操作,但不需要使用任务结果

// 转换任务结果 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")   .thenApply(result -> result + " nihao ");     // 消费任务结果,不返回新结果 CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")   .thenAccept(result -> System.out.println(result));     // 不消费任务结果,也不返回新结果 CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello") .thenRun(() -> System.out.println("Task finished")); 任务组合

thenCombine: 合并两个CompletableFuture 的结果

thenCompose: 将一个CompletableFuture 的结果作为另一个 CompletableFuture 的输入。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");   //1 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "nihao");   //2 ​ //future1.thenComebine(fu2,...) CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2); ​ //将前者的result 传给 后者 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")   .thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " nihao")); 异常处理

exceptionally: 在任务发生异常时提供默认值。

handle: 在任务完成或发生异常时进行处理。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {    if (true) {        throw new RuntimeException("Exception");   }    return "Hello"; }).exceptionally(ex -> "nihao");     //提供发生异常时的默认值 ​ ​ CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {    if (true) {        throw new RuntimeException("Exception");   }    return "Hello"; }).handle((result, ex) -> {       //提供发生异常后的处理    if (ex != null) {        return "Default Value";   }    return result; }); 并行处理

allOf: 等待多个CompletableFuture 全部完成

anyOf: 任意一个 CompletableFuture 完成时执行操作

CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2); allFutures.thenRun(() -> System.out.println("nihao tasks finished"));    //全部完成在执行 ​ ​ CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2); anyFuture.thenAccept(result -> System.out.println("nihao task finished with result: " + result));  //任意一个完成即可执行 使用建议

CompletableFuture 默认使用全局共享的 ForkJoinPool monPool() 作为执行器,所以应用程序、多个库或框架(如 Spring、第三方库)若都依赖 CompletableFuture,默认情况下它们都会共享同一个线程池。

为避免这些问题,建议为 CompletableFuture 提供自定义线程池,根据任务特性调整线程池大小和队列类型,也更好地处理线程中的异常情况

CompletableFuture的get()方法是阻塞的,尽量避免使用。如果必须要使用的话,需要添加超时时间,否则可能会导致主线程一直等待,无法执行其他任务。

标签:

Java中CompletableFuture异步工具类由讯客互联人工智能栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Java中CompletableFuture异步工具类