Thao tác Tập hợp trong Spark: Aggregate và AggregateByKey

  1. Aggregate ============

Aggregate là một thao tác tập hợp (aggregation) trong Spark. Dưới đây là ví dụ minh họa:

import org.apache.spark.{SparkConf, SparkContext}

class TongHopTest {

  def chayChuongTrinh(args:Array[String]): Unit = {

    // Cấu hình môi trường thực thi
    val cauHinh = new SparkConf().setAppName("Ví dụ Aggregate").setMaster("spark://master:7077").setJars(Seq("D:\\DuAn\\SparkDemo\\SparkDemo.jar"))
    val boNho = new SparkContext(cauHinh)

    var danhSachSo = List(3,7,6,2,4,8,1,5,9,2)
    var ketQua = danhSachSo.par.aggregate((0,0))(
      // Hàm seqOp
      (bienTichLuy, giaTri) => (bienTichLuy._1+giaTri, bienTichLuy._2+1),
      // Hàm combOp
      (khoi1, khoi2) => (khoi1._1+khoi2._1, khoi1._2+khoi2._2)
    )

    println(ketQua)

    boNho.stop
  }

}

Trong ví dụ trên, bienTichLuy khởi đầu là (0,0), giaTri đại diện cho các giá trị trong danh sách danhSachSo. Hàm seqOp cộng giá trị của từng phần tử vào thành phần đầu tiên của Tuple và đếm số lượng phần tử vào thành phần thứ hai. Nếu không có phân vùng, hàm combOp sẽ không được thực thi. Ngay cả khi có phân vùng, kết quả cuối cùng vẫn không thay đổi.

Kết quả chạy:

(47,10)
  1. AggregateByKey

AggregateByKey hoạt động tương tự như Aggregate, nhưng thực hiện việc tập hợp dựa trên giá trị của Key.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created on 2023/10/15
  */
class KhoiTaoTheoKey {

  def khoiChay(args:Array[String]): Unit = {

    // Cấu hình môi trường thực thi
    val cauHinh = new SparkConf().setAppName("Ví dụ AggregateByKey").setMaster("spark://master:7077").setJars(Seq("D:\\DuAn\\SparkDemo\\SparkDemo.jar"))
    val boNho = new SparkContext(cauHinh)

    val duLieu = List((1,4),(1,5),(1,3),(2,7),(3,2),(3,9))
    val banGhiRDD = boNho.parallelize(duLieu)

    val ketQua : RDD[(Int,Int)] = banGhiRDD.aggregateByKey(0)(
      // seqOp
      math.max(_,_),
      // combOp
      _+_
    )

    ketQua.collect.foreach(println)
    boNho.stop
  }

}

Dựa trên các giá trị Key khác nhau, dữ liệu được chia thành 3 nhóm:

(1) (1,4),(1,5),(1,3);

(2) (2,7);

(3) (3,2),(3,9)。

Mỗi nhóm sẽ thực hiện thao tác seqOp, tức là thực hiện phép toán math.max() giữa giá trị V trong cặp (K,V) và 0, sau đó lấy kết quả để tiếp tục tính toán với V tiếp theo. Với nhóm đầu tiên, quá trình diễn ra như sau:

0, 4 => 4

4, 5 => 5

5, 3 => 5

Vậy kết quả cuối cùng là (1,5). Hàm combOp có nhiệm vụ cộng các giá trị V từ các phân vùng, nhưng trong trường hợp này không có phân vùng nên hàm này không được thực thi.

Kết quả chạy:

(2,7)
(1,5)
(3,9)

Nếu tạo RDD với 3 phân vùng:

val banGhiRDD = boNho.parallelize(duLieu,3)

Kết quả chạy sẽ thay đổi thành:

(3,9)
(1,8)
(2,7)

Lý do là vì một phân vùng trả về (1,4), phân vùng khác trả về (1,3), và hàm combOp sẽ cộng hai giá trị V này lại để được (1,8).

Thẻ: Spark Apache Spark RDD Transformation Aggregate

Đăng vào ngày 21 tháng 6 lúc 04:23