1 //go:build go1.18
2 // +build go1.18
3 4 // Copyright (c) Microsoft Corporation. All rights reserved.
5 // Licensed under the MIT License.
6 7 package temporal
8 9 import (
10 "sync"
11 "time"
12 )
// backoff reports whether an eager update attempt should be skipped because the previous attempt,
// made at lastAttempt, is less than 30 seconds older than now. It's a variable so tests can
// manipulate it.
var backoff = func(now, lastAttempt time.Time) bool {
	// minimum wait time between eager update attempts
	const minWait = 30 * time.Second

	retryAllowedAt := lastAttempt.Add(minWait)
	return now.Before(retryAllowedAt)
}
// AcquireResource abstracts a method for acquiring or refreshing a temporal resource. It receives
// the caller-supplied state and returns the new resource value together with the time at which that
// value expires, or an error when no resource could be obtained.
type AcquireResource[TResource any, TState any] func(state TState) (newResource TResource, newExpiration time.Time, err error)
// ShouldRefresh abstracts a method for indicating whether a resource should be refreshed before
// expiration. It receives a resource value and the caller-supplied state, and returns true to
// request a refresh.
type ShouldRefresh[TResource any, TState any] func(TResource, TState) bool
24 25 // Resource is a temporal resource (usually a credential) that requires periodic refreshing.
26 type Resource[TResource, TState any] struct {
27 // cond is used to synchronize access to the shared resource embodied by the remaining fields
28 cond *sync.Cond
29 30 // acquiring indicates that some thread/goroutine is in the process of acquiring/updating the resource
31 acquiring bool
32 33 // resource contains the value of the shared resource
34 resource TResource
35 36 // expiration indicates when the shared resource expires; it is 0 if the resource was never acquired
37 expiration time.Time
38 39 // lastAttempt indicates when a thread/goroutine last attempted to acquire/update the resource
40 lastAttempt time.Time
41 42 // shouldRefresh indicates whether the resource should be refreshed before expiration
43 shouldRefresh ShouldRefresh[TResource, TState]
44 45 // acquireResource is the callback function that actually acquires the resource
46 acquireResource AcquireResource[TResource, TState]
47 }
48 49 // NewResource creates a new Resource that uses the specified AcquireResource for refreshing.
50 func NewResource[TResource, TState any](ar AcquireResource[TResource, TState]) *Resource[TResource, TState] {
51 r := &Resource[TResource, TState]{acquireResource: ar, cond: sync.NewCond(&sync.Mutex{})}
52 r.shouldRefresh = r.expiringSoon
53 return r
54 }
55 56 // ResourceOptions contains optional configuration for Resource
57 type ResourceOptions[TResource, TState any] struct {
58 // ShouldRefresh indicates whether [Resource.Get] should acquire an updated resource despite
59 // the currently held resource not having expired. [Resource.Get] ignores all errors from
60 // refresh attempts triggered by ShouldRefresh returning true, and doesn't call ShouldRefresh
61 // when the resource has expired (it unconditionally updates expired resources). When
62 // ShouldRefresh is nil, [Resource.Get] refreshes the resource if it will expire within 5
63 // minutes.
64 ShouldRefresh ShouldRefresh[TResource, TState]
65 }
66 67 // NewResourceWithOptions creates a new Resource that uses the specified AcquireResource for refreshing.
68 func NewResourceWithOptions[TResource, TState any](ar AcquireResource[TResource, TState], opts ResourceOptions[TResource, TState]) *Resource[TResource, TState] {
69 r := NewResource(ar)
70 if opts.ShouldRefresh != nil {
71 r.shouldRefresh = opts.ShouldRefresh
72 }
73 return r
74 }
75 76 // Get returns the underlying resource.
77 // If the resource is fresh, no refresh is performed.
78 func (er *Resource[TResource, TState]) Get(state TState) (TResource, error) {
79 now, acquire, expired := time.Now(), false, false
80 81 // acquire exclusive lock
82 er.cond.L.Lock()
83 resource := er.resource
84 85 for {
86 expired = er.expiration.IsZero() || er.expiration.Before(now)
87 if expired {
88 // The resource was never acquired or has expired
89 if !er.acquiring {
90 // If another thread/goroutine is not acquiring/updating the resource, this thread/goroutine will do it
91 er.acquiring, acquire = true, true
92 break
93 }
94 // Getting here means that this thread/goroutine will wait for the updated resource
95 } else if er.shouldRefresh(resource, state) {
96 if !(er.acquiring || backoff(now, er.lastAttempt)) {
97 // If another thread/goroutine is not acquiring/renewing the resource, and none has attempted
98 // to do so within the last 30 seconds, this thread/goroutine will do it
99 er.acquiring, acquire = true, true
100 break
101 }
102 // This thread/goroutine will use the existing resource value while another updates it
103 resource = er.resource
104 break
105 } else {
106 // The resource is not close to expiring, this thread/goroutine should use its current value
107 resource = er.resource
108 break
109 }
110 // If we get here, wait for the new resource value to be acquired/updated
111 er.cond.Wait()
112 }
113 er.cond.L.Unlock() // Release the lock so no threads/goroutines are blocked
114 115 var err error
116 if acquire {
117 // This thread/goroutine has been selected to acquire/update the resource
118 var expiration time.Time
119 var newValue TResource
120 er.lastAttempt = now
121 newValue, expiration, err = er.acquireResource(state)
122 123 // Atomically, update the shared resource's new value & expiration.
124 er.cond.L.Lock()
125 if err == nil {
126 // Update resource & expiration, return the new value
127 resource = newValue
128 er.resource, er.expiration = resource, expiration
129 } else if !expired {
130 // An eager update failed. Discard the error and return the current--still valid--resource value
131 err = nil
132 }
133 er.acquiring = false // Indicate that no thread/goroutine is currently acquiring the resource
134 135 // Wake up any waiting threads/goroutines since there is a resource they can ALL use
136 er.cond.L.Unlock()
137 er.cond.Broadcast()
138 }
139 return resource, err // Return the resource this thread/goroutine can use
140 }
141 142 // Expire marks the resource as expired, ensuring it's refreshed on the next call to Get().
143 func (er *Resource[TResource, TState]) Expire() {
144 er.cond.L.Lock()
145 defer er.cond.L.Unlock()
146 147 // Reset the expiration as if we never got this resource to begin with
148 er.expiration = time.Time{}
149 }
150 151 func (er *Resource[TResource, TState]) expiringSoon(TResource, TState) bool {
152 // call time.Now() instead of using Get's value so ShouldRefresh doesn't need a time.Time parameter
153 return er.expiration.Add(-5 * time.Minute).Before(time.Now())
154 }
155