main.go raw

   1  package main
   2  
   3  import (
   4  	"context"
   5  	"encoding/json"
   6  	"flag"
   7  	"fmt"
   8  	"log"
   9  	"os"
  10  	"os/signal"
  11  	"syscall"
  12  	"time"
  13  
  14  	"github.com/gorilla/websocket"
  15  	"next.orly.dev/pkg/interfaces/neterr"
  16  )
  17  
// Command-line flags. They are registered with the default flag set when the
// package is initialized and take their final values once main calls
// flag.Parse.
var (
	// relayURL is the WebSocket URL of the relay that main dials.
	relayURL = flag.String("url", "ws://localhost:3334", "Relay WebSocket URL")
	// duration is the length of the monitoring period, in seconds.
	duration = flag.Int("duration", 120, "Test duration in seconds")
)
  22  
  23  func main() {
  24  	flag.Parse()
  25  
  26  	log.SetFlags(log.Ltime)
  27  
  28  	fmt.Println("===================================")
  29  	fmt.Println("Simple Subscription Stability Test")
  30  	fmt.Println("===================================")
  31  	fmt.Printf("Relay: %s\n", *relayURL)
  32  	fmt.Printf("Duration: %d seconds\n", *duration)
  33  	fmt.Println()
  34  	fmt.Println("This test verifies that subscriptions remain")
  35  	fmt.Println("active without dropping over the test period.")
  36  	fmt.Println()
  37  
  38  	// Connect to relay
  39  	log.Printf("Connecting to %s...", *relayURL)
  40  	conn, _, err := websocket.DefaultDialer.Dial(*relayURL, nil)
  41  	if err != nil {
  42  		log.Fatalf("Failed to connect: %v", err)
  43  	}
  44  	defer conn.Close()
  45  	log.Printf("✓ Connected")
  46  
  47  	// Context for the test
  48  	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(*duration+10)*time.Second)
  49  	defer cancel()
  50  
  51  	// Handle interrupts
  52  	sigChan := make(chan os.Signal, 1)
  53  	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
  54  	go func() {
  55  		<-sigChan
  56  		log.Println("\nInterrupted, shutting down...")
  57  		cancel()
  58  	}()
  59  
  60  	// Subscribe
  61  	subID := fmt.Sprintf("stability-test-%d", time.Now().Unix())
  62  	reqMsg := []interface{}{"REQ", subID, map[string]interface{}{"kinds": []int{1}}}
  63  	reqMsgBytes, _ := json.Marshal(reqMsg)
  64  
  65  	log.Printf("Sending subscription: %s", subID)
  66  	if err := conn.WriteMessage(websocket.TextMessage, reqMsgBytes); err != nil {
  67  		log.Fatalf("Failed to send REQ: %v", err)
  68  	}
  69  
  70  	// Track connection health
  71  	lastMessageTime := time.Now()
  72  	gotEOSE := false
  73  	messageCount := 0
  74  	pingCount := 0
  75  
  76  	// Read goroutine
  77  	readDone := make(chan struct{})
  78  	go func() {
  79  		defer close(readDone)
  80  
  81  		for {
  82  			select {
  83  			case <-ctx.Done():
  84  				return
  85  			default:
  86  			}
  87  
  88  			conn.SetReadDeadline(time.Now().Add(10 * time.Second))
  89  			msgType, msg, err := conn.ReadMessage()
  90  			if err != nil {
  91  				if ctx.Err() != nil {
  92  					return
  93  				}
  94  				if netErr, ok := err.(neterr.TimeoutError); ok && netErr.Timeout() {
  95  					continue
  96  				}
  97  				log.Printf("Read error: %v", err)
  98  				return
  99  			}
 100  
 101  			lastMessageTime = time.Now()
 102  			messageCount++
 103  
 104  			// Handle PING
 105  			if msgType == websocket.PingMessage {
 106  				pingCount++
 107  				log.Printf("Received PING #%d, sending PONG", pingCount)
 108  				conn.WriteMessage(websocket.PongMessage, nil)
 109  				continue
 110  			}
 111  
 112  			// Parse message
 113  			var envelope []json.RawMessage
 114  			if err := json.Unmarshal(msg, &envelope); err != nil {
 115  				continue
 116  			}
 117  
 118  			if len(envelope) < 2 {
 119  				continue
 120  			}
 121  
 122  			var msgTypeStr string
 123  			json.Unmarshal(envelope[0], &msgTypeStr)
 124  
 125  			switch msgTypeStr {
 126  			case "EOSE":
 127  				var recvSubID string
 128  				json.Unmarshal(envelope[1], &recvSubID)
 129  				if recvSubID == subID && !gotEOSE {
 130  					gotEOSE = true
 131  					log.Printf("✓ Received EOSE - subscription is active")
 132  				}
 133  
 134  			case "EVENT":
 135  				var recvSubID string
 136  				json.Unmarshal(envelope[1], &recvSubID)
 137  				if recvSubID == subID {
 138  					log.Printf("Received EVENT (subscription still active)")
 139  				}
 140  
 141  			case "CLOSED":
 142  				var recvSubID string
 143  				json.Unmarshal(envelope[1], &recvSubID)
 144  				if recvSubID == subID {
 145  					log.Printf("⚠ Subscription CLOSED by relay!")
 146  					cancel()
 147  					return
 148  				}
 149  
 150  			case "NOTICE":
 151  				var notice string
 152  				json.Unmarshal(envelope[1], &notice)
 153  				log.Printf("NOTICE: %s", notice)
 154  			}
 155  		}
 156  	}()
 157  
 158  	// Wait for EOSE
 159  	log.Println("Waiting for EOSE...")
 160  	for !gotEOSE && ctx.Err() == nil {
 161  		time.Sleep(100 * time.Millisecond)
 162  	}
 163  
 164  	if !gotEOSE {
 165  		log.Fatal("Did not receive EOSE")
 166  	}
 167  
 168  	// Monitor loop
 169  	startTime := time.Now()
 170  	ticker := time.NewTicker(10 * time.Second)
 171  	defer ticker.Stop()
 172  
 173  	log.Println()
 174  	log.Printf("Subscription is active. Monitoring for %d seconds...", *duration)
 175  	log.Println("(Subscription should stay active even without events)")
 176  	log.Println()
 177  
 178  	for {
 179  		select {
 180  		case <-ctx.Done():
 181  			goto done
 182  		case <-ticker.C:
 183  			elapsed := time.Since(startTime)
 184  			timeSinceMessage := time.Since(lastMessageTime)
 185  
 186  			log.Printf("[%3.0fs/%ds] Messages: %d | Last message: %.0fs ago | Status: %s",
 187  				elapsed.Seconds(),
 188  				*duration,
 189  				messageCount,
 190  				timeSinceMessage.Seconds(),
 191  				getStatus(timeSinceMessage),
 192  			)
 193  
 194  			// Check if we've reached duration
 195  			if elapsed >= time.Duration(*duration)*time.Second {
 196  				goto done
 197  			}
 198  		}
 199  	}
 200  
 201  done:
 202  	cancel()
 203  
 204  	// Wait for reader
 205  	select {
 206  	case <-readDone:
 207  	case <-time.After(2 * time.Second):
 208  	}
 209  
 210  	// Send CLOSE
 211  	closeMsg := []interface{}{"CLOSE", subID}
 212  	closeMsgBytes, _ := json.Marshal(closeMsg)
 213  	conn.WriteMessage(websocket.TextMessage, closeMsgBytes)
 214  
 215  	// Results
 216  	elapsed := time.Since(startTime)
 217  	timeSinceMessage := time.Since(lastMessageTime)
 218  
 219  	fmt.Println()
 220  	fmt.Println("===================================")
 221  	fmt.Println("Test Results")
 222  	fmt.Println("===================================")
 223  	fmt.Printf("Duration: %.1f seconds\n", elapsed.Seconds())
 224  	fmt.Printf("Total messages: %d\n", messageCount)
 225  	fmt.Printf("Last message: %.0f seconds ago\n", timeSinceMessage.Seconds())
 226  	fmt.Println()
 227  
 228  	// Determine success
 229  	if timeSinceMessage < 15*time.Second {
 230  		// Recent message - subscription is alive
 231  		fmt.Println("✓ TEST PASSED")
 232  		fmt.Println("Subscription remained active throughout test period.")
 233  		fmt.Println("Recent messages indicate healthy connection.")
 234  	} else if timeSinceMessage < 30*time.Second {
 235  		// Somewhat recent - probably OK
 236  		fmt.Println("✓ TEST LIKELY PASSED")
 237  		fmt.Println("Subscription appears active (message received recently).")
 238  		fmt.Println("Some delay is normal if relay is idle.")
 239  	} else if messageCount > 0 {
 240  		// Got EOSE but nothing since
 241  		fmt.Println("⚠ INCONCLUSIVE")
 242  		fmt.Println("Subscription was established but no activity since.")
 243  		fmt.Println("This is expected if relay has no events and doesn't send pings.")
 244  		fmt.Println("To properly test, publish events during the test period.")
 245  	} else {
 246  		// No messages at all
 247  		fmt.Println("✗ TEST FAILED")
 248  		fmt.Println("No messages received - subscription may have failed.")
 249  	}
 250  
 251  	fmt.Println()
 252  	fmt.Println("Note: This test verifies the subscription stays registered.")
 253  	fmt.Println("For full testing, publish events while this runs and verify")
 254  	fmt.Println("they are received throughout the entire test duration.")
 255  }
 256  
 257  func getStatus(timeSince time.Duration) string {
 258  	seconds := timeSince.Seconds()
 259  	switch {
 260  	case seconds < 10:
 261  		return "ACTIVE (recent message)"
 262  	case seconds < 30:
 263  		return "IDLE (normal)"
 264  	case seconds < 60:
 265  		return "QUIET (possibly normal)"
 266  	default:
 267  		return "STALE (may have dropped)"
 268  	}
 269  }
 270