qu.go raw

   1  // Package qu is a library for making handling signal (chan struct{}) channels
   2  // simpler, as well as monitoring the state of the signal channels in an
   3  // application.
   4  package qu
   5  
   6  import (
   7  	"fmt"
   8  	"strings"
   9  	"sync"
  10  	"time"
  11  
  12  	"go.uber.org/atomic"
  13  	"next.orly.dev/pkg/lol"
  14  	"next.orly.dev/pkg/lol/log"
  15  )
  16  
  17  // C is your basic empty struct signal channel
  18  type C chan struct{}
  19  
  20  var (
  21  	createdList                []string
  22  	createdChannels            []C
  23  	createdChannelBufferCounts []int
  24  	mx                         sync.Mutex
  25  	logEnabled                 = atomic.NewBool(false)
  26  )
  27  
  28  // SetLogging switches on and off the channel logging
  29  func SetLogging(on bool) {
  30  	logEnabled.Store(on)
  31  }
  32  
  33  func l(a ...interface{}) {
  34  	if logEnabled.Load() {
  35  		log.D.Ln(a...)
  36  	}
  37  }
  38  
  39  func lc(cl func() string) {
  40  	if logEnabled.Load() {
  41  		log.D.Ln(cl())
  42  	}
  43  }
  44  
  45  // T creates an unbuffered chan struct{} for trigger and quit signalling
  46  // (momentary and breaker switches)
  47  func T() C {
  48  	mx.Lock()
  49  	defer mx.Unlock()
  50  	msg := fmt.Sprintf("chan from %s", lol.GetLoc(1))
  51  	l("created", msg)
  52  	createdList = append(createdList, msg)
  53  	o := make(C)
  54  	createdChannels = append(createdChannels, o)
  55  	createdChannelBufferCounts = append(createdChannelBufferCounts, 0)
  56  	return o
  57  }
  58  
  59  // Ts creates a buffered chan struct{} which is specifically intended for
  60  // signalling without blocking, generally one is the size of buffer to be used,
  61  // though there might be conceivable cases where the channel should accept more
  62  // signals without blocking the caller
  63  func Ts(n int) C {
  64  	mx.Lock()
  65  	defer mx.Unlock()
  66  	msg := fmt.Sprintf("buffered chan (%d) from %s", n, lol.GetLoc(1))
  67  	l("created", msg)
  68  	createdList = append(createdList, msg)
  69  	o := make(C, n)
  70  	createdChannels = append(createdChannels, o)
  71  	createdChannelBufferCounts = append(createdChannelBufferCounts, n)
  72  	return o
  73  }
  74  
  75  // Q closes the channel, which makes it emit a nil every time it is selected.
  76  func (c C) Q() {
  77  	open := !testChanIsClosed(c)
  78  	lc(
  79  		func() (o string) {
  80  			lo := getLocForChan(c)
  81  			mx.Lock()
  82  			defer mx.Unlock()
  83  			if open {
  84  				return "closing chan from " + lo + "\n" + strings.Repeat(
  85  					" ",
  86  					48,
  87  				) + "from" + lol.GetLoc(1)
  88  			} else {
  89  				return "from" + lol.GetLoc(1) + "\n" + strings.Repeat(" ", 48) +
  90  					"channel " + lo + " was already closed"
  91  			}
  92  		},
  93  	)
  94  	if open {
  95  		close(c)
  96  	}
  97  }
  98  
  99  // Signal sends struct{}{} on the channel which functions as a momentary switch,
 100  // useful in pairs for stop/start
 101  func (c C) Signal() {
 102  	lc(func() (o string) { return "signalling " + getLocForChan(c) })
 103  	if !testChanIsClosed(c) {
 104  		c <- struct{}{}
 105  	}
 106  }
 107  
 108  // Wait should be placed with a `<-` in a select case in addition to the channel
 109  // variable name
 110  func (c C) Wait() <-chan struct{} {
 111  	lc(
 112  		func() (o string) {
 113  			return fmt.Sprint(
 114  				"waiting on "+getLocForChan(c)+"at",
 115  				lol.GetLoc(1),
 116  			)
 117  		},
 118  	)
 119  	return c
 120  }
 121  
 122  // IsClosed exposes a test to see if the channel is closed
 123  func (c C) IsClosed() bool {
 124  	return testChanIsClosed(c)
 125  }
 126  
 127  // testChanIsClosed allows you to see whether the channel has been closed so you
 128  // can avoid a panic by trying to close or signal on it
 129  func testChanIsClosed(ch C) (o bool) {
 130  	if ch == nil {
 131  		return true
 132  	}
 133  	select {
 134  	case <-ch:
 135  		o = true
 136  	default:
 137  	}
 138  	return
 139  }
 140  
 141  // getLocForChan finds which record connects to the channel in question
 142  func getLocForChan(c C) (s string) {
 143  	s = "not found"
 144  	mx.Lock()
 145  	for i := range createdList {
 146  		if i >= len(createdChannels) {
 147  			break
 148  		}
 149  		if createdChannels[i] == c {
 150  			s = createdList[i]
 151  		}
 152  	}
 153  	mx.Unlock()
 154  	return
 155  }
 156  
 157  // once a minute clean up the channel cache to remove closed channels no longer
 158  // in use
 159  func init() {
 160  	go func() {
 161  		for {
 162  			<-time.After(time.Minute)
 163  			l("cleaning up closed channels")
 164  			var c []C
 165  			var ll []string
 166  			mx.Lock()
 167  			for i := range createdChannels {
 168  				if i >= len(createdList) {
 169  					break
 170  				}
 171  				if testChanIsClosed(createdChannels[i]) {
 172  				} else {
 173  					c = append(c, createdChannels[i])
 174  					ll = append(ll, createdList[i])
 175  				}
 176  			}
 177  			createdChannels = c
 178  			createdList = ll
 179  			mx.Unlock()
 180  		}
 181  	}()
 182  }
 183  
 184  // PrintChanState creates an output showing the current state of the channels
 185  // being monitored This is a function for use by the programmer while debugging
 186  func PrintChanState() {
 187  	mx.Lock()
 188  	for i := range createdChannels {
 189  		if i >= len(createdList) {
 190  			break
 191  		}
 192  		if testChanIsClosed(createdChannels[i]) {
 193  			log.T.Ln(">>> closed", createdList[i])
 194  		} else {
 195  			log.T.Ln("<<< open", createdList[i])
 196  		}
 197  	}
 198  	mx.Unlock()
 199  }
 200  
 201  // GetOpenUnbufferedChanCount returns the number of qu channels that are still
 202  // open
 203  func GetOpenUnbufferedChanCount() (o int) {
 204  	mx.Lock()
 205  	var c int
 206  	for i := range createdChannels {
 207  		if i >= len(createdChannels) {
 208  			break
 209  		}
 210  		// skip buffered channels
 211  		if createdChannelBufferCounts[i] > 0 {
 212  			continue
 213  		}
 214  		if testChanIsClosed(createdChannels[i]) {
 215  			c++
 216  		} else {
 217  			o++
 218  		}
 219  	}
 220  	mx.Unlock()
 221  	return
 222  }
 223  
 224  // GetOpenBufferedChanCount returns the number of qu channels that are still
 225  // open
 226  func GetOpenBufferedChanCount() (o int) {
 227  	mx.Lock()
 228  	var c int
 229  	for i := range createdChannels {
 230  		if i >= len(createdChannels) {
 231  			break
 232  		}
 233  		// skip unbuffered channels
 234  		if createdChannelBufferCounts[i] < 1 {
 235  			continue
 236  		}
 237  		if testChanIsClosed(createdChannels[i]) {
 238  			c++
 239  		} else {
 240  			o++
 241  		}
 242  	}
 243  	mx.Unlock()
 244  	return
 245  }
 246