mpmcqueueof.go raw
1 //go:build go1.19
2 // +build go1.19
3
4 package xsync
5
6 import (
7 "runtime"
8 "sync/atomic"
9 "unsafe"
10 )
11
12 // A MPMCQueueOf is a bounded multi-producer multi-consumer concurrent
13 // queue. It's a generic version of MPMCQueue.
14 //
15 // MPMCQueueOf instances must be created with NewMPMCQueueOf function.
16 // A MPMCQueueOf must not be copied after first use.
17 //
18 // Based on the data structure from the following C++ library:
19 // https://github.com/rigtorp/MPMCQueue
20 type MPMCQueueOf[I any] struct {
21 cap uint64
22 head uint64
23 //lint:ignore U1000 prevents false sharing
24 hpad [cacheLineSize - 8]byte
25 tail uint64
26 //lint:ignore U1000 prevents false sharing
27 tpad [cacheLineSize - 8]byte
28 slots []slotOfPadded[I]
29 }
30
31 type slotOfPadded[I any] struct {
32 slotOf[I]
33 // Unfortunately, proper padding like the below one:
34 //
35 // pad [cacheLineSize - (unsafe.Sizeof(slotOf[I]{}) % cacheLineSize)]byte
36 //
37 // won't compile, so here we add a best-effort padding for items up to
38 // 56 bytes size.
39 //lint:ignore U1000 prevents false sharing
40 pad [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
41 }
42
43 type slotOf[I any] struct {
44 // atomic.Uint64 is used here to get proper 8 byte alignment on
45 // 32-bit archs.
46 turn atomic.Uint64
47 item I
48 }
49
50 // NewMPMCQueueOf creates a new MPMCQueueOf instance with the given
51 // capacity.
52 func NewMPMCQueueOf[I any](capacity int) *MPMCQueueOf[I] {
53 if capacity < 1 {
54 panic("capacity must be positive number")
55 }
56 return &MPMCQueueOf[I]{
57 cap: uint64(capacity),
58 slots: make([]slotOfPadded[I], capacity),
59 }
60 }
61
62 // Enqueue inserts the given item into the queue.
63 // Blocks, if the queue is full.
64 //
65 // Deprecated: use TryEnqueue in combination with runtime.Gosched().
66 func (q *MPMCQueueOf[I]) Enqueue(item I) {
67 head := atomic.AddUint64(&q.head, 1) - 1
68 slot := &q.slots[q.idx(head)]
69 turn := q.turn(head) * 2
70 for slot.turn.Load() != turn {
71 runtime.Gosched()
72 }
73 slot.item = item
74 slot.turn.Store(turn + 1)
75 }
76
77 // Dequeue retrieves and removes the item from the head of the queue.
78 // Blocks, if the queue is empty.
79 //
80 // Deprecated: use TryDequeue in combination with runtime.Gosched().
81 func (q *MPMCQueueOf[I]) Dequeue() I {
82 var zeroI I
83 tail := atomic.AddUint64(&q.tail, 1) - 1
84 slot := &q.slots[q.idx(tail)]
85 turn := q.turn(tail)*2 + 1
86 for slot.turn.Load() != turn {
87 runtime.Gosched()
88 }
89 item := slot.item
90 slot.item = zeroI
91 slot.turn.Store(turn + 1)
92 return item
93 }
94
95 // TryEnqueue inserts the given item into the queue. Does not block
96 // and returns immediately. The result indicates that the queue isn't
97 // full and the item was inserted.
98 func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool {
99 head := atomic.LoadUint64(&q.head)
100 slot := &q.slots[q.idx(head)]
101 turn := q.turn(head) * 2
102 if slot.turn.Load() == turn {
103 if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
104 slot.item = item
105 slot.turn.Store(turn + 1)
106 return true
107 }
108 }
109 return false
110 }
111
112 // TryDequeue retrieves and removes the item from the head of the
113 // queue. Does not block and returns immediately. The ok result
114 // indicates that the queue isn't empty and an item was retrieved.
115 func (q *MPMCQueueOf[I]) TryDequeue() (item I, ok bool) {
116 tail := atomic.LoadUint64(&q.tail)
117 slot := &q.slots[q.idx(tail)]
118 turn := q.turn(tail)*2 + 1
119 if slot.turn.Load() == turn {
120 if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
121 var zeroI I
122 item = slot.item
123 ok = true
124 slot.item = zeroI
125 slot.turn.Store(turn + 1)
126 return
127 }
128 }
129 return
130 }
131
132 func (q *MPMCQueueOf[I]) idx(i uint64) uint64 {
133 return i % q.cap
134 }
135
136 func (q *MPMCQueueOf[I]) turn(i uint64) uint64 {
137 return i / q.cap
138 }
139