Khái niệm Pool Xử lý Đa Luồng
Trong các ứng dụng máy chủ như web server, database server, việc xử lý đồng thời nhiều tác vụ ngắn đòi hỏi cơ chế tối ưu hóa tài nguyên. Thay vì tạo mới và hủy bỏ luồng liên tục gây tiêu tốn tài nguyên, kỹ thuật "pool" cho phép tái sử dụng luồng đã tồn tại.
Pool xử lý đa luồng là tập hợp các luồng ở trạng thái chờ sẵn, chỉ tiêu thụ bộ nhớ tối thiểu mà không chiếm dụng CPU. Khi có tác vụ mới, pool sẽ phân bổ luồng rảnh để xử lý. Khi số luồng hoạt động đạt giới hạn, pool có thể mở rộng tạm thời hoặc điều chỉnh tự động dựa trên tải hệ thống.
Các nguyên tắc quan trọng
- Tối ưu kích thước pool: Số luồng tối ưu phụ thuộc vào đặc tính ứng dụng và tài nguyên hệ thống. Với tác vụ CPU-bound, nên giới hạn bằng số nhân CPU. Với tác vụ I/O-bound, có thể tăng kích thước pool.
- Quản lý trạng thái: Cần theo dõi số luồng đang hoạt động, số tác vụ chờ xử lý để tối ưu hiệu năng.
- Giải phóng tài nguyên: Đảm bảo luồng được trả về pool sau khi hoàn thành, tránh rò rỉ tài nguyên.
Triển khai đơn giản
import queue
import threading
import time
class SimplePool:
def __init__(self, max_size=20):
self.task_queue = queue.Queue(max_size)
for _ in range(max_size):
self.task_queue.put(threading.Thread)
def get_worker(self):
return self.task_queue.get()
def release_worker(self):
self.task_queue.put(threading.Thread)
def execute_task(index, pool):
print(f"Xử lý {index}")
time.sleep(1)
pool.release_worker()
pool = SimplePool(5)
for i in range(100):
worker = pool.get_worker()
thread = worker(target=execute_task, args=(i, pool))
thread.start()
Triển khai nâng cao
from queue import Queue
import threading
import contextlib
class AdvancedPool:
def __init__(self, max_threads=10):
self.task_queue = Queue()
self.max_threads = max_threads
self.active_workers = 0
self.busy_threads = []
self.idle_threads = []
@contextlib.contextmanager
def track_state(self, thread_list, thread):
thread_list.append(thread)
try:
yield
finally:
thread_list.remove(thread)
def worker_routine(self):
while True:
task = self.task_queue.get()
if task is None:
break
with self.track_state(self.busy_threads, threading.current_thread()):
func, args, kwargs = task
func(*args, **kwargs)
self.idle_threads.append(threading.current_thread())
def add_task(self, func, *args, **kwargs):
self.task_queue.put((func, args, kwargs))
def start_pool(self):
while self.active_workers < min(self.max_threads, self.task_queue.qsize()):
thread = threading.Thread(target=self.worker_routine)
thread.start()
self.active_workers += 1
def shutdown(self):
for _ in range(self.active_workers):
self.task_queue.put(None)
Điểm cần lưu ý
- Tránh deadlock bằng cách đồng bộ hóa truy cập tài nguyên chung
- Quản lý trạng thái luồng để phát hiện tắc nghẽn
- Thiết kế cơ chế tự điều chỉnh kích thước pool theo tải thực tế
- Đảm bảo giải phóng tài nguyên kịp thời khi không sử dụng