1 package xsync
2 3 import (
4 "runtime"
5 "sync"
6 "sync/atomic"
7 "time"
8 )
// nslowdown is the slow-down guard: after a writer had to wait for
// fast-path readers to drain, reader bias stays disabled for nslowdown
// times the duration of that wait (see RBMutex.Lock).
const nslowdown = 7

// rtokenPool recycles reader tokens between RLock/TryRLock and RUnlock
// calls. It is a package-level pool and has no New function, so Get
// returns nil when the pool is empty.
var rtokenPool sync.Pool
15 16 // RToken is a reader lock token.
17 type RToken struct {
18 slot uint32
19 //lint:ignore U1000 prevents false sharing
20 pad [cacheLineSize - 4]byte
21 }
22 23 // A RBMutex is a reader biased reader/writer mutual exclusion lock.
24 // The lock can be held by an many readers or a single writer.
25 // The zero value for a RBMutex is an unlocked mutex.
26 //
27 // A RBMutex must not be copied after first use.
28 //
29 // RBMutex is based on a modified version of BRAVO
30 // (Biased Locking for Reader-Writer Locks) algorithm:
31 // https://arxiv.org/pdf/1810.01553.pdf
32 //
33 // RBMutex is a specialized mutex for scenarios, such as caches,
34 // where the vast majority of locks are acquired by readers and write
35 // lock acquire attempts are infrequent. In such scenarios, RBMutex
36 // performs better than sync.RWMutex on large multicore machines.
37 //
38 // RBMutex extends sync.RWMutex internally and uses it as the "reader
39 // bias disabled" fallback, so the same semantics apply. The only
40 // noticeable difference is in reader tokens returned from the
41 // RLock/RUnlock methods.
42 type RBMutex struct {
43 rslots []rslot
44 rmask uint32
45 rbias int32
46 inhibitUntil time.Time
47 rw sync.RWMutex
48 }
49 50 type rslot struct {
51 mu int32
52 //lint:ignore U1000 prevents false sharing
53 pad [cacheLineSize - 4]byte
54 }
55 56 // NewRBMutex creates a new RBMutex instance.
57 func NewRBMutex() *RBMutex {
58 nslots := nextPowOf2(parallelism())
59 mu := RBMutex{
60 rslots: make([]rslot, nslots),
61 rmask: nslots - 1,
62 rbias: 1,
63 }
64 return &mu
65 }
66 67 // TryRLock tries to lock m for reading without blocking.
68 // When TryRLock succeeds, it returns true and a reader token.
69 // In case of a failure, a false is returned.
70 func (mu *RBMutex) TryRLock() (bool, *RToken) {
71 if t := mu.fastRlock(); t != nil {
72 return true, t
73 }
74 // Optimistic slow path.
75 if mu.rw.TryRLock() {
76 if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) {
77 atomic.StoreInt32(&mu.rbias, 1)
78 }
79 return true, nil
80 }
81 return false, nil
82 }
83 84 // RLock locks m for reading and returns a reader token. The
85 // token must be used in the later RUnlock call.
86 //
87 // Should not be used for recursive read locking; a blocked Lock
88 // call excludes new readers from acquiring the lock.
89 func (mu *RBMutex) RLock() *RToken {
90 if t := mu.fastRlock(); t != nil {
91 return t
92 }
93 // Slow path.
94 mu.rw.RLock()
95 if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) {
96 atomic.StoreInt32(&mu.rbias, 1)
97 }
98 return nil
99 }
100 101 func (mu *RBMutex) fastRlock() *RToken {
102 if atomic.LoadInt32(&mu.rbias) == 1 {
103 t, ok := rtokenPool.Get().(*RToken)
104 if !ok {
105 t = new(RToken)
106 t.slot = runtime_fastrand()
107 }
108 // Try all available slots to distribute reader threads to slots.
109 for i := 0; i < len(mu.rslots); i++ {
110 slot := t.slot + uint32(i)
111 rslot := &mu.rslots[slot&mu.rmask]
112 rslotmu := atomic.LoadInt32(&rslot.mu)
113 if atomic.CompareAndSwapInt32(&rslot.mu, rslotmu, rslotmu+1) {
114 if atomic.LoadInt32(&mu.rbias) == 1 {
115 // Hot path succeeded.
116 t.slot = slot
117 return t
118 }
119 // The mutex is no longer reader biased. Roll back.
120 atomic.AddInt32(&rslot.mu, -1)
121 rtokenPool.Put(t)
122 return nil
123 }
124 // Contention detected. Give a try with the next slot.
125 }
126 }
127 return nil
128 }
129 130 // RUnlock undoes a single RLock call. A reader token obtained from
131 // the RLock call must be provided. RUnlock does not affect other
132 // simultaneous readers. A panic is raised if m is not locked for
133 // reading on entry to RUnlock.
134 func (mu *RBMutex) RUnlock(t *RToken) {
135 if t == nil {
136 mu.rw.RUnlock()
137 return
138 }
139 if atomic.AddInt32(&mu.rslots[t.slot&mu.rmask].mu, -1) < 0 {
140 panic("invalid reader state detected")
141 }
142 rtokenPool.Put(t)
143 }
144 145 // TryLock tries to lock m for writing without blocking.
146 func (mu *RBMutex) TryLock() bool {
147 if mu.rw.TryLock() {
148 if atomic.LoadInt32(&mu.rbias) == 1 {
149 atomic.StoreInt32(&mu.rbias, 0)
150 for i := 0; i < len(mu.rslots); i++ {
151 if atomic.LoadInt32(&mu.rslots[i].mu) > 0 {
152 // There is a reader. Roll back.
153 atomic.StoreInt32(&mu.rbias, 1)
154 mu.rw.Unlock()
155 return false
156 }
157 }
158 }
159 return true
160 }
161 return false
162 }
163 164 // Lock locks m for writing. If the lock is already locked for
165 // reading or writing, Lock blocks until the lock is available.
166 func (mu *RBMutex) Lock() {
167 mu.rw.Lock()
168 if atomic.LoadInt32(&mu.rbias) == 1 {
169 atomic.StoreInt32(&mu.rbias, 0)
170 start := time.Now()
171 for i := 0; i < len(mu.rslots); i++ {
172 for atomic.LoadInt32(&mu.rslots[i].mu) > 0 {
173 runtime.Gosched()
174 }
175 }
176 mu.inhibitUntil = time.Now().Add(time.Since(start) * nslowdown)
177 }
178 }
179 180 // Unlock unlocks m for writing. A panic is raised if m is not locked
181 // for writing on entry to Unlock.
182 //
183 // As with RWMutex, a locked RBMutex is not associated with a
184 // particular goroutine. One goroutine may RLock (Lock) a RBMutex and
185 // then arrange for another goroutine to RUnlock (Unlock) it.
186 func (mu *RBMutex) Unlock() {
187 mu.rw.Unlock()
188 }
189