1 /*
2 *
3 * Copyright 2023 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
// Package idle contains a component for managing idleness (entering and
// exiting idle mode) based on RPC activity.
21 package idle
22 23 import (
24 "math"
25 "sync"
26 "sync/atomic"
27 "time"
28 )
// timeAfterFunc schedules f to run in its own goroutine once d has elapsed and
// returns the underlying timer. It is a package-level variable only so that
// unit tests can substitute their own implementation.
var timeAfterFunc = time.AfterFunc
// ClientConn is the subset of grpc.ClientConn functionality that the Manager
// relies on to move the channel into and out of idle mode.
type ClientConn interface {
	// EnterIdleMode is called by the Manager to put the channel into idle mode.
	EnterIdleMode()
	// ExitIdleMode is called by the Manager to bring the channel out of idle
	// mode.
	ExitIdleMode()
}
// Manager implements idleness detection and calls the ClientConn to enter/exit
// idle mode when appropriate. Must be created by NewManager.
type Manager struct {
	// State accessed atomically.
	//
	// NOTE: lastCallEndTime is deliberately the first field. On 32-bit
	// platforms (386, ARM, 32-bit MIPS) 64-bit atomic operations require a
	// 64-bit aligned address, and per the sync/atomic documentation only the
	// first word of an allocated struct can be relied upon to be 64-bit
	// aligned. Do not reorder these fields.
	lastCallEndTime           int64 // Unix timestamp in nanos; time when the most recent RPC completed.
	activeCallsCount          int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there.
	activeSinceLastTimerCheck int32 // Boolean (0 or 1); 1 if there was an RPC since the last timer callback.
	closed                    int32 // Boolean (0 or 1); set to 1 by Close.

	// Can be accessed without atomics or mutex since these are set at creation
	// time and read-only after that.
	cc      ClientConn    // Functionality provided by grpc.ClientConn.
	timeout time.Duration // Idle timeout; 0 means the idle timer is never armed.

	// idleMu is used to guarantee mutual exclusion in two scenarios:
	// - Opposing intentions:
	//   - a: Idle timeout has fired and handleIdleTimeout() is trying to put
	//     the channel in idle mode because the channel has been inactive.
	//   - b: At the same time an RPC is made on the channel, and OnCallBegin()
	//     is trying to prevent the channel from going idle.
	// - Competing intentions:
	//   - The channel is in idle mode and there are multiple RPCs starting at
	//     the same time, all trying to move the channel out of idle. Only one
	//     of them should succeed in doing so, while the other RPCs should
	//     piggyback on the first one and be successfully handled.
	//
	// idleMu also guards actuallyIdle and timer below.
	idleMu       sync.RWMutex
	actuallyIdle bool        // True while the channel is in idle mode.
	timer        *time.Timer // Pending idle timer, if one has been armed.
}
71 72 // NewManager creates a new idleness manager implementation for the
73 // given idle timeout. It begins in idle mode.
74 func NewManager(cc ClientConn, timeout time.Duration) *Manager {
75 return &Manager{
76 cc: cc,
77 timeout: timeout,
78 actuallyIdle: true,
79 activeCallsCount: -math.MaxInt32,
80 }
81 }
82 83 // resetIdleTimerLocked resets the idle timer to the given duration. Called
84 // when exiting idle mode or when the timer fires and we need to reset it.
85 func (m *Manager) resetIdleTimerLocked(d time.Duration) {
86 if m.isClosed() || m.timeout == 0 || m.actuallyIdle {
87 return
88 }
89 90 // It is safe to ignore the return value from Reset() because this method is
91 // only ever called from the timer callback or when exiting idle mode.
92 if m.timer != nil {
93 m.timer.Stop()
94 }
95 m.timer = timeAfterFunc(d, m.handleIdleTimeout)
96 }
97 98 func (m *Manager) resetIdleTimer(d time.Duration) {
99 m.idleMu.Lock()
100 defer m.idleMu.Unlock()
101 m.resetIdleTimerLocked(d)
102 }
103 104 // handleIdleTimeout is the timer callback that is invoked upon expiry of the
105 // configured idle timeout. The channel is considered inactive if there are no
106 // ongoing calls and no RPC activity since the last time the timer fired.
107 func (m *Manager) handleIdleTimeout() {
108 if m.isClosed() {
109 return
110 }
111 112 if atomic.LoadInt32(&m.activeCallsCount) > 0 {
113 m.resetIdleTimer(m.timeout)
114 return
115 }
116 117 // There has been activity on the channel since we last got here. Reset the
118 // timer and return.
119 if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
120 // Set the timer to fire after a duration of idle timeout, calculated
121 // from the time the most recent RPC completed.
122 atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0)
123 m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime)-time.Now().UnixNano()) + m.timeout)
124 return
125 }
126 127 // Now that we've checked that there has been no activity, attempt to enter
128 // idle mode, which is very likely to succeed.
129 if m.tryEnterIdleMode(true) {
130 // Successfully entered idle mode. No timer needed until we exit idle.
131 return
132 }
133 134 // Failed to enter idle mode due to a concurrent RPC that kept the channel
135 // active, or because of an error from the channel. Undo the attempt to
136 // enter idle, and reset the timer to try again later.
137 m.resetIdleTimer(m.timeout)
138 }
139 140 // tryEnterIdleMode instructs the channel to enter idle mode. But before
141 // that, it performs a last minute check to ensure that no new RPC has come in,
142 // making the channel active.
143 //
144 // checkActivity controls if a check for RPC activity, since the last time the
145 // idle_timeout fired, is made.
146 147 // Return value indicates whether or not the channel moved to idle mode.
148 //
149 // Holds idleMu which ensures mutual exclusion with exitIdleMode.
150 func (m *Manager) tryEnterIdleMode(checkActivity bool) bool {
151 // Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin()
152 // that the channel is either in idle mode or is trying to get there.
153 if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
154 // This CAS operation can fail if an RPC started after we checked for
155 // activity in the timer handler, or one was ongoing from before the
156 // last time the timer fired, or if a test is attempting to enter idle
157 // mode without checking. In all cases, abort going into idle mode.
158 return false
159 }
160 // N.B. if we fail to enter idle mode after this, we must re-add
161 // math.MaxInt32 to m.activeCallsCount.
162 163 m.idleMu.Lock()
164 defer m.idleMu.Unlock()
165 166 if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 {
167 // We raced and lost to a new RPC. Very rare, but stop entering idle.
168 atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
169 return false
170 }
171 if checkActivity && atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
172 // A very short RPC could have come in (and also finished) after we
173 // checked for calls count and activity in handleIdleTimeout(), but
174 // before the CAS operation. So, we need to check for activity again.
175 atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
176 return false
177 }
178 179 // No new RPCs have come in since we set the active calls count value to
180 // -math.MaxInt32. And since we have the lock, it is safe to enter idle mode
181 // unconditionally now.
182 m.cc.EnterIdleMode()
183 m.actuallyIdle = true
184 return true
185 }
186 187 // EnterIdleModeForTesting instructs the channel to enter idle mode.
188 func (m *Manager) EnterIdleModeForTesting() {
189 m.tryEnterIdleMode(false)
190 }
191 192 // OnCallBegin is invoked at the start of every RPC.
193 func (m *Manager) OnCallBegin() {
194 if m.isClosed() {
195 return
196 }
197 198 if atomic.AddInt32(&m.activeCallsCount, 1) > 0 {
199 // Channel is not idle now. Set the activity bit and allow the call.
200 atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
201 return
202 }
203 204 // Channel is either in idle mode or is in the process of moving to idle
205 // mode. Attempt to exit idle mode to allow this RPC.
206 m.ExitIdleMode()
207 atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
208 }
209 210 // ExitIdleMode instructs m to call the ClientConn's ExitIdleMode and update its
211 // internal state.
212 func (m *Manager) ExitIdleMode() {
213 // Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
214 m.idleMu.Lock()
215 defer m.idleMu.Unlock()
216 217 if m.isClosed() || !m.actuallyIdle {
218 // This can happen in three scenarios:
219 // - handleIdleTimeout() set the calls count to -math.MaxInt32 and called
220 // tryEnterIdleMode(). But before the latter could grab the lock, an RPC
221 // came in and OnCallBegin() noticed that the calls count is negative.
222 // - Channel is in idle mode, and multiple new RPCs come in at the same
223 // time, all of them notice a negative calls count in OnCallBegin and get
224 // here. The first one to get the lock would get the channel to exit idle.
225 // - Channel is not in idle mode, and the user calls Connect which calls
226 // m.ExitIdleMode.
227 //
228 // In any case, there is nothing to do here.
229 return
230 }
231 232 m.cc.ExitIdleMode()
233 234 // Undo the idle entry process. This also respects any new RPC attempts.
235 atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
236 m.actuallyIdle = false
237 238 // Start a new timer to fire after the configured idle timeout.
239 m.resetIdleTimerLocked(m.timeout)
240 }
241 242 // UnsafeSetNotIdle instructs the Manager to update its internal state to
243 // reflect the reality that the channel is no longer in IDLE mode.
244 //
245 // N.B. This method is intended only for internal use by the gRPC client
246 // when it exits IDLE mode **manually** from `Dial`. The callsite must ensure:
247 // - The channel was **actually in IDLE mode** immediately prior to the call.
248 // - There is **no concurrent activity** that could cause the channel to exit
249 // IDLE mode *naturally* at the same time.
250 func (m *Manager) UnsafeSetNotIdle() {
251 m.idleMu.Lock()
252 defer m.idleMu.Unlock()
253 254 atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
255 m.actuallyIdle = false
256 m.resetIdleTimerLocked(m.timeout)
257 }
258 259 // OnCallEnd is invoked at the end of every RPC.
260 func (m *Manager) OnCallEnd() {
261 if m.isClosed() {
262 return
263 }
264 265 // Record the time at which the most recent call finished.
266 atomic.StoreInt64(&m.lastCallEndTime, time.Now().UnixNano())
267 268 // Decrement the active calls count. This count can temporarily go negative
269 // when the timer callback is in the process of moving the channel to idle
270 // mode, but one or more RPCs come in and complete before the timer callback
271 // can get done with the process of moving to idle mode.
272 atomic.AddInt32(&m.activeCallsCount, -1)
273 }
274 275 func (m *Manager) isClosed() bool {
276 return atomic.LoadInt32(&m.closed) == 1
277 }
278 279 // Close stops the timer associated with the Manager, if it exists.
280 func (m *Manager) Close() {
281 atomic.StoreInt32(&m.closed, 1)
282 283 m.idleMu.Lock()
284 if m.timer != nil {
285 m.timer.Stop()
286 m.timer = nil
287 }
288 m.idleMu.Unlock()
289 }
290