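A Complete Walkthrough of the Kotlin Coroutine Call Flow
Let's start from the foundation. To launch a coroutine from a regular (non-suspending) function, we typically call runBlocking, so let's look at how it is implemented.
Coroutine startup flow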
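The actual keyword
Here the actual keyword pairs with expect and marks the concrete implementation for a specific platform.
As usual, here is a summary of the flow first.
Three key elements:
- EventLoop: taken from the current thread's ThreadLocal (created on first use when no dispatcher is specified; otherwise it may be null)
- CoroutineContext: a new CoroutineContext built from the EventLoop and the contextInterceptor (which may be null)
- Coroutine: a new BlockingCoroutine (an AbstractCoroutine) is created, and the block is enqueued on that EventLoop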
@Throws(InterruptedException::class)
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
// Step 1. Tell the compiler how many times the coroutine lambda block is invoked
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
// See if context's interceptor is an event loop that we shall use (to support TestContext)
// or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
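The signature looks straightforward: two parameters, the coroutine context and the coroutine block to run. But there is an interesting question.
Why can we write runBlocking {} directly without specifying a CoroutineContext?
Judging from this code alone it should not compile, because context has no default value here.
The answer is that the expect declaration lives elsewhere (kotlinx-coroutines-core/concurrent/Builders.concurrent.kt), and that is where the default is declared: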
public expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
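This comes from how expect and actual work in Kotlin Multiplatform. An expect declaration may declare default argument values, but the matching actual declaration is not allowed to declare them; each platform implementation simply uses the defaults from the expect declaration. Callers compile against the expect declaration, so the defaults have to be fixed there, and a platform implementation must not introduce different ones. This keeps behavior consistent across platforms. If actual could declare its own defaults, things would become ambiguous: for a hypothetical getAppName function whose expect declaration defaults the argument to "123", a call to getAppName() must behave like getAppName("123") on every platform.
To verify this I even tried building a Kotlin Multiplatform app, which was a lot of fun!
So when context is not passed, its default value is EmptyCoroutineContext, which is itself a CoroutineContext.
CoroutineContext is the base interface. It can be thought of as a collection of context Element instances.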
public interface Element : CoroutineContext {
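Element itself extends CoroutineContext, so a single element is also a context; it is like one concrete entry in the collection.
Compiler hint: contract
After asking GPT, I learned that contract is a marker function: it describes the function's behavior to the compiler as a Contract. It does nothing at runtime and only informs the compiler.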
/**
* Specifies the contract of a function.
* The contract description must be at the beginning of a function and have at least one effect.
* Only the top-level functions can have a contract for now.
*/
@ContractsDsl
@ExperimentalContracts
@InlineOnly
@SinceKotlin("1.3")
public inline fun contract(builder: ContractBuilder.() -> Unit) { }
@ContractsDsl public fun <R> callsInPlace(lambda: Function<R>, kind: InvocationKind = InvocationKind.UNKNOWN): CallsInPlace
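callsInPlace tells the compiler that the lambda is invoked inside the function itself, and specifies how many times it is invoked in the function body.
Here InvocationKind.EXACTLY_ONCE is used, which tells the compiler that the lambda is called exactly once inside the function. With that guarantee the compiler can, for example, treat a variable assigned inside the lambda as definitely initialized afterwards.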
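To make the effect of callsInPlace concrete, here is a minimal sketch. The names runOnce and initializedInLambda are hypothetical and exist only for this illustration. Without the contract, the compiler would reject the assignment to the val inside the lambda.

```kotlin
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract

// Hypothetical helper: promises the compiler that block runs exactly once, in place.
@OptIn(ExperimentalContracts::class)
inline fun runOnce(block: () -> Unit) {
    contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
    block()
}

fun initializedInLambda(): Int {
    val answer: Int
    // Assigning a val inside the lambda is accepted only because of the EXACTLY_ONCE contract.
    runOnce { answer = 42 }
    return answer
}
```

runBlocking declares the same kind of contract, which is why a val can be assigned inside a runBlocking { } block and read after it.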
ContinuationInterceptor
Getting the contextInterceptor
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
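This fetches the current thread and the interceptor from the coroutine context. By default the interceptor is null.
You may wonder why context[ContinuationInterceptor] works at all, and the answer is interesting.
Why does context[ContinuationInterceptor] work?
Reading the source, the operator get method on CoroutineContext takes a key: Key<E> parameter, yet ContinuationInterceptor is a CoroutineContext.Element, not a Key. So why is this allowed?
Because ContinuationInterceptor declares a companion object, and when a class name is used as an expression it resolves to that class's companion object. Here the companion object implements Key, so the call type-checks. (Note that a Kotlin class or interface can have only one companion object.) This is the same mechanism as the companion object {} we use to hold "static" members of a class.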
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
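The companion-object-as-key pattern is easy to reproduce with the standard library alone. In this sketch RequestId, lookupRequestId and demoContextLookup are hypothetical names made up for the illustration; AbstractCoroutineContextElement, CoroutineContext.Key and EmptyCoroutineContext are real stdlib types.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical element, defined only for this illustration.
class RequestId(val value: String) : AbstractCoroutineContextElement(RequestId) {
    // The class name "RequestId" used as an expression resolves to this companion,
    // which is what makes context[RequestId] type-check.
    companion object Key : CoroutineContext.Key<RequestId>
}

fun lookupRequestId(context: CoroutineContext): String? = context[RequestId]?.value

fun demoContextLookup(): List<String?> {
    // EmptyCoroutineContext + element yields just the element.
    val withId = EmptyCoroutineContext + RequestId("req-42")
    return listOf(lookupRequestId(withId), lookupRequestId(EmptyCoroutineContext))
}
```

Calling demoContextLookup() looks the element up once in a context that contains it and once in an empty context, where the lookup returns null.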
EventLoop
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
}
internal object ThreadLocalEventLoop {
private val ref = commonThreadLocal<EventLoop?>(Symbol("ThreadLocalEventLoop"))
internal val eventLoop: EventLoop
get() = ref.get() ?: createEventLoop().also { ref.set(it) }
internal fun currentOrNull(): EventLoop? =
ref.get()
internal fun resetEventLoop() {
ref.set(null)
}
internal fun setEventLoop(eventLoop: EventLoop) {
ref.set(eventLoop)
}
}
internal actual fun<T> commonThreadLocal(name: Symbol): CommonThreadLocal<T> = ThreadLocal()
internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())
internal abstract class EventLoop : CoroutineDispatcher()
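In the usual case the eventLoop comes from ThreadLocalEventLoop, which simply reads the EventLoop object from the thread's ThreadLocal and creates one if it is missing (by default there is none).
As you can see, what actually gets created is a BlockingEventLoop. We will come back to what that is later. EventLoop can be seen as an abstract base class for dispatchers that, presumably, provides a task queue and a processing loop. It is a CoroutineContext.Element and therefore also a CoroutineContext.
Creating the CoroutineContext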
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
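Here a new coroutine context is created through GlobalScope from the passed-in context and the eventLoop. context is a CoroutineContext and EventLoop is also a CoroutineContext, so they can be combined with the plus operator. In EmptyCoroutineContext, plus is implemented as: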
public override fun plus(context: CoroutineContext): CoroutineContext = context
So in the usual case, where context is EmptyCoroutineContext, context + eventLoop evaluates to just the eventLoop.
@DelicateCoroutinesApi
public object GlobalScope : CoroutineScope {
/**
* Returns [EmptyCoroutineContext].
*/
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = foldCopies(coroutineContext, context, true)
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
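As you can see, GlobalScope is a CoroutineScope whose coroutineContext is simply EmptyCoroutineContext.
newCoroutineContext first calls foldCopies, which merges the two contexts and makes sure copyable thread-context elements are correctly copied or merged into the new coroutine.
In the usual case it just returns originalContext + appendContext, which here is EmptyCoroutineContext.plus(BlockingEventLoop), so the result is the BlockingEventLoop.
BlockingEventLoop in turn derives from CoroutineDispatcher -> AbstractCoroutineContextElement(ContinuationInterceptor), so combined[ContinuationInterceptor] is not null and Dispatchers.Default is not added. (You can verify this in the source.)
So what is finally returned is still the BlockingEventLoop (plus a CoroutineId element when DEBUG is enabled), and newContext is effectively the BlockingEventLoop.
OK, let's ignore the else branch for now and move on.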
AbstractCoroutine
BlockingCoroutine
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
BlockingCoroutine is an AbstractCoroutine, the abstract base class for coroutine implementations.
private class BlockingCoroutine<T>(
parentContext: CoroutineContext,
private val blockedThread: Thread,
private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true, true)
@InternalCoroutinesApi
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
init {
/*
* Setup parent-child relationship between the parent in the context and the current coroutine.
* It may cause this coroutine to become _cancelling_ if the parent is already cancelled.
* It is dangerous to install parent-child relationship here if the coroutine class
* operates its state from within onCancelled or onCancelling
* (with exceptions for rx integrations that can't have any parent)
*/
if (initParentJob) initParentJob(parentContext[Job])
}
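As you can see, BlockingCoroutine initializes its parent Job.
Here parentContext is just the BlockingEventLoop, which contains no element under the Job key, so parentContext[Job] is null and parentHandle is set to NonDisposableHandle: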
protected fun initParentJob(parent: Job?) {
assert { parentHandle == null }
if (parent == null) {
parentHandle = NonDisposableHandle
return
}
...
}
AbstractCoroutine
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
@InternalCoroutinesApi
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
}
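This is rather clever: look closely and the start being called is not a method of the class but the CoroutineStart parameter, which is an enum.
Its entries correspond to the different ways of starting a coroutine, and invoking an entry runs the matching start function.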
public enum class CoroutineStart {
DEFAULT,
LAZY,
ATOMIC,
UNDISPATCHED;
@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
}
In essence this is the strategy pattern combined with extension functions; operator fun invoke lets an enum entry be called like a function.
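The same trick can be reproduced in a few lines. Greeting and greet are hypothetical names used only for this sketch; the point is that mode(name) compiles to mode.invoke(name), just as start(block, receiver, this) calls CoroutineStart.invoke.

```kotlin
// Hypothetical enum: each entry is a strategy, and operator invoke makes it callable.
enum class Greeting {
    FORMAL,
    CASUAL,
    SILENT;

    operator fun invoke(name: String): String = when (this) {
        FORMAL -> "Good day, $name."
        CASUAL -> "Hi $name!"
        SILENT -> ""
    }
}

// "mode" is a parameter, yet it is used with call syntax, like "start" in AbstractCoroutine.start.
fun greet(mode: Greeting, name: String): String = mode(name)
```

For example, greet(Greeting.CASUAL, "Ann") selects the CASUAL branch.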
For the default mode, the startCoroutineCancellable extension is called on the suspend block:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
) = runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
try {
block()
} catch (e: Throwable) {
dispatcherFailure(completion, e)
}
}
runSafely just reports the failure through dispatcherFailure if running the block throws; we will not dig into it here.
@SinceKotlin("1.3")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
@SinceKotlin("1.3")
internal fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> {
/** implementation of this function is replaced by debugger */
return completion
}
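Note that (suspend R.() -> T) here is our SuspendLambda and the completion parameter is the BlockingCoroutine. Because SuspendLambda derives from BaseContinuationImpl, the create method is called, which is the create override in the generated bytecode that we mentioned earlier. It looks roughly like this: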
public final kotlin.coroutines.Continuation<kotlin.Unit> create(java.lang.Object, kotlin.coroutines.Continuation<?>);
descriptor: (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
flags: (0x0011) ACC_PUBLIC, ACC_FINAL
Code:
stack=3, locals=3, args_size=3
0: new #2 // class org/example/Main3Kt$testAtomic$1
3: dup
4: aload_2
5: invokespecial #80 // Method "<init>":(Lkotlin/coroutines/Continuation;)V
8: checkcast #82 // class kotlin/coroutines/Continuation
11: areturn
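So create instantiates the generated SuspendLambda subclass for testAtomic. As we saw earlier, at the call site (for example inside testAtomic) the lambda is first created as Main3Kt$testAtomic$1(null). Here a second Main3Kt$testAtomic$1 instance is created that actually carries a continuation, namely the completion, which is the BlockingCoroutine.
Why are two testAtomic instances created?
In Kotlin a suspend lambda is compiled into a SuspendLambda subclass that holds its own state (the label field). The first instance, created with a null completion, only represents the lambda as a function value. create builds a fresh instance bound to a concrete completion, so the same lambda can be started more than once without the runs sharing state. Suspending and resuming afterwards reuse that instance and only update label.
So the instance created by create can be regarded as the real, running SuspendLambda.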
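A small experiment with only the standard library shows why a fresh instance per start is useful: one suspend lambda value can be started several times, each time with its own completion. startTwice is a hypothetical helper written for this sketch; startCoroutine and the Continuation(...) factory function are stdlib APIs.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical helper: starts the same suspend lambda twice with two different completions.
fun startTwice(): List<String> {
    val log = mutableListOf<String>()
    val block: suspend () -> String = { "done" }
    repeat(2) { i ->
        // No interceptor is present in the context, so each run completes synchronously.
        block.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
            log += "run $i -> ${result.getOrNull()}"
        })
    }
    return log
}
```

Each startCoroutine call goes through createCoroutineUnintercepted, so the two runs do not share a label or a completion.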
Intercepted
After the SuspendLambda object is created, intercepted is called (SuspendLambda -> ContinuationImpl).
@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
// ContinuationImpl.kt
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
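Here we can see that when a SuspendLambda is constructed with only a completion (which is what the generated create does), the completion's context becomes _context. So the context used is that of the BlockingCoroutine. AbstractCoroutine defines its context as parentContext + this, so it still contains the EventLoop we passed in at the start, and context[ContinuationInterceptor] returns that EventLoop.
Looking for interceptContinuation in BlockingEventLoop, we find it is declared by the ContinuationInterceptor interface, with the hierarchy BlockingEventLoop -> EventLoop -> CoroutineDispatcher -> ContinuationInterceptor.
CoroutineDispatcher implements it as: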
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
Here this is the CoroutineDispatcher and continuation is the SuspendLambda, so what is returned is a DispatchedContinuation.
Its superclasses form the following chain:
DispatchedContinuation -> DispatchedTask<T> -> SchedulerTask<T> -> Runnable
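That looks just like a task, doesn't it? And the continuation passed to its constructor is the SuspendLambda. Things are starting to click: all that remains is to see how this task gets scheduled and started at some point.
So let's look at the last step: resumeCancellableWith.
It is also an extension function on Continuation, called with Result.success(Unit): the initial resume carries no value, only Unit.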
@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result)
else -> resumeWith(result)
}
We already know the receiver is a DispatchedContinuation, so execution goes straight into its resumeCancellableWith:
// DispatchedContinuation
@Suppress("NOTHING_TO_INLINE")
internal inline fun resumeCancellableWith(result: Result<T>) {
val state = result.toState()
if (dispatcher.safeIsDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.safeDispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
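Inside DispatchedContinuation, the dispatcher decides whether the task has to be dispatched or can run immediately.
The dispatcher here is the BlockingEventLoop, and isDispatchNeeded is an open method declared in CoroutineDispatcher (BlockingEventLoop -> EventLoop -> CoroutineDispatcher) that returns true by default: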
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
So the task is dispatched through the dispatcher. Following BlockingEventLoop's hierarchy, dispatch is implemented in EventLoopImplBase:
final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
open fun enqueue(task: Runnable) {
// are there some delayed tasks that should execute before this one? If so, move them to the queue first.
enqueueDelayedTasks()
if (enqueueImpl(task)) {
// todo: we should unpark only when this delayed task became first in the queue
unpark()
} else {
DefaultExecutor.enqueue(task)
}
}
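- Before enqueuing a task, check whether any delayed tasks are due and move them into the queue first
- Enqueue the task itself
Task Enqueue
The core idea is a thread-safe queue built on CAS, to which the task is added.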
@Suppress("UNCHECKED_CAST")
private fun enqueueImpl(task: Runnable): Boolean {
_queue.loop { queue ->
if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
when (queue) {
null -> if (_queue.compareAndSet(null, task)) return true
is Queue<*> -> {
when ((queue as Queue<Runnable>).addLast(task)) {
Queue.ADD_SUCCESS -> return true
Queue.ADD_CLOSED -> return false
Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
}
}
else -> when {
queue === CLOSED_EMPTY -> return false
else -> {
// update to full-blown queue to add one more
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
newQueue.addLast(queue as Runnable)
newQueue.addLast(task)
if (_queue.compareAndSet(queue, newQueue)) return true
}
}
}
}
}
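Note that _queue is an atomic field of EventLoopImplBase, and everything stored in it is meant to run right away. The Queue<T> used here is actually LockFreeTaskQueueCore<T>, a thread-safe queue implemented purely with CAS. Its internals are covered in a separate article.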
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
// null | CLOSED_EMPTY | task | Queue<Runnable>
private val _queue = atomic<Any?>(null)
// Allocated only only once
private val _delayed = atomic<DelayedTaskQueue?>(null)
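The CAS retry loop and the "null | task | Queue" encoding can be illustrated with a much simpler structure. The sketch below is not LockFreeTaskQueueCore: CasTaskBag and addFromManyThreads are hypothetical names, and an immutable list snapshot stands in for the real queue (copying on every add is fine for a demo but not for production).

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlin.concurrent.thread

// Hypothetical, simplified stand-in for the _queue field.
class CasTaskBag {
    // null | single Runnable | List of Runnables (immutable snapshot)
    private val state = AtomicReference<Any?>(null)

    fun add(task: Runnable) {
        while (true) { // lock-free retry loop
            val current = state.get()
            val next: Any = when (current) {
                null -> task                      // first task is stored directly
                is List<*> -> current + task      // already a list: append to a new snapshot
                else -> listOf(current, task)     // single task: upgrade to a list
            }
            // Succeeds only if nobody changed the state in the meantime; otherwise retry.
            if (state.compareAndSet(current, next)) return
        }
    }

    fun size(): Int = when (val current = state.get()) {
        null -> 0
        is List<*> -> current.size
        else -> 1
    }
}

fun addFromManyThreads(threads: Int, perThread: Int): Int {
    val bag = CasTaskBag()
    val workers = List(threads) {
        thread { repeat(perThread) { bag.add(Runnable { }) } }
    }
    workers.forEach { it.join() }
    return bag.size()
}
```

No add is lost even under contention, because a failed compareAndSet simply re-reads the state and tries again.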
DelayedTaskEnqueue
Now back to the enqueue-delayed-tasks step from earlier:
private fun enqueueDelayedTasks() {
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) {
val now = nanoTime()
while (true) {
// make sure that moving from delayed to queue removes from delayed only after it is added to queue
// to make sure that 'isEmpty' and `nextTime` that check both of them
// do not transiently report that both delayed and queue are empty during move
delayed.removeFirstIf {
if (it.timeToExecute(now)) {
enqueueImpl(it)
} else
false
} ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
}
}
}
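It reads the delayed queue and, for every task whose scheduled time has been reached (scheduled time <= now), moves the task into _queue. This presupposes that the delayed task was added to the delayed queue in the first place, which we will come back to later.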
unpark
internal actual abstract class EventLoopImplPlatform: EventLoop() {
protected actual fun unpark() {
val thread = thread // atomic read
if (Thread.currentThread() !== thread)
unpark(thread)
}
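After a task is enqueued, unpark wakes up the event loop's thread so that it can run the task (if the caller is already on that thread, nothing needs to be done).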
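The park/unpark handshake can be tried in isolation with java.util.concurrent.locks.LockSupport. waitForSignal is a hypothetical function for this sketch; the AtomicBoolean plays the role of "a task has been enqueued".

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.LockSupport
import kotlin.concurrent.thread

// Hypothetical demo: the calling thread parks until another thread publishes work and unparks it.
fun waitForSignal(): String {
    val ready = AtomicBoolean(false)
    val waiter = Thread.currentThread()
    thread {
        ready.set(true)             // publish the "task" first ...
        LockSupport.unpark(waiter)  // ... then wake the waiting thread
    }
    // park() may return spuriously, and unpark() may arrive before park(),
    // so the condition is always re-checked in a loop.
    while (!ready.get()) {
        LockSupport.park()
    }
    return "woken"
}
```

Publishing the state before calling unpark, and re-checking it after every park, is the same ordering the event loop relies on: enqueue first, then unpark.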
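That covers all the code on the surface. The queue must be polled somewhere in the EventLoop, just like the MessageQueue behind an Android Handler, so the question is when tasks are taken out and executed.
The runBlocking source has one last statement: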
return coroutine.joinBlocking()
This looks like it makes the calling thread wait until the blocking coroutine finishes. Let's see what it actually does.
// BlockingCoroutine
@Suppress("UNCHECKED_CAST")
fun joinBlocking(): T {
registerTimeLoopThread()
try {
eventLoop?.incrementUseCount()
try {
while (true) {
@Suppress("DEPRECATION")
if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
// note: process next even may loose unpark flag, so check if completed before parking
if (isCompleted) break
parkNanos(this, parkNanos)
}
} finally { // paranoia
eventLoop?.decrementUseCount()
}
} finally { // paranoia
unregisterTimeLoopThread()
}
// now return result
val state = this.state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
return state as T
}
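What does registerTimeLoopThread do?
registerTimeLoopThread() notifies the optional internal time source (a hook used by tests to substitute virtual time) that the current thread is about to run a time loop, that is, a loop that drives timers such as delay() or withTimeout(). As far as I can tell, when no time source is installed it does nothing.
Does the core logic in the while loop look familiar now? (Note that everything here runs on the thread that called runBlocking, the main thread in this case.)
- Start a while (true) loop
- Check whether the thread has been interrupted; if so, cancel the coroutine and throw InterruptedException
- Let the EventLoop process one event and return how long the thread may park before the next one is due
- If the coroutine has completed, leave the loop; otherwise park the thread for that long
- Once finished, read the state and return the result
The core of runBlocking
The core of runBlocking is that the current thread uses the event loop to take out and run pending tasks, or parks, until the coroutine has completed.
With that we have walked through the whole runBlocking flow! A few details remain, so let's keep going.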
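To tie the pieces together, here is a deliberately tiny, single-threaded imitation of this flow built only on the standard library. MiniEventLoop and miniRunBlocking are hypothetical names; this is a sketch of the idea (intercept, enqueue, drain the queue on the calling thread), not how kotlinx.coroutines implements it. It has no parking, no delayed tasks and no thread safety, so it only works for coroutines that are resumed on the calling thread.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical mini event loop: every resumption is turned into a queued task.
class MiniEventLoop : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    private val queue = ArrayDeque<() -> Unit>()

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                // "dispatch": do not run now, enqueue for the loop
                queue.addLast { continuation.resumeWith(result) }
            }
        }

    // Runs one queued task; returns false when the queue is empty.
    fun processNextEvent(): Boolean {
        val task = queue.removeFirstOrNull() ?: return false
        task()
        return true
    }
}

fun <T> miniRunBlocking(block: suspend () -> T): T {
    val loop = MiniEventLoop()
    var outcome: Result<T>? = null
    // The completion plays the role of BlockingCoroutine: it just records the result.
    block.startCoroutine(Continuation(loop) { outcome = it })
    // The calling thread drains the queue until the coroutine has completed.
    while (outcome == null) {
        check(loop.processNextEvent()) { "queue is empty but the coroutine has not completed" }
    }
    return outcome!!.getOrThrow()
}
```

For instance, miniRunBlocking { 40 + 2 } enqueues the initial resumption, runs it from the loop, and returns the block's value. The real joinBlocking parks the thread instead of failing when the queue is empty, which is what allows resumptions from other threads.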
EventLoop.processNextEvent
// EventLoopImplBase
override fun processNextEvent(): Long {
// unconfined events take priority
if (processUnconfinedEvent()) return 0
// queue all delayed tasks that are due to be executed
enqueueDelayedTasks()
// then process one event from queue
val task = dequeue()
if (task != null) {
platformAutoreleasePool { task.run() }
return 0
}
return nextTime
}
internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()
// EventLoopImplBase
@Suppress("UNCHECKED_CAST")
private fun dequeue(): Runnable? {
_queue.loop { queue ->
when (queue) {
null -> return null
is Queue<*> -> {
val result = (queue as Queue<Runnable>).removeFirstOrNull()
if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
_queue.compareAndSet(queue, queue.next())
}
else -> when {
queue === CLOSED_EMPTY -> return null
else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
}
}
}
}
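The rough logic is:
- Process Unconfined events first (they live in the unconfinedQueue)
- Call enqueueDelayedTasks to move due tasks from the delayed queue into the queue
- Take out one task; note that the queue is read and written with CAS
- Run the task
- Return the time until the next task. If a task was run or the queue is not empty, this is 0; if only delayed tasks remain, it is the difference between the earliest delayed task's time and the current time. The clock is nanoTime, so the unit is nanoseconds.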
override val nextTime: Long
get() {
if (super.nextTime == 0L) return 0L
val queue = _queue.value
when {
queue === null -> {} // empty queue -- proceed
queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
else -> return 0 // non-empty queue
}
val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
}
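Stripped of the queue details, the park-time decision reduces to a small pure function. parkNanosFor is a hypothetical name, and its parameters are simplified stand-ins for the state that nextTime reads from _queue and _delayed.

```kotlin
// Hypothetical reduction of nextTime: how long may the thread park, in nanoseconds?
fun parkNanosFor(queueHasTask: Boolean, nextDelayedAtNanos: Long?, nowNanos: Long): Long = when {
    queueHasTask -> 0L                            // something is runnable right now
    nextDelayedAtNanos == null -> Long.MAX_VALUE  // nothing scheduled: park until unparked
    else -> (nextDelayedAtNanos - nowNanos).coerceAtLeast(0L) // overdue tasks never yield a negative wait
}
```

With a delayed task due at 500 ns and the clock at 100 ns the result is 400; once the clock has passed the due time the result is clamped to 0.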
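How is the task started?
We know task.run is called on the thread that runs the event loop. But in practice there may be several threads involved: the task might run on a UI thread or an IO thread. How does that work? Let's look at how a task starts.
Recall that the task sitting in the queue is a DispatchedContinuation -> DispatchedTask<T> -> SchedulerTask<T> -> Runnable, so where is run overridden?
Walking up the chain, we find that DispatchedTask overrides run: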
final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
val exception = getExceptionalResult(state)
/*
* Check whether continuation was originally resumed with an exception.
* If so, it dominates cancellation, otherwise the original exception
* will be silently lost.
*/
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: DispatchException) {
handleCoroutineException(delegate.context, e.cause)
} catch (e: Throwable) {
handleFatalException(e)
}
}
In DispatchedContinuation the delegate member is simply this:
override val delegate: Continuation<T>
get() = this
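Then it looks as if withContinuationContext starts something new (a thread?) before running the block.
So let's see what withContinuationContext does. Note first that although it is named with...Context, it is just an ordinary inline function and does not switch threads.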
internal actual inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T {
val context = continuation.context
val oldValue = updateThreadContext(context, countOrElement)
val undispatchedCompletion = if (oldValue !== NO_THREAD_ELEMENTS) {
// Only if some values were replaced we'll go to the slow path of figuring out where/how to restore them
continuation.updateUndispatchedCompletion(context, oldValue)
} else {
null // fast path -- don't even try to find undispatchedCompletion as there's nothing to restore in the context
}
try {
return block()
} finally {
if (undispatchedCompletion == null || undispatchedCompletion.clearThreadContext()) {
restoreThreadContext(context, oldValue)
}
}
}
updateThreadContext
DispatchedContinuation has a field named countOrElement:
@JvmField // pre-cached value to avoid ctx.fold on every resumption
internal val countOrElement = threadContextElements(context)
// ThreadContext.kt
// Counts ThreadContextElements in the context
// Any? here is Int | ThreadContextElement (when count is one)
private val countAll =
fun (countOrElement: Any?, element: CoroutineContext.Element): Any? {
if (element is ThreadContextElement<*>) {
val inCount = countOrElement as? Int ?: 1
return if (inCount == 0) element else inCount + 1
}
return countOrElement
}
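countAll summarizes the ThreadContextElements in a CoroutineContext. Note that countAll is a function-typed value (an anonymous function), used as the fold operation:
- If there is exactly one ThreadContextElement, that element itself is returned
- If there are several ThreadContextElements, their count is returned
By default there are none, so countOrElement is 0.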
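The "count or single element" encoding can be reproduced with CoroutineContext.fold from the standard library. In this sketch Marked is a hypothetical marker interface standing in for ThreadContextElement, and TagA, TagB, Plain and countOrElementOf are made-up names.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical marker standing in for ThreadContextElement.
interface Marked

class TagA : AbstractCoroutineContextElement(TagA), Marked {
    companion object Key : CoroutineContext.Key<TagA>
}

class TagB : AbstractCoroutineContextElement(TagB), Marked {
    companion object Key : CoroutineContext.Key<TagB>
}

class Plain : AbstractCoroutineContextElement(Plain) {
    companion object Key : CoroutineContext.Key<Plain>
}

// Returns 0 (no marked element), the element itself (exactly one), or the count (two or more).
fun countOrElementOf(context: CoroutineContext): Any =
    context.fold<Any>(0) { acc, element ->
        if (element is Marked) {
            val inCount = acc as? Int ?: 1   // acc is an element when exactly one was seen so far
            if (inCount == 0) element else inCount + 1
        } else {
            acc
        }
    }
```

The single-element case avoids an allocation later on: updateThreadContext can call the element directly instead of folding over the context again.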
// countOrElement is pre-cached in dispatched continuation
// returns NO_THREAD_ELEMENTS if the contest does not have any ThreadContextElements
internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any?): Any? {
@Suppress("NAME_SHADOWING")
val countOrElement = countOrElement ?: threadContextElements(context)
@Suppress("IMPLICIT_BOXING_IN_IDENTITY_EQUALS")
return when {
countOrElement === 0 -> NO_THREAD_ELEMENTS // very fast path when there are no active ThreadContextElements
// ^^^ identity comparison for speed, we know zero always has the same identity
countOrElement is Int -> {
// slow path for multiple active ThreadContextElements, allocates ThreadState for multiple old values
context.fold(ThreadState(context, countOrElement), updateState)
}
else -> {
// fast path for one ThreadContextElement (no allocations, no additional context scan)
@Suppress("UNCHECKED_CAST")
val element = countOrElement as ThreadContextElement<Any?>
element.updateThreadContext(context)
}
}
}
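So in our case oldValue ends up as NO_THREAD_ELEMENTS, meaning the context has no ThreadContextElements.
Then the block is executed.
It looks up the Job element; if the job is no longer active, the continuation is resumed with the job's cancellation exception.
Otherwise it calls the continuation's resume. Since the continuation is a SuspendLambda, this lands in the resumeWith method of BaseContinuationImpl: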
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
// ContinuationImpl.kt
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
Now comes the key step: inside the while loop, invokeSuspend is called. Does it ring a bell? The generated SuspendLambda subclass implements this method.
The method itself is declared in BaseContinuationImpl:
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
In the generated class it looks roughly like this:
public final Object invokeSuspend(Object var1) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure(var1);
String var2 = "123";
System.out.println(var2);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
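If invokeSuspend returns a value other than COROUTINE_SUSPENDED, the body ran to completion and the outcome is Result.success(outcome); if it throws, the outcome is Result.failure(exception).
If it returns COROUTINE_SUSPENDED, resumeWith returns right away; we will come back to this later.
With that, we have essentially finished the runBlocking flow!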
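How a coroutine comes back after suspending
So far we have worked out how a coroutine is started. On Android, using coroutines usually means doing IO and other blocking work. The third-party libraries we use, such as Room and Retrofit2, already expose their IO methods as suspend functions, so we just call them. But we still do not know how to run an ordinary blocking IO call from a coroutine and suspend until it finishes.
Wrapping an ordinary blocking IO operation as a suspend function
Suppose we want to implement such an operation ourselves with coroutines. How would we do it? I had honestly never thought about it before.
Recall that the coroutine machinery defines the Continuation interface, which is responsible for resuming a coroutine and offers a resume method.
We will not study how specific third-party libraries do it; here is the conclusion directly.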
suspend fun fakeNetworkCall() : String = suspendCoroutine { continuation ->
Thread {
println("Fake Thread Begin: ${Thread.currentThread().name}")
Thread.sleep(5000)
continuation.resume("Success")
println("Fake Thread End: ${Thread.currentThread().name}")
}.start()
}
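So in essence we still create a new thread to perform the IO operation.
In other words, we:
- Use suspendCoroutine to write a suspending function; its block receives the current continuation
- Start a new thread and sleep in it to simulate blocking IO
- Use the continuation to resume execution at the suspension point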
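The same three steps apply to callback-style APIs, including the failure path. In the sketch below fetchAsync is a hypothetical callback API standing in for a real IO library, and fetch and fetchBlocking are made-up helpers; only suspendCoroutine, resume, resumeWithException and startCoroutine are real stdlib functions. A CountDownLatch is used to wait for the result so that the example does not depend on kotlinx.coroutines.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-style API: reports either a value or an error from another thread.
fun fetchAsync(fail: Boolean, onDone: (String?, Throwable?) -> Unit) {
    thread {
        if (fail) onDone(null, IllegalStateException("boom")) else onDone("payload", null)
    }
}

// Wraps the callback API: success resumes with a value, failure resumes with an exception.
suspend fun fetch(fail: Boolean): String = suspendCoroutine { continuation ->
    fetchAsync(fail) { value, error ->
        if (error != null) continuation.resumeWithException(error)
        else continuation.resume(value ?: "")
    }
}

// Starts the coroutine and blocks the calling thread until the completion is invoked.
fun fetchBlocking(fail: Boolean): Result<String> {
    val done = CountDownLatch(1)
    var outcome: Result<String>? = null
    suspend { fetch(fail) }.startCoroutine(
        Continuation(EmptyCoroutineContext) { result ->
            outcome = result
            done.countDown()
        }
    )
    done.await()
    return outcome!!
}
```

The continuation must be resumed exactly once on every path; forgetting the error branch would leave the coroutine suspended forever.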
Continuation
Let's look at how the Continuation interface is defined:
/**
* Interface representing a continuation after a suspension point that returns a value of type `T`.
*/
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
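From the definition, a Continuation represents the rest of the computation after a suspension point: resuming it continues the coroutine with a concrete value.
Generic variance: in means contravariant
This is a good moment to review Kotlin's variance modifiers:
- out, covariant: T can only be produced (read), not consumed
- in, contravariant: T can only be consumed (written), not produced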
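A short sketch of what the two modifiers allow. Source, Sink and demoVariance are hypothetical names; Sink<in T> mirrors the shape of Continuation<in T>, which only ever consumes a T through resumeWith.

```kotlin
// Hypothetical producer: T appears only in "out" position.
interface Source<out T> {
    fun next(): T
}

// Hypothetical consumer: T appears only in "in" position, like Continuation<in T>.
interface Sink<in T> {
    fun accept(value: T)
}

fun demoVariance(): List<Any?> {
    val received = mutableListOf<Any?>()

    val anySink = object : Sink<Any?> {
        override fun accept(value: Any?) { received += value }
    }
    // Contravariance: something that accepts anything can be used where a Sink<String> is expected.
    val stringSink: Sink<String> = anySink
    stringSink.accept("hello")

    val stringSource = object : Source<String> {
        override fun next(): String = "world"
    }
    // Covariance: something that produces Strings can be used where a Source<Any?> is expected.
    val anySource: Source<Any?> = stringSource
    received += anySource.next()

    return received
}
```

This is why a Continuation<Any?> can stand in for a Continuation<String>: whatever resumes it with a String is still handing over a valid Any?.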
suspendCoroutine
We have already met suspendCoroutine in a few places. It obtains the current Continuation instance inside a suspend function and suspends the currently running coroutine.
/**
* Obtains the current continuation instance inside suspend functions and suspends
* the currently running coroutine.
*
* In this function both [Continuation.resume] and [Continuation.resumeWithException] can be used either synchronously in
* the same stack-frame where the suspension function is run or asynchronously later in the same thread or
* from a different thread of execution. Subsequent invocation of any resume function will produce an [IllegalStateException].
*/
@SinceKotlin("1.3")
@InlineOnly
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getOrThrow()
}
}
public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
}
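suspendCoroutineUninterceptedOrReturn is an intrinsic: the compiler fills in its implementation at the bytecode level.
We can see that suspendCoroutine wraps the continuation in a SafeContinuation, which we will also discuss later.
Recap: how a coroutine is suspended
From the earlier sections we know how a coroutine is suspended: the core is a state machine, but we need to look at the bytecode to see it.
For the following code, let's look at its class file: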
suspend fun fakeNetworkCall() : String = suspendCoroutine { continuation ->
Thread {
Thread.sleep(5000)
continuation.resume("Success")
}.start()
}
fun main() {
runBlocking(Dispatchers.Default) {
launch(Dispatchers.Default) {
val str = fakeNetworkCall()
println(str)
}
}
}
Let's decompile the bytecode into Java (this should look familiar by now): the SuspendLambda subclass implements invokeSuspend and switches on its state inside that method.
public final Object invokeSuspend(Object $result) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
Continuation var4 = (Continuation)this;
this.label = 1;
var10000 = CustomCoroutineKt.fakeNetworkCall(var4);
if (var10000 == var3) {
return var3;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
String str = (String)var10000;
System.out.println(str);
return Unit.INSTANCE;
}
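Recall also that every suspend function gets an extra Continuation parameter at the bytecode level and returns Object, which is either the result or the COROUTINE_SUSPENDED marker.
When fakeNetworkCall is called here, the current object (this) is passed in, and the return value is checked to see whether the coroutine suspended. Before looking at what happens when invokeSuspend returns, a quick recap.
When a coroutine is submitted, the chain is:
continuation[DispatchedContinuation].resumeWith -> CoroutineScheduler -> Worker -> while(findTask) (park if needed) -> task.run -> DispatchedTask.run -> continuation[SuspendLambda(actually BaseContinuationImpl)].resume -> ($xxxx$1.class)invokeSuspend
(A diagram of the call chain goes here.)
Note that the two resumeWith calls differ in both receiver and implementation: the first only dispatches, while the second executes the actual code.
How a coroutine resumes
So my guess is this: after fakeNetworkCall starts its thread, it returns COROUTINE_SUSPENDED, and each caller up the stack returns COROUTINE_SUSPENDED as well. When BaseContinuationImpl.resumeWith sees that invokeSuspend returned COROUTINE_SUSPENDED, it returns immediately. The stack unwinds all the way back to the Worker's while loop, which then either parks or runs the next task.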
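This unwinding can be observed directly with the low-level intrinsics from the standard library. demoSuspendThenResume is a hypothetical function written for this sketch; suspendCoroutineUninterceptedOrReturn, COROUTINE_SUSPENDED, startCoroutine and resume are real stdlib APIs. No dispatcher is involved, so everything runs on the calling thread.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine

// Hypothetical demo: records the order in which things happen around a suspension.
fun demoSuspendThenResume(): List<String> {
    val log = mutableListOf<String>()
    var saved: Continuation<String>? = null

    suspend {
        log += "before"
        val value = suspendCoroutineUninterceptedOrReturn<String> { continuation ->
            saved = continuation   // keep the continuation for later
            COROUTINE_SUSPENDED    // tell the caller chain that we suspended
        }
        log += "after: $value"
    }.startCoroutine(Continuation(EmptyCoroutineContext) { log += "completed" })

    // invokeSuspend returned COROUTINE_SUSPENDED, so startCoroutine has already returned.
    log += "startCoroutine returned"

    // The second resumeWith re-enters invokeSuspend with label == 1.
    saved!!.resume("value")
    return log
}
```

The log shows startCoroutine returning before the code after the suspension point runs, and the completion being invoked only after the explicit resume.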
So let's see how the compiler handles this suspend function. The decompiled code is:
public static final Object fakeNetworkCall(@NotNull Continuation $completion) {
SafeContinuation var2 = new SafeContinuation(IntrinsicsKt.intercepted($completion));
final Continuation continuation = (Continuation)var2;
int var4 = false;
Class var5 = continuation.getClass();
System.out.println(var5);
(new Thread((Runnable)(new Runnable() {
public final void run() {
Thread.sleep(5000L);
Result.Companion var10001 = Result.Companion;
continuation.resumeWith(Result.constructor-impl("Success"));
}
}))).start();
Object var10000 = var2.getOrThrow();
if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
DebugProbesKt.probeCoroutineSuspended($completion);
}
return var10000;
}
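So it first wraps the continuation in a SafeContinuation, then starts a thread, and then calls getOrThrow on the SafeContinuation, which in essence uses CAS to switch its internal result state from UNDECIDED to COROUTINE_SUSPENDED. Note that the same SafeContinuation is also used from the new thread, where its resumeWith method is called.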
@PublishedApi
internal actual fun getOrThrow(): Any? {
var result = this.result // atomic read
if (result === UNDECIDED) {
if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
result = this.result // reread volatile var
}
return when {
result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
result is Result.Failure -> throw result.exception
else -> result // either COROUTINE_SUSPENDED or data
}
}
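A note on lambda bytecode
If you inspect the generated class files you will also find one named fakeNetWork$2$1. It has nothing to do with coroutines: it is the class the compiler generated for the Thread { ... } lambda, much like an anonymous inner class, and is simply a Runnable implementation. Do not mix the two up.
In the new thread, after Thread.sleep, SafeContinuation.resumeWith is called. Its core logic: when the internal result state is COROUTINE_SUSPENDED, it calls resumeWith on the delegate Continuation[DispatchedContinuation], and the Dispatcher once again picks a thread to run this "new" task.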
public actual override fun resumeWith(result: Result<T>) {
while (true) { // lock-free loop
val cur = this.result // atomic read
when {
cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
delegate.resumeWith(result)
return
}
else -> throw IllegalStateException("Already resumed")
}
}
}
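When some Worker runs this task again, invokeSuspend is called. This time the state (label) has changed, so the later branch runs: the value passed to resume becomes the result, the println executes, and the method returns.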
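The race between getOrThrow and resumeWith boils down to a three-state CAS protocol. The sketch below imitates that protocol in simplified form; RaceCell, Undecided, Suspended, Resumed and the two demo functions are hypothetical names, the value type is fixed to String, and failures are left out. It is not the SafeContinuation source.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical state markers, mirroring UNDECIDED / COROUTINE_SUSPENDED / RESUMED.
object Undecided
object Suspended
object Resumed

class RaceCell(private val resumeLater: (String) -> Unit) {
    private val state = AtomicReference<Any>(Undecided)

    // Producer side, possibly on another thread.
    fun resumeWith(value: String) {
        while (true) { // lock-free loop
            val current = state.get()
            when {
                // Caller has not asked yet: just store the value for it to pick up.
                current === Undecided -> if (state.compareAndSet(Undecided, value)) return
                // Caller already suspended: hand the value to the delegate.
                current === Suspended -> if (state.compareAndSet(Suspended, Resumed)) {
                    resumeLater(value)
                    return
                }
                else -> throw IllegalStateException("Already resumed")
            }
        }
    }

    // Caller side: returns the value if it is already there, otherwise the Suspended marker.
    fun getOrSuspended(): Any {
        if (state.compareAndSet(Undecided, Suspended)) return Suspended
        val current = state.get()
        return if (current === Resumed) Suspended else current
    }
}

fun resumeFirst(): Any {
    val cell = RaceCell { }
    cell.resumeWith("early")
    return cell.getOrSuspended()
}

fun suspendFirst(): List<Any> {
    val delivered = mutableListOf<Any>()
    val cell = RaceCell { delivered += it }
    val seen = cell.getOrSuspended()
    cell.resumeWith("late")
    return listOf(seen === Suspended, delivered)
}
```

If the producer wins the race, the caller gets the value synchronously and never suspends; if the caller wins, the value travels through the delegate, which in the real code is the DispatchedContinuation.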
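And with that we have covered the complete flow of a coroutine, from start through suspension to resumption!
Threads after resumption
Note that after a coroutine resumes, it may run on a different thread than before (both come from the same Dispatcher's thread pool); which thread actually runs it is not deterministic. This is decided by CoroutineScheduler's dispatching.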