// graph_traversal_benchmark.go
1 package main
2
import (
	"context"
	"encoding/binary"
	"fmt"
	"sort"
	"sync"
	"time"

	"lukechampine.com/frand"
	"next.orly.dev/pkg/database"
	"next.orly.dev/pkg/nostr/encoders/event"
	"next.orly.dev/pkg/nostr/encoders/filter"
	"next.orly.dev/pkg/nostr/encoders/hex"
	"next.orly.dev/pkg/nostr/encoders/kind"
	"next.orly.dev/pkg/nostr/encoders/tag"
	"next.orly.dev/pkg/nostr/interfaces/signer/p8k"
)
19
// GraphBenchNumPubkeys is the number of pubkeys to generate for graph benchmark.
// Set via -graph-pubkeys flag; default 10000. Mutable package state: must be
// set before SeedDatabase/RegeneratePubkeys run.
var GraphBenchNumPubkeys = 10000

const (
	// GraphBenchMinFollows is the minimum number of follows per pubkey
	GraphBenchMinFollows = 1
	// GraphBenchMaxFollows is the maximum number of follows per pubkey
	GraphBenchMaxFollows = 1000
	// GraphBenchSeed is the deterministic seed for frand PRNG (fits in uint64)
	GraphBenchSeed uint64 = 0x4E6F737472 // "Nostr" in hex
	// GraphBenchTraversalDepth is the depth of graph traversal (3 = third degree)
	GraphBenchTraversalDepth = 3
)
34
// GraphTraversalBenchmark benchmarks graph traversal using NIP-01 style queries
// (kind-3 follow lists looked up by author). Phases append to results under mu.
type GraphTraversalBenchmark struct {
	config  *BenchmarkConfig   // benchmark-wide settings (e.g. ConcurrentWorkers)
	db      *database.D        // event store under test
	results []*BenchmarkResult // per-phase results; guarded by mu
	mu      sync.RWMutex       // guards results

	// Cached data for the benchmark
	pubkeys [][]byte      // GraphBenchNumPubkeys pubkeys as 32-byte slices
	signers []*p8k.Signer // signers for each pubkey
	follows [][]int       // follows[i] = list of indices that pubkey[i] follows
	rng     *frand.RNG    // deterministic PRNG
}
48
49 // NewGraphTraversalBenchmark creates a new graph traversal benchmark
50 func NewGraphTraversalBenchmark(config *BenchmarkConfig, db *database.D) *GraphTraversalBenchmark {
51 return &GraphTraversalBenchmark{
52 config: config,
53 db: db,
54 results: make([]*BenchmarkResult, 0),
55 rng: frand.NewCustom(make([]byte, 32), 1024, 12), // ChaCha12 with seed buffer
56 }
57 }
58
59 // initializeDeterministicRNG initializes the PRNG with deterministic seed
60 func (g *GraphTraversalBenchmark) initializeDeterministicRNG() {
61 // Create seed buffer from GraphBenchSeed (uint64 spread across 8 bytes)
62 seedBuf := make([]byte, 32)
63 seed := GraphBenchSeed
64 seedBuf[0] = byte(seed >> 56)
65 seedBuf[1] = byte(seed >> 48)
66 seedBuf[2] = byte(seed >> 40)
67 seedBuf[3] = byte(seed >> 32)
68 seedBuf[4] = byte(seed >> 24)
69 seedBuf[5] = byte(seed >> 16)
70 seedBuf[6] = byte(seed >> 8)
71 seedBuf[7] = byte(seed)
72 g.rng = frand.NewCustom(seedBuf, 1024, 12)
73 }
74
75 // generatePubkeys generates deterministic pubkeys using frand
76 func (g *GraphTraversalBenchmark) generatePubkeys() {
77 fmt.Printf("Generating %d deterministic pubkeys...\n", GraphBenchNumPubkeys)
78 start := time.Now()
79
80 g.initializeDeterministicRNG()
81 g.pubkeys = make([][]byte, GraphBenchNumPubkeys)
82 g.signers = make([]*p8k.Signer, GraphBenchNumPubkeys)
83
84 for i := 0; i < GraphBenchNumPubkeys; i++ {
85 // Generate deterministic 32-byte secret key from PRNG
86 secretKey := make([]byte, 32)
87 g.rng.Read(secretKey)
88
89 // Create signer from secret key
90 signer := p8k.MustNew()
91 if err := signer.InitSec(secretKey); err != nil {
92 panic(fmt.Sprintf("failed to init signer %d: %v", i, err))
93 }
94
95 g.signers[i] = signer
96 g.pubkeys[i] = make([]byte, 32)
97 copy(g.pubkeys[i], signer.Pub())
98
99 if (i+1)%10000 == 0 {
100 fmt.Printf(" Generated %d/%d pubkeys...\n", i+1, GraphBenchNumPubkeys)
101 }
102 }
103
104 fmt.Printf("Generated %d pubkeys in %v\n", GraphBenchNumPubkeys, time.Since(start))
105 }
106
107 // generateFollowGraph generates the random follow graph with deterministic PRNG
108 func (g *GraphTraversalBenchmark) generateFollowGraph() {
109 fmt.Printf("Generating follow graph (1-%d follows per pubkey)...\n", GraphBenchMaxFollows)
110 start := time.Now()
111
112 // Reset RNG to ensure deterministic follow graph
113 g.initializeDeterministicRNG()
114 // Skip the bytes used for pubkey generation
115 skipBuf := make([]byte, 32*GraphBenchNumPubkeys)
116 g.rng.Read(skipBuf)
117
118 g.follows = make([][]int, GraphBenchNumPubkeys)
119
120 totalFollows := 0
121 for i := 0; i < GraphBenchNumPubkeys; i++ {
122 // Determine number of follows for this pubkey (1 to 1000)
123 numFollows := int(g.rng.Uint64n(uint64(GraphBenchMaxFollows-GraphBenchMinFollows+1))) + GraphBenchMinFollows
124
125 // Generate random follow indices (excluding self)
126 followSet := make(map[int]struct{})
127 for len(followSet) < numFollows {
128 followIdx := int(g.rng.Uint64n(uint64(GraphBenchNumPubkeys)))
129 if followIdx != i {
130 followSet[followIdx] = struct{}{}
131 }
132 }
133
134 // Convert to slice
135 g.follows[i] = make([]int, 0, numFollows)
136 for idx := range followSet {
137 g.follows[i] = append(g.follows[i], idx)
138 }
139 totalFollows += numFollows
140
141 if (i+1)%10000 == 0 {
142 fmt.Printf(" Generated follow lists for %d/%d pubkeys...\n", i+1, GraphBenchNumPubkeys)
143 }
144 }
145
146 avgFollows := float64(totalFollows) / float64(GraphBenchNumPubkeys)
147 fmt.Printf("Generated follow graph in %v (avg %.1f follows/pubkey, total %d follows)\n",
148 time.Since(start), avgFollows, totalFollows)
149 }
150
// createFollowListEvents creates kind 3 follow list events in the database.
// Signing is done sequentially (p256k1 uses a global SHA-256 context that is
// not goroutine-safe), then saving is parallelized across workers.
// Records a "Graph Setup (Follow Lists)" BenchmarkResult when done.
func (g *GraphTraversalBenchmark) createFollowListEvents() {
	fmt.Println("Creating follow list events in database...")
	start := time.Now()

	ctx := context.Background()
	// Each event gets a distinct CreatedAt (baseTime+i) so kind-3 replacement
	// semantics, if any, see strictly increasing timestamps per author index.
	baseTime := time.Now().Unix()

	// mu guards the counters and latency slice shared by the save workers.
	var mu sync.Mutex
	var wg sync.WaitGroup
	var successCount, errorCount int64
	latencies := make([]time.Duration, 0, GraphBenchNumPubkeys)

	// Use worker pool for parallel database saves
	numWorkers := g.config.ConcurrentWorkers
	if numWorkers < 1 {
		numWorkers = 4
	}

	// Channel carries pre-signed events for parallel saving; buffered so the
	// sequential signer rarely blocks on the savers.
	eventChan := make(chan *event.E, numWorkers*4)

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for ev := range eventChan {
				// Save to database; only successful saves count toward latency.
				eventStart := time.Now()
				_, err := g.db.SaveEvent(ctx, ev)
				latency := time.Since(eventStart)

				mu.Lock()
				if err != nil {
					errorCount++
				} else {
					successCount++
					latencies = append(latencies, latency)
				}
				mu.Unlock()

				// Release the event's pooled resources regardless of outcome.
				ev.Free()
			}
		}()
	}

	// Sign events sequentially (p256k1.TaggedHash uses a global hash.Hash
	// that panics under concurrent access) and feed to save workers.
	signErrors := 0
	for i := 0; i < GraphBenchNumPubkeys; i++ {
		ev := event.New()
		ev.Kind = kind.FollowList.K
		ev.CreatedAt = baseTime + int64(i)
		ev.Content = []byte("")
		ev.Tags = tag.NewS()

		// Add p tags for all follows
		for _, followIdx := range g.follows[i] {
			pubkeyHex := hex.Enc(g.pubkeys[followIdx])
			ev.Tags.Append(tag.NewFromAny("p", pubkeyHex))
		}

		// Sign the event (must be sequential — p256k1 global state)
		if err := ev.Sign(g.signers[i]); err != nil {
			// Failed signatures are counted and the event is dropped; it
			// never reaches the save workers.
			signErrors++
			ev.Free()
			continue
		}

		eventChan <- ev

		if (i+1)%10000 == 0 {
			fmt.Printf(" Signed and queued %d/%d follow list events...\n", i+1, GraphBenchNumPubkeys)
		}
	}
	// Close the channel so workers drain and exit, then wait for them.
	close(eventChan)
	wg.Wait()

	duration := time.Since(start)
	eventsPerSec := float64(successCount) / duration.Seconds()

	// Calculate latency stats (latencies must be sorted for the percentile
	// helpers; safe without the mutex now that all workers have exited).
	var avgLatency, p95Latency, p99Latency time.Duration
	if len(latencies) > 0 {
		sort.Slice(latencies, func(i, j int) bool { return latencies[i] < latencies[j] })
		avgLatency = calculateAvgLatency(latencies)
		p95Latency = calculatePercentileLatency(latencies, 0.95)
		p99Latency = calculatePercentileLatency(latencies, 0.99)
	}

	fmt.Printf("Created %d follow list events in %v (%.2f events/sec, save errors: %d, sign errors: %d)\n",
		successCount, duration, eventsPerSec, errorCount, signErrors)
	fmt.Printf(" Avg latency: %v, P95: %v, P99: %v\n", avgLatency, p95Latency, p99Latency)

	// Record result for event creation phase
	result := &BenchmarkResult{
		TestName:          "Graph Setup (Follow Lists)",
		Duration:          duration,
		TotalEvents:       int(successCount),
		EventsPerSecond:   eventsPerSec,
		AvgLatency:        avgLatency,
		P95Latency:        p95Latency,
		P99Latency:        p99Latency,
		ConcurrentWorkers: numWorkers,
		MemoryUsed:        getMemUsage(),
		SuccessRate:       float64(successCount) / float64(GraphBenchNumPubkeys) * 100,
	}

	g.mu.Lock()
	g.results = append(g.results, result)
	g.mu.Unlock()
}
266
// runThirdDegreeTraversal runs the third-degree graph traversal benchmark:
// for a deterministic sample of start pubkeys, it BFS-expands the follow
// graph GraphBenchTraversalDepth hops via batched kind-3 author queries,
// then records two BenchmarkResults (whole traversals and individual queries).
func (g *GraphTraversalBenchmark) runThirdDegreeTraversal() {
	fmt.Printf("\n=== Third-Degree Graph Traversal Benchmark ===\n")
	fmt.Printf("Traversing 3 degrees of follows for each of %d pubkeys...\n", GraphBenchNumPubkeys)

	start := time.Now()
	ctx := context.Background()

	// mu guards the counters and latency slices shared by traversal workers.
	var mu sync.Mutex
	var wg sync.WaitGroup
	var totalQueries int64
	var totalPubkeysFound int64
	queryLatencies := make([]time.Duration, 0, GraphBenchNumPubkeys*3)
	traversalLatencies := make([]time.Duration, 0, GraphBenchNumPubkeys)

	// Sample a subset for detailed traversal (traversing every pubkey would
	// take too long at large -graph-pubkeys values).
	sampleSize := 1000
	if sampleSize > GraphBenchNumPubkeys {
		sampleSize = GraphBenchNumPubkeys
	}

	// Deterministic sampling (re-seeds the PRNG; duplicates are possible
	// since indices are drawn with replacement).
	g.initializeDeterministicRNG()
	sampleIndices := make([]int, sampleSize)
	for i := 0; i < sampleSize; i++ {
		sampleIndices[i] = int(g.rng.Uint64n(uint64(GraphBenchNumPubkeys)))
	}

	fmt.Printf("Sampling %d pubkeys for traversal...\n", sampleSize)

	numWorkers := g.config.ConcurrentWorkers
	if numWorkers < 1 {
		numWorkers = 4
	}

	workChan := make(chan int, numWorkers*2)

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Each received index is an independent full 3-degree traversal.
			for startIdx := range workChan {
				traversalStart := time.Now()
				// foundPubkeys is the visited set (hex-encoded); it both
				// deduplicates and is the traversal's result count.
				foundPubkeys := make(map[string]struct{})

				// Start with the initial pubkey
				currentLevel := [][]byte{g.pubkeys[startIdx]}
				startPubkeyHex := hex.Enc(g.pubkeys[startIdx])
				foundPubkeys[startPubkeyHex] = struct{}{}

				// Traverse 3 degrees (breadth-first, one level per iteration)
				for depth := 0; depth < GraphBenchTraversalDepth; depth++ {
					if len(currentLevel) == 0 {
						break
					}

					nextLevel := make([][]byte, 0)

					// Query follow lists for all pubkeys at current level.
					// Batch queries for efficiency (100 authors per filter).
					batchSize := 100
					for batchStart := 0; batchStart < len(currentLevel); batchStart += batchSize {
						batchEnd := batchStart + batchSize
						if batchEnd > len(currentLevel) {
							batchEnd = len(currentLevel)
						}

						batch := currentLevel[batchStart:batchEnd]

						// Build filter for kind 3 events from these pubkeys
						f := filter.New()
						f.Kinds = kind.NewS(kind.FollowList)
						f.Authors = tag.NewWithCap(len(batch))
						for _, pk := range batch {
							// Authors.T expects raw byte slices (pubkeys)
							f.Authors.T = append(f.Authors.T, pk)
						}

						queryStart := time.Now()
						events, err := g.db.QueryEvents(ctx, f)
						queryLatency := time.Since(queryStart)

						mu.Lock()
						totalQueries++
						queryLatencies = append(queryLatencies, queryLatency)
						mu.Unlock()

						// Failed queries still count toward totals above but
						// contribute no pubkeys; best-effort continue.
						if err != nil {
							continue
						}

						// Extract followed pubkeys from p tags; anything not
						// yet visited becomes part of the next BFS level.
						for _, ev := range events {
							for _, t := range *ev.Tags {
								if len(t.T) >= 2 && string(t.T[0]) == "p" {
									pubkeyHex := string(t.ValueHex())
									if _, exists := foundPubkeys[pubkeyHex]; !exists {
										foundPubkeys[pubkeyHex] = struct{}{}
										// Decode hex to bytes for next level;
										// malformed tags are silently skipped.
										if pkBytes, err := hex.Dec(pubkeyHex); err == nil {
											nextLevel = append(nextLevel, pkBytes)
										}
									}
								}
							}
							ev.Free()
						}
					}

					currentLevel = nextLevel
				}

				traversalLatency := time.Since(traversalStart)

				mu.Lock()
				totalPubkeysFound += int64(len(foundPubkeys))
				traversalLatencies = append(traversalLatencies, traversalLatency)
				mu.Unlock()
			}
		}()
	}

	// Send work, then close so workers drain and exit.
	for _, idx := range sampleIndices {
		workChan <- idx
	}
	close(workChan)
	wg.Wait()

	duration := time.Since(start)

	// Calculate statistics (sorting required by the percentile helpers;
	// safe without the mutex now that all workers have exited).
	var avgQueryLatency, p95QueryLatency, p99QueryLatency time.Duration
	if len(queryLatencies) > 0 {
		sort.Slice(queryLatencies, func(i, j int) bool { return queryLatencies[i] < queryLatencies[j] })
		avgQueryLatency = calculateAvgLatency(queryLatencies)
		p95QueryLatency = calculatePercentileLatency(queryLatencies, 0.95)
		p99QueryLatency = calculatePercentileLatency(queryLatencies, 0.99)
	}

	var avgTraversalLatency, p95TraversalLatency, p99TraversalLatency time.Duration
	if len(traversalLatencies) > 0 {
		sort.Slice(traversalLatencies, func(i, j int) bool { return traversalLatencies[i] < traversalLatencies[j] })
		avgTraversalLatency = calculateAvgLatency(traversalLatencies)
		p95TraversalLatency = calculatePercentileLatency(traversalLatencies, 0.95)
		p99TraversalLatency = calculatePercentileLatency(traversalLatencies, 0.99)
	}

	avgPubkeysPerTraversal := float64(totalPubkeysFound) / float64(sampleSize)
	traversalsPerSec := float64(sampleSize) / duration.Seconds()
	queriesPerSec := float64(totalQueries) / duration.Seconds()

	fmt.Printf("\n=== Graph Traversal Results ===\n")
	fmt.Printf("Traversals completed: %d\n", sampleSize)
	fmt.Printf("Total queries: %d (%.2f queries/sec)\n", totalQueries, queriesPerSec)
	fmt.Printf("Avg pubkeys found per traversal: %.1f\n", avgPubkeysPerTraversal)
	fmt.Printf("Total duration: %v\n", duration)
	fmt.Printf("\nQuery Latencies:\n")
	fmt.Printf(" Avg: %v, P95: %v, P99: %v\n", avgQueryLatency, p95QueryLatency, p99QueryLatency)
	fmt.Printf("\nFull Traversal Latencies (3 degrees):\n")
	fmt.Printf(" Avg: %v, P95: %v, P99: %v\n", avgTraversalLatency, p95TraversalLatency, p99TraversalLatency)
	fmt.Printf("Traversals/sec: %.2f\n", traversalsPerSec)

	// Record result for traversal phase
	result := &BenchmarkResult{
		TestName:          "Graph Traversal (3 Degrees)",
		Duration:          duration,
		TotalEvents:       int(totalQueries),
		EventsPerSecond:   traversalsPerSec,
		AvgLatency:        avgTraversalLatency,
		P90Latency:        calculatePercentileLatency(traversalLatencies, 0.90),
		P95Latency:        p95TraversalLatency,
		P99Latency:        p99TraversalLatency,
		Bottom10Avg:       calculateBottom10Avg(traversalLatencies),
		ConcurrentWorkers: numWorkers,
		MemoryUsed:        getMemUsage(),
		SuccessRate:       100.0,
	}

	g.mu.Lock()
	g.results = append(g.results, result)
	g.mu.Unlock()

	// Also record query performance separately
	queryResult := &BenchmarkResult{
		TestName:          "Graph Queries (Follow Lists)",
		Duration:          duration,
		TotalEvents:       int(totalQueries),
		EventsPerSecond:   queriesPerSec,
		AvgLatency:        avgQueryLatency,
		P90Latency:        calculatePercentileLatency(queryLatencies, 0.90),
		P95Latency:        p95QueryLatency,
		P99Latency:        p99QueryLatency,
		Bottom10Avg:       calculateBottom10Avg(queryLatencies),
		ConcurrentWorkers: numWorkers,
		MemoryUsed:        getMemUsage(),
		SuccessRate:       100.0,
	}

	g.mu.Lock()
	g.results = append(g.results, queryResult)
	g.mu.Unlock()
}
471
472 // SeedDatabase generates pubkeys, follow graph, and creates follow list events.
473 // Call this before RunTraversal or PPG comparison to populate the database.
474 func (g *GraphTraversalBenchmark) SeedDatabase() {
475 fmt.Println("\n╔════════════════════════════════════════════════════════╗")
476 fmt.Println("║ SEEDING GRAPH DATABASE ║")
477 fmt.Println("╚════════════════════════════════════════════════════════╝")
478
479 // Step 1: Generate pubkeys
480 g.generatePubkeys()
481
482 // Step 2: Generate follow graph
483 g.generateFollowGraph()
484
485 // Step 3: Create follow list events in database
486 g.createFollowListEvents()
487
488 fmt.Printf("\n=== Database Seeded ===\n\n")
489 }
490
491 // RegeneratePubkeys regenerates the deterministic pubkey/signer arrays
492 // without re-seeding the database. Used when reusing an existing DB.
493 func (g *GraphTraversalBenchmark) RegeneratePubkeys() {
494 g.generatePubkeys()
495 g.generateFollowGraph()
496 }
497
498 // RunSuite runs the complete graph traversal benchmark suite
499 func (g *GraphTraversalBenchmark) RunSuite() {
500 g.SeedDatabase()
501 g.runThirdDegreeTraversal()
502 fmt.Printf("\n=== Graph Traversal Benchmark Complete ===\n\n")
503 }
504
505 // GetResults returns the benchmark results
506 func (g *GraphTraversalBenchmark) GetResults() []*BenchmarkResult {
507 g.mu.RLock()
508 defer g.mu.RUnlock()
509 return g.results
510 }
511
512 // PrintResults prints the benchmark results
513 func (g *GraphTraversalBenchmark) PrintResults() {
514 g.mu.RLock()
515 defer g.mu.RUnlock()
516
517 for _, result := range g.results {
518 fmt.Printf("\nTest: %s\n", result.TestName)
519 fmt.Printf("Duration: %v\n", result.Duration)
520 fmt.Printf("Total Events/Queries: %d\n", result.TotalEvents)
521 fmt.Printf("Events/sec: %.2f\n", result.EventsPerSecond)
522 fmt.Printf("Success Rate: %.1f%%\n", result.SuccessRate)
523 fmt.Printf("Concurrent Workers: %d\n", result.ConcurrentWorkers)
524 fmt.Printf("Memory Used: %d MB\n", result.MemoryUsed/(1024*1024))
525 fmt.Printf("Avg Latency: %v\n", result.AvgLatency)
526 fmt.Printf("P90 Latency: %v\n", result.P90Latency)
527 fmt.Printf("P95 Latency: %v\n", result.P95Latency)
528 fmt.Printf("P99 Latency: %v\n", result.P99Latency)
529 fmt.Printf("Bottom 10%% Avg Latency: %v\n", result.Bottom10Avg)
530 }
531 }
532