本文是对发布在 Medium 上的文章 Kotlin Coroutines Flow in a nutshell 的翻译。
请注意,虽然本翻译按照
CC BY-SA 4.0
分发,但没有原作者的授权,我本质上不享有翻译权。所以如果想转载本文,你首先应该取得原作者的同意。
在我的上一篇文章中,我阐明了
RxJava 的工作原理。很长一段时间以来,在 Android 中,RxJava
是用于处理流和多线程的事实标准。但现在可以选择受 Google 推荐的,来自
JetBrains 的 Coroutines
(协程)。
虽然很多开发者对 Coroutines
仍存疑虑,但新项目很少依赖于
RxJava
。Flow
替代了它的位置。
本文关于……
在本文中,我会告诉你 Flow
如何工作。包含使用的基本准则和生命周期(lifecycle)管理,也就是说我们在讨论分配(Dispatching)。
本文的目标读者即包括第一次尝试理解 Flow
的人,也包括有一定经验的协程使用者。
链式调用
(context = Dispatchers.Main.immediate).launch() {
CoroutineScope()
doAction("Hey")
flowOf.onEach { doAction() }
.map { it.length }
.onStart { doAction() }
.flowOn(Dispatchers.Default)
.flatMapMerge {
()
doAction(1)
flowOf.flowOn(Dispatchers.Main)
.onEach { doAction() }
}
.flowOn(Dispatchers.IO)
.collect {
()
doAction}
}
}
我们的目标是搞清楚每个动作实际的效果,以何顺序调用,在哪个线程上执行。
基础和生命周期
Flow
的定义只有两个接口:
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
这两个接口是消费者-生产者模式(Consumer & Producer pattern)的基础。
在继续之前,我强烈建议你查看基于这两个接口编写的流 API 示例。
每个 Flow
链代表一组特定操作。每次操作会创建新的
Flow
对象,同时也会存储先前调用的 Flow
实例的引用。在调用 collet
方法之前,运算不会开始(冷流)。
Flow
的生命周期历经 5 个重要阶段:
启动 ⬇️
一个协程在使用特定 Dispatcher 的 CoroutineScope 上启动。 之后:Flow 创建、操作收集 & 数据发射。最终结果将在指定的 Dispatcher 上处理。
Flow 创建 ⬇️
在当前线程,运算从上到下创建。(类似于 Builder 模式。)
操作收集 ⬆️
自下而上进行,每个操作符收集上一个。(译注:也就是创建对上一个操作的引用)
数据发射(Emission)⬇️
数据发射开始于所有操作被成功收集,并最终调用了
collect
时。数据从上往下依次执行操作。取消或完成 整个链死亡 😵
链执行完毕或取消
让我们仔细看看每个阶段。
启动
// Scope 启动
val job = CoroutineScope(Dispatchers.Main.immediate).launch {
doAction()
/*.../*
}
一目了然。我们创建了一个在主线程上运行的协程作用域。doAction()
方法也在这个协程上启动。
Scope 返回了 Job
,可以用于管理生命周期。(例如,调用
cancel()
停止全部工作。)
Immediate Dispatcher(调度器)的作用
译注:这里是 Android Only 的内容,对其他平台不一定适用。非 Android 开发者跳过此节没有影响。
在 Android 中,切换到主线程的唯一方式是使用
Handler/Looper/MessageQueue
链。
这个逻辑隐藏在 HandlerContext
里,同时这也是
Dispatcher.Main
的隐藏逻辑。
// 使用 Looper.mainLooper() 创建 handler
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!handler.post(block)) {
(context, block)
cancelOnRejection}
}
假设我们已经在主线程了,但我们仍尝试用 handler.post
切换。
在这种情况下代码不会立刻执行,因为可能会影响用户体验,比如,造成屏幕闪烁。
代码必须等待 MessageQueue
上的其他命令完成。Dispatcher.Main.immediate
主要作用是跳过该队列并立即执行。
Dispatcher
有一个 isDispatchNeeded
方法以解决问题。在 HandlerContext
中,该方法这样实现:
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
Dispatchers.Main.immediate
在
HandlerContext
创建新实例,它将
invokeImmediately
设为 true
。因此,主线程的
Looper
将始终与当前线程的 Looper
比较,从而防止对 handler.post
的非必要调用。
Flow 创建
我们所写的 Flow
的第一个链是
flowOf("hey")
,在底层,可以看到显式创建了 Flow
的实例,并将值存在 lambda 中。lambda 将会在收集阶段被调用。
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
public fun <T> flowOf(value: T): Flow<T> = flow {
(value)
emit}
internal inline fun <T> unsafeFlow(crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
.block()
collector}
}
}
之后,onEach
方法也会以同样的方式创建。
总之,该拓展函数同样会保留对先前 Flow
的引用。
直到 collect()
的所有其他运算符都这样创建,同时不执行任何真正的运算。
在创建阶段链的行为:
flowOf("Hey")
→ 缓存传递的值
onEach { doAction() }
→ 缓存 lambda 并在发射阶段执行
map {...}
→ 通过映射缓存 lambda
onStart { doAction() }
→ 缓存 lambda 并在收集阶段执行
flowOn(Dispatchers.Default)
→ 缓存赋值到该 Dispatcher 的操作
flatMapMerge {...}
→ 缓存 lambda,并在发射阶段执行
flowOn(Dispatchers.IO)
→ 缓存赋值到该 Dispatcher 的操作
最终,每个操作符,除了第一个,都保留对先前操作的引用,构成一个
LinkedList
创建操作将会在主线程执行。
收集
该过程自下而上进行,并在终端(terminal)操作符调用后立即开始。
Flow 中的终端操作:
collcet()
first()
toList()
toSet()
reduce()
fold()
当我们调用 collect
时,不收集整条链,而只收集上方的
flowOn
。
然后 flowOn
调用 collect
和自身之间的引用操作符,在这个例子里是
flatMapMerge
。这就是为何操作符保留对上游引用的原因。
在收集阶段链的行为:
flowOn(Dispatchers.IO)
→ 在 IO Dispatcher 上创建新的协程,改变创建的协程上下文 → 调用上游的 collect
flatMapMerge {...}
→ 调用上游的 collectflowOn(Dispatchers.Default)
→ 在 Default Dispatcher 上创建新的协程,改变创建的协程上下文 → 调用上游的 collect
onStart { doAction() }
→ 在 Default Dispatcher 上执行操作 → 调用上游的 collect
map {...}
→ 调用上游的 collect
onEach {...}
→ 调用上游的 collect
在所有操作之中,只有 onStart
和 flowOn
被执行。
线程切换了两次:第一次到 IO
,另一次是
Default
。
也就是说 flowOn
将被执行多次,并创建一些协程实例。
⚠️ 然而,flowOn 并不总在底层创建新协程。看下面的例子:
(Dispatchers.Main.immediate).launch {
CoroutineScope("Hey")
flowOf.onStart { doAction() }
.flowOn(Dispatchers.IO)
.onStart { doAction() }
.flowOn(Dispatchers.IO)
.onStart { doAction() }
.flowOn(Dispatchers.IO)
.collect()
}
我们有意地写了多次切换到同样 Dispatcher 的 flowOn
。
结果
// onStart1
_____________________________________: ProducerCoroutine{Active}@53f45ab)
Job: DefaultDispatcher-worker-1,5,main
Thread
// onStart2
_____________________________________: ProducerCoroutine{Active}@53f45ab)
Job: DefaultDispatcher-worker-1,5,main
Thread
// onStart3
_____________________________________: ProducerCoroutine{Active}@53f45ab)
Job: DefaultDispatcher-worker-1,5,main Thread
如你所见,协程实例只被创建了一次,并且绑定了一个线程。
发射
一旦到达没有对其他 Flow
引用的
Flow
,发射过程就开始了。从根 Flow
到最低。
flowOf("Hey")
→ 发射
hey
,Default DispatcheronEach { doAction() }
→ 执行操作,Default Dispatcher
map {...}
→ 映射,Default Dispatcher
onStart { doAction() }
→ 发射
3
,Default DispatcherflowOn(Dispathers.Default)
→ 发射
3
,Default DispatcherflatMapMerge { ... }
这个操作比较棘手,回想一下它的内容:
// ... { flatMapMerge () doAction(1) flowOf.flowOn(Dispatchers.Main) .onEach { doAction() } }
在
flatMapMerge
里面的链也会经历创建、收集、发射。之后,最终的值会被发射到下流。请注意,onEach 将在 IO Dispatcher 上执行。(在块执行之前恢复。)
与 RxJava 不同的是,Kotlin Flow 有上下文保留(Context Preservation)的概念,保证了上层流的上下文不会影响到下层流。
flowOn(Dispatchers.IO)
→ 发射
1
,IO Dispatchercollect
→ 在主线程调用收集器,尽管协程上下文在上游中被改变
结论
collcet()
方法被挂起,这迫使我们提前决定在哪个上下文中处理我们的链的结果。所有操作从上到下创建,从下到上收集。然后被一个
LinkedList
组织起来。发射过程从根 Flow 开始,从上到下执行。一些操作可能在收集阶段执行。例如,写入多少值,
onStart
就执行多少次。flowOn
创建一个新的协程,在参数中传递 Dispatcher 并更改上下文。(但是,如果我们有多个 flowOn 使用同一个调度程序,则实际上只会创建一个协程。)它只影响上游,从而保证符合上下文保留原则。
协程的创建和上下文的改变都可以在收集和发射两个阶段执行。
一个线程可能被多个协程共用。 如果你写了
flowOn
,如果当前上下文不同,肯定会创建一个新的协程。但是,不能保证线程不同。flatMapMerge
/flatMapConcat
仅在父链数据发射期间启动链。在根 Flow 收集过程中不执行任何操作。
在我的下一篇文章中,我将展示协程的底层,以及比常规线程更高效的原因。
敬请关注!
感谢 @kost.maksym 对内容的审阅。