main.go raw
1 package main
2
3 import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "flag"
9 "fmt"
10 "log"
11 "os"
12 "path/filepath"
13 "runtime"
14 "sort"
15 "strings"
16 "sync"
17 "time"
18
19 lol "next.orly.dev/pkg/lol"
20 "next.orly.dev/pkg/database"
21 "next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
22 "next.orly.dev/pkg/nostr/encoders/event"
23 examples "next.orly.dev/pkg/nostr/encoders/event/examples"
24 "next.orly.dev/pkg/nostr/encoders/filter"
25 "next.orly.dev/pkg/nostr/encoders/kind"
26 "next.orly.dev/pkg/nostr/encoders/tag"
27 "next.orly.dev/pkg/nostr/encoders/timestamp"
28 "next.orly.dev/pkg/nostr/interfaces/signer/p8k"
29 "next.orly.dev/pkg/nostr/ws"
30 )
31
// BenchmarkConfig holds the command-line options that select which benchmark
// runs and how it is sized. It is populated by parseFlags.
type BenchmarkConfig struct {
	// Local benchmark sizing.
	DataDir           string        // database directory (-datadir)
	NumEvents         int           // number of events to generate (-events)
	ConcurrentWorkers int           // number of concurrent workers (-workers)
	TestDuration      time.Duration // test duration (-duration)
	BurstPattern      bool          // enable burst pattern testing (-burst)
	ReportInterval    time.Duration // report interval (-report-interval)

	// Network load options.
	RelayURL   string // relay WebSocket URL; network mode is enabled when set
	NetWorkers int    // network workers (connections)
	NetRate    int    // events/sec per worker

	// Backend selection.
	UseNeo4j, UseRelySQLite bool

	// Graph traversal benchmarks.
	UseGraphTraversal        bool // local graph traversal benchmark (-graph)
	UseNetworkGraphTraversal bool // network-mode graph traversal (for multi-relay testing)
	UsePPGComparison         bool // PPG vs baseline comparison benchmark
}
54
// BenchmarkResult is the summary recorded for one benchmark test run.
type BenchmarkResult struct {
	TestName string        // human-readable name of the test
	Duration time.Duration // wall-clock time the test took

	// Throughput.
	TotalEvents     int     // number of operations counted as successful
	EventsPerSecond float64 // TotalEvents divided by Duration in seconds

	// Latency summary of the recorded operations.
	AvgLatency  time.Duration
	P90Latency  time.Duration
	P95Latency  time.Duration
	P99Latency  time.Duration
	Bottom10Avg time.Duration // reported as "Bottom 10% Avg"; exact definition lives in the latency helpers

	SuccessRate       float64  // percentage, 0-100
	ConcurrentWorkers int      // worker count used for the test
	MemoryUsed        uint64   // value returned by getMemUsage at the end of the test
	Errors            []string // error messages collected during the test, if any
}
70
// RateLimiter paces callers so that successive events are at least one fixed
// interval apart. It is an interval pacer, not a token bucket: no burst credit
// accumulates while the limiter is idle.
type RateLimiter struct {
	rate      float64       // requested events per second
	interval  time.Duration // minimum spacing between events; 0 disables pacing
	lastEvent time.Time     // time slot granted to the most recent event
	mu        sync.Mutex    // serializes Wait across goroutines
}

// NewRateLimiter creates a rate limiter for the specified events per second.
//
// A rate that is zero, negative or NaN disables pacing (Wait returns
// immediately). Previously such a rate produced an out-of-range float-to-int
// conversion whose result is implementation-defined. A positive rate so small
// that its interval does not fit in a time.Duration is saturated to the
// largest representable duration.
func NewRateLimiter(eventsPerSecond float64) *RateLimiter {
	const maxInterval = time.Duration(1<<63 - 1)

	var interval time.Duration
	if eventsPerSecond > 0 { // false for NaN as well
		spacing := float64(time.Second) / eventsPerSecond
		if spacing >= float64(maxInterval) {
			interval = maxInterval
		} else {
			interval = time.Duration(spacing)
		}
	}

	return &RateLimiter{
		rate:      eventsPerSecond,
		interval:  interval,
		lastEvent: time.Now(),
	}
}

// Wait blocks until the next event is allowed based on the rate limit.
//
// The mutex is held while sleeping, so concurrent callers sharing one limiter
// are released one interval apart.
func (rl *RateLimiter) Wait() {
	rl.mu.Lock()
	defer rl.mu.Unlock()

	now := time.Now()
	next := rl.lastEvent.Add(rl.interval)

	// Already past the next slot: proceed immediately and restart the
	// schedule from now, so idle time is not banked as burst credit.
	if !now.Before(next) {
		rl.lastEvent = now
		return
	}

	time.Sleep(next.Sub(now))
	// Record the scheduled slot rather than the wake-up time so that
	// oversleeping does not accumulate as drift.
	rl.lastEvent = next
}
103
104 type Benchmark struct {
105 config *BenchmarkConfig
106 db *database.D
107 results []*BenchmarkResult
108 mu sync.RWMutex
109 cachedEvents []*event.E // Real-world events from examples.Cache
110 eventCacheMu sync.Mutex
111 }
112
113 func main() {
114 lol.SetLogLevel("warn")
115 config := parseFlags()
116
117 if config.UseNetworkGraphTraversal {
118 // Network graph traversal mode: requires relay URL
119 if config.RelayURL == "" {
120 log.Fatal("Network graph traversal benchmark requires -relay-url flag")
121 }
122 runNetworkGraphTraversalBenchmark(config)
123 return
124 }
125
126 if config.RelayURL != "" {
127 // Network mode: connect to relay and generate traffic
128 runNetworkLoad(config)
129 return
130 }
131
132 if config.UseNeo4j {
133 // Run Neo4j benchmark
134 runNeo4jBenchmark(config)
135 return
136 }
137
138 if config.UseRelySQLite {
139 // Run Rely-SQLite benchmark
140 runRelySQLiteBenchmark(config)
141 return
142 }
143
144 if config.UseGraphTraversal {
145 // Run graph traversal benchmark
146 runGraphTraversalBenchmark(config)
147 return
148 }
149
150 // Run standard Badger benchmark
151 fmt.Printf("Starting Nostr Relay Benchmark (Badger Backend)\n")
152 fmt.Printf("Data Directory: %s\n", config.DataDir)
153 fmt.Printf(
154 "Events: %d, Workers: %d, Duration: %v\n",
155 config.NumEvents, config.ConcurrentWorkers, config.TestDuration,
156 )
157
158 benchmark := NewBenchmark(config)
159 defer benchmark.Close()
160
161 // Run benchmark suite twice with pauses
162 benchmark.RunSuite()
163
164 // Generate reports
165 benchmark.GenerateReport()
166 benchmark.GenerateAsciidocReport()
167 }
168
169 func runNeo4jBenchmark(config *BenchmarkConfig) {
170 fmt.Printf("Starting Nostr Relay Benchmark (Neo4j Backend)\n")
171 fmt.Printf("Data Directory: %s\n", config.DataDir)
172 fmt.Printf(
173 "Events: %d, Workers: %d\n",
174 config.NumEvents, config.ConcurrentWorkers,
175 )
176
177 neo4jBench, err := NewNeo4jBenchmark(config)
178 if err != nil {
179 log.Fatalf("Failed to create Neo4j benchmark: %v", err)
180 }
181 defer neo4jBench.Close()
182
183 // Run Neo4j benchmark suite
184 neo4jBench.RunSuite()
185
186 // Generate reports
187 neo4jBench.GenerateReport()
188 neo4jBench.GenerateAsciidocReport()
189 }
190
191 func runRelySQLiteBenchmark(config *BenchmarkConfig) {
192 fmt.Printf("Starting Nostr Relay Benchmark (Rely-SQLite Backend)\n")
193 fmt.Printf("Data Directory: %s\n", config.DataDir)
194 fmt.Printf(
195 "Events: %d, Workers: %d\n",
196 config.NumEvents, config.ConcurrentWorkers,
197 )
198
199 relysqliteBench, err := NewRelySQLiteBenchmark(config)
200 if err != nil {
201 log.Fatalf("Failed to create Rely-SQLite benchmark: %v", err)
202 }
203 defer relysqliteBench.Close()
204
205 // Run Rely-SQLite benchmark suite
206 relysqliteBench.RunSuite()
207
208 // Generate reports
209 relysqliteBench.GenerateReport()
210 relysqliteBench.GenerateAsciidocReport()
211 }
212
213 func runGraphTraversalBenchmark(config *BenchmarkConfig) {
214 fmt.Printf("Starting Graph Traversal Benchmark (Badger Backend)\n")
215 fmt.Printf("Data Directory: %s\n", config.DataDir)
216 fmt.Printf("Workers: %d\n", config.ConcurrentWorkers)
217 fmt.Printf("Pubkeys: %d, Follows per pubkey: %d-%d\n",
218 GraphBenchNumPubkeys, GraphBenchMinFollows, GraphBenchMaxFollows)
219
220 // Check if we can reuse an existing seeded database
221 reuseDB := false
222 if config.UsePPGComparison {
223 if info, err := os.Stat(filepath.Join(config.DataDir, "MANIFEST")); err == nil && info.Size() > 0 {
224 reuseDB = true
225 fmt.Printf("Reusing existing seeded database at %s\n", config.DataDir)
226 }
227 }
228
229 if !reuseDB {
230 // Clean up existing data directory
231 os.RemoveAll(config.DataDir)
232 }
233
234 ctx := context.Background()
235 cancel := func() {}
236
237 db, err := database.New(ctx, cancel, config.DataDir, "warn")
238 if err != nil {
239 log.Fatalf("Failed to create database: %v", err)
240 }
241 defer db.Close()
242
243 // Create graph traversal benchmark (always needed for pubkey arrays)
244 graphBench := NewGraphTraversalBenchmark(config, db)
245
246 if reuseDB {
247 // Regenerate pubkey/signer arrays (deterministic) without re-seeding DB
248 graphBench.RegeneratePubkeys()
249 } else {
250 // Seed the database with follow list events
251 graphBench.SeedDatabase()
252 }
253
254 // Run NIP-01 multi-hop traversal unless PPG mode is set (it has its own baseline)
255 if !config.UsePPGComparison {
256 graphBench.runThirdDegreeTraversal()
257 }
258
259 // Run PPG comparison if requested
260 if config.UsePPGComparison {
261 ppgBench := NewPPGBenchmark(config, db)
262 ppgBench.VerifyEquivalence(graphBench.pubkeys, 100)
263 ppgBench.RunSuite(graphBench.pubkeys)
264 }
265
266 // Generate reports
267 graphBench.PrintResults()
268 generateGraphTraversalAsciidocReport(config, graphBench.GetResults())
269 }
270
271 func generateGraphTraversalAsciidocReport(config *BenchmarkConfig, results []*BenchmarkResult) {
272 path := filepath.Join(config.DataDir, "graph_traversal_report.adoc")
273 file, err := os.Create(path)
274 if err != nil {
275 log.Printf("Failed to create report: %v", err)
276 return
277 }
278 defer file.Close()
279
280 file.WriteString("= Graph Traversal Benchmark Results\n\n")
281 file.WriteString(
282 fmt.Sprintf(
283 "Generated: %s\n\n", time.Now().Format(time.RFC3339),
284 ),
285 )
286 file.WriteString(fmt.Sprintf("Pubkeys: %d\n", GraphBenchNumPubkeys))
287 file.WriteString(fmt.Sprintf("Follows per pubkey: %d-%d\n", GraphBenchMinFollows, GraphBenchMaxFollows))
288 file.WriteString(fmt.Sprintf("Traversal depth: %d degrees\n\n", GraphBenchTraversalDepth))
289
290 file.WriteString("[cols=\"1,^1,^1,^1,^1,^1,^1\",options=\"header\"]\n")
291 file.WriteString("|===\n")
292 file.WriteString("| Test | Events/sec | Avg Latency | P90 | P95 | P99 | Bottom 10% Avg\n")
293
294 for _, r := range results {
295 file.WriteString(fmt.Sprintf("| %s\n", r.TestName))
296 file.WriteString(fmt.Sprintf("| %.2f\n", r.EventsPerSecond))
297 file.WriteString(fmt.Sprintf("| %v\n", r.AvgLatency))
298 file.WriteString(fmt.Sprintf("| %v\n", r.P90Latency))
299 file.WriteString(fmt.Sprintf("| %v\n", r.P95Latency))
300 file.WriteString(fmt.Sprintf("| %v\n", r.P99Latency))
301 file.WriteString(fmt.Sprintf("| %v\n", r.Bottom10Avg))
302 }
303 file.WriteString("|===\n")
304
305 fmt.Printf("AsciiDoc report saved to: %s\n", path)
306 }
307
308 func runNetworkGraphTraversalBenchmark(config *BenchmarkConfig) {
309 fmt.Printf("Starting Network Graph Traversal Benchmark\n")
310 fmt.Printf("Relay URL: %s\n", config.RelayURL)
311 fmt.Printf("Workers: %d\n", config.ConcurrentWorkers)
312 fmt.Printf("Pubkeys: %d, Follows per pubkey: %d-%d\n",
313 GraphBenchNumPubkeys, GraphBenchMinFollows, GraphBenchMaxFollows)
314
315 ctx := context.Background()
316
317 // Create and run network graph traversal benchmark
318 netGraphBench := NewNetworkGraphTraversalBenchmark(config.RelayURL, config.ConcurrentWorkers)
319
320 if err := netGraphBench.RunSuite(ctx); err != nil {
321 log.Fatalf("Network graph traversal benchmark failed: %v", err)
322 }
323
324 // Generate reports
325 netGraphBench.PrintResults()
326 generateNetworkGraphTraversalAsciidocReport(config, netGraphBench.GetResults())
327 }
328
329 func generateNetworkGraphTraversalAsciidocReport(config *BenchmarkConfig, results []*BenchmarkResult) {
330 path := filepath.Join(config.DataDir, "network_graph_traversal_report.adoc")
331 file, err := os.Create(path)
332 if err != nil {
333 log.Printf("Failed to create report: %v", err)
334 return
335 }
336 defer file.Close()
337
338 file.WriteString("= Network Graph Traversal Benchmark Results\n\n")
339 file.WriteString(
340 fmt.Sprintf(
341 "Generated: %s\n\n", time.Now().Format(time.RFC3339),
342 ),
343 )
344 file.WriteString(fmt.Sprintf("Relay URL: %s\n", config.RelayURL))
345 file.WriteString(fmt.Sprintf("Pubkeys: %d\n", GraphBenchNumPubkeys))
346 file.WriteString(fmt.Sprintf("Follows per pubkey: %d-%d\n", GraphBenchMinFollows, GraphBenchMaxFollows))
347 file.WriteString(fmt.Sprintf("Traversal depth: %d degrees\n\n", GraphBenchTraversalDepth))
348
349 file.WriteString("[cols=\"1,^1,^1,^1,^1,^1,^1\",options=\"header\"]\n")
350 file.WriteString("|===\n")
351 file.WriteString("| Test | Events/sec | Avg Latency | P90 | P95 | P99 | Bottom 10% Avg\n")
352
353 for _, r := range results {
354 file.WriteString(fmt.Sprintf("| %s\n", r.TestName))
355 file.WriteString(fmt.Sprintf("| %.2f\n", r.EventsPerSecond))
356 file.WriteString(fmt.Sprintf("| %v\n", r.AvgLatency))
357 file.WriteString(fmt.Sprintf("| %v\n", r.P90Latency))
358 file.WriteString(fmt.Sprintf("| %v\n", r.P95Latency))
359 file.WriteString(fmt.Sprintf("| %v\n", r.P99Latency))
360 file.WriteString(fmt.Sprintf("| %v\n", r.Bottom10Avg))
361 }
362 file.WriteString("|===\n")
363
364 fmt.Printf("AsciiDoc report saved to: %s\n", path)
365 }
366
367 func parseFlags() *BenchmarkConfig {
368 config := &BenchmarkConfig{}
369
370 flag.StringVar(
371 &config.DataDir, "datadir", "/tmp/benchmark_db", "Database directory",
372 )
373 flag.IntVar(
374 &config.NumEvents, "events", 10000, "Number of events to generate",
375 )
376 flag.IntVar(
377 &config.ConcurrentWorkers, "workers", max(2, runtime.NumCPU()/4),
378 "Number of concurrent workers (default: CPU cores / 4 for low CPU usage)",
379 )
380 flag.DurationVar(
381 &config.TestDuration, "duration", 60*time.Second, "Test duration",
382 )
383 flag.BoolVar(
384 &config.BurstPattern, "burst", true, "Enable burst pattern testing",
385 )
386 flag.DurationVar(
387 &config.ReportInterval, "report-interval", 10*time.Second,
388 "Report interval",
389 )
390
391 // Network mode flags
392 flag.StringVar(
393 &config.RelayURL, "relay-url", "",
394 "Relay WebSocket URL (enables network mode if set)",
395 )
396 flag.IntVar(
397 &config.NetWorkers, "net-workers", runtime.NumCPU(),
398 "Network workers (connections)",
399 )
400 flag.IntVar(&config.NetRate, "net-rate", 20, "Events per second per worker")
401
402 // Backend selection
403 flag.BoolVar(
404 &config.UseNeo4j, "neo4j", false,
405 "Use Neo4j backend (requires Docker)",
406 )
407 flag.BoolVar(
408 &config.UseRelySQLite, "relysqlite", false,
409 "Use rely-sqlite backend",
410 )
411
412 // Graph traversal benchmark
413 flag.BoolVar(
414 &config.UseGraphTraversal, "graph", false,
415 "Run graph traversal benchmark (100k pubkeys, 3-degree follows)",
416 )
417 flag.BoolVar(
418 &config.UseNetworkGraphTraversal, "graph-network", false,
419 "Run network graph traversal benchmark against relay specified by -relay-url",
420 )
421 flag.BoolVar(
422 &config.UsePPGComparison, "ppg", false,
423 "Run ppg/gpp vs baseline comparison benchmark (requires -graph to seed data)",
424 )
425 flag.IntVar(
426 &GraphBenchNumPubkeys, "graph-pubkeys", 10000,
427 "Number of pubkeys for graph benchmark (default 10000)",
428 )
429
430 flag.Parse()
431 return config
432 }
433
434 func runNetworkLoad(cfg *BenchmarkConfig) {
435 fmt.Printf(
436 "Network mode: relay=%s workers=%d rate=%d ev/s per worker duration=%s\n",
437 cfg.RelayURL, cfg.NetWorkers, cfg.NetRate, cfg.TestDuration,
438 )
439 // Create a timeout context for benchmark control only, not for connections
440 timeoutCtx, cancel := context.WithTimeout(
441 context.Background(), cfg.TestDuration,
442 )
443 defer cancel()
444
445 // Use a separate background context for relay connections to avoid
446 // cancelling the server when the benchmark timeout expires
447 connCtx := context.Background()
448
449 var wg sync.WaitGroup
450 if cfg.NetWorkers <= 0 {
451 cfg.NetWorkers = 1
452 }
453 if cfg.NetRate <= 0 {
454 cfg.NetRate = 1
455 }
456 for i := 0; i < cfg.NetWorkers; i++ {
457 wg.Add(1)
458 go func(workerID int) {
459 defer wg.Done()
460 // Connect to relay using non-cancellable context
461 rl, err := ws.RelayConnect(connCtx, cfg.RelayURL)
462 if err != nil {
463 fmt.Printf(
464 "worker %d: failed to connect to %s: %v\n", workerID,
465 cfg.RelayURL, err,
466 )
467 return
468 }
469 defer rl.Close()
470 fmt.Printf("worker %d: connected to %s\n", workerID, cfg.RelayURL)
471
472 // Signer for this worker
473 var keys *p8k.Signer
474 if keys, err = p8k.New(); err != nil {
475 fmt.Printf("worker %d: signer create failed: %v\n", workerID, err)
476 return
477 }
478 if err := keys.Generate(); err != nil {
479 fmt.Printf("worker %d: keygen failed: %v\n", workerID, err)
480 return
481 }
482
483 // Start a concurrent subscriber that listens for events published by this worker
484 // Build a filter that matches this worker's pubkey and kind=1, since now
485 since := time.Now().Unix()
486 go func() {
487 f := filter.New()
488 f.Kinds = kind.NewS(kind.TextNote)
489 f.Authors = tag.NewWithCap(1)
490 f.Authors.T = append(f.Authors.T, keys.Pub())
491 f.Since = timestamp.FromUnix(since)
492 sub, err := rl.Subscribe(connCtx, filter.NewS(f))
493 if err != nil {
494 fmt.Printf(
495 "worker %d: subscribe error: %v\n", workerID, err,
496 )
497 return
498 }
499 defer sub.Unsub()
500 recv := 0
501 for {
502 select {
503 case <-timeoutCtx.Done():
504 fmt.Printf(
505 "worker %d: subscriber exiting after %d events (benchmark timeout: %v)\n",
506 workerID, recv, timeoutCtx.Err(),
507 )
508 return
509 case <-rl.Context().Done():
510 fmt.Printf(
511 "worker %d: relay connection closed; cause=%v lastErr=%v\n",
512 workerID, rl.ConnectionCause(), rl.LastError(),
513 )
514 return
515 case <-sub.EndOfStoredEvents:
516 // continue streaming live events
517 case ev := <-sub.Events:
518 if ev == nil {
519 continue
520 }
521 recv++
522 if recv%100 == 0 {
523 fmt.Printf(
524 "worker %d: received %d matching events\n",
525 workerID, recv,
526 )
527 }
528 ev.Free()
529 }
530 }
531 }()
532
533 interval := time.Second / time.Duration(cfg.NetRate)
534 ticker := time.NewTicker(interval)
535 defer ticker.Stop()
536 count := 0
537 for {
538 select {
539 case <-timeoutCtx.Done():
540 fmt.Printf(
541 "worker %d: stopping after %d publishes\n", workerID,
542 count,
543 )
544 return
545 case <-ticker.C:
546 // Build and sign a simple text note event
547 ev := event.New()
548 ev.Kind = uint16(1)
549 ev.CreatedAt = time.Now().Unix()
550 ev.Tags = tag.NewS()
551 ev.Content = []byte(fmt.Sprintf(
552 "bench worker=%d n=%d", workerID, count,
553 ))
554 if err := ev.Sign(keys); err != nil {
555 fmt.Printf("worker %d: sign error: %v\n", workerID, err)
556 ev.Free()
557 continue
558 }
559 // Async publish: don't wait for OK; this greatly increases throughput
560 ch := rl.Write(eventenvelope.NewSubmissionWith(ev).Marshal(nil))
561 // Non-blocking error check
562 select {
563 case err := <-ch:
564 if err != nil {
565 fmt.Printf(
566 "worker %d: write error: %v\n", workerID, err,
567 )
568 }
569 default:
570 }
571 if count%100 == 0 {
572 fmt.Printf(
573 "worker %d: sent %d events\n", workerID, count,
574 )
575 }
576 ev.Free()
577 count++
578 }
579 }
580 }(i)
581 }
582 wg.Wait()
583 }
584
585 func NewBenchmark(config *BenchmarkConfig) *Benchmark {
586 // Clean up existing data directory
587 os.RemoveAll(config.DataDir)
588
589 ctx := context.Background()
590 cancel := func() {}
591
592 db, err := database.New(ctx, cancel, config.DataDir, "warn")
593 if err != nil {
594 log.Fatalf("Failed to create database: %v", err)
595 }
596
597 b := &Benchmark{
598 config: config,
599 db: db,
600 results: make([]*BenchmarkResult, 0),
601 }
602
603 // Trigger compaction/GC before starting tests
604 b.compactDatabase()
605
606 return b
607 }
608
609 func (b *Benchmark) Close() {
610 if b.db != nil {
611 b.db.Close()
612 }
613 }
614
615 // RunSuite runs the full benchmark test suite
616 func (b *Benchmark) RunSuite() {
617 fmt.Println("\n╔════════════════════════════════════════════════════════╗")
618 fmt.Println("║ BADGER BACKEND BENCHMARK SUITE ║")
619 fmt.Println("╚════════════════════════════════════════════════════════╝")
620
621 fmt.Printf("\n=== Starting Badger benchmark ===\n")
622
623 fmt.Printf("RunPeakThroughputTest (Badger)..\n")
624 b.RunPeakThroughputTest()
625 fmt.Println("Wiping database between tests...")
626 b.db.Wipe()
627 time.Sleep(10 * time.Second)
628
629 fmt.Printf("RunBurstPatternTest (Badger)..\n")
630 b.RunBurstPatternTest()
631 fmt.Println("Wiping database between tests...")
632 b.db.Wipe()
633 time.Sleep(10 * time.Second)
634
635 fmt.Printf("RunMixedReadWriteTest (Badger)..\n")
636 b.RunMixedReadWriteTest()
637 fmt.Println("Wiping database between tests...")
638 b.db.Wipe()
639 time.Sleep(10 * time.Second)
640
641 fmt.Printf("RunQueryTest (Badger)..\n")
642 b.RunQueryTest()
643 fmt.Println("Wiping database between tests...")
644 b.db.Wipe()
645 time.Sleep(10 * time.Second)
646
647 fmt.Printf("RunConcurrentQueryStoreTest (Badger)..\n")
648 b.RunConcurrentQueryStoreTest()
649
650 fmt.Printf("\n=== Badger benchmark completed ===\n\n")
651 }
652
653 // compactDatabase triggers a Badger value log GC before starting tests.
654 func (b *Benchmark) compactDatabase() {
655 if b.db == nil || b.db.DB == nil {
656 return
657 }
658 // Attempt value log GC. Ignore errors; this is best-effort.
659 _ = b.db.DB.RunValueLogGC(0.5)
660 }
661
662 func (b *Benchmark) RunPeakThroughputTest() {
663 fmt.Println("\n=== Peak Throughput Test ===")
664
665 // Create latency recorder (writes to disk, not memory)
666 latencyRecorder, err := NewLatencyRecorder(b.config.DataDir, "peak_throughput")
667 if err != nil {
668 log.Fatalf("Failed to create latency recorder: %v", err)
669 }
670
671 start := time.Now()
672 var wg sync.WaitGroup
673 var totalEvents int64
674 var errorCount int64
675 var mu sync.Mutex
676
677 // Stream events from memory (real-world sample events)
678 eventChan, errChan := b.getEventChannel(b.config.NumEvents, 1000)
679
680 // Calculate per-worker rate: 20k events/sec total divided by worker count
681 // This prevents all workers from synchronizing and hitting DB simultaneously
682 perWorkerRate := 20000.0 / float64(b.config.ConcurrentWorkers)
683
684 // Start workers with rate limiting
685 ctx := context.Background()
686
687 for i := 0; i < b.config.ConcurrentWorkers; i++ {
688 wg.Add(1)
689 go func(workerID int) {
690 defer wg.Done()
691
692 // Each worker gets its own rate limiter to avoid mutex contention
693 workerLimiter := NewRateLimiter(perWorkerRate)
694
695 for ev := range eventChan {
696 // Wait for rate limiter to allow this event
697 workerLimiter.Wait()
698
699 eventStart := time.Now()
700 _, err := b.db.SaveEvent(ctx, ev)
701 latency := time.Since(eventStart)
702
703 mu.Lock()
704 if err != nil {
705 errorCount++
706 } else {
707 totalEvents++
708 if err := latencyRecorder.Record(latency); err != nil {
709 log.Printf("Failed to record latency: %v", err)
710 }
711 }
712 mu.Unlock()
713 }
714 }(i)
715 }
716
717 // Check for streaming errors
718 go func() {
719 for err := range errChan {
720 if err != nil {
721 log.Printf("Event stream error: %v", err)
722 }
723 }
724 }()
725
726 wg.Wait()
727 duration := time.Since(start)
728
729 // Flush latency data to disk before calculating stats
730 if err := latencyRecorder.Close(); err != nil {
731 log.Printf("Failed to close latency recorder: %v", err)
732 }
733
734 // Calculate statistics from disk
735 latencyStats, err := latencyRecorder.CalculateStats()
736 if err != nil {
737 log.Printf("Failed to calculate latency stats: %v", err)
738 latencyStats = &LatencyStats{}
739 }
740
741 // Calculate metrics
742 result := &BenchmarkResult{
743 TestName: "Peak Throughput",
744 Duration: duration,
745 TotalEvents: int(totalEvents),
746 EventsPerSecond: float64(totalEvents) / duration.Seconds(),
747 ConcurrentWorkers: b.config.ConcurrentWorkers,
748 MemoryUsed: getMemUsage(),
749 AvgLatency: latencyStats.Avg,
750 P90Latency: latencyStats.P90,
751 P95Latency: latencyStats.P95,
752 P99Latency: latencyStats.P99,
753 Bottom10Avg: latencyStats.Bottom10,
754 }
755
756 result.SuccessRate = float64(totalEvents) / float64(b.config.NumEvents) * 100
757
758 b.mu.Lock()
759 b.results = append(b.results, result)
760 b.mu.Unlock()
761
762 fmt.Printf(
763 "Events saved: %d/%d (%.1f%%), errors: %d\n",
764 totalEvents, b.config.NumEvents, result.SuccessRate, errorCount,
765 )
766 fmt.Printf("Duration: %v\n", duration)
767 fmt.Printf("Events/sec: %.2f\n", result.EventsPerSecond)
768 fmt.Printf("Avg latency: %v\n", result.AvgLatency)
769 fmt.Printf("P90 latency: %v\n", result.P90Latency)
770 fmt.Printf("P95 latency: %v\n", result.P95Latency)
771 fmt.Printf("P99 latency: %v\n", result.P99Latency)
772 fmt.Printf("Bottom 10%% Avg latency: %v\n", result.Bottom10Avg)
773 }
774
775 func (b *Benchmark) RunBurstPatternTest() {
776 fmt.Println("\n=== Burst Pattern Test ===")
777
778 // Create latency recorder (writes to disk, not memory)
779 latencyRecorder, err := NewLatencyRecorder(b.config.DataDir, "burst_pattern")
780 if err != nil {
781 log.Fatalf("Failed to create latency recorder: %v", err)
782 }
783
784 start := time.Now()
785 var totalEvents int64
786 var errorCount int64
787 var mu sync.Mutex
788
789 // Stream events from memory (real-world sample events)
790 eventChan, errChan := b.getEventChannel(b.config.NumEvents, 500)
791
792 // Check for streaming errors
793 go func() {
794 for err := range errChan {
795 if err != nil {
796 log.Printf("Event stream error: %v", err)
797 }
798 }
799 }()
800
801 // Simulate burst pattern: high activity periods followed by quiet periods
802 burstSize := b.config.NumEvents / 10 // 10% of events in each burst
803 quietPeriod := 500 * time.Millisecond
804 burstPeriod := 100 * time.Millisecond
805
806 ctx := context.Background()
807 var eventIndex int64
808
809 // Start persistent worker pool (prevents goroutine explosion)
810 numWorkers := b.config.ConcurrentWorkers
811 eventQueue := make(chan *event.E, numWorkers*4)
812 var wg sync.WaitGroup
813
814 // Calculate per-worker rate to avoid mutex contention
815 perWorkerRate := 20000.0 / float64(numWorkers)
816
817 for w := 0; w < numWorkers; w++ {
818 wg.Add(1)
819 go func() {
820 defer wg.Done()
821
822 // Each worker gets its own rate limiter
823 workerLimiter := NewRateLimiter(perWorkerRate)
824
825 for ev := range eventQueue {
826 // Wait for rate limiter to allow this event
827 workerLimiter.Wait()
828
829 eventStart := time.Now()
830 _, err := b.db.SaveEvent(ctx, ev)
831 latency := time.Since(eventStart)
832
833 mu.Lock()
834 if err != nil {
835 errorCount++
836 } else {
837 totalEvents++
838 // Record latency to disk instead of keeping in memory
839 if err := latencyRecorder.Record(latency); err != nil {
840 log.Printf("Failed to record latency: %v", err)
841 }
842 }
843 mu.Unlock()
844 }
845 }()
846 }
847
848 for int(eventIndex) < b.config.NumEvents && time.Since(start) < b.config.TestDuration {
849 // Burst period - send events rapidly
850 burstStart := time.Now()
851
852 for i := 0; i < burstSize && int(eventIndex) < b.config.NumEvents; i++ {
853 ev, ok := <-eventChan
854 if !ok {
855 break
856 }
857 eventQueue <- ev
858 eventIndex++
859 time.Sleep(burstPeriod / time.Duration(burstSize))
860 }
861
862 fmt.Printf(
863 "Burst completed: %d events in %v\n", burstSize,
864 time.Since(burstStart),
865 )
866
867 // Quiet period
868 time.Sleep(quietPeriod)
869 }
870
871 close(eventQueue)
872 wg.Wait()
873
874 duration := time.Since(start)
875
876 // Flush latency data to disk before calculating stats
877 if err := latencyRecorder.Close(); err != nil {
878 log.Printf("Failed to close latency recorder: %v", err)
879 }
880
881 // Calculate statistics from disk
882 latencyStats, err := latencyRecorder.CalculateStats()
883 if err != nil {
884 log.Printf("Failed to calculate latency stats: %v", err)
885 latencyStats = &LatencyStats{}
886 }
887
888 // Calculate metrics
889 result := &BenchmarkResult{
890 TestName: "Burst Pattern",
891 Duration: duration,
892 TotalEvents: int(totalEvents),
893 EventsPerSecond: float64(totalEvents) / duration.Seconds(),
894 ConcurrentWorkers: b.config.ConcurrentWorkers,
895 MemoryUsed: getMemUsage(),
896 AvgLatency: latencyStats.Avg,
897 P90Latency: latencyStats.P90,
898 P95Latency: latencyStats.P95,
899 P99Latency: latencyStats.P99,
900 Bottom10Avg: latencyStats.Bottom10,
901 }
902
903 result.SuccessRate = float64(totalEvents) / float64(eventIndex) * 100
904
905 b.mu.Lock()
906 b.results = append(b.results, result)
907 b.mu.Unlock()
908
909 fmt.Printf(
910 "Burst test completed: %d events in %v, errors: %d\n",
911 totalEvents, duration, errorCount,
912 )
913 fmt.Printf("Events/sec: %.2f\n", result.EventsPerSecond)
914 }
915
916 func (b *Benchmark) RunMixedReadWriteTest() {
917 fmt.Println("\n=== Mixed Read/Write Test ===")
918
919 start := time.Now()
920 var totalWrites, totalReads int64
921 var writeLatencies, readLatencies []time.Duration
922 var errors []error
923 var mu sync.Mutex
924
925 // Pre-populate with some events for reading
926 seedEvents := b.generateEvents(1000)
927 ctx := context.Background()
928
929 fmt.Println("Pre-populating database for read tests...")
930 for _, ev := range seedEvents {
931 b.db.SaveEvent(ctx, ev)
932 }
933
934 events := b.generateEvents(b.config.NumEvents)
935 var wg sync.WaitGroup
936
937 // Calculate per-worker rate to avoid mutex contention
938 perWorkerRate := 20000.0 / float64(b.config.ConcurrentWorkers)
939
940 // Start mixed read/write workers
941 for i := 0; i < b.config.ConcurrentWorkers; i++ {
942 wg.Add(1)
943 go func(workerID int) {
944 defer wg.Done()
945
946 // Each worker gets its own rate limiter
947 workerLimiter := NewRateLimiter(perWorkerRate)
948
949 eventIndex := workerID
950 for time.Since(start) < b.config.TestDuration && eventIndex < len(events) {
951 // Alternate between write and read operations
952 if eventIndex%2 == 0 {
953 // Write operation - apply rate limiting
954 workerLimiter.Wait()
955
956 writeStart := time.Now()
957 _, err := b.db.SaveEvent(ctx, events[eventIndex])
958 writeLatency := time.Since(writeStart)
959
960 mu.Lock()
961 if err != nil {
962 errors = append(errors, err)
963 } else {
964 totalWrites++
965 writeLatencies = append(writeLatencies, writeLatency)
966 }
967 mu.Unlock()
968 } else {
969 // Read operation
970 readStart := time.Now()
971 f := filter.New()
972 f.Kinds = kind.NewS(kind.TextNote)
973 limit := uint(10)
974 f.Limit = &limit
975 _, err := b.db.GetSerialsFromFilter(f)
976 readLatency := time.Since(readStart)
977
978 mu.Lock()
979 if err != nil {
980 errors = append(errors, err)
981 } else {
982 totalReads++
983 readLatencies = append(readLatencies, readLatency)
984 }
985 mu.Unlock()
986 }
987
988 eventIndex += b.config.ConcurrentWorkers
989 time.Sleep(10 * time.Millisecond) // Small delay between operations
990 }
991 }(i)
992 }
993
994 wg.Wait()
995 duration := time.Since(start)
996
997 // Calculate metrics
998 result := &BenchmarkResult{
999 TestName: "Mixed Read/Write",
1000 Duration: duration,
1001 TotalEvents: int(totalWrites + totalReads),
1002 EventsPerSecond: float64(totalWrites+totalReads) / duration.Seconds(),
1003 ConcurrentWorkers: b.config.ConcurrentWorkers,
1004 MemoryUsed: getMemUsage(),
1005 }
1006
1007 // Calculate combined latencies for overall metrics
1008 allLatencies := append(writeLatencies, readLatencies...)
1009 if len(allLatencies) > 0 {
1010 result.AvgLatency = calculateAvgLatency(allLatencies)
1011 result.P90Latency = calculatePercentileLatency(allLatencies, 0.90)
1012 result.P95Latency = calculatePercentileLatency(allLatencies, 0.95)
1013 result.P99Latency = calculatePercentileLatency(allLatencies, 0.99)
1014 result.Bottom10Avg = calculateBottom10Avg(allLatencies)
1015 }
1016
1017 result.SuccessRate = float64(totalWrites+totalReads) / float64(len(events)) * 100
1018
1019 for _, err := range errors {
1020 result.Errors = append(result.Errors, err.Error())
1021 }
1022
1023 b.mu.Lock()
1024 b.results = append(b.results, result)
1025 b.mu.Unlock()
1026
1027 fmt.Printf(
1028 "Mixed test completed: %d writes, %d reads in %v\n", totalWrites,
1029 totalReads, duration,
1030 )
1031 fmt.Printf("Combined ops/sec: %.2f\n", result.EventsPerSecond)
1032 }
1033
1034 // RunQueryTest specifically benchmarks the QueryEvents function performance
1035 func (b *Benchmark) RunQueryTest() {
1036 fmt.Println("\n=== Query Test ===")
1037
1038 start := time.Now()
1039 var totalQueries int64
1040 var queryLatencies []time.Duration
1041 var errors []error
1042 var mu sync.Mutex
1043
1044 // Pre-populate with events for querying
1045 numSeedEvents := 10000
1046 seedEvents := b.generateEvents(numSeedEvents)
1047 ctx := context.Background()
1048
1049 fmt.Printf(
1050 "Pre-populating database with %d events for query tests...\n",
1051 numSeedEvents,
1052 )
1053 for _, ev := range seedEvents {
1054 b.db.SaveEvent(ctx, ev)
1055 }
1056
1057 // Create different types of filters for querying
1058 filters := []*filter.F{
1059 func() *filter.F { // Kind filter
1060 f := filter.New()
1061 f.Kinds = kind.NewS(kind.TextNote)
1062 limit := uint(100)
1063 f.Limit = &limit
1064 return f
1065 }(),
1066 func() *filter.F { // Tag filter
1067 f := filter.New()
1068 f.Tags = tag.NewS(
1069 tag.NewFromBytesSlice(
1070 []byte("t"), []byte("benchmark"),
1071 ),
1072 )
1073 limit := uint(100)
1074 f.Limit = &limit
1075 return f
1076 }(),
1077 func() *filter.F { // Mixed filter
1078 f := filter.New()
1079 f.Kinds = kind.NewS(kind.TextNote)
1080 f.Tags = tag.NewS(
1081 tag.NewFromBytesSlice(
1082 []byte("t"), []byte("benchmark"),
1083 ),
1084 )
1085 limit := uint(50)
1086 f.Limit = &limit
1087 return f
1088 }(),
1089 }
1090
1091 var wg sync.WaitGroup
1092 // Start query workers
1093 for i := 0; i < b.config.ConcurrentWorkers; i++ {
1094 wg.Add(1)
1095 go func(workerID int) {
1096 defer wg.Done()
1097
1098 filterIndex := workerID % len(filters)
1099 queryCount := 0
1100
1101 for time.Since(start) < b.config.TestDuration {
1102 // Rotate through different filters
1103 f := filters[filterIndex]
1104 filterIndex = (filterIndex + 1) % len(filters)
1105
1106 // Execute query
1107 queryStart := time.Now()
1108 events, err := b.db.QueryEvents(ctx, f)
1109 queryLatency := time.Since(queryStart)
1110
1111 mu.Lock()
1112 if err != nil {
1113 errors = append(errors, err)
1114 } else {
1115 totalQueries++
1116 queryLatencies = append(queryLatencies, queryLatency)
1117
1118 // Free event memory
1119 for _, ev := range events {
1120 ev.Free()
1121 }
1122 }
1123 mu.Unlock()
1124
1125 queryCount++
1126 // Always add delay to prevent CPU saturation (queries are CPU-intensive)
1127 time.Sleep(1 * time.Millisecond)
1128 }
1129 }(i)
1130 }
1131
1132 wg.Wait()
1133 duration := time.Since(start)
1134
1135 // Calculate metrics
1136 result := &BenchmarkResult{
1137 TestName: "Query Performance",
1138 Duration: duration,
1139 TotalEvents: int(totalQueries),
1140 EventsPerSecond: float64(totalQueries) / duration.Seconds(),
1141 ConcurrentWorkers: b.config.ConcurrentWorkers,
1142 MemoryUsed: getMemUsage(),
1143 }
1144
1145 if len(queryLatencies) > 0 {
1146 result.AvgLatency = calculateAvgLatency(queryLatencies)
1147 result.P90Latency = calculatePercentileLatency(queryLatencies, 0.90)
1148 result.P95Latency = calculatePercentileLatency(queryLatencies, 0.95)
1149 result.P99Latency = calculatePercentileLatency(queryLatencies, 0.99)
1150 result.Bottom10Avg = calculateBottom10Avg(queryLatencies)
1151 }
1152
1153 result.SuccessRate = 100.0 // No specific target count for queries
1154
1155 for _, err := range errors {
1156 result.Errors = append(result.Errors, err.Error())
1157 }
1158
1159 b.mu.Lock()
1160 b.results = append(b.results, result)
1161 b.mu.Unlock()
1162
1163 fmt.Printf(
1164 "Query test completed: %d queries in %v\n", totalQueries, duration,
1165 )
1166 fmt.Printf("Queries/sec: %.2f\n", result.EventsPerSecond)
1167 fmt.Printf("Avg query latency: %v\n", result.AvgLatency)
1168 fmt.Printf("P95 query latency: %v\n", result.P95Latency)
1169 fmt.Printf("P99 query latency: %v\n", result.P99Latency)
1170 }
1171
1172 // RunConcurrentQueryStoreTest benchmarks the performance of concurrent query and store operations
1173 func (b *Benchmark) RunConcurrentQueryStoreTest() {
1174 fmt.Println("\n=== Concurrent Query/Store Test ===")
1175
1176 start := time.Now()
1177 var totalQueries, totalWrites int64
1178 var queryLatencies, writeLatencies []time.Duration
1179 var errors []error
1180 var mu sync.Mutex
1181
1182 // Pre-populate with some events
1183 numSeedEvents := 5000
1184 seedEvents := b.generateEvents(numSeedEvents)
1185 ctx := context.Background()
1186
1187 fmt.Printf(
1188 "Pre-populating database with %d events for concurrent query/store test...\n",
1189 numSeedEvents,
1190 )
1191 for _, ev := range seedEvents {
1192 b.db.SaveEvent(ctx, ev)
1193 }
1194
1195 // Generate events for writing during the test
1196 writeEvents := b.generateEvents(b.config.NumEvents)
1197
1198 // Create filters for querying
1199 filters := []*filter.F{
1200 func() *filter.F { // Recent events filter
1201 f := filter.New()
1202 f.Since = timestamp.FromUnix(time.Now().Add(-10 * time.Minute).Unix())
1203 limit := uint(100)
1204 f.Limit = &limit
1205 return f
1206 }(),
1207 func() *filter.F { // Kind and tag filter
1208 f := filter.New()
1209 f.Kinds = kind.NewS(kind.TextNote)
1210 f.Tags = tag.NewS(
1211 tag.NewFromBytesSlice(
1212 []byte("t"), []byte("benchmark"),
1213 ),
1214 )
1215 limit := uint(50)
1216 f.Limit = &limit
1217 return f
1218 }(),
1219 }
1220
1221 var wg sync.WaitGroup
1222
1223 // Half of the workers will be readers, half will be writers
1224 numReaders := b.config.ConcurrentWorkers / 2
1225 numWriters := b.config.ConcurrentWorkers - numReaders
1226
1227 // Calculate per-worker write rate to avoid mutex contention
1228 perWorkerRate := 20000.0 / float64(numWriters)
1229
1230 // Start query workers (readers)
1231 for i := 0; i < numReaders; i++ {
1232 wg.Add(1)
1233 go func(workerID int) {
1234 defer wg.Done()
1235
1236 filterIndex := workerID % len(filters)
1237 queryCount := 0
1238
1239 for time.Since(start) < b.config.TestDuration {
1240 // Select a filter
1241 f := filters[filterIndex]
1242 filterIndex = (filterIndex + 1) % len(filters)
1243
1244 // Execute query
1245 queryStart := time.Now()
1246 events, err := b.db.QueryEvents(ctx, f)
1247 queryLatency := time.Since(queryStart)
1248
1249 mu.Lock()
1250 if err != nil {
1251 errors = append(errors, err)
1252 } else {
1253 totalQueries++
1254 queryLatencies = append(queryLatencies, queryLatency)
1255
1256 // Free event memory
1257 for _, ev := range events {
1258 ev.Free()
1259 }
1260 }
1261 mu.Unlock()
1262
1263 queryCount++
1264 // Always add delay to prevent CPU saturation (queries are CPU-intensive)
1265 time.Sleep(1 * time.Millisecond)
1266 }
1267 }(i)
1268 }
1269
1270 // Start write workers
1271 for i := 0; i < numWriters; i++ {
1272 wg.Add(1)
1273 go func(workerID int) {
1274 defer wg.Done()
1275
1276 // Each worker gets its own rate limiter
1277 workerLimiter := NewRateLimiter(perWorkerRate)
1278
1279 eventIndex := workerID
1280 writeCount := 0
1281
1282 for time.Since(start) < b.config.TestDuration && eventIndex < len(writeEvents) {
1283 // Write operation - apply rate limiting
1284 workerLimiter.Wait()
1285
1286 writeStart := time.Now()
1287 _, err := b.db.SaveEvent(ctx, writeEvents[eventIndex])
1288 writeLatency := time.Since(writeStart)
1289
1290 mu.Lock()
1291 if err != nil {
1292 errors = append(errors, err)
1293 } else {
1294 totalWrites++
1295 writeLatencies = append(writeLatencies, writeLatency)
1296 }
1297 mu.Unlock()
1298
1299 eventIndex += numWriters
1300 writeCount++
1301 }
1302 }(i)
1303 }
1304
1305 wg.Wait()
1306 duration := time.Since(start)
1307
1308 // Calculate metrics
1309 totalOps := totalQueries + totalWrites
1310 result := &BenchmarkResult{
1311 TestName: "Concurrent Query/Store",
1312 Duration: duration,
1313 TotalEvents: int(totalOps),
1314 EventsPerSecond: float64(totalOps) / duration.Seconds(),
1315 ConcurrentWorkers: b.config.ConcurrentWorkers,
1316 MemoryUsed: getMemUsage(),
1317 }
1318
1319 // Calculate combined latencies for overall metrics
1320 allLatencies := append(queryLatencies, writeLatencies...)
1321 if len(allLatencies) > 0 {
1322 result.AvgLatency = calculateAvgLatency(allLatencies)
1323 result.P90Latency = calculatePercentileLatency(allLatencies, 0.90)
1324 result.P95Latency = calculatePercentileLatency(allLatencies, 0.95)
1325 result.P99Latency = calculatePercentileLatency(allLatencies, 0.99)
1326 result.Bottom10Avg = calculateBottom10Avg(allLatencies)
1327 }
1328
1329 result.SuccessRate = 100.0 // No specific target
1330
1331 for _, err := range errors {
1332 result.Errors = append(result.Errors, err.Error())
1333 }
1334
1335 b.mu.Lock()
1336 b.results = append(b.results, result)
1337 b.mu.Unlock()
1338
1339 // Calculate separate metrics for queries and writes
1340 var queryAvg, writeAvg time.Duration
1341 if len(queryLatencies) > 0 {
1342 queryAvg = calculateAvgLatency(queryLatencies)
1343 }
1344 if len(writeLatencies) > 0 {
1345 writeAvg = calculateAvgLatency(writeLatencies)
1346 }
1347
1348 fmt.Printf(
1349 "Concurrent test completed: %d operations (%d queries, %d writes) in %v\n",
1350 totalOps, totalQueries, totalWrites, duration,
1351 )
1352 fmt.Printf("Operations/sec: %.2f\n", result.EventsPerSecond)
1353 fmt.Printf("Avg latency: %v\n", result.AvgLatency)
1354 fmt.Printf("Avg query latency: %v\n", queryAvg)
1355 fmt.Printf("Avg write latency: %v\n", writeAvg)
1356 fmt.Printf("P95 latency: %v\n", result.P95Latency)
1357 fmt.Printf("P99 latency: %v\n", result.P99Latency)
1358 }
1359
1360 func (b *Benchmark) generateEvents(count int) []*event.E {
1361 fmt.Printf("Generating %d unique synthetic events (minimum 300 bytes each)...\n", count)
1362
1363 // Create a single signer for all events (reusing key is faster)
1364 signer := p8k.MustNew()
1365 if err := signer.Generate(); err != nil {
1366 log.Fatalf("Failed to generate keypair: %v", err)
1367 }
1368
1369 // Base timestamp - start from current time and increment
1370 baseTime := time.Now().Unix()
1371
1372 // Minimum content size
1373 const minContentSize = 300
1374
1375 // Base content template
1376 baseContent := "This is a benchmark test event with realistic content size. "
1377
1378 // Pre-calculate how much padding we need
1379 paddingNeeded := minContentSize - len(baseContent)
1380 if paddingNeeded < 0 {
1381 paddingNeeded = 0
1382 }
1383
1384 // Create padding string (with varied characters for realistic size)
1385 padding := make([]byte, paddingNeeded)
1386 for i := range padding {
1387 padding[i] = ' ' + byte(i%94) // Printable ASCII characters
1388 }
1389
1390 events := make([]*event.E, count)
1391 for i := 0; i < count; i++ {
1392 ev := event.New()
1393 ev.Kind = kind.TextNote.K
1394 ev.CreatedAt = baseTime + int64(i) // Unique timestamp for each event
1395 ev.Tags = tag.NewS()
1396
1397 // Create content with unique identifier and padding
1398 ev.Content = []byte(fmt.Sprintf("%s Event #%d. %s", baseContent, i, string(padding)))
1399
1400 // Sign the event (this calculates ID and Sig)
1401 if err := ev.Sign(signer); err != nil {
1402 log.Fatalf("Failed to sign event %d: %v", i, err)
1403 }
1404
1405 events[i] = ev
1406 }
1407
1408 // Print stats
1409 totalSize := int64(0)
1410 for _, ev := range events {
1411 totalSize += int64(len(ev.Content))
1412 }
1413 avgSize := totalSize / int64(count)
1414
1415 fmt.Printf("Generated %d events:\n", count)
1416 fmt.Printf(" Average content size: %d bytes\n", avgSize)
1417 fmt.Printf(" All events are unique (incremental timestamps)\n")
1418 fmt.Printf(" All events are properly signed\n\n")
1419
1420 return events
1421 }
1422
1423 // printEventStats prints statistics about the loaded real-world events
1424 func (b *Benchmark) printEventStats() {
1425 if len(b.cachedEvents) == 0 {
1426 return
1427 }
1428
1429 // Analyze event distribution
1430 kindCounts := make(map[uint16]int)
1431 var totalSize int64
1432
1433 for _, ev := range b.cachedEvents {
1434 kindCounts[ev.Kind]++
1435 totalSize += int64(len(ev.Content))
1436 }
1437
1438 avgSize := totalSize / int64(len(b.cachedEvents))
1439
1440 fmt.Printf("\nEvent Statistics:\n")
1441 fmt.Printf(" Total events: %d\n", len(b.cachedEvents))
1442 fmt.Printf(" Average content size: %d bytes\n", avgSize)
1443 fmt.Printf(" Event kinds found: %d unique\n", len(kindCounts))
1444 fmt.Printf(" Most common kinds:\n")
1445
1446 // Print top 5 kinds
1447 type kindCount struct {
1448 kind uint16
1449 count int
1450 }
1451 var counts []kindCount
1452 for k, c := range kindCounts {
1453 counts = append(counts, kindCount{k, c})
1454 }
1455 sort.Slice(counts, func(i, j int) bool {
1456 return counts[i].count > counts[j].count
1457 })
1458 for i := 0; i < min(5, len(counts)); i++ {
1459 fmt.Printf(" Kind %d: %d events\n", counts[i].kind, counts[i].count)
1460 }
1461 fmt.Println()
1462 }
1463
1464 // loadRealEvents loads events from embedded examples.Cache on first call
1465 func (b *Benchmark) loadRealEvents() {
1466 b.eventCacheMu.Lock()
1467 defer b.eventCacheMu.Unlock()
1468
1469 // Only load once
1470 if len(b.cachedEvents) > 0 {
1471 return
1472 }
1473
1474 fmt.Println("Loading real-world sample events (11,596 events from 6 months of Nostr)...")
1475 scanner := bufio.NewScanner(bytes.NewReader(examples.Cache))
1476
1477 buf := make([]byte, 0, 64*1024)
1478 scanner.Buffer(buf, 1024*1024)
1479
1480 for scanner.Scan() {
1481 var ev event.E
1482 if err := json.Unmarshal(scanner.Bytes(), &ev); err != nil {
1483 fmt.Printf("Warning: failed to unmarshal event: %v\n", err)
1484 continue
1485 }
1486 b.cachedEvents = append(b.cachedEvents, &ev)
1487 }
1488
1489 if err := scanner.Err(); err != nil {
1490 log.Fatalf("Failed to read events: %v", err)
1491 }
1492
1493 fmt.Printf("Loaded %d real-world events (already signed, zero crypto overhead)\n", len(b.cachedEvents))
1494 b.printEventStats()
1495 }
1496
1497 // getEventChannel returns a channel that streams unique synthetic events
1498 // bufferSize controls memory usage - larger buffers improve throughput but use more memory
1499 func (b *Benchmark) getEventChannel(count int, bufferSize int) (<-chan *event.E, <-chan error) {
1500 eventChan := make(chan *event.E, bufferSize)
1501 errChan := make(chan error, 1)
1502
1503 go func() {
1504 defer close(eventChan)
1505 defer close(errChan)
1506
1507 // Create a single signer for all events
1508 signer := p8k.MustNew()
1509 if err := signer.Generate(); err != nil {
1510 errChan <- fmt.Errorf("failed to generate keypair: %w", err)
1511 return
1512 }
1513
1514 // Base timestamp - start from current time and increment
1515 baseTime := time.Now().Unix()
1516
1517 // Minimum content size
1518 const minContentSize = 300
1519
1520 // Base content template
1521 baseContent := "This is a benchmark test event with realistic content size. "
1522
1523 // Pre-calculate padding
1524 paddingNeeded := minContentSize - len(baseContent)
1525 if paddingNeeded < 0 {
1526 paddingNeeded = 0
1527 }
1528
1529 // Create padding string (with varied characters for realistic size)
1530 padding := make([]byte, paddingNeeded)
1531 for i := range padding {
1532 padding[i] = ' ' + byte(i%94) // Printable ASCII characters
1533 }
1534
1535 // Stream unique events
1536 for i := 0; i < count; i++ {
1537 ev := event.New()
1538 ev.Kind = kind.TextNote.K
1539 ev.CreatedAt = baseTime + int64(i) // Unique timestamp for each event
1540 ev.Tags = tag.NewS()
1541
1542 // Create content with unique identifier and padding
1543 ev.Content = []byte(fmt.Sprintf("%s Event #%d. %s", baseContent, i, string(padding)))
1544
1545 // Sign the event (this calculates ID and Sig)
1546 if err := ev.Sign(signer); err != nil {
1547 errChan <- fmt.Errorf("failed to sign event %d: %w", i, err)
1548 return
1549 }
1550
1551 eventChan <- ev
1552 }
1553 }()
1554
1555 return eventChan, errChan
1556 }
1557
// formatSize renders a byte count as a short human-readable string.
//
// Zero is reported as "Empty (0 bytes)". Values below 1024 are shown in
// bytes, below 1024^2 in whole KB, below 1024^3 in whole MB (both truncated),
// and everything else in GB with two decimals.
func formatSize(bytes int) string {
	const (
		kb = 1024
		mb = 1024 * kb
		gb = 1024 * mb
	)

	switch {
	case bytes == 0:
		return "Empty (0 bytes)"
	case bytes < kb:
		return fmt.Sprintf("%d bytes", bytes)
	case bytes < mb:
		return fmt.Sprintf("%d KB", bytes/kb)
	case bytes < gb:
		return fmt.Sprintf("%d MB", bytes/mb)
	default:
		return fmt.Sprintf("%.2f GB", float64(bytes)/gb)
	}
}
1574
// min reports the smaller of a and b.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
1582
// max reports the larger of a and b.
func max(a, b int) int {
	if b > a {
		return b
	}
	return a
}
1590
1591 func (b *Benchmark) GenerateReport() {
1592 fmt.Println("\n" + strings.Repeat("=", 80))
1593 fmt.Println("BENCHMARK REPORT")
1594 fmt.Println(strings.Repeat("=", 80))
1595
1596 b.mu.RLock()
1597 defer b.mu.RUnlock()
1598
1599 for _, result := range b.results {
1600 fmt.Printf("\nTest: %s\n", result.TestName)
1601 fmt.Printf("Duration: %v\n", result.Duration)
1602 fmt.Printf("Total Events: %d\n", result.TotalEvents)
1603 fmt.Printf("Events/sec: %.2f\n", result.EventsPerSecond)
1604 fmt.Printf("Success Rate: %.1f%%\n", result.SuccessRate)
1605 fmt.Printf("Concurrent Workers: %d\n", result.ConcurrentWorkers)
1606 fmt.Printf("Memory Used: %d MB\n", result.MemoryUsed/(1024*1024))
1607 fmt.Printf("Avg Latency: %v\n", result.AvgLatency)
1608 fmt.Printf("P90 Latency: %v\n", result.P90Latency)
1609 fmt.Printf("P95 Latency: %v\n", result.P95Latency)
1610 fmt.Printf("P99 Latency: %v\n", result.P99Latency)
1611 fmt.Printf("Bottom 10%% Avg Latency: %v\n", result.Bottom10Avg)
1612
1613 if len(result.Errors) > 0 {
1614 fmt.Printf("Errors (%d):\n", len(result.Errors))
1615 for i, err := range result.Errors {
1616 if i < 5 { // Show first 5 errors
1617 fmt.Printf(" - %s\n", err)
1618 }
1619 }
1620 if len(result.Errors) > 5 {
1621 fmt.Printf(" ... and %d more errors\n", len(result.Errors)-5)
1622 }
1623 }
1624 fmt.Println(strings.Repeat("-", 40))
1625 }
1626
1627 // Save report to file
1628 reportPath := filepath.Join(b.config.DataDir, "benchmark_report.txt")
1629 b.saveReportToFile(reportPath)
1630 fmt.Printf("\nReport saved to: %s\n", reportPath)
1631 }
1632
1633 func (b *Benchmark) saveReportToFile(path string) error {
1634 file, err := os.Create(path)
1635 if err != nil {
1636 return err
1637 }
1638 defer file.Close()
1639
1640 file.WriteString("NOSTR RELAY BENCHMARK REPORT\n")
1641 file.WriteString("============================\n\n")
1642 file.WriteString(
1643 fmt.Sprintf(
1644 "Generated: %s\n", time.Now().Format(time.RFC3339),
1645 ),
1646 )
1647 file.WriteString(fmt.Sprintf("Relay: next.orly.dev\n"))
1648 file.WriteString(fmt.Sprintf("Database: BadgerDB\n"))
1649 file.WriteString(fmt.Sprintf("Workers: %d\n", b.config.ConcurrentWorkers))
1650 file.WriteString(
1651 fmt.Sprintf(
1652 "Test Duration: %v\n\n", b.config.TestDuration,
1653 ),
1654 )
1655
1656 b.mu.RLock()
1657 defer b.mu.RUnlock()
1658
1659 for _, result := range b.results {
1660 file.WriteString(fmt.Sprintf("Test: %s\n", result.TestName))
1661 file.WriteString(fmt.Sprintf("Duration: %v\n", result.Duration))
1662 file.WriteString(fmt.Sprintf("Events: %d\n", result.TotalEvents))
1663 file.WriteString(
1664 fmt.Sprintf(
1665 "Events/sec: %.2f\n", result.EventsPerSecond,
1666 ),
1667 )
1668 file.WriteString(
1669 fmt.Sprintf(
1670 "Success Rate: %.1f%%\n", result.SuccessRate,
1671 ),
1672 )
1673 file.WriteString(fmt.Sprintf("Avg Latency: %v\n", result.AvgLatency))
1674 file.WriteString(fmt.Sprintf("P90 Latency: %v\n", result.P90Latency))
1675 file.WriteString(fmt.Sprintf("P95 Latency: %v\n", result.P95Latency))
1676 file.WriteString(fmt.Sprintf("P99 Latency: %v\n", result.P99Latency))
1677 file.WriteString(
1678 fmt.Sprintf(
1679 "Bottom 10%% Avg Latency: %v\n", result.Bottom10Avg,
1680 ),
1681 )
1682 file.WriteString(
1683 fmt.Sprintf(
1684 "Memory: %d MB\n", result.MemoryUsed/(1024*1024),
1685 ),
1686 )
1687 file.WriteString("\n")
1688 }
1689
1690 return nil
1691 }
1692
1693 // GenerateAsciidocReport creates a simple AsciiDoc report alongside the text report.
1694 func (b *Benchmark) GenerateAsciidocReport() error {
1695 path := filepath.Join(b.config.DataDir, "benchmark_report.adoc")
1696 file, err := os.Create(path)
1697 if err != nil {
1698 return err
1699 }
1700 defer file.Close()
1701
1702 file.WriteString("= NOSTR Relay Benchmark Results\n\n")
1703 file.WriteString(
1704 fmt.Sprintf(
1705 "Generated: %s\n\n", time.Now().Format(time.RFC3339),
1706 ),
1707 )
1708 file.WriteString("[cols=\"1,^1,^1,^1,^1,^1\",options=\"header\"]\n")
1709 file.WriteString("|===\n")
1710 file.WriteString("| Test | Events/sec | Avg Latency | P90 | P95 | Bottom 10% Avg\n")
1711
1712 b.mu.RLock()
1713 defer b.mu.RUnlock()
1714 for _, r := range b.results {
1715 file.WriteString(fmt.Sprintf("| %s\n", r.TestName))
1716 file.WriteString(fmt.Sprintf("| %.2f\n", r.EventsPerSecond))
1717 file.WriteString(fmt.Sprintf("| %v\n", r.AvgLatency))
1718 file.WriteString(fmt.Sprintf("| %v\n", r.P90Latency))
1719 file.WriteString(fmt.Sprintf("| %v\n", r.P95Latency))
1720 file.WriteString(fmt.Sprintf("| %v\n", r.Bottom10Avg))
1721 }
1722 file.WriteString("|===\n")
1723
1724 fmt.Printf("AsciiDoc report saved to: %s\n", path)
1725 return nil
1726 }
1727
1728 // Helper functions
1729
// calculateAvgLatency returns the arithmetic mean of latencies, truncated by
// integer division, or 0 for an empty slice.
func calculateAvgLatency(latencies []time.Duration) time.Duration {
	n := len(latencies)
	if n == 0 {
		return 0
	}

	sum := time.Duration(0)
	for idx := 0; idx < n; idx++ {
		sum += latencies[idx]
	}
	return sum / time.Duration(n)
}
1741
// calculatePercentileLatency returns the latency found at the given
// percentile (a fraction such as 0.95) of the ascending-sorted samples.
//
// The index is int(float64(len-1) * percentile), clamped into the valid
// range, so no interpolation between samples takes place. The input slice is
// left untouched: sorting happens on a copy. An empty slice yields 0.
func calculatePercentileLatency(
	latencies []time.Duration, percentile float64,
) time.Duration {
	n := len(latencies)
	if n == 0 {
		return 0
	}

	// Work on a private copy so the caller's ordering is preserved.
	sorted := append(make([]time.Duration, 0, n), latencies...)
	sort.Slice(sorted, func(a, b int) bool { return sorted[a] < sorted[b] })

	rank := int(float64(n-1) * percentile)
	switch {
	case rank < 0:
		rank = 0
	case rank >= n:
		rank = n - 1
	}
	return sorted[rank]
}
1763
// calculateBottom10Avg averages the slowest tenth of the samples: the
// latencies are sorted ascending on a copy, and the mean is taken over the
// elements from index int(len * 0.9) to the end. At least one sample is
// always included, so a single-element input returns that element. An empty
// slice yields 0.
func calculateBottom10Avg(latencies []time.Duration) time.Duration {
	n := len(latencies)
	if n == 0 {
		return 0
	}

	// Sort a private copy; the caller's slice keeps its order.
	sorted := append(make([]time.Duration, 0, n), latencies...)
	sort.Slice(sorted, func(a, b int) bool { return sorted[a] < sorted[b] })

	// First index of the slow tail, kept inside the slice bounds.
	cut := int(float64(n) * 0.9)
	if cut < 0 {
		cut = 0
	} else if cut >= n {
		cut = n - 1
	}

	tail := sorted[cut:]
	if len(tail) == 0 {
		return 0
	}

	var sum time.Duration
	for _, d := range tail {
		sum += d
	}
	return sum / time.Duration(len(tail))
}
1791
// getMemUsage returns the Alloc field of the runtime's current memory
// statistics, as reported by runtime.ReadMemStats.
func getMemUsage() uint64 {
	stats := new(runtime.MemStats)
	runtime.ReadMemStats(stats)
	return stats.Alloc
}
1797