1 // Package qu is a library for making handling signal (chan struct{}) channels
2 // simpler, as well as monitoring the state of the signal channels in an
3 // application.
4 package qu
5 6 import (
7 "fmt"
8 "strings"
9 "sync"
10 "time"
11 12 "go.uber.org/atomic"
13 "next.orly.dev/pkg/lol"
14 "next.orly.dev/pkg/lol/log"
15 )
// C is a signal channel: a chan struct{} whose values carry no data. It is
// the type returned by T and Ts and the receiver of the methods in this
// package.
type C chan struct{}
// Package-level registry of the channels made by T and Ts, plus the logging
// switch. T and Ts append one entry to each of the three slices for every
// channel they create.
var (
	// createdList holds the description ("chan from <location>") recorded
	// when each channel was created.
	createdList []string
	// createdChannels holds the channels themselves.
	createdChannels []C
	// createdChannelBufferCounts holds the buffer size each channel was
	// created with (0 for channels made by T).
	createdChannelBufferCounts []int
	// mx is held whenever the slices above are read or modified.
	mx sync.Mutex
	// logEnabled gates the debug output produced by l and lc; it starts off.
	logEnabled = atomic.NewBool(false)
)
// SetLogging switches the package's channel logging on (true) or off (false)
// by storing the flag that l and lc check before emitting anything.
func SetLogging(on bool) {
	logEnabled.Store(on)
}
32 33 func l(a ...interface{}) {
34 if logEnabled.Load() {
35 log.D.Ln(a...)
36 }
37 }
38 39 func lc(cl func() string) {
40 if logEnabled.Load() {
41 log.D.Ln(cl())
42 }
43 }
44 45 // T creates an unbuffered chan struct{} for trigger and quit signalling
46 // (momentary and breaker switches)
47 func T() C {
48 mx.Lock()
49 defer mx.Unlock()
50 msg := fmt.Sprintf("chan from %s", lol.GetLoc(1))
51 l("created", msg)
52 createdList = append(createdList, msg)
53 o := make(C)
54 createdChannels = append(createdChannels, o)
55 createdChannelBufferCounts = append(createdChannelBufferCounts, 0)
56 return o
57 }
58 59 // Ts creates a buffered chan struct{} which is specifically intended for
60 // signalling without blocking, generally one is the size of buffer to be used,
61 // though there might be conceivable cases where the channel should accept more
62 // signals without blocking the caller
63 func Ts(n int) C {
64 mx.Lock()
65 defer mx.Unlock()
66 msg := fmt.Sprintf("buffered chan (%d) from %s", n, lol.GetLoc(1))
67 l("created", msg)
68 createdList = append(createdList, msg)
69 o := make(C, n)
70 createdChannels = append(createdChannels, o)
71 createdChannelBufferCounts = append(createdChannelBufferCounts, n)
72 return o
73 }
74 75 // Q closes the channel, which makes it emit a nil every time it is selected.
76 func (c C) Q() {
77 open := !testChanIsClosed(c)
78 lc(
79 func() (o string) {
80 lo := getLocForChan(c)
81 mx.Lock()
82 defer mx.Unlock()
83 if open {
84 return "closing chan from " + lo + "\n" + strings.Repeat(
85 " ",
86 48,
87 ) + "from" + lol.GetLoc(1)
88 } else {
89 return "from" + lol.GetLoc(1) + "\n" + strings.Repeat(" ", 48) +
90 "channel " + lo + " was already closed"
91 }
92 },
93 )
94 if open {
95 close(c)
96 }
97 }
98 99 // Signal sends struct{}{} on the channel which functions as a momentary switch,
100 // useful in pairs for stop/start
101 func (c C) Signal() {
102 lc(func() (o string) { return "signalling " + getLocForChan(c) })
103 if !testChanIsClosed(c) {
104 c <- struct{}{}
105 }
106 }
107 108 // Wait should be placed with a `<-` in a select case in addition to the channel
109 // variable name
110 func (c C) Wait() <-chan struct{} {
111 lc(
112 func() (o string) {
113 return fmt.Sprint(
114 "waiting on "+getLocForChan(c)+"at",
115 lol.GetLoc(1),
116 )
117 },
118 )
119 return c
120 }
// IsClosed reports whether the channel counts as closed. It delegates to
// testChanIsClosed, so a nil channel is reported as closed as well.
func (c C) IsClosed() bool {
	return testChanIsClosed(c)
}
126 127 // testChanIsClosed allows you to see whether the channel has been closed so you
128 // can avoid a panic by trying to close or signal on it
129 func testChanIsClosed(ch C) (o bool) {
130 if ch == nil {
131 return true
132 }
133 select {
134 case <-ch:
135 o = true
136 default:
137 }
138 return
139 }
140 141 // getLocForChan finds which record connects to the channel in question
142 func getLocForChan(c C) (s string) {
143 s = "not found"
144 mx.Lock()
145 for i := range createdList {
146 if i >= len(createdChannels) {
147 break
148 }
149 if createdChannels[i] == c {
150 s = createdList[i]
151 }
152 }
153 mx.Unlock()
154 return
155 }
156 157 // once a minute clean up the channel cache to remove closed channels no longer
158 // in use
159 func init() {
160 go func() {
161 for {
162 <-time.After(time.Minute)
163 l("cleaning up closed channels")
164 var c []C
165 var ll []string
166 mx.Lock()
167 for i := range createdChannels {
168 if i >= len(createdList) {
169 break
170 }
171 if testChanIsClosed(createdChannels[i]) {
172 } else {
173 c = append(c, createdChannels[i])
174 ll = append(ll, createdList[i])
175 }
176 }
177 createdChannels = c
178 createdList = ll
179 mx.Unlock()
180 }
181 }()
182 }
183 184 // PrintChanState creates an output showing the current state of the channels
185 // being monitored This is a function for use by the programmer while debugging
186 func PrintChanState() {
187 mx.Lock()
188 for i := range createdChannels {
189 if i >= len(createdList) {
190 break
191 }
192 if testChanIsClosed(createdChannels[i]) {
193 log.T.Ln(">>> closed", createdList[i])
194 } else {
195 log.T.Ln("<<< open", createdList[i])
196 }
197 }
198 mx.Unlock()
199 }
200 201 // GetOpenUnbufferedChanCount returns the number of qu channels that are still
202 // open
203 func GetOpenUnbufferedChanCount() (o int) {
204 mx.Lock()
205 var c int
206 for i := range createdChannels {
207 if i >= len(createdChannels) {
208 break
209 }
210 // skip buffered channels
211 if createdChannelBufferCounts[i] > 0 {
212 continue
213 }
214 if testChanIsClosed(createdChannels[i]) {
215 c++
216 } else {
217 o++
218 }
219 }
220 mx.Unlock()
221 return
222 }
223 224 // GetOpenBufferedChanCount returns the number of qu channels that are still
225 // open
226 func GetOpenBufferedChanCount() (o int) {
227 mx.Lock()
228 var c int
229 for i := range createdChannels {
230 if i >= len(createdChannels) {
231 break
232 }
233 // skip unbuffered channels
234 if createdChannelBufferCounts[i] < 1 {
235 continue
236 }
237 if testChanIsClosed(createdChannels[i]) {
238 c++
239 } else {
240 o++
241 }
242 }
243 mx.Unlock()
244 return
245 }
246