Hiện tượng dữ liệu lệch (data skew) là một trong những nguyên nhân phổ biến gây giảm hiệu năng xử lý trong Apache Spark. Bài viết này trình bày các tình huống thường gặp và giải pháp khắc phục, bao gồm: điều chỉnh nguồn dữ liệu đầu vào, thay đổi mức độ song song, sử dụng Partitioner tùy chỉnh, chuyển từ Reduce-side Join sang Map-side Join, và bổ sung tiền tố ngẫu nhiên cho các key bị lệch.
Nguyên nhân và tác hại của data skew
Data skew xảy ra khi một số partition chứa lượng dữ liệu lớn bất thường so với phần còn lại, khiến task xử lý partition đó trở thành nút cổ chai. Trong môi trường phân tán, lý tưởng là thời gian xử lý giảm tuyến tính theo số node. Tuy nhiên, nếu dữ liệu không cân bằng, một vài task sẽ xử lý hầu hết khối lượng công việc, làm chậm toàn bộ job.
Ví dụ: 3 máy xử lý dữ liệu, nhưng 1 máy xử lý 80% dữ liệu — thời gian hoàn thành chỉ giảm 20% so với chạy đơn lẻ, thay vì giảm 67% như kỳ vọng.
Hậu quả:
- Giảm hiệu suất tổng thể do task chậm nhất quyết định thời gian hoàn thành stage.
- Có thể gây tràn bộ nhớ (OOM) và làm job thất bại.
Phân tích nguồn gốc data skew trong Spark
Mỗi stage trong Spark gồm nhiều task chạy song song. Thời gian hoàn thành stage phụ thuộc vào task chậm nhất. Sự chênh lệch thời gian giữa các task chủ yếu do sự chênh lệch về lượng dữ liệu mỗi task xử lý.
Nguồn dữ liệu gây skew thường đến từ:
- Dữ liệu đầu vào (HDFS, Kafka, file...)
- Dữ liệu shuffle từ stage trước
Giải pháp 1: Cân bằng dữ liệu đầu vào
Đọc từ Kafka
Khi dùng DirectStream, mỗi Kafka partition tương ứng một Spark task. Nếu dữ liệu trên các Kafka partition không đều, Spark sẽ bị skew. Nên sử dụng RoundRobinPartitioner hoặc UniformStickyPartitioner để đảm bảo phân phối đều. Nếu bắt buộc phải gom nhóm dữ liệu theo key (ví dụ: user ID), cần kiểm soát kích thước nhóm để tránh tạo ra partition quá lớn.
Đọc từ file
Khi đọc file qua textFile(path, minPartitions), Spark chia file thành các split dựa trên thuật toán:
splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
Với file nén không chia được (như gzip), cả file sẽ thành một partition duy nhất — dễ gây skew nếu file nén tuy nhỏ nhưng chứa lượng dữ liệu lớn.
Ví dụ thực tế: Một thư mục chứa 10 file 271.9MB và 1 file 8.5GB (nén xuống còn 25.3MB). Khi xử lý, file nén tuy nhỏ nhưng mất 4.4 phút để xử lý — gấp 26 lần các task khác (~10 giây).
Khuyến nghị: Ưu tiên định dạng có thể chia nhỏ (Parquet, ORC, LZO index…), hoặc đảm bảo kích thước dữ liệu thực tế trong mỗi file là tương đương.
Giải pháp 2: Điều chỉnh mức độ song song (parallelism)
Spark mặc định dùng HashPartitioner để phân phối key trong shuffle. Nếu số lượng key khác nhau ánh xạ vào cùng một partition quá lớn, task đó sẽ bị quá tải.
Ví dụ: Có 1.5 tỷ bản ghi, trong đó 45 triệu bản ghi có key chia 12 dư 8 → tất cả đổ dồn vào task thứ 8, khiến task này mất 38 giây — trong khi các task khác chỉ mất 16 giây.
Khắc phục: Tăng hoặc giảm số partition shuffle để phân tán key đều hơn. Thử đặt groupByKey(48) → task nặng nhất giảm còn 11.25 triệu bản ghi, thời gian xử lý giảm xuống 24 giây. Thậm chí, đôi khi giảm parallelism (ví dụ: 11) cũng giúp cân bằng tốt hơn.
Cách thiết lập:
- Trong code: truyền tham số trực tiếp vào phép toán (ví dụ:
.reduceByKey(48)) - Spark SQL:
SET spark.sql.shuffle.partitions=48; - Cấu hình global:
spark.default.parallelism
Hạn chế: Chỉ hiệu quả khi skew do nhiều key khác nhau ánh xạ vào cùng task — không giải quyết được nếu một key duy nhất chiếm phần lớn dữ liệu.
Giải pháp 3: Sử dụng Partitioner tùy chỉnh
Thay vì dùng HashPartitioner mặc định, bạn có thể tự định nghĩa logic phân phối key.
.partitionBy(new Partitioner() {
public int numPartitions() { return 12; }
public int getPartition(Object key) {
int id = Integer.parseInt(key.toString());
if (id >= 9500000 && id <= 9500084 && (id - 9500000) % 12 == 0)
return (id - 9500000) / 12;
else
return id % 12;
}
})
Ưu điểm: Không ảnh hưởng đến mức độ song song của các stage sau.
Nhược điểm: Phải viết riêng cho từng trường hợp, không linh hoạt. Không giải quyết được vấn đề nếu một key duy nhất quá lớn.
Giải pháp 4: Chuyển từ Reduce-side Join sang Map-side Join
Khi một bảng nhỏ (dưới ngưỡng broadcast) join với bảng lớn, nên dùng broadcast join để tránh shuffle.
Cách kích hoạt:
SET spark.sql.autoBroadcastJoinThreshold=104857600; -- 100MB
Kết quả: DAG chỉ còn 1 stage, không còn shuffle → loại bỏ hoàn toàn khả năng xảy ra data skew trong join. Thời gian xử lý giảm từ 7.3 phút xuống còn 1.5 phút.
Điều kiện áp dụng: Bảng nhỏ phải đủ nhỏ để broadcast vào bộ nhớ tất cả executor.
Hạn chế: Không áp dụng được cho phép gộp nhóm (aggregation), chỉ hiệu quả với join.
Giải pháp 5: Thêm tiền tố ngẫu nhiên cho key bị lệch
Áp dụng khi một vài key chiếm phần lớn dữ liệu, và bảng còn lại có key phân bố đều.
Cơ chế:
- Tách riêng dữ liệu chứa các key bị lệch.
- Thêm tiền tố ngẫu nhiên (1–N) vào key bị lệch ở bảng lớn.
- Nhân bản dữ liệu tương ứng ở bảng nhỏ lên N lần, mỗi bản thêm một tiền tố tương ứng.
- Join hai tập đã xử lý, sau đó loại bỏ tiền tố.
- Join phần dữ liệu không bị lệch như bình thường.
- Gộp kết quả bằng
union.
Kết quả: Thời gian xử lý giảm từ 1 phút 54 giây xuống còn 58 giây.
Ưu điểm: Hiệu quả cao, chỉ mở rộng dữ liệu ở phần bị lệch.
Hạn chế: Không phù hợp nếu có quá nhiều key bị lệch — khiến bảng nhỏ phải nhân bản quá lớn. Cần quét dữ liệu 2 lần.
Giải pháp 6: Mở rộng toàn bộ bảng nhỏ
Khi có quá nhiều key bị lệch, không thể tách riêng từng key — hãy thêm tiền tố ngẫu nhiên cho toàn bộ bảng lớn, và nhân bản toàn bộ bảng nhỏ lên N lần.
JavaPairRDD leftWithPrefix = leftRDD.mapToPair(t ->
new Tuple2<>(new Random().nextInt(N) + "," + t._1(), t._2()));
JavaPairRDD rightExpanded = rightRDD.flatMapToPair(t ->
IntStream.range(0, N)
.mapToObj(i -> new Tuple2<>(i + "," + t._1(), t._2()))
.collect(Collectors.toList()).iterator());
JavaPairRDD result = leftWithPrefix
.join(rightExpanded)
.mapToPair(t -> new Tuple2<>(t._1().split(",")[1], t._2()._2()));
Ưu điểm: Áp dụng được cho mọi trường hợp có bảng nhỏ.
Nhược điểm: Tiêu tốn bộ nhớ và CPU do phải nhân bản toàn bộ bảng nhỏ.
Kết luận
Không có giải pháp vạn năng cho data skew. Cần phân tích đặc điểm dữ liệu (số lượng key lệch, kích thước dataset, tỉ lệ nghiêng…) để lựa chọn hoặc kết hợp nhiều kỹ thuật. Trong thực tế, thường bắt đầu bằng broadcast join hoặc điều chỉnh parallelism, sau đó mới áp dụng các kỹ thuật phức tạp hơn như thêm tiền tố ngẫu nhiên.