Sử dụng bộ xử lý ExecuteScript trong Apache NiFi: Xử lý luồng dữ liệu và quản lý lỗi

NiFi xử lý dữ liệu thông qua các flow file, mỗi đối tượng gồm hai thành phần chính: attributes (siêu dữ liệu) và content (dữ liệu nhị phân thuần túy). Trong khi attributes được thao tác dễ dàng qua các thuộc tính của đối tượng flowFile, thì content yêu cầu cơ chế đặc biệt để đọc/ghi — do bản chất không có schema, định dạng hay cấu trúc cố định. Việc xử lý nội dung phụ thuộc vào ngữ cảnh: một số processor giả định định dạng dựa trên thuộc tính như mime.type, một số khác yêu cầu phân tích thủ công.

Các giao diện callback cho I/O với flow file

API ProcessSession cung cấp ba giao diện callback để tương tác an toàn với nội dung flow file, đảm bảo tài nguyên được mở/đóng đúng cách:

  • InputStreamCallback: Dùng với session.read(flowFile, callback) để truy cập InputStream từ flow file hiện tại. Phương thức duy nhất process(InputStream in) được gọi tự động; stream sẽ đóng sau khi thực thi.
  • OutputStreamCallback: Dùng với session.write(flowFile, callback) để ghi dữ liệu mới vào flow file. Phương thức process(OutputStream out) nhận stream đã được quản lý — người dùng không cần gọi close() thủ công.
  • StreamCallback: Cũng dùng với session.write(flowFile, callback), nhưng cung cấp cả InputStream (đọc nội dung cũ) và OutputStream (ghi nội dung mới), thích hợp cho biến đổi nội dung tại chỗ.

Lưu ý: Các phương thức thay thế như session.read(flowFile) trả về InputStream trực tiếp hoặc session.importFrom(inputStream, flowFile) để ghi dữ liệu từ nguồn ngoài đều yêu cầu quản lý vòng đời stream thủ công — điều này làm tăng rủi ro rò rỉ tài nguyên nếu quên đóng.

Đọc nội dung từ flow file

Khi cần xử lý toàn bộ nội dung (ví dụ: parse JSON, kiểm tra độ dài, chuyển mã), nên sử dụng InputStreamCallback. Tuy nhiên, với dữ liệu lớn, nên ưu tiên xử lý từng phần (streaming) thay vì tải toàn bộ vào bộ nhớ.

Ví dụ (Groovy):

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

def flowFile = session.get()
if (!flowFile) return

def content = ''
session.read(flowFile, { inputStream ->
    content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    // Xử lý chuỗi ở đây: validate, transform, extract...
    log.info("Độ dài nội dung: ${content.length()}")
} as InputStreamCallback)

Ghi nội dung mới vào flow file

Dùng OutputStreamCallback để tạo hoặc thay thế hoàn toàn nội dung. Đây là lựa chọn phù hợp khi tạo flow file mới từ dữ liệu tĩnh hoặc API bên ngoài.

Ví dụ (JavaScript):

const OutputStreamCallback = Java.type("org.apache.nifi.processor.io.OutputStreamCallback")
const StandardCharsets = Java.type("java.nio.charset.StandardCharsets")

const flowFile = session.get()
if (!flowFile) return

const newData = "Dữ liệu được sinh bởi script"
flowFile = session.write(flowFile, new OutputStreamCallback((outputStream) => {
    outputStream.write(newData.getBytes(StandardCharsets.UTF_8))
}))

Biến đổi nội dung tại chỗ

Khi cần sửa đổi nội dung hiện có (ví dụ: chuẩn hóa JSON, thêm tiêu đề, mã hóa Base64), StreamCallback là giải pháp tối ưu — nó cho phép đọc và ghi trong cùng một transaction, tránh tạo bản sao tạm.

Ví dụ (Jython):

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class TransformCallback(StreamCallback):
    def process(self, inputStream, outputStream):
        raw = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # Chuyển thành chữ hoa, thêm dấu gạch ngang đầu dòng
        transformed = "- " + raw.toUpperCase()
        outputStream.write(bytearray(transformed.encode('UTF-8')))

flowFile = session.get()
if flowFile:
    flowFile = session.write(flowFile, TransformCallback())

Xử lý ngoại lệ và xác thực dữ liệu

ExecuteScript hỗ trợ hai mối quan hệ chuyển tiếp: REL_SUCCESSREL_FAILURE. Mọi lỗi phát sinh trong script (exception, dữ liệu không hợp lệ) nên được bắt và chuyển flow file sang REL_FAILURE, đồng thời ghi log chi tiết.

Ví dụ (JRuby):

java_import org.apache.nifi.processor.Processor
flowFile = session.get()
if !flowFile
  return
end

begin
  # Thực hiện xử lý có thể thất bại
  content = session.read(flowFile) { |in| in.readAllBytes }
  raise "Nội dung rỗng" if content.empty?

  # Ghi kết quả
  flowFile = session.write(flowFile) { |out| out.write(content) }
  session.transfer(flowFile, REL_SUCCESS)

rescue => e
  log.error("Xử lý thất bại cho flow file #{flowFile.uuid}", e)
  session.transfer(flowFile, REL_FAILURE)
end

Thẻ: apache-nifi executescript groovy jython JavaScript

Đăng vào ngày 29 tháng 6 lúc 19:57