Back
Featured image of post Kotlin Coroutine 协程 - 03 挂起函数

Kotlin Coroutine 协程 - 03 挂起函数

通过反编译的方法深入了解 Kotlin 协程挂起函数和挂起点延续性的本质

本文是对 Kotlin Coroutines by Tutorials By Filip Babić and Nishant Srivastava 书本内容的转述和笔记书本内容是受原版权保护的内容

上一篇:Kotlin Coroutine 协程 - 02 协程入门教程

Chapter 3: 挂起函数

TL; DR: 这里是废话

到目前为止,你已经学到了很多关于协程的知识。你已经看到了如何在没有任何线程分配或内存泄漏的开销的情况下启动协程并以此进行异步开发。然而,协程的基础是能够挂起代码,随意控制代码的执行流程,并以同种语法和同种顺序的代码结构从同步和异步操作中获取返回值。

在本文中,你将了解更多关于 可挂起函数(Suspendable Function)内部的工作方式。你将看到如何将回调代码转换为挂起方法,其调用方式与常规阻塞的函数相同。在整个过程中,你将了解到协程难题中最重要的一块是什么。

Suspending vs. Non-suspending 挂起与非挂起

TL; DR: 这里也是废话

到现在为止,你已经了解到协程依赖于「可挂起代码」和「可挂起函数」的概念。可挂起代码与常规代码最大的区别在于,系统有能力 暂停(也可以说是挂起) 可挂起代码的执行,并在以后继续执行。但是从调用的角度讲,两者基本相同。

如果你还做过尝试的话,复制一个你使用的函数,但在函数前添加 suspend 关键字。你会发现,尽管你可以用相同的参数调用这两个函数,但你必须将可挂起的函数包裹在一个 launch 块中,因为 Kotlin 的协程 API 是这样构建的,但实际的函数调用并没有改变。

系统在编译时通过 suspend 关键字来区分这两类函数,但是这些函数在哪里是不同的,他们运行起来如何不同,以及这两种函数在 Kotlin 协程中的挂起机制方面是如何工作的?通过分析每个函数生成的字节码,以及解释调用栈,就可以找到答案。

Analyzing a Regular Function 分析一个常规函数

如果你调用如下这样一个常规函数,它不依赖于回调或者协程。这个函数将有四个不同的变体。这个变体是最初级的,所以我们先来分析一下:

fun getUserStandard(userId: String): User {
  Thread.sleep(1000)

  return User(userId, "Filip")
}

该函数接收一个参数 userId,随之让当前线程休眠一秒钟,以此模拟一个长期运行的操作,而后它将会返回一个用户。实际上这个函数很简单,这里没有隐藏的机制在起作用。按 Tools ▶︎ Kotlin ▶︎ Show Kotlin Bytecode 来分析生成的字节码。之后,你应该看到 Kotlin 字节码窗口被打开,通过按下 Decompile 按钮,你就可以看到生成的反编译代码,它看起来应该像这样

@NotNull
public static final User getUserStandard(@NotNull String userId) {
  Intrinsics.checkParameterIsNotNull(userId, "userId");
  Thread.sleep(1000L);
  return new User(userId, "Filip");
}

你可以看到它与实际的代码没有什么区别,它就是直截了当地做了代码中所说的事情,对代码的唯一补充是空安全机制。

这个函数干净而简单,但这里的问题在于 Thread.sleep(1000) 的调用。如果你在主线程上调用这个函数,你会有效地将你的主线程或是用户界面冻结一秒钟。但是如果你使用回调来实现,并为长期运行的操作创建一个新的线程,就会好得多。这实际上是第二个例子,看看如何用回调来实现。

Implementing the Function with Callbacks 实现回调

更换原有 getUserStandard() 如下,这就实现了一个简单的回调

fun getUserFromNetworkCallback(
    userId: String,
    onUserReady: (User) -> Unit) {
  thread {
    Thread.sleep(1000)

    val user = User(userId, "Filip")
    onUserReady(user)
  }
  println("end")
}

同时更换原有 main() 如下

fun main() {
  getUserFromNetworkCallback("101") { user ->
    println(user)
  }
  println("main end")
}

然后再次运行字节码分析器,你便可以看到如下结果

public static final void getUserFromNetworkCallback(
@NotNull final String userId,
@NotNull final Function1 onUserReady) {
  Intrinsics.checkParameterIsNotNull(userId, "userId");
  Intrinsics.checkParameterIsNotNull(onUserReady, "onUserReady");
  ThreadsKt.thread$default(
  false,
  false,
  (ClassLoader)null,
  (String)null,
  0,
  (Function0)(new Function0 () {
    // $FF: synthetic method
    // $FF: bridge method
    public Object invoke() {
      this.invoke();
      return Unit.INSTANCE;
    }

    public final void invoke () {
      Thread.sleep(1000L);
      User user = new User(userId, "Filip");
      onUserReady.invoke(user);
    }
  }), 31, (Object)null);
  

  String var2 = "end";
  System.out.println(var2);
}

与之前生成的那段代码相比,这是一个相当大的变化。系统再次做了一系列的空检查,以强制执行类型系统。随后创建了一个新的线程,在该线程的 public final void invoke() 中,在这里它调用了封装的代码。

一旦系统运行 getUserFromNetworkCallback,这就会创建一个线程。一旦线程完全建立起来,它就会运行代码块,并通过回调将结果传播回来。如果你运行上面的代码,你会得到以下结果:

end
main end
User(id=101, name=Filip)

这意味着主函数会在 getUserFromNetworkCallback 完成之前完成。它启动的线程完成在主线程之后。这个函数相比上一个例子的优点在于,它把耗时操作从主线程上移除了,并使用回调来最终返回数据。但这里的问题是,在这个过程中如果发生了任何异常,你必须用一个 try/catch 块来包裹它。但是,一般而言 try/catch 块最好创建在创建值所在的地方(也就是 IO / 耗时操作所在的地方)。然而,如果你在那里捕捉到了一个异常,你如何将它传播到主代码中去呢?

这一般是通过回调函数增加一个 error 参数来实现的,允许回调返回一个值或一个异常。

Handling Happy and Unhappy Paths 处理「成功」与「异常」的路径

这一段就是在说,成功有一个分支,报错了也得有一个

在异步开发时,你通常有一个叫做 快乐路径(Happy Paths) 的东西。当一切都很顺利的时候,你的程序将采取的行动路线。与此相反,你有一个 不快乐路径(Unhappy Paths),也就是事情出错的时候。在上面的例子中,如果事情出了差错,你就没有办法在回调中处理这种情况。你要么用一个try/catch 块包住整个函数调用,要么从线程函数中捕捉异常。前者有点难看,因为你真的希望在同一个地方处理所有可能的路径。后者也不是很好,因为你能传递给回调的只是一个值,所以你必须传递一个可空的值,或者一个空对象,尽管这对于空安全来说不是一件好事,我们还是尝试使用这种方法作为演示。

为了使这个功能可用,而且更干净,一般我们将回调定义为一个双参数的 lambda,第一个参数是值,如果发生错误的话,那第二个参数便是错误。

fun getUserFromNetworkCallback(
    userId: String,
    onUserResponse: (User?, Throwable?) -> Unit) {
  thread {

    try {
      Thread.sleep(1000)
      val user = User(userId, "Filip")

      onUserResponse(user, null)
    } catch (error: Throwable) {
      onUserResponse(null, error)
    }
  }
}

回调现在可以接受一个值或一个错误,用于帮助调用者辨析其走了那一条路径,当按下 Decompile 按钮查看字节码时,在字节码反编译窗口,你将会看到以下内容。

public static final void getUserFromNetworkCallback(
@NotNull final String userId,
@NotNull final Function2 onUserResponse) {
  Intrinsics.checkParameterIsNotNull(userId, "userId");
  Intrinsics.checkParameterIsNotNull(onUserResponse, "onUserResponse");
  ThreadsKt.thread$default(
  false,
  false,
  (ClassLoader)null,
  (String)null,
  0,
  (Function0)(new Function0 () {
    // $FF: synthetic method
    // $FF: bridge method
    public Object invoke() {
      this.invoke();
      return Unit.INSTANCE;
    }

    public final void invoke () {
      try {
        Thread.sleep(1000L);
        User user = new User(userId, "Filip");
        onUserResponse.invoke(user, (Object)null);
      } catch (Throwable var2) {
        onUserResponse.invoke((Object)null, var2);
      }

    }
  }), 31, (Object)null);
}

代码没有什么变化,只是把所有的东西都包在一个 try/catch 里,然后把 (value, null)(null,error) 这一对传递给 user。回到 main(),将代码更改如下。

fun main() {
  getUserFromNetworkCallback("101") { user, error ->
    user?.run(::println)

    error?.printStackTrace()
  }
}

如果有一个非空的 user,你可以把它打印出来,或者用它做其他事情。

另一方面,如果有一个错误,你可以打印它的堆栈跟踪或检查错误类型等等。

这种方法比以前的方法好得多,但仍有一个问题。它依赖于回调,所以如果你需要三个或四个不同的请求和值,你就必须建立那个可怕的「回调地狱」

此外,每当你调用这样的函数时,还存在分配一个新线程的开销。

Analyzing a Suspendable Function 分析挂起函数

在回调例子中发现的问题和难处,是可以通过使用协程来补救的。

通过协程 API,你将

  • 去掉回调,用协程实现这个例子。
  • 提供有效的错误处理。
  • 删除新的线程分配开销。

为了解决所有这些障碍,你将学习协程 API 中的另一个函数 suspendCoroutine。这个函数允许你手动创建一个协程并处理其控制状态和流程。与 launch 块不同的是,launch 它是在幕后处理了的上述控制与流程。

但是,在我们冒险进入 suspendCoroutine 之前,先分析一下当你只是将 suspend 修改器添加到任意一个现有函数中时会发生什么?

suspend fun getUserSuspend(userId: String): User {
  delay(1000)

  return User(userId, "Filip")
}

这个函数与第一个例子非常相似,只是你添加了 suspend 修改器,而且你没有让线程休眠,而是调用 delay(),它可以让协程挂起一定的时间。你可能会觉得,这一些变化,并不会使得字节码差异巨大,对吗?

而事实是:

@Nullable
public static final Object getUserSuspend(
@NotNull String userId,
@NotNull Continuation var1) {
  Object $continuation;
  label28: {
    if (var1 instanceof < undefinedtype >) {
      $continuation = (<undefinedtype>)var1;
      if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
        ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
        break label28;
      }
    }

    $continuation = new ContinuationImpl(var1) {
    // $FF: synthetic field
    Object result;
    int label;
    Object L $0;

    @Nullable
    public final Object invokeSuspend (@NotNull Object result) {
      this.result = result;
      this.label | = Integer.MIN_VALUE;
      return MainKt.getUserSuspend((String)null, this);
    }
  };
  }

  Object var2 =((<undefinedtype>)$continuation).result;
  Object var4 = IntrinsicsKt . getCOROUTINE_SUSPENDED ();
  switch(((<undefinedtype>)$continuation).label) {
    case 0:
    if (var2 instanceof Failure) {
      throw ((Failure) var2).exception;
    }

    ((<undefinedtype>)$continuation).L$0 = userId;
    ((<undefinedtype>)$continuation).label = 1;
    if (DelayKt.delay(1000L, (Continuation)$continuation) == var4) {
    return var4;
  }
    break;
    case 1:
    userId = (String)((<undefinedtype>)$continuation).L$0;
    if (var2 instanceof Failure) {
      throw ((Failure) var2).exception;
    }
    break;
    default:
    throw new IllegalStateException ("call to ’resume’ before ’invoke’ with coroutine");
  }

  return new User (userId, "Filip");
}

简而言之,就是把每一个挂起点上下拆成两段代码,然后通过参数里的 state 判断你现在应该执行哪一段,字节码膨胀神器了属于是🤣 这段的翻译有一些混乱,凑合着看吧

这个庞大的代码块与之前的例子相比有很大的不同。为了了解正在发生的事情,我们一步步来看这些代码。

  • 你会注意到的第一件事是函数的一个额外参数──延续(Continuation)。它构成了协程的全部基础,这也是可挂起函数不同于普通函数的最重要的区别。「延续」允许函数在挂起模式下工作。它们允许系统在挂起了一个函数后,回到该函数的原始调用点。你可以说,「延续」就是系统或当前运行的程序的回调,通过使用「延续」,系统知道如何引导函数的执行和引导他们的调用栈。

  • 也就是说,所有的函数实际上都有一个隐藏在内部与之相联系的 Continuation。系统用它来导航调用栈和一般的代码。然而,可挂起函数有一个额外的实例,以便它们可以被暂停,并且程序可以继续执行,最后使用第二个 Continuation,以导航回到可挂起函数调用点或接收其结果。

  • 剩下的代码首先检查我们在哪个 Continuation 中。由于每个可挂起函数可以创建多个 Continuation 对象。每个 Continuation 将描述该函数可以采取的一个流程。例如,你在一个可挂起函数上调用 delay(1000),你实际上是在创建另一个执行实例,它在一秒钟内完成,并返回到调用delay 的那一行。

代码包裹了 Continuation 的参数,并从内部调用该函数。一旦完成,它就检查当前活动的 Continuation 的标签。如果标签达到了 0,这意味着它处于最近一次执行的末端 - delay()。在这种情况下,它只是返回该执行的结果。最后,它还将标签增加到 1,以通知它已经过了 delay(),应该继续执行代码。

最后,如果标签是 1,也就是 continuation-stack 中最大的索引,可以这么说,这意味着函数在 delay() 之后已经恢复了,而且它已经准备好为你提供返回值──user。如果在这之前有什么问题,系统会抛出一个异常。

还有一种情况,即默认情况,如果系统试图用一个 Continuation 或执行流程来 resume() ,但如果它实际上没有调用该函数,就会抛出一个异常。这有时会发生在一个子 Job 在其父 Job 后完成的情况下。这是一个默认的后备机制,用于极为罕见的情况。如果你谨慎地使用你的协程和对应的 API 的话,父 Job 应该总是等待它们的子 Job

简而言之,系统对小的 状态机(State-machines) 和内部回调使用 Continuation,这样系统就知道如何在代码中导航,以及存在哪些执行流,应该在哪些点上挂起,并在以后恢复。状态是用标签描述的,它可以有多少个状态,就有多少个函数中的挂起点。

要调用新创建的函数,你可以使用下面的代码。

fun main() {
  GlobalScope.launch {
    val user = getUserSuspend("101")

    println(user)
  }
  
  Thread.sleep(1500)
}

这个函数调用就像第一个例子一样。不同的是它是可挂起的,所以你可以在一个协程中使用它,离开主线程。你也可以依赖来自协程 API 的内部线程,所以协程的启动几乎没有额外的开销。代码是有顺序的,尽管它是异步的。而且你可以在调用地点使用 try/catch 块,即使值可以异步产生。前面的例子中的所有问题都已经解决了!

Change Code to Suspendable 将代码转为可挂起的

另一个问题是,你应该在什么时候将现有的代码迁移到可挂起函数和协程之上?这是一个相对偏颇的问题,但还是有一些客观的指导原则可以遵循,以确定你是使用协程还是标准机制更好。

一般来说,如果你的代码中充满了复杂的线程,并且经常分配新的线程来完成你需要的工作,但你没有能力使用固定的线程池,而不是边走边创建新的线程,你应该迁移到协程。性能上的好处是立即可见的,因为协程 API 已经有预定义的线程机制,使你很容易在线程之间切换,并在线程之间分配多件任务。

这往往与第一个转换的原因相吻合,但如果你正在建立新的线程,由于异步或长期运行的操作,你往往会大量滥用回调,因为线程之间最简单的通信方式就是通过回调。而且,如果你在使用回调,你很可能在代码风格、可读性和理解函数背后的业务逻辑所需的认知负荷方面存在问题。在这种情况下,你也应该尝试将你的代码迁移到协程上。

参考第一篇文章:Kotlin Coroutine 协程 - 01 异步开发往事书

当有一些不属于你可以改变的 API 的时候,问题就来了。在这种情况下,你不能改变源代码。假设你有以下代码,但它来自一个外部库。

fun readFile(path: String, onReady: (File) -> Unit) {
  Thread.sleep(1000)
  // some heavy operation

  onReady(File(path))
}

这个函数迫使你使用回调,尽管你可能有更好的方法来处理长期运行的或异步的操作。但是你可以很容易地把这个函数包起来,通过 suspendCoroutine 方法:

suspend fun readFileSuspend(path: String): File =
    suspendCoroutine {
      readFile(path) { file ->
        it.resume(file)
      }
    }

这段代码完全没有问题,因为如果它成功地读取了一个文件,它将把它作为一个结果传递给协程。如果出了问题,它会抛出一个异常,你可以在调用的地方捕捉到它。用协程完全包裹可能的异步操作的能力是非常强大的。但是如果你的函数依赖于循环回调来不断产生值──比如订阅 Socket,那么像这样的协程就没有什么意义了。最好用 Channel 或 Flow API 来实现这样的机制,你将在「第 10 章:Channels」和「第 13 章:开始使用Coroutines Flow」中了解更多。

Elaborating Continuations 阐明协程中的「延续」

拥有「延续性」是区分标准函数和可挂起函数的关键。但,什么是「延续」?每次程序调用一个函数时,它都会被添加到程序的调用栈中。这是一个由所有函数它们被调用的顺序组成的堆栈。「延续」会操纵这个执行流程,并反过来帮助处理调用栈。

你已经知道 Continuation 实际上是一个回调,但在一个非常低的系统层次上实现。更精确的解释是,它是程序控制状态的一个抽象包装。它拥有控制程序如何和何时进一步执行的手段,以及它的结果是什么,可能是一个异常或一个值。

一旦一个函数完成,程序就会把它从堆栈中取出,然后继续执行下一个函数。诀窍在于,在每个函数执行完毕后,系统如何知道在哪里返回。这些信息被保存在前面提到的 Continuation 中。每个 Continuation 都保存了一些关于函数被调用时的上下文信息。比如局部变量、函数被传递的参数、被调用的线程等等。通过使用这些信息,系统可以简单地依靠 Continuation,告诉它在函数结束时需要去哪里。

试着看看,从函数调用到结束,函数和 Continuation 的生命周期是什么。

Living in the Stack 深入调用栈

当一个程序第一次启动时,其调用栈只有一个条目──初始函数,通常称为 main()。这是因为在它里面,还没有调用过其他的函数。初始函数很重要,因为当程序到达终点时,它会回调 main()Continuation,从而完成程序,并通知系统将其从内存中释放。

随着程序的运行,它调用其他函数,将它们加入到堆栈中。

因此,如果你有这样的代码 fun main() {},程序级 Continuation 的生命周期就包含在 main 函数的括号内。但是当另一个函数被调用时,系统做的第一件事就是为新函数创建一个新的 Continuation。它向新的 Continuation 添加一些信息,比如什么是父函数和它的需要延续对象–在这里是 main()。此外,它还传递了关于该函数在哪一行被调用、带有哪些参数以及其返回类型是什么的信息。

试想,运行这段代码会发生什么

fun main() {
  val numbers = listOf(1, 2, 5)
}
  • 系统创建了一个 Continuation,它将存在于 listOf() 之中。

  • 首先,它知道自己是在 main() 的第一行被调用的,所以当它完成后可以在代码的适当位置返回。

  • 接下来,它知道它的父类是 main()。这使得 listOf() 有办法完成整个程序,将调用一直传播到最初的 Continuation。例如,这可以在发生异常时发生。最后,它知道传递给 listOf() 的参数是一个变量参数数组,其值为 1、2、5,在函数结束时,我们应该收到一个 List<Int>

  • 有了所有这些信息,它就能导航函数的执行和生命周期,从调用点到返回语句。

从更深层次看,这就像声明了一个局部变量,用一个指向该变量的指针调用初始化函数,这样你就可以在 listOf() 中设置值,然后使用 goto 语句,返回到初始化函数调用后的一行,为使用该变量做好准备。

此处删除了一个原文极其重复且傻逼的比喻

Handling Continuation 处理「延续」

getUser() 的最后一个版本中,你使用了协程 API中的 suspendCoroutine。它是一个 Top-level 函数,允许你创建协程,就像 launch 一样,但专门用于返回值,而不是启动 JobsuspendCoroutine 的另一个特点是,它接收一个 lambda 作为参数,该参数的类型为(Continuation<T>) -> Unit。这意味着,你可以随意调用 Continuation 上的函数。这也就允许你手动控制状态和控制流的操作。

Continuation 上可用的函数是 resume, resumeWithresumeWithException。你还可以通过调用 continuation.context 来访问 CoroutineContext,但你将在后面的「第 5 章:Coroutine Context」中学习上下文。

多分析一下 Continuationresume 传递了一个成功的 T 类型的值,无论你想从一个循环程序中返回什么类型。resumeWithException 接收一个 Throwable,以备出现意外情况。这允许你用一个错误来结束循环程序,你可以在以后捕捉和处理这个错误。

有了这个功能,就可以从函数中返回值,这可能是异步的,而不知道后面有什么。就像一个 API 应该是这样的。你可能在想。但如果函数没有结束呢?

在这种情况下,你将再次等待一个值,而这个值并没有到来,这又一次导致了另一个问题,你的代码被无限地挂起

为了弥补这一点,最好对 Continuation 进行积极的处理。无论怎样,都要尽量产生一个结果,即使它只是一个异常。至少在这种情况下,你的函数会结束,你会有东西可以处理。很方便地,Continuation 有一个函数可以做到这一点。它被称为 resumeWith,它接收前面提到的 Result 单体。在某一时刻,结果只能是两种状态之一。要么是成功,持有你需要的值,要么是失败,持有异常。

它还持有一些实用的函数,比如 runCatching,它接收一个试图运行的 lambda 来获得 Success 的情况,并带有一些值。如果出了问题,在 try/catch 块的帮助下,它会捕获异常并在最后返回一个 Failure 结果。在 Continuation 协程收到结果后,解包后,你将得到值或者异常,这样你就可以自己处理。

无论何时你使用 suspendCoroutine,或者任何其他用 Continuation 协程恢复值的方法,都强烈建议你执行这种方法,这样你就不会出现永远无法完成的 Continuation 协程。

Creating Your Own Suspendable API 创建属于你的可挂起 API

我们提到 Jetbrains 对协程 API 很重要的想法之一是可扩展性。你已经看到了如何把你自己的函数变成可挂起的函数,但你可以做的另一件事是创建一套类似于 API 的实用工具,隐藏线程和上下文切换过程。

它使用 suspendCoroutine,和 ResultrunCatching 来尝试为你处理一个值。

suspend fun <T : Any> getValue(provider: () -> T): T =
    suspendCoroutine { continuation ->
      continuation.resumeWith(Result.runCatching { provider() })
    }

如果你试着调用的它的话,你写的代码应该会是这样

GlobalScope.launch {
  val user = getValue { getUserFromNetwork("101") }
    
  println(user)
}

这允许你抽象出所有试图获取一些数据的函数,通过网络、文件读取或数据库查询,并将它们推给后台线程,让主线程只关系渲染数据,而其余的代码则只关心获取数据。

接下来的两个例子极其简单,对线程切换很有用:

fun executeBackground(action: suspend () -> Unit) {
  GlobalScope.launch { action() }
}

fun executeMain(action: suspend () -> Unit) {
  GlobalScope.launch(context = Dispatchers.Main) { action() }
}

第一个是接收一个 lambda 块,并在后台使用默认的启动上下文运行它。第二个也接收 lambda 块,但使用 Dispatchers.Main 上下文运行它,所以你可以轻松地切换到主线程,而不需要知道实现的细节。

使用它们,你会有类似这样的代码:

executeBackground {
  val user = getValue { getUserFromNetwork("101") }

  executeMain { println(user) }
}

命名可以更好一点,但你会明白这背后的想法。现在你有与 GlobalScope.launch 块相同的行为,但你不需要依赖知道哪个作用域,以及哪个函数在幕后被使用。

多玩玩这些,根据你的需要,在它们上面建立更多的实用函数。

下一篇:Kotlin Coroutine 协程 - 04 Aynsc / Await

comments powered by Disqus