main.go raw

   1  package rununit
   2  
   3  import (
   4  	uberatomic "go.uber.org/atomic"
   5  
   6  	"github.com/p9c/p9/pkg/log"
   7  	"github.com/p9c/p9/pkg/pipe"
   8  
   9  	"github.com/p9c/p9/pkg/interrupt"
  10  	"github.com/p9c/p9/pkg/qu"
  11  )
  12  
  13  // RunUnit handles correctly starting and stopping child processes that have StdConn pipe logging enabled, allowing
  14  // custom hooks to run on start and stop,
  15  type RunUnit struct {
  16  	name                  string
  17  	args                  []string
  18  	running, shuttingDown uberatomic.Bool
  19  	commandChan           chan bool
  20  	worker                *pipe.Worker
  21  	quit                  qu.C
  22  }
  23  
  24  // New creates and starts a new rununit. run and stop functions are executed after starting and stopping. logger
  25  // receives log entries and processes them (such as logging them).
  26  func New(
  27  	name string, run, stop func(), logger func(ent *log.Entry) (e error),
  28  	pkgFilter func(pkg string) (out bool), quit qu.C, args ...string,
  29  ) (r *RunUnit) {
  30  	r = &RunUnit{
  31  		name:        name,
  32  		args:        args,
  33  		commandChan: make(chan bool),
  34  		quit:        qu.T(),
  35  	}
  36  	r.running.Store(false)
  37  	r.shuttingDown.Store(false)
  38  	go func() {
  39  		D.Ln("run unit command loop", args)
  40  		var e error
  41  	out:
  42  		for {
  43  			select {
  44  			case cmd := <-r.commandChan:
  45  				switch cmd {
  46  				case true:
  47  					D.Ln(r.running.Load(), "run called for", args)
  48  					if r.running.Load() {
  49  						D.Ln("already running", args)
  50  						continue
  51  					}
  52  					if r.worker != nil {
  53  						if e = r.worker.Kill(); E.Chk(e) {
  54  						}
  55  					}
  56  					// quit from rununit's quit, which closes after the main quit triggers stopping in the watcher loop
  57  					r.worker = pipe.LogConsume(r.quit, logger, pkgFilter, args...)
  58  					// D.S(r.worker)
  59  					pipe.Start(r.worker)
  60  					r.running.Store(true)
  61  					run()
  62  					// D.Ln(r.running.Load())
  63  				case false:
  64  					running := r.running.Load()
  65  					D.Ln("stop called for", args, running)
  66  					if !running {
  67  						D.Ln("wasn't running", args)
  68  						continue
  69  					}
  70  					pipe.Kill(r.worker)
  71  					// var e error
  72  					// if e = r.worker.Wait(); E.Chk(e) {
  73  					// }
  74  					r.running.Store(false)
  75  					stop()
  76  					D.Ln(args, "after stop", r.running.Load())
  77  				}
  78  				break
  79  			case <-r.quit.Wait():
  80  				D.Ln("runner stopped for", args)
  81  				break out
  82  			}
  83  		}
  84  	}()
  85  	// when the main quit signal is triggered, stop the run unit cleanly
  86  	go func() {
  87  	out:
  88  		select {
  89  		case <-quit.Wait():
  90  			D.Ln("runner quit trigger called", args)
  91  			running := r.running.Load()
  92  			if !running {
  93  				D.Ln("wasn't running", args)
  94  				break out
  95  			}
  96  			// r.quit.Q()
  97  			pipe.Kill(r.worker)
  98  			var e error
  99  			if e = r.worker.Wait(); E.Chk(e) {
 100  			}
 101  			r.running.Store(false)
 102  			stop()
 103  			D.Ln(args, "after stop", r.running.Load())
 104  		}
 105  	}()
 106  	interrupt.AddHandler(
 107  		func() {
 108  			quit.Q()
 109  		},
 110  	)
 111  	return
 112  }
 113  
 114  // Running returns whether the unit is running
 115  func (r *RunUnit) Running() bool {
 116  	return r.running.Load()
 117  }
 118  
 119  // Start signals the run unit to start
 120  func (r *RunUnit) Start() {
 121  	r.commandChan <- true
 122  }
 123  
 124  // Stop signals the run unit to stop
 125  func (r *RunUnit) Stop() {
 126  	r.commandChan <- false
 127  }
 128  
 129  // Shutdown terminates the run unit
 130  func (r *RunUnit) Shutdown() {
 131  	// debug.PrintStack()
 132  	if !r.shuttingDown.Load() {
 133  		r.shuttingDown.Store(true)
 134  		r.quit.Q()
 135  	}
 136  }
 137  
 138  // ShuttingDown returns true if the server is shuting down
 139  func (r *RunUnit) ShuttingDown() bool {
 140  	return r.shuttingDown.Load()
 141  }
 142