Trong kiến trúc xử lý luồng thời gian thực, việc đảm bảo tính nhất quán và khả năng khôi phục sau sự cố phụ thuộc rất lớn vào cơ chế quản lý offset một cách đáng tin cậy. Bài viết này trình bày cách tích hợp Spark Streaming (phiên bản tương thích với Kafka 1.0.1) theo mô hình Direct Stream, đồng thời lưu trữ và truy vấn offset từ HBase — thay vì sử dụng ZooKeeper như cách tiếp cận truyền thống.
Cấu trúc lưu trữ offset trong HBase
Offset được ghi dưới dạng các hàng (rows) trong bảng HBase với key có định dạng:
topicName:groupId:timestampMillis
Mỗi hàng chứa các cột trong family info, với tên cột là ID phân vùng (partition), và giá trị là offset cuối cùng đã xử lý thành công. Ví dụ:
testDirect:co:1552667605000 column=info:0, value=66
testDirect:co:1552667605000 column=info:1, value=269
testDirect:co:1552667605000 column=info:2, value=67
Môi trường triển khai
- Scala: 2.11.8
- Apache Spark: 2.2.0 (kèm gói spark-streaming-kafka-0-10_2.11)
- Apache Kafka: 1.0.1
- HBase: 1.2.0-cdh5.14.0
Vấn đề thường gặp và nguyên nhân
Khi khởi tạo consumer mà không đăng ký chủ đề hoặc phân vùng cụ thể, Kafka client sẽ ném ngoại lệ:
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
Nguyên nhân nằm ở phương thức poll() trong lớp KafkaConsumer: nếu subscriptions rỗng hoặc chưa được gán phân vùng, hệ thống từ chối thực hiện yêu cầu lấy dữ liệu. Điều này đặc biệt xảy ra khi làm việc với topic mới — nơi chưa tồn tại offset nào trong HBase để khởi tạo Assign.
Giải pháp linh hoạt: Tự động chọn chiến lược khởi tạo
Thay vì cứng nhắc dùng một phương thức duy nhất, mã nguồn được thiết kế để kiểm tra trạng thái tồn tại của offset trong HBase và lựa chọn chiến lược phù hợp:
- Nếu không tìm thấy offset nào → sử dụng
Subscribeđể bắt đầu từ vị trí cấu hình (auto.offset.reset). - Nếu tìm thấy offset → dùng
Assignvới danh sách phân vùng và offset đã lưu.
Mã nguồn tối ưu hóa
Dưới đây là phiên bản đã tái cấu trúc logic, loại bỏ phụ thuộc vào ZooKeeper cho số lượng phân vùng (thay bằng API AdminClient của Kafka), đồng thời cải tiến cách đọc ghi offset để tăng độ rõ ràng và an toàn:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object HBaseOffsetManager {
def main(args: Array[String]): Unit = {
require(args.length == 6, "Usage: <batchSec> <bootstrap> <topics> <group> <hbaseTable> <hbaseConfPath>")
val Array(batchSec, bootstrap, topicList, groupId, hbaseTable, hbaseConfPath) = args
val conf = new SparkConf().setAppName("Kafka-HBase-Offset").setMaster("local[*]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(batchSec.toLong))
val topics = topicList.split(",").toSet
val kafkaParams = Map(
"bootstrap.servers" -> bootstrap,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> "false"
)
// Lấy thông tin phân vùng từ Kafka AdminClient (không dùng ZooKeeper)
val adminProps = Map(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrap,
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "30000"
)
val admin = AdminClient.create(adminProps)
val partitions = admin.describeTopics(topics.asJava).values().asScala
.map(_.toCompletableFuture.get().partitions().asScala.map(_.partition()).toSet)
.reduce(_ ++ _)
// Đọc offset cuối cùng từ HBase
val lastOffsets = fetchLatestOffsetsFromHBase(hbaseTable, hbaseConfPath, topics.head, groupId)
// Xây dựng DStream linh hoạt
val stream: DStream[ConsumerRecord[String, String]] = lastOffsets match {
case Some(offsetMap) if offsetMap.nonEmpty =>
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](offsetMap.keys, kafkaParams, offsetMap)
)
case _ =>
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
}
// Lưu offset sau mỗi batch
stream.foreachRDD { rdd =>
val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
saveOffsetsToHBase(hbaseTable, hbaseConfPath, topics.head, groupId, offsets)
// Xử lý dữ liệu...
rdd.count()
}
ssc.start()
ssc.awaitTermination()
}
private def fetchLatestOffsetsFromHBase(
table: String,
confPath: String,
topic: String,
group: String
): Option[Map[TopicPartition, Long]] = {
val hconf = HBaseConfiguration.create()
hconf.addResource(confPath)
val conn = ConnectionFactory.createConnection(hconf)
val t = conn.getTable(TableName.valueOf(table))
val scan = new org.apache.hadoop.hbase.client.Scan()
scan.setReversed(true)
scan.setLimit(1)
scan.setFilter(new org.apache.hadoop.hbase.filter.PrefixFilter(s"$topic:$group:".getBytes))
val scanner = t.getScanner(scan)
try {
val result = scanner.next()
if (result == null) None
else {
val rowKey = Bytes.toString(result.getRow)
val timestamp = rowKey.split(":")(2).toLong
val offsetMap = collection.mutable.Map[TopicPartition, Long]()
result.listCells().asScala.foreach { cell =>
val partitionId = Bytes.toString(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
val offsetValue = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
offsetMap += new TopicPartition(topic, partitionId.toInt) -> offsetValue.toLong
}
Some(offsetMap.toMap)
}
} finally {
scanner.close()
t.close()
conn.close()
}
}
private def saveOffsetsToHBase(
table: String,
confPath: String,
topic: String,
group: String,
offsets: Array[OffsetRange]
): Unit = {
val hconf = HBaseConfiguration.create()
hconf.addResource(confPath)
val conn = ConnectionFactory.createConnection(hconf)
val t = conn.getTable(TableName.valueOf(table))
try {
val rowKey = s"$topic:$group:${System.currentTimeMillis()}"
val put = new Put(rowKey.getBytes)
offsets.foreach { o =>
put.addColumn(
Bytes.toBytes("info"),
Bytes.toBytes(o.partition.toString),
Bytes.toBytes(o.untilOffset.toString)
)
}
t.put(put)
} finally {
t.close()
conn.close()
}
}
}
Mã nguồn trên loại bỏ hoàn toàn các thành phần lỗi thời như ZkUtils, thay thế bằng AdminClient chuẩn Kafka để xác định số phân vùng — giúp tăng tính tương thích và giảm phụ thuộc vào hạ tầng ZooKeeper. Đồng thời, logic đọc/ghi offset được tách rời, dễ kiểm thử và mở rộng.