Back
Featured image of post Kotlin Coroutine 协程 - 04 Async / Await

Kotlin Coroutine 协程 - 04 Async / Await

Kotlin 协程中的 Async / Await 与 结构化并发

本文是对 Kotlin Coroutines by Tutorials By Filip Babić and Nishant Srivastava 书本内容的转述和笔记书本内容是受原版权保护的内容

上一篇:Kotlin Coroutine 协程 - 03 挂起函数

Chapter 4: Async / Await

原文废话长达 500 行,我先用一个例子区分一下概念。

请读者思考,suspendCoroutineawait / async 之间的区别

试用以下代码,比较 suspendawait 的区别

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlin.coroutines.suspendCoroutine
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime


@OptIn(ExperimentalTime::class)
fun main() {

    GlobalScope.launch {
        println(measureTime {
            val a = async { getSomeDataFromNetwork() }
            val b = async { getSomeDataFromNetwork() }
            println(a.await() + "1")
            println(b.await() + "11")
        })
    }

    GlobalScope.launch {
        println(measureTime {
            val a = getSomeDataBySuspendCoroutine()
            val b = getSomeDataBySuspendCoroutine()
            println(a + "2")
            println(b + "22")
        })
    }

    Thread.sleep(10000)
}

suspend fun getSomeDataBySuspendCoroutine(): String {
    return suspendCoroutine {
        it.resumeWith(Result.runCatching { getSomeDataFromNetwork() })
    }
}

fun getSomeDataFromNetwork(): String {
    Thread.sleep(3000)
    return "data "
}

不难发现,suspend 就是结合 Continuation 和整个协程上下文组成了一个简易回调系统,你可以看成挂起点相当于进入了一段异步代码,然后 Continuation 会自动在挂起点异步代码执行结束之后,进入挂起点之后的代码(相当于就是通过 Continuation 机制抹平了回调的阶梯代码),但是这并不是并行的,比如在上述代码中,getSomeDataBySuspendCoroutine 赋值的 a 与 b 之间,你可以理解为他们是嵌套包含的回调关系,所以并不是同时进行的并发,而在上面 async,便是在 suspend 提供的自动回调上提供的异步并发的支持。

综上,suspend 提供回调支持,async 提供异步并发支持。

从下面开始原文的长篇大论,原文最重要的是结构化并发的思想


到目前为止,你已经看到了如何利用协程和可挂起函数来桥接线程和执行异步工作,这些工作不会给你的程序增加很多开销。你还看到了如何从基于回调的 API 迁移到基于协程的 API,他们可以写出看上去阻塞但实际上异步的代码。

在本文中,你将看到如何建立类似的机制,这些机制不是阻塞的,可以异步和并行地工作。它们也可以返回值,就像你调用一个标准函数一样。听起来好得不象话?那么你会看到所有这些功能实际上是一个古老的概念,所以让我们开始吧。

此处省略其介绍历史的 Promises 和 Future,感兴趣可以自己查找资料

Using Async / Await 使用异步 / 等待模型

现在,你可能想知道 Async / Await 模式在 Kotlin 协程 API 中是如何工作的。与 Future 模式非常接近,Kotlin 的 async 返回一个 Deferred<T>。就像你会有一个 Future<T>,Deferred Value 只是包裹了一个即将获得的对象,目前而言,里面什么都没有,最终你可能可以获取到里面的值,也可能获取不到。你需要通过 await 请求 async,告诉它我现在需要使用这个值,函数在这里将会开始挂起(也就是挂起点进入异步,下半代码变成回调),当 await 最终请求到了值之后,Continuation 会恢复并开始运行 await 之后的代码。

该模式被称为 Async / Await 是有原因的,因为需要两个函数同时使用──async,用于准备和包装值,而 await,用于请求使用值。让我们看看这两个函数的签名是什么。

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

当你调用 async 时,你可以传递给它一个 CoroutineContext,将其与某个 JobDispatcher 绑定。你也可以用 CoroutineStart 参数以不同的模式启动它。但是,最重要的是,你必须传入一个 lambda 块,它可以访问你调用函数的 CoroutineScope,并需要返回一个值,它将尝试存储在 Continuation 中。它通过创建一个新的 DeferredCoroutine,用 lambda 块开始,并返回前述的协程。

这个函数将值包裹在一个协程中,它实现了 Deferred<T> 接口,你将在后面调用 await。一旦你调用 await,协程将尝试为你产生值或挂起,直到它出现。让我们检查一下 await 的签名,看看那里是否有什么有趣的东西。

/**
* Awaits for completion of this value without blocking a thread
* and resumes when deferred computation is complete,
* returning the resulting value or throwing the
* corresponding exception if the deferred was cancelled.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled
* or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
*
* This function can be used in [select] invocation 
* with [onAwait] clause.
* Use [isCompleted] to check for completion of this
* deferred value without waiting.
*/
public suspend fun await(): T

这个函数本身非常简单,但它的思想是很聪明的,你可以挂起整个函数或循环程序,而不是实际阻塞一个线程,只是让它在值准备好后再进行解析。现在是时候使用这种方法了,将目前的一个阻塞代码片段迁移到 async

fun main() {
  val userId = 992
  
  getUserByIdFromNetwork(userId) { user ->
    println(user)
  }
}

private fun getUserByIdFromNetwork(userId: Int, onUserReady: (User) -> Unit) {
  Thread.sleep(3000)

  onUserReady(User(userId, "Filip", "Babic"))
}

data class User(val id: Int, val name: String, val lastName: String)

这段代码试图从一个模拟的网络调用中获得一个用户,并将其打印出来。但问题是,它调用了 Thread.sleep(),使执行停止了 3 秒。就像你有一个阻塞的网络调用一样,你必须等待数据回来,然后才能使用它。更恶心的是,它还使用了一个回调,以便在数据准备好后将其传回给调用者。接下来,你要重构这个程序,使用协程 API 中的 asyncawait

getUserByIdFromNetwork 的代码改为如下:

private fun getUserByIdFromNetwork(userId: Int) = GlobalScope.async {
  Thread.sleep(3000)

  User(userId, "Filip", "Babic")
}

首先,你从参数中删除了回调,其次,你返回了一个 GlobalScope.async 块作为该函数的返回值。现在,该函数返回了一个值,而且它不依赖于回调来消耗它。

最后,你必须更新 main 中的代码,以使用新版本的 getUserByIdFromNetwork

fun main() {
  val userId = 992
  val userData = getUserByIdFromNetwork(userId)
  
  println(userData)
}

如果你运行这段代码,将会有类似如下的输出

DeferredCoroutine{Active}@6eee89b3

这是因为 getUserByIdFromNetwork 现在返回一个 Deferred<User>。为了获得用户,你必须对其调用 await。但是 await 是一个可挂起的函数,要调用 await,你必须在另一个可挂起函数或协程构建器中,所以你必须把上面的代码包在一个 launch 中:

fun main() {
  val userId = 992

  GlobalScope.launch {
    val userData = getUserByIdFromNetwork(userId)

    println(userData.await())
  }

  Thread.sleep(5000)
}

该流程与先前的代码类似,只是你还需要调用 await 来返回用户,以便将其打印出来。你已经成功地将基于回调的代码迁移到了 Async / Await 模式中。

看看我们做了那些变化: - 首先,你从函数中删除了回调,因为你将会返回一个值。 - 然后,你必须从 lambda 块中返回 async 的结果和 User。 - 最后,你必须把 await 包在一个协程中,因为它是一个可挂起函数。

这三个步骤是你将代码迁移到 Async / Await 所需要做的一切。

现在,它如何工作是另一回事。如前所述,它创建了一个协程,并用 Deferred<T> 值对其进行包装。通过 await 接口,你可以请求这个值并从当前代码开始挂起。

一旦你调用 await,你就挂起了函数的调用,有效地避免了线程的阻塞。然后你等待 lambda 块的执行,以便使用内部存储的值。当值准备好后,函数就不再被挂起,你的代码就会正常继续。

现在,这里的很多工作是由 Deferred 类型决定的,所以让我们看看它到底是什么。

Deferring Values 「延迟」值

每个 async 块都返回一个 Deferred<T>。这是驱动协程 API 的核心机制,理解它的工作原理非常重要。

当你调用 async,该函数会返回一个 Deferred。它通过创建一个 DeferredCoroutineLazyDeferredCoroutine 来实现。这样的协程有一个泛型推理,也实现了 Continuation<T> 接口,允许拦截执行流,并将值一直传递到调用点,就像 suspendCoroutine 一样。这类似于你以前见过的未来模式的工作方式。

一旦创建了协程,除非它的 CoroutineStartCoroutineStart.LAZY,否则它将立即启动。代码将开始在你用 Dispatchers 的上下文参数声明的线程中执行。一旦代码完成执行并产生一个值,它将被存储在内部。如果在任何时候,你调用 await,这将是一个可挂起函数调用,它将创建一个新的 Continuation 和执行流,等待值准备好使用。如果它还没有准备好,则会该函数将挂起进入等待。如果值已经被提供了,你会立即得到它。

你还可以检查一个「延迟」值的状态,因为它也实现了 Job 接口。你可以检查像 isActiveisCompleted 这样的标志来了解它当前的生命周期状态。如果 Job 被取消了,你可以用 getCompleted 来接收一个值或一个异常。如果 Job 没有被取消,你还可以通过 getCompletionExceptionOrNull,它会返回 CancellationExceptionnull。使用这些函数,你可以检查「延迟」值的完成状态。

因此,一个很好的方式来解释什么是「延迟」,是它是一个有结果的工作。工作可以并行运行,它可能产生也可能不产生一个结果值,它可以被取消并与其他工作联合。这提供了一个强大的 API,你可以按照你的意愿去做。还有一种使用「延迟」值的方法是,将他们与多种参数组合在一起。

Combining Multiple Deferred Values 组合多个「延迟」值

能够创建「延迟」值,在后台构建,但可以在主线程上通过一个函数调用进行访问,这是一件了不起的事情。但异步的真正力量在于能够将两个或更多的延迟值结合到一个函数调用中。让我们看看如何做到这一点。

到目前为止,你已经使用了一个模拟网络请求的例子,但现在是时候对这个例子进行扩展。你可能已经注意到项目中的 users.txt。这是一个包含 13,000 行文本的文件。大多数行包含了建立用户所需的信息,一个 ID,一个名字和一个姓氏。有些行是空的,有些行没有全部三个项目,有些行只是普通的胡言乱语。这背后的想法是读取整个文件,解析和分割每一行,并从中创建用户。之后,你将使用用户列表和你从模拟网络调用中得到的用户,看看该用户是否被存储在文件中。

有了这个,你就可以看到两个「延迟」值如何在一个函数调用中被引出和使用。现在,导航回到 AsyncAwait.kt。在那里,将以下代码添加到该文件中:

private fun readUsersFromFile(filePath: String) =
    GlobalScope.async {
      println("Reading the file of users")
      delay(1000)

      File(filePath)
          .readLines()
          .asSequence()
          .filter { it.isNotEmpty() }
          .map {
            val data = it.split(" ") // [id, name, lastName]

            if (data.size == 3) data else emptyList()
          }
          .filter {
            it.isNotEmpty()
          }
          .map {
            val userId = it[0].toInt()
            val name = it[1]
            val lastName = it[2]

            User(userId, name, lastName)
          }
          .toList()
    }
    
private fun checkUserExists(user: User, users: List<User>): Boolean {
  return user in users
}

有了这个,你就可以看到两个「延迟」值如何在一个函数调用中被引出和使用。现在,导航回到 AsyncAwait.kt。在那里,将以下代码添加到该文件中:

fun main() {
  val userId = 992

  GlobalScope.launch {
    val userDeferred = getUserByIdFromNetwork(userId)
    val usersFromFileDeferred = readUsersFromFile("users.txt")

    println("Finding user")
    val userStoredInFile = checkUserExists(
        userDeferred.await(), usersFromFileDeferred.await()
    )

    if (userStoredInFile) {
      println("Found user in file!")
    }
  }

  Thread.sleep(5000)
}

在上面的例子中: - 你通过调用 getUserByIdFromNetwork 创建一个 userDeferred。这将启动代码块,等待 3 秒钟来返回一个用户。 - 然后,你将 UsersFromFileDeferred 作为初始值,它将返回一个存储在 users.txt 中的用户列表。 - 一旦这两个值都被激活,你就可以调用 checkUserExists 来查看你从网络调用中收到的用户是否与从文件中加载的任何用户相匹配。 - checkUserExists 接收两个参数:一个是你需要查询的用户,另一个是需要查询的用户列表。

你传递给函数的参数是你之前准备的两个「延迟」值的 await 结果,因此,你有一行代码挂起了函数,等待这两个值。

这是使用多个递延值的正确方法。你在程序中有效地创建了一个挂起点,挂起了两个函数。

或者你可以让 checkUserExists 挂起,然后在其函数内部进行等待,但由于这些用户信息可能不仅仅作用于检查是否存在,或许你还可以拿它们去做一些别的事情,因此等待这些值会更方便。

如果你运行这段代码,你会立即看到以下输出。

Finding user
Reading the file of users
Retrieving user from network

大约 3 秒钟后,你应该看到 Found user in file!。这是因为围绕 Deferred 创建的协程并不偷懒,它们会立即启动──但只有当你等待取值时才会暂停代码。在调用 await 后,你将收到的值传递给 checkUserExists,然后你收到用户存在于文件中的输出。

使用这种方法,你可以结合任何数量的「延迟」值,反过来,实现智能和简单的并行,这并不是建立在数据流的回调概念之上的,所以代码是非常容易理解的,因为它类似于顺序的、同步的代码,即使它在幕后可能是完全异步的。这就是协程和 Async / Await 模式的真正力量。

然而,这段代码有一点并不理想的地方在于:

在涉及到取消和资源管理时,这段代码的结构并不那么好。让我们看看如何通过遵循 Jetbrains 的思想体系将这段代码打磨得完美无缺。

Being Cooperative and Structured 结构化并发

上面的代码例子构建得相当好,它们的目的是解释你如何并行地对多个值进行准备,并在此之后将它们传递给函数。

但是,如果出了问题或异步块构建不当,你就会阻塞一个线程,并有可能冻结整个系统。例如,如果你的异步块包含一个 while 循环,而条件中没有 break 策略,你的函数将永远不会返回值。此外,如果你建立的函数是做繁重的操作,并且需要大量的时间来完成,你应该能够在其执行的任何时候取消它们。否则,你就有可能取消它们的父 Job,但不能取消 Job 本身。那么你就会发现,即使它的 Job 已经被取消,但代码仍然在运行并占用着资源。

这就是为什么你的代码应该始终是合作式的。

在发布协程一段时间后,Jetbrains 开始看到各种各样的用法和库被围绕它们建立起来。Jetbrains 随后发布了一篇文章,澄清了一些细节,解释了人们最初对协程的例子和想法并不是 100% 正确。以你上面使用的函数为例,进行了扩展:

private suspend fun getUserByIdFromNetwork(userId: Int) = GlobalScope.async {
  println("Retrieving user from network")
  delay(3000)
  println("Still in the coroutine")

  User(userId, "Filip", "Babic") // we simulate the network call
}

如果你调用这个函数,这个简单的片段将在 3 秒后返回一个用户。由于它是一个挂起的函数,你需要把它包在一个协程构建器中,像这样。

fun main() {
  GlobalScope.launch {
    val deferredUser = getUserByIdFromNetwork(130)

    println(deferredUser.await())
  }
}

但是,如果你在 launch 块开始执行后取消了该 Job,会发生什么? getUserByIdFromNetwork 仍然会运行 3 秒钟并返回一个值,尽管父 Job 被取消了。这就造成了计算中的时间和资源的浪费,这些时间和资源可以更好地用于其他地方。

这就是为什么 Jetbrains 提出了结构化并发和合作式代码的想法。这个想法是编写反映其调用者状态的代码,并建立依赖于其父方状态的程序。简单地说,如果父 Job 被取消了,其子 Job 也应该被取消。默认情况下,这种行为确实存在于协程 API中,你已经看到了。

但你仍然可以编写不执行这种行为的代码。例如,如果你将上面的片段调整为以下内容。

fun main() {
  val launch = GlobalScope.launch {
    val dataDeferred = getUserByIdFromNetwork(1312)
    println("Not cancelled")
    // do something with the data

    val data = dataDeferred.await()
    println(data)
  }

  Thread.sleep(50)
  launch.cancel()

  while (true) { // stops the program from finishing
  }
}

你会看到 getUserByIdFromNetwork 一定会被完整执行,不管你是否取消了它所调用的循环程序。这是因为上面的片段不是合作性的,不允许代码在需要时挂起和取消。它不关心代码出发的原点,也不关心其父级的 Scope 和 Context。构建这个函数的正确方法是如下。

private suspend fun getUserByIdFromNetwork(
    userId: Int,
    parentScope: CoroutineScope) =
    parentScope.async {
      if (!isActive) {
        return@async User(0, "", "")
      }
      println("Retrieving user from network")
      delay(3000)
      println("Still in the coroutine")

      User(userId, "Filip", "Babic") // we simulate the network call
    }

该函数现在将父 CoroutineScope 实例作为一个参数,并从那里启动一个异步块。此外,它还从父 Job 中检查 isActive,这样,如果父 Job 被取消,它就一定不会继续执行。如果你再次从上面启动 main,传入父作用域,代码只会打印出 Retrieving user from network and Not Cancelled,之后它就不会继续等待 3 秒钟,或是向已经被终止的协程返回一个值。

最后,合并两个值的代码也可以改进。

GlobalScope.launch {
  val userDeferred = getUserByIdFromNetwork(userId)
  val usersFromFileDeferred = readUsersFromFile("users.txt")

  println("Finding user")
  val userStoredInFile = checkUserExists(
      userDeferred.await(), usersFromFileDeferred.await()
  )

  if (userStoredInFile) {
    println("Found user in file!")
  }
}

你可能会看到和以前一样的问题。你正在引出两个「延迟」值,并在下面使用它们,而没有检查 Job 的当前状态。由于这是在 GlobalScope 内 launch 的,除非你把返回的 Job 存储在某个地方,以便以后取消它,否则你将有效地创建一个 射后不理的(Fire-and-forget)函数,这可能会占用宝贵的资源。你可能在这里看到了一个模式。你应该始终控制你启动的 Job 和你启动它们的协程作用域。协程作用域会将协程绑定到一个对象的生命周期中。这就是为什么你不应该真正广泛地使用 GlobalScope,而应该提供自定义作用域

但是你如何保证你用来启动协程的 CoroutineScope 是正确的?通过自己实现该接口!

合作代码和结构化并发指南中指出,你应该将你的协程限制在具有明确定义的生命周期的对象上,比如 Android 应用环境中的 Activity。所以让我们在一个假的类中实现 CoroutineScope。创建一个新的 Kotlin 类,并将其命名为 CustomScope.kt。把里面的代码改成下面的样子。

class CustomScope : CoroutineScope {

  private var parentJob = Job()

  override val coroutineContext: CoroutineContext
    get() = Dispatchers.Main + parentJob
}

一旦你实现了这个接口,你就必须提供一个 CoroutineContext,这样协程就有一个可以运行的环境。建议的实现是一个 Dispatcher 的组合,用于默认线程和一个 Job 来绑定协程的生命周期。这是一个不错的默认实现,但它可以做得更好。在该类中添加以下函数:

fun onStart() {
  parentJob = Job()
}

fun onStop() {
  parentJob.cancel()
  // You can also cancel the whole scope
  // with `cancel(cause: CancellationException)`
}

通过添加这个功能,你可以停止和取消所有通过这个 Scope 启动的协程。通过启动它,你可以创建一个新的 Job,协程将依赖它的生命周期和可能的isActive 检查。一旦你调用 onStop,所有的协程将被取消,如果你已经实现了它们的合作,它们应该不会占用任何资源。现在,如果你想用新的作用域来启动协程。

fun main() {
  val scope = CustomScope()

  scope.launch {
    println("Launching in custom scope")
  }
  
  scope.onStop() //cancels all the coroutines
}

相当有用,而且相当干净!

你想解决的第二个问题是,可能会存在一个永远不会返回值的 async 块。假设你有下面的代码:

fun <T> produceValue(scope: CoroutineScope): Deferred<T> =
  scope.async {
    var data: T? = null

    requestData<T> { value ->
      data = value
    }

    while (data == null) {
        // dummy loop to keep the function alive
    }

    data!!
  }

这是一个函数,允许你通过包裹一个使用回调的现有函数,通过 async 提供任何类型的值。requestData 的实现并不重要,只要把它看作是一个可以获取任何类型数据的函数,并使用回调将其传递给你。

如果没有出错,代码将按预期工作。但如果回调从未被触发,而你是这样调用该函数的:

fun main() {
  GlobalScope.launch(context = Dispatchers.Main) {
    val data = produceValue<Int>(this)

    data.await()
  }
}

它最终可能会无限期地挂起启动块,即使你取消了从 launch 返回的 Job,如果 await 已经被无约束或主调度器调用,你也可能冻结系统和用户界面。这就是为什么你应该再次整合父 JobisActive 标志作为关键条件的破坏者。如果你把代码改为以下内容。

fun <T> produceValue(scope: CoroutineScope): Deferred<T> =
    scope.async {
      var data: T? = null

      requestData<T> { value ->
        data = value
      }

      while (data == null && scope.isActive) {
        // loop for data, while the scope is active
      }

      data!!
    }

你不仅依靠你的内部函数条件,而且还依靠 scope.isActive,允许你取消最外层的协程,反过来,也取消异步块。请注意,Jetbrains 团队已经增加了更好的取消协程的功能,所以冻结 UI 的情况应该不会发生,但安全总比遗憾好。然而,取消有时不能被传播。如果你的代码没有创建一个可以取消的挂起点,那么你只是在调用普通的、阻塞的代码,你仍然可以阻塞协程所在的线程。所以,对你的代码要有合作精神!

下一篇:Kotlin Coroutine 协程 - 05 协程上下文

comments powered by Disqus