Kiến Trúc Và Nguyên Lý Hoạt Động Của AQS Trong Java

Giới thiệu về AbstractQueuedSynchronizer

AbstractQueuedSynchronizer, thường được gọi tắt là AQS, đóng vai trò là nền tảng cốt lõi trong gói thư viện java.util.concurrent.locks. Lớp trừu tượng này cung cấp một khung làm việc thống nhất để xây dựng các công cụ đồng bộ hóa dựa trên hàng đợi như Semaphores, CountDownLatches và các biến thể của khóa. Việc quản lý trạng thái đồng bộ và cơ chế xếp hàng chờ được thực hiện thông qua một cấu trúc nội tại.

Các đặc trưng nổi bật của AQS

  • Quản lý trạng thái: Một biến số kiểu nguyên (int) được sử dụng để lưu trữ mức độ đồng bộ hiện tại. Các phương thức như getState(), setState(int) cùng thao tác compareAndSetState đảm bảo tính toàn vẹn của dữ liệu này dưới dạng nguyên tử.
  • Hàng đợi chờ sắp xếp: Cấu trúc nội bộ duy trì một danh sách liên kết theo nguyên tắc vào trước ra trước (FIFO). Những luồng không thể giành được tài nguyên sẽ bị chặn và đưa vào đuôi của hàng đợi này.
  • Chế độ hoạt động linh hoạt: Hỗ trợ cả cơ chế chiếm hữu (mỗi lần chỉ một luồng có thể hoạt động) và chia sẻ (nhiều luồng có thể truy cập đồng thời).
  • Bổ sung điều kiện chờ: Thông qua lớp ConditionObject, AQS cho phép tạo ra các tín hiệu điều kiện, giúp luồng tạm dừng khi chưa đáp ứng điều kiện mong muốn.

Cấu trúc dữ liệu bên trong

Để quản lý luồng, AQS sử dụng hai con trỏ chính là headtail đại diện cho đầu và cuối của hàng đợi chờ, cộng với biến synchronizationState để theo dõi trạng thái tài nguyên.

// Định nghĩa mẫu các trường quan trọng trong AQS
protected abstract class SyncSupport extends OwnableSynchronizer {
    
    private static final long serialVersionUID = 6098231234567890123L;

    // Tham chiếu tới nút đầu tiên trong hàng đợi
    private transient volatile WaitingNode queueHead;

    // Tham chiếu tới nút cuối cùng trong hàng đợi
    private transient volatile WaitingNode queueTail;

    // Trạng thái khóa hiện tại
    private volatile int synchronizationState;

    // Lấy trạng thái hiện hành
    protected final int getState() {
        return synchronizationState;
    }

    // Cập nhật trạng thái mới
    protected final void setState(int updatedState) {
        synchronizationState = updatedState;
    }

    // Thao tác thay đổi trạng thái bằng CAS
    protected final boolean attemptStateChange(int expected, int newValue) {
        return unsafe.compareAndSwapInt(this, stateOffset, expected, newValue);
    }
}

Cấu trúc của lớp Nút chờ (Node)

Mỗi phần tử trong hàng đợi là một đối tượng Node chứa thông tin luồng chờ và trạng thái của nó.

static class WaitingNode {
    // Các hằng số trạng thái
    static final WaitingNode SHARED_MODE = new WaitingNode();
    static final WaitingNode EXCLUSIVE_MODE = null;

    static final int STATUS_CANCELLED = 1;
    static final int STATUS_SIGNAL = -1;
    static final int STATUS_CONDITION = -2;
    static final int STATUS_PROPAGATE = -3;

    volatile int currentStatus;       // Trạng thái chờ
    volatile WaitingNode previous;    // Nút liền trước
    volatile WaitingNode next;        // Nút liền sau
    volatile Thread runner;           // Luồng đang chạy
    WaitingNode nextWaiter;           // Chờ kế tiếp

    // Khởi tạo với luồng và chế độ
    WaitingNode(Thread executingThread, WaitingNode waitMode) {
        this.nextWaiter = waitMode;
        this.runner = executingThread;
    }

    boolean isSharedMode() {
        return nextWaiter == SHARED_MODE;
    }
}

Hàm xử lý cốt lõi: acquire

Phương thức acquire(int argument) định nghĩa cách một luồng cố gắng chiếm giữ tài nguyên theo chế độ độc quyền.

public final void acquire(int argument) {
    // Bước 1: Thử chiếm ngay lập tức
    if (!tryObtainLock(argument)) {
        // Bước 2: Nếu thất bại, đưa luồng vào hàng đợi
        WaitingNode node = enqueueWaitingNode(EXCLUSIVE_MODE);
        
        // Bước 3: Chờ trong hàng đợi cho đến khi thành công
        if (spinWhileLocked(node, argument)) {
            // Bước 4: Đánh dấu lại cờ ngắt nếu cần
            restoreInterruptFlag();
        }
    }
}

Trong đó, tryObtainLock là phương thức trừu tượng mà các lớp con phải triển khai để xác định logic cạnh tranh cụ thể.

Quá trình thêm luồng vào hàng đợi

Vị trí addWaitingNode chịu trách nhiệm nối mới luồng vào đuôi hàng đợi một cách an toàn.

private WaitingNode enqueueWaitingNode(WaitingNode mode) {
    // Tạo nút mới cho luồng hiện tại
    WaitingNode currentNode = new WaitingNode(Thread.currentThread(), mode);
    
    // Kiểm tra nút đuôi hiện tại
    WaitingNode lastNode = queueTail;
    
    if (lastNode != null) {
        // Thiết lập liên kết ngược
        currentNode.previous = lastNode;
        
        // Cố gắng cập nhật đuôi bằng CAS
        if (compareAndSetQueueTail(lastNode, currentNode)) {
            lastNode.next = currentNode;
            return currentNode;
        }
    }
    
    // Nếu rơi vào điều kiện đồng bộ phức tạp, chuyển sang phương thức enq chuẩn
    return stableEnqueue(currentNode);
}

Cơ chế xoay vòng chờ (Spinning)

Hàm spinWhileLocked kiểm soát việc luồng tự do hay bị treo khi không giành được tài nguyên.

final boolean spinWhileLocked(WaitingNode waitingNode, int requestValue) {
    boolean lockSuccess = true;
    boolean wasInterrupted = false;
    
    try {
        for (;;) {
            WaitingNode predecessor = waitingNode.predecessor();
            
            // Nếu tiền nhân là head và có thể lấy lock
            if (predecessor == queueHead && tryObtainLock(requestValue)) {
                setQueueHead(waitingNode);
                predecessor.next = null; // Giải phóng GC
                lockSuccess = false;
                return wasInterrupted;
            }
            
            // Quyết định xem có nên pause luồng hay không
            if (checkIfShouldPark(predecessor, waitingNode) && 
                parkThreadAndCheck()) {
                wasInterrupted = true;
            }
        }
    } finally {
        if (lockSuccess) {
            cancelLockRequest(waitingNode);
        }
    }
}

Hàm kiểm tra trạng thái chờ:

private static boolean checkIfShouldPark(WaitingNode pred, WaitingNode curr) {
    int status = pred.currentStatus;
    if (status == WaitingNode.STATUS_SIGNAL)
        return true;
    // Xử lý tình huống nút tiền nhân đã hủy bỏ
    else if (status > 0) {
        do {
            curr.previous = pred = pred.previous;
        } while (pred != null && pred.currentStatus > 0);
        pred.next = curr;
    } else {
        compareAndSetWaitStatus(pred, status, WaitingNode.STATUS_SIGNAL);
    }
    return false;
}

Hàm giải phóng: release

Sau khi hoàn thành tác vụ, luồng gọi release(int argument) để trả lại tài nguyên và kích hoạt luồng tiếp theo.

public final boolean release(int argument) {
    // Thử hạ cấp trạng thái
    if (tryReleaseResource(argument)) {
        WaitingNode h = queueHead;
        if (h != null && h.currentStatus != 0) {
            wakeUpNextThread(h);
        }
        return true;
    }
    return false;
}

Chế độ Chia sẻ (Shared Mode)

Khác với chế độ độc quyền, việc chiếm hữu chung được xử lý qua acquireShared.

public final void acquireShared(int arg) {
    if (tryAcquireSharedLogic(arg) < 0) {
        // Thực hiện quy trình chờ đặc biệt cho chế độ chia sẻ
        enterSharedQueue(arg);
    }
}

Việc đánh thức trong chế độ này cũng phức tạp hơn nhằm đảm bảo nhiều luồng có thể nhận được tài nguyên cùng lúc nếu điều kiện cho phép.

Luồng xử lý tổng quát

  1. Khởi tạo trạng thái ban đầu khi tạo đối tượng đồng bộ hóa.
  2. Luồng gọi phương thức lấy khóa (acquire). Nếu tryAcquire trả về thật, nó được cấp quyền ngay.
  3. Nếu thất bại, luồng được đóng gói thành Node và ghép nối vào hàng đợi chờ FIFO.
  4. Luồng tự treo (park) cho đến khi người đứng trước nó giải phóng khóa hoặc bị ngắt.
  5. Khi nguồn lực được giải phóng (release), người tiếp theo trong hàng đợi được đánh thức và thử lại việc chiếm giữ.

Thẻ: Java Concurrency AQS locks multithreading

Đăng vào ngày 27 tháng 5 lúc 17:16