relay.go raw
1 package bridge
2
3 import (
4 "context"
5 "fmt"
6 "strings"
7 "sync"
8 "time"
9
10 "next.orly.dev/pkg/nostr/encoders/event"
11 "next.orly.dev/pkg/nostr/encoders/filter"
12 "next.orly.dev/pkg/nostr/interfaces/signer"
13 "next.orly.dev/pkg/nostr/ws"
14 "next.orly.dev/pkg/lol/log"
15 )
16
17 // RelayConn wraps a WebSocket relay connection with auto-reconnect.
18 // It satisfies the RelayConnection interface used by the bridge's Marmot
19 // client for standalone mode (connecting to an external relay).
20 type RelayConn struct {
21 url string
22 sign signer.I
23 conn *ws.Client
24 mu sync.RWMutex
25 ctx context.Context
26 cancel context.CancelFunc
27 authed bool
28 }
29
30 // NewRelayConn creates a new relay connection wrapper.
31 // The signer is used for NIP-42 authentication when the relay requires it.
32 func NewRelayConn(url string, sign signer.I) *RelayConn {
33 return &RelayConn{url: url, sign: sign}
34 }
35
36 // Connect establishes the WebSocket connection to the relay and
37 // pre-authenticates via NIP-42 so that subscriptions have proper access.
38 // In monolithic mode, the relay may not be listening yet, so Connect
39 // retries with exponential backoff for up to 30 seconds.
40 func (rc *RelayConn) Connect(ctx context.Context) error {
41 rc.ctx, rc.cancel = context.WithCancel(ctx)
42
43 delay := time.Second
44 maxDelay := 10 * time.Second
45 timeout := 5 * time.Minute
46 deadline := time.Now().Add(timeout)
47
48 var conn *ws.Client
49 var err error
50
51 for {
52 conn, err = ws.RelayConnect(rc.ctx, rc.url)
53 if err == nil {
54 break
55 }
56
57 if time.Now().After(deadline) {
58 return fmt.Errorf("connect to relay %s after %v: %w", rc.url, timeout, err)
59 }
60
61 log.D.F("bridge waiting for relay %s: %v (retrying in %v)", rc.url, err, delay)
62
63 select {
64 case <-time.After(delay):
65 if delay < maxDelay {
66 delay *= 2
67 }
68 case <-rc.ctx.Done():
69 return fmt.Errorf("connect to relay %s: %w", rc.url, rc.ctx.Err())
70 }
71 }
72
73 rc.mu.Lock()
74 rc.conn = conn
75 rc.authed = false
76 rc.mu.Unlock()
77
78 log.I.F("bridge connected to relay: %s", rc.url)
79
80 // Pre-authenticate so subscriptions get proper access level
81 if rc.sign != nil {
82 if err := rc.preAuth(conn); err != nil {
83 log.W.F("bridge pre-auth failed: %v (will retry on publish)", err)
84 }
85 }
86
87 return nil
88 }
89
90 // preAuth waits briefly for the relay's AUTH challenge, then authenticates.
91 func (rc *RelayConn) preAuth(conn *ws.Client) error {
92 // Give the relay time to send the AUTH challenge
93 time.Sleep(200 * time.Millisecond)
94
95 if err := conn.Auth(rc.ctx, rc.sign); err != nil {
96 return fmt.Errorf("auth: %w", err)
97 }
98
99 rc.mu.Lock()
100 rc.authed = true
101 rc.mu.Unlock()
102
103 log.I.F("bridge pre-authenticated with relay")
104 return nil
105 }
106
107 // Reconnect attempts to reconnect with exponential backoff.
108 func (rc *RelayConn) Reconnect() error {
109 delay := time.Second
110 maxDelay := 30 * time.Second
111
112 for {
113 select {
114 case <-rc.ctx.Done():
115 return rc.ctx.Err()
116 default:
117 }
118
119 conn, err := ws.RelayConnect(rc.ctx, rc.url)
120 if err == nil {
121 rc.mu.Lock()
122 rc.conn = conn
123 rc.authed = false
124 rc.mu.Unlock()
125 log.I.F("bridge reconnected to relay: %s", rc.url)
126
127 // Pre-authenticate after reconnect
128 if rc.sign != nil {
129 if err := rc.preAuth(conn); err != nil {
130 log.W.F("bridge pre-auth after reconnect failed: %v", err)
131 }
132 }
133
134 return nil
135 }
136
137 log.W.F("bridge relay reconnect failed: %v, retrying in %v", err, delay)
138 select {
139 case <-time.After(delay):
140 if delay < maxDelay {
141 delay *= 2
142 }
143 case <-rc.ctx.Done():
144 return rc.ctx.Err()
145 }
146 }
147 }
148
149 // Publish sends an event to the relay. If the relay responds with
150 // auth-required, the bridge authenticates via NIP-42 and retries once.
151 func (rc *RelayConn) Publish(ctx context.Context, ev *event.E) error {
152 rc.mu.RLock()
153 conn := rc.conn
154 rc.mu.RUnlock()
155
156 if conn == nil {
157 return fmt.Errorf("not connected to relay")
158 }
159
160 err := conn.Publish(ctx, ev)
161 if err == nil {
162 return nil
163 }
164
165 // Check if the error is auth-required
166 if !strings.Contains(err.Error(), "auth-required") {
167 return err
168 }
169
170 // Authenticate and retry
171 if rc.sign == nil {
172 return fmt.Errorf("auth required but no signer configured")
173 }
174
175 log.D.F("relay requires auth, authenticating...")
176
177 // Give the relay a moment to send the challenge
178 time.Sleep(100 * time.Millisecond)
179
180 if authErr := conn.Auth(ctx, rc.sign); authErr != nil {
181 return fmt.Errorf("auth failed: %w", authErr)
182 }
183
184 rc.mu.Lock()
185 rc.authed = true
186 rc.mu.Unlock()
187
188 log.I.F("bridge authenticated with relay")
189
190 // Retry the publish
191 return conn.Publish(ctx, ev)
192 }
193
194 // Subscribe creates a subscription on the relay and returns a stream of events.
195 func (rc *RelayConn) Subscribe(ctx context.Context, ff *filter.S) (*WsEventStream, error) {
196 rc.mu.RLock()
197 conn := rc.conn
198 rc.mu.RUnlock()
199
200 if conn == nil {
201 return nil, fmt.Errorf("not connected to relay")
202 }
203
204 sub, err := conn.Subscribe(ctx, ff)
205 if err != nil {
206 return nil, err
207 }
208
209 return &WsEventStream{sub: sub}, nil
210 }
211
212 // Close closes the relay connection.
213 func (rc *RelayConn) Close() {
214 if rc.cancel != nil {
215 rc.cancel()
216 }
217 rc.mu.Lock()
218 if rc.conn != nil {
219 rc.conn.Close()
220 rc.conn = nil
221 }
222 rc.mu.Unlock()
223 }
224
225 // WsEventStream wraps a ws.Subscription to deliver events.
226 type WsEventStream struct {
227 sub *ws.Subscription
228 }
229
230 // Events returns the channel of events.
231 func (s *WsEventStream) Events() <-chan *event.E {
232 return s.sub.Events
233 }
234
235 // Close unsubscribes from the relay.
236 func (s *WsEventStream) Close() {
237 s.sub.Unsub()
238 }
239