client.go raw
1 package ws
2
3 import (
4 "bytes"
5 "context"
6 "crypto/tls"
7 "errors"
8 "fmt"
9 "net/http"
10 "strconv"
11 "strings"
12 "sync"
13 "sync/atomic"
14 "time"
15
16 "next.orly.dev/pkg/nostr/encoders/envelopes"
17 "next.orly.dev/pkg/nostr/encoders/envelopes/authenvelope"
18 "next.orly.dev/pkg/nostr/encoders/envelopes/closedenvelope"
19 "next.orly.dev/pkg/nostr/encoders/envelopes/eoseenvelope"
20 "next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
21 "next.orly.dev/pkg/nostr/encoders/envelopes/noticeenvelope"
22 "next.orly.dev/pkg/nostr/encoders/envelopes/okenvelope"
23 "next.orly.dev/pkg/nostr/encoders/event"
24 "next.orly.dev/pkg/nostr/encoders/filter"
25 "next.orly.dev/pkg/nostr/encoders/hex"
26 "next.orly.dev/pkg/nostr/encoders/kind"
27 "next.orly.dev/pkg/nostr/encoders/tag"
28 "next.orly.dev/pkg/nostr/interfaces/codec"
29 "next.orly.dev/pkg/nostr/interfaces/signer"
30 "next.orly.dev/pkg/nostr/utils/normalize"
31 "github.com/puzpuzpuz/xsync/v3"
32 "next.orly.dev/pkg/lol/chk"
33 "next.orly.dev/pkg/lol/log"
34 )
35
36 var subscriptionIDCounter atomic.Int64
37
38 // Client represents a connection to a Nostr relay.
39 type Client struct {
40 closeMutex sync.Mutex
41
42 URL string
43 requestHeader http.Header // e.g. for origin header
44
45 Connection *Connection
46 Subscriptions *xsync.MapOf[string, *Subscription]
47
48 ConnectionError error
49 connectionContext context.Context // will be canceled when the connection closes
50 connectionContextCancel context.CancelCauseFunc
51
52 challenge []byte // NIP-42 challenge, we only keep the last
53 notices chan []byte // NIP-01 NOTICEs
54 customHandler func(string) // nonstandard unparseable messages
55 okCallbacks *xsync.MapOf[string, func(bool, string)]
56 writeQueue chan writeRequest
57 subscriptionChannelCloseQueue chan []byte
58
59 // custom things that aren't often used
60 //
61 AssumeValid bool // this will skip verifying signatures for events received from this relay
62 }
63
64 type writeRequest struct {
65 msg []byte
66 answer chan error
67 }
68
69 // NewRelay returns a new relay. It takes a context that, when canceled, will close the relay connection.
70 func NewRelay(ctx context.Context, url string, opts ...RelayOption) *Client {
71 ctx, cancel := context.WithCancelCause(ctx)
72 r := &Client{
73 URL: string(normalize.URL(url)),
74 connectionContext: ctx,
75 connectionContextCancel: cancel,
76 Subscriptions: xsync.NewMapOf[string, *Subscription](),
77 okCallbacks: xsync.NewMapOf[string, func(
78 bool, string,
79 )](),
80 writeQueue: make(chan writeRequest),
81 subscriptionChannelCloseQueue: make(chan []byte),
82 requestHeader: nil,
83 }
84
85 for _, opt := range opts {
86 opt.ApplyRelayOption(r)
87 }
88
89 return r
90 }
91
92 // RelayConnect returns a relay object connected to url.
93 //
94 // The given subscription is only used during the connection phase. Once successfully connected, cancelling ctx has no effect.
95 //
96 // The ongoing relay connection uses a background context. To close the connection, call r.Close().
97 // If you need fine grained long-term connection contexts, use NewRelay() instead.
98 func RelayConnect(ctx context.Context, url string, opts ...RelayOption) (
99 *Client, error,
100 ) {
101 r := NewRelay(context.Background(), url, opts...)
102 err := r.Connect(ctx)
103 return r, err
104 }
105
106 // RelayOption is the type of the argument passed when instantiating relay connections.
107 type RelayOption interface {
108 ApplyRelayOption(*Client)
109 }
110
111 var (
112 _ RelayOption = (WithCustomHandler)(nil)
113 _ RelayOption = (WithRequestHeader)(nil)
114 _ RelayOption = (WithNoticeHandler)(nil)
115 )
116
117 // WithCustomHandler must be a function that handles any relay message that couldn't be
118 // parsed as a standard envelope.
119 type WithCustomHandler func(data string)
120
121 func (ch WithCustomHandler) ApplyRelayOption(r *Client) {
122 r.customHandler = ch
123 }
124
125 // WithRequestHeader sets the HTTP request header of the websocket preflight request.
126 type WithRequestHeader http.Header
127
128 func (ch WithRequestHeader) ApplyRelayOption(r *Client) {
129 r.requestHeader = http.Header(ch)
130 }
131
132 // WithNoticeHandler must be a function that handles NOTICE messages from the relay.
133 type WithNoticeHandler func(notice []byte)
134
135 func (nh WithNoticeHandler) ApplyRelayOption(r *Client) {
136 r.notices = make(chan []byte, 8)
137 go func() {
138 for notice := range r.notices {
139 nh(notice)
140 }
141 }()
142 }
143
144 // String just returns the relay URL.
145 func (r *Client) String() string {
146 return r.URL
147 }
148
149 // Context retrieves the context that is associated with this relay connection.
150 // It will be closed when the relay is disconnected.
151 func (r *Client) Context() context.Context { return r.connectionContext }
152
153 // IsConnected returns true if the connection to this relay seems to be active.
154 func (r *Client) IsConnected() bool { return r.connectionContext.Err() == nil }
155
156 // ConnectionCause returns the cancel cause for the relay connection context.
157 func (r *Client) ConnectionCause() error { return context.Cause(r.connectionContext) }
158
159 // LastError returns the last connection error observed by the reader loop.
160 func (r *Client) LastError() error { return r.ConnectionError }
161
162 // Connect tries to establish a websocket connection to r.URL.
163 // If the context expires before the connection is complete, an error is returned.
164 // Once successfully connected, context expiration has no effect: call r.Close
165 // to close the connection.
166 //
167 // The given context here is only used during the connection phase. The long-living
168 // relay connection will be based on the context given to NewRelay().
169 func (r *Client) Connect(ctx context.Context) error {
170 return r.ConnectWithTLS(ctx, nil)
171 }
172
173 func extractSubID(jsonStr string) string {
174 // look for "EVENT" pattern
175 start := strings.Index(jsonStr, `"EVENT"`)
176 if start == -1 {
177 return ""
178 }
179
180 // move to the next quote
181 offset := strings.Index(jsonStr[start+7:], `"`)
182 if offset == -1 {
183 return ""
184 }
185
186 start += 7 + offset + 1
187
188 // find the ending quote
189 end := strings.Index(jsonStr[start:], `"`)
190
191 // get the contents
192 return jsonStr[start : start+end]
193 }
194
195 func subIdToSerial(subId string) int64 {
196 n := strings.Index(subId, ":")
197 if n < 0 || n > len(subId) {
198 return -1
199 }
200 serialId, _ := strconv.ParseInt(subId[0:n], 10, 64)
201 return serialId
202 }
203
204 // ConnectWithTLS is like Connect(), but takes a special tls.Config if you need that.
205 func (r *Client) ConnectWithTLS(
206 ctx context.Context, tlsConfig *tls.Config,
207 ) error {
208 if r.connectionContext == nil || r.Subscriptions == nil {
209 return fmt.Errorf("relay must be initialized with a call to NewRelay()")
210 }
211
212 if r.URL == "" {
213 return fmt.Errorf("invalid relay URL '%s'", r.URL)
214 }
215
216 if _, ok := ctx.Deadline(); !ok {
217 // if no timeout is set, force it to 7 seconds
218 var cancel context.CancelFunc
219 ctx, cancel = context.WithTimeoutCause(
220 ctx, 7*time.Second, errors.New("connection took too long"),
221 )
222 defer cancel()
223 }
224
225 conn, err := NewConnection(ctx, r.URL, r.requestHeader, tlsConfig)
226 if err != nil {
227 return fmt.Errorf("error opening websocket to '%s': %w", r.URL, err)
228 }
229 r.Connection = conn
230
231 // ping every 29 seconds
232 ticker := time.NewTicker(29 * time.Second)
233
234 // queue all write operations here so we don't do mutex spaghetti
235 go func() {
236 var err error
237 for {
238 select {
239 case <-r.connectionContext.Done():
240 log.T.F(
241 "WS.Client: connection context done for %s: cause=%v lastErr=%v",
242 r.URL, context.Cause(r.connectionContext),
243 r.ConnectionError,
244 )
245 ticker.Stop()
246 r.Connection = nil
247
248 for _, sub := range r.Subscriptions.Range {
249 sub.unsub(
250 fmt.Errorf(
251 "relay connection closed: %w / %w",
252 context.Cause(r.connectionContext),
253 r.ConnectionError,
254 ),
255 )
256 }
257 return
258
259 case <-ticker.C:
260 err = r.Connection.Ping(r.connectionContext)
261 if err != nil && !strings.Contains(
262 err.Error(), "failed to wait for pong",
263 ) {
264 log.I.F(
265 "{%s} error writing ping: %v; closing websocket", r.URL,
266 err,
267 )
268 r.CloseWithReason(
269 fmt.Errorf(
270 "ping failed: %w", err,
271 ),
272 ) // this should trigger a context cancelation
273 return
274 }
275
276 case wr := <-r.writeQueue:
277 // all write requests will go through this to prevent races
278 // log.D.F("{%s} sending %v\n", r.URL, string(wr.msg))
279 log.T.F(
280 "WS.Client: outbound message to %s: %s", r.URL,
281 string(wr.msg),
282 )
283 if err = r.Connection.WriteMessage(
284 r.connectionContext, wr.msg,
285 ); err != nil {
286 wr.answer <- err
287 }
288 close(wr.answer)
289 }
290 }
291 }()
292
293 // general message reader loop
294 go func() {
295
296 for {
297 buf := new(bytes.Buffer)
298 for {
299 buf.Reset()
300 if err := conn.ReadMessage(
301 r.connectionContext, buf,
302 ); err != nil {
303 r.ConnectionError = err
304 log.T.F(
305 "WS.Client: reader loop error on %s: %v; closing connection",
306 r.URL, err,
307 )
308 r.CloseWithReason(fmt.Errorf("reader loop error: %w", err))
309 return
310 }
311 message := buf.Bytes()
312 var t string
313 if t, message, err = envelopes.Identify(message); chk.E(err) {
314 continue
315 }
316 switch t {
317 case noticeenvelope.L:
318 env := noticeenvelope.New()
319 if env, message, err = noticeenvelope.Parse(message); chk.E(err) {
320 continue
321 }
322 // see WithNoticeHandler
323 if r.notices != nil {
324 r.notices <- env.Message
325 } else {
326 log.E.F("NOTICE from %s: '%s'", r.URL, env.Message)
327 }
328 case authenvelope.L:
329 env := authenvelope.NewChallenge()
330 if env, message, err = authenvelope.ParseChallenge(message); chk.E(err) {
331 continue
332 }
333 if len(env.Challenge) == 0 {
334 continue
335 }
336 r.challenge = env.Challenge
337 case eventenvelope.L:
338 env := eventenvelope.NewResult()
339 if env, message, err = eventenvelope.ParseResult(message); chk.E(err) {
340 continue
341 }
342 if len(env.Subscription) == 0 {
343 continue
344 }
345 if sub, ok := r.Subscriptions.Load(string(env.Subscription)); !ok {
346 log.D.F(
347 "{%s} no subscription with id '%s'\n", r.URL,
348 env.Subscription,
349 )
350 continue
351 } else {
352 // check if the event matches the desired filter, ignore otherwise
353 if !sub.Filters.Match(env.Event) {
354 // log.D.F(
355 // "{%s} filter does not match: %v ~ %v\n", r.URL,
356 // sub.Filters, env.Event,
357 // )
358 continue
359 }
360 // check signature, ignore invalid, except from trusted (AssumeValid) relays
361 if !r.AssumeValid {
362 if ok, err = env.Event.Verify(); !ok {
363 log.E.F(
364 "{%s} bad signature on %s\n", r.URL,
365 env.Event,
366 )
367 continue
368 }
369 }
370 // dispatch this to the internal .events channel of the subscription
371 sub.dispatchEvent(env.Event)
372 }
373 case eoseenvelope.L:
374 env := eoseenvelope.New()
375 if env, message, err = eoseenvelope.Parse(message); chk.E(err) {
376 continue
377 }
378 if subscription, ok := r.Subscriptions.Load(string(env.Subscription)); ok {
379 subscription.dispatchEose()
380 }
381 case closedenvelope.L:
382 env := closedenvelope.New()
383 if env, message, err = closedenvelope.Parse(message); chk.E(err) {
384 continue
385 }
386 if subscription, ok := r.Subscriptions.Load(string(env.Subscription)); ok {
387 subscription.handleClosed(env.ReasonString())
388 }
389 case okenvelope.L:
390 env := okenvelope.New()
391 if env, message, err = okenvelope.Parse(message); chk.E(err) {
392 continue
393 }
394 eventIDHex := hex.Enc(env.EventID)
395 if okCallback, exist := r.okCallbacks.Load(eventIDHex); exist {
396 okCallback(env.OK, env.ReasonString())
397 }
398 }
399 }
400 }
401 }()
402
403 return nil
404 }
405
406 // Write queues an arbitrary message to be sent to the relay.
407 func (r *Client) Write(msg []byte) <-chan error {
408 ch := make(chan error)
409 select {
410 case r.writeQueue <- writeRequest{msg: msg, answer: ch}:
411 case <-r.connectionContext.Done():
412 go func() { ch <- fmt.Errorf("connection closed") }()
413 }
414 return ch
415 }
416
417 // Publish sends an "EVENT" command to the relay r as in NIP-01 and waits for an OK response.
418 func (r *Client) Publish(ctx context.Context, ev *event.E) error {
419 return r.publish(
420 ctx, hex.Enc(ev.ID), eventenvelope.NewSubmissionWith(ev),
421 )
422 }
423
424 // Auth sends an "AUTH" command client->relay as in NIP-42 and waits for an OK response.
425 //
426 // You don't have to build the AUTH event yourself, this function takes a function to which the
427 // event that must be signed will be passed, so it's only necessary to sign that.
428 func (r *Client) Auth(
429 ctx context.Context, sign signer.I,
430 ) (err error) {
431 authEvent := &event.E{
432 CreatedAt: time.Now().Unix(),
433 Kind: kind.ClientAuthentication.K,
434 Tags: tag.NewS(
435 tag.NewFromAny("relay", r.URL),
436 tag.NewFromAny("challenge", r.challenge),
437 ),
438 Content: nil,
439 }
440 if err = authEvent.Sign(sign); err != nil {
441 return fmt.Errorf("error signing auth event: %w", err)
442 }
443
444 return r.publish(
445 ctx, hex.Enc(authEvent.ID), authenvelope.NewResponseWith(authEvent),
446 )
447 }
448
449 func (r *Client) publish(
450 ctx context.Context, id string, env codec.Envelope,
451 ) error {
452 var err error
453 var cancel context.CancelFunc
454
455 if _, ok := ctx.Deadline(); !ok {
456 // if no timeout is set, force it to 7 seconds
457 ctx, cancel = context.WithTimeoutCause(
458 ctx, 7*time.Second, fmt.Errorf("given up waiting for an OK"),
459 )
460 defer cancel()
461 } else {
462 // otherwise make the context cancellable so we can stop everything upon receiving an "OK"
463 ctx, cancel = context.WithCancel(ctx)
464 defer cancel()
465 }
466
467 // listen for an OK callback
468 gotOk := false
469 r.okCallbacks.Store(
470 id, func(ok bool, reason string) {
471 gotOk = true
472 if !ok {
473 err = fmt.Errorf("msg: %s", reason)
474 }
475 cancel()
476 },
477 )
478 defer r.okCallbacks.Delete(id)
479
480 // publish event
481 envb := env.Marshal(nil)
482 if err = <-r.Write(envb); err != nil {
483 return err
484 }
485
486 for {
487 select {
488 case <-ctx.Done():
489 // this will be called when we get an OK or when the context has been canceled
490 if gotOk {
491 return err
492 }
493 return ctx.Err()
494 case <-r.connectionContext.Done():
495 // this is caused when we lose connectivity
496 return err
497 }
498 }
499 }
500
501 // Subscribe sends a "REQ" command to the relay r as in NIP-01.
502 // Events are returned through the channel sub.Events.
503 // The subscription is closed when context ctx is cancelled ("CLOSE" in NIP-01).
504 //
505 // Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.Context` will be canceled at some point.
506 // Failure to do that will result in a huge number of halted goroutines being created.
507 func (r *Client) Subscribe(
508 ctx context.Context, ff *filter.S, opts ...SubscriptionOption,
509 ) (*Subscription, error) {
510 sub := r.PrepareSubscription(ctx, ff, opts...)
511
512 if r.Connection == nil {
513 log.T.F(
514 "WS.Subscribe: not connected to %s; aborting sub id=%s", r.URL,
515 sub.GetID(),
516 )
517 return nil, fmt.Errorf("not connected to %s", r.URL)
518 }
519
520 log.T.F(
521 "WS.Subscribe: firing subscription id=%s to %s with %d filters",
522 sub.GetID(), r.URL, len(*ff),
523 )
524 if err := sub.Fire(); err != nil {
525 log.T.F(
526 "WS.Subscribe: Fire failed id=%s to %s: %v", sub.GetID(), r.URL,
527 err,
528 )
529 return nil, fmt.Errorf(
530 "couldn't subscribe to %v at %s: %w", ff, r.URL, err,
531 )
532 }
533 log.T.F("WS.Subscribe: Fire succeeded id=%s to %s", sub.GetID(), r.URL)
534
535 return sub, nil
536 }
537
538 // PrepareSubscription creates a subscription, but doesn't fire it.
539 //
540 // Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.Context` will be canceled at some point.
541 // Failure to do that will result in a huge number of halted goroutines being created.
542 func (r *Client) PrepareSubscription(
543 ctx context.Context, ff *filter.S, opts ...SubscriptionOption,
544 ) (sub *Subscription) {
545 current := subscriptionIDCounter.Add(1)
546 ctx, cancel := context.WithCancelCause(ctx)
547 sub = &Subscription{
548 Client: r,
549 Context: ctx,
550 cancel: cancel,
551 counter: current,
552 Events: make(event.C),
553 EndOfStoredEvents: make(chan struct{}, 1),
554 ClosedReason: make(chan string, 1),
555 Filters: ff,
556 match: ff.Match,
557 }
558 label := ""
559 for _, opt := range opts {
560 switch o := opt.(type) {
561 case WithLabel:
562 label = string(o)
563 case WithCheckDuplicate:
564 sub.checkDuplicate = o
565 case WithCheckDuplicateReplaceable:
566 sub.checkDuplicateReplaceable = o
567 }
568 }
569 // subscription id computation — must copy since sub.id outlives this function
570 buf := subIdPool.Get().([]byte)[:0]
571 buf = strconv.AppendInt(buf, sub.counter, 10)
572 buf = append(buf, ':')
573 buf = append(buf, label...)
574 sub.id = make([]byte, len(buf))
575 copy(sub.id, buf)
576 subIdPool.Put(buf)
577 r.Subscriptions.Store(string(buf), sub)
578 // start handling events, eose, unsub etc:
579 go sub.start()
580 return sub
581 }
582
583 // QueryEvents subscribes to events matching the given filter and returns a channel of events.
584 //
585 // In most cases it's better to use SimplePool instead of this method.
586 func (r *Client) QueryEvents(
587 ctx context.Context, f *filter.F, opts ...SubscriptionOption,
588 ) (
589 evc event.C, err error,
590 ) {
591 var sub *Subscription
592 if sub, err = r.Subscribe(ctx, filter.NewS(f), opts...); chk.E(err) {
593 return
594 }
595 go func() {
596 for {
597 select {
598 case <-sub.ClosedReason:
599 case <-sub.EndOfStoredEvents:
600 case <-ctx.Done():
601 case <-r.Context().Done():
602 }
603 sub.unsub(errors.New("QueryEvents() ended"))
604 return
605 }
606 }()
607 evc = sub.Events
608 return
609 }
610
611 // QuerySync subscribes to events matching the given filter and returns a slice of events.
612 // This method blocks until all events are received or the context is canceled.
613 //
614 // In most cases it's better to use SimplePool instead of this method.
615 func (r *Client) QuerySync(
616 ctx context.Context, ff *filter.F, opts ...SubscriptionOption,
617 ) (
618 []*event.E, error,
619 ) {
620 if _, ok := ctx.Deadline(); !ok {
621 // if no timeout is set, force it to 7 seconds
622 var cancel context.CancelFunc
623 ctx, cancel = context.WithTimeoutCause(
624 ctx, 7*time.Second, errors.New("QuerySync() took too long"),
625 )
626 defer cancel()
627 }
628 var lim int
629 if ff.Limit != nil {
630 lim = int(*ff.Limit)
631 }
632 events := make([]*event.E, 0, max(lim, 250))
633 ch, err := r.QueryEvents(ctx, ff, opts...)
634 if err != nil {
635 return nil, err
636 }
637
638 for evt := range ch {
639 events = append(events, evt)
640 }
641
642 return events, nil
643 }
644
645 // Close closes the relay connection.
646 func (r *Client) Close() error { return r.CloseWithReason(errors.New("Close() called")) }
647
648 // CloseWithReason closes the relay connection with a specific reason that will be stored as the context cancel cause.
649 func (r *Client) CloseWithReason(reason error) error { return r.close(reason) }
650
651 func (r *Client) close(reason error) error {
652 r.closeMutex.Lock()
653 defer r.closeMutex.Unlock()
654
655 if r.connectionContextCancel == nil {
656 return fmt.Errorf("relay already closed")
657 }
658 log.T.F(
659 "WS.Client: closing connection to %s: reason=%v lastErr=%v", r.URL,
660 reason, r.ConnectionError,
661 )
662 r.connectionContextCancel(reason)
663 r.connectionContextCancel = nil
664
665 if r.Connection == nil {
666 return fmt.Errorf("relay not connected")
667 }
668
669 err := r.Connection.Close()
670 if err != nil {
671 return err
672 }
673
674 return nil
675 }
676
677 var subIdPool = sync.Pool{
678 New: func() any { return make([]byte, 0, 15) },
679 }
680