创建线程方式5-CompletableFuture
在它出现之前,我们有 Future。但 Future 有一个巨大的痛点:
- 阻塞性:
Future.get()
方法是阻塞的。一旦调用,你的线程就会被卡住,直到任务完成。这极大地浪费了线程资源。 - 功能单一:你只能获取结果或取消任务,无法在一个任务完成后自动触发另一个任务,也无法组合多个任务。
CompletableFuture
解决了这些问题,它引入了回调(Callback) 机制,实现了非阻塞的、响应式的编程模型。
创建异步任务
runAsync(Runnable runnable)
: 运行一个没有返回值的异步任务。supplyAsync(Supplier<U> supplier)
: 运行一个有返回值的异步任务。Supplier
就是一个不接受参数但返回一个值的方法。
任务完成后的回调(链式调用)
thenApply(Function<T, U> fn)
: 接收上一个任务的结果作为输入,经过处理后,返回一个新的结果。T -> U
thenAccept(Consumer<T> action)
: 接收上一个任务的结果作为输入,执行一个动作,但没有返回值。T -> Void
thenRun(Runnable action)
: 不关心上一个任务的结果,只要它完成了,就执行指定的动作。Void -> Void
thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
: 当两个并行的异步任务都完成时,将它们的结果合并处理。allOf(CompletableFuture<?>... cfs)
: 当你需要等待多个异步任务全部执行完毕时使用。allOf
本身返回CompletableFuture<Void>
。
异常处理
exceptionally(Function<Throwable, T> fn)
: 类似于try-catch
中的catch
块。如果任务链中任何一步出现异常,它会捕获异常并提供一个默认的返回结果。handle(BiFunction<T, Throwable, U> fn)
: 类似于try-catch-finally
。无论成功还是失败,它都会被执行。你可以根据是否有异常来决定返回什么。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class FiveTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("主线程启动");
// 创建一个 CompletableFuture 对象,实现无返回结果
// CompletableFuture.runAsync(() -> {
// try {
// TimeUnit.SECONDS.sleep(1);
// System.out.println("1231231");
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
//
// });
// 创建一个 CompletableFuture 获取结果
// CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "hello world");
// System.out.println(future1.get());
// 实现一个复杂的 CompletableFuture
// CompletableFuture.supplyAsync(() -> {
// int a = 0;
// for (int i = 0; i < 10; i++) {
// a++;
// }
// return a;
// }).thenApply(f -> f + 1).thenAccept(f -> {
// try {
// TimeUnit.SECONDS.sleep(10);
// System.out.println("f = " + f);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }).thenRun(() -> {
// System.out.println("任务执行完毕");
// });
// 组合多个CompletableFuture
// CompletableFuture<Double> oneFuture = CompletableFuture.supplyAsync(() -> {
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (Exception e) {
// }
// return 100.0;
// });
// CompletableFuture<Double> twoFuture = CompletableFuture.supplyAsync(() -> {
// try {
// TimeUnit.SECONDS.sleep(1);
// } catch (Exception e) {
// }
// return 0.85;
// });
// // 使用 thenCombine 组合两个CompletableFuture
// CompletableFuture<Double> finalPriceFuture = oneFuture.thenCombine(
// twoFuture,
// (one, two) -> {
// return one + two;
// }
// );
// System.out.println(finalPriceFuture.get()); // 输出: 最终价格是: 85.0
// 等待所有任务完成
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) {} return "用户数据"; });
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) {} return "新闻列表"; });
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (Exception e) {} return "天气预报"; });
CompletableFuture<Void> allFutures = CompletableFuture.allOf(f1, f2, f3);
allFutures.join();
System.out.println("所有并行任务都已完成!");
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
// 这里延时了很久,如果执行完了自己结束进程
TimeUnit.SECONDS.sleep(100);
System.out.println("主线程结束");
}
}