wait.go raw

   1  package async
   2  
   3  import (
   4  	"fmt"
   5  	"time"
   6  )
   7  
   8  var (
   9  	defaultInterval = time.Second
  10  	defaultTimeout  = time.Minute * 5
  11  )
  12  
  13  type IntervalStrategy func() <-chan time.Time
  14  
  15  // WaitSyncConfig defines the waiting options.
  16  type WaitSyncConfig struct {
  17  	// This method will be called from another goroutine.
  18  	Get              func() (value any, isTerminal bool, err error)
  19  	IntervalStrategy IntervalStrategy
  20  	Timeout          time.Duration
  21  }
  22  
  23  // LinearIntervalStrategy defines a linear interval duration.
  24  func LinearIntervalStrategy(interval time.Duration) IntervalStrategy {
  25  	return func() <-chan time.Time {
  26  		return time.After(interval)
  27  	}
  28  }
  29  
  30  // FibonacciIntervalStrategy defines an interval duration who follow the Fibonacci sequence.
  31  func FibonacciIntervalStrategy(base time.Duration, factor float32) IntervalStrategy {
  32  	var x, y float32 = 0, 1
  33  
  34  	return func() <-chan time.Time {
  35  		x, y = y, x+(y*factor)
  36  		return time.After(time.Duration(x) * base)
  37  	}
  38  }
  39  
  40  // WaitSync waits and returns when a given stop condition is true or if an error occurs.
  41  func WaitSync(config *WaitSyncConfig) (terminalValue any, err error) {
  42  	// initialize configuration
  43  	if config.IntervalStrategy == nil {
  44  		config.IntervalStrategy = LinearIntervalStrategy(defaultInterval)
  45  	}
  46  
  47  	if config.Timeout == 0 {
  48  		config.Timeout = defaultTimeout
  49  	}
  50  
  51  	resultValue := make(chan any)
  52  	resultErr := make(chan error)
  53  	timeout := make(chan bool)
  54  
  55  	go func() {
  56  		for {
  57  			// get the payload
  58  			value, stopCondition, err := config.Get()
  59  			// send the payload
  60  			if err != nil {
  61  				resultErr <- err
  62  				return
  63  			}
  64  			if stopCondition {
  65  				resultValue <- value
  66  				return
  67  			}
  68  
  69  			// waiting for an interval before next get() call or a timeout
  70  			select {
  71  			case <-timeout:
  72  				return
  73  			case <-config.IntervalStrategy():
  74  				// sleep
  75  			}
  76  		}
  77  	}()
  78  
  79  	// waiting for a result or a timeout
  80  	select {
  81  	case val := <-resultValue:
  82  		return val, nil
  83  	case err := <-resultErr:
  84  		return nil, err
  85  	case <-time.After(config.Timeout):
  86  		timeout <- true
  87  		return nil, fmt.Errorf("timeout after %v", config.Timeout)
  88  	}
  89  }
  90