在异步编程中,CompletableFuture是处理并发任务的强大工具。然而,当面临需要严格顺序执行的异步任务链,并且需要收集每个任务的结果时,可能会遇到一些挑战。例如,业务场景可能要求前一个任务完成后,后一个任务才能开始,同时我们希望将所有任务的计算结果汇总到一个集合中。
考虑一个耗时的业务处理函数,它返回一个CompletionStage
import java.time.LocalDateTime; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; public class SequentialTaskProcessor { private CompletionStage<Integer> process(int a) { return CompletableFuture.supplyAsync(() -> { System.err.printf("%s dispatch %d\n", LocalDateTime.now(), a); // 模拟长时间运行的业务处理 try { Thread.sleep(10); // 增加延迟以观察效果 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return a + 10; }).whenCompleteAsync((e, t) -> { if (t != null) System.err.printf("!!! error processing '%d' !!!\n", a); System.err.printf("%s finish %d\n", LocalDateTime.now(), e); }); }
我们的目标是多次调用process函数,确保它们按顺序执行,并将每次的结果收集到一个List
一种直观的尝试是使用thenApplyAsync并在其内部调用process(element).toCompletableFuture().join()。
import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; // ... (process方法同上) public void firstApproach() { List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); CompletionStage<List<Integer>> result = CompletableFuture.completedFuture(new ArrayList<>()); for (Integer element : arr) { result = result.thenApplyAsync((ret) -> { // 在thenApplyAsync内部阻塞等待前一个CompletableFuture完成 Integer a = process(element).toCompletableFuture().join(); ret.add(a); return ret; }); } List<Integer> computeResult = result.toCompletableFuture().join(); System.out.println("First approach results: " + computeResult); }
问题分析: 虽然这种方法能够实现顺序执行并收集结果,但它效率低下。thenApplyAsync本身会在一个线程池中执行其回调,而回调内部的process(element).toCompletableFuture().join()又会阻塞这个线程,直到process方法返回的CompletableFuture完成。这意味着一个逻辑步骤可能间接占用两个线程资源(一个用于thenApplyAsync的回调,另一个用于process内部的异步任务),造成线程资源的浪费和不必要的阻塞。观察输出日志,会发现dispatch和finish的时间戳是严格顺序的,但线程利用率不高。
另一种尝试是使用thenCombineAsync,期望它能将前一个阶段的结果与新任务的结果结合:
// ... (process方法同上) public void secondApproach() { List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); CompletionStage<List<Integer>> result = CompletableFuture.completedFuture(new ArrayList<>()); for (Integer element : arr) { // process(element) 在这里被立即调用,而非等待前一个阶段完成 result = result.thenCombineAsync(process(element), (array, ret) -> { array.add(ret); return array; }); } List<Integer> computeResult = result.toCompletableFuture().join(); System.out.println("Second approach results: " + computeResult); }
问题分析: 这种方法会导致任务并发执行,而非顺序执行。thenCombineAsync的第二个参数CompletionStage other在方法调用时就会被评估并启动。这意味着在循环中,所有的process(element)调用几乎是同时发起的,它们会并发执行。观察输出日志,会发现dispatch的时间戳是交错的,这违反了顺序执行的要求。thenCombineAsync适用于两个独立的异步任务都完成后再进行合并的场景,而不是链式顺序执行的场景。
为了实现任务的顺序执行并高效地收集结果,我们需要利用CompletableFuture提供的更高级的组合方法,特别是thenCompose。
这种方法通过thenCompose确保任务顺序执行,并使用thenAccept将结果添加到循环外部维护的列表中。
import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; import java.util.stream.IntStream; // ... (process方法同上) public class SequentialTaskProcessor { // ... process 方法 ... public void solutionOne() { List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); // 初始化一个表示链式操作开始的CompletableFuture,其结果类型为Void CompletionStage<Void> loopStage = CompletableFuture.completedFuture(null); final List<Integer> resultList = new ArrayList<>(); // 外部结果列表 for (Integer element : arr) { loopStage = loopStage // thenCompose确保前一个阶段完成后,才执行process(element) .thenCompose(v -> process(element)) // thenAccept将process的结果添加到外部列表中,并返回CompletionStage<Void> .thenAccept(resultList::add); } // 阻塞等待所有任务完成 loopStage.toCompletableFuture().join(); System.out.println("Solution One results: " + resultList); } public static void main(String[] args) { SequentialTaskProcessor processor = new SequentialTaskProcessor(); System.out.println("--- Running Solution One ---"); processor.solutionOne(); System.out.println("\n--- Running Solution Two ---"); processor.solutionTwo(); } }
原理详解:
这种方法简洁且高效,避免了不必要的阻塞和线程浪费。
另一种方法是在CompletableFuture链中直接传递并累积结果列表。
import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; import java.util.stream.IntStream; // ... (process方法同上) public class SequentialTaskProcessor { // ... process 方法 ... public void solutionTwo() { List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); // 初始化一个携带空列表的CompletableFuture CompletionStage<List<Integer>> listStage = CompletableFuture.completedFuture(new ArrayList<>()); for (Integer element : arr) { listStage = listStage // thenCompose确保前一个阶段完成后,才执行process(element) .thenCompose(list -> process(element) // thenAccept将process的结果添加到当前列表 .thenAccept(list::add) // thenApply将CompletionStage<Void>转换回CompletionStage<List<Integer>> .thenApply(v -> list) ); } // 阻塞等待所有任务完成,并获取最终的列表 List<Integer> resultList = listStage.toCompletableFuture().join(); System.out.println("Solution Two results: " + resultList); } // ... main 方法 ... }
原理详解:
两种解决方案都能够有效地实现异步任务的顺序执行和结果收集,并且都避免了线程阻塞和并发执行的问题。
注意事项:
通过理解thenCompose的扁平化特性和thenAccept/thenApply的组合使用,我们可以更灵活、高效地构建复杂的异步任务流,满足各种顺序执行和结果收集的需求。
以上就是深入理解CompletableFuture:实现任务的顺序执行与结果收集的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号