main.go raw
1 package rununit
2
3 import (
4 uberatomic "go.uber.org/atomic"
5
6 "github.com/p9c/p9/pkg/log"
7 "github.com/p9c/p9/pkg/pipe"
8
9 "github.com/p9c/p9/pkg/interrupt"
10 "github.com/p9c/p9/pkg/qu"
11 )
12
13 // RunUnit handles correctly starting and stopping child processes that have StdConn pipe logging enabled, allowing
14 // custom hooks to run on start and stop,
15 type RunUnit struct {
16 name string
17 args []string
18 running, shuttingDown uberatomic.Bool
19 commandChan chan bool
20 worker *pipe.Worker
21 quit qu.C
22 }
23
24 // New creates and starts a new rununit. run and stop functions are executed after starting and stopping. logger
25 // receives log entries and processes them (such as logging them).
26 func New(
27 name string, run, stop func(), logger func(ent *log.Entry) (e error),
28 pkgFilter func(pkg string) (out bool), quit qu.C, args ...string,
29 ) (r *RunUnit) {
30 r = &RunUnit{
31 name: name,
32 args: args,
33 commandChan: make(chan bool),
34 quit: qu.T(),
35 }
36 r.running.Store(false)
37 r.shuttingDown.Store(false)
38 go func() {
39 D.Ln("run unit command loop", args)
40 var e error
41 out:
42 for {
43 select {
44 case cmd := <-r.commandChan:
45 switch cmd {
46 case true:
47 D.Ln(r.running.Load(), "run called for", args)
48 if r.running.Load() {
49 D.Ln("already running", args)
50 continue
51 }
52 if r.worker != nil {
53 if e = r.worker.Kill(); E.Chk(e) {
54 }
55 }
56 // quit from rununit's quit, which closes after the main quit triggers stopping in the watcher loop
57 r.worker = pipe.LogConsume(r.quit, logger, pkgFilter, args...)
58 // D.S(r.worker)
59 pipe.Start(r.worker)
60 r.running.Store(true)
61 run()
62 // D.Ln(r.running.Load())
63 case false:
64 running := r.running.Load()
65 D.Ln("stop called for", args, running)
66 if !running {
67 D.Ln("wasn't running", args)
68 continue
69 }
70 pipe.Kill(r.worker)
71 // var e error
72 // if e = r.worker.Wait(); E.Chk(e) {
73 // }
74 r.running.Store(false)
75 stop()
76 D.Ln(args, "after stop", r.running.Load())
77 }
78 break
79 case <-r.quit.Wait():
80 D.Ln("runner stopped for", args)
81 break out
82 }
83 }
84 }()
85 // when the main quit signal is triggered, stop the run unit cleanly
86 go func() {
87 out:
88 select {
89 case <-quit.Wait():
90 D.Ln("runner quit trigger called", args)
91 running := r.running.Load()
92 if !running {
93 D.Ln("wasn't running", args)
94 break out
95 }
96 // r.quit.Q()
97 pipe.Kill(r.worker)
98 var e error
99 if e = r.worker.Wait(); E.Chk(e) {
100 }
101 r.running.Store(false)
102 stop()
103 D.Ln(args, "after stop", r.running.Load())
104 }
105 }()
106 interrupt.AddHandler(
107 func() {
108 quit.Q()
109 },
110 )
111 return
112 }
113
114 // Running returns whether the unit is running
115 func (r *RunUnit) Running() bool {
116 return r.running.Load()
117 }
118
119 // Start signals the run unit to start
120 func (r *RunUnit) Start() {
121 r.commandChan <- true
122 }
123
124 // Stop signals the run unit to stop
125 func (r *RunUnit) Stop() {
126 r.commandChan <- false
127 }
128
129 // Shutdown terminates the run unit
130 func (r *RunUnit) Shutdown() {
131 // debug.PrintStack()
132 if !r.shuttingDown.Load() {
133 r.shuttingDown.Store(true)
134 r.quit.Q()
135 }
136 }
137
138 // ShuttingDown returns true if the server is shuting down
139 func (r *RunUnit) ShuttingDown() bool {
140 return r.shuttingDown.Load()
141 }
142