graph_traversal_benchmark.go raw

   1  package main
   2  
import (
	"context"
	"encoding/binary"
	"fmt"
	"sort"
	"sync"
	"time"

	"lukechampine.com/frand"

	"next.orly.dev/pkg/database"
	"next.orly.dev/pkg/nostr/encoders/event"
	"next.orly.dev/pkg/nostr/encoders/filter"
	"next.orly.dev/pkg/nostr/encoders/hex"
	"next.orly.dev/pkg/nostr/encoders/kind"
	"next.orly.dev/pkg/nostr/encoders/tag"
	"next.orly.dev/pkg/nostr/interfaces/signer/p8k"
)
  19  
// GraphBenchNumPubkeys is the number of pubkeys to generate for the graph
// benchmark. Overridable via the -graph-pubkeys flag; defaults to 10000.
// All derived arrays (pubkeys, signers, follows) are sized from this value.
var GraphBenchNumPubkeys = 10000
  23  
// Tunable parameters for follow-graph construction and traversal.
const (
	// GraphBenchMinFollows is the minimum number of follows per pubkey.
	GraphBenchMinFollows = 1
	// GraphBenchMaxFollows is the maximum number of follows per pubkey.
	GraphBenchMaxFollows = 1000
	// GraphBenchSeed is the deterministic seed for the frand PRNG
	// (fits in uint64).
	GraphBenchSeed uint64 = 0x4E6F737472 // ASCII bytes of "Nostr" packed into a uint64
	// GraphBenchTraversalDepth is the depth of graph traversal
	// (3 = third degree of follows).
	GraphBenchTraversalDepth = 3
)
  34  
// GraphTraversalBenchmark benchmarks graph traversal using NIP-01 style
// queries: kind 3 follow-list events are queried by author and the followed
// pubkeys are read back from their "p" tags.
type GraphTraversalBenchmark struct {
	config  *BenchmarkConfig   // shared benchmark configuration (worker count, etc.)
	db      *database.D        // database under test
	results []*BenchmarkResult // per-phase results; guarded by mu
	mu      sync.RWMutex       // protects results

	// Cached data for the benchmark
	pubkeys  [][]byte          // GraphBenchNumPubkeys pubkeys as 32-byte slices
	signers  []*p8k.Signer     // signers for each pubkey
	follows  [][]int           // follows[i] = list of indices that pubkey[i] follows
	rng      *frand.RNG        // deterministic PRNG
}
  48  
  49  // NewGraphTraversalBenchmark creates a new graph traversal benchmark
  50  func NewGraphTraversalBenchmark(config *BenchmarkConfig, db *database.D) *GraphTraversalBenchmark {
  51  	return &GraphTraversalBenchmark{
  52  		config:  config,
  53  		db:      db,
  54  		results: make([]*BenchmarkResult, 0),
  55  		rng:     frand.NewCustom(make([]byte, 32), 1024, 12), // ChaCha12 with seed buffer
  56  	}
  57  }
  58  
  59  // initializeDeterministicRNG initializes the PRNG with deterministic seed
  60  func (g *GraphTraversalBenchmark) initializeDeterministicRNG() {
  61  	// Create seed buffer from GraphBenchSeed (uint64 spread across 8 bytes)
  62  	seedBuf := make([]byte, 32)
  63  	seed := GraphBenchSeed
  64  	seedBuf[0] = byte(seed >> 56)
  65  	seedBuf[1] = byte(seed >> 48)
  66  	seedBuf[2] = byte(seed >> 40)
  67  	seedBuf[3] = byte(seed >> 32)
  68  	seedBuf[4] = byte(seed >> 24)
  69  	seedBuf[5] = byte(seed >> 16)
  70  	seedBuf[6] = byte(seed >> 8)
  71  	seedBuf[7] = byte(seed)
  72  	g.rng = frand.NewCustom(seedBuf, 1024, 12)
  73  }
  74  
  75  // generatePubkeys generates deterministic pubkeys using frand
  76  func (g *GraphTraversalBenchmark) generatePubkeys() {
  77  	fmt.Printf("Generating %d deterministic pubkeys...\n", GraphBenchNumPubkeys)
  78  	start := time.Now()
  79  
  80  	g.initializeDeterministicRNG()
  81  	g.pubkeys = make([][]byte, GraphBenchNumPubkeys)
  82  	g.signers = make([]*p8k.Signer, GraphBenchNumPubkeys)
  83  
  84  	for i := 0; i < GraphBenchNumPubkeys; i++ {
  85  		// Generate deterministic 32-byte secret key from PRNG
  86  		secretKey := make([]byte, 32)
  87  		g.rng.Read(secretKey)
  88  
  89  		// Create signer from secret key
  90  		signer := p8k.MustNew()
  91  		if err := signer.InitSec(secretKey); err != nil {
  92  			panic(fmt.Sprintf("failed to init signer %d: %v", i, err))
  93  		}
  94  
  95  		g.signers[i] = signer
  96  		g.pubkeys[i] = make([]byte, 32)
  97  		copy(g.pubkeys[i], signer.Pub())
  98  
  99  		if (i+1)%10000 == 0 {
 100  			fmt.Printf("  Generated %d/%d pubkeys...\n", i+1, GraphBenchNumPubkeys)
 101  		}
 102  	}
 103  
 104  	fmt.Printf("Generated %d pubkeys in %v\n", GraphBenchNumPubkeys, time.Since(start))
 105  }
 106  
 107  // generateFollowGraph generates the random follow graph with deterministic PRNG
 108  func (g *GraphTraversalBenchmark) generateFollowGraph() {
 109  	fmt.Printf("Generating follow graph (1-%d follows per pubkey)...\n", GraphBenchMaxFollows)
 110  	start := time.Now()
 111  
 112  	// Reset RNG to ensure deterministic follow graph
 113  	g.initializeDeterministicRNG()
 114  	// Skip the bytes used for pubkey generation
 115  	skipBuf := make([]byte, 32*GraphBenchNumPubkeys)
 116  	g.rng.Read(skipBuf)
 117  
 118  	g.follows = make([][]int, GraphBenchNumPubkeys)
 119  
 120  	totalFollows := 0
 121  	for i := 0; i < GraphBenchNumPubkeys; i++ {
 122  		// Determine number of follows for this pubkey (1 to 1000)
 123  		numFollows := int(g.rng.Uint64n(uint64(GraphBenchMaxFollows-GraphBenchMinFollows+1))) + GraphBenchMinFollows
 124  
 125  		// Generate random follow indices (excluding self)
 126  		followSet := make(map[int]struct{})
 127  		for len(followSet) < numFollows {
 128  			followIdx := int(g.rng.Uint64n(uint64(GraphBenchNumPubkeys)))
 129  			if followIdx != i {
 130  				followSet[followIdx] = struct{}{}
 131  			}
 132  		}
 133  
 134  		// Convert to slice
 135  		g.follows[i] = make([]int, 0, numFollows)
 136  		for idx := range followSet {
 137  			g.follows[i] = append(g.follows[i], idx)
 138  		}
 139  		totalFollows += numFollows
 140  
 141  		if (i+1)%10000 == 0 {
 142  			fmt.Printf("  Generated follow lists for %d/%d pubkeys...\n", i+1, GraphBenchNumPubkeys)
 143  		}
 144  	}
 145  
 146  	avgFollows := float64(totalFollows) / float64(GraphBenchNumPubkeys)
 147  	fmt.Printf("Generated follow graph in %v (avg %.1f follows/pubkey, total %d follows)\n",
 148  		time.Since(start), avgFollows, totalFollows)
 149  }
 150  
// createFollowListEvents creates kind 3 follow list events in the database.
// Signing is done sequentially (p256k1 uses a global SHA-256 context that is
// not goroutine-safe), then saving is parallelized across workers.
//
// Ownership: once an event is sent on eventChan, exactly one save worker
// owns it and releases it with ev.Free() after the save attempt. Events
// that fail to sign are freed by the producer. Latency stats cover
// successful saves only.
func (g *GraphTraversalBenchmark) createFollowListEvents() {
	fmt.Println("Creating follow list events in database...")
	start := time.Now()

	ctx := context.Background()
	baseTime := time.Now().Unix()

	// mu guards successCount, errorCount and latencies, which are written
	// concurrently by the save workers.
	var mu sync.Mutex
	var wg sync.WaitGroup
	var successCount, errorCount int64
	latencies := make([]time.Duration, 0, GraphBenchNumPubkeys)

	// Use worker pool for parallel database saves
	numWorkers := g.config.ConcurrentWorkers
	if numWorkers < 1 {
		numWorkers = 4
	}

	// Channel carries pre-signed events for parallel saving; the buffer lets
	// the sequential signer run ahead of the save workers.
	eventChan := make(chan *event.E, numWorkers*4)

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for ev := range eventChan {
				// Save to database, timing only the save call itself.
				eventStart := time.Now()
				_, err := g.db.SaveEvent(ctx, ev)
				latency := time.Since(eventStart)

				mu.Lock()
				if err != nil {
					errorCount++
				} else {
					successCount++
					latencies = append(latencies, latency)
				}
				mu.Unlock()

				// Worker owns the event once received; release it here.
				ev.Free()
			}
		}()
	}

	// Sign events sequentially (p256k1.TaggedHash uses a global hash.Hash
	// that panics under concurrent access) and feed to save workers.
	signErrors := 0
	for i := 0; i < GraphBenchNumPubkeys; i++ {
		ev := event.New()
		ev.Kind = kind.FollowList.K
		// Monotonically increasing timestamps give each event a distinct
		// created_at.
		ev.CreatedAt = baseTime + int64(i)
		ev.Content = []byte("")
		ev.Tags = tag.NewS()

		// Add p tags for all follows
		for _, followIdx := range g.follows[i] {
			pubkeyHex := hex.Enc(g.pubkeys[followIdx])
			ev.Tags.Append(tag.NewFromAny("p", pubkeyHex))
		}

		// Sign the event (must be sequential — p256k1 global state)
		if err := ev.Sign(g.signers[i]); err != nil {
			signErrors++
			ev.Free()
			continue
		}

		eventChan <- ev

		if (i+1)%10000 == 0 {
			fmt.Printf("  Signed and queued %d/%d follow list events...\n", i+1, GraphBenchNumPubkeys)
		}
	}
	// Closing the channel lets the workers drain and exit; wait for all
	// saves to finish before computing stats.
	close(eventChan)
	wg.Wait()

	duration := time.Since(start)
	eventsPerSec := float64(successCount) / duration.Seconds()

	// Calculate latency stats (sorted ascending for percentile lookups).
	var avgLatency, p95Latency, p99Latency time.Duration
	if len(latencies) > 0 {
		sort.Slice(latencies, func(i, j int) bool { return latencies[i] < latencies[j] })
		avgLatency = calculateAvgLatency(latencies)
		p95Latency = calculatePercentileLatency(latencies, 0.95)
		p99Latency = calculatePercentileLatency(latencies, 0.99)
	}

	fmt.Printf("Created %d follow list events in %v (%.2f events/sec, save errors: %d, sign errors: %d)\n",
		successCount, duration, eventsPerSec, errorCount, signErrors)
	fmt.Printf("  Avg latency: %v, P95: %v, P99: %v\n", avgLatency, p95Latency, p99Latency)

	// Record result for event creation phase
	result := &BenchmarkResult{
		TestName:          "Graph Setup (Follow Lists)",
		Duration:          duration,
		TotalEvents:       int(successCount),
		EventsPerSecond:   eventsPerSec,
		AvgLatency:        avgLatency,
		P95Latency:        p95Latency,
		P99Latency:        p99Latency,
		ConcurrentWorkers: numWorkers,
		MemoryUsed:        getMemUsage(),
		SuccessRate:       float64(successCount) / float64(GraphBenchNumPubkeys) * 100,
	}

	g.mu.Lock()
	g.results = append(g.results, result)
	g.mu.Unlock()
}
 266  
// runThirdDegreeTraversal runs the third-degree graph traversal benchmark.
// It BFS-expands the follow graph from a deterministic sample of start
// pubkeys, issuing batched kind-3 author queries per frontier level, and
// records both per-query and whole-traversal latency statistics.
func (g *GraphTraversalBenchmark) runThirdDegreeTraversal() {
	fmt.Printf("\n=== Third-Degree Graph Traversal Benchmark ===\n")
	fmt.Printf("Traversing 3 degrees of follows for each of %d pubkeys...\n", GraphBenchNumPubkeys)

	start := time.Now()
	ctx := context.Background()

	// mu guards totalQueries, totalPubkeysFound and both latency slices,
	// which are appended to concurrently by the traversal workers.
	var mu sync.Mutex
	var wg sync.WaitGroup
	var totalQueries int64
	var totalPubkeysFound int64
	queryLatencies := make([]time.Duration, 0, GraphBenchNumPubkeys*3)
	traversalLatencies := make([]time.Duration, 0, GraphBenchNumPubkeys)

	// Sample a subset for detailed traversal (traversing every pubkey would
	// take too long).
	sampleSize := 1000
	if sampleSize > GraphBenchNumPubkeys {
		sampleSize = GraphBenchNumPubkeys
	}

	// Deterministic sampling: re-seed the PRNG so the same start indices are
	// chosen every run. Indices may repeat (sampling with replacement).
	g.initializeDeterministicRNG()
	sampleIndices := make([]int, sampleSize)
	for i := 0; i < sampleSize; i++ {
		sampleIndices[i] = int(g.rng.Uint64n(uint64(GraphBenchNumPubkeys)))
	}

	fmt.Printf("Sampling %d pubkeys for traversal...\n", sampleSize)

	numWorkers := g.config.ConcurrentWorkers
	if numWorkers < 1 {
		numWorkers = 4
	}

	workChan := make(chan int, numWorkers*2)

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Each received index is one full 3-degree traversal.
			for startIdx := range workChan {
				traversalStart := time.Now()
				// Deduplicates pubkeys across all levels (hex-encoded keys).
				foundPubkeys := make(map[string]struct{})

				// Start with the initial pubkey as the first frontier.
				currentLevel := [][]byte{g.pubkeys[startIdx]}
				startPubkeyHex := hex.Enc(g.pubkeys[startIdx])
				foundPubkeys[startPubkeyHex] = struct{}{}

				// Traverse 3 degrees
				for depth := 0; depth < GraphBenchTraversalDepth; depth++ {
					if len(currentLevel) == 0 {
						break
					}

					nextLevel := make([][]byte, 0)

					// Query follow lists for all pubkeys at current level.
					// Batch queries for efficiency.
					batchSize := 100
					for batchStart := 0; batchStart < len(currentLevel); batchStart += batchSize {
						batchEnd := batchStart + batchSize
						if batchEnd > len(currentLevel) {
							batchEnd = len(currentLevel)
						}

						batch := currentLevel[batchStart:batchEnd]

						// Build filter for kind 3 events from these pubkeys
						f := filter.New()
						f.Kinds = kind.NewS(kind.FollowList)
						f.Authors = tag.NewWithCap(len(batch))
						for _, pk := range batch {
							// Authors.T expects raw byte slices (pubkeys) —
							// presumably the query layer matches raw 32-byte
							// keys; confirm against the filter package.
							f.Authors.T = append(f.Authors.T, pk)
						}

						queryStart := time.Now()
						events, err := g.db.QueryEvents(ctx, f)
						queryLatency := time.Since(queryStart)

						mu.Lock()
						totalQueries++
						queryLatencies = append(queryLatencies, queryLatency)
						mu.Unlock()

						// NOTE(review): query errors are silently skipped;
						// the latency of the failed attempt is still counted.
						if err != nil {
							continue
						}

						// Extract followed pubkeys from p tags
						for _, ev := range events {
							for _, t := range *ev.Tags {
								if len(t.T) >= 2 && string(t.T[0]) == "p" {
									pubkeyHex := string(t.ValueHex())
									if _, exists := foundPubkeys[pubkeyHex]; !exists {
										foundPubkeys[pubkeyHex] = struct{}{}
										// Decode hex to bytes for next level
										if pkBytes, err := hex.Dec(pubkeyHex); err == nil {
											nextLevel = append(nextLevel, pkBytes)
										}
									}
								}
							}
							// Done reading tags; release the event.
							ev.Free()
						}
					}

					currentLevel = nextLevel
				}

				traversalLatency := time.Since(traversalStart)

				mu.Lock()
				totalPubkeysFound += int64(len(foundPubkeys))
				traversalLatencies = append(traversalLatencies, traversalLatency)
				mu.Unlock()
			}
		}()
	}

	// Send work
	for _, idx := range sampleIndices {
		workChan <- idx
	}
	close(workChan)
	wg.Wait()

	duration := time.Since(start)

	// Calculate statistics (slices sorted ascending for percentile lookups).
	var avgQueryLatency, p95QueryLatency, p99QueryLatency time.Duration
	if len(queryLatencies) > 0 {
		sort.Slice(queryLatencies, func(i, j int) bool { return queryLatencies[i] < queryLatencies[j] })
		avgQueryLatency = calculateAvgLatency(queryLatencies)
		p95QueryLatency = calculatePercentileLatency(queryLatencies, 0.95)
		p99QueryLatency = calculatePercentileLatency(queryLatencies, 0.99)
	}

	var avgTraversalLatency, p95TraversalLatency, p99TraversalLatency time.Duration
	if len(traversalLatencies) > 0 {
		sort.Slice(traversalLatencies, func(i, j int) bool { return traversalLatencies[i] < traversalLatencies[j] })
		avgTraversalLatency = calculateAvgLatency(traversalLatencies)
		p95TraversalLatency = calculatePercentileLatency(traversalLatencies, 0.95)
		p99TraversalLatency = calculatePercentileLatency(traversalLatencies, 0.99)
	}

	avgPubkeysPerTraversal := float64(totalPubkeysFound) / float64(sampleSize)
	traversalsPerSec := float64(sampleSize) / duration.Seconds()
	queriesPerSec := float64(totalQueries) / duration.Seconds()

	fmt.Printf("\n=== Graph Traversal Results ===\n")
	fmt.Printf("Traversals completed: %d\n", sampleSize)
	fmt.Printf("Total queries: %d (%.2f queries/sec)\n", totalQueries, queriesPerSec)
	fmt.Printf("Avg pubkeys found per traversal: %.1f\n", avgPubkeysPerTraversal)
	fmt.Printf("Total duration: %v\n", duration)
	fmt.Printf("\nQuery Latencies:\n")
	fmt.Printf("  Avg: %v, P95: %v, P99: %v\n", avgQueryLatency, p95QueryLatency, p99QueryLatency)
	fmt.Printf("\nFull Traversal Latencies (3 degrees):\n")
	fmt.Printf("  Avg: %v, P95: %v, P99: %v\n", avgTraversalLatency, p95TraversalLatency, p99TraversalLatency)
	fmt.Printf("Traversals/sec: %.2f\n", traversalsPerSec)

	// Record result for traversal phase
	result := &BenchmarkResult{
		TestName:          "Graph Traversal (3 Degrees)",
		Duration:          duration,
		TotalEvents:       int(totalQueries),
		EventsPerSecond:   traversalsPerSec,
		AvgLatency:        avgTraversalLatency,
		P90Latency:        calculatePercentileLatency(traversalLatencies, 0.90),
		P95Latency:        p95TraversalLatency,
		P99Latency:        p99TraversalLatency,
		Bottom10Avg:       calculateBottom10Avg(traversalLatencies),
		ConcurrentWorkers: numWorkers,
		MemoryUsed:        getMemUsage(),
		SuccessRate:       100.0,
	}

	g.mu.Lock()
	g.results = append(g.results, result)
	g.mu.Unlock()

	// Also record query performance separately
	queryResult := &BenchmarkResult{
		TestName:          "Graph Queries (Follow Lists)",
		Duration:          duration,
		TotalEvents:       int(totalQueries),
		EventsPerSecond:   queriesPerSec,
		AvgLatency:        avgQueryLatency,
		P90Latency:        calculatePercentileLatency(queryLatencies, 0.90),
		P95Latency:        p95QueryLatency,
		P99Latency:        p99QueryLatency,
		Bottom10Avg:       calculateBottom10Avg(queryLatencies),
		ConcurrentWorkers: numWorkers,
		MemoryUsed:        getMemUsage(),
		SuccessRate:       100.0,
	}

	g.mu.Lock()
	g.results = append(g.results, queryResult)
	g.mu.Unlock()
}
 471  
 472  // SeedDatabase generates pubkeys, follow graph, and creates follow list events.
 473  // Call this before RunTraversal or PPG comparison to populate the database.
 474  func (g *GraphTraversalBenchmark) SeedDatabase() {
 475  	fmt.Println("\n╔════════════════════════════════════════════════════════╗")
 476  	fmt.Println("║      SEEDING GRAPH DATABASE                            ║")
 477  	fmt.Println("╚════════════════════════════════════════════════════════╝")
 478  
 479  	// Step 1: Generate pubkeys
 480  	g.generatePubkeys()
 481  
 482  	// Step 2: Generate follow graph
 483  	g.generateFollowGraph()
 484  
 485  	// Step 3: Create follow list events in database
 486  	g.createFollowListEvents()
 487  
 488  	fmt.Printf("\n=== Database Seeded ===\n\n")
 489  }
 490  
// RegeneratePubkeys regenerates the deterministic pubkey/signer arrays AND
// the in-memory follow graph without re-seeding the database. Used when
// reusing an existing DB whose events were created from the same seed.
func (g *GraphTraversalBenchmark) RegeneratePubkeys() {
	g.generatePubkeys()
	g.generateFollowGraph()
}
 497  
 498  // RunSuite runs the complete graph traversal benchmark suite
 499  func (g *GraphTraversalBenchmark) RunSuite() {
 500  	g.SeedDatabase()
 501  	g.runThirdDegreeTraversal()
 502  	fmt.Printf("\n=== Graph Traversal Benchmark Complete ===\n\n")
 503  }
 504  
 505  // GetResults returns the benchmark results
 506  func (g *GraphTraversalBenchmark) GetResults() []*BenchmarkResult {
 507  	g.mu.RLock()
 508  	defer g.mu.RUnlock()
 509  	return g.results
 510  }
 511  
 512  // PrintResults prints the benchmark results
 513  func (g *GraphTraversalBenchmark) PrintResults() {
 514  	g.mu.RLock()
 515  	defer g.mu.RUnlock()
 516  
 517  	for _, result := range g.results {
 518  		fmt.Printf("\nTest: %s\n", result.TestName)
 519  		fmt.Printf("Duration: %v\n", result.Duration)
 520  		fmt.Printf("Total Events/Queries: %d\n", result.TotalEvents)
 521  		fmt.Printf("Events/sec: %.2f\n", result.EventsPerSecond)
 522  		fmt.Printf("Success Rate: %.1f%%\n", result.SuccessRate)
 523  		fmt.Printf("Concurrent Workers: %d\n", result.ConcurrentWorkers)
 524  		fmt.Printf("Memory Used: %d MB\n", result.MemoryUsed/(1024*1024))
 525  		fmt.Printf("Avg Latency: %v\n", result.AvgLatency)
 526  		fmt.Printf("P90 Latency: %v\n", result.P90Latency)
 527  		fmt.Printf("P95 Latency: %v\n", result.P95Latency)
 528  		fmt.Printf("P99 Latency: %v\n", result.P99Latency)
 529  		fmt.Printf("Bottom 10%% Avg Latency: %v\n", result.Bottom10Avg)
 530  	}
 531  }
 532