### Bộ lập lịch, đa luồng và kiểm thử
Nội dung chính của bài viết tập trung vào cơ chế điều phối luồng xử lý và phương pháp kiểm thử trong ReactiveX.
SubscribeOn / ObserveOn
- SubscribeOn và ObserveOn xác định luồng xử lý cho quá trình phát dữ liệu và quan sát dữ liệu:
- SubscribeOn ảnh hưởng toàn bộ quá trình từ tạo luồng đến hủy bỏ
- ObserveOn chỉ định luồng cho các callback OnNext/OnCompleted/OnError
- Khi kết hợp cả hai, ObserveOn có độ ưu tiên cao hơn
Ví dụ minh họa
Console.WriteLine("Bắt đầu trên threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var nguon = Observable.Create<int>(observer =>
{
Console.WriteLine("Khởi động trên threadId:{0}", Thread.CurrentThread.ManagedThreadId);
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnCompleted();
Console.WriteLine("Hoàn tất trên threadId:{0}", Thread.CurrentThread.ManagedThreadId);
return Disposable.Empty;
});
nguon.Subscribe(
giaTri => Console.WriteLine("Nhận {1} trên threadId:{0}", Thread.CurrentThread.ManagedThreadId, giaTri),
() => Console.WriteLine("Kết thúc trên threadId:{0}", Thread.CurrentThread.ManagedThreadId)
);
Console.WriteLine("Kết nối trên threadId:{0}", Thread.CurrentThread.ManagedThreadId);
Kết quả:
Bắt đầu trên threadId:1
Khởi động trên threadId:1
Nhận 1 trên threadId:1
Nhận 2 trên threadId:1
Nhận 3 trên threadId:1
Kết thúc trên threadId:1
Hoàn tất trên threadId:1
Kết nối trên threadId:1
Loại bộ lập lịch
- ImmediateScheduler: Thực thi ngay lập tức trên luồng hiện tại
- NewThreadScheduler: Tạo luồng mới cho mỗi tác vụ
- ThreadPoolScheduler: Sử dụng pool luồng hệ thống
- TestScheduler: Dành cho kiểm thử logic thời gian
Kiểm thử với TestScheduler
var scheduler = new TestScheduler();
var ketQua = new List<long>();
var luong = Observable.Interval(TimeSpan.FromSeconds(1), scheduler).Take(5);
luong.Subscribe(ketQua.Add);
scheduler.Start();
Console.WriteLine(ketQua.SequenceEqual(new[]{0,1,2,3,4}));
Kết quả:
True
Kiểm soát thời gian ảo
val scheduler = TestScheduler()
val worker = scheduler.createWorker()
worker.schedule { println("Ngay lập tức") }
worker.schedule({ println("Sau 20s") }, 20, TimeUnit.SECONDS)
worker.schedule({ println("Sau 40s") }, 40, TimeUnit.SECONDS)
scheduler.advanceTimeTo(20, TimeUnit.SECONDS)
Kết quả:
Ngay lập tức
Sau 20s
So sánh Immediate vs Trampoline
ScheduleTasks(Scheduler.Immediate);
// Kết quả:
// outer start.
// innerAction start.
// leafAction.
// innerAction end.
// outer end.
ScheduleTasks(Scheduler.CurrentThread);
// Kết quả:
// outer start.
// outer end.
// innerAction start.
// leafAction.
// innerAction end.