rbmutex.go raw

   1  package xsync
   2  
   3  import (
   4  	"runtime"
   5  	"sync"
   6  	"sync/atomic"
   7  	"time"
   8  )
   9  
// nslowdown is the slow-down guard multiplier. After a writer has
// revoked reader bias in Lock, the time it spent waiting for fast-path
// readers to drain is multiplied by nslowdown to compute how long
// reader bias stays disabled (see RBMutex.inhibitUntil).
const nslowdown = 7

// rtokenPool is a pool of *RToken values. Tokens are taken from the
// pool on the reader fast path and put back in RUnlock, so that
// read locking does not have to allocate a new token each time.
var rtokenPool sync.Pool
  15  
// RToken is a reader lock token. A non-nil token is handed out when a
// read lock is acquired on the reader-biased fast path and must be
// passed back to RUnlock. A nil token means the read lock was taken
// through the underlying sync.RWMutex instead.
type RToken struct {
	// slot is the reader slot selector. It is masked with RBMutex.rmask
	// to obtain the index of the slot whose counter this reader holds.
	// A freshly allocated token starts from a random value.
	slot uint32
	// pad fills the struct up to cacheLineSize bytes (4 bytes are taken
	// by slot).
	//lint:ignore U1000 prevents false sharing
	pad [cacheLineSize - 4]byte
}
  22  
// A RBMutex is a reader biased reader/writer mutual exclusion lock.
// The lock can be held by many readers or a single writer.
// The zero value for a RBMutex is an unlocked mutex. Note that a zero
// value has no reader slots, so its readers always go through the
// underlying sync.RWMutex; use NewRBMutex to get the reader-biased
// fast path.
//
// A RBMutex must not be copied after first use.
//
// RBMutex is based on a modified version of BRAVO
// (Biased Locking for Reader-Writer Locks) algorithm:
// https://arxiv.org/pdf/1810.01553.pdf
//
// RBMutex is a specialized mutex for scenarios, such as caches,
// where the vast majority of locks are acquired by readers and write
// lock acquire attempts are infrequent. In such scenarios, RBMutex
// performs better than sync.RWMutex on large multicore machines.
//
// RBMutex extends sync.RWMutex internally and uses it as the "reader
// bias disabled" fallback, so the same semantics apply. The only
// noticeable difference is in the reader tokens that are returned
// from RLock/TryRLock and must be passed to RUnlock.
type RBMutex struct {
	// rslots holds the per-slot reader counters used by the fast path.
	rslots []rslot
	// rmask is len(rslots)-1; it maps a token's slot value to an index
	// into rslots (the slot count is a power of two).
	rmask uint32
	// rbias is 1 while the reader-biased fast path is enabled and 0
	// otherwise. It is accessed with atomic operations.
	rbias int32
	// inhibitUntil is the point in time after which a slow-path reader
	// may re-enable reader bias. It is written by Lock while rw is held
	// for writing and read by readers while rw is held for reading.
	inhibitUntil time.Time
	// rw is the underlying lock used by writers and by readers that
	// could not take the fast path.
	rw sync.RWMutex
}
  49  
// rslot is a single reader slot of a RBMutex, padded to cacheLineSize
// bytes so that neighbouring slots do not share a cache line.
type rslot struct {
	// mu is the number of fast-path readers currently registered in
	// this slot. It is accessed with atomic operations; writers wait
	// until it is no longer positive.
	mu int32
	//lint:ignore U1000 prevents false sharing
	pad [cacheLineSize - 4]byte
}
  55  
  56  // NewRBMutex creates a new RBMutex instance.
  57  func NewRBMutex() *RBMutex {
  58  	nslots := nextPowOf2(parallelism())
  59  	mu := RBMutex{
  60  		rslots: make([]rslot, nslots),
  61  		rmask:  nslots - 1,
  62  		rbias:  1,
  63  	}
  64  	return &mu
  65  }
  66  
  67  // TryRLock tries to lock m for reading without blocking.
  68  // When TryRLock succeeds, it returns true and a reader token.
  69  // In case of a failure, a false is returned.
  70  func (mu *RBMutex) TryRLock() (bool, *RToken) {
  71  	if t := mu.fastRlock(); t != nil {
  72  		return true, t
  73  	}
  74  	// Optimistic slow path.
  75  	if mu.rw.TryRLock() {
  76  		if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) {
  77  			atomic.StoreInt32(&mu.rbias, 1)
  78  		}
  79  		return true, nil
  80  	}
  81  	return false, nil
  82  }
  83  
  84  // RLock locks m for reading and returns a reader token. The
  85  // token must be used in the later RUnlock call.
  86  //
  87  // Should not be used for recursive read locking; a blocked Lock
  88  // call excludes new readers from acquiring the lock.
  89  func (mu *RBMutex) RLock() *RToken {
  90  	if t := mu.fastRlock(); t != nil {
  91  		return t
  92  	}
  93  	// Slow path.
  94  	mu.rw.RLock()
  95  	if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) {
  96  		atomic.StoreInt32(&mu.rbias, 1)
  97  	}
  98  	return nil
  99  }
 100  
 101  func (mu *RBMutex) fastRlock() *RToken {
 102  	if atomic.LoadInt32(&mu.rbias) == 1 {
 103  		t, ok := rtokenPool.Get().(*RToken)
 104  		if !ok {
 105  			t = new(RToken)
 106  			t.slot = runtime_fastrand()
 107  		}
 108  		// Try all available slots to distribute reader threads to slots.
 109  		for i := 0; i < len(mu.rslots); i++ {
 110  			slot := t.slot + uint32(i)
 111  			rslot := &mu.rslots[slot&mu.rmask]
 112  			rslotmu := atomic.LoadInt32(&rslot.mu)
 113  			if atomic.CompareAndSwapInt32(&rslot.mu, rslotmu, rslotmu+1) {
 114  				if atomic.LoadInt32(&mu.rbias) == 1 {
 115  					// Hot path succeeded.
 116  					t.slot = slot
 117  					return t
 118  				}
 119  				// The mutex is no longer reader biased. Roll back.
 120  				atomic.AddInt32(&rslot.mu, -1)
 121  				rtokenPool.Put(t)
 122  				return nil
 123  			}
 124  			// Contention detected. Give a try with the next slot.
 125  		}
 126  	}
 127  	return nil
 128  }
 129  
 130  // RUnlock undoes a single RLock call. A reader token obtained from
 131  // the RLock call must be provided. RUnlock does not affect other
 132  // simultaneous readers. A panic is raised if m is not locked for
 133  // reading on entry to RUnlock.
 134  func (mu *RBMutex) RUnlock(t *RToken) {
 135  	if t == nil {
 136  		mu.rw.RUnlock()
 137  		return
 138  	}
 139  	if atomic.AddInt32(&mu.rslots[t.slot&mu.rmask].mu, -1) < 0 {
 140  		panic("invalid reader state detected")
 141  	}
 142  	rtokenPool.Put(t)
 143  }
 144  
 145  // TryLock tries to lock m for writing without blocking.
 146  func (mu *RBMutex) TryLock() bool {
 147  	if mu.rw.TryLock() {
 148  		if atomic.LoadInt32(&mu.rbias) == 1 {
 149  			atomic.StoreInt32(&mu.rbias, 0)
 150  			for i := 0; i < len(mu.rslots); i++ {
 151  				if atomic.LoadInt32(&mu.rslots[i].mu) > 0 {
 152  					// There is a reader. Roll back.
 153  					atomic.StoreInt32(&mu.rbias, 1)
 154  					mu.rw.Unlock()
 155  					return false
 156  				}
 157  			}
 158  		}
 159  		return true
 160  	}
 161  	return false
 162  }
 163  
 164  // Lock locks m for writing. If the lock is already locked for
 165  // reading or writing, Lock blocks until the lock is available.
 166  func (mu *RBMutex) Lock() {
 167  	mu.rw.Lock()
 168  	if atomic.LoadInt32(&mu.rbias) == 1 {
 169  		atomic.StoreInt32(&mu.rbias, 0)
 170  		start := time.Now()
 171  		for i := 0; i < len(mu.rslots); i++ {
 172  			for atomic.LoadInt32(&mu.rslots[i].mu) > 0 {
 173  				runtime.Gosched()
 174  			}
 175  		}
 176  		mu.inhibitUntil = time.Now().Add(time.Since(start) * nslowdown)
 177  	}
 178  }
 179  
 180  // Unlock unlocks m for writing. A panic is raised if m is not locked
 181  // for writing on entry to Unlock.
 182  //
 183  // As with RWMutex, a locked RBMutex is not associated with a
 184  // particular goroutine. One goroutine may RLock (Lock) a RBMutex and
 185  // then arrange for another goroutine to RUnlock (Unlock) it.
 186  func (mu *RBMutex) Unlock() {
 187  	mu.rw.Unlock()
 188  }
 189