1 package qu
2 3 import (
4 "strings"
5 "sync"
6 "time"
7 8 "github.com/p9c/p9/pkg/log"
9 "go.uber.org/atomic"
10 )
// C is your basic empty struct signalling channel. Values are created with T (unbuffered) or Ts (buffered); the
// methods Q, Signal and Wait respectively close it, send on it, and expose it for receiving.
type C chan struct{}
var (
	// createdList holds the call-site description recorded by T and Ts for each tracked channel; entry i describes
	// createdChannels[i].
	createdList []string
	// createdChannels holds the channels made by T and Ts that the periodic cleanup has not yet dropped.
	createdChannels []C
	// mx guards createdList and createdChannels.
	mx sync.Mutex
	// logEnabled is the switch consulted by l before it logs anything; it starts out false.
	logEnabled = atomic.NewBool(false)
)
// SetLogging switches the channel logging on and off by storing on in logEnabled, the flag that l checks before it
// emits a message.
func SetLogging(on bool) {
	logEnabled.Store(on)
}
26 27 func l(a ...interface{}) {
28 if logEnabled.Load() {
29 D.Ln(a...)
30 }
31 }
32 33 // T creates an unbuffered chan struct{} for trigger and quit signalling (momentary and breaker switches)
34 func T() C {
35 mx.Lock()
36 defer mx.Unlock()
37 msg := log.Caller("chan from", 1)
38 l("created", msg)
39 createdList = append(createdList, msg)
40 o := make(C)
41 createdChannels = append(createdChannels, o)
42 return o
43 }
44 45 // Ts creates a buffered chan struct{} which is specifically intended for signalling without blocking, generally one is
46 // the size of buffer to be used, though there might be conceivable cases where the channel should accept more signals
47 // without blocking the caller
48 func Ts(n int) C {
49 mx.Lock()
50 defer mx.Unlock()
51 msg := log.Caller("buffered chan from", 1)
52 l("created", msg)
53 createdList = append(createdList, msg)
54 o := make(C, n)
55 createdChannels = append(createdChannels, o)
56 return o
57 }
58 59 // Q closes the channel, which makes it emit a nil every time it is selected
60 func (c C) Q() {
61 l(func() (o string) {
62 loc := getLocForChan(c)
63 mx.Lock()
64 defer mx.Unlock()
65 if !testChanIsClosed(c) {
66 close(c)
67 return "closing chan from " + loc + log.Caller("\n"+strings.Repeat(" ", 48)+"from", 1)
68 } else {
69 return "from" + log.Caller("", 1) + "\n" + strings.Repeat(" ", 48) +
70 "channel " + loc + " was already closed"
71 }
72 }(),
73 )
74 }
75 76 // Signal sends struct{}{} on the channel which functions as a momentary switch, useful in pairs for stop/start
77 func (c C) Signal() {
78 l(func() (o string) { return "signalling " + getLocForChan(c) }())
79 c <- struct{}{}
80 }
81 82 // Wait should be placed with a `<-` in a select case in addition to the channel variable name
83 func (c C) Wait() <-chan struct{} {
84 l(func() (o string) { return "waiting on " + getLocForChan(c) + log.Caller("at", 1) }())
85 return c
86 }
87 88 // testChanIsClosed allows you to see whether the channel has been closed so you can avoid a panic by trying to close or
89 // signal on it
90 func testChanIsClosed(ch C) (o bool) {
91 if ch == nil {
92 return true
93 }
94 select {
95 case <-ch:
96 o = true
97 default:
98 }
99 return
100 }
101 102 // getLocForChan finds which record connects to the channel in question
103 func getLocForChan(c C) (s string) {
104 s = "not found"
105 mx.Lock()
106 for i := range createdList {
107 if i >= len(createdChannels) {
108 break
109 }
110 if createdChannels[i] == c {
111 s = createdList[i]
112 }
113 }
114 mx.Unlock()
115 return
116 }
117 118 // once a minute clean up the channel cache to remove closed channels no longer in use
119 func init() {
120 go func() {
121 for {
122 <-time.After(time.Minute)
123 D.Ln("cleaning up closed channels")
124 var c []C
125 var ll []string
126 mx.Lock()
127 for i := range createdChannels {
128 if i >= len(createdList) {
129 break
130 }
131 if testChanIsClosed(createdChannels[i]) {
132 } else {
133 c = append(c, createdChannels[i])
134 ll = append(ll, createdList[i])
135 }
136 }
137 createdChannels = c
138 createdList = ll
139 mx.Unlock()
140 }
141 }()
142 }
143 144 // PrintChanState creates an output showing the current state of the channels being monitored
145 // This is a function for use by the programmer while debugging
146 func PrintChanState() {
147 mx.Lock()
148 for i := range createdChannels {
149 if i >= len(createdList) {
150 break
151 }
152 if testChanIsClosed(createdChannels[i]) {
153 _T.Ln(">>> closed", createdList[i])
154 } else {
155 _T.Ln("<<< open", createdList[i])
156 }
157 }
158 mx.Unlock()
159 }
160 161 // GetOpenChanCount returns the number of qu channels that are still open
162 // todo: this needs to only apply to unbuffered type
163 func GetOpenChanCount() (o int) {
164 mx.Lock()
165 var c int
166 for i := range createdChannels {
167 if i >= len(createdChannels) {
168 break
169 }
170 if testChanIsClosed(createdChannels[i]) {
171 c++
172 } else {
173 o++
174 }
175 }
176 mx.Unlock()
177 return
178 }
179