Nguyên lý hoạt động của bể luồng trong lập trình đa luồng

Tại sao cần sử dụng bể luồng?

Việc tạo mới luồng để thực thi tác vụ và hủy bỏ sau khi hoàn thành sẽ tiêu tốn tài nguyên hệ thống. Bể luồng giúp tối ưu hóa bằng cách tạo sẵn luồng, duy trì chúng và giảm thiểu chi phí khởi tạo/hủy bỏ luồng, từ đó nâng cao hiệu suất xử lý.

Quá trình chuyển đổi giữa 5 trạng thái của bể luồng

Dưới đây là các trạng thái chính trong mã nguồn:

/**
 * Trạng thái vận hành xác định chu kỳ sống của bể luồng, bao gồm:
 *
 *   CHẠY:     Chấp nhận tác vụ mới và xử lý các tác vụ trong hàng đợi
 *   DỪNG:     Không chấp nhận tác vụ mới nhưng vẫn xử lý hàng đợi
 *   NGỪNG:    Không chấp nhận tác vụ mới, không xử lý hàng đợi,
 *             và ngắt các tác vụ đang chạy
 *   SẮP KẾT THÚC: Tất cả tác vụ đã kết thúc, số lượng luồng bằng 0,
 *             luồng chuyển sang trạng thái này sẽ gọi phương thức terminated()
 *   ĐÃ KẾT THÚC: Phương thức terminated() đã hoàn tất
 */
Bảng so sánh các trạng thái:
Trạng tháiChấp nhận tác vụ mớiXử lý hàng đợiGhi chú
CHẠYTrạng thái hoạt động bao gồm CHẠY và CHỜ
DỪNGKhôngĐược thiết lập khi gọi shutdown()
NGỪNGKhôngKhôngĐược thiết lập khi gọi shutdownNow(), ngắt các tác vụ đang chạy
SẮP KẾT THÚCKhôngKhôngKhi tất cả luồng dừng lại, workerCount = 0
ĐÃ KẾT THÚCKhôngKhôngThực thi xong terminated() từ trạng thái SẮP KẾT THÚC
Phân tích mã nguồn trạng thái:

// Phương thức tryTerminate được gọi khi luồng kết thúc hoặc gặp lỗi
final void tryTerminate() {
	for (;;) {
		int c = controlState.get();
		if (isRunning(c) ||
			runStateAtLeast(c, TIDYING) ||
			(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
			return;
		if (workerCountOf(c) != 0) { // Kiểm tra điều kiện dừng
			interruptIdleWorkers(ONLY_ONE);
			return;
		}

		final ReentrantLock mainLock = this.mainLock;
		mainLock.lock();
		try {
			if (controlState.compareAndSet(c, controlStateOf(TIDYING, 0))) {
				try {
					terminated(); // Chuyển sang trạng thái TERMINATED sau khi thực thi
				} finally {
					controlState.set(controlStateOf(TERMINATED, 0));
					termination.signalAll();
				}
				return;
			}
		} finally {
			mainLock.unlock();
		}
	}
}
Cách biểu diễn trạng thái:

private final AtomicInteger controlState = new AtomicInteger(controlStateOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
Phân tích mã khởi tạo luồng:

private boolean addWorker(Runnable initialTask, boolean isCore) {
	retry:
	for (;;) {
		int c = controlState.get();
		int rs = runStateOf(c);

		if (rs >= SHUTDOWN &&
			! (rs == SHUTDOWN &&
			   initialTask == null &&
			   ! workQueue.isEmpty()))
			return false;

		for (;;) {
			int wc = workerCountOf(c);
			if (wc >= CAPACITY ||
				wc >= (isCore ? corePoolSize : maxPoolSize))
				return false;
			if (compareAndIncrementWorkerCount(c)) 
				break retry; 
			c = controlState.get();  
			if (runStateOf(c) != rs)
				continue retry; 
		}
	}
	
	boolean workerStarted = false;
	boolean workerAdded = false;
	Worker w = null;
	try {
		w = new Worker(initialTask);
		final Thread t = w.thread;
		if (t != null) {
			final ReentrantLock mainLock = this.mainLock;
			mainLock.lock();
			try {
				int rs = runStateOf(controlState.get());
				if (rs < SHUTDOWN ||
					(rs == SHUTDOWN && initialTask == null)) {
					if (t.isAlive()) 
						throw new IllegalThreadStateException();
					workers.add(w);
					int s = workers.size();
					if (s > largestPoolSize)
						largestPoolSize = s;
					workerAdded = true;
				}
			} finally {
				mainLock.unlock();
			}
			if (workerAdded) {
				t.start();
				workerStarted = true;
			}
		}
	} finally {
		if (! workerStarted)
			addWorkerFailed(w);
	}
	return workerStarted;
}
Phương thức execute:

public void execute(Runnable task) {
	if (task == null)
		throw new NullPointerException();
	int c = controlState.get();
	if (workerCountOf(c) < corePoolSize) {
		if (addWorker(task, true))
			return;
		c = controlState.get();
	}
	if (isRunning(c) && workQueue.offer(task)) {
		int recheck = controlState.get();
		if (! isRunning(recheck) && remove(task)) 
			reject(task);
		else if (workerCountOf(recheck) == 0) 
			addWorker(null, false);
	}
	else if (!addWorker(task, false)) 
		reject(task);
}
Phương thức shutdown:

public void shutdown() {
	final ReentrantLock mainLock = this.mainLock;
	mainLock.lock();
	try {
		checkShutdownAccess();
		advanceRunState(SHUTDOWN);
		interruptIdleWorkers();
		onShutdown(); 
	} finally {
		mainLock.unlock();
	}
	tryTerminate();
}
Phương thức shutdownNow:

public List<Runnable> shutdownNow() {
	List<Runnable> pendingTasks;
	final ReentrantLock mainLock = this.mainLock;
	mainLock.lock();
	try {
		checkShutdownAccess();
		advanceRunState(STOP);
		interruptWorkers();
		pendingTasks = drainQueue();
	} finally {
		mainLock.unlock();
	}
	tryTerminate();
	return pendingTasks;
}
Vai trò của mainLock: Đây là khóa toàn cục được sử dụng để đảm bảo tính nguyên tử khi thao tác với danh sách workers, tránh hiện tượng đọc dữ liệu không nhất quán.

Thẻ: Java ThreadPoolExecutor Concurrency multi-threading Thread Management

Đăng vào ngày 26 tháng 5 lúc 22:15