queue.go raw

   1  package chainclient
   2  
   3  import (
   4  	"container/list"
   5  	
   6  	"github.com/p9c/p9/pkg/qu"
   7  )
   8  
   9  // ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity. Clients interact with the queue by pushing
  10  // items into the in channel and popping items from the out channel. There is a goroutine that manages moving items from
  11  // the in channel to the out channel in the correct order that must be started by calling Start().
  12  type ConcurrentQueue struct {
  13  	chanIn   chan interface{}
  14  	chanOut  chan interface{}
  15  	quit     qu.C
  16  	overflow *list.List
  17  }
  18  
  19  // NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is the capacity of the output channel. When
  20  // the size of the queue is below this threshold, pushes do not incur the overhead of the less efficient overflow
  21  // structure.
  22  func NewConcurrentQueue(bufferSize int) *ConcurrentQueue {
  23  	return &ConcurrentQueue{
  24  		chanIn:   make(chan interface{}),
  25  		chanOut:  make(chan interface{}, bufferSize),
  26  		quit:     qu.T(),
  27  		overflow: list.New(),
  28  	}
  29  }
  30  
  31  // ChanIn returns a channel that can be used to push new items into the queue.
  32  func (cq *ConcurrentQueue) ChanIn() chan<- interface{} {
  33  	return cq.chanIn
  34  }
  35  
  36  // ChanOut returns a channel that can be used to pop items from the queue.
  37  func (cq *ConcurrentQueue) ChanOut() <-chan interface{} {
  38  	return cq.chanOut
  39  }
  40  
  41  // Start begins a goroutine that manages moving items from the in channel to the out channel. The queue tries to move
  42  // items directly to the out channel minimize overhead, but if the out channel is full it pushes items to an overflow
  43  // queue. This must be called before using the queue.
  44  func (cq *ConcurrentQueue) Start() {
  45  	go func() {
  46  		for {
  47  			nextElement := cq.overflow.Front()
  48  			if nextElement == nil {
  49  				// The overflow queue is empty, so incoming items can be pushed directly to the output channel. However,
  50  				// if output channel is full, we'll push to the overflow list instead.
  51  				select {
  52  				case item := <-cq.chanIn:
  53  					select {
  54  					case cq.chanOut <- item:
  55  					case <-cq.quit.Wait():
  56  						return
  57  					default:
  58  						cq.overflow.PushBack(item)
  59  					}
  60  				case <-cq.quit.Wait():
  61  					return
  62  				}
  63  			} else {
  64  				// The overflow queue is not empty, so any new items get pushed to the back to preserve order.
  65  				select {
  66  				case item := <-cq.chanIn:
  67  					cq.overflow.PushBack(item)
  68  				case cq.chanOut <- nextElement.Value:
  69  					cq.overflow.Remove(nextElement)
  70  				case <-cq.quit.Wait():
  71  					return
  72  				}
  73  			}
  74  		}
  75  	}()
  76  }
  77  
  78  // Stop ends the goroutine that moves items from the in channel to the out channel.
  79  func (cq *ConcurrentQueue) Stop() {
  80  	cq.quit.Q()
  81  }
  82