Xử lý Đồng thời trong Python: Socket Server, Luồng, Tiến trình và Coroutine

Mô hình Socket Server và Cơ chế Đồng thời

Module socketserver trong Python được thiết kế để xây dựng các máy chủ mạng có khả năng xử lý nhiều kết nối cùng lúc. Cơ chế hoạt động dựa trên sự kết hợp giữa IO đa đường (IO multiplexing) cùng với mô hình đa luồng (multithreading) hoặc đa tiến trình (multiprocessing). Khi một client thiết lập kết nối, server sẽ khởi tạo một luồng hoặc tiến trình riêng biệt để quản lý phiên làm việc đó, đảm bảo các yêu cầu không bị chặn lẫn nhau.

Khác với mô hình select hay epoll thuần túy, socketserver tự động hóa việc phân phối kết nối. Ngay khi client kết nối thành công, một kênh giao tiếp độc lập được thiết lập giữa client và luồng/tiến trình phục vụ nó. Dữ liệu subsequent được trao đổi trực tiếp qua kênh này mà không cần thông qua vòng lặp chính của server.

Trong môi trường đa tiến trình, các client truyền tải dữ liệu lớn sẽ hoàn toàn độc lập về mặt bộ nhớ. Ngược lại, trong môi trường đa luồng của Python, do tồn tại Global Interpreter Lock (GIL), tại một thời điểm chỉ có một luồng thực thi mã bytecode. Việc chuyển đổi ngữ cảnh (context switch) xảy ra ở tầng hệ thống, dẫn đến hiệu suất xử lý CPU-bound không cao như mong đợi, nhưng vẫn hiệu quả cho các tác vụ I/O-bound.

Triển khai ThreadingTCPServer

ThreadingTCPServer là một lớp server mà mỗi kết nối client đến sẽ được phục vụ bởi một luồng riêng biệt. Để sử dụng, lập trình viên cần định nghĩa một class kế thừa từ BaseRequestHandler và implement phương thức handle.

Quy trình cơ bản bao gồm:

  1. Khởi tạo class handler xử lý logic nghiệp vụ.
  2. Khởi tạo đối tượng server với địa chỉ IP và cổng mong muốn.
  3. Gọi phương thức serve_forever để bắt đầu lắng nghe.
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socketserver

class ClientHandler(socketserver.BaseRequestHandler):
    def handle(self):
        connection = self.request
        connection.sendall(b'Chao mung den he thong ho tro. Nhan 1 de xem trang thai, quit de thoat.\n')
        running = True
        
        while running:
            data = connection.recv(1024).decode().strip()
            if not data:
                break
            
            if data.lower() == 'quit':
                running = False
            elif data == '1':
                response = 'He thong dang hoat dong on dinh.'
                connection.sendall(response.encode())
            else:
                connection.sendall(b'Lenh khong hop le. Vui long thu lai.')

if __name__ == '__main__':
    HOST, PORT = '127.0.0.1', 9000
    server = socketserver.ThreadingTCPServer((HOST, PORT), ClientHandler)
    print(f'Server dang chay tai {HOST}:{PORT}')
    server.serve_forever()

Phía client kết nối và giao tiếp như sau:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket

TARGET = ('127.0.0.1', 9000)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(TARGET)
client.settimeout(10)

try:
    while True:
        msg = client.recv(1024).decode()
        print(f'Received: {msg}')
        user_input = input('Nhap lenh: ')
        client.sendall(user_input.encode())
        if user_input.lower() == 'quit':
            break
finally:
    client.close()

Phân tích Nguyên lý Hoạt động

Việc hiểu rõ source code giúp tối ưu hóa hệ thống. Quy trình xử lý bên trong ThreadingTCPServer diễn ra như sau:

  • Khởi tạo server gọi __init__ của TCPServer để bind socket.
  • BaseServer.__init__ lưu trữ class handler vào RequestHandlerClass.
  • Vòng lặp serve_forever liên tục kiểm tra kết nối đến.
  • Khi có kết nối, ThreadingMixIn.process_request được gọi để spawn một thread mới.
  • Thread mới thực thi finish_request, khởi tạo instance của handler và gọi phương thức handle.

Bản chất của việc hỗ trợ nhiều client đồng thời nằm ở sự kết hợp giữa select (để chấp nhận kết nối) và threading (để xử lý logic). Dưới đây là mô phỏng đơn giản hóa cơ chế này:

import socket
import threading
import select

def handle_client(conn, addr):
    print(f'Ket noi tu {addr}')
    conn.send(b'Da ket noi.\n')
    active = True
    while active:
        data = conn.recv(1024)
        if data.decode().strip() == 'quit':
            active = False
        else:
            conn.send(b'Da nhan du lieu.\n')
    conn.close()

server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.bind(('127.0.0.1', 9002))
server_sock.listen(5)

while True:
    ready_to_read, _, _ = select.select([server_sock], [], [], 1)
    if server_sock in ready_to_read:
        conn, addr = server_sock.accept()
        worker = threading.Thread(target=handle_client, args=(conn, addr))
        worker.daemon = True
        worker.start()

Sử dụng ForkingTCPServer

Tương tự như ThreadingTCPServer, ForkingTCPServer cũng xử lý mỗi kết nối bằng một đơn vị thực thi riêng, nhưng thay vì là thread, nó sử dụng process (tiến trình). Điều này phù hợp hơn cho các tác vụ tận dụng đa lõi CPU hoặc cần sự cô lập bộ nhớ chặt chẽ.

Việc chuyển đổi chỉ cần thay đổi class khởi tạo server:

server = socketserver.ForkingTCPServer((HOST, PORT), ClientHandler)

Lưu ý rằng cơ chế fork hoạt động tốt trên Unix/Linux, trong khi trên Windows có thể gặp hạn chế do cách quản lý process khác biệt.

Quản lý Luồng với Threading

Module threading cung cấp các công cụ để làm việc với luồng. Luồng là đơn vị thực thi nhỏ nhất trong một process. Khi khởi tạo, luồng sẽ chờ CPU lập lịch để thực thi.

import threading
import time

def worker(task_id):
    time.sleep(1)
    print(f'Task {task_id} da hoan thanh')

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print('Tat ca cac task da ket thuc')

Do GIL, các luồng Python không thực sự chạy song song trên nhiều lõi CPU cho các tác vụ tính toán nặng, nhưng vẫn hữu ích cho các tác vụ chờ I/O.

Đồng bộ hóa với Lock

Khi nhiều luồng truy cập vào cùng một tài nguyên chung, hiện tượng race condition có thể xảy ra. Khóa (Lock) giúp đảm bảo tính nguyên tử của thao tác.

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(1000):
        lock.acquire()
        try:
            counter += 1
        finally:
            lock.release()

threads = []
for _ in range(5):
    t = threading.Thread(target=increment)
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print(f'Gia tri cuoi cung: {counter}')

Điều phối với Event

Đối tượng Event cho phép một luồng báo hiệu cho các luồng khác về một trạng thái cụ thể. Cơ chế dựa trên cờ flag nội bộ.

  • set(): Đặt cờ thành True, mở khóa các luồng đang chờ.
  • clear(): Đặt cờ thành False, các luồng gọi wait() sẽ bị chặn.
import threading
import time

event = threading.Event()

def waiter(name):
    print(f'{name} dang cho signal...')
    event.wait()
    print(f'{name} da nhan signal, bat dau lam viec')

for i in range(3):
    threading.Thread(target=waiter, args=(f'Worker-{i}',)).start()

time.sleep(2)
print('Gui signal...')
event.set()

Xử lý Đa Tiến trình (Multiprocessing)

Module multiprocessing cho phép tạo các tiến trình con để vượt qua giới hạn của GIL. Mỗi tiến trình có không gian bộ nhớ độc lập.

from multiprocessing import Process

def calculate_square(n):
    print(f'Binh phuong cua {n} la {n*n}')

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Process(target=calculate_square, args=(i,))
        jobs.append(p)
        p.start()
    
    for p in jobs:
        p.join()

Chia sẻ Dữ liệu giữa các Tiến trình

Do bộ nhớ độc lập, các tiến trình không thể truy cập biến toàn cục của nhau trực tiếp. Cần sử dụng các cấu trúc đặc biệt như Array, Value hoặc Manager.

from multiprocessing import Process, Array

def modify_array(arr, index):
    arr[index] = index * 10
    print(f'Tien trinh sua phan tu {index}: {arr[index]}')

if __name__ == '__main__':
    shared_array = Array('i', [0, 0, 0, 0, 0])
    processes = []
    
    for i in range(5):
        p = Process(target=modify_array, args=(shared_array, i))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
        
    print(f'Du lieu cuoi cung: {list(shared_array)}')

Khi nhiều tiến trình cùng ghi vào vùng nhớ chung, khóa liên tiến trình (RLock trong multiprocessing) là bắt buộc để tránh dữ liệu bị hỏng.

Process Pool

Process Pool quản lý một tập hợp các tiến trình worker, tái sử dụng chúng để xử lý nhiều tác vụ mà không cần tạo mới liên tục, giúp tối ưu hiệu năng.

from multiprocessing import Pool
import time

def heavy_task(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        results = pool.map_async(heavy_task, range(10))
        output = results.get()
        print(output)

Coroutine và Xử lý Bất đồng bộ

Khác với thread và process được hệ điều hành quản lý, coroutine được điều phối ở mức người dùng (user-level). Lập trình viên có quyền kiểm soát hoàn toàn thời điểm chuyển đổi ngữ cảnh. Điều này giúp giảm thiểu overhead so với thread switching.

Trong Python, greenlet là thư viện cấp thấp cho phép chuyển đổi thủ công giữa các coroutine.

from greenlet import greenlet

def task_a():
    print('Bat dau A')
    gr_b.switch()
    print('Tiep tuc A')
    gr_b.switch()
    print('Ket thuc A')

def task_b():
    print('Bat dau B')
    gr_a.switch()
    print('Tiep tuc B')
    gr_a.switch()
    print('Ket thuc B')

gr_a = greenlet(task_a)
gr_b = greenlet(task_b)

gr_a.switch()

Tuy nhiên, việc chuyển đổi thủ công dễ gây phức tạp. Thư viện gevent đóng gói greenlet và tự động chuyển đổi ngữ cảnh khi gặp các thao tác I/O (như network request), giúp viết code đồng bộ nhưng chạy bất đồng bộ.

from gevent import monkey
monkey.patch_all()
import gevent
import urllib.request

def fetch_url(url):
    print(f'Dang lay: {url}')
    response = urllib.request.urlopen(url)
    data = response.read()
    print(f'Nhan {len(data)} bytes tu {url}')

gevent.joinall([
    gevent.spawn(fetch_url, 'https://www.python.org'),
    gevent.spawn(fetch_url, 'https://www.github.com'),
    gevent.spawn(fetch_url, 'https://www.wikipedia.org'),
])

Khi một coroutine trong gevent thực hiện I/O, nó sẽ nhường quyền thực thi cho các coroutine khác đang chờ, tận dụng thời gian chờ đợi để xử lý tác vụ khác mà không cần nhiều thread.

Thẻ: python socketserver threading multiprocessing gevent

Đăng vào ngày 11 tháng 6 lúc 18:25