benchmark_adapter.go raw

   1  package main
   2  
   3  import (
   4  	"context"
   5  	"fmt"
   6  	"sort"
   7  	"sync"
   8  	"time"
   9  
  10  	"next.orly.dev/pkg/database"
  11  	"next.orly.dev/pkg/nostr/encoders/event"
  12  	"next.orly.dev/pkg/nostr/encoders/filter"
  13  	"next.orly.dev/pkg/nostr/encoders/kind"
  14  	"next.orly.dev/pkg/nostr/encoders/tag"
  15  	"next.orly.dev/pkg/nostr/encoders/timestamp"
  16  	"next.orly.dev/pkg/nostr/interfaces/signer/p8k"
  17  )
  18  
  19  // BenchmarkAdapter adapts a database.Database interface to work with benchmark tests
  20  type BenchmarkAdapter struct {
  21  	config       *BenchmarkConfig
  22  	db           database.Database
  23  	results      []*BenchmarkResult
  24  	mu           sync.RWMutex
  25  	cachedEvents []*event.E // Cache generated events to avoid expensive re-generation
  26  	eventCacheMu sync.Mutex
  27  }
  28  
  29  // NewBenchmarkAdapter creates a new benchmark adapter
  30  func NewBenchmarkAdapter(config *BenchmarkConfig, db database.Database) *BenchmarkAdapter {
  31  	return &BenchmarkAdapter{
  32  		config:  config,
  33  		db:      db,
  34  		results: make([]*BenchmarkResult, 0),
  35  	}
  36  }
  37  
  38  // RunPeakThroughputTest runs the peak throughput benchmark
  39  func (ba *BenchmarkAdapter) RunPeakThroughputTest() {
  40  	fmt.Println("\n=== Peak Throughput Test ===")
  41  
  42  	start := time.Now()
  43  	var wg sync.WaitGroup
  44  	var totalEvents int64
  45  	var errors []error
  46  	var latencies []time.Duration
  47  	var mu sync.Mutex
  48  
  49  	events := ba.generateEvents(ba.config.NumEvents)
  50  	eventChan := make(chan *event.E, len(events))
  51  
  52  	// Fill event channel
  53  	for _, ev := range events {
  54  		eventChan <- ev
  55  	}
  56  	close(eventChan)
  57  
  58  	// Calculate per-worker rate to avoid mutex contention
  59  	perWorkerRate := 20000.0 / float64(ba.config.ConcurrentWorkers)
  60  
  61  	for i := 0; i < ba.config.ConcurrentWorkers; i++ {
  62  		wg.Add(1)
  63  		go func(workerID int) {
  64  			defer wg.Done()
  65  
  66  			// Each worker gets its own rate limiter
  67  			workerLimiter := NewRateLimiter(perWorkerRate)
  68  
  69  			ctx := context.Background()
  70  			for ev := range eventChan {
  71  				// Wait for rate limiter to allow this event
  72  				workerLimiter.Wait()
  73  
  74  				eventStart := time.Now()
  75  				_, err := ba.db.SaveEvent(ctx, ev)
  76  				latency := time.Since(eventStart)
  77  
  78  				mu.Lock()
  79  				if err != nil {
  80  					errors = append(errors, err)
  81  				} else {
  82  					totalEvents++
  83  					latencies = append(latencies, latency)
  84  				}
  85  				mu.Unlock()
  86  			}
  87  		}(i)
  88  	}
  89  
  90  	wg.Wait()
  91  	duration := time.Since(start)
  92  
  93  	// Calculate metrics
  94  	result := &BenchmarkResult{
  95  		TestName:          "Peak Throughput",
  96  		Duration:          duration,
  97  		TotalEvents:       int(totalEvents),
  98  		EventsPerSecond:   float64(totalEvents) / duration.Seconds(),
  99  		ConcurrentWorkers: ba.config.ConcurrentWorkers,
 100  		MemoryUsed:        getMemUsage(),
 101  	}
 102  
 103  	if len(latencies) > 0 {
 104  		sort.Slice(latencies, func(i, j int) bool {
 105  			return latencies[i] < latencies[j]
 106  		})
 107  		result.AvgLatency = calculateAverage(latencies)
 108  		result.P90Latency = latencies[int(float64(len(latencies))*0.90)]
 109  		result.P95Latency = latencies[int(float64(len(latencies))*0.95)]
 110  		result.P99Latency = latencies[int(float64(len(latencies))*0.99)]
 111  
 112  		bottom10 := latencies[:int(float64(len(latencies))*0.10)]
 113  		result.Bottom10Avg = calculateAverage(bottom10)
 114  	}
 115  
 116  	result.SuccessRate = float64(totalEvents) / float64(ba.config.NumEvents) * 100
 117  	if len(errors) > 0 {
 118  		result.Errors = make([]string, 0, len(errors))
 119  		for _, err := range errors {
 120  			result.Errors = append(result.Errors, err.Error())
 121  		}
 122  	}
 123  
 124  	ba.mu.Lock()
 125  	ba.results = append(ba.results, result)
 126  	ba.mu.Unlock()
 127  
 128  	ba.printResult(result)
 129  }
 130  
 131  // RunBurstPatternTest runs burst pattern test
 132  func (ba *BenchmarkAdapter) RunBurstPatternTest() {
 133  	fmt.Println("\n=== Burst Pattern Test ===")
 134  
 135  	start := time.Now()
 136  	var totalEvents int64
 137  	var latencies []time.Duration
 138  	var mu sync.Mutex
 139  
 140  	ctx := context.Background()
 141  	burstSize := 100
 142  	bursts := ba.config.NumEvents / burstSize
 143  
 144  	// Create rate limiter: cap at 20,000 events/second globally
 145  	rateLimiter := NewRateLimiter(20000)
 146  
 147  	for i := 0; i < bursts; i++ {
 148  		// Generate a burst of events
 149  		events := ba.generateEvents(burstSize)
 150  
 151  		var wg sync.WaitGroup
 152  		for _, ev := range events {
 153  			wg.Add(1)
 154  			go func(e *event.E) {
 155  				defer wg.Done()
 156  
 157  				// Wait for rate limiter to allow this event
 158  				rateLimiter.Wait()
 159  
 160  				eventStart := time.Now()
 161  				_, err := ba.db.SaveEvent(ctx, e)
 162  				latency := time.Since(eventStart)
 163  
 164  				mu.Lock()
 165  				if err == nil {
 166  					totalEvents++
 167  					latencies = append(latencies, latency)
 168  				}
 169  				mu.Unlock()
 170  			}(ev)
 171  		}
 172  
 173  		wg.Wait()
 174  
 175  		// Short pause between bursts
 176  		time.Sleep(10 * time.Millisecond)
 177  	}
 178  
 179  	duration := time.Since(start)
 180  
 181  	result := &BenchmarkResult{
 182  		TestName:          "Burst Pattern",
 183  		Duration:          duration,
 184  		TotalEvents:       int(totalEvents),
 185  		EventsPerSecond:   float64(totalEvents) / duration.Seconds(),
 186  		ConcurrentWorkers: burstSize,
 187  		MemoryUsed:        getMemUsage(),
 188  		SuccessRate:       float64(totalEvents) / float64(ba.config.NumEvents) * 100,
 189  	}
 190  
 191  	if len(latencies) > 0 {
 192  		sort.Slice(latencies, func(i, j int) bool {
 193  			return latencies[i] < latencies[j]
 194  		})
 195  		result.AvgLatency = calculateAverage(latencies)
 196  		result.P90Latency = latencies[int(float64(len(latencies))*0.90)]
 197  		result.P95Latency = latencies[int(float64(len(latencies))*0.95)]
 198  		result.P99Latency = latencies[int(float64(len(latencies))*0.99)]
 199  
 200  		bottom10 := latencies[:int(float64(len(latencies))*0.10)]
 201  		result.Bottom10Avg = calculateAverage(bottom10)
 202  	}
 203  
 204  	ba.mu.Lock()
 205  	ba.results = append(ba.results, result)
 206  	ba.mu.Unlock()
 207  
 208  	ba.printResult(result)
 209  }
 210  
 211  // RunMixedReadWriteTest runs mixed read/write test
 212  func (ba *BenchmarkAdapter) RunMixedReadWriteTest() {
 213  	fmt.Println("\n=== Mixed Read/Write Test ===")
 214  
 215  	// First, populate some events
 216  	fmt.Println("Populating database with initial events...")
 217  	populateEvents := ba.generateEvents(1000)
 218  	ctx := context.Background()
 219  
 220  	for _, ev := range populateEvents {
 221  		ba.db.SaveEvent(ctx, ev)
 222  	}
 223  
 224  	start := time.Now()
 225  	var writeCount, readCount int64
 226  	var latencies []time.Duration
 227  	var mu sync.Mutex
 228  	var wg sync.WaitGroup
 229  
 230  	// Create rate limiter for writes: cap at 20,000 events/second
 231  	rateLimiter := NewRateLimiter(20000)
 232  
 233  	// Start workers doing mixed read/write
 234  	for i := 0; i < ba.config.ConcurrentWorkers; i++ {
 235  		wg.Add(1)
 236  		go func(workerID int) {
 237  			defer wg.Done()
 238  
 239  			events := ba.generateEvents(ba.config.NumEvents / ba.config.ConcurrentWorkers)
 240  
 241  			for idx, ev := range events {
 242  				eventStart := time.Now()
 243  
 244  				if idx%3 == 0 {
 245  					// Read operation
 246  					f := filter.New()
 247  					f.Kinds = kind.NewS(kind.TextNote)
 248  					limit := uint(10)
 249  					f.Limit = &limit
 250  					_, _ = ba.db.QueryEvents(ctx, f)
 251  
 252  					mu.Lock()
 253  					readCount++
 254  					mu.Unlock()
 255  				} else {
 256  					// Write operation - apply rate limiting
 257  					rateLimiter.Wait()
 258  					_, _ = ba.db.SaveEvent(ctx, ev)
 259  
 260  					mu.Lock()
 261  					writeCount++
 262  					mu.Unlock()
 263  				}
 264  
 265  				latency := time.Since(eventStart)
 266  				mu.Lock()
 267  				latencies = append(latencies, latency)
 268  				mu.Unlock()
 269  			}
 270  		}(i)
 271  	}
 272  
 273  	wg.Wait()
 274  	duration := time.Since(start)
 275  
 276  	result := &BenchmarkResult{
 277  		TestName:          fmt.Sprintf("Mixed R/W (R:%d W:%d)", readCount, writeCount),
 278  		Duration:          duration,
 279  		TotalEvents:       int(writeCount + readCount),
 280  		EventsPerSecond:   float64(writeCount+readCount) / duration.Seconds(),
 281  		ConcurrentWorkers: ba.config.ConcurrentWorkers,
 282  		MemoryUsed:        getMemUsage(),
 283  		SuccessRate:       100.0,
 284  	}
 285  
 286  	if len(latencies) > 0 {
 287  		sort.Slice(latencies, func(i, j int) bool {
 288  			return latencies[i] < latencies[j]
 289  		})
 290  		result.AvgLatency = calculateAverage(latencies)
 291  		result.P90Latency = latencies[int(float64(len(latencies))*0.90)]
 292  		result.P95Latency = latencies[int(float64(len(latencies))*0.95)]
 293  		result.P99Latency = latencies[int(float64(len(latencies))*0.99)]
 294  
 295  		bottom10 := latencies[:int(float64(len(latencies))*0.10)]
 296  		result.Bottom10Avg = calculateAverage(bottom10)
 297  	}
 298  
 299  	ba.mu.Lock()
 300  	ba.results = append(ba.results, result)
 301  	ba.mu.Unlock()
 302  
 303  	ba.printResult(result)
 304  }
 305  
 306  // RunQueryTest runs query performance test
 307  func (ba *BenchmarkAdapter) RunQueryTest() {
 308  	fmt.Println("\n=== Query Performance Test ===")
 309  
 310  	// Populate with test data
 311  	fmt.Println("Populating database for query tests...")
 312  	events := ba.generateEvents(5000)
 313  	ctx := context.Background()
 314  
 315  	for _, ev := range events {
 316  		ba.db.SaveEvent(ctx, ev)
 317  	}
 318  
 319  	start := time.Now()
 320  	var queryCount int64
 321  	var latencies []time.Duration
 322  	var mu sync.Mutex
 323  	var wg sync.WaitGroup
 324  
 325  	queryTypes := []func() *filter.F{
 326  		func() *filter.F {
 327  			f := filter.New()
 328  			f.Kinds = kind.NewS(kind.TextNote)
 329  			limit := uint(100)
 330  			f.Limit = &limit
 331  			return f
 332  		},
 333  		func() *filter.F {
 334  			f := filter.New()
 335  			f.Kinds = kind.NewS(kind.TextNote, kind.Repost)
 336  			limit := uint(50)
 337  			f.Limit = &limit
 338  			return f
 339  		},
 340  		func() *filter.F {
 341  			f := filter.New()
 342  			limit := uint(10)
 343  			f.Limit = &limit
 344  			since := time.Now().Add(-1 * time.Hour).Unix()
 345  			f.Since = timestamp.FromUnix(since)
 346  			return f
 347  		},
 348  	}
 349  
 350  	// Run concurrent queries
 351  	iterations := 1000
 352  	for i := 0; i < ba.config.ConcurrentWorkers; i++ {
 353  		wg.Add(1)
 354  		go func() {
 355  			defer wg.Done()
 356  
 357  			for j := 0; j < iterations/ba.config.ConcurrentWorkers; j++ {
 358  				f := queryTypes[j%len(queryTypes)]()
 359  
 360  				queryStart := time.Now()
 361  				_, _ = ba.db.QueryEvents(ctx, f)
 362  				latency := time.Since(queryStart)
 363  
 364  				mu.Lock()
 365  				queryCount++
 366  				latencies = append(latencies, latency)
 367  				mu.Unlock()
 368  			}
 369  		}()
 370  	}
 371  
 372  	wg.Wait()
 373  	duration := time.Since(start)
 374  
 375  	result := &BenchmarkResult{
 376  		TestName:          fmt.Sprintf("Query Performance (%d queries)", queryCount),
 377  		Duration:          duration,
 378  		TotalEvents:       int(queryCount),
 379  		EventsPerSecond:   float64(queryCount) / duration.Seconds(),
 380  		ConcurrentWorkers: ba.config.ConcurrentWorkers,
 381  		MemoryUsed:        getMemUsage(),
 382  		SuccessRate:       100.0,
 383  	}
 384  
 385  	if len(latencies) > 0 {
 386  		sort.Slice(latencies, func(i, j int) bool {
 387  			return latencies[i] < latencies[j]
 388  		})
 389  		result.AvgLatency = calculateAverage(latencies)
 390  		result.P90Latency = latencies[int(float64(len(latencies))*0.90)]
 391  		result.P95Latency = latencies[int(float64(len(latencies))*0.95)]
 392  		result.P99Latency = latencies[int(float64(len(latencies))*0.99)]
 393  
 394  		bottom10 := latencies[:int(float64(len(latencies))*0.10)]
 395  		result.Bottom10Avg = calculateAverage(bottom10)
 396  	}
 397  
 398  	ba.mu.Lock()
 399  	ba.results = append(ba.results, result)
 400  	ba.mu.Unlock()
 401  
 402  	ba.printResult(result)
 403  }
 404  
 405  // RunConcurrentQueryStoreTest runs concurrent query and store test
 406  func (ba *BenchmarkAdapter) RunConcurrentQueryStoreTest() {
 407  	fmt.Println("\n=== Concurrent Query+Store Test ===")
 408  
 409  	start := time.Now()
 410  	var storeCount, queryCount int64
 411  	var latencies []time.Duration
 412  	var mu sync.Mutex
 413  	var wg sync.WaitGroup
 414  
 415  	ctx := context.Background()
 416  
 417  	// Half workers write, half query
 418  	halfWorkers := ba.config.ConcurrentWorkers / 2
 419  	if halfWorkers < 1 {
 420  		halfWorkers = 1
 421  	}
 422  
 423  	// Create rate limiter for writes: cap at 20,000 events/second
 424  	rateLimiter := NewRateLimiter(20000)
 425  
 426  	// Writers
 427  	for i := 0; i < halfWorkers; i++ {
 428  		wg.Add(1)
 429  		go func() {
 430  			defer wg.Done()
 431  
 432  			events := ba.generateEvents(ba.config.NumEvents / halfWorkers)
 433  			for _, ev := range events {
 434  				// Wait for rate limiter to allow this event
 435  				rateLimiter.Wait()
 436  
 437  				eventStart := time.Now()
 438  				ba.db.SaveEvent(ctx, ev)
 439  				latency := time.Since(eventStart)
 440  
 441  				mu.Lock()
 442  				storeCount++
 443  				latencies = append(latencies, latency)
 444  				mu.Unlock()
 445  			}
 446  		}()
 447  	}
 448  
 449  	// Readers
 450  	for i := 0; i < halfWorkers; i++ {
 451  		wg.Add(1)
 452  		go func() {
 453  			defer wg.Done()
 454  
 455  			for j := 0; j < ba.config.NumEvents/halfWorkers; j++ {
 456  				f := filter.New()
 457  				f.Kinds = kind.NewS(kind.TextNote)
 458  				limit := uint(10)
 459  				f.Limit = &limit
 460  
 461  				queryStart := time.Now()
 462  				ba.db.QueryEvents(ctx, f)
 463  				latency := time.Since(queryStart)
 464  
 465  				mu.Lock()
 466  				queryCount++
 467  				latencies = append(latencies, latency)
 468  				mu.Unlock()
 469  
 470  				time.Sleep(1 * time.Millisecond)
 471  			}
 472  		}()
 473  	}
 474  
 475  	wg.Wait()
 476  	duration := time.Since(start)
 477  
 478  	result := &BenchmarkResult{
 479  		TestName:          fmt.Sprintf("Concurrent Q+S (Q:%d S:%d)", queryCount, storeCount),
 480  		Duration:          duration,
 481  		TotalEvents:       int(storeCount + queryCount),
 482  		EventsPerSecond:   float64(storeCount+queryCount) / duration.Seconds(),
 483  		ConcurrentWorkers: ba.config.ConcurrentWorkers,
 484  		MemoryUsed:        getMemUsage(),
 485  		SuccessRate:       100.0,
 486  	}
 487  
 488  	if len(latencies) > 0 {
 489  		sort.Slice(latencies, func(i, j int) bool {
 490  			return latencies[i] < latencies[j]
 491  		})
 492  		result.AvgLatency = calculateAverage(latencies)
 493  		result.P90Latency = latencies[int(float64(len(latencies))*0.90)]
 494  		result.P95Latency = latencies[int(float64(len(latencies))*0.95)]
 495  		result.P99Latency = latencies[int(float64(len(latencies))*0.99)]
 496  
 497  		bottom10 := latencies[:int(float64(len(latencies))*0.10)]
 498  		result.Bottom10Avg = calculateAverage(bottom10)
 499  	}
 500  
 501  	ba.mu.Lock()
 502  	ba.results = append(ba.results, result)
 503  	ba.mu.Unlock()
 504  
 505  	ba.printResult(result)
 506  }
 507  
 508  // generateEvents generates unique synthetic events with realistic content sizes
 509  func (ba *BenchmarkAdapter) generateEvents(count int) []*event.E {
 510  	fmt.Printf("Generating %d unique synthetic events (minimum 300 bytes each)...\n", count)
 511  
 512  	// Create a single signer for all events (reusing key is faster)
 513  	signer := p8k.MustNew()
 514  	if err := signer.Generate(); err != nil {
 515  		panic(fmt.Sprintf("Failed to generate keypair: %v", err))
 516  	}
 517  
 518  	// Base timestamp - start from current time and increment
 519  	baseTime := time.Now().Unix()
 520  
 521  	// Minimum content size
 522  	const minContentSize = 300
 523  
 524  	// Base content template
 525  	baseContent := "This is a benchmark test event with realistic content size. "
 526  
 527  	// Pre-calculate how much padding we need
 528  	paddingNeeded := minContentSize - len(baseContent)
 529  	if paddingNeeded < 0 {
 530  		paddingNeeded = 0
 531  	}
 532  
 533  	// Create padding string (with varied characters for realistic size)
 534  	padding := make([]byte, paddingNeeded)
 535  	for i := range padding {
 536  		padding[i] = ' ' + byte(i%94) // Printable ASCII characters
 537  	}
 538  
 539  	events := make([]*event.E, count)
 540  	for i := 0; i < count; i++ {
 541  		ev := event.New()
 542  		ev.Kind = kind.TextNote.K
 543  		ev.CreatedAt = baseTime + int64(i) // Unique timestamp for each event
 544  		ev.Tags = tag.NewS()
 545  
 546  		// Create content with unique identifier and padding
 547  		ev.Content = []byte(fmt.Sprintf("%s Event #%d. %s", baseContent, i, string(padding)))
 548  
 549  		// Sign the event (this calculates ID and Sig)
 550  		if err := ev.Sign(signer); err != nil {
 551  			panic(fmt.Sprintf("Failed to sign event %d: %v", i, err))
 552  		}
 553  
 554  		events[i] = ev
 555  	}
 556  
 557  	// Print stats
 558  	totalSize := int64(0)
 559  	for _, ev := range events {
 560  		totalSize += int64(len(ev.Content))
 561  	}
 562  	avgSize := totalSize / int64(count)
 563  
 564  	fmt.Printf("Generated %d events:\n", count)
 565  	fmt.Printf("  Average content size: %d bytes\n", avgSize)
 566  	fmt.Printf("  All events are unique (incremental timestamps)\n")
 567  	fmt.Printf("  All events are properly signed\n\n")
 568  
 569  	return events
 570  }
 571  
 572  func (ba *BenchmarkAdapter) printResult(r *BenchmarkResult) {
 573  	fmt.Printf("\nResults for %s:\n", r.TestName)
 574  	fmt.Printf("  Duration: %v\n", r.Duration)
 575  	fmt.Printf("  Total Events: %d\n", r.TotalEvents)
 576  	fmt.Printf("  Events/sec: %.2f\n", r.EventsPerSecond)
 577  	fmt.Printf("  Success Rate: %.2f%%\n", r.SuccessRate)
 578  	fmt.Printf("  Workers: %d\n", r.ConcurrentWorkers)
 579  	fmt.Printf("  Memory Used: %.2f MB\n", float64(r.MemoryUsed)/1024/1024)
 580  
 581  	if r.AvgLatency > 0 {
 582  		fmt.Printf("  Avg Latency: %v\n", r.AvgLatency)
 583  		fmt.Printf("  P90 Latency: %v\n", r.P90Latency)
 584  		fmt.Printf("  P95 Latency: %v\n", r.P95Latency)
 585  		fmt.Printf("  P99 Latency: %v\n", r.P99Latency)
 586  		fmt.Printf("  Bottom 10%% Avg: %v\n", r.Bottom10Avg)
 587  	}
 588  
 589  	if len(r.Errors) > 0 {
 590  		fmt.Printf("  Errors: %d\n", len(r.Errors))
 591  		// Print first few errors as samples
 592  		sampleCount := 3
 593  		if len(r.Errors) < sampleCount {
 594  			sampleCount = len(r.Errors)
 595  		}
 596  		for i := 0; i < sampleCount; i++ {
 597  			fmt.Printf("    Sample %d: %s\n", i+1, r.Errors[i])
 598  		}
 599  	}
 600  }
 601  
 602  func (ba *BenchmarkAdapter) GenerateReport() {
 603  	// Delegate to main benchmark report generator
 604  	// We'll add the results to a file
 605  	fmt.Println("\n=== Benchmark Results Summary ===")
 606  	ba.mu.RLock()
 607  	defer ba.mu.RUnlock()
 608  
 609  	for _, result := range ba.results {
 610  		ba.printResult(result)
 611  	}
 612  }
 613  
 614  func (ba *BenchmarkAdapter) GenerateAsciidocReport() {
 615  	// TODO: Implement asciidoc report generation
 616  	fmt.Println("Asciidoc report generation not yet implemented for adapter")
 617  }
 618  
 619  func calculateAverage(durations []time.Duration) time.Duration {
 620  	if len(durations) == 0 {
 621  		return 0
 622  	}
 623  
 624  	var total time.Duration
 625  	for _, d := range durations {
 626  		total += d
 627  	}
 628  	return total / time.Duration(len(durations))
 629  }
 630