migrations.go raw

   1  //go:build !(js && wasm)
   2  
   3  package database
   4  
   5  import (
   6  	"bytes"
   7  	"context"
   8  	"sort"
   9  
  10  	"github.com/dgraph-io/badger/v4"
  11  	"next.orly.dev/pkg/lol/chk"
  12  	"next.orly.dev/pkg/lol/log"
  13  	"next.orly.dev/pkg/database/indexes"
  14  	"next.orly.dev/pkg/database/indexes/types"
  15  	"next.orly.dev/pkg/nostr/encoders/event"
  16  	"next.orly.dev/pkg/nostr/encoders/hex"
  17  	"next.orly.dev/pkg/nostr/encoders/ints"
  18  	"next.orly.dev/pkg/nostr/encoders/kind"
  19  )
  20  
const (
	// currentVersion is the schema version this build expects; RunMigrations
	// upgrades older stores step by step until the stored tag reaches it.
	currentVersion uint32 = 10
)
  24  
  25  func (d *D) RunMigrations() {
  26  	var err error
  27  	var dbVersion uint32
  28  	// first find the current version tag if any
  29  	if err = d.View(
  30  		func(txn *badger.Txn) (err error) {
  31  			buf := new(bytes.Buffer)
  32  			if err = indexes.VersionEnc(nil).MarshalWrite(buf); chk.E(err) {
  33  				return
  34  			}
  35  			verPrf := new(bytes.Buffer)
  36  			if _, err = indexes.VersionPrefix.Write(verPrf); chk.E(err) {
  37  				return
  38  			}
  39  			it := txn.NewIterator(
  40  				badger.IteratorOptions{
  41  					Prefix: verPrf.Bytes(),
  42  				},
  43  			)
  44  			defer it.Close()
  45  			ver := indexes.VersionVars()
  46  			for it.Rewind(); it.Valid(); it.Next() {
  47  				// there should only be one
  48  				item := it.Item()
  49  				key := item.Key()
  50  				if err = indexes.VersionDec(ver).UnmarshalRead(
  51  					bytes.NewBuffer(key),
  52  				); chk.E(err) {
  53  					return
  54  				}
  55  				log.I.F("found version tag: %d", ver.Get())
  56  				dbVersion = ver.Get()
  57  			}
  58  			return
  59  		},
  60  	); chk.E(err) {
  61  	}
  62  	if dbVersion == 0 {
  63  		// write the version tag now (ensure any old tags are removed first)
  64  		if err = d.writeVersionTag(currentVersion); chk.E(err) {
  65  			return
  66  		}
  67  	}
  68  	if dbVersion < 1 {
  69  		log.I.F("migrating to version 1...")
  70  		// the first migration is expiration tags
  71  		d.UpdateExpirationTags()
  72  		// bump to version 1
  73  		_ = d.writeVersionTag(1)
  74  	}
  75  	if dbVersion < 2 {
  76  		log.I.F("migrating to version 2...")
  77  		// backfill word indexes
  78  		d.UpdateWordIndexes()
  79  		// bump to version 2
  80  		_ = d.writeVersionTag(2)
  81  	}
  82  	if dbVersion < 3 {
  83  		log.I.F("migrating to version 3...")
  84  		// clean up ephemeral events that should never have been stored
  85  		d.CleanupEphemeralEvents()
  86  		// bump to version 3
  87  		_ = d.writeVersionTag(3)
  88  	}
  89  	if dbVersion < 4 {
  90  		log.I.F("migrating to version 4...")
  91  		// convert small events to inline storage (Reiser4 optimization)
  92  		d.ConvertSmallEventsToInline()
  93  		// bump to version 4
  94  		_ = d.writeVersionTag(4)
  95  	}
  96  	if dbVersion < 5 {
  97  		log.I.F("migrating to version 5...")
  98  		// re-encode events with optimized tag binary format (e/p tags)
  99  		d.ReencodeEventsWithOptimizedTags()
 100  		// bump to version 5
 101  		_ = d.writeVersionTag(5)
 102  	}
 103  	if dbVersion < 6 {
 104  		log.I.F("migrating to version 6...")
 105  		// convert events to compact serial-reference format
 106  		// This replaces 32-byte IDs/pubkeys with 5-byte serial references
 107  		d.ConvertToCompactEventFormat()
 108  		// bump to version 6
 109  		_ = d.writeVersionTag(6)
 110  	}
 111  	if dbVersion < 7 {
 112  		log.I.F("migrating to version 7...")
 113  		// Rebuild word indexes with unicode normalization (small caps, fraktur → ASCII)
 114  		// This consolidates duplicate indexes from decorative unicode text
 115  		d.RebuildWordIndexesWithNormalization()
 116  		// bump to version 7
 117  		_ = d.writeVersionTag(7)
 118  	}
 119  	if dbVersion < 8 {
 120  		log.I.F("migrating to version 8...")
 121  		// Backfill e-tag graph indexes (eeg/gee) for graph query support
 122  		// This creates edges for all existing events with e-tags
 123  		d.BackfillETagGraph()
 124  		// bump to version 8
 125  		_ = d.writeVersionTag(8)
 126  	}
 127  	if dbVersion < 9 {
 128  		log.I.F("migrating to version 9...")
 129  		// Backfill SerialEventId mappings for legacy events that were skipped
 130  		// during the v6 compact format migration because their ID starts with 0x01
 131  		// (which was mistakenly interpreted as CompactFormatVersion)
 132  		d.BackfillMissingSerialEventIdMappings()
 133  		// bump to version 9
 134  		_ = d.writeVersionTag(9)
 135  	}
 136  	if dbVersion < 10 {
 137  		log.I.F("migrating to version 10...")
 138  		// Backfill pubkey-to-pubkey (noun-noun) graph indexes (ppg/gpp)
 139  		// This materializes direct pubkey→pubkey edges from p-tag relationships,
 140  		// collapsing the two-hop pubkey→event→pubkey traversal into single-hop lookups
 141  		d.BackfillPubkeyPubkeyGraph()
 142  		// bump to version 10
 143  		_ = d.writeVersionTag(10)
 144  	}
 145  }
 146  
 147  // writeVersionTag writes a new version tag key to the database (no value)
 148  func (d *D) writeVersionTag(ver uint32) (err error) {
 149  	return d.Update(
 150  		func(txn *badger.Txn) (err error) {
 151  			// delete any existing version keys first (there should only be one, but be safe)
 152  			verPrf := new(bytes.Buffer)
 153  			if _, err = indexes.VersionPrefix.Write(verPrf); chk.E(err) {
 154  				return
 155  			}
 156  			it := txn.NewIterator(badger.IteratorOptions{Prefix: verPrf.Bytes()})
 157  			defer it.Close()
 158  			for it.Rewind(); it.Valid(); it.Next() {
 159  				item := it.Item()
 160  				key := item.KeyCopy(nil)
 161  				if err = txn.Delete(key); chk.E(err) {
 162  					return
 163  				}
 164  			}
 165  
 166  			// now write the new version key
 167  			buf := new(bytes.Buffer)
 168  			vv := new(types.Uint32)
 169  			vv.Set(ver)
 170  			if err = indexes.VersionEnc(vv).MarshalWrite(buf); chk.E(err) {
 171  				return
 172  			}
 173  			return txn.Set(buf.Bytes(), nil)
 174  		},
 175  	)
 176  }
 177  
 178  func (d *D) UpdateWordIndexes() {
 179  	var err error
 180  	var wordIndexes [][]byte
 181  	// iterate all events and generate word index keys from content and tags
 182  	if err = d.View(
 183  		func(txn *badger.Txn) (err error) {
 184  			prf := new(bytes.Buffer)
 185  			if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
 186  				return
 187  			}
 188  			it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()})
 189  			defer it.Close()
 190  			for it.Rewind(); it.Valid(); it.Next() {
 191  				item := it.Item()
 192  				var val []byte
 193  				if val, err = item.ValueCopy(nil); chk.E(err) {
 194  					continue
 195  				}
 196  				// decode the event
 197  				ev := new(event.E)
 198  				if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
 199  					continue
 200  				}
 201  				// log.I.F("updating word indexes for event: %s", ev.Serialize())
 202  				// read serial from key
 203  				key := item.Key()
 204  				ser := indexes.EventVars()
 205  				if err = indexes.EventDec(ser).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) {
 206  					continue
 207  				}
 208  				// collect unique word hashes for this event
 209  				seen := make(map[string]struct{})
 210  				// from content
 211  				if len(ev.Content) > 0 {
 212  					for _, h := range TokenHashes(ev.Content) {
 213  						seen[string(h)] = struct{}{}
 214  					}
 215  				}
 216  				// from all tag fields (key and values)
 217  				if ev.Tags != nil && ev.Tags.Len() > 0 {
 218  					for _, t := range *ev.Tags {
 219  						for _, field := range t.T {
 220  							if len(field) == 0 {
 221  								continue
 222  							}
 223  							for _, h := range TokenHashes(field) {
 224  								seen[string(h)] = struct{}{}
 225  							}
 226  						}
 227  					}
 228  				}
 229  				// build keys
 230  				for k := range seen {
 231  					w := new(types.Word)
 232  					w.FromWord([]byte(k))
 233  					buf := new(bytes.Buffer)
 234  					if err = indexes.WordEnc(
 235  						w, ser,
 236  					).MarshalWrite(buf); chk.E(err) {
 237  						continue
 238  					}
 239  					wordIndexes = append(wordIndexes, buf.Bytes())
 240  				}
 241  			}
 242  			return
 243  		},
 244  	); chk.E(err) {
 245  		return
 246  	}
 247  	// sort the indexes for ordered writes
 248  	sort.Slice(
 249  		wordIndexes, func(i, j int) bool {
 250  			return bytes.Compare(
 251  				wordIndexes[i], wordIndexes[j],
 252  			) < 0
 253  		},
 254  	)
 255  	// write in a batch
 256  	batch := d.NewWriteBatch()
 257  	for _, v := range wordIndexes {
 258  		if err = batch.Set(v, nil); chk.E(err) {
 259  			continue
 260  		}
 261  	}
 262  	_ = batch.Flush()
 263  }
 264  
 265  func (d *D) UpdateExpirationTags() {
 266  	var err error
 267  	var expIndexes [][]byte
 268  	// iterate all event records and decode and look for version tags
 269  	if err = d.View(
 270  		func(txn *badger.Txn) (err error) {
 271  			prf := new(bytes.Buffer)
 272  			if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
 273  				return
 274  			}
 275  			it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()})
 276  			defer it.Close()
 277  			for it.Rewind(); it.Valid(); it.Next() {
 278  				item := it.Item()
 279  				var val []byte
 280  				if val, err = item.ValueCopy(nil); chk.E(err) {
 281  					continue
 282  				}
 283  				// decode the event
 284  				ev := new(event.E)
 285  				if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
 286  					continue
 287  				}
 288  				expTag := ev.Tags.GetFirst([]byte("expiration"))
 289  				if expTag == nil {
 290  					continue
 291  				}
 292  				expTS := ints.New(0)
 293  				if _, err = expTS.Unmarshal(expTag.Value()); chk.E(err) {
 294  					continue
 295  				}
 296  				key := item.Key()
 297  				ser := indexes.EventVars()
 298  				if err = indexes.EventDec(ser).UnmarshalRead(
 299  					bytes.NewBuffer(key),
 300  				); chk.E(err) {
 301  					continue
 302  				}
 303  				// create the expiration tag
 304  				exp, _ := indexes.ExpirationVars()
 305  				exp.Set(expTS.N)
 306  				expBuf := new(bytes.Buffer)
 307  				if err = indexes.ExpirationEnc(
 308  					exp, ser,
 309  				).MarshalWrite(expBuf); chk.E(err) {
 310  					continue
 311  				}
 312  				expIndexes = append(expIndexes, expBuf.Bytes())
 313  			}
 314  			return
 315  		},
 316  	); chk.E(err) {
 317  		return
 318  	}
 319  	// sort the indexes first so they're written in order, improving compaction
 320  	// and iteration.
 321  	sort.Slice(
 322  		expIndexes, func(i, j int) bool {
 323  			return bytes.Compare(expIndexes[i], expIndexes[j]) < 0
 324  		},
 325  	)
 326  	// write the collected indexes
 327  	batch := d.NewWriteBatch()
 328  	for _, v := range expIndexes {
 329  		if err = batch.Set(v, nil); chk.E(err) {
 330  			continue
 331  		}
 332  	}
 333  	if err = batch.Flush(); chk.E(err) {
 334  		return
 335  	}
 336  }
 337  
 338  func (d *D) CleanupEphemeralEvents() {
 339  	log.I.F("cleaning up ephemeral events (kinds 20000-29999)...")
 340  	var err error
 341  	var ephemeralEvents [][]byte
 342  	
 343  	// iterate all event records and find ephemeral events
 344  	if err = d.View(
 345  		func(txn *badger.Txn) (err error) {
 346  			prf := new(bytes.Buffer)
 347  			if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
 348  				return
 349  			}
 350  			it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()})
 351  			defer it.Close()
 352  			for it.Rewind(); it.Valid(); it.Next() {
 353  				item := it.Item()
 354  				var val []byte
 355  				if val, err = item.ValueCopy(nil); chk.E(err) {
 356  					continue
 357  				}
 358  				// decode the event
 359  				ev := new(event.E)
 360  				if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
 361  					continue
 362  				}
 363  				// check if it's an ephemeral event (kinds 20000-29999)
 364  				if ev.Kind >= 20000 && ev.Kind <= 29999 {
 365  					ephemeralEvents = append(ephemeralEvents, ev.ID)
 366  				}
 367  			}
 368  			return
 369  		},
 370  	); chk.E(err) {
 371  		return
 372  	}
 373  	
 374  	// delete all found ephemeral events
 375  	deletedCount := 0
 376  	for _, eventId := range ephemeralEvents {
 377  		if err = d.DeleteEvent(context.Background(), eventId); chk.E(err) {
 378  			log.W.F("failed to delete ephemeral event %x: %v", eventId, err)
 379  			continue
 380  		}
 381  		deletedCount++
 382  	}
 383  	
 384  	log.I.F("cleaned up %d ephemeral events from database", deletedCount)
 385  }
 386  
// ConvertSmallEventsToInline migrates small events (<=384 bytes) to inline storage.
// This is a Reiser4-inspired optimization that stores small event data in the key itself,
// avoiding a second database lookup and improving query performance.
// Also handles replaceable and addressable events with specialized storage.
//
// The migration runs in two passes: a read-only pass collects every event
// small enough to inline, then batched read-write transactions write the new
// inline keys (sev for all small events, plus aev/rev for addressable or
// replaceable ones) and delete the old evt keys.
func (d *D) ConvertSmallEventsToInline() {
	log.I.F("converting events to optimized inline storage (Reiser4 optimization)...")
	var err error
	// Events whose encoded form is at most this many bytes are inlined.
	const smallEventThreshold = 384

	// EventData carries everything needed to rewrite one event's storage.
	type EventData struct {
		Serial          uint64
		EventData       []byte
		OldKey          []byte
		IsReplaceable   bool
		IsAddressable   bool
		Pubkey          []byte
		Kind            uint16
		DTag            []byte
	}

	var events []EventData
	var convertedCount int
	var deletedCount int

	// Helper function for counting by predicate
	countBy := func(events []EventData, predicate func(EventData) bool) int {
		count := 0
		for _, e := range events {
			if predicate(e) {
				count++
			}
		}
		return count
	}

	// First pass: identify events in evt table that can benefit from inline storage
	if err = d.View(
		func(txn *badger.Txn) (err error) {
			prf := new(bytes.Buffer)
			if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
				return
			}
			it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()})
			defer it.Close()

			for it.Rewind(); it.Valid(); it.Next() {
				item := it.Item()
				var val []byte
				if val, err = item.ValueCopy(nil); chk.E(err) {
					continue
				}

				// Check if event data is small enough for inline storage
				if len(val) <= smallEventThreshold {
					// Decode event to check if it's replaceable or addressable
					ev := new(event.E)
					if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
						continue
					}

					// Extract serial from key
					key := item.KeyCopy(nil)
					ser := indexes.EventVars()
					if err = indexes.EventDec(ser).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) {
						continue
					}

					eventData := EventData{
						Serial:        ser.Get(),
						EventData:     val,
						OldKey:        key,
						IsReplaceable: kind.IsReplaceable(ev.Kind),
						IsAddressable: kind.IsParameterizedReplaceable(ev.Kind),
						Pubkey:        ev.Pubkey,
						Kind:          ev.Kind,
					}

					// Extract d-tag for addressable events
					if eventData.IsAddressable {
						dTag := ev.Tags.GetFirst([]byte("d"))
						if dTag != nil {
							eventData.DTag = dTag.Value()
						}
					}

					events = append(events, eventData)
				}
			}
			return nil
		},
	); chk.E(err) {
		return
	}

	log.I.F("found %d events to convert (%d regular, %d replaceable, %d addressable)",
		len(events),
		countBy(events, func(e EventData) bool { return !e.IsReplaceable && !e.IsAddressable }),
		countBy(events, func(e EventData) bool { return e.IsReplaceable }),
		countBy(events, func(e EventData) bool { return e.IsAddressable }),
	)

	// Second pass: convert in batches to avoid large transactions
	const batchSize = 1000
	for i := 0; i < len(events); i += batchSize {
		end := i + batchSize
		if end > len(events) {
			end = len(events)
		}
		batch := events[i:end]

		// Write new inline keys and delete old keys
		if err = d.Update(
			func(txn *badger.Txn) (err error) {
				for _, e := range batch {
					// First, write the sev key for serial-based access (all small events)
					sevKeyBuf := new(bytes.Buffer)
					ser := new(types.Uint40)
					if err = ser.Set(e.Serial); chk.E(err) {
						continue
					}

					if err = indexes.SmallEventEnc(ser).MarshalWrite(sevKeyBuf); chk.E(err) {
						continue
					}

					// Append size as uint16 big-endian (2 bytes)
					sizeBytes := []byte{byte(len(e.EventData) >> 8), byte(len(e.EventData))}
					sevKeyBuf.Write(sizeBytes)

					// Append event data
					sevKeyBuf.Write(e.EventData)

					// Write sev key (no value needed)
					if err = txn.Set(sevKeyBuf.Bytes(), nil); chk.E(err) {
						log.W.F("failed to write sev key for serial %d: %v", e.Serial, err)
						continue
					}
					convertedCount++

					// Additionally, for replaceable/addressable events, write specialized keys.
					// NOTE(review): an addressable event with a missing/empty d-tag gets only
					// the sev key — neither aev nor rev — confirm that is intended.
					if e.IsAddressable && len(e.DTag) > 0 {
						// Addressable event: aev|pubkey_hash|kind|dtag_hash|size|data
						aevKeyBuf := new(bytes.Buffer)
						pubHash := new(types.PubHash)
						pubHash.FromPubkey(e.Pubkey)
						kindVal := new(types.Uint16)
						kindVal.Set(e.Kind)
						dTagHash := new(types.Ident)
						dTagHash.FromIdent(e.DTag)

						if err = indexes.AddressableEventEnc(pubHash, kindVal, dTagHash).MarshalWrite(aevKeyBuf); chk.E(err) {
							continue
						}

						// Append size and data
						aevKeyBuf.Write(sizeBytes)
						aevKeyBuf.Write(e.EventData)

						if err = txn.Set(aevKeyBuf.Bytes(), nil); chk.E(err) {
							log.W.F("failed to write aev key for serial %d: %v", e.Serial, err)
							continue
						}
					} else if e.IsReplaceable {
						// Replaceable event: rev|pubkey_hash|kind|size|data
						revKeyBuf := new(bytes.Buffer)
						pubHash := new(types.PubHash)
						pubHash.FromPubkey(e.Pubkey)
						kindVal := new(types.Uint16)
						kindVal.Set(e.Kind)

						if err = indexes.ReplaceableEventEnc(pubHash, kindVal).MarshalWrite(revKeyBuf); chk.E(err) {
							continue
						}

						// Append size and data
						revKeyBuf.Write(sizeBytes)
						revKeyBuf.Write(e.EventData)

						if err = txn.Set(revKeyBuf.Bytes(), nil); chk.E(err) {
							log.W.F("failed to write rev key for serial %d: %v", e.Serial, err)
							continue
						}
					}

					// Delete old evt key
					if err = txn.Delete(e.OldKey); chk.E(err) {
						log.W.F("failed to delete old event key for serial %d: %v", e.Serial, err)
						continue
					}
					deletedCount++
				}
				return nil
			},
		); chk.E(err) {
			log.W.F("batch update failed: %v", err)
			continue
		}

		if (i/batchSize)%10 == 0 && i > 0 {
			log.I.F("progress: %d/%d events converted", i, len(events))
		}
	}

	log.I.F("migration complete: converted %d events to optimized inline storage, deleted %d old keys", convertedCount, deletedCount)
}
 592  
 593  // ReencodeEventsWithOptimizedTags re-encodes all events to use the new binary
 594  // tag format that stores e/p tag values as 33-byte binary (32-byte hash + null)
 595  // instead of 64-byte hex strings. This reduces memory usage by ~48% for these tags.
 596  func (d *D) ReencodeEventsWithOptimizedTags() {
 597  	log.I.F("re-encoding events with optimized tag binary format...")
 598  	var err error
 599  
 600  	type EventUpdate struct {
 601  		Key     []byte
 602  		OldData []byte
 603  		NewData []byte
 604  	}
 605  
 606  	var updates []EventUpdate
 607  	var processedCount int
 608  
 609  	// Helper to collect event updates from iterator
 610  	// Only processes regular events (evt prefix) - inline storage already benefits
 611  	collectUpdates := func(it *badger.Iterator, prefix []byte) error {
 612  		for it.Rewind(); it.Valid(); it.Next() {
 613  			item := it.Item()
 614  			key := item.KeyCopy(nil)
 615  
 616  			var val []byte
 617  			if val, err = item.ValueCopy(nil); chk.E(err) {
 618  				continue
 619  			}
 620  
 621  			// Regular event storage - data is in value
 622  			eventData := val
 623  			if len(eventData) == 0 {
 624  				continue
 625  			}
 626  
 627  			// Decode the event
 628  			ev := new(event.E)
 629  			if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); chk.E(err) {
 630  				continue
 631  			}
 632  
 633  			// Check if this event has e or p tags that could benefit from optimization
 634  			hasOptimizableTags := false
 635  			if ev.Tags != nil && ev.Tags.Len() > 0 {
 636  				for _, t := range *ev.Tags {
 637  					if t.Len() >= 2 {
 638  						key := t.Key()
 639  						if len(key) == 1 && (key[0] == 'e' || key[0] == 'p') {
 640  							hasOptimizableTags = true
 641  							break
 642  						}
 643  					}
 644  				}
 645  			}
 646  
 647  			if !hasOptimizableTags {
 648  				continue
 649  			}
 650  
 651  			// Re-encode the event (this will apply the new tag optimization)
 652  			newData := ev.MarshalBinaryToBytes(nil)
 653  
 654  			// Only update if the data actually changed
 655  			if !bytes.Equal(eventData, newData) {
 656  				updates = append(updates, EventUpdate{
 657  					Key:     key,
 658  					OldData: eventData,
 659  					NewData: newData,
 660  				})
 661  			}
 662  		}
 663  		return nil
 664  	}
 665  
 666  	// Only process regular "evt" prefix events (not inline storage)
 667  	// Inline storage (sev, rev, aev) already benefits from the optimization
 668  	// because the binary data is stored directly in the key
 669  	prf := new(bytes.Buffer)
 670  	if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
 671  		return
 672  	}
 673  	evtPrefix := prf.Bytes()
 674  
 675  	// Collect updates from regular events only
 676  	if err = d.View(func(txn *badger.Txn) error {
 677  		it := txn.NewIterator(badger.IteratorOptions{Prefix: evtPrefix})
 678  		defer it.Close()
 679  		return collectUpdates(it, evtPrefix)
 680  	}); chk.E(err) {
 681  		return
 682  	}
 683  
 684  	log.I.F("found %d events with e/p tags to re-encode", len(updates))
 685  
 686  	if len(updates) == 0 {
 687  		log.I.F("no events need re-encoding")
 688  		return
 689  	}
 690  
 691  	// Apply updates in batches
 692  	const batchSize = 1000
 693  	for i := 0; i < len(updates); i += batchSize {
 694  		end := i + batchSize
 695  		if end > len(updates) {
 696  			end = len(updates)
 697  		}
 698  		batch := updates[i:end]
 699  
 700  		if err = d.Update(func(txn *badger.Txn) error {
 701  			for _, upd := range batch {
 702  				// Since we're only processing regular events (evt prefix),
 703  				// we just update the value directly
 704  				if err = txn.Set(upd.Key, upd.NewData); chk.E(err) {
 705  					log.W.F("failed to update event: %v", err)
 706  					continue
 707  				}
 708  				processedCount++
 709  			}
 710  			return nil
 711  		}); chk.E(err) {
 712  			log.W.F("batch update failed: %v", err)
 713  			continue
 714  		}
 715  
 716  		if (i/batchSize)%10 == 0 && i > 0 {
 717  			log.I.F("progress: %d/%d events re-encoded", i, len(updates))
 718  		}
 719  	}
 720  
 721  	savedBytes := 0
 722  	for _, upd := range updates {
 723  		savedBytes += len(upd.OldData) - len(upd.NewData)
 724  	}
 725  
 726  	log.I.F("migration complete: re-encoded %d events, saved approximately %d bytes (%.2f KB)",
 727  		processedCount, savedBytes, float64(savedBytes)/1024.0)
 728  }
 729  
// ConvertToCompactEventFormat migrates all existing events to the new compact format.
// This format uses 5-byte serial references instead of 32-byte IDs/pubkeys,
// dramatically reducing storage requirements (up to 80% savings on ID/pubkey data).
//
// The migration:
// 1. Reads each event from legacy storage (evt/sev prefixes)
// 2. Creates SerialEventId mapping (sei prefix) for event ID lookup
// 3. Re-encodes the event in compact format
// 4. Stores in cmp prefix
// 5. Optionally removes legacy storage after successful migration
func (d *D) ConvertToCompactEventFormat() {
	log.I.F("converting events to compact serial-reference format...")
	var err error

	// EventMigration records one legacy event pending conversion.
	type EventMigration struct {
		Serial    uint64
		EventId   []byte
		OldData   []byte
		OldKey    []byte
		IsInline  bool // true if from sev, false if from evt
	}

	var migrations []EventMigration
	var processedCount int
	var savedBytes int64

	// Create resolver for compact encoding
	resolver := NewDatabaseSerialResolver(d, d.serialCache)

	// First pass: collect all events that need migration
	// Only process events that don't have a cmp entry yet
	if err = d.View(func(txn *badger.Txn) error {
		// Process evt (large events) table
		evtPrf := new(bytes.Buffer)
		if err = indexes.EventEnc(nil).MarshalWrite(evtPrf); chk.E(err) {
			return err
		}
		it := txn.NewIterator(badger.IteratorOptions{Prefix: evtPrf.Bytes()})
		defer it.Close()

		for it.Rewind(); it.Valid(); it.Next() {
			item := it.Item()
			key := item.KeyCopy(nil)

			// Extract serial from key
			ser := indexes.EventVars()
			if err = indexes.EventDec(ser).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) {
				continue
			}

			// Check if this event already has a cmp entry
			cmpKey := new(bytes.Buffer)
			if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKey); err == nil {
				if _, getErr := txn.Get(cmpKey.Bytes()); getErr == nil {
					// Already migrated
					continue
				}
			}

			var val []byte
			if val, err = item.ValueCopy(nil); chk.E(err) {
				continue
			}

			// Skip if this is already compact format
			if len(val) > 0 && val[0] == CompactFormatVersion {
				continue
			}

			// Decode the event to get the ID
			ev := new(event.E)
			if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
				continue
			}

			migrations = append(migrations, EventMigration{
				Serial:   ser.Get(),
				EventId:  ev.ID,
				OldData:  val,
				OldKey:   key,
				IsInline: false,
			})
		}
		// Release the evt iterator before opening the sev iterator below.
		// NOTE(review): the deferred it.Close() above will run again at
		// return — assumes badger's Iterator.Close tolerates a second call;
		// confirm against the badger v4 implementation.
		it.Close()

		// Process sev (small inline events) table
		sevPrf := new(bytes.Buffer)
		if err = indexes.SmallEventEnc(nil).MarshalWrite(sevPrf); chk.E(err) {
			return err
		}
		it2 := txn.NewIterator(badger.IteratorOptions{Prefix: sevPrf.Bytes()})
		defer it2.Close()

		for it2.Rewind(); it2.Valid(); it2.Next() {
			item := it2.Item()
			key := item.KeyCopy(nil)

			// Extract serial and data from inline key; the layout read below
			// is: 3-byte prefix | 5-byte serial | 2-byte big-endian size | data,
			// so anything not longer than 10 bytes cannot hold a payload.
			if len(key) <= 8+2 {
				continue
			}

			// Extract serial
			ser := new(types.Uint40)
			if err = ser.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
				continue
			}

			// Check if this event already has a cmp entry
			cmpKey := new(bytes.Buffer)
			if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKey); err == nil {
				if _, getErr := txn.Get(cmpKey.Bytes()); getErr == nil {
					// Already migrated
					continue
				}
			}

			// Extract size and data
			sizeIdx := 8
			size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1])
			dataStart := sizeIdx + 2
			if len(key) < dataStart+size {
				continue
			}
			eventData := key[dataStart : dataStart+size]

			// Skip if this is already compact format
			if len(eventData) > 0 && eventData[0] == CompactFormatVersion {
				continue
			}

			// Decode the event to get the ID
			ev := new(event.E)
			if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); chk.E(err) {
				continue
			}

			migrations = append(migrations, EventMigration{
				Serial:   ser.Get(),
				EventId:  ev.ID,
				OldData:  eventData,
				OldKey:   key,
				IsInline: true,
			})
		}

		return nil
	}); chk.E(err) {
		return
	}

	log.I.F("found %d events to convert to compact format", len(migrations))

	if len(migrations) == 0 {
		log.I.F("no events need conversion")
		return
	}

	// Process each event individually to avoid transaction size limits
	// Some events (like kind 3 follow lists) can be very large
	for i, m := range migrations {
		if err = d.Update(func(txn *badger.Txn) error {
			// Decode the legacy event
			ev := new(event.E)
			if err = ev.UnmarshalBinary(bytes.NewBuffer(m.OldData)); chk.E(err) {
				log.W.F("migration: failed to decode event serial %d: %v", m.Serial, err)
				return nil // Continue with next event
			}

			// Store SerialEventId mapping
			if err = d.StoreEventIdSerial(txn, m.Serial, m.EventId); chk.E(err) {
				log.W.F("migration: failed to store event ID mapping for serial %d: %v", m.Serial, err)
				return nil // Continue with next event
			}

			// Encode in compact format
			compactData, encErr := MarshalCompactEvent(ev, resolver)
			if encErr != nil {
				log.W.F("migration: failed to encode compact event for serial %d: %v", m.Serial, encErr)
				return nil // Continue with next event
			}

			// Store compact event
			ser := new(types.Uint40)
			if err = ser.Set(m.Serial); chk.E(err) {
				return nil // Continue with next event
			}
			cmpKey := new(bytes.Buffer)
			if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKey); chk.E(err) {
				return nil // Continue with next event
			}
			if err = txn.Set(cmpKey.Bytes(), compactData); chk.E(err) {
				log.W.F("migration: failed to store compact event for serial %d: %v", m.Serial, err)
				return nil // Continue with next event
			}

			// Track savings
			savedBytes += int64(len(m.OldData) - len(compactData))
			processedCount++

			// Cache the mappings
			d.serialCache.CacheEventId(m.Serial, m.EventId)
			return nil
		}); chk.E(err) {
			log.W.F("migration failed for event %d: %v", m.Serial, err)
			continue
		}

		// Log progress every 1000 events
		if (i+1)%1000 == 0 {
			log.I.F("migration progress: %d/%d events converted", i+1, len(migrations))
		}
	}

	log.I.F("compact format migration complete: converted %d events, saved approximately %d bytes (%.2f MB)",
		processedCount, savedBytes, float64(savedBytes)/(1024.0*1024.0))

	// Cleanup legacy storage after successful migration
	log.I.F("cleaning up legacy event storage (evt/sev prefixes)...")
	d.CleanupLegacyEventStorage()
}
 951  
// CleanupLegacyEventStorage removes legacy evt and sev storage entries after
// compact format migration. This reclaims disk space by removing the old storage
// format entries once all events have been successfully migrated to cmp format.
//
// The cleanup:
// 1. Iterates through all cmp entries (compact format)
// 2. For each serial found in cmp, deletes corresponding evt and sev entries
// 3. Reports total bytes reclaimed
//
// Errors on individual keys are tolerated: a failed decode, Get, or Delete
// simply skips that entry so that one bad record cannot abort the cleanup.
func (d *D) CleanupLegacyEventStorage() {
	var err error
	var cleanedEvt, cleanedSev int
	var bytesReclaimed int64

	// Collect serials from cmp table
	var serialsToClean []uint64

	if err = d.View(func(txn *badger.Txn) error {
		// Build the cmp key prefix by encoding with a nil serial.
		cmpPrf := new(bytes.Buffer)
		if err = indexes.CompactEventEnc(nil).MarshalWrite(cmpPrf); chk.E(err) {
			return err
		}

		it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpPrf.Bytes()})
		defer it.Close()

		for it.Rewind(); it.Valid(); it.Next() {
			// Key() (not KeyCopy) is safe here because the serial is decoded
			// from the slice before the next it.Next() invalidates it.
			key := it.Item().Key()
			// Extract serial from key (prefix 3 bytes + serial 5 bytes)
			if len(key) >= 8 {
				ser := new(types.Uint40)
				if err = ser.UnmarshalRead(bytes.NewReader(key[3:8])); err == nil {
					serialsToClean = append(serialsToClean, ser.Get())
				}
			}
		}
		return nil
	}); chk.E(err) {
		log.W.F("failed to collect compact event serials: %v", err)
		return
	}

	log.I.F("found %d compact events to clean up legacy storage for", len(serialsToClean))

	// Clean up in batches so that each write transaction stays small; a
	// failed batch is logged and skipped rather than aborting the whole run.
	const batchSize = 1000
	for i := 0; i < len(serialsToClean); i += batchSize {
		end := i + batchSize
		if end > len(serialsToClean) {
			end = len(serialsToClean)
		}
		batch := serialsToClean[i:end]

		if err = d.Update(func(txn *badger.Txn) error {
			for _, serial := range batch {
				ser := new(types.Uint40)
				if err = ser.Set(serial); err != nil {
					continue
				}

				// Try to delete evt entry
				evtKeyBuf := new(bytes.Buffer)
				if err = indexes.EventEnc(ser).MarshalWrite(evtKeyBuf); err == nil {
					item, getErr := txn.Get(evtKeyBuf.Bytes())
					if getErr == nil {
						// Track size before deleting
						bytesReclaimed += int64(item.ValueSize())
						if delErr := txn.Delete(evtKeyBuf.Bytes()); delErr == nil {
							cleanedEvt++
						}
					}
				}

				// Try to delete sev entry (need to iterate with prefix since key includes inline data)
				sevKeyBuf := new(bytes.Buffer)
				if err = indexes.SmallEventEnc(ser).MarshalWrite(sevKeyBuf); err == nil {
					opts := badger.DefaultIteratorOptions
					opts.Prefix = sevKeyBuf.Bytes()
					it := txn.NewIterator(opts)

					// Only the first match is deleted; the prefix encodes one
					// serial, so at most one sev entry is expected per serial.
					it.Rewind()
					if it.Valid() {
						key := it.Item().KeyCopy(nil)
						bytesReclaimed += int64(len(key)) // sev stores data in key
						if delErr := txn.Delete(key); delErr == nil {
							cleanedSev++
						}
					}
					// Close explicitly rather than defer: a defer inside this
					// loop would keep every iterator open until the batch
					// transaction returns.
					it.Close()
				}
			}
			return nil
		}); chk.E(err) {
			log.W.F("batch cleanup failed: %v", err)
			continue
		}

		if (i/batchSize)%10 == 0 && i > 0 {
			log.I.F("cleanup progress: %d/%d events processed", i, len(serialsToClean))
		}
	}

	log.I.F("legacy storage cleanup complete: removed %d evt entries, %d sev entries, reclaimed approximately %d bytes (%.2f MB)",
		cleanedEvt, cleanedSev, bytesReclaimed, float64(bytesReclaimed)/(1024.0*1024.0))
}
1056  
1057  // RebuildWordIndexesWithNormalization rebuilds all word indexes with unicode
1058  // normalization applied. This migration:
1059  // 1. Deletes all existing word indexes (wrd prefix)
1060  // 2. Re-tokenizes all events with normalizeRune() applied
1061  // 3. Creates new consolidated indexes where decorative unicode maps to ASCII
1062  //
1063  // After this migration, "ᴅᴇᴀᴛʜ" (small caps) and "𝔇𝔢𝔞𝔱𝔥" (fraktur) will index
1064  // the same as "death", eliminating duplicate entries and enabling proper search.
1065  func (d *D) RebuildWordIndexesWithNormalization() {
1066  	log.I.F("rebuilding word indexes with unicode normalization...")
1067  	var err error
1068  
1069  	// Step 1: Delete all existing word indexes
1070  	var deletedCount int
1071  	if err = d.Update(func(txn *badger.Txn) error {
1072  		wrdPrf := new(bytes.Buffer)
1073  		if err = indexes.WordEnc(nil, nil).MarshalWrite(wrdPrf); chk.E(err) {
1074  			return err
1075  		}
1076  
1077  		opts := badger.DefaultIteratorOptions
1078  		opts.Prefix = wrdPrf.Bytes()
1079  		opts.PrefetchValues = false // Keys only for deletion
1080  
1081  		it := txn.NewIterator(opts)
1082  		defer it.Close()
1083  
1084  		// Collect keys to delete (can't delete during iteration)
1085  		var keysToDelete [][]byte
1086  		for it.Rewind(); it.Valid(); it.Next() {
1087  			keysToDelete = append(keysToDelete, it.Item().KeyCopy(nil))
1088  		}
1089  
1090  		for _, key := range keysToDelete {
1091  			if err = txn.Delete(key); err == nil {
1092  				deletedCount++
1093  			}
1094  		}
1095  		return nil
1096  	}); chk.E(err) {
1097  		log.W.F("failed to delete old word indexes: %v", err)
1098  		return
1099  	}
1100  
1101  	log.I.F("deleted %d old word index entries", deletedCount)
1102  
1103  	// Step 2: Rebuild word indexes from all events
1104  	// Reuse the existing UpdateWordIndexes logic which now uses normalizeRune
1105  	d.UpdateWordIndexes()
1106  
1107  	log.I.F("word index rebuild with unicode normalization complete")
1108  }
1109  
1110  // BackfillETagGraph populates e-tag graph indexes (eeg/gee) for all existing events.
1111  // This enables graph traversal queries for thread/reply discovery.
1112  //
1113  // The migration:
1114  // 1. Iterates all events in compact storage (cmp prefix)
1115  // 2. Extracts e-tags from each event
1116  // 3. For e-tags referencing events we have, creates bidirectional edges:
1117  //    - eeg|source|target|kind|direction(out) - forward edge
1118  //    - gee|target|kind|direction(in)|source - reverse edge
1119  //
1120  // This is idempotent: running multiple times won't create duplicate edges
1121  // (BadgerDB overwrites existing keys).
1122  func (d *D) BackfillETagGraph() {
1123  	log.I.F("backfilling e-tag graph indexes for graph query support...")
1124  	var err error
1125  
1126  	type ETagEdge struct {
1127  		SourceSerial *types.Uint40
1128  		TargetSerial *types.Uint40
1129  		Kind         *types.Uint16
1130  	}
1131  
1132  	var edges []ETagEdge
1133  	var processedEvents int
1134  	var eventsWithETags int
1135  	var skippedTargets int
1136  
1137  	// First pass: collect all e-tag edges from events
1138  	if err = d.View(func(txn *badger.Txn) error {
1139  		// Iterate compact events (cmp prefix)
1140  		cmpPrf := new(bytes.Buffer)
1141  		if err = indexes.CompactEventEnc(nil).MarshalWrite(cmpPrf); chk.E(err) {
1142  			return err
1143  		}
1144  
1145  		it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpPrf.Bytes()})
1146  		defer it.Close()
1147  
1148  		for it.Rewind(); it.Valid(); it.Next() {
1149  			item := it.Item()
1150  			key := item.KeyCopy(nil)
1151  
1152  			// Extract serial from key (prefix 3 bytes + serial 5 bytes)
1153  			if len(key) < 8 {
1154  				continue
1155  			}
1156  			sourceSerial := new(types.Uint40)
1157  			if err = sourceSerial.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
1158  				continue
1159  			}
1160  
1161  			// Get event data
1162  			var val []byte
1163  			if val, err = item.ValueCopy(nil); chk.E(err) {
1164  				continue
1165  			}
1166  
1167  			// Decode the event
1168  			// First get the event ID from serial (needed for compact format decoding)
1169  			eventId, idErr := d.GetEventIdBySerial(sourceSerial)
1170  			if idErr != nil {
1171  				continue
1172  			}
1173  			resolver := NewDatabaseSerialResolver(d, d.serialCache)
1174  			ev, decErr := UnmarshalCompactEvent(val, eventId, resolver)
1175  			if decErr != nil || ev == nil {
1176  				continue
1177  			}
1178  			processedEvents++
1179  
1180  			// Extract e-tags
1181  			eTags := ev.Tags.GetAll([]byte("e"))
1182  			if len(eTags) == 0 {
1183  				continue
1184  			}
1185  			eventsWithETags++
1186  
1187  			eventKind := new(types.Uint16)
1188  			eventKind.Set(ev.Kind)
1189  
1190  			for _, eTag := range eTags {
1191  				if eTag.Len() < 2 {
1192  					continue
1193  				}
1194  
1195  				// Get event ID from e-tag
1196  				var targetEventID []byte
1197  				targetEventID, err = hex.Dec(string(eTag.ValueHex()))
1198  				if err != nil || len(targetEventID) != 32 {
1199  					continue
1200  				}
1201  
1202  				// Look up target event's serial
1203  				targetSerial, lookupErr := d.GetSerialById(targetEventID)
1204  				if lookupErr != nil || targetSerial == nil {
1205  					// Target event not in our database - skip
1206  					skippedTargets++
1207  					continue
1208  				}
1209  
1210  				edges = append(edges, ETagEdge{
1211  					SourceSerial: sourceSerial,
1212  					TargetSerial: targetSerial,
1213  					Kind:         eventKind,
1214  				})
1215  			}
1216  		}
1217  		return nil
1218  	}); chk.E(err) {
1219  		log.E.F("e-tag graph backfill: failed to collect edges: %v", err)
1220  		return
1221  	}
1222  
1223  	log.I.F("e-tag graph backfill: processed %d events, %d with e-tags, found %d edges to create (%d targets not found)",
1224  		processedEvents, eventsWithETags, len(edges), skippedTargets)
1225  
1226  	if len(edges) == 0 {
1227  		log.I.F("e-tag graph backfill: no edges to create")
1228  		return
1229  	}
1230  
1231  	// Sort edges for ordered writes (improves compaction)
1232  	sort.Slice(edges, func(i, j int) bool {
1233  		if edges[i].SourceSerial.Get() != edges[j].SourceSerial.Get() {
1234  			return edges[i].SourceSerial.Get() < edges[j].SourceSerial.Get()
1235  		}
1236  		return edges[i].TargetSerial.Get() < edges[j].TargetSerial.Get()
1237  	})
1238  
1239  	// Second pass: write edges in batches
1240  	const batchSize = 1000
1241  	var createdEdges int
1242  
1243  	for i := 0; i < len(edges); i += batchSize {
1244  		end := i + batchSize
1245  		if end > len(edges) {
1246  			end = len(edges)
1247  		}
1248  		batch := edges[i:end]
1249  
1250  		if err = d.Update(func(txn *badger.Txn) error {
1251  			for _, edge := range batch {
1252  				// Create forward edge: eeg|source|target|kind|direction(out)
1253  				directionOut := new(types.Letter)
1254  				directionOut.Set(types.EdgeDirectionETagOut)
1255  				keyBuf := new(bytes.Buffer)
1256  				if err = indexes.EventEventGraphEnc(edge.SourceSerial, edge.TargetSerial, edge.Kind, directionOut).MarshalWrite(keyBuf); chk.E(err) {
1257  					continue
1258  				}
1259  				if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
1260  					continue
1261  				}
1262  
1263  				// Create reverse edge: gee|target|kind|direction(in)|source
1264  				directionIn := new(types.Letter)
1265  				directionIn.Set(types.EdgeDirectionETagIn)
1266  				keyBuf.Reset()
1267  				if err = indexes.GraphEventEventEnc(edge.TargetSerial, edge.Kind, directionIn, edge.SourceSerial).MarshalWrite(keyBuf); chk.E(err) {
1268  					continue
1269  				}
1270  				if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
1271  					continue
1272  				}
1273  
1274  				createdEdges++
1275  			}
1276  			return nil
1277  		}); chk.E(err) {
1278  			log.W.F("e-tag graph backfill: batch write failed: %v", err)
1279  			continue
1280  		}
1281  
1282  		if (i/batchSize)%10 == 0 && i > 0 {
1283  			log.I.F("e-tag graph backfill progress: %d/%d edges created", i, len(edges))
1284  		}
1285  	}
1286  
1287  	log.I.F("e-tag graph backfill complete: created %d bidirectional edges", createdEdges)
1288  }
1289  
// BackfillMissingSerialEventIdMappings finds legacy events that were incorrectly
// skipped during the v6 compact format migration and creates their SerialEventId
// mappings. This fixes events whose ID happens to start with byte 0x01, which was
// mistakenly interpreted as CompactFormatVersion during the original migration.
//
// The v6 migration had this check:
//
//	if len(eventData) > 0 && eventData[0] == CompactFormatVersion { continue }
//
// This caused legacy events with IDs starting with 0x01 to be skipped, leaving
// them without sei mappings and causing "Key not found" errors when fetching.
func (d *D) BackfillMissingSerialEventIdMappings() {
	log.I.F("backfilling missing SerialEventId mappings for legacy events...")
	var err error

	// LegacyEvent is a candidate found during the scan: its serial, the raw
	// legacy-encoded bytes, and whether those bytes came from the sev key
	// (inline) or the evt value.
	type LegacyEvent struct {
		Serial    uint64
		EventData []byte
		IsInline  bool // true if from sev, false if from evt
	}

	var legacyEvents []LegacyEvent
	var alreadyHasMapping int
	var skippedCompact int

	// First pass: find legacy events that don't have sei mappings
	if err = d.View(func(txn *badger.Txn) error {
		// Process evt (large events) table
		evtPrf := new(bytes.Buffer)
		if err = indexes.EventEnc(nil).MarshalWrite(evtPrf); chk.E(err) {
			return err
		}
		it := txn.NewIterator(badger.IteratorOptions{Prefix: evtPrf.Bytes()})
		defer it.Close()

		for it.Rewind(); it.Valid(); it.Next() {
			item := it.Item()
			key := item.KeyCopy(nil)

			// Extract serial from key
			ser := indexes.EventVars()
			if err = indexes.EventDec(ser).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) {
				continue
			}

			// Check if this serial already has an sei mapping
			seiKey := new(bytes.Buffer)
			if err = indexes.SerialEventIdEnc(ser).MarshalWrite(seiKey); err == nil {
				if _, getErr := txn.Get(seiKey.Bytes()); getErr == nil {
					// Already has mapping
					alreadyHasMapping++
					continue
				}
			}

			var val []byte
			if val, err = item.ValueCopy(nil); chk.E(err) {
				continue
			}

			// Only process if first byte is 0x01 (these were the ones skipped)
			// Events with other first bytes would have been migrated correctly
			if len(val) == 0 || val[0] != CompactFormatVersion {
				continue
			}

			// Verify this is actually legacy format by checking size
			// Legacy format: 32 (ID) + 32 (Pubkey) + varints + 64 (Sig) = ~135+ bytes minimum
			// Compact format is smaller, typically < 100 bytes for simple events
			if len(val) < 130 {
				// Likely actually compact format, skip
				skippedCompact++
				continue
			}

			legacyEvents = append(legacyEvents, LegacyEvent{
				Serial:    ser.Get(),
				EventData: val,
				IsInline:  false,
			})
		}
		// Close the first iterator before opening the second one in the same
		// transaction. The deferred Close above still runs at return but is
		// harmless after this explicit Close.
		it.Close()

		// Process sev (small inline events) table
		sevPrf := new(bytes.Buffer)
		if err = indexes.SmallEventEnc(nil).MarshalWrite(sevPrf); chk.E(err) {
			return err
		}
		it2 := txn.NewIterator(badger.IteratorOptions{Prefix: sevPrf.Bytes()})
		defer it2.Close()

		for it2.Rewind(); it2.Valid(); it2.Next() {
			item := it2.Item()
			key := item.KeyCopy(nil)

			// Extract serial and data from inline key.
			// Layout: prefix (3 bytes) + serial (5 bytes) + size (2 bytes) + data.
			if len(key) <= 8+2 {
				continue
			}

			// Extract serial
			ser := new(types.Uint40)
			if err = ser.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
				continue
			}

			// Check if this serial already has an sei mapping
			seiKey := new(bytes.Buffer)
			if err = indexes.SerialEventIdEnc(ser).MarshalWrite(seiKey); err == nil {
				if _, getErr := txn.Get(seiKey.Bytes()); getErr == nil {
					// Already has mapping
					alreadyHasMapping++
					continue
				}
			}

			// Extract size (big-endian uint16) and data from the key tail
			sizeIdx := 8
			size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1])
			dataStart := sizeIdx + 2
			if len(key) < dataStart+size {
				continue
			}
			eventData := key[dataStart : dataStart+size]

			// Only process if first byte is 0x01
			if len(eventData) == 0 || eventData[0] != CompactFormatVersion {
				continue
			}

			// Verify this is actually legacy format by checking size
			if len(eventData) < 130 {
				skippedCompact++
				continue
			}

			legacyEvents = append(legacyEvents, LegacyEvent{
				Serial:    ser.Get(),
				EventData: eventData,
				IsInline:  true,
			})
		}

		return nil
	}); chk.E(err) {
		log.E.F("failed to scan for legacy events: %v", err)
		return
	}

	log.I.F("found %d legacy events needing sei mappings (%d already have mappings, %d skipped as compact)",
		len(legacyEvents), alreadyHasMapping, skippedCompact)

	if len(legacyEvents) == 0 {
		log.I.F("no legacy events need sei mapping backfill")
		return
	}

	// Create resolver for potential compact conversion
	resolver := NewDatabaseSerialResolver(d, d.serialCache)

	// Process each event in its own transaction so one failure does not roll
	// back the rest.
	var successCount, failCount int
	var convertedToCompact int

	for i, le := range legacyEvents {
		if err = d.Update(func(txn *badger.Txn) error {
			// Decode the legacy event to get the ID
			ev := new(event.E)
			if err = ev.UnmarshalBinary(bytes.NewBuffer(le.EventData)); err != nil {
				log.D.F("backfill: failed to decode event serial %d as legacy format: %v", le.Serial, err)
				failCount++
				return nil // Continue with next event
			}

			// Verify the event ID actually starts with 0x01
			if len(ev.ID) < 1 || ev.ID[0] != 0x01 {
				log.D.F("backfill: event serial %d doesn't have ID starting with 0x01, skipping", le.Serial)
				return nil
			}

			// Store SerialEventId mapping
			if err = d.StoreEventIdSerial(txn, le.Serial, ev.ID); chk.E(err) {
				log.W.F("backfill: failed to store sei mapping for serial %d: %v", le.Serial, err)
				failCount++
				return nil
			}

			// Cache the mapping
			d.serialCache.CacheEventId(le.Serial, ev.ID)

			// Also convert to compact format if not already done.
			// From here on the sei mapping has been written, so every exit
			// path below counts as success even if conversion fails.
			ser := new(types.Uint40)
			if err = ser.Set(le.Serial); err != nil {
				successCount++
				return nil
			}

			// Check if cmp entry exists
			cmpKey := new(bytes.Buffer)
			if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKey); err == nil {
				if _, getErr := txn.Get(cmpKey.Bytes()); getErr == nil {
					// Already has compact entry
					successCount++
					return nil
				}
			}

			// Create compact format entry
			compactData, encErr := MarshalCompactEvent(ev, resolver)
			if encErr != nil {
				log.D.F("backfill: failed to encode compact event for serial %d: %v", le.Serial, encErr)
				successCount++ // sei mapping was successful, just couldn't convert
				return nil
			}

			if err = txn.Set(cmpKey.Bytes(), compactData); chk.E(err) {
				log.D.F("backfill: failed to store compact event for serial %d: %v", le.Serial, err)
				successCount++ // sei mapping was successful
				return nil
			}

			convertedToCompact++
			successCount++
			return nil
		}); chk.E(err) {
			log.W.F("backfill: transaction failed for serial %d: %v", le.Serial, err)
			failCount++
			continue
		}

		// Log progress every 1000 events
		if (i+1)%1000 == 0 {
			log.I.F("backfill progress: %d/%d events processed", i+1, len(legacyEvents))
		}
	}

	log.I.F("SerialEventId backfill complete: %d successful, %d failed, %d also converted to compact format",
		successCount, failCount, convertedToCompact)
}
1529  
1530  // BackfillPubkeyPubkeyGraph populates pubkey-to-pubkey (noun-noun) graph indexes
1531  // (ppg/gpp) for all existing events that contain p-tags.
1532  // This materializes direct pubkey→pubkey edges, collapsing the two-hop
1533  // pubkey→event→pubkey traversal into a single-hop lookup.
1534  //
1535  // For each event with p-tags, creates bidirectional edges:
1536  //   - ppg|author_serial|target_serial|kind|direction(out)|event_serial - forward edge
1537  //   - gpp|target_serial|kind|direction(in)|author_serial|event_serial - reverse edge
1538  //
1539  // This is idempotent: running multiple times won't create duplicate edges.
1540  func (d *D) BackfillPubkeyPubkeyGraph() {
1541  	log.I.F("backfilling pubkey-to-pubkey graph indexes...")
1542  	var err error
1543  
1544  	type PubkeyEdge struct {
1545  		AuthorSerial *types.Uint40
1546  		TargetSerial *types.Uint40
1547  		Kind         *types.Uint16
1548  		EventSerial  *types.Uint40
1549  	}
1550  
1551  	var edges []PubkeyEdge
1552  	var processedEvents int
1553  	var eventsWithPTags int
1554  	var skippedPubkeys int
1555  
1556  	// First pass: collect all pubkey-pubkey edges from events
1557  	if err = d.View(func(txn *badger.Txn) error {
1558  		// Iterate compact events (cmp prefix)
1559  		cmpPrf := new(bytes.Buffer)
1560  		if err = indexes.CompactEventEnc(nil).MarshalWrite(cmpPrf); chk.E(err) {
1561  			return err
1562  		}
1563  
1564  		it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpPrf.Bytes()})
1565  		defer it.Close()
1566  
1567  		for it.Rewind(); it.Valid(); it.Next() {
1568  			item := it.Item()
1569  			key := item.KeyCopy(nil)
1570  
1571  			// Extract serial from key (prefix 3 bytes + serial 5 bytes)
1572  			if len(key) < 8 {
1573  				continue
1574  			}
1575  			eventSerial := new(types.Uint40)
1576  			if err = eventSerial.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
1577  				continue
1578  			}
1579  
1580  			// Get event data
1581  			var val []byte
1582  			if val, err = item.ValueCopy(nil); chk.E(err) {
1583  				continue
1584  			}
1585  
1586  			// Decode the event
1587  			eventId, idErr := d.GetEventIdBySerial(eventSerial)
1588  			if idErr != nil {
1589  				continue
1590  			}
1591  			resolver := NewDatabaseSerialResolver(d, d.serialCache)
1592  			ev, decErr := UnmarshalCompactEvent(val, eventId, resolver)
1593  			if decErr != nil || ev == nil {
1594  				continue
1595  			}
1596  			processedEvents++
1597  
1598  			// Extract p-tags
1599  			pTags := ev.Tags.GetAll([]byte("p"))
1600  			if len(pTags) == 0 {
1601  				continue
1602  			}
1603  			eventsWithPTags++
1604  
1605  			// Get author pubkey serial
1606  			authorSerial, authErr := d.GetOrCreatePubkeySerial(ev.Pubkey)
1607  			if authErr != nil {
1608  				continue
1609  			}
1610  
1611  			eventKind := new(types.Uint16)
1612  			eventKind.Set(ev.Kind)
1613  
1614  			for _, pTag := range pTags {
1615  				if pTag.Len() < 2 {
1616  					continue
1617  				}
1618  
1619  				// Get pubkey from p-tag
1620  				var targetPubkey []byte
1621  				targetPubkey, err = hex.Dec(string(pTag.ValueHex()))
1622  				if err != nil || len(targetPubkey) != 32 {
1623  					continue
1624  				}
1625  
1626  				// Get or create target pubkey serial
1627  				targetSerial, lookupErr := d.GetOrCreatePubkeySerial(targetPubkey)
1628  				if lookupErr != nil {
1629  					skippedPubkeys++
1630  					continue
1631  				}
1632  
1633  				// Skip self-references
1634  				if authorSerial.Get() == targetSerial.Get() {
1635  					continue
1636  				}
1637  
1638  				// Clone serials for storage in slice
1639  				asSer := new(types.Uint40)
1640  				asSer.Set(authorSerial.Get())
1641  				tsSer := new(types.Uint40)
1642  				tsSer.Set(targetSerial.Get())
1643  				esSer := new(types.Uint40)
1644  				esSer.Set(eventSerial.Get())
1645  				ekKind := new(types.Uint16)
1646  				ekKind.Set(ev.Kind)
1647  
1648  				edges = append(edges, PubkeyEdge{
1649  					AuthorSerial: asSer,
1650  					TargetSerial: tsSer,
1651  					Kind:         ekKind,
1652  					EventSerial:  esSer,
1653  				})
1654  			}
1655  		}
1656  		return nil
1657  	}); chk.E(err) {
1658  		log.E.F("pubkey-pubkey graph backfill: failed to collect edges: %v", err)
1659  		return
1660  	}
1661  
1662  	log.I.F("pubkey-pubkey graph backfill: processed %d events, %d with p-tags, found %d edges to create (%d pubkeys failed)",
1663  		processedEvents, eventsWithPTags, len(edges), skippedPubkeys)
1664  
1665  	if len(edges) == 0 {
1666  		log.I.F("pubkey-pubkey graph backfill: no edges to create")
1667  		return
1668  	}
1669  
1670  	// Sort edges for ordered writes (improves compaction)
1671  	sort.Slice(edges, func(i, j int) bool {
1672  		if edges[i].AuthorSerial.Get() != edges[j].AuthorSerial.Get() {
1673  			return edges[i].AuthorSerial.Get() < edges[j].AuthorSerial.Get()
1674  		}
1675  		return edges[i].TargetSerial.Get() < edges[j].TargetSerial.Get()
1676  	})
1677  
1678  	// Second pass: write edges in batches
1679  	const batchSize = 1000
1680  	var createdEdges int
1681  
1682  	for i := 0; i < len(edges); i += batchSize {
1683  		end := i + batchSize
1684  		if end > len(edges) {
1685  			end = len(edges)
1686  		}
1687  		batch := edges[i:end]
1688  
1689  		if err = d.Update(func(txn *badger.Txn) error {
1690  			for _, edge := range batch {
1691  				// Forward edge: ppg|author|target|kind|direction(out)|event_serial
1692  				directionOut := new(types.Letter)
1693  				directionOut.Set(types.EdgeDirectionPubkeyOut)
1694  				keyBuf := new(bytes.Buffer)
1695  				if err = indexes.PubkeyPubkeyGraphEnc(edge.AuthorSerial, edge.TargetSerial, edge.Kind, directionOut, edge.EventSerial).MarshalWrite(keyBuf); chk.E(err) {
1696  					continue
1697  				}
1698  				if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
1699  					continue
1700  				}
1701  
1702  				// Reverse edge: gpp|target|kind|direction(in)|author|event_serial
1703  				directionIn := new(types.Letter)
1704  				directionIn.Set(types.EdgeDirectionPubkeyIn)
1705  				keyBuf.Reset()
1706  				if err = indexes.GraphPubkeyPubkeyEnc(edge.TargetSerial, edge.Kind, directionIn, edge.AuthorSerial, edge.EventSerial).MarshalWrite(keyBuf); chk.E(err) {
1707  					continue
1708  				}
1709  				if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
1710  					continue
1711  				}
1712  
1713  				createdEdges++
1714  			}
1715  			return nil
1716  		}); chk.E(err) {
1717  			log.W.F("pubkey-pubkey graph backfill: batch write failed: %v", err)
1718  			continue
1719  		}
1720  
1721  		if (i/batchSize)%10 == 0 && i > 0 {
1722  			log.I.F("pubkey-pubkey graph backfill progress: %d/%d edges created", i, len(edges))
1723  		}
1724  	}
1725  
1726  	log.I.F("pubkey-pubkey graph backfill complete: created %d bidirectional edges", createdEdges)
1727  }
1728