// graph_traversal_network.go
1 package main
2
import (
	"context"
	"encoding/binary"
	"fmt"
	"sort"
	"sync"
	"time"

	"lukechampine.com/frand"
	"next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
	"next.orly.dev/pkg/nostr/encoders/event"
	"next.orly.dev/pkg/nostr/encoders/filter"
	"next.orly.dev/pkg/nostr/encoders/hex"
	"next.orly.dev/pkg/nostr/encoders/kind"
	"next.orly.dev/pkg/nostr/encoders/tag"
	"next.orly.dev/pkg/nostr/interfaces/signer/p8k"
	"next.orly.dev/pkg/nostr/ws"
)
20
// NetworkGraphTraversalBenchmark benchmarks graph traversal using NIP-01 queries over WebSocket.
// It deterministically generates a synthetic follow graph, publishes kind-3
// follow-list events to a relay, then measures multi-degree traversal latency.
type NetworkGraphTraversalBenchmark struct {
	relayURL string             // WebSocket URL of the relay under test
	relay    *ws.Client         // live relay connection, set by Connect
	results  []*BenchmarkResult // per-phase results, guarded by mu
	mu       sync.RWMutex       // protects results
	workers  int                // concurrent worker goroutines (falls back to 4 if < 1)

	// Cached data for the benchmark
	pubkeys [][]byte      // 100k pubkeys as 32-byte arrays
	signers []*p8k.Signer // signers for each pubkey
	follows [][]int       // follows[i] = list of indices that pubkey[i] follows
	rng     *frand.RNG    // deterministic PRNG, reseeded per phase via initializeDeterministicRNG
}
35
36 // NewNetworkGraphTraversalBenchmark creates a new network graph traversal benchmark
37 func NewNetworkGraphTraversalBenchmark(relayURL string, workers int) *NetworkGraphTraversalBenchmark {
38 return &NetworkGraphTraversalBenchmark{
39 relayURL: relayURL,
40 workers: workers,
41 results: make([]*BenchmarkResult, 0),
42 rng: frand.NewCustom(make([]byte, 32), 1024, 12), // ChaCha12 with seed buffer
43 }
44 }
45
46 // Connect establishes WebSocket connection to the relay
47 func (n *NetworkGraphTraversalBenchmark) Connect(ctx context.Context) error {
48 var err error
49 n.relay, err = ws.RelayConnect(ctx, n.relayURL)
50 if err != nil {
51 return fmt.Errorf("failed to connect to relay %s: %w", n.relayURL, err)
52 }
53 return nil
54 }
55
56 // Close closes the relay connection
57 func (n *NetworkGraphTraversalBenchmark) Close() {
58 if n.relay != nil {
59 n.relay.Close()
60 }
61 }
62
63 // initializeDeterministicRNG initializes the PRNG with deterministic seed
64 func (n *NetworkGraphTraversalBenchmark) initializeDeterministicRNG() {
65 // Create seed buffer from GraphBenchSeed (uint64 spread across 8 bytes)
66 seedBuf := make([]byte, 32)
67 seed := GraphBenchSeed
68 seedBuf[0] = byte(seed >> 56)
69 seedBuf[1] = byte(seed >> 48)
70 seedBuf[2] = byte(seed >> 40)
71 seedBuf[3] = byte(seed >> 32)
72 seedBuf[4] = byte(seed >> 24)
73 seedBuf[5] = byte(seed >> 16)
74 seedBuf[6] = byte(seed >> 8)
75 seedBuf[7] = byte(seed)
76 n.rng = frand.NewCustom(seedBuf, 1024, 12)
77 }
78
79 // generatePubkeys generates deterministic pubkeys using frand
80 func (n *NetworkGraphTraversalBenchmark) generatePubkeys() {
81 fmt.Printf("Generating %d deterministic pubkeys...\n", GraphBenchNumPubkeys)
82 start := time.Now()
83
84 n.initializeDeterministicRNG()
85 n.pubkeys = make([][]byte, GraphBenchNumPubkeys)
86 n.signers = make([]*p8k.Signer, GraphBenchNumPubkeys)
87
88 for i := 0; i < GraphBenchNumPubkeys; i++ {
89 // Generate deterministic 32-byte secret key from PRNG
90 secretKey := make([]byte, 32)
91 n.rng.Read(secretKey)
92
93 // Create signer from secret key
94 signer := p8k.MustNew()
95 if err := signer.InitSec(secretKey); err != nil {
96 panic(fmt.Sprintf("failed to init signer %d: %v", i, err))
97 }
98
99 n.signers[i] = signer
100 n.pubkeys[i] = make([]byte, 32)
101 copy(n.pubkeys[i], signer.Pub())
102
103 if (i+1)%10000 == 0 {
104 fmt.Printf(" Generated %d/%d pubkeys...\n", i+1, GraphBenchNumPubkeys)
105 }
106 }
107
108 fmt.Printf("Generated %d pubkeys in %v\n", GraphBenchNumPubkeys, time.Since(start))
109 }
110
111 // generateFollowGraph generates the random follow graph with deterministic PRNG
112 func (n *NetworkGraphTraversalBenchmark) generateFollowGraph() {
113 fmt.Printf("Generating follow graph (1-%d follows per pubkey)...\n", GraphBenchMaxFollows)
114 start := time.Now()
115
116 // Reset RNG to ensure deterministic follow graph
117 n.initializeDeterministicRNG()
118 // Skip the bytes used for pubkey generation
119 skipBuf := make([]byte, 32*GraphBenchNumPubkeys)
120 n.rng.Read(skipBuf)
121
122 n.follows = make([][]int, GraphBenchNumPubkeys)
123
124 totalFollows := 0
125 for i := 0; i < GraphBenchNumPubkeys; i++ {
126 // Determine number of follows for this pubkey (1 to 1000)
127 numFollows := int(n.rng.Uint64n(uint64(GraphBenchMaxFollows-GraphBenchMinFollows+1))) + GraphBenchMinFollows
128
129 // Generate random follow indices (excluding self)
130 followSet := make(map[int]struct{})
131 for len(followSet) < numFollows {
132 followIdx := int(n.rng.Uint64n(uint64(GraphBenchNumPubkeys)))
133 if followIdx != i {
134 followSet[followIdx] = struct{}{}
135 }
136 }
137
138 // Convert to slice
139 n.follows[i] = make([]int, 0, numFollows)
140 for idx := range followSet {
141 n.follows[i] = append(n.follows[i], idx)
142 }
143 totalFollows += numFollows
144
145 if (i+1)%10000 == 0 {
146 fmt.Printf(" Generated follow lists for %d/%d pubkeys...\n", i+1, GraphBenchNumPubkeys)
147 }
148 }
149
150 avgFollows := float64(totalFollows) / float64(GraphBenchNumPubkeys)
151 fmt.Printf("Generated follow graph in %v (avg %.1f follows/pubkey, total %d follows)\n",
152 time.Since(start), avgFollows, totalFollows)
153 }
154
// createFollowListEvents publishes one kind-3 (follow list) event per generated
// pubkey over the WebSocket connection, using a rate-limited worker pool, and
// records a BenchmarkResult for this setup phase.
func (n *NetworkGraphTraversalBenchmark) createFollowListEvents(ctx context.Context) {
	fmt.Println("Creating follow list events via WebSocket...")
	start := time.Now()

	// Base created_at; event i is offset by i seconds so timestamps are unique
	// and monotonically increasing across the batch.
	baseTime := time.Now().Unix()

	// mu guards successCount, errorCount, and latencies across workers.
	var mu sync.Mutex
	var wg sync.WaitGroup
	var successCount, errorCount int64
	latencies := make([]time.Duration, 0, GraphBenchNumPubkeys)

	// Use worker pool for parallel event creation
	numWorkers := n.workers
	if numWorkers < 1 {
		numWorkers = 4
	}

	workChan := make(chan int, numWorkers*2)

	// Rate limiter: cap at 1000 events/second per relay (to avoid overwhelming);
	// the global budget is split evenly across workers.
	perWorkerRate := 1000.0 / float64(numWorkers)

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			workerLimiter := NewRateLimiter(perWorkerRate)

			for i := range workChan {
				workerLimiter.Wait()

				// Build the kind-3 event: empty content, one "p" tag per follow.
				ev := event.New()
				ev.Kind = kind.FollowList.K
				ev.CreatedAt = baseTime + int64(i)
				ev.Content = []byte("")
				ev.Tags = tag.NewS()

				// Add p tags for all follows
				for _, followIdx := range n.follows[i] {
					pubkeyHex := hex.Enc(n.pubkeys[followIdx])
					ev.Tags.Append(tag.NewFromAny("p", pubkeyHex))
				}

				// Sign the event with this pubkey's cached signer; a signing
				// failure is counted as an error and the event is released.
				if err := ev.Sign(n.signers[i]); err != nil {
					mu.Lock()
					errorCount++
					mu.Unlock()
					ev.Free()
					continue
				}

				// Publish via WebSocket
				eventStart := time.Now()
				errCh := n.relay.Write(eventenvelope.NewSubmissionWith(ev).Marshal(nil))

				// Wait for write to complete. NOTE(review): on ctx
				// cancellation the worker deliberately keeps looping so it
				// drains workChan (counting each remaining item as an error)
				// — returning here would leave the producer below blocked on
				// a full channel.
				select {
				case err := <-errCh:
					latency := time.Since(eventStart)
					mu.Lock()
					if err != nil {
						errorCount++
					} else {
						successCount++
						latencies = append(latencies, latency)
					}
					mu.Unlock()
				case <-ctx.Done():
					mu.Lock()
					errorCount++
					mu.Unlock()
				}

				ev.Free()
			}
		}()
	}

	// Send work
	for i := 0; i < GraphBenchNumPubkeys; i++ {
		workChan <- i
		if (i+1)%10000 == 0 {
			fmt.Printf(" Queued %d/%d follow list events...\n", i+1, GraphBenchNumPubkeys)
		}
	}
	close(workChan)
	wg.Wait()

	duration := time.Since(start)
	eventsPerSec := float64(successCount) / duration.Seconds()

	// Calculate latency stats; the percentile helpers presumably expect a
	// sorted slice (sorted here before each call) — TODO confirm.
	var avgLatency, p90Latency, p95Latency, p99Latency time.Duration
	if len(latencies) > 0 {
		sort.Slice(latencies, func(i, j int) bool { return latencies[i] < latencies[j] })
		avgLatency = calculateAvgLatency(latencies)
		p90Latency = calculatePercentileLatency(latencies, 0.90)
		p95Latency = calculatePercentileLatency(latencies, 0.95)
		p99Latency = calculatePercentileLatency(latencies, 0.99)
	}

	fmt.Printf("Created %d follow list events in %v (%.2f events/sec, errors: %d)\n",
		successCount, duration, eventsPerSec, errorCount)
	fmt.Printf(" Avg latency: %v, P95: %v, P99: %v\n", avgLatency, p95Latency, p99Latency)

	// Record result for event creation phase
	result := &BenchmarkResult{
		TestName:          "Graph Setup (Follow Lists)",
		Duration:          duration,
		TotalEvents:       int(successCount),
		EventsPerSecond:   eventsPerSec,
		AvgLatency:        avgLatency,
		P90Latency:        p90Latency,
		P95Latency:        p95Latency,
		P99Latency:        p99Latency,
		Bottom10Avg:       calculateBottom10Avg(latencies),
		ConcurrentWorkers: numWorkers,
		MemoryUsed:        getMemUsage(),
		SuccessRate:       float64(successCount) / float64(GraphBenchNumPubkeys) * 100,
	}

	n.mu.Lock()
	n.results = append(n.results, result)
	n.mu.Unlock()
}
283
// runThirdDegreeTraversal samples start pubkeys and, for each, performs a
// breadth-first traversal of the follow graph to GraphBenchTraversalDepth
// degrees by issuing batched NIP-01 subscriptions for kind-3 events. It
// records two BenchmarkResults: one for whole traversals, one for the
// individual relay queries.
func (n *NetworkGraphTraversalBenchmark) runThirdDegreeTraversal(ctx context.Context) {
	fmt.Printf("\n=== Third-Degree Graph Traversal Benchmark (Network) ===\n")
	fmt.Printf("Traversing 3 degrees of follows via WebSocket...\n")

	start := time.Now()

	// mu guards totalQueries, totalPubkeysFound, and both latency slices.
	var mu sync.Mutex
	var wg sync.WaitGroup
	var totalQueries int64
	var totalPubkeysFound int64
	queryLatencies := make([]time.Duration, 0, 10000)
	traversalLatencies := make([]time.Duration, 0, 1000)

	// Sample a subset for detailed traversal
	sampleSize := 1000
	if sampleSize > GraphBenchNumPubkeys {
		sampleSize = GraphBenchNumPubkeys
	}

	// Deterministic sampling: reseed so the same start pubkeys are picked on
	// every run (duplicate indices are possible and accepted).
	n.initializeDeterministicRNG()
	sampleIndices := make([]int, sampleSize)
	for i := 0; i < sampleSize; i++ {
		sampleIndices[i] = int(n.rng.Uint64n(uint64(GraphBenchNumPubkeys)))
	}

	fmt.Printf("Sampling %d pubkeys for traversal...\n", sampleSize)

	numWorkers := n.workers
	if numWorkers < 1 {
		numWorkers = 4
	}

	workChan := make(chan int, numWorkers*2)

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for startIdx := range workChan {
				traversalStart := time.Now()
				// Every hex pubkey reached so far (including the start);
				// doubles as the dedup set and the final reach count.
				foundPubkeys := make(map[string]struct{})

				// Start with the initial pubkey
				currentLevel := [][]byte{n.pubkeys[startIdx]}
				startPubkeyHex := hex.Enc(n.pubkeys[startIdx])
				foundPubkeys[startPubkeyHex] = struct{}{}

				// BFS: each outer iteration expands one degree of follows.
				for depth := 0; depth < GraphBenchTraversalDepth; depth++ {
					if len(currentLevel) == 0 {
						break
					}

					nextLevel := make([][]byte, 0)

					// Query follow lists for all pubkeys at current level
					// Batch queries for efficiency (50 authors per REQ)
					batchSize := 50
					for batchStart := 0; batchStart < len(currentLevel); batchStart += batchSize {
						batchEnd := batchStart + batchSize
						if batchEnd > len(currentLevel) {
							batchEnd = len(currentLevel)
						}

						batch := currentLevel[batchStart:batchEnd]

						// Build filter for kind 3 events from these pubkeys
						f := filter.New()
						f.Kinds = kind.NewS(kind.FollowList)
						f.Authors = tag.NewWithCap(len(batch))
						for _, pk := range batch {
							f.Authors.T = append(f.Authors.T, pk)
						}

						queryStart := time.Now()

						// Subscribe and collect results. NOTE(review): a failed
						// subscribe silently skips this batch without counting
						// toward totalQueries or any error metric — confirm
						// that is acceptable for the reported stats.
						sub, err := n.relay.Subscribe(ctx, filter.NewS(f))
						if err != nil {
							continue
						}

						// Collect events until EOSE, a 5s timeout, or ctx cancel.
						timeout := time.After(5 * time.Second)
						events := make([]*event.E, 0)
					collectLoop:
						for {
							select {
							case ev := <-sub.Events:
								if ev != nil {
									events = append(events, ev)
								}
							case <-sub.EndOfStoredEvents:
								break collectLoop
							case <-timeout:
								break collectLoop
							case <-ctx.Done():
								break collectLoop
							}
						}
						sub.Unsub()

						queryLatency := time.Since(queryStart)

						mu.Lock()
						totalQueries++
						queryLatencies = append(queryLatencies, queryLatency)
						mu.Unlock()

						// Extract followed pubkeys from p tags; any pubkey not
						// seen before joins the next BFS level.
						for _, ev := range events {
							for _, t := range *ev.Tags {
								if len(t.T) >= 2 && string(t.T[0]) == "p" {
									pubkeyHex := string(t.ValueHex())
									if _, exists := foundPubkeys[pubkeyHex]; !exists {
										foundPubkeys[pubkeyHex] = struct{}{}
										// Decode hex to bytes for next level
										if pkBytes, err := hex.Dec(pubkeyHex); err == nil {
											nextLevel = append(nextLevel, pkBytes)
										}
									}
								}
							}
							ev.Free()
						}
					}

					currentLevel = nextLevel
				}

				traversalLatency := time.Since(traversalStart)

				mu.Lock()
				totalPubkeysFound += int64(len(foundPubkeys))
				traversalLatencies = append(traversalLatencies, traversalLatency)
				mu.Unlock()
			}
		}()
	}

	// Send work
	for _, idx := range sampleIndices {
		workChan <- idx
	}
	close(workChan)
	wg.Wait()

	duration := time.Since(start)

	// Calculate statistics; each latency slice is sorted before the
	// percentile helpers are applied.
	var avgQueryLatency, p90QueryLatency, p95QueryLatency, p99QueryLatency time.Duration
	if len(queryLatencies) > 0 {
		sort.Slice(queryLatencies, func(i, j int) bool { return queryLatencies[i] < queryLatencies[j] })
		avgQueryLatency = calculateAvgLatency(queryLatencies)
		p90QueryLatency = calculatePercentileLatency(queryLatencies, 0.90)
		p95QueryLatency = calculatePercentileLatency(queryLatencies, 0.95)
		p99QueryLatency = calculatePercentileLatency(queryLatencies, 0.99)
	}

	var avgTraversalLatency, p90TraversalLatency, p95TraversalLatency, p99TraversalLatency time.Duration
	if len(traversalLatencies) > 0 {
		sort.Slice(traversalLatencies, func(i, j int) bool { return traversalLatencies[i] < traversalLatencies[j] })
		avgTraversalLatency = calculateAvgLatency(traversalLatencies)
		p90TraversalLatency = calculatePercentileLatency(traversalLatencies, 0.90)
		p95TraversalLatency = calculatePercentileLatency(traversalLatencies, 0.95)
		p99TraversalLatency = calculatePercentileLatency(traversalLatencies, 0.99)
	}

	avgPubkeysPerTraversal := float64(totalPubkeysFound) / float64(sampleSize)
	traversalsPerSec := float64(sampleSize) / duration.Seconds()
	queriesPerSec := float64(totalQueries) / duration.Seconds()

	fmt.Printf("\n=== Graph Traversal Results (Network) ===\n")
	fmt.Printf("Traversals completed: %d\n", sampleSize)
	fmt.Printf("Total queries: %d (%.2f queries/sec)\n", totalQueries, queriesPerSec)
	fmt.Printf("Avg pubkeys found per traversal: %.1f\n", avgPubkeysPerTraversal)
	fmt.Printf("Total duration: %v\n", duration)
	fmt.Printf("\nQuery Latencies:\n")
	fmt.Printf(" Avg: %v, P95: %v, P99: %v\n", avgQueryLatency, p95QueryLatency, p99QueryLatency)
	fmt.Printf("\nFull Traversal Latencies (3 degrees):\n")
	fmt.Printf(" Avg: %v, P95: %v, P99: %v\n", avgTraversalLatency, p95TraversalLatency, p99TraversalLatency)
	fmt.Printf("Traversals/sec: %.2f\n", traversalsPerSec)

	// Record result for traversal phase
	result := &BenchmarkResult{
		TestName:          "Graph Traversal (3 Degrees)",
		Duration:          duration,
		TotalEvents:       int(totalQueries),
		EventsPerSecond:   traversalsPerSec,
		AvgLatency:        avgTraversalLatency,
		P90Latency:        p90TraversalLatency,
		P95Latency:        p95TraversalLatency,
		P99Latency:        p99TraversalLatency,
		Bottom10Avg:       calculateBottom10Avg(traversalLatencies),
		ConcurrentWorkers: numWorkers,
		MemoryUsed:        getMemUsage(),
		SuccessRate:       100.0,
	}

	n.mu.Lock()
	n.results = append(n.results, result)
	n.mu.Unlock()

	// Also record query performance separately
	queryResult := &BenchmarkResult{
		TestName:          "Graph Queries (Follow Lists)",
		Duration:          duration,
		TotalEvents:       int(totalQueries),
		EventsPerSecond:   queriesPerSec,
		AvgLatency:        avgQueryLatency,
		P90Latency:        p90QueryLatency,
		P95Latency:        p95QueryLatency,
		P99Latency:        p99QueryLatency,
		Bottom10Avg:       calculateBottom10Avg(queryLatencies),
		ConcurrentWorkers: numWorkers,
		MemoryUsed:        getMemUsage(),
		SuccessRate:       100.0,
	}

	n.mu.Lock()
	n.results = append(n.results, queryResult)
	n.mu.Unlock()
}
510
// RunSuite runs the complete network graph traversal benchmark suite in order:
// generate keys -> generate follow graph -> connect -> publish follow lists ->
// wait for ingestion -> run the third-degree traversal benchmark. It returns
// an error only if the relay connection fails.
func (n *NetworkGraphTraversalBenchmark) RunSuite(ctx context.Context) error {
	fmt.Println("\n╔════════════════════════════════════════════════════════╗")
	fmt.Println("║ NETWORK GRAPH TRAVERSAL BENCHMARK (100k Pubkeys) ║")
	fmt.Printf("║ Relay: %-46s ║\n", n.relayURL)
	fmt.Println("╚════════════════════════════════════════════════════════╝")

	// Step 1: Generate pubkeys
	n.generatePubkeys()

	// Step 2: Generate follow graph
	n.generateFollowGraph()

	// Step 3: Connect to relay
	fmt.Printf("\nConnecting to relay: %s\n", n.relayURL)
	if err := n.Connect(ctx); err != nil {
		return fmt.Errorf("failed to connect: %w", err)
	}
	defer n.Close()
	fmt.Println("Connected successfully!")

	// Step 4: Create follow list events via WebSocket
	n.createFollowListEvents(ctx)

	// Small fixed delay to give the relay time to index the events before
	// the traversal phase queries them.
	fmt.Println("\nWaiting for events to be processed...")
	time.Sleep(5 * time.Second)

	// Step 5: Run third-degree traversal benchmark
	n.runThirdDegreeTraversal(ctx)

	fmt.Printf("\n=== Network Graph Traversal Benchmark Complete ===\n\n")
	return nil
}
545
546 // GetResults returns the benchmark results
547 func (n *NetworkGraphTraversalBenchmark) GetResults() []*BenchmarkResult {
548 n.mu.RLock()
549 defer n.mu.RUnlock()
550 return n.results
551 }
552
553 // PrintResults prints the benchmark results
554 func (n *NetworkGraphTraversalBenchmark) PrintResults() {
555 n.mu.RLock()
556 defer n.mu.RUnlock()
557
558 for _, result := range n.results {
559 fmt.Printf("\nTest: %s\n", result.TestName)
560 fmt.Printf("Duration: %v\n", result.Duration)
561 fmt.Printf("Total Events/Queries: %d\n", result.TotalEvents)
562 fmt.Printf("Events/sec: %.2f\n", result.EventsPerSecond)
563 fmt.Printf("Success Rate: %.1f%%\n", result.SuccessRate)
564 fmt.Printf("Concurrent Workers: %d\n", result.ConcurrentWorkers)
565 fmt.Printf("Memory Used: %d MB\n", result.MemoryUsed/(1024*1024))
566 fmt.Printf("Avg Latency: %v\n", result.AvgLatency)
567 fmt.Printf("P90 Latency: %v\n", result.P90Latency)
568 fmt.Printf("P95 Latency: %v\n", result.P95Latency)
569 fmt.Printf("P99 Latency: %v\n", result.P99Latency)
570 fmt.Printf("Bottom 10%% Avg Latency: %v\n", result.Bottom10Avg)
571 }
572 }
573