1 package ws
2 3 import (
4 "context"
5 "errors"
6 "fmt"
7 "sync"
8 "sync/atomic"
9 10 "next.orly.dev/pkg/nostr/encoders/envelopes/closeenvelope"
11 "next.orly.dev/pkg/nostr/encoders/envelopes/reqenvelope"
12 "next.orly.dev/pkg/nostr/encoders/event"
13 "next.orly.dev/pkg/nostr/encoders/filter"
14 "next.orly.dev/pkg/nostr/encoders/timestamp"
15 "next.orly.dev/pkg/lol/chk"
16 "next.orly.dev/pkg/lol/log"
17 )
// ReplaceableKey is the comparable key handed to a
// checkDuplicateReplaceable callback.
//
// NOTE(review): PubKey is presumably the author's public key and D the value
// of the event's "d" tag — confirm against the code that builds these keys.
type ReplaceableKey struct {
	PubKey, D string
}
// Subscription represents a subscription to a relay.
//
// It bundles the subscription id, the filters sent in the REQ, the channels
// used to hand events, the EOSE signal and CLOSED reasons to the consumer,
// and the context whose cancellation ends the subscription.
type Subscription struct {
	// counter is not referenced by the methods in this part of the file.
	// NOTE(review): presumably the sequence number used to derive id — confirm.
	counter int64
	// id is the subscription id placed in the REQ and CLOSE envelopes. GetID
	// returns it as a string and unsub uses it as the Client.Subscriptions key.
	id []byte

	// Client is the connection the REQ and CLOSE messages are written to.
	Client *Client
	// Filters are the filters marshalled into the REQ by Fire.
	Filters *filter.S

	// the Events channel emits all EVENTs that come in a Subscription
	// will be closed when the subscription ends
	Events chan *event.E
	// mu is held by dispatchEvent while it sends on Events and by start while
	// it closes Events, so a send never races with the close.
	mu sync.Mutex

	// the EndOfStoredEvents channel receives a value when an EOSE comes for that subscription
	// (dispatchEose sends it once the events received before the EOSE have been dispatched)
	EndOfStoredEvents chan struct{}

	// the ClosedReason channel emits the reason when a CLOSED message is received
	ClosedReason chan string

	// Context will be .Done() when the subscription ends
	Context context.Context

	// if it is not nil, checkDuplicate will be called for every event received
	// if it returns true that event will not be processed further.
	checkDuplicate func(id string, relay string) bool

	// if it is not nil, checkDuplicateReplaceable will be called for every event received
	// if it returns true that event will not be processed further.
	checkDuplicateReplaceable func(rk ReplaceableKey, ts *timestamp.T) bool

	match func(*event.E) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints
	// live is set to true by start and Fire and flipped back to false by
	// unsub and handleClosed; dispatchEvent only delivers events while it is true.
	live atomic.Bool
	// eosed becomes true the first time dispatchEose runs.
	eosed atomic.Bool
	// cancel is called by unsub, and by Fire when the REQ cannot be written.
	// NOTE(review): presumably the cancel function paired with Context — confirm in the constructor.
	cancel context.CancelCauseFunc

	// this keeps track of the events we've received before the EOSE that we must dispatch before
	// signalling on the EndOfStoredEvents channel
	storedwg sync.WaitGroup
}
// SubscriptionOption is the type of the optional arguments that configure a
// Subscription. The implementations in this file are WithLabel,
// WithCheckDuplicate and WithCheckDuplicateReplaceable.
// NOTE(review): the function that consumes these options is outside this
// part of the file — confirm where they are applied.
type SubscriptionOption interface {
	// IsSubscriptionOption is a marker method; every implementation in this
	// file has an empty body.
	IsSubscriptionOption()
}
// WithLabel is a subscription option carrying a label for the subscription;
// the label is prepended to the automatic id that is sent to relays.
type WithLabel string

// IsSubscriptionOption marks WithLabel as a SubscriptionOption. It does nothing.
func (WithLabel) IsSubscriptionOption() {}
// WithCheckDuplicate is a subscription option that supplies the
// checkDuplicate callback of the subscription.
type WithCheckDuplicate func(id, relay string) bool

// IsSubscriptionOption marks WithCheckDuplicate as a SubscriptionOption. It
// does nothing and never invokes the wrapped function.
func (WithCheckDuplicate) IsSubscriptionOption() {}
79 80 // WithCheckDuplicateReplaceable sets checkDuplicateReplaceable on the subscription
81 type WithCheckDuplicateReplaceable func(rk ReplaceableKey, ts *timestamp.T) bool
82 83 func (_ WithCheckDuplicateReplaceable) IsSubscriptionOption() {}
84 85 var (
86 _ SubscriptionOption = (WithLabel)("")
87 _ SubscriptionOption = (WithCheckDuplicate)(nil)
88 _ SubscriptionOption = (WithCheckDuplicateReplaceable)(nil)
89 )
90 91 func (sub *Subscription) start() {
92 // Wait for the context to be done instead of blocking immediately
93 // This allows the subscription to receive events before terminating
94 sub.live.Store(true)
95 // debug: log start of subscription goroutine
96 log.T.F("WS.Subscription.start: started id=%s", sub.GetID())
97 <-sub.Context.Done()
98 // the subscription ends once the context is canceled (if not already)
99 log.T.F("WS.Subscription.start: context done for id=%s", sub.GetID())
100 sub.unsub(errors.New("context done on start()")) // this will set sub.live to false
101 // do this so we don't have the possibility of closing the Events channel and then trying to send to it
102 sub.mu.Lock()
103 close(sub.Events)
104 sub.mu.Unlock()
105 }
106 107 // GetID returns the subscription ID.
108 func (sub *Subscription) GetID() string { return string(sub.id) }
109 110 func (sub *Subscription) dispatchEvent(evt *event.E) {
111 added := false
112 if !sub.eosed.Load() {
113 sub.storedwg.Add(1)
114 added = true
115 }
116 go func() {
117 sub.mu.Lock()
118 defer sub.mu.Unlock()
119 120 if sub.live.Load() {
121 select {
122 case sub.Events <- evt:
123 case <-sub.Context.Done():
124 }
125 }
126 if added {
127 sub.storedwg.Done()
128 }
129 }()
130 }
131 132 func (sub *Subscription) dispatchEose() {
133 if sub.eosed.CompareAndSwap(false, true) {
134 sub.match = sub.Filters.MatchIgnoringTimestampConstraints
135 go func() {
136 sub.storedwg.Wait()
137 sub.EndOfStoredEvents <- struct{}{}
138 }()
139 }
140 }
141 142 // handleClosed handles the CLOSED message from a relay.
143 func (sub *Subscription) handleClosed(reason string) {
144 go func() {
145 sub.ClosedReason <- reason
146 sub.live.Store(false) // set this so we don't send an unnecessary CLOSE to the relay
147 sub.unsub(fmt.Errorf("CLOSED received: %s", reason))
148 }()
149 }
150 151 // Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01.
152 // Unsub() also closes the channel sub.Events and makes a new one.
153 func (sub *Subscription) Unsub() {
154 sub.unsub(errors.New("Unsub() called"))
155 }
156 157 // unsub is the internal implementation of Unsub.
158 func (sub *Subscription) unsub(err error) {
159 // cancel the context (if it's not canceled already)
160 sub.cancel(err)
161 // mark subscription as closed and send a CLOSE to the relay (naïve sync.Once implementation)
162 if sub.live.CompareAndSwap(true, false) {
163 sub.Close()
164 }
165 // remove subscription from our map
166 sub.Client.Subscriptions.Delete(string(sub.id))
167 }
168 169 // Close just sends a CLOSE message. You probably want Unsub() instead.
170 func (sub *Subscription) Close() {
171 if sub.Client.IsConnected() {
172 closeMsg := closeenvelope.NewFrom(sub.id)
173 closeb := closeMsg.Marshal(nil)
174 log.T.F(
175 "WS.Subscription.Close: outbound CLOSE to %s: %s", sub.Client.URL,
176 string(closeb),
177 )
178 <-sub.Client.Write(closeb)
179 }
180 }
181 182 // Sub sets sub.Filters and then calls sub.Fire(ctx).
183 // The subscription will be closed if the context expires.
184 func (sub *Subscription) Sub(_ context.Context, ff *filter.S) {
185 sub.Filters = ff
186 chk.E(sub.Fire())
187 }
188 189 // Fire sends the "REQ" command to the relay.
190 func (sub *Subscription) Fire() (err error) {
191 var reqb []byte
192 reqb = reqenvelope.NewFrom(sub.id, sub.Filters).Marshal(nil)
193 sub.live.Store(true)
194 log.T.F(
195 "WS.Subscription.Fire: sending REQ id=%s filters=%d bytes=%d",
196 sub.GetID(), len(*sub.Filters), len(reqb),
197 )
198 log.T.F(
199 "WS.Subscription.Fire: outbound REQ to %s: %s", sub.Client.URL,
200 string(reqb),
201 )
202 if err = <-sub.Client.Write(reqb); err != nil {
203 err = fmt.Errorf("failed to write: %w", err)
204 log.T.F(
205 "WS.Subscription.Fire: write failed id=%s: %v", sub.GetID(), err,
206 )
207 sub.cancel(err)
208 return
209 }
210 log.T.F("WS.Subscription.Fire: write ok id=%s", sub.GetID())
211 return
212 }
213