Giới thiệu về AbstractQueuedSynchronizer
AbstractQueuedSynchronizer, thường được gọi tắt là AQS, đóng vai trò là nền tảng cốt lõi trong gói thư viện java.util.concurrent.locks. Lớp trừu tượng này cung cấp một khung làm việc thống nhất để xây dựng các công cụ đồng bộ hóa dựa trên hàng đợi như Semaphores, CountDownLatches và các biến thể của khóa. Việc quản lý trạng thái đồng bộ và cơ chế xếp hàng chờ được thực hiện thông qua một cấu trúc nội tại.
Các đặc trưng nổi bật của AQS
- Quản lý trạng thái: Một biến số kiểu nguyên (
int) được sử dụng để lưu trữ mức độ đồng bộ hiện tại. Các phương thức nhưgetState(),setState(int)cùng thao táccompareAndSetStateđảm bảo tính toàn vẹn của dữ liệu này dưới dạng nguyên tử. - Hàng đợi chờ sắp xếp: Cấu trúc nội bộ duy trì một danh sách liên kết theo nguyên tắc vào trước ra trước (FIFO). Những luồng không thể giành được tài nguyên sẽ bị chặn và đưa vào đuôi của hàng đợi này.
- Chế độ hoạt động linh hoạt: Hỗ trợ cả cơ chế chiếm hữu (mỗi lần chỉ một luồng có thể hoạt động) và chia sẻ (nhiều luồng có thể truy cập đồng thời).
- Bổ sung điều kiện chờ: Thông qua lớp
ConditionObject, AQS cho phép tạo ra các tín hiệu điều kiện, giúp luồng tạm dừng khi chưa đáp ứng điều kiện mong muốn.
Cấu trúc dữ liệu bên trong
Để quản lý luồng, AQS sử dụng hai con trỏ chính là head và tail đại diện cho đầu và cuối của hàng đợi chờ, cộng với biến synchronizationState để theo dõi trạng thái tài nguyên.
// Định nghĩa mẫu các trường quan trọng trong AQS
protected abstract class SyncSupport extends OwnableSynchronizer {
private static final long serialVersionUID = 6098231234567890123L;
// Tham chiếu tới nút đầu tiên trong hàng đợi
private transient volatile WaitingNode queueHead;
// Tham chiếu tới nút cuối cùng trong hàng đợi
private transient volatile WaitingNode queueTail;
// Trạng thái khóa hiện tại
private volatile int synchronizationState;
// Lấy trạng thái hiện hành
protected final int getState() {
return synchronizationState;
}
// Cập nhật trạng thái mới
protected final void setState(int updatedState) {
synchronizationState = updatedState;
}
// Thao tác thay đổi trạng thái bằng CAS
protected final boolean attemptStateChange(int expected, int newValue) {
return unsafe.compareAndSwapInt(this, stateOffset, expected, newValue);
}
}
Cấu trúc của lớp Nút chờ (Node)
Mỗi phần tử trong hàng đợi là một đối tượng Node chứa thông tin luồng chờ và trạng thái của nó.
static class WaitingNode {
// Các hằng số trạng thái
static final WaitingNode SHARED_MODE = new WaitingNode();
static final WaitingNode EXCLUSIVE_MODE = null;
static final int STATUS_CANCELLED = 1;
static final int STATUS_SIGNAL = -1;
static final int STATUS_CONDITION = -2;
static final int STATUS_PROPAGATE = -3;
volatile int currentStatus; // Trạng thái chờ
volatile WaitingNode previous; // Nút liền trước
volatile WaitingNode next; // Nút liền sau
volatile Thread runner; // Luồng đang chạy
WaitingNode nextWaiter; // Chờ kế tiếp
// Khởi tạo với luồng và chế độ
WaitingNode(Thread executingThread, WaitingNode waitMode) {
this.nextWaiter = waitMode;
this.runner = executingThread;
}
boolean isSharedMode() {
return nextWaiter == SHARED_MODE;
}
}
Hàm xử lý cốt lõi: acquire
Phương thức acquire(int argument) định nghĩa cách một luồng cố gắng chiếm giữ tài nguyên theo chế độ độc quyền.
public final void acquire(int argument) {
// Bước 1: Thử chiếm ngay lập tức
if (!tryObtainLock(argument)) {
// Bước 2: Nếu thất bại, đưa luồng vào hàng đợi
WaitingNode node = enqueueWaitingNode(EXCLUSIVE_MODE);
// Bước 3: Chờ trong hàng đợi cho đến khi thành công
if (spinWhileLocked(node, argument)) {
// Bước 4: Đánh dấu lại cờ ngắt nếu cần
restoreInterruptFlag();
}
}
}
Trong đó, tryObtainLock là phương thức trừu tượng mà các lớp con phải triển khai để xác định logic cạnh tranh cụ thể.
Quá trình thêm luồng vào hàng đợi
Vị trí addWaitingNode chịu trách nhiệm nối mới luồng vào đuôi hàng đợi một cách an toàn.
private WaitingNode enqueueWaitingNode(WaitingNode mode) {
// Tạo nút mới cho luồng hiện tại
WaitingNode currentNode = new WaitingNode(Thread.currentThread(), mode);
// Kiểm tra nút đuôi hiện tại
WaitingNode lastNode = queueTail;
if (lastNode != null) {
// Thiết lập liên kết ngược
currentNode.previous = lastNode;
// Cố gắng cập nhật đuôi bằng CAS
if (compareAndSetQueueTail(lastNode, currentNode)) {
lastNode.next = currentNode;
return currentNode;
}
}
// Nếu rơi vào điều kiện đồng bộ phức tạp, chuyển sang phương thức enq chuẩn
return stableEnqueue(currentNode);
}
Cơ chế xoay vòng chờ (Spinning)
Hàm spinWhileLocked kiểm soát việc luồng tự do hay bị treo khi không giành được tài nguyên.
final boolean spinWhileLocked(WaitingNode waitingNode, int requestValue) {
boolean lockSuccess = true;
boolean wasInterrupted = false;
try {
for (;;) {
WaitingNode predecessor = waitingNode.predecessor();
// Nếu tiền nhân là head và có thể lấy lock
if (predecessor == queueHead && tryObtainLock(requestValue)) {
setQueueHead(waitingNode);
predecessor.next = null; // Giải phóng GC
lockSuccess = false;
return wasInterrupted;
}
// Quyết định xem có nên pause luồng hay không
if (checkIfShouldPark(predecessor, waitingNode) &&
parkThreadAndCheck()) {
wasInterrupted = true;
}
}
} finally {
if (lockSuccess) {
cancelLockRequest(waitingNode);
}
}
}
Hàm kiểm tra trạng thái chờ:
private static boolean checkIfShouldPark(WaitingNode pred, WaitingNode curr) {
int status = pred.currentStatus;
if (status == WaitingNode.STATUS_SIGNAL)
return true;
// Xử lý tình huống nút tiền nhân đã hủy bỏ
else if (status > 0) {
do {
curr.previous = pred = pred.previous;
} while (pred != null && pred.currentStatus > 0);
pred.next = curr;
} else {
compareAndSetWaitStatus(pred, status, WaitingNode.STATUS_SIGNAL);
}
return false;
}
Hàm giải phóng: release
Sau khi hoàn thành tác vụ, luồng gọi release(int argument) để trả lại tài nguyên và kích hoạt luồng tiếp theo.
public final boolean release(int argument) {
// Thử hạ cấp trạng thái
if (tryReleaseResource(argument)) {
WaitingNode h = queueHead;
if (h != null && h.currentStatus != 0) {
wakeUpNextThread(h);
}
return true;
}
return false;
}
Chế độ Chia sẻ (Shared Mode)
Khác với chế độ độc quyền, việc chiếm hữu chung được xử lý qua acquireShared.
public final void acquireShared(int arg) {
if (tryAcquireSharedLogic(arg) < 0) {
// Thực hiện quy trình chờ đặc biệt cho chế độ chia sẻ
enterSharedQueue(arg);
}
}
Việc đánh thức trong chế độ này cũng phức tạp hơn nhằm đảm bảo nhiều luồng có thể nhận được tài nguyên cùng lúc nếu điều kiện cho phép.
Luồng xử lý tổng quát
- Khởi tạo trạng thái ban đầu khi tạo đối tượng đồng bộ hóa.
- Luồng gọi phương thức lấy khóa (
acquire). NếutryAcquiretrả về thật, nó được cấp quyền ngay. - Nếu thất bại, luồng được đóng gói thành Node và ghép nối vào hàng đợi chờ FIFO.
- Luồng tự treo (park) cho đến khi người đứng trước nó giải phóng khóa hoặc bị ngắt.
- Khi nguồn lực được giải phóng (
release), người tiếp theo trong hàng đợi được đánh thức và thử lại việc chiếm giữ.