Điều biến điều kiện (condition variable), std::future, std::promise, std::packaged_task và tư duy lập trình hàm là những công cụ cốt lõi để xây dựng các hệ thống đồng bộ an toàn và linh hoạt trong C++. Bài viết này trình bày cách kết hợp chúng một cách hiệu quả — từ việc đồng bộ hóa thao tác trên cấu trúc dữ liệu chia sẻ đến việc thiết kế thuật toán song song dựa trên nguyên lý hàm thuần túy.
1. Đồng bộ hóa bằng điều biến điều kiện
Điều biến điều kiện cho phép một hoặc nhiều luồng tạm dừng thực thi cho đến khi một điều kiện cụ thể được đáp ứng, thường thông qua sự phối hợp với mutex để đảm bảo truy cập an toàn vào tài nguyên chia sẻ.
Dưới đây là ví dụ minh họa mô hình "nhà sản xuất – người tiêu thụ" đơn giản với ngăn xếp an toàn luồng:
#include <iostream>
#include <thread>
#include <stack>
#include <mutex>
#include <condition_variable>
#include <memory>
template<typename T>
class ThreadSafeStack {
private:
mutable std::mutex mtx_;
std::condition_variable cv_;
std::stack<T> data_;
public:
void push(T value) {
std::lock_guard<std::mutex> lock(mtx_);
data_.push(std::move(value));
cv_.notify_one(); // Thông báo ngay khi có phần tử mới
}
std::shared_ptr<T> waitPop() {
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this] { return !data_.empty(); });
auto ptr = std::make_shared<T>(std::move(data_.top()));
data_.pop();
return ptr;
}
bool tryPop(T& output) {
std::lock_guard<std::mutex> lock(mtx_);
if (data_.empty()) return false;
output = std::move(data_.top());
data_.pop();
return true;
}
};
Mã trên khắc phục điểm yếu của phiên bản chỉ dùng mutex: thay vì kiểm tra empty() rồi gọi pop() (rủi ro race condition), luồng tiêu thụ chờ trực tiếp trên điều kiện !data_.empty(). Hàm waitPop() sử dụng cv_.wait(..., predicate) — đảm bảo cả việc kiểm tra điều kiện và chờ đợi được thực hiện nguyên tử.
Các phương thức thời gian chờ như wait_for và wait_until hỗ trợ xử lý trường hợp chờ vô hạn không mong muốn:
wait_for(lock, 2s): Chờ tối đa 2 giây; trả vềstd::cv_status::timeoutnếu hết hạn.wait_until(lock, steady_clock::now() + 500ms): Chờ đến một thời điểm tuyệt đối.
2. Ba thành phần nền tảng của lập trình bất đồng bộ
2.1. std::async và std::future
std::async là cơ chế cấp cao để khởi chạy tác vụ bất đồng bộ, trả về std::future để truy vấn kết quả hoặc trạng thái hoàn tất.
Hành vi thực thi phụ thuộc vào chính sách khởi động:
std::launch::async: Luôn tạo luồng mới.std::launch::deferred: Thực thi trì hoãn — chỉ chạy khi gọiget()hoặcwait()trênfuture, và chạy trên luồng gọi.std::launch::async | std::launch::deferred(hoặc không chỉ định): Hành vi do triển khai thư viện quyết định (thường ưu tiênasyncnếu hệ thống đủ tài nguyên).
Lưu ý quan trọng: Một std::future được trả về bởi std::async sẽ chặn luồng gọi trong hàm hủy nếu chưa được tiêu thụ — dẫn đến hành vi tuần tự không mong muốn. Vì vậy, luôn lưu trữ future vào biến để kiểm soát vòng đời rõ ràng.
2.2. std::promise và std::packaged_task
std::promise cung cấp cơ chế "đặt giá trị" từ một luồng và "lấy giá trị" từ luồng khác thông qua future liên kết. Nó đặc biệt hữu ích khi cần truyền giá trị hoặc ngoại lệ từ luồng nền ra luồng chính.
void producer(std::promise<int>& prom) {
std::this_thread::sleep_for(1s);
prom.set_value(123); // Hoặc prom.set_exception(...)
}
int main() {
std::promise<int> p;
auto f = p.get_future();
std::thread t(producer, std::ref(p));
std::cout << f.get() << "\n"; // Chặn cho đến khi set_value được gọi
t.join();
}
std::packaged_task bao bọc một callable (hàm, lambda…) và tự động tạo future để thu nhận kết quả sau khi thực thi. Đây là lựa chọn lý tưởng khi muốn "đóng gói" một tác vụ để gửi tới luồng khác:
std::packaged_task<int()> task([]{ return 42; });
auto fut = task.get_future();
std::thread t(std::move(task));
t.detach();
std::cout << fut.get() << "\n";
2.3. std::shared_future
Khi nhiều luồng cần chờ cùng một kết quả, std::future không đủ vì nó chỉ có thể được tiêu thụ một lần. std::shared_future giải quyết vấn đề này bằng cách cho phép sao chép và chia sẻ giữa các luồng:
std::promise<double> p;
std::shared_future<double> sf = p.get_future();
std::thread t1([&sf]{ std::cout << "T1: " << sf.get() << "\n"; });
std::thread t2([&sf]{ std::cout << "T2: " << sf.get() << "\n"; });
p.set_value(3.14159);
t1.join(); t2.join();
3. Thiết kế thuật toán song song theo phong cách hàm
Lập trình hàm nhấn mạnh tính thuần túy (không side-effect), bất biến (immutable data) và phân chia theo chức năng. Trong ngữ cảnh song song, điều này giúp tránh xung đột dữ liệu một cách tự nhiên.
Dưới đây là phiên bản sắp xếp nhanh song song sử dụng std::async để xử lý đệ quy một nhánh:
#include <list>
#include <algorithm>
#include <future>
template<typename T>
std::list<T> parallelQuickSort(std::list<T> input) {
if (input.size() <= 1) return input;
std::list<T> result;
result.splice(result.begin(), input, input.begin());
const T& pivot = *result.begin();
auto mid = std::partition(input.begin(), input.end(),
[&](const T& x) { return x < pivot; });
std::list<T> lower(input.begin(), mid);
std::list<T> upper(mid, input.end());
// Xử lý nhánh "lower" song song
auto futureLower = std::async(parallelQuickSort<T>, std::move(lower));
// Xử lý nhánh "upper" tuần tự
auto sortedUpper = parallelQuickSort(std::move(upper));
// Kết hợp kết quả
result.splice(result.end(), sortedUpper);
result.splice(result.begin(), futureLower.get());
return result;
}
Ở đây, mỗi lần gọi đệ quy tạo ra hai danh sách con độc lập (lower, upper). Việc chuyển lower vào std::async đảm bảo rằng việc sắp xếp nó không ảnh hưởng đến trạng thái của upper hay result, nhờ vào tính bất biến của các đối tượng được di chuyển (std::move). Đây là chìa khóa để đạt được độ an toàn luồng mà không cần khóa thủ công.
Tư duy hàm không chỉ làm mã dễ hiểu hơn — nó còn tạo điều kiện thuận lợi cho việc song song hóa tự động, vì các hàm thuần túy có thể được hoán đổi, hoãn thực thi hoặc lặp lại mà không gây hậu quả phụ.