// benchmark_adapter.go
1 package main
2
3 import (
4 "context"
5 "fmt"
6 "sort"
7 "sync"
8 "time"
9
10 "next.orly.dev/pkg/database"
11 "next.orly.dev/pkg/nostr/encoders/event"
12 "next.orly.dev/pkg/nostr/encoders/filter"
13 "next.orly.dev/pkg/nostr/encoders/kind"
14 "next.orly.dev/pkg/nostr/encoders/tag"
15 "next.orly.dev/pkg/nostr/encoders/timestamp"
16 "next.orly.dev/pkg/nostr/interfaces/signer/p8k"
17 )
18
// BenchmarkAdapter adapts a database.Database interface to work with benchmark tests
type BenchmarkAdapter struct {
	config       *BenchmarkConfig   // benchmark parameters (event counts, worker counts, ...)
	db           database.Database  // storage backend under test
	results      []*BenchmarkResult // accumulated per-test results; guarded by mu
	mu           sync.RWMutex       // protects results
	cachedEvents []*event.E         // Cache generated events to avoid expensive re-generation
	// eventCacheMu protects cachedEvents.
	// NOTE(review): cachedEvents/eventCacheMu are not referenced by
	// generateEvents in this file — confirm they are used elsewhere in the
	// package, or wire the cache up (or remove the fields).
	eventCacheMu sync.Mutex
}
28
29 // NewBenchmarkAdapter creates a new benchmark adapter
30 func NewBenchmarkAdapter(config *BenchmarkConfig, db database.Database) *BenchmarkAdapter {
31 return &BenchmarkAdapter{
32 config: config,
33 db: db,
34 results: make([]*BenchmarkResult, 0),
35 }
36 }
37
38 // RunPeakThroughputTest runs the peak throughput benchmark
39 func (ba *BenchmarkAdapter) RunPeakThroughputTest() {
40 fmt.Println("\n=== Peak Throughput Test ===")
41
42 start := time.Now()
43 var wg sync.WaitGroup
44 var totalEvents int64
45 var errors []error
46 var latencies []time.Duration
47 var mu sync.Mutex
48
49 events := ba.generateEvents(ba.config.NumEvents)
50 eventChan := make(chan *event.E, len(events))
51
52 // Fill event channel
53 for _, ev := range events {
54 eventChan <- ev
55 }
56 close(eventChan)
57
58 // Calculate per-worker rate to avoid mutex contention
59 perWorkerRate := 20000.0 / float64(ba.config.ConcurrentWorkers)
60
61 for i := 0; i < ba.config.ConcurrentWorkers; i++ {
62 wg.Add(1)
63 go func(workerID int) {
64 defer wg.Done()
65
66 // Each worker gets its own rate limiter
67 workerLimiter := NewRateLimiter(perWorkerRate)
68
69 ctx := context.Background()
70 for ev := range eventChan {
71 // Wait for rate limiter to allow this event
72 workerLimiter.Wait()
73
74 eventStart := time.Now()
75 _, err := ba.db.SaveEvent(ctx, ev)
76 latency := time.Since(eventStart)
77
78 mu.Lock()
79 if err != nil {
80 errors = append(errors, err)
81 } else {
82 totalEvents++
83 latencies = append(latencies, latency)
84 }
85 mu.Unlock()
86 }
87 }(i)
88 }
89
90 wg.Wait()
91 duration := time.Since(start)
92
93 // Calculate metrics
94 result := &BenchmarkResult{
95 TestName: "Peak Throughput",
96 Duration: duration,
97 TotalEvents: int(totalEvents),
98 EventsPerSecond: float64(totalEvents) / duration.Seconds(),
99 ConcurrentWorkers: ba.config.ConcurrentWorkers,
100 MemoryUsed: getMemUsage(),
101 }
102
103 if len(latencies) > 0 {
104 sort.Slice(latencies, func(i, j int) bool {
105 return latencies[i] < latencies[j]
106 })
107 result.AvgLatency = calculateAverage(latencies)
108 result.P90Latency = latencies[int(float64(len(latencies))*0.90)]
109 result.P95Latency = latencies[int(float64(len(latencies))*0.95)]
110 result.P99Latency = latencies[int(float64(len(latencies))*0.99)]
111
112 bottom10 := latencies[:int(float64(len(latencies))*0.10)]
113 result.Bottom10Avg = calculateAverage(bottom10)
114 }
115
116 result.SuccessRate = float64(totalEvents) / float64(ba.config.NumEvents) * 100
117 if len(errors) > 0 {
118 result.Errors = make([]string, 0, len(errors))
119 for _, err := range errors {
120 result.Errors = append(result.Errors, err.Error())
121 }
122 }
123
124 ba.mu.Lock()
125 ba.results = append(ba.results, result)
126 ba.mu.Unlock()
127
128 ba.printResult(result)
129 }
130
131 // RunBurstPatternTest runs burst pattern test
132 func (ba *BenchmarkAdapter) RunBurstPatternTest() {
133 fmt.Println("\n=== Burst Pattern Test ===")
134
135 start := time.Now()
136 var totalEvents int64
137 var latencies []time.Duration
138 var mu sync.Mutex
139
140 ctx := context.Background()
141 burstSize := 100
142 bursts := ba.config.NumEvents / burstSize
143
144 // Create rate limiter: cap at 20,000 events/second globally
145 rateLimiter := NewRateLimiter(20000)
146
147 for i := 0; i < bursts; i++ {
148 // Generate a burst of events
149 events := ba.generateEvents(burstSize)
150
151 var wg sync.WaitGroup
152 for _, ev := range events {
153 wg.Add(1)
154 go func(e *event.E) {
155 defer wg.Done()
156
157 // Wait for rate limiter to allow this event
158 rateLimiter.Wait()
159
160 eventStart := time.Now()
161 _, err := ba.db.SaveEvent(ctx, e)
162 latency := time.Since(eventStart)
163
164 mu.Lock()
165 if err == nil {
166 totalEvents++
167 latencies = append(latencies, latency)
168 }
169 mu.Unlock()
170 }(ev)
171 }
172
173 wg.Wait()
174
175 // Short pause between bursts
176 time.Sleep(10 * time.Millisecond)
177 }
178
179 duration := time.Since(start)
180
181 result := &BenchmarkResult{
182 TestName: "Burst Pattern",
183 Duration: duration,
184 TotalEvents: int(totalEvents),
185 EventsPerSecond: float64(totalEvents) / duration.Seconds(),
186 ConcurrentWorkers: burstSize,
187 MemoryUsed: getMemUsage(),
188 SuccessRate: float64(totalEvents) / float64(ba.config.NumEvents) * 100,
189 }
190
191 if len(latencies) > 0 {
192 sort.Slice(latencies, func(i, j int) bool {
193 return latencies[i] < latencies[j]
194 })
195 result.AvgLatency = calculateAverage(latencies)
196 result.P90Latency = latencies[int(float64(len(latencies))*0.90)]
197 result.P95Latency = latencies[int(float64(len(latencies))*0.95)]
198 result.P99Latency = latencies[int(float64(len(latencies))*0.99)]
199
200 bottom10 := latencies[:int(float64(len(latencies))*0.10)]
201 result.Bottom10Avg = calculateAverage(bottom10)
202 }
203
204 ba.mu.Lock()
205 ba.results = append(ba.results, result)
206 ba.mu.Unlock()
207
208 ba.printResult(result)
209 }
210
// RunMixedReadWriteTest runs a mixed read/write benchmark: after seeding the
// database with 1,000 events, each of ba.config.ConcurrentWorkers workers
// interleaves one QueryEvents call for every two SaveEvent calls (idx%3 == 0
// selects a read). Writes are globally capped at 20,000 events/second. The
// aggregated BenchmarkResult is appended to ba.results and printed.
func (ba *BenchmarkAdapter) RunMixedReadWriteTest() {
	fmt.Println("\n=== Mixed Read/Write Test ===")

	// First, populate some events so the read side has data to query.
	fmt.Println("Populating database with initial events...")
	populateEvents := ba.generateEvents(1000)
	ctx := context.Background()

	for _, ev := range populateEvents {
		// NOTE(review): SaveEvent errors are dropped during seeding —
		// presumably best-effort population; confirm this is intentional.
		ba.db.SaveEvent(ctx, ev)
	}

	start := time.Now()
	var writeCount, readCount int64
	var latencies []time.Duration
	var mu sync.Mutex
	var wg sync.WaitGroup

	// Create rate limiter for writes: cap at 20,000 events/second.
	rateLimiter := NewRateLimiter(20000)

	// Start workers doing mixed read/write.
	for i := 0; i < ba.config.ConcurrentWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()

			// Integer division: the total across workers can fall short of
			// ba.config.NumEvents by up to ConcurrentWorkers-1 events.
			events := ba.generateEvents(ba.config.NumEvents / ba.config.ConcurrentWorkers)

			for idx, ev := range events {
				eventStart := time.Now()

				if idx%3 == 0 {
					// Read operation: fetch up to 10 text-note events.
					f := filter.New()
					f.Kinds = kind.NewS(kind.TextNote)
					limit := uint(10)
					f.Limit = &limit
					// NOTE(review): query result and error are both ignored.
					_, _ = ba.db.QueryEvents(ctx, f)

					mu.Lock()
					readCount++
					mu.Unlock()
				} else {
					// Write operation - apply rate limiting.
					rateLimiter.Wait()
					_, _ = ba.db.SaveEvent(ctx, ev)

					mu.Lock()
					writeCount++
					mu.Unlock()
				}

				// Latency covers the whole iteration, including time spent
				// waiting on the rate limiter for writes.
				latency := time.Since(eventStart)
				mu.Lock()
				latencies = append(latencies, latency)
				mu.Unlock()
			}
		}(i)
	}

	wg.Wait()
	duration := time.Since(start)

	result := &BenchmarkResult{
		TestName:          fmt.Sprintf("Mixed R/W (R:%d W:%d)", readCount, writeCount),
		Duration:          duration,
		TotalEvents:       int(writeCount + readCount),
		EventsPerSecond:   float64(writeCount+readCount) / duration.Seconds(),
		ConcurrentWorkers: ba.config.ConcurrentWorkers,
		MemoryUsed:        getMemUsage(),
		// NOTE(review): hard-coded 100% — per-operation errors are not
		// tracked in this test.
		SuccessRate: 100.0,
	}

	if len(latencies) > 0 {
		// Sort ascending so percentiles can be read by index.
		sort.Slice(latencies, func(i, j int) bool {
			return latencies[i] < latencies[j]
		})
		result.AvgLatency = calculateAverage(latencies)
		result.P90Latency = latencies[int(float64(len(latencies))*0.90)]
		result.P95Latency = latencies[int(float64(len(latencies))*0.95)]
		result.P99Latency = latencies[int(float64(len(latencies))*0.99)]

		// "Bottom 10%" is the fastest tenth of the sorted samples.
		bottom10 := latencies[:int(float64(len(latencies))*0.10)]
		result.Bottom10Avg = calculateAverage(bottom10)
	}

	ba.mu.Lock()
	ba.results = append(ba.results, result)
	ba.mu.Unlock()

	ba.printResult(result)
}
305
// RunQueryTest runs the query performance benchmark: it seeds the database
// with 5,000 events, then has ba.config.ConcurrentWorkers goroutines issue
// queries drawn round-robin from three filter shapes (text notes limit 100;
// text notes + reposts limit 50; last hour limit 10), targeting roughly
// 1,000 queries in total. The result is appended to ba.results and printed.
func (ba *BenchmarkAdapter) RunQueryTest() {
	fmt.Println("\n=== Query Performance Test ===")

	// Populate with test data.
	fmt.Println("Populating database for query tests...")
	events := ba.generateEvents(5000)
	ctx := context.Background()

	for _, ev := range events {
		// NOTE(review): SaveEvent errors are dropped during seeding —
		// presumably best-effort population; confirm this is intentional.
		ba.db.SaveEvent(ctx, ev)
	}

	start := time.Now()
	var queryCount int64
	var latencies []time.Duration
	var mu sync.Mutex
	var wg sync.WaitGroup

	// Factories for the three query shapes; each call builds a fresh filter.
	queryTypes := []func() *filter.F{
		func() *filter.F {
			// Text notes, up to 100 results.
			f := filter.New()
			f.Kinds = kind.NewS(kind.TextNote)
			limit := uint(100)
			f.Limit = &limit
			return f
		},
		func() *filter.F {
			// Text notes and reposts, up to 50 results.
			f := filter.New()
			f.Kinds = kind.NewS(kind.TextNote, kind.Repost)
			limit := uint(50)
			f.Limit = &limit
			return f
		},
		func() *filter.F {
			// Any kind from the last hour, up to 10 results.
			f := filter.New()
			limit := uint(10)
			f.Limit = &limit
			since := time.Now().Add(-1 * time.Hour).Unix()
			f.Since = timestamp.FromUnix(since)
			return f
		},
	}

	// Run concurrent queries. Integer division: with more than 1000 workers
	// each runs zero iterations, and remainders are dropped, so the actual
	// total can be below `iterations`.
	iterations := 1000
	for i := 0; i < ba.config.ConcurrentWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for j := 0; j < iterations/ba.config.ConcurrentWorkers; j++ {
				f := queryTypes[j%len(queryTypes)]()

				queryStart := time.Now()
				// NOTE(review): query result and error are both ignored.
				_, _ = ba.db.QueryEvents(ctx, f)
				latency := time.Since(queryStart)

				mu.Lock()
				queryCount++
				latencies = append(latencies, latency)
				mu.Unlock()
			}
		}()
	}

	wg.Wait()
	duration := time.Since(start)

	result := &BenchmarkResult{
		TestName:          fmt.Sprintf("Query Performance (%d queries)", queryCount),
		Duration:          duration,
		TotalEvents:       int(queryCount),
		EventsPerSecond:   float64(queryCount) / duration.Seconds(),
		ConcurrentWorkers: ba.config.ConcurrentWorkers,
		MemoryUsed:        getMemUsage(),
		// NOTE(review): hard-coded 100% — query errors are not tracked.
		SuccessRate: 100.0,
	}

	if len(latencies) > 0 {
		// Sort ascending so percentiles can be read by index.
		sort.Slice(latencies, func(i, j int) bool {
			return latencies[i] < latencies[j]
		})
		result.AvgLatency = calculateAverage(latencies)
		result.P90Latency = latencies[int(float64(len(latencies))*0.90)]
		result.P95Latency = latencies[int(float64(len(latencies))*0.95)]
		result.P99Latency = latencies[int(float64(len(latencies))*0.99)]

		// "Bottom 10%" is the fastest tenth of the sorted samples.
		bottom10 := latencies[:int(float64(len(latencies))*0.10)]
		result.Bottom10Avg = calculateAverage(bottom10)
	}

	ba.mu.Lock()
	ba.results = append(ba.results, result)
	ba.mu.Unlock()

	ba.printResult(result)
}
404
// RunConcurrentQueryStoreTest runs a benchmark where writers and readers hit
// the database simultaneously: half the configured workers store events
// (rate-limited to 20,000 events/second in aggregate) while the other half
// repeatedly query the latest text notes, pausing 1ms between queries. The
// combined result is appended to ba.results and printed.
func (ba *BenchmarkAdapter) RunConcurrentQueryStoreTest() {
	fmt.Println("\n=== Concurrent Query+Store Test ===")

	start := time.Now()
	var storeCount, queryCount int64
	var latencies []time.Duration
	var mu sync.Mutex
	var wg sync.WaitGroup

	ctx := context.Background()

	// Half workers write, half query; with fewer than 2 workers both halves
	// still get one goroutine each.
	halfWorkers := ba.config.ConcurrentWorkers / 2
	if halfWorkers < 1 {
		halfWorkers = 1
	}

	// Create rate limiter for writes: cap at 20,000 events/second.
	rateLimiter := NewRateLimiter(20000)

	// Writers: each stores an equal share of NumEvents (integer division may
	// drop a remainder).
	for i := 0; i < halfWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			events := ba.generateEvents(ba.config.NumEvents / halfWorkers)
			for _, ev := range events {
				// Wait for rate limiter to allow this event.
				rateLimiter.Wait()

				eventStart := time.Now()
				// NOTE(review): SaveEvent errors are dropped; every attempt
				// counts toward storeCount regardless of outcome.
				ba.db.SaveEvent(ctx, ev)
				latency := time.Since(eventStart)

				mu.Lock()
				storeCount++
				latencies = append(latencies, latency)
				mu.Unlock()
			}
		}()
	}

	// Readers: repeatedly fetch the latest 10 text notes.
	for i := 0; i < halfWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for j := 0; j < ba.config.NumEvents/halfWorkers; j++ {
				f := filter.New()
				f.Kinds = kind.NewS(kind.TextNote)
				limit := uint(10)
				f.Limit = &limit

				queryStart := time.Now()
				// NOTE(review): query result and error are both ignored.
				ba.db.QueryEvents(ctx, f)
				latency := time.Since(queryStart)

				mu.Lock()
				queryCount++
				latencies = append(latencies, latency)
				mu.Unlock()

				// Throttle readers so writes are not completely starved.
				time.Sleep(1 * time.Millisecond)
			}
		}()
	}

	wg.Wait()
	duration := time.Since(start)

	result := &BenchmarkResult{
		TestName:          fmt.Sprintf("Concurrent Q+S (Q:%d S:%d)", queryCount, storeCount),
		Duration:          duration,
		TotalEvents:       int(storeCount + queryCount),
		EventsPerSecond:   float64(storeCount+queryCount) / duration.Seconds(),
		ConcurrentWorkers: ba.config.ConcurrentWorkers,
		MemoryUsed:        getMemUsage(),
		// NOTE(review): hard-coded 100% — errors are not tracked in this test.
		SuccessRate: 100.0,
	}

	if len(latencies) > 0 {
		// Sort ascending so percentiles can be read by index.
		sort.Slice(latencies, func(i, j int) bool {
			return latencies[i] < latencies[j]
		})
		result.AvgLatency = calculateAverage(latencies)
		result.P90Latency = latencies[int(float64(len(latencies))*0.90)]
		result.P95Latency = latencies[int(float64(len(latencies))*0.95)]
		result.P99Latency = latencies[int(float64(len(latencies))*0.99)]

		// "Bottom 10%" is the fastest tenth of the sorted samples.
		bottom10 := latencies[:int(float64(len(latencies))*0.10)]
		result.Bottom10Avg = calculateAverage(bottom10)
	}

	ba.mu.Lock()
	ba.results = append(ba.results, result)
	ba.mu.Unlock()

	ba.printResult(result)
}
507
508 // generateEvents generates unique synthetic events with realistic content sizes
509 func (ba *BenchmarkAdapter) generateEvents(count int) []*event.E {
510 fmt.Printf("Generating %d unique synthetic events (minimum 300 bytes each)...\n", count)
511
512 // Create a single signer for all events (reusing key is faster)
513 signer := p8k.MustNew()
514 if err := signer.Generate(); err != nil {
515 panic(fmt.Sprintf("Failed to generate keypair: %v", err))
516 }
517
518 // Base timestamp - start from current time and increment
519 baseTime := time.Now().Unix()
520
521 // Minimum content size
522 const minContentSize = 300
523
524 // Base content template
525 baseContent := "This is a benchmark test event with realistic content size. "
526
527 // Pre-calculate how much padding we need
528 paddingNeeded := minContentSize - len(baseContent)
529 if paddingNeeded < 0 {
530 paddingNeeded = 0
531 }
532
533 // Create padding string (with varied characters for realistic size)
534 padding := make([]byte, paddingNeeded)
535 for i := range padding {
536 padding[i] = ' ' + byte(i%94) // Printable ASCII characters
537 }
538
539 events := make([]*event.E, count)
540 for i := 0; i < count; i++ {
541 ev := event.New()
542 ev.Kind = kind.TextNote.K
543 ev.CreatedAt = baseTime + int64(i) // Unique timestamp for each event
544 ev.Tags = tag.NewS()
545
546 // Create content with unique identifier and padding
547 ev.Content = []byte(fmt.Sprintf("%s Event #%d. %s", baseContent, i, string(padding)))
548
549 // Sign the event (this calculates ID and Sig)
550 if err := ev.Sign(signer); err != nil {
551 panic(fmt.Sprintf("Failed to sign event %d: %v", i, err))
552 }
553
554 events[i] = ev
555 }
556
557 // Print stats
558 totalSize := int64(0)
559 for _, ev := range events {
560 totalSize += int64(len(ev.Content))
561 }
562 avgSize := totalSize / int64(count)
563
564 fmt.Printf("Generated %d events:\n", count)
565 fmt.Printf(" Average content size: %d bytes\n", avgSize)
566 fmt.Printf(" All events are unique (incremental timestamps)\n")
567 fmt.Printf(" All events are properly signed\n\n")
568
569 return events
570 }
571
572 func (ba *BenchmarkAdapter) printResult(r *BenchmarkResult) {
573 fmt.Printf("\nResults for %s:\n", r.TestName)
574 fmt.Printf(" Duration: %v\n", r.Duration)
575 fmt.Printf(" Total Events: %d\n", r.TotalEvents)
576 fmt.Printf(" Events/sec: %.2f\n", r.EventsPerSecond)
577 fmt.Printf(" Success Rate: %.2f%%\n", r.SuccessRate)
578 fmt.Printf(" Workers: %d\n", r.ConcurrentWorkers)
579 fmt.Printf(" Memory Used: %.2f MB\n", float64(r.MemoryUsed)/1024/1024)
580
581 if r.AvgLatency > 0 {
582 fmt.Printf(" Avg Latency: %v\n", r.AvgLatency)
583 fmt.Printf(" P90 Latency: %v\n", r.P90Latency)
584 fmt.Printf(" P95 Latency: %v\n", r.P95Latency)
585 fmt.Printf(" P99 Latency: %v\n", r.P99Latency)
586 fmt.Printf(" Bottom 10%% Avg: %v\n", r.Bottom10Avg)
587 }
588
589 if len(r.Errors) > 0 {
590 fmt.Printf(" Errors: %d\n", len(r.Errors))
591 // Print first few errors as samples
592 sampleCount := 3
593 if len(r.Errors) < sampleCount {
594 sampleCount = len(r.Errors)
595 }
596 for i := 0; i < sampleCount; i++ {
597 fmt.Printf(" Sample %d: %s\n", i+1, r.Errors[i])
598 }
599 }
600 }
601
602 func (ba *BenchmarkAdapter) GenerateReport() {
603 // Delegate to main benchmark report generator
604 // We'll add the results to a file
605 fmt.Println("\n=== Benchmark Results Summary ===")
606 ba.mu.RLock()
607 defer ba.mu.RUnlock()
608
609 for _, result := range ba.results {
610 ba.printResult(result)
611 }
612 }
613
// GenerateAsciidocReport is a placeholder for asciidoc-formatted report
// output; it currently only prints a notice and produces no report.
func (ba *BenchmarkAdapter) GenerateAsciidocReport() {
	// TODO: Implement asciidoc report generation
	fmt.Println("Asciidoc report generation not yet implemented for adapter")
}
618
619 func calculateAverage(durations []time.Duration) time.Duration {
620 if len(durations) == 0 {
621 return 0
622 }
623
624 var total time.Duration
625 for _, d := range durations {
626 total += d
627 }
628 return total / time.Duration(len(durations))
629 }
630