fd_mutex.mx raw

   1  // Copyright 2013 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package poll
   6  
   7  import "sync/atomic"
   8  
   9  // fdMutex is a specialized synchronization primitive that manages
  10  // lifetime of an fd and serializes access to Read, Write and Close
  11  // methods on FD.
  12  type fdMutex struct {
  13  	state uint64
  14  	rsema uint32
  15  	wsema uint32
  16  }
  17  
  18  // fdMutex.state is organized as follows:
  19  // 1 bit - whether FD is closed, if set all subsequent lock operations will fail.
  20  // 1 bit - lock for read operations.
  21  // 1 bit - lock for write operations.
  22  // 20 bits - total number of references (read+write+misc).
  23  // 20 bits - number of outstanding read waiters.
  24  // 20 bits - number of outstanding write waiters.
  25  const (
  26  	mutexClosed  = 1 << 0
  27  	mutexRLock   = 1 << 1
  28  	mutexWLock   = 1 << 2
  29  	mutexRef     = 1 << 3
  30  	mutexRefMask = (1<<20 - 1) << 3
  31  	mutexRWait   = 1 << 23
  32  	mutexRMask   = (1<<20 - 1) << 23
  33  	mutexWWait   = 1 << 43
  34  	mutexWMask   = (1<<20 - 1) << 43
  35  )
  36  
  37  const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)"
  38  
  39  // Read operations must do rwlock(true)/rwunlock(true).
  40  //
  41  // Write operations must do rwlock(false)/rwunlock(false).
  42  //
  43  // Misc operations must do incref/decref.
  44  // Misc operations include functions like setsockopt and setDeadline.
  45  // They need to use incref/decref to ensure that they operate on the
  46  // correct fd in presence of a concurrent close call (otherwise fd can
  47  // be closed under their feet).
  48  //
  49  // Close operations must do increfAndClose/decref.
  50  
  51  // incref adds a reference to mu.
  52  // It reports whether mu is available for reading or writing.
  53  func (mu *fdMutex) incref() bool {
  54  	for {
  55  		old := atomic.LoadUint64(&mu.state)
  56  		if old&mutexClosed != 0 {
  57  			return false
  58  		}
  59  		new := old + mutexRef
  60  		if new&mutexRefMask == 0 {
  61  			panic(overflowMsg)
  62  		}
  63  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
  64  			return true
  65  		}
  66  	}
  67  }
  68  
  69  // increfAndClose sets the state of mu to closed.
  70  // It returns false if the file was already closed.
  71  func (mu *fdMutex) increfAndClose() bool {
  72  	for {
  73  		old := atomic.LoadUint64(&mu.state)
  74  		if old&mutexClosed != 0 {
  75  			return false
  76  		}
  77  		// Mark as closed and acquire a reference.
  78  		new := (old | mutexClosed) + mutexRef
  79  		if new&mutexRefMask == 0 {
  80  			panic(overflowMsg)
  81  		}
  82  		// Remove all read and write waiters.
  83  		new &^= mutexRMask | mutexWMask
  84  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
  85  			// Wake all read and write waiters,
  86  			// they will observe closed flag after wakeup.
  87  			for old&mutexRMask != 0 {
  88  				old -= mutexRWait
  89  				runtime_Semrelease(&mu.rsema)
  90  			}
  91  			for old&mutexWMask != 0 {
  92  				old -= mutexWWait
  93  				runtime_Semrelease(&mu.wsema)
  94  			}
  95  			return true
  96  		}
  97  	}
  98  }
  99  
 100  // decref removes a reference from mu.
 101  // It reports whether there is no remaining reference.
 102  func (mu *fdMutex) decref() bool {
 103  	for {
 104  		old := atomic.LoadUint64(&mu.state)
 105  		if old&mutexRefMask == 0 {
 106  			panic("inconsistent poll.fdMutex")
 107  		}
 108  		new := old - mutexRef
 109  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
 110  			return new&(mutexClosed|mutexRefMask) == mutexClosed
 111  		}
 112  	}
 113  }
 114  
 115  // lock adds a reference to mu and locks mu.
 116  // It reports whether mu is available for reading or writing.
 117  func (mu *fdMutex) rwlock(read bool) bool {
 118  	var mutexBit, mutexWait, mutexMask uint64
 119  	var mutexSema *uint32
 120  	if read {
 121  		mutexBit = mutexRLock
 122  		mutexWait = mutexRWait
 123  		mutexMask = mutexRMask
 124  		mutexSema = &mu.rsema
 125  	} else {
 126  		mutexBit = mutexWLock
 127  		mutexWait = mutexWWait
 128  		mutexMask = mutexWMask
 129  		mutexSema = &mu.wsema
 130  	}
 131  	for {
 132  		old := atomic.LoadUint64(&mu.state)
 133  		if old&mutexClosed != 0 {
 134  			return false
 135  		}
 136  		var new uint64
 137  		if old&mutexBit == 0 {
 138  			// Lock is free, acquire it.
 139  			new = (old | mutexBit) + mutexRef
 140  			if new&mutexRefMask == 0 {
 141  				panic(overflowMsg)
 142  			}
 143  		} else {
 144  			// Wait for lock.
 145  			new = old + mutexWait
 146  			if new&mutexMask == 0 {
 147  				panic(overflowMsg)
 148  			}
 149  		}
 150  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
 151  			if old&mutexBit == 0 {
 152  				return true
 153  			}
 154  			runtime_Semacquire(mutexSema)
 155  			// The signaller has subtracted mutexWait.
 156  		}
 157  	}
 158  }
 159  
 160  // unlock removes a reference from mu and unlocks mu.
 161  // It reports whether there is no remaining reference.
 162  func (mu *fdMutex) rwunlock(read bool) bool {
 163  	var mutexBit, mutexWait, mutexMask uint64
 164  	var mutexSema *uint32
 165  	if read {
 166  		mutexBit = mutexRLock
 167  		mutexWait = mutexRWait
 168  		mutexMask = mutexRMask
 169  		mutexSema = &mu.rsema
 170  	} else {
 171  		mutexBit = mutexWLock
 172  		mutexWait = mutexWWait
 173  		mutexMask = mutexWMask
 174  		mutexSema = &mu.wsema
 175  	}
 176  	for {
 177  		old := atomic.LoadUint64(&mu.state)
 178  		if old&mutexBit == 0 || old&mutexRefMask == 0 {
 179  			panic("inconsistent poll.fdMutex")
 180  		}
 181  		// Drop lock, drop reference and wake read waiter if present.
 182  		new := (old &^ mutexBit) - mutexRef
 183  		if old&mutexMask != 0 {
 184  			new -= mutexWait
 185  		}
 186  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
 187  			if old&mutexMask != 0 {
 188  				runtime_Semrelease(mutexSema)
 189  			}
 190  			return new&(mutexClosed|mutexRefMask) == mutexClosed
 191  		}
 192  	}
 193  }
 194  
 195  // Implemented in runtime package.
 196  func runtime_Semacquire(sema *uint32)
 197  func runtime_Semrelease(sema *uint32)
 198  
 199  // incref adds a reference to fd.
 200  // It returns an error when fd cannot be used.
 201  func (fd *FD) incref() error {
 202  	if !fd.fdmu.incref() {
 203  		return errClosing(fd.isFile)
 204  	}
 205  	return nil
 206  }
 207  
 208  // decref removes a reference from fd.
 209  // It also closes fd when the state of fd is set to closed and there
 210  // is no remaining reference.
 211  func (fd *FD) decref() error {
 212  	if fd.fdmu.decref() {
 213  		return fd.destroy()
 214  	}
 215  	return nil
 216  }
 217  
 218  // readLock adds a reference to fd and locks fd for reading.
 219  // It returns an error when fd cannot be used for reading.
 220  func (fd *FD) readLock() error {
 221  	if !fd.fdmu.rwlock(true) {
 222  		return errClosing(fd.isFile)
 223  	}
 224  	return nil
 225  }
 226  
 227  // readUnlock removes a reference from fd and unlocks fd for reading.
 228  // It also closes fd when the state of fd is set to closed and there
 229  // is no remaining reference.
 230  func (fd *FD) readUnlock() {
 231  	if fd.fdmu.rwunlock(true) {
 232  		fd.destroy()
 233  	}
 234  }
 235  
 236  // writeLock adds a reference to fd and locks fd for writing.
 237  // It returns an error when fd cannot be used for writing.
 238  func (fd *FD) writeLock() error {
 239  	if !fd.fdmu.rwlock(false) {
 240  		return errClosing(fd.isFile)
 241  	}
 242  	return nil
 243  }
 244  
 245  // writeUnlock removes a reference from fd and unlocks fd for writing.
 246  // It also closes fd when the state of fd is set to closed and there
 247  // is no remaining reference.
 248  func (fd *FD) writeUnlock() {
 249  	if fd.fdmu.rwunlock(false) {
 250  		fd.destroy()
 251  	}
 252  }
 253  
 254  // closing returns true if fd is closing.
 255  func (fd *FD) closing() bool {
 256  	return atomic.LoadUint64(&fd.fdmu.state)&mutexClosed != 0
 257  }
 258