client.go raw

   1  package ws
   2  
   3  import (
   4  	"bytes"
   5  	"context"
   6  	"crypto/tls"
   7  	"errors"
   8  	"fmt"
   9  	"net/http"
  10  	"strconv"
  11  	"strings"
  12  	"sync"
  13  	"sync/atomic"
  14  	"time"
  15  
  16  	"next.orly.dev/pkg/nostr/encoders/envelopes"
  17  	"next.orly.dev/pkg/nostr/encoders/envelopes/authenvelope"
  18  	"next.orly.dev/pkg/nostr/encoders/envelopes/closedenvelope"
  19  	"next.orly.dev/pkg/nostr/encoders/envelopes/eoseenvelope"
  20  	"next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
  21  	"next.orly.dev/pkg/nostr/encoders/envelopes/noticeenvelope"
  22  	"next.orly.dev/pkg/nostr/encoders/envelopes/okenvelope"
  23  	"next.orly.dev/pkg/nostr/encoders/event"
  24  	"next.orly.dev/pkg/nostr/encoders/filter"
  25  	"next.orly.dev/pkg/nostr/encoders/hex"
  26  	"next.orly.dev/pkg/nostr/encoders/kind"
  27  	"next.orly.dev/pkg/nostr/encoders/tag"
  28  	"next.orly.dev/pkg/nostr/interfaces/codec"
  29  	"next.orly.dev/pkg/nostr/interfaces/signer"
  30  	"next.orly.dev/pkg/nostr/utils/normalize"
  31  	"github.com/puzpuzpuz/xsync/v3"
  32  	"next.orly.dev/pkg/lol/chk"
  33  	"next.orly.dev/pkg/lol/log"
  34  )
  35  
// subscriptionIDCounter is a package-wide counter. PrepareSubscription
// increments it once per subscription and uses the new value as the numeric
// prefix of the subscription id.
var subscriptionIDCounter atomic.Int64
  37  
// Client represents a connection to a Nostr relay.
//
// Build one with NewRelay (or RelayConnect): ConnectWithTLS refuses to run on
// a Client whose connectionContext or Subscriptions is nil.
type Client struct {
	// closeMutex is held for the whole duration of close().
	closeMutex sync.Mutex

	// URL is the relay address as normalized by NewRelay.
	URL           string
	requestHeader http.Header // e.g. for origin header

	// Connection is set by ConnectWithTLS and reset to nil by the writer
	// goroutine once the connection context is done.
	Connection *Connection
	// Subscriptions holds the subscriptions registered by
	// PrepareSubscription, keyed by subscription id.
	Subscriptions *xsync.MapOf[string, *Subscription]

	// ConnectionError is the last read error recorded by the reader loop.
	ConnectionError         error
	connectionContext       context.Context // will be canceled when the connection closes
	connectionContextCancel context.CancelCauseFunc

	challenge     []byte       // NIP-42 challenge, we only keep the last
	notices       chan []byte  // NIP-01 NOTICEs
	customHandler func(string) // nonstandard unparseable messages
	// okCallbacks maps a hex event id to the callback invoked when an OK
	// envelope for that id is read.
	okCallbacks *xsync.MapOf[string, func(bool, string)]
	// writeQueue feeds the writer goroutine started by ConnectWithTLS.
	writeQueue chan writeRequest
	// NOTE(review): within this file the channel is only created (NewRelay);
	// presumably it is consumed by the subscription code — confirm.
	subscriptionChannelCloseQueue chan []byte

	// custom things that aren't often used
	//
	AssumeValid bool // this will skip verifying signatures for events received from this relay
}
  63  
// writeRequest is a single outbound message travelling through
// Client.writeQueue to the writer goroutine.
type writeRequest struct {
	// msg is the raw payload handed to Connection.WriteMessage.
	msg []byte
	// answer receives the write error, if there was one; the writer goroutine
	// closes it after the write has been attempted.
	answer chan error
}
  68  
  69  // NewRelay returns a new relay. It takes a context that, when canceled, will close the relay connection.
  70  func NewRelay(ctx context.Context, url string, opts ...RelayOption) *Client {
  71  	ctx, cancel := context.WithCancelCause(ctx)
  72  	r := &Client{
  73  		URL:                     string(normalize.URL(url)),
  74  		connectionContext:       ctx,
  75  		connectionContextCancel: cancel,
  76  		Subscriptions:           xsync.NewMapOf[string, *Subscription](),
  77  		okCallbacks: xsync.NewMapOf[string, func(
  78  			bool, string,
  79  		)](),
  80  		writeQueue:                    make(chan writeRequest),
  81  		subscriptionChannelCloseQueue: make(chan []byte),
  82  		requestHeader:                 nil,
  83  	}
  84  
  85  	for _, opt := range opts {
  86  		opt.ApplyRelayOption(r)
  87  	}
  88  
  89  	return r
  90  }
  91  
  92  // RelayConnect returns a relay object connected to url.
  93  //
  94  // The given subscription is only used during the connection phase. Once successfully connected, cancelling ctx has no effect.
  95  //
  96  // The ongoing relay connection uses a background context. To close the connection, call r.Close().
  97  // If you need fine grained long-term connection contexts, use NewRelay() instead.
  98  func RelayConnect(ctx context.Context, url string, opts ...RelayOption) (
  99  	*Client, error,
 100  ) {
 101  	r := NewRelay(context.Background(), url, opts...)
 102  	err := r.Connect(ctx)
 103  	return r, err
 104  }
 105  
// RelayOption is the type of the argument passed when instantiating relay connections.
//
// NewRelay calls ApplyRelayOption once per option, passing the Client it has
// just built, before returning that Client.
type RelayOption interface {
	ApplyRelayOption(*Client)
}
 110  
// Compile-time checks that each option type in this file implements
// RelayOption.
var (
	_ RelayOption = (WithCustomHandler)(nil)
	_ RelayOption = (WithRequestHeader)(nil)
	_ RelayOption = (WithNoticeHandler)(nil)
)
 116  
 117  // WithCustomHandler must be a function that handles any relay message that couldn't be
 118  // parsed as a standard envelope.
 119  type WithCustomHandler func(data string)
 120  
 121  func (ch WithCustomHandler) ApplyRelayOption(r *Client) {
 122  	r.customHandler = ch
 123  }
 124  
 125  // WithRequestHeader sets the HTTP request header of the websocket preflight request.
 126  type WithRequestHeader http.Header
 127  
 128  func (ch WithRequestHeader) ApplyRelayOption(r *Client) {
 129  	r.requestHeader = http.Header(ch)
 130  }
 131  
 132  // WithNoticeHandler must be a function that handles NOTICE messages from the relay.
 133  type WithNoticeHandler func(notice []byte)
 134  
 135  func (nh WithNoticeHandler) ApplyRelayOption(r *Client) {
 136  	r.notices = make(chan []byte, 8)
 137  	go func() {
 138  		for notice := range r.notices {
 139  			nh(notice)
 140  		}
 141  	}()
 142  }
 143  
 144  // String just returns the relay URL.
 145  func (r *Client) String() string {
 146  	return r.URL
 147  }
 148  
 149  // Context retrieves the context that is associated with this relay connection.
 150  // It will be closed when the relay is disconnected.
 151  func (r *Client) Context() context.Context { return r.connectionContext }
 152  
 153  // IsConnected returns true if the connection to this relay seems to be active.
 154  func (r *Client) IsConnected() bool { return r.connectionContext.Err() == nil }
 155  
 156  // ConnectionCause returns the cancel cause for the relay connection context.
 157  func (r *Client) ConnectionCause() error { return context.Cause(r.connectionContext) }
 158  
 159  // LastError returns the last connection error observed by the reader loop.
 160  func (r *Client) LastError() error { return r.ConnectionError }
 161  
 162  // Connect tries to establish a websocket connection to r.URL.
 163  // If the context expires before the connection is complete, an error is returned.
 164  // Once successfully connected, context expiration has no effect: call r.Close
 165  // to close the connection.
 166  //
 167  // The given context here is only used during the connection phase. The long-living
 168  // relay connection will be based on the context given to NewRelay().
 169  func (r *Client) Connect(ctx context.Context) error {
 170  	return r.ConnectWithTLS(ctx, nil)
 171  }
 172  
 173  func extractSubID(jsonStr string) string {
 174  	// look for "EVENT" pattern
 175  	start := strings.Index(jsonStr, `"EVENT"`)
 176  	if start == -1 {
 177  		return ""
 178  	}
 179  
 180  	// move to the next quote
 181  	offset := strings.Index(jsonStr[start+7:], `"`)
 182  	if offset == -1 {
 183  		return ""
 184  	}
 185  
 186  	start += 7 + offset + 1
 187  
 188  	// find the ending quote
 189  	end := strings.Index(jsonStr[start:], `"`)
 190  
 191  	// get the contents
 192  	return jsonStr[start : start+end]
 193  }
 194  
 195  func subIdToSerial(subId string) int64 {
 196  	n := strings.Index(subId, ":")
 197  	if n < 0 || n > len(subId) {
 198  		return -1
 199  	}
 200  	serialId, _ := strconv.ParseInt(subId[0:n], 10, 64)
 201  	return serialId
 202  }
 203  
 204  // ConnectWithTLS is like Connect(), but takes a special tls.Config if you need that.
 205  func (r *Client) ConnectWithTLS(
 206  	ctx context.Context, tlsConfig *tls.Config,
 207  ) error {
 208  	if r.connectionContext == nil || r.Subscriptions == nil {
 209  		return fmt.Errorf("relay must be initialized with a call to NewRelay()")
 210  	}
 211  
 212  	if r.URL == "" {
 213  		return fmt.Errorf("invalid relay URL '%s'", r.URL)
 214  	}
 215  
 216  	if _, ok := ctx.Deadline(); !ok {
 217  		// if no timeout is set, force it to 7 seconds
 218  		var cancel context.CancelFunc
 219  		ctx, cancel = context.WithTimeoutCause(
 220  			ctx, 7*time.Second, errors.New("connection took too long"),
 221  		)
 222  		defer cancel()
 223  	}
 224  
 225  	conn, err := NewConnection(ctx, r.URL, r.requestHeader, tlsConfig)
 226  	if err != nil {
 227  		return fmt.Errorf("error opening websocket to '%s': %w", r.URL, err)
 228  	}
 229  	r.Connection = conn
 230  
 231  	// ping every 29 seconds
 232  	ticker := time.NewTicker(29 * time.Second)
 233  
 234  	// queue all write operations here so we don't do mutex spaghetti
 235  	go func() {
 236  		var err error
 237  		for {
 238  			select {
 239  			case <-r.connectionContext.Done():
 240  				log.T.F(
 241  					"WS.Client: connection context done for %s: cause=%v lastErr=%v",
 242  					r.URL, context.Cause(r.connectionContext),
 243  					r.ConnectionError,
 244  				)
 245  				ticker.Stop()
 246  				r.Connection = nil
 247  
 248  				for _, sub := range r.Subscriptions.Range {
 249  					sub.unsub(
 250  						fmt.Errorf(
 251  							"relay connection closed: %w / %w",
 252  							context.Cause(r.connectionContext),
 253  							r.ConnectionError,
 254  						),
 255  					)
 256  				}
 257  				return
 258  
 259  			case <-ticker.C:
 260  				err = r.Connection.Ping(r.connectionContext)
 261  				if err != nil && !strings.Contains(
 262  					err.Error(), "failed to wait for pong",
 263  				) {
 264  					log.I.F(
 265  						"{%s} error writing ping: %v; closing websocket", r.URL,
 266  						err,
 267  					)
 268  					r.CloseWithReason(
 269  						fmt.Errorf(
 270  							"ping failed: %w", err,
 271  						),
 272  					) // this should trigger a context cancelation
 273  					return
 274  				}
 275  
 276  			case wr := <-r.writeQueue:
 277  				// all write requests will go through this to prevent races
 278  				// log.D.F("{%s} sending %v\n", r.URL, string(wr.msg))
 279  				log.T.F(
 280  					"WS.Client: outbound message to %s: %s", r.URL,
 281  					string(wr.msg),
 282  				)
 283  				if err = r.Connection.WriteMessage(
 284  					r.connectionContext, wr.msg,
 285  				); err != nil {
 286  					wr.answer <- err
 287  				}
 288  				close(wr.answer)
 289  			}
 290  		}
 291  	}()
 292  
 293  	// general message reader loop
 294  	go func() {
 295  
 296  		for {
 297  			buf := new(bytes.Buffer)
 298  			for {
 299  				buf.Reset()
 300  				if err := conn.ReadMessage(
 301  					r.connectionContext, buf,
 302  				); err != nil {
 303  					r.ConnectionError = err
 304  					log.T.F(
 305  						"WS.Client: reader loop error on %s: %v; closing connection",
 306  						r.URL, err,
 307  					)
 308  					r.CloseWithReason(fmt.Errorf("reader loop error: %w", err))
 309  					return
 310  				}
 311  				message := buf.Bytes()
 312  				var t string
 313  				if t, message, err = envelopes.Identify(message); chk.E(err) {
 314  					continue
 315  				}
 316  				switch t {
 317  				case noticeenvelope.L:
 318  					env := noticeenvelope.New()
 319  					if env, message, err = noticeenvelope.Parse(message); chk.E(err) {
 320  						continue
 321  					}
 322  					// see WithNoticeHandler
 323  					if r.notices != nil {
 324  						r.notices <- env.Message
 325  					} else {
 326  						log.E.F("NOTICE from %s: '%s'", r.URL, env.Message)
 327  					}
 328  				case authenvelope.L:
 329  					env := authenvelope.NewChallenge()
 330  					if env, message, err = authenvelope.ParseChallenge(message); chk.E(err) {
 331  						continue
 332  					}
 333  					if len(env.Challenge) == 0 {
 334  						continue
 335  					}
 336  					r.challenge = env.Challenge
 337  				case eventenvelope.L:
 338  					env := eventenvelope.NewResult()
 339  					if env, message, err = eventenvelope.ParseResult(message); chk.E(err) {
 340  						continue
 341  					}
 342  					if len(env.Subscription) == 0 {
 343  						continue
 344  					}
 345  					if sub, ok := r.Subscriptions.Load(string(env.Subscription)); !ok {
 346  						log.D.F(
 347  							"{%s} no subscription with id '%s'\n", r.URL,
 348  							env.Subscription,
 349  						)
 350  						continue
 351  					} else {
 352  						// check if the event matches the desired filter, ignore otherwise
 353  						if !sub.Filters.Match(env.Event) {
 354  							// log.D.F(
 355  							// 	"{%s} filter does not match: %v ~ %v\n", r.URL,
 356  							// 	sub.Filters, env.Event,
 357  							// )
 358  							continue
 359  						}
 360  						// check signature, ignore invalid, except from trusted (AssumeValid) relays
 361  						if !r.AssumeValid {
 362  							if ok, err = env.Event.Verify(); !ok {
 363  								log.E.F(
 364  									"{%s} bad signature on %s\n", r.URL,
 365  									env.Event,
 366  								)
 367  								continue
 368  							}
 369  						}
 370  						// dispatch this to the internal .events channel of the subscription
 371  						sub.dispatchEvent(env.Event)
 372  					}
 373  				case eoseenvelope.L:
 374  					env := eoseenvelope.New()
 375  					if env, message, err = eoseenvelope.Parse(message); chk.E(err) {
 376  						continue
 377  					}
 378  					if subscription, ok := r.Subscriptions.Load(string(env.Subscription)); ok {
 379  						subscription.dispatchEose()
 380  					}
 381  				case closedenvelope.L:
 382  					env := closedenvelope.New()
 383  					if env, message, err = closedenvelope.Parse(message); chk.E(err) {
 384  						continue
 385  					}
 386  					if subscription, ok := r.Subscriptions.Load(string(env.Subscription)); ok {
 387  						subscription.handleClosed(env.ReasonString())
 388  					}
 389  				case okenvelope.L:
 390  					env := okenvelope.New()
 391  					if env, message, err = okenvelope.Parse(message); chk.E(err) {
 392  						continue
 393  					}
 394  					eventIDHex := hex.Enc(env.EventID)
 395  					if okCallback, exist := r.okCallbacks.Load(eventIDHex); exist {
 396  						okCallback(env.OK, env.ReasonString())
 397  					}
 398  				}
 399  			}
 400  		}
 401  	}()
 402  
 403  	return nil
 404  }
 405  
 406  // Write queues an arbitrary message to be sent to the relay.
 407  func (r *Client) Write(msg []byte) <-chan error {
 408  	ch := make(chan error)
 409  	select {
 410  	case r.writeQueue <- writeRequest{msg: msg, answer: ch}:
 411  	case <-r.connectionContext.Done():
 412  		go func() { ch <- fmt.Errorf("connection closed") }()
 413  	}
 414  	return ch
 415  }
 416  
 417  // Publish sends an "EVENT" command to the relay r as in NIP-01 and waits for an OK response.
 418  func (r *Client) Publish(ctx context.Context, ev *event.E) error {
 419  	return r.publish(
 420  		ctx, hex.Enc(ev.ID), eventenvelope.NewSubmissionWith(ev),
 421  	)
 422  }
 423  
 424  // Auth sends an "AUTH" command client->relay as in NIP-42 and waits for an OK response.
 425  //
 426  // You don't have to build the AUTH event yourself, this function takes a function to which the
 427  // event that must be signed will be passed, so it's only necessary to sign that.
 428  func (r *Client) Auth(
 429  	ctx context.Context, sign signer.I,
 430  ) (err error) {
 431  	authEvent := &event.E{
 432  		CreatedAt: time.Now().Unix(),
 433  		Kind:      kind.ClientAuthentication.K,
 434  		Tags: tag.NewS(
 435  			tag.NewFromAny("relay", r.URL),
 436  			tag.NewFromAny("challenge", r.challenge),
 437  		),
 438  		Content: nil,
 439  	}
 440  	if err = authEvent.Sign(sign); err != nil {
 441  		return fmt.Errorf("error signing auth event: %w", err)
 442  	}
 443  
 444  	return r.publish(
 445  		ctx, hex.Enc(authEvent.ID), authenvelope.NewResponseWith(authEvent),
 446  	)
 447  }
 448  
 449  func (r *Client) publish(
 450  	ctx context.Context, id string, env codec.Envelope,
 451  ) error {
 452  	var err error
 453  	var cancel context.CancelFunc
 454  
 455  	if _, ok := ctx.Deadline(); !ok {
 456  		// if no timeout is set, force it to 7 seconds
 457  		ctx, cancel = context.WithTimeoutCause(
 458  			ctx, 7*time.Second, fmt.Errorf("given up waiting for an OK"),
 459  		)
 460  		defer cancel()
 461  	} else {
 462  		// otherwise make the context cancellable so we can stop everything upon receiving an "OK"
 463  		ctx, cancel = context.WithCancel(ctx)
 464  		defer cancel()
 465  	}
 466  
 467  	// listen for an OK callback
 468  	gotOk := false
 469  	r.okCallbacks.Store(
 470  		id, func(ok bool, reason string) {
 471  			gotOk = true
 472  			if !ok {
 473  				err = fmt.Errorf("msg: %s", reason)
 474  			}
 475  			cancel()
 476  		},
 477  	)
 478  	defer r.okCallbacks.Delete(id)
 479  
 480  	// publish event
 481  	envb := env.Marshal(nil)
 482  	if err = <-r.Write(envb); err != nil {
 483  		return err
 484  	}
 485  
 486  	for {
 487  		select {
 488  		case <-ctx.Done():
 489  			// this will be called when we get an OK or when the context has been canceled
 490  			if gotOk {
 491  				return err
 492  			}
 493  			return ctx.Err()
 494  		case <-r.connectionContext.Done():
 495  			// this is caused when we lose connectivity
 496  			return err
 497  		}
 498  	}
 499  }
 500  
 501  // Subscribe sends a "REQ" command to the relay r as in NIP-01.
 502  // Events are returned through the channel sub.Events.
 503  // The subscription is closed when context ctx is cancelled ("CLOSE" in NIP-01).
 504  //
 505  // Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.Context` will be canceled at some point.
 506  // Failure to do that will result in a huge number of halted goroutines being created.
 507  func (r *Client) Subscribe(
 508  	ctx context.Context, ff *filter.S, opts ...SubscriptionOption,
 509  ) (*Subscription, error) {
 510  	sub := r.PrepareSubscription(ctx, ff, opts...)
 511  
 512  	if r.Connection == nil {
 513  		log.T.F(
 514  			"WS.Subscribe: not connected to %s; aborting sub id=%s", r.URL,
 515  			sub.GetID(),
 516  		)
 517  		return nil, fmt.Errorf("not connected to %s", r.URL)
 518  	}
 519  
 520  	log.T.F(
 521  		"WS.Subscribe: firing subscription id=%s to %s with %d filters",
 522  		sub.GetID(), r.URL, len(*ff),
 523  	)
 524  	if err := sub.Fire(); err != nil {
 525  		log.T.F(
 526  			"WS.Subscribe: Fire failed id=%s to %s: %v", sub.GetID(), r.URL,
 527  			err,
 528  		)
 529  		return nil, fmt.Errorf(
 530  			"couldn't subscribe to %v at %s: %w", ff, r.URL, err,
 531  		)
 532  	}
 533  	log.T.F("WS.Subscribe: Fire succeeded id=%s to %s", sub.GetID(), r.URL)
 534  
 535  	return sub, nil
 536  }
 537  
 538  // PrepareSubscription creates a subscription, but doesn't fire it.
 539  //
 540  // Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.Context` will be canceled at some point.
 541  // Failure to do that will result in a huge number of halted goroutines being created.
 542  func (r *Client) PrepareSubscription(
 543  	ctx context.Context, ff *filter.S, opts ...SubscriptionOption,
 544  ) (sub *Subscription) {
 545  	current := subscriptionIDCounter.Add(1)
 546  	ctx, cancel := context.WithCancelCause(ctx)
 547  	sub = &Subscription{
 548  		Client:            r,
 549  		Context:           ctx,
 550  		cancel:            cancel,
 551  		counter:           current,
 552  		Events:            make(event.C),
 553  		EndOfStoredEvents: make(chan struct{}, 1),
 554  		ClosedReason:      make(chan string, 1),
 555  		Filters:           ff,
 556  		match:             ff.Match,
 557  	}
 558  	label := ""
 559  	for _, opt := range opts {
 560  		switch o := opt.(type) {
 561  		case WithLabel:
 562  			label = string(o)
 563  		case WithCheckDuplicate:
 564  			sub.checkDuplicate = o
 565  		case WithCheckDuplicateReplaceable:
 566  			sub.checkDuplicateReplaceable = o
 567  		}
 568  	}
 569  	// subscription id computation — must copy since sub.id outlives this function
 570  	buf := subIdPool.Get().([]byte)[:0]
 571  	buf = strconv.AppendInt(buf, sub.counter, 10)
 572  	buf = append(buf, ':')
 573  	buf = append(buf, label...)
 574  	sub.id = make([]byte, len(buf))
 575  	copy(sub.id, buf)
 576  	subIdPool.Put(buf)
 577  	r.Subscriptions.Store(string(buf), sub)
 578  	// start handling events, eose, unsub etc:
 579  	go sub.start()
 580  	return sub
 581  }
 582  
 583  // QueryEvents subscribes to events matching the given filter and returns a channel of events.
 584  //
 585  // In most cases it's better to use SimplePool instead of this method.
 586  func (r *Client) QueryEvents(
 587  	ctx context.Context, f *filter.F, opts ...SubscriptionOption,
 588  ) (
 589  	evc event.C, err error,
 590  ) {
 591  	var sub *Subscription
 592  	if sub, err = r.Subscribe(ctx, filter.NewS(f), opts...); chk.E(err) {
 593  		return
 594  	}
 595  	go func() {
 596  		for {
 597  			select {
 598  			case <-sub.ClosedReason:
 599  			case <-sub.EndOfStoredEvents:
 600  			case <-ctx.Done():
 601  			case <-r.Context().Done():
 602  			}
 603  			sub.unsub(errors.New("QueryEvents() ended"))
 604  			return
 605  		}
 606  	}()
 607  	evc = sub.Events
 608  	return
 609  }
 610  
 611  // QuerySync subscribes to events matching the given filter and returns a slice of events.
 612  // This method blocks until all events are received or the context is canceled.
 613  //
 614  // In most cases it's better to use SimplePool instead of this method.
 615  func (r *Client) QuerySync(
 616  	ctx context.Context, ff *filter.F, opts ...SubscriptionOption,
 617  ) (
 618  	[]*event.E, error,
 619  ) {
 620  	if _, ok := ctx.Deadline(); !ok {
 621  		// if no timeout is set, force it to 7 seconds
 622  		var cancel context.CancelFunc
 623  		ctx, cancel = context.WithTimeoutCause(
 624  			ctx, 7*time.Second, errors.New("QuerySync() took too long"),
 625  		)
 626  		defer cancel()
 627  	}
 628  	var lim int
 629  	if ff.Limit != nil {
 630  		lim = int(*ff.Limit)
 631  	}
 632  	events := make([]*event.E, 0, max(lim, 250))
 633  	ch, err := r.QueryEvents(ctx, ff, opts...)
 634  	if err != nil {
 635  		return nil, err
 636  	}
 637  
 638  	for evt := range ch {
 639  		events = append(events, evt)
 640  	}
 641  
 642  	return events, nil
 643  }
 644  
 645  // Close closes the relay connection.
 646  func (r *Client) Close() error { return r.CloseWithReason(errors.New("Close() called")) }
 647  
 648  // CloseWithReason closes the relay connection with a specific reason that will be stored as the context cancel cause.
 649  func (r *Client) CloseWithReason(reason error) error { return r.close(reason) }
 650  
 651  func (r *Client) close(reason error) error {
 652  	r.closeMutex.Lock()
 653  	defer r.closeMutex.Unlock()
 654  
 655  	if r.connectionContextCancel == nil {
 656  		return fmt.Errorf("relay already closed")
 657  	}
 658  	log.T.F(
 659  		"WS.Client: closing connection to %s: reason=%v lastErr=%v", r.URL,
 660  		reason, r.ConnectionError,
 661  	)
 662  	r.connectionContextCancel(reason)
 663  	r.connectionContextCancel = nil
 664  
 665  	if r.Connection == nil {
 666  		return fmt.Errorf("relay not connected")
 667  	}
 668  
 669  	err := r.Connection.Close()
 670  	if err != nil {
 671  		return err
 672  	}
 673  
 674  	return nil
 675  }
 676  
// subIdPool recycles byte slices used as scratch space while composing
// subscription ids; freshly created slices are empty with capacity 15.
var subIdPool = sync.Pool{
	New: func() any { return make([]byte, 0, 15) },
}
 680