conn.mx raw
1 package relay
2
3 import (
4 "smesh.lol/web/common/jsbridge/ws"
5 "smesh.lol/web/common/nostr"
6 )
7
// State constants for connection readiness. Conn.state always holds one of
// these values.
const (
	StateConnecting = 0 // initial state from Dial; also set while a reconnect is scheduled
	StateOpen = 1 // set by the open callback passed to ws.Dial
	StateClosed = 2 // set by the close/error callbacks and by Close
)
14
// Conn is a single relay WebSocket connection.
//
// It tracks the readiness state, the subscriptions that are re-sent every
// time the socket opens, and EVENT messages queued while the socket is not
// open.
type Conn struct {
	URL string // relay address handed to ws.Dial
	wsConn ws.Conn // handle returned by ws.Dial; replaced on every (re)dial
	state int // one of StateConnecting, StateOpen, StateClosed
	subs map[string]*Sub // tracked subscriptions keyed by subscription ID

	// One-shot readiness callback registered through OnReady. It is cleared
	// after it fires.
	onReady func(bool)

	// Connection-wide handlers installed through SetOnEvent, SetOnEOSE,
	// SetOnOK and SetOnAuth. Any of them may be nil.
	onEvent func(string, *nostr.Event)
	onEOSE func(string)
	onOK func(string, bool, string)
	onAuth func(string)

	closing bool // true when Close() was called intentionally
	pendingPublish []string // EVENT messages queued by Publish while the connection is not open

	// ScheduleReconnect is set by the consumer to provide a delayed callback.
	// When a connection drops with active subscriptions, this is called with
	// the reconnect function. The consumer wraps it in a timer (e.g. 5s delay).
	// When it is nil no reconnect is attempted.
	ScheduleReconnect func(fn func())
}
38
39 // Dial opens a connection to a relay.
40 // Call OnReady to receive the open/fail notification, then Start to begin processing.
41 func Dial(url string) *Conn {
42 c := &Conn{
43 URL: url,
44 state: StateConnecting,
45 subs: map[string]*Sub{},
46 }
47
48 c.dial()
49 return c
50 }
51
52 func (c *Conn) dial() {
53 c.wsConn = ws.Dial(
54 c.URL,
55 func(connID int, data string) {
56 c.handleMessage(data)
57 },
58 func(connID int) {
59 c.state = StateOpen
60 if c.onReady != nil {
61 c.onReady(true)
62 c.onReady = nil
63 }
64 c.flushSubs()
65 c.flushPublish()
66 },
67 func(connID int, code int, reason string) {
68 c.state = StateClosed
69 if c.onReady != nil {
70 c.onReady(false)
71 c.onReady = nil
72 }
73 c.maybeReconnect()
74 },
75 func(connID int) {
76 c.state = StateClosed
77 if c.onReady != nil {
78 c.onReady(false)
79 c.onReady = nil
80 }
81 c.maybeReconnect()
82 },
83 )
84 }
85
86 func (c *Conn) maybeReconnect() {
87 if c.closing || len(c.subs) == 0 || c.ScheduleReconnect == nil {
88 return
89 }
90 c.state = StateConnecting // prevent pool from creating a duplicate
91 c.ScheduleReconnect(func() {
92 if c.closing {
93 return
94 }
95 c.dial()
96 })
97 }
98
99 // OnReady sets a callback that fires once when the connection opens (true) or fails (false).
100 func (c *Conn) OnReady(fn func(bool)) {
101 if c.state == StateOpen {
102 fn(true)
103 return
104 }
105 if c.state == StateClosed {
106 fn(false)
107 return
108 }
109 c.onReady = fn
110 }
111
// IsOpen reports whether the connection state is StateOpen. It returns false
// both while connecting (including while a reconnect is scheduled) and after
// the connection has closed.
func (c *Conn) IsOpen() bool {
	return c.state == StateOpen
}
116
117 func (c *Conn) handleMessage(msg string) {
118 label, subID, payload := nostr.ParseRelayMessage(msg)
119
120 switch label {
121 case "EVENT":
122 ev := nostr.ParseEvent(payload)
123 if ev == nil {
124 return
125 }
126 if sub, ok := c.subs[subID]; ok {
127 if sub.OnEvent != nil {
128 sub.OnEvent(ev)
129 }
130 }
131 if c.onEvent != nil {
132 c.onEvent(subID, ev)
133 }
134
135 case "EOSE":
136 if sub, ok := c.subs[subID]; ok {
137 sub.gotEOSE = true
138 if sub.OnEOSE != nil {
139 sub.OnEOSE()
140 }
141 }
142 if c.onEOSE != nil {
143 c.onEOSE(subID)
144 }
145
146 case "OK":
147 ok := len(payload) > 0 && payload[0] == 't'
148 msg := ""
149 idx := indexOf(payload, ':')
150 if idx >= 0 && idx+1 < len(payload) {
151 msg = payload[idx+1:]
152 }
153 if c.onOK != nil {
154 c.onOK(subID, ok, msg)
155 }
156
157 case "AUTH":
158 if c.onAuth != nil {
159 c.onAuth(payload)
160 }
161
162 case "NOTICE":
163 _ = payload
164 }
165 }
166
167 // Subscribe sends a REQ and tracks the subscription.
168 // If the connection is still opening, the REQ is deferred to flushSubs().
169 func (c *Conn) Subscribe(id string, filters []*nostr.Filter) *Sub {
170 sub := &Sub{
171 ID: id,
172 Filters: filters,
173 conn: c,
174 }
175 c.subs[id] = sub
176
177 if c.state == StateOpen {
178 msg := "[\"REQ\",\"" + id + "\""
179 for _, f := range filters {
180 msg += "," + f.Serialize()
181 }
182 msg += "]"
183 ws.Send(c.wsConn, msg)
184 }
185 // else: flushSubs() sends all stored subs when connection opens.
186
187 return sub
188 }
189
190 // Publish sends an EVENT message. Queues if the connection is still opening.
191 func (c *Conn) Publish(ev *nostr.Event) {
192 msg := "[\"EVENT\"," + eventJSON(ev) + "]"
193 if c.state != StateOpen {
194 c.pendingPublish = append(c.pendingPublish, msg)
195 return
196 }
197 ws.Send(c.wsConn, msg)
198 }
199
200 // CloseSubscription sends a CLOSE message.
201 func (c *Conn) CloseSubscription(id string) {
202 delete(c.subs, id)
203 msg := "[\"CLOSE\",\"" + id + "\"]"
204 ws.Send(c.wsConn, msg)
205 }
206
// Send sends a raw JSON message string on the current socket handle.
// The message is passed to ws.Send as-is: it is not validated, and unlike
// Publish it is neither queued nor guarded by the connection state.
// NOTE(review): behaviour when the socket is not open depends on ws.Send — confirm.
func (c *Conn) Send(msg string) {
	ws.Send(c.wsConn, msg)
}
211
// Close closes the connection intentionally (no reconnect).
func (c *Conn) Close() {
	// Set closing before the socket is closed: maybeReconnect checks this
	// flag, so a teardown callback triggered by ws.Close schedules nothing.
	c.closing = true
	c.state = StateClosed
	ws.Close(c.wsConn)
}
218
// SetOnEvent sets a global event handler (all subscriptions).
// The handler receives the subscription ID and the parsed event, and runs
// after the matching subscription's own OnEvent callback. Passing nil
// removes the handler.
func (c *Conn) SetOnEvent(fn func(string, *nostr.Event)) {
	c.onEvent = fn
}
223
// SetOnEOSE sets a global EOSE handler.
// The handler receives the subscription ID of each EOSE message and runs
// after the matching subscription's own OnEOSE callback. Passing nil removes
// the handler.
func (c *Conn) SetOnEOSE(fn func(string)) {
	c.onEOSE = fn
}
228
// SetOnOK sets a handler for OK responses.
// The handler receives the ID parsed from the message, whether the relay
// accepted it, and the text extracted after the first ':' of the payload
// (empty when there is none). Passing nil removes the handler.
func (c *Conn) SetOnOK(fn func(string, bool, string)) {
	c.onOK = fn
}
233
// SetOnAuth sets a handler for AUTH challenges.
// The handler receives the payload of the AUTH message as parsed by
// nostr.ParseRelayMessage. Passing nil removes the handler.
func (c *Conn) SetOnAuth(fn func(string)) {
	c.onAuth = fn
}
238
239 // flushPublish sends queued EVENT messages that arrived while connecting.
240 func (c *Conn) flushPublish() {
241 for _, msg := range c.pendingPublish {
242 ws.Send(c.wsConn, msg)
243 }
244 c.pendingPublish = nil
245 }
246
247 // flushSubs re-sends REQ for all stored subscriptions (used after WS opens).
248 func (c *Conn) flushSubs() {
249 for _, sub := range c.subs {
250 msg := "[\"REQ\",\"" + sub.ID + "\""
251 for _, f := range sub.Filters {
252 msg += "," + f.Serialize()
253 }
254 msg += "]"
255 ws.Send(c.wsConn, msg)
256 }
257 }
258
259 func eventJSON(ev *nostr.Event) string {
260 buf := []byte{:0:512}
261 buf = append(buf, '{')
262 buf = append(buf, "\"id\":\""...)
263 buf = append(buf, ev.ID...)
264 buf = append(buf, "\",\"pubkey\":\""...)
265 buf = append(buf, ev.PubKey...)
266 buf = append(buf, "\",\"created_at\":"...)
267 buf = append(buf, itoa(ev.CreatedAt)...)
268 buf = append(buf, ",\"kind\":"...)
269 buf = append(buf, itoa(int64(ev.Kind))...)
270 buf = append(buf, ",\"tags\":"...)
271 buf = serializeTags(buf, ev.Tags)
272 buf = append(buf, ",\"content\":\""...)
273 buf = appendEscaped(buf, ev.Content)
274 buf = append(buf, "\",\"sig\":\""...)
275 buf = append(buf, ev.Sig...)
276 buf = append(buf, '"', '}')
277 return string(buf)
278 }
279
280 func serializeTags(buf []byte, tags nostr.Tags) []byte {
281 buf = append(buf, '[')
282 for i, tag := range tags {
283 if i > 0 {
284 buf = append(buf, ',')
285 }
286 buf = append(buf, '[')
287 for j, s := range tag {
288 if j > 0 {
289 buf = append(buf, ',')
290 }
291 buf = append(buf, '"')
292 buf = appendEscaped(buf, s)
293 buf = append(buf, '"')
294 }
295 buf = append(buf, ']')
296 }
297 buf = append(buf, ']')
298 return buf
299 }
300
// appendEscaped appends s to buf, escaped so that it can be placed between
// double quotes as a JSON string, and returns the extended buffer.
//
// The quote and backslash characters and the control characters \n, \r, \t,
// \b and \f use their short escapes. Every other control character below
// 0x20 is written as \u00XX, because JSON forbids raw control characters
// inside strings. All remaining bytes, including multi-byte UTF-8 sequences,
// are copied unchanged.
func appendEscaped(buf []byte, s string) []byte {
	const hexDigits = "0123456789abcdef"
	for i := 0; i < len(s); i++ {
		c := s[i]
		switch c {
		case '"':
			buf = append(buf, '\\', '"')
		case '\\':
			buf = append(buf, '\\', '\\')
		case '\n':
			buf = append(buf, '\\', 'n')
		case '\r':
			buf = append(buf, '\\', 'r')
		case '\t':
			buf = append(buf, '\\', 't')
		case '\b':
			buf = append(buf, '\\', 'b')
		case '\f':
			buf = append(buf, '\\', 'f')
		default:
			if c < 0x20 {
				// Remaining control characters have no short escape.
				buf = append(buf, '\\', 'u', '0', '0', hexDigits[c>>4], hexDigits[c&0x0f])
			} else {
				buf = append(buf, c)
			}
		}
	}
	return buf
}
321
// itoa formats n as a base-10 string with a leading '-' for negative values.
//
// The digits are produced from the unsigned magnitude of n. Negating n
// directly would overflow for the minimum int64, which has no positive
// counterpart, and the digit loop would then emit nothing.
func itoa(n int64) string {
	if n == 0 {
		return "0"
	}
	neg := n < 0
	mag := uint64(n)
	if neg {
		// Two's-complement negation in uint64 yields |n| for every negative
		// int64, including the minimum value.
		mag = -mag
	}
	// 20 bytes hold the longest result: a sign plus 19 digits.
	var b [20]byte
	i := len(b)
	for mag > 0 {
		i--
		b[i] = byte('0' + mag%10)
		mag /= 10
	}
	if neg {
		i--
		b[i] = '-'
	}
	return string(b[i:])
}
344
// indexOf returns the byte offset of the first occurrence of c in s, or -1
// when c does not occur. The comparison is byte-wise, not rune-wise.
func indexOf(s string, c byte) int {
	for pos, n := 0, len(s); pos < n; pos++ {
		if s[pos] != c {
			continue
		}
		return pos
	}
	return -1
}
353