Kết hợp các Observable
Chủ đề chính của bài viết này là các toán tử dùng để kết hợp Observable. Trong ReactiveX, Observable về cơ bản là các luồng dữ liệu có thể quan sát được.
Toán tử And/Then/When
Toán tử And/Then/When cho phép kết hợp hai hoặc nhiều luồng dữ liệu thông qua các thành phần trung gian là Pattern và Plan. Cách thức hoạt động của toán tử này là kết hợp dữ liệu đầu tiên từ tất cả các nguồn, sau đó đến dữ liệu thứ hai, và tiếp tục như vậy cho đến khi một trong các nguồn không còn dữ liệu nữa.
Ví dụ với RxNET:
luongMot = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
luongHai = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(10);
luongBa = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(14);
luongKetHop = Observable.When(
luongMot.And(luongHai)
.And(luongBa)
.Then((dau, giua, cuoi) =>
new {
Mot = dau,
Hai = giua,
Ba = cuoi
})
);
luongKetHop.Subscribe(
Console.WriteLine,
() => Console.WriteLine("Hoàn thành"));
Toán tử CombineLatest
Toán tử CombineLatest kết hợp dữ liệu mới nhất từ hai hoặc nhiều luồng dữ liệu bằng cách gọi một hàm cụ thể. Khi một luồng dữ liệu phát ra dữ liệu mới, nó sẽ được kết hợp với dữ liệu mới nhất đã được phát ra từ các luồng khác.
Ví dụ với RxNET:
nguon1 = new Subject<string>();
nguon2 = new Subject<int>();
nguon1
.CombineLatest(
nguon2,
(trai, phai) => string.Format("{0}-{1}", trai, phai))
.Subscribe(
s => Console.WriteLine("OnNext: {0}", s),
() => Console.WriteLine("OnCompleted"));
nguon1.OnNext("foo");
nguon2.OnNext(100);
nguon2.OnNext(200);
nguon1.OnNext("bar");
Ví dụ với RxJava:
Observables.combineLatest(
Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnNext { i -> println("Luồng trái phát ra") },
Observable.interval(150, TimeUnit.MILLISECONDS)
.doOnNext { i -> println("Luồng phải phát ra") })
{ i1, i2 -> i1.toString() + " - " + i2 }
.take(6)
Ví dụ với RxSwift:
let disposeBag = DisposeBag()
let chuSubject = PublishSubject<String>()
let soSubject = PublishSubject<Int>()
Observable.combineLatest(chuSubject, soSubject) { chuElement, soElement in
"\(chuElement) \(soElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
chuSubject.onNext("A")
chuSubject.onNext("B")
soSubject.onNext(1)
soSubject.onNext(2)
Toán tử Join/GroupJoin
Toán tử Join cho phép kết hợp các mục từ một Observable với các mục từ Observable khác dựa trên thời gian chúng tồn tại.
Ví dụ với RxNET:
var trai = new Subject<int>();
var phai = new Subject<int>();
trai.Join(
phai,
_ => Observable.Never<Unit>(),
_ => Observable.Never<Unit>(),
Tuple.Create)
.Subscribe(
tuple => Console.WriteLine("Trái: {0}, Phải: {1}", tuple.Item1, tuple.Item2));
trai.OnNext(1);
phai.OnNext(10);
phai.OnNext(100);
trai.OnNext(2);
Ví dụ với RxJava:
val trai = Observable.interval(100, TimeUnit.MILLISECONDS)
.map { i -> "L$i" }
val phai = Observable.interval(200, TimeUnit.MILLISECONDS)
.map { i -> "R$i" }
trai
.join(
phai,
Function { i -> Observable.never() },
Function { i -> Observable.timer(0, TimeUnit.MILLISECONDS) },
BiFunction { l, r -> "$l - $r" }
)
.take(10)
Toán tử Merge
Toán tử Merge kết hợp các dữ liệu từ nhiều Observable thành một Observable duy nhất, duy trì thứ tự phát ra của chúng.
Ví dụ với RxNET:
var luong1 = Observable.Interval(TimeSpan.FromMilliseconds(250))
.Take(3);
var luong2 = Observable.Interval(TimeSpan.FromMilliseconds(150))
.Take(5)
.Select(i => i + 100);
luong1.Merge(luong2)
.Subscribe(Console.WriteLine);
Ví dụ với RxJava:
Observable.merge(
Observable.interval(250, TimeUnit.MILLISECONDS).map { "Luồng 1" },
Observable.interval(150, TimeUnit.MILLISECONDS).map { "Luồng 2" })
.take(10)
Ví dụ với RxSwift:
let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
Observable.of(subject1, subject2)
.merge()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("A")
subject1.onNext("B")
subject2.onNext("1")
subject2.onNext("2")
Toán tử StartWith
Toán tử StartWith cho phép phát ra một chuỗi dữ liệu được chỉ định trước khi phát ra dữ liệu từ Observable nguồn.
Ví dụ với RxNET:
var nguon = Observable.Range(0, 3);
var ketQua = nguon.StartWith(-3, -2, -1);
ketQua.Subscribe(Console.WriteLine);
Ví dụ với RxJava:
val giaTri = Observable.range(0, 3)
giaTri.startWith(listOf(-1, -2))
Ví dụ với RxSwift:
Observable.of("🐶", "🐱", "🐭", "🐹")
.startWith("1️⃣")
.startWith("2️⃣")
.startWith("3️⃣", "A", "B")
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Toán tử Switch
Toán tử Switch kết hợp các Observable nhưng chỉ phát ra dữ liệu từ Observable gần nhất bắt đầu phát dữ liệu.
Ví dụ với RxNET:
var nguon = new Subject<int>();
nguon
.Select(i => Observable
.Interval(TimeSpan.FromSeconds(1)).Take(3)
.Select(l => (l + 1) * i))
.Switch()
.Subscribe(
i => Console.WriteLine("OnNext: {0}", i));
nguon.OnNext(10);
Thread.Sleep(2000);
nguon.OnNext(100);
Ví dụ với RxJava:
Observable.switchOnNext(
Observable.interval(100, TimeUnit.MILLISECONDS)
.map { i ->
Observable.interval(30, TimeUnit.MILLISECONDS)
.map { i2 -> i }
}
)
.take(9)
Ví dụ với RxSwift:
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "⚽️")
let subject2 = BehaviorSubject(value: "🍎")
let bien = Variable(subject1)
bien.asObservable()
.switchLatest()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("🏈")
bien.value = subject2
subject2.onNext("🍐")
Toán tử WithLatestFrom
Toán tử WithLatestFrom kết hợp dữ liệu mới nhất từ hai hoặc nhiều Observable, nhưng chỉ khi Observable đầu tiên phát ra dữ liệu mới.
Ví dụ với RxNET:
var nguon1 = new Subject<string>();
var nguon2 = new Subject<int>();
nguon1
.WithLatestFrom(
nguon2,
(l, r) => string.Format("{0}-{1}", l, r))
.Subscribe(
s => Console.WriteLine("OnNext: {0}", s));
nguon1.OnNext("foo");
nguon2.OnNext(100);
nguon2.OnNext(200);
nguon1.OnNext("bar");
Ví dụ với RxJava:
Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnNext { i -> println("Luồng trái phát ra") }.withLatestFrom(
Observable.interval(150, TimeUnit.MILLISECONDS)
.doOnNext { i -> println("Luồng phải phát ra") })
{ i1, i2 -> "$i1 - $i2" }
.take(6)
Ví dụ với RxSwift:
let disposeBag = DisposeBag()
let chuSubject = PublishSubject<String>()
let soSubject = PublishSubject<Int>()
chuSubject.withLatestFrom(soSubject) { chuElement, soElement in
"\(chuElement) \(soElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
chuSubject.onNext("A")
soSubject.onNext(1)
chuSubject.onNext("B")
soSubject.onNext(2)
chuSubject.onNext("C")
Toán tử Zip
Toán tử Zip kết hợp các dữ liệu từ hai hoặc nhiều Observable bằng cách gọi một hàm cụ thể trên các mục tương ứng từ mỗi Observable.
Ví dụ với RxNET:
Char.ConvertFromUtf32((int)i + 97));
//Zip values together
so.Zip(chu, (trai, phai) => new { Trai = trai, Phai = phai })
Ví dụ với RxJava:
Observables.zip(
Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnNext { i -> println("Luồng trái phát ra $i") },
Observable.interval(150, TimeUnit.MILLISECONDS)
.doOnNext { i -> println("Luồng phải phát ra $i") })
{ i1, i2 -> "$i1 - $i2" }
.take(6)
Ví dụ với RxSwift:
let disposeBag = DisposeBag()
let chuSubject = PublishSubject<String>()
let soSubject = PublishSubject<Int>()
Observable.zip(chuSubject, soSubject) { chuElement, soElement in
"\(chuElement) \(soElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
chuSubject.onNext("A")
chuSubject.onNext("B")
soSubject.onNext(1)
soSubject.onNext(2)
chuSubject.onNext("C")
soSubject.onNext(3)