main.go raw

   1  package main
   2  
   3  import (
   4  	"bufio"
   5  	"bytes"
   6  	"context"
   7  	"flag"
   8  	"fmt"
   9  	"math/rand"
  10  	"os"
  11  	"os/signal"
  12  	"runtime"
  13  	"strings"
  14  	"sync"
  15  	"sync/atomic"
  16  	"time"
  17  
  18  	"next.orly.dev/pkg/lol/log"
  19  	"next.orly.dev/pkg/nostr/interfaces/signer/p8k"
  20  	"next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
  21  	"next.orly.dev/pkg/nostr/encoders/event"
  22  	"next.orly.dev/pkg/nostr/encoders/event/examples"
  23  	"next.orly.dev/pkg/nostr/encoders/filter"
  24  	"next.orly.dev/pkg/nostr/encoders/hex"
  25  	"next.orly.dev/pkg/nostr/encoders/kind"
  26  	"next.orly.dev/pkg/nostr/encoders/tag"
  27  	"next.orly.dev/pkg/nostr/encoders/timestamp"
  28  	"next.orly.dev/pkg/nostr/ws"
  29  )
  30  
  31  // randomHex returns a hex-encoded string of n random bytes (2n hex chars)
  32  func randomHex(n int) string {
  33  	b := make([]byte, n)
  34  	_, _ = rand.Read(b)
  35  	return hex.Enc(b)
  36  }
  37  
  38  func makeEvent(rng *rand.Rand, signer *p8k.Signer) (*event.E, error) {
  39  	ev := &event.E{
  40  		CreatedAt: time.Now().Unix(),
  41  		Kind:      kind.TextNote.K,
  42  		Tags:      tag.NewS(),
  43  		Content:   []byte(fmt.Sprintf("stresstest %d", rng.Int63())),
  44  	}
  45  
  46  	// Random number of p-tags up to 100
  47  	nPTags := rng.Intn(101) // 0..100 inclusive
  48  	for i := 0; i < nPTags; i++ {
  49  		// random 32-byte pubkey in hex (64 chars)
  50  		phex := randomHex(32)
  51  		ev.Tags.Append(tag.NewFromAny("p", phex))
  52  	}
  53  
  54  	// Sign and verify to ensure pubkey, id and signature are coherent
  55  	if err := ev.Sign(signer); err != nil {
  56  		return nil, err
  57  	}
  58  	if ok, err := ev.Verify(); err != nil || !ok {
  59  		return nil, fmt.Errorf("event signature verification failed: %v", err)
  60  	}
  61  	return ev, nil
  62  }
  63  
// RelayConn wraps a websocket relay client with a lock so multiple workers
// can share one connection and transparently swap it out on reconnect.
type RelayConn struct {
	mu     sync.RWMutex
	client *ws.Client // current connection; may be nil until (re)connected
	url    string     // relay URL, reused by Reconnect
}
  69  
// CacheIndex holds the parsed events from examples.Cache plus flat indexes
// of their ids, authors, timestamps and single-letter tag values, used by
// buildRandomFilter to construct queries guaranteed to match stored data.
type CacheIndex struct {
	events  []*event.E
	ids     [][]byte
	authors [][]byte
	times   []int64
	tags    map[byte][][]byte // single-letter tag -> list of values
}
  77  
  78  func (rc *RelayConn) Get() *ws.Client {
  79  	rc.mu.RLock()
  80  	defer rc.mu.RUnlock()
  81  	return rc.client
  82  }
  83  
  84  func (rc *RelayConn) Reconnect(ctx context.Context) error {
  85  	rc.mu.Lock()
  86  	defer rc.mu.Unlock()
  87  	if rc.client != nil {
  88  		_ = rc.client.Close()
  89  	}
  90  	c, err := ws.RelayConnect(ctx, rc.url)
  91  	if err != nil {
  92  		return err
  93  	}
  94  	rc.client = c
  95  	return nil
  96  }
  97  
  98  // loadCacheAndIndex parses examples.Cache (JSONL of events) and builds an index
  99  func loadCacheAndIndex() (*CacheIndex, error) {
 100  	scanner := bufio.NewScanner(bytes.NewReader(examples.Cache))
 101  	idx := &CacheIndex{tags: make(map[byte][][]byte)}
 102  	for scanner.Scan() {
 103  		line := scanner.Bytes()
 104  		if len(bytes.TrimSpace(line)) == 0 {
 105  			continue
 106  		}
 107  		ev := event.New()
 108  		rem, err := ev.Unmarshal(line)
 109  		_ = rem
 110  		if err != nil {
 111  			// skip malformed lines
 112  			continue
 113  		}
 114  		idx.events = append(idx.events, ev)
 115  		// collect fields
 116  		if len(ev.ID) > 0 {
 117  			idx.ids = append(idx.ids, append([]byte(nil), ev.ID...))
 118  		}
 119  		if len(ev.Pubkey) > 0 {
 120  			idx.authors = append(idx.authors, append([]byte(nil), ev.Pubkey...))
 121  		}
 122  		idx.times = append(idx.times, ev.CreatedAt)
 123  		if ev.Tags != nil {
 124  			for _, tg := range *ev.Tags {
 125  				if tg == nil || tg.Len() < 2 {
 126  					continue
 127  				}
 128  				k := tg.Key()
 129  				if len(k) != 1 {
 130  					continue // only single-letter keys per requirement
 131  				}
 132  				key := k[0]
 133  				for _, v := range tg.T[1:] {
 134  					idx.tags[key] = append(
 135  						idx.tags[key], append([]byte(nil), v...),
 136  					)
 137  				}
 138  			}
 139  		}
 140  	}
 141  	return idx, nil
 142  }
 143  
 144  // publishCacheEvents uploads all cache events to the relay using multiple concurrent connections
 145  func publishCacheEvents(
 146  	ctx context.Context, relayURL string, idx *CacheIndex,
 147  ) (sentCount int) {
 148  	numWorkers := runtime.NumCPU()
 149  	log.I.F("using %d concurrent connections for cache upload", numWorkers)
 150  	
 151  	// Channel to distribute events to workers
 152  	eventChan := make(chan *event.E, len(idx.events))
 153  	var totalSent atomic.Int64
 154  	
 155  	// Fill the event channel
 156  	for _, ev := range idx.events {
 157  		eventChan <- ev
 158  	}
 159  	close(eventChan)
 160  	
 161  	// Start worker goroutines
 162  	var wg sync.WaitGroup
 163  	for i := 0; i < numWorkers; i++ {
 164  		wg.Add(1)
 165  		go func(workerID int) {
 166  			defer wg.Done()
 167  			
 168  			// Create separate connection for this worker
 169  			client, err := ws.RelayConnect(ctx, relayURL)
 170  			if err != nil {
 171  				log.E.F("worker %d: failed to connect: %v", workerID, err)
 172  				return
 173  			}
 174  			defer client.Close()
 175  			
 176  			rc := &RelayConn{client: client, url: relayURL}
 177  			workerSent := 0
 178  			
 179  			// Process events from the channel
 180  			for ev := range eventChan {
 181  				select {
 182  				case <-ctx.Done():
 183  					return
 184  				default:
 185  				}
 186  				
 187  				// Get client connection
 188  				wsClient := rc.Get()
 189  				if wsClient == nil {
 190  					if err := rc.Reconnect(ctx); err != nil {
 191  						log.E.F("worker %d: reconnect failed: %v", workerID, err)
 192  						continue
 193  					}
 194  					wsClient = rc.Get()
 195  				}
 196  				
 197  				// Send event without waiting for OK response (fire-and-forget)
 198  				envelope := eventenvelope.NewSubmissionWith(ev)
 199  				envBytes := envelope.Marshal(nil)
 200  				if err := <-wsClient.Write(envBytes); err != nil {
 201  					log.E.F("worker %d: write error: %v", workerID, err)
 202  					errStr := err.Error()
 203  					if strings.Contains(errStr, "connection closed") {
 204  						_ = rc.Reconnect(ctx)
 205  					}
 206  					time.Sleep(50 * time.Millisecond)
 207  					continue
 208  				}
 209  				
 210  				workerSent++
 211  				totalSent.Add(1)
 212  				log.T.F("worker %d: sent event %d (total: %d)", workerID, workerSent, totalSent.Load())
 213  				
 214  				// Small delay to prevent overwhelming the relay
 215  				select {
 216  				case <-time.After(10 * time.Millisecond):
 217  				case <-ctx.Done():
 218  					return
 219  				}
 220  			}
 221  			
 222  			log.I.F("worker %d: completed, sent %d events", workerID, workerSent)
 223  		}(i)
 224  	}
 225  	
 226  	// Wait for all workers to complete
 227  	wg.Wait()
 228  	
 229  	return int(totalSent.Load())
 230  }
 231  
 232  // buildRandomFilter builds a filter combining random subsets of id, author, timestamp, and a single-letter tag value.
// buildRandomFilter builds a filter combining random subsets of id, author, timestamp, and a single-letter tag value.
// The mask selects which criteria to include (bit 1=id, 2=author,
// 4=timestamp, 8=tag); all fields are anchored to one randomly chosen cache
// event so the resulting filter is guaranteed to match at least that event.
// Callers must ensure idx.events is non-empty (rng.Intn panics on 0).
func buildRandomFilter(idx *CacheIndex, rng *rand.Rand, mask int) *filter.F {
	// pick a random base event as anchor for fields
	i := rng.Intn(len(idx.events))
	ev := idx.events[i]
	f := filter.New()
	// clear defaults we don't set
	f.Kinds = kind.NewS() // we don't constrain kinds
	// include fields based on mask bits: 1=id, 2=author, 4=timestamp, 8=tag
	if mask&1 != 0 {
		// copy the id so the filter doesn't alias the indexed event's bytes
		f.Ids.T = append(f.Ids.T, append([]byte(nil), ev.ID...))
	}
	if mask&2 != 0 {
		f.Authors.T = append(f.Authors.T, append([]byte(nil), ev.Pubkey...))
	}
	if mask&4 != 0 {
		// use a tight window around the event timestamp (exact match)
		f.Since = timestamp.FromUnix(ev.CreatedAt)
		f.Until = timestamp.FromUnix(ev.CreatedAt)
	}
	if mask&8 != 0 {
		// choose a random single-letter tag from this event if present; fallback to global index
		var key byte
		var val []byte
		chosen := false
		if ev.Tags != nil {
			for _, tg := range *ev.Tags {
				if tg == nil || tg.Len() < 2 {
					continue
				}
				k := tg.Key()
				if len(k) == 1 {
					key = k[0]
					vv := tg.T[1:]
					val = vv[rng.Intn(len(vv))]
					chosen = true
					break
				}
			}
		}
		if !chosen && len(idx.tags) > 0 {
			// pick a random entry from global tags map
			keys := make([]byte, 0, len(idx.tags))
			for k := range idx.tags {
				keys = append(keys, k)
			}
			key = keys[rng.Intn(len(keys))]
			vals := idx.tags[key]
			val = vals[rng.Intn(len(vals))]
		}
		// NOTE(review): a tag keyed by the zero byte would be skipped here;
		// in practice single-letter keys are printable ASCII so this is fine.
		if key != 0 && len(val) > 0 {
			f.Tags.Append(tag.NewFromBytesSlice([]byte{key}, val))
		}
	}
	return f
}
 288  
// publisherWorker generates, signs and publishes random text-note events in
// a loop until ctx is cancelled, incrementing *stats for each successful
// write. Each worker has its own RNG (seeded with id so workers diverge)
// and its own signing key. Writes are fire-and-forget: no OK is awaited.
// On "connection closed" write errors it retries reconnection up to 5 times.
func publisherWorker(
	ctx context.Context, rc *RelayConn, id int, stats *uint64,
) {
	// Unique RNG per worker
	src := rand.NewSource(time.Now().UnixNano() ^ int64(id<<16))
	rng := rand.New(src)
	// Generate and reuse signing key per worker
	var signer *p8k.Signer
	var err error
	if signer, err = p8k.New(); err != nil {
		log.E.F("worker %d: signer create error: %v", id, err)
		return
	}
	if err := signer.Generate(); err != nil {
		log.E.F("worker %d: signer generate error: %v", id, err)
		return
	}

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		// A makeEvent failure means signing/verification is broken for this
		// worker's key, so the worker stops rather than loops on errors.
		ev, err := makeEvent(rng, signer)
		if err != nil {
			log.E.F("worker %d: makeEvent error: %v", id, err)
			return
		}

		// Send event without waiting for OK response (fire-and-forget)
		client := rc.Get()
		if client == nil {
			_ = rc.Reconnect(ctx)
			continue
		}
		// Create EVENT envelope and send directly without waiting for OK
		envelope := eventenvelope.NewSubmissionWith(ev)
		envBytes := envelope.Marshal(nil)
		if err := <-client.Write(envBytes); err != nil {
			log.E.F("worker %d: write error: %v", id, err)
			errStr := err.Error()
			if strings.Contains(errStr, "connection closed") {
				// Bounded reconnect attempts with a 200ms pause between them.
				for attempt := 0; attempt < 5; attempt++ {
					if ctx.Err() != nil {
						return
					}
					if err := rc.Reconnect(ctx); err == nil {
						log.I.F("worker %d: reconnected to %s", id, rc.url)
						break
					}
					select {
					case <-time.After(200 * time.Millisecond):
					case <-ctx.Done():
						return
					}
				}
			}
			// back off briefly on error to avoid tight loop if relay misbehaves
			select {
			case <-time.After(100 * time.Millisecond):
			case <-ctx.Done():
				return
			}
			continue
		}

		atomic.AddUint64(stats, 1)

		// Randomly fluctuate pacing: small random sleep 0..50ms plus occasional longer jitter
		sleep := time.Duration(rng.Intn(50)) * time.Millisecond
		if rng.Intn(10) == 0 { // 10% chance add extra 100..400ms
			sleep += time.Duration(100+rng.Intn(300)) * time.Millisecond
		}
		select {
		case <-time.After(sleep):
		case <-ctx.Done():
			return
		}
	}
}
 371  
// queryWorker repeatedly subscribes to the relay with filters built from the
// cache index, counting issued queries in *queries and received events in
// *results. The mask cycles 1..15 so every combination of the four filter
// criteria (id/author/timestamp/tag) is exercised. Each subscription is read
// until EOSE, channel close, or subTimeout, then torn down; the worker waits
// a random interval in [minInterval, maxInterval) between queries and runs
// until ctx is cancelled.
func queryWorker(
	ctx context.Context, rc *RelayConn, idx *CacheIndex, id int,
	queries, results *uint64, subTimeout time.Duration,
	minInterval, maxInterval time.Duration,
) {
	rng := rand.New(rand.NewSource(time.Now().UnixNano() ^ int64(id<<24)))
	mask := 1
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		// buildRandomFilter panics on an empty index, so wait for data.
		if len(idx.events) == 0 {
			time.Sleep(200 * time.Millisecond)
			continue
		}
		f := buildRandomFilter(idx, rng, mask)
		mask++
		if mask > 15 { // all combinations of 4 criteria (excluding 0)
			mask = 1
		}
		client := rc.Get()
		if client == nil {
			_ = rc.Reconnect(ctx)
			continue
		}
		ff := filter.S{f}
		sCtx, cancel := context.WithTimeout(ctx, subTimeout)
		sub, err := client.Subscribe(
			sCtx, &ff, ws.WithLabel("stresstest-query"),
		)
		if err != nil {
			cancel()
			// reconnect on connection issues
			errStr := err.Error()
			if strings.Contains(errStr, "connection closed") {
				_ = rc.Reconnect(ctx)
			}
			continue
		}
		atomic.AddUint64(queries, 1)
		// read until EOSE or timeout
		innerDone := false
		for !innerDone {
			select {
			case <-sCtx.Done():
				innerDone = true
			case <-sub.EndOfStoredEvents:
				innerDone = true
			case ev, ok := <-sub.Events:
				if !ok {
					innerDone = true
					break // only exits the select; loop ends via innerDone
				}
				if ev != nil {
					atomic.AddUint64(results, 1)
				}
			}
		}
		sub.Unsub()
		cancel()
		// wait a random interval between queries
		interval := minInterval
		if maxInterval > minInterval {
			delta := rng.Int63n(int64(maxInterval - minInterval))
			interval += time.Duration(delta)
		}
		select {
		case <-time.After(interval):
		case <-ctx.Done():
			return
		}
	}
}
 447  
 448  func startReader(ctx context.Context, rl *ws.Client, received *uint64) error {
 449  	// Broad filter: subscribe to text notes since now-5m to catch our own writes
 450  	f := filter.New()
 451  	f.Kinds = kind.NewS(kind.TextNote)
 452  	// We don't set authors to ensure we read all text notes coming in
 453  	ff := filter.S{f}
 454  	sub, err := rl.Subscribe(ctx, &ff, ws.WithLabel("stresstest-reader"))
 455  	if err != nil {
 456  		return err
 457  	}
 458  
 459  	go func() {
 460  		for {
 461  			select {
 462  			case <-ctx.Done():
 463  				return
 464  			case ev, ok := <-sub.Events:
 465  				if !ok {
 466  					return
 467  				}
 468  				if ev != nil {
 469  					atomic.AddUint64(received, 1)
 470  				}
 471  			}
 472  		}
 473  	}()
 474  
 475  	return nil
 476  }
 477  
// main wires up the stress test: parses flags, connects to the relay,
// optionally uploads the examples.Cache corpus, starts a reader plus
// publisher and query workers, and prints stats every 2s until the duration
// elapses or an interrupt arrives.
func main() {
	var (
		address        string
		port           int
		workers        int
		duration       time.Duration
		publishTimeout time.Duration
		queryWorkers   int
		queryTimeout   time.Duration
		queryMinInt    time.Duration
		queryMaxInt    time.Duration
		skipCache      bool
	)

	flag.StringVar(
		&address, "address", "localhost", "relay address (host or IP)",
	)
	flag.IntVar(&port, "port", 3334, "relay port")
	flag.IntVar(
		&workers, "workers", 8, "number of concurrent publisher workers",
	)
	flag.DurationVar(
		&duration, "duration", 60*time.Second,
		"how long to run the stress test",
	)
	// NOTE(review): publish-timeout is registered but never read in this
	// file — publishes are fire-and-forget; kept for CLI compatibility.
	flag.DurationVar(
		&publishTimeout, "publish-timeout", 15*time.Second,
		"timeout waiting for OK per publish",
	)
	flag.IntVar(
		&queryWorkers, "query-workers", 4, "number of concurrent query workers",
	)
	flag.DurationVar(
		&queryTimeout, "query-timeout", 3*time.Second,
		"subscription timeout for queries",
	)
	flag.DurationVar(
		&queryMinInt, "query-min-interval", 50*time.Millisecond,
		"minimum interval between queries per worker",
	)
	flag.DurationVar(
		&queryMaxInt, "query-max-interval", 300*time.Millisecond,
		"maximum interval between queries per worker",
	)
	flag.BoolVar(
		&skipCache, "skip-cache", false,
		"skip uploading examples.Cache before running",
	)
	flag.Parse()

	relayURL := fmt.Sprintf("ws://%s:%d", address, port)
	log.I.F("stresstest: connecting to %s", relayURL)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Handle Ctrl+C: first interrupt cancels ctx and lets workers wind down.
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, os.Interrupt)
	go func() {
		select {
		case <-sigc:
			log.I.Ln("interrupt received, shutting down...")
			cancel()
		case <-ctx.Done():
		}
	}()

	rl, err := ws.RelayConnect(ctx, relayURL)
	if err != nil {
		log.E.F("failed to connect to relay %s: %v", relayURL, err)
		os.Exit(1)
	}
	defer rl.Close()

	// Shared connection used by publisher and query workers (cache upload
	// opens its own per-CPU connections inside publishCacheEvents).
	rc := &RelayConn{client: rl, url: relayURL}

	// Load and publish cache events first (unless skipped)
	idx, err := loadCacheAndIndex()
	if err != nil {
		// non-fatal: the test can still run publishers without an index
		log.E.F("failed to load examples.Cache: %v", err)
	}
	cacheSent := 0
	if !skipCache && idx != nil && len(idx.events) > 0 {
		log.I.F("sending %d events from examples.Cache...", len(idx.events))
		cacheSent = publishCacheEvents(ctx, relayURL, idx)
		log.I.F("sent %d/%d cache events", cacheSent, len(idx.events))
	}

	// Counters shared with workers via atomic ops.
	var pubOK uint64
	var recvCount uint64
	var qCount uint64
	var qResults uint64

	if err := startReader(ctx, rl, &recvCount); err != nil {
		log.E.F("reader subscribe error: %v", err)
		// continue anyway, we can still write
	}

	wg := sync.WaitGroup{}
	// Start publisher workers
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		i := i // capture loop variable for the closure (pre-Go 1.22 semantics)
		go func() {
			defer wg.Done()
			publisherWorker(ctx, rc, i, &pubOK)
		}()
	}
	// Start query workers — only useful when the cache index has events to
	// anchor filters on.
	if idx != nil && len(idx.events) > 0 && queryWorkers > 0 {
		wg.Add(queryWorkers)
		for i := 0; i < queryWorkers; i++ {
			i := i
			go func() {
				defer wg.Done()
				queryWorker(
					ctx, rc, idx, i, &qCount, &qResults, queryTimeout,
					queryMinInt, queryMaxInt,
				)
			}()
		}
	}

	// Timer for duration and periodic stats
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	end := time.NewTimer(duration)
	start := time.Now()

loop:
	for {
		select {
		case <-ticker.C:
			elapsed := time.Since(start).Seconds()
			p := atomic.LoadUint64(&pubOK)
			r := atomic.LoadUint64(&recvCount)
			qc := atomic.LoadUint64(&qCount)
			qr := atomic.LoadUint64(&qResults)
			log.I.F(
				"elapsed=%.1fs sent=%d (%.0f/s) received=%d cache_sent=%d queries=%d results=%d",
				elapsed, p, float64(p)/elapsed, r, cacheSent, qc, qr,
			)
		case <-end.C:
			break loop
		case <-ctx.Done():
			break loop
		}
	}

	// Cancel workers and wait for them before printing the final summary.
	cancel()
	wg.Wait()
	p := atomic.LoadUint64(&pubOK)
	r := atomic.LoadUint64(&recvCount)
	qc := atomic.LoadUint64(&qCount)
	qr := atomic.LoadUint64(&qResults)
	log.I.F(
		"stresstest complete: cache_sent=%d sent=%d received=%d queries=%d results=%d duration=%s",
		cacheSent, p, r, qc, qr,
		time.Since(start).Truncate(time.Millisecond),
	)
}
 640