Giới thiệu về Giao diện Future trong Java Concurrency

Trong mô hình đơn luồng, việc lấy kết quả từ phương thức thực thi rất đơn giản, chỉ cần nhận trực tiếp kết quả trả về. Tuy nhiên, trong môi trường đa luồng, làm thế nào để thu thập kết quả từ các luồng khác? Ví dụ, nếu Luồng A khởi tạo Luồng B và Luồng C, làm sao Luồng A có thể lấy kết quả của chúng? Liệu Luồng A sẽ bị chặn lại chờ đợi hay tiếp tục thực thi không bị gián đoạn? Và khi nào nên thu thập kết quả từ Luồng B và Luồng C? Đây là những vấn đề cần giải quyết, và gói JUC cung cấp giao diện Future để xử lý.

Giao diện Callable

Để thu thập kết quả bất đồng bộ, không thể thiếu giao diện Callable. Ngoài cách tạo luồng bằng cách kế thừa lớp Thread hoặc triển khai Runnable, còn một cách khác là triển khai Callable để trả về kết quả hoạt động.

@FunctionalInterface
public interface Callable<T> {
    /**
     * Trả về kết quả hoặc ném ra ngoại lệ
     */
    T call() throws Exception;
}

Giao diện Callable cho phép trả về kết quả, nhưng cần một đối tượng để lưu trữ kết quả này, đó chính là đối tượng Future.

Giao diện Future

Future dùng để thu thập kết quả từ hoạt động bất đồng bộ. Sau khi hoàn thành hoạt động, kết quả được lưu trữ trong Future và có thể truy cập thông qua phương thức get(). Định nghĩa của giao diện Future như sau:

public interface Future<T> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    T get() throws InterruptedException, ExecutionException;
    T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Lớp FutureTask

FutureTask là lớp triển khai của Future và cũng triển khai Runnable. Hàm tạo của nó yêu cầu tham số là kiểu Callable, vì vậy nhiệm vụ của FutureTask là tạo luồng thực hiện nội dung của Callable và thu thập kết quả.

Ví dụ sử dụng

public static void main(String[] args) throws ExecutionException, InterruptedException {
    FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
        @Override
        public String call() throws Exception {
            Thread.sleep(3000L);
            System.out.println("Kết quả đã sẵn sàng");
            return "kết quả";
        }
    });
    System.out.println("Đang chờ kết quả...");
    futureTask.run();
    String result = futureTask.get();
    System.out.println("Kết quả: " + result);
}

Quá trình sử dụng khá đơn giản: đầu tiên tạo một đối tượng FutureTask với Callable, sau đó gọi phương thức run để thực thi Callable, cuối cùng gọi phương thức get để chờ kết quả.

Nguyên lý thực hiện

FutureTask triển khai cả Runnable và Future, do đó cần triển khai 5 phương thức của Future và phương thức run của Runnable. Dưới đây là phân tích về phương thức run và get.

private volatile int state;
private Callable<T> callable;
private volatile Thread runner;

public FutureTask(Callable<T> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;
}

public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<T> c = callable;
        if (c != null && state == NEW) {
            T result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

Phương thức run gán giá trị runner cho đối tượng FutureTask, thực thi phương thức call của Callable, và lưu trữ kết quả thông qua phương thức set.

protected void set(T v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
        finishCompletion();
    }
}

private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null;
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;
}

Phương thức get kiểm tra trạng thái và thu thập kết quả nếu đã hoàn thành, hoặc chờ đợi nếu chưa.

public T get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private T report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (T)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

Thẻ: Java Callable future FutureTask Concurrency

Đăng vào ngày 29 tháng 5 lúc 10:54