pool.go raw
1 package nostr
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "math"
8 "net/http"
9 "slices"
10 "strings"
11 "sync"
12 "sync/atomic"
13 "time"
14
15 "github.com/nbd-wtf/go-nostr/nip45/hyperloglog"
16 "github.com/puzpuzpuz/xsync/v3"
17 )
18
// seenAlreadyDropTick is the period of the ticker subMany uses to prune its
// table of already-seen event ids; it is also the age past which an entry is
// dropped.
const seenAlreadyDropTick = time.Minute
22
// SimplePool manages connections to multiple relays, ensures they are reopened when necessary and not duplicated.
type SimplePool struct {
	// Relays maps a normalized relay URL (see NormalizeURL) to the relay
	// EnsureRelay last connected for it.
	// Context is derived from the context given to NewSimplePool and is
	// canceled by Close; connection attempts are bounded by it.
	Relays  *xsync.MapOf[string, *Relay]
	Context context.Context

	// authHandler is set by WithAuthHandler and is called to sign AUTH events
	// when a relay answers with an "auth-required:" reason.
	// cancel cancels Context with a cause.
	authHandler func(context.Context, RelayEvent) error
	cancel      context.CancelCauseFunc

	// Optional hooks, nil unless set through the matching With*Middleware
	// option: eventMiddleware sees every event received, duplicateMiddleware
	// sees every event id reported as a duplicate, and queryMiddleware sees
	// every relay+author+kind combination about to be queried.
	eventMiddleware     func(RelayEvent)
	duplicateMiddleware func(relay string, id string)
	queryMiddleware     func(relay string, pubkey string, kind int)

	// custom things not often used
	//
	// penaltyBox is nil unless WithPenaltyBox was applied. It is keyed by
	// normalized relay URL; element 0 counts failed connection attempts and
	// element 1 is the number of seconds the relay still has to sit out.
	// penaltyBoxMu guards penaltyBox.
	// relayOptions are passed to every relay created by the pool.
	penaltyBoxMu sync.Mutex
	penaltyBox   map[string][2]float64
	relayOptions []RelayOption
}
40
// DirectedFilter combines a Filter with a specific relay URL.
// BatchedSubManyEose sends each embedded Filter only to the relay named in
// Relay.
type DirectedFilter struct {
	Filter
	Relay string // URL of the relay this filter is meant for
}
46
47 // RelayEvent represents an event received from a specific relay.
48 type RelayEvent struct {
49 *Event
50 Relay *Relay
51 }
52
53 func (ie RelayEvent) String() string { return fmt.Sprintf("[%s] >> %s", ie.Relay.URL, ie.Event) }
54
// PoolOption is an interface for options that can be applied to a SimplePool.
// NewSimplePool calls ApplyPoolOption on each option, in the order given,
// after the pool's Relays map and Context have been set.
type PoolOption interface {
	// ApplyPoolOption configures the given pool in place.
	ApplyPoolOption(*SimplePool)
}
59
60 // NewSimplePool creates a new SimplePool with the given context and options.
61 func NewSimplePool(ctx context.Context, opts ...PoolOption) *SimplePool {
62 ctx, cancel := context.WithCancelCause(ctx)
63
64 pool := &SimplePool{
65 Relays: xsync.NewMapOf[string, *Relay](),
66
67 Context: ctx,
68 cancel: cancel,
69 }
70
71 for _, opt := range opts {
72 opt.ApplyPoolOption(pool)
73 }
74
75 return pool
76 }
77
78 // WithRelayOptions sets options that will be used on every relay instance created by this pool.
79 func WithRelayOptions(ropts ...RelayOption) withRelayOptionsOpt {
80 return ropts
81 }
82
83 type withRelayOptionsOpt []RelayOption
84
85 func (h withRelayOptionsOpt) ApplyPoolOption(pool *SimplePool) {
86 pool.relayOptions = h
87 }
88
89 // WithAuthHandler must be a function that signs the auth event when called.
90 // it will be called whenever any relay in the pool returns a `CLOSED` message
91 // with the "auth-required:" prefix, only once for each relay
92 type WithAuthHandler func(ctx context.Context, authEvent RelayEvent) error
93
94 func (h WithAuthHandler) ApplyPoolOption(pool *SimplePool) {
95 pool.authHandler = h
96 }
97
98 // WithPenaltyBox just sets the penalty box mechanism so relays that fail to connect
99 // or that disconnect will be ignored for a while and we won't attempt to connect again.
100 func WithPenaltyBox() withPenaltyBoxOpt { return withPenaltyBoxOpt{} }
101
102 type withPenaltyBoxOpt struct{}
103
104 func (h withPenaltyBoxOpt) ApplyPoolOption(pool *SimplePool) {
105 pool.penaltyBox = make(map[string][2]float64)
106 go func() {
107 sleep := 30.0
108 for {
109 time.Sleep(time.Duration(sleep) * time.Second)
110
111 pool.penaltyBoxMu.Lock()
112 nextSleep := 300.0
113 for url, v := range pool.penaltyBox {
114 remainingSeconds := v[1]
115 remainingSeconds -= sleep
116 if remainingSeconds <= 0 {
117 pool.penaltyBox[url] = [2]float64{v[0], 0}
118 continue
119 } else {
120 pool.penaltyBox[url] = [2]float64{v[0], remainingSeconds}
121 }
122
123 if remainingSeconds < nextSleep {
124 nextSleep = remainingSeconds
125 }
126 }
127
128 sleep = nextSleep
129 pool.penaltyBoxMu.Unlock()
130 }
131 }()
132 }
133
134 // WithEventMiddleware is a function that will be called with all events received.
135 type WithEventMiddleware func(RelayEvent)
136
137 func (h WithEventMiddleware) ApplyPoolOption(pool *SimplePool) {
138 pool.eventMiddleware = h
139 }
140
141 // WithDuplicateMiddleware is a function that will be called with all duplicate ids received.
142 type WithDuplicateMiddleware func(relay string, id string)
143
144 func (h WithDuplicateMiddleware) ApplyPoolOption(pool *SimplePool) {
145 pool.duplicateMiddleware = h
146 }
147
148 // WithAuthorKindQueryMiddleware is a function that will be called with every combination of relay+pubkey+kind queried
149 // in a .SubMany*() call -- when applicable (i.e. when the query contains a pubkey and a kind).
150 type WithAuthorKindQueryMiddleware func(relay string, pubkey string, kind int)
151
152 func (h WithAuthorKindQueryMiddleware) ApplyPoolOption(pool *SimplePool) {
153 pool.queryMiddleware = h
154 }
155
156 var (
157 _ PoolOption = (WithAuthHandler)(nil)
158 _ PoolOption = (WithEventMiddleware)(nil)
159 _ PoolOption = WithPenaltyBox()
160 _ PoolOption = WithRelayOptions(WithRequestHeader(http.Header{}))
161 )
162
163 // EnsureRelay ensures that a relay connection exists and is active.
164 // If the relay is not connected, it attempts to connect.
165 func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) {
166 nm := NormalizeURL(url)
167 defer namedLock(nm)()
168
169 relay, ok := pool.Relays.Load(nm)
170 if ok && relay == nil {
171 if pool.penaltyBox != nil {
172 pool.penaltyBoxMu.Lock()
173 defer pool.penaltyBoxMu.Unlock()
174 v, _ := pool.penaltyBox[nm]
175 if v[1] > 0 {
176 return nil, fmt.Errorf("in penalty box, %fs remaining", v[1])
177 }
178 }
179 } else if ok && relay.IsConnected() {
180 // already connected, unlock and return
181 return relay, nil
182 }
183
184 // try to connect
185 // we use this ctx here so when the pool dies everything dies
186 ctx, cancel := context.WithTimeoutCause(
187 pool.Context,
188 time.Second*15,
189 errors.New("connecting to the relay took too long"),
190 )
191 defer cancel()
192
193 relay = NewRelay(context.Background(), url, pool.relayOptions...)
194 if err := relay.Connect(ctx); err != nil {
195 if pool.penaltyBox != nil {
196 // putting relay in penalty box
197 pool.penaltyBoxMu.Lock()
198 defer pool.penaltyBoxMu.Unlock()
199 v, _ := pool.penaltyBox[nm]
200 pool.penaltyBox[nm] = [2]float64{v[0] + 1, 30.0 + math.Pow(2, v[0]+1)}
201 }
202 return nil, fmt.Errorf("failed to connect: %w", err)
203 }
204
205 pool.Relays.Store(nm, relay)
206 return relay, nil
207 }
208
// PublishResult represents the result of publishing an event to a relay.
type PublishResult struct {
	Error    error  // nil when the relay accepted the event
	RelayURL string // the URL exactly as it was passed to PublishMany
	Relay    *Relay // nil when the relay could not be connected to
}
215
216 // PublishMany publishes an event to multiple relays and returns a channel of results emitted as they're received.
217 func (pool *SimplePool) PublishMany(ctx context.Context, urls []string, evt Event) chan PublishResult {
218 ch := make(chan PublishResult, len(urls))
219
220 wg := sync.WaitGroup{}
221 wg.Add(len(urls))
222 go func() {
223 for _, url := range urls {
224 go func() {
225 defer wg.Done()
226
227 relay, err := pool.EnsureRelay(url)
228 if err != nil {
229 ch <- PublishResult{err, url, nil}
230 return
231 }
232
233 if err := relay.Publish(ctx, evt); err == nil {
234 // success with no auth required
235 ch <- PublishResult{nil, url, relay}
236 } else if strings.HasPrefix(err.Error(), "msg: auth-required:") && pool.authHandler != nil {
237 // try to authenticate if we can
238 if authErr := relay.Auth(ctx, func(event *Event) error {
239 return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
240 }); authErr == nil {
241 if err := relay.Publish(ctx, evt); err == nil {
242 // success after auth
243 ch <- PublishResult{nil, url, relay}
244 } else {
245 // failure after auth
246 ch <- PublishResult{err, url, relay}
247 }
248 } else {
249 // failure to auth
250 ch <- PublishResult{fmt.Errorf("failed to auth: %w", authErr), url, relay}
251 }
252 } else {
253 // direct failure
254 ch <- PublishResult{err, url, relay}
255 }
256 }()
257 }
258
259 wg.Wait()
260 close(ch)
261 }()
262
263 return ch
264 }
265
266 // SubscribeMany opens a subscription with the given filter to multiple relays
267 // the subscriptions ends when the context is canceled or when all relays return a CLOSED.
268 func (pool *SimplePool) SubscribeMany(
269 ctx context.Context,
270 urls []string,
271 filter Filter,
272 opts ...SubscriptionOption,
273 ) chan RelayEvent {
274 return pool.subMany(ctx, urls, Filters{filter}, nil, opts...)
275 }
276
277 // FetchMany opens a subscription, much like SubscribeMany, but it ends as soon as all Relays
278 // return an EOSE message.
279 func (pool *SimplePool) FetchMany(
280 ctx context.Context,
281 urls []string,
282 filter Filter,
283 opts ...SubscriptionOption,
284 ) chan RelayEvent {
285 return pool.SubManyEose(ctx, urls, Filters{filter}, opts...)
286 }
287
288 // Deprecated: SubMany is deprecated: use SubscribeMany instead.
289 func (pool *SimplePool) SubMany(
290 ctx context.Context,
291 urls []string,
292 filters Filters,
293 opts ...SubscriptionOption,
294 ) chan RelayEvent {
295 return pool.subMany(ctx, urls, filters, nil, opts...)
296 }
297
298 // SubscribeManyNotifyEOSE is like SubscribeMany, but takes a channel that is closed when
299 // all subscriptions have received an EOSE
300 func (pool *SimplePool) SubscribeManyNotifyEOSE(
301 ctx context.Context,
302 urls []string,
303 filter Filter,
304 eoseChan chan struct{},
305 opts ...SubscriptionOption,
306 ) chan RelayEvent {
307 return pool.subMany(ctx, urls, Filters{filter}, eoseChan, opts...)
308 }
309
// ReplaceableKey identifies a replaceable or addressable event by its author
// and its "d" tag value; FetchManyReplaceable uses it as the key of its result map.
type ReplaceableKey struct {
	PubKey string // the event's PubKey
	D      string // value of the event's "d" tag, as returned by Tags.GetD()
}
314
315 // FetchManyReplaceable is like FetchMany, but deduplicates replaceable and addressable events and returns
316 // only the latest for each "d" tag.
317 func (pool *SimplePool) FetchManyReplaceable(
318 ctx context.Context,
319 urls []string,
320 filter Filter,
321 opts ...SubscriptionOption,
322 ) *xsync.MapOf[ReplaceableKey, *Event] {
323 ctx, cancel := context.WithCancelCause(ctx)
324
325 results := xsync.NewMapOf[ReplaceableKey, *Event]()
326
327 wg := sync.WaitGroup{}
328 wg.Add(len(urls))
329
330 seenAlreadyLatest := xsync.NewMapOf[ReplaceableKey, Timestamp]()
331 opts = append(opts, WithCheckDuplicateReplaceable(func(rk ReplaceableKey, ts Timestamp) bool {
332 updated := false
333 seenAlreadyLatest.Compute(rk, func(latest Timestamp, _ bool) (newValue Timestamp, delete bool) {
334 if ts > latest {
335 updated = true // we are updating the most recent
336 return ts, false
337 }
338 return latest, false // the one we had was already more recent
339 })
340 return updated
341 }))
342
343 for _, url := range urls {
344 go func(nm string) {
345 defer wg.Done()
346
347 if mh := pool.queryMiddleware; mh != nil {
348 if filter.Kinds != nil && filter.Authors != nil {
349 for _, kind := range filter.Kinds {
350 for _, author := range filter.Authors {
351 mh(nm, author, kind)
352 }
353 }
354 }
355 }
356
357 relay, err := pool.EnsureRelay(nm)
358 if err != nil {
359 debugLogf("error connecting to %s with %v: %s", nm, filter, err)
360 return
361 }
362
363 hasAuthed := false
364
365 subscribe:
366 sub, err := relay.Subscribe(ctx, Filters{filter}, opts...)
367 if err != nil {
368 debugLogf("error subscribing to %s with %v: %s", relay, filter, err)
369 return
370 }
371
372 for {
373 select {
374 case <-ctx.Done():
375 return
376 case <-sub.EndOfStoredEvents:
377 return
378 case reason := <-sub.ClosedReason:
379 if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
380 // relay is requesting auth. if we can we will perform auth and try again
381 err := relay.Auth(ctx, func(event *Event) error {
382 return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
383 })
384 if err == nil {
385 hasAuthed = true // so we don't keep doing AUTH again and again
386 goto subscribe
387 }
388 }
389 debugLogf("CLOSED from %s: '%s'\n", nm, reason)
390 return
391 case evt, more := <-sub.Events:
392 if !more {
393 return
394 }
395
396 ie := RelayEvent{Event: evt, Relay: relay}
397 if mh := pool.eventMiddleware; mh != nil {
398 mh(ie)
399 }
400
401 results.Store(ReplaceableKey{evt.PubKey, evt.Tags.GetD()}, evt)
402 }
403 }
404 }(NormalizeURL(url))
405 }
406
407 // this will happen when all subscriptions get an eose (or when they die)
408 wg.Wait()
409 cancel(errors.New("all subscriptions ended"))
410
411 return results
412 }
413
414 func (pool *SimplePool) subMany(
415 ctx context.Context,
416 urls []string,
417 filters Filters,
418 eoseChan chan struct{},
419 opts ...SubscriptionOption,
420 ) chan RelayEvent {
421 ctx, cancel := context.WithCancelCause(ctx)
422 _ = cancel // do this so `go vet` will stop complaining
423 events := make(chan RelayEvent)
424 seenAlready := xsync.NewMapOf[string, Timestamp]()
425 ticker := time.NewTicker(seenAlreadyDropTick)
426
427 eoseWg := sync.WaitGroup{}
428 eoseWg.Add(len(urls))
429 if eoseChan != nil {
430 go func() {
431 eoseWg.Wait()
432 close(eoseChan)
433 }()
434 }
435
436 pending := xsync.NewCounter()
437 pending.Add(int64(len(urls)))
438 for i, url := range urls {
439 url = NormalizeURL(url)
440 urls[i] = url
441 if idx := slices.Index(urls, url); idx != i {
442 // skip duplicate relays in the list
443 eoseWg.Done()
444 continue
445 }
446
447 eosed := atomic.Bool{}
448 firstConnection := true
449
450 go func(nm string) {
451 defer func() {
452 pending.Dec()
453 if pending.Value() == 0 {
454 close(events)
455 cancel(fmt.Errorf("aborted: %w", context.Cause(ctx)))
456 }
457 if eosed.CompareAndSwap(false, true) {
458 eoseWg.Done()
459 }
460 }()
461
462 hasAuthed := false
463 interval := 3 * time.Second
464 for {
465 select {
466 case <-ctx.Done():
467 return
468 default:
469 }
470
471 var sub *Subscription
472
473 if mh := pool.queryMiddleware; mh != nil {
474 for _, filter := range filters {
475 if filter.Kinds != nil && filter.Authors != nil {
476 for _, kind := range filter.Kinds {
477 for _, author := range filter.Authors {
478 mh(nm, author, kind)
479 }
480 }
481 }
482 }
483 }
484
485 relay, err := pool.EnsureRelay(nm)
486 if err != nil {
487 // if we never connected to this just fail
488 if firstConnection {
489 return
490 }
491
492 // otherwise (if we were connected and got disconnected) keep trying to reconnect
493 debugLogf("%s reconnecting because connection failed\n", nm)
494 goto reconnect
495 }
496 firstConnection = false
497 hasAuthed = false
498
499 subscribe:
500 sub, err = relay.Subscribe(ctx, filters, append(opts, WithCheckDuplicate(func(id, relay string) bool {
501 _, exists := seenAlready.Load(id)
502 if exists && pool.duplicateMiddleware != nil {
503 pool.duplicateMiddleware(relay, id)
504 }
505 return exists
506 }))...)
507 if err != nil {
508 debugLogf("%s reconnecting because subscription died\n", nm)
509 goto reconnect
510 }
511
512 go func() {
513 <-sub.EndOfStoredEvents
514
515 // guard here otherwise a resubscription will trigger a duplicate call to eoseWg.Done()
516 if eosed.CompareAndSwap(false, true) {
517 eoseWg.Done()
518 }
519 }()
520
521 // reset interval when we get a good subscription
522 interval = 3 * time.Second
523
524 for {
525 select {
526 case evt, more := <-sub.Events:
527 if !more {
528 // this means the connection was closed for weird reasons, like the server shut down
529 // so we will update the filters here to include only events seem from now on
530 // and try to reconnect until we succeed
531 now := Now()
532 for i := range filters {
533 filters[i].Since = &now
534 }
535 debugLogf("%s reconnecting because sub.Events is broken\n", nm)
536 goto reconnect
537 }
538
539 ie := RelayEvent{Event: evt, Relay: relay}
540 if mh := pool.eventMiddleware; mh != nil {
541 mh(ie)
542 }
543
544 select {
545 case events <- ie:
546 case <-ctx.Done():
547 return
548 }
549 case <-ticker.C:
550 if eosed.Load() {
551 old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix())
552 for id, value := range seenAlready.Range {
553 if value < old {
554 seenAlready.Delete(id)
555 }
556 }
557 }
558 case reason := <-sub.ClosedReason:
559 if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
560 // relay is requesting auth. if we can we will perform auth and try again
561 err := relay.Auth(ctx, func(event *Event) error {
562 return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
563 })
564 if err == nil {
565 hasAuthed = true // so we don't keep doing AUTH again and again
566 goto subscribe
567 }
568 } else {
569 debugLogf("CLOSED from %s: '%s'\n", nm, reason)
570 }
571
572 return
573 case <-ctx.Done():
574 return
575 }
576 }
577
578 reconnect:
579 // we will go back to the beginning of the loop and try to connect again and again
580 // until the context is canceled
581 time.Sleep(interval)
582 interval = interval * 17 / 10 // the next time we try we will wait longer
583 }
584 }(url)
585 }
586
587 return events
588 }
589
590 // Deprecated: SubManyEose is deprecated: use FetchMany instead.
591 func (pool *SimplePool) SubManyEose(
592 ctx context.Context,
593 urls []string,
594 filters Filters,
595 opts ...SubscriptionOption,
596 ) chan RelayEvent {
597 seenAlready := xsync.NewMapOf[string, struct{}]()
598 return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filters,
599 WithCheckDuplicate(func(id, relay string) bool {
600 _, exists := seenAlready.LoadOrStore(id, struct{}{})
601 if exists && pool.duplicateMiddleware != nil {
602 pool.duplicateMiddleware(relay, id)
603 }
604 return exists
605 }),
606 opts...)
607 }
608
609 func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate(
610 ctx context.Context,
611 urls []string,
612 filters Filters,
613 wcd WithCheckDuplicate,
614 opts ...SubscriptionOption,
615 ) chan RelayEvent {
616 ctx, cancel := context.WithCancelCause(ctx)
617
618 events := make(chan RelayEvent)
619 wg := sync.WaitGroup{}
620 wg.Add(len(urls))
621
622 opts = append(opts, wcd)
623
624 go func() {
625 // this will happen when all subscriptions get an eose (or when they die)
626 wg.Wait()
627 cancel(errors.New("all subscriptions ended"))
628 close(events)
629 }()
630
631 for _, url := range urls {
632 go func(nm string) {
633 defer wg.Done()
634
635 if mh := pool.queryMiddleware; mh != nil {
636 for _, filter := range filters {
637 if filter.Kinds != nil && filter.Authors != nil {
638 for _, kind := range filter.Kinds {
639 for _, author := range filter.Authors {
640 mh(nm, author, kind)
641 }
642 }
643 }
644 }
645 }
646
647 relay, err := pool.EnsureRelay(nm)
648 if err != nil {
649 debugLogf("error connecting to %s with %v: %s", nm, filters, err)
650 return
651 }
652
653 hasAuthed := false
654
655 subscribe:
656 sub, err := relay.Subscribe(ctx, filters, opts...)
657 if err != nil {
658 debugLogf("error subscribing to %s with %v: %s", relay, filters, err)
659 return
660 }
661
662 for {
663 select {
664 case <-ctx.Done():
665 return
666 case <-sub.EndOfStoredEvents:
667 return
668 case reason := <-sub.ClosedReason:
669 if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
670 // relay is requesting auth. if we can we will perform auth and try again
671 err := relay.Auth(ctx, func(event *Event) error {
672 return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
673 })
674 if err == nil {
675 hasAuthed = true // so we don't keep doing AUTH again and again
676 goto subscribe
677 }
678 }
679 debugLogf("CLOSED from %s: '%s'\n", nm, reason)
680 return
681 case evt, more := <-sub.Events:
682 if !more {
683 return
684 }
685
686 ie := RelayEvent{Event: evt, Relay: relay}
687 if mh := pool.eventMiddleware; mh != nil {
688 mh(ie)
689 }
690
691 select {
692 case events <- ie:
693 case <-ctx.Done():
694 return
695 }
696 }
697 }
698 }(NormalizeURL(url))
699 }
700
701 return events
702 }
703
704 // CountMany aggregates count results from multiple relays using NIP-45 HyperLogLog
705 func (pool *SimplePool) CountMany(
706 ctx context.Context,
707 urls []string,
708 filter Filter,
709 opts []SubscriptionOption,
710 ) int {
711 hll := hyperloglog.New(0) // offset is irrelevant here
712
713 wg := sync.WaitGroup{}
714 wg.Add(len(urls))
715 for _, url := range urls {
716 go func(nm string) {
717 defer wg.Done()
718 relay, err := pool.EnsureRelay(url)
719 if err != nil {
720 return
721 }
722 ce, err := relay.countInternal(ctx, Filters{filter}, opts...)
723 if err != nil {
724 return
725 }
726 if len(ce.HyperLogLog) != 256 {
727 return
728 }
729 hll.MergeRegisters(ce.HyperLogLog)
730 }(NormalizeURL(url))
731 }
732
733 wg.Wait()
734 return int(hll.Count())
735 }
736
737 // QuerySingle returns the first event returned by the first relay, cancels everything else.
738 func (pool *SimplePool) QuerySingle(
739 ctx context.Context,
740 urls []string,
741 filter Filter,
742 opts ...SubscriptionOption,
743 ) *RelayEvent {
744 ctx, cancel := context.WithCancelCause(ctx)
745 for ievt := range pool.SubManyEose(ctx, urls, Filters{filter}, opts...) {
746 cancel(errors.New("got the first event and ended successfully"))
747 return &ievt
748 }
749 cancel(errors.New("SubManyEose() didn't get yield events"))
750 return nil
751 }
752
753 // BatchedSubManyEose performs batched subscriptions to multiple relays with different filters.
754 func (pool *SimplePool) BatchedSubManyEose(
755 ctx context.Context,
756 dfs []DirectedFilter,
757 opts ...SubscriptionOption,
758 ) chan RelayEvent {
759 res := make(chan RelayEvent)
760 wg := sync.WaitGroup{}
761 wg.Add(len(dfs))
762 seenAlready := xsync.NewMapOf[string, struct{}]()
763
764 for _, df := range dfs {
765 go func(df DirectedFilter) {
766 for ie := range pool.subManyEoseNonOverwriteCheckDuplicate(ctx,
767 []string{df.Relay},
768 Filters{df.Filter},
769 WithCheckDuplicate(func(id, relay string) bool {
770 _, exists := seenAlready.LoadOrStore(id, struct{}{})
771 if exists && pool.duplicateMiddleware != nil {
772 pool.duplicateMiddleware(relay, id)
773 }
774 return exists
775 }), opts...,
776 ) {
777 select {
778 case res <- ie:
779 case <-ctx.Done():
780 wg.Done()
781 return
782 }
783 }
784 wg.Done()
785 }(df)
786 }
787
788 go func() {
789 wg.Wait()
790 close(res)
791 }()
792
793 return res
794 }
795
796 // Close closes the pool with the given reason.
797 func (pool *SimplePool) Close(reason string) {
798 pool.cancel(fmt.Errorf("pool closed with reason: '%s'", reason))
799 }
800