在它出现之前,我们有 Future。但 Future 有一个巨大的痛点:

  • 阻塞性Future.get() 方法是阻塞的。一旦调用,你的线程就会被卡住,直到任务完成。这极大地浪费了线程资源。
  • 功能单一:你只能获取结果或取消任务,无法在一个任务完成后自动触发另一个任务,也无法组合多个任务。

CompletableFuture 解决了这些问题,它引入了回调(Callback) 机制,实现了非阻塞的、响应式的编程模型

创建异步任务

  • runAsync(Runnable runnable) : 运行一个没有返回值的异步任务。
  • supplyAsync(Supplier<U> supplier) : 运行一个有返回值的异步任务。Supplier 就是一个不接受参数但返回一个值的方法。

任务完成后的回调(链式调用)

  • thenApply(Function<T, U> fn) : 接收上一个任务的结果作为输入,经过处理后,返回一个新的结果。T -> U
  • thenAccept(Consumer<T> action) : 接收上一个任务的结果作为输入,执行一个动作,但没有返回值。T -> Void
  • thenRun(Runnable action) : 不关心上一个任务的结果,只要它完成了,就执行指定的动作。Void -> Void
  • thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) : 当两个并行的异步任务都完成时,将它们的结果合并处理。
  • allOf(CompletableFuture<?>... cfs) : 当你需要等待多个异步任务全部执行完毕时使用。allOf 本身返回 CompletableFuture<Void>

异常处理

  • exceptionally(Function<Throwable, T> fn) : 类似于 try-catch 中的 catch 块。如果任务链中任何一步出现异常,它会捕获异常并提供一个默认的返回结果。
  • handle(BiFunction<T, Throwable, U> fn) : 类似于 try-catch-finally。无论成功还是失败,它都会被执行。你可以根据是否有异常来决定返回什么。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class FiveTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println("主线程启动");
        // 创建一个 CompletableFuture 对象,实现无返回结果
//        CompletableFuture.runAsync(() -> {
//            try {
//                TimeUnit.SECONDS.sleep(1);
//                System.out.println("1231231");
//            } catch (InterruptedException e) {
//                throw new RuntimeException(e);
//            }
//
//        });

        // 创建一个 CompletableFuture 获取结果
//        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "hello world");
//        System.out.println(future1.get());


        // 实现一个复杂的 CompletableFuture
//        CompletableFuture.supplyAsync(() -> {
//            int a = 0;
//            for (int i = 0; i < 10; i++) {
//                a++;
//            }
//            return a;
//        }).thenApply(f -> f + 1).thenAccept(f -> {
//            try {
//                TimeUnit.SECONDS.sleep(10);
//                System.out.println("f = " + f);
//            } catch (InterruptedException e) {
//                throw new RuntimeException(e);
//            }
//        }).thenRun(() -> {
//            System.out.println("任务执行完毕");
//        });


        // 组合多个CompletableFuture

//        CompletableFuture<Double> oneFuture = CompletableFuture.supplyAsync(() -> {
//            try {
//                TimeUnit.SECONDS.sleep(2);
//            } catch (Exception e) {
//            }
//            return 100.0;
//        });
//        CompletableFuture<Double> twoFuture = CompletableFuture.supplyAsync(() -> {
//            try {
//                TimeUnit.SECONDS.sleep(1);
//            } catch (Exception e) {
//            }
//            return 0.85;
//        });
//        // 使用 thenCombine 组合两个CompletableFuture
//        CompletableFuture<Double> finalPriceFuture = oneFuture.thenCombine(
//                twoFuture,
//                (one, two) -> {
//                    return one + two;
//                }
//        );
//        System.out.println(finalPriceFuture.get()); // 输出: 最终价格是: 85.0

        // 等待所有任务完成
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) {} return "用户数据"; });
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) {} return "新闻列表"; });
        CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (Exception e) {} return "天气预报"; });
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(f1, f2, f3);

        allFutures.join();
        System.out.println("所有并行任务都已完成!");
        System.out.println(f1.get());
        System.out.println(f2.get());
        System.out.println(f3.get());

        // 这里延时了很久,如果执行完了自己结束进程
        TimeUnit.SECONDS.sleep(100);
        System.out.println("主线程结束");
    }
}

标签: java

添加新评论