Back
Featured image of post 全面阐明 Kotlin Coroutine Flow

全面阐明 Kotlin Coroutine Flow

翻译文章 - 主要讲述 Coroutine Flow 的工作原理。

本文是对发布在 Medium 上的文章 Kotlin Coroutines Flow in a nutshell 的翻译。

请注意,虽然本翻译按照 CC BY-SA 4.0 分发,但没有原作者的授权,我本质上不享有翻译权。所以如果想转载本文,你首先应该取得原作者的同意。

在我的上一篇文章中,我阐明了 RxJava 的工作原理。很长一段时间以来,在 Android 中,RxJava 是用于处理流和多线程的事实标准。但现在可以选择受 Google 推荐的,来自 JetBrains 的 Coroutines(协程)。

虽然很多开发者对 Coroutines 仍存疑虑,但新项目很少依赖于 RxJavaFlow 替代了它的位置。


本文关于……

在本文中,我会告诉你 Flow 如何工作。包含使用的基本准则和生命周期(lifecycle)管理,也就是说我们在讨论分配(Dispatching)。

本文的目标读者即包括第一次尝试理解 Flow 的人,也包括有一定经验的协程使用者。

链式调用

CoroutineScope(context = Dispatchers.Main.immediate).launch() {
  doAction()
  flowOf("Hey")
      .onEach { doAction() }
      .map { it.length }
      .onStart { doAction() }
      .flowOn(Dispatchers.Default)
      .flatMapMerge {
        doAction()
        flowOf(1)
            .flowOn(Dispatchers.Main)
            .onEach { doAction() }
      }
      .flowOn(Dispatchers.IO)
      .collect {
        doAction()
      }
  }
}

我们的目标是搞清楚每个动作实际的效果,以何顺序调用,在哪个线程上执行。

基础和生命周期

Flow 的定义只有两个接口:

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}
public fun interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

这两个接口是消费者-生产者模式(Consumer & Producer pattern)的基础。

在继续之前,我强烈建议你查看基于这两个接口编写的流 API 示例

每个 Flow 链代表一组特定操作。每次操作会创建新的 Flow 对象,同时也会存储先前调用的 Flow 实例的引用。在调用 collet 方法之前,运算不会开始(冷流)。

Flow 的生命周期历经 5 个重要阶段:

  1. 启动 ⬇️

    一个协程在使用特定 Dispatcher 的 CoroutineScope 上启动。 之后:Flow 创建、操作收集 & 数据发射。最终结果将在指定的 Dispatcher 上处理。

  2. Flow 创建 ⬇️

    在当前线程,运算从上到下创建。(类似于 Builder 模式。)

  3. 操作收集 ⬆️

    自下而上进行,每个操作符收集上一个。(译注:也就是创建对上一个操作的引用)

  4. 数据发射(Emission)⬇️

    数据发射开始于所有操作被成功收集,并最终调用了 collect 时。数据从上往下依次执行操作。

  5. 取消或完成 整个链死亡 😵

    链执行完毕或取消

让我们仔细看看每个阶段。

启动

// Scope 启动
val job = CoroutineScope(Dispatchers.Main.immediate).launch {
     doAction()
     /*.../*
}

一目了然。我们创建了一个在主线程上运行的协程作用域。doAction() 方法也在这个协程上启动。

Scope 返回了 Job,可以用于管理生命周期。(例如,调用 cancel() 停止全部工作。)

Immediate Dispatcher(调度器)的作用

译注:这里是 Android Only 的内容,对其他平台不一定适用。非 Android 开发者跳过此节没有影响。

在 Android 中,切换到主线程的唯一方式是使用 Handler/Looper/MessageQueue 链。

这个逻辑隐藏在 HandlerContext 里,同时这也是 Dispatcher.Main 的隐藏逻辑。

// 使用 Looper.mainLooper() 创建 handler
override fun dispatch(context: CoroutineContext, block: Runnable) {
    if (!handler.post(block)) {
        cancelOnRejection(context, block)
    }
}

假设我们已经在主线程了,但我们仍尝试用 handler.post 切换。

在这种情况下代码不会立刻执行,因为可能会影响用户体验,比如,造成屏幕闪烁。

代码必须等待 MessageQueue 上的其他命令完成。Dispatcher.Main.immediate 主要作用是跳过该队列并立即执行。

Dispatcher 有一个 isDispatchNeeded 方法以解决问题。在 HandlerContext 中,该方法这样实现:

override fun isDispatchNeeded(context: CoroutineContext): Boolean {
    return !invokeImmediately || Looper.myLooper() != handler.looper
}

Dispatchers.Main.immediateHandlerContext 创建新实例,它将 invokeImmediately 设为 true。因此,主线程的 Looper 将始终与当前线程的 Looper 比较,从而防止对 handler.post 的非必要调用。

Flow 创建

我们所写的 Flow 的第一个链是 flowOf("hey"),在底层,可以看到显式创建了 Flow 的实例,并将值存在 lambda 中。lambda 将会在收集阶段被调用。

import kotlinx.coroutines.flow.internal.unsafeFlow as flow

public fun <T> flowOf(value: T): Flow<T> = flow {
    emit(value)
}

internal inline fun <T> unsafeFlow(crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}

之后,onEach 方法也会以同样的方式创建。

总之,该拓展函数同样会保留对先前 Flow 的引用。

直到 collect() 的所有其他运算符都这样创建,同时不执行任何真正的运算。

在创建阶段链的行为:

  1. flowOf("Hey")

    → 缓存传递的值

  2. onEach { doAction() }

    → 缓存 lambda 并在发射阶段执行

  3. map {...}

    → 通过映射缓存 lambda

  4. onStart { doAction() }

    → 缓存 lambda 并在收集阶段执行

  5. flowOn(Dispatchers.Default)

    → 缓存赋值到该 Dispatcher 的操作

  6. flatMapMerge {...}

    → 缓存 lambda,并在发射阶段执行

  7. flowOn(Dispatchers.IO)

    → 缓存赋值到该 Dispatcher 的操作

最终,每个操作符,除了第一个,都保留对先前操作的引用,构成一个 LinkedList

创建操作将会在主线程执行。

收集

该过程自下而上进行,并在终端(terminal)操作符调用后立即开始。

Flow 中的终端操作:

  • collcet()
  • first()
  • toList()
  • toSet()
  • reduce()
  • fold()

当我们调用 collect 时,不收集整条链,而只收集上方的 flowOn

然后 flowOn 调用 collect 和自身之间的引用操作符,在这个例子里是 flatMapMerge。这就是为何操作符保留对上游引用的原因。

在收集阶段链的行为:

  1. flowOn(Dispatchers.IO)

    → 在 IO Dispatcher 上创建新的协程,改变创建的协程上下文 → 调用上游的 collect

  2. flatMapMerge {...} → 调用上游的 collect

  3. flowOn(Dispatchers.Default)

    → 在 Default Dispatcher 上创建新的协程,改变创建的协程上下文 → 调用上游的 collect

  4. onStart { doAction() }

    → 在 Default Dispatcher 上执行操作 → 调用上游的 collect

  5. map {...}

    → 调用上游的 collect

  6. onEach {...}

    → 调用上游的 collect

在所有操作之中,只有 onStartflowOn 被执行。

线程切换了两次:第一次到 IO,另一次是 Default

也就是说 flowOn 将被执行多次,并创建一些协程实例。

⚠️ 然而,flowOn 并不总在底层创建新协程。看下面的例子:

CoroutineScope(Dispatchers.Main.immediate).launch {
    flowOf("Hey")
        .onStart { doAction() }
        .flowOn(Dispatchers.IO)
        .onStart { doAction() }
        .flowOn(Dispatchers.IO)
        .onStart { doAction() }
        .flowOn(Dispatchers.IO)
        .collect()
}

我们有意地写了多次切换到同样 Dispatcher 的 flowOn

结果

// onStart1
_____________________________________
Job: ProducerCoroutine{Active}@53f45ab)
Thread: DefaultDispatcher-worker-1,5,main

// onStart2
_____________________________________
Job: ProducerCoroutine{Active}@53f45ab)
Thread: DefaultDispatcher-worker-1,5,main

// onStart3
_____________________________________
Job: ProducerCoroutine{Active}@53f45ab)
Thread: DefaultDispatcher-worker-1,5,main

如你所见,协程实例只被创建了一次,并且绑定了一个线程。

发射

一旦到达没有对其他 Flow 引用的 Flow,发射过程就开始了。从根 Flow 到最低。

  1. flowOf("Hey")

    → 发射 hey,Default Dispatcher

  2. onEach { doAction() }

    → 执行操作,Default Dispatcher

  3. map {...}

    → 映射,Default Dispatcher

  4. onStart { doAction() }

    → 发射 3,Default Dispatcher

  5. flowOn(Dispathers.Default)

    → 发射 3,Default Dispatcher

  6. flatMapMerge { ... }

    这个操作比较棘手,回想一下它的内容:

    // ...
    flatMapMerge {
      doAction()
      flowOf(1)
        .flowOn(Dispatchers.Main)
        .onEach { doAction() }
    }

    flatMapMerge 里面的链也会经历创建、收集、发射。之后,最终的值会被发射到下流。

    请注意,onEach 将在 IO Dispatcher 上执行。(在块执行之前恢复。)

    与 RxJava 不同的是,Kotlin Flow 有上下文保留(Context Preservation)的概念,保证了上层流的上下文不会影响到下层流。

  7. flowOn(Dispatchers.IO)

    → 发射 1,IO Dispatcher

  8. collect

    → 在主线程调用收集器,尽管协程上下文在上游中被改变

结论

  • collcet() 方法被挂起,这迫使我们提前决定在哪个上下文中处理我们的链的结果。

  • 所有操作从上到下创建,从下到上收集。然后被一个 LinkedList 组织起来。发射过程从根 Flow 开始,从上到下执行。

  • 一些操作可能在收集阶段执行。例如,写入多少值,onStart 就执行多少次。

  • flowOn 创建一个新的协程,在参数中传递 Dispatcher 并更改上下文。(但是,如果我们有多个 flowOn 使用同一个调度程序,则实际上只会创建一个协程。)

    它只影响上游,从而保证符合上下文保留原则。

    协程的创建和上下文的改变都可以在收集和发射两个阶段执行。

  • 一个线程可能被多个协程共用。 如果你写了 flowOn,如果当前上下文不同,肯定会创建一个新的协程。但是,不能保证线程不同。

  • flatMapMerge / flatMapConcat 仅在父链数据发射期间启动链。

    在根 Flow 收集过程中不执行任何操作。

在我的下一篇文章中,我将展示协程的底层,以及比常规线程更高效的原因。

敬请关注!

感谢 @kost.maksym 对内容的审阅。

comments powered by Disqus