1 package pipe
2 3 import (
4 "io"
5 "os"
6 7 "github.com/p9c/p9/pkg/log"
8 9 "github.com/p9c/p9/pkg/interrupt"
10 "github.com/p9c/p9/pkg/qu"
11 )
12 13 // Consume listens for messages from a child process over a stdio pipe.
14 func Consume(
15 quit qu.C,
16 handler func([]byte) error,
17 args ...string,
18 ) *Worker {
19 var n int
20 var e error
21 D.Ln("spawning worker process", args)
22 var w *Worker
23 if w, e = Spawn(quit, args...); E.Chk(e) {
24 }
25 data := make([]byte, 8192)
26 go func() {
27 out:
28 for {
29 select {
30 case <-interrupt.HandlersDone.Wait():
31 D.Ln("quitting log consumer")
32 break out
33 case <-quit.Wait():
34 D.Ln("breaking on quit signal")
35 break out
36 default:
37 }
38 n, e = w.StdConn.Read(data)
39 if n == 0 {
40 F.Ln("read zero from pipe", args)
41 log.LogChanDisabled.Store(true)
42 break out
43 }
44 if E.Chk(e) && e != io.EOF {
45 // Probably the child process has died, so quit
46 E.Ln("err:", e)
47 break out
48 } else if n > 0 {
49 if e = handler(data[:n]); E.Chk(e) {
50 }
51 }
52 }
53 }()
54 return w
55 }
56 57 // Serve runs a goroutine processing the FEC encoded packets, gathering them and
58 // decoding them to be delivered to a handler function
59 func Serve(quit qu.C, handler func([]byte) error) *StdConn {
60 var n int
61 var e error
62 data := make([]byte, 8192)
63 go func() {
64 D.Ln("starting pipe server")
65 out:
66 for {
67 select {
68 case <-quit.Wait():
69 break out
70 default:
71 }
72 n, e = os.Stdin.Read(data)
73 if e != nil && e != io.EOF {
74 break out
75 }
76 if n > 0 {
77 if e = handler(data[:n]); E.Chk(e) {
78 break out
79 }
80 }
81 }
82 D.Ln("pipe server shut down")
83 }()
84 return New(os.Stdin, os.Stdout, quit)
85 }
86