counter.go raw

   1  package xsync
   2  
   3  import (
   4  	"sync"
   5  	"sync/atomic"
   6  )
   7  
   8  // pool for P tokens
   9  var ptokenPool sync.Pool
  10  
  11  // a P token is used to point at the current OS thread (P)
  12  // on which the goroutine is run; exact identity of the thread,
  13  // as well as P migration tolerance, is not important since
  14  // it's used to as a best effort mechanism for assigning
  15  // concurrent operations (goroutines) to different stripes of
  16  // the counter
  17  type ptoken struct {
  18  	idx uint32
  19  	//lint:ignore U1000 prevents false sharing
  20  	pad [cacheLineSize - 4]byte
  21  }
  22  
  23  // A Counter is a striped int64 counter.
  24  //
  25  // Should be preferred over a single atomically updated int64
  26  // counter in high contention scenarios.
  27  //
  28  // A Counter must not be copied after first use.
  29  type Counter struct {
  30  	stripes []cstripe
  31  	mask    uint32
  32  }
  33  
  34  type cstripe struct {
  35  	c int64
  36  	//lint:ignore U1000 prevents false sharing
  37  	pad [cacheLineSize - 8]byte
  38  }
  39  
  40  // NewCounter creates a new Counter instance.
  41  func NewCounter() *Counter {
  42  	nstripes := nextPowOf2(parallelism())
  43  	c := Counter{
  44  		stripes: make([]cstripe, nstripes),
  45  		mask:    nstripes - 1,
  46  	}
  47  	return &c
  48  }
  49  
  50  // Inc increments the counter by 1.
  51  func (c *Counter) Inc() {
  52  	c.Add(1)
  53  }
  54  
  55  // Dec decrements the counter by 1.
  56  func (c *Counter) Dec() {
  57  	c.Add(-1)
  58  }
  59  
  60  // Add adds the delta to the counter.
  61  func (c *Counter) Add(delta int64) {
  62  	t, ok := ptokenPool.Get().(*ptoken)
  63  	if !ok {
  64  		t = new(ptoken)
  65  		t.idx = runtime_fastrand()
  66  	}
  67  	for {
  68  		stripe := &c.stripes[t.idx&c.mask]
  69  		cnt := atomic.LoadInt64(&stripe.c)
  70  		if atomic.CompareAndSwapInt64(&stripe.c, cnt, cnt+delta) {
  71  			break
  72  		}
  73  		// Give a try with another randomly selected stripe.
  74  		t.idx = runtime_fastrand()
  75  	}
  76  	ptokenPool.Put(t)
  77  }
  78  
  79  // Value returns the current counter value.
  80  // The returned value may not include all of the latest operations in
  81  // presence of concurrent modifications of the counter.
  82  func (c *Counter) Value() int64 {
  83  	v := int64(0)
  84  	for i := 0; i < len(c.stripes); i++ {
  85  		stripe := &c.stripes[i]
  86  		v += atomic.LoadInt64(&stripe.c)
  87  	}
  88  	return v
  89  }
  90  
  91  // Reset resets the counter to zero.
  92  // This method should only be used when it is known that there are
  93  // no concurrent modifications of the counter.
  94  func (c *Counter) Reset() {
  95  	for i := 0; i < len(c.stripes); i++ {
  96  		stripe := &c.stripes[i]
  97  		atomic.StoreInt64(&stripe.c, 0)
  98  	}
  99  }
 100