quit.go raw

   1  package qu
   2  
   3  import (
   4  	"strings"
   5  	"sync"
   6  	"time"
   7  
   8  	"github.com/p9c/p9/pkg/log"
   9  	"go.uber.org/atomic"
  10  )
  11  
  12  // C is your basic empty struct signalling channel
  13  type C chan struct{}
  14  
  15  var (
  16  	createdList     []string
  17  	createdChannels []C
  18  	mx              sync.Mutex
  19  	logEnabled      = atomic.NewBool(false)
  20  )
  21  
  22  // SetLogging switches on and off the channel logging
  23  func SetLogging(on bool) {
  24  	logEnabled.Store(on)
  25  }
  26  
  27  func l(a ...interface{}) {
  28  	if logEnabled.Load() {
  29  		D.Ln(a...)
  30  	}
  31  }
  32  
  33  // T creates an unbuffered chan struct{} for trigger and quit signalling (momentary and breaker switches)
  34  func T() C {
  35  	mx.Lock()
  36  	defer mx.Unlock()
  37  	msg := log.Caller("chan from", 1)
  38  	l("created", msg)
  39  	createdList = append(createdList, msg)
  40  	o := make(C)
  41  	createdChannels = append(createdChannels, o)
  42  	return o
  43  }
  44  
  45  // Ts creates a buffered chan struct{} which is specifically intended for signalling without blocking, generally one is
  46  // the size of buffer to be used, though there might be conceivable cases where the channel should accept more signals
  47  // without blocking the caller
  48  func Ts(n int) C {
  49  	mx.Lock()
  50  	defer mx.Unlock()
  51  	msg := log.Caller("buffered chan from", 1)
  52  	l("created", msg)
  53  	createdList = append(createdList, msg)
  54  	o := make(C, n)
  55  	createdChannels = append(createdChannels, o)
  56  	return o
  57  }
  58  
  59  // Q closes the channel, which makes it emit a nil every time it is selected
  60  func (c C) Q() {
  61  	l(func() (o string) {
  62  		loc := getLocForChan(c)
  63  		mx.Lock()
  64  		defer mx.Unlock()
  65  		if !testChanIsClosed(c) {
  66  			close(c)
  67  			return "closing chan from " + loc + log.Caller("\n"+strings.Repeat(" ", 48)+"from", 1)
  68  		} else {
  69  			return "from" + log.Caller("", 1) + "\n" + strings.Repeat(" ", 48) +
  70  				"channel " + loc + " was already closed"
  71  		}
  72  	}(),
  73  	)
  74  }
  75  
  76  // Signal sends struct{}{} on the channel which functions as a momentary switch, useful in pairs for stop/start
  77  func (c C) Signal() {
  78  	l(func() (o string) { return "signalling " + getLocForChan(c) }())
  79  	c <- struct{}{}
  80  }
  81  
  82  // Wait should be placed with a `<-` in a select case in addition to the channel variable name
  83  func (c C) Wait() <-chan struct{} {
  84  	l(func() (o string) { return "waiting on " + getLocForChan(c) + log.Caller("at", 1) }())
  85  	return c
  86  }
  87  
  88  // testChanIsClosed allows you to see whether the channel has been closed so you can avoid a panic by trying to close or
  89  // signal on it
  90  func testChanIsClosed(ch C) (o bool) {
  91  	if ch == nil {
  92  		return true
  93  	}
  94  	select {
  95  	case <-ch:
  96  		o = true
  97  	default:
  98  	}
  99  	return
 100  }
 101  
 102  // getLocForChan finds which record connects to the channel in question
 103  func getLocForChan(c C) (s string) {
 104  	s = "not found"
 105  	mx.Lock()
 106  	for i := range createdList {
 107  		if i >= len(createdChannels) {
 108  			break
 109  		}
 110  		if createdChannels[i] == c {
 111  			s = createdList[i]
 112  		}
 113  	}
 114  	mx.Unlock()
 115  	return
 116  }
 117  
 118  // once a minute clean up the channel cache to remove closed channels no longer in use
 119  func init() {
 120  	go func() {
 121  		for {
 122  			<-time.After(time.Minute)
 123  			D.Ln("cleaning up closed channels")
 124  			var c []C
 125  			var ll []string
 126  			mx.Lock()
 127  			for i := range createdChannels {
 128  				if i >= len(createdList) {
 129  					break
 130  				}
 131  				if testChanIsClosed(createdChannels[i]) {
 132  				} else {
 133  					c = append(c, createdChannels[i])
 134  					ll = append(ll, createdList[i])
 135  				}
 136  			}
 137  			createdChannels = c
 138  			createdList = ll
 139  			mx.Unlock()
 140  		}
 141  	}()
 142  }
 143  
 144  // PrintChanState creates an output showing the current state of the channels being monitored
 145  // This is a function for use by the programmer while debugging
 146  func PrintChanState() {
 147  	mx.Lock()
 148  	for i := range createdChannels {
 149  		if i >= len(createdList) {
 150  			break
 151  		}
 152  		if testChanIsClosed(createdChannels[i]) {
 153  			_T.Ln(">>> closed", createdList[i])
 154  		} else {
 155  			_T.Ln("<<< open", createdList[i])
 156  		}
 157  	}
 158  	mx.Unlock()
 159  }
 160  
 161  // GetOpenChanCount returns the number of qu channels that are still open
 162  // todo: this needs to only apply to unbuffered type
 163  func GetOpenChanCount() (o int) {
 164  	mx.Lock()
 165  	var c int
 166  	for i := range createdChannels {
 167  		if i >= len(createdChannels) {
 168  			break
 169  		}
 170  		if testChanIsClosed(createdChannels[i]) {
 171  			c++
 172  		} else {
 173  			o++
 174  		}
 175  	}
 176  	mx.Unlock()
 177  	return
 178  }
 179