connection.go raw
1 package archive
2
3 import (
4 "context"
5 "sync"
6 "time"
7
8 "next.orly.dev/pkg/nostr/encoders/event"
9 "next.orly.dev/pkg/nostr/encoders/filter"
10 "next.orly.dev/pkg/nostr/ws"
11 "next.orly.dev/pkg/lol/log"
12 )
13
14 // RelayConnection manages a single archive relay connection.
15 type RelayConnection struct {
16 url string
17 client *ws.Client
18 ctx context.Context
19 cancel context.CancelFunc
20
21 // Connection state
22 mu sync.RWMutex
23 lastConnect time.Time
24 reconnectDelay time.Duration
25 connected bool
26 }
27
// Timing parameters for archive relay connections.
const (
	// connectTimeout bounds a single dial attempt made by Connect.
	connectTimeout = 10 * time.Second
	// queryTimeout bounds each individual Query call; it is applied per
	// query rather than across all queries.
	queryTimeout = 30 * time.Second

	// initialReconnectDelay is the value reconnectDelay starts at and is
	// reset to after a successful connect.
	initialReconnectDelay = 5 * time.Second
	// maxReconnectDelay is the ceiling reconnectDelay is clamped to as it
	// doubles after failed connects.
	maxReconnectDelay = 5 * time.Minute
)
38
39 // NewRelayConnection creates a new relay connection.
40 func NewRelayConnection(parentCtx context.Context, url string) *RelayConnection {
41 ctx, cancel := context.WithCancel(parentCtx)
42 return &RelayConnection{
43 url: url,
44 ctx: ctx,
45 cancel: cancel,
46 reconnectDelay: initialReconnectDelay,
47 }
48 }
49
50 // Connect establishes a connection to the archive relay.
51 func (rc *RelayConnection) Connect() error {
52 rc.mu.Lock()
53 defer rc.mu.Unlock()
54
55 if rc.connected && rc.client != nil {
56 return nil
57 }
58
59 connectCtx, cancel := context.WithTimeout(rc.ctx, connectTimeout)
60 defer cancel()
61
62 client, err := ws.RelayConnect(connectCtx, rc.url)
63 if err != nil {
64 rc.reconnectDelay = min(rc.reconnectDelay*2, maxReconnectDelay)
65 return err
66 }
67
68 rc.client = client
69 rc.connected = true
70 rc.lastConnect = time.Now()
71 rc.reconnectDelay = initialReconnectDelay
72
73 log.D.F("archive: connected to %s", rc.url)
74
75 return nil
76 }
77
78 // Query executes a query against the archive relay.
79 // Returns a slice of events matching the filter.
80 func (rc *RelayConnection) Query(ctx context.Context, f *filter.F) ([]*event.E, error) {
81 rc.mu.RLock()
82 client := rc.client
83 connected := rc.connected
84 rc.mu.RUnlock()
85
86 if !connected || client == nil {
87 if err := rc.Connect(); err != nil {
88 return nil, err
89 }
90 rc.mu.RLock()
91 client = rc.client
92 rc.mu.RUnlock()
93 }
94
95 // Create query context with timeout
96 queryCtx, cancel := context.WithTimeout(ctx, queryTimeout)
97 defer cancel()
98
99 // Subscribe to the filter
100 sub, err := client.Subscribe(queryCtx, filter.NewS(f))
101 if err != nil {
102 rc.handleDisconnection()
103 return nil, err
104 }
105 defer sub.Unsub()
106
107 // Collect events until EOSE or timeout
108 var events []*event.E
109
110 for {
111 select {
112 case <-queryCtx.Done():
113 return events, nil
114 case <-sub.EndOfStoredEvents:
115 return events, nil
116 case ev := <-sub.Events:
117 if ev == nil {
118 return events, nil
119 }
120 events = append(events, ev)
121 }
122 }
123 }
124
125 // handleDisconnection marks the connection as disconnected.
126 func (rc *RelayConnection) handleDisconnection() {
127 rc.mu.Lock()
128 defer rc.mu.Unlock()
129
130 rc.connected = false
131 if rc.client != nil {
132 rc.client.Close()
133 rc.client = nil
134 }
135 }
136
137 // IsConnected returns whether the relay is currently connected.
138 func (rc *RelayConnection) IsConnected() bool {
139 rc.mu.RLock()
140 defer rc.mu.RUnlock()
141
142 if !rc.connected || rc.client == nil {
143 return false
144 }
145
146 // Check if client is still connected
147 return rc.client.IsConnected()
148 }
149
150 // Close closes the relay connection.
151 func (rc *RelayConnection) Close() {
152 rc.cancel()
153
154 rc.mu.Lock()
155 defer rc.mu.Unlock()
156
157 rc.connected = false
158 if rc.client != nil {
159 rc.client.Close()
160 rc.client = nil
161 }
162 }
163
164 // URL returns the relay URL.
165 func (rc *RelayConnection) URL() string {
166 return rc.url
167 }
168
// min picks the shorter of the two durations a and b.
func min(a, b time.Duration) time.Duration {
	if b < a {
		return b
	}
	return a
}
176