1. Môi trường phát triển
Node.JS phiên bản 14.8.0
2. Bắt đầu nhanh với worker_threads
Javascript và Node.js trước đây luôn hoạt động trên mô hình đơn luồng cho đến khi module chính thức worker_threads được giới thiệu để giải quyết các tác vụ tính toán nặng.
Sử dụng đoạn mã sau để khởi tạo một luồng làm việc:
if (isPrimaryThread) {
// Tệp hiện tại sẽ được tải lại trong thể hiện luồng làm việc
new WorkerProcess(__filename);
console.log("Đang chạy trong tiến trình chính");
console.log("isPrimaryThread là", isPrimaryThread);
// Luồng làm việc có thể khởi động tệp chỉ định hoặc sử dụng tham số eval để chạy trực tiếp mã nguồn
// Vì không tồn tại process.env trong luồng con, nên có thể sử dụng tùy chọn threadData
// Khi đó threadData sẽ được nhân bản vào require('worker_threads') theo thuật toán nhân bản cấu trúc HTML
} else {
console.log("Đang chạy trong luồng làm việc");
console.log("isPrimaryThread là", isPrimaryThread); // In ra 'false'
}
Kết quả đầu ra của đoạn mã trên:
Đang chạy trong tiến trình chính
isPrimaryThread là true
Đang chạy trong luồng làm việc
isPrimaryThread là false
3. Giao tiếp giữa các luồng
1) Giao tiếp giữa cha và con - parentPort
Trong luồng chính, đối tượng trả về từ WorkerProcess đại diện cho thể hiện luồng làm việc.
Trong luồng làm việc, require('worker_threads').parentPort là MessagePort của luồng chính.
Sử dụng hai đối tượng này có thể thực hiện giao tiếp song công giữa luồng chính và luồng làm việc.
const { WorkerProcess, isPrimaryThread, parentPort } = require("worker_threads");
// Sử dụng parentPort và worker để giao tiếp song công
if (isPrimaryThread) {
const workerInstance = new WorkerProcess(__filename);
// 1. Thiết lập lắng nghe tin nhắn từ luồng làm việc
workerInstance.on("message", (message) => console.log(message));
// 2. Gửi tin nhắn "ping" đến luồng làm việc
workerInstance.postMessage("ping");
} else {
// 3. Thiết lập lắng nghe tin nhắn từ luồng cha
parentPort.on("message", (message) =>
// 4. Sau khi nhận tin nhắn từ luồng cha, thêm vào trường response và gửi lại
parentPort.postMessage({ response: message })
);
}
Kết quả đầu ra:
{
response: "ping";
}
2) Giao tiếp giữa bất kỳ luồng nào - MessageChannel
Kênh truyền thông (MessageChannel) có thể thực hiện truyền dữ liệu giữa bất kỳ luồng nào (ví dụ như giữa nhiều luồng làm việc). So với phương pháp sử dụng require('worker_threads').parentPort, nhược điểm là chỉ có thể giao tiếp giữa luồng chính và luồng làm việc.
Nguyên lý giao tiếp qua kênh:
- Tạo kênh giao tiếp thông qua
MessageChannel - Sử dụng
postMessageđể truyền dữ liệu - Tham số đầu tiên là dữ liệu cần truyền, bao gồm
MessagePortcủa kênh giao tiếp - Tham số thứ hai là
transferList, các đối tượng trong danh sách này sẽ không còn sử dụng được ở phía gửi
Ví dụ về giao tiếp giữa các luồng anh em sử dụng kênh truyền:
const {
isPrimaryThread, parentPort, threadId, MessageChannel, WorkerProcess
} = require('worker_threads');
if (isPrimaryThread) {
const firstWorker = new WorkerProcess(__filename);
const secondWorker = new WorkerProcess(__filename);
const communicationChannel = new MessageChannel();
firstWorker.postMessage({ targetPort: communicationChannel.port1 }, [communicationChannel.port1]);
secondWorker.postMessage({ targetPort: communicationChannel.port2 }, [communicationChannel.port2]);
} else {
parentPort.once('message', (data) => {
data.targetPort.postMessage('xin chào');
data.targetPort.on('message', msg => {
console.log(`luồng ${threadId}: nhận ${msg}`);
});
});
}
Đầu ra trên console:
luồng 1: nhận xin chào
luồng 2: nhận xin chào
3) Giao tiếp qua bộ nhớ chia sẻ - SharedArrayBuffer
Đa luồng trong Node.js cũng có thể giao tiếp thông qua "bộ nhớ chia sẻ". Luồng chính và các luồng làm việc cùng thao tác trên cùng một vùng lưu trữ vật lý, thực hiện chia sẻ dữ liệu và tránh chi phí sao chép.
Nguyên lý giao tiếp qua bộ nhớ chia sẻ:
- Tạo dữ liệu nhị phân thông qua SharedArrayBuffer
- Dữ liệu SharedArrayBuffer không thể đặt trong tham số thứ hai của postMessage
Ví dụ về giao tiếp sử dụng bộ nhớ chia sẻ:
const assertion = require("assert");
const {
WorkerProcess,
MessageChannel,
MessagePort,
isPrimaryThread,
parentPort,
} = require("worker_threads");
if (isPrimaryThread) {
const workerInstance = new WorkerProcess(__filename);
const communicationChannel = new MessageChannel();
// binaryArray được đặt trong bộ nhớ chia sẻ
const sharedMemory = new SharedArrayBuffer(4);
const binaryArray = new Uint8Array(sharedMemory);
console.log("[main] binaryArray ban đầu", binaryArray);
workerInstance.postMessage({ portForYou: communicationChannel.port1, binaryArray }, [
communicationChannel.port1,
]);
communicationChannel.port2.on("message", (msg) => {
console.log("[main] binaryArray sau khi worker sửa đổi", binaryArray);
});
} else {
parentPort.on("message", (data) => {
assertion(data.portForYou instanceof MessagePort);
data.binaryArray[1] = 25;
console.log("[worker] binaryArray sau khi sửa đổi", data.binaryArray);
data.portForYou.postMessage("");
data.portForYou.close();
});
}
Kết quả đầu ra:
[main] binaryArray ban đầu Uint8Array(4) [ 0, 0, 0, 0 ]
[worker] binaryArray sau khi sửa đổi Uint8Array(4) [ 0, 25, 0, 0 ]
[main] binaryArray sau khi worker sửa đổi Uint8Array(4) [ 0, 25, 0, 0 ]
4. Đánh giá hiệu suất của worker_threads
1) Tính toán đơn luồng
Sử dụng ví dụ tạo số nguyên tố, chạy trực tiếp phạm vi từ 2 đến 1e7:
const lowerBound = 2;
const upperBound = 1e7;
const primeNumbers = [];
function createPrimes(start, range) {
let isPrimeNumber = true;
let end = start + range;
for (let i = start; i < end; i++) {
for (let j = lowerBound; j < Math.sqrt(end); j++) {
if (i !== j && i % j === 0) {
isPrimeNumber = false;
break;
}
}
if (isPrimeNumber) {
primeNumbers.push(i);
}
isPrimeNumber = true;
}
}
createPrimes(lowerBound, upperBound);
Sử dụng lệnh time để quan sát, tổng thời gian tiêu tốn là 10.36 giây, tỷ lệ sử dụng CPU đơn lõi đạt 99%
2) Tính toán đa luồng
Luồng chính chịu trách nhiệm chia nhỏ dữ liệu, phân chia dữ liệu thành các phần bằng nhau và phân phối cho các luồng làm việc.
Các luồng làm việc đọc phạm vi tính toán từ threadData và đồng thời tính toán các số nguyên tố trong phạm vi đó.
Mã nguồn:
const { WorkerProcess, isPrimaryThread, parentPort, threadData } = require('worker_threads');
const lowerBound = 2;
let primeNumbers = [];
function createPrimes(start, range) {
let isPrimeNumber = true;
let end = start + range;
for (let i = start; i < end; i++) {
for (let j = lowerBound; j < Math.sqrt(end); j++) {
if (i !== j && i % j === 0) {
isPrimeNumber = false;
break;
}
}
if (isPrimeNumber) {
primeNumbers.push(i);
}
isPrimeNumber = true;
}
}
if (isPrimaryThread) {
const upperBound = 1e7;
const threadAmount = +process.argv[2] || 4;
const threadSet = new Set();
console.log(`Chạy với ${threadAmount} luồng...`);
const partition = Math.ceil((upperBound - lowerBound) / threadAmount);
let start = lowerBound;
for (let i = 0; i < threadAmount - 1; i++) {
const currentStart = start;
threadSet.add(new WorkerProcess(__filename, { threadData: { start: currentStart, range: partition } }));
start += partition;
}
threadSet.add(new WorkerProcess(__filename, { threadData: { start, range: partition + ((upperBound - lowerBound + 1) % threadAmount) } }));
for (let worker of threadSet) {
worker.on('error', (err) => { throw err; });
worker.on('exit', () => {
threadSet.delete(worker);
console.log(`Luồng đang thoát, còn ${threadSet.size} luồng đang chạy...`);
if (threadSet.size === 0) {
// console.log(primeNumbers.join('\n'));
}
})
worker.on('message', (msg) => {
primeNumbers = primeNumbers.concat(msg);
});
}
} else {
createPrimes(threadData.start, threadData.range);
parentPort.postMessage(primeNumbers);
}
Khởi động 4 luồng làm việc cục bộ, với CPU 6 lõi không trong trạng thái bận, thời gian cuối cùng là 2.658 giây, giảm 4 lần so với đơn luồng. Hiệu suất CPU là 314%, sử dụng hơn 3 tài nguyên CPU để hỗ trợ tính toán này:
Có thể thấy hiệu quả tối ưu rất rõ rệt, tận dụng tốt lợi thế của CPU đa lõi.
5. Mô hình底层 của worker_threads
Đối với đơn luồng, Node.js bao gồm các thành phần sau:
- Một tiến trình
- Một luồng
- Một vòng lặp sự kiện
- Một thể hiện engine JS
- Một thể hiện Node.js
Đối với luồng làm việc, thành phần của Node.js trở thành:
- Một tiến trình
- Nhiều luồng
- Mỗi luồng có vòng lặp sự kiện độc lập
- Mỗi luồng có thể hiện engine JS độc lập
- Mỗi luồng có thể hiện Node.js độc lập
6. Câu hỏi thường gặp
1. Đa luồng có thể chạy song song? worker_threads có thể cải thiện hiệu suất CPU?
Đồng phát là chuyển đổi nhanh các tác vụ, bản chất vẫn là thực thi tuần tự; song song là thực thi đồng thời các tác vụ.
Đa luồng có thể đồng phát hoặc song song, điều này phụ thuộc vào việc có thể giành được tài nguyên CPU hay không. Quá trình này do hệ điều hành điều phối, không thể kiểm soát bằng tay.
Nhưng nhìn chung, giống như đa tiến trình, đa luồng cũng có thể cải thiện hiệu suất sử dụng CPU.
2. Sự khác biệt giữa worker_threads và cluster/child_process
child_process là module đa tiến trình của Node.js. cluster là module cụm được đóng gói dựa trên child_process, cung cấp nhiều API cho việc chạy đa tiến trình song song. Chúng đều dựa trên mô hình tiến trình.
worker_threads dựa trên mô hình luồng, chi phí thấp hơn, nhẹ nhàng hơn, và trong cùng một tiến trình, có thể giao tiếp thông qua bộ nhớ chia sẻ. Phù hợp để giải quyết các vấn đề tính toán nặng.