Đồng bộ dữ liệu MySQL sang HBase theo thời gian thực

Vấn đề với MySQL khi dữ liệu lớn

Trong môi trường doanh nghiệp, MySQL là hệ quản trị cơ sở dữ liệu được sử dụng rộng rãi nhất. Tuy nhiên, MySQL có một hạn chế quan trọng: khi khối lượng dữ liệu đạt đến hàng triệu bản ghi, các thao tác truy vấn và cập nhật trở nên cực kỳ chậm chạp. Nếu hệ thống yêu cầu hiển thị dữ liệu theo thời gian thực, đây thực sự là một thách thức lớn cho MySQL. Thêm vào đó, MySQL còn phải phục vụ đồng thời nhiều developers và users khác nhau trong cùng một thời điểm.

Sau khi nghiên cứu, giải pháp được đề xuất là đồng bộ dữ liệu MySQL sang HBase - một database phân tán được thiết kế để xử lý khối lượng dữ liệu cực lớn.

Kiến trúc ban đầu

MySQL---logstash---kafka---sparkStreaming---hbase---web

Hoặc:

MySQL---sqoop---hbase---web

Vấn đề với kiến trúc cũ

Dù sử dụng logstash hay kafka, cả hai đều gặp phải một vấn đề quan trọng: trong quá trình import dữ liệu, chúng cần thực hiện các truy vấn trực tiếp đến MySQL. Điều này tất yếu làm tăng tải cho MySQL - vốn đang phải xử lý nhiều tác vụ khác nhau.

Câu hỏi đặt ra là: có cách nào để đồng bộ dữ liệu MySQL sang HBase mà không gây thêm gánh nặng cho MySQL hay không?

Câu trả lời là CÓ - sử dụng canal hoặc maxwell để phân tích binlog của MySQL.

Kiến trúc mới

MySQL---canal---kafka---flink---hbase---web

Bước 1: Kích hoạt Binlog trong MySQL

Binlog là nhật ký nhị phân của MySQL, dùng để ghi lại các thay đổi dữ liệu như INSERT, UPDATE, DELETE. Các truy vấn đọc dữ liệu như SELECT hay SHOW không được ghi vào binlog. Chức năng chính của binlog là phục vụ việc đồng bộ master-slave và khôi phục dữ liệu增量.

Để MySQL tạo binlog, cần bật tính năng log-bin:

-rw-rw---- 1 mysql mysql 669 May 10 21:29 mysql-bin.000001
-rw-rw---- 1 mysql mysql 126 May 10 22:06 mysql-bin.000002
-rw-rw---- 1 mysql mysql 11799 May 15 18:17 mysql-bin.000003

Cấu hình file /etc/my.cnf:

log-bin=/var/lib/mysql/mysql-bin
binlog-format=ROW
server_id=1

Giải thích:

  • log-bin: đường dẫn lưu trữ binlog
  • binlog-format=ROW: ghi nhận mỗi dòng dữ liệu bị thay đổi
  • server_id: ID duy nhất cho máy chủ (nếu là cluster thì không được trùng lặp)

Kiểm tra trạng thái binlog:

show variables like '%log_bin%';

Nếu kết quả hiển thị các thông số binlog, tức là đã kích hoạt thành công.

Bước 2: Cài đặt và cấu hình Canal

Giới thiệu về Canal

Canal là một dự án mã nguồn mở từ Alibaba, được viết hoàn toàn bằng Java. Công cụ này phân tích nhật ký增量 của database và cung cấp tính năng đăng ký tiêu thụ dữ liệu增量. Canal chủ yếu hỗ trợ MySQL (và cả MariaDB).

Nguồn gốc: Các công ty thuộc hệ sinh thái Alibaba ban đầu có nhu cầu đồng bộ dữ liệu giữa hai trung tâm dữ liệu ở Hàng Châu và Mỹ. Trước đây, việc đồng bộ dựa trên trigger, nhưng từ năm 2010, Alibaba bắt đầu thử nghiệm phương pháp phân tích nhật ký database để lấy các thay đổi增量 và đồng bộ hóa - từ đó hình thành nên Canal.

Nguyên lý hoạt động

  1. Canal giả lập giao thức tương tác của MySQL slave, tự nhận mình là MySQL slave và gửi yêu cầu dump đến MySQL master
  2. MySQL master nhận yêu cầu dump và bắt đầu gửi binary log cho slave (chính là Canal)
  3. Canal phân tích binary log (dạng byte stream ban đầu)

Cài đặt Canal

tar -zxvf canal.deployer-1.0.23.tar.gz -C /export/servers/canal

Cấu hình file instance.properties:

vim /export/servers/canal/conf/example/instance.properties

Phát triển ứng dụng Canal

Sau khi cài đặt Canal, cần phát triển thêm mã nguồn để chuyển tiếp dữ liệu đã phân tích sang Kafka. Canal đã cung cấp mã mẫu, chỉ cần tùy chỉnh theo nhu cầu.

Mã mẫu: https://github.com/alibaba/canal/wiki/ClientExample

Thêm dependency Maven:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.23</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.1</version>
</dependency>

Kiểm tra hoạt động của Canal

Bước 1: Khởi động Kafka và tạo topic

/export/servers/kafka/bin/kafka-server-start.sh /export/servers/kafka/config/server.properties >/dev/null 2>&1 &
/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic mycanal

Bước 2: Khởi động consumer để kiểm tra Canal

/export/servers/kafka/bin/kafka-console-consumer.sh --zookeeper hadoop01:2181 --from-beginning --topic mycanal

Bước 3: Khởi động MySQL

service mysqld start

Bước 4: Khởi động Canal

canal/bin/startup.sh

Bước 5: Kết nối MySQL và thực hiện các thao tác INSERT/UPDATE/DELETE để quan sát dữ liệu trong Kafka console

Bước 3: Sử dụng Flink để đọc dữ liệu từ Kafka và ghi vào HBase

Pom.xml - Các dependency cần thiết:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scala.version>2.11.8</scala.version>
    <flink.version>1.6.0</flink.version>
    <hadoop.version>2.7.5</hadoop.version>
</properties>

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

Mã nguồn Scala - Xử lý dữ liệu với Flink:

import java.util
import java.util.Properties
import org.apache.commons.lang3.StringUtils
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Delete, Put}
import org.apache.hadoop.hbase.util.Bytes

case class FieldChange(colName: String, colValue: String)

case class BinlogRecord(
  binFile: String,
  binPosition: String,
  database: String,
  table: String,
  operation: String,
  dataColumns: String,
  rowIndex: String
)

object SyncToHBase {
  val zookeeperQuorum = "hadoop01,hadoop02,hadoop03"
  val kafkaBrokers = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
  val kafkaTopic = "canal"
  val zkPort = "2181"
  val hbaseTable: TableName = TableName.valueOf("canal")
  val colFamily = "info"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/"))
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(2000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointInterval(6000)
    System.setProperty("hadoop.home.dir", "/")

    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", kafkaBrokers)
    kafkaProps.setProperty("zookeeper.connect", zookeeperQuorum)
    kafkaProps.setProperty("group.id", kafkaTopic)
    
    val kafkaConsumer = new FlinkKafkaConsumer09[String](kafkaTopic, new SimpleStringSchema(), kafkaProps)
    val stream = env.addSource(kafkaConsumer).setParallelism(1)

    val parsedData = stream.map { line =>
      val parts = line.split("#CS#")
      val len = parts.length
      
      BinlogRecord(
        if(len > 0) parts(0) else "",
        if(len > 1) parts(1) else "",
        if(len > 2) parts(2) else "",
        if(len > 3) parts(3) else "",
        if(len > 4) parts(4) else "",
        if(len > 5) parts(5) else "",
        if(len > 6) parts(6) else ""
      )
    }

    parsedData.map { record =>
      val colStr = record.dataColumns
      val colArray = parseColumnList(colStr)
      val pkValue = extractPrimaryKey(colArray)
      val rowKey = record.database + "_" + record.table + "_" + pkValue
      val opType = record.operation
      val changedFields = extractChangedColumns(colArray, opType)

      if(opType.equals("DELETE")){
        deleteFromHBase(rowKey, opType)
      } else {
        if(changedFields.size() > 0){
          upsertToHBase(rowKey, opType, changedFields)
        }
      }
    }
    env.execute()
  }

  def parseColumnList(colListStr: String): String = {
    colListStr.substring(1, colListStr.length - 1)
  }

  def extractPrimaryKey(cols: String): String = {
    val matches: Array[String] = StringUtils.substringsBetween(cols, "[", "]")
    val firstMatch = matches(0)
    firstMatch.split(",")(1).trim
  }

  def extractChangedColumns(cols: String, opType: String): util.ArrayList[FieldChange] = {
    val matches: Array[String] = StringUtils.substringsBetween(cols, "[", "]")
    val result = new util.ArrayList[FieldChange]()
    
    opType match {
      case "UPDATE" =>
        for(i <- 1 to matches.length - 1){
          val fields = matches(i).split(",")
          if(fields(2).trim.toBoolean == true){
            result.add(FieldChange(fields(0), fields(1)))
          }
        }
      case "INSERT" =>
        for(i <- 1 to matches.length - 1){
          val fields = matches(i).split(",")
          result.add(FieldChange(fields(0), fields(1)))
        }
      case _ =>
    }
    result
  }

  def upsertToHBase(rowKey: String, opType: String, fields: util.ArrayList[FieldChange]): Unit = {
    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", zookeeperQuorum)
    config.set("hbase.master", "hadoop01:60000")
    config.set("hbase.zookeeper.property.clientPort", zkPort)
    config.setInt("hbase.rpc.timeout", 20000)
    config.setInt("hbase.client.operation.timeout", 30000)
    config.setInt("hbase.client.scanner.timeout.period", 200000)
    
    val connection = ConnectionFactory.createConnection(config)
    val admin = connection.getAdmin
    
    val tableDesc = new HTableDescriptor(hbaseTable)
    val colDesc = new HColumnDescriptor(colFamily)
    tableDesc.addFamily(colDesc)
    
    if(!admin.tableExists(hbaseTable)){
      admin.createTable(tableDesc)
    }
    
    val table = connection.getTable(hbaseTable)
    val put = new Put(Bytes.toBytes(rowKey))
    
    for(i <- 0 to fields.size() - 1){
      val field = fields.get(i)
      put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(field.colName), Bytes.toBytes(field.colValue))
    }
    table.put(put)
  }

  def deleteFromHBase(rowKey: String, opType: String): Unit = {
    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", zookeeperQuorum)
    config.set("hbase.zookeeper.property.clientPort", zkPort)
    config.setInt("hbase.rpc.timeout", 20000)
    config.setInt("hbase.client.operation.timeout", 30000)
    config.setInt("hbase.client.scanner.timeout.period", 200000)
    
    val connection = ConnectionFactory.createConnection(config)
    val admin = connection.getAdmin
    
    val tableDesc = new HTableDescriptor(hbaseTable)
    val colDesc = new HColumnDescriptor(colFamily)
    tableDesc.addFamily(colDesc)
    
    if(admin.tableExists(hbaseTable)){
      val table = connection.getTable(hbaseTable)
      val delete = new Delete(Bytes.toBytes(rowKey))
      table.delete(delete)
    }
  }
}

Cấu hình Maven để build:

<sourceDirectory>src/main/scala</sourceDirectory>
<mainClass>ten.package.MainClass</mainClass>

Chạy ứng dụng Canal:

java -jar canal.jar -Xms100m -Xmx100m

Chạy ứng dụng Flink trên Yarn:

/opt/flink-1.6.0/bin/flink run -m yarn-cluster -yn 2 -p 1 /path/to/SyncDB-1.0-SNAPSHOT.jar

Kiến trúc này hoạt động theo nguyên tắc: Canal đóng vai trò như một "slave" ảo của MySQL, lắng nghe các thay đổi từ binlog và chuyển tiếp sang Kafka mà không hề gây tải trọng cho MySQL. Flink sau đó tiêu thụ dữ liệu từ Kafka và thực hiện các thao tác ghi vào HBase một cách hiệu quả.

Thẻ: mysql hbase canal kafka Flink

Đăng vào ngày 4 tháng 6 lúc 22:56