engine.mx raw

   1  // Package store provides the Nostr event storage engine.
   2  // It uses an append-only WAL for event data and sorted flat files
   3  // for indexes. Single cooperative thread — no locks.
   4  package store
   5  
   6  import (
   7  	"bytes"
   8  	"fmt"
   9  	"math"
  10  	"os"
  11  	"path/filepath"
  12  	"sort"
  13  
  14  	"smesh.lol/pkg/nostr/event"
  15  	"smesh.lol/pkg/nostr/filter"
  16  	"smesh.lol/pkg/nostr/hex"
  17  	"smesh.lol/pkg/store/index"
  18  	"smesh.lol/pkg/store/serial"
  19  	"smesh.lol/pkg/store/sorted"
  20  	"smesh.lol/pkg/store/wal"
  21  )
  22  
  23  // Engine is the main storage engine.
  24  type Engine struct {
  25  	dir string
  26  	w   *wal.WAL
  27  
  28  	nextPubSer uint64
  29  
  30  	eid *sorted.File
  31  	fpc *sorted.File
  32  	sei *sorted.File
  33  	ca  *sorted.File
  34  	exp *sorted.File
  35  	kc  *sorted.File
  36  	pc  *sorted.File
  37  	kpc *sorted.File
  38  	tc  *sorted.File
  39  	tkc *sorted.File
  40  	tpc *sorted.File
  41  	tkp *sorted.File
  42  	wrd *sorted.File
  43  	pks *sorted.File
  44  	spk *sorted.File
  45  
  46  	// Graph indexes.
  47  	epg *sorted.File
  48  	peg *sorted.File
  49  	eeg *sorted.File
  50  	gee *sorted.File
  51  	ppg *sorted.File
  52  	gpp *sorted.File
  53  }
  54  
  55  // Open opens or creates a store at dir.
  56  func Open(dir string) (*Engine, error) {
  57  	idxDir := filepath.Join(dir, "idx")
  58  	if err := os.MkdirAll(idxDir, 0755); err != nil {
  59  		return nil, err
  60  	}
  61  	w, err := wal.Open(filepath.Join(dir, "wal"))
  62  	if err != nil {
  63  		return nil, err
  64  	}
  65  	e := &Engine{dir: dir, w: w, nextPubSer: 1}
  66  
  67  	open := func(name string, recLen, cmpLen int) (*sorted.File, error) {
  68  		return sorted.Open(filepath.Join(idxDir, name+".dat"), recLen, cmpLen)
  69  	}
  70  	if e.eid, err = open("eid", index.EidKeyLen, index.EidKeyLen); err != nil {
  71  		return nil, err
  72  	}
  73  	if e.fpc, err = open("fpc", index.FpcKeyLen, index.FpcCmpLen); err != nil {
  74  		return nil, err
  75  	}
  76  	if e.sei, err = open("sei", index.SeiRecLen, index.SeiKeyLen); err != nil {
  77  		return nil, err
  78  	}
  79  	if e.ca, err = open("ca", index.CAKeyLen, index.CAKeyLen); err != nil {
  80  		return nil, err
  81  	}
  82  	if e.exp, err = open("exp", index.ExpKeyLen, index.ExpKeyLen); err != nil {
  83  		return nil, err
  84  	}
  85  	if e.kc, err = open("kc", index.KCKeyLen, index.KCKeyLen); err != nil {
  86  		return nil, err
  87  	}
  88  	if e.pc, err = open("pc", index.PCKeyLen, index.PCKeyLen); err != nil {
  89  		return nil, err
  90  	}
  91  	if e.kpc, err = open("kpc", index.KPCKeyLen, index.KPCKeyLen); err != nil {
  92  		return nil, err
  93  	}
  94  	if e.tc, err = open("tc", index.TCKeyLen, index.TCKeyLen); err != nil {
  95  		return nil, err
  96  	}
  97  	if e.tkc, err = open("tkc", index.TKCKeyLen, index.TKCKeyLen); err != nil {
  98  		return nil, err
  99  	}
 100  	if e.tpc, err = open("tpc", index.TPCKeyLen, index.TPCKeyLen); err != nil {
 101  		return nil, err
 102  	}
 103  	if e.tkp, err = open("tkp", index.TKPKeyLen, index.TKPKeyLen); err != nil {
 104  		return nil, err
 105  	}
 106  	if e.wrd, err = open("wrd", index.WrdKeyLen, index.WrdKeyLen); err != nil {
 107  		return nil, err
 108  	}
 109  	if e.pks, err = open("pks", index.PksKeyLen, index.PksKeyLen); err != nil {
 110  		return nil, err
 111  	}
 112  	if e.spk, err = open("spk", index.SpkRecLen, index.SpkKeyLen); err != nil {
 113  		return nil, err
 114  	}
 115  	if e.epg, err = open("epg", index.EpgKeyLen, index.EpgKeyLen); err != nil {
 116  		return nil, err
 117  	}
 118  	if e.peg, err = open("peg", index.PegKeyLen, index.PegKeyLen); err != nil {
 119  		return nil, err
 120  	}
 121  	if e.eeg, err = open("eeg", index.EegKeyLen, index.EegKeyLen); err != nil {
 122  		return nil, err
 123  	}
 124  	if e.gee, err = open("gee", index.GeeKeyLen, index.GeeKeyLen); err != nil {
 125  		return nil, err
 126  	}
 127  	if e.ppg, err = open("ppg", index.PpgKeyLen, index.PpgKeyLen); err != nil {
 128  		return nil, err
 129  	}
 130  	if e.gpp, err = open("gpp", index.GppKeyLen, index.GppKeyLen); err != nil {
 131  		return nil, err
 132  	}
 133  
 134  	// Check for clean shutdown. If marker is missing, recover indexes from WAL.
 135  	cleanPath := filepath.Join(dir, ".clean")
 136  	if _, err := os.Stat(cleanPath); os.IsNotExist(err) {
 137  		fmt.Println("store: clean marker missing, rebuilding indexes from WAL...")
 138  		if err := e.rebuildIndexes(); err != nil {
 139  			return nil, fmt.Errorf("store: recovery failed: %w", err)
 140  		}
 141  		if err := e.Flush(); err != nil {
 142  			return nil, err
 143  		}
 144  		e.writeCleanMarker()
 145  	}
 146  
 147  	// Recover next pubkey serial from the spk index.
 148  	if rec, ok := e.spk.Last(); ok {
 149  		e.nextPubSer = serial.Get(rec[3:]) + 1
 150  	}
 151  	return e, nil
 152  }
 153  
 154  // Close flushes all indexes and closes the WAL.
 155  func (e *Engine) Close() error {
 156  	files := e.allFiles()
 157  	for _, f := range files {
 158  		if err := f.Close(); err != nil {
 159  			return err
 160  		}
 161  	}
 162  	return e.w.Close()
 163  }
 164  
 165  // Flush writes all buffered index data to disk and syncs the WAL.
 166  func (e *Engine) Flush() error {
 167  	for _, f := range e.allFiles() {
 168  		if err := f.Flush(); err != nil {
 169  			return err
 170  		}
 171  	}
 172  	if err := e.w.Sync(); err != nil {
 173  		return err
 174  	}
 175  	e.writeCleanMarker()
 176  	return nil
 177  }
 178  
 179  func (e *Engine) writeCleanMarker() {
 180  	os.WriteFile(filepath.Join(e.dir, ".clean"), []byte("ok"), 0644)
 181  }
 182  
 183  func (e *Engine) removeCleanMarker() {
 184  	os.Remove(filepath.Join(e.dir, ".clean"))
 185  }
 186  
 187  // rebuildIndexes clears all indexes and rebuilds from WAL.
 188  func (e *Engine) rebuildIndexes() error {
 189  	for _, f := range e.allFiles() {
 190  		if err := f.Clear(); err != nil {
 191  			return err
 192  		}
 193  	}
 194  	e.nextPubSer = 1
 195  
 196  	count := 0
 197  	err := e.w.ForEach(func(ser uint64, data []byte) bool {
 198  		ev := event.New()
 199  		if err := ev.UnmarshalBinary(bytes.NewReader(data)); err != nil {
 200  			return true // skip corrupt entries
 201  		}
 202  
 203  		pubHash := serial.PubHash(ev.Pubkey)
 204  		idHash := serial.IdHash(ev.ID)
 205  
 206  		e.eid.Put(index.MakeEid(idHash, ser))
 207  		e.sei.Put(index.MakeSeiRec(ser, ev.ID))
 208  		e.fpc.Put(index.MakeFpc(ser, ev.ID, pubHash, ev.CreatedAt))
 209  		e.ca.Put(index.MakeCA(ev.CreatedAt, ser))
 210  		e.kc.Put(index.MakeKC(ev.Kind, ev.CreatedAt, ser))
 211  		e.pc.Put(index.MakePC(pubHash, ev.CreatedAt, ser))
 212  		e.kpc.Put(index.MakeKPC(ev.Kind, pubHash, ev.CreatedAt, ser))
 213  
 214  		if ev.Tags != nil {
 215  			for _, tg := range *ev.Tags {
 216  				if tg.Len() < 2 {
 217  					continue
 218  				}
 219  				key := tg.Key()
 220  				if len(key) != 1 {
 221  					continue
 222  				}
 223  				tagKey := key[0]
 224  				val := tg.ValueHex()
 225  				if len(val) == 0 {
 226  					continue
 227  				}
 228  				valHash := serial.Ident(val)
 229  				e.tc.Put(index.MakeTC(tagKey, valHash, ev.CreatedAt, ser))
 230  				e.tkc.Put(index.MakeTKC(ev.Kind, tagKey, valHash, ev.CreatedAt, ser))
 231  				e.tpc.Put(index.MakeTPC(pubHash, tagKey, valHash, ev.CreatedAt, ser))
 232  				e.tkp.Put(index.MakeTKP(ev.Kind, pubHash, tagKey, valHash, ev.CreatedAt, ser))
 233  			}
 234  		}
 235  
 236  		if len(ev.Content) > 0 {
 237  			words := splitWords(ev.Content)
 238  			seen := map[string]bool{}
 239  			for _, w := range words {
 240  				if seen[string(w)] {
 241  					continue
 242  				}
 243  				seen[string(w)] = true
 244  				e.wrd.Put(index.MakeWrd(serial.Ident(w), ser))
 245  			}
 246  		}
 247  
 248  		e.populateGraphEdges(ev, ser)
 249  		count++
 250  		return true
 251  	})
 252  	if err != nil {
 253  		return err
 254  	}
 255  	fmt.Println("store: rebuilt indexes for", count, "events")
 256  	return nil
 257  }
 258  
 259  func (e *Engine) allFiles() []*sorted.File {
 260  	return []*sorted.File{
 261  		e.eid, e.fpc, e.sei, e.ca, e.exp, e.kc, e.pc, e.kpc,
 262  		e.tc, e.tkc, e.tpc, e.tkp, e.wrd, e.pks, e.spk,
 263  		e.epg, e.peg, e.eeg, e.gee, e.ppg, e.gpp,
 264  	}
 265  }
 266  
 267  // SaveEvent persists an event and all its indexes.
 268  func (e *Engine) SaveEvent(ev *event.E) error {
 269  	e.removeCleanMarker()
 270  
 271  	// Duplicate check via ID hash.
 272  	idHash := serial.IdHash(ev.ID)
 273  	if e.hasEvent(idHash, ev.ID) {
 274  		return fmt.Errorf("duplicate event")
 275  	}
 276  
 277  	// Write event bytes to WAL → serial.
 278  	data := ev.MarshalBinaryToBytes(nil)
 279  	ser, err := e.w.Append(data)
 280  	if err != nil {
 281  		return err
 282  	}
 283  
 284  	pubHash := serial.PubHash(ev.Pubkey)
 285  
 286  	// Core mappings.
 287  	e.eid.Put(index.MakeEid(idHash, ser))
 288  	e.sei.Put(index.MakeSeiRec(ser, ev.ID))
 289  	e.fpc.Put(index.MakeFpc(ser, ev.ID, pubHash, ev.CreatedAt))
 290  
 291  	// Query indexes.
 292  	e.ca.Put(index.MakeCA(ev.CreatedAt, ser))
 293  	e.kc.Put(index.MakeKC(ev.Kind, ev.CreatedAt, ser))
 294  	e.pc.Put(index.MakePC(pubHash, ev.CreatedAt, ser))
 295  	e.kpc.Put(index.MakeKPC(ev.Kind, pubHash, ev.CreatedAt, ser))
 296  
 297  	// Tag indexes.
 298  	if ev.Tags != nil {
 299  		for _, tg := range *ev.Tags {
 300  			if tg.Len() < 2 {
 301  				continue
 302  			}
 303  			key := tg.Key()
 304  			if len(key) != 1 {
 305  				continue
 306  			}
 307  			tagKey := key[0]
 308  			val := tg.ValueHex()
 309  			if len(val) == 0 {
 310  				continue
 311  			}
 312  			valHash := serial.Ident(val)
 313  			e.tc.Put(index.MakeTC(tagKey, valHash, ev.CreatedAt, ser))
 314  			e.tkc.Put(index.MakeTKC(ev.Kind, tagKey, valHash, ev.CreatedAt, ser))
 315  			e.tpc.Put(index.MakeTPC(pubHash, tagKey, valHash, ev.CreatedAt, ser))
 316  			e.tkp.Put(index.MakeTKP(ev.Kind, pubHash, tagKey, valHash, ev.CreatedAt, ser))
 317  		}
 318  	}
 319  
 320  	// Word index for search.
 321  	if len(ev.Content) > 0 {
 322  		words := splitWords(ev.Content)
 323  		seen := map[string]bool{}
 324  		for _, w := range words {
 325  			if seen[string(w)] {
 326  				continue
 327  			}
 328  			seen[string(w)] = true
 329  			e.wrd.Put(index.MakeWrd(serial.Ident(w), ser))
 330  		}
 331  	}
 332  
 333  	// Pubkey serial mapping + graph edges.
 334  	e.populateGraphEdges(ev, ser)
 335  	return nil
 336  }
 337  
 338  // populateGraphEdges creates all graph index entries for an event.
 339  func (e *Engine) populateGraphEdges(ev *event.E, ser uint64) {
 340  	authorSer := e.getOrCreatePubkeySerial(ev.Pubkey)
 341  	kind := ev.Kind
 342  
 343  	// epg/peg: author edge.
 344  	e.epg.Put(index.MakeEpg(ser, authorSer, kind, index.DirAuthor))
 345  	e.peg.Put(index.MakePeg(authorSer, kind, index.DirAuthor, ser))
 346  
 347  	if ev.Tags == nil {
 348  		return
 349  	}
 350  
 351  	for _, tg := range *ev.Tags {
 352  		if tg.Len() < 2 {
 353  			continue
 354  		}
 355  		key := tg.Key()
 356  		if len(key) != 1 {
 357  			continue
 358  		}
 359  
 360  		switch key[0] {
 361  		case 'p':
 362  			valHex := tg.ValueHex()
 363  			if len(valHex) != 64 {
 364  				continue
 365  			}
 366  			pkBytes, err := hex.Dec(string(valHex))
 367  			if err != nil || len(pkBytes) != 32 {
 368  				continue
 369  			}
 370  			pkSer := e.getOrCreatePubkeySerial(pkBytes)
 371  
 372  			// epg/peg: p-tag edges.
 373  			e.epg.Put(index.MakeEpg(ser, pkSer, kind, index.DirPTagOut))
 374  			e.peg.Put(index.MakePeg(pkSer, kind, index.DirPTagIn, ser))
 375  
 376  			// ppg/gpp: direct pubkey-to-pubkey edge (skip self-reference).
 377  			if pkSer != authorSer {
 378  				e.ppg.Put(index.MakePpg(authorSer, pkSer, kind, index.DirPKOut, ser))
 379  				e.gpp.Put(index.MakeGpp(pkSer, kind, index.DirPKIn, authorSer, ser))
 380  			}
 381  
 382  		case 'e':
 383  			valHex := tg.ValueHex()
 384  			if len(valHex) != 64 {
 385  				continue
 386  			}
 387  			evBytes, err := hex.Dec(string(valHex))
 388  			if err != nil || len(evBytes) != 32 {
 389  				continue
 390  			}
 391  			// Look up target event serial — only create edge if target exists.
 392  			tgtSer, ok := e.getEventSerial(evBytes)
 393  			if !ok {
 394  				continue
 395  			}
 396  			// eeg/gee: e-tag edges.
 397  			e.eeg.Put(index.MakeEeg(ser, tgtSer, kind, index.DirETagOut))
 398  			e.gee.Put(index.MakeGee(tgtSer, kind, index.DirETagIn, ser))
 399  		}
 400  	}
 401  }
 402  
 403  // getEventSerial returns the WAL serial for a 32-byte event ID, if it exists.
 404  func (e *Engine) getEventSerial(id []byte) (uint64, bool) {
 405  	idHash := serial.IdHash(id)
 406  	start := index.MakeEid(idHash, 0)
 407  	end := index.MakeEid(idHash, serial.Max)
 408  	var found uint64
 409  	var ok bool
 410  	e.eid.Scan(start, end, func(rec []byte) bool {
 411  		s := serial.Get(rec[3+serial.HashLen:])
 412  		if eid, exists := e.getEventIDBySerial(s); exists {
 413  			if bytes.Equal(eid, id) {
 414  				found = s
 415  				ok = true
 416  				return false
 417  			}
 418  		}
 419  		return true
 420  	})
 421  	return found, ok
 422  }
 423  
 424  // SearchWord returns serials of events containing the given word.
 425  func (e *Engine) SearchWord(word []byte) []uint64 {
 426  	wh := serial.Ident(word)
 427  	return e.collectSerials(e.wrd,
 428  		index.MakeWrd(wh, 0),
 429  		index.MakeWrd(wh, serial.Max),
 430  		index.WrdKeyLen)
 431  }
 432  
 433  func splitWords(content []byte) [][]byte {
 434  	var words [][]byte
 435  	var word []byte
 436  	for _, b := range content {
 437  		if b >= 'A' && b <= 'Z' {
 438  			word = append(word, b+32)
 439  		} else if (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9') {
 440  			word = append(word, b)
 441  		} else {
 442  			if len(word) >= 3 {
 443  				w := []byte{:len(word)}
 444  				copy(w, word)
 445  				words = append(words, w)
 446  			}
 447  			word = word[:0]
 448  		}
 449  	}
 450  	if len(word) >= 3 {
 451  		w := []byte{:len(word)}
 452  		copy(w, word)
 453  		words = append(words, w)
 454  	}
 455  	return words
 456  }
 457  
 458  // GetBySerial reads an event from the WAL by its serial.
 459  func (e *Engine) GetBySerial(ser uint64) (*event.E, error) {
 460  	data, err := e.w.Read(ser)
 461  	if err != nil {
 462  		return nil, err
 463  	}
 464  	ev := event.New()
 465  	if err := ev.UnmarshalBinary(bytes.NewReader(data)); err != nil {
 466  		return nil, err
 467  	}
 468  	return ev, nil
 469  }
 470  
 471  // QueryEvents returns events matching the filter, sorted newest-first.
 472  func (e *Engine) QueryEvents(f *filter.F) ([]*event.E, error) {
 473  	if f.Ids != nil && f.Ids.Len() > 0 {
 474  		return e.queryByIDs(f)
 475  	}
 476  
 477  	var since, until int64
 478  	if f.Since != nil && f.Since.I64() != 0 {
 479  		since = f.Since.I64()
 480  	}
 481  	if f.Until != nil && f.Until.I64() != 0 {
 482  		until = f.Until.I64()
 483  	} else {
 484  		until = math.MaxInt64
 485  	}
 486  
 487  	hasKinds := f.Kinds != nil && f.Kinds.Len() > 0
 488  	hasAuthors := f.Authors != nil && f.Authors.Len() > 0
 489  	hasTags := f.Tags != nil && f.Tags.Len() > 0
 490  
 491  	var serials []uint64
 492  	switch {
 493  	case hasTags && hasKinds && hasAuthors:
 494  		serials = e.scanTKP(f, since, until)
 495  	case hasTags && hasKinds:
 496  		serials = e.scanTKC(f, since, until)
 497  	case hasTags && hasAuthors:
 498  		serials = e.scanTPC(f, since, until)
 499  	case hasTags:
 500  		serials = e.scanTC(f, since, until)
 501  	case hasKinds && hasAuthors:
 502  		serials = e.scanKPC(f, since, until)
 503  	case hasKinds:
 504  		serials = e.scanKC(f, since, until)
 505  	case hasAuthors:
 506  		serials = e.scanPC(f, since, until)
 507  	default:
 508  		serials = e.scanCA(since, until)
 509  	}
 510  
 511  	// Deduplicate.
 512  	seen := map[uint64]bool{}
 513  	deduped := serials[:0]
 514  	for _, s := range serials {
 515  		if !seen[s] {
 516  			seen[s] = true
 517  			deduped = append(deduped, s)
 518  		}
 519  	}
 520  
 521  	// Fetch and apply full filter.
 522  	var results event.S
 523  	for _, ser := range deduped {
 524  		ev, err := e.GetBySerial(ser)
 525  		if err != nil {
 526  			continue
 527  		}
 528  		if f.Matches(ev) {
 529  			results = append(results, ev)
 530  		}
 531  	}
 532  
 533  	sort.Sort(results) // reverse chronological
 534  
 535  	if f.Limit != nil && int(*f.Limit) < len(results) {
 536  		results = results[:*f.Limit]
 537  	}
 538  	return results, nil
 539  }
 540  
 541  // DeleteEvent removes an event and its primary index entries.
 542  func (e *Engine) DeleteEvent(id []byte) error {
 543  	idHash := serial.IdHash(id)
 544  
 545  	// Find the serial for this event.
 546  	var ser uint64
 547  	var found bool
 548  	start := index.MakeEid(idHash, 0)
 549  	end := index.MakeEid(idHash, serial.Max)
 550  	e.eid.Scan(start, end, func(rec []byte) bool {
 551  		s := serial.Get(rec[3+serial.HashLen:])
 552  		if eid, ok := e.getEventIDBySerial(s); ok {
 553  			if bytes.Equal(eid, id) {
 554  				ser = s
 555  				found = true
 556  				return false
 557  			}
 558  		}
 559  		return true
 560  	})
 561  	if !found {
 562  		return fmt.Errorf("event not found")
 563  	}
 564  
 565  	// Read the event to get kind/pubkey/timestamp for index cleanup.
 566  	ev, err := e.GetBySerial(ser)
 567  	if err != nil {
 568  		return err
 569  	}
 570  
 571  	pubHash := serial.PubHash(ev.Pubkey)
 572  
 573  	// Delete primary index entries.
 574  	e.eid.Delete(index.MakeEid(idHash, ser))
 575  	e.ca.Delete(index.MakeCA(ev.CreatedAt, ser))
 576  	e.kc.Delete(index.MakeKC(ev.Kind, ev.CreatedAt, ser))
 577  	e.pc.Delete(index.MakePC(pubHash, ev.CreatedAt, ser))
 578  	e.kpc.Delete(index.MakeKPC(ev.Kind, pubHash, ev.CreatedAt, ser))
 579  
 580  	return nil
 581  }
 582  
 583  // GetByID retrieves an event by its 32-byte ID.
 584  func (e *Engine) GetByID(id []byte) (*event.E, error) {
 585  	idHash := serial.IdHash(id)
 586  	start := index.MakeEid(idHash, 0)
 587  	end := index.MakeEid(idHash, serial.Max)
 588  	var found *event.E
 589  	e.eid.Scan(start, end, func(rec []byte) bool {
 590  		ser := serial.Get(rec[3+serial.HashLen:])
 591  		if eid, ok := e.getEventIDBySerial(ser); ok {
 592  			if bytes.Equal(eid, id) {
 593  				if ev, err := e.GetBySerial(ser); err == nil {
 594  					found = ev
 595  					return false
 596  				}
 597  			}
 598  		}
 599  		return true
 600  	})
 601  	if found == nil {
 602  		return nil, fmt.Errorf("event not found")
 603  	}
 604  	return found, nil
 605  }
 606  
 607  // --- internal helpers ---
 608  
 609  func (e *Engine) hasEvent(idHash, fullID []byte) bool {
 610  	start := index.MakeEid(idHash, 0)
 611  	end := index.MakeEid(idHash, serial.Max)
 612  	var found bool
 613  	e.eid.Scan(start, end, func(rec []byte) bool {
 614  		ser := serial.Get(rec[3+serial.HashLen:])
 615  		if eid, ok := e.getEventIDBySerial(ser); ok {
 616  			if bytes.Equal(eid, fullID) {
 617  				found = true
 618  				return false
 619  			}
 620  		}
 621  		return true
 622  	})
 623  	return found
 624  }
 625  
 626  func (e *Engine) getOrCreatePubkeySerial(pubkey []byte) uint64 {
 627  	pubHash := serial.PubHash(pubkey)
 628  	start := index.MakePks(pubHash, 0)
 629  	end := index.MakePks(pubHash, serial.Max)
 630  	var found uint64
 631  	var exists bool
 632  	e.pks.Scan(start, end, func(rec []byte) bool {
 633  		found = serial.Get(rec[3+serial.HashLen:])
 634  		exists = true
 635  		return false
 636  	})
 637  	if exists {
 638  		return found
 639  	}
 640  	ser := e.nextPubSer
 641  	e.nextPubSer++
 642  	e.pks.Put(index.MakePks(pubHash, ser))
 643  	e.spk.Put(index.MakeSpkRec(ser, pubkey))
 644  	return ser
 645  }
 646  
 647  func (e *Engine) getEventIDBySerial(ser uint64) ([]byte, bool) {
 648  	key := index.MakeSei(ser)
 649  	rec, ok := e.sei.Get(key)
 650  	if !ok {
 651  		return nil, false
 652  	}
 653  	return rec[index.SeiKeyLen:], true
 654  }
 655  
 656  func (e *Engine) queryByIDs(f *filter.F) ([]*event.E, error) {
 657  	var results []*event.E
 658  	for _, id := range f.Ids.T {
 659  		idHash := serial.IdHash(id)
 660  		start := index.MakeEid(idHash, 0)
 661  		end := index.MakeEid(idHash, serial.Max)
 662  		e.eid.Scan(start, end, func(rec []byte) bool {
 663  			ser := serial.Get(rec[3+serial.HashLen:])
 664  			if fullID, ok := e.getEventIDBySerial(ser); ok {
 665  				if bytes.Equal(fullID, id) {
 666  					if ev, err := e.GetBySerial(ser); err == nil {
 667  						results = append(results, ev)
 668  					}
 669  				}
 670  			}
 671  			return true
 672  		})
 673  	}
 674  	return results, nil
 675  }
 676  
 677  func (e *Engine) collectSerials(idx *sorted.File, start, end []byte, keyLen int) []uint64 {
 678  	var out []uint64
 679  	idx.Scan(start, end, func(rec []byte) bool {
 680  		out = append(out, serial.Get(rec[keyLen-serial.Len:]))
 681  		return true
 682  	})
 683  	return out
 684  }
 685  
 686  func (e *Engine) scanCA(since, until int64) []uint64 {
 687  	return e.collectSerials(e.ca,
 688  		index.MakeCA(since, 0),
 689  		index.MakeCA(until, serial.Max),
 690  		index.CAKeyLen)
 691  }
 692  
 693  func (e *Engine) scanKC(f *filter.F, since, until int64) []uint64 {
 694  	var out []uint64
 695  	for _, k := range f.Kinds.K {
 696  		out = append(out, e.collectSerials(e.kc,
 697  			index.MakeKC(k.K, since, 0),
 698  			index.MakeKC(k.K, until, serial.Max),
 699  			index.KCKeyLen)...)
 700  	}
 701  	return out
 702  }
 703  
 704  func (e *Engine) scanPC(f *filter.F, since, until int64) []uint64 {
 705  	var out []uint64
 706  	for _, author := range f.Authors.T {
 707  		ph := serial.PubHash(author)
 708  		out = append(out, e.collectSerials(e.pc,
 709  			index.MakePC(ph, since, 0),
 710  			index.MakePC(ph, until, serial.Max),
 711  			index.PCKeyLen)...)
 712  	}
 713  	return out
 714  }
 715  
 716  func (e *Engine) scanKPC(f *filter.F, since, until int64) []uint64 {
 717  	var out []uint64
 718  	for _, k := range f.Kinds.K {
 719  		for _, author := range f.Authors.T {
 720  			ph := serial.PubHash(author)
 721  			out = append(out, e.collectSerials(e.kpc,
 722  				index.MakeKPC(k.K, ph, since, 0),
 723  				index.MakeKPC(k.K, ph, until, serial.Max),
 724  				index.KPCKeyLen)...)
 725  		}
 726  	}
 727  	return out
 728  }
 729  
 730  func (e *Engine) scanTC(f *filter.F, since, until int64) []uint64 {
 731  	var out []uint64
 732  	for _, tg := range *f.Tags {
 733  		if tg.Len() < 2 || len(tg.Key()) != 1 {
 734  			continue
 735  		}
 736  		tk := tg.Key()[0]
 737  		for _, val := range tg.T[1:] {
 738  			vh := serial.Ident(val)
 739  			out = append(out, e.collectSerials(e.tc,
 740  				index.MakeTC(tk, vh, since, 0),
 741  				index.MakeTC(tk, vh, until, serial.Max),
 742  				index.TCKeyLen)...)
 743  		}
 744  	}
 745  	return out
 746  }
 747  
 748  func (e *Engine) scanTKC(f *filter.F, since, until int64) []uint64 {
 749  	var out []uint64
 750  	for _, tg := range *f.Tags {
 751  		if tg.Len() < 2 || len(tg.Key()) != 1 {
 752  			continue
 753  		}
 754  		tk := tg.Key()[0]
 755  		for _, val := range tg.T[1:] {
 756  			vh := serial.Ident(val)
 757  			for _, k := range f.Kinds.K {
 758  				out = append(out, e.collectSerials(e.tkc,
 759  					index.MakeTKC(k.K, tk, vh, since, 0),
 760  					index.MakeTKC(k.K, tk, vh, until, serial.Max),
 761  					index.TKCKeyLen)...)
 762  			}
 763  		}
 764  	}
 765  	return out
 766  }
 767  
 768  func (e *Engine) scanTPC(f *filter.F, since, until int64) []uint64 {
 769  	var out []uint64
 770  	for _, tg := range *f.Tags {
 771  		if tg.Len() < 2 || len(tg.Key()) != 1 {
 772  			continue
 773  		}
 774  		tk := tg.Key()[0]
 775  		for _, val := range tg.T[1:] {
 776  			vh := serial.Ident(val)
 777  			for _, author := range f.Authors.T {
 778  				ph := serial.PubHash(author)
 779  				out = append(out, e.collectSerials(e.tpc,
 780  					index.MakeTPC(ph, tk, vh, since, 0),
 781  					index.MakeTPC(ph, tk, vh, until, serial.Max),
 782  					index.TPCKeyLen)...)
 783  			}
 784  		}
 785  	}
 786  	return out
 787  }
 788  
 789  func (e *Engine) scanTKP(f *filter.F, since, until int64) []uint64 {
 790  	var out []uint64
 791  	for _, tg := range *f.Tags {
 792  		if tg.Len() < 2 || len(tg.Key()) != 1 {
 793  			continue
 794  		}
 795  		tk := tg.Key()[0]
 796  		for _, val := range tg.T[1:] {
 797  			vh := serial.Ident(val)
 798  			for _, k := range f.Kinds.K {
 799  				for _, author := range f.Authors.T {
 800  					ph := serial.PubHash(author)
 801  					out = append(out, e.collectSerials(e.tkp,
 802  						index.MakeTKP(k.K, ph, tk, vh, since, 0),
 803  						index.MakeTKP(k.K, ph, tk, vh, until, serial.Max),
 804  						index.TKPKeyLen)...)
 805  				}
 806  			}
 807  		}
 808  	}
 809  	return out
 810  }
 811  
 812  // --- Graph traversal ---
 813  
 814  // GetReferencingEvents returns serials of events that reference targetSer via e-tags.
 815  // Uses the gee (reverse event-event) index.
 816  func (e *Engine) GetReferencingEvents(targetSer uint64) []uint64 {
 817  	start := index.MakeGee(targetSer, 0, 0, 0)
 818  	end := index.MakeGee(targetSer, 0xFFFF, 0xFF, serial.Max)
 819  	var out []uint64
 820  	e.gee.Scan(start, end, func(rec []byte) bool {
 821  		out = append(out, serial.Get(rec[11:]))
 822  		return true
 823  	})
 824  	return out
 825  }
 826  
 827  // GetETagTargets returns serials of events that srcSer references via e-tags.
 828  // Uses the eeg (forward event-event) index.
 829  func (e *Engine) GetETagTargets(srcSer uint64) []uint64 {
 830  	start := index.MakeEeg(srcSer, 0, 0, 0)
 831  	end := index.MakeEeg(srcSer, serial.Max, 0xFFFF, 0xFF)
 832  	var out []uint64
 833  	e.eeg.Scan(start, end, func(rec []byte) bool {
 834  		out = append(out, serial.Get(rec[8:]))
 835  		return true
 836  	})
 837  	return out
 838  }
 839  
 840  // TraverseThread performs BFS traversal of thread structure via e-tags.
 841  // Returns all event IDs (32-byte binary) reachable from seedID within maxDepth.
 842  func (e *Engine) TraverseThread(seedID []byte, maxDepth int, direction string) [][]byte {
 843  	seedSer, ok := e.getEventSerial(seedID)
 844  	if !ok {
 845  		return nil
 846  	}
 847  	if direction == "" {
 848  		direction = "both"
 849  	}
 850  
 851  	visited := map[uint64]bool{}
 852  	visited[seedSer] = true
 853  	frontier := []uint64{seedSer}
 854  
 855  	var allIDs [][]byte
 856  	emptyStreak := 0
 857  
 858  	for depth := 1; depth <= maxDepth; depth++ {
 859  		var next []uint64
 860  		found := 0
 861  
 862  		for _, evSer := range frontier {
 863  			if direction == "both" || direction == "inbound" {
 864  				for _, refSer := range e.GetReferencingEvents(evSer) {
 865  					if visited[refSer] {
 866  						continue
 867  					}
 868  					visited[refSer] = true
 869  					if eid, ok := e.getEventIDBySerial(refSer); ok {
 870  						allIDs = append(allIDs, eid)
 871  						found++
 872  					}
 873  					next = append(next, refSer)
 874  				}
 875  			}
 876  			if direction == "both" || direction == "outbound" {
 877  				for _, tgtSer := range e.GetETagTargets(evSer) {
 878  					if visited[tgtSer] {
 879  						continue
 880  					}
 881  					visited[tgtSer] = true
 882  					if eid, ok := e.getEventIDBySerial(tgtSer); ok {
 883  						allIDs = append(allIDs, eid)
 884  						found++
 885  					}
 886  					next = append(next, tgtSer)
 887  				}
 888  			}
 889  		}
 890  
 891  		if found == 0 {
 892  			emptyStreak++
 893  			if emptyStreak >= 2 {
 894  				break
 895  			}
 896  		} else {
 897  			emptyStreak = 0
 898  		}
 899  		frontier = next
 900  	}
 901  	return allIDs
 902  }
 903