idle.go raw

   1  /*
   2   *
   3   * Copyright 2023 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  // Package idle contains a component for managing idleness (entering and exiting)
  20  // based on RPC activity.
  21  package idle
  22  
  23  import (
  24  	"math"
  25  	"sync"
  26  	"sync/atomic"
  27  	"time"
  28  )
  29  
  30  // For overriding in unit tests.
  31  var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {
  32  	return time.AfterFunc(d, f)
  33  }
  34  
  35  // ClientConn is the functionality provided by grpc.ClientConn to enter and exit
  36  // from idle mode.
  37  type ClientConn interface {
  38  	ExitIdleMode()
  39  	EnterIdleMode()
  40  }
  41  
  42  // Manager implements idleness detection and calls the ClientConn to enter/exit
  43  // idle mode when appropriate. Must be created by NewManager.
  44  type Manager struct {
  45  	// State accessed atomically.
  46  	lastCallEndTime           int64 // Unix timestamp in nanos; time when the most recent RPC completed.
  47  	activeCallsCount          int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there.
  48  	activeSinceLastTimerCheck int32 // Boolean; True if there was an RPC since the last timer callback.
  49  	closed                    int32 // Boolean; True when the manager is closed.
  50  
  51  	// Can be accessed without atomics or mutex since these are set at creation
  52  	// time and read-only after that.
  53  	cc      ClientConn // Functionality provided by grpc.ClientConn.
  54  	timeout time.Duration
  55  
  56  	// idleMu is used to guarantee mutual exclusion in two scenarios:
  57  	// - Opposing intentions:
  58  	//   - a: Idle timeout has fired and handleIdleTimeout() is trying to put
  59  	//     the channel in idle mode because the channel has been inactive.
  60  	//   - b: At the same time an RPC is made on the channel, and OnCallBegin()
  61  	//     is trying to prevent the channel from going idle.
  62  	// - Competing intentions:
  63  	//   - The channel is in idle mode and there are multiple RPCs starting at
  64  	//     the same time, all trying to move the channel out of idle. Only one
  65  	//     of them should succeed in doing so, while the other RPCs should
  66  	//     piggyback on the first one and be successfully handled.
  67  	idleMu       sync.RWMutex
  68  	actuallyIdle bool
  69  	timer        *time.Timer
  70  }
  71  
  72  // NewManager creates a new idleness manager implementation for the
  73  // given idle timeout.  It begins in idle mode.
  74  func NewManager(cc ClientConn, timeout time.Duration) *Manager {
  75  	return &Manager{
  76  		cc:               cc,
  77  		timeout:          timeout,
  78  		actuallyIdle:     true,
  79  		activeCallsCount: -math.MaxInt32,
  80  	}
  81  }
  82  
  83  // resetIdleTimerLocked resets the idle timer to the given duration.  Called
  84  // when exiting idle mode or when the timer fires and we need to reset it.
  85  func (m *Manager) resetIdleTimerLocked(d time.Duration) {
  86  	if m.isClosed() || m.timeout == 0 || m.actuallyIdle {
  87  		return
  88  	}
  89  
  90  	// It is safe to ignore the return value from Reset() because this method is
  91  	// only ever called from the timer callback or when exiting idle mode.
  92  	if m.timer != nil {
  93  		m.timer.Stop()
  94  	}
  95  	m.timer = timeAfterFunc(d, m.handleIdleTimeout)
  96  }
  97  
  98  func (m *Manager) resetIdleTimer(d time.Duration) {
  99  	m.idleMu.Lock()
 100  	defer m.idleMu.Unlock()
 101  	m.resetIdleTimerLocked(d)
 102  }
 103  
 104  // handleIdleTimeout is the timer callback that is invoked upon expiry of the
 105  // configured idle timeout. The channel is considered inactive if there are no
 106  // ongoing calls and no RPC activity since the last time the timer fired.
 107  func (m *Manager) handleIdleTimeout() {
 108  	if m.isClosed() {
 109  		return
 110  	}
 111  
 112  	if atomic.LoadInt32(&m.activeCallsCount) > 0 {
 113  		m.resetIdleTimer(m.timeout)
 114  		return
 115  	}
 116  
 117  	// There has been activity on the channel since we last got here. Reset the
 118  	// timer and return.
 119  	if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
 120  		// Set the timer to fire after a duration of idle timeout, calculated
 121  		// from the time the most recent RPC completed.
 122  		atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0)
 123  		m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime)-time.Now().UnixNano()) + m.timeout)
 124  		return
 125  	}
 126  
 127  	// Now that we've checked that there has been no activity, attempt to enter
 128  	// idle mode, which is very likely to succeed.
 129  	if m.tryEnterIdleMode(true) {
 130  		// Successfully entered idle mode. No timer needed until we exit idle.
 131  		return
 132  	}
 133  
 134  	// Failed to enter idle mode due to a concurrent RPC that kept the channel
 135  	// active, or because of an error from the channel. Undo the attempt to
 136  	// enter idle, and reset the timer to try again later.
 137  	m.resetIdleTimer(m.timeout)
 138  }
 139  
 140  // tryEnterIdleMode instructs the channel to enter idle mode. But before
 141  // that, it performs a last minute check to ensure that no new RPC has come in,
 142  // making the channel active.
 143  //
 144  // checkActivity controls if a check for RPC activity, since the last time the
 145  // idle_timeout fired, is made.
 146  
 147  // Return value indicates whether or not the channel moved to idle mode.
 148  //
 149  // Holds idleMu which ensures mutual exclusion with exitIdleMode.
 150  func (m *Manager) tryEnterIdleMode(checkActivity bool) bool {
 151  	// Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin()
 152  	// that the channel is either in idle mode or is trying to get there.
 153  	if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
 154  		// This CAS operation can fail if an RPC started after we checked for
 155  		// activity in the timer handler, or one was ongoing from before the
 156  		// last time the timer fired, or if a test is attempting to enter idle
 157  		// mode without checking.  In all cases, abort going into idle mode.
 158  		return false
 159  	}
 160  	// N.B. if we fail to enter idle mode after this, we must re-add
 161  	// math.MaxInt32 to m.activeCallsCount.
 162  
 163  	m.idleMu.Lock()
 164  	defer m.idleMu.Unlock()
 165  
 166  	if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 {
 167  		// We raced and lost to a new RPC. Very rare, but stop entering idle.
 168  		atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
 169  		return false
 170  	}
 171  	if checkActivity && atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
 172  		// A very short RPC could have come in (and also finished) after we
 173  		// checked for calls count and activity in handleIdleTimeout(), but
 174  		// before the CAS operation. So, we need to check for activity again.
 175  		atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
 176  		return false
 177  	}
 178  
 179  	// No new RPCs have come in since we set the active calls count value to
 180  	// -math.MaxInt32. And since we have the lock, it is safe to enter idle mode
 181  	// unconditionally now.
 182  	m.cc.EnterIdleMode()
 183  	m.actuallyIdle = true
 184  	return true
 185  }
 186  
 187  // EnterIdleModeForTesting instructs the channel to enter idle mode.
 188  func (m *Manager) EnterIdleModeForTesting() {
 189  	m.tryEnterIdleMode(false)
 190  }
 191  
 192  // OnCallBegin is invoked at the start of every RPC.
 193  func (m *Manager) OnCallBegin() {
 194  	if m.isClosed() {
 195  		return
 196  	}
 197  
 198  	if atomic.AddInt32(&m.activeCallsCount, 1) > 0 {
 199  		// Channel is not idle now. Set the activity bit and allow the call.
 200  		atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
 201  		return
 202  	}
 203  
 204  	// Channel is either in idle mode or is in the process of moving to idle
 205  	// mode. Attempt to exit idle mode to allow this RPC.
 206  	m.ExitIdleMode()
 207  	atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
 208  }
 209  
 210  // ExitIdleMode instructs m to call the ClientConn's ExitIdleMode and update its
 211  // internal state.
 212  func (m *Manager) ExitIdleMode() {
 213  	// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
 214  	m.idleMu.Lock()
 215  	defer m.idleMu.Unlock()
 216  
 217  	if m.isClosed() || !m.actuallyIdle {
 218  		// This can happen in three scenarios:
 219  		// - handleIdleTimeout() set the calls count to -math.MaxInt32 and called
 220  		//   tryEnterIdleMode(). But before the latter could grab the lock, an RPC
 221  		//   came in and OnCallBegin() noticed that the calls count is negative.
 222  		// - Channel is in idle mode, and multiple new RPCs come in at the same
 223  		//   time, all of them notice a negative calls count in OnCallBegin and get
 224  		//   here. The first one to get the lock would get the channel to exit idle.
 225  		// - Channel is not in idle mode, and the user calls Connect which calls
 226  		//   m.ExitIdleMode.
 227  		//
 228  		// In any case, there is nothing to do here.
 229  		return
 230  	}
 231  
 232  	m.cc.ExitIdleMode()
 233  
 234  	// Undo the idle entry process. This also respects any new RPC attempts.
 235  	atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
 236  	m.actuallyIdle = false
 237  
 238  	// Start a new timer to fire after the configured idle timeout.
 239  	m.resetIdleTimerLocked(m.timeout)
 240  }
 241  
 242  // UnsafeSetNotIdle instructs the Manager to update its internal state to
 243  // reflect the reality that the channel is no longer in IDLE mode.
 244  //
 245  // N.B. This method is intended only for internal use by the gRPC client
 246  // when it exits IDLE mode **manually** from `Dial`. The callsite must ensure:
 247  //   - The channel was **actually in IDLE mode** immediately prior to the call.
 248  //   - There is **no concurrent activity** that could cause the channel to exit
 249  //     IDLE mode *naturally* at the same time.
 250  func (m *Manager) UnsafeSetNotIdle() {
 251  	m.idleMu.Lock()
 252  	defer m.idleMu.Unlock()
 253  
 254  	atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
 255  	m.actuallyIdle = false
 256  	m.resetIdleTimerLocked(m.timeout)
 257  }
 258  
 259  // OnCallEnd is invoked at the end of every RPC.
 260  func (m *Manager) OnCallEnd() {
 261  	if m.isClosed() {
 262  		return
 263  	}
 264  
 265  	// Record the time at which the most recent call finished.
 266  	atomic.StoreInt64(&m.lastCallEndTime, time.Now().UnixNano())
 267  
 268  	// Decrement the active calls count. This count can temporarily go negative
 269  	// when the timer callback is in the process of moving the channel to idle
 270  	// mode, but one or more RPCs come in and complete before the timer callback
 271  	// can get done with the process of moving to idle mode.
 272  	atomic.AddInt32(&m.activeCallsCount, -1)
 273  }
 274  
 275  func (m *Manager) isClosed() bool {
 276  	return atomic.LoadInt32(&m.closed) == 1
 277  }
 278  
 279  // Close stops the timer associated with the Manager, if it exists.
 280  func (m *Manager) Close() {
 281  	atomic.StoreInt32(&m.closed, 1)
 282  
 283  	m.idleMu.Lock()
 284  	if m.timer != nil {
 285  		m.timer.Stop()
 286  		m.timer = nil
 287  	}
 288  	m.idleMu.Unlock()
 289  }
 290