Tối ưu hóa DataX cho hiệu suất đồng bộ dữ liệu cao

Quy trình thực thi của DataX

Để thực hiện tối ưu, trước tiên cần hiểu rõ quy trình thực thi, quy trình như sau:

  1. DataX hoàn thành một công việc đồng bộ dữ liệu đơn lẻ, được gọi là Job. Khi nhận được một Job, DataX sẽ khởi động một tiến trình để hoàn thành toàn bộ quá trình đồng bộ. Module Job của DataX đóng vai trò là nút quản lý trung tâm cho một công việc đơn lẻ, chịu trách nhiệm dọn dẹp dữ liệu, chia nhỏ tác vụ con (chuyển đổi một công việc đơn lẻ thành nhiều Task con) và quản lý TaskGroup.
  2. Sau khi DataX Job khởi động, tùy theo chiến lược chia nhỏ nguồn dữ liệu khác nhau, Job sẽ được chia thành nhiều Task nhỏ hơn để thực thi đồng thời. Task là đơn vị nhỏ nhất của công việc DataX, mỗi Task sẽ chịu trách nhiệm đồng bộ một phần dữ liệu.
  3. Sau khi chia nhỏ nhiều Task, DataX Job sẽ gọi module Scheduler, dựa trên lượng dữ liệu đồng thời được cấu hình, để tái tổ hợp các Task đã chia nhỏ thành TaskGroup (nhóm tác vụ). Mỗi TaskGroup chịu trách nhiệm chạy với một mức độ đồng nhất nhất định để hoàn thành tất cả các Task được phân bổ, mặc định số lượng đồng thời cho một nhóm tác vụ là 5.
  4. Mỗi Task do TaskGroup chịu trách nhiệm khởi động. Sau khi Task được khởi động, nó sẽ cố định khởi động các luồng Reader → Channel → Writer để hoàn thành công việc đồng bộ dữ liệu.
  5. Khi DataX Job chạy, Job sẽ giám sát và chờ đợi nhiều TaskGroup hoàn thành. Khi tất cả TaskGroup hoàn thành, Job sẽ thoát thành công. Ngược lại, nó sẽ thoát bất thường với giá trị thoát khác 0.

Tóm tắt quy trình như sau: Một DataX Job sẽ được chia thành nhiều Task, mỗi Task được nhóm theo TaskGroup, bên trong một Task sẽ có một nhóm Reader → Channel → Writer. Channel là kênh trao đổi dữ liệu kết nối Reader và Writer, tất cả dữ liệu đều được truyền qua Channel.

Dựa trên quy trình trên, chúng ta có thể thực hiện các tối ưu sau

Tối ưu 1: Tăng tốc độ mỗi channel Trong DataX, mỗi Channel có cơ chế kiểm soát tốc độ nghiêm ngặt, chia thành hai loại: kiểm soát số bản ghi đồng bộ mỗi giây và kiểm soát số byte đồng bộ mỗi giây. Giới hạn tốc độ mặc định là 1MB/s, có thể thiết lập tốc độ byte hoặc record này dựa trên tình hình phần cứng cụ thể, thường thiết lập tốc độ byte, ví dụ: chúng ta có thể thiết lập giới hạn tốc độ cho một Channel là 5MB.

Tối ưu 2:* Tăng số lượng Channel đồng thời trong DataX Job Số lượng đồng thời = số lượng taskGroup × số Task đồng thời được thực thi bởi mỗi TaskGroup (mặc định số lượng đồng thời cho một nhóm tác vụ là 5).

Có ba cách cấu hình để tăng số lượng Channel đồng thời trong job:

  1. Cấu hình giới hạn Byte toàn cầu và giới hạn Byte cho mỗi Channel, Số Channel = Giới hạn Byte toàn cầu / Giới hạn Byte cho mỗi Channel
  2. Cấu hình giới hạn Record toàn cầu và giới hạn Record cho mỗi Channel, Số Channel = Giới hạn Record toàn cầu / Giới hạn Record cho mỗi Channel
  3. Cấu hình trực tiếp số lượng Channel.

Cấu hình có nghĩa là: job.setting.speed.channel : số lượng Channel đồng thời job.setting.speed.record : cấu hình giới hạn record cho channel toàn cầu job.setting.speed.byte: cấu hình giới hạn byte cho channel toàn cầu

core.transport.channel.speed.record: giới hạn record cho mỗi channel core.transport.channel.speed.byte: giới hạn byte cho mỗi channel

Cách 1:

Ví dụ: core.transport.channel.speed.byte=1048576, job.setting.speed.byte=5242880, do đó Số Channel = Giới hạn Byte toàn cầu / Giới hạn Byte cho mỗi Channel = 5242880/1048576 = 5, cấu hình như sau:

{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 1048576
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "byte" : 5242880
            }
        },
        ...
    }
}

Cách 2:

Ví dụ: core.transport.channel.speed.record=100, job.setting.speed.record=500, do đó cấu hình giới hạn Record toàn cầu và giới hạn Record cho mỗi Channel, Số Channel = Giới hạn Record toàn cầu / Giới hạn Record cho mỗi Channel = 500/100 = 5

{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "record": 100
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "record" : 500
            }
        },
        ...
    }
}

Cách 3:

Ví dụ: cấu hình trực tiếp job.setting.speed.channel=5, do đó số Channel đồng thời trong job là 5

{
    "job": {
        "setting": {
            "speed": {
                "channel" : 5
            }
        },
        ...
    }
}

Lưu ý quan trọng

  • Khi tăng số lượng Channel đồng thời trong DataX Job, cần điều chỉnh tham số heap của JVM, lý do như sau:
         - Khi số Channel trong một Job tăng lên, việc sử dụng bộ nhớ sẽ tăng đáng kể, vì DataX đóng vai trò là kênh trao đổi dữ liệu, sẽ lưu trữ nhiều dữ liệu trong bộ nhớ.
         - Ví dụ, Channel sẽ có một Buffer, làm vùng đệm tạm thời để trao đổi dữ liệu, và trong một số Reader và Writer cũng tồn tại một số Buffer. Để ngăn chặn các lỗi như tràn bộ nhớ của JVM, cần tăng tham số heap của JVM.
         - Thường chúng tôi khuyên nên đặt bộ nhớ là 4G hoặc 8G, điều này cũng có thể điều chỉnh tùy theo tình hình thực tế
         - Hai cách điều chỉnh tham số xms xmx của JVM: một là thay đổi trực tiếp datax.py;另一种是在启动的时候,加上对应的参数,如下:
           python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json
  • Số lượng Channel không phải càng nhiều càng tốt, lý do như sau:
           - Việc tăng số lượng Channel mang lại nhiều tiêu thụ CPU và bộ nhớ hơn.
           - Nếu cấu hình đồng thời Channel quá cao dẫn đến bộ nhớ JVM không đủ dùng, sẽ xảy ra tình trạng Full GC thường xuyên, tốc độ xuất dữ liệu sẽ giảm đột ngột, phản tác dụng. Điều này có thể phát hiện qua quan sát nhật ký

Kiểm tra sử dụng datax từ mysql đến mysql, hiệu quả kiểm tra với các cấu hình khác nhau như sau:

Sử dụng channel đơn mặc định giới hạn tốc độ 1M/s, kết quả kiểm tra như sau, hoàn thành sau 1660s:

Sử dụng kênh đơn, tốc độ 5M/s, kết quả kiểm tra như sau, hoàn thành sau 50s:

Sử dụng 5 kênh, tốc độ 5M/s, kết quả kiểm tra như sau, hoàn thành sau 10s:

Lưu ý: Khi MysqlReader trích xuất dữ liệu, nếu chỉ định splitPk, điều này có nghĩa là người dùng muốn sử dụng trường được splitPk đại diện để phân mảnh dữ liệu, DataX sẽ khởi động các tác vụ đồng thời để đồng bộ dữ liệu, điều này có thể nâng cao đáng kể hiệu suất đồng bộ dữ liệu. Nếu không điền splitPk, bao gồm không cung cấp splitPk hoặc splitPk có giá trị rỗng, DataX coi là sử dụng kênh đơn để đồng bộ dữ liệu của bảng, kiểm tra thứ ba không cấu hình splitPk không thể thấy hiệu quả

Tối ưu không có công thức cố định, trước tiên cần hiểu nguyên lý, sau đó thực hiện tối ưu cục bộ hoặc toàn cầu dựa trên nguyên lý và quá trình thực thi, cảm ơn mọi người đã đọc

Thẻ: DataX tối ưu hóa đồng bộ dữ liệu Performance tuning ETL

Đăng vào ngày 21 tháng 5 lúc 19:05