engine.mx raw
1 // Package store provides the Nostr event storage engine.
2 // It uses an append-only WAL for event data and sorted flat files
3 // for indexes. Single cooperative thread — no locks.
4 package store
5
6 import (
7 "bytes"
8 "fmt"
9 "math"
10 "os"
11 "path/filepath"
12 "sort"
13
14 "smesh.lol/pkg/nostr/event"
15 "smesh.lol/pkg/nostr/filter"
16 "smesh.lol/pkg/nostr/hex"
17 "smesh.lol/pkg/store/index"
18 "smesh.lol/pkg/store/serial"
19 "smesh.lol/pkg/store/sorted"
20 "smesh.lol/pkg/store/wal"
21 )
22
23 // Engine is the main storage engine.
24 type Engine struct {
25 dir string
26 w *wal.WAL
27
28 nextPubSer uint64
29
30 eid *sorted.File
31 fpc *sorted.File
32 sei *sorted.File
33 ca *sorted.File
34 exp *sorted.File
35 kc *sorted.File
36 pc *sorted.File
37 kpc *sorted.File
38 tc *sorted.File
39 tkc *sorted.File
40 tpc *sorted.File
41 tkp *sorted.File
42 wrd *sorted.File
43 pks *sorted.File
44 spk *sorted.File
45
46 // Graph indexes.
47 epg *sorted.File
48 peg *sorted.File
49 eeg *sorted.File
50 gee *sorted.File
51 ppg *sorted.File
52 gpp *sorted.File
53 }
54
55 // Open opens or creates a store at dir.
56 func Open(dir string) (*Engine, error) {
57 idxDir := filepath.Join(dir, "idx")
58 if err := os.MkdirAll(idxDir, 0755); err != nil {
59 return nil, err
60 }
61 w, err := wal.Open(filepath.Join(dir, "wal"))
62 if err != nil {
63 return nil, err
64 }
65 e := &Engine{dir: dir, w: w, nextPubSer: 1}
66
67 open := func(name string, recLen, cmpLen int) (*sorted.File, error) {
68 return sorted.Open(filepath.Join(idxDir, name+".dat"), recLen, cmpLen)
69 }
70 if e.eid, err = open("eid", index.EidKeyLen, index.EidKeyLen); err != nil {
71 return nil, err
72 }
73 if e.fpc, err = open("fpc", index.FpcKeyLen, index.FpcCmpLen); err != nil {
74 return nil, err
75 }
76 if e.sei, err = open("sei", index.SeiRecLen, index.SeiKeyLen); err != nil {
77 return nil, err
78 }
79 if e.ca, err = open("ca", index.CAKeyLen, index.CAKeyLen); err != nil {
80 return nil, err
81 }
82 if e.exp, err = open("exp", index.ExpKeyLen, index.ExpKeyLen); err != nil {
83 return nil, err
84 }
85 if e.kc, err = open("kc", index.KCKeyLen, index.KCKeyLen); err != nil {
86 return nil, err
87 }
88 if e.pc, err = open("pc", index.PCKeyLen, index.PCKeyLen); err != nil {
89 return nil, err
90 }
91 if e.kpc, err = open("kpc", index.KPCKeyLen, index.KPCKeyLen); err != nil {
92 return nil, err
93 }
94 if e.tc, err = open("tc", index.TCKeyLen, index.TCKeyLen); err != nil {
95 return nil, err
96 }
97 if e.tkc, err = open("tkc", index.TKCKeyLen, index.TKCKeyLen); err != nil {
98 return nil, err
99 }
100 if e.tpc, err = open("tpc", index.TPCKeyLen, index.TPCKeyLen); err != nil {
101 return nil, err
102 }
103 if e.tkp, err = open("tkp", index.TKPKeyLen, index.TKPKeyLen); err != nil {
104 return nil, err
105 }
106 if e.wrd, err = open("wrd", index.WrdKeyLen, index.WrdKeyLen); err != nil {
107 return nil, err
108 }
109 if e.pks, err = open("pks", index.PksKeyLen, index.PksKeyLen); err != nil {
110 return nil, err
111 }
112 if e.spk, err = open("spk", index.SpkRecLen, index.SpkKeyLen); err != nil {
113 return nil, err
114 }
115 if e.epg, err = open("epg", index.EpgKeyLen, index.EpgKeyLen); err != nil {
116 return nil, err
117 }
118 if e.peg, err = open("peg", index.PegKeyLen, index.PegKeyLen); err != nil {
119 return nil, err
120 }
121 if e.eeg, err = open("eeg", index.EegKeyLen, index.EegKeyLen); err != nil {
122 return nil, err
123 }
124 if e.gee, err = open("gee", index.GeeKeyLen, index.GeeKeyLen); err != nil {
125 return nil, err
126 }
127 if e.ppg, err = open("ppg", index.PpgKeyLen, index.PpgKeyLen); err != nil {
128 return nil, err
129 }
130 if e.gpp, err = open("gpp", index.GppKeyLen, index.GppKeyLen); err != nil {
131 return nil, err
132 }
133
134 // Check for clean shutdown. If marker is missing, recover indexes from WAL.
135 cleanPath := filepath.Join(dir, ".clean")
136 if _, err := os.Stat(cleanPath); os.IsNotExist(err) {
137 fmt.Println("store: clean marker missing, rebuilding indexes from WAL...")
138 if err := e.rebuildIndexes(); err != nil {
139 return nil, fmt.Errorf("store: recovery failed: %w", err)
140 }
141 if err := e.Flush(); err != nil {
142 return nil, err
143 }
144 e.writeCleanMarker()
145 }
146
147 // Recover next pubkey serial from the spk index.
148 if rec, ok := e.spk.Last(); ok {
149 e.nextPubSer = serial.Get(rec[3:]) + 1
150 }
151 return e, nil
152 }
153
154 // Close flushes all indexes and closes the WAL.
155 func (e *Engine) Close() error {
156 files := e.allFiles()
157 for _, f := range files {
158 if err := f.Close(); err != nil {
159 return err
160 }
161 }
162 return e.w.Close()
163 }
164
165 // Flush writes all buffered index data to disk and syncs the WAL.
166 func (e *Engine) Flush() error {
167 for _, f := range e.allFiles() {
168 if err := f.Flush(); err != nil {
169 return err
170 }
171 }
172 if err := e.w.Sync(); err != nil {
173 return err
174 }
175 e.writeCleanMarker()
176 return nil
177 }
178
179 func (e *Engine) writeCleanMarker() {
180 os.WriteFile(filepath.Join(e.dir, ".clean"), []byte("ok"), 0644)
181 }
182
183 func (e *Engine) removeCleanMarker() {
184 os.Remove(filepath.Join(e.dir, ".clean"))
185 }
186
187 // rebuildIndexes clears all indexes and rebuilds from WAL.
188 func (e *Engine) rebuildIndexes() error {
189 for _, f := range e.allFiles() {
190 if err := f.Clear(); err != nil {
191 return err
192 }
193 }
194 e.nextPubSer = 1
195
196 count := 0
197 err := e.w.ForEach(func(ser uint64, data []byte) bool {
198 ev := event.New()
199 if err := ev.UnmarshalBinary(bytes.NewReader(data)); err != nil {
200 return true // skip corrupt entries
201 }
202
203 pubHash := serial.PubHash(ev.Pubkey)
204 idHash := serial.IdHash(ev.ID)
205
206 e.eid.Put(index.MakeEid(idHash, ser))
207 e.sei.Put(index.MakeSeiRec(ser, ev.ID))
208 e.fpc.Put(index.MakeFpc(ser, ev.ID, pubHash, ev.CreatedAt))
209 e.ca.Put(index.MakeCA(ev.CreatedAt, ser))
210 e.kc.Put(index.MakeKC(ev.Kind, ev.CreatedAt, ser))
211 e.pc.Put(index.MakePC(pubHash, ev.CreatedAt, ser))
212 e.kpc.Put(index.MakeKPC(ev.Kind, pubHash, ev.CreatedAt, ser))
213
214 if ev.Tags != nil {
215 for _, tg := range *ev.Tags {
216 if tg.Len() < 2 {
217 continue
218 }
219 key := tg.Key()
220 if len(key) != 1 {
221 continue
222 }
223 tagKey := key[0]
224 val := tg.ValueHex()
225 if len(val) == 0 {
226 continue
227 }
228 valHash := serial.Ident(val)
229 e.tc.Put(index.MakeTC(tagKey, valHash, ev.CreatedAt, ser))
230 e.tkc.Put(index.MakeTKC(ev.Kind, tagKey, valHash, ev.CreatedAt, ser))
231 e.tpc.Put(index.MakeTPC(pubHash, tagKey, valHash, ev.CreatedAt, ser))
232 e.tkp.Put(index.MakeTKP(ev.Kind, pubHash, tagKey, valHash, ev.CreatedAt, ser))
233 }
234 }
235
236 if len(ev.Content) > 0 {
237 words := splitWords(ev.Content)
238 seen := map[string]bool{}
239 for _, w := range words {
240 if seen[string(w)] {
241 continue
242 }
243 seen[string(w)] = true
244 e.wrd.Put(index.MakeWrd(serial.Ident(w), ser))
245 }
246 }
247
248 e.populateGraphEdges(ev, ser)
249 count++
250 return true
251 })
252 if err != nil {
253 return err
254 }
255 fmt.Println("store: rebuilt indexes for", count, "events")
256 return nil
257 }
258
259 func (e *Engine) allFiles() []*sorted.File {
260 return []*sorted.File{
261 e.eid, e.fpc, e.sei, e.ca, e.exp, e.kc, e.pc, e.kpc,
262 e.tc, e.tkc, e.tpc, e.tkp, e.wrd, e.pks, e.spk,
263 e.epg, e.peg, e.eeg, e.gee, e.ppg, e.gpp,
264 }
265 }
266
267 // SaveEvent persists an event and all its indexes.
268 func (e *Engine) SaveEvent(ev *event.E) error {
269 e.removeCleanMarker()
270
271 // Duplicate check via ID hash.
272 idHash := serial.IdHash(ev.ID)
273 if e.hasEvent(idHash, ev.ID) {
274 return fmt.Errorf("duplicate event")
275 }
276
277 // Write event bytes to WAL → serial.
278 data := ev.MarshalBinaryToBytes(nil)
279 ser, err := e.w.Append(data)
280 if err != nil {
281 return err
282 }
283
284 pubHash := serial.PubHash(ev.Pubkey)
285
286 // Core mappings.
287 e.eid.Put(index.MakeEid(idHash, ser))
288 e.sei.Put(index.MakeSeiRec(ser, ev.ID))
289 e.fpc.Put(index.MakeFpc(ser, ev.ID, pubHash, ev.CreatedAt))
290
291 // Query indexes.
292 e.ca.Put(index.MakeCA(ev.CreatedAt, ser))
293 e.kc.Put(index.MakeKC(ev.Kind, ev.CreatedAt, ser))
294 e.pc.Put(index.MakePC(pubHash, ev.CreatedAt, ser))
295 e.kpc.Put(index.MakeKPC(ev.Kind, pubHash, ev.CreatedAt, ser))
296
297 // Tag indexes.
298 if ev.Tags != nil {
299 for _, tg := range *ev.Tags {
300 if tg.Len() < 2 {
301 continue
302 }
303 key := tg.Key()
304 if len(key) != 1 {
305 continue
306 }
307 tagKey := key[0]
308 val := tg.ValueHex()
309 if len(val) == 0 {
310 continue
311 }
312 valHash := serial.Ident(val)
313 e.tc.Put(index.MakeTC(tagKey, valHash, ev.CreatedAt, ser))
314 e.tkc.Put(index.MakeTKC(ev.Kind, tagKey, valHash, ev.CreatedAt, ser))
315 e.tpc.Put(index.MakeTPC(pubHash, tagKey, valHash, ev.CreatedAt, ser))
316 e.tkp.Put(index.MakeTKP(ev.Kind, pubHash, tagKey, valHash, ev.CreatedAt, ser))
317 }
318 }
319
320 // Word index for search.
321 if len(ev.Content) > 0 {
322 words := splitWords(ev.Content)
323 seen := map[string]bool{}
324 for _, w := range words {
325 if seen[string(w)] {
326 continue
327 }
328 seen[string(w)] = true
329 e.wrd.Put(index.MakeWrd(serial.Ident(w), ser))
330 }
331 }
332
333 // Pubkey serial mapping + graph edges.
334 e.populateGraphEdges(ev, ser)
335 return nil
336 }
337
338 // populateGraphEdges creates all graph index entries for an event.
339 func (e *Engine) populateGraphEdges(ev *event.E, ser uint64) {
340 authorSer := e.getOrCreatePubkeySerial(ev.Pubkey)
341 kind := ev.Kind
342
343 // epg/peg: author edge.
344 e.epg.Put(index.MakeEpg(ser, authorSer, kind, index.DirAuthor))
345 e.peg.Put(index.MakePeg(authorSer, kind, index.DirAuthor, ser))
346
347 if ev.Tags == nil {
348 return
349 }
350
351 for _, tg := range *ev.Tags {
352 if tg.Len() < 2 {
353 continue
354 }
355 key := tg.Key()
356 if len(key) != 1 {
357 continue
358 }
359
360 switch key[0] {
361 case 'p':
362 valHex := tg.ValueHex()
363 if len(valHex) != 64 {
364 continue
365 }
366 pkBytes, err := hex.Dec(string(valHex))
367 if err != nil || len(pkBytes) != 32 {
368 continue
369 }
370 pkSer := e.getOrCreatePubkeySerial(pkBytes)
371
372 // epg/peg: p-tag edges.
373 e.epg.Put(index.MakeEpg(ser, pkSer, kind, index.DirPTagOut))
374 e.peg.Put(index.MakePeg(pkSer, kind, index.DirPTagIn, ser))
375
376 // ppg/gpp: direct pubkey-to-pubkey edge (skip self-reference).
377 if pkSer != authorSer {
378 e.ppg.Put(index.MakePpg(authorSer, pkSer, kind, index.DirPKOut, ser))
379 e.gpp.Put(index.MakeGpp(pkSer, kind, index.DirPKIn, authorSer, ser))
380 }
381
382 case 'e':
383 valHex := tg.ValueHex()
384 if len(valHex) != 64 {
385 continue
386 }
387 evBytes, err := hex.Dec(string(valHex))
388 if err != nil || len(evBytes) != 32 {
389 continue
390 }
391 // Look up target event serial — only create edge if target exists.
392 tgtSer, ok := e.getEventSerial(evBytes)
393 if !ok {
394 continue
395 }
396 // eeg/gee: e-tag edges.
397 e.eeg.Put(index.MakeEeg(ser, tgtSer, kind, index.DirETagOut))
398 e.gee.Put(index.MakeGee(tgtSer, kind, index.DirETagIn, ser))
399 }
400 }
401 }
402
403 // getEventSerial returns the WAL serial for a 32-byte event ID, if it exists.
404 func (e *Engine) getEventSerial(id []byte) (uint64, bool) {
405 idHash := serial.IdHash(id)
406 start := index.MakeEid(idHash, 0)
407 end := index.MakeEid(idHash, serial.Max)
408 var found uint64
409 var ok bool
410 e.eid.Scan(start, end, func(rec []byte) bool {
411 s := serial.Get(rec[3+serial.HashLen:])
412 if eid, exists := e.getEventIDBySerial(s); exists {
413 if bytes.Equal(eid, id) {
414 found = s
415 ok = true
416 return false
417 }
418 }
419 return true
420 })
421 return found, ok
422 }
423
424 // SearchWord returns serials of events containing the given word.
425 func (e *Engine) SearchWord(word []byte) []uint64 {
426 wh := serial.Ident(word)
427 return e.collectSerials(e.wrd,
428 index.MakeWrd(wh, 0),
429 index.MakeWrd(wh, serial.Max),
430 index.WrdKeyLen)
431 }
432
433 func splitWords(content []byte) [][]byte {
434 var words [][]byte
435 var word []byte
436 for _, b := range content {
437 if b >= 'A' && b <= 'Z' {
438 word = append(word, b+32)
439 } else if (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9') {
440 word = append(word, b)
441 } else {
442 if len(word) >= 3 {
443 w := []byte{:len(word)}
444 copy(w, word)
445 words = append(words, w)
446 }
447 word = word[:0]
448 }
449 }
450 if len(word) >= 3 {
451 w := []byte{:len(word)}
452 copy(w, word)
453 words = append(words, w)
454 }
455 return words
456 }
457
458 // GetBySerial reads an event from the WAL by its serial.
459 func (e *Engine) GetBySerial(ser uint64) (*event.E, error) {
460 data, err := e.w.Read(ser)
461 if err != nil {
462 return nil, err
463 }
464 ev := event.New()
465 if err := ev.UnmarshalBinary(bytes.NewReader(data)); err != nil {
466 return nil, err
467 }
468 return ev, nil
469 }
470
471 // QueryEvents returns events matching the filter, sorted newest-first.
472 func (e *Engine) QueryEvents(f *filter.F) ([]*event.E, error) {
473 if f.Ids != nil && f.Ids.Len() > 0 {
474 return e.queryByIDs(f)
475 }
476
477 var since, until int64
478 if f.Since != nil && f.Since.I64() != 0 {
479 since = f.Since.I64()
480 }
481 if f.Until != nil && f.Until.I64() != 0 {
482 until = f.Until.I64()
483 } else {
484 until = math.MaxInt64
485 }
486
487 hasKinds := f.Kinds != nil && f.Kinds.Len() > 0
488 hasAuthors := f.Authors != nil && f.Authors.Len() > 0
489 hasTags := f.Tags != nil && f.Tags.Len() > 0
490
491 var serials []uint64
492 switch {
493 case hasTags && hasKinds && hasAuthors:
494 serials = e.scanTKP(f, since, until)
495 case hasTags && hasKinds:
496 serials = e.scanTKC(f, since, until)
497 case hasTags && hasAuthors:
498 serials = e.scanTPC(f, since, until)
499 case hasTags:
500 serials = e.scanTC(f, since, until)
501 case hasKinds && hasAuthors:
502 serials = e.scanKPC(f, since, until)
503 case hasKinds:
504 serials = e.scanKC(f, since, until)
505 case hasAuthors:
506 serials = e.scanPC(f, since, until)
507 default:
508 serials = e.scanCA(since, until)
509 }
510
511 // Deduplicate.
512 seen := map[uint64]bool{}
513 deduped := serials[:0]
514 for _, s := range serials {
515 if !seen[s] {
516 seen[s] = true
517 deduped = append(deduped, s)
518 }
519 }
520
521 // Fetch and apply full filter.
522 var results event.S
523 for _, ser := range deduped {
524 ev, err := e.GetBySerial(ser)
525 if err != nil {
526 continue
527 }
528 if f.Matches(ev) {
529 results = append(results, ev)
530 }
531 }
532
533 sort.Sort(results) // reverse chronological
534
535 if f.Limit != nil && int(*f.Limit) < len(results) {
536 results = results[:*f.Limit]
537 }
538 return results, nil
539 }
540
541 // DeleteEvent removes an event and its primary index entries.
542 func (e *Engine) DeleteEvent(id []byte) error {
543 idHash := serial.IdHash(id)
544
545 // Find the serial for this event.
546 var ser uint64
547 var found bool
548 start := index.MakeEid(idHash, 0)
549 end := index.MakeEid(idHash, serial.Max)
550 e.eid.Scan(start, end, func(rec []byte) bool {
551 s := serial.Get(rec[3+serial.HashLen:])
552 if eid, ok := e.getEventIDBySerial(s); ok {
553 if bytes.Equal(eid, id) {
554 ser = s
555 found = true
556 return false
557 }
558 }
559 return true
560 })
561 if !found {
562 return fmt.Errorf("event not found")
563 }
564
565 // Read the event to get kind/pubkey/timestamp for index cleanup.
566 ev, err := e.GetBySerial(ser)
567 if err != nil {
568 return err
569 }
570
571 pubHash := serial.PubHash(ev.Pubkey)
572
573 // Delete primary index entries.
574 e.eid.Delete(index.MakeEid(idHash, ser))
575 e.ca.Delete(index.MakeCA(ev.CreatedAt, ser))
576 e.kc.Delete(index.MakeKC(ev.Kind, ev.CreatedAt, ser))
577 e.pc.Delete(index.MakePC(pubHash, ev.CreatedAt, ser))
578 e.kpc.Delete(index.MakeKPC(ev.Kind, pubHash, ev.CreatedAt, ser))
579
580 return nil
581 }
582
583 // GetByID retrieves an event by its 32-byte ID.
584 func (e *Engine) GetByID(id []byte) (*event.E, error) {
585 idHash := serial.IdHash(id)
586 start := index.MakeEid(idHash, 0)
587 end := index.MakeEid(idHash, serial.Max)
588 var found *event.E
589 e.eid.Scan(start, end, func(rec []byte) bool {
590 ser := serial.Get(rec[3+serial.HashLen:])
591 if eid, ok := e.getEventIDBySerial(ser); ok {
592 if bytes.Equal(eid, id) {
593 if ev, err := e.GetBySerial(ser); err == nil {
594 found = ev
595 return false
596 }
597 }
598 }
599 return true
600 })
601 if found == nil {
602 return nil, fmt.Errorf("event not found")
603 }
604 return found, nil
605 }
606
607 // --- internal helpers ---
608
609 func (e *Engine) hasEvent(idHash, fullID []byte) bool {
610 start := index.MakeEid(idHash, 0)
611 end := index.MakeEid(idHash, serial.Max)
612 var found bool
613 e.eid.Scan(start, end, func(rec []byte) bool {
614 ser := serial.Get(rec[3+serial.HashLen:])
615 if eid, ok := e.getEventIDBySerial(ser); ok {
616 if bytes.Equal(eid, fullID) {
617 found = true
618 return false
619 }
620 }
621 return true
622 })
623 return found
624 }
625
626 func (e *Engine) getOrCreatePubkeySerial(pubkey []byte) uint64 {
627 pubHash := serial.PubHash(pubkey)
628 start := index.MakePks(pubHash, 0)
629 end := index.MakePks(pubHash, serial.Max)
630 var found uint64
631 var exists bool
632 e.pks.Scan(start, end, func(rec []byte) bool {
633 found = serial.Get(rec[3+serial.HashLen:])
634 exists = true
635 return false
636 })
637 if exists {
638 return found
639 }
640 ser := e.nextPubSer
641 e.nextPubSer++
642 e.pks.Put(index.MakePks(pubHash, ser))
643 e.spk.Put(index.MakeSpkRec(ser, pubkey))
644 return ser
645 }
646
647 func (e *Engine) getEventIDBySerial(ser uint64) ([]byte, bool) {
648 key := index.MakeSei(ser)
649 rec, ok := e.sei.Get(key)
650 if !ok {
651 return nil, false
652 }
653 return rec[index.SeiKeyLen:], true
654 }
655
656 func (e *Engine) queryByIDs(f *filter.F) ([]*event.E, error) {
657 var results []*event.E
658 for _, id := range f.Ids.T {
659 idHash := serial.IdHash(id)
660 start := index.MakeEid(idHash, 0)
661 end := index.MakeEid(idHash, serial.Max)
662 e.eid.Scan(start, end, func(rec []byte) bool {
663 ser := serial.Get(rec[3+serial.HashLen:])
664 if fullID, ok := e.getEventIDBySerial(ser); ok {
665 if bytes.Equal(fullID, id) {
666 if ev, err := e.GetBySerial(ser); err == nil {
667 results = append(results, ev)
668 }
669 }
670 }
671 return true
672 })
673 }
674 return results, nil
675 }
676
677 func (e *Engine) collectSerials(idx *sorted.File, start, end []byte, keyLen int) []uint64 {
678 var out []uint64
679 idx.Scan(start, end, func(rec []byte) bool {
680 out = append(out, serial.Get(rec[keyLen-serial.Len:]))
681 return true
682 })
683 return out
684 }
685
686 func (e *Engine) scanCA(since, until int64) []uint64 {
687 return e.collectSerials(e.ca,
688 index.MakeCA(since, 0),
689 index.MakeCA(until, serial.Max),
690 index.CAKeyLen)
691 }
692
693 func (e *Engine) scanKC(f *filter.F, since, until int64) []uint64 {
694 var out []uint64
695 for _, k := range f.Kinds.K {
696 out = append(out, e.collectSerials(e.kc,
697 index.MakeKC(k.K, since, 0),
698 index.MakeKC(k.K, until, serial.Max),
699 index.KCKeyLen)...)
700 }
701 return out
702 }
703
704 func (e *Engine) scanPC(f *filter.F, since, until int64) []uint64 {
705 var out []uint64
706 for _, author := range f.Authors.T {
707 ph := serial.PubHash(author)
708 out = append(out, e.collectSerials(e.pc,
709 index.MakePC(ph, since, 0),
710 index.MakePC(ph, until, serial.Max),
711 index.PCKeyLen)...)
712 }
713 return out
714 }
715
716 func (e *Engine) scanKPC(f *filter.F, since, until int64) []uint64 {
717 var out []uint64
718 for _, k := range f.Kinds.K {
719 for _, author := range f.Authors.T {
720 ph := serial.PubHash(author)
721 out = append(out, e.collectSerials(e.kpc,
722 index.MakeKPC(k.K, ph, since, 0),
723 index.MakeKPC(k.K, ph, until, serial.Max),
724 index.KPCKeyLen)...)
725 }
726 }
727 return out
728 }
729
730 func (e *Engine) scanTC(f *filter.F, since, until int64) []uint64 {
731 var out []uint64
732 for _, tg := range *f.Tags {
733 if tg.Len() < 2 || len(tg.Key()) != 1 {
734 continue
735 }
736 tk := tg.Key()[0]
737 for _, val := range tg.T[1:] {
738 vh := serial.Ident(val)
739 out = append(out, e.collectSerials(e.tc,
740 index.MakeTC(tk, vh, since, 0),
741 index.MakeTC(tk, vh, until, serial.Max),
742 index.TCKeyLen)...)
743 }
744 }
745 return out
746 }
747
748 func (e *Engine) scanTKC(f *filter.F, since, until int64) []uint64 {
749 var out []uint64
750 for _, tg := range *f.Tags {
751 if tg.Len() < 2 || len(tg.Key()) != 1 {
752 continue
753 }
754 tk := tg.Key()[0]
755 for _, val := range tg.T[1:] {
756 vh := serial.Ident(val)
757 for _, k := range f.Kinds.K {
758 out = append(out, e.collectSerials(e.tkc,
759 index.MakeTKC(k.K, tk, vh, since, 0),
760 index.MakeTKC(k.K, tk, vh, until, serial.Max),
761 index.TKCKeyLen)...)
762 }
763 }
764 }
765 return out
766 }
767
768 func (e *Engine) scanTPC(f *filter.F, since, until int64) []uint64 {
769 var out []uint64
770 for _, tg := range *f.Tags {
771 if tg.Len() < 2 || len(tg.Key()) != 1 {
772 continue
773 }
774 tk := tg.Key()[0]
775 for _, val := range tg.T[1:] {
776 vh := serial.Ident(val)
777 for _, author := range f.Authors.T {
778 ph := serial.PubHash(author)
779 out = append(out, e.collectSerials(e.tpc,
780 index.MakeTPC(ph, tk, vh, since, 0),
781 index.MakeTPC(ph, tk, vh, until, serial.Max),
782 index.TPCKeyLen)...)
783 }
784 }
785 }
786 return out
787 }
788
789 func (e *Engine) scanTKP(f *filter.F, since, until int64) []uint64 {
790 var out []uint64
791 for _, tg := range *f.Tags {
792 if tg.Len() < 2 || len(tg.Key()) != 1 {
793 continue
794 }
795 tk := tg.Key()[0]
796 for _, val := range tg.T[1:] {
797 vh := serial.Ident(val)
798 for _, k := range f.Kinds.K {
799 for _, author := range f.Authors.T {
800 ph := serial.PubHash(author)
801 out = append(out, e.collectSerials(e.tkp,
802 index.MakeTKP(k.K, ph, tk, vh, since, 0),
803 index.MakeTKP(k.K, ph, tk, vh, until, serial.Max),
804 index.TKPKeyLen)...)
805 }
806 }
807 }
808 }
809 return out
810 }
811
812 // --- Graph traversal ---
813
814 // GetReferencingEvents returns serials of events that reference targetSer via e-tags.
815 // Uses the gee (reverse event-event) index.
816 func (e *Engine) GetReferencingEvents(targetSer uint64) []uint64 {
817 start := index.MakeGee(targetSer, 0, 0, 0)
818 end := index.MakeGee(targetSer, 0xFFFF, 0xFF, serial.Max)
819 var out []uint64
820 e.gee.Scan(start, end, func(rec []byte) bool {
821 out = append(out, serial.Get(rec[11:]))
822 return true
823 })
824 return out
825 }
826
827 // GetETagTargets returns serials of events that srcSer references via e-tags.
828 // Uses the eeg (forward event-event) index.
829 func (e *Engine) GetETagTargets(srcSer uint64) []uint64 {
830 start := index.MakeEeg(srcSer, 0, 0, 0)
831 end := index.MakeEeg(srcSer, serial.Max, 0xFFFF, 0xFF)
832 var out []uint64
833 e.eeg.Scan(start, end, func(rec []byte) bool {
834 out = append(out, serial.Get(rec[8:]))
835 return true
836 })
837 return out
838 }
839
840 // TraverseThread performs BFS traversal of thread structure via e-tags.
841 // Returns all event IDs (32-byte binary) reachable from seedID within maxDepth.
842 func (e *Engine) TraverseThread(seedID []byte, maxDepth int, direction string) [][]byte {
843 seedSer, ok := e.getEventSerial(seedID)
844 if !ok {
845 return nil
846 }
847 if direction == "" {
848 direction = "both"
849 }
850
851 visited := map[uint64]bool{}
852 visited[seedSer] = true
853 frontier := []uint64{seedSer}
854
855 var allIDs [][]byte
856 emptyStreak := 0
857
858 for depth := 1; depth <= maxDepth; depth++ {
859 var next []uint64
860 found := 0
861
862 for _, evSer := range frontier {
863 if direction == "both" || direction == "inbound" {
864 for _, refSer := range e.GetReferencingEvents(evSer) {
865 if visited[refSer] {
866 continue
867 }
868 visited[refSer] = true
869 if eid, ok := e.getEventIDBySerial(refSer); ok {
870 allIDs = append(allIDs, eid)
871 found++
872 }
873 next = append(next, refSer)
874 }
875 }
876 if direction == "both" || direction == "outbound" {
877 for _, tgtSer := range e.GetETagTargets(evSer) {
878 if visited[tgtSer] {
879 continue
880 }
881 visited[tgtSer] = true
882 if eid, ok := e.getEventIDBySerial(tgtSer); ok {
883 allIDs = append(allIDs, eid)
884 found++
885 }
886 next = append(next, tgtSer)
887 }
888 }
889 }
890
891 if found == 0 {
892 emptyStreak++
893 if emptyStreak >= 2 {
894 break
895 }
896 } else {
897 emptyStreak = 0
898 }
899 frontier = next
900 }
901 return allIDs
902 }
903