spscqueue.go raw

   1  package xsync
   2  
   3  import (
   4  	"sync/atomic"
   5  )
   6  
   7  // A SPSCQueue is a bounded single-producer single-consumer concurrent
   8  // queue. This means that not more than a single goroutine must be
   9  // publishing items to the queue while not more than a single goroutine
  10  // must be consuming those items.
  11  //
  12  // SPSCQueue instances must be created with NewSPSCQueue function.
  13  // A SPSCQueue must not be copied after first use.
  14  //
  15  // Based on the data structure from the following article:
  16  // https://rigtorp.se/ringbuffer/
  17  type SPSCQueue struct {
  18  	cap  uint64
  19  	pidx uint64
  20  	//lint:ignore U1000 prevents false sharing
  21  	pad0       [cacheLineSize - 8]byte
  22  	pcachedIdx uint64
  23  	//lint:ignore U1000 prevents false sharing
  24  	pad1 [cacheLineSize - 8]byte
  25  	cidx uint64
  26  	//lint:ignore U1000 prevents false sharing
  27  	pad2       [cacheLineSize - 8]byte
  28  	ccachedIdx uint64
  29  	//lint:ignore U1000 prevents false sharing
  30  	pad3  [cacheLineSize - 8]byte
  31  	items []interface{}
  32  }
  33  
  34  // NewSPSCQueue creates a new SPSCQueue instance with the given
  35  // capacity.
  36  func NewSPSCQueue(capacity int) *SPSCQueue {
  37  	if capacity < 1 {
  38  		panic("capacity must be positive number")
  39  	}
  40  	return &SPSCQueue{
  41  		cap:   uint64(capacity + 1),
  42  		items: make([]interface{}, capacity+1),
  43  	}
  44  }
  45  
  46  // TryEnqueue inserts the given item into the queue. Does not block
  47  // and returns immediately. The result indicates that the queue isn't
  48  // full and the item was inserted.
  49  func (q *SPSCQueue) TryEnqueue(item interface{}) bool {
  50  	// relaxed memory order would be enough here
  51  	idx := atomic.LoadUint64(&q.pidx)
  52  	nextIdx := idx + 1
  53  	if nextIdx == q.cap {
  54  		nextIdx = 0
  55  	}
  56  	cachedIdx := q.ccachedIdx
  57  	if nextIdx == cachedIdx {
  58  		cachedIdx = atomic.LoadUint64(&q.cidx)
  59  		q.ccachedIdx = cachedIdx
  60  		if nextIdx == cachedIdx {
  61  			return false
  62  		}
  63  	}
  64  	q.items[idx] = item
  65  	atomic.StoreUint64(&q.pidx, nextIdx)
  66  	return true
  67  }
  68  
  69  // TryDequeue retrieves and removes the item from the head of the
  70  // queue. Does not block and returns immediately. The ok result
  71  // indicates that the queue isn't empty and an item was retrieved.
  72  func (q *SPSCQueue) TryDequeue() (item interface{}, ok bool) {
  73  	// relaxed memory order would be enough here
  74  	idx := atomic.LoadUint64(&q.cidx)
  75  	cachedIdx := q.pcachedIdx
  76  	if idx == cachedIdx {
  77  		cachedIdx = atomic.LoadUint64(&q.pidx)
  78  		q.pcachedIdx = cachedIdx
  79  		if idx == cachedIdx {
  80  			return
  81  		}
  82  	}
  83  	item = q.items[idx]
  84  	q.items[idx] = nil
  85  	ok = true
  86  	nextIdx := idx + 1
  87  	if nextIdx == q.cap {
  88  		nextIdx = 0
  89  	}
  90  	atomic.StoreUint64(&q.cidx, nextIdx)
  91  	return
  92  }
  93