Các toán tử xử lý dòng dữ liệu kết nối (connectable observables) đóng vai trò then chốt trong việc kiểm soát thời điểm phát hành và chia sẻ dữ liệu giữa nhiều người quan sát. Khác với các dòng dữ liệu thông thường, dòng kết nối cho phép tách biệt giai đoạn "chuẩn bị" và "phát hành", từ đó hỗ trợ các mô hình như phát sóng đồng thời, cache theo thời gian, hoặc đồng bộ hóa trạng thái.
Phân biệt ba loại dòng dữ liệu
- Dòng lạnh (Cold Observable): Chỉ bắt đầu tạo dữ liệu khi có ít nhất một người đăng ký. Mỗi lần đăng ký tạo ra một luồng độc lập — không chia sẻ trạng thái hay giá trị đã phát. Ví dụ:
Observable.interval(),Observable.fromIterable(). - Dòng nóng (Hot Observable): Phát dữ liệu liên tục ngay sau khi được khởi tạo, bất kể có người đăng ký hay không. Tất cả người đăng ký đều nhận cùng một bản sao dữ liệu tại thời điểm họ tham gia — nhưng bỏ lỡ mọi giá trị phát trước đó.
- Dòng kết nối (Connectable Observable): Là dạng trung gian — được xây dựng từ dòng lạnh nhưng chưa phát dữ liệu. Nó chỉ bắt đầu phát khi được gọi hàm
connect(), lúc đó mới chuyển thành dòng nóng.
Toán tử cơ bản để tạo và điều khiển dòng kết nối
Trong ReactiveX, các toán tử như publish(), publishLast(), và multicast() biến một dòng lạnh thành dòng kết nối. Chúng khác nhau ở cách chọn Subject làm bộ đệm trung gian:
publish()→ sử dụngPublishSubject: chỉ phát giá trị từ thời điểm kết nối trở đi.publishLast()→ sử dụngAsyncSubject: chỉ phát giá trị cuối cùng khi dòng hoàn tất.replay()→ sử dụngReplaySubject: lưu trữ và phát lại một phần hoặc toàn bộ lịch sử giá trị.multicast(subject)→ cho phép tùy chỉnh chủ thể đệm (subject) cụ thể.
Ví dụ minh họa với RxJava (đã tái cấu trúc logic để tránh trùng lặp):
Observable<Long> baseStream = Observable
.interval(300, TimeUnit.MILLISECONDS)
.map(i -> i * 2)
.take(4);
// Biến thành dòng kết nối, dùng PublishSubject
ConnectableObservable<Long> shared = baseStream.publish();
// Đăng ký trước khi kết nối → sẽ không nhận giá trị nào
Disposable first = shared.subscribe(v -> System.out.println("Lần 1: " + v));
// Kết nối → bắt đầu phát dữ liệu
shared.connect();
// Sau 800ms, đăng ký thêm người quan sát thứ hai
Completable.timer(800, TimeUnit.MILLISECONDS)
.andThen(shared.subscribe(v -> System.out.println("Lần 2: " + v)))
.subscribe();
// Kết quả: Lần 2 chỉ nhận giá trị từ thời điểm kết nối (0, 2, 4...) và giá trị đang phát tại thời điểm đăng ký.
Tự động quản lý kết nối với RefCount và Share
Khi sử dụng refCount(), dòng kết nối được gắn cơ chế đếm tham chiếu: số lượng người đăng ký tăng lên → tự động gọi connect(); giảm về 0 → tự động hủy kết nối. Điều này giúp loại bỏ việc gọi thủ công connect() và dispose() trên đối tượng kết nối.
share() là viết tắt của publish().refCount(), phù hợp khi cần hành vi giống dòng lạnh về mặt API, nhưng vẫn đảm bảo chia sẻ dữ liệu thực tế dưới nền.
Ví dụ với RxSwift (cải tiến cấu trúc):
let ticker = Observable<Int>
.timer(.seconds(1), scheduler: ConcurrentDispatchQueueScheduler(qos: .default))
.map { $0 % 7 }
.share(replay: 3, scope: .forever)
// Người quan sát A bắt đầu ngay lập tức
_ = ticker.subscribe(onNext: { print("[A] → \($0)") })
// Người quan sát B bắt đầu sau 2.5 giây → sẽ nhận 3 giá trị gần nhất đã được lưu
DispatchQueue.main.asyncAfter(deadline: .now() + 2.5) {
_ = ticker.subscribe(onNext: { print("[B] → \($0)") })
}
Lưu trữ và phát lại dữ liệu với Replay và Cache
replay() tạo dòng kết nối có khả năng lưu tạm dữ liệu vào bộ nhớ đệm. Có thể cấu hình theo số lượng phần tử (replay(count)) hoặc khoảng thời gian (replay(time, unit)). Khi người đăng ký mới tham gia, họ sẽ nhận lại toàn bộ dữ liệu trong phạm vi bộ đệm.
cache() là phiên bản đặc biệt của replay() dành riêng cho dòng kết thúc (finite observable). Nó lưu toàn bộ giá trị từ đầu đến cuối và phát lại nguyên bản mỗi lần đăng ký — tương đương replay(Long.MAX_VALUE) nhưng tối ưu hơn về tài nguyên.
Ví dụ với RxJava (viết lại hoàn toàn):
Observable<String> source = Observable
.just("Alpha", "Beta", "Gamma", "Delta")
.delay(200, TimeUnit.MILLISECONDS)
.cache(); // Lưu toàn bộ bốn chuỗi
// Lần đăng ký 1: nhận đầy đủ
source.subscribe(s -> System.out.println("L1: " + s));
// Lần đăng ký 2 sau 500ms: vẫn nhận lại toàn bộ, không delay
Completable.timer(500, TimeUnit.MILLISECONDS)
.andThen(source.subscribe(s -> System.out.println("L2: " + s)))
.subscribe();