Tổng quan về ThreadPoolExecutor
Mặc dù Python đã có module threading, nhưng việc quản lý trực tiếp các luồng có thể gặp nhiều hạn chế. Lấy ví dụ trong lập trình crawler, bạn cần kiểm soát số lượng luồng chạy đồng thời. Giả sử bạn tạo 20 luồng nhưng chỉ muốn 3 luồng hoạt động song song; việc tạo xóa 20 luồng đều tiêu tốn tài nguyên hệ thống. Giải pháp đơn giản là chỉ dùng 3 luồng, mỗi luồng nhận một tác vụ, các tác vụ còn lại xếp hàng chờ và được gán khi có luồng rảnh. Đây là ý tưởng cốt lõi của thread pool.
Từ Python 3.2, thư viện chuẩn cung cấp module concurrent.futures với hai class chính: ThreadPoolExecutor và ProcessPoolExecutor, giúp trừu tượng hóa việc quản lý luồng/tiến trình. Lợi ích chính bao gồm:
- Luồng chính có thể lấy trạng thái và kết quả trả về của từng tác vụ.
- Luồng chính được thông báo ngay khi có tác vụ hoàn thành.
- Cung cấp interface thống nhất cho cả đa luồng và đa tiến trình.
Ví dụ cơ bản
from concurrent.futures import ThreadPoolExecutor
import time
def fetch_page(delay):
"""Mô phỏng request mạng"""
time.sleep(delay)
print(f"Trang đã tải xong sau {delay}s")
return delay
executor = ThreadPoolExecutor(max_workers=2)
task_1 = executor.submit(fetch_page, 3)
task_2 = executor.submit(fetch_page, 2)
print(task_1.done()) # False: chưa xong
print(task_2.cancel()) # False: không hủy được vì đã chạy
time.sleep(4)
print(task_1.done()) # True
print(task_1.result()) # 3
Giải thích chi tiết:
- Tham số
max_workersquy định số luồng tối đa trong pool. submit()gửi tác vụ vào pool và trả về future (tương tự handle) ngay lập tức (non-blocking).done()kiểm tra tác vụ đã hoàn thành chưa. Ở ví dụ trên, sau 2 giây, task_2 hoàn thành trước, nhưng ban đầu task_1 chưa xong.cancel()chỉ hủy được tác vụ chưa được phân vào luồng (còn trong hàng chờ).result()chờ và lấy giá trị trả về (blocking).
Xử lý kết quả với as_completed
Thay vì liên tục kiểm tra từng tác vụ, ta dùng as_completed để nhận kết quả ngay khi có luồng hoàn thành.
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def fetch_page(delay):
time.sleep(delay)
print(f"Xong {delay}s")
return delay
with ThreadPoolExecutor(max_workers=2) as executor:
tasks = [executor.submit(fetch_page, d) for d in [3, 2, 4]]
for future in as_completed(tasks):
result = future.result()
print(f"Main nhận: {result}s")
# Kết quả: 2s -> 3s -> 4s (theo thứ tự hoàn thành)
as_completed là generator: nó chờ cho đến khi có tác vụ kết thúc, rồi yield future đó. Nhờ vậy, tác vụ nào xong trước sẽ được xử lý trước, không cần tuân theo thứ tự submit.
Phương thức map
executor.map cũng cho phép áp dụng cùng một hàm lên nhiều đối số, nhưng thứ tự kết quả giống thứ tự đầu vào (bất kể thời gian thực thi).
from concurrent.futures import ThreadPoolExecutor
import time
def fetch_page(delay):
time.sleep(delay)
print(f"Xong {delay}s")
return delay
with ThreadPoolExecutor(max_workers=2) as executor:
results = executor.map(fetch_page, [3, 2, 4])
for res in results:
print(f"Main nhận: {res}s")
# Kết quả: in ra 3s -> 2s -> 4s (theo thứ tự list đầu vào)
Chặn chờ với wait
Hàm wait cho phép luồng chính chặn cho đến khi thỏa mãn điều kiện nhất định (ví dụ: chờ tất cả hoặc một số tác vụ hoàn thành).
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import time
def job(n):
time.sleep(n)
return n
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(job, t) for t in (3, 5, 1)]
# Chờ tất cả tác vụ kết thúc
done, not_done = wait(futures, timeout=4, return_when=ALL_COMPLETED)
print(f"Đã xong: {len(done)}, còn lại: {len(not_done)}")
Trong ví dụ trên, timeout=4 chỉ thời gian chờ tối đa. return_when có thể là ALL_COMPLETED, FIRST_COMPLETED hoặc FIRST_EXCEPTION.