1 // Copyright 2013 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 5 package poll
6 7 import "sync/atomic"
8 9 // fdMutex is a specialized synchronization primitive that manages
10 // lifetime of an fd and serializes access to Read, Write and Close
11 // methods on FD.
12 type fdMutex struct {
13 state uint64
14 rsema uint32
15 wsema uint32
16 }
17 18 // fdMutex.state is organized as follows:
19 // 1 bit - whether FD is closed, if set all subsequent lock operations will fail.
20 // 1 bit - lock for read operations.
21 // 1 bit - lock for write operations.
22 // 20 bits - total number of references (read+write+misc).
23 // 20 bits - number of outstanding read waiters.
24 // 20 bits - number of outstanding write waiters.
25 const (
26 mutexClosed = 1 << 0
27 mutexRLock = 1 << 1
28 mutexWLock = 1 << 2
29 mutexRef = 1 << 3
30 mutexRefMask = (1<<20 - 1) << 3
31 mutexRWait = 1 << 23
32 mutexRMask = (1<<20 - 1) << 23
33 mutexWWait = 1 << 43
34 mutexWMask = (1<<20 - 1) << 43
35 )
36 37 const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)"
38 39 // Read operations must do rwlock(true)/rwunlock(true).
40 //
41 // Write operations must do rwlock(false)/rwunlock(false).
42 //
43 // Misc operations must do incref/decref.
44 // Misc operations include functions like setsockopt and setDeadline.
45 // They need to use incref/decref to ensure that they operate on the
46 // correct fd in presence of a concurrent close call (otherwise fd can
47 // be closed under their feet).
48 //
49 // Close operations must do increfAndClose/decref.
50 51 // incref adds a reference to mu.
52 // It reports whether mu is available for reading or writing.
53 func (mu *fdMutex) incref() bool {
54 for {
55 old := atomic.LoadUint64(&mu.state)
56 if old&mutexClosed != 0 {
57 return false
58 }
59 new := old + mutexRef
60 if new&mutexRefMask == 0 {
61 panic(overflowMsg)
62 }
63 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
64 return true
65 }
66 }
67 }
68 69 // increfAndClose sets the state of mu to closed.
70 // It returns false if the file was already closed.
71 func (mu *fdMutex) increfAndClose() bool {
72 for {
73 old := atomic.LoadUint64(&mu.state)
74 if old&mutexClosed != 0 {
75 return false
76 }
77 // Mark as closed and acquire a reference.
78 new := (old | mutexClosed) + mutexRef
79 if new&mutexRefMask == 0 {
80 panic(overflowMsg)
81 }
82 // Remove all read and write waiters.
83 new &^= mutexRMask | mutexWMask
84 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
85 // Wake all read and write waiters,
86 // they will observe closed flag after wakeup.
87 for old&mutexRMask != 0 {
88 old -= mutexRWait
89 runtime_Semrelease(&mu.rsema)
90 }
91 for old&mutexWMask != 0 {
92 old -= mutexWWait
93 runtime_Semrelease(&mu.wsema)
94 }
95 return true
96 }
97 }
98 }
99 100 // decref removes a reference from mu.
101 // It reports whether there is no remaining reference.
102 func (mu *fdMutex) decref() bool {
103 for {
104 old := atomic.LoadUint64(&mu.state)
105 if old&mutexRefMask == 0 {
106 panic("inconsistent poll.fdMutex")
107 }
108 new := old - mutexRef
109 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
110 return new&(mutexClosed|mutexRefMask) == mutexClosed
111 }
112 }
113 }
114 115 // lock adds a reference to mu and locks mu.
116 // It reports whether mu is available for reading or writing.
117 func (mu *fdMutex) rwlock(read bool) bool {
118 var mutexBit, mutexWait, mutexMask uint64
119 var mutexSema *uint32
120 if read {
121 mutexBit = mutexRLock
122 mutexWait = mutexRWait
123 mutexMask = mutexRMask
124 mutexSema = &mu.rsema
125 } else {
126 mutexBit = mutexWLock
127 mutexWait = mutexWWait
128 mutexMask = mutexWMask
129 mutexSema = &mu.wsema
130 }
131 for {
132 old := atomic.LoadUint64(&mu.state)
133 if old&mutexClosed != 0 {
134 return false
135 }
136 var new uint64
137 if old&mutexBit == 0 {
138 // Lock is free, acquire it.
139 new = (old | mutexBit) + mutexRef
140 if new&mutexRefMask == 0 {
141 panic(overflowMsg)
142 }
143 } else {
144 // Wait for lock.
145 new = old + mutexWait
146 if new&mutexMask == 0 {
147 panic(overflowMsg)
148 }
149 }
150 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
151 if old&mutexBit == 0 {
152 return true
153 }
154 runtime_Semacquire(mutexSema)
155 // The signaller has subtracted mutexWait.
156 }
157 }
158 }
159 160 // unlock removes a reference from mu and unlocks mu.
161 // It reports whether there is no remaining reference.
162 func (mu *fdMutex) rwunlock(read bool) bool {
163 var mutexBit, mutexWait, mutexMask uint64
164 var mutexSema *uint32
165 if read {
166 mutexBit = mutexRLock
167 mutexWait = mutexRWait
168 mutexMask = mutexRMask
169 mutexSema = &mu.rsema
170 } else {
171 mutexBit = mutexWLock
172 mutexWait = mutexWWait
173 mutexMask = mutexWMask
174 mutexSema = &mu.wsema
175 }
176 for {
177 old := atomic.LoadUint64(&mu.state)
178 if old&mutexBit == 0 || old&mutexRefMask == 0 {
179 panic("inconsistent poll.fdMutex")
180 }
181 // Drop lock, drop reference and wake read waiter if present.
182 new := (old &^ mutexBit) - mutexRef
183 if old&mutexMask != 0 {
184 new -= mutexWait
185 }
186 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
187 if old&mutexMask != 0 {
188 runtime_Semrelease(mutexSema)
189 }
190 return new&(mutexClosed|mutexRefMask) == mutexClosed
191 }
192 }
193 }
194 195 // Implemented in runtime package.
196 func runtime_Semacquire(sema *uint32)
197 func runtime_Semrelease(sema *uint32)
198 199 // incref adds a reference to fd.
200 // It returns an error when fd cannot be used.
201 func (fd *FD) incref() error {
202 if !fd.fdmu.incref() {
203 return errClosing(fd.isFile)
204 }
205 return nil
206 }
207 208 // decref removes a reference from fd.
209 // It also closes fd when the state of fd is set to closed and there
210 // is no remaining reference.
211 func (fd *FD) decref() error {
212 if fd.fdmu.decref() {
213 return fd.destroy()
214 }
215 return nil
216 }
217 218 // readLock adds a reference to fd and locks fd for reading.
219 // It returns an error when fd cannot be used for reading.
220 func (fd *FD) readLock() error {
221 if !fd.fdmu.rwlock(true) {
222 return errClosing(fd.isFile)
223 }
224 return nil
225 }
226 227 // readUnlock removes a reference from fd and unlocks fd for reading.
228 // It also closes fd when the state of fd is set to closed and there
229 // is no remaining reference.
230 func (fd *FD) readUnlock() {
231 if fd.fdmu.rwunlock(true) {
232 fd.destroy()
233 }
234 }
235 236 // writeLock adds a reference to fd and locks fd for writing.
237 // It returns an error when fd cannot be used for writing.
238 func (fd *FD) writeLock() error {
239 if !fd.fdmu.rwlock(false) {
240 return errClosing(fd.isFile)
241 }
242 return nil
243 }
244 245 // writeUnlock removes a reference from fd and unlocks fd for writing.
246 // It also closes fd when the state of fd is set to closed and there
247 // is no remaining reference.
248 func (fd *FD) writeUnlock() {
249 if fd.fdmu.rwunlock(false) {
250 fd.destroy()
251 }
252 }
253 254 // closing returns true if fd is closing.
255 func (fd *FD) closing() bool {
256 return atomic.LoadUint64(&fd.fdmu.state)&mutexClosed != 0
257 }
258