mpmcqueue.go raw

   1  package xsync
   2  
   3  import (
   4  	"runtime"
   5  	"sync/atomic"
   6  	"unsafe"
   7  )
   8  
// A MPMCQueue is a bounded multi-producer multi-consumer concurrent
// queue.
//
// MPMCQueue instances must be created with NewMPMCQueue function.
// A MPMCQueue must not be copied after first use.
//
// The queue is a fixed ring of slots. head and tail are positions that
// only ever grow: enqueuers advance head and dequeuers advance tail,
// both through sync/atomic operations. A position p is mapped onto the
// slot index p % cap and the lap number p / cap (see idx and turn).
//
// hpad and tpad are never read or written; they only add
// cacheLineSize-8 bytes after head and after tail so that the two
// counters and the slots header are spaced at least a cache line apart
// (assuming cacheLineSize matches the target's cache line size).
//
// Based on the data structure from the following C++ library:
// https://github.com/rigtorp/MPMCQueue
type MPMCQueue struct {
	cap  uint64 // number of slots; set once by NewMPMCQueue
	head uint64 // next position to be claimed by an enqueuer
	//lint:ignore U1000 prevents false sharing
	hpad [cacheLineSize - 8]byte
	tail uint64 // next position to be claimed by a dequeuer
	//lint:ignore U1000 prevents false sharing
	tpad  [cacheLineSize - 8]byte
	slots []slotPadded // ring storage, len(slots) == cap
}
  27  
// slotPadded is a slot followed by filler bytes that bring the size of
// the whole element up to cacheLineSize. It is the element type of
// MPMCQueue.slots; the padding is meant to keep neighbouring slots on
// separate cache lines so that goroutines working on adjacent slots do
// not contend on the same line.
type slotPadded struct {
	slot
	// pad is never accessed; its length is whatever is left of
	// cacheLineSize after the embedded slot.
	//lint:ignore U1000 prevents false sharing
	pad [cacheLineSize - unsafe.Sizeof(slot{})]byte
}
  33  
// slot is a single cell of the queue's ring.
//
// turn is the hand-off counter of the cell and is only accessed with
// sync/atomic operations. For lap n (position / capacity) an enqueuer
// waits until turn == 2*n, stores the item and publishes 2*n+1; a
// dequeuer waits until turn == 2*n+1, takes the item, clears it and
// publishes 2*n+2, which opens the cell for the enqueuer of lap n+1.
//
// item is written and read non-atomically; the atomic store/load of
// turn is what orders those accesses between goroutines.
type slot struct {
	// NOTE(review): turn is kept as the first field, presumably so that
	// it stays 64-bit aligned for atomic access on 32-bit platforms.
	turn uint64
	item interface{}
}
  38  
  39  // NewMPMCQueue creates a new MPMCQueue instance with the given
  40  // capacity.
  41  func NewMPMCQueue(capacity int) *MPMCQueue {
  42  	if capacity < 1 {
  43  		panic("capacity must be positive number")
  44  	}
  45  	return &MPMCQueue{
  46  		cap:   uint64(capacity),
  47  		slots: make([]slotPadded, capacity),
  48  	}
  49  }
  50  
  51  // Enqueue inserts the given item into the queue.
  52  // Blocks, if the queue is full.
  53  //
  54  // Deprecated: use TryEnqueue in combination with runtime.Gosched().
  55  func (q *MPMCQueue) Enqueue(item interface{}) {
  56  	head := atomic.AddUint64(&q.head, 1) - 1
  57  	slot := &q.slots[q.idx(head)]
  58  	turn := q.turn(head) * 2
  59  	for atomic.LoadUint64(&slot.turn) != turn {
  60  		runtime.Gosched()
  61  	}
  62  	slot.item = item
  63  	atomic.StoreUint64(&slot.turn, turn+1)
  64  }
  65  
  66  // Dequeue retrieves and removes the item from the head of the queue.
  67  // Blocks, if the queue is empty.
  68  //
  69  // Deprecated: use TryDequeue in combination with runtime.Gosched().
  70  func (q *MPMCQueue) Dequeue() interface{} {
  71  	tail := atomic.AddUint64(&q.tail, 1) - 1
  72  	slot := &q.slots[q.idx(tail)]
  73  	turn := q.turn(tail)*2 + 1
  74  	for atomic.LoadUint64(&slot.turn) != turn {
  75  		runtime.Gosched()
  76  	}
  77  	item := slot.item
  78  	slot.item = nil
  79  	atomic.StoreUint64(&slot.turn, turn+1)
  80  	return item
  81  }
  82  
  83  // TryEnqueue inserts the given item into the queue. Does not block
  84  // and returns immediately. The result indicates that the queue isn't
  85  // full and the item was inserted.
  86  func (q *MPMCQueue) TryEnqueue(item interface{}) bool {
  87  	head := atomic.LoadUint64(&q.head)
  88  	slot := &q.slots[q.idx(head)]
  89  	turn := q.turn(head) * 2
  90  	if atomic.LoadUint64(&slot.turn) == turn {
  91  		if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
  92  			slot.item = item
  93  			atomic.StoreUint64(&slot.turn, turn+1)
  94  			return true
  95  		}
  96  	}
  97  	return false
  98  }
  99  
 100  // TryDequeue retrieves and removes the item from the head of the
 101  // queue. Does not block and returns immediately. The ok result
 102  // indicates that the queue isn't empty and an item was retrieved.
 103  func (q *MPMCQueue) TryDequeue() (item interface{}, ok bool) {
 104  	tail := atomic.LoadUint64(&q.tail)
 105  	slot := &q.slots[q.idx(tail)]
 106  	turn := q.turn(tail)*2 + 1
 107  	if atomic.LoadUint64(&slot.turn) == turn {
 108  		if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
 109  			item = slot.item
 110  			ok = true
 111  			slot.item = nil
 112  			atomic.StoreUint64(&slot.turn, turn+1)
 113  			return
 114  		}
 115  	}
 116  	return
 117  }
 118  
 119  func (q *MPMCQueue) idx(i uint64) uint64 {
 120  	return i % q.cap
 121  }
 122  
 123  func (q *MPMCQueue) turn(i uint64) uint64 {
 124  	return i / q.cap
 125  }
 126