main.go raw
1 package main
2
3 import (
4 "context"
5 "encoding/json"
6 "flag"
7 "fmt"
8 "log"
9 "os"
10 "os/signal"
11 "sync/atomic"
12 "syscall"
13 "time"
14
15 "github.com/gorilla/websocket"
16 "next.orly.dev/pkg/interfaces/neterr"
17 )
18
19 var (
20 relayURL = flag.String("url", "ws://localhost:3334", "Relay WebSocket URL")
21 duration = flag.Int("duration", 60, "Test duration in seconds")
22 eventKind = flag.Int("kind", 1, "Event kind to subscribe to")
23 verbose = flag.Bool("v", false, "Verbose output")
24 subID = flag.String("sub", "", "Subscription ID (default: auto-generated)")
25 )
26
27 type NostrEvent struct {
28 ID string `json:"id"`
29 PubKey string `json:"pubkey"`
30 CreatedAt int64 `json:"created_at"`
31 Kind int `json:"kind"`
32 Tags [][]string `json:"tags"`
33 Content string `json:"content"`
34 Sig string `json:"sig"`
35 }
36
37 func main() {
38 flag.Parse()
39
40 log.SetFlags(log.Ltime | log.Lmicroseconds)
41
42 // Generate subscription ID if not provided
43 subscriptionID := *subID
44 if subscriptionID == "" {
45 subscriptionID = fmt.Sprintf("test-%d", time.Now().Unix())
46 }
47
48 log.Printf("Starting subscription stability test")
49 log.Printf("Relay: %s", *relayURL)
50 log.Printf("Duration: %d seconds", *duration)
51 log.Printf("Event kind: %d", *eventKind)
52 log.Printf("Subscription ID: %s", subscriptionID)
53 log.Println()
54
55 // Connect to relay
56 log.Printf("Connecting to %s...", *relayURL)
57 conn, _, err := websocket.DefaultDialer.Dial(*relayURL, nil)
58 if err != nil {
59 log.Fatalf("Failed to connect: %v", err)
60 }
61 defer conn.Close()
62 log.Printf("✓ Connected")
63 log.Println()
64
65 // Context for the test
66 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(*duration+10)*time.Second)
67 defer cancel()
68
69 // Handle interrupts
70 sigChan := make(chan os.Signal, 1)
71 signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
72 go func() {
73 <-sigChan
74 log.Println("\nInterrupted, shutting down...")
75 cancel()
76 }()
77
78 // Counters
79 var receivedCount atomic.Int64
80 var lastEventTime atomic.Int64
81 lastEventTime.Store(time.Now().Unix())
82
83 // Subscribe
84 reqMsg := map[string]interface{}{
85 "kinds": []int{*eventKind},
86 }
87 reqMsgBytes, _ := json.Marshal(reqMsg)
88 subscribeMsg := []interface{}{"REQ", subscriptionID, json.RawMessage(reqMsgBytes)}
89 subscribeMsgBytes, _ := json.Marshal(subscribeMsg)
90
91 log.Printf("Sending REQ: %s", string(subscribeMsgBytes))
92 if err := conn.WriteMessage(websocket.TextMessage, subscribeMsgBytes); err != nil {
93 log.Fatalf("Failed to send REQ: %v", err)
94 }
95
96 // Read messages
97 gotEOSE := false
98 readDone := make(chan struct{})
99 consecutiveTimeouts := 0
100 maxConsecutiveTimeouts := 20 // Exit if we get too many consecutive timeouts
101
102 go func() {
103 defer close(readDone)
104
105 for {
106 select {
107 case <-ctx.Done():
108 return
109 default:
110 }
111
112 conn.SetReadDeadline(time.Now().Add(5 * time.Second))
113 _, msg, err := conn.ReadMessage()
114 if err != nil {
115 // Check for normal close
116 if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
117 log.Println("Connection closed normally")
118 return
119 }
120
121 // Check if context was cancelled
122 if ctx.Err() != nil {
123 return
124 }
125
126 // Check for timeout errors (these are expected during idle periods)
127 if netErr, ok := err.(neterr.TimeoutError); ok && netErr.Timeout() {
128 consecutiveTimeouts++
129 if consecutiveTimeouts >= maxConsecutiveTimeouts {
130 log.Printf("Too many consecutive read timeouts (%d), connection may be dead", consecutiveTimeouts)
131 return
132 }
133 // Only log every 5th timeout to avoid spam
134 if *verbose && consecutiveTimeouts%5 == 0 {
135 log.Printf("Read timeout (idle period, %d consecutive)", consecutiveTimeouts)
136 }
137 continue
138 }
139
140 // For any other error, log and exit
141 log.Printf("Read error: %v", err)
142 return
143 }
144
145 // Reset timeout counter on successful read
146 consecutiveTimeouts = 0
147
148 // Parse message
149 var envelope []json.RawMessage
150 if err := json.Unmarshal(msg, &envelope); err != nil {
151 if *verbose {
152 log.Printf("Failed to parse message: %v", err)
153 }
154 continue
155 }
156
157 if len(envelope) < 2 {
158 continue
159 }
160
161 var msgType string
162 json.Unmarshal(envelope[0], &msgType)
163
164 // Check message type
165 switch msgType {
166 case "EOSE":
167 var recvSubID string
168 json.Unmarshal(envelope[1], &recvSubID)
169 if recvSubID == subscriptionID {
170 if !gotEOSE {
171 gotEOSE = true
172 log.Printf("✓ Received EOSE - subscription is active")
173 log.Println()
174 log.Println("Waiting for real-time events...")
175 log.Println()
176 }
177 }
178
179 case "EVENT":
180 var recvSubID string
181 json.Unmarshal(envelope[1], &recvSubID)
182 if recvSubID == subscriptionID {
183 var event NostrEvent
184 if err := json.Unmarshal(envelope[2], &event); err == nil {
185 count := receivedCount.Add(1)
186 lastEventTime.Store(time.Now().Unix())
187
188 eventIDShort := event.ID
189 if len(eventIDShort) > 8 {
190 eventIDShort = eventIDShort[:8]
191 }
192
193 log.Printf("[EVENT #%d] id=%s kind=%d created=%d",
194 count, eventIDShort, event.Kind, event.CreatedAt)
195
196 if *verbose {
197 log.Printf(" content: %s", event.Content)
198 }
199 }
200 }
201
202 case "NOTICE":
203 var notice string
204 json.Unmarshal(envelope[1], ¬ice)
205 log.Printf("[NOTICE] %s", notice)
206
207 case "CLOSED":
208 var recvSubID, reason string
209 json.Unmarshal(envelope[1], &recvSubID)
210 if len(envelope) > 2 {
211 json.Unmarshal(envelope[2], &reason)
212 }
213 if recvSubID == subscriptionID {
214 log.Printf("⚠ Subscription CLOSED by relay: %s", reason)
215 cancel()
216 return
217 }
218
219 case "OK":
220 // Ignore OK messages for this test
221
222 default:
223 if *verbose {
224 log.Printf("Unknown message type: %s", msgType)
225 }
226 }
227 }
228 }()
229
230 // Wait for EOSE with timeout
231 eoseTimeout := time.After(10 * time.Second)
232 for !gotEOSE {
233 select {
234 case <-eoseTimeout:
235 log.Printf("⚠ Warning: No EOSE received within 10 seconds")
236 gotEOSE = true // Continue anyway
237 case <-ctx.Done():
238 log.Println("Test cancelled before EOSE")
239 return
240 case <-time.After(100 * time.Millisecond):
241 // Keep waiting
242 }
243 }
244
245 // Monitor for subscription drops
246 startTime := time.Now()
247 endTime := startTime.Add(time.Duration(*duration) * time.Second)
248
249 // Start monitoring goroutine
250 go func() {
251 ticker := time.NewTicker(5 * time.Second)
252 defer ticker.Stop()
253
254 for {
255 select {
256 case <-ctx.Done():
257 return
258 case <-ticker.C:
259 elapsed := time.Since(startTime).Seconds()
260 lastEvent := lastEventTime.Load()
261 timeSinceLastEvent := time.Now().Unix() - lastEvent
262
263 log.Printf("[STATUS] Elapsed: %.0fs/%ds | Events: %d | Last event: %ds ago",
264 elapsed, *duration, receivedCount.Load(), timeSinceLastEvent)
265
266 // Warn if no events for a while (but only if we've seen events before)
267 if receivedCount.Load() > 0 && timeSinceLastEvent > 30 {
268 log.Printf("⚠ Warning: No events received for %ds - subscription may have dropped", timeSinceLastEvent)
269 }
270 }
271 }
272 }()
273
274 // Wait for test duration
275 log.Printf("Test running for %d seconds...", *duration)
276 log.Println("(You can publish events to the relay in another terminal)")
277 log.Println()
278
279 select {
280 case <-ctx.Done():
281 // Test completed or interrupted
282 case <-time.After(time.Until(endTime)):
283 // Duration elapsed
284 }
285
286 // Wait a bit for final events
287 time.Sleep(2 * time.Second)
288 cancel()
289
290 // Wait for reader to finish
291 select {
292 case <-readDone:
293 case <-time.After(5 * time.Second):
294 log.Println("Reader goroutine didn't exit cleanly")
295 }
296
297 // Send CLOSE
298 closeMsg := []interface{}{"CLOSE", subscriptionID}
299 closeMsgBytes, _ := json.Marshal(closeMsg)
300 conn.WriteMessage(websocket.TextMessage, closeMsgBytes)
301
302 // Print results
303 log.Println()
304 log.Println("===================================")
305 log.Println("Test Results")
306 log.Println("===================================")
307 log.Printf("Duration: %.1f seconds", time.Since(startTime).Seconds())
308 log.Printf("Events received: %d", receivedCount.Load())
309 log.Printf("Subscription ID: %s", subscriptionID)
310
311 lastEvent := lastEventTime.Load()
312 if lastEvent > startTime.Unix() {
313 log.Printf("Last event: %ds ago", time.Now().Unix()-lastEvent)
314 }
315
316 log.Println()
317
318 // Determine pass/fail
319 received := receivedCount.Load()
320 testDuration := time.Since(startTime).Seconds()
321
322 if received == 0 {
323 log.Println("⚠ No events received during test")
324 log.Println("This is expected if no events were published")
325 log.Println("To test properly, publish events while this is running:")
326 log.Println()
327 log.Println(" # In another terminal:")
328 log.Printf(" ./orly # Make sure relay is running\n")
329 log.Println()
330 log.Println(" # Then publish test events (implementation-specific)")
331 } else {
332 eventsPerSecond := float64(received) / testDuration
333 log.Printf("Rate: %.2f events/second", eventsPerSecond)
334
335 lastEvent := lastEventTime.Load()
336 timeSinceLastEvent := time.Now().Unix() - lastEvent
337
338 if timeSinceLastEvent < 10 {
339 log.Println()
340 log.Println("✓ TEST PASSED - Subscription remained stable")
341 log.Println("Events were received recently, indicating subscription is still active")
342 } else {
343 log.Println()
344 log.Printf("⚠ Potential issue - Last event was %ds ago", timeSinceLastEvent)
345 log.Println("Subscription may have dropped if events were still being published")
346 }
347 }
348 }
349