resource.go raw

   1  //go:build go1.18
   2  // +build go1.18
   3  
   4  // Copyright (c) Microsoft Corporation. All rights reserved.
   5  // Licensed under the MIT License.
   6  
   7  package temporal
   8  
   9  import (
  10  	"sync"
  11  	"time"
  12  )
  13  
  14  // backoff sets a minimum wait time between eager update attempts. It's a variable so tests can manipulate it.
  15  var backoff = func(now, lastAttempt time.Time) bool {
  16  	return lastAttempt.Add(30 * time.Second).After(now)
  17  }
  18  
  19  // AcquireResource abstracts a method for refreshing a temporal resource.
  20  type AcquireResource[TResource, TState any] func(state TState) (newResource TResource, newExpiration time.Time, err error)
  21  
  22  // ShouldRefresh abstracts a method for indicating whether a resource should be refreshed before expiration.
  23  type ShouldRefresh[TResource, TState any] func(TResource, TState) bool
  24  
  25  // Resource is a temporal resource (usually a credential) that requires periodic refreshing.
  26  type Resource[TResource, TState any] struct {
  27  	// cond is used to synchronize access to the shared resource embodied by the remaining fields
  28  	cond *sync.Cond
  29  
  30  	// acquiring indicates that some thread/goroutine is in the process of acquiring/updating the resource
  31  	acquiring bool
  32  
  33  	// resource contains the value of the shared resource
  34  	resource TResource
  35  
  36  	// expiration indicates when the shared resource expires; it is 0 if the resource was never acquired
  37  	expiration time.Time
  38  
  39  	// lastAttempt indicates when a thread/goroutine last attempted to acquire/update the resource
  40  	lastAttempt time.Time
  41  
  42  	// shouldRefresh indicates whether the resource should be refreshed before expiration
  43  	shouldRefresh ShouldRefresh[TResource, TState]
  44  
  45  	// acquireResource is the callback function that actually acquires the resource
  46  	acquireResource AcquireResource[TResource, TState]
  47  }
  48  
  49  // NewResource creates a new Resource that uses the specified AcquireResource for refreshing.
  50  func NewResource[TResource, TState any](ar AcquireResource[TResource, TState]) *Resource[TResource, TState] {
  51  	r := &Resource[TResource, TState]{acquireResource: ar, cond: sync.NewCond(&sync.Mutex{})}
  52  	r.shouldRefresh = r.expiringSoon
  53  	return r
  54  }
  55  
  56  // ResourceOptions contains optional configuration for Resource
  57  type ResourceOptions[TResource, TState any] struct {
  58  	// ShouldRefresh indicates whether [Resource.Get] should acquire an updated resource despite
  59  	// the currently held resource not having expired. [Resource.Get] ignores all errors from
  60  	// refresh attempts triggered by ShouldRefresh returning true, and doesn't call ShouldRefresh
  61  	// when the resource has expired (it unconditionally updates expired resources). When
  62  	// ShouldRefresh is nil, [Resource.Get] refreshes the resource if it will expire within 5
  63  	// minutes.
  64  	ShouldRefresh ShouldRefresh[TResource, TState]
  65  }
  66  
  67  // NewResourceWithOptions creates a new Resource that uses the specified AcquireResource for refreshing.
  68  func NewResourceWithOptions[TResource, TState any](ar AcquireResource[TResource, TState], opts ResourceOptions[TResource, TState]) *Resource[TResource, TState] {
  69  	r := NewResource(ar)
  70  	if opts.ShouldRefresh != nil {
  71  		r.shouldRefresh = opts.ShouldRefresh
  72  	}
  73  	return r
  74  }
  75  
  76  // Get returns the underlying resource.
  77  // If the resource is fresh, no refresh is performed.
  78  func (er *Resource[TResource, TState]) Get(state TState) (TResource, error) {
  79  	now, acquire, expired := time.Now(), false, false
  80  
  81  	// acquire exclusive lock
  82  	er.cond.L.Lock()
  83  	resource := er.resource
  84  
  85  	for {
  86  		expired = er.expiration.IsZero() || er.expiration.Before(now)
  87  		if expired {
  88  			// The resource was never acquired or has expired
  89  			if !er.acquiring {
  90  				// If another thread/goroutine is not acquiring/updating the resource, this thread/goroutine will do it
  91  				er.acquiring, acquire = true, true
  92  				break
  93  			}
  94  			// Getting here means that this thread/goroutine will wait for the updated resource
  95  		} else if er.shouldRefresh(resource, state) {
  96  			if !(er.acquiring || backoff(now, er.lastAttempt)) {
  97  				// If another thread/goroutine is not acquiring/renewing the resource, and none has attempted
  98  				// to do so within the last 30 seconds, this thread/goroutine will do it
  99  				er.acquiring, acquire = true, true
 100  				break
 101  			}
 102  			// This thread/goroutine will use the existing resource value while another updates it
 103  			resource = er.resource
 104  			break
 105  		} else {
 106  			// The resource is not close to expiring, this thread/goroutine should use its current value
 107  			resource = er.resource
 108  			break
 109  		}
 110  		// If we get here, wait for the new resource value to be acquired/updated
 111  		er.cond.Wait()
 112  	}
 113  	er.cond.L.Unlock() // Release the lock so no threads/goroutines are blocked
 114  
 115  	var err error
 116  	if acquire {
 117  		// This thread/goroutine has been selected to acquire/update the resource
 118  		var expiration time.Time
 119  		var newValue TResource
 120  		er.lastAttempt = now
 121  		newValue, expiration, err = er.acquireResource(state)
 122  
 123  		// Atomically, update the shared resource's new value & expiration.
 124  		er.cond.L.Lock()
 125  		if err == nil {
 126  			// Update resource & expiration, return the new value
 127  			resource = newValue
 128  			er.resource, er.expiration = resource, expiration
 129  		} else if !expired {
 130  			// An eager update failed. Discard the error and return the current--still valid--resource value
 131  			err = nil
 132  		}
 133  		er.acquiring = false // Indicate that no thread/goroutine is currently acquiring the resource
 134  
 135  		// Wake up any waiting threads/goroutines since there is a resource they can ALL use
 136  		er.cond.L.Unlock()
 137  		er.cond.Broadcast()
 138  	}
 139  	return resource, err // Return the resource this thread/goroutine can use
 140  }
 141  
 142  // Expire marks the resource as expired, ensuring it's refreshed on the next call to Get().
 143  func (er *Resource[TResource, TState]) Expire() {
 144  	er.cond.L.Lock()
 145  	defer er.cond.L.Unlock()
 146  
 147  	// Reset the expiration as if we never got this resource to begin with
 148  	er.expiration = time.Time{}
 149  }
 150  
 151  func (er *Resource[TResource, TState]) expiringSoon(TResource, TState) bool {
 152  	// call time.Now() instead of using Get's value so ShouldRefresh doesn't need a time.Time parameter
 153  	return er.expiration.Add(-5 * time.Minute).Before(time.Now())
 154  }
 155