Golang版本环形队列实现¶
https://github.com/hedzr/go-ringbuf
下面将依据前面的背景知识实现一个无锁的环形队列(Circular Queue,Ring Buffer),尽可能地解除各种竞争状况。
基本定义¶
首先是队列的大小,多数已有的环形队列均推荐使用 2 的冥数(2^n
)为队列尺寸,其好处在于通过和 2^n-1
的 AND
操作即可将 head
/tail
指针绕回,避免了 mod
操作。在 CPU 指令集中,mod
操作依赖于一个 IDIV
(整数除法)运算,这通常是 AND
操作耗时的数倍。
其次是 ringbuf
结构体的对齐,在 Golang 中这是自动的,所以没有什么可说的。
为了解决 ringbuf
元素数组中每个元素 rbItem
的 CacheLine 问题,我们对其进行了填充,使得一个 rbItem
能够占满一个 CacheLine。
type (
Queue interface {
Enqueue(item interface{}) (err error)
Dequeue() (item interface{}, err error)
// Cap returns the outer capacity of the ring buffer.
Cap() uint32
// Size returns the quantity of items in the ring buffer queue
Size() uint32
IsEmpty() (b bool)
IsFull() (b bool)
}
RingBuffer interface {
io.Closer // for logger
Queue
Put(item interface{}) (err error)
Get() (item interface{}, err error)
// Quantity returns the quantity of items in the ring buffer queue
Quantity() uint32
Debug(enabled bool) (lastState bool)
ResetCounters()
}
ringBuf struct {
cap uint32
capModMask uint32
head uint32
tail uint32
putWaits uint64
getWaits uint64
_ [CacheLinePadSize]byte
data []rbItem
debugMode bool
logger *zap.Logger
}
rbItem struct {
readWrite uint64 // 0: writable, 1: readable, 2: write ok, 3: read ok
value interface{} // ptr
_ [CacheLinePadSize - 8 - 8]byte
// _ cpu.CacheLinePad
}
)
至于 Queue
和 RingBuffer
是库作者的例行操作,就不赘述了。
在 rbItem
中增加了 CacheLine 对齐字段,这是为了避免每个 producer/consumer 操作一个 rbItem
时彼此之间发生 false sharing 干扰。因此,多个 rbItem
无法被同时载入一个 Cache Line 中,所以数据竞争的问题也同时被避免了。实测呢,好像有点点改善,但也没有好到一倍的状态,不过即使没有性能改善,为了 DATA RACE 也必须进行这样的对齐,才能确保在这里不必引入一颗锁。
在 ringBuf
中同样也有对齐字段以免操作 head/tail/putWaits/getWaits 时关联载入 data 指针,不过正因为data是指针,所以这个对齐字段的效用通常为 0。为了代码可移植性(到 C++11 模版类?),暂且保留该对齐。
操作¶
测试:队列空,队列满¶
func (rb *ringBuf) IsEmpty() (b bool) {
var tail, head uint32
var quad uint64
quad = atomic.LoadUint64((*uint64)(unsafe.Pointer(&rb.head)))
head = (uint32)(quad & MaxUint32_64)
tail = (uint32)(quad >> 32)
// var tail, head uint32
// head = atomic.LoadUint32(&rb.head)
// tail = atomic.LoadUint32(&rb.tail)
b = head == tail
return
}
func (rb *ringBuf) IsFull() (b bool) {
var tail, head uint32
var quad uint64
quad = atomic.LoadUint64((*uint64)(unsafe.Pointer(&rb.head)))
head = (uint32)(quad & MaxUint32_64)
tail = (uint32)(quad >> 32)
b = ((tail + 1) & rb.capModMask) == head
return
}
关于 head/tail 的载入优化¶
由于我们限定了 head 和 tail 指针为 32 位整数,因此原子操作可以一次取得它们而不是两次。这能够带来小小的提升,实测证明了有其存在的价值。
如果 CPU 架构使用 Big Endian 模式,上述代码需要被调整。
在移植时才会考虑针对性改写。
考虑进一步优化:
如果是 C++/C/ASM的话,原子指令的相关函数调用可以去掉,quard,tail,head可以使用寄存器,也无需 AND 和 SHIFT 运算。
binary 包对此没有帮助。
进一步的针对 head/tail 的优化¶
由于 put 操作使用 tail 和 head,但不会修改 head(相应的 get 操作也类似于此),所以我们还有另一个选择进行性能提升:
分离 head 和 tail 的存储位置保证不会同时载入单一 CacheLine。
这个策略可以这样实现:
ringBuf struct {
cap uint32
capModMask uint32
_ [CacheLinePadSize - 8]byte
head uint32
_ [CacheLinePadSize - 4]byte
tail uint32
_ [CacheLinePadSize - 4]byte
putWaits uint64
_ [CacheLinePadSize - 8]byte
getWaits uint64
_ [CacheLinePadSize - 8]byte
data []rbItem
debugMode bool
logger *zap.Logger
}
// 此时需要分别取得 head tail:
var tail, head uint32
head = atomic.LoadUint32(&rb.head)
tail = atomic.LoadUint32(&rb.tail)
这种方法应该比当前的简要方案更具有优势。但是出于试验的目的,我们暂时没有应用该方案,而是将此优化推迟到 v1.x 发布时。目前 go-ringbuf v0.7.x 将采用前文述及到单次原子操作方案。
关于判定算法¶
我们采用标准的环形队列实现方案:
- 保留一个元素的空间
即不允许 tail 赶上 head,队尾节点和对首节点之间至少留有一个元素的空间。
如果 head == tail,队列空;
如果 (tail+1) % M == head,队列满。
Cap 和 Size¶
Cap()
表示环形队列的容量,Size()
及其同义词 Quantity()
返回的是当前队列中的元素数量。
func (rb *ringBuf) Quantity() uint32 {
return rb.Size()
}
func (rb *ringBuf) Size() uint32 {
var quantity uint32
// head = atomic.LoadUint32(&rb.head)
// tail = atomic.LoadUint32(&rb.tail)
var tail, head uint32
var quad uint64
quad = atomic.LoadUint64((*uint64)(unsafe.Pointer(&rb.head)))
head = (uint32)(quad & MaxUint32_64)
tail = (uint32)(quad >> 32)
if tail >= head {
quantity = tail - head
} else {
quantity = rb.capModMask + (tail - head)
}
return quantity
}
func (rb *ringBuf) Cap() uint32 {
return rb.cap
}
Enqueue / Put¶
一些无锁方案在实现之后会显得较为玄妙,这当然是体现了设计者的精巧思路的。其麻烦在于,配图配论文都不容易改善它们的可读性。
Put 算法¶
在我们的 ringBuf
中,解决无锁以及避让问题采用了比较明晰而且简洁的路子,其思路是这样的:
-
每个
rbItem
承载着一个标志readWrite
以及元素实体value
-
申请队尾写入权
当入队列操作时,首先期待 readWrite
== 0,这意味着这个队列尾部的 rbItem
是干净的,写入就绪的(即空闲状态);
一旦原子操作确认到这样的队列尾部 rbItem
,则其 readWrite
标志也被更新为 2。这表示着该 rbItem
被申请成功了。其它 producers 将无法取得该 rbItem
作为它们的写入目标了。
-
现在是时候更新
tail
指针到下一元素了 -
同时,我们可以安全地更新
value
成员(多个rbItem
无法被同时载入一个 Cache Line 中) -
最后,我们将
readWrite
标志更新为1
,这标志着入列操作已经完成,这个rbItem
现在是读出就绪的
func (rb *ringBuf) Enqueue(item interface{}) (err error) {
var tail, head, nt uint32
var holder *rbItem
for {
var quad uint64
quad = atomic.LoadUint64((*uint64)(unsafe.Pointer(&rb.head)))
head = (uint32)(quad & MaxUint32_64)
tail = (uint32)(quad >> 32)
// head = atomic.LoadUint32(&rb.head)
// tail = atomic.LoadUint32(&rb.tail)
nt = (tail + 1) & rb.capModMask
isFull := nt == head
if isFull {
err = ErrQueueFull
return
}
holder = &rb.data[tail]
if atomic.CompareAndSwapUint64(&holder.readWrite, 0, 2) {
holder.value = item
atomic.CompareAndSwapUint32(&rb.tail, tail, nt)
break
}
time.Sleep(1 * time.Nanosecond)
atomic.AddUint64(&rb.putWaits, 1)
}
if !atomic.CompareAndSwapUint64(&holder.readWrite, 2, 1) {
err = fmt.Errorf("[W] %w, 2=>1, %v", ErrRaced, holder.readWrite)
return
}
return
}
例外情况¶
-
步骤2
操作失败时,意味着其它 producers 已经拿到了队尾rbItem
的写入权,因此我们使用一个 1ns 的延迟并再次自旋,以求取得新的队尾写入权。 -
步骤3
的原子操作可能会失败。
这代表着这个队尾已经旧了,新的队尾已经被别的 producers 提交成功了。
因此,我们可以安全地忽略 tail
的更新不成功的问题。
步骤4
存在偶然的可能性会失败。
这种可能性发生时,代表着运行环境已经崩塌了,属于致命性错误。
暂时,我不能排除时算法错误的因素带来的这个错误,但目前经过大量工程实测验证来看,算法可能不是最优的,但没有缺陷。
理论上的证明需要另外成文了,以后再说了。
Dequeue / Get¶
Get算法¶
类似于 Put 算法,Get 算法操作 head 指针并写入 value 实体:
-
每个
rbItem
承载着一个标志readWrite
以及元素实体value
-
申请队首读取和更新权
当出队列操作时,首先期待 readWrite
== 1,这意味着这个队列尾部的 rbItem
是干净的,读出就绪的;
一旦原子操作确认到这样的队列尾部 rbItem
,则其 readWrite
标志也被更新为 3。这表示着该 rbItem
被申请成功了。其它 consumers 将无法取得该 rbItem
作为它们的读出目标了。
-
现在是时候更新
head
指针到下一元素了 -
同时,我们可以安全地读取
value
成员(多个rbItem
无法被同时载入一个 Cache Line 中) -
最后,我们将
readWrite
标志更新为0
,这标志着出列操作已经完成,这个rbItem
现在是空闲状态了。
func (rb *ringBuf) Dequeue() (item interface{}, err error) {
var tail, head, nh uint32
var holder *rbItem
for {
var quad uint64
quad = atomic.LoadUint64((*uint64)(unsafe.Pointer(&rb.head)))
head = (uint32)(quad & MaxUint32_64)
tail = (uint32)(quad >> 32)
// head = atomic.LoadUint32(&rb.head)
// tail = atomic.LoadUint32(&rb.tail)
isEmpty := head == tail
if isEmpty {
err = ErrQueueEmpty
return
}
holder = &rb.data[head]
if atomic.CompareAndSwapUint64(&holder.readWrite, 1, 3) {
item = holder.value
nh = (head + 1) & rb.capModMask
atomic.CompareAndSwapUint32(&rb.head, head, nh)
break
}
time.Sleep(1 * time.Nanosecond)
atomic.AddUint64(&rb.getWaits, 1)
}
if !atomic.CompareAndSwapUint64(&holder.readWrite, 3, 0) {
err = fmt.Errorf("[R] %w, 3=>0, %v", ErrRaced, holder.readWrite)
return
}
if item == nil {
err = fmt.Errorf("[ringbuf][GET] cap: %v, qty: %v, head: %v, tail: %v, new head: %v", rb.cap, rb.qty(head, tail), head, tail, nh)
if !rb.debugMode {
rb.logger.Warn("[ringbuf][GET] ", zap.Uint32("cap", rb.cap), zap.Uint32("qty", rb.qty(head, tail)), zap.Uint32("tail", tail), zap.Uint32("head", head), zap.Uint32("new head", nh))
}
rb.logger.Fatal("[ringbuf][GET] [ERR] unexpected nil element value FOUND!")
}
return
}
同样地,多个 consumers 通过 1ns 的自旋来申请读出权直至成功,这会带来潜在的阻塞问题。
既然 ringBuf
的用途时面向高吞吐、高并发场景,那么多个 consumers 在申请读出权失败时被阻塞也是调用者期待的行为,因为那个成功的 consumer 必然会在有限的步骤里(一般来说可能是 2.691µs 这个级别)释放这个被锁定的元素,所以往往自旋 1 到数次即可拿到下一个新的队首元素的读出权、或者是返回队列为空错误由调用者决定下一步行为。
注意 Enqueue 在相似的步骤中面对的情况是类似的,因而不再单独解析其行为。
由于自旋部分一定会在有限步骤中退出(成功,失败,空队列),所以这部分算法是 lock free 的。
小结¶
我们已经解释了关键性的无锁 Enqueue/Dequeue 思路,具体的实现代码在 https://github.com/hedzr/go-ringbuf 中可以找到。
性能¶
MPMC¶
我们在实现过程中作出了必要的取舍,目前看来算法正确性是有保障的。同时它的性能不俗:
$ go test ./ringbuf/rb -v -race -run 'TestRingBuf_MPut'
=== RUN TestRingBuf_MPut
TestRingBuf_MPut: rb_test.go:223: Grp: 16, Times: 1360000, use: 26.266041367s, 19.313µs/op
TestRingBuf_MPut: rb_test.go:224: Put: 1360000, use: 24.036637261s, 17.673µs/op | retry times: 0
TestRingBuf_MPut: rb_test.go:225: Get: 1360000, use: 2.229404106s, 1.639µs/op | retry times: 0
--- PASS: TestRingBuf_MPut (51.29s)
=== RUN TestRingBuf_MPut
TestRingBuf_MPut: rb_test.go:231: Grp: 16, Times: 1360000, use: 42.836537705s, 31.497µs/op
TestRingBuf_MPut: rb_test.go:232: Put: 1360000, use: 39.277276612s, 28.88µs/op | retry times: 0
TestRingBuf_MPut: rb_test.go:233: Get: 1360000, use: 3.559261093s, 2.617µs/op | retry times: 0
--- PASS: TestRingBuf_MPut (53.60s)
我们运行多生产者多消费者的手工 benchmark 测试可以得到以上结果,这里给出的结果实例是两次运行的结果,原因是单独的工作站测试的参考价值一般话且不稳定。从结果中我们可以看到 put + get 的操作平均值大约在 19.313µs/op .. 31.497µs/op 之间。
由于模拟了真实场景的消费情况,因此覆盖了 Enqueue/Dequeue 的各个分支尤其是做到了申请写入/读出权竞争状态的模拟,因此这里的数据比较有参考价值。
1.639µs/op 的出列(610M/s),17.673µs/op 的入列(57M/s)已经是达到了我们预期的目标了。
对照¶
为了横向比较,我们以相同方式运行了别的例子,得到的结论是 ringBuf 大概在 9.734µs/op 左右,而对照者大约在 8.373µs/op 左右。
之所以这组对照结果和 MPMC 有所不同,原因是对照组采用先一次性写入然后一次性读出的方式来计算 put+get 时长,所以我们使用同样的测试方式另行完成了对照测试。
在高性能服务器上,工作性能有可能达到 1~2µs/op 级别(增加核心、增加内存,选择高总线带宽):
=== RUN TestQueuePutGetLong
TestQueuePutGetLong: rb_test.go:399: Grp: 64, Times: 20800000, use: 2m1.90187948s, 5.86µs/op
TestQueuePutGetLong: rb_test.go:400: Put: 20800000, use: 52.716325629s, 2.534µs/op
TestQueuePutGetLong: rb_test.go:401: Get: 20800000, use: 57.048982796s, 2.742µs/op
--- PASS: TestQueuePutGetLong (121.90s)
说明¶
性能测试自己跑才能比较,因为我懒得去找标准机做 Benchmark。上面给出的数据是在 i5-5257U CPU @ 2.70GHz 的 MBP 上跑出来的,只能证明 Server 端表现可以更好,但不能当作独立的依据。
🔚¶
-
https://www.boost.org/doc/libs/1_39_0/libs/circular_buffer/doc/circular_buffer.html ↩
-
多核 CPU,CPU集群:预取,乱序执行,超标量流水线,并发编程 ↩
-
Paul E. McKenney: Memory Barriers: a Hardware View for Software Hackers ↩
-
内存屏障 (Wikipedia): https://en.wikipedia.org/wiki/Memory_barrier ↩
-
Intel® 64 and IA-32 Architectures Software Developer’s Manual, https://software.intel.com/en-us/articles/intel-sdm ↩
-
Memory Barriers/Fences, https://mechanical-sympathy.blogspot.jp/2011/07/memory-barriersfences.html ↩
-
Memory Barriers: a Hardware View for Software Hackers, Paul E. McKenney, Linux Technology Center, IBM Beaverton, https://www.researchgate.net/publication/228824849_Memory_Barriers_a_Hardware_View_for_Software_Hackers ↩
-
Intel Sandy Bridge Configuration, http://www.7-cpu.com/cpu/SandyBridge.html ↩
-
Intel’s Haswell CPU Microarchitecture, http://www.realworldtech.com/haswell-cpu/5/ ↩
-
Write Combining, http://mechanical-sympathy.blogspot.com/2011/07/write-combining.html ↩
-
Memory ordering, https://en.wikipedia.org/wiki/Memory_ordering ↩
-
LearnConcurrency: Read Advanced Go Concurrency Primitives, Study The Go Memory Model ↩