1 //go:build go1.19
2 // +build go1.19
3 4 package xsync
5 6 import (
7 "sync/atomic"
8 )
// A SPSCQueueOf is a bounded single-producer single-consumer concurrent
// queue. This means that not more than a single goroutine must be
// publishing items to the queue while not more than a single goroutine
// must be consuming those items.
//
// SPSCQueueOf instances must be created with NewSPSCQueueOf function.
// A SPSCQueueOf must not be copied after first use.
//
// The queue is a ring buffer over the items slice. pidx is the slot the
// next TryEnqueue writes and cidx is the slot the next TryDequeue reads.
// The queue is empty when cidx equals pidx and full when advancing pidx
// by one slot (with wrap-around) would make it equal to cidx, so one
// slot of items always stays unused.
//
// Based on the data structure from the following article:
// https://rigtorp.se/ringbuffer/
type SPSCQueueOf[I any] struct {
	// cap is the number of slots in items. NewSPSCQueueOf sets it to the
	// requested capacity plus one; indices wrap to zero on reaching it.
	cap uint64
	// pidx is the index of the slot the next TryEnqueue writes to. It is
	// stored atomically by TryEnqueue and loaded atomically by
	// TryDequeue.
	pidx uint64
	//lint:ignore U1000 prevents false sharing
	pad0 [cacheLineSize - 8]byte
	// pcachedIdx is the value of pidx last observed by TryDequeue. Only
	// TryDequeue reads and writes it, which lets it skip the atomic load
	// of pidx while the cached value still shows unread items.
	pcachedIdx uint64
	//lint:ignore U1000 prevents false sharing
	pad1 [cacheLineSize - 8]byte
	// cidx is the index of the slot the next TryDequeue reads from. It is
	// stored atomically by TryDequeue and loaded atomically by
	// TryEnqueue.
	cidx uint64
	//lint:ignore U1000 prevents false sharing
	pad2 [cacheLineSize - 8]byte
	// ccachedIdx is the value of cidx last observed by TryEnqueue. Only
	// TryEnqueue reads and writes it, which lets it skip the atomic load
	// of cidx while the cached value still shows a free slot.
	ccachedIdx uint64
	//lint:ignore U1000 prevents false sharing
	pad3 [cacheLineSize - 8]byte
	// items is the ring buffer storage; it has cap elements.
	items []I
}
36 37 // NewSPSCQueueOf creates a new SPSCQueueOf instance with the given
38 // capacity.
39 func NewSPSCQueueOf[I any](capacity int) *SPSCQueueOf[I] {
40 if capacity < 1 {
41 panic("capacity must be positive number")
42 }
43 return &SPSCQueueOf[I]{
44 cap: uint64(capacity + 1),
45 items: make([]I, capacity+1),
46 }
47 }
48 49 // TryEnqueue inserts the given item into the queue. Does not block
50 // and returns immediately. The result indicates that the queue isn't
51 // full and the item was inserted.
52 func (q *SPSCQueueOf[I]) TryEnqueue(item I) bool {
53 // relaxed memory order would be enough here
54 idx := atomic.LoadUint64(&q.pidx)
55 next_idx := idx + 1
56 if next_idx == q.cap {
57 next_idx = 0
58 }
59 cached_idx := q.ccachedIdx
60 if next_idx == cached_idx {
61 cached_idx = atomic.LoadUint64(&q.cidx)
62 q.ccachedIdx = cached_idx
63 if next_idx == cached_idx {
64 return false
65 }
66 }
67 q.items[idx] = item
68 atomic.StoreUint64(&q.pidx, next_idx)
69 return true
70 }
71 72 // TryDequeue retrieves and removes the item from the head of the
73 // queue. Does not block and returns immediately. The ok result
74 // indicates that the queue isn't empty and an item was retrieved.
75 func (q *SPSCQueueOf[I]) TryDequeue() (item I, ok bool) {
76 // relaxed memory order would be enough here
77 idx := atomic.LoadUint64(&q.cidx)
78 cached_idx := q.pcachedIdx
79 if idx == cached_idx {
80 cached_idx = atomic.LoadUint64(&q.pidx)
81 q.pcachedIdx = cached_idx
82 if idx == cached_idx {
83 return
84 }
85 }
86 var zeroI I
87 item = q.items[idx]
88 q.items[idx] = zeroI
89 ok = true
90 next_idx := idx + 1
91 if next_idx == q.cap {
92 next_idx = 0
93 }
94 atomic.StoreUint64(&q.cidx, next_idx)
95 return
96 }
97