1 package xsync
2 3 import (
4 "sync/atomic"
5 )
// A SPSCQueue is a bounded single-producer single-consumer concurrent
// queue. At most one goroutine may publish items to the queue at any
// given time, and at most one goroutine may consume those items.
//
// SPSCQueue instances must be created with the NewSPSCQueue function.
// A SPSCQueue must not be copied after first use.
//
// The queue is a ring buffer addressed by a producer index and a
// consumer index. Each side also keeps a private, possibly stale copy
// of the other side's index and re-reads the shared index only when
// the stale copy suggests the queue is full (producer) or empty
// (consumer). Each pad field follows an 8-byte index, so consecutive
// indexes are laid out cacheLineSize bytes apart.
//
// Based on the data structure from the following article:
// https://rigtorp.se/ringbuffer/
type SPSCQueue struct {
	// cap is the number of slots in items. NewSPSCQueue sets it to the
	// requested capacity plus one; indexes wrap to zero on reaching it.
	cap uint64
	// pidx is the index of the slot the producer fills next. It is
	// stored atomically by TryEnqueue and loaded atomically by
	// TryDequeue.
	pidx uint64
	//lint:ignore U1000 prevents false sharing
	pad0 [cacheLineSize - 8]byte
	// pcachedIdx is the consumer's last observed value of pidx. It is
	// read and written with plain accesses by TryDequeue only.
	pcachedIdx uint64
	//lint:ignore U1000 prevents false sharing
	pad1 [cacheLineSize - 8]byte
	// cidx is the index of the slot the consumer reads next. It is
	// stored atomically by TryDequeue and loaded atomically by
	// TryEnqueue.
	cidx uint64
	//lint:ignore U1000 prevents false sharing
	pad2 [cacheLineSize - 8]byte
	// ccachedIdx is the producer's last observed value of cidx. It is
	// read and written with plain accesses by TryEnqueue only.
	ccachedIdx uint64
	//lint:ignore U1000 prevents false sharing
	pad3 [cacheLineSize - 8]byte
	// items is the ring buffer storage; it holds cap slots.
	items []interface{}
}
33 34 // NewSPSCQueue creates a new SPSCQueue instance with the given
35 // capacity.
36 func NewSPSCQueue(capacity int) *SPSCQueue {
37 if capacity < 1 {
38 panic("capacity must be positive number")
39 }
40 return &SPSCQueue{
41 cap: uint64(capacity + 1),
42 items: make([]interface{}, capacity+1),
43 }
44 }
45 46 // TryEnqueue inserts the given item into the queue. Does not block
47 // and returns immediately. The result indicates that the queue isn't
48 // full and the item was inserted.
49 func (q *SPSCQueue) TryEnqueue(item interface{}) bool {
50 // relaxed memory order would be enough here
51 idx := atomic.LoadUint64(&q.pidx)
52 nextIdx := idx + 1
53 if nextIdx == q.cap {
54 nextIdx = 0
55 }
56 cachedIdx := q.ccachedIdx
57 if nextIdx == cachedIdx {
58 cachedIdx = atomic.LoadUint64(&q.cidx)
59 q.ccachedIdx = cachedIdx
60 if nextIdx == cachedIdx {
61 return false
62 }
63 }
64 q.items[idx] = item
65 atomic.StoreUint64(&q.pidx, nextIdx)
66 return true
67 }
68 69 // TryDequeue retrieves and removes the item from the head of the
70 // queue. Does not block and returns immediately. The ok result
71 // indicates that the queue isn't empty and an item was retrieved.
72 func (q *SPSCQueue) TryDequeue() (item interface{}, ok bool) {
73 // relaxed memory order would be enough here
74 idx := atomic.LoadUint64(&q.cidx)
75 cachedIdx := q.pcachedIdx
76 if idx == cachedIdx {
77 cachedIdx = atomic.LoadUint64(&q.pidx)
78 q.pcachedIdx = cachedIdx
79 if idx == cachedIdx {
80 return
81 }
82 }
83 item = q.items[idx]
84 q.items[idx] = nil
85 ok = true
86 nextIdx := idx + 1
87 if nextIdx == q.cap {
88 nextIdx = 0
89 }
90 atomic.StoreUint64(&q.cidx, nextIdx)
91 return
92 }
93