【Kotlin】协程的挂起恢复源码解析

本文介绍了CPS转换,后从反编译的源码层面介绍Kotlin协程挂起和恢复的原理
首先要了解的就是CPS转换。
CPS转换
在Kotlin协程中,挂起函数的执行是通过 Continuation Passing Style (CPS)转换 来实现的。CPS转换是一种将函数式编程中的函数调用转换为可传递的 Continuation 对象的过程。这里的转换是Kotlin编译器实现的,在跨平台属性上,也保证了流程的一致性。
CPS转换调用的过程如下:
- 当一个函数被调用时,它的参数和返回值会被封装在一个Continuation对象中。
- 函数的执行过程中,遇到挂起操作时,会将当前的Continuation对象传递给挂起函数。
- 挂起函数执行完毕后,会将结果封装在一个新的Continuation对象中,并将其传递给原始的Continuation对象。
- 原始的Continuation对象会继续执行,直到所有的挂起操作都完成。
假设我们有一个简单的suspend函数,它模拟了一个异步操作:
suspend fun fetchData(): String {
delay(1000) // 模拟耗时操作
return "Data fetched"
}
在CPS转换后,这个函数可能会被转换为类似以下的形式:
fun fetchData(continuation: Continuation<String>) {
delay(1000, object : Continuation<Unit> {
override val context: CoroutineContext = continuation.context
override fun resumeWith(result: Result<Unit>) {
continuation.resume("Data fetched")
}
})
}
在这个转换后的函数中,fetchData不再直接返回结果,而是通过continuation.resume方法将结果传递给调用者。简单来说,CPS其实就是函数通过回调传递结果的一种方式。
Kotlin协程通过将异步流程拆解为一系列 挂起点 ,对含有 suspend 关键字的函数进行了 CPS转换 ,即Continuation Passing Style转换,使其能够 接收Continuation对象 作为参数,并在异步操作完成后通过调用 Continuation 的恢复方法来继续执行协程。
在编译后的字节码中,协程的状态会被转换为状态机的形式,每个挂起点对应状态机的一个状态。当协程挂起时,它的执行状态会被保存在Continuation对象中,包括局部变量上下文和执行位置。
Continuation
Continuation (续体)是一个保存协程状态的对象,它记录了协程挂起的位置以及局部变量上下文,使得协程可以在任何时候从上次挂起的地方继续执行。Continuation是一个接口,它定义了 resumeWith 方法,用于恢复协程的执行。
interface Continuation<in T> {
val context: CoroutineContext
fun resumeWith(result: Result<T>)//result 为返回的结果
}
- 续体是一个较为抽象的概念,简单来说它包装了协程在挂起之后应该继续执行的代码;在编译的过程中,一个完整的协程被分割切块成一个又一个续体。
- 在suspend函数或者 await 函数的挂起结束以后,它会调用 continuation 参数的 resumeWith 函数,来恢复执行suspend函数或者await 函数后面的代码。
CPS转换 使得协程能够在不阻塞线程的情况下执行异步操作。当协程挂起时,线程可以被释放去执行其他任务,从而提高了系统的并发性能。此外,CPS转换使得协程的挂起和恢复操作对开发者来说是透明的,开发者可以像编写同步代码一样编写异步代码。
发生 CPS 变换的函数,返回值类型变成了 Any?,这是因为这个函数在发生变换后,除了要返回它本身的返回值,还要返回一个标记CoroutineSingletons.COROUTINE_SUSPENDED,为了适配各种可能性,CPS 转换后的函数返回值类型就只能是 Any?了。
协程的启动
下面跟随启动,挂起,恢复的流程,从源码层面看看协程的核心原理。
测试代码入口:
object CoroutineExample {
private val TAG: String = "CoroutineExample"
fun main(){
// 启动协程,分析入口
GlobalScope.launch(Dispatchers.Main) {
request()
}
}
private suspend fun request(): String {
delay(2000)
Log.e(TAG, "request complete")
return "result from request"
}
}
从 CoroutineScope.launch 开始:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy){
LazyStandaloneCoroutine(newContext, block)
}else{
StandaloneCoroutine(newContext, active = true)
}
coroutine.start(start, coroutine, block)
return coroutine
}
- 参数一context:协程上下文,并不是我们平时理解的Android中的上下文,它是一种key-value数据结构。可以传入Main用于主线程调度。
- 参数二start:启动模式,此处我们没有传值则为默认值(DEFAULT),共有三种启动模式。
- DEFAULT:默认模式,创建即启动协程,可随时取消;
- ATOMIC:自动模式,创建即启动协程,启动前不可取消;
- LAZY:延迟启动模式,只有当调用start方法时才能启动。
- 参数三block:协程真正执行的代码块,即上面例子中launch{}闭包内的代码块。
SuspendLambda
CoroutineScope.launch中第三个参数类型为suspend CoroutineScope.() -> Unit函数,这是怎么来的呢?我们编写代码的时候并没有这个东西,其实它由编译器生成的,我们的 block代码块 经过编译器编译后会生成一个 继承Continuation 的 类SuspendLambda 。一起看下反编译的java代码,为了关注主要逻辑方便理解,去掉了一些无关代码大概代码如下:
public final void main() {
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getMain(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
CoroutineExample var10000 = CoroutineExample.this;
this.label = 1;
if (var10000.request(this) == var2) {
return var2;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return Unit.INSTANCE;
}
···
}
从上面反编译的java代码中好像并不能很好的看出来协程中的block代码块具体编译长什么样子,但可以确定他是编译成了 Continuation类 ,因为我们可以看到实现的 invokeSuspend 方法实际是来自BaseContinuationImpl,而BaseContinuationImpl的父类就是Continuation。这个继承关系我们后面再说。既然从反编译的java代码中看的不明显,我们直接看上面例子的字节码文件,其中可以很明显的看到这样一段代码:
final class com/imile/pda/CoroutineExample$main$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2
这下恍然大悟,launch函数的第三个参数,即协程中的 block代码块 是一个编译后 继承了SuspendLambda并且实现了Function2的实例 。
SuspendLambda 本质上是一个 Continuation ,前面我们已经说过 Continuation 是一个有着恢复操作的接口,其 resume 方法可以恢复协程的执行。
SuspendLambda继承机构如下:
- Continuation: 续体,恢复协程的执行
- BaseContinuationImpl: 实现 resumeWith(Result) 方法,控制状态机的执行,定义了 invokeSuspend 抽象方法
- ContinuationImpl: 增加 intercepted 拦截器,实现线程调度等
- SuspendLambda: 封装协程体代码块
- 协程体代码块生成的子类: 实现 invokeSuspend 方法,其内实现状态机流转逻辑
每一层封装都对应添加了不同的功能,我们先忽略掉这些功能细节,着眼于我们的主线,继续跟进 launch 函数执行过程,由于第二个参数是默认值(DEFAULT),所以创建的是 StandaloneCoroutine , 最后启动协程:
// 启动协程
coroutine.start(start, coroutine, block)
// 启动协程
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
上面 coroutine.start 的调用涉及到运算符重载,实际上会调到 CoroutineStart.invoke() 方法:
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
这里启动方式为默认的 DEFAULT ,所以接着往下看:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) = runSafely(completion) {
createCoroutineUnintercepted(receiver, completion)
.intercepted()
.resumeCancellableWith(Result.success(Unit), onCancellation)
}
整理下调用链如下:
coroutine.start(start, coroutine, block)
-> CoroutineStart.start(block, receiver, this)
-> CoroutineStart.invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>)
-> block.startCoroutineCancellable(receiver, completion)
->
createCoroutineUnintercepted(receiver,completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
最后走到 createCoroutineUnintercepted(receiver,completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation) ,这里创建了一个协程,并链式调用 intercepted、resumeCancellable 方法,利用协程上下文中的 ContinuationInterceptor 对协程的执行进行拦截,intercepted 实际上调用的是 ContinuationImpl 的 intercepted 方法:
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
...
public fun intercepted(): Continuation<Any?> =
intercepted
?:(context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
...
}
context[ContinuationInterceptor]?.interceptContinuation调用的是 CoroutineDispatcher 的 interceptContinuation 方法:
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
内部创建了一个 DispatchedContinuation 可分发的协程实例,我们继续进到看resumeCancellableWith 方法:
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
...
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
// 判断是否是DispatchedContinuation 根据我们前面的代码追踪 这里是DispatchedContinuation
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
// 判断是否需要线程调度
// 由于我们之前使用的是 `GlobalScope.launch(Main)` Android主线程调度器所以这里为true
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
...
}
最终走到 dispatcher.dispatch(context, this) 而这里的 dispatcher 就是通过工厂方法创建的 HandlerDispatcher ,dispatch() 函数第二个参数this是一个runnable这里为 DispatchedTask
HandlerDispatcher
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
...
// 最终执行这里的 dispatch方法 而handler则是android中的 MainHandler
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!handler.post(block)) {
cancelOnRejection(context, block)
}
}
...
}
这里借用 Android 的主线程消息队列来在主线程中执行 block Runnable而这个 Runnable 即为 DispatchedTask:
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
...
public final override fun run() {
...
withContinuationContext(continuation, delegate.countOrElement) {
...
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
// 异常情况下
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
// 异常情况下
continuation.resumeWithException(exception)
} else {
// step1:正常情况下走到这一步
continuation.resume(getSuccessfulResult(state))
}
}
}
...
}
}
//step2:这是Continuation的扩展函数,内部调用了resumeWith()
@InlineOnly public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
//step3:最终会调用到BaseContinuationImpl的resumeWith()方法中
internal abstract class BaseContinuationImpl(...) {
// 实现 Continuation 的 resumeWith,并且是 final 的,不可被重写
public final override fun resumeWith(result: Result<Any?>) {
...
val outcome = invokeSuspend(param)
...
}
// 由编译生成的协程相关类来实现,例如 CoroutineExample$main$1
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
最终调用到 continuation.resumeWith() 而 resumeWith() 中会调用 invokeSuspend,即之前编译器生成的 SuspendLambda 中的 invokeSuspend 方法:
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
CoroutineExample var10000 = CoroutineExample.this;
this.label = 1;
if (var10000.request(this) == var2) {
return var2;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
}
}
这段代码是一个状态机机制,每一个挂起点都是一种状态,协程恢复只是跳转到下一个状态,挂起点将执行过程分割成多个片段,利用状态机的机制保证各个片段按顺序执行。
可以看到 协程非阻塞的异步底层实现其实就是一种Callback回调 (这一点我们在介绍Continuation时有提到过),只不过有多个挂起点时就会有多个Callback回调,这里协程把多个Callback回调封装成了一个状态机。
以上就是协程的启动过程,下面我们再来看下协程中的重点挂起和恢复。
协程的挂起与恢复
协程的挂起和恢复有两个关键方法 : invokeSuspend() 和 resumeWith(Result)。我们以上一节中的例子,反编译后逆向剖析协程的挂起和恢复,先整体看下是怎样的一个过程。
suspend fun reqeust(): String {
delay(2000)
return "result from request"
}
反编译后的代码如下(为了方便理解,代码有删减和修改):
//1.函数返回值由String变成Object,入参也增加了Continuation参数
public final Object reqeust(@NotNull Continuation completion) {
//2.通过completion创建一个ContinuationImpl,并且复写了invokeSuspend()
Object continuation;
if (completion instanceof <undefinedtype>){
continuation = <undefinedtype>completion
}else{
continuation = new ContinuationImpl(completion) {
Object result;
int label; //初始值为0
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return request(this);//又调用了request()方法
}
};
}
Object $result = (continuation).result;
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
//状态机
//3.方法被恢复的时候又会走到这里,第一次进入case 0分支,label的值从0变为1,第二次进入就会走case 1分支
switch(continuation.label) {
case 0:
ResultKt.throwOnFailure($result);
continuation.label = 1;
//4.delay()方法被suspend修饰,传入一个continuation回调,返回一个object结果。这个结果要么是`COROUTINE_SUSPENDED`,否则就是真实结果。
Object delay = DelayKt.delay(2000L, continuation)
if (delay == var4) {
//如果是 COROUTINE_SUSPENDED 则直接return,就不会往下执行了,request()被暂停了。
// 如果不是COROUTINE_SUSPENDED,则说明不需要挂起,就会break跳出switch语句。正常返回继续往下执行后续外部代码。
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "result from request";
}
ResultKt.throwOnFailure($result)是 Kotlin 协程中的一个重要方法,主要用于处理协程的异常情况。它的主要作用是: (1)检查异常:它会检查传入的$result对象,如果这个对象是一个异常(即协程执行过程中抛出的异常),它会立即抛出这个异常。 (2)确保正常执行:如果$result不是异常,则继续正常执行后续代码。
挂起过程
函数返回值由 String 变成 Object,编译器自动增加了Continuation参数,相当于帮我们添加Callback。
根据 completion 创建了一个 ContinuationImpl(如果已经创建就直接用,避免重复创建),复写了 invokeSuspend() 方法,在这个方法里面它又调用了 request() 方法,这里又调用了一次自己(是不是很神奇),并且把 continuation 传递进去。
在 switch 语句中,label 的默认初始值为 0,第一次会进入 case 0 分支,delay() 是一个挂起函数,传入上面的 continuation 参数,会有一个 Object 类型的返回值。这个结果要么是COROUTINE_SUSPENDED,否则就是真实结果。
DelayKt.delay(2000, continuation)的返回结果如果是 COROUTINE_SUSPENDED , 则直接 return ,那么方法执行就被结束了,方法就被挂起了。
函数即便被 suspend 修饰了,但是也未必会挂起。需要里面的代码编译后有返回值为 COROUTINE_SUSPENDED 这样的标记位才可以。
协程的挂起实际是方法的挂起,本质是return。
恢复过程
因为 delay() 是 IO操作,在2000ms后就会通过传递给它的 continuation 回调回来。
回调到 ContinuationImpl 类的 resumeWith() 方法,会再次调用 invokeSuspend() 方法,进而再次调用 request() 方法。
即反编译代码中的这一段:
Object continuation;
if (completion instanceof <undefinedtype>){
continuation = <undefinedtype>completion
}else{
continuation = new ContinuationImpl(completion) {
Object result;
int label; //初始值为0
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return request(this);//又调用了request()方法
}
};
}
程序会再次进入switch语句,由于第一次在 case 0 时把 label = 1 赋值为1,所以这次会进入 case 1 分支,检查无异常之后,再次 break ,并且返回了结果result from request。
并且 request() 的返回值作为 invokeSuspend() 的返回值返回。重新被执行的时候就代表着方法被恢复了。
看到大家一定会疑问, 步骤2中 invokeSuspend() 是如何被再次调用呢? 我们都知道 ContinuationImpl 的父类是 BaseContinuationImpl,实际上ContinuationImpl中调用的resumeWith()是来自父类。
BaseContinuationImpl
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
//这个实现是最终的,用于展开 resumeWith 递归。
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
// 1.调用 invokeSuspend()方法执行,执行协程的真正运算逻辑,拿到返回值
val outcome = invokeSuspend(param)
// 2.如果返回的还是COROUTINE_SUSPENDED则提前结束
if (outcome == COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
if (completion is BaseContinuationImpl) {
//3.如果 completion 是 BaseContinuationImpl,内部还有suspend方法,则会进入循环递归,继续执行和恢复
current = completion
param = outcome
} else {
//4.否则是最顶层的completion,则会调用resumeWith恢复上一层并且return
// 这里实际调用的是其父类 AbstractCoroutine 的 resumeWith 方法
completion.resumeWith(outcome)
return
}
}
}
}
}
实际上任何一个挂起函数它在恢复的时候都会调到 BaseContinuationImpl 的 resumeWith() 方法里面。
一但 invokeSuspend() 方法被执行,那么 request() 又会再次被调用, invokeSuspend() 就会拿到 request() 的返回值,在 ContinuationImpl 里面根据 val outcome = invokeSuspend() 的返回值来判断我们的 request() 方法恢复了之后的操作。
如果 outcome 是 COROUTINE_SUSPENDED 常量(可能挂起函数中又返回了一个挂起函数),说明你即使被恢复了,执行了一下, if (outcome == COROUTINE_SUSPENDED) return但是立马又被挂起了,所以又 return 了。
如果本次恢复 outcome 是一个正常的结果,就会走到 completion.resumeWith(outcome),当前被挂起的方法已经被执行完了,实际调用的是其父类 AbstractCoroutine 的 resumeWith 方法,那么协程就恢复了。
我们知道 request() 肯定是会被协程调用的(从上面反编译代码知道会传递一个Continuation completion参数),request() 方法恢复完了就会让协程completion.resumeWith()去恢复,所以说协程的恢复是方法的恢复,本质其实是 callback(resumeWith) 回调。
一张图总结一下:

协程的核心是挂起——恢复,挂起——恢复的本质是return & callback回调
协程挂起
我们说过协程启动后会调用到上面这个 resumeWith() 方法,接着调用其 invokeSuspend() 方法:
当 invokeSuspend() 返回 COROUTINE_SUSPENDED 后,就直接 return 终止执行了,此时协程被挂起。 当 invokeSuspend() 返回非 COROUTINE_SUSPENDED 后,说明协程体执行完毕了,对于 launch 启动的协程体,传入的 completion 是 AbstractCoroutine 子类对象,最终会调用其 AbstractCoroutine.resumeWith() 方法做一些状态改变之类的收尾逻辑。至此协程便执行完毕了。
协程恢复
这里我们接着看上面第一条:协程执行到挂起函数被挂起后,当这个挂起函数执行完毕后是怎么恢复协程的,以下面挂起函数为例:
private suspend fun login() = withContext(Dispatchers.IO) {
Thread.sleep(2000)
return@withContext true
}
通过反编译可以看到上面挂起函数中的函数体也被编译成了 SuspendLambda 的子类,创建其实例时也需要传入 Continuation 续体参数(调用该挂起函数的协程所在续体)。贴下 withContext 的源码:
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// 创建 new context
val oldContext = uCont.context
val newContext = oldContext + context
// 检查新上下文是否作废
newContext.ensureActive()
// 新上下文与旧上下文相同
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// 新调度程序与旧调度程序相同
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// 上下文有变化,所以这个线程需要更新
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// 使用新的调度程序
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
首先调用了 suspendCoroutineUninterceptedOrReturn 方法,看注释知道可以通过它来获取到当前的续体对象 uCont, 接着有几条分支调用,但最终都是会通过续体对象来创建挂起函数体对应的 SuspendLambda 对象,并执行其 invokeSuspend() 方法,在其执行完毕后调用 uCont.resume() 来恢复协程,具体逻辑大家感兴趣可以自己跟代码,与前面大同小异。
至于其他的顶层挂起函数如 await(), suspendCoroutine(), suspendCancellableCoroutine() 等,其内部也是通过 suspendCoroutineUninterceptedOrReturn() 来获取到当前的续体对象,以便在挂起函数体执行完毕后,能通过这个续体对象恢复协程执行。
Desktop平台举例分析挂起恢复(Kotlin 2.1.0)
Kotlin测试代码如下:
class MySimpleTest {
suspend fun stephenTest(): String {
delay(500L)
return "result From stephenTest"
}
}
fun callFromOutside() {
CoroutineScope(Dispatchers.IO).launch {
val result = MySimpleTest().stephenTest()
println(result)
}
}
在callFromOutside函数中,我们创建了一个协程作用域,并在其中启动了一个协程。该协程将调用stephenTest函数。而stephenTest函数是一个挂起函数,它会暂停该协程的执行,直到delay函数返回。
将这个片段反编译成java代码,删掉导包和元数据注解信息等,分析过程见注释流程号:
public final class MySimpleTest {
public static final int $stable;
@Nullable
// (8)stephenTest函数本来是无参的,现在有一个Continuation类型的参数
// 这个就是外部调用代码块封装成的实例,stephenTest方法执行完毕,需要继续往下执行的代码都在这个对象里面
public final Object stephenTest(@NotNull Continuation $completion) {
Continuation $continuation;
// (9)label20: 是一个Java中的标签(label),主要用于控制流程跳转。在这里它被用来实现协程的挂起和恢复机制
label20: {
if ($completion instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)$completion;
// (10)用于检查当前协程是否处于挂起状态。Integer.MIN_VALUE 是一个特殊的标志位,用于标记协程是否被挂起。
// 它的值是10000000 00000000 00000000 00000000
// label首次传进来是1,即00000000 00000000 00000000 00000001,和Integer.MIN_VALUE按位与的结果为0,表示需要挂起,会走到11步,基于外部传入的 completion 对象创建一个新的ContinuationImpl对象
//=======================分割线====================
// (16)这里的label在15步被赋值成了10000000 00000000 00000000 00000001,按位与的结果是Integer.MIN_VALUE,即条件检查结果为真(即 != 0)
// 10000000 00000000 00000000 00000001减去Integer.MIN_VALUE,结果是1,即00000000 00000000 00000000 00000001
// 并将label20标签跳出循环,继续往下执行stephenTest的switch状态判断
if (($continuation.label & Integer.MIN_VALUE) != 0) {
$continuation.label -= Integer.MIN_VALUE;
break label20;
}
}
// (11)开始创建关于stephenTest代码块的ContinuationImpl对象,用于传递给下一个suspend函数
$continuation = new ContinuationImpl($completion) {
// $FF: synthetic field
Object result;
// 初始值为0
int label;
@Nullable
// (13)delay执行完,调用resumeWith,触发这个invokeSuspend方法
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
//(14)将label = 1和Integer.MIN_VALUE按位或,
// 运算的结果是 10000000 00000000 00000000 00000001(即 -2147483647)
this.label |= Integer.MIN_VALUE;
//(15)重入调用stephenTest函数,这次是传入 $continuation 自己作为参数。
return MySimpleTest.this.stephenTest((Continuation)this);
}
};
}
Object $result = $continuation.result;
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
// (17) 本轮调用中,label值为1,检查无异常后,就会返回这个字符串
// "result From stephenTest"
switch ($continuation.label) {
case 0:
ResultKt.throwOnFailure($result);
$continuation.label = 1;
// (12)调用delay函数,之后就和外部调用的(3)-(7)步流程一样.
// 传入ContinuationImpl对象,delay函数内部会判断是否需要挂起,如果需要挂起,就return掉本轮stephenTest方法的调用
// 进入了delay内部执行,等500ms过后,调用外部传进来的ContinuationImpl对象的 resumeWith 函数回调
// 而resumeWith方法,必然会调用到这个ContinuationImpl 对象自己的invokeSuspend方法,就跳转到第13步了
if (DelayKt.delay(500L, $continuation) == var4) {
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "result From stephenTest";
}
}
// CoroutineTestKt.java
public final class CoroutineTestKt {
public static final void callFromOutside() {
// (1)分析入口,从最外部的调用开始
BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope((CoroutineContext)Dispatchers.getIO()), (CoroutineContext)null, (CoroutineStart)null, new Function2((Continuation)null) {
// (2)函数代码块里的任务,被封装在了继承自Continuation的一个匿名内部类对象中
// launch开始后,进入就会调用其invoke方法,并首次执行invokeSuspend方法,这时候label为0
int label;
// (18) 17步返回后,标志着 stephenTest 方法中 $continuation实例的invokeSuspend方法调用完毕
// 将调用completion的invokeSuspend方法
// (19)这时候外部的这个label值也已经为1了,就是继续往下执行了
public final Object invokeSuspend(Object $result) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
// (3)通过label来判断当前是到了哪一个状态
switch (this.label) {
case 0:
// (4)首先检查异常
ResultKt.throwOnFailure($result);
// (5)创建一个MySimpleTest对象,并调用其stephenTest方法
var10000 = new MySimpleTest();
// (6)将这个匿名内部类自己传进去,作为参数
Continuation var10001 = (Continuation)this;
// 将label状态设置为1,等下次再次调用invokeSuspend就会走switch的1的分支
this.label = 1;
var10000 = (MySimpleTest)var10000.stephenTest(var10001);
// (7) 如果 stephenTest 这个方法的返回值是COROUTINE_SUSPENDED,则表示该函数已暂停,我们也返回COROUTINE_SUSPENDED给调用者
// 通知这个函数是挂起函数,暂时不往下执行了
if (var10000 == var3) {
return var3;
}
// 转到MySimpleTest这个类分析 ->(8)
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = (MySimpleTest)$result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
// (20)挂起和恢复流程执行完毕,打印结果
String result = (String)var10000;
System.out.println(result);
return Unit.INSTANCE;
}
public final Continuation create(Object value, Continuation $completion) {
return (Continuation)(new <anonymous constructor>($completion));
}
public final Object invoke(CoroutineScope p1, Continuation p2) {
return ((<undefinedtype>)this.create(p1, p2)).invokeSuspend(Unit.INSTANCE);
}
// $FF: synthetic method
// $FF: bridge method
public Object invoke(Object p1, Object p2) {
return this.invoke((CoroutineScope)p1, (Continuation)p2);
}
}, 3, (Object)null);
}
}
以上的分析流程就是协程的挂起恢复过程。
自动的线程切换
引例
在Android上使用协程,从本地读取一个字符串,或其他耗时逻辑,可能会写下这样的代码:
// MainViewModel.kt
suspend fun getLocalString() = withContext(Dispatchers.IO) {
// 模拟IO操作
Thread.sleep(2000)
"result from local"
}
// MainActivity.kt
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 开启协程
lifecycleScope.launch {
val result = getLocalString()
tv_result.text = result
}
}
}
getLocalString() 方法,我们设置了上下文为IO,很明显直觉上会在 IO 线程中执行。
在 MainActivity 中,我们通过 lifecycleScope.launch 开启了一个协程,协程中调用 getLocalString() 方法,最初为主线程环境,是怎么从主线程切换到IO线程来运行这个方法的呢?
ContinuationInterceptor
Kotlin协程实现自动线程切换的核心在于其调度器(Dispatcher)机制,而调度器 Dispatcher 就是 ContinuationInterceptor 的实现。
ContinuationInterceptor 接口(简化):
public interface ContinuationInterceptor : CoroutineContext.Element {
// 拦截 Continuation 的恢复
fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
}
AbstractCoroutine (例如 StandaloneCoroutine) 的 resumeWith 方法:
当协程需要恢复时,通常会调用 Continuation 的 resumeWith 方法。在协程的底层实现中,例如 AbstractCoroutine (所有 Job 的子类,如 StandaloneCoroutine 继承的基类),其 resumeWith 方法会检查 CoroutineContext 中是否存在 ContinuationInterceptor。
// AbstractCoroutine.kt (简化)
override fun resumeWith(result: Result<T>) {
val context = this.context
val dispatcher = context[ContinuationInterceptor] as? ContinuationInterceptor
if (dispatcher == null) {
// 没有调度器,直接在当前线程执行
dispatchResume(result)
} else {
// 有调度器,通过调度器来分发恢复操作
dispatcher.dispatch(this, result) // 最终会调用 dispatchResume
}
}
Dispatcher 的 dispatch 方法
Dispatcher 的 dispatch 方法是实际执行线程切换的地方。不同的调度器有不同的实现。
Dispatchers.Default (例如 DefaultScheduler.kt):
Dispatchers.Default 通常使用一个共享的线程池来执行协程。
// DefaultScheduler.kt (简化)
internal object DefaultScheduler : CoroutineDispatcher(), Executor {
override fun dispatch(context: CoroutineContext, block: Runnable) {
// 将 block (即恢复协程的Runnable) 提交到默认的线程池
DefaultExecutor.enqueue(block) // DefaultExecutor 是一个线程池
}
// ... 其他方法
}
当 dispatch 被调用时,它会将协程的恢复逻辑封装在一个 Runnable 中,然后提交给调度器底层的 线程池 。这个 Runnable 最终会在线程池中的某个线程上执行,从而实现了线程的切换。
Dispatchers.IO通常会使用一个独立的、容量更大的线程池,用于处理 IO 密集型任务。其dispatch逻辑与Default类似,只是提交给不同的线程池。Dispatchers.Main在 Android 上通常会与主线程的 Looper 绑定。
// AndroidMainDispatcherFactory.kt (简化)
internal class AndroidMainDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
// ...
return HandlerContext(Looper.getMainLooper(), "Main") // 包装了 Looper
}
}
// HandlerDispatcher.kt (简化)
class HandlerContext(...) : MainCoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
// 将 block 提交到 Looper 关联的 Handler
handler.post(block)
}
// ...
}
上面的代码可以看出, Dispatchers.Main 会将协程的恢复操作通过 Android Handler 的 post 方法发送到主线程的消息队列,从而确保协程在主线程上恢复执行。
suspendCoroutine / suspendCancellableCoroutine
除了编译器自动生成的挂起点,我们也可以手动创建挂起点,这通常通过 suspendCoroutine 或 suspendCancellableCoroutine 函数实现。
suspend fun manualSuspendExample(): String = suspendCancellableCoroutine { continuation ->
// 可以在这里执行一些异步操作
Thread {
Thread.sleep(1000)
continuation.resume("Resumed from another thread") // 在另一个线程调用 resume
}.start()
}
这里 continuation.resume(...) 的调用是关键。当这个 resume 被调用时,它会触发之前提到的 ContinuationInterceptor 机制,如果 CoroutineContext 中有调度器,就会通过调度器进行线程切换。
总结挂起和线程切换流程
- 协程挂起: 当协程遇到
suspend函数(如delay,或自定义的挂起函数),如果该函数需要等待某个异步操作完成,它会返回COROUTINE_SUSPENDED,并将当前的执行上下文(Continuation)保存起来。 - 异步操作完成: 当异步操作完成时(例如网络请求返回数据,或
delay时间到),会调用之前保存的Continuation对象的resumeWith方法。 - 调度器介入:
resumeWith方法会检查协程的CoroutineContext中是否存在ContinuationInterceptor(即Dispatcher)。 - 分发恢复: 如果存在调度器,
resumeWith会调用调度器的dispatch方法。 - 线程切换: 调度器的
dispatch方法会将协程的恢复逻辑(一个Runnable)提交到其管理的线程(如线程池中的线程,或 Android 主线程)。 - 协程恢复:
Runnable在目标线程上执行,调用实际的恢复逻辑,协程从挂起的地方继续执行。
通过这种 CPS转换 + 回调resume + 调度器拦截 的机制,Kotlin 协程得以在不阻塞线程的情况下,根据需要自动在不同的线程之间切换执行,从而实现高效的并发编程。