subscription.go raw

   1  package nostr
   2  
   3  import (
   4  	"context"
   5  	"errors"
   6  	"fmt"
   7  	"sync"
   8  	"sync/atomic"
   9  )
  10  
  11  // Subscription represents a subscription to a relay.
  12  type Subscription struct {
  13  	counter int64
  14  	id      string
  15  
  16  	Relay   *Relay
  17  	Filters Filters
  18  
  19  	// for this to be treated as a COUNT and not a REQ this must be set
  20  	countResult chan CountEnvelope
  21  
  22  	// the Events channel emits all EVENTs that come in a Subscription
  23  	// will be closed when the subscription ends
  24  	Events chan *Event
  25  	mu     sync.Mutex
  26  
  27  	// the EndOfStoredEvents channel gets closed when an EOSE comes for that subscription
  28  	EndOfStoredEvents chan struct{}
  29  
  30  	// the ClosedReason channel emits the reason when a CLOSED message is received
  31  	ClosedReason chan string
  32  
  33  	// Context will be .Done() when the subscription ends
  34  	Context context.Context
  35  
  36  	// if it is not nil, checkDuplicate will be called for every event received
  37  	// if it returns true that event will not be processed further.
  38  	checkDuplicate func(id string, relay string) bool
  39  
  40  	// if it is not nil, checkDuplicateReplaceable will be called for every event received
  41  	// if it returns true that event will not be processed further.
  42  	checkDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool
  43  
  44  	match  func(*Event) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints
  45  	live   atomic.Bool
  46  	eosed  atomic.Bool
  47  	cancel context.CancelCauseFunc
  48  
  49  	// this keeps track of the events we've received before the EOSE that we must dispatch before
  50  	// closing the EndOfStoredEvents channel
  51  	storedwg sync.WaitGroup
  52  }
  53  
  54  // SubscriptionOption is the type of the argument passed when instantiating relay connections.
  55  // Some examples are WithLabel.
  56  type SubscriptionOption interface {
  57  	IsSubscriptionOption()
  58  }
  59  
  60  // WithLabel puts a label on the subscription (it is prepended to the automatic id) that is sent to relays.
  61  type WithLabel string
  62  
  63  func (_ WithLabel) IsSubscriptionOption() {}
  64  
  65  // WithCheckDuplicate sets checkDuplicate on the subscription
  66  type WithCheckDuplicate func(id, relay string) bool
  67  
  68  func (_ WithCheckDuplicate) IsSubscriptionOption() {}
  69  
  70  // WithCheckDuplicateReplaceable sets checkDuplicateReplaceable on the subscription
  71  type WithCheckDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool
  72  
  73  func (_ WithCheckDuplicateReplaceable) IsSubscriptionOption() {}
  74  
  75  var (
  76  	_ SubscriptionOption = (WithLabel)("")
  77  	_ SubscriptionOption = (WithCheckDuplicate)(nil)
  78  	_ SubscriptionOption = (WithCheckDuplicateReplaceable)(nil)
  79  )
  80  
  81  func (sub *Subscription) start() {
  82  	<-sub.Context.Done()
  83  
  84  	// the subscription ends once the context is canceled (if not already)
  85  	sub.unsub(errors.New("context done on start()")) // this will set sub.live to false
  86  
  87  	// do this so we don't have the possibility of closing the Events channel and then trying to send to it
  88  	sub.mu.Lock()
  89  	close(sub.Events)
  90  	sub.mu.Unlock()
  91  }
  92  
  93  // GetID returns the subscription ID.
  94  func (sub *Subscription) GetID() string { return sub.id }
  95  
  96  func (sub *Subscription) dispatchEvent(evt *Event) {
  97  	added := false
  98  	if !sub.eosed.Load() {
  99  		sub.storedwg.Add(1)
 100  		added = true
 101  	}
 102  
 103  	go func() {
 104  		sub.mu.Lock()
 105  		defer sub.mu.Unlock()
 106  
 107  		if sub.live.Load() {
 108  			select {
 109  			case sub.Events <- evt:
 110  			case <-sub.Context.Done():
 111  			}
 112  		}
 113  
 114  		if added {
 115  			sub.storedwg.Done()
 116  		}
 117  	}()
 118  }
 119  
 120  func (sub *Subscription) dispatchEose() {
 121  	if sub.eosed.CompareAndSwap(false, true) {
 122  		sub.match = sub.Filters.MatchIgnoringTimestampConstraints
 123  		go func() {
 124  			sub.storedwg.Wait()
 125  			sub.EndOfStoredEvents <- struct{}{}
 126  		}()
 127  	}
 128  }
 129  
 130  // handleClosed handles the CLOSED message from a relay.
 131  func (sub *Subscription) handleClosed(reason string) {
 132  	go func() {
 133  		sub.ClosedReason <- reason
 134  		sub.live.Store(false) // set this so we don't send an unnecessary CLOSE to the relay
 135  		sub.unsub(fmt.Errorf("CLOSED received: %s", reason))
 136  	}()
 137  }
 138  
 139  // Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01.
 140  // Unsub() also closes the channel sub.Events and makes a new one.
 141  func (sub *Subscription) Unsub() {
 142  	sub.unsub(errors.New("Unsub() called"))
 143  }
 144  
 145  // unsub is the internal implementation of Unsub.
 146  func (sub *Subscription) unsub(err error) {
 147  	// cancel the context (if it's not canceled already)
 148  	sub.cancel(err)
 149  
 150  	// mark subscription as closed and send a CLOSE to the relay (naïve sync.Once implementation)
 151  	if sub.live.CompareAndSwap(true, false) {
 152  		sub.Close()
 153  	}
 154  
 155  	// remove subscription from our map
 156  	sub.Relay.Subscriptions.Delete(sub.counter)
 157  }
 158  
 159  // Close just sends a CLOSE message. You probably want Unsub() instead.
 160  func (sub *Subscription) Close() {
 161  	if sub.Relay.IsConnected() {
 162  		closeMsg := CloseEnvelope(sub.id)
 163  		closeb, _ := (&closeMsg).MarshalJSON()
 164  		<-sub.Relay.Write(closeb)
 165  	}
 166  }
 167  
 168  // Sub sets sub.Filters and then calls sub.Fire(ctx).
 169  // The subscription will be closed if the context expires.
 170  func (sub *Subscription) Sub(_ context.Context, filters Filters) {
 171  	sub.Filters = filters
 172  	sub.Fire()
 173  }
 174  
 175  // Fire sends the "REQ" command to the relay.
 176  func (sub *Subscription) Fire() error {
 177  	var reqb []byte
 178  	if sub.countResult == nil {
 179  		reqb, _ = ReqEnvelope{sub.id, sub.Filters}.MarshalJSON()
 180  	} else if len(sub.Filters) == 1 {
 181  		reqb, _ = CountEnvelope{sub.id, sub.Filters[0], nil, nil}.MarshalJSON()
 182  	} else {
 183  		return fmt.Errorf("unexpected sub configuration")
 184  	}
 185  
 186  	sub.live.Store(true)
 187  	if err := <-sub.Relay.Write(reqb); err != nil {
 188  		err := fmt.Errorf("failed to write: %w", err)
 189  		sub.cancel(err)
 190  		return err
 191  	}
 192  
 193  	return nil
 194  }
 195