Chuyển Đổi Dòng Dữ Liệu Trong ReactiveX

Giới thiệu về các toán tử biến đổi

Các toán tử chuyển đổi (transforming operators) trong ReactiveX cho phép thay đổi cấu trúc, nội dung hoặc tổ chức của dòng dữ liệu (Observable) mà không làm gián đoạn luồng phát hành. Chúng không lọc hay dừng luồng — thay vào đó, chúng tái cấu trúc cách dữ liệu được nhóm, ánh xạ, tích lũy hoặc phân chia thành các dòng con.

Hàm tiện ích hiển thị kết quả

Để kiểm tra đầu ra một cách nhất quán, ta định nghĩa hàm tiện ích chung cho từng nền tảng:
  • RxJava (Kotlin):
fun <T> Observable<T>.log(tag: String = "Observable") =
    this.subscribe(
        { println("$tag → $it") },
        { e -> println("$tag ❌ ${e.javaClass.simpleName}: ${e.message}") },
        { println("$tag ✅") }
    )
  • Rx.NET (C#):
public static void Log<T>(this IObservable<T> source, string tag = "Observable")
{
    source.Subscribe(
        value => Console.WriteLine($"{tag} → {value}"),
        ex => Console.WriteLine($"{tag} ❌ {ex.GetType().Name}: {ex.Message}"),
        () => Console.WriteLine($"{tag} ✅")
    );
}

Toán tử buffer(): Nhóm phần tử theo khoảng thời gian hoặc số lượng

buffer() thu thập các giá trị từ nguồn vào các mảng (list), sau đó phát hành từng mảng như một phần tử riêng biệt.
  • Nhóm theo số phần tử (cửa sổ trượt):
Observable.range(0, 8)
    .buffer(3, 2) // kích thước = 3, bước nhảy = 2
    .log("sliding-buffer")

Kết quả:

sliding-buffer → [0, 1, 2]
sliding-buffer → [2, 3, 4]
sliding-buffer → [4, 5, 6]
sliding-buffer → [6, 7]
sliding-buffer ✅
  • Nhóm theo thời gian (với khoảng cách mở rộng):
Observable.interval(100, TimeUnit.MILLISECONDS)
    .take(6)
    .buffer(300, 200, TimeUnit.MILLISECONDS)
    .log("time-buffer")

Tạo các mảng dựa trên thời gian bắt đầu và độ dài cửa sổ — ví dụ: [0,1], [1,2,3], [2,3,4], v.v.

Toán tử flatMap() và họ hàng: Tích hợp dòng con

flatMap() nhận mỗi phần tử đầu vào và tạo một Observable mới từ nó, sau đó "dẹp phẳng" toàn bộ các dòng con thành một luồng duy nhất — thứ tự phát hành phụ thuộc vào tốc độ hoàn tất của từng dòng con.
  • Dùng flatMap() để tạo chuỗi ký tự:
Observable.just(1, 2, 3)
    .flatMap { n -> Observable.fromArray(*('A'..'Z').take(n).toTypedArray()) }
    .log("letters")

Kết quả: A, A, B, A, B, C

  • Dùng concatMap() để đảm bảo thứ tự tuần tự:
Observable.just(100L, 200L)
    .concatMap { delay ->
        Observable.timer(delay, TimeUnit.MILLISECONDS)
            .map { "delayed-$delay" }
            .take(1)
    }
    .log("sequential")

Các dòng con được xử lý lần lượt — không chồng chéo.

  • Dùng switchMap() để hủy bỏ dòng cũ khi có dòng mới:
BehaviorSubject.createDefault(100L)
    .switchMap { delay ->
        Observable.interval(delay, TimeUnit.MILLISECONDS)
            .map { "tick-$delay" }
            .take(2)
    }
    .log("switched")

Khi giá trị chủ đề thay đổi, dòng cũ bị hủy ngay lập tức.

Toán tử groupBy(): Phân nhóm theo khóa

groupBy() chia luồng gốc thành nhiều luồng con, mỗi luồng chứa các phần tử có cùng giá trị khóa.
Observable.fromArray("apple", "banana", "avocado", "blueberry")
    .groupBy { it.first() }
    .flatMap { group ->
        group.toList().map { list -> "${group.key}: ${list.size}" }
    }
    .log("group-count")

Kết quả: a: 2, b: 2

Toán tử map()ofType(): Biến đổi và lọc kiểu

map() áp dụng hàm chuyển đổi lên từng phần tử — tương đương với Select trong LINQ.
Observable.fromArray(1, 2, 3, 4)
    .map { it * it }
    .log("squared")
ofType() chỉ giữ lại các phần tử khớp với kiểu mục tiêu — an toàn hơn cast(), vì không gây lỗi khi gặp kiểu không tương thích.
Observable.fromArray(1, "hello", 2.5, 42, true)
    .ofType(Integer::class.java)
    .log("integers-only")

Kết quả: 1, 42

Toán tử scan(): Tích lũy trạng thái theo thời gian

scan() hoạt động giống như reduce(), nhưng phát hành giá trị trung gian ở mỗi bước — hữu ích để xây dựng tổng chạy, min/max chạy hoặc trạng thái tích lũy.
Observable.fromArray(10, -5, 20, -15)
    .scan(0) { acc, x -> acc + x }
    .log("running-sum")

Kết quả: 10, 5, 25, 10

Toán tử window(): Chia luồng thành các cửa sổ quan sát độc lập

Khác với buffer(), window() không đóng gói giá trị thành mảng — thay vào đó, nó phát hành các Observable con riêng biệt. Người dùng phải đăng ký vào từng cửa sổ để đọc giá trị.
Observable.range(0, 6)
    .window(2)
    .flatMap { windowObs -> 
        windowObs.toList().toObservable()
    }
    .log("window-as-list")

Tương đương với buffer(2), nhưng triển khai thông qua cơ chế cửa sổ.

Thẻ: RxJava RxNET RxSwift ReactiveX buffer

Đăng vào ngày 1 tháng 6 lúc 16:36