Kotlin-Coroutines-delay的原理
delay的原理
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}
实质是将一个CancellableContinuation 交给CoroutineContext中的delay来schedule
那CancellableContinuation是如何包装的呢?
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
/*
* For non-atomic cancellation we setup parent-child relationship immediately
* in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
* properly supports cancellation.
*/
cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}
这里注意,好像就像突然发现断了一条路,为什么这里直接throw了一个Error?
@SinceKotlin("1.3")
@InlineOnly
@Suppress("WRONG_INVOCATION_KIND", "UNUSED_PARAMETER", "RedundantSuspendModifier")
public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
}
其实这里是Kotlin编译器Backend在编译阶段会生成特殊的字节码,所谓的Intrinsic指令(内建指令),不是由Kotlin代码实现,而是硬编码在Backend的源码中。这里粗略查看Kotlin的源码,在backend包下确实也看到了该字段的声明。
举一个常见的例子,比如这样一段代码,我们知道实际的执行代码是包装在SuspendLambda实现类中的invokeSuspend方法中。
fun testAtomic() {
runBlocking {
println("begin")
delay(1000)
println("mid")
delay(2000)
println("end")
}
}
那其invokeSuspend的字节码是怎样的呢?用IDEA反编译下
public final Object invokeSuspend(Object $result) {
String var2;
label17: {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Continuation var10001;
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
var2 = "begin";
System.out.println(var2);
var10001 = (Continuation)this;
this.label = 1;
if (DelayKt.delay(1000L, var10001) == var3) {
return var3;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
case 2:
ResultKt.throwOnFailure($result);
break label17;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
var2 = "mid";
System.out.println(var2);
var10001 = (Continuation)this;
this.label = 2;
if (DelayKt.delay(2000L, var10001) == var3) {
return var3;
}
}
var2 = "end";
System.out.println(var2);
return Unit.INSTANCE;
}
基于状态机的基础,我们看下不同状态下的实现。
在需要delay时,实际调用了DelayKt.delay,并把delay时间以及Continuation传递了进去
important suspend fun参数与返回值问题
而为什么明明代码中只有delay(long)的声明,但实际调用时却多传递了一个Continuation?
这是因为针对所有的suspend方法,在编译为字节码时,都会自动多添加一个参数Continuation来表示挂起的上下文,并添加返回值Any?协程是否挂起靠这个返回值判断。例如,针对一个参数为long 返回值为Unit的的suspend fun
public static final java.lang.Object testAtomic2(long, kotlin.coroutines.Continuation<? super java.lang.String>);
descriptor: (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
flags: (0x0019) ACC_PUBLIC, ACC_STATIC, ACC_FINAL
核心也就是从CancellableContinuation的CoroutineContext中获取到ContinuationInterceptor的Element,并调用scheduleResumeAfterDelay方法
发现是在EventLoopImplBase中有实现
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val timeNanos = delayToNanos(timeMillis)
if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
DelayedResumeTask(now + timeNanos, continuation).also { task ->
/*
* Order is important here: first we schedule the heap and only then
* publish it to continuation. Otherwise, `DelayedResumeTask` would
* have to know how to be disposed of even when it wasn't scheduled yet.
*/
schedule(now, task)
continuation.disposeOnCancellation(task)
}
}
}
因为时间精度很高,需要把毫秒转化为纳秒,包装为一个DelayedResumeTask并schedule
fun schedule(now: Long, delayedTask: DelayedTask) {
when (scheduleImpl(now, delayedTask)) {
SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
else -> error("unexpected result")
}
}
private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
if (isCompleted) return SCHEDULE_COMPLETED
val delayedQueue = _delayed.value ?: run {
_delayed.compareAndSet(null, DelayedTaskQueue(now))
_delayed.value!!
}
return delayedTask.scheduleTask(now, delayedQueue, this)
}
这里可以看到,本质上就是把这个task通过CAS的方式塞给eventLoop中的_delayed这个队列
而要注意,delay队列并不是一个优先队列,而是一个普通的线程安全队列。而根据之前的梳理,我们知道在eventloop中会不断检查是否需要将delay队列的task加入到queue中,并执行。再添加完后并及时检查是否需要unpark调度线程来立即执行这个task
由于delay也是suspend fun,因此在delay方法运行完毕后,会返回一个代表状态的值,这里会进行判断,若为协程suspend的标志位IntrinsicsKt.getCOROUTINE_SUSPENDED(),则直接返回该标志为表明这个协程被suspend
而回顾之前我们所说的task执行流程,执行完毕后,比如对于BlockingCoroutine
,则基于eventLoop中下一次执行任务的间隔来park当前线程。因此达到了delay的目的。