Hướng Dẫn Các Toán Tử Xử Lý và Lọc Dữ Liệu Trong ReactiveX

Giới thiệu về toán tử lọc

Trong hệ sinh thái ReactiveX, việc kiểm soát dòng dữ liệu phát ra từ Observable là vô cùng quan trọng. Bài viết này sẽ đi sâu vào chi tiết cách sử dụng các toán tử giúp lược bỏ, giới hạn hoặc chọn lọc thông tin dựa trên thời gian, giá trị hoặc vị trí.

Debounce và Throttle

Các toán tử này hoạt động dựa trên nguyên tắc thời gian nghỉ ngơi (inactivity). Chúng chỉ cho phép dữ liệu đi qua nếu không có bất kỳ tín hiệu mới nào xuất hiện sau một khoảng thời gian quy định. Điều này thường được dùng để giảm tải khi người dùng gõ quá nhanh hoặc thay đổi liên tục.

using System;
using System.Linq;
using System.Threading;
using System.Reactive.Subjects;
using System.Reactive.Linq;

var signalStream = new PublishSubject<int>();
var debouncedStream = signalStream.Debounce(TimeSpan.FromMilliseconds(300));

debouncedStream.Subscribe(i => Console.WriteLine($"Received at {DateTime.Now:HH:mm:ss}: {i}"));

for(int i = 1; i <= 5; i++)
{
    Console.WriteLine($"Pushed {i} at {DateTime.Now:HH:mm:ss}");
    signalStream.OnNext(i);
    Thread.Sleep(100);
}

Thread.Sleep(1000); 
// Đợi đủ thời gian debounce để thấy kết quả
Console.WriteLine("Done first batch.");
import io.reactivex.rxjava3.kotlin.*
import java.util.concurrent.TimeUnit

Observable.interval(50, TimeUnit.MILLISECONDS)
    .take(5)
    .concatWith(Observable.interval(400, TimeUnit.MILLISECONDS).take(3))
    .mapIndexed { index, _ -> index + 1 }
    .throttleFirst(1, TimeUnit.SECONDS)
    .observeOn(io.reactivex.schedulers.Schedulers.computation()) // Giả lập environment
    .dump()
/*
onNext: 1
onNext: 6
*/
import Foundation
import RxSwift

let bag = DisposeBag()
Observable<Int>.interval(.milliseconds(100), scheduler: MainScheduler.instance)
    .take(10)
    .sample(Observable.timer(1, .seconds, scheduler: MainScheduler.instance))
    .subscribe(onNext: { print("Sampled Value: \($0)") })
    .disposed(by: bag)

Distinct và DistinctUntilChanged

Để loại bỏ dữ liệu trùng lặp, ReactiveX cung cấp hai cơ chế. distinct() sẽ ghi nhớ toàn bộ lịch sử và bỏ qua mọi giá trị đã từng tồn tại trước đó. Ngược lại, distinctUntilChanged() chỉ so sánh với phần tử liền kề trước đó, giữ lại nếu giá trị thay đổi.

var subject = new Subject<string>();
var uniqueStream = subject.Distinct();

uniqueStream.Subscribe(s => Console.WriteLine($"Unique: {s}"));
subject.OnNext("A");
subject.OnNext("B");
subject.OnNext("A"); // Sẽ bị chặn
subject.OnNext("C");
subject.OnNext("B"); // Sẽ bị chặn (đã gặp trước đây)
subject.OnCompleted();
val distinctValues = Observable.create<String> { emitter ->
    emitter.onNext("Cat")
    emitter.onNext("Dog")
    emitter.onNext("Cat")
    emitter.onComplete()
}

distinctValues
    .distinctUntilChanged { it[0] } // Chỉ lọc dựa trên ký tự đầu
    .dump()
/*
onNext: Cat
onNext: Dog
*/
let disposeBag = DisposeBag()
["🐶", "🐱", "🐱", "🐼"]
    .asObservable()
    .distinctUntilChanged()
    .subscribe(onNext: { print("\$0)") })
    .disposed(by: disposeBag)
/*
🐶
🐱
🐼
*/

Lấy Phần Tử Theo Vị Trí (ElementAt)

Toán tử element_at(n) trả về giá trị nằm ở thứ tự chỉ số n. Nếu thiếu chỉ số đó, mặc định nó sẽ gây lỗi ngoại lệ. Để an toàn hơn, sử dụng phiên bản Default để trả về giá trị mặc định thay vì báo lỗi.

var numberSet = Enumerable.Range(10, 10).AsObservable();
try
{
    var fifthItem = numberSet.ElementAt(5);
    fifthItem.Subscribe(v => Console.WriteLine($"Item 5: {v}"));
}
catch(Exception e)
{
    Console.WriteLine($"Error: {e.Message}");
}
val range = Observable.range(100, 5L)
range.elementAtOrDefault(10, -1)
.dump()
/*
onSuccess: -1
*/
let animals = ["🐶", "🐱", "🐭", "🐹"]
animals.asObservable()
    .element(at: 2)
    .observe(on: MainScheduler.instance)
    .subscribe(onNext: { print("Index 2 is \($0)") })
    .disposed(by: bag)

Lọc theo Điều kiện (Filter / Where)

Một trong những công dụng phổ biến nhất là giữ lại các phần tử thỏa mãn một điều kiện cụ thể (predicate). Cú pháp thường dùng là filter (Java/Kotlin) hoặc where (C#).

val inputNumbers = listOf(1, 2, 3, 4, 5, 6)
Observable.fromIterable(inputNumbers)
    .filter { it >= 4 } // Chỉ giữ lại số lớn hơn hoặc bằng 4
    .dump()
/*
onNext: 4
onNext: 5
onNext: 6
*/
Observable.Range(1, 10)
    .Where(x => x % 3 == 0)
    .Subscribe(x => Console.WriteLine($"Divisible by 3: {x}"));

Truy Vấn Phần Tử Đầu/Từm Cuối (First / Single / Last)

Khi cần lấy duy nhất một phần tử, bạn có nhiều lựa chọn tùy thuộc vào số lượng kết quả mong đợi:

  • First: Lấy phần tử đầu tiên khớp điều kiện.
  • Single: Yêu cầu chuỗi phải có chính xác một phần tử khớp.
  • FirstOrDefault / SingleOrDefault: Tương tự nhưng không throw exception nếu không tìm thấy, mà trả về giá trị null/vô giá trị.
var sequence = Generate(0, i => i < 5, i => ++i, i => i);
sequence.FirstAsync().Subscribe(
    v => Console.WriteLine($"First: {v}"),
    ex => Console.WriteLine($"Error: {ex.Message}"));
val testObservable = Observable.just("apple", "banana")
testObservable.singleElement() // Trả về "apple"
.dump()
val emptyObservable = Observable.empty<String>()
emptyObservable.firstOrError() // Ném ngoại lệ

IgnoreElements

Toán tử này bỏ qua hoàn toàn dữ liệu phát ra, chỉ còn lại tín hiệu onComplete hoặc onError. Thường dùng khi bạn muốn subscribe vào stream nhưng chỉ quan tâm đến trạng thái kết thúc của nó.

var source = new Subject<int>();
var result = source.IgnoreElements();

result.Subscribe(_ => {}, () => Console.WriteLine("Finished!"));
source.OnNext(99);
source.OnNext(100);
source.OnCompleted(); 
// Output chỉ hiển thị Finished!

Wait

Mặc dù ít được dùng trực tiếp trong Reactive stream chaining phức tạp, Wait (trong RxBus hoặc một số thư viện wrapper) cho phép chặn tiến trình hiện tại cho đến khi stream kết thúc và trả về giá trị cuối cùng. Đây là phương thức chuyển đổi stream thành giá trị đơn lẻ ngay lập tức.

var waitSource = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3);
try {
    var finalValue = waitSource.Wait(); // Chờ tới 3 giây
    Console.WriteLine($"Wait Result: {finalValue}");
} catch(Exception e) {
    Console.WriteLine($"Failed: {e.Message}");
}

Sample

Không giống như Debounce chờ sau cùng, Sample lấy mẫu định kỳ. Nếu có nhiều sự kiện gửi trong một chu kỳ thời gian, chỉ sự kiện cuối cùng của chu kỳ đó được phát hành.

Observable.interval(100, TimeUnit.MILLISECONDS)
    .sample(2, TimeUnit.SECONDS) // Mỗi 2 giây lấy 1 lần
    .take(10)
    .dump()

Skip và SkipLast

Để cắt bỏ phần đầu hoặc đuôi của chuỗi dữ liệu:

let data = [0, 1, 2, 3, 4, 5]
data.asObservable()
    .skip(2) // Bỏ qua 0, 1
    .dropLast(1) // Bỏ qua 5 cuối cùng
    .subscribe(onNext: { print($0) })
    .disposed(by: bag)
/*
2
3
4
*/
Observable.range(1, 10)
    .skipLast(3)
    .dump()
/*
onNext: 1
...
onNext: 7
*/

Take và TakeLast

Ngược lại với Skip, Take giới hạn số lượng phần tử đầu tiên nhận được.

Observable.Range(0, 100)
    .Take(5)
    .Subscribe(i => Console.Write($"{i}, "));
Observable.just("A", "B", "C", "D", "E")
    .takeLast(2)
    .dump()
/*
onNext: D
onNext: E
*/

Thẻ: RxJava ReactiveX ObservableOperators csharp SwiftLanguage

Đăng vào ngày 29 tháng 6 lúc 23:29