Spark Streaming với Kafka 1.0.1: Quản lý Offset trực tiếp vào HBase

Trong kiến trúc xử lý luồng thời gian thực, việc đảm bảo tính nhất quán và khả năng khôi phục sau sự cố phụ thuộc rất lớn vào cơ chế quản lý offset một cách đáng tin cậy. Bài viết này trình bày cách tích hợp Spark Streaming (phiên bản tương thích với Kafka 1.0.1) theo mô hình Direct Stream, đồng thời lưu trữ và truy vấn offset từ HBase — thay vì sử dụng ZooKeeper như cách tiếp cận truyền thống.

Cấu trúc lưu trữ offset trong HBase

Offset được ghi dưới dạng các hàng (rows) trong bảng HBase với key có định dạng:

topicName:groupId:timestampMillis

Mỗi hàng chứa các cột trong family info, với tên cột là ID phân vùng (partition), và giá trị là offset cuối cùng đã xử lý thành công. Ví dụ:

testDirect:co:1552667605000  column=info:0, value=66  
testDirect:co:1552667605000  column=info:1, value=269  
testDirect:co:1552667605000  column=info:2, value=67

Môi trường triển khai

  • Scala: 2.11.8
  • Apache Spark: 2.2.0 (kèm gói spark-streaming-kafka-0-10_2.11)
  • Apache Kafka: 1.0.1
  • HBase: 1.2.0-cdh5.14.0

Vấn đề thường gặp và nguyên nhân

Khi khởi tạo consumer mà không đăng ký chủ đề hoặc phân vùng cụ thể, Kafka client sẽ ném ngoại lệ:

java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions

Nguyên nhân nằm ở phương thức poll() trong lớp KafkaConsumer: nếu subscriptions rỗng hoặc chưa được gán phân vùng, hệ thống từ chối thực hiện yêu cầu lấy dữ liệu. Điều này đặc biệt xảy ra khi làm việc với topic mới — nơi chưa tồn tại offset nào trong HBase để khởi tạo Assign.

Giải pháp linh hoạt: Tự động chọn chiến lược khởi tạo

Thay vì cứng nhắc dùng một phương thức duy nhất, mã nguồn được thiết kế để kiểm tra trạng thái tồn tại của offset trong HBase và lựa chọn chiến lược phù hợp:

  • Nếu không tìm thấy offset nào → sử dụng Subscribe để bắt đầu từ vị trí cấu hình (auto.offset.reset).
  • Nếu tìm thấy offset → dùng Assign với danh sách phân vùng và offset đã lưu.

Mã nguồn tối ưu hóa

Dưới đây là phiên bản đã tái cấu trúc logic, loại bỏ phụ thuộc vào ZooKeeper cho số lượng phân vùng (thay bằng API AdminClient của Kafka), đồng thời cải tiến cách đọc ghi offset để tăng độ rõ ràng và an toàn:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object HBaseOffsetManager {

  def main(args: Array[String]): Unit = {
    require(args.length == 6, "Usage: <batchSec> <bootstrap> <topics> <group> <hbaseTable> <hbaseConfPath>")

    val Array(batchSec, bootstrap, topicList, groupId, hbaseTable, hbaseConfPath) = args

    val conf = new SparkConf().setAppName("Kafka-HBase-Offset").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(batchSec.toLong))

    val topics = topicList.split(",").toSet
    val kafkaParams = Map(
      "bootstrap.servers" -> bootstrap,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "false"
    )

    // Lấy thông tin phân vùng từ Kafka AdminClient (không dùng ZooKeeper)
    val adminProps = Map(
      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrap,
      AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "30000"
    )
    val admin = AdminClient.create(adminProps)
    val partitions = admin.describeTopics(topics.asJava).values().asScala
      .map(_.toCompletableFuture.get().partitions().asScala.map(_.partition()).toSet)
      .reduce(_ ++ _)

    // Đọc offset cuối cùng từ HBase
    val lastOffsets = fetchLatestOffsetsFromHBase(hbaseTable, hbaseConfPath, topics.head, groupId)

    // Xây dựng DStream linh hoạt
    val stream: DStream[ConsumerRecord[String, String]] = lastOffsets match {
      case Some(offsetMap) if offsetMap.nonEmpty =>
        KafkaUtils.createDirectStream(
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Assign[String, String](offsetMap.keys, kafkaParams, offsetMap)
        )
      case _ =>
        KafkaUtils.createDirectStream(
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
        )
    }

    // Lưu offset sau mỗi batch
    stream.foreachRDD { rdd =>
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      saveOffsetsToHBase(hbaseTable, hbaseConfPath, topics.head, groupId, offsets)
      // Xử lý dữ liệu...
      rdd.count()
    }

    ssc.start()
    ssc.awaitTermination()
  }

  private def fetchLatestOffsetsFromHBase(
    table: String,
    confPath: String,
    topic: String,
    group: String
  ): Option[Map[TopicPartition, Long]] = {
    val hconf = HBaseConfiguration.create()
    hconf.addResource(confPath)
    val conn = ConnectionFactory.createConnection(hconf)
    val t = conn.getTable(TableName.valueOf(table))

    val scan = new org.apache.hadoop.hbase.client.Scan()
    scan.setReversed(true)
    scan.setLimit(1)
    scan.setFilter(new org.apache.hadoop.hbase.filter.PrefixFilter(s"$topic:$group:".getBytes))

    val scanner = t.getScanner(scan)
    try {
      val result = scanner.next()
      if (result == null) None
      else {
        val rowKey = Bytes.toString(result.getRow)
        val timestamp = rowKey.split(":")(2).toLong
        val offsetMap = collection.mutable.Map[TopicPartition, Long]()
        result.listCells().asScala.foreach { cell =>
          val partitionId = Bytes.toString(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
          val offsetValue = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
          offsetMap += new TopicPartition(topic, partitionId.toInt) -> offsetValue.toLong
        }
        Some(offsetMap.toMap)
      }
    } finally {
      scanner.close()
      t.close()
      conn.close()
    }
  }

  private def saveOffsetsToHBase(
    table: String,
    confPath: String,
    topic: String,
    group: String,
    offsets: Array[OffsetRange]
  ): Unit = {
    val hconf = HBaseConfiguration.create()
    hconf.addResource(confPath)
    val conn = ConnectionFactory.createConnection(hconf)
    val t = conn.getTable(TableName.valueOf(table))

    try {
      val rowKey = s"$topic:$group:${System.currentTimeMillis()}"
      val put = new Put(rowKey.getBytes)
      offsets.foreach { o =>
        put.addColumn(
          Bytes.toBytes("info"),
          Bytes.toBytes(o.partition.toString),
          Bytes.toBytes(o.untilOffset.toString)
        )
      }
      t.put(put)
    } finally {
      t.close()
      conn.close()
    }
  }
}

Mã nguồn trên loại bỏ hoàn toàn các thành phần lỗi thời như ZkUtils, thay thế bằng AdminClient chuẩn Kafka để xác định số phân vùng — giúp tăng tính tương thích và giảm phụ thuộc vào hạ tầng ZooKeeper. Đồng thời, logic đọc/ghi offset được tách rời, dễ kiểm thử và mở rộng.

Thẻ: spark-streaming kafka hbase offset-management direct-stream

Đăng vào ngày 27 tháng 6 lúc 07:07