main.go raw

   1  package main
   2  
   3  import (
   4  	"bufio"
   5  	"bytes"
   6  	"context"
   7  	"encoding/json"
   8  	"flag"
   9  	"fmt"
  10  	"log"
  11  	"os"
  12  	"path/filepath"
  13  	"runtime"
  14  	"sort"
  15  	"strings"
  16  	"sync"
  17  	"time"
  18  
  19  	lol "next.orly.dev/pkg/lol"
  20  	"next.orly.dev/pkg/database"
  21  	"next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
  22  	"next.orly.dev/pkg/nostr/encoders/event"
  23  	examples "next.orly.dev/pkg/nostr/encoders/event/examples"
  24  	"next.orly.dev/pkg/nostr/encoders/filter"
  25  	"next.orly.dev/pkg/nostr/encoders/kind"
  26  	"next.orly.dev/pkg/nostr/encoders/tag"
  27  	"next.orly.dev/pkg/nostr/encoders/timestamp"
  28  	"next.orly.dev/pkg/nostr/interfaces/signer/p8k"
  29  	"next.orly.dev/pkg/nostr/ws"
  30  )
  31  
// BenchmarkConfig carries every command-line option: which backend/mode
// to run and how to size the workload. Exactly one mode is dispatched in
// main based on these fields.
type BenchmarkConfig struct {
	DataDir           string        // on-disk database directory (wiped by most local modes)
	NumEvents         int           // total events to generate per test
	ConcurrentWorkers int           // number of concurrent worker goroutines
	TestDuration      time.Duration // wall-clock cap for time-bounded tests
	BurstPattern      bool          // enable burst pattern testing
	ReportInterval    time.Duration // interval between progress reports

	// Network load options
	RelayURL   string // relay WebSocket URL; non-empty enables network mode
	NetWorkers int    // number of publisher connections
	NetRate    int    // events/sec per worker

	// Backend selection
	UseNeo4j      bool // run against the Neo4j backend (requires Docker)
	UseRelySQLite bool // run against the rely-sqlite backend

	// Graph traversal benchmark
	UseGraphTraversal        bool // local (Badger) graph traversal benchmark
	UseNetworkGraphTraversal bool // Network-mode graph traversal (for multi-relay testing)
	UsePPGComparison         bool // PPG vs baseline comparison benchmark
}
  54  
// BenchmarkResult is one row of the final report: aggregate throughput
// and latency statistics for a single named test.
type BenchmarkResult struct {
	TestName          string
	Duration          time.Duration // total wall-clock time of the test
	TotalEvents       int           // operations that completed successfully
	EventsPerSecond   float64       // TotalEvents / Duration
	AvgLatency        time.Duration
	P90Latency        time.Duration // 90th percentile latency
	P95Latency        time.Duration
	P99Latency        time.Duration
	Bottom10Avg       time.Duration // "bottom 10%" bucket average — presumably the slowest decile; confirm against LatencyRecorder
	SuccessRate       float64       // percentage of attempted operations that succeeded
	ConcurrentWorkers int
	MemoryUsed        uint64 // bytes, as reported by getMemUsage
	Errors            []string
}
  70  
  71  // RateLimiter implements a simple token bucket rate limiter
  72  type RateLimiter struct {
  73  	rate       float64       // events per second
  74  	interval   time.Duration // time between events
  75  	lastEvent  time.Time
  76  	mu         sync.Mutex
  77  }
  78  
  79  // NewRateLimiter creates a rate limiter for the specified events per second
  80  func NewRateLimiter(eventsPerSecond float64) *RateLimiter {
  81  	return &RateLimiter{
  82  		rate:      eventsPerSecond,
  83  		interval:  time.Duration(float64(time.Second) / eventsPerSecond),
  84  		lastEvent: time.Now(),
  85  	}
  86  }
  87  
  88  // Wait blocks until the next event is allowed based on the rate limit
  89  func (rl *RateLimiter) Wait() {
  90  	rl.mu.Lock()
  91  	defer rl.mu.Unlock()
  92  
  93  	now := time.Now()
  94  	nextAllowed := rl.lastEvent.Add(rl.interval)
  95  
  96  	if now.Before(nextAllowed) {
  97  		time.Sleep(nextAllowed.Sub(now))
  98  		rl.lastEvent = nextAllowed
  99  	} else {
 100  		rl.lastEvent = now
 101  	}
 102  }
 103  
// Benchmark owns the Badger-backed database under test plus the result
// set accumulated across the suite.
type Benchmark struct {
	config       *BenchmarkConfig
	db           *database.D
	results      []*BenchmarkResult // guarded by mu
	mu           sync.RWMutex       // protects results
	cachedEvents []*event.E         // Real-world events from examples.Cache
	eventCacheMu sync.Mutex         // guards cachedEvents
}
 112  
// main dispatches to exactly one benchmark mode based on the parsed
// flags, checked in priority order: network graph traversal, plain
// network load, Neo4j, rely-sqlite, local graph traversal, and finally
// the default Badger suite.
func main() {
	lol.SetLogLevel("warn")
	config := parseFlags()

	if config.UseNetworkGraphTraversal {
		// Network graph traversal mode: requires relay URL
		if config.RelayURL == "" {
			log.Fatal("Network graph traversal benchmark requires -relay-url flag")
		}
		runNetworkGraphTraversalBenchmark(config)
		return
	}

	if config.RelayURL != "" {
		// Network mode: connect to relay and generate traffic
		runNetworkLoad(config)
		return
	}

	if config.UseNeo4j {
		// Run Neo4j benchmark
		runNeo4jBenchmark(config)
		return
	}

	if config.UseRelySQLite {
		// Run Rely-SQLite benchmark
		runRelySQLiteBenchmark(config)
		return
	}

	if config.UseGraphTraversal {
		// Run graph traversal benchmark
		runGraphTraversalBenchmark(config)
		return
	}

	// Run standard Badger benchmark
	fmt.Printf("Starting Nostr Relay Benchmark (Badger Backend)\n")
	fmt.Printf("Data Directory: %s\n", config.DataDir)
	fmt.Printf(
		"Events: %d, Workers: %d, Duration: %v\n",
		config.NumEvents, config.ConcurrentWorkers, config.TestDuration,
	)

	benchmark := NewBenchmark(config)
	defer benchmark.Close()

	// Run the benchmark suite once; wipes and pauses happen between the
	// individual tests inside RunSuite.
	benchmark.RunSuite()

	// Generate reports
	benchmark.GenerateReport()
	benchmark.GenerateAsciidocReport()
}
 168  
 169  func runNeo4jBenchmark(config *BenchmarkConfig) {
 170  	fmt.Printf("Starting Nostr Relay Benchmark (Neo4j Backend)\n")
 171  	fmt.Printf("Data Directory: %s\n", config.DataDir)
 172  	fmt.Printf(
 173  		"Events: %d, Workers: %d\n",
 174  		config.NumEvents, config.ConcurrentWorkers,
 175  	)
 176  
 177  	neo4jBench, err := NewNeo4jBenchmark(config)
 178  	if err != nil {
 179  		log.Fatalf("Failed to create Neo4j benchmark: %v", err)
 180  	}
 181  	defer neo4jBench.Close()
 182  
 183  	// Run Neo4j benchmark suite
 184  	neo4jBench.RunSuite()
 185  
 186  	// Generate reports
 187  	neo4jBench.GenerateReport()
 188  	neo4jBench.GenerateAsciidocReport()
 189  }
 190  
 191  func runRelySQLiteBenchmark(config *BenchmarkConfig) {
 192  	fmt.Printf("Starting Nostr Relay Benchmark (Rely-SQLite Backend)\n")
 193  	fmt.Printf("Data Directory: %s\n", config.DataDir)
 194  	fmt.Printf(
 195  		"Events: %d, Workers: %d\n",
 196  		config.NumEvents, config.ConcurrentWorkers,
 197  	)
 198  
 199  	relysqliteBench, err := NewRelySQLiteBenchmark(config)
 200  	if err != nil {
 201  		log.Fatalf("Failed to create Rely-SQLite benchmark: %v", err)
 202  	}
 203  	defer relysqliteBench.Close()
 204  
 205  	// Run Rely-SQLite benchmark suite
 206  	relysqliteBench.RunSuite()
 207  
 208  	// Generate reports
 209  	relysqliteBench.GenerateReport()
 210  	relysqliteBench.GenerateAsciidocReport()
 211  }
 212  
// runGraphTraversalBenchmark seeds a local Badger database with a
// synthetic follow graph and runs multi-hop traversal queries against
// it; with -ppg set it instead runs the PPG vs baseline comparison,
// reusing a previously seeded database when one is present.
func runGraphTraversalBenchmark(config *BenchmarkConfig) {
	fmt.Printf("Starting Graph Traversal Benchmark (Badger Backend)\n")
	fmt.Printf("Data Directory: %s\n", config.DataDir)
	fmt.Printf("Workers: %d\n", config.ConcurrentWorkers)
	fmt.Printf("Pubkeys: %d, Follows per pubkey: %d-%d\n",
		GraphBenchNumPubkeys, GraphBenchMinFollows, GraphBenchMaxFollows)

	// Check if we can reuse an existing seeded database. Seeding is the
	// expensive step, so PPG runs reuse a prior -graph run's data when a
	// non-empty Badger MANIFEST file indicates a populated store.
	reuseDB := false
	if config.UsePPGComparison {
		if info, err := os.Stat(filepath.Join(config.DataDir, "MANIFEST")); err == nil && info.Size() > 0 {
			reuseDB = true
			fmt.Printf("Reusing existing seeded database at %s\n", config.DataDir)
		}
	}

	if !reuseDB {
		// Clean up existing data directory so the seed starts from scratch.
		os.RemoveAll(config.DataDir)
	}

	ctx := context.Background()
	cancel := func() {}

	db, err := database.New(ctx, cancel, config.DataDir, "warn")
	if err != nil {
		log.Fatalf("Failed to create database: %v", err)
	}
	defer db.Close()

	// Create graph traversal benchmark (always needed for pubkey arrays)
	graphBench := NewGraphTraversalBenchmark(config, db)

	if reuseDB {
		// Regenerate pubkey/signer arrays (deterministic) without re-seeding DB
		graphBench.RegeneratePubkeys()
	} else {
		// Seed the database with follow list events
		graphBench.SeedDatabase()
	}

	// Run NIP-01 multi-hop traversal unless PPG mode is set (it has its own baseline)
	if !config.UsePPGComparison {
		graphBench.runThirdDegreeTraversal()
	}

	// Run PPG comparison if requested
	if config.UsePPGComparison {
		ppgBench := NewPPGBenchmark(config, db)
		// NOTE(review): 100 is presumably the sample count for the
		// equivalence check — confirm against VerifyEquivalence.
		ppgBench.VerifyEquivalence(graphBench.pubkeys, 100)
		ppgBench.RunSuite(graphBench.pubkeys)
	}

	// Generate reports
	graphBench.PrintResults()
	generateGraphTraversalAsciidocReport(config, graphBench.GetResults())
}
 270  
 271  func generateGraphTraversalAsciidocReport(config *BenchmarkConfig, results []*BenchmarkResult) {
 272  	path := filepath.Join(config.DataDir, "graph_traversal_report.adoc")
 273  	file, err := os.Create(path)
 274  	if err != nil {
 275  		log.Printf("Failed to create report: %v", err)
 276  		return
 277  	}
 278  	defer file.Close()
 279  
 280  	file.WriteString("= Graph Traversal Benchmark Results\n\n")
 281  	file.WriteString(
 282  		fmt.Sprintf(
 283  			"Generated: %s\n\n", time.Now().Format(time.RFC3339),
 284  		),
 285  	)
 286  	file.WriteString(fmt.Sprintf("Pubkeys: %d\n", GraphBenchNumPubkeys))
 287  	file.WriteString(fmt.Sprintf("Follows per pubkey: %d-%d\n", GraphBenchMinFollows, GraphBenchMaxFollows))
 288  	file.WriteString(fmt.Sprintf("Traversal depth: %d degrees\n\n", GraphBenchTraversalDepth))
 289  
 290  	file.WriteString("[cols=\"1,^1,^1,^1,^1,^1,^1\",options=\"header\"]\n")
 291  	file.WriteString("|===\n")
 292  	file.WriteString("| Test | Events/sec | Avg Latency | P90 | P95 | P99 | Bottom 10% Avg\n")
 293  
 294  	for _, r := range results {
 295  		file.WriteString(fmt.Sprintf("| %s\n", r.TestName))
 296  		file.WriteString(fmt.Sprintf("| %.2f\n", r.EventsPerSecond))
 297  		file.WriteString(fmt.Sprintf("| %v\n", r.AvgLatency))
 298  		file.WriteString(fmt.Sprintf("| %v\n", r.P90Latency))
 299  		file.WriteString(fmt.Sprintf("| %v\n", r.P95Latency))
 300  		file.WriteString(fmt.Sprintf("| %v\n", r.P99Latency))
 301  		file.WriteString(fmt.Sprintf("| %v\n", r.Bottom10Avg))
 302  	}
 303  	file.WriteString("|===\n")
 304  
 305  	fmt.Printf("AsciiDoc report saved to: %s\n", path)
 306  }
 307  
 308  func runNetworkGraphTraversalBenchmark(config *BenchmarkConfig) {
 309  	fmt.Printf("Starting Network Graph Traversal Benchmark\n")
 310  	fmt.Printf("Relay URL: %s\n", config.RelayURL)
 311  	fmt.Printf("Workers: %d\n", config.ConcurrentWorkers)
 312  	fmt.Printf("Pubkeys: %d, Follows per pubkey: %d-%d\n",
 313  		GraphBenchNumPubkeys, GraphBenchMinFollows, GraphBenchMaxFollows)
 314  
 315  	ctx := context.Background()
 316  
 317  	// Create and run network graph traversal benchmark
 318  	netGraphBench := NewNetworkGraphTraversalBenchmark(config.RelayURL, config.ConcurrentWorkers)
 319  
 320  	if err := netGraphBench.RunSuite(ctx); err != nil {
 321  		log.Fatalf("Network graph traversal benchmark failed: %v", err)
 322  	}
 323  
 324  	// Generate reports
 325  	netGraphBench.PrintResults()
 326  	generateNetworkGraphTraversalAsciidocReport(config, netGraphBench.GetResults())
 327  }
 328  
 329  func generateNetworkGraphTraversalAsciidocReport(config *BenchmarkConfig, results []*BenchmarkResult) {
 330  	path := filepath.Join(config.DataDir, "network_graph_traversal_report.adoc")
 331  	file, err := os.Create(path)
 332  	if err != nil {
 333  		log.Printf("Failed to create report: %v", err)
 334  		return
 335  	}
 336  	defer file.Close()
 337  
 338  	file.WriteString("= Network Graph Traversal Benchmark Results\n\n")
 339  	file.WriteString(
 340  		fmt.Sprintf(
 341  			"Generated: %s\n\n", time.Now().Format(time.RFC3339),
 342  		),
 343  	)
 344  	file.WriteString(fmt.Sprintf("Relay URL: %s\n", config.RelayURL))
 345  	file.WriteString(fmt.Sprintf("Pubkeys: %d\n", GraphBenchNumPubkeys))
 346  	file.WriteString(fmt.Sprintf("Follows per pubkey: %d-%d\n", GraphBenchMinFollows, GraphBenchMaxFollows))
 347  	file.WriteString(fmt.Sprintf("Traversal depth: %d degrees\n\n", GraphBenchTraversalDepth))
 348  
 349  	file.WriteString("[cols=\"1,^1,^1,^1,^1,^1,^1\",options=\"header\"]\n")
 350  	file.WriteString("|===\n")
 351  	file.WriteString("| Test | Events/sec | Avg Latency | P90 | P95 | P99 | Bottom 10% Avg\n")
 352  
 353  	for _, r := range results {
 354  		file.WriteString(fmt.Sprintf("| %s\n", r.TestName))
 355  		file.WriteString(fmt.Sprintf("| %.2f\n", r.EventsPerSecond))
 356  		file.WriteString(fmt.Sprintf("| %v\n", r.AvgLatency))
 357  		file.WriteString(fmt.Sprintf("| %v\n", r.P90Latency))
 358  		file.WriteString(fmt.Sprintf("| %v\n", r.P95Latency))
 359  		file.WriteString(fmt.Sprintf("| %v\n", r.P99Latency))
 360  		file.WriteString(fmt.Sprintf("| %v\n", r.Bottom10Avg))
 361  	}
 362  	file.WriteString("|===\n")
 363  
 364  	fmt.Printf("AsciiDoc report saved to: %s\n", path)
 365  }
 366  
 367  func parseFlags() *BenchmarkConfig {
 368  	config := &BenchmarkConfig{}
 369  
 370  	flag.StringVar(
 371  		&config.DataDir, "datadir", "/tmp/benchmark_db", "Database directory",
 372  	)
 373  	flag.IntVar(
 374  		&config.NumEvents, "events", 10000, "Number of events to generate",
 375  	)
 376  	flag.IntVar(
 377  		&config.ConcurrentWorkers, "workers", max(2, runtime.NumCPU()/4),
 378  		"Number of concurrent workers (default: CPU cores / 4 for low CPU usage)",
 379  	)
 380  	flag.DurationVar(
 381  		&config.TestDuration, "duration", 60*time.Second, "Test duration",
 382  	)
 383  	flag.BoolVar(
 384  		&config.BurstPattern, "burst", true, "Enable burst pattern testing",
 385  	)
 386  	flag.DurationVar(
 387  		&config.ReportInterval, "report-interval", 10*time.Second,
 388  		"Report interval",
 389  	)
 390  
 391  	// Network mode flags
 392  	flag.StringVar(
 393  		&config.RelayURL, "relay-url", "",
 394  		"Relay WebSocket URL (enables network mode if set)",
 395  	)
 396  	flag.IntVar(
 397  		&config.NetWorkers, "net-workers", runtime.NumCPU(),
 398  		"Network workers (connections)",
 399  	)
 400  	flag.IntVar(&config.NetRate, "net-rate", 20, "Events per second per worker")
 401  
 402  	// Backend selection
 403  	flag.BoolVar(
 404  		&config.UseNeo4j, "neo4j", false,
 405  		"Use Neo4j backend (requires Docker)",
 406  	)
 407  	flag.BoolVar(
 408  		&config.UseRelySQLite, "relysqlite", false,
 409  		"Use rely-sqlite backend",
 410  	)
 411  
 412  	// Graph traversal benchmark
 413  	flag.BoolVar(
 414  		&config.UseGraphTraversal, "graph", false,
 415  		"Run graph traversal benchmark (100k pubkeys, 3-degree follows)",
 416  	)
 417  	flag.BoolVar(
 418  		&config.UseNetworkGraphTraversal, "graph-network", false,
 419  		"Run network graph traversal benchmark against relay specified by -relay-url",
 420  	)
 421  	flag.BoolVar(
 422  		&config.UsePPGComparison, "ppg", false,
 423  		"Run ppg/gpp vs baseline comparison benchmark (requires -graph to seed data)",
 424  	)
 425  	flag.IntVar(
 426  		&GraphBenchNumPubkeys, "graph-pubkeys", 10000,
 427  		"Number of pubkeys for graph benchmark (default 10000)",
 428  	)
 429  
 430  	flag.Parse()
 431  	return config
 432  }
 433  
// runNetworkLoad generates live traffic against a running relay:
// cfg.NetWorkers goroutines each open their own WebSocket connection,
// publish kind-1 text notes at cfg.NetRate events/sec, and concurrently
// subscribe to their own pubkey to observe round-trip delivery. The run
// stops when cfg.TestDuration elapses.
func runNetworkLoad(cfg *BenchmarkConfig) {
	fmt.Printf(
		"Network mode: relay=%s workers=%d rate=%d ev/s per worker duration=%s\n",
		cfg.RelayURL, cfg.NetWorkers, cfg.NetRate, cfg.TestDuration,
	)
	// Create a timeout context for benchmark control only, not for connections
	timeoutCtx, cancel := context.WithTimeout(
		context.Background(), cfg.TestDuration,
	)
	defer cancel()

	// Use a separate background context for relay connections to avoid
	// cancelling the server when the benchmark timeout expires
	connCtx := context.Background()

	var wg sync.WaitGroup
	// Clamp nonsensical flag values to sane minimums.
	if cfg.NetWorkers <= 0 {
		cfg.NetWorkers = 1
	}
	if cfg.NetRate <= 0 {
		cfg.NetRate = 1
	}
	for i := 0; i < cfg.NetWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			// Connect to relay using non-cancellable context
			rl, err := ws.RelayConnect(connCtx, cfg.RelayURL)
			if err != nil {
				fmt.Printf(
					"worker %d: failed to connect to %s: %v\n", workerID,
					cfg.RelayURL, err,
				)
				return
			}
			defer rl.Close()
			fmt.Printf("worker %d: connected to %s\n", workerID, cfg.RelayURL)

			// Signer for this worker: each worker publishes under its own
			// freshly generated keypair.
			var keys *p8k.Signer
			if keys, err = p8k.New(); err != nil {
				fmt.Printf("worker %d: signer create failed: %v\n", workerID, err)
				return
			}
			if err := keys.Generate(); err != nil {
				fmt.Printf("worker %d: keygen failed: %v\n", workerID, err)
				return
			}

			// Start a concurrent subscriber that listens for events published by this worker
			// Build a filter that matches this worker's pubkey and kind=1, since now
			since := time.Now().Unix()
			go func() {
				f := filter.New()
				f.Kinds = kind.NewS(kind.TextNote)
				f.Authors = tag.NewWithCap(1)
				f.Authors.T = append(f.Authors.T, keys.Pub())
				f.Since = timestamp.FromUnix(since)
				sub, err := rl.Subscribe(connCtx, filter.NewS(f))
				if err != nil {
					fmt.Printf(
						"worker %d: subscribe error: %v\n", workerID, err,
					)
					return
				}
				defer sub.Unsub()
				recv := 0
				for {
					select {
					case <-timeoutCtx.Done():
						fmt.Printf(
							"worker %d: subscriber exiting after %d events (benchmark timeout: %v)\n",
							workerID, recv, timeoutCtx.Err(),
						)
						return
					case <-rl.Context().Done():
						fmt.Printf(
							"worker %d: relay connection closed; cause=%v lastErr=%v\n",
							workerID, rl.ConnectionCause(), rl.LastError(),
						)
						return
					case <-sub.EndOfStoredEvents:
						// continue streaming live events
					case ev := <-sub.Events:
						if ev == nil {
							continue
						}
						recv++
						if recv%100 == 0 {
							fmt.Printf(
								"worker %d: received %d matching events\n",
								workerID, recv,
							)
						}
						ev.Free()
					}
				}
			}()

			// Publisher loop: one event per tick until the benchmark
			// timeout fires.
			interval := time.Second / time.Duration(cfg.NetRate)
			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			count := 0
			for {
				select {
				case <-timeoutCtx.Done():
					fmt.Printf(
						"worker %d: stopping after %d publishes\n", workerID,
						count,
					)
					return
				case <-ticker.C:
					// Build and sign a simple text note event
					ev := event.New()
					ev.Kind = uint16(1)
					ev.CreatedAt = time.Now().Unix()
					ev.Tags = tag.NewS()
					ev.Content = []byte(fmt.Sprintf(
						"bench worker=%d n=%d", workerID, count,
					))
					if err := ev.Sign(keys); err != nil {
						fmt.Printf("worker %d: sign error: %v\n", workerID, err)
						ev.Free()
						continue
					}
					// Async publish: don't wait for OK; this greatly increases throughput
					// NOTE(review): ev.Free() below runs while the write may
					// still be in flight — safe only if Marshal fully copies
					// the event into the buffer handed to Write; confirm.
					ch := rl.Write(eventenvelope.NewSubmissionWith(ev).Marshal(nil))
					// Non-blocking error check
					select {
					case err := <-ch:
						if err != nil {
							fmt.Printf(
								"worker %d: write error: %v\n", workerID, err,
							)
						}
					default:
					}
					// NOTE(review): count increments after this check, so the
					// very first tick logs "sent 0 events".
					if count%100 == 0 {
						fmt.Printf(
							"worker %d: sent %d events\n", workerID, count,
						)
					}
					ev.Free()
					count++
				}
			}
		}(i)
	}
	wg.Wait()
}
 584  
 585  func NewBenchmark(config *BenchmarkConfig) *Benchmark {
 586  	// Clean up existing data directory
 587  	os.RemoveAll(config.DataDir)
 588  
 589  	ctx := context.Background()
 590  	cancel := func() {}
 591  
 592  	db, err := database.New(ctx, cancel, config.DataDir, "warn")
 593  	if err != nil {
 594  		log.Fatalf("Failed to create database: %v", err)
 595  	}
 596  
 597  	b := &Benchmark{
 598  		config:  config,
 599  		db:      db,
 600  		results: make([]*BenchmarkResult, 0),
 601  	}
 602  
 603  	// Trigger compaction/GC before starting tests
 604  	b.compactDatabase()
 605  
 606  	return b
 607  }
 608  
 609  func (b *Benchmark) Close() {
 610  	if b.db != nil {
 611  		b.db.Close()
 612  	}
 613  }
 614  
 615  // RunSuite runs the full benchmark test suite
 616  func (b *Benchmark) RunSuite() {
 617  	fmt.Println("\n╔════════════════════════════════════════════════════════╗")
 618  	fmt.Println("║         BADGER BACKEND BENCHMARK SUITE                 ║")
 619  	fmt.Println("╚════════════════════════════════════════════════════════╝")
 620  
 621  	fmt.Printf("\n=== Starting Badger benchmark ===\n")
 622  
 623  	fmt.Printf("RunPeakThroughputTest (Badger)..\n")
 624  	b.RunPeakThroughputTest()
 625  	fmt.Println("Wiping database between tests...")
 626  	b.db.Wipe()
 627  	time.Sleep(10 * time.Second)
 628  
 629  	fmt.Printf("RunBurstPatternTest (Badger)..\n")
 630  	b.RunBurstPatternTest()
 631  	fmt.Println("Wiping database between tests...")
 632  	b.db.Wipe()
 633  	time.Sleep(10 * time.Second)
 634  
 635  	fmt.Printf("RunMixedReadWriteTest (Badger)..\n")
 636  	b.RunMixedReadWriteTest()
 637  	fmt.Println("Wiping database between tests...")
 638  	b.db.Wipe()
 639  	time.Sleep(10 * time.Second)
 640  
 641  	fmt.Printf("RunQueryTest (Badger)..\n")
 642  	b.RunQueryTest()
 643  	fmt.Println("Wiping database between tests...")
 644  	b.db.Wipe()
 645  	time.Sleep(10 * time.Second)
 646  
 647  	fmt.Printf("RunConcurrentQueryStoreTest (Badger)..\n")
 648  	b.RunConcurrentQueryStoreTest()
 649  
 650  	fmt.Printf("\n=== Badger benchmark completed ===\n\n")
 651  }
 652  
 653  // compactDatabase triggers a Badger value log GC before starting tests.
 654  func (b *Benchmark) compactDatabase() {
 655  	if b.db == nil || b.db.DB == nil {
 656  		return
 657  	}
 658  	// Attempt value log GC. Ignore errors; this is best-effort.
 659  	_ = b.db.DB.RunValueLogGC(0.5)
 660  }
 661  
// RunPeakThroughputTest measures sustained raw write throughput: workers
// drain a shared channel of sample events and persist each one, with the
// aggregate rate capped at ~20k events/sec split evenly across workers.
// Latencies are recorded to disk and summarized into a BenchmarkResult.
func (b *Benchmark) RunPeakThroughputTest() {
	fmt.Println("\n=== Peak Throughput Test ===")

	// Create latency recorder (writes to disk, not memory)
	latencyRecorder, err := NewLatencyRecorder(b.config.DataDir, "peak_throughput")
	if err != nil {
		log.Fatalf("Failed to create latency recorder: %v", err)
	}

	start := time.Now()
	var wg sync.WaitGroup
	var totalEvents int64
	var errorCount int64
	var mu sync.Mutex

	// Stream events from memory (real-world sample events)
	eventChan, errChan := b.getEventChannel(b.config.NumEvents, 1000)

	// Calculate per-worker rate: 20k events/sec total divided by worker count
	// This prevents all workers from synchronizing and hitting DB simultaneously
	perWorkerRate := 20000.0 / float64(b.config.ConcurrentWorkers)

	// Start workers with rate limiting
	ctx := context.Background()

	for i := 0; i < b.config.ConcurrentWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()

			// Each worker gets its own rate limiter to avoid mutex contention
			workerLimiter := NewRateLimiter(perWorkerRate)

			for ev := range eventChan {
				// Wait for rate limiter to allow this event
				workerLimiter.Wait()

				eventStart := time.Now()
				_, err := b.db.SaveEvent(ctx, ev)
				latency := time.Since(eventStart)

				// NOTE(review): one mutex guards both the counters and
				// latencyRecorder.Record — presumably because the recorder
				// is not goroutine-safe; confirm before loosening this.
				mu.Lock()
				if err != nil {
					errorCount++
				} else {
					totalEvents++
					if err := latencyRecorder.Record(latency); err != nil {
						log.Printf("Failed to record latency: %v", err)
					}
				}
				mu.Unlock()
			}
		}(i)
	}

	// Check for streaming errors; this goroutine ends when errChan closes.
	go func() {
		for err := range errChan {
			if err != nil {
				log.Printf("Event stream error: %v", err)
			}
		}
	}()

	wg.Wait()
	duration := time.Since(start)

	// Flush latency data to disk before calculating stats
	if err := latencyRecorder.Close(); err != nil {
		log.Printf("Failed to close latency recorder: %v", err)
	}

	// Calculate statistics from disk
	latencyStats, err := latencyRecorder.CalculateStats()
	if err != nil {
		log.Printf("Failed to calculate latency stats: %v", err)
		latencyStats = &LatencyStats{}
	}

	// Calculate metrics
	// NOTE(review): errorCount is printed below but never surfaced in
	// result.Errors — consider recording it in the report as well.
	result := &BenchmarkResult{
		TestName:          "Peak Throughput",
		Duration:          duration,
		TotalEvents:       int(totalEvents),
		EventsPerSecond:   float64(totalEvents) / duration.Seconds(),
		ConcurrentWorkers: b.config.ConcurrentWorkers,
		MemoryUsed:        getMemUsage(),
		AvgLatency:        latencyStats.Avg,
		P90Latency:        latencyStats.P90,
		P95Latency:        latencyStats.P95,
		P99Latency:        latencyStats.P99,
		Bottom10Avg:       latencyStats.Bottom10,
	}

	result.SuccessRate = float64(totalEvents) / float64(b.config.NumEvents) * 100

	b.mu.Lock()
	b.results = append(b.results, result)
	b.mu.Unlock()

	fmt.Printf(
		"Events saved: %d/%d (%.1f%%), errors: %d\n",
		totalEvents, b.config.NumEvents, result.SuccessRate, errorCount,
	)
	fmt.Printf("Duration: %v\n", duration)
	fmt.Printf("Events/sec: %.2f\n", result.EventsPerSecond)
	fmt.Printf("Avg latency: %v\n", result.AvgLatency)
	fmt.Printf("P90 latency: %v\n", result.P90Latency)
	fmt.Printf("P95 latency: %v\n", result.P95Latency)
	fmt.Printf("P99 latency: %v\n", result.P99Latency)
	fmt.Printf("Bottom 10%% Avg latency: %v\n", result.Bottom10Avg)
}
 774  
// RunBurstPatternTest measures throughput under bursty load: 10% of the
// events are pushed quickly (a "burst"), followed by a quiet pause, and
// the cycle repeats until the event budget or the test duration runs
// out. A fixed pool of workers drains the queue so goroutine count stays
// bounded.
func (b *Benchmark) RunBurstPatternTest() {
	fmt.Println("\n=== Burst Pattern Test ===")

	// Create latency recorder (writes to disk, not memory)
	latencyRecorder, err := NewLatencyRecorder(b.config.DataDir, "burst_pattern")
	if err != nil {
		log.Fatalf("Failed to create latency recorder: %v", err)
	}

	start := time.Now()
	var totalEvents int64
	var errorCount int64
	var mu sync.Mutex

	// Stream events from memory (real-world sample events)
	eventChan, errChan := b.getEventChannel(b.config.NumEvents, 500)

	// Check for streaming errors; exits when errChan closes.
	go func() {
		for err := range errChan {
			if err != nil {
				log.Printf("Event stream error: %v", err)
			}
		}
	}()

	// Simulate burst pattern: high activity periods followed by quiet periods
	burstSize := b.config.NumEvents / 10 // 10% of events in each burst
	quietPeriod := 500 * time.Millisecond
	burstPeriod := 100 * time.Millisecond

	ctx := context.Background()
	var eventIndex int64

	// Start persistent worker pool (prevents goroutine explosion)
	numWorkers := b.config.ConcurrentWorkers
	eventQueue := make(chan *event.E, numWorkers*4)
	var wg sync.WaitGroup

	// Calculate per-worker rate to avoid mutex contention
	perWorkerRate := 20000.0 / float64(numWorkers)

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Each worker gets its own rate limiter
			workerLimiter := NewRateLimiter(perWorkerRate)

			for ev := range eventQueue {
				// Wait for rate limiter to allow this event
				workerLimiter.Wait()

				eventStart := time.Now()
				_, err := b.db.SaveEvent(ctx, ev)
				latency := time.Since(eventStart)

				// NOTE(review): the mutex covers both counters and Record —
				// presumably the recorder is not goroutine-safe; confirm.
				mu.Lock()
				if err != nil {
					errorCount++
				} else {
					totalEvents++
					// Record latency to disk instead of keeping in memory
					if err := latencyRecorder.Record(latency); err != nil {
						log.Printf("Failed to record latency: %v", err)
					}
				}
				mu.Unlock()
			}
		}()
	}

	// Producer loop: enqueue one burst, log it, then idle for the quiet
	// period. Runs until the event budget or the test duration is spent.
	for int(eventIndex) < b.config.NumEvents && time.Since(start) < b.config.TestDuration {
		// Burst period - send events rapidly
		burstStart := time.Now()

		for i := 0; i < burstSize && int(eventIndex) < b.config.NumEvents; i++ {
			ev, ok := <-eventChan
			if !ok {
				break
			}
			eventQueue <- ev
			eventIndex++
			time.Sleep(burstPeriod / time.Duration(burstSize))
		}

		// NOTE(review): this reports burstSize even when the stream ended
		// early and fewer events were actually queued.
		fmt.Printf(
			"Burst completed: %d events in %v\n", burstSize,
			time.Since(burstStart),
		)

		// Quiet period
		time.Sleep(quietPeriod)
	}

	// Closing the queue lets the worker pool drain and exit.
	close(eventQueue)
	wg.Wait()

	duration := time.Since(start)

	// Flush latency data to disk before calculating stats
	if err := latencyRecorder.Close(); err != nil {
		log.Printf("Failed to close latency recorder: %v", err)
	}

	// Calculate statistics from disk
	latencyStats, err := latencyRecorder.CalculateStats()
	if err != nil {
		log.Printf("Failed to calculate latency stats: %v", err)
		latencyStats = &LatencyStats{}
	}

	// Calculate metrics
	result := &BenchmarkResult{
		TestName:          "Burst Pattern",
		Duration:          duration,
		TotalEvents:       int(totalEvents),
		EventsPerSecond:   float64(totalEvents) / duration.Seconds(),
		ConcurrentWorkers: b.config.ConcurrentWorkers,
		MemoryUsed:        getMemUsage(),
		AvgLatency:        latencyStats.Avg,
		P90Latency:        latencyStats.P90,
		P95Latency:        latencyStats.P95,
		P99Latency:        latencyStats.P99,
		Bottom10Avg:       latencyStats.Bottom10,
	}

	// NOTE(review): if eventIndex is 0 (e.g. -events 0) this is 0/0 and
	// SuccessRate becomes NaN; guard if that configuration is legal.
	result.SuccessRate = float64(totalEvents) / float64(eventIndex) * 100

	b.mu.Lock()
	b.results = append(b.results, result)
	b.mu.Unlock()

	fmt.Printf(
		"Burst test completed: %d events in %v, errors: %d\n",
		totalEvents, duration, errorCount,
	)
	fmt.Printf("Events/sec: %.2f\n", result.EventsPerSecond)
}
 915  
 916  func (b *Benchmark) RunMixedReadWriteTest() {
 917  	fmt.Println("\n=== Mixed Read/Write Test ===")
 918  
 919  	start := time.Now()
 920  	var totalWrites, totalReads int64
 921  	var writeLatencies, readLatencies []time.Duration
 922  	var errors []error
 923  	var mu sync.Mutex
 924  
 925  	// Pre-populate with some events for reading
 926  	seedEvents := b.generateEvents(1000)
 927  	ctx := context.Background()
 928  
 929  	fmt.Println("Pre-populating database for read tests...")
 930  	for _, ev := range seedEvents {
 931  		b.db.SaveEvent(ctx, ev)
 932  	}
 933  
 934  	events := b.generateEvents(b.config.NumEvents)
 935  	var wg sync.WaitGroup
 936  
 937  	// Calculate per-worker rate to avoid mutex contention
 938  	perWorkerRate := 20000.0 / float64(b.config.ConcurrentWorkers)
 939  
 940  	// Start mixed read/write workers
 941  	for i := 0; i < b.config.ConcurrentWorkers; i++ {
 942  		wg.Add(1)
 943  		go func(workerID int) {
 944  			defer wg.Done()
 945  
 946  			// Each worker gets its own rate limiter
 947  			workerLimiter := NewRateLimiter(perWorkerRate)
 948  
 949  			eventIndex := workerID
 950  			for time.Since(start) < b.config.TestDuration && eventIndex < len(events) {
 951  				// Alternate between write and read operations
 952  				if eventIndex%2 == 0 {
 953  					// Write operation - apply rate limiting
 954  					workerLimiter.Wait()
 955  
 956  					writeStart := time.Now()
 957  					_, err := b.db.SaveEvent(ctx, events[eventIndex])
 958  					writeLatency := time.Since(writeStart)
 959  
 960  					mu.Lock()
 961  					if err != nil {
 962  						errors = append(errors, err)
 963  					} else {
 964  						totalWrites++
 965  						writeLatencies = append(writeLatencies, writeLatency)
 966  					}
 967  					mu.Unlock()
 968  				} else {
 969  					// Read operation
 970  					readStart := time.Now()
 971  					f := filter.New()
 972  					f.Kinds = kind.NewS(kind.TextNote)
 973  					limit := uint(10)
 974  					f.Limit = &limit
 975  					_, err := b.db.GetSerialsFromFilter(f)
 976  					readLatency := time.Since(readStart)
 977  
 978  					mu.Lock()
 979  					if err != nil {
 980  						errors = append(errors, err)
 981  					} else {
 982  						totalReads++
 983  						readLatencies = append(readLatencies, readLatency)
 984  					}
 985  					mu.Unlock()
 986  				}
 987  
 988  				eventIndex += b.config.ConcurrentWorkers
 989  				time.Sleep(10 * time.Millisecond) // Small delay between operations
 990  			}
 991  		}(i)
 992  	}
 993  
 994  	wg.Wait()
 995  	duration := time.Since(start)
 996  
 997  	// Calculate metrics
 998  	result := &BenchmarkResult{
 999  		TestName:          "Mixed Read/Write",
1000  		Duration:          duration,
1001  		TotalEvents:       int(totalWrites + totalReads),
1002  		EventsPerSecond:   float64(totalWrites+totalReads) / duration.Seconds(),
1003  		ConcurrentWorkers: b.config.ConcurrentWorkers,
1004  		MemoryUsed:        getMemUsage(),
1005  	}
1006  
1007  	// Calculate combined latencies for overall metrics
1008  	allLatencies := append(writeLatencies, readLatencies...)
1009  	if len(allLatencies) > 0 {
1010  		result.AvgLatency = calculateAvgLatency(allLatencies)
1011  		result.P90Latency = calculatePercentileLatency(allLatencies, 0.90)
1012  		result.P95Latency = calculatePercentileLatency(allLatencies, 0.95)
1013  		result.P99Latency = calculatePercentileLatency(allLatencies, 0.99)
1014  		result.Bottom10Avg = calculateBottom10Avg(allLatencies)
1015  	}
1016  
1017  	result.SuccessRate = float64(totalWrites+totalReads) / float64(len(events)) * 100
1018  
1019  	for _, err := range errors {
1020  		result.Errors = append(result.Errors, err.Error())
1021  	}
1022  
1023  	b.mu.Lock()
1024  	b.results = append(b.results, result)
1025  	b.mu.Unlock()
1026  
1027  	fmt.Printf(
1028  		"Mixed test completed: %d writes, %d reads in %v\n", totalWrites,
1029  		totalReads, duration,
1030  	)
1031  	fmt.Printf("Combined ops/sec: %.2f\n", result.EventsPerSecond)
1032  }
1033  
1034  // RunQueryTest specifically benchmarks the QueryEvents function performance
1035  func (b *Benchmark) RunQueryTest() {
1036  	fmt.Println("\n=== Query Test ===")
1037  
1038  	start := time.Now()
1039  	var totalQueries int64
1040  	var queryLatencies []time.Duration
1041  	var errors []error
1042  	var mu sync.Mutex
1043  
1044  	// Pre-populate with events for querying
1045  	numSeedEvents := 10000
1046  	seedEvents := b.generateEvents(numSeedEvents)
1047  	ctx := context.Background()
1048  
1049  	fmt.Printf(
1050  		"Pre-populating database with %d events for query tests...\n",
1051  		numSeedEvents,
1052  	)
1053  	for _, ev := range seedEvents {
1054  		b.db.SaveEvent(ctx, ev)
1055  	}
1056  
1057  	// Create different types of filters for querying
1058  	filters := []*filter.F{
1059  		func() *filter.F { // Kind filter
1060  			f := filter.New()
1061  			f.Kinds = kind.NewS(kind.TextNote)
1062  			limit := uint(100)
1063  			f.Limit = &limit
1064  			return f
1065  		}(),
1066  		func() *filter.F { // Tag filter
1067  			f := filter.New()
1068  			f.Tags = tag.NewS(
1069  				tag.NewFromBytesSlice(
1070  					[]byte("t"), []byte("benchmark"),
1071  				),
1072  			)
1073  			limit := uint(100)
1074  			f.Limit = &limit
1075  			return f
1076  		}(),
1077  		func() *filter.F { // Mixed filter
1078  			f := filter.New()
1079  			f.Kinds = kind.NewS(kind.TextNote)
1080  			f.Tags = tag.NewS(
1081  				tag.NewFromBytesSlice(
1082  					[]byte("t"), []byte("benchmark"),
1083  				),
1084  			)
1085  			limit := uint(50)
1086  			f.Limit = &limit
1087  			return f
1088  		}(),
1089  	}
1090  
1091  	var wg sync.WaitGroup
1092  	// Start query workers
1093  	for i := 0; i < b.config.ConcurrentWorkers; i++ {
1094  		wg.Add(1)
1095  		go func(workerID int) {
1096  			defer wg.Done()
1097  
1098  			filterIndex := workerID % len(filters)
1099  			queryCount := 0
1100  
1101  			for time.Since(start) < b.config.TestDuration {
1102  				// Rotate through different filters
1103  				f := filters[filterIndex]
1104  				filterIndex = (filterIndex + 1) % len(filters)
1105  
1106  				// Execute query
1107  				queryStart := time.Now()
1108  				events, err := b.db.QueryEvents(ctx, f)
1109  				queryLatency := time.Since(queryStart)
1110  
1111  				mu.Lock()
1112  				if err != nil {
1113  					errors = append(errors, err)
1114  				} else {
1115  					totalQueries++
1116  					queryLatencies = append(queryLatencies, queryLatency)
1117  
1118  					// Free event memory
1119  					for _, ev := range events {
1120  						ev.Free()
1121  					}
1122  				}
1123  				mu.Unlock()
1124  
1125  				queryCount++
1126  				// Always add delay to prevent CPU saturation (queries are CPU-intensive)
1127  				time.Sleep(1 * time.Millisecond)
1128  			}
1129  		}(i)
1130  	}
1131  
1132  	wg.Wait()
1133  	duration := time.Since(start)
1134  
1135  	// Calculate metrics
1136  	result := &BenchmarkResult{
1137  		TestName:          "Query Performance",
1138  		Duration:          duration,
1139  		TotalEvents:       int(totalQueries),
1140  		EventsPerSecond:   float64(totalQueries) / duration.Seconds(),
1141  		ConcurrentWorkers: b.config.ConcurrentWorkers,
1142  		MemoryUsed:        getMemUsage(),
1143  	}
1144  
1145  	if len(queryLatencies) > 0 {
1146  		result.AvgLatency = calculateAvgLatency(queryLatencies)
1147  		result.P90Latency = calculatePercentileLatency(queryLatencies, 0.90)
1148  		result.P95Latency = calculatePercentileLatency(queryLatencies, 0.95)
1149  		result.P99Latency = calculatePercentileLatency(queryLatencies, 0.99)
1150  		result.Bottom10Avg = calculateBottom10Avg(queryLatencies)
1151  	}
1152  
1153  	result.SuccessRate = 100.0 // No specific target count for queries
1154  
1155  	for _, err := range errors {
1156  		result.Errors = append(result.Errors, err.Error())
1157  	}
1158  
1159  	b.mu.Lock()
1160  	b.results = append(b.results, result)
1161  	b.mu.Unlock()
1162  
1163  	fmt.Printf(
1164  		"Query test completed: %d queries in %v\n", totalQueries, duration,
1165  	)
1166  	fmt.Printf("Queries/sec: %.2f\n", result.EventsPerSecond)
1167  	fmt.Printf("Avg query latency: %v\n", result.AvgLatency)
1168  	fmt.Printf("P95 query latency: %v\n", result.P95Latency)
1169  	fmt.Printf("P99 query latency: %v\n", result.P99Latency)
1170  }
1171  
1172  // RunConcurrentQueryStoreTest benchmarks the performance of concurrent query and store operations
1173  func (b *Benchmark) RunConcurrentQueryStoreTest() {
1174  	fmt.Println("\n=== Concurrent Query/Store Test ===")
1175  
1176  	start := time.Now()
1177  	var totalQueries, totalWrites int64
1178  	var queryLatencies, writeLatencies []time.Duration
1179  	var errors []error
1180  	var mu sync.Mutex
1181  
1182  	// Pre-populate with some events
1183  	numSeedEvents := 5000
1184  	seedEvents := b.generateEvents(numSeedEvents)
1185  	ctx := context.Background()
1186  
1187  	fmt.Printf(
1188  		"Pre-populating database with %d events for concurrent query/store test...\n",
1189  		numSeedEvents,
1190  	)
1191  	for _, ev := range seedEvents {
1192  		b.db.SaveEvent(ctx, ev)
1193  	}
1194  
1195  	// Generate events for writing during the test
1196  	writeEvents := b.generateEvents(b.config.NumEvents)
1197  
1198  	// Create filters for querying
1199  	filters := []*filter.F{
1200  		func() *filter.F { // Recent events filter
1201  			f := filter.New()
1202  			f.Since = timestamp.FromUnix(time.Now().Add(-10 * time.Minute).Unix())
1203  			limit := uint(100)
1204  			f.Limit = &limit
1205  			return f
1206  		}(),
1207  		func() *filter.F { // Kind and tag filter
1208  			f := filter.New()
1209  			f.Kinds = kind.NewS(kind.TextNote)
1210  			f.Tags = tag.NewS(
1211  				tag.NewFromBytesSlice(
1212  					[]byte("t"), []byte("benchmark"),
1213  				),
1214  			)
1215  			limit := uint(50)
1216  			f.Limit = &limit
1217  			return f
1218  		}(),
1219  	}
1220  
1221  	var wg sync.WaitGroup
1222  
1223  	// Half of the workers will be readers, half will be writers
1224  	numReaders := b.config.ConcurrentWorkers / 2
1225  	numWriters := b.config.ConcurrentWorkers - numReaders
1226  
1227  	// Calculate per-worker write rate to avoid mutex contention
1228  	perWorkerRate := 20000.0 / float64(numWriters)
1229  
1230  	// Start query workers (readers)
1231  	for i := 0; i < numReaders; i++ {
1232  		wg.Add(1)
1233  		go func(workerID int) {
1234  			defer wg.Done()
1235  
1236  			filterIndex := workerID % len(filters)
1237  			queryCount := 0
1238  
1239  			for time.Since(start) < b.config.TestDuration {
1240  				// Select a filter
1241  				f := filters[filterIndex]
1242  				filterIndex = (filterIndex + 1) % len(filters)
1243  
1244  				// Execute query
1245  				queryStart := time.Now()
1246  				events, err := b.db.QueryEvents(ctx, f)
1247  				queryLatency := time.Since(queryStart)
1248  
1249  				mu.Lock()
1250  				if err != nil {
1251  					errors = append(errors, err)
1252  				} else {
1253  					totalQueries++
1254  					queryLatencies = append(queryLatencies, queryLatency)
1255  
1256  					// Free event memory
1257  					for _, ev := range events {
1258  						ev.Free()
1259  					}
1260  				}
1261  				mu.Unlock()
1262  
1263  				queryCount++
1264  				// Always add delay to prevent CPU saturation (queries are CPU-intensive)
1265  				time.Sleep(1 * time.Millisecond)
1266  			}
1267  		}(i)
1268  	}
1269  
1270  	// Start write workers
1271  	for i := 0; i < numWriters; i++ {
1272  		wg.Add(1)
1273  		go func(workerID int) {
1274  			defer wg.Done()
1275  
1276  			// Each worker gets its own rate limiter
1277  			workerLimiter := NewRateLimiter(perWorkerRate)
1278  
1279  			eventIndex := workerID
1280  			writeCount := 0
1281  
1282  			for time.Since(start) < b.config.TestDuration && eventIndex < len(writeEvents) {
1283  				// Write operation - apply rate limiting
1284  				workerLimiter.Wait()
1285  
1286  				writeStart := time.Now()
1287  				_, err := b.db.SaveEvent(ctx, writeEvents[eventIndex])
1288  				writeLatency := time.Since(writeStart)
1289  
1290  				mu.Lock()
1291  				if err != nil {
1292  					errors = append(errors, err)
1293  				} else {
1294  					totalWrites++
1295  					writeLatencies = append(writeLatencies, writeLatency)
1296  				}
1297  				mu.Unlock()
1298  
1299  				eventIndex += numWriters
1300  				writeCount++
1301  			}
1302  		}(i)
1303  	}
1304  
1305  	wg.Wait()
1306  	duration := time.Since(start)
1307  
1308  	// Calculate metrics
1309  	totalOps := totalQueries + totalWrites
1310  	result := &BenchmarkResult{
1311  		TestName:          "Concurrent Query/Store",
1312  		Duration:          duration,
1313  		TotalEvents:       int(totalOps),
1314  		EventsPerSecond:   float64(totalOps) / duration.Seconds(),
1315  		ConcurrentWorkers: b.config.ConcurrentWorkers,
1316  		MemoryUsed:        getMemUsage(),
1317  	}
1318  
1319  	// Calculate combined latencies for overall metrics
1320  	allLatencies := append(queryLatencies, writeLatencies...)
1321  	if len(allLatencies) > 0 {
1322  		result.AvgLatency = calculateAvgLatency(allLatencies)
1323  		result.P90Latency = calculatePercentileLatency(allLatencies, 0.90)
1324  		result.P95Latency = calculatePercentileLatency(allLatencies, 0.95)
1325  		result.P99Latency = calculatePercentileLatency(allLatencies, 0.99)
1326  		result.Bottom10Avg = calculateBottom10Avg(allLatencies)
1327  	}
1328  
1329  	result.SuccessRate = 100.0 // No specific target
1330  
1331  	for _, err := range errors {
1332  		result.Errors = append(result.Errors, err.Error())
1333  	}
1334  
1335  	b.mu.Lock()
1336  	b.results = append(b.results, result)
1337  	b.mu.Unlock()
1338  
1339  	// Calculate separate metrics for queries and writes
1340  	var queryAvg, writeAvg time.Duration
1341  	if len(queryLatencies) > 0 {
1342  		queryAvg = calculateAvgLatency(queryLatencies)
1343  	}
1344  	if len(writeLatencies) > 0 {
1345  		writeAvg = calculateAvgLatency(writeLatencies)
1346  	}
1347  
1348  	fmt.Printf(
1349  		"Concurrent test completed: %d operations (%d queries, %d writes) in %v\n",
1350  		totalOps, totalQueries, totalWrites, duration,
1351  	)
1352  	fmt.Printf("Operations/sec: %.2f\n", result.EventsPerSecond)
1353  	fmt.Printf("Avg latency: %v\n", result.AvgLatency)
1354  	fmt.Printf("Avg query latency: %v\n", queryAvg)
1355  	fmt.Printf("Avg write latency: %v\n", writeAvg)
1356  	fmt.Printf("P95 latency: %v\n", result.P95Latency)
1357  	fmt.Printf("P99 latency: %v\n", result.P99Latency)
1358  }
1359  
1360  func (b *Benchmark) generateEvents(count int) []*event.E {
1361  	fmt.Printf("Generating %d unique synthetic events (minimum 300 bytes each)...\n", count)
1362  
1363  	// Create a single signer for all events (reusing key is faster)
1364  	signer := p8k.MustNew()
1365  	if err := signer.Generate(); err != nil {
1366  		log.Fatalf("Failed to generate keypair: %v", err)
1367  	}
1368  
1369  	// Base timestamp - start from current time and increment
1370  	baseTime := time.Now().Unix()
1371  
1372  	// Minimum content size
1373  	const minContentSize = 300
1374  
1375  	// Base content template
1376  	baseContent := "This is a benchmark test event with realistic content size. "
1377  
1378  	// Pre-calculate how much padding we need
1379  	paddingNeeded := minContentSize - len(baseContent)
1380  	if paddingNeeded < 0 {
1381  		paddingNeeded = 0
1382  	}
1383  
1384  	// Create padding string (with varied characters for realistic size)
1385  	padding := make([]byte, paddingNeeded)
1386  	for i := range padding {
1387  		padding[i] = ' ' + byte(i%94) // Printable ASCII characters
1388  	}
1389  
1390  	events := make([]*event.E, count)
1391  	for i := 0; i < count; i++ {
1392  		ev := event.New()
1393  		ev.Kind = kind.TextNote.K
1394  		ev.CreatedAt = baseTime + int64(i) // Unique timestamp for each event
1395  		ev.Tags = tag.NewS()
1396  
1397  		// Create content with unique identifier and padding
1398  		ev.Content = []byte(fmt.Sprintf("%s Event #%d. %s", baseContent, i, string(padding)))
1399  
1400  		// Sign the event (this calculates ID and Sig)
1401  		if err := ev.Sign(signer); err != nil {
1402  			log.Fatalf("Failed to sign event %d: %v", i, err)
1403  		}
1404  
1405  		events[i] = ev
1406  	}
1407  
1408  	// Print stats
1409  	totalSize := int64(0)
1410  	for _, ev := range events {
1411  		totalSize += int64(len(ev.Content))
1412  	}
1413  	avgSize := totalSize / int64(count)
1414  
1415  	fmt.Printf("Generated %d events:\n", count)
1416  	fmt.Printf("  Average content size: %d bytes\n", avgSize)
1417  	fmt.Printf("  All events are unique (incremental timestamps)\n")
1418  	fmt.Printf("  All events are properly signed\n\n")
1419  
1420  	return events
1421  }
1422  
1423  // printEventStats prints statistics about the loaded real-world events
1424  func (b *Benchmark) printEventStats() {
1425  	if len(b.cachedEvents) == 0 {
1426  		return
1427  	}
1428  
1429  	// Analyze event distribution
1430  	kindCounts := make(map[uint16]int)
1431  	var totalSize int64
1432  
1433  	for _, ev := range b.cachedEvents {
1434  		kindCounts[ev.Kind]++
1435  		totalSize += int64(len(ev.Content))
1436  	}
1437  
1438  	avgSize := totalSize / int64(len(b.cachedEvents))
1439  
1440  	fmt.Printf("\nEvent Statistics:\n")
1441  	fmt.Printf("  Total events: %d\n", len(b.cachedEvents))
1442  	fmt.Printf("  Average content size: %d bytes\n", avgSize)
1443  	fmt.Printf("  Event kinds found: %d unique\n", len(kindCounts))
1444  	fmt.Printf("  Most common kinds:\n")
1445  
1446  	// Print top 5 kinds
1447  	type kindCount struct {
1448  		kind  uint16
1449  		count int
1450  	}
1451  	var counts []kindCount
1452  	for k, c := range kindCounts {
1453  		counts = append(counts, kindCount{k, c})
1454  	}
1455  	sort.Slice(counts, func(i, j int) bool {
1456  		return counts[i].count > counts[j].count
1457  	})
1458  	for i := 0; i < min(5, len(counts)); i++ {
1459  		fmt.Printf("    Kind %d: %d events\n", counts[i].kind, counts[i].count)
1460  	}
1461  	fmt.Println()
1462  }
1463  
1464  // loadRealEvents loads events from embedded examples.Cache on first call
1465  func (b *Benchmark) loadRealEvents() {
1466  	b.eventCacheMu.Lock()
1467  	defer b.eventCacheMu.Unlock()
1468  
1469  	// Only load once
1470  	if len(b.cachedEvents) > 0 {
1471  		return
1472  	}
1473  
1474  	fmt.Println("Loading real-world sample events (11,596 events from 6 months of Nostr)...")
1475  	scanner := bufio.NewScanner(bytes.NewReader(examples.Cache))
1476  
1477  	buf := make([]byte, 0, 64*1024)
1478  	scanner.Buffer(buf, 1024*1024)
1479  
1480  	for scanner.Scan() {
1481  		var ev event.E
1482  		if err := json.Unmarshal(scanner.Bytes(), &ev); err != nil {
1483  			fmt.Printf("Warning: failed to unmarshal event: %v\n", err)
1484  			continue
1485  		}
1486  		b.cachedEvents = append(b.cachedEvents, &ev)
1487  	}
1488  
1489  	if err := scanner.Err(); err != nil {
1490  		log.Fatalf("Failed to read events: %v", err)
1491  	}
1492  
1493  	fmt.Printf("Loaded %d real-world events (already signed, zero crypto overhead)\n", len(b.cachedEvents))
1494  	b.printEventStats()
1495  }
1496  
1497  // getEventChannel returns a channel that streams unique synthetic events
1498  // bufferSize controls memory usage - larger buffers improve throughput but use more memory
1499  func (b *Benchmark) getEventChannel(count int, bufferSize int) (<-chan *event.E, <-chan error) {
1500  	eventChan := make(chan *event.E, bufferSize)
1501  	errChan := make(chan error, 1)
1502  
1503  	go func() {
1504  		defer close(eventChan)
1505  		defer close(errChan)
1506  
1507  		// Create a single signer for all events
1508  		signer := p8k.MustNew()
1509  		if err := signer.Generate(); err != nil {
1510  			errChan <- fmt.Errorf("failed to generate keypair: %w", err)
1511  			return
1512  		}
1513  
1514  		// Base timestamp - start from current time and increment
1515  		baseTime := time.Now().Unix()
1516  
1517  		// Minimum content size
1518  		const minContentSize = 300
1519  
1520  		// Base content template
1521  		baseContent := "This is a benchmark test event with realistic content size. "
1522  
1523  		// Pre-calculate padding
1524  		paddingNeeded := minContentSize - len(baseContent)
1525  		if paddingNeeded < 0 {
1526  			paddingNeeded = 0
1527  		}
1528  
1529  		// Create padding string (with varied characters for realistic size)
1530  		padding := make([]byte, paddingNeeded)
1531  		for i := range padding {
1532  			padding[i] = ' ' + byte(i%94) // Printable ASCII characters
1533  		}
1534  
1535  		// Stream unique events
1536  		for i := 0; i < count; i++ {
1537  			ev := event.New()
1538  			ev.Kind = kind.TextNote.K
1539  			ev.CreatedAt = baseTime + int64(i) // Unique timestamp for each event
1540  			ev.Tags = tag.NewS()
1541  
1542  			// Create content with unique identifier and padding
1543  			ev.Content = []byte(fmt.Sprintf("%s Event #%d. %s", baseContent, i, string(padding)))
1544  
1545  			// Sign the event (this calculates ID and Sig)
1546  			if err := ev.Sign(signer); err != nil {
1547  				errChan <- fmt.Errorf("failed to sign event %d: %w", i, err)
1548  				return
1549  			}
1550  
1551  			eventChan <- ev
1552  		}
1553  	}()
1554  
1555  	return eventChan, errChan
1556  }
1557  
// formatSize formats byte size in human-readable format: a special string
// for zero, integer-truncated bytes/KB/MB below 1 GB, and two-decimal GB
// above.
func formatSize(bytes int) string {
	switch {
	case bytes == 0:
		return "Empty (0 bytes)"
	case bytes < 1024:
		return fmt.Sprintf("%d bytes", bytes)
	case bytes < 1024*1024:
		return fmt.Sprintf("%d KB", bytes/1024)
	case bytes < 1024*1024*1024:
		return fmt.Sprintf("%d MB", bytes/(1024*1024))
	default:
		return fmt.Sprintf("%.2f GB", float64(bytes)/(1024*1024*1024))
	}
}
1574  
// min returns the smaller of a and b.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
1582  
// max returns the larger of a and b.
func max(a, b int) int {
	if b > a {
		return b
	}
	return a
}
1590  
1591  func (b *Benchmark) GenerateReport() {
1592  	fmt.Println("\n" + strings.Repeat("=", 80))
1593  	fmt.Println("BENCHMARK REPORT")
1594  	fmt.Println(strings.Repeat("=", 80))
1595  
1596  	b.mu.RLock()
1597  	defer b.mu.RUnlock()
1598  
1599  	for _, result := range b.results {
1600  		fmt.Printf("\nTest: %s\n", result.TestName)
1601  		fmt.Printf("Duration: %v\n", result.Duration)
1602  		fmt.Printf("Total Events: %d\n", result.TotalEvents)
1603  		fmt.Printf("Events/sec: %.2f\n", result.EventsPerSecond)
1604  		fmt.Printf("Success Rate: %.1f%%\n", result.SuccessRate)
1605  		fmt.Printf("Concurrent Workers: %d\n", result.ConcurrentWorkers)
1606  		fmt.Printf("Memory Used: %d MB\n", result.MemoryUsed/(1024*1024))
1607  		fmt.Printf("Avg Latency: %v\n", result.AvgLatency)
1608  		fmt.Printf("P90 Latency: %v\n", result.P90Latency)
1609  		fmt.Printf("P95 Latency: %v\n", result.P95Latency)
1610  		fmt.Printf("P99 Latency: %v\n", result.P99Latency)
1611  		fmt.Printf("Bottom 10%% Avg Latency: %v\n", result.Bottom10Avg)
1612  
1613  		if len(result.Errors) > 0 {
1614  			fmt.Printf("Errors (%d):\n", len(result.Errors))
1615  			for i, err := range result.Errors {
1616  				if i < 5 { // Show first 5 errors
1617  					fmt.Printf("  - %s\n", err)
1618  				}
1619  			}
1620  			if len(result.Errors) > 5 {
1621  				fmt.Printf("  ... and %d more errors\n", len(result.Errors)-5)
1622  			}
1623  		}
1624  		fmt.Println(strings.Repeat("-", 40))
1625  	}
1626  
1627  	// Save report to file
1628  	reportPath := filepath.Join(b.config.DataDir, "benchmark_report.txt")
1629  	b.saveReportToFile(reportPath)
1630  	fmt.Printf("\nReport saved to: %s\n", reportPath)
1631  }
1632  
1633  func (b *Benchmark) saveReportToFile(path string) error {
1634  	file, err := os.Create(path)
1635  	if err != nil {
1636  		return err
1637  	}
1638  	defer file.Close()
1639  
1640  	file.WriteString("NOSTR RELAY BENCHMARK REPORT\n")
1641  	file.WriteString("============================\n\n")
1642  	file.WriteString(
1643  		fmt.Sprintf(
1644  			"Generated: %s\n", time.Now().Format(time.RFC3339),
1645  		),
1646  	)
1647  	file.WriteString(fmt.Sprintf("Relay: next.orly.dev\n"))
1648  	file.WriteString(fmt.Sprintf("Database: BadgerDB\n"))
1649  	file.WriteString(fmt.Sprintf("Workers: %d\n", b.config.ConcurrentWorkers))
1650  	file.WriteString(
1651  		fmt.Sprintf(
1652  			"Test Duration: %v\n\n", b.config.TestDuration,
1653  		),
1654  	)
1655  
1656  	b.mu.RLock()
1657  	defer b.mu.RUnlock()
1658  
1659  	for _, result := range b.results {
1660  		file.WriteString(fmt.Sprintf("Test: %s\n", result.TestName))
1661  		file.WriteString(fmt.Sprintf("Duration: %v\n", result.Duration))
1662  		file.WriteString(fmt.Sprintf("Events: %d\n", result.TotalEvents))
1663  		file.WriteString(
1664  			fmt.Sprintf(
1665  				"Events/sec: %.2f\n", result.EventsPerSecond,
1666  			),
1667  		)
1668  		file.WriteString(
1669  			fmt.Sprintf(
1670  				"Success Rate: %.1f%%\n", result.SuccessRate,
1671  			),
1672  		)
1673  		file.WriteString(fmt.Sprintf("Avg Latency: %v\n", result.AvgLatency))
1674  		file.WriteString(fmt.Sprintf("P90 Latency: %v\n", result.P90Latency))
1675  		file.WriteString(fmt.Sprintf("P95 Latency: %v\n", result.P95Latency))
1676  		file.WriteString(fmt.Sprintf("P99 Latency: %v\n", result.P99Latency))
1677  		file.WriteString(
1678  			fmt.Sprintf(
1679  				"Bottom 10%% Avg Latency: %v\n", result.Bottom10Avg,
1680  			),
1681  		)
1682  		file.WriteString(
1683  			fmt.Sprintf(
1684  				"Memory: %d MB\n", result.MemoryUsed/(1024*1024),
1685  			),
1686  		)
1687  		file.WriteString("\n")
1688  	}
1689  
1690  	return nil
1691  }
1692  
1693  // GenerateAsciidocReport creates a simple AsciiDoc report alongside the text report.
1694  func (b *Benchmark) GenerateAsciidocReport() error {
1695  	path := filepath.Join(b.config.DataDir, "benchmark_report.adoc")
1696  	file, err := os.Create(path)
1697  	if err != nil {
1698  		return err
1699  	}
1700  	defer file.Close()
1701  
1702  	file.WriteString("= NOSTR Relay Benchmark Results\n\n")
1703  	file.WriteString(
1704  		fmt.Sprintf(
1705  			"Generated: %s\n\n", time.Now().Format(time.RFC3339),
1706  		),
1707  	)
1708  	file.WriteString("[cols=\"1,^1,^1,^1,^1,^1\",options=\"header\"]\n")
1709  	file.WriteString("|===\n")
1710  	file.WriteString("| Test | Events/sec | Avg Latency | P90 | P95 | Bottom 10% Avg\n")
1711  
1712  	b.mu.RLock()
1713  	defer b.mu.RUnlock()
1714  	for _, r := range b.results {
1715  		file.WriteString(fmt.Sprintf("| %s\n", r.TestName))
1716  		file.WriteString(fmt.Sprintf("| %.2f\n", r.EventsPerSecond))
1717  		file.WriteString(fmt.Sprintf("| %v\n", r.AvgLatency))
1718  		file.WriteString(fmt.Sprintf("| %v\n", r.P90Latency))
1719  		file.WriteString(fmt.Sprintf("| %v\n", r.P95Latency))
1720  		file.WriteString(fmt.Sprintf("| %v\n", r.Bottom10Avg))
1721  	}
1722  	file.WriteString("|===\n")
1723  
1724  	fmt.Printf("AsciiDoc report saved to: %s\n", path)
1725  	return nil
1726  }
1727  
1728  // Helper functions
1729  
1730  func calculateAvgLatency(latencies []time.Duration) time.Duration {
1731  	if len(latencies) == 0 {
1732  		return 0
1733  	}
1734  
1735  	var total time.Duration
1736  	for _, l := range latencies {
1737  		total += l
1738  	}
1739  	return total / time.Duration(len(latencies))
1740  }
1741  
1742  func calculatePercentileLatency(
1743  	latencies []time.Duration, percentile float64,
1744  ) time.Duration {
1745  	if len(latencies) == 0 {
1746  		return 0
1747  	}
1748  	// Sort a copy to avoid mutating caller slice
1749  	copySlice := make([]time.Duration, len(latencies))
1750  	copy(copySlice, latencies)
1751  	sort.Slice(
1752  		copySlice, func(i, j int) bool { return copySlice[i] < copySlice[j] },
1753  	)
1754  	index := int(float64(len(copySlice)-1) * percentile)
1755  	if index < 0 {
1756  		index = 0
1757  	}
1758  	if index >= len(copySlice) {
1759  		index = len(copySlice) - 1
1760  	}
1761  	return copySlice[index]
1762  }
1763  
1764  // calculateBottom10Avg returns the average latency of the slowest 10% of samples.
1765  func calculateBottom10Avg(latencies []time.Duration) time.Duration {
1766  	if len(latencies) == 0 {
1767  		return 0
1768  	}
1769  	copySlice := make([]time.Duration, len(latencies))
1770  	copy(copySlice, latencies)
1771  	sort.Slice(
1772  		copySlice, func(i, j int) bool { return copySlice[i] < copySlice[j] },
1773  	)
1774  	start := int(float64(len(copySlice)) * 0.9)
1775  	if start < 0 {
1776  		start = 0
1777  	}
1778  	if start >= len(copySlice) {
1779  		start = len(copySlice) - 1
1780  	}
1781  	var total time.Duration
1782  	for i := start; i < len(copySlice); i++ {
1783  		total += copySlice[i]
1784  	}
1785  	count := len(copySlice) - start
1786  	if count <= 0 {
1787  		return 0
1788  	}
1789  	return total / time.Duration(count)
1790  }
1791  
// getMemUsage reports the bytes of heap memory currently allocated
// (runtime MemStats.Alloc).
func getMemUsage() uint64 {
	var stats runtime.MemStats
	runtime.ReadMemStats(&stats)
	return stats.Alloc
}
1797