client.mx raw
1 package ws
2
3 import (
4 "fmt"
5 "sync"
6
7 "smesh.lol/pkg/nostr/envelope"
8 "smesh.lol/pkg/nostr/event"
9 "smesh.lol/pkg/nostr/filter"
10 "smesh.lol/pkg/nostr/text"
11 )
12
13 // Client is a Nostr relay client.
14 type Client struct {
15 ws *Conn
16 url string
17 subs map[string]*Sub
18 mu sync.Mutex
19 idN int64
20 done chan struct{}
21 OKs chan *envelope.OK // buffered channel for OK responses
22 Err error // set when the read loop exits
23 }
24
25 // Sub is an active subscription.
26 type Sub struct {
27 ID string
28 Events chan *event.E
29 EOSE chan struct{}
30 eosed bool
31 }
32
33 // Connect dials a Nostr relay. Call RunReadLoop() to start processing
34 // incoming messages (blocks until the connection closes).
35 func Connect(rawURL string) (*Client, error) {
36 ws, err := Dial(rawURL)
37 if err != nil {
38 return nil, err
39 }
40 c := &Client{
41 ws: ws,
42 url: rawURL,
43 subs: map[string]*Sub{},
44 done: chan struct{}{},
45 OKs: chan *envelope.OK{64},
46 }
47 return c, nil
48 }
49
50 // RunReadLoop reads messages from the relay and dispatches them to
51 // subscriptions. Blocks until the connection closes.
52 func (c *Client) RunReadLoop() {
53 defer close(c.done)
54 for {
55 op, payload, err := c.ws.ReadMessage()
56 if err != nil {
57 c.Err = err
58 return
59 }
60 if op == OpClose {
61 return
62 }
63 if op != OpText {
64 continue
65 }
66 c.dispatch(payload)
67 }
68 }
69
70 // Subscribe sends a REQ and returns a subscription that delivers events.
71 func (c *Client) Subscribe(filters ...*filter.F) (*Sub, error) {
72 c.mu.Lock()
73 c.idN++
74 id := string(append([]byte(nil), fmt.Sprintf("s%d", c.idN)...))
75 sub := &Sub{
76 ID: id,
77 Events: chan *event.E{256},
78 EOSE: chan struct{}{1},
79 }
80 c.subs[id] = sub
81 c.mu.Unlock()
82
83 fs := filter.S(filters)
84 req := &envelope.Req{
85 Subscription: []byte(id),
86 Filters: &fs,
87 }
88 if err := c.ws.WriteText(req.Marshal(nil)); err != nil {
89 c.mu.Lock()
90 delete(c.subs, id)
91 c.mu.Unlock()
92 return nil, err
93 }
94 return sub, nil
95 }
96
97 // Publish sends an event to the relay.
98 func (c *Client) Publish(ev *event.E) error {
99 es := &envelope.EventSubmission{E: ev}
100 return c.ws.WriteText(es.Marshal(nil))
101 }
102
103 // Unsubscribe sends CLOSE and removes the subscription.
104 func (c *Client) Unsubscribe(sub *Sub) error {
105 cl := &envelope.Close{ID: []byte(sub.ID)}
106 if err := c.ws.WriteText(cl.Marshal(nil)); err != nil {
107 return err
108 }
109 c.mu.Lock()
110 delete(c.subs, sub.ID)
111 c.mu.Unlock()
112 return nil
113 }
114
115 // Close shuts down the client.
116 func (c *Client) Close() error {
117 c.mu.Lock()
118 for _, sub := range c.subs {
119 close(sub.Events)
120 }
121 c.subs = nil
122 c.mu.Unlock()
123 return c.ws.Close()
124 }
125
126 // Done returns a channel closed when the read loop exits.
127 func (c *Client) Done() <-chan struct{} { return c.done }
128
129 func (c *Client) dispatch(msg []byte) {
130 label, rem, err := envelope.Identify(msg)
131 if err != nil {
132 return
133 }
134 switch label {
135 case envelope.EventLabel:
136 c.handleEvent(rem)
137 case envelope.EOSELabel:
138 c.handleEOSE(rem)
139 case envelope.OKLabel:
140 c.handleOK(rem)
141 case envelope.ClosedLabel:
142 c.handleClosed(rem)
143 }
144 }
145
146 func (c *Client) handleEvent(rem []byte) {
147 subID, rem, err := text.UnmarshalQuoted(rem)
148 if err != nil {
149 return
150 }
151 // skip comma
152 for len(rem) > 0 && (rem[0] == ',' || rem[0] == ' ') {
153 rem = rem[1:]
154 }
155 ev := event.New()
156 if _, err = ev.Unmarshal(rem); err != nil {
157 return
158 }
159 c.mu.Lock()
160 sub := c.subs[string(subID)]
161 c.mu.Unlock()
162 if sub != nil {
163 select {
164 case sub.Events <- ev:
165 default:
166 }
167 }
168 }
169
170 func (c *Client) handleEOSE(rem []byte) {
171 subID, _, err := text.UnmarshalQuoted(rem)
172 if err != nil {
173 return
174 }
175 c.mu.Lock()
176 sub := c.subs[string(subID)]
177 c.mu.Unlock()
178 if sub != nil && !sub.eosed {
179 sub.eosed = true
180 select {
181 case sub.EOSE <- struct{}{}:
182 default:
183 }
184 }
185 }
186
187 func (c *Client) handleOK(rem []byte) {
188 var ok envelope.OK
189 if _, err := ok.Unmarshal(rem); err != nil {
190 return
191 }
192 select {
193 case c.OKs <- &ok:
194 default:
195 }
196 }
197
198 func (c *Client) handleClosed(rem []byte) {
199 subID, _, err := text.UnmarshalQuoted(rem)
200 if err != nil {
201 return
202 }
203 c.mu.Lock()
204 sub := c.subs[string(subID)]
205 if sub != nil {
206 delete(c.subs, string(subID))
207 }
208 c.mu.Unlock()
209 if sub != nil {
210 close(sub.Events)
211 }
212 }
213