1 package nostr
2 3 import (
4 "context"
5 "errors"
6 "fmt"
7 "sync"
8 "sync/atomic"
9 )
// Subscription represents a subscription to a relay.
//
// It bundles the channels on which events, the end-of-stored-events signal and
// CLOSED reasons are delivered with the internal state used to end the
// subscription.
type Subscription struct {
	// counter is the key under which unsub deletes this subscription from Relay.Subscriptions
	counter int64

	// id is the subscription id placed in the REQ, COUNT and CLOSE envelopes; GetID returns it
	id string

	// Relay is the relay the envelopes of this subscription are written to
	Relay *Relay

	// Filters are sent with the REQ; for a COUNT exactly one filter must be present (see Fire)
	Filters Filters

	// for this to be treated as a COUNT and not a REQ this must be set
	countResult chan CountEnvelope

	// the Events channel emits all EVENTs that come in a Subscription
	// will be closed when the subscription ends
	Events chan *Event

	// mu is held by dispatchEvent while sending on Events and by start while closing
	// Events, with the intent that a send is never attempted on the closed channel
	mu sync.Mutex

	// the EndOfStoredEvents channel is signaled when an EOSE comes for that subscription
	// NOTE(review): dispatchEose sends one value on this channel rather than closing it,
	// so the signal is consumed by a single receive — confirm this is intended
	EndOfStoredEvents chan struct{}

	// the ClosedReason channel emits the reason when a CLOSED message is received
	ClosedReason chan string

	// Context will be .Done() when the subscription ends
	Context context.Context

	// if it is not nil, checkDuplicate will be called for every event received
	// if it returns true that event will not be processed further.
	checkDuplicate func(id string, relay string) bool

	// if it is not nil, checkDuplicateReplaceable will be called for every event received
	// if it returns true that event will not be processed further.
	checkDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool

	// this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints
	// (dispatchEose switches it to the latter)
	match func(*Event) bool

	// live is set by Fire and cleared by unsub / handleClosed; while it is false
	// dispatchEvent does not send on Events
	live atomic.Bool

	// eosed records that an EOSE was already handled, so dispatchEose acts only once
	eosed atomic.Bool

	// cancel cancels Context with a cause; it is called by unsub and by Fire on write failure
	cancel context.CancelCauseFunc

	// this keeps track of the events we've received before the EOSE that we must dispatch before
	// signaling on the EndOfStoredEvents channel
	storedwg sync.WaitGroup
}
// SubscriptionOption is the marker interface implemented by the subscription
// option types visible here: WithLabel, WithCheckDuplicate and
// WithCheckDuplicateReplaceable.
// NOTE(review): the previous wording said options are passed "when instantiating
// relay connections", yet every option describes a subscription — confirm where
// they are consumed.
type SubscriptionOption interface {
	// IsSubscriptionOption carries no behavior in the implementations visible here;
	// it only marks a type as a subscription option.
	IsSubscriptionOption()
}
// WithLabel is the subscription option that puts a label on the subscription
// (it is prepended to the automatic id) that is sent to relays.
type WithLabel string

// IsSubscriptionOption marks WithLabel as a SubscriptionOption; it does nothing.
func (WithLabel) IsSubscriptionOption() {}
// WithCheckDuplicate is the subscription option that sets checkDuplicate on the
// subscription. The function receives an event id and a relay and reports
// whether the event should be treated as a duplicate.
type WithCheckDuplicate func(id, relay string) bool

// IsSubscriptionOption marks WithCheckDuplicate as a SubscriptionOption; it does nothing.
func (WithCheckDuplicate) IsSubscriptionOption() {}
69 70 // WithCheckDuplicateReplaceable sets checkDuplicateReplaceable on the subscription
71 type WithCheckDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool
72 73 func (_ WithCheckDuplicateReplaceable) IsSubscriptionOption() {}
74 75 var (
76 _ SubscriptionOption = (WithLabel)("")
77 _ SubscriptionOption = (WithCheckDuplicate)(nil)
78 _ SubscriptionOption = (WithCheckDuplicateReplaceable)(nil)
79 )
80 81 func (sub *Subscription) start() {
82 <-sub.Context.Done()
83 84 // the subscription ends once the context is canceled (if not already)
85 sub.unsub(errors.New("context done on start()")) // this will set sub.live to false
86 87 // do this so we don't have the possibility of closing the Events channel and then trying to send to it
88 sub.mu.Lock()
89 close(sub.Events)
90 sub.mu.Unlock()
91 }
92 93 // GetID returns the subscription ID.
94 func (sub *Subscription) GetID() string { return sub.id }
95 96 func (sub *Subscription) dispatchEvent(evt *Event) {
97 added := false
98 if !sub.eosed.Load() {
99 sub.storedwg.Add(1)
100 added = true
101 }
102 103 go func() {
104 sub.mu.Lock()
105 defer sub.mu.Unlock()
106 107 if sub.live.Load() {
108 select {
109 case sub.Events <- evt:
110 case <-sub.Context.Done():
111 }
112 }
113 114 if added {
115 sub.storedwg.Done()
116 }
117 }()
118 }
119 120 func (sub *Subscription) dispatchEose() {
121 if sub.eosed.CompareAndSwap(false, true) {
122 sub.match = sub.Filters.MatchIgnoringTimestampConstraints
123 go func() {
124 sub.storedwg.Wait()
125 sub.EndOfStoredEvents <- struct{}{}
126 }()
127 }
128 }
129 130 // handleClosed handles the CLOSED message from a relay.
131 func (sub *Subscription) handleClosed(reason string) {
132 go func() {
133 sub.ClosedReason <- reason
134 sub.live.Store(false) // set this so we don't send an unnecessary CLOSE to the relay
135 sub.unsub(fmt.Errorf("CLOSED received: %s", reason))
136 }()
137 }
138 139 // Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01.
140 // Unsub() also closes the channel sub.Events and makes a new one.
141 func (sub *Subscription) Unsub() {
142 sub.unsub(errors.New("Unsub() called"))
143 }
144 145 // unsub is the internal implementation of Unsub.
146 func (sub *Subscription) unsub(err error) {
147 // cancel the context (if it's not canceled already)
148 sub.cancel(err)
149 150 // mark subscription as closed and send a CLOSE to the relay (naïve sync.Once implementation)
151 if sub.live.CompareAndSwap(true, false) {
152 sub.Close()
153 }
154 155 // remove subscription from our map
156 sub.Relay.Subscriptions.Delete(sub.counter)
157 }
158 159 // Close just sends a CLOSE message. You probably want Unsub() instead.
160 func (sub *Subscription) Close() {
161 if sub.Relay.IsConnected() {
162 closeMsg := CloseEnvelope(sub.id)
163 closeb, _ := (&closeMsg).MarshalJSON()
164 <-sub.Relay.Write(closeb)
165 }
166 }
167 168 // Sub sets sub.Filters and then calls sub.Fire(ctx).
169 // The subscription will be closed if the context expires.
170 func (sub *Subscription) Sub(_ context.Context, filters Filters) {
171 sub.Filters = filters
172 sub.Fire()
173 }
174 175 // Fire sends the "REQ" command to the relay.
176 func (sub *Subscription) Fire() error {
177 var reqb []byte
178 if sub.countResult == nil {
179 reqb, _ = ReqEnvelope{sub.id, sub.Filters}.MarshalJSON()
180 } else if len(sub.Filters) == 1 {
181 reqb, _ = CountEnvelope{sub.id, sub.Filters[0], nil, nil}.MarshalJSON()
182 } else {
183 return fmt.Errorf("unexpected sub configuration")
184 }
185 186 sub.live.Store(true)
187 if err := <-sub.Relay.Write(reqb); err != nil {
188 err := fmt.Errorf("failed to write: %w", err)
189 sub.cancel(err)
190 return err
191 }
192 193 return nil
194 }
195