1 package xsync
2 3 import (
4 "sync"
5 "sync/atomic"
6 )
7 8 // pool for P tokens
9 var ptokenPool sync.Pool
10 11 // a P token is used to point at the current OS thread (P)
12 // on which the goroutine is run; exact identity of the thread,
13 // as well as P migration tolerance, is not important since
14 // it's used to as a best effort mechanism for assigning
15 // concurrent operations (goroutines) to different stripes of
16 // the counter
17 type ptoken struct {
18 idx uint32
19 //lint:ignore U1000 prevents false sharing
20 pad [cacheLineSize - 4]byte
21 }
22 23 // A Counter is a striped int64 counter.
24 //
25 // Should be preferred over a single atomically updated int64
26 // counter in high contention scenarios.
27 //
28 // A Counter must not be copied after first use.
29 type Counter struct {
30 stripes []cstripe
31 mask uint32
32 }
33 34 type cstripe struct {
35 c int64
36 //lint:ignore U1000 prevents false sharing
37 pad [cacheLineSize - 8]byte
38 }
39 40 // NewCounter creates a new Counter instance.
41 func NewCounter() *Counter {
42 nstripes := nextPowOf2(parallelism())
43 c := Counter{
44 stripes: make([]cstripe, nstripes),
45 mask: nstripes - 1,
46 }
47 return &c
48 }
49 50 // Inc increments the counter by 1.
51 func (c *Counter) Inc() {
52 c.Add(1)
53 }
54 55 // Dec decrements the counter by 1.
56 func (c *Counter) Dec() {
57 c.Add(-1)
58 }
59 60 // Add adds the delta to the counter.
61 func (c *Counter) Add(delta int64) {
62 t, ok := ptokenPool.Get().(*ptoken)
63 if !ok {
64 t = new(ptoken)
65 t.idx = runtime_fastrand()
66 }
67 for {
68 stripe := &c.stripes[t.idx&c.mask]
69 cnt := atomic.LoadInt64(&stripe.c)
70 if atomic.CompareAndSwapInt64(&stripe.c, cnt, cnt+delta) {
71 break
72 }
73 // Give a try with another randomly selected stripe.
74 t.idx = runtime_fastrand()
75 }
76 ptokenPool.Put(t)
77 }
78 79 // Value returns the current counter value.
80 // The returned value may not include all of the latest operations in
81 // presence of concurrent modifications of the counter.
82 func (c *Counter) Value() int64 {
83 v := int64(0)
84 for i := 0; i < len(c.stripes); i++ {
85 stripe := &c.stripes[i]
86 v += atomic.LoadInt64(&stripe.c)
87 }
88 return v
89 }
90 91 // Reset resets the counter to zero.
92 // This method should only be used when it is known that there are
93 // no concurrent modifications of the counter.
94 func (c *Counter) Reset() {
95 for i := 0; i < len(c.stripes); i++ {
96 stripe := &c.stripes[i]
97 atomic.StoreInt64(&stripe.c, 0)
98 }
99 }
100