// migrations.go
1 //go:build !(js && wasm)
2
3 package database
4
5 import (
6 "bytes"
7 "context"
8 "sort"
9
10 "github.com/dgraph-io/badger/v4"
11 "next.orly.dev/pkg/lol/chk"
12 "next.orly.dev/pkg/lol/log"
13 "next.orly.dev/pkg/database/indexes"
14 "next.orly.dev/pkg/database/indexes/types"
15 "next.orly.dev/pkg/nostr/encoders/event"
16 "next.orly.dev/pkg/nostr/encoders/hex"
17 "next.orly.dev/pkg/nostr/encoders/ints"
18 "next.orly.dev/pkg/nostr/encoders/kind"
19 )
20
const (
	// currentVersion is the schema version this build expects; RunMigrations
	// applies every migration step up to this version.
	currentVersion uint32 = 10
)
24
25 func (d *D) RunMigrations() {
26 var err error
27 var dbVersion uint32
28 // first find the current version tag if any
29 if err = d.View(
30 func(txn *badger.Txn) (err error) {
31 buf := new(bytes.Buffer)
32 if err = indexes.VersionEnc(nil).MarshalWrite(buf); chk.E(err) {
33 return
34 }
35 verPrf := new(bytes.Buffer)
36 if _, err = indexes.VersionPrefix.Write(verPrf); chk.E(err) {
37 return
38 }
39 it := txn.NewIterator(
40 badger.IteratorOptions{
41 Prefix: verPrf.Bytes(),
42 },
43 )
44 defer it.Close()
45 ver := indexes.VersionVars()
46 for it.Rewind(); it.Valid(); it.Next() {
47 // there should only be one
48 item := it.Item()
49 key := item.Key()
50 if err = indexes.VersionDec(ver).UnmarshalRead(
51 bytes.NewBuffer(key),
52 ); chk.E(err) {
53 return
54 }
55 log.I.F("found version tag: %d", ver.Get())
56 dbVersion = ver.Get()
57 }
58 return
59 },
60 ); chk.E(err) {
61 }
62 if dbVersion == 0 {
63 // write the version tag now (ensure any old tags are removed first)
64 if err = d.writeVersionTag(currentVersion); chk.E(err) {
65 return
66 }
67 }
68 if dbVersion < 1 {
69 log.I.F("migrating to version 1...")
70 // the first migration is expiration tags
71 d.UpdateExpirationTags()
72 // bump to version 1
73 _ = d.writeVersionTag(1)
74 }
75 if dbVersion < 2 {
76 log.I.F("migrating to version 2...")
77 // backfill word indexes
78 d.UpdateWordIndexes()
79 // bump to version 2
80 _ = d.writeVersionTag(2)
81 }
82 if dbVersion < 3 {
83 log.I.F("migrating to version 3...")
84 // clean up ephemeral events that should never have been stored
85 d.CleanupEphemeralEvents()
86 // bump to version 3
87 _ = d.writeVersionTag(3)
88 }
89 if dbVersion < 4 {
90 log.I.F("migrating to version 4...")
91 // convert small events to inline storage (Reiser4 optimization)
92 d.ConvertSmallEventsToInline()
93 // bump to version 4
94 _ = d.writeVersionTag(4)
95 }
96 if dbVersion < 5 {
97 log.I.F("migrating to version 5...")
98 // re-encode events with optimized tag binary format (e/p tags)
99 d.ReencodeEventsWithOptimizedTags()
100 // bump to version 5
101 _ = d.writeVersionTag(5)
102 }
103 if dbVersion < 6 {
104 log.I.F("migrating to version 6...")
105 // convert events to compact serial-reference format
106 // This replaces 32-byte IDs/pubkeys with 5-byte serial references
107 d.ConvertToCompactEventFormat()
108 // bump to version 6
109 _ = d.writeVersionTag(6)
110 }
111 if dbVersion < 7 {
112 log.I.F("migrating to version 7...")
113 // Rebuild word indexes with unicode normalization (small caps, fraktur → ASCII)
114 // This consolidates duplicate indexes from decorative unicode text
115 d.RebuildWordIndexesWithNormalization()
116 // bump to version 7
117 _ = d.writeVersionTag(7)
118 }
119 if dbVersion < 8 {
120 log.I.F("migrating to version 8...")
121 // Backfill e-tag graph indexes (eeg/gee) for graph query support
122 // This creates edges for all existing events with e-tags
123 d.BackfillETagGraph()
124 // bump to version 8
125 _ = d.writeVersionTag(8)
126 }
127 if dbVersion < 9 {
128 log.I.F("migrating to version 9...")
129 // Backfill SerialEventId mappings for legacy events that were skipped
130 // during the v6 compact format migration because their ID starts with 0x01
131 // (which was mistakenly interpreted as CompactFormatVersion)
132 d.BackfillMissingSerialEventIdMappings()
133 // bump to version 9
134 _ = d.writeVersionTag(9)
135 }
136 if dbVersion < 10 {
137 log.I.F("migrating to version 10...")
138 // Backfill pubkey-to-pubkey (noun-noun) graph indexes (ppg/gpp)
139 // This materializes direct pubkey→pubkey edges from p-tag relationships,
140 // collapsing the two-hop pubkey→event→pubkey traversal into single-hop lookups
141 d.BackfillPubkeyPubkeyGraph()
142 // bump to version 10
143 _ = d.writeVersionTag(10)
144 }
145 }
146
147 // writeVersionTag writes a new version tag key to the database (no value)
148 func (d *D) writeVersionTag(ver uint32) (err error) {
149 return d.Update(
150 func(txn *badger.Txn) (err error) {
151 // delete any existing version keys first (there should only be one, but be safe)
152 verPrf := new(bytes.Buffer)
153 if _, err = indexes.VersionPrefix.Write(verPrf); chk.E(err) {
154 return
155 }
156 it := txn.NewIterator(badger.IteratorOptions{Prefix: verPrf.Bytes()})
157 defer it.Close()
158 for it.Rewind(); it.Valid(); it.Next() {
159 item := it.Item()
160 key := item.KeyCopy(nil)
161 if err = txn.Delete(key); chk.E(err) {
162 return
163 }
164 }
165
166 // now write the new version key
167 buf := new(bytes.Buffer)
168 vv := new(types.Uint32)
169 vv.Set(ver)
170 if err = indexes.VersionEnc(vv).MarshalWrite(buf); chk.E(err) {
171 return
172 }
173 return txn.Set(buf.Bytes(), nil)
174 },
175 )
176 }
177
178 func (d *D) UpdateWordIndexes() {
179 var err error
180 var wordIndexes [][]byte
181 // iterate all events and generate word index keys from content and tags
182 if err = d.View(
183 func(txn *badger.Txn) (err error) {
184 prf := new(bytes.Buffer)
185 if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
186 return
187 }
188 it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()})
189 defer it.Close()
190 for it.Rewind(); it.Valid(); it.Next() {
191 item := it.Item()
192 var val []byte
193 if val, err = item.ValueCopy(nil); chk.E(err) {
194 continue
195 }
196 // decode the event
197 ev := new(event.E)
198 if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
199 continue
200 }
201 // log.I.F("updating word indexes for event: %s", ev.Serialize())
202 // read serial from key
203 key := item.Key()
204 ser := indexes.EventVars()
205 if err = indexes.EventDec(ser).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) {
206 continue
207 }
208 // collect unique word hashes for this event
209 seen := make(map[string]struct{})
210 // from content
211 if len(ev.Content) > 0 {
212 for _, h := range TokenHashes(ev.Content) {
213 seen[string(h)] = struct{}{}
214 }
215 }
216 // from all tag fields (key and values)
217 if ev.Tags != nil && ev.Tags.Len() > 0 {
218 for _, t := range *ev.Tags {
219 for _, field := range t.T {
220 if len(field) == 0 {
221 continue
222 }
223 for _, h := range TokenHashes(field) {
224 seen[string(h)] = struct{}{}
225 }
226 }
227 }
228 }
229 // build keys
230 for k := range seen {
231 w := new(types.Word)
232 w.FromWord([]byte(k))
233 buf := new(bytes.Buffer)
234 if err = indexes.WordEnc(
235 w, ser,
236 ).MarshalWrite(buf); chk.E(err) {
237 continue
238 }
239 wordIndexes = append(wordIndexes, buf.Bytes())
240 }
241 }
242 return
243 },
244 ); chk.E(err) {
245 return
246 }
247 // sort the indexes for ordered writes
248 sort.Slice(
249 wordIndexes, func(i, j int) bool {
250 return bytes.Compare(
251 wordIndexes[i], wordIndexes[j],
252 ) < 0
253 },
254 )
255 // write in a batch
256 batch := d.NewWriteBatch()
257 for _, v := range wordIndexes {
258 if err = batch.Set(v, nil); chk.E(err) {
259 continue
260 }
261 }
262 _ = batch.Flush()
263 }
264
265 func (d *D) UpdateExpirationTags() {
266 var err error
267 var expIndexes [][]byte
268 // iterate all event records and decode and look for version tags
269 if err = d.View(
270 func(txn *badger.Txn) (err error) {
271 prf := new(bytes.Buffer)
272 if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
273 return
274 }
275 it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()})
276 defer it.Close()
277 for it.Rewind(); it.Valid(); it.Next() {
278 item := it.Item()
279 var val []byte
280 if val, err = item.ValueCopy(nil); chk.E(err) {
281 continue
282 }
283 // decode the event
284 ev := new(event.E)
285 if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
286 continue
287 }
288 expTag := ev.Tags.GetFirst([]byte("expiration"))
289 if expTag == nil {
290 continue
291 }
292 expTS := ints.New(0)
293 if _, err = expTS.Unmarshal(expTag.Value()); chk.E(err) {
294 continue
295 }
296 key := item.Key()
297 ser := indexes.EventVars()
298 if err = indexes.EventDec(ser).UnmarshalRead(
299 bytes.NewBuffer(key),
300 ); chk.E(err) {
301 continue
302 }
303 // create the expiration tag
304 exp, _ := indexes.ExpirationVars()
305 exp.Set(expTS.N)
306 expBuf := new(bytes.Buffer)
307 if err = indexes.ExpirationEnc(
308 exp, ser,
309 ).MarshalWrite(expBuf); chk.E(err) {
310 continue
311 }
312 expIndexes = append(expIndexes, expBuf.Bytes())
313 }
314 return
315 },
316 ); chk.E(err) {
317 return
318 }
319 // sort the indexes first so they're written in order, improving compaction
320 // and iteration.
321 sort.Slice(
322 expIndexes, func(i, j int) bool {
323 return bytes.Compare(expIndexes[i], expIndexes[j]) < 0
324 },
325 )
326 // write the collected indexes
327 batch := d.NewWriteBatch()
328 for _, v := range expIndexes {
329 if err = batch.Set(v, nil); chk.E(err) {
330 continue
331 }
332 }
333 if err = batch.Flush(); chk.E(err) {
334 return
335 }
336 }
337
338 func (d *D) CleanupEphemeralEvents() {
339 log.I.F("cleaning up ephemeral events (kinds 20000-29999)...")
340 var err error
341 var ephemeralEvents [][]byte
342
343 // iterate all event records and find ephemeral events
344 if err = d.View(
345 func(txn *badger.Txn) (err error) {
346 prf := new(bytes.Buffer)
347 if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
348 return
349 }
350 it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()})
351 defer it.Close()
352 for it.Rewind(); it.Valid(); it.Next() {
353 item := it.Item()
354 var val []byte
355 if val, err = item.ValueCopy(nil); chk.E(err) {
356 continue
357 }
358 // decode the event
359 ev := new(event.E)
360 if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
361 continue
362 }
363 // check if it's an ephemeral event (kinds 20000-29999)
364 if ev.Kind >= 20000 && ev.Kind <= 29999 {
365 ephemeralEvents = append(ephemeralEvents, ev.ID)
366 }
367 }
368 return
369 },
370 ); chk.E(err) {
371 return
372 }
373
374 // delete all found ephemeral events
375 deletedCount := 0
376 for _, eventId := range ephemeralEvents {
377 if err = d.DeleteEvent(context.Background(), eventId); chk.E(err) {
378 log.W.F("failed to delete ephemeral event %x: %v", eventId, err)
379 continue
380 }
381 deletedCount++
382 }
383
384 log.I.F("cleaned up %d ephemeral events from database", deletedCount)
385 }
386
// ConvertSmallEventsToInline migrates small events (<=384 bytes) to inline storage.
// This is a Reiser4-inspired optimization that stores small event data in the key itself,
// avoiding a second database lookup and improving query performance.
// Also handles replaceable and addressable events with specialized storage.
//
// Two passes: a read-only pass collects eligible events from the evt table;
// then batched read-write transactions write the new inline keys (sev for all
// small events, plus aev/rev for addressable/replaceable ones) and delete the
// old evt keys.
func (d *D) ConvertSmallEventsToInline() {
	log.I.F("converting events to optimized inline storage (Reiser4 optimization)...")
	var err error
	// events whose binary encoding is at most this many bytes are inlined
	const smallEventThreshold = 384

	// EventData captures everything needed to rewrite one event's storage.
	type EventData struct {
		Serial        uint64 // event serial extracted from the evt key
		EventData     []byte // raw binary event encoding (the evt value)
		OldKey        []byte // original evt key, deleted after conversion
		IsReplaceable bool
		IsAddressable bool
		Pubkey        []byte
		Kind          uint16
		DTag          []byte // d-tag value; only populated for addressable events
	}

	var events []EventData
	var convertedCount int
	var deletedCount int

	// Helper function for counting by predicate (summary log line only)
	countBy := func(events []EventData, predicate func(EventData) bool) int {
		count := 0
		for _, e := range events {
			if predicate(e) {
				count++
			}
		}
		return count
	}

	// First pass: identify events in evt table that can benefit from inline storage
	if err = d.View(
		func(txn *badger.Txn) (err error) {
			prf := new(bytes.Buffer)
			if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
				return
			}
			it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()})
			defer it.Close()

			for it.Rewind(); it.Valid(); it.Next() {
				item := it.Item()
				var val []byte
				if val, err = item.ValueCopy(nil); chk.E(err) {
					continue
				}

				// Check if event data is small enough for inline storage
				if len(val) <= smallEventThreshold {
					// Decode event to check if it's replaceable or addressable
					ev := new(event.E)
					if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
						continue
					}

					// Extract serial from key
					key := item.KeyCopy(nil)
					ser := indexes.EventVars()
					if err = indexes.EventDec(ser).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) {
						continue
					}

					eventData := EventData{
						Serial:        ser.Get(),
						EventData:     val,
						OldKey:        key,
						IsReplaceable: kind.IsReplaceable(ev.Kind),
						IsAddressable: kind.IsParameterizedReplaceable(ev.Kind),
						Pubkey:        ev.Pubkey,
						Kind:          ev.Kind,
					}

					// Extract d-tag for addressable events
					if eventData.IsAddressable {
						dTag := ev.Tags.GetFirst([]byte("d"))
						if dTag != nil {
							eventData.DTag = dTag.Value()
						}
					}

					events = append(events, eventData)
				}
			}
			return nil
		},
	); chk.E(err) {
		return
	}

	log.I.F("found %d events to convert (%d regular, %d replaceable, %d addressable)",
		len(events),
		countBy(events, func(e EventData) bool { return !e.IsReplaceable && !e.IsAddressable }),
		countBy(events, func(e EventData) bool { return e.IsReplaceable }),
		countBy(events, func(e EventData) bool { return e.IsAddressable }),
	)

	// Second pass: convert in batches to avoid large transactions
	const batchSize = 1000
	for i := 0; i < len(events); i += batchSize {
		end := i + batchSize
		if end > len(events) {
			end = len(events)
		}
		batch := events[i:end]

		// Write new inline keys and delete old keys
		if err = d.Update(
			func(txn *badger.Txn) (err error) {
				for _, e := range batch {
					// First, write the sev key for serial-based access (all small events)
					sevKeyBuf := new(bytes.Buffer)
					ser := new(types.Uint40)
					if err = ser.Set(e.Serial); chk.E(err) {
						continue
					}

					if err = indexes.SmallEventEnc(ser).MarshalWrite(sevKeyBuf); chk.E(err) {
						continue
					}

					// Append size as uint16 big-endian (2 bytes)
					sizeBytes := []byte{byte(len(e.EventData) >> 8), byte(len(e.EventData))}
					sevKeyBuf.Write(sizeBytes)

					// Append event data — the full record lives in the key itself
					sevKeyBuf.Write(e.EventData)

					// Write sev key (no value needed)
					if err = txn.Set(sevKeyBuf.Bytes(), nil); chk.E(err) {
						log.W.F("failed to write sev key for serial %d: %v", e.Serial, err)
						continue
					}
					convertedCount++

					// Additionally, for replaceable/addressable events, write specialized keys
					if e.IsAddressable && len(e.DTag) > 0 {
						// Addressable event: aev|pubkey_hash|kind|dtag_hash|size|data
						aevKeyBuf := new(bytes.Buffer)
						pubHash := new(types.PubHash)
						pubHash.FromPubkey(e.Pubkey)
						kindVal := new(types.Uint16)
						kindVal.Set(e.Kind)
						dTagHash := new(types.Ident)
						dTagHash.FromIdent(e.DTag)

						if err = indexes.AddressableEventEnc(pubHash, kindVal, dTagHash).MarshalWrite(aevKeyBuf); chk.E(err) {
							continue
						}

						// Append size and data
						aevKeyBuf.Write(sizeBytes)
						aevKeyBuf.Write(e.EventData)

						if err = txn.Set(aevKeyBuf.Bytes(), nil); chk.E(err) {
							log.W.F("failed to write aev key for serial %d: %v", e.Serial, err)
							continue
						}
					} else if e.IsReplaceable {
						// Replaceable event: rev|pubkey_hash|kind|size|data
						revKeyBuf := new(bytes.Buffer)
						pubHash := new(types.PubHash)
						pubHash.FromPubkey(e.Pubkey)
						kindVal := new(types.Uint16)
						kindVal.Set(e.Kind)

						if err = indexes.ReplaceableEventEnc(pubHash, kindVal).MarshalWrite(revKeyBuf); chk.E(err) {
							continue
						}

						// Append size and data
						revKeyBuf.Write(sizeBytes)
						revKeyBuf.Write(e.EventData)

						if err = txn.Set(revKeyBuf.Bytes(), nil); chk.E(err) {
							log.W.F("failed to write rev key for serial %d: %v", e.Serial, err)
							continue
						}
					}

					// Delete old evt key.
					// NOTE(review): if a specialized aev/rev write fails above, the
					// `continue` skips this delete, leaving both the sev and evt
					// entries in place for that event — presumably intentional so a
					// re-run can retry; confirm.
					if err = txn.Delete(e.OldKey); chk.E(err) {
						log.W.F("failed to delete old event key for serial %d: %v", e.Serial, err)
						continue
					}
					deletedCount++
				}
				return nil
			},
		); chk.E(err) {
			log.W.F("batch update failed: %v", err)
			continue
		}

		if (i/batchSize)%10 == 0 && i > 0 {
			log.I.F("progress: %d/%d events converted", i, len(events))
		}
	}

	log.I.F("migration complete: converted %d events to optimized inline storage, deleted %d old keys", convertedCount, deletedCount)
}
592
593 // ReencodeEventsWithOptimizedTags re-encodes all events to use the new binary
594 // tag format that stores e/p tag values as 33-byte binary (32-byte hash + null)
595 // instead of 64-byte hex strings. This reduces memory usage by ~48% for these tags.
596 func (d *D) ReencodeEventsWithOptimizedTags() {
597 log.I.F("re-encoding events with optimized tag binary format...")
598 var err error
599
600 type EventUpdate struct {
601 Key []byte
602 OldData []byte
603 NewData []byte
604 }
605
606 var updates []EventUpdate
607 var processedCount int
608
609 // Helper to collect event updates from iterator
610 // Only processes regular events (evt prefix) - inline storage already benefits
611 collectUpdates := func(it *badger.Iterator, prefix []byte) error {
612 for it.Rewind(); it.Valid(); it.Next() {
613 item := it.Item()
614 key := item.KeyCopy(nil)
615
616 var val []byte
617 if val, err = item.ValueCopy(nil); chk.E(err) {
618 continue
619 }
620
621 // Regular event storage - data is in value
622 eventData := val
623 if len(eventData) == 0 {
624 continue
625 }
626
627 // Decode the event
628 ev := new(event.E)
629 if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); chk.E(err) {
630 continue
631 }
632
633 // Check if this event has e or p tags that could benefit from optimization
634 hasOptimizableTags := false
635 if ev.Tags != nil && ev.Tags.Len() > 0 {
636 for _, t := range *ev.Tags {
637 if t.Len() >= 2 {
638 key := t.Key()
639 if len(key) == 1 && (key[0] == 'e' || key[0] == 'p') {
640 hasOptimizableTags = true
641 break
642 }
643 }
644 }
645 }
646
647 if !hasOptimizableTags {
648 continue
649 }
650
651 // Re-encode the event (this will apply the new tag optimization)
652 newData := ev.MarshalBinaryToBytes(nil)
653
654 // Only update if the data actually changed
655 if !bytes.Equal(eventData, newData) {
656 updates = append(updates, EventUpdate{
657 Key: key,
658 OldData: eventData,
659 NewData: newData,
660 })
661 }
662 }
663 return nil
664 }
665
666 // Only process regular "evt" prefix events (not inline storage)
667 // Inline storage (sev, rev, aev) already benefits from the optimization
668 // because the binary data is stored directly in the key
669 prf := new(bytes.Buffer)
670 if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
671 return
672 }
673 evtPrefix := prf.Bytes()
674
675 // Collect updates from regular events only
676 if err = d.View(func(txn *badger.Txn) error {
677 it := txn.NewIterator(badger.IteratorOptions{Prefix: evtPrefix})
678 defer it.Close()
679 return collectUpdates(it, evtPrefix)
680 }); chk.E(err) {
681 return
682 }
683
684 log.I.F("found %d events with e/p tags to re-encode", len(updates))
685
686 if len(updates) == 0 {
687 log.I.F("no events need re-encoding")
688 return
689 }
690
691 // Apply updates in batches
692 const batchSize = 1000
693 for i := 0; i < len(updates); i += batchSize {
694 end := i + batchSize
695 if end > len(updates) {
696 end = len(updates)
697 }
698 batch := updates[i:end]
699
700 if err = d.Update(func(txn *badger.Txn) error {
701 for _, upd := range batch {
702 // Since we're only processing regular events (evt prefix),
703 // we just update the value directly
704 if err = txn.Set(upd.Key, upd.NewData); chk.E(err) {
705 log.W.F("failed to update event: %v", err)
706 continue
707 }
708 processedCount++
709 }
710 return nil
711 }); chk.E(err) {
712 log.W.F("batch update failed: %v", err)
713 continue
714 }
715
716 if (i/batchSize)%10 == 0 && i > 0 {
717 log.I.F("progress: %d/%d events re-encoded", i, len(updates))
718 }
719 }
720
721 savedBytes := 0
722 for _, upd := range updates {
723 savedBytes += len(upd.OldData) - len(upd.NewData)
724 }
725
726 log.I.F("migration complete: re-encoded %d events, saved approximately %d bytes (%.2f KB)",
727 processedCount, savedBytes, float64(savedBytes)/1024.0)
728 }
729
// ConvertToCompactEventFormat migrates all existing events to the new compact format.
// This format uses 5-byte serial references instead of 32-byte IDs/pubkeys,
// dramatically reducing storage requirements (up to 80% savings on ID/pubkey data).
//
// The migration:
// 1. Reads each event from legacy storage (evt/sev prefixes)
// 2. Creates SerialEventId mapping (sei prefix) for event ID lookup
// 3. Re-encodes the event in compact format
// 4. Stores in cmp prefix
// 5. Optionally removes legacy storage after successful migration
func (d *D) ConvertToCompactEventFormat() {
	log.I.F("converting events to compact serial-reference format...")
	var err error

	// EventMigration is one legacy event queued for conversion.
	type EventMigration struct {
		Serial   uint64 // storage serial shared by the legacy and compact keys
		EventId  []byte // event ID used for the SerialEventId (sei) mapping
		OldData  []byte // legacy binary encoding
		OldKey   []byte // legacy key (evt key, or full inline sev key)
		IsInline bool   // true if from sev, false if from evt
	}

	var migrations []EventMigration
	var processedCount int
	var savedBytes int64

	// Create resolver for compact encoding
	resolver := NewDatabaseSerialResolver(d, d.serialCache)

	// First pass: collect all events that need migration
	// Only process events that don't have a cmp entry yet
	if err = d.View(func(txn *badger.Txn) error {
		// Process evt (large events) table
		evtPrf := new(bytes.Buffer)
		if err = indexes.EventEnc(nil).MarshalWrite(evtPrf); chk.E(err) {
			return err
		}
		it := txn.NewIterator(badger.IteratorOptions{Prefix: evtPrf.Bytes()})
		defer it.Close()

		for it.Rewind(); it.Valid(); it.Next() {
			item := it.Item()
			key := item.KeyCopy(nil)

			// Extract serial from key
			ser := indexes.EventVars()
			if err = indexes.EventDec(ser).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) {
				continue
			}

			// Check if this event already has a cmp entry
			cmpKey := new(bytes.Buffer)
			if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKey); err == nil {
				if _, getErr := txn.Get(cmpKey.Bytes()); getErr == nil {
					// Already migrated
					continue
				}
			}

			var val []byte
			if val, err = item.ValueCopy(nil); chk.E(err) {
				continue
			}

			// Skip if this is already compact format.
			// NOTE(review): any legacy encoding whose first byte happens to equal
			// CompactFormatVersion is also skipped by this check; the v9 migration
			// (BackfillMissingSerialEventIdMappings) appears to exist to repair
			// events missed this way — confirm before changing this condition.
			if len(val) > 0 && val[0] == CompactFormatVersion {
				continue
			}

			// Decode the event to get the ID
			ev := new(event.E)
			if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
				continue
			}

			migrations = append(migrations, EventMigration{
				Serial:   ser.Get(),
				EventId:  ev.ID,
				OldData:  val,
				OldKey:   key,
				IsInline: false,
			})
		}
		// close explicitly before opening the next iterator; the deferred
		// Close above then acts as a no-op safety net
		it.Close()

		// Process sev (small inline events) table
		sevPrf := new(bytes.Buffer)
		if err = indexes.SmallEventEnc(nil).MarshalWrite(sevPrf); chk.E(err) {
			return err
		}
		it2 := txn.NewIterator(badger.IteratorOptions{Prefix: sevPrf.Bytes()})
		defer it2.Close()

		for it2.Rewind(); it2.Valid(); it2.Next() {
			item := it2.Item()
			key := item.KeyCopy(nil)

			// Extract serial and data from inline key
			// (layout implied below: 3-byte prefix | 5-byte serial | 2-byte size | data)
			if len(key) <= 8+2 {
				continue
			}

			// Extract serial
			ser := new(types.Uint40)
			if err = ser.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
				continue
			}

			// Check if this event already has a cmp entry
			cmpKey := new(bytes.Buffer)
			if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKey); err == nil {
				if _, getErr := txn.Get(cmpKey.Bytes()); getErr == nil {
					// Already migrated
					continue
				}
			}

			// Extract size (big-endian uint16) and data from the key tail
			sizeIdx := 8
			size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1])
			dataStart := sizeIdx + 2
			if len(key) < dataStart+size {
				continue
			}
			eventData := key[dataStart : dataStart+size]

			// Skip if this is already compact format
			if len(eventData) > 0 && eventData[0] == CompactFormatVersion {
				continue
			}

			// Decode the event to get the ID
			ev := new(event.E)
			if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); chk.E(err) {
				continue
			}

			migrations = append(migrations, EventMigration{
				Serial:   ser.Get(),
				EventId:  ev.ID,
				OldData:  eventData,
				OldKey:   key,
				IsInline: true,
			})
		}

		return nil
	}); chk.E(err) {
		return
	}

	log.I.F("found %d events to convert to compact format", len(migrations))

	if len(migrations) == 0 {
		log.I.F("no events need conversion")
		return
	}

	// Process each event individually to avoid transaction size limits
	// Some events (like kind 3 follow lists) can be very large
	for i, m := range migrations {
		if err = d.Update(func(txn *badger.Txn) error {
			// Decode the legacy event
			ev := new(event.E)
			if err = ev.UnmarshalBinary(bytes.NewBuffer(m.OldData)); chk.E(err) {
				log.W.F("migration: failed to decode event serial %d: %v", m.Serial, err)
				return nil // Continue with next event
			}

			// Store SerialEventId mapping
			if err = d.StoreEventIdSerial(txn, m.Serial, m.EventId); chk.E(err) {
				log.W.F("migration: failed to store event ID mapping for serial %d: %v", m.Serial, err)
				return nil // Continue with next event
			}

			// Encode in compact format
			compactData, encErr := MarshalCompactEvent(ev, resolver)
			if encErr != nil {
				log.W.F("migration: failed to encode compact event for serial %d: %v", m.Serial, encErr)
				return nil // Continue with next event
			}

			// Store compact event under the cmp key for the same serial
			ser := new(types.Uint40)
			if err = ser.Set(m.Serial); chk.E(err) {
				return nil // Continue with next event
			}
			cmpKey := new(bytes.Buffer)
			if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKey); chk.E(err) {
				return nil // Continue with next event
			}
			if err = txn.Set(cmpKey.Bytes(), compactData); chk.E(err) {
				log.W.F("migration: failed to store compact event for serial %d: %v", m.Serial, err)
				return nil // Continue with next event
			}

			// Track savings
			savedBytes += int64(len(m.OldData) - len(compactData))
			processedCount++

			// Cache the mappings
			d.serialCache.CacheEventId(m.Serial, m.EventId)
			return nil
		}); chk.E(err) {
			log.W.F("migration failed for event %d: %v", m.Serial, err)
			continue
		}

		// Log progress every 1000 events
		if (i+1)%1000 == 0 {
			log.I.F("migration progress: %d/%d events converted", i+1, len(migrations))
		}
	}

	log.I.F("compact format migration complete: converted %d events, saved approximately %d bytes (%.2f MB)",
		processedCount, savedBytes, float64(savedBytes)/(1024.0*1024.0))

	// Cleanup legacy storage after successful migration
	log.I.F("cleaning up legacy event storage (evt/sev prefixes)...")
	d.CleanupLegacyEventStorage()
}
951
// CleanupLegacyEventStorage removes legacy evt and sev storage entries after
// compact format migration. This reclaims disk space by removing the old storage
// format entries once all events have been successfully migrated to cmp format.
//
// The cleanup:
// 1. Iterates through all cmp entries (compact format)
// 2. For each serial found in cmp, deletes corresponding evt and sev entries
// 3. Reports total bytes reclaimed
func (d *D) CleanupLegacyEventStorage() {
	var err error
	var cleanedEvt, cleanedSev int
	var bytesReclaimed int64

	// Collect serials from cmp table in a read-only pass
	var serialsToClean []uint64

	if err = d.View(func(txn *badger.Txn) error {
		cmpPrf := new(bytes.Buffer)
		if err = indexes.CompactEventEnc(nil).MarshalWrite(cmpPrf); chk.E(err) {
			return err
		}

		it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpPrf.Bytes()})
		defer it.Close()

		for it.Rewind(); it.Valid(); it.Next() {
			key := it.Item().Key()
			// Extract serial from key (prefix 3 bytes + serial 5 bytes)
			if len(key) >= 8 {
				ser := new(types.Uint40)
				if err = ser.UnmarshalRead(bytes.NewReader(key[3:8])); err == nil {
					serialsToClean = append(serialsToClean, ser.Get())
				}
			}
		}
		return nil
	}); chk.E(err) {
		log.W.F("failed to collect compact event serials: %v", err)
		return
	}

	log.I.F("found %d compact events to clean up legacy storage for", len(serialsToClean))

	// Clean up in batches; bounded transactions keep each Update small
	const batchSize = 1000
	for i := 0; i < len(serialsToClean); i += batchSize {
		end := i + batchSize
		if end > len(serialsToClean) {
			end = len(serialsToClean)
		}
		batch := serialsToClean[i:end]

		if err = d.Update(func(txn *badger.Txn) error {
			for _, serial := range batch {
				ser := new(types.Uint40)
				if err = ser.Set(serial); err != nil {
					continue
				}

				// Try to delete evt entry (value-based storage)
				evtKeyBuf := new(bytes.Buffer)
				if err = indexes.EventEnc(ser).MarshalWrite(evtKeyBuf); err == nil {
					item, getErr := txn.Get(evtKeyBuf.Bytes())
					if getErr == nil {
						// Track size before deleting
						bytesReclaimed += int64(item.ValueSize())
						if delErr := txn.Delete(evtKeyBuf.Bytes()); delErr == nil {
							cleanedEvt++
						}
					}
				}

				// Try to delete sev entry (need to iterate with prefix since key includes inline data)
				sevKeyBuf := new(bytes.Buffer)
				if err = indexes.SmallEventEnc(ser).MarshalWrite(sevKeyBuf); err == nil {
					opts := badger.DefaultIteratorOptions
					opts.Prefix = sevKeyBuf.Bytes()
					it := txn.NewIterator(opts)

					// at most one inline key exists per serial prefix
					it.Rewind()
					if it.Valid() {
						key := it.Item().KeyCopy(nil)
						bytesReclaimed += int64(len(key)) // sev stores data in key
						if delErr := txn.Delete(key); delErr == nil {
							cleanedSev++
						}
					}
					// close before the next loop iteration opens a new iterator
					it.Close()
				}
			}
			return nil
		}); chk.E(err) {
			log.W.F("batch cleanup failed: %v", err)
			continue
		}

		if (i/batchSize)%10 == 0 && i > 0 {
			log.I.F("cleanup progress: %d/%d events processed", i, len(serialsToClean))
		}
	}

	log.I.F("legacy storage cleanup complete: removed %d evt entries, %d sev entries, reclaimed approximately %d bytes (%.2f MB)",
		cleanedEvt, cleanedSev, bytesReclaimed, float64(bytesReclaimed)/(1024.0*1024.0))
}
1056
1057 // RebuildWordIndexesWithNormalization rebuilds all word indexes with unicode
1058 // normalization applied. This migration:
1059 // 1. Deletes all existing word indexes (wrd prefix)
1060 // 2. Re-tokenizes all events with normalizeRune() applied
1061 // 3. Creates new consolidated indexes where decorative unicode maps to ASCII
1062 //
1063 // After this migration, "ᴅᴇᴀᴛʜ" (small caps) and "𝔇𝔢𝔞𝔱𝔥" (fraktur) will index
1064 // the same as "death", eliminating duplicate entries and enabling proper search.
1065 func (d *D) RebuildWordIndexesWithNormalization() {
1066 log.I.F("rebuilding word indexes with unicode normalization...")
1067 var err error
1068
1069 // Step 1: Delete all existing word indexes
1070 var deletedCount int
1071 if err = d.Update(func(txn *badger.Txn) error {
1072 wrdPrf := new(bytes.Buffer)
1073 if err = indexes.WordEnc(nil, nil).MarshalWrite(wrdPrf); chk.E(err) {
1074 return err
1075 }
1076
1077 opts := badger.DefaultIteratorOptions
1078 opts.Prefix = wrdPrf.Bytes()
1079 opts.PrefetchValues = false // Keys only for deletion
1080
1081 it := txn.NewIterator(opts)
1082 defer it.Close()
1083
1084 // Collect keys to delete (can't delete during iteration)
1085 var keysToDelete [][]byte
1086 for it.Rewind(); it.Valid(); it.Next() {
1087 keysToDelete = append(keysToDelete, it.Item().KeyCopy(nil))
1088 }
1089
1090 for _, key := range keysToDelete {
1091 if err = txn.Delete(key); err == nil {
1092 deletedCount++
1093 }
1094 }
1095 return nil
1096 }); chk.E(err) {
1097 log.W.F("failed to delete old word indexes: %v", err)
1098 return
1099 }
1100
1101 log.I.F("deleted %d old word index entries", deletedCount)
1102
1103 // Step 2: Rebuild word indexes from all events
1104 // Reuse the existing UpdateWordIndexes logic which now uses normalizeRune
1105 d.UpdateWordIndexes()
1106
1107 log.I.F("word index rebuild with unicode normalization complete")
1108 }
1109
1110 // BackfillETagGraph populates e-tag graph indexes (eeg/gee) for all existing events.
1111 // This enables graph traversal queries for thread/reply discovery.
1112 //
1113 // The migration:
1114 // 1. Iterates all events in compact storage (cmp prefix)
1115 // 2. Extracts e-tags from each event
1116 // 3. For e-tags referencing events we have, creates bidirectional edges:
1117 // - eeg|source|target|kind|direction(out) - forward edge
1118 // - gee|target|kind|direction(in)|source - reverse edge
1119 //
1120 // This is idempotent: running multiple times won't create duplicate edges
1121 // (BadgerDB overwrites existing keys).
1122 func (d *D) BackfillETagGraph() {
1123 log.I.F("backfilling e-tag graph indexes for graph query support...")
1124 var err error
1125
1126 type ETagEdge struct {
1127 SourceSerial *types.Uint40
1128 TargetSerial *types.Uint40
1129 Kind *types.Uint16
1130 }
1131
1132 var edges []ETagEdge
1133 var processedEvents int
1134 var eventsWithETags int
1135 var skippedTargets int
1136
1137 // First pass: collect all e-tag edges from events
1138 if err = d.View(func(txn *badger.Txn) error {
1139 // Iterate compact events (cmp prefix)
1140 cmpPrf := new(bytes.Buffer)
1141 if err = indexes.CompactEventEnc(nil).MarshalWrite(cmpPrf); chk.E(err) {
1142 return err
1143 }
1144
1145 it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpPrf.Bytes()})
1146 defer it.Close()
1147
1148 for it.Rewind(); it.Valid(); it.Next() {
1149 item := it.Item()
1150 key := item.KeyCopy(nil)
1151
1152 // Extract serial from key (prefix 3 bytes + serial 5 bytes)
1153 if len(key) < 8 {
1154 continue
1155 }
1156 sourceSerial := new(types.Uint40)
1157 if err = sourceSerial.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
1158 continue
1159 }
1160
1161 // Get event data
1162 var val []byte
1163 if val, err = item.ValueCopy(nil); chk.E(err) {
1164 continue
1165 }
1166
1167 // Decode the event
1168 // First get the event ID from serial (needed for compact format decoding)
1169 eventId, idErr := d.GetEventIdBySerial(sourceSerial)
1170 if idErr != nil {
1171 continue
1172 }
1173 resolver := NewDatabaseSerialResolver(d, d.serialCache)
1174 ev, decErr := UnmarshalCompactEvent(val, eventId, resolver)
1175 if decErr != nil || ev == nil {
1176 continue
1177 }
1178 processedEvents++
1179
1180 // Extract e-tags
1181 eTags := ev.Tags.GetAll([]byte("e"))
1182 if len(eTags) == 0 {
1183 continue
1184 }
1185 eventsWithETags++
1186
1187 eventKind := new(types.Uint16)
1188 eventKind.Set(ev.Kind)
1189
1190 for _, eTag := range eTags {
1191 if eTag.Len() < 2 {
1192 continue
1193 }
1194
1195 // Get event ID from e-tag
1196 var targetEventID []byte
1197 targetEventID, err = hex.Dec(string(eTag.ValueHex()))
1198 if err != nil || len(targetEventID) != 32 {
1199 continue
1200 }
1201
1202 // Look up target event's serial
1203 targetSerial, lookupErr := d.GetSerialById(targetEventID)
1204 if lookupErr != nil || targetSerial == nil {
1205 // Target event not in our database - skip
1206 skippedTargets++
1207 continue
1208 }
1209
1210 edges = append(edges, ETagEdge{
1211 SourceSerial: sourceSerial,
1212 TargetSerial: targetSerial,
1213 Kind: eventKind,
1214 })
1215 }
1216 }
1217 return nil
1218 }); chk.E(err) {
1219 log.E.F("e-tag graph backfill: failed to collect edges: %v", err)
1220 return
1221 }
1222
1223 log.I.F("e-tag graph backfill: processed %d events, %d with e-tags, found %d edges to create (%d targets not found)",
1224 processedEvents, eventsWithETags, len(edges), skippedTargets)
1225
1226 if len(edges) == 0 {
1227 log.I.F("e-tag graph backfill: no edges to create")
1228 return
1229 }
1230
1231 // Sort edges for ordered writes (improves compaction)
1232 sort.Slice(edges, func(i, j int) bool {
1233 if edges[i].SourceSerial.Get() != edges[j].SourceSerial.Get() {
1234 return edges[i].SourceSerial.Get() < edges[j].SourceSerial.Get()
1235 }
1236 return edges[i].TargetSerial.Get() < edges[j].TargetSerial.Get()
1237 })
1238
1239 // Second pass: write edges in batches
1240 const batchSize = 1000
1241 var createdEdges int
1242
1243 for i := 0; i < len(edges); i += batchSize {
1244 end := i + batchSize
1245 if end > len(edges) {
1246 end = len(edges)
1247 }
1248 batch := edges[i:end]
1249
1250 if err = d.Update(func(txn *badger.Txn) error {
1251 for _, edge := range batch {
1252 // Create forward edge: eeg|source|target|kind|direction(out)
1253 directionOut := new(types.Letter)
1254 directionOut.Set(types.EdgeDirectionETagOut)
1255 keyBuf := new(bytes.Buffer)
1256 if err = indexes.EventEventGraphEnc(edge.SourceSerial, edge.TargetSerial, edge.Kind, directionOut).MarshalWrite(keyBuf); chk.E(err) {
1257 continue
1258 }
1259 if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
1260 continue
1261 }
1262
1263 // Create reverse edge: gee|target|kind|direction(in)|source
1264 directionIn := new(types.Letter)
1265 directionIn.Set(types.EdgeDirectionETagIn)
1266 keyBuf.Reset()
1267 if err = indexes.GraphEventEventEnc(edge.TargetSerial, edge.Kind, directionIn, edge.SourceSerial).MarshalWrite(keyBuf); chk.E(err) {
1268 continue
1269 }
1270 if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
1271 continue
1272 }
1273
1274 createdEdges++
1275 }
1276 return nil
1277 }); chk.E(err) {
1278 log.W.F("e-tag graph backfill: batch write failed: %v", err)
1279 continue
1280 }
1281
1282 if (i/batchSize)%10 == 0 && i > 0 {
1283 log.I.F("e-tag graph backfill progress: %d/%d edges created", i, len(edges))
1284 }
1285 }
1286
1287 log.I.F("e-tag graph backfill complete: created %d bidirectional edges", createdEdges)
1288 }
1289
1290 // BackfillMissingSerialEventIdMappings finds legacy events that were incorrectly
1291 // skipped during the v6 compact format migration and creates their SerialEventId
1292 // mappings. This fixes events whose ID happens to start with byte 0x01, which was
1293 // mistakenly interpreted as CompactFormatVersion during the original migration.
1294 //
1295 // The v6 migration had this check:
1296 //
1297 // if len(eventData) > 0 && eventData[0] == CompactFormatVersion { continue }
1298 //
1299 // This caused legacy events with IDs starting with 0x01 to be skipped, leaving
1300 // them without sei mappings and causing "Key not found" errors when fetching.
1301 func (d *D) BackfillMissingSerialEventIdMappings() {
1302 log.I.F("backfilling missing SerialEventId mappings for legacy events...")
1303 var err error
1304
1305 type LegacyEvent struct {
1306 Serial uint64
1307 EventData []byte
1308 IsInline bool // true if from sev, false if from evt
1309 }
1310
1311 var legacyEvents []LegacyEvent
1312 var alreadyHasMapping int
1313 var skippedCompact int
1314
1315 // First pass: find legacy events that don't have sei mappings
1316 if err = d.View(func(txn *badger.Txn) error {
1317 // Process evt (large events) table
1318 evtPrf := new(bytes.Buffer)
1319 if err = indexes.EventEnc(nil).MarshalWrite(evtPrf); chk.E(err) {
1320 return err
1321 }
1322 it := txn.NewIterator(badger.IteratorOptions{Prefix: evtPrf.Bytes()})
1323 defer it.Close()
1324
1325 for it.Rewind(); it.Valid(); it.Next() {
1326 item := it.Item()
1327 key := item.KeyCopy(nil)
1328
1329 // Extract serial from key
1330 ser := indexes.EventVars()
1331 if err = indexes.EventDec(ser).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) {
1332 continue
1333 }
1334
1335 // Check if this serial already has an sei mapping
1336 seiKey := new(bytes.Buffer)
1337 if err = indexes.SerialEventIdEnc(ser).MarshalWrite(seiKey); err == nil {
1338 if _, getErr := txn.Get(seiKey.Bytes()); getErr == nil {
1339 // Already has mapping
1340 alreadyHasMapping++
1341 continue
1342 }
1343 }
1344
1345 var val []byte
1346 if val, err = item.ValueCopy(nil); chk.E(err) {
1347 continue
1348 }
1349
1350 // Only process if first byte is 0x01 (these were the ones skipped)
1351 // Events with other first bytes would have been migrated correctly
1352 if len(val) == 0 || val[0] != CompactFormatVersion {
1353 continue
1354 }
1355
1356 // Verify this is actually legacy format by checking size
1357 // Legacy format: 32 (ID) + 32 (Pubkey) + varints + 64 (Sig) = ~135+ bytes minimum
1358 // Compact format is smaller, typically < 100 bytes for simple events
1359 if len(val) < 130 {
1360 // Likely actually compact format, skip
1361 skippedCompact++
1362 continue
1363 }
1364
1365 legacyEvents = append(legacyEvents, LegacyEvent{
1366 Serial: ser.Get(),
1367 EventData: val,
1368 IsInline: false,
1369 })
1370 }
1371 it.Close()
1372
1373 // Process sev (small inline events) table
1374 sevPrf := new(bytes.Buffer)
1375 if err = indexes.SmallEventEnc(nil).MarshalWrite(sevPrf); chk.E(err) {
1376 return err
1377 }
1378 it2 := txn.NewIterator(badger.IteratorOptions{Prefix: sevPrf.Bytes()})
1379 defer it2.Close()
1380
1381 for it2.Rewind(); it2.Valid(); it2.Next() {
1382 item := it2.Item()
1383 key := item.KeyCopy(nil)
1384
1385 // Extract serial and data from inline key
1386 if len(key) <= 8+2 {
1387 continue
1388 }
1389
1390 // Extract serial
1391 ser := new(types.Uint40)
1392 if err = ser.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
1393 continue
1394 }
1395
1396 // Check if this serial already has an sei mapping
1397 seiKey := new(bytes.Buffer)
1398 if err = indexes.SerialEventIdEnc(ser).MarshalWrite(seiKey); err == nil {
1399 if _, getErr := txn.Get(seiKey.Bytes()); getErr == nil {
1400 // Already has mapping
1401 alreadyHasMapping++
1402 continue
1403 }
1404 }
1405
1406 // Extract size and data
1407 sizeIdx := 8
1408 size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1])
1409 dataStart := sizeIdx + 2
1410 if len(key) < dataStart+size {
1411 continue
1412 }
1413 eventData := key[dataStart : dataStart+size]
1414
1415 // Only process if first byte is 0x01
1416 if len(eventData) == 0 || eventData[0] != CompactFormatVersion {
1417 continue
1418 }
1419
1420 // Verify this is actually legacy format by checking size
1421 if len(eventData) < 130 {
1422 skippedCompact++
1423 continue
1424 }
1425
1426 legacyEvents = append(legacyEvents, LegacyEvent{
1427 Serial: ser.Get(),
1428 EventData: eventData,
1429 IsInline: true,
1430 })
1431 }
1432
1433 return nil
1434 }); chk.E(err) {
1435 log.E.F("failed to scan for legacy events: %v", err)
1436 return
1437 }
1438
1439 log.I.F("found %d legacy events needing sei mappings (%d already have mappings, %d skipped as compact)",
1440 len(legacyEvents), alreadyHasMapping, skippedCompact)
1441
1442 if len(legacyEvents) == 0 {
1443 log.I.F("no legacy events need sei mapping backfill")
1444 return
1445 }
1446
1447 // Create resolver for potential compact conversion
1448 resolver := NewDatabaseSerialResolver(d, d.serialCache)
1449
1450 // Process each event
1451 var successCount, failCount int
1452 var convertedToCompact int
1453
1454 for i, le := range legacyEvents {
1455 if err = d.Update(func(txn *badger.Txn) error {
1456 // Decode the legacy event to get the ID
1457 ev := new(event.E)
1458 if err = ev.UnmarshalBinary(bytes.NewBuffer(le.EventData)); err != nil {
1459 log.D.F("backfill: failed to decode event serial %d as legacy format: %v", le.Serial, err)
1460 failCount++
1461 return nil // Continue with next event
1462 }
1463
1464 // Verify the event ID actually starts with 0x01
1465 if len(ev.ID) < 1 || ev.ID[0] != 0x01 {
1466 log.D.F("backfill: event serial %d doesn't have ID starting with 0x01, skipping", le.Serial)
1467 return nil
1468 }
1469
1470 // Store SerialEventId mapping
1471 if err = d.StoreEventIdSerial(txn, le.Serial, ev.ID); chk.E(err) {
1472 log.W.F("backfill: failed to store sei mapping for serial %d: %v", le.Serial, err)
1473 failCount++
1474 return nil
1475 }
1476
1477 // Cache the mapping
1478 d.serialCache.CacheEventId(le.Serial, ev.ID)
1479
1480 // Also convert to compact format if not already done
1481 ser := new(types.Uint40)
1482 if err = ser.Set(le.Serial); err != nil {
1483 successCount++
1484 return nil
1485 }
1486
1487 // Check if cmp entry exists
1488 cmpKey := new(bytes.Buffer)
1489 if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKey); err == nil {
1490 if _, getErr := txn.Get(cmpKey.Bytes()); getErr == nil {
1491 // Already has compact entry
1492 successCount++
1493 return nil
1494 }
1495 }
1496
1497 // Create compact format entry
1498 compactData, encErr := MarshalCompactEvent(ev, resolver)
1499 if encErr != nil {
1500 log.D.F("backfill: failed to encode compact event for serial %d: %v", le.Serial, encErr)
1501 successCount++ // sei mapping was successful, just couldn't convert
1502 return nil
1503 }
1504
1505 if err = txn.Set(cmpKey.Bytes(), compactData); chk.E(err) {
1506 log.D.F("backfill: failed to store compact event for serial %d: %v", le.Serial, err)
1507 successCount++ // sei mapping was successful
1508 return nil
1509 }
1510
1511 convertedToCompact++
1512 successCount++
1513 return nil
1514 }); chk.E(err) {
1515 log.W.F("backfill: transaction failed for serial %d: %v", le.Serial, err)
1516 failCount++
1517 continue
1518 }
1519
1520 // Log progress every 1000 events
1521 if (i+1)%1000 == 0 {
1522 log.I.F("backfill progress: %d/%d events processed", i+1, len(legacyEvents))
1523 }
1524 }
1525
1526 log.I.F("SerialEventId backfill complete: %d successful, %d failed, %d also converted to compact format",
1527 successCount, failCount, convertedToCompact)
1528 }
1529
1530 // BackfillPubkeyPubkeyGraph populates pubkey-to-pubkey (noun-noun) graph indexes
1531 // (ppg/gpp) for all existing events that contain p-tags.
1532 // This materializes direct pubkey→pubkey edges, collapsing the two-hop
1533 // pubkey→event→pubkey traversal into a single-hop lookup.
1534 //
1535 // For each event with p-tags, creates bidirectional edges:
1536 // - ppg|author_serial|target_serial|kind|direction(out)|event_serial - forward edge
1537 // - gpp|target_serial|kind|direction(in)|author_serial|event_serial - reverse edge
1538 //
1539 // This is idempotent: running multiple times won't create duplicate edges.
1540 func (d *D) BackfillPubkeyPubkeyGraph() {
1541 log.I.F("backfilling pubkey-to-pubkey graph indexes...")
1542 var err error
1543
1544 type PubkeyEdge struct {
1545 AuthorSerial *types.Uint40
1546 TargetSerial *types.Uint40
1547 Kind *types.Uint16
1548 EventSerial *types.Uint40
1549 }
1550
1551 var edges []PubkeyEdge
1552 var processedEvents int
1553 var eventsWithPTags int
1554 var skippedPubkeys int
1555
1556 // First pass: collect all pubkey-pubkey edges from events
1557 if err = d.View(func(txn *badger.Txn) error {
1558 // Iterate compact events (cmp prefix)
1559 cmpPrf := new(bytes.Buffer)
1560 if err = indexes.CompactEventEnc(nil).MarshalWrite(cmpPrf); chk.E(err) {
1561 return err
1562 }
1563
1564 it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpPrf.Bytes()})
1565 defer it.Close()
1566
1567 for it.Rewind(); it.Valid(); it.Next() {
1568 item := it.Item()
1569 key := item.KeyCopy(nil)
1570
1571 // Extract serial from key (prefix 3 bytes + serial 5 bytes)
1572 if len(key) < 8 {
1573 continue
1574 }
1575 eventSerial := new(types.Uint40)
1576 if err = eventSerial.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
1577 continue
1578 }
1579
1580 // Get event data
1581 var val []byte
1582 if val, err = item.ValueCopy(nil); chk.E(err) {
1583 continue
1584 }
1585
1586 // Decode the event
1587 eventId, idErr := d.GetEventIdBySerial(eventSerial)
1588 if idErr != nil {
1589 continue
1590 }
1591 resolver := NewDatabaseSerialResolver(d, d.serialCache)
1592 ev, decErr := UnmarshalCompactEvent(val, eventId, resolver)
1593 if decErr != nil || ev == nil {
1594 continue
1595 }
1596 processedEvents++
1597
1598 // Extract p-tags
1599 pTags := ev.Tags.GetAll([]byte("p"))
1600 if len(pTags) == 0 {
1601 continue
1602 }
1603 eventsWithPTags++
1604
1605 // Get author pubkey serial
1606 authorSerial, authErr := d.GetOrCreatePubkeySerial(ev.Pubkey)
1607 if authErr != nil {
1608 continue
1609 }
1610
1611 eventKind := new(types.Uint16)
1612 eventKind.Set(ev.Kind)
1613
1614 for _, pTag := range pTags {
1615 if pTag.Len() < 2 {
1616 continue
1617 }
1618
1619 // Get pubkey from p-tag
1620 var targetPubkey []byte
1621 targetPubkey, err = hex.Dec(string(pTag.ValueHex()))
1622 if err != nil || len(targetPubkey) != 32 {
1623 continue
1624 }
1625
1626 // Get or create target pubkey serial
1627 targetSerial, lookupErr := d.GetOrCreatePubkeySerial(targetPubkey)
1628 if lookupErr != nil {
1629 skippedPubkeys++
1630 continue
1631 }
1632
1633 // Skip self-references
1634 if authorSerial.Get() == targetSerial.Get() {
1635 continue
1636 }
1637
1638 // Clone serials for storage in slice
1639 asSer := new(types.Uint40)
1640 asSer.Set(authorSerial.Get())
1641 tsSer := new(types.Uint40)
1642 tsSer.Set(targetSerial.Get())
1643 esSer := new(types.Uint40)
1644 esSer.Set(eventSerial.Get())
1645 ekKind := new(types.Uint16)
1646 ekKind.Set(ev.Kind)
1647
1648 edges = append(edges, PubkeyEdge{
1649 AuthorSerial: asSer,
1650 TargetSerial: tsSer,
1651 Kind: ekKind,
1652 EventSerial: esSer,
1653 })
1654 }
1655 }
1656 return nil
1657 }); chk.E(err) {
1658 log.E.F("pubkey-pubkey graph backfill: failed to collect edges: %v", err)
1659 return
1660 }
1661
1662 log.I.F("pubkey-pubkey graph backfill: processed %d events, %d with p-tags, found %d edges to create (%d pubkeys failed)",
1663 processedEvents, eventsWithPTags, len(edges), skippedPubkeys)
1664
1665 if len(edges) == 0 {
1666 log.I.F("pubkey-pubkey graph backfill: no edges to create")
1667 return
1668 }
1669
1670 // Sort edges for ordered writes (improves compaction)
1671 sort.Slice(edges, func(i, j int) bool {
1672 if edges[i].AuthorSerial.Get() != edges[j].AuthorSerial.Get() {
1673 return edges[i].AuthorSerial.Get() < edges[j].AuthorSerial.Get()
1674 }
1675 return edges[i].TargetSerial.Get() < edges[j].TargetSerial.Get()
1676 })
1677
1678 // Second pass: write edges in batches
1679 const batchSize = 1000
1680 var createdEdges int
1681
1682 for i := 0; i < len(edges); i += batchSize {
1683 end := i + batchSize
1684 if end > len(edges) {
1685 end = len(edges)
1686 }
1687 batch := edges[i:end]
1688
1689 if err = d.Update(func(txn *badger.Txn) error {
1690 for _, edge := range batch {
1691 // Forward edge: ppg|author|target|kind|direction(out)|event_serial
1692 directionOut := new(types.Letter)
1693 directionOut.Set(types.EdgeDirectionPubkeyOut)
1694 keyBuf := new(bytes.Buffer)
1695 if err = indexes.PubkeyPubkeyGraphEnc(edge.AuthorSerial, edge.TargetSerial, edge.Kind, directionOut, edge.EventSerial).MarshalWrite(keyBuf); chk.E(err) {
1696 continue
1697 }
1698 if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
1699 continue
1700 }
1701
1702 // Reverse edge: gpp|target|kind|direction(in)|author|event_serial
1703 directionIn := new(types.Letter)
1704 directionIn.Set(types.EdgeDirectionPubkeyIn)
1705 keyBuf.Reset()
1706 if err = indexes.GraphPubkeyPubkeyEnc(edge.TargetSerial, edge.Kind, directionIn, edge.AuthorSerial, edge.EventSerial).MarshalWrite(keyBuf); chk.E(err) {
1707 continue
1708 }
1709 if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
1710 continue
1711 }
1712
1713 createdEdges++
1714 }
1715 return nil
1716 }); chk.E(err) {
1717 log.W.F("pubkey-pubkey graph backfill: batch write failed: %v", err)
1718 continue
1719 }
1720
1721 if (i/batchSize)%10 == 0 && i > 0 {
1722 log.I.F("pubkey-pubkey graph backfill progress: %d/%d edges created", i, len(edges))
1723 }
1724 }
1725
1726 log.I.F("pubkey-pubkey graph backfill complete: created %d bidirectional edges", createdEdges)
1727 }
1728