ticker.go raw

   1  package backoff
   2  
   3  import (
   4  	"context"
   5  	"sync"
   6  	"time"
   7  )
   8  
   9  // Ticker holds a channel that delivers `ticks' of a clock at times reported by a BackOff.
  10  //
  11  // Ticks will continue to arrive when the previous operation is still running,
  12  // so operations that take a while to fail could run in quick succession.
  13  type Ticker struct {
  14  	C        <-chan time.Time
  15  	c        chan time.Time
  16  	b        BackOff
  17  	ctx      context.Context
  18  	timer    Timer
  19  	stop     chan struct{}
  20  	stopOnce sync.Once
  21  }
  22  
  23  // NewTicker returns a new Ticker containing a channel that will send
  24  // the time at times specified by the BackOff argument. Ticker is
  25  // guaranteed to tick at least once.  The channel is closed when Stop
  26  // method is called or BackOff stops. It is not safe to manipulate the
  27  // provided backoff policy (notably calling NextBackOff or Reset)
  28  // while the ticker is running.
  29  func NewTicker(b BackOff) *Ticker {
  30  	return NewTickerWithTimer(b, &defaultTimer{})
  31  }
  32  
  33  // NewTickerWithTimer returns a new Ticker with a custom timer.
  34  // A default timer that uses system timer is used when nil is passed.
  35  func NewTickerWithTimer(b BackOff, timer Timer) *Ticker {
  36  	if timer == nil {
  37  		timer = &defaultTimer{}
  38  	}
  39  	c := make(chan time.Time)
  40  	t := &Ticker{
  41  		C:     c,
  42  		c:     c,
  43  		b:     b,
  44  		ctx:   getContext(b),
  45  		timer: timer,
  46  		stop:  make(chan struct{}),
  47  	}
  48  	t.b.Reset()
  49  	go t.run()
  50  	return t
  51  }
  52  
  53  // Stop turns off a ticker. After Stop, no more ticks will be sent.
  54  func (t *Ticker) Stop() {
  55  	t.stopOnce.Do(func() { close(t.stop) })
  56  }
  57  
  58  func (t *Ticker) run() {
  59  	c := t.c
  60  	defer close(c)
  61  
  62  	// Ticker is guaranteed to tick at least once.
  63  	afterC := t.send(time.Now())
  64  
  65  	for {
  66  		if afterC == nil {
  67  			return
  68  		}
  69  
  70  		select {
  71  		case tick := <-afterC:
  72  			afterC = t.send(tick)
  73  		case <-t.stop:
  74  			t.c = nil // Prevent future ticks from being sent to the channel.
  75  			return
  76  		case <-t.ctx.Done():
  77  			return
  78  		}
  79  	}
  80  }
  81  
  82  func (t *Ticker) send(tick time.Time) <-chan time.Time {
  83  	select {
  84  	case t.c <- tick:
  85  	case <-t.stop:
  86  		return nil
  87  	}
  88  
  89  	next := t.b.NextBackOff()
  90  	if next == Stop {
  91  		t.Stop()
  92  		return nil
  93  	}
  94  
  95  	t.timer.Start(next)
  96  	return t.timer.C()
  97  }
  98