Kotlin协程-CAS实现的安全队列
Kotlin协程-CAS实现的安全队列源码研读
引言
最近在啃Kotlin协程相关源码时,发现了其EventLoop内部的queue队列的安全实现方式。与Handler机制中的MessageQueue有些类似,但似乎又有不同。初步研读时发现自己对CAS相关知识还是不足。发现了这样一个小而精细的类,因此决定好好研读一下,同时增进下自己对CAS原理以及基于CAS的实际应用的理解。
相关代码版本
- 📖 org.jetbrains.kotlinx:kotlinx-coroutines-core: 1.10.1
- 📖 org.jetbrains.kotlinx:atomicfu: 0.19.0
引言
快速回顾下该队列的使用方式,其是在EventLoopImplBase中定义的成员变量queue
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
// null | CLOSED_EMPTY | task | Queue<Runnable>
private val _queue = atomic<Any?>(null)
private val _delayed = atomic<DelayedTaskQueue?>(null)
}
使用atomic函数定义的,实质是基于Kotlin atomicfu包
在EvnetLoop将协程scope入队时,则会尝试将其enqueue到这个queue中。在初始化时,就会将这个queue初始化为LockFreeTaskQueueCore
,这就是我们的主角了
回顾CAS加锁原理
CAS加锁原理是操作系统底层支持的原子操作,将读和更新一起完成,类似于数据库中的事务,要么成功,要么失败。
那接下来我们正式进入LockFreeTaskQueueCore的探究
结构
package kotlinx.coroutines.internal
/**
* Lock-free Multiply-Producer xxx-Consumer Queue core.
* @see LockFreeTaskQueue
*/
private typealias Core<E> = LockFreeTaskQueueCore<E>
internal class LockFreeTaskQueueCore<E : Any>(
private val capacity: Int,
private val singleConsumer: Boolean // true when there is only a single consumer (slightly faster)
) {
private val mask = capacity - 1
private val _next = atomic<Core<E>?>(null)
private val _state = atomic(0L)
private val array = atomicArrayOfNulls<Any?>(capacity)
}
首先注意到,这个类名称是LockFree,因此它是不需要锁的
其次,这是一个TaskQueue,作为一个Queue,必然是有进有出。在多线程中则表现为生产者-消费者的模式,是不是有点熟悉了?这是这里我们将一些加锁
因此我们只需要关注两个问题
- 队列容量:必须是2的倍数(init时后进行check)
- 是否只存在一个消费者:用于后续线程安全相关的判断
State
state是该CAS Queue的精髓,一切的操作都与State有关系。
可以看到,这里的它hold了一个Long型的_state变量
64位的Long,由Atomic保证了原子性,从低位到高位被分成了四个部分
- 1 -> 30: Head部分,表示消费者消费的索引
- 31 -> 60: Tail部分,表示生产者生产的索引
- 61: 标志位,当前队列是否被Frozen
- 62: 标志位,当前Queue是否关闭
- 63-64: 保留
Array
一个
操作
来看基于state的一些操作
inline fun <T> Long.withState(block: (head: Int, tail: Int) -> T): T {
val head = ((this and HEAD_MASK) shr HEAD_SHIFT).toInt()
val tail = ((this and TAIL_MASK) shr TAIL_SHIFT).toInt()
return block(head, tail)
}
这其实就是从state方法中取出head与tail的值,Int值表示,实际范围在
操作
入队 AddLast
fun addLast(element: E): Int {
_state.loop { state ->
if (state and (FROZEN_MASK or CLOSED_MASK) != 0L) return state.addFailReason() // cannot add
state.withState { head, tail ->
val mask = this.mask // manually move instance field to local for performance
// If queue is Single-Consumer then there could be one element beyond head that we cannot overwrite,
// so we check for full queue with an extra margin of one element
if ((tail + 2) and mask == head and mask) return ADD_FROZEN // overfull, so do freeze & copy
// If queue is Multi-Consumer then the consumer could still have not cleared element
// despite the above check for one free slot.
if (!singleConsumer && array[tail and mask].value != null) {
// There are two options in this situation
// 1. Spin-wait until consumer clears the slot
// 2. Freeze & resize to avoid spinning
// We use heuristic here to avoid memory-overallocation
// Freeze & reallocate when queue is small or more than half of the queue is used
if (capacity < MIN_ADD_SPIN_CAPACITY || (tail - head) and MAX_CAPACITY_MASK > capacity shr 1) {
return ADD_FROZEN
}
// otherwise spin
return@loop
}
val newTail = (tail + 1) and MAX_CAPACITY_MASK
if (_state.compareAndSet(state, state.updateTail(newTail))) {
// successfully added
array[tail and mask].value = element
// could have been frozen & copied before this item was set -- correct it by filling placeholder
var cur = this
while(true) {
if (cur._state.value and FROZEN_MASK == 0L) break // all fine -- not frozen yet
cur = cur.next().fillPlaceholder(tail, element) ?: break
}
return ADD_SUCCESS // added successfully
}
}
}
}
这里我最开始理解这个方法的时候没太弄懂几处return的返回意义。其实是我没有注意到inline关键字,首先要理解inline fun的应用
inline fun在方法调用中的退出问题
要知道,inline fun在编译时,是直接将整段代码 copy 到调用处,因此在编译后,不再作为一个方法调用,因此这里的_state.loop完全 可以看作一个while循环。所以在return时,当直接return 数字时,则代表退出这个方法,而return@loop即代表仅仅终止当前循环,类似于continue。
在入队时,也就是一个生产者操作队列尝试添加一个元素,Queue做了这么几件事
- 启动一个while true的循环
- 检查state的标志,如果已经被Frozen或者Closed,则直接返回失败
- 拿到state的head和tail的索引值
- 如果(tail + 2) and mask == head and mask,就说明队列快满了(这次加一个元素,与生产者之间就只剩一个空元素了),则返回ADD_FROZEN,交给上层处理(实际上层会进行扩容,这里我们后续会讲到)
- 多消费者情况下会进行额外操作
- 将tail值+1代表Task数量+1,并尝试将state值通过CAS更新
- 若失败,则会重新进行循环,也就是重新拿到_state值并进行入队操作
- 成功,修改当前队列中指定位置的值,设置索引为当前元素,此后并进行placeholder补偿操作。
扩容
fun addLast(element: E): Boolean {
_cur.loop { cur ->
when (cur.addLast(element)) {
Core.ADD_SUCCESS -> return true
Core.ADD_CLOSED -> return false
Core.ADD_FROZEN -> _cur.compareAndSet(cur, cur.next()) // move to next
}
}
}
扩容操作发生在上层,即LockFreeTaskQueue
中,当添加元素使队满时,则会触发扩容操作
fun next(): LockFreeTaskQueueCore<E> = allocateOrGetNextCopy(markFrozen())
private fun markFrozen(): Long =
_state.updateAndGet { state ->
if (state and FROZEN_MASK != 0L) return state // already marked
state or FROZEN_MASK
}
private fun allocateOrGetNextCopy(state: Long): Core<E> {
_next.loop { next ->
if (next != null) return next // already allocated & copied
_next.compareAndSet(null, allocateNextCopy(state))
}
}
private fun allocateNextCopy(state: Long): Core<E> {
val next = LockFreeTaskQueueCore<E>(capacity * 2, singleConsumer)
state.withState { head, tail ->
var index = head
while (index and mask != tail and mask) {
// replace nulls with placeholders on copy
val value = array[index and mask].value ?: Placeholder(index)
next.array[index and next.mask].value = value
index++
}
next._state.value = state wo FROZEN_MASK
}
return next
}
- 先CAS标记当前state为FROZEN
- 载新的QueueCore中通过CAS 创建一个新的队列,容量翻倍,并将老队列的值拷贝到新队列中,需要注意的是mask要相应的变化。
这里PlaceHolder是什么呢?先按下不表
出队 removeFirstOrNull()
// REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
fun removeFirstOrNull(): Any? {
_state.loop { state ->
if (state and FROZEN_MASK != 0L) return REMOVE_FROZEN // frozen -- cannot modify
state.withState { head, tail ->
if ((tail and mask) == (head and mask)) return null // empty
val element = array[head and mask].value
if (element == null) {
// If queue is Single-Consumer, then element == null only when add has not finished yet
if (singleConsumer) return null // consider it not added yet
// retry (spin) until consumer adds it
return@loop
}
// element == Placeholder can only be when add has not finished yet
if (element is Placeholder) return null // consider it not added yet
// we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
val newHead = (head + 1) and MAX_CAPACITY_MASK
if (_state.compareAndSet(state, state.updateHead(newHead))) {
// Array could have been copied by another thread and it is perfectly fine, since only elements
// between head and tail were copied and there are no extra steps we should take here
array[head and mask].value = null // now can safely put null (state was updated)
return element // successfully removed in fast-path
}
// Multi-Consumer queue must retry this loop on CAS failure (another consumer might have removed element)
if (!singleConsumer) return@loop
// Single-consumer queue goes to slow-path for remove in case of interference
var cur = this
while (true) {
@Suppress("UNUSED_VALUE")
cur = cur.removeSlowPath(head, newHead) ?: return element
}
}
}
}
- 启动一个while true的循环
- 检查state的标志,如果已经被Frozen,则返回Frozen
- 若队空,则直接返回null
- 如果队不空
- 若只有一个消费者:则说明此时有生产者修改了state值,但还没实际加元素进来,这里将其视作为这个元素还没有添加,交给上层处理
- 若存在多个消费者:则自旋直到检查到添加了为止,因为若将空当作了没有元素,则可能错过这个元素。
- 判断element是否为placeholder,这里后续会提到
- CAS更新head的值,若成功更新,则置空队列那个索引位置的值
- 若没有成功更新
- 在多消费者模式下,重新尝试
- 在单消费模式下,只能说明又有新的生产者添加了一个元素,且队列也可能发生扩容,所以要想办法清除
好,现在进入到问题环节
异步问题整合
1. 在一个while循环中,比如CAS失败了,也就是另一个线程在中途被cpu调用并成功通过CAS修改了_state值,那在这一个线程中,通过CAS修改时得知此次操作失败,那重新循环时,拿到的_state值,为什么会是最新的?为什么原来的线程在每一次loop循环后能感知到这个变化?loop中能感知到state的值变化吗?
好的,恭喜你注意到这个点,它涉及到lambda原理、CAS操作原理、方法调用时引用传递类型以及线程同步等相关知识,别急,我们慢慢来理。
目前似乎就只知道,kotlin的一个block中,如果带有一个参数,那这个参数在这个block中的是不可变的,
来看下AtomicLong
的源码
public actual class AtomicLong internal constructor(value: Long, val trace: TraceBase) {
/**
* Reads/writes of this property maps to read/write of volatile variable.
*/
@Volatile
public actual var value: Long = value
set(value) {
field = value
if (trace !== None) trace { "set($value)" }
}
会发现其value字段上添加了@Volatile注解,该注解与Java中volatile关键字作用完全一样。我们知道,volatile关键字保证了跨线程的可见性以及禁止某些指令重排序。说具体点,也就是写操作会立刻刷新到主内存,读操作会强制从主内存取值。
再来看下loop的代码
public inline fun AtomicLong.loop(action: (Long) -> Unit): Nothing {
while (true) {
action(value)
}
}
这里的Long在传递时,到底是按值传递还是引用传递?
在Kotlin/JVM中,对数字类型采用“按需装箱”策略
- 对于Long(非空),编译后的默认类型为原生long
- 对于Long?(可空),编译后默认类型为java.lang.Long引用
而JVM中是基于值传递的
public static void test(Long num) {
num = 1000L;
}
public static void main(String[] args) {
Long num = 1L;
test(1L);
System.out.println(num);
}
对于如上代码,看看最终打印的num值是多少呢?1000还是1? 结果是1。这是因为JVM中基于值传递的本质以及形参引用赋值所决定的,如果还是不清楚,这里就不再赘述,可以再自行研究。
JVM中永远是按值传参(pass-by-value),区别只是值的内容,对于原生类型则直接复制字节数值,对于对象引用则复制对象在堆上的地址。同时Kotlin中规定了普通函数的参数默认是val不可更改,因为即时在本地修改这份拷贝,调用方也看不到,语法禁止可以避免这类看似能改但实际无效的机会
再回到loop这个方法,流程上:
- 在代码中,进入while循环时,lambda形式的action会被包装为Function, 并实现invoke接口方法,并在while循环中调用这个实例的invoke方法。
- invoke方法需要Long参数,读取AtomicLong.value,JVM把64位数据从主内存(volatile决定)复制到寄存器,这是一次值拷贝
- 在调用invoke(value)时,实参按值复制到形参槽位。因为是值传递(long),所以在一次loop中,或者说一次while循环中,该线程看到的形参state值是固定的。
所以,这里action传递的值为Long,JVM中为原生的long,直接copy数值到val到形参,所以是不可感知的。
而在每次loop循环时是可知的,是因为每次loop后,也就是进入到while循环,此时会重新获取JVM值,也就是再走一次JVM拿内存值->传参到流程,这个时候,因为是从内存拿值,所以拿到的一定是最新的。就没有问题
2. 为什么(tail + 2) and mask == head and mask)会返回FROZEN状态
tail代表的是生产者索引(也就是下一个task入队时,对应的索引),head代表的是消费者索引(即待出队的task的索引)
当尝试 add 一个task到队尾时,真正队列满的情况应该是
经典环形缓冲区的设计
首先明白在经典的环形缓冲区的设计中,我们常用
为什么要这样做?
因为会引起无法判断 队满/队空 的情况,当从GPT中学习到这一点时,发现数据结构学到的知识已经还给老师了
在最初时,
因此,这里至少要预留一个位置,那为什么又预留了一个呢?
其实这个问题我研究了半天,但后续发现其实是一个历史原因,并不涉及到线程安全问题,答案就是如今预留一个已经足够。 Some problem about addLast
in LockFreeTaskQueue #4205 New issue,这里不再深究。后续自己提了个PR到kotlin/coroutines仓库
因此后续的说明我都会以只留一个槽位为前提
3. PlaceHolder到底是什么
PlacerHolder,故名思义就是个占位置的。那它到底是帮谁占位置呢?
//..{
var cur = this
while(true) {
if (cur._state.value and FROZEN_MASK == 0L) break // all fine -- not frozen yet
cur = cur.next().fillPlaceholder(tail, element) ?: break
}
//..}
private fun fillPlaceholder(index: Int, element: E): Core<E>? {
val old = array[index and mask].value
/*
* addLast actions:
* 1) Commit tail slot
* 2) Write element to array slot
* 3) Check for array copy
*
* If copy happened between 2 and 3 then the consumer might have consumed our element,
* then another producer might have written its placeholder in our slot, so we should
* perform *unique* check that current placeholder is our to avoid overwriting another producer placeholder
*/
if (old is Placeholder && old.index == index) {
array[index and mask].value = element
// we've corrected missing element, should check if that propagated to further copies, just in case
return this
}
// it is Ok, no need for further action
return null
}
别忘记我们会面临一些多线程的情况,考虑以下情况
- A生产者线程尝试添加一个元素,CAS更新state成功了,但还没有实际地更新队列中元素的值。
- 此时B生产者线程也尝试来添加一个元素,发现队列满了,CAS设置该Core状态FROZEN并创建一个新的Core,扩容队列。在B扩容的时候,在拷贝A插入的索引处的值时,会发现其是空,因为A还没实际添加,但本来是应该有值的(因为state中head与tail均是有效的)。所以这里类似打了一个标记,后续再处理。
- A线程继续执行,为老Core的队列对应索引位置的值(但其实由于其state已经为FROZEN所以是无意义的)。所以之后需要改新的Core队列中对应索引的值。所以会调用到fillPlaceHolder方法,也就是在最新的队列中补起这个因多线程切换以及扩容而遗漏的值。理解下这里的
cur.next().fillPlaceholder(tail, element) ?: break
搭配while循环来确保Core是最新的.
index检验
同时也要注意这里用到了PlaceHolder中index与addLast的index比较,确保要填充的是我们当前的PlaceHolder。
比如一个生产者添加了一个元素(CAS成功但还没改变队列的值),后续又过了一段时间,发生扩容,新Core队列中该位置值为PlaceHolder(old).
而后续又过了很久,另一个生产者向新Core中添加了一个元素(CAS成功但还没改变队列的值)过一段时间,这个新的队列又发生了扩容,最新的Core队列中该位置值为PlaceHolder(new).
现在back to最初生产者线程操作上,要进行值的补偿。则会找到最新的Core,此时若没有index辅助check,则它发现该位置是placeholder,并还原值为第一次添加的。而后续第二个生产者线程添加的值,则无法被更新(因为此时这个新Core该位置的值已不再是PlaceHolder)。因此会造成两次顺序写值,但最新一次的值没有得到更新的情况
4. Add 操作为什么要做判空的考量
if (!singleConsumer && array[tail and mask].value != null) {
// There are two options in this situation
// 1. Spin-wait until consumer clears the slot
// 2. Freeze & resize to avoid spinning
// We use heuristic here to avoid memory-overallocation
// Freeze & reallocate when queue is small or more than half of the queue is used
if (capacity < MIN_ADD_SPIN_CAPACITY || (tail - head) and MAX_CAPACITY_MASK > capacity shr 1) {
return ADD_FROZEN
}
// otherwise spin
return@loop
}
首先要注意到,操作队列时,在不同时段有有以下的操作
插入
- 读值
- CAS改state
- 写入
删除
- 读值
- CAS改state
- 置空
因此考虑下这样的情况,在容量为4的情况下(最大容量为3)
首先考虑,也就是,先Remove再Add新元素,并且每次操作都是成功地通过CAS修改的了state
的值,但还没实际更新队列中的元素。最后就会出现这样的一种情况:
槽位为0的元素中,目前元素是a,属于上一次消费者线程还没清除的,但新的生产者线程还没放入新元素。
在单消费者线程中,这样没问题,因为 Remove->Add->Remove,则下一次的Remove一定发生在上一次Remove操作彻底完成。又因为队列中始终有一个槽位的空位,因此不会存在元素还没有被置空的情况。
但在多消费者模式下,就会存在时序问题
- 如果时序是一样的,即先删除a再添加e,则没有问题。
- 但如果是先替换改元素为e,再置空,则这就会出问题。
因此需要额外判断,也就是这里代码的意义。
如果在多消费者模式且待添加的元素的槽位上有值,则说明这个值还没有被删除,进行自旋,等待多线程下另一个线程把这个槽的值清空。即 生产者不会覆盖槽位的值,而是必须等待其值为空后再写
同时这里还有一些优化:
当queue的容量很小(1024)或队列中的元素超过了容量的一半,那就扩容(说明当前消费者处理的速度明显慢于生产者写入的速度,因为可能造成生产者阻塞)
5. remove时为什么要判断是否是placeholder
例如一个元素在扩容后,存在一个值为placeholder,即上一个生产者还没有写值到队列中。此时下一个消费者线程要取这个值,此时会发现是PlaceHolder
因此这里就规定如果上一个生产者线程还没写值到队列,则消费者线程取这个值就为null
6. remove失败为什么会根据消费者数目来进行判断
当remove失败时,在多消费者情况下,有很多种可能
- 有另一个消费者成功执行了remove,因此这个消费者需要重试,自旋
- 另一个生产者成功执行了addLast,因此这个消费者也需要重试,自旋
- 另一个生产者执行了addLast触发了FROZEN并扩容,因此需要重试
在单消费者模式下,情况就很简单
- 另一个生产者成功执行了addLast
- 另一个生产者执行了addLast触发了FROZEN并扩容,则需要在新Core中remove掉这个值
因此就有了removeSlowPath
这个方法
private fun removeSlowPath(oldHead: Int, newHead: Int): Core<E>? {
_state.loop { state ->
state.withState { head, _ ->
assert { head == oldHead } // "This queue can have only one consumer"
if (state and FROZEN_MASK != 0L) {
// state was already frozen, so removed element was copied to next
return next() // continue to correct head in next
}
if (_state.compareAndSet(state, state.updateHead(newHead))) {
array[head and mask].value = null // now can safely put null (state was updated)
return null
}
}
}
}
它的核心就是在单消费者模式下,确保在head tail无误时能成功取出一个值,哪怕可能会自旋一段时间。
- 看当前core是否被FROZEN,如果是则返回next(),确保是最新的
- 在最新的core下,不断自旋重试,尝试CAS更新head值。
外层包装类
internal open class LockFreeTaskQueue<E : Any>(
singleConsumer: Boolean // true when there is only a single consumer (slightly faster & lock-free)
) {
private val _cur = atomic(Core<E>(Core.INITIAL_CAPACITY, singleConsumer))
// Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
val isEmpty: Boolean get() = _cur.value.isEmpty
val size: Int get() = _cur.value.size
fun close() {
_cur.loop { cur ->
if (cur.close()) return // closed this copy
_cur.compareAndSet(cur, cur.next()) // move to next
}
}
fun addLast(element: E): Boolean {
_cur.loop { cur ->
when (cur.addLast(element)) {
Core.ADD_SUCCESS -> return true
Core.ADD_CLOSED -> return false
Core.ADD_FROZEN -> _cur.compareAndSet(cur, cur.next()) // move to next
}
}
}
@Suppress("UNCHECKED_CAST")
fun removeFirstOrNull(): E? {
_cur.loop { cur ->
val result = cur.removeFirstOrNull()
if (result !== Core.REMOVE_FROZEN) return result as E?
_cur.compareAndSet(cur, cur.next())
}
}
// Used for validation in tests only
fun <R> map(transform: (E) -> R): List<R> = _cur.value.map(transform)
// Used for validation in tests only
fun isClosed(): Boolean = _cur.value.isClosed()
}