Hoàn Thành Bất Đồng Bộ với CompletableFuture trong Java

Hoàn Thành Bất Đồng Bộ với CompletableFuture trong Java

Java 8 đã giới thiệu lớp CompletableFuture, lớp này thực hiện CompletionStage và Future interface, giúp đơn giản hóa khả năng lập trình bất đồng bộ trong Java. Lớp này có nhiều phương thức, nhưng bản chất chỉ là một cách tiếp cận: thực thi "hàm callback" sau khi tác vụ hoàn thành.

Tạo và Sử dụng CompletableFuture

CompletableFuture cung cấp hỗ trợ cho tính toán bất đồng bộ, có thể xử lý kết quả tính toán thông qua callback. CompletableFuture thực hiện CompletionStage và Future interface, vì vậy vẫn có thể sử dụng như Future trước đây, mặc dù cách này không còn được khuyến khích.

Tạo CompletableFuture

// Tạo CompletableFuture với kết quả đã có
CompletableFuture<String> future = CompletableFuture.completedFuture("kết quả");
future.get();

// Mặc định CompletableFuture không có kết quả, gọi future.get() sẽ bị chặn cho đến khi có kết quả hoặc ngoại lệ
future = new CompletableFuture<>();
try {
    future.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
    // không quan tâm
}

// Điền kết quả vào future
future.complete("kết quả");
assert "kết quả".equals(future.get());

// Điền ngoại lệ vào future
future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException("ngoại lệ"));
try {
    future.get();
} catch (Exception e) {
    assert "ngoại lệ".equals(e.getCause().getMessage());
}

Bên cạnh việc tạo trực tiếp CompletableFuture, chúng ta có thể sử dụng 4 phương thức sau để tạo đối tượng:

// runAsync là Runnable task, không trả về giá trị
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// supplyAsync là tác vụ bất đồng bộ có kết quả trả về
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

// Ví dụ sử dụng
CompletableFuture.runAsync(() -> {
    System.out.println("xin chào thế giới");
}, executor);
CompletableFuture.supplyAsync(() -> {
    System.out.println("xin chào thế giới");
    return "kết quả";
});

Nếu không có executor, sẽ sử dụng ForkJoinPool.commonPool() làm pool thread cho tác vụ bất đồng bộ; ngược lại sẽ sử dụng executor.

Hoàn thành hành động CompletableFuture

public CompletableFuture<T> whenComplete(BiConsumer action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer action, Executor executor)
public CompletableFuture<T> exceptionally(Function fn)

// Ví dụ sử dụng
CompletableFuture.supplyAsync(() -> {
    System.out.println("xin chào thế giới");
    return "kết quả";
}).whenCompleteAsync((result, e) -> {
    System.out.println(result + " " + e);
}).exceptionally((e) -> {
    System.out.println("ngoại lệ " + e);
    return "ngoại lệ";
});

Phương thức whenComplete sẽ thực thi action trực tiếp trong thread hiện tại sau khi tác vụ hoàn thành, các phương thức có hậu tố Async sẽ giao action cho thread khác thực thi. exceptionally sẽ thực thi khi có ngoại lệ.

Chuyển đổi kết quả CompletableFuture

Phương thức handle cho phép chuyển đổi kiểu trả về của CompletableFuture:

public <U> CompletableFuture<U> handle(BiFunction fn)
public <U> CompletableFuture<U> handleAsync(BiFunction fn)
public <U> CompletableFuture<U> handleAsync(BiFunction fn, Executor executor)

// Ví dụ handle:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("xin chào thế giới");
    return "kết quả";
});
CompletableFuture<Integer> f2 = f1.handle((r, e) -> {
    System.out.println("xử lý");
    return 1;
});

Phương thức thenApply chỉ xử lý kết quả bình thường, khi có ngoại lệ sẽ ném ra:

public <U> CompletableFuture<U> thenApply(Function fn)
public <U> CompletableFuture<U> thenApplyAsync(Function fn)
public <U> CompletableFuture<U> thenApplyAsync(Function fn, Executor executor)

// Ví dụ thenApply:
CompletableFuture.supplyAsync(() -> {
    System.out.println("xin chào thế giới");
    return "kết quả";
}).thenApply((r) -> {
    System.out.println(r);
    return "aaa";
}).thenApply((r) -> {
    System.out.println(r);
    return 1;
});

Tác vụ tiêu thụ kết quả

Để thực thi các hành động tiêu thụ mà không trả về CompletableFuture mới, sử dụng thenAccept:

public CompletableFuture<Void> thenAccept(Consumer action)
public CompletableFuture<Void> thenAcceptAsync(Consumer action)
public CompletableFuture<Void> thenAcceptAsync(Consumer action, Executor executor)

// Ví dụ thenAccept:
CompletableFuture.supplyAsync(() -> {
    System.out.println("xin chào thế giới");
    return "kết quả";
}).thenAccept(r -> {
    System.out.println(r);
}).thenAccept(r -> {
    // r ở đây là Void (null)
    System.out.println(r);
});

Hoạt động trên nhiều CompletableFuture

Để thực hiện các thao tác trên kết quả của hai CompletableFuture, sử dụng thenAcceptBoth:

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage other, BiConsumer action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage other, BiConsumer action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage other, BiConsumer action, Executor executor)
public CompletableFuture<Void> runAfterBoth(CompletionStage other, Runnable action)

// Ví dụ thenAcceptBoth:
CompletableFuture.supplyAsync(() -> {
    System.out.println("xin chào thế giới");
    return "kết quả";
}).thenAcceptBoth(CompletableFuture.completedFuture("kết quả2"), (r1, r2) -> {
    System.out.println(r1 + "-" + r2);
});

Phương thức thenCombine cho phép kết hợp hai CompletableFuture và trả về kết quả mới:

public  CompletableFuture<V> thenCombine(CompletionStage other, BiFunction fn)
public  CompletableFuture<V> thenCombineAsync(CompletionStage other, BiFunction fn)
public  CompletableFuture<V> thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor)

// Ví dụ thenCombine:
CompletableFuture.supplyAsync(() -> {
    System.out.println("xin chào thế giới");
    return "kết quả";
}).thenCombine(CompletableFuture.completedFuture("kết quả2"), (r1, r2) -> {
    System.out.println(r1 + "-" + r2);
    return r1 + "-" + r2;
});

Thực thi khi một trong nhiều CompletableFuture hoàn thành

public CompletableFuture<Void> acceptEither(CompletionStage other, Consumer action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage other, Consumer action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage other, Consumer action, Executor executor)

public <U> CompletableFuture<U> applyToEither(CompletionStage other, Function fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage other, Function fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage other, Function fn, Executor executor)

Hoàn thành tất cả hoặc bất kỳ CompletableFuture

public static CompletableFuture<Void> allOf(CompletableFuture... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture... cfs)

Thực thi Runnable sau khi hoàn thành

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

Chuỗi CompletableFuture

Phương thức thenCompose cho phép trả về một CompletableFuture khác:

public <U> CompletableFuture<U> thenCompose(Function fn)
public <U> CompletableFuture<U> thenComposeAsync(Function fn)
public <U> CompletableFuture<U> thenComposeAsync(Function fn, Executor executor)

// Ví dụ thenCompose:
CompletableFuture.supplyAsync(() -> {
    System.out.println("xin chào thế giới");
    return "kết quả";
}).thenCompose(r -> {
    System.out.println(r);
    return CompletableFuture.supplyAsync(() -> {
        System.out.println(r + " kết quả2");
        return r + " kết quả2";
    });
});

Cơ chế triển khai CompletableFuture

Cơ chế triển khai CompletableFuture tương tự như "sự kiện sau khi thực thi" trong ThreadPoolExecutor:

public class ThreadPoolExecutor extends AbstractExecutorService {
    // ...
    protected void beforeExecute(Thread t, Runnable r) { }
    protected void afterExecute(Runnable r, Throwable t) { }
    // ...
}

Trong CompletableFuture, cơ chế này được thực hiện thông qua callback:

CompletableFuture.supplyAsync(() -> {
    System.out.println("xin chào thế giới");
    return "kết quả";
}).thenApply(r -> {
    System.out.println(r);
    return r;
});

Quá trình triển khai CompletableFuture bao gồm 3 bước chính:

  1. Thực thi tác vụ
  2. Thêm hành động hoàn thành (callback)
  3. Thực thi callback

1. Thực thi tác vụ

Logic chính của việc thực thi tác vụ nằm trong phương thức AsyncSupply.run:

public void run() {
    CompletableFuture<T> d; Supplier<T> f;
    if ((d = dep) != null && (f = fn) != null) {
        dep = null; fn = null;
        if (d.result == null) {
            try {
                d.completeValue(f.get());
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        d.postComplete();
    }
}

2. Thêm callback

Quá trình thêm callback bắt đầu từ thenApply:

public <U> CompletableFuture<U> thenApply(
    Function fn) {
    return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage(
    Executor e, Function f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<V> d = new CompletableFuture<V>();
    if (e != null || !d.uniApply(this, f, null)) {
        UniApply c = new UniApply(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}

3. Thực thi callback

Callback được thực thi từ CompletableFuture.postComplete:

final void postComplete() {
    CompletableFuture f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture d; Completion t;
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

Lập trình bất đồng bộ

Bản chất của lập trình bất đồng bộ là callback. Người dùng cung cấp một phương thức callback, không được gọi trực tiếp bởi bên triển khai mà bởi một bên khác khi có sự kiện hoặc điều kiện xảy ra. CompletableFuture thực hiện điều này bằng cách gọi callback sau khi tác vụ hoàn thành.

Lập trình bất đồng bộ được áp dụng rộng rãi trong nhiều ngôn ngữ và framework. Ví dụ:

  • Node.js sử dụng callback cho lập trình bất đồng bộ
  • Netty mở rộng Future interface với addListener
  • Dubbo sử dụng callback cho xử lý response RPC
  • Google Guava cung cấp ListenableFuture, SettableFuture
final String name = ...;
inFlight.add(name);
ListenableFuture<Result> future = service.query(name);
future.addListener(new Runnable() {
  public void run() {
    processedCount.incrementAndGet();
    inFlight.remove(name);
    lastProcessed.set(name);
    logger.info("Đã xử lý {0}", name);
  }
}, executor);

Thẻ: Java completablefuture lập trình bất đồng bộ future CompletionStage

Đăng vào ngày 28 tháng 6 lúc 07:53