mpmcqueueof.go raw

   1  //go:build go1.19
   2  // +build go1.19
   3  
   4  package xsync
   5  
   6  import (
   7  	"runtime"
   8  	"sync/atomic"
   9  	"unsafe"
  10  )
  11  
  12  // A MPMCQueueOf is a bounded multi-producer multi-consumer concurrent
  13  // queue. It's a generic version of MPMCQueue.
  14  //
  15  // MPMCQueueOf instances must be created with NewMPMCQueueOf function.
  16  // A MPMCQueueOf must not be copied after first use.
  17  //
  18  // Based on the data structure from the following C++ library:
  19  // https://github.com/rigtorp/MPMCQueue
  20  type MPMCQueueOf[I any] struct {
  21  	cap  uint64
  22  	head uint64
  23  	//lint:ignore U1000 prevents false sharing
  24  	hpad [cacheLineSize - 8]byte
  25  	tail uint64
  26  	//lint:ignore U1000 prevents false sharing
  27  	tpad  [cacheLineSize - 8]byte
  28  	slots []slotOfPadded[I]
  29  }
  30  
  31  type slotOfPadded[I any] struct {
  32  	slotOf[I]
  33  	// Unfortunately, proper padding like the below one:
  34  	//
  35  	// pad [cacheLineSize - (unsafe.Sizeof(slotOf[I]{}) % cacheLineSize)]byte
  36  	//
  37  	// won't compile, so here we add a best-effort padding for items up to
  38  	// 56 bytes size.
  39  	//lint:ignore U1000 prevents false sharing
  40  	pad [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
  41  }
  42  
  43  type slotOf[I any] struct {
  44  	// atomic.Uint64 is used here to get proper 8 byte alignment on
  45  	// 32-bit archs.
  46  	turn atomic.Uint64
  47  	item I
  48  }
  49  
  50  // NewMPMCQueueOf creates a new MPMCQueueOf instance with the given
  51  // capacity.
  52  func NewMPMCQueueOf[I any](capacity int) *MPMCQueueOf[I] {
  53  	if capacity < 1 {
  54  		panic("capacity must be positive number")
  55  	}
  56  	return &MPMCQueueOf[I]{
  57  		cap:   uint64(capacity),
  58  		slots: make([]slotOfPadded[I], capacity),
  59  	}
  60  }
  61  
  62  // Enqueue inserts the given item into the queue.
  63  // Blocks, if the queue is full.
  64  //
  65  // Deprecated: use TryEnqueue in combination with runtime.Gosched().
  66  func (q *MPMCQueueOf[I]) Enqueue(item I) {
  67  	head := atomic.AddUint64(&q.head, 1) - 1
  68  	slot := &q.slots[q.idx(head)]
  69  	turn := q.turn(head) * 2
  70  	for slot.turn.Load() != turn {
  71  		runtime.Gosched()
  72  	}
  73  	slot.item = item
  74  	slot.turn.Store(turn + 1)
  75  }
  76  
  77  // Dequeue retrieves and removes the item from the head of the queue.
  78  // Blocks, if the queue is empty.
  79  //
  80  // Deprecated: use TryDequeue in combination with runtime.Gosched().
  81  func (q *MPMCQueueOf[I]) Dequeue() I {
  82  	var zeroI I
  83  	tail := atomic.AddUint64(&q.tail, 1) - 1
  84  	slot := &q.slots[q.idx(tail)]
  85  	turn := q.turn(tail)*2 + 1
  86  	for slot.turn.Load() != turn {
  87  		runtime.Gosched()
  88  	}
  89  	item := slot.item
  90  	slot.item = zeroI
  91  	slot.turn.Store(turn + 1)
  92  	return item
  93  }
  94  
  95  // TryEnqueue inserts the given item into the queue. Does not block
  96  // and returns immediately. The result indicates that the queue isn't
  97  // full and the item was inserted.
  98  func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool {
  99  	head := atomic.LoadUint64(&q.head)
 100  	slot := &q.slots[q.idx(head)]
 101  	turn := q.turn(head) * 2
 102  	if slot.turn.Load() == turn {
 103  		if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
 104  			slot.item = item
 105  			slot.turn.Store(turn + 1)
 106  			return true
 107  		}
 108  	}
 109  	return false
 110  }
 111  
 112  // TryDequeue retrieves and removes the item from the head of the
 113  // queue. Does not block and returns immediately. The ok result
 114  // indicates that the queue isn't empty and an item was retrieved.
 115  func (q *MPMCQueueOf[I]) TryDequeue() (item I, ok bool) {
 116  	tail := atomic.LoadUint64(&q.tail)
 117  	slot := &q.slots[q.idx(tail)]
 118  	turn := q.turn(tail)*2 + 1
 119  	if slot.turn.Load() == turn {
 120  		if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
 121  			var zeroI I
 122  			item = slot.item
 123  			ok = true
 124  			slot.item = zeroI
 125  			slot.turn.Store(turn + 1)
 126  			return
 127  		}
 128  	}
 129  	return
 130  }
 131  
 132  func (q *MPMCQueueOf[I]) idx(i uint64) uint64 {
 133  	return i % q.cap
 134  }
 135  
 136  func (q *MPMCQueueOf[I]) turn(i uint64) uint64 {
 137  	return i / q.cap
 138  }
 139