1 package chainclient
2 3 import (
4 "container/list"
5 6 "github.com/p9c/p9/pkg/qu"
7 )
8 9 // ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity. Clients interact with the queue by pushing
10 // items into the in channel and popping items from the out channel. There is a goroutine that manages moving items from
11 // the in channel to the out channel in the correct order that must be started by calling Start().
12 type ConcurrentQueue struct {
13 chanIn chan interface{}
14 chanOut chan interface{}
15 quit qu.C
16 overflow *list.List
17 }
18 19 // NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is the capacity of the output channel. When
20 // the size of the queue is below this threshold, pushes do not incur the overhead of the less efficient overflow
21 // structure.
22 func NewConcurrentQueue(bufferSize int) *ConcurrentQueue {
23 return &ConcurrentQueue{
24 chanIn: make(chan interface{}),
25 chanOut: make(chan interface{}, bufferSize),
26 quit: qu.T(),
27 overflow: list.New(),
28 }
29 }
30 31 // ChanIn returns a channel that can be used to push new items into the queue.
32 func (cq *ConcurrentQueue) ChanIn() chan<- interface{} {
33 return cq.chanIn
34 }
35 36 // ChanOut returns a channel that can be used to pop items from the queue.
37 func (cq *ConcurrentQueue) ChanOut() <-chan interface{} {
38 return cq.chanOut
39 }
40 41 // Start begins a goroutine that manages moving items from the in channel to the out channel. The queue tries to move
42 // items directly to the out channel minimize overhead, but if the out channel is full it pushes items to an overflow
43 // queue. This must be called before using the queue.
44 func (cq *ConcurrentQueue) Start() {
45 go func() {
46 for {
47 nextElement := cq.overflow.Front()
48 if nextElement == nil {
49 // The overflow queue is empty, so incoming items can be pushed directly to the output channel. However,
50 // if output channel is full, we'll push to the overflow list instead.
51 select {
52 case item := <-cq.chanIn:
53 select {
54 case cq.chanOut <- item:
55 case <-cq.quit.Wait():
56 return
57 default:
58 cq.overflow.PushBack(item)
59 }
60 case <-cq.quit.Wait():
61 return
62 }
63 } else {
64 // The overflow queue is not empty, so any new items get pushed to the back to preserve order.
65 select {
66 case item := <-cq.chanIn:
67 cq.overflow.PushBack(item)
68 case cq.chanOut <- nextElement.Value:
69 cq.overflow.Remove(nextElement)
70 case <-cq.quit.Wait():
71 return
72 }
73 }
74 }
75 }()
76 }
77 78 // Stop ends the goroutine that moves items from the in channel to the out channel.
79 func (cq *ConcurrentQueue) Stop() {
80 cq.quit.Q()
81 }
82