NiFi xử lý dữ liệu thông qua các flow file, mỗi đối tượng gồm hai thành phần chính: attributes (siêu dữ liệu) và content (dữ liệu nhị phân thuần túy). Trong khi attributes được thao tác dễ dàng qua các thuộc tính của đối tượng flowFile, thì content yêu cầu cơ chế đặc biệt để đọc/ghi — do bản chất không có schema, định dạng hay cấu trúc cố định. Việc xử lý nội dung phụ thuộc vào ngữ cảnh: một số processor giả định định dạng dựa trên thuộc tính như mime.type, một số khác yêu cầu phân tích thủ công.
Các giao diện callback cho I/O với flow file
API ProcessSession cung cấp ba giao diện callback để tương tác an toàn với nội dung flow file, đảm bảo tài nguyên được mở/đóng đúng cách:
InputStreamCallback: Dùng vớisession.read(flowFile, callback)để truy cậpInputStreamtừ flow file hiện tại. Phương thức duy nhấtprocess(InputStream in)được gọi tự động; stream sẽ đóng sau khi thực thi.OutputStreamCallback: Dùng vớisession.write(flowFile, callback)để ghi dữ liệu mới vào flow file. Phương thứcprocess(OutputStream out)nhận stream đã được quản lý — người dùng không cần gọiclose()thủ công.StreamCallback: Cũng dùng vớisession.write(flowFile, callback), nhưng cung cấp cảInputStream(đọc nội dung cũ) vàOutputStream(ghi nội dung mới), thích hợp cho biến đổi nội dung tại chỗ.
Lưu ý: Các phương thức thay thế như session.read(flowFile) trả về InputStream trực tiếp hoặc session.importFrom(inputStream, flowFile) để ghi dữ liệu từ nguồn ngoài đều yêu cầu quản lý vòng đời stream thủ công — điều này làm tăng rủi ro rò rỉ tài nguyên nếu quên đóng.
Đọc nội dung từ flow file
Khi cần xử lý toàn bộ nội dung (ví dụ: parse JSON, kiểm tra độ dài, chuyển mã), nên sử dụng InputStreamCallback. Tuy nhiên, với dữ liệu lớn, nên ưu tiên xử lý từng phần (streaming) thay vì tải toàn bộ vào bộ nhớ.
Ví dụ (Groovy):
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
def flowFile = session.get()
if (!flowFile) return
def content = ''
session.read(flowFile, { inputStream ->
content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
// Xử lý chuỗi ở đây: validate, transform, extract...
log.info("Độ dài nội dung: ${content.length()}")
} as InputStreamCallback)
Ghi nội dung mới vào flow file
Dùng OutputStreamCallback để tạo hoặc thay thế hoàn toàn nội dung. Đây là lựa chọn phù hợp khi tạo flow file mới từ dữ liệu tĩnh hoặc API bên ngoài.
Ví dụ (JavaScript):
const OutputStreamCallback = Java.type("org.apache.nifi.processor.io.OutputStreamCallback")
const StandardCharsets = Java.type("java.nio.charset.StandardCharsets")
const flowFile = session.get()
if (!flowFile) return
const newData = "Dữ liệu được sinh bởi script"
flowFile = session.write(flowFile, new OutputStreamCallback((outputStream) => {
outputStream.write(newData.getBytes(StandardCharsets.UTF_8))
}))
Biến đổi nội dung tại chỗ
Khi cần sửa đổi nội dung hiện có (ví dụ: chuẩn hóa JSON, thêm tiêu đề, mã hóa Base64), StreamCallback là giải pháp tối ưu — nó cho phép đọc và ghi trong cùng một transaction, tránh tạo bản sao tạm.
Ví dụ (Jython):
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class TransformCallback(StreamCallback):
def process(self, inputStream, outputStream):
raw = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
# Chuyển thành chữ hoa, thêm dấu gạch ngang đầu dòng
transformed = "- " + raw.toUpperCase()
outputStream.write(bytearray(transformed.encode('UTF-8')))
flowFile = session.get()
if flowFile:
flowFile = session.write(flowFile, TransformCallback())
Xử lý ngoại lệ và xác thực dữ liệu
ExecuteScript hỗ trợ hai mối quan hệ chuyển tiếp: REL_SUCCESS và REL_FAILURE. Mọi lỗi phát sinh trong script (exception, dữ liệu không hợp lệ) nên được bắt và chuyển flow file sang REL_FAILURE, đồng thời ghi log chi tiết.
Ví dụ (JRuby):
java_import org.apache.nifi.processor.Processor
flowFile = session.get()
if !flowFile
return
end
begin
# Thực hiện xử lý có thể thất bại
content = session.read(flowFile) { |in| in.readAllBytes }
raise "Nội dung rỗng" if content.empty?
# Ghi kết quả
flowFile = session.write(flowFile) { |out| out.write(content) }
session.transfer(flowFile, REL_SUCCESS)
rescue => e
log.error("Xử lý thất bại cho flow file #{flowFile.uuid}", e)
session.transfer(flowFile, REL_FAILURE)
end