spscqueueof.go raw

   1  //go:build go1.19
   2  // +build go1.19
   3  
   4  package xsync
   5  
   6  import (
   7  	"sync/atomic"
   8  )
   9  
// A SPSCQueueOf is a bounded single-producer single-consumer concurrent
// queue. At most one goroutine may publish items to the queue (with
// TryEnqueue) and at most one goroutine may consume those items (with
// TryDequeue) at any given time.
//
// SPSCQueueOf instances must be created with the NewSPSCQueueOf function.
// A SPSCQueueOf must not be copied after first use.
//
// Items live in a fixed-size ring addressed by a producer index and a
// consumer index. The queue is empty when the two indexes are equal and
// full when advancing the producer index would make it equal to the
// consumer index, so one slot of the ring always stays unused.
//
// Based on the data structure from the following article:
// https://rigtorp.se/ringbuffer/
type SPSCQueueOf[I any] struct {
	// cap is the number of slots in items; NewSPSCQueueOf sets it to the
	// requested capacity plus one. pidx is the producer index, i.e. the
	// slot the next TryEnqueue call writes to. TryEnqueue stores it
	// atomically after writing the item and TryDequeue loads it atomically.
	cap  uint64
	pidx uint64
	// pad0 separates pidx from pcachedIdx. pcachedIdx is the value of pidx
	// last observed by TryDequeue. It is accessed without atomics and is
	// refreshed from pidx only when it equals cidx, i.e. when the queue
	// looks empty to the consumer.
	//lint:ignore U1000 prevents false sharing
	pad0       [cacheLineSize - 8]byte
	pcachedIdx uint64
	// pad1 separates pcachedIdx from cidx. cidx is the consumer index,
	// i.e. the slot the next TryDequeue call reads from. TryDequeue stores
	// it atomically after taking the item and TryEnqueue loads it
	// atomically.
	//lint:ignore U1000 prevents false sharing
	pad1 [cacheLineSize - 8]byte
	cidx uint64
	// pad2 separates cidx from ccachedIdx. ccachedIdx is the value of cidx
	// last observed by TryEnqueue. It is accessed without atomics and is
	// refreshed from cidx only when the queue looks full to the producer.
	//lint:ignore U1000 prevents false sharing
	pad2       [cacheLineSize - 8]byte
	ccachedIdx uint64
	// pad3 separates ccachedIdx from items. items is the ring storage; it
	// is allocated by NewSPSCQueueOf with cap elements and indexed by pidx
	// and cidx.
	//lint:ignore U1000 prevents false sharing
	pad3  [cacheLineSize - 8]byte
	items []I
}
  36  
  37  // NewSPSCQueueOf creates a new SPSCQueueOf instance with the given
  38  // capacity.
  39  func NewSPSCQueueOf[I any](capacity int) *SPSCQueueOf[I] {
  40  	if capacity < 1 {
  41  		panic("capacity must be positive number")
  42  	}
  43  	return &SPSCQueueOf[I]{
  44  		cap:   uint64(capacity + 1),
  45  		items: make([]I, capacity+1),
  46  	}
  47  }
  48  
  49  // TryEnqueue inserts the given item into the queue. Does not block
  50  // and returns immediately. The result indicates that the queue isn't
  51  // full and the item was inserted.
  52  func (q *SPSCQueueOf[I]) TryEnqueue(item I) bool {
  53  	// relaxed memory order would be enough here
  54  	idx := atomic.LoadUint64(&q.pidx)
  55  	next_idx := idx + 1
  56  	if next_idx == q.cap {
  57  		next_idx = 0
  58  	}
  59  	cached_idx := q.ccachedIdx
  60  	if next_idx == cached_idx {
  61  		cached_idx = atomic.LoadUint64(&q.cidx)
  62  		q.ccachedIdx = cached_idx
  63  		if next_idx == cached_idx {
  64  			return false
  65  		}
  66  	}
  67  	q.items[idx] = item
  68  	atomic.StoreUint64(&q.pidx, next_idx)
  69  	return true
  70  }
  71  
  72  // TryDequeue retrieves and removes the item from the head of the
  73  // queue. Does not block and returns immediately. The ok result
  74  // indicates that the queue isn't empty and an item was retrieved.
  75  func (q *SPSCQueueOf[I]) TryDequeue() (item I, ok bool) {
  76  	// relaxed memory order would be enough here
  77  	idx := atomic.LoadUint64(&q.cidx)
  78  	cached_idx := q.pcachedIdx
  79  	if idx == cached_idx {
  80  		cached_idx = atomic.LoadUint64(&q.pidx)
  81  		q.pcachedIdx = cached_idx
  82  		if idx == cached_idx {
  83  			return
  84  		}
  85  	}
  86  	var zeroI I
  87  	item = q.items[idx]
  88  	q.items[idx] = zeroI
  89  	ok = true
  90  	next_idx := idx + 1
  91  	if next_idx == q.cap {
  92  		next_idx = 0
  93  	}
  94  	atomic.StoreUint64(&q.cidx, next_idx)
  95  	return
  96  }
  97