1 /*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 package transport
20 21 import (
22 "fmt"
23 "math"
24 "sync"
25 "sync/atomic"
26 )
27 28 // writeQuota is a soft limit on the amount of data a stream can
29 // schedule before some of it is written out.
30 type writeQuota struct {
31 _ noCopy
32 // get waits on read from when quota goes less than or equal to zero.
33 // replenish writes on it when quota goes positive again.
34 ch chan struct{}
35 // done is triggered in error case.
36 done <-chan struct{}
37 // replenish is called by loopyWriter to give quota back to.
38 // It is implemented as a field so that it can be updated
39 // by tests.
40 replenish func(n int)
41 quota int32
42 }
43 44 // init allows a writeQuota to be initialized in-place, which is useful for
45 // resetting a buffer or for avoiding a heap allocation when the buffer is
46 // embedded in another struct.
47 func (w *writeQuota) init(sz int32, done <-chan struct{}) {
48 w.quota = sz
49 w.ch = make(chan struct{}, 1)
50 w.done = done
51 w.replenish = w.realReplenish
52 }
53 54 func (w *writeQuota) get(sz int32) error {
55 for {
56 if atomic.LoadInt32(&w.quota) > 0 {
57 atomic.AddInt32(&w.quota, -sz)
58 return nil
59 }
60 select {
61 case <-w.ch:
62 continue
63 case <-w.done:
64 return errStreamDone
65 }
66 }
67 }
68 69 func (w *writeQuota) realReplenish(n int) {
70 sz := int32(n)
71 newQuota := atomic.AddInt32(&w.quota, sz)
72 previousQuota := newQuota - sz
73 if previousQuota <= 0 && newQuota > 0 {
74 select {
75 case w.ch <- struct{}{}:
76 default:
77 }
78 }
79 }
80 81 type trInFlow struct {
82 limit uint32
83 unacked uint32
84 effectiveWindowSize uint32
85 }
86 87 func (f *trInFlow) newLimit(n uint32) uint32 {
88 d := n - f.limit
89 f.limit = n
90 f.updateEffectiveWindowSize()
91 return d
92 }
93 94 func (f *trInFlow) onData(n uint32) uint32 {
95 f.unacked += n
96 if f.unacked < f.limit/4 {
97 f.updateEffectiveWindowSize()
98 return 0
99 }
100 return f.reset()
101 }
102 103 func (f *trInFlow) reset() uint32 {
104 w := f.unacked
105 f.unacked = 0
106 f.updateEffectiveWindowSize()
107 return w
108 }
109 110 func (f *trInFlow) updateEffectiveWindowSize() {
111 atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked)
112 }
113 114 func (f *trInFlow) getSize() uint32 {
115 return atomic.LoadUint32(&f.effectiveWindowSize)
116 }
117 118 // TODO(mmukhi): Simplify this code.
119 // inFlow deals with inbound flow control
120 type inFlow struct {
121 mu sync.Mutex
122 // The inbound flow control limit for pending data.
123 limit uint32
124 // pendingData is the overall data which have been received but not been
125 // consumed by applications.
126 pendingData uint32
127 // The amount of data the application has consumed but grpc has not sent
128 // window update for them. Used to reduce window update frequency.
129 pendingUpdate uint32
130 // delta is the extra window update given by receiver when an application
131 // is reading data bigger in size than the inFlow limit.
132 delta uint32
133 }
134 135 // newLimit updates the inflow window to a new value n.
136 // It assumes that n is always greater than the old limit.
137 func (f *inFlow) newLimit(n uint32) {
138 f.mu.Lock()
139 f.limit = n
140 f.mu.Unlock()
141 }
142 143 func (f *inFlow) maybeAdjust(n uint32) uint32 {
144 if n > uint32(math.MaxInt32) {
145 n = uint32(math.MaxInt32)
146 }
147 f.mu.Lock()
148 defer f.mu.Unlock()
149 // estSenderQuota is the receiver's view of the maximum number of bytes the sender
150 // can send without a window update.
151 estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
152 // estUntransmittedData is the maximum number of bytes the sends might not have put
153 // on the wire yet. A value of 0 or less means that we have already received all or
154 // more bytes than the application is requesting to read.
155 estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
156 // This implies that unless we send a window update, the sender won't be able to send all the bytes
157 // for this message. Therefore we must send an update over the limit since there's an active read
158 // request from the application.
159 if estUntransmittedData > estSenderQuota {
160 // Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec.
161 if f.limit+n > maxWindowSize {
162 f.delta = maxWindowSize - f.limit
163 } else {
164 // Send a window update for the whole message and not just the difference between
165 // estUntransmittedData and estSenderQuota. This will be helpful in case the message
166 // is padded; We will fallback on the current available window(at least a 1/4th of the limit).
167 f.delta = n
168 }
169 return f.delta
170 }
171 return 0
172 }
173 174 // onData is invoked when some data frame is received. It updates pendingData.
175 func (f *inFlow) onData(n uint32) error {
176 f.mu.Lock()
177 f.pendingData += n
178 if f.pendingData+f.pendingUpdate > f.limit+f.delta {
179 limit := f.limit
180 rcvd := f.pendingData + f.pendingUpdate
181 f.mu.Unlock()
182 return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
183 }
184 f.mu.Unlock()
185 return nil
186 }
187 188 // onRead is invoked when the application reads the data. It returns the window size
189 // to be sent to the peer.
190 func (f *inFlow) onRead(n uint32) uint32 {
191 f.mu.Lock()
192 if f.pendingData == 0 {
193 f.mu.Unlock()
194 return 0
195 }
196 f.pendingData -= n
197 if n > f.delta {
198 n -= f.delta
199 f.delta = 0
200 } else {
201 f.delta -= n
202 n = 0
203 }
204 f.pendingUpdate += n
205 if f.pendingUpdate >= f.limit/4 {
206 wu := f.pendingUpdate
207 f.pendingUpdate = 0
208 f.mu.Unlock()
209 return wu
210 }
211 f.mu.Unlock()
212 return 0
213 }
214