Back
Featured image of post Kotlin Coroutine 协程 - 01 异步开发往事书

Kotlin Coroutine 协程 - 01 异步开发往事书

追忆异步理念和开发工具迭代的历史

本文是对 Kotlin Coroutines by Tutorials By Filip Babić and Nishant Srivastava 书本内容的转述和笔记书本内容是受原版权保护的内容

Chapter 1: 异步开发往事书

想象一个用户场景,你需要让用户上传图片,并需要在在上传时显示加载 UI(比如 Spinner 旋转加载条),并在结束上传时隐藏加载 UI。 很自然的,我们可以想到如下代码:

fun uploadImage(image: Image) { 
  showLoadingSpinner() 
  // Do some work 
  uploadService.upload(image) 
  // Work’s done, hide the spinner 
  hideLoadingSpinner() 
}

但是很遗憾,现实中的异步开发并没有我们想象中的那么简单。

这是因为,通常而言在 showLoadingSpinner() 中含有如下的 阻塞调用(Blocking Call)

fun showLoadingSpinner() { 
  showSpinnerView() 
  while(running) { 
    rotateSpinnerImage() delay() 
  } 
}

这意味着我们必须等待 showLoadingSpinner() 的调用结束,才能调用 uploadService.upload(image),这便造成了代码执行结果与预想中的并不一致。调用 showLoadingSpinner() 代码块,也被称为阻塞调用。

Blocking Calls 阻塞调用

阻塞调用最基本的逻辑是:「无返回,不结束」。

在上面的例子中,因为主线程在调用 showLoadingSpinner() 一直在等待其函数调用的返回,只要当 showLoadingSpinner() 没有返回值,那么主线程就会一直阻塞在 showLoadingSpinner() 不结束,这也就阻止了下一行代码 uploadService.upload(image) 的运行。

那么最简单的解决办法便呼之欲出,通过使用 多线程,分别是主线程(Main Thread),和 UI 线程(UI Thread),在不同的线程上并行执行代码操作。

Why Multithreading? 使用多线程的原因

在摩尔定律逐渐趋于边际效应递减即将失效边缘的今天,相较于 5-10 年前,我们不能寄希望于硬件速度在单核单线程层面的发展能使得我们完成所有的任务,况且,许多任务也并不是需要同时完成。

当下计算速度的发展将重心放在了核心数量以及其拥有的线程数量,这可以使得大量操作可以 同时(Concurrently) 并行运行,并以此节约大量执行时间,优化程序速度。

Comparing the Main and Worker Threads 比较主线程与子线程

主线程,和 UI 这一类的线程,最需要做的事情是 非阻塞式(No-blocking Call) 的分发给其他子线程,让子线程运行阻塞代码,以减轻主线程和 UI 线程的负担,并防止 死锁(Deadlock) 的发生。

就像在上一个例子讲到的,showLoadingSpinner() 应该被分配给一个新的线程去执行,这个新的线程只负责旋转 UI 的动画,并且可以立即返回不需要等待线程结束,这便是 非阻塞式的函数调用。而主线程应该继续渲染 UI,而 IO 线程会在 uploadService.upload(image) 结束后通知主线程隐藏旋转加载条。

Interacting with the UI Thread from the Background 线程互交互

线程之间并不是完全独立的,他们需要互相通信共享信息,就像 uploadService.upload(image) 需要通知主线程一样,线程之间需要通信并可能访问相同的数据和资源。

Share Data 数据共享

为了进行通信,不同的线程需要共享一些数据。例如,负责旋转旋转加载条的线程,需要通知主线程:「一个新的图像已经准备好被显示」

数据共享需要在异步的过程中达成某种同步,不然会导致执行顺序的巨大变化进而导致程序结果的不可预测性。

例如,如果主线程收到了已经上传好新图片,新图像可用的通知,而在显示之前,图像被其他线程替换了,这对于计算机程序来说是灾难性的。

在这种情况下,应用程序会跳过一帧,发生 竞赛条件(Race Condition)。因此,你需要某种 线程安全(Thread Safe) 的数据结构。这意味着,即使被多个线程同时访问,该数据结构也应正常工作,确保执行顺序的正常。

但是数据如果数据是只读的,并且不可更改和写入,在这种情况下,多个线程可以读取相同的数据而没有任何竞赛条件,这种 不可变的(Immutable) 对象总是线程安全的。

Queues 队列

队列不仅仅只是一个容器,它可以为提供一种同步,保证信息的顺序,而当队列为 阻塞队列(Block Queue) 时,队列可以阻塞当前线程,并等待新的信息。

队列是可以并行的,就像你在银行前总能看到有多排柜台一样。

Pipelines 管道

而当其他线程接受到了信息后,信息是通过 Pipeline 阻塞的经过每一个需要这些信息的地方。

管道是必须阻塞的,当流水线上的第一个工人没有完成第一步时,后面的工人在没有返回的情况下无法做任何事情。

Handling Work Completion Using Callbacks 通过回调来通知任务的完成

在异步开发的所有机制中,回调是最常被使用的。例如,当你分配给某人任务时会同时要求他在任务结束时按一个按钮,这个按钮便类似于让他们执行的代码,而执行任务的人是一个无阻塞的函数。

当我们将 uploadService.upload(image) 也移到另外的 IO 子线程后,我们需要他在结束时通知 Spinner 线程,让他停止显示旋转加载条。

fun uploadImage(image: Image) { 
  showLoadingSpinner() 
  uploadService.upload(image) { hideLoadingSpinner() } 
}

而且在 lambda 函数块中,你可以做任何事情,而不仅仅是隐藏一个加载旋钮,比如一般而言像是 uploadService.upload(image) 可能会返回上传状态,而你可以在 lambda 中直接使用这些传递的值,类似于 uploadService.upload(image) { returnCode -> showDialog(returnCode) }

然而,尽管回调是处理异步完成时任务的最常见的方法之一,但他们在异步编程发展的多年来逐渐变得臭名昭著。

Indentation / Callback Hell 缩进 / 回调地狱

回调在一般的情况下,处理一些简单的函数时,它们的语法是相当具有可读性的。然而,通常在更复杂的情况下,你需要进行多个异步的函数调用,并且还需要需要以某种方式连接或组合,将结果映射到更复杂的对象,这其中线程通信的开发难度极大。

比如,在一些场景下,你需要同时获取用户的姓名和头像,并将他们合并成一个对象,想象一下,回调将如何完成这样的需求。

因为这种开发难度,甚至会导致人们倾下于将这种代码 写成串行 的(也就是先获取姓名,再获取头像),然后最后合并,这使得 能并行执行的代码没有并行,最后导致完全不必要的性能损耗。

在这种情况下,代码的编写、维护和推理都变得非常困难。由于你不能从回调中返回一个值,而必须把它传递给 lambda 块本身,所以你必须对回调进行嵌套。这类似于在集合上嵌套 forEachmap 语句,其中每个操作都有其 lambda 参数。

此外,一些人认为回调从一开始就很难掌握。这些陡峭的学习曲线,再加上可能超过 10 层的多层嵌套结构产生的认知负担和可扩展性的缺乏,使得人们通过尝试其他方法寻找异步开发的解决方案。

这就是为什么诸如 RxJava 这类 Reactive Extensions 工具出现在了我们的生活。

Using reactive extensions for background work 使用 Rx

基于回调的方法最大的问题是很难将数据从一个函数传递到另一个函数,这导致了 嵌套的回调(Nested Callbacks)

但是如果你仔细审视一下队列和管道,它们是用数据流来操作的,只要你需要,你就可以 监听(Listen) 数据。

而 Rx 便是通过将异步操作包裹在事件流,用来缓解回调地狱的问题。

Rx 使用 观察者模式(Observer Pattern) 用以 观察(或者可以说是处理)数据流,并且通过拓展大量的 可观察流(Observable Stream) 操作符,扩展了可观察流的操作方式,使数据处理变得简洁而优雅。

你可以 订阅(Subscribe) 一个 事件流(Event Stream),以多种方式映射(Map)、过滤(Filter)、聚合(Reduce)和组合事件(Combine Events),以及使用一个 lambda 函数处理整个操作链中的错误。

之前加载、上传和调整图片大小的例子,可以使用 Rx 可以表示出来。

fun uploadImage(imagePath: String) { 
  loadImage(imagePath) 
      .doOnSubscribe(::showLoadingSpinner) 
      .flatMap(::resizeImage) 
      .flatMap(::uploadImage) 
      .subscribe(::hideLoadingSpinner, ::handleError)
}

::showLoadingSpinner 是 Kotlin 的函数引用方法,完整写出是这样 .doOnSubscribe { showLoadingSpinner(it) },而正是因为只有一个参数,编译器可以完全推测你就是想要将函数参数塞入另一个函数之中,所以你只需要告诉编译器,你要执行什么函数就可以了

当我们一开始看到这些代码可能会觉得很奇怪。但其实这就是一个通过使用一系列运算符来修改数据的数据流(这和我们曾经提到过的 Kotlin Flow 十分相似)

首先当 loadImage 与其下面一系列的代码被执行后,什么都不会发生。loadImage 只是创建了一个数据订阅流,最终是让 Rx 具体管理这些数据流的操作,并且告诉 Rx 这些数据流该按照什么顺序操作。如果当有数据进入时,代码将会从 flatMap 开始,将得到的数据输入给 resizeImage。所以如果没有数据,比如 loadImage 仍在阻塞和等待,那数据流便不会执行,但这已经和 uploadImage 函数本身没关系了,因为这些数据流的监听是被 Rx 管理的,所以不会使得 uploadImage 变成阻塞调用。同样的逻辑,第二个 flatMap 会一直等待上一个 flatMap 的执行结果,而当这一切结束之后会将流最后的结果交给 subscribe 中的 hideLoadingSpinner 也就是最终任务结束的回调方法,或者是处理过程中可能的报错 handleError

而其中的 doOnSubscribe 你可以理解为是订阅这个流会发生的事件,也就是当我们订阅这个流时,便会执行 showLoadingSpinner 函数,像是这样的函数还有很多,比如 doOnSuccessdoOnError

所以这些数据和操作流实际上不会被执行,直到有人使用 subscribe(onComplete, onError) 函数来订阅它们。

此外,如果在一个链中的任何操作发生错误或异常,它不会被抛出(Throw),程序也不会崩溃。相反,流会将这些错误沿着链传递,最终到达 onErrorlambda 块。回调很难做到这样的事情,当有错误发生时,它们会直接抛出异常,你必须自己用 try/catch 块来处理它,并进一步加剧缩进地狱和可读性的降低。

在异步开发方面,Rx 比回调干净很多,但它们也有一个更陡峭的学习曲线。由于有几十个操作符、各种不同类型的流和很多关于线程间切换的边缘案例,要完全理解它们需要很长的时间。

Diving deeper into the complexity of Rx 深入了解 Rx 复杂性

由于本文不是关于 Rx 的,你只能也只需要对它的正反两方面的特点有一个模糊的印象即可。

正如之前所说的,Rx 使异步编程变得干净可读。此外,与诸如 map, reduce, filter 等常见的数据处理的操作符相比,Rx 的 Observer Pattern 是一个强大的机制和设计理念。此外,流的错误处理概念为程序增加了额外的安全性。

但 Rx 并不完美,它和其他框架或范式一样有它的问题,其中一些问题最为尖锐。

首先,是学习曲线,学习 Rx 并不同于像是 Java -> Python 这种简单的语言切换,你必须要学习一些额外的概念,比如 Observer Pattern 和流,对于这些抽象概念的理解通常需要花费较长时间,就像你刚接触编程时一样。此外,你还会进一步的发现,Rx 不仅仅只是一个框架,它带来了一种全新的范式,即 反应式编程(Reactive Programming)。正因为如此,想要开始使用 Rx 是非常困难的,进一步掌握其运算符的特性就更难了。操作符的数量、线程调度(Thread Scheduling)的类型,以及两者之间的排列组合创造了许多的可能,这使得人们几乎不可能在短时间了解 Rx 的全部内容,并写出 Rx 的最佳实践。

Rx 的另一个问题是炒作。多年来,人们把 Rx 作为异步操作的瑞士军刀来使用,而不考虑其背后所承载的高复杂性。

这最终导致这类编程以 Rx 为主导,给现有的应用引入了更多不必要的复杂性。通过寻找各种变通方法和使用众多的设计模式,仅仅只是为了让 Rx 工作,引入了许许多多的不需要的复杂层。正因为如此,在安卓系统中,Rx 社区一直在争论程序员是否应该将网络请求作为数据流来表示,而不是仅仅使用回调或更简单的方法来处理单一事件。因为回调相较于于 Rx ,在没有 Rx 基础的人中看来,单层的简单回调显然更具有可读性。同样的辩论甚至在 ClickEventListener 发生,程序员是否也应该将点击作为事件流来表示?在这些问题上,Rx 社区的意见非常分歧。

那么,考虑上述种种这些,是否有更好或更简单的方法来处理异步任务?奇怪的是,有一个可以追溯到几十年前的概念,逐渐在最近成为一个热门话题。

A blast from the past 协程虚空往事书

本文是关于协程(Coroutines)的讨论,而它们是一种可追溯到 20 世纪 60 年代的机制。协程是一种处理异步编程的独特方式。这个概念是围绕着使用 挂起点(Suspension Point)、函数(Functions)和连续(Continuations) 作为语言中的第一类公民而展开的。

这些概念现在看起来是有点抽象,所以最好是展示一个例子。

fun fetchUser(userId: String) { 
  val user = userService.getUser(userId) // 1
  print("Fetching user") // 2
  print(user.name) // 3
  print("Fetched user") // 4
}

使用上面的代码片段,并重温你所学到的关于阻塞调用的知识,你会说执行顺序为 1,2,3,4。但是如果你仔细看一下这段代码,你会发现这并不是唯一可能的逻辑顺序。例如,1 和 2 的顺序并不重要,3 和 4 的顺序也不重要。而真正重要的是,用户数据在显示之前被获取,也就是 1 必须发生在 3 之前。你也可以把获取用户数据的时间,推迟到用户数据实际显示之前的一个方便的时间。以一种看上去更舒服,简单,透明的方式来管理这些问题,这就是被称为协程的黑魔法!

它们是一种利用系统的 调度(Dispatch)挂起工作(Suspending Work) 的能力,实现部分线程(Part thread)、部分回调(Part Callback)的机制。这样,你可以立即从调用中返回一个结果,而不需要使用回调、线程或流。换言之,一旦你启动一个协程,或者调用一个 Suspend 函数,这个函数的返回值结果会被很好的包裹起来,立刻返回给你,这个值就像一个玉米饼一样,从外观上看,馅饼皮是准备好了,但是直到你在使用到这个变量之前,或者说,吃这个馅饼之前,里面的代码不会被执行。

Explaining coroutines: The inner workings 浅析协程的内部原理

但这其实并不是真正的魔法,只是一种使用底层处理的聪明方法。getUser 函数将会被标记为 Suspend 函数,这意味着系统会在后台准备调用,你会得到一个未完成的、但包装好的玉米饼。但它可能还没有执行这个函数。系统会将它移到线程池中,在那里等待进一步的命令。一旦你准备吃玉米卷,并调用结果,程序就会阻塞,直到你得到一个准备好的玉米饼

知道了这一点,程序就可以跳过函数代码的其他部分,直到它到达使用这个函数返回结果的第一行代码,这被称为 Awaiting。然后到了使用返回结果的那个点,协程开始执行 getUser 函数,如果 getUser 还没有执行完的话,协程就会 Suspend 程序,等到执行完了再从这里恢复,再继续执行。

这意味着在调用本身和使用其返回结果之间,你可以做任何你想做的处理。因为编译器知道 Suspension Point 和函数是异步的,所以可以按照预想的顺序处理它们的执行,所以你可以写出可理解的、干净的代码,这是很容易扩展和维护的。

由于使用 Suspension Point 编写异步代码非常简单,你可以很容易地组合多个请求并且转换数据,没有回调地狱,奇怪的流,Map 来传递数据,或复杂的运算符来组合或转换结果。你所需要做的就是把函数标记为 Suspendable,并在一个协程块中调用它们。

另一件关于协程极其重要的事情是,它们不是线程。它们只是一种低层次的机制,利用线程池在多个现有线程之间切换工作。这使得你可以创建数以百万计的协程,而不至于使内存过大;但是一百万个线程所占用的内存,即便是今天最先进的计算机也会崩溃。

尽管许多语言都支持协程,但每种语言都有不同的实现方式。

Variations through history 历史的变迁

在查阅资料时我还发现了 这篇关于 JavaScript 协程写法变迁的文章,如果读者有兴趣可以另行翻阅。

原文内容和这篇资料几乎一致,甚至这篇资料更为详细,我便不重新复制在这里,引得啼笑皆非了。

下一篇:Kotlin Coroutine 协程 - 02 协程入门教程

comments powered by Disqus