// Command stresstest floods a nostr relay with random signed events and
// randomized queries to exercise its write and read paths.
1 package main
2
3 import (
4 "bufio"
5 "bytes"
6 "context"
7 "flag"
8 "fmt"
9 "math/rand"
10 "os"
11 "os/signal"
12 "runtime"
13 "strings"
14 "sync"
15 "sync/atomic"
16 "time"
17
18 "next.orly.dev/pkg/lol/log"
19 "next.orly.dev/pkg/nostr/interfaces/signer/p8k"
20 "next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
21 "next.orly.dev/pkg/nostr/encoders/event"
22 "next.orly.dev/pkg/nostr/encoders/event/examples"
23 "next.orly.dev/pkg/nostr/encoders/filter"
24 "next.orly.dev/pkg/nostr/encoders/hex"
25 "next.orly.dev/pkg/nostr/encoders/kind"
26 "next.orly.dev/pkg/nostr/encoders/tag"
27 "next.orly.dev/pkg/nostr/encoders/timestamp"
28 "next.orly.dev/pkg/nostr/ws"
29 )
30
31 // randomHex returns a hex-encoded string of n random bytes (2n hex chars)
32 func randomHex(n int) string {
33 b := make([]byte, n)
34 _, _ = rand.Read(b)
35 return hex.Enc(b)
36 }
37
38 func makeEvent(rng *rand.Rand, signer *p8k.Signer) (*event.E, error) {
39 ev := &event.E{
40 CreatedAt: time.Now().Unix(),
41 Kind: kind.TextNote.K,
42 Tags: tag.NewS(),
43 Content: []byte(fmt.Sprintf("stresstest %d", rng.Int63())),
44 }
45
46 // Random number of p-tags up to 100
47 nPTags := rng.Intn(101) // 0..100 inclusive
48 for i := 0; i < nPTags; i++ {
49 // random 32-byte pubkey in hex (64 chars)
50 phex := randomHex(32)
51 ev.Tags.Append(tag.NewFromAny("p", phex))
52 }
53
54 // Sign and verify to ensure pubkey, id and signature are coherent
55 if err := ev.Sign(signer); err != nil {
56 return nil, err
57 }
58 if ok, err := ev.Verify(); err != nil || !ok {
59 return nil, fmt.Errorf("event signature verification failed: %v", err)
60 }
61 return ev, nil
62 }
63
// RelayConn is a mutex-guarded holder for a websocket relay client, letting
// workers swap in a fresh connection (Reconnect) while other goroutines read
// the current one (Get).
type RelayConn struct {
	mu     sync.RWMutex
	client *ws.Client // current connection; may be replaced by Reconnect
	url    string     // relay URL used for (re)connecting
}
69
// CacheIndex holds the events parsed from examples.Cache plus flat indexes
// of their ids, authors, timestamps and single-letter tag values, used to
// build random query filters that are guaranteed to match stored data.
type CacheIndex struct {
	events  []*event.E
	ids     [][]byte
	authors [][]byte
	times   []int64
	tags    map[byte][][]byte // single-letter tag -> list of values
}
77
78 func (rc *RelayConn) Get() *ws.Client {
79 rc.mu.RLock()
80 defer rc.mu.RUnlock()
81 return rc.client
82 }
83
84 func (rc *RelayConn) Reconnect(ctx context.Context) error {
85 rc.mu.Lock()
86 defer rc.mu.Unlock()
87 if rc.client != nil {
88 _ = rc.client.Close()
89 }
90 c, err := ws.RelayConnect(ctx, rc.url)
91 if err != nil {
92 return err
93 }
94 rc.client = c
95 return nil
96 }
97
98 // loadCacheAndIndex parses examples.Cache (JSONL of events) and builds an index
99 func loadCacheAndIndex() (*CacheIndex, error) {
100 scanner := bufio.NewScanner(bytes.NewReader(examples.Cache))
101 idx := &CacheIndex{tags: make(map[byte][][]byte)}
102 for scanner.Scan() {
103 line := scanner.Bytes()
104 if len(bytes.TrimSpace(line)) == 0 {
105 continue
106 }
107 ev := event.New()
108 rem, err := ev.Unmarshal(line)
109 _ = rem
110 if err != nil {
111 // skip malformed lines
112 continue
113 }
114 idx.events = append(idx.events, ev)
115 // collect fields
116 if len(ev.ID) > 0 {
117 idx.ids = append(idx.ids, append([]byte(nil), ev.ID...))
118 }
119 if len(ev.Pubkey) > 0 {
120 idx.authors = append(idx.authors, append([]byte(nil), ev.Pubkey...))
121 }
122 idx.times = append(idx.times, ev.CreatedAt)
123 if ev.Tags != nil {
124 for _, tg := range *ev.Tags {
125 if tg == nil || tg.Len() < 2 {
126 continue
127 }
128 k := tg.Key()
129 if len(k) != 1 {
130 continue // only single-letter keys per requirement
131 }
132 key := k[0]
133 for _, v := range tg.T[1:] {
134 idx.tags[key] = append(
135 idx.tags[key], append([]byte(nil), v...),
136 )
137 }
138 }
139 }
140 }
141 return idx, nil
142 }
143
144 // publishCacheEvents uploads all cache events to the relay using multiple concurrent connections
145 func publishCacheEvents(
146 ctx context.Context, relayURL string, idx *CacheIndex,
147 ) (sentCount int) {
148 numWorkers := runtime.NumCPU()
149 log.I.F("using %d concurrent connections for cache upload", numWorkers)
150
151 // Channel to distribute events to workers
152 eventChan := make(chan *event.E, len(idx.events))
153 var totalSent atomic.Int64
154
155 // Fill the event channel
156 for _, ev := range idx.events {
157 eventChan <- ev
158 }
159 close(eventChan)
160
161 // Start worker goroutines
162 var wg sync.WaitGroup
163 for i := 0; i < numWorkers; i++ {
164 wg.Add(1)
165 go func(workerID int) {
166 defer wg.Done()
167
168 // Create separate connection for this worker
169 client, err := ws.RelayConnect(ctx, relayURL)
170 if err != nil {
171 log.E.F("worker %d: failed to connect: %v", workerID, err)
172 return
173 }
174 defer client.Close()
175
176 rc := &RelayConn{client: client, url: relayURL}
177 workerSent := 0
178
179 // Process events from the channel
180 for ev := range eventChan {
181 select {
182 case <-ctx.Done():
183 return
184 default:
185 }
186
187 // Get client connection
188 wsClient := rc.Get()
189 if wsClient == nil {
190 if err := rc.Reconnect(ctx); err != nil {
191 log.E.F("worker %d: reconnect failed: %v", workerID, err)
192 continue
193 }
194 wsClient = rc.Get()
195 }
196
197 // Send event without waiting for OK response (fire-and-forget)
198 envelope := eventenvelope.NewSubmissionWith(ev)
199 envBytes := envelope.Marshal(nil)
200 if err := <-wsClient.Write(envBytes); err != nil {
201 log.E.F("worker %d: write error: %v", workerID, err)
202 errStr := err.Error()
203 if strings.Contains(errStr, "connection closed") {
204 _ = rc.Reconnect(ctx)
205 }
206 time.Sleep(50 * time.Millisecond)
207 continue
208 }
209
210 workerSent++
211 totalSent.Add(1)
212 log.T.F("worker %d: sent event %d (total: %d)", workerID, workerSent, totalSent.Load())
213
214 // Small delay to prevent overwhelming the relay
215 select {
216 case <-time.After(10 * time.Millisecond):
217 case <-ctx.Done():
218 return
219 }
220 }
221
222 log.I.F("worker %d: completed, sent %d events", workerID, workerSent)
223 }(i)
224 }
225
226 // Wait for all workers to complete
227 wg.Wait()
228
229 return int(totalSent.Load())
230 }
231
// buildRandomFilter builds a filter combining random subsets of id, author, timestamp, and a single-letter tag value.
// mask selects which criteria to include (bit 1=id, 2=author, 4=timestamp,
// 8=tag); anchor values come from a randomly chosen cached event so the
// resulting filter can actually match stored data. Callers must ensure
// len(idx.events) > 0, otherwise rng.Intn panics.
func buildRandomFilter(idx *CacheIndex, rng *rand.Rand, mask int) *filter.F {
	// pick a random base event as anchor for fields
	i := rng.Intn(len(idx.events))
	ev := idx.events[i]
	f := filter.New()
	// clear defaults we don't set
	f.Kinds = kind.NewS() // we don't constrain kinds
	// include fields based on mask bits: 1=id, 2=author, 4=timestamp, 8=tag
	if mask&1 != 0 {
		// copy the id bytes so the filter doesn't alias the cached event
		f.Ids.T = append(f.Ids.T, append([]byte(nil), ev.ID...))
	}
	if mask&2 != 0 {
		f.Authors.T = append(f.Authors.T, append([]byte(nil), ev.Pubkey...))
	}
	if mask&4 != 0 {
		// use a tight window around the event timestamp (exact match)
		f.Since = timestamp.FromUnix(ev.CreatedAt)
		f.Until = timestamp.FromUnix(ev.CreatedAt)
	}
	if mask&8 != 0 {
		// choose a random single-letter tag from this event if present; fallback to global index
		var key byte
		var val []byte
		chosen := false
		if ev.Tags != nil {
			for _, tg := range *ev.Tags {
				if tg == nil || tg.Len() < 2 {
					continue
				}
				k := tg.Key()
				if len(k) == 1 {
					// take the first single-letter tag and one of its values
					key = k[0]
					vv := tg.T[1:]
					val = vv[rng.Intn(len(vv))]
					chosen = true
					break
				}
			}
		}
		if !chosen && len(idx.tags) > 0 {
			// pick a random entry from global tags map
			keys := make([]byte, 0, len(idx.tags))
			for k := range idx.tags {
				keys = append(keys, k)
			}
			key = keys[rng.Intn(len(keys))]
			vals := idx.tags[key]
			val = vals[rng.Intn(len(vals))]
		}
		// key == 0 means no single-letter tag exists anywhere; in that case
		// the tag criterion is simply omitted from the filter.
		if key != 0 && len(val) > 0 {
			f.Tags.Append(tag.NewFromBytesSlice([]byte{key}, val))
		}
	}
	return f
}
288
// publisherWorker generates random signed text-note events in a loop and
// writes them fire-and-forget to the relay held by rc until ctx is
// cancelled. Each successful write increments *stats atomically. id is used
// for log labelling and to perturb the per-worker RNG seed.
func publisherWorker(
	ctx context.Context, rc *RelayConn, id int, stats *uint64,
) {
	// Unique RNG per worker
	src := rand.NewSource(time.Now().UnixNano() ^ int64(id<<16))
	rng := rand.New(src)
	// Generate and reuse signing key per worker
	var signer *p8k.Signer
	var err error
	if signer, err = p8k.New(); err != nil {
		log.E.F("worker %d: signer create error: %v", id, err)
		return
	}
	if err := signer.Generate(); err != nil {
		log.E.F("worker %d: signer generate error: %v", id, err)
		return
	}

	for {
		// Non-blocking cancellation check at the top of each iteration.
		select {
		case <-ctx.Done():
			return
		default:
		}

		ev, err := makeEvent(rng, signer)
		if err != nil {
			// A signing failure is unrecoverable for this worker.
			log.E.F("worker %d: makeEvent error: %v", id, err)
			return
		}

		// Send event without waiting for OK response (fire-and-forget)
		client := rc.Get()
		if client == nil {
			_ = rc.Reconnect(ctx)
			continue
		}
		// Create EVENT envelope and send directly without waiting for OK
		envelope := eventenvelope.NewSubmissionWith(ev)
		envBytes := envelope.Marshal(nil)
		if err := <-client.Write(envBytes); err != nil {
			log.E.F("worker %d: write error: %v", id, err)
			errStr := err.Error()
			if strings.Contains(errStr, "connection closed") {
				// Up to 5 reconnect attempts, 200ms apart, honoring ctx.
				for attempt := 0; attempt < 5; attempt++ {
					if ctx.Err() != nil {
						return
					}
					if err := rc.Reconnect(ctx); err == nil {
						log.I.F("worker %d: reconnected to %s", id, rc.url)
						break
					}
					select {
					case <-time.After(200 * time.Millisecond):
					case <-ctx.Done():
						return
					}
				}
			}
			// back off briefly on error to avoid tight loop if relay misbehaves
			select {
			case <-time.After(100 * time.Millisecond):
			case <-ctx.Done():
				return
			}
			// The failed event is dropped, not retried.
			continue
		}

		atomic.AddUint64(stats, 1)

		// Randomly fluctuate pacing: small random sleep 0..50ms plus occasional longer jitter
		sleep := time.Duration(rng.Intn(50)) * time.Millisecond
		if rng.Intn(10) == 0 { // 10% chance add extra 100..400ms
			sleep += time.Duration(100+rng.Intn(300)) * time.Millisecond
		}
		select {
		case <-time.After(sleep):
		case <-ctx.Done():
			return
		}
	}
}
371
// queryWorker repeatedly issues subscriptions built from random combinations
// of id/author/timestamp/tag criteria, cycling deterministically through all
// 15 non-empty criterion masks. Queries issued are counted in *queries and
// events received in *results. Each subscription is read until EOSE or
// subTimeout elapses, after which the worker sleeps a random interval in
// [minInterval, maxInterval) before the next query. Runs until ctx is done.
func queryWorker(
	ctx context.Context, rc *RelayConn, idx *CacheIndex, id int,
	queries, results *uint64, subTimeout time.Duration,
	minInterval, maxInterval time.Duration,
) {
	// Per-worker RNG, seed perturbed by worker id.
	rng := rand.New(rand.NewSource(time.Now().UnixNano() ^ int64(id<<24)))
	mask := 1 // current criteria combination, cycles 1..15
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		if len(idx.events) == 0 {
			// Nothing to anchor filters on; wait and re-check.
			// NOTE(review): plain Sleep here ignores ctx cancellation for up
			// to 200ms — benign for a stress tool, but worth confirming.
			time.Sleep(200 * time.Millisecond)
			continue
		}
		f := buildRandomFilter(idx, rng, mask)
		mask++
		if mask > 15 { // all combinations of 4 criteria (excluding 0)
			mask = 1
		}
		client := rc.Get()
		if client == nil {
			_ = rc.Reconnect(ctx)
			continue
		}
		ff := filter.S{f}
		// Bound each subscription's lifetime with its own timeout context.
		sCtx, cancel := context.WithTimeout(ctx, subTimeout)
		sub, err := client.Subscribe(
			sCtx, &ff, ws.WithLabel("stresstest-query"),
		)
		if err != nil {
			cancel()
			// reconnect on connection issues
			errStr := err.Error()
			if strings.Contains(errStr, "connection closed") {
				_ = rc.Reconnect(ctx)
			}
			continue
		}
		atomic.AddUint64(queries, 1)
		// read until EOSE or timeout
		innerDone := false
		for !innerDone {
			select {
			case <-sCtx.Done():
				innerDone = true
			case <-sub.EndOfStoredEvents:
				innerDone = true
			case ev, ok := <-sub.Events:
				if !ok {
					// channel closed by the subscription; stop reading
					innerDone = true
					break
				}
				if ev != nil {
					atomic.AddUint64(results, 1)
				}
			}
		}
		sub.Unsub()
		cancel()
		// wait a random interval between queries
		interval := minInterval
		if maxInterval > minInterval {
			delta := rng.Int63n(int64(maxInterval - minInterval))
			interval += time.Duration(delta)
		}
		select {
		case <-time.After(interval):
		case <-ctx.Done():
			return
		}
	}
}
447
448 func startReader(ctx context.Context, rl *ws.Client, received *uint64) error {
449 // Broad filter: subscribe to text notes since now-5m to catch our own writes
450 f := filter.New()
451 f.Kinds = kind.NewS(kind.TextNote)
452 // We don't set authors to ensure we read all text notes coming in
453 ff := filter.S{f}
454 sub, err := rl.Subscribe(ctx, &ff, ws.WithLabel("stresstest-reader"))
455 if err != nil {
456 return err
457 }
458
459 go func() {
460 for {
461 select {
462 case <-ctx.Done():
463 return
464 case ev, ok := <-sub.Events:
465 if !ok {
466 return
467 }
468 if ev != nil {
469 atomic.AddUint64(received, 1)
470 }
471 }
472 }
473 }()
474
475 return nil
476 }
477
// main wires up the stress test: parses flags, connects to the relay,
// optionally uploads the example cache, starts publisher/query workers and
// a reader subscription, then runs for the configured duration printing
// periodic stats before emitting a final summary.
func main() {
	var (
		address        string
		port           int
		workers        int
		duration       time.Duration
		publishTimeout time.Duration
		queryWorkers   int
		queryTimeout   time.Duration
		queryMinInt    time.Duration
		queryMaxInt    time.Duration
		skipCache      bool
	)

	flag.StringVar(
		&address, "address", "localhost", "relay address (host or IP)",
	)
	flag.IntVar(&port, "port", 3334, "relay port")
	flag.IntVar(
		&workers, "workers", 8, "number of concurrent publisher workers",
	)
	flag.DurationVar(
		&duration, "duration", 60*time.Second,
		"how long to run the stress test",
	)
	// NOTE(review): publish-timeout is parsed but never referenced below —
	// publishes are fire-and-forget; confirm whether it should be wired into
	// the publisher workers or removed.
	flag.DurationVar(
		&publishTimeout, "publish-timeout", 15*time.Second,
		"timeout waiting for OK per publish",
	)
	flag.IntVar(
		&queryWorkers, "query-workers", 4, "number of concurrent query workers",
	)
	flag.DurationVar(
		&queryTimeout, "query-timeout", 3*time.Second,
		"subscription timeout for queries",
	)
	flag.DurationVar(
		&queryMinInt, "query-min-interval", 50*time.Millisecond,
		"minimum interval between queries per worker",
	)
	flag.DurationVar(
		&queryMaxInt, "query-max-interval", 300*time.Millisecond,
		"maximum interval between queries per worker",
	)
	flag.BoolVar(
		&skipCache, "skip-cache", false,
		"skip uploading examples.Cache before running",
	)
	flag.Parse()

	relayURL := fmt.Sprintf("ws://%s:%d", address, port)
	log.I.F("stresstest: connecting to %s", relayURL)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Handle Ctrl+C: first interrupt cancels ctx and lets workers drain.
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, os.Interrupt)
	go func() {
		select {
		case <-sigc:
			log.I.Ln("interrupt received, shutting down...")
			cancel()
		case <-ctx.Done():
		}
	}()

	rl, err := ws.RelayConnect(ctx, relayURL)
	if err != nil {
		log.E.F("failed to connect to relay %s: %v", relayURL, err)
		os.Exit(1)
	}
	defer rl.Close()

	// Shared connection handle used by publisher and query workers.
	rc := &RelayConn{client: rl, url: relayURL}

	// Load and publish cache events first (unless skipped)
	idx, err := loadCacheAndIndex()
	if err != nil {
		// non-fatal: continue with whatever was indexed
		log.E.F("failed to load examples.Cache: %v", err)
	}
	cacheSent := 0
	if !skipCache && idx != nil && len(idx.events) > 0 {
		log.I.F("sending %d events from examples.Cache...", len(idx.events))
		cacheSent = publishCacheEvents(ctx, relayURL, idx)
		log.I.F("sent %d/%d cache events", cacheSent, len(idx.events))
	}

	// Counters shared with workers via sync/atomic.
	var pubOK uint64     // events written by publisher workers
	var recvCount uint64 // events observed by the reader subscription
	var qCount uint64    // queries issued by query workers
	var qResults uint64  // events returned by those queries

	if err := startReader(ctx, rl, &recvCount); err != nil {
		log.E.F("reader subscribe error: %v", err)
		// continue anyway, we can still write
	}

	wg := sync.WaitGroup{}
	// Start publisher workers
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		i := i // capture loop variable for the goroutine (pre-Go-1.22 semantics)
		go func() {
			defer wg.Done()
			publisherWorker(ctx, rc, i, &pubOK)
		}()
	}
	// Start query workers (only when there is cached data to query against)
	if idx != nil && len(idx.events) > 0 && queryWorkers > 0 {
		wg.Add(queryWorkers)
		for i := 0; i < queryWorkers; i++ {
			i := i
			go func() {
				defer wg.Done()
				queryWorker(
					ctx, rc, idx, i, &qCount, &qResults, queryTimeout,
					queryMinInt, queryMaxInt,
				)
			}()
		}
	}

	// Timer for duration and periodic stats
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	end := time.NewTimer(duration)
	start := time.Now()

loop:
	for {
		select {
		case <-ticker.C:
			// Periodic progress line every 2s.
			elapsed := time.Since(start).Seconds()
			p := atomic.LoadUint64(&pubOK)
			r := atomic.LoadUint64(&recvCount)
			qc := atomic.LoadUint64(&qCount)
			qr := atomic.LoadUint64(&qResults)
			log.I.F(
				"elapsed=%.1fs sent=%d (%.0f/s) received=%d cache_sent=%d queries=%d results=%d",
				elapsed, p, float64(p)/elapsed, r, cacheSent, qc, qr,
			)
		case <-end.C:
			break loop
		case <-ctx.Done():
			break loop
		}
	}

	// Stop all workers, wait for them, then print the final summary.
	cancel()
	wg.Wait()
	p := atomic.LoadUint64(&pubOK)
	r := atomic.LoadUint64(&recvCount)
	qc := atomic.LoadUint64(&qCount)
	qr := atomic.LoadUint64(&qResults)
	log.I.F(
		"stresstest complete: cache_sent=%d sent=%d received=%d queries=%d results=%d duration=%s",
		cacheSent, p, r, qc, qr,
		time.Since(start).Truncate(time.Millisecond),
	)
}
640