main.go raw

   1  package main
   2  
   3  import (
   4  	"context"
   5  	"encoding/json"
   6  	"flag"
   7  	"fmt"
   8  	"log"
   9  	"os"
  10  	"os/signal"
  11  	"sync/atomic"
  12  	"syscall"
  13  	"time"
  14  
  15  	"github.com/gorilla/websocket"
  16  	"next.orly.dev/pkg/interfaces/neterr"
  17  )
  18  
  19  var (
  20  	relayURL     = flag.String("url", "ws://localhost:3334", "Relay WebSocket URL")
  21  	duration     = flag.Int("duration", 60, "Test duration in seconds")
  22  	eventKind    = flag.Int("kind", 1, "Event kind to subscribe to")
  23  	verbose      = flag.Bool("v", false, "Verbose output")
  24  	subID        = flag.String("sub", "", "Subscription ID (default: auto-generated)")
  25  )
  26  
  27  type NostrEvent struct {
  28  	ID        string     `json:"id"`
  29  	PubKey    string     `json:"pubkey"`
  30  	CreatedAt int64      `json:"created_at"`
  31  	Kind      int        `json:"kind"`
  32  	Tags      [][]string `json:"tags"`
  33  	Content   string     `json:"content"`
  34  	Sig       string     `json:"sig"`
  35  }
  36  
  37  func main() {
  38  	flag.Parse()
  39  
  40  	log.SetFlags(log.Ltime | log.Lmicroseconds)
  41  
  42  	// Generate subscription ID if not provided
  43  	subscriptionID := *subID
  44  	if subscriptionID == "" {
  45  		subscriptionID = fmt.Sprintf("test-%d", time.Now().Unix())
  46  	}
  47  
  48  	log.Printf("Starting subscription stability test")
  49  	log.Printf("Relay: %s", *relayURL)
  50  	log.Printf("Duration: %d seconds", *duration)
  51  	log.Printf("Event kind: %d", *eventKind)
  52  	log.Printf("Subscription ID: %s", subscriptionID)
  53  	log.Println()
  54  
  55  	// Connect to relay
  56  	log.Printf("Connecting to %s...", *relayURL)
  57  	conn, _, err := websocket.DefaultDialer.Dial(*relayURL, nil)
  58  	if err != nil {
  59  		log.Fatalf("Failed to connect: %v", err)
  60  	}
  61  	defer conn.Close()
  62  	log.Printf("✓ Connected")
  63  	log.Println()
  64  
  65  	// Context for the test
  66  	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(*duration+10)*time.Second)
  67  	defer cancel()
  68  
  69  	// Handle interrupts
  70  	sigChan := make(chan os.Signal, 1)
  71  	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
  72  	go func() {
  73  		<-sigChan
  74  		log.Println("\nInterrupted, shutting down...")
  75  		cancel()
  76  	}()
  77  
  78  	// Counters
  79  	var receivedCount atomic.Int64
  80  	var lastEventTime atomic.Int64
  81  	lastEventTime.Store(time.Now().Unix())
  82  
  83  	// Subscribe
  84  	reqMsg := map[string]interface{}{
  85  		"kinds": []int{*eventKind},
  86  	}
  87  	reqMsgBytes, _ := json.Marshal(reqMsg)
  88  	subscribeMsg := []interface{}{"REQ", subscriptionID, json.RawMessage(reqMsgBytes)}
  89  	subscribeMsgBytes, _ := json.Marshal(subscribeMsg)
  90  
  91  	log.Printf("Sending REQ: %s", string(subscribeMsgBytes))
  92  	if err := conn.WriteMessage(websocket.TextMessage, subscribeMsgBytes); err != nil {
  93  		log.Fatalf("Failed to send REQ: %v", err)
  94  	}
  95  
  96  	// Read messages
  97  	gotEOSE := false
  98  	readDone := make(chan struct{})
  99  	consecutiveTimeouts := 0
 100  	maxConsecutiveTimeouts := 20 // Exit if we get too many consecutive timeouts
 101  
 102  	go func() {
 103  		defer close(readDone)
 104  
 105  		for {
 106  			select {
 107  			case <-ctx.Done():
 108  				return
 109  			default:
 110  			}
 111  
 112  			conn.SetReadDeadline(time.Now().Add(5 * time.Second))
 113  			_, msg, err := conn.ReadMessage()
 114  			if err != nil {
 115  				// Check for normal close
 116  				if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
 117  					log.Println("Connection closed normally")
 118  					return
 119  				}
 120  
 121  				// Check if context was cancelled
 122  				if ctx.Err() != nil {
 123  					return
 124  				}
 125  
 126  				// Check for timeout errors (these are expected during idle periods)
 127  				if netErr, ok := err.(neterr.TimeoutError); ok && netErr.Timeout() {
 128  					consecutiveTimeouts++
 129  					if consecutiveTimeouts >= maxConsecutiveTimeouts {
 130  						log.Printf("Too many consecutive read timeouts (%d), connection may be dead", consecutiveTimeouts)
 131  						return
 132  					}
 133  					// Only log every 5th timeout to avoid spam
 134  					if *verbose && consecutiveTimeouts%5 == 0 {
 135  						log.Printf("Read timeout (idle period, %d consecutive)", consecutiveTimeouts)
 136  					}
 137  					continue
 138  				}
 139  
 140  				// For any other error, log and exit
 141  				log.Printf("Read error: %v", err)
 142  				return
 143  			}
 144  
 145  			// Reset timeout counter on successful read
 146  			consecutiveTimeouts = 0
 147  
 148  			// Parse message
 149  			var envelope []json.RawMessage
 150  			if err := json.Unmarshal(msg, &envelope); err != nil {
 151  				if *verbose {
 152  					log.Printf("Failed to parse message: %v", err)
 153  				}
 154  				continue
 155  			}
 156  
 157  			if len(envelope) < 2 {
 158  				continue
 159  			}
 160  
 161  			var msgType string
 162  			json.Unmarshal(envelope[0], &msgType)
 163  
 164  			// Check message type
 165  			switch msgType {
 166  			case "EOSE":
 167  				var recvSubID string
 168  				json.Unmarshal(envelope[1], &recvSubID)
 169  				if recvSubID == subscriptionID {
 170  					if !gotEOSE {
 171  						gotEOSE = true
 172  						log.Printf("✓ Received EOSE - subscription is active")
 173  						log.Println()
 174  						log.Println("Waiting for real-time events...")
 175  						log.Println()
 176  					}
 177  				}
 178  
 179  			case "EVENT":
 180  				var recvSubID string
 181  				json.Unmarshal(envelope[1], &recvSubID)
 182  				if recvSubID == subscriptionID {
 183  					var event NostrEvent
 184  					if err := json.Unmarshal(envelope[2], &event); err == nil {
 185  						count := receivedCount.Add(1)
 186  						lastEventTime.Store(time.Now().Unix())
 187  
 188  						eventIDShort := event.ID
 189  						if len(eventIDShort) > 8 {
 190  							eventIDShort = eventIDShort[:8]
 191  						}
 192  
 193  						log.Printf("[EVENT #%d] id=%s kind=%d created=%d",
 194  							count, eventIDShort, event.Kind, event.CreatedAt)
 195  
 196  						if *verbose {
 197  							log.Printf("  content: %s", event.Content)
 198  						}
 199  					}
 200  				}
 201  
 202  			case "NOTICE":
 203  				var notice string
 204  				json.Unmarshal(envelope[1], &notice)
 205  				log.Printf("[NOTICE] %s", notice)
 206  
 207  			case "CLOSED":
 208  				var recvSubID, reason string
 209  				json.Unmarshal(envelope[1], &recvSubID)
 210  				if len(envelope) > 2 {
 211  					json.Unmarshal(envelope[2], &reason)
 212  				}
 213  				if recvSubID == subscriptionID {
 214  					log.Printf("⚠ Subscription CLOSED by relay: %s", reason)
 215  					cancel()
 216  					return
 217  				}
 218  
 219  			case "OK":
 220  				// Ignore OK messages for this test
 221  
 222  			default:
 223  				if *verbose {
 224  					log.Printf("Unknown message type: %s", msgType)
 225  				}
 226  			}
 227  		}
 228  	}()
 229  
 230  	// Wait for EOSE with timeout
 231  	eoseTimeout := time.After(10 * time.Second)
 232  	for !gotEOSE {
 233  		select {
 234  		case <-eoseTimeout:
 235  			log.Printf("⚠ Warning: No EOSE received within 10 seconds")
 236  			gotEOSE = true // Continue anyway
 237  		case <-ctx.Done():
 238  			log.Println("Test cancelled before EOSE")
 239  			return
 240  		case <-time.After(100 * time.Millisecond):
 241  			// Keep waiting
 242  		}
 243  	}
 244  
 245  	// Monitor for subscription drops
 246  	startTime := time.Now()
 247  	endTime := startTime.Add(time.Duration(*duration) * time.Second)
 248  
 249  	// Start monitoring goroutine
 250  	go func() {
 251  		ticker := time.NewTicker(5 * time.Second)
 252  		defer ticker.Stop()
 253  
 254  		for {
 255  			select {
 256  			case <-ctx.Done():
 257  				return
 258  			case <-ticker.C:
 259  				elapsed := time.Since(startTime).Seconds()
 260  				lastEvent := lastEventTime.Load()
 261  				timeSinceLastEvent := time.Now().Unix() - lastEvent
 262  
 263  				log.Printf("[STATUS] Elapsed: %.0fs/%ds | Events: %d | Last event: %ds ago",
 264  					elapsed, *duration, receivedCount.Load(), timeSinceLastEvent)
 265  
 266  				// Warn if no events for a while (but only if we've seen events before)
 267  				if receivedCount.Load() > 0 && timeSinceLastEvent > 30 {
 268  					log.Printf("⚠ Warning: No events received for %ds - subscription may have dropped", timeSinceLastEvent)
 269  				}
 270  			}
 271  		}
 272  	}()
 273  
 274  	// Wait for test duration
 275  	log.Printf("Test running for %d seconds...", *duration)
 276  	log.Println("(You can publish events to the relay in another terminal)")
 277  	log.Println()
 278  
 279  	select {
 280  	case <-ctx.Done():
 281  		// Test completed or interrupted
 282  	case <-time.After(time.Until(endTime)):
 283  		// Duration elapsed
 284  	}
 285  
 286  	// Wait a bit for final events
 287  	time.Sleep(2 * time.Second)
 288  	cancel()
 289  
 290  	// Wait for reader to finish
 291  	select {
 292  	case <-readDone:
 293  	case <-time.After(5 * time.Second):
 294  		log.Println("Reader goroutine didn't exit cleanly")
 295  	}
 296  
 297  	// Send CLOSE
 298  	closeMsg := []interface{}{"CLOSE", subscriptionID}
 299  	closeMsgBytes, _ := json.Marshal(closeMsg)
 300  	conn.WriteMessage(websocket.TextMessage, closeMsgBytes)
 301  
 302  	// Print results
 303  	log.Println()
 304  	log.Println("===================================")
 305  	log.Println("Test Results")
 306  	log.Println("===================================")
 307  	log.Printf("Duration: %.1f seconds", time.Since(startTime).Seconds())
 308  	log.Printf("Events received: %d", receivedCount.Load())
 309  	log.Printf("Subscription ID: %s", subscriptionID)
 310  
 311  	lastEvent := lastEventTime.Load()
 312  	if lastEvent > startTime.Unix() {
 313  		log.Printf("Last event: %ds ago", time.Now().Unix()-lastEvent)
 314  	}
 315  
 316  	log.Println()
 317  
 318  	// Determine pass/fail
 319  	received := receivedCount.Load()
 320  	testDuration := time.Since(startTime).Seconds()
 321  
 322  	if received == 0 {
 323  		log.Println("⚠ No events received during test")
 324  		log.Println("This is expected if no events were published")
 325  		log.Println("To test properly, publish events while this is running:")
 326  		log.Println()
 327  		log.Println("  # In another terminal:")
 328  		log.Printf("  ./orly # Make sure relay is running\n")
 329  		log.Println()
 330  		log.Println("  # Then publish test events (implementation-specific)")
 331  	} else {
 332  		eventsPerSecond := float64(received) / testDuration
 333  		log.Printf("Rate: %.2f events/second", eventsPerSecond)
 334  
 335  		lastEvent := lastEventTime.Load()
 336  		timeSinceLastEvent := time.Now().Unix() - lastEvent
 337  
 338  		if timeSinceLastEvent < 10 {
 339  			log.Println()
 340  			log.Println("✓ TEST PASSED - Subscription remained stable")
 341  			log.Println("Events were received recently, indicating subscription is still active")
 342  		} else {
 343  			log.Println()
 344  			log.Printf("⚠ Potential issue - Last event was %ds ago", timeSinceLastEvent)
 345  			log.Println("Subscription may have dropped if events were still being published")
 346  		}
 347  	}
 348  }
 349