paginator.go raw

   1  package nostr
   2  
   3  import (
   4  	"context"
   5  	"math"
   6  	"slices"
   7  	"time"
   8  )
   9  
  10  func (pool *SimplePool) PaginatorWithInterval(
  11  	interval time.Duration,
  12  ) func(ctx context.Context, urls []string, filter Filter, opts ...SubscriptionOption) chan RelayEvent {
  13  	return func(ctx context.Context, urls []string, filter Filter, opts ...SubscriptionOption) chan RelayEvent {
  14  		nextUntil := Now()
  15  		if filter.Until != nil {
  16  			nextUntil = *filter.Until
  17  		}
  18  
  19  		globalLimit := uint64(filter.Limit)
  20  		if globalLimit == 0 && !filter.LimitZero {
  21  			globalLimit = math.MaxUint64
  22  		}
  23  		var globalCount uint64 = 0
  24  		globalCh := make(chan RelayEvent)
  25  
  26  		repeatedCache := make([]string, 0, 300)
  27  		nextRepeatedCache := make([]string, 0, 300)
  28  
  29  		go func() {
  30  			defer close(globalCh)
  31  
  32  			for {
  33  				filter.Until = &nextUntil
  34  				time.Sleep(interval)
  35  
  36  				keepGoing := false
  37  				for evt := range pool.FetchMany(ctx, urls, filter, opts...) {
  38  					if slices.Contains(repeatedCache, evt.ID) {
  39  						continue
  40  					}
  41  
  42  					keepGoing = true // if we get one that isn't repeated, then keep trying to get more
  43  					nextRepeatedCache = append(nextRepeatedCache, evt.ID)
  44  
  45  					globalCh <- evt
  46  
  47  					globalCount++
  48  					if globalCount >= globalLimit {
  49  						return
  50  					}
  51  
  52  					if evt.CreatedAt < *filter.Until {
  53  						nextUntil = evt.CreatedAt
  54  					}
  55  				}
  56  
  57  				if !keepGoing {
  58  					return
  59  				}
  60  
  61  				repeatedCache = nextRepeatedCache
  62  				nextRepeatedCache = nextRepeatedCache[:0]
  63  			}
  64  		}()
  65  
  66  		return globalCh
  67  	}
  68  }
  69