mpmcqueue.go raw
1 package xsync
2
3 import (
4 "runtime"
5 "sync/atomic"
6 "unsafe"
7 )
8
9 // A MPMCQueue is a bounded multi-producer multi-consumer concurrent
10 // queue.
11 //
12 // MPMCQueue instances must be created with NewMPMCQueue function.
13 // A MPMCQueue must not be copied after first use.
14 //
15 // Based on the data structure from the following C++ library:
16 // https://github.com/rigtorp/MPMCQueue
17 type MPMCQueue struct {
18 cap uint64
19 head uint64
20 //lint:ignore U1000 prevents false sharing
21 hpad [cacheLineSize - 8]byte
22 tail uint64
23 //lint:ignore U1000 prevents false sharing
24 tpad [cacheLineSize - 8]byte
25 slots []slotPadded
26 }
27
28 type slotPadded struct {
29 slot
30 //lint:ignore U1000 prevents false sharing
31 pad [cacheLineSize - unsafe.Sizeof(slot{})]byte
32 }
33
// slot is a single cell of the ring.
type slot struct {
	// turn is the hand-off counter for this cell. For lap number t of
	// the ring, the value 2*t means the cell is empty and may be
	// written by a producer, and 2*t+1 means it holds an item that a
	// consumer may take. It is only accessed via sync/atomic.
	turn uint64
	// item is the stored value; it is reset to nil when dequeued.
	item interface{}
}
38
39 // NewMPMCQueue creates a new MPMCQueue instance with the given
40 // capacity.
41 func NewMPMCQueue(capacity int) *MPMCQueue {
42 if capacity < 1 {
43 panic("capacity must be positive number")
44 }
45 return &MPMCQueue{
46 cap: uint64(capacity),
47 slots: make([]slotPadded, capacity),
48 }
49 }
50
51 // Enqueue inserts the given item into the queue.
52 // Blocks, if the queue is full.
53 //
54 // Deprecated: use TryEnqueue in combination with runtime.Gosched().
55 func (q *MPMCQueue) Enqueue(item interface{}) {
56 head := atomic.AddUint64(&q.head, 1) - 1
57 slot := &q.slots[q.idx(head)]
58 turn := q.turn(head) * 2
59 for atomic.LoadUint64(&slot.turn) != turn {
60 runtime.Gosched()
61 }
62 slot.item = item
63 atomic.StoreUint64(&slot.turn, turn+1)
64 }
65
66 // Dequeue retrieves and removes the item from the head of the queue.
67 // Blocks, if the queue is empty.
68 //
69 // Deprecated: use TryDequeue in combination with runtime.Gosched().
70 func (q *MPMCQueue) Dequeue() interface{} {
71 tail := atomic.AddUint64(&q.tail, 1) - 1
72 slot := &q.slots[q.idx(tail)]
73 turn := q.turn(tail)*2 + 1
74 for atomic.LoadUint64(&slot.turn) != turn {
75 runtime.Gosched()
76 }
77 item := slot.item
78 slot.item = nil
79 atomic.StoreUint64(&slot.turn, turn+1)
80 return item
81 }
82
83 // TryEnqueue inserts the given item into the queue. Does not block
84 // and returns immediately. The result indicates that the queue isn't
85 // full and the item was inserted.
86 func (q *MPMCQueue) TryEnqueue(item interface{}) bool {
87 head := atomic.LoadUint64(&q.head)
88 slot := &q.slots[q.idx(head)]
89 turn := q.turn(head) * 2
90 if atomic.LoadUint64(&slot.turn) == turn {
91 if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
92 slot.item = item
93 atomic.StoreUint64(&slot.turn, turn+1)
94 return true
95 }
96 }
97 return false
98 }
99
100 // TryDequeue retrieves and removes the item from the head of the
101 // queue. Does not block and returns immediately. The ok result
102 // indicates that the queue isn't empty and an item was retrieved.
103 func (q *MPMCQueue) TryDequeue() (item interface{}, ok bool) {
104 tail := atomic.LoadUint64(&q.tail)
105 slot := &q.slots[q.idx(tail)]
106 turn := q.turn(tail)*2 + 1
107 if atomic.LoadUint64(&slot.turn) == turn {
108 if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
109 item = slot.item
110 ok = true
111 slot.item = nil
112 atomic.StoreUint64(&slot.turn, turn+1)
113 return
114 }
115 }
116 return
117 }
118
119 func (q *MPMCQueue) idx(i uint64) uint64 {
120 return i % q.cap
121 }
122
123 func (q *MPMCQueue) turn(i uint64) uint64 {
124 return i / q.cap
125 }
126