Giới thiệu về Data Flow trong Kotlin Coroutines
Data Flow trong Kotlin Coroutines cung cấp một cách mạnh mẽ để xử lý luồng dữ liệu không đồng bộ. Dưới đây là hướng dẫn chi tiết về cách sử dụng và các tính năng của Flow.
1. Cơ bản về Flow
1.1 So sánh Sequence và Flow
Sequence là một cấu trúc dữ liệu đồng bộ, còn Flow được thiết kế để xử lý các tác vụ không đồng bộ với khả năng gọi hàm treo (suspend).
val sequence = generateSequence {
Thread.sleep(1000) // Giả lập nhiệm vụ tốn thời gian
1
}.iterator()
sequence.forEach { println(it) }
Với Flow, bạn có thể dễ dàng thực hiện các tác vụ không đồng bộ:
fun createFlow(): Flow<Int> = flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
}
runBlocking {
createFlow().collect { value -> println(value) }
}
1.2 Cách tạo Flow
- flow {}: Khởi tạo Flow bằng khối mã.
- flowOf(): Tạo Flow từ danh sách giá trị cố định.
- asFlow(): Chuyển đổi danh sách hoặc chuỗi thành Flow.
val customFlow = flow {
delay(500)
emit("Data 1")
}
val fixedFlow = flowOf("A", "B", "C")
val listFlow = listOf("X", "Y", "Z").asFlow()
1.3 Flow là dòng lạnh
Flow chỉ thực thi khi có ít nhất một collect. Điều này làm cho nó trở thành dòng "lạnh".
fun coldFlow(): Flow<String> = flow {
println("Flow bắt đầu")
delay(1000)
emit("Kết quả")
}
runBlocking {
val flow = coldFlow()
flow.collect { println(it) }
}
2. Các toán tử Flow
2.1 Toán tử cuối cùng
- collect: Thu thập tất cả các giá trị từ Flow.
- reduce/fold: Kết hợp các giá trị thành một kết quả duy nhất.
- launchIn: Bắt đầu Flow trong CoroutineScope.
val sum = (1..5).asFlow().reduce { acc, value -> acc + value }
val total = (1..5).asFlow().fold(10) { acc, value -> acc + value }
2.2 Xử lý Backpressure
Khi tốc độ phát dữ liệu vượt quá tốc độ xử lý, Flow cung cấp các chiến lược như buffer, conflate để quản lý backpressure.
(1..5).asFlow().onEach { delay(100) }.buffer(2).collect { println(it) }
2.3 Xử lý lỗi
Bạn có thể sử dụng catch để xử lý lỗi và retry để thực hiện lại khi gặp lỗi.
val safeFlow = (1..5).asFlow().map { if (it == 3) throw Exception() else it }
.catch { emit(-1) }
.retry(2)
runBlocking { safeFlow.collect { println(it) } }
3. StateFlow và SharedFlow
3.1 StateFlow
StateFlow là một container trạng thái quan sát được, thích hợp để quản lý trạng thái ứng dụng.
val stateFlow = MutableStateFlow("Default")
stateFlow.value = "New Value"
stateFlow.collect { println(it) }
3.2 SharedFlow
SharedFlow thích hợp cho việc chia sẻ luồng sự kiện giữa nhiều người đăng ký.
val sharedFlow = MutableSharedFlow<String>(replay = 1)
sharedFlow.tryEmit("Event 1")
sharedFlow.collect { println(it) }
3.3 Chuyển đổi Cold Flow thành Hot Flow
Sử dụng phương thức shareIn để biến Flow thành SharedFlow.
val hotFlow = coldFlow().shareIn(CoroutineScope(Dispatchers.Default), SharingStarted.Eagerly, 1)