pipe.go raw

   1  package pipe
   2  
   3  import (
   4  	"io"
   5  	"os"
   6  
   7  	"github.com/p9c/p9/pkg/log"
   8  
   9  	"github.com/p9c/p9/pkg/interrupt"
  10  	"github.com/p9c/p9/pkg/qu"
  11  )
  12  
  13  // Consume listens for messages from a child process over a stdio pipe.
  14  func Consume(
  15  	quit qu.C,
  16  	handler func([]byte) error,
  17  	args ...string,
  18  ) *Worker {
  19  	var n int
  20  	var e error
  21  	D.Ln("spawning worker process", args)
  22  	var w *Worker
  23  	if w, e = Spawn(quit, args...); E.Chk(e) {
  24  	}
  25  	data := make([]byte, 8192)
  26  	go func() {
  27  	out:
  28  		for {
  29  			select {
  30  			case <-interrupt.HandlersDone.Wait():
  31  				D.Ln("quitting log consumer")
  32  				break out
  33  			case <-quit.Wait():
  34  				D.Ln("breaking on quit signal")
  35  				break out
  36  			default:
  37  			}
  38  			n, e = w.StdConn.Read(data)
  39  			if n == 0 {
  40  				F.Ln("read zero from pipe", args)
  41  				log.LogChanDisabled.Store(true)
  42  				break out
  43  			}
  44  			if E.Chk(e) && e != io.EOF {
  45  				// Probably the child process has died, so quit
  46  				E.Ln("err:", e)
  47  				break out
  48  			} else if n > 0 {
  49  				if e = handler(data[:n]); E.Chk(e) {
  50  				}
  51  			}
  52  		}
  53  	}()
  54  	return w
  55  }
  56  
  57  // Serve runs a goroutine processing the FEC encoded packets, gathering them and
  58  // decoding them to be delivered to a handler function
  59  func Serve(quit qu.C, handler func([]byte) error) *StdConn {
  60  	var n int
  61  	var e error
  62  	data := make([]byte, 8192)
  63  	go func() {
  64  		D.Ln("starting pipe server")
  65  	out:
  66  		for {
  67  			select {
  68  			case <-quit.Wait():
  69  				break out
  70  			default:
  71  			}
  72  			n, e = os.Stdin.Read(data)
  73  			if e != nil && e != io.EOF {
  74  				break out
  75  			}
  76  			if n > 0 {
  77  				if e = handler(data[:n]); E.Chk(e) {
  78  					break out
  79  				}
  80  			}
  81  		}
  82  		D.Ln("pipe server shut down")
  83  	}()
  84  	return New(os.Stdin, os.Stdout, quit)
  85  }
  86