Trong Apache Flink, việc xây dựng và tối ưu hóa luồng xử lý dữ liệu bắt đầu từ StreamGraph và kết thúc bằng JobGraph, một cấu trúc được tối ưu để gửi tới cụm thực thi. Một khái niệm quan trọng trong quá trình này là mẫu phân phối (Distribution Pattern), xác định cách các subtask của tác vụ sản xuất kết nối với các subtask của tác vụ tiêu thụ.
public enum DistributionPattern {
/**
* Mỗi subtask sản xuất kết nối với mọi subtask tiêu thụ.
*/
ALL_TO_ALL,
/**
* Mỗi subtask sản xuất kết nối với một hoặc nhiều subtask tiêu thụ theo cách điểm-điểm.
*/
POINTWISE
}
Với ALL_TO_ALL, toàn bộ các subtask ở phía trên (upstream) đều thiết lập kênh truyền đến tất cả subtask ở phía dưới (downstream), phù hợp cho các chiến lược phân vùng như RebalancePartitioner hay ShufflePartitioner.
Ngược lại, POINTWISE chỉ thiết lập kết nối giữa các subtask theo ánh xạ nhất định — ví dụ: subtask i của upstream kết nối với subtask i % parallelism của downstream, thường dùng trong trường hợp ForwardPartitioner.
Quá trình chuyển đổi từ StreamGraph sang JobGraph
Sau khi StreamGraph được xây dựng từ chương trình người dùng (DataStream API), hệ thống tiến hành dịch thành JobGraph thông qua chuỗi gọi hàm sau:
PipelineExecutorUtils.getJobGraph()
→ FlinkPipelineTranslationUtil.getJobGraph()
→ StreamGraphTranslator.translateToJobGraph()
→ StreamGraph.getJobGraph()
→ StreamingJobGraphGenerator.createJobGraph()
Trong bước cuối cùng, StreamingJobGraphGenerator đóng vai trò trung tâm, chịu trách nhiệm tối ưu hóa đồ thị bằng kỹ thuật chaining – gộp nhiều toán tử liên tiếp thành một JobVertex duy nhất để giảm độ trễ và chi phí giao tiếp giữa các task.
Điều kiện để thực hiện Chaining
Hai toán tử liền kề có thể được gộp vào cùng một chuỗi (chain) nếu thỏa mãn đồng thời các điều kiện sau:
- Toán tử phía dưới chỉ có đúng một cạnh đầu vào (
inEdges.size() == 1). - Cả hai nằm trong cùng một nhóm chia sẻ slot (
SlotSharingGroup). - Chiến lược phân vùng dữ liệu là
ForwardPartitioner. - Chế độ trao đổi dữ liệu không phải dạng batch.
- Độ song song của cả hai toán tử là bằng nhau.
- Thuộc tính chaining trong
StreamGraphđược bật (giá trị mặc định làtrue). - Hai toán tử hỗ trợ chain hóa theo phương thức
areOperatorsChainable().
Cơ chế kiểm tra khả năng chain hóa
Mã nguồn kiểm tra điều kiện chain được thực hiện qua phương thức isChainable() như sau:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode downstream = streamGraph.getTargetVertex(edge);
return downstream.getInEdges().size() == 1
&& isChainableInput(edge, streamGraph);
}
private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upstream = streamGraph.getSourceVertex(edge);
StreamNode downstream = streamGraph.getTargetVertex(edge);
return upstream.isSameSlotSharingGroup(downstream)
&& areOperatorsChainable(upstream, downstream, streamGraph)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& !streamGraph.isTransformedHighThroughputMode();
}
Nếu điều kiện được đáp ứng, hai node sẽ được gộp vào cùng một chuỗi thực thi bên trong OperatorChainInfo, và cuối cùng sinh ra một JobVertex duy nhất thay vì hai.
Thiết lập chuỗi thực thi
Phương thức setChaining() trong StreamingJobGraphGenerator đảm nhận việc duyệt đệ quy từ các điểm khởi đầu (thường là source) và xây dựng từng chuỗi operator:
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
Map<Integer, OperatorChainInfo> chainHeads = buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
Collection<OperatorChainInfo> entryPoints = chainHeads.entrySet().stream()
.sorted(Comparator.comparing(Map.Entry::getKey))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
for (OperatorChainInfo info : entryPoints) {
createChain(info.getStartNodeId(), 1, info, chainHeads);
}
}
Hàm createChain() sẽ tiếp tục đi sâu vào các cạnh đầu ra của mỗi node, mở rộng chuỗi miễn là điều kiện chain vẫn còn thỏa mãn.