Giải Vấn Đề Bất Nhất Dữ Liệu Khi Truy Vấn HBase Bằng Spark

Triệu chứng:

Bảng kết quả truy vấn:

+----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+ |totalCount|January|February|March|April| May|June|July|August|September|October|November|December|totalMileage| +----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+ | 33808| 0| 0| 0| 0|33798| 0| 0| 0| 0| 0| 0| 0| 79995.0| +----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+

Bảng hiện tại được phân vùng trước thành 10 phân vùng.

Theo dữ liệu theo tháng, tổng số lượng trong bảng kiểm thử hiện tại là: 33798

Tổng số lượng trong hbase cũng là: 33798

Điểm kỳ lạ: Số lượng truy vấn bằng sparkSQL kết nối với hbase là: 33808

Câu lệnh SQL tại thời điểm đó là: select count(1) from orderData

Rất kỳ lạ, vì sau khi truy vấn bằng SQL, tổng dữ liệu đã tăng thêm 10 bản ghi.

============================================================

Nguyên nhân:

Ở đây đã thiết lập giá trị SCAN_BATCHSIZE của hbase, sẽ thiết lập batchsize cho scan. Tài liệu thiết lập này giải thích như sau:

Đặt số lượng giá trị tối đa cần trả về cho mỗi lần gọi next()

Trước đây tôi luôn nghĩ rằng đây là thiết lập số hàng đọc mỗi lần, nhưng thực tế values có vẻ là số cột được đọc, và việc kích hoạt giá trị này sẽ khiến hbase scan trả về kết quả một phần của một hàng;

Sau đó, khi bình luận lại thiết lập này, chương trình có thể chạy bình thường.

Hơn nữa, chúng ta hãy xem xét thiết lập này từ phía mã nguồn hbase. Scan của hbase có hai biến thành viên:

  • private boolean allowPartialResults = false;
  • private int batch = -1;

allowPartialResult này rõ ràng là thiết lập trả về kết quả một phần, vậy batch thì sao? setBatch() sẽ không thiết lập allowPartialResult. Tuy nhiên, trong hàm getResultsToAddToCache() của Scan, nếu giá trị batch lớn hơn 0, nó sẽ thiết lập isBatch=true. Sau đó sẽ có đoạn mã này:

// Nếu người gọi đã chỉ định trong scan của họ rằng họ có thể chấp nhận xem kết quả một phần,
// thì chỉ cần thêm tất cả kết quả vào danh sách. Lưu ý rằng vì việc scan batch cũng trả về kết quả
// cho một hàng theo từng phần, chúng ta coi batch được đặt tương đương với cho phép một phần. Hệ quả
// của việc coi batch tương đương với kết quả một phần là có thể người gọi sẽ nhận được kết quả
// trả về mà số lượng cell trong kết quả ít hơn kích thước batch ngay cả khi đó có thể không phải
// là nhóm cell cuối cùng cho hàng đó.
    if (allowPartials || isBatchSet) {
      addResultsToList(resultsToAddToCache, resultsFromServer, 0,
          (null == resultsFromServer ? 0 : resultsFromServer.length));
      return resultsToAddToCache;
    }

Mã lỗi trước đây:

TableInputFormat.SCAN_BATCHSIZE
lazy val taoScan = {

    val cauHinhHbase = HBaseConfiguration.create()
    cauHinhHbase.set("hbase.zookeeper.quorum", GlobalConfigUtils.hbaseQuorem)
    cauHinhHbase.set(TableInputFormat.INPUT_TABLE, tenBangHbase)
    cauHinhHbase.set(TableInputFormat.SCAN_COLUMNS, cotTruyVan)
    cauHinhHbase.set(TableInputFormat.SCAN_ROW_START, khoaBatDau)
    cauHinhHbase.set(TableInputFormat.SCAN_ROW_STOP, ketThucKhoa)
    cauHinhHbase.set(TableInputFormat.SCAN_BATCHSIZE , "10000")//TODO Điểm này dẫn đến bất nhất dữ liệu khi truy vấn
    cauHinhHbase.set(TableInputFormat.SCAN_CACHEDROWS , "10000")
    cauHinhHbase.set(TableInputFormat.SHUFFLE_MAPS , "1000")

    val rddHbase = sqlContext.sparkContext.newAPIHadoopRDD(
      cauHinhHbase,
      classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    val rs: RDD[Row] = rddHbase.map(tuple => tuple._2).map(ketQua => {

      var giaTri = new ArrayBuffer[Any]()
      truongBangHbase.foreach { truong =>
        giaTri += GiaiPhap.giaiPhap(truong, ketQua)
      }
      Row.fromSeq(giaTri.toSeq)
    })
    rs
  }

Giải pháp:

Loại bỏ thiết lập TableInputFormat.SCAN_BATCHSIZE là được

Kết quả truy vấn sau khi loại bỏ:

+----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+ |totalCount|January|February|March|April| May|June|July|August|September|October|November|December|totalMileage| +----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+ | 33798| 0| 0| 0| 0|33798| 0| 0| 0| 0| 0| 0| 0| 79995.0| +----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+

Vấn đề đã được giải quyết~

Thẻ: Spark hbase BigData hadoop DataConsistency

Đăng vào ngày 14 tháng 6 lúc 00:41