pool.go raw

   1  package nostr
   2  
   3  import (
   4  	"context"
   5  	"errors"
   6  	"fmt"
   7  	"math"
   8  	"net/http"
   9  	"slices"
  10  	"strings"
  11  	"sync"
  12  	"sync/atomic"
  13  	"time"
  14  
  15  	"github.com/nbd-wtf/go-nostr/nip45/hyperloglog"
  16  	"github.com/puzpuzpuz/xsync/v3"
  17  )
  18  
const (
	// seenAlreadyDropTick is the period of the ticker that subMany uses to
	// purge entries from its duplicate-tracking map; the same duration is
	// also used as the age cutoff below which entries are dropped.
	seenAlreadyDropTick = time.Minute
)
  22  
// SimplePool manages connections to multiple relays, ensures they are reopened when necessary and not duplicated.
type SimplePool struct {
	// Relays maps a normalized relay URL to the relay instance created for it.
	Relays  *xsync.MapOf[string, *Relay]
	// Context is the pool-wide context; Close cancels it.
	Context context.Context

	// authHandler, when set, is invoked to sign auth events requested by relays.
	authHandler func(context.Context, RelayEvent) error
	// cancel cancels Context with a cause.
	cancel      context.CancelCauseFunc

	// eventMiddleware, when set, is called with every event received from a subscription.
	eventMiddleware     func(RelayEvent)
	// duplicateMiddleware, when set, is called with the relay and id of every duplicate detected.
	duplicateMiddleware func(relay string, id string)
	// queryMiddleware, when set, is called for each relay+author+kind combination that is queried.
	queryMiddleware     func(relay string, pubkey string, kind int)

	// custom things not often used
	// penaltyBoxMu guards penaltyBox.
	penaltyBoxMu sync.Mutex
	// penaltyBox maps a normalized relay URL to {failure count, remaining penalty in seconds};
	// it is nil unless WithPenaltyBox was used.
	penaltyBox   map[string][2]float64
	// relayOptions are passed to NewRelay for every relay created by the pool.
	relayOptions []RelayOption
}
  40  
// DirectedFilter combines a Filter with a specific relay URL.
// It is the unit of work consumed by BatchedSubManyEose: the embedded Filter
// is sent only to Relay.
type DirectedFilter struct {
	Filter
	// Relay is the URL of the relay that the filter should be sent to.
	Relay string
}
  46  
// RelayEvent represents an event received from a specific relay.
// The event is embedded, so its fields and methods are promoted.
type RelayEvent struct {
	*Event
	// Relay is the relay that delivered the event.
	Relay *Relay
}
  52  
  53  func (ie RelayEvent) String() string { return fmt.Sprintf("[%s] >> %s", ie.Relay.URL, ie.Event) }
  54  
// PoolOption is an interface for options that can be applied to a SimplePool.
// NewSimplePool calls ApplyPoolOption on each option it receives, after the
// pool's relay map and context have been set up.
type PoolOption interface {
	// ApplyPoolOption mutates the given pool to enable the option.
	ApplyPoolOption(*SimplePool)
}
  59  
  60  // NewSimplePool creates a new SimplePool with the given context and options.
  61  func NewSimplePool(ctx context.Context, opts ...PoolOption) *SimplePool {
  62  	ctx, cancel := context.WithCancelCause(ctx)
  63  
  64  	pool := &SimplePool{
  65  		Relays: xsync.NewMapOf[string, *Relay](),
  66  
  67  		Context: ctx,
  68  		cancel:  cancel,
  69  	}
  70  
  71  	for _, opt := range opts {
  72  		opt.ApplyPoolOption(pool)
  73  	}
  74  
  75  	return pool
  76  }
  77  
  78  // WithRelayOptions sets options that will be used on every relay instance created by this pool.
  79  func WithRelayOptions(ropts ...RelayOption) withRelayOptionsOpt {
  80  	return ropts
  81  }
  82  
  83  type withRelayOptionsOpt []RelayOption
  84  
  85  func (h withRelayOptionsOpt) ApplyPoolOption(pool *SimplePool) {
  86  	pool.relayOptions = h
  87  }
  88  
  89  // WithAuthHandler must be a function that signs the auth event when called.
  90  // it will be called whenever any relay in the pool returns a `CLOSED` message
  91  // with the "auth-required:" prefix, only once for each relay
  92  type WithAuthHandler func(ctx context.Context, authEvent RelayEvent) error
  93  
  94  func (h WithAuthHandler) ApplyPoolOption(pool *SimplePool) {
  95  	pool.authHandler = h
  96  }
  97  
  98  // WithPenaltyBox just sets the penalty box mechanism so relays that fail to connect
  99  // or that disconnect will be ignored for a while and we won't attempt to connect again.
 100  func WithPenaltyBox() withPenaltyBoxOpt { return withPenaltyBoxOpt{} }
 101  
 102  type withPenaltyBoxOpt struct{}
 103  
 104  func (h withPenaltyBoxOpt) ApplyPoolOption(pool *SimplePool) {
 105  	pool.penaltyBox = make(map[string][2]float64)
 106  	go func() {
 107  		sleep := 30.0
 108  		for {
 109  			time.Sleep(time.Duration(sleep) * time.Second)
 110  
 111  			pool.penaltyBoxMu.Lock()
 112  			nextSleep := 300.0
 113  			for url, v := range pool.penaltyBox {
 114  				remainingSeconds := v[1]
 115  				remainingSeconds -= sleep
 116  				if remainingSeconds <= 0 {
 117  					pool.penaltyBox[url] = [2]float64{v[0], 0}
 118  					continue
 119  				} else {
 120  					pool.penaltyBox[url] = [2]float64{v[0], remainingSeconds}
 121  				}
 122  
 123  				if remainingSeconds < nextSleep {
 124  					nextSleep = remainingSeconds
 125  				}
 126  			}
 127  
 128  			sleep = nextSleep
 129  			pool.penaltyBoxMu.Unlock()
 130  		}
 131  	}()
 132  }
 133  
 134  // WithEventMiddleware is a function that will be called with all events received.
 135  type WithEventMiddleware func(RelayEvent)
 136  
 137  func (h WithEventMiddleware) ApplyPoolOption(pool *SimplePool) {
 138  	pool.eventMiddleware = h
 139  }
 140  
 141  // WithDuplicateMiddleware is a function that will be called with all duplicate ids received.
 142  type WithDuplicateMiddleware func(relay string, id string)
 143  
 144  func (h WithDuplicateMiddleware) ApplyPoolOption(pool *SimplePool) {
 145  	pool.duplicateMiddleware = h
 146  }
 147  
 148  // WithAuthorKindQueryMiddleware is a function that will be called with every combination of relay+pubkey+kind queried
 149  // in a .SubMany*() call -- when applicable (i.e. when the query contains a pubkey and a kind).
 150  type WithAuthorKindQueryMiddleware func(relay string, pubkey string, kind int)
 151  
 152  func (h WithAuthorKindQueryMiddleware) ApplyPoolOption(pool *SimplePool) {
 153  	pool.queryMiddleware = h
 154  }
 155  
 156  var (
 157  	_ PoolOption = (WithAuthHandler)(nil)
 158  	_ PoolOption = (WithEventMiddleware)(nil)
 159  	_ PoolOption = WithPenaltyBox()
 160  	_ PoolOption = WithRelayOptions(WithRequestHeader(http.Header{}))
 161  )
 162  
 163  // EnsureRelay ensures that a relay connection exists and is active.
 164  // If the relay is not connected, it attempts to connect.
 165  func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) {
 166  	nm := NormalizeURL(url)
 167  	defer namedLock(nm)()
 168  
 169  	relay, ok := pool.Relays.Load(nm)
 170  	if ok && relay == nil {
 171  		if pool.penaltyBox != nil {
 172  			pool.penaltyBoxMu.Lock()
 173  			defer pool.penaltyBoxMu.Unlock()
 174  			v, _ := pool.penaltyBox[nm]
 175  			if v[1] > 0 {
 176  				return nil, fmt.Errorf("in penalty box, %fs remaining", v[1])
 177  			}
 178  		}
 179  	} else if ok && relay.IsConnected() {
 180  		// already connected, unlock and return
 181  		return relay, nil
 182  	}
 183  
 184  	// try to connect
 185  	// we use this ctx here so when the pool dies everything dies
 186  	ctx, cancel := context.WithTimeoutCause(
 187  		pool.Context,
 188  		time.Second*15,
 189  		errors.New("connecting to the relay took too long"),
 190  	)
 191  	defer cancel()
 192  
 193  	relay = NewRelay(context.Background(), url, pool.relayOptions...)
 194  	if err := relay.Connect(ctx); err != nil {
 195  		if pool.penaltyBox != nil {
 196  			// putting relay in penalty box
 197  			pool.penaltyBoxMu.Lock()
 198  			defer pool.penaltyBoxMu.Unlock()
 199  			v, _ := pool.penaltyBox[nm]
 200  			pool.penaltyBox[nm] = [2]float64{v[0] + 1, 30.0 + math.Pow(2, v[0]+1)}
 201  		}
 202  		return nil, fmt.Errorf("failed to connect: %w", err)
 203  	}
 204  
 205  	pool.Relays.Store(nm, relay)
 206  	return relay, nil
 207  }
 208  
// PublishResult represents the result of publishing an event to a relay.
type PublishResult struct {
	// Error is nil when the publish succeeded, otherwise the connection,
	// auth or publish error.
	Error    error
	// RelayURL is the URL as it was given to PublishMany.
	RelayURL string
	// Relay is the relay used; it is nil when connecting to the relay failed.
	Relay    *Relay
}
 215  
 216  // PublishMany publishes an event to multiple relays and returns a channel of results emitted as they're received.
 217  func (pool *SimplePool) PublishMany(ctx context.Context, urls []string, evt Event) chan PublishResult {
 218  	ch := make(chan PublishResult, len(urls))
 219  
 220  	wg := sync.WaitGroup{}
 221  	wg.Add(len(urls))
 222  	go func() {
 223  		for _, url := range urls {
 224  			go func() {
 225  				defer wg.Done()
 226  
 227  				relay, err := pool.EnsureRelay(url)
 228  				if err != nil {
 229  					ch <- PublishResult{err, url, nil}
 230  					return
 231  				}
 232  
 233  				if err := relay.Publish(ctx, evt); err == nil {
 234  					// success with no auth required
 235  					ch <- PublishResult{nil, url, relay}
 236  				} else if strings.HasPrefix(err.Error(), "msg: auth-required:") && pool.authHandler != nil {
 237  					// try to authenticate if we can
 238  					if authErr := relay.Auth(ctx, func(event *Event) error {
 239  						return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
 240  					}); authErr == nil {
 241  						if err := relay.Publish(ctx, evt); err == nil {
 242  							// success after auth
 243  							ch <- PublishResult{nil, url, relay}
 244  						} else {
 245  							// failure after auth
 246  							ch <- PublishResult{err, url, relay}
 247  						}
 248  					} else {
 249  						// failure to auth
 250  						ch <- PublishResult{fmt.Errorf("failed to auth: %w", authErr), url, relay}
 251  					}
 252  				} else {
 253  					// direct failure
 254  					ch <- PublishResult{err, url, relay}
 255  				}
 256  			}()
 257  		}
 258  
 259  		wg.Wait()
 260  		close(ch)
 261  	}()
 262  
 263  	return ch
 264  }
 265  
 266  // SubscribeMany opens a subscription with the given filter to multiple relays
 267  // the subscriptions ends when the context is canceled or when all relays return a CLOSED.
 268  func (pool *SimplePool) SubscribeMany(
 269  	ctx context.Context,
 270  	urls []string,
 271  	filter Filter,
 272  	opts ...SubscriptionOption,
 273  ) chan RelayEvent {
 274  	return pool.subMany(ctx, urls, Filters{filter}, nil, opts...)
 275  }
 276  
 277  // FetchMany opens a subscription, much like SubscribeMany, but it ends as soon as all Relays
 278  // return an EOSE message.
 279  func (pool *SimplePool) FetchMany(
 280  	ctx context.Context,
 281  	urls []string,
 282  	filter Filter,
 283  	opts ...SubscriptionOption,
 284  ) chan RelayEvent {
 285  	return pool.SubManyEose(ctx, urls, Filters{filter}, opts...)
 286  }
 287  
 288  // Deprecated: SubMany is deprecated: use SubscribeMany instead.
 289  func (pool *SimplePool) SubMany(
 290  	ctx context.Context,
 291  	urls []string,
 292  	filters Filters,
 293  	opts ...SubscriptionOption,
 294  ) chan RelayEvent {
 295  	return pool.subMany(ctx, urls, filters, nil, opts...)
 296  }
 297  
 298  // SubscribeManyNotifyEOSE is like SubscribeMany, but takes a channel that is closed when
 299  // all subscriptions have received an EOSE
 300  func (pool *SimplePool) SubscribeManyNotifyEOSE(
 301  	ctx context.Context,
 302  	urls []string,
 303  	filter Filter,
 304  	eoseChan chan struct{},
 305  	opts ...SubscriptionOption,
 306  ) chan RelayEvent {
 307  	return pool.subMany(ctx, urls, Filters{filter}, eoseChan, opts...)
 308  }
 309  
// ReplaceableKey identifies a replaceable or addressable event by the pubkey
// of its author and the value of its "d" tag. FetchManyReplaceable uses it as
// the key of its result map.
type ReplaceableKey struct {
	// PubKey is the event author's public key.
	PubKey string
	// D is the value of the event's "d" tag.
	D      string
}
 314  
 315  // FetchManyReplaceable is like FetchMany, but deduplicates replaceable and addressable events and returns
 316  // only the latest for each "d" tag.
 317  func (pool *SimplePool) FetchManyReplaceable(
 318  	ctx context.Context,
 319  	urls []string,
 320  	filter Filter,
 321  	opts ...SubscriptionOption,
 322  ) *xsync.MapOf[ReplaceableKey, *Event] {
 323  	ctx, cancel := context.WithCancelCause(ctx)
 324  
 325  	results := xsync.NewMapOf[ReplaceableKey, *Event]()
 326  
 327  	wg := sync.WaitGroup{}
 328  	wg.Add(len(urls))
 329  
 330  	seenAlreadyLatest := xsync.NewMapOf[ReplaceableKey, Timestamp]()
 331  	opts = append(opts, WithCheckDuplicateReplaceable(func(rk ReplaceableKey, ts Timestamp) bool {
 332  		updated := false
 333  		seenAlreadyLatest.Compute(rk, func(latest Timestamp, _ bool) (newValue Timestamp, delete bool) {
 334  			if ts > latest {
 335  				updated = true // we are updating the most recent
 336  				return ts, false
 337  			}
 338  			return latest, false // the one we had was already more recent
 339  		})
 340  		return updated
 341  	}))
 342  
 343  	for _, url := range urls {
 344  		go func(nm string) {
 345  			defer wg.Done()
 346  
 347  			if mh := pool.queryMiddleware; mh != nil {
 348  				if filter.Kinds != nil && filter.Authors != nil {
 349  					for _, kind := range filter.Kinds {
 350  						for _, author := range filter.Authors {
 351  							mh(nm, author, kind)
 352  						}
 353  					}
 354  				}
 355  			}
 356  
 357  			relay, err := pool.EnsureRelay(nm)
 358  			if err != nil {
 359  				debugLogf("error connecting to %s with %v: %s", nm, filter, err)
 360  				return
 361  			}
 362  
 363  			hasAuthed := false
 364  
 365  		subscribe:
 366  			sub, err := relay.Subscribe(ctx, Filters{filter}, opts...)
 367  			if err != nil {
 368  				debugLogf("error subscribing to %s with %v: %s", relay, filter, err)
 369  				return
 370  			}
 371  
 372  			for {
 373  				select {
 374  				case <-ctx.Done():
 375  					return
 376  				case <-sub.EndOfStoredEvents:
 377  					return
 378  				case reason := <-sub.ClosedReason:
 379  					if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
 380  						// relay is requesting auth. if we can we will perform auth and try again
 381  						err := relay.Auth(ctx, func(event *Event) error {
 382  							return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
 383  						})
 384  						if err == nil {
 385  							hasAuthed = true // so we don't keep doing AUTH again and again
 386  							goto subscribe
 387  						}
 388  					}
 389  					debugLogf("CLOSED from %s: '%s'\n", nm, reason)
 390  					return
 391  				case evt, more := <-sub.Events:
 392  					if !more {
 393  						return
 394  					}
 395  
 396  					ie := RelayEvent{Event: evt, Relay: relay}
 397  					if mh := pool.eventMiddleware; mh != nil {
 398  						mh(ie)
 399  					}
 400  
 401  					results.Store(ReplaceableKey{evt.PubKey, evt.Tags.GetD()}, evt)
 402  				}
 403  			}
 404  		}(NormalizeURL(url))
 405  	}
 406  
 407  	// this will happen when all subscriptions get an eose (or when they die)
 408  	wg.Wait()
 409  	cancel(errors.New("all subscriptions ended"))
 410  
 411  	return results
 412  }
 413  
 414  func (pool *SimplePool) subMany(
 415  	ctx context.Context,
 416  	urls []string,
 417  	filters Filters,
 418  	eoseChan chan struct{},
 419  	opts ...SubscriptionOption,
 420  ) chan RelayEvent {
 421  	ctx, cancel := context.WithCancelCause(ctx)
 422  	_ = cancel // do this so `go vet` will stop complaining
 423  	events := make(chan RelayEvent)
 424  	seenAlready := xsync.NewMapOf[string, Timestamp]()
 425  	ticker := time.NewTicker(seenAlreadyDropTick)
 426  
 427  	eoseWg := sync.WaitGroup{}
 428  	eoseWg.Add(len(urls))
 429  	if eoseChan != nil {
 430  		go func() {
 431  			eoseWg.Wait()
 432  			close(eoseChan)
 433  		}()
 434  	}
 435  
 436  	pending := xsync.NewCounter()
 437  	pending.Add(int64(len(urls)))
 438  	for i, url := range urls {
 439  		url = NormalizeURL(url)
 440  		urls[i] = url
 441  		if idx := slices.Index(urls, url); idx != i {
 442  			// skip duplicate relays in the list
 443  			eoseWg.Done()
 444  			continue
 445  		}
 446  
 447  		eosed := atomic.Bool{}
 448  		firstConnection := true
 449  
 450  		go func(nm string) {
 451  			defer func() {
 452  				pending.Dec()
 453  				if pending.Value() == 0 {
 454  					close(events)
 455  					cancel(fmt.Errorf("aborted: %w", context.Cause(ctx)))
 456  				}
 457  				if eosed.CompareAndSwap(false, true) {
 458  					eoseWg.Done()
 459  				}
 460  			}()
 461  
 462  			hasAuthed := false
 463  			interval := 3 * time.Second
 464  			for {
 465  				select {
 466  				case <-ctx.Done():
 467  					return
 468  				default:
 469  				}
 470  
 471  				var sub *Subscription
 472  
 473  				if mh := pool.queryMiddleware; mh != nil {
 474  					for _, filter := range filters {
 475  						if filter.Kinds != nil && filter.Authors != nil {
 476  							for _, kind := range filter.Kinds {
 477  								for _, author := range filter.Authors {
 478  									mh(nm, author, kind)
 479  								}
 480  							}
 481  						}
 482  					}
 483  				}
 484  
 485  				relay, err := pool.EnsureRelay(nm)
 486  				if err != nil {
 487  					// if we never connected to this just fail
 488  					if firstConnection {
 489  						return
 490  					}
 491  
 492  					// otherwise (if we were connected and got disconnected) keep trying to reconnect
 493  					debugLogf("%s reconnecting because connection failed\n", nm)
 494  					goto reconnect
 495  				}
 496  				firstConnection = false
 497  				hasAuthed = false
 498  
 499  			subscribe:
 500  				sub, err = relay.Subscribe(ctx, filters, append(opts, WithCheckDuplicate(func(id, relay string) bool {
 501  					_, exists := seenAlready.Load(id)
 502  					if exists && pool.duplicateMiddleware != nil {
 503  						pool.duplicateMiddleware(relay, id)
 504  					}
 505  					return exists
 506  				}))...)
 507  				if err != nil {
 508  					debugLogf("%s reconnecting because subscription died\n", nm)
 509  					goto reconnect
 510  				}
 511  
 512  				go func() {
 513  					<-sub.EndOfStoredEvents
 514  
 515  					// guard here otherwise a resubscription will trigger a duplicate call to eoseWg.Done()
 516  					if eosed.CompareAndSwap(false, true) {
 517  						eoseWg.Done()
 518  					}
 519  				}()
 520  
 521  				// reset interval when we get a good subscription
 522  				interval = 3 * time.Second
 523  
 524  				for {
 525  					select {
 526  					case evt, more := <-sub.Events:
 527  						if !more {
 528  							// this means the connection was closed for weird reasons, like the server shut down
 529  							// so we will update the filters here to include only events seem from now on
 530  							// and try to reconnect until we succeed
 531  							now := Now()
 532  							for i := range filters {
 533  								filters[i].Since = &now
 534  							}
 535  							debugLogf("%s reconnecting because sub.Events is broken\n", nm)
 536  							goto reconnect
 537  						}
 538  
 539  						ie := RelayEvent{Event: evt, Relay: relay}
 540  						if mh := pool.eventMiddleware; mh != nil {
 541  							mh(ie)
 542  						}
 543  
 544  						select {
 545  						case events <- ie:
 546  						case <-ctx.Done():
 547  							return
 548  						}
 549  					case <-ticker.C:
 550  						if eosed.Load() {
 551  							old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix())
 552  							for id, value := range seenAlready.Range {
 553  								if value < old {
 554  									seenAlready.Delete(id)
 555  								}
 556  							}
 557  						}
 558  					case reason := <-sub.ClosedReason:
 559  						if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
 560  							// relay is requesting auth. if we can we will perform auth and try again
 561  							err := relay.Auth(ctx, func(event *Event) error {
 562  								return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
 563  							})
 564  							if err == nil {
 565  								hasAuthed = true // so we don't keep doing AUTH again and again
 566  								goto subscribe
 567  							}
 568  						} else {
 569  							debugLogf("CLOSED from %s: '%s'\n", nm, reason)
 570  						}
 571  
 572  						return
 573  					case <-ctx.Done():
 574  						return
 575  					}
 576  				}
 577  
 578  			reconnect:
 579  				// we will go back to the beginning of the loop and try to connect again and again
 580  				// until the context is canceled
 581  				time.Sleep(interval)
 582  				interval = interval * 17 / 10 // the next time we try we will wait longer
 583  			}
 584  		}(url)
 585  	}
 586  
 587  	return events
 588  }
 589  
 590  // Deprecated: SubManyEose is deprecated: use FetchMany instead.
 591  func (pool *SimplePool) SubManyEose(
 592  	ctx context.Context,
 593  	urls []string,
 594  	filters Filters,
 595  	opts ...SubscriptionOption,
 596  ) chan RelayEvent {
 597  	seenAlready := xsync.NewMapOf[string, struct{}]()
 598  	return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filters,
 599  		WithCheckDuplicate(func(id, relay string) bool {
 600  			_, exists := seenAlready.LoadOrStore(id, struct{}{})
 601  			if exists && pool.duplicateMiddleware != nil {
 602  				pool.duplicateMiddleware(relay, id)
 603  			}
 604  			return exists
 605  		}),
 606  		opts...)
 607  }
 608  
 609  func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate(
 610  	ctx context.Context,
 611  	urls []string,
 612  	filters Filters,
 613  	wcd WithCheckDuplicate,
 614  	opts ...SubscriptionOption,
 615  ) chan RelayEvent {
 616  	ctx, cancel := context.WithCancelCause(ctx)
 617  
 618  	events := make(chan RelayEvent)
 619  	wg := sync.WaitGroup{}
 620  	wg.Add(len(urls))
 621  
 622  	opts = append(opts, wcd)
 623  
 624  	go func() {
 625  		// this will happen when all subscriptions get an eose (or when they die)
 626  		wg.Wait()
 627  		cancel(errors.New("all subscriptions ended"))
 628  		close(events)
 629  	}()
 630  
 631  	for _, url := range urls {
 632  		go func(nm string) {
 633  			defer wg.Done()
 634  
 635  			if mh := pool.queryMiddleware; mh != nil {
 636  				for _, filter := range filters {
 637  					if filter.Kinds != nil && filter.Authors != nil {
 638  						for _, kind := range filter.Kinds {
 639  							for _, author := range filter.Authors {
 640  								mh(nm, author, kind)
 641  							}
 642  						}
 643  					}
 644  				}
 645  			}
 646  
 647  			relay, err := pool.EnsureRelay(nm)
 648  			if err != nil {
 649  				debugLogf("error connecting to %s with %v: %s", nm, filters, err)
 650  				return
 651  			}
 652  
 653  			hasAuthed := false
 654  
 655  		subscribe:
 656  			sub, err := relay.Subscribe(ctx, filters, opts...)
 657  			if err != nil {
 658  				debugLogf("error subscribing to %s with %v: %s", relay, filters, err)
 659  				return
 660  			}
 661  
 662  			for {
 663  				select {
 664  				case <-ctx.Done():
 665  					return
 666  				case <-sub.EndOfStoredEvents:
 667  					return
 668  				case reason := <-sub.ClosedReason:
 669  					if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
 670  						// relay is requesting auth. if we can we will perform auth and try again
 671  						err := relay.Auth(ctx, func(event *Event) error {
 672  							return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
 673  						})
 674  						if err == nil {
 675  							hasAuthed = true // so we don't keep doing AUTH again and again
 676  							goto subscribe
 677  						}
 678  					}
 679  					debugLogf("CLOSED from %s: '%s'\n", nm, reason)
 680  					return
 681  				case evt, more := <-sub.Events:
 682  					if !more {
 683  						return
 684  					}
 685  
 686  					ie := RelayEvent{Event: evt, Relay: relay}
 687  					if mh := pool.eventMiddleware; mh != nil {
 688  						mh(ie)
 689  					}
 690  
 691  					select {
 692  					case events <- ie:
 693  					case <-ctx.Done():
 694  						return
 695  					}
 696  				}
 697  			}
 698  		}(NormalizeURL(url))
 699  	}
 700  
 701  	return events
 702  }
 703  
 704  // CountMany aggregates count results from multiple relays using NIP-45 HyperLogLog
 705  func (pool *SimplePool) CountMany(
 706  	ctx context.Context,
 707  	urls []string,
 708  	filter Filter,
 709  	opts []SubscriptionOption,
 710  ) int {
 711  	hll := hyperloglog.New(0) // offset is irrelevant here
 712  
 713  	wg := sync.WaitGroup{}
 714  	wg.Add(len(urls))
 715  	for _, url := range urls {
 716  		go func(nm string) {
 717  			defer wg.Done()
 718  			relay, err := pool.EnsureRelay(url)
 719  			if err != nil {
 720  				return
 721  			}
 722  			ce, err := relay.countInternal(ctx, Filters{filter}, opts...)
 723  			if err != nil {
 724  				return
 725  			}
 726  			if len(ce.HyperLogLog) != 256 {
 727  				return
 728  			}
 729  			hll.MergeRegisters(ce.HyperLogLog)
 730  		}(NormalizeURL(url))
 731  	}
 732  
 733  	wg.Wait()
 734  	return int(hll.Count())
 735  }
 736  
 737  // QuerySingle returns the first event returned by the first relay, cancels everything else.
 738  func (pool *SimplePool) QuerySingle(
 739  	ctx context.Context,
 740  	urls []string,
 741  	filter Filter,
 742  	opts ...SubscriptionOption,
 743  ) *RelayEvent {
 744  	ctx, cancel := context.WithCancelCause(ctx)
 745  	for ievt := range pool.SubManyEose(ctx, urls, Filters{filter}, opts...) {
 746  		cancel(errors.New("got the first event and ended successfully"))
 747  		return &ievt
 748  	}
 749  	cancel(errors.New("SubManyEose() didn't get yield events"))
 750  	return nil
 751  }
 752  
 753  // BatchedSubManyEose performs batched subscriptions to multiple relays with different filters.
 754  func (pool *SimplePool) BatchedSubManyEose(
 755  	ctx context.Context,
 756  	dfs []DirectedFilter,
 757  	opts ...SubscriptionOption,
 758  ) chan RelayEvent {
 759  	res := make(chan RelayEvent)
 760  	wg := sync.WaitGroup{}
 761  	wg.Add(len(dfs))
 762  	seenAlready := xsync.NewMapOf[string, struct{}]()
 763  
 764  	for _, df := range dfs {
 765  		go func(df DirectedFilter) {
 766  			for ie := range pool.subManyEoseNonOverwriteCheckDuplicate(ctx,
 767  				[]string{df.Relay},
 768  				Filters{df.Filter},
 769  				WithCheckDuplicate(func(id, relay string) bool {
 770  					_, exists := seenAlready.LoadOrStore(id, struct{}{})
 771  					if exists && pool.duplicateMiddleware != nil {
 772  						pool.duplicateMiddleware(relay, id)
 773  					}
 774  					return exists
 775  				}), opts...,
 776  			) {
 777  				select {
 778  				case res <- ie:
 779  				case <-ctx.Done():
 780  					wg.Done()
 781  					return
 782  				}
 783  			}
 784  			wg.Done()
 785  		}(df)
 786  	}
 787  
 788  	go func() {
 789  		wg.Wait()
 790  		close(res)
 791  	}()
 792  
 793  	return res
 794  }
 795  
 796  // Close closes the pool with the given reason.
 797  func (pool *SimplePool) Close(reason string) {
 798  	pool.cancel(fmt.Errorf("pool closed with reason: '%s'", reason))
 799  }
 800