Hoàn Thành Bất Đồng Bộ với CompletableFuture trong Java
Java 8 đã giới thiệu lớp CompletableFuture, lớp này thực hiện CompletionStage và Future interface, giúp đơn giản hóa khả năng lập trình bất đồng bộ trong Java. Lớp này có nhiều phương thức, nhưng bản chất chỉ là một cách tiếp cận: thực thi "hàm callback" sau khi tác vụ hoàn thành.
Tạo và Sử dụng CompletableFuture
CompletableFuture cung cấp hỗ trợ cho tính toán bất đồng bộ, có thể xử lý kết quả tính toán thông qua callback. CompletableFuture thực hiện CompletionStage và Future interface, vì vậy vẫn có thể sử dụng như Future trước đây, mặc dù cách này không còn được khuyến khích.
Tạo CompletableFuture
// Tạo CompletableFuture với kết quả đã có
CompletableFuture<String> future = CompletableFuture.completedFuture("kết quả");
future.get();
// Mặc định CompletableFuture không có kết quả, gọi future.get() sẽ bị chặn cho đến khi có kết quả hoặc ngoại lệ
future = new CompletableFuture<>();
try {
future.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
// không quan tâm
}
// Điền kết quả vào future
future.complete("kết quả");
assert "kết quả".equals(future.get());
// Điền ngoại lệ vào future
future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException("ngoại lệ"));
try {
future.get();
} catch (Exception e) {
assert "ngoại lệ".equals(e.getCause().getMessage());
}
Bên cạnh việc tạo trực tiếp CompletableFuture, chúng ta có thể sử dụng 4 phương thức sau để tạo đối tượng:
// runAsync là Runnable task, không trả về giá trị
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// supplyAsync là tác vụ bất đồng bộ có kết quả trả về
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
// Ví dụ sử dụng
CompletableFuture.runAsync(() -> {
System.out.println("xin chào thế giới");
}, executor);
CompletableFuture.supplyAsync(() -> {
System.out.println("xin chào thế giới");
return "kết quả";
});
Nếu không có executor, sẽ sử dụng ForkJoinPool.commonPool() làm pool thread cho tác vụ bất đồng bộ; ngược lại sẽ sử dụng executor.
Hoàn thành hành động CompletableFuture
public CompletableFuture<T> whenComplete(BiConsumer super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function fn)
// Ví dụ sử dụng
CompletableFuture.supplyAsync(() -> {
System.out.println("xin chào thế giới");
return "kết quả";
}).whenCompleteAsync((result, e) -> {
System.out.println(result + " " + e);
}).exceptionally((e) -> {
System.out.println("ngoại lệ " + e);
return "ngoại lệ";
});
Phương thức whenComplete sẽ thực thi action trực tiếp trong thread hiện tại sau khi tác vụ hoàn thành, các phương thức có hậu tố Async sẽ giao action cho thread khác thực thi. exceptionally sẽ thực thi khi có ngoại lệ.
Chuyển đổi kết quả CompletableFuture
Phương thức handle cho phép chuyển đổi kiểu trả về của CompletableFuture:
public <U> CompletableFuture<U> handle(BiFunction super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction super T,Throwable,? extends U> fn, Executor executor)
// Ví dụ handle:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("xin chào thế giới");
return "kết quả";
});
CompletableFuture<Integer> f2 = f1.handle((r, e) -> {
System.out.println("xử lý");
return 1;
});
Phương thức thenApply chỉ xử lý kết quả bình thường, khi có ngoại lệ sẽ ném ra:
public <U> CompletableFuture<U> thenApply(Function super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function super T,? extends U> fn, Executor executor)
// Ví dụ thenApply:
CompletableFuture.supplyAsync(() -> {
System.out.println("xin chào thế giới");
return "kết quả";
}).thenApply((r) -> {
System.out.println(r);
return "aaa";
}).thenApply((r) -> {
System.out.println(r);
return 1;
});
Tác vụ tiêu thụ kết quả
Để thực thi các hành động tiêu thụ mà không trả về CompletableFuture mới, sử dụng thenAccept:
public CompletableFuture<Void> thenAccept(Consumer super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer super T> action, Executor executor)
// Ví dụ thenAccept:
CompletableFuture.supplyAsync(() -> {
System.out.println("xin chào thế giới");
return "kết quả";
}).thenAccept(r -> {
System.out.println(r);
}).thenAccept(r -> {
// r ở đây là Void (null)
System.out.println(r);
});
Hoạt động trên nhiều CompletableFuture
Để thực hiện các thao tác trên kết quả của hai CompletableFuture, sử dụng thenAcceptBoth:
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage extends U> other, BiConsumer super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage extends U> other, BiConsumer super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage extends U> other, BiConsumer super T,? super U> action, Executor executor)
public CompletableFuture<Void> runAfterBoth(CompletionStage> other, Runnable action)
// Ví dụ thenAcceptBoth:
CompletableFuture.supplyAsync(() -> {
System.out.println("xin chào thế giới");
return "kết quả";
}).thenAcceptBoth(CompletableFuture.completedFuture("kết quả2"), (r1, r2) -> {
System.out.println(r1 + "-" + r2);
});
Phương thức thenCombine cho phép kết hợp hai CompletableFuture và trả về kết quả mới:
public CompletableFuture<V> thenCombine(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn)
public CompletableFuture<V> thenCombineAsync(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn)
public CompletableFuture<V> thenCombineAsync(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn, Executor executor)
// Ví dụ thenCombine:
CompletableFuture.supplyAsync(() -> {
System.out.println("xin chào thế giới");
return "kết quả";
}).thenCombine(CompletableFuture.completedFuture("kết quả2"), (r1, r2) -> {
System.out.println(r1 + "-" + r2);
return r1 + "-" + r2;
});
Thực thi khi một trong nhiều CompletableFuture hoàn thành
public CompletableFuture<Void> acceptEither(CompletionStage extends T> other, Consumer super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage extends T> other, Consumer super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage extends T> other, Consumer super T> action, Executor executor)
public <U> CompletableFuture<U> applyToEither(CompletionStage extends T> other, Function super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage extends T> other, Function super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage extends T> other, Function super T,U> fn, Executor executor)
Hoàn thành tất cả hoặc bất kỳ CompletableFuture
public static CompletableFuture<Void> allOf(CompletableFuture>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture>... cfs)
Thực thi Runnable sau khi hoàn thành
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
Chuỗi CompletableFuture
Phương thức thenCompose cho phép trả về một CompletableFuture khác:
public <U> CompletableFuture<U> thenCompose(Function super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function super T,? extends CompletionStage<U>> fn, Executor executor)
// Ví dụ thenCompose:
CompletableFuture.supplyAsync(() -> {
System.out.println("xin chào thế giới");
return "kết quả";
}).thenCompose(r -> {
System.out.println(r);
return CompletableFuture.supplyAsync(() -> {
System.out.println(r + " kết quả2");
return r + " kết quả2";
});
});
Cơ chế triển khai CompletableFuture
Cơ chế triển khai CompletableFuture tương tự như "sự kiện sau khi thực thi" trong ThreadPoolExecutor:
public class ThreadPoolExecutor extends AbstractExecutorService {
// ...
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
// ...
}
Trong CompletableFuture, cơ chế này được thực hiện thông qua callback:
CompletableFuture.supplyAsync(() -> {
System.out.println("xin chào thế giới");
return "kết quả";
}).thenApply(r -> {
System.out.println(r);
return r;
});
Quá trình triển khai CompletableFuture bao gồm 3 bước chính:
- Thực thi tác vụ
- Thêm hành động hoàn thành (callback)
- Thực thi callback
1. Thực thi tác vụ
Logic chính của việc thực thi tác vụ nằm trong phương thức AsyncSupply.run:
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
2. Thêm callback
Quá trình thêm callback bắt đầu từ thenApply:
public <U> CompletableFuture<U> thenApply(
Function super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.uniApply(this, f, null)) {
UniApply c = new UniApply(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
3. Thực thi callback
Callback được thực thi từ CompletableFuture.postComplete:
final void postComplete() {
CompletableFuture> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture> d; Completion t;
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null;
}
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
Lập trình bất đồng bộ
Bản chất của lập trình bất đồng bộ là callback. Người dùng cung cấp một phương thức callback, không được gọi trực tiếp bởi bên triển khai mà bởi một bên khác khi có sự kiện hoặc điều kiện xảy ra. CompletableFuture thực hiện điều này bằng cách gọi callback sau khi tác vụ hoàn thành.
Lập trình bất đồng bộ được áp dụng rộng rãi trong nhiều ngôn ngữ và framework. Ví dụ:
- Node.js sử dụng callback cho lập trình bất đồng bộ
- Netty mở rộng Future interface với addListener
- Dubbo sử dụng callback cho xử lý response RPC
- Google Guava cung cấp ListenableFuture, SettableFuture
final String name = ...;
inFlight.add(name);
ListenableFuture<Result> future = service.query(name);
future.addListener(new Runnable() {
public void run() {
processedCount.incrementAndGet();
inFlight.remove(name);
lastProcessed.set(name);
logger.info("Đã xử lý {0}", name);
}
}, executor);