unbounded_executor.go raw

   1  package concurrent
   2  
   3  import (
   4  	"context"
   5  	"fmt"
   6  	"runtime"
   7  	"runtime/debug"
   8  	"sync"
   9  	"time"
  10  	"reflect"
  11  )
  12  
  13  // HandlePanic logs goroutine panic by default
  14  var HandlePanic = func(recovered interface{}, funcName string) {
  15  	ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
  16  	ErrorLogger.Println(string(debug.Stack()))
  17  }
  18  
  19  // UnboundedExecutor is a executor without limits on counts of alive goroutines
  20  // it tracks the goroutine started by it, and can cancel them when shutdown
  21  type UnboundedExecutor struct {
  22  	ctx                   context.Context
  23  	cancel                context.CancelFunc
  24  	activeGoroutinesMutex *sync.Mutex
  25  	activeGoroutines      map[string]int
  26  	HandlePanic           func(recovered interface{}, funcName string)
  27  }
  28  
  29  // GlobalUnboundedExecutor has the life cycle of the program itself
  30  // any goroutine want to be shutdown before main exit can be started from this executor
  31  // GlobalUnboundedExecutor expects the main function to call stop
  32  // it does not magically knows the main function exits
  33  var GlobalUnboundedExecutor = NewUnboundedExecutor()
  34  
  35  // NewUnboundedExecutor creates a new UnboundedExecutor,
  36  // UnboundedExecutor can not be created by &UnboundedExecutor{}
  37  // HandlePanic can be set with a callback to override global HandlePanic
  38  func NewUnboundedExecutor() *UnboundedExecutor {
  39  	ctx, cancel := context.WithCancel(context.TODO())
  40  	return &UnboundedExecutor{
  41  		ctx:                   ctx,
  42  		cancel:                cancel,
  43  		activeGoroutinesMutex: &sync.Mutex{},
  44  		activeGoroutines:      map[string]int{},
  45  	}
  46  }
  47  
  48  // Go starts a new goroutine and tracks its lifecycle.
  49  // Panic will be recovered and logged automatically, except for StopSignal
  50  func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
  51  	pc := reflect.ValueOf(handler).Pointer()
  52  	f := runtime.FuncForPC(pc)
  53  	funcName := f.Name()
  54  	file, line := f.FileLine(pc)
  55  	executor.activeGoroutinesMutex.Lock()
  56  	defer executor.activeGoroutinesMutex.Unlock()
  57  	startFrom := fmt.Sprintf("%s:%d", file, line)
  58  	executor.activeGoroutines[startFrom] += 1
  59  	go func() {
  60  		defer func() {
  61  			recovered := recover()
  62  			// if you want to quit a goroutine without trigger HandlePanic
  63  			// use runtime.Goexit() to quit
  64  			if recovered != nil {
  65  				if executor.HandlePanic == nil {
  66  					HandlePanic(recovered, funcName)
  67  				} else {
  68  					executor.HandlePanic(recovered, funcName)
  69  				}
  70  			}
  71  			executor.activeGoroutinesMutex.Lock()
  72  			executor.activeGoroutines[startFrom] -= 1
  73  			executor.activeGoroutinesMutex.Unlock()
  74  		}()
  75  		handler(executor.ctx)
  76  	}()
  77  }
  78  
  79  // Stop cancel all goroutines started by this executor without wait
  80  func (executor *UnboundedExecutor) Stop() {
  81  	executor.cancel()
  82  }
  83  
  84  // StopAndWaitForever cancel all goroutines started by this executor and
  85  // wait until all goroutines exited
  86  func (executor *UnboundedExecutor) StopAndWaitForever() {
  87  	executor.StopAndWait(context.Background())
  88  }
  89  
  90  // StopAndWait cancel all goroutines started by this executor and wait.
  91  // Wait can be cancelled by the context passed in.
  92  func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
  93  	executor.cancel()
  94  	for {
  95  		oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
  96  		select {
  97  		case <-oneHundredMilliseconds.C:
  98  			if executor.checkNoActiveGoroutines() {
  99  				return
 100  			}
 101  		case <-ctx.Done():
 102  			return
 103  		}
 104  	}
 105  }
 106  
 107  func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
 108  	executor.activeGoroutinesMutex.Lock()
 109  	defer executor.activeGoroutinesMutex.Unlock()
 110  	for startFrom, count := range executor.activeGoroutines {
 111  		if count > 0 {
 112  			InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
 113  				"startFrom", startFrom,
 114  				"count", count)
 115  			return false
 116  		}
 117  	}
 118  	return true
 119  }
 120