repair.go raw
1 //go:build !(js && wasm)
2
3 package database
4
5 import (
6 "bytes"
7 "context"
8 "encoding/binary"
9 "fmt"
10 "io"
11 "time"
12
13 "github.com/dgraph-io/badger/v4"
14 "next.orly.dev/pkg/nostr/crypto/ec/schnorr"
15 "next.orly.dev/pkg/nostr/encoders/event"
16 "next.orly.dev/pkg/nostr/encoders/tag"
17 "next.orly.dev/pkg/nostr/encoders/varint"
18 "next.orly.dev/pkg/lol/log"
19 "next.orly.dev/pkg/database/indexes"
20 "next.orly.dev/pkg/database/indexes/types"
21 )
22
// RepairReport summarizes a single database repair run: when it ran, how much
// was scanned, what was (or, in a dry run, would have been) changed, and which
// problems were hit along the way.
type RepairReport struct {
	// Run metadata.
	Started  time.Time     // when the run began
	Duration time.Duration // elapsed time of the run
	DryRun   bool          // whether the run was report-only

	// Scan totals.
	CompactEventsScanned int64
	LegacyEventsScanned  int64

	// Repair totals.
	SeiEntriesCreated    int64 // sei mappings rebuilt from compact events
	SeiEntriesRemoved    int64 // orphaned sei entries removed
	PubkeyMappingsFixed  int64 // pubkey serial mappings fixed
	OrphanedIndexesFixed int64 // orphaned index entries removed

	// Errors lists the problems encountered during the run, one message each.
	Errors []string
}

// String renders the report as a multi-line, human-readable summary: a header
// with duration and mode, the scan totals, the repair totals and, when any
// were recorded, the list of errors.
func (r *RepairReport) String() string {
	mode := "Mode: REPAIR (changes applied)"
	if r.DryRun {
		mode = "Mode: DRY RUN (no changes made)"
	}

	var out bytes.Buffer

	// Header, followed by a blank separator line.
	fmt.Fprintf(&out, "Database Repair Report\n======================\nDuration: %v\n%s\n\n",
		r.Duration, mode)

	// Scan totals, followed by a blank separator line.
	fmt.Fprintf(&out, "Events Scanned:\n Compact events: %d\n Legacy events: %d\n\n",
		r.CompactEventsScanned, r.LegacyEventsScanned)

	// Repair totals, followed by a blank separator line.
	out.WriteString("Repairs:\n")
	fmt.Fprintf(&out, " sei entries created: %d\n", r.SeiEntriesCreated)
	fmt.Fprintf(&out, " sei entries removed: %d\n", r.SeiEntriesRemoved)
	fmt.Fprintf(&out, " pubkey mappings fixed: %d\n", r.PubkeyMappingsFixed)
	fmt.Fprintf(&out, " orphaned indexes: %d\n\n", r.OrphanedIndexesFixed)

	// The error section is omitted entirely when nothing went wrong.
	if len(r.Errors) == 0 {
		return out.String()
	}
	out.WriteString("Errors encountered:\n")
	for i := range r.Errors {
		out.WriteString(" - " + r.Errors[i] + "\n")
	}
	return out.String()
}
76
// RepairOptions selects which repair phases run and how they report.
type RepairOptions struct {
	// DryRun makes the repair report what it would change without writing
	// anything to the database.
	DryRun bool

	// FixMissingSei enables rebuilding missing sei entries from compact events.
	FixMissingSei bool

	// RemoveOrphanedSei enables removal of sei entries that have no
	// corresponding event.
	RemoveOrphanedSei bool

	// FixPubkeyMappings enables the pubkey serial mapping consistency phase.
	FixPubkeyMappings bool

	// Progress, when non-nil, receives human-readable progress updates.
	Progress io.Writer
}

// DefaultRepairOptions returns a fresh option set with every repair phase
// enabled, dry-run mode off and no progress writer.
func DefaultRepairOptions() *RepairOptions {
	opts := new(RepairOptions) // DryRun and Progress keep their zero values
	opts.FixMissingSei = true
	opts.RemoveOrphanedSei = true
	opts.FixPubkeyMappings = true
	return opts
}
104
105 // Repair performs database repair operations based on the provided options.
106 // It fixes integrity issues found by HealthCheck.
107 func (d *D) Repair(ctx context.Context, opts *RepairOptions) (report *RepairReport, err error) {
108 if opts == nil {
109 opts = DefaultRepairOptions()
110 }
111
112 report = &RepairReport{
113 Started: time.Now(),
114 DryRun: opts.DryRun,
115 Errors: make([]string, 0),
116 }
117
118 progress := opts.Progress
119
120 if progress != nil {
121 if opts.DryRun {
122 fmt.Fprintln(progress, "Starting database repair (DRY RUN)...")
123 } else {
124 fmt.Fprintln(progress, "Starting database repair...")
125 }
126 }
127
128 // Phase 1: Fix missing sei entries
129 if opts.FixMissingSei {
130 if progress != nil {
131 fmt.Fprintln(progress, "\nPhase 1: Rebuilding missing sei entries from compact events...")
132 }
133
134 if err = d.repairMissingSei(ctx, opts, report); err != nil {
135 report.Errors = append(report.Errors, fmt.Sprintf("sei repair error: %v", err))
136 log.E.F("sei repair error: %v", err)
137 }
138 }
139
140 // Phase 2: Remove orphaned sei entries
141 if opts.RemoveOrphanedSei {
142 if progress != nil {
143 fmt.Fprintln(progress, "\nPhase 2: Removing orphaned sei entries...")
144 }
145
146 if err = d.repairOrphanedSei(ctx, opts, report); err != nil {
147 report.Errors = append(report.Errors, fmt.Sprintf("orphaned sei repair error: %v", err))
148 log.E.F("orphaned sei repair error: %v", err)
149 }
150 }
151
152 // Phase 3: Fix pubkey serial mappings
153 if opts.FixPubkeyMappings {
154 if progress != nil {
155 fmt.Fprintln(progress, "\nPhase 3: Checking pubkey serial mappings...")
156 }
157
158 if err = d.repairPubkeyMappings(ctx, opts, report); err != nil {
159 report.Errors = append(report.Errors, fmt.Sprintf("pubkey mapping repair error: %v", err))
160 log.E.F("pubkey mapping repair error: %v", err)
161 }
162 }
163
164 report.Duration = time.Since(report.Started)
165
166 if progress != nil {
167 fmt.Fprintf(progress, "\nRepair complete in %v\n", report.Duration)
168 fmt.Fprintf(progress, " sei entries created: %d\n", report.SeiEntriesCreated)
169 fmt.Fprintf(progress, " sei entries removed: %d\n", report.SeiEntriesRemoved)
170 fmt.Fprintf(progress, " pubkey mappings fixed: %d\n", report.PubkeyMappingsFixed)
171 if opts.DryRun {
172 fmt.Fprintln(progress, "\n(DRY RUN - no changes were made)")
173 }
174 }
175
176 return report, nil
177 }
178
179 // repairMissingSei rebuilds sei entries for compact events that are missing them.
180 func (d *D) repairMissingSei(ctx context.Context, opts *RepairOptions, report *RepairReport) error {
181 progress := opts.Progress
182 cmpPrf := buildPrefix(indexes.CompactEventEnc(nil))
183 seiPrf := buildPrefix(indexes.SerialEventIdEnc(nil))
184
185 // First pass: collect all serials that need sei entries
186 type repairItem struct {
187 serial uint64
188 eventID []byte
189 }
190 var toRepair []repairItem
191
192 err := d.View(func(txn *badger.Txn) error {
193 it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpPrf})
194 defer it.Close()
195
196 for it.Rewind(); it.Valid(); it.Next() {
197 select {
198 case <-ctx.Done():
199 return ctx.Err()
200 default:
201 }
202
203 item := it.Item()
204 key := item.Key()
205 if len(key) < 8 {
206 continue
207 }
208
209 serial := extractSerial(key[3:8])
210 report.CompactEventsScanned++
211
212 // Check if sei entry exists
213 seiKey := buildSeiKey(serial)
214 _, err := txn.Get(seiKey)
215 if err == badger.ErrKeyNotFound {
216 // Need to repair - extract event ID from compact event
217 var eventData []byte
218 err = item.Value(func(val []byte) error {
219 eventData = make([]byte, len(val))
220 copy(eventData, val)
221 return nil
222 })
223 if err != nil {
224 continue
225 }
226
227 // Decode compact event to get the event ID
228 eventID, decodeErr := extractEventIDFromCompact(eventData, serial, d)
229 if decodeErr != nil {
230 if progress != nil && report.CompactEventsScanned%10000 == 0 {
231 log.D.F("could not extract event ID for serial %d: %v", serial, decodeErr)
232 }
233 continue
234 }
235
236 toRepair = append(toRepair, repairItem{serial: serial, eventID: eventID})
237 }
238
239 if progress != nil && report.CompactEventsScanned%100000 == 0 {
240 fmt.Fprintf(progress, " Scanned %d compact events, found %d needing sei repair...\n",
241 report.CompactEventsScanned, len(toRepair))
242 }
243 }
244
245 return nil
246 })
247 if err != nil {
248 return err
249 }
250
251 if progress != nil {
252 fmt.Fprintf(progress, " Found %d compact events needing sei repair\n", len(toRepair))
253 }
254
255 // Also count legacy events
256 err = d.View(func(txn *badger.Txn) error {
257 evtPrf := buildPrefix(indexes.EventEnc(nil))
258 it := txn.NewIterator(badger.IteratorOptions{Prefix: evtPrf, PrefetchValues: false})
259 defer it.Close()
260 for it.Rewind(); it.Valid(); it.Next() {
261 report.LegacyEventsScanned++
262 }
263 return nil
264 })
265 if err != nil {
266 log.W.F("error counting legacy events: %v", err)
267 }
268
269 // Second pass: create missing sei entries
270 if opts.DryRun {
271 report.SeiEntriesCreated = int64(len(toRepair))
272 if progress != nil {
273 fmt.Fprintf(progress, " Would create %d sei entries (dry run)\n", len(toRepair))
274 }
275 return nil
276 }
277
278 // Write in batches
279 batchSize := 1000
280 for i := 0; i < len(toRepair); i += batchSize {
281 end := i + batchSize
282 if end > len(toRepair) {
283 end = len(toRepair)
284 }
285 batch := toRepair[i:end]
286
287 wb := d.DB.NewWriteBatch()
288 for _, item := range batch {
289 seiKey := buildSeiKey(item.serial)
290 if err := wb.Set(seiKey, item.eventID); err != nil {
291 wb.Cancel()
292 return err
293 }
294 }
295
296 if err := wb.Flush(); err != nil {
297 return err
298 }
299
300 report.SeiEntriesCreated += int64(len(batch))
301
302 if progress != nil && i+batchSize < len(toRepair) {
303 fmt.Fprintf(progress, " Created %d/%d sei entries...\n", report.SeiEntriesCreated, len(toRepair))
304 }
305 }
306
307 _ = seiPrf // prevent unused warning
308 return nil
309 }
310
311 // repairOrphanedSei removes sei entries that don't have corresponding events.
312 func (d *D) repairOrphanedSei(ctx context.Context, opts *RepairOptions, report *RepairReport) error {
313 progress := opts.Progress
314 seiPrf := buildPrefix(indexes.SerialEventIdEnc(nil))
315 cmpPrf := buildPrefix(indexes.CompactEventEnc(nil))
316 evtPrf := buildPrefix(indexes.EventEnc(nil))
317
318 var orphanedSerials []uint64
319
320 err := d.View(func(txn *badger.Txn) error {
321 it := txn.NewIterator(badger.IteratorOptions{Prefix: seiPrf, PrefetchValues: false})
322 defer it.Close()
323
324 checked := int64(0)
325 for it.Rewind(); it.Valid(); it.Next() {
326 select {
327 case <-ctx.Done():
328 return ctx.Err()
329 default:
330 }
331
332 key := it.Item().Key()
333 if len(key) < 8 {
334 continue
335 }
336
337 serial := extractSerial(key[3:8])
338
339 // Check if cmp entry exists
340 cmpKey := buildCmpKey(serial)
341 _, err := txn.Get(cmpKey)
342 if err == badger.ErrKeyNotFound {
343 // Also check legacy evt
344 evtKey := buildEvtKey(serial)
345 _, err2 := txn.Get(evtKey)
346 if err2 == badger.ErrKeyNotFound {
347 orphanedSerials = append(orphanedSerials, serial)
348 }
349 }
350
351 checked++
352 if progress != nil && checked%100000 == 0 {
353 fmt.Fprintf(progress, " Checked %d sei entries, found %d orphaned...\n",
354 checked, len(orphanedSerials))
355 }
356 }
357
358 return nil
359 })
360 if err != nil {
361 return err
362 }
363
364 if progress != nil {
365 fmt.Fprintf(progress, " Found %d orphaned sei entries\n", len(orphanedSerials))
366 }
367
368 if opts.DryRun {
369 report.SeiEntriesRemoved = int64(len(orphanedSerials))
370 if progress != nil {
371 fmt.Fprintf(progress, " Would remove %d orphaned sei entries (dry run)\n", len(orphanedSerials))
372 }
373 return nil
374 }
375
376 // Remove orphaned entries in batches
377 batchSize := 1000
378 for i := 0; i < len(orphanedSerials); i += batchSize {
379 end := i + batchSize
380 if end > len(orphanedSerials) {
381 end = len(orphanedSerials)
382 }
383 batch := orphanedSerials[i:end]
384
385 wb := d.DB.NewWriteBatch()
386 for _, serial := range batch {
387 seiKey := buildSeiKey(serial)
388 if err := wb.Delete(seiKey); err != nil {
389 wb.Cancel()
390 return err
391 }
392 }
393
394 if err := wb.Flush(); err != nil {
395 return err
396 }
397
398 report.SeiEntriesRemoved += int64(len(batch))
399 }
400
401 _ = cmpPrf // prevent unused
402 _ = evtPrf
403 return nil
404 }
405
406 // repairPubkeyMappings fixes inconsistent pubkey serial mappings.
407 func (d *D) repairPubkeyMappings(ctx context.Context, opts *RepairOptions, report *RepairReport) error {
408 progress := opts.Progress
409 pksPrf := buildPrefix(indexes.PubkeySerialEnc(nil, nil))
410 spkPrf := buildPrefix(indexes.SerialPubkeyEnc(nil))
411
412 // Count entries
413 var pksCount, spkCount int64
414 err := d.View(func(txn *badger.Txn) error {
415 pksCount = countPrefix(txn, pksPrf)
416 spkCount = countPrefix(txn, spkPrf)
417 return nil
418 })
419 if err != nil {
420 return err
421 }
422
423 if progress != nil {
424 fmt.Fprintf(progress, " pks entries: %d, spk entries: %d\n", pksCount, spkCount)
425 }
426
427 // For now, just report the mismatch
428 // Full repair would require scanning all events and rebuilding mappings
429 diff := pksCount - spkCount
430 if diff < 0 {
431 diff = -diff
432 }
433
434 threshold := pksCount / 100
435 if threshold < 10 {
436 threshold = 10
437 }
438
439 if diff > threshold {
440 report.PubkeyMappingsFixed = diff
441 if progress != nil {
442 fmt.Fprintf(progress, " Found %d pubkey mapping inconsistencies\n", diff)
443 fmt.Fprintln(progress, " Note: Full pubkey mapping repair requires event rescan (not yet implemented)")
444 }
445 } else {
446 if progress != nil {
447 fmt.Fprintln(progress, " Pubkey mappings are consistent")
448 }
449 }
450
451 return nil
452 }
453
454 // extractEventIDFromCompact decodes a compact event and computes its ID.
455 // This is used during repair when the sei entry is missing.
456 // The event ID is computed by hashing the serialized event content.
457 func extractEventIDFromCompact(data []byte, serial uint64, d *D) ([]byte, error) {
458 // Decode the compact event without the ID
459 ev, err := unmarshalCompactForRepair(data, d)
460 if err != nil {
461 return nil, fmt.Errorf("failed to decode compact event: %w", err)
462 }
463
464 // Compute the event ID by hashing the serialized event
465 eventID := ev.GetIDBytes()
466 if eventID == nil || len(eventID) != 32 {
467 return nil, fmt.Errorf("failed to compute event ID")
468 }
469
470 return eventID, nil
471 }
472
473 // unmarshalCompactForRepair decodes a compact event for repair purposes.
474 // Unlike UnmarshalCompactEvent, this doesn't require the event ID upfront.
475 func unmarshalCompactForRepair(data []byte, d *D) (*event.E, error) {
476 r := bytes.NewReader(data)
477 ev := new(event.E)
478
479 // Version byte
480 version, err := r.ReadByte()
481 if err != nil {
482 return nil, err
483 }
484 if version != CompactFormatVersion {
485 return nil, fmt.Errorf("unsupported compact format version: %d", version)
486 }
487
488 // Author pubkey serial (5 bytes) -> full pubkey
489 authorSerial, err := readUint40(r)
490 if err != nil {
491 return nil, err
492 }
493 ev.Pubkey, err = d.getPubkeyBySerial(authorSerial)
494 if err != nil {
495 return nil, fmt.Errorf("failed to get pubkey for serial %d: %w", authorSerial, err)
496 }
497
498 // CreatedAt (varint)
499 ca, err := varint.Decode(r)
500 if err != nil {
501 return nil, err
502 }
503 ev.CreatedAt = int64(ca)
504
505 // Kind (2 bytes big-endian)
506 if err = binary.Read(r, binary.BigEndian, &ev.Kind); err != nil {
507 return nil, err
508 }
509
510 // Tags
511 nTags, err := varint.Decode(r)
512 if err != nil {
513 return nil, err
514 }
515 if nTags > MaxTagsPerEvent {
516 return nil, ErrTooManyTags
517 }
518 if nTags > 0 {
519 ev.Tags = tag.NewSWithCap(int(nTags))
520 resolver := &repairSerialResolver{d: d}
521 for i := uint64(0); i < nTags; i++ {
522 t, err := decodeCompactTag(r, resolver)
523 if err != nil {
524 return nil, err
525 }
526 *ev.Tags = append(*ev.Tags, t)
527 }
528 }
529
530 // Content
531 contentLen, err := varint.Decode(r)
532 if err != nil {
533 return nil, err
534 }
535 if contentLen > MaxContentLength {
536 return nil, ErrContentTooLarge
537 }
538 ev.Content = make([]byte, contentLen)
539 if _, err = io.ReadFull(r, ev.Content); err != nil {
540 return nil, err
541 }
542
543 // Signature (64 bytes)
544 ev.Sig = make([]byte, schnorr.SignatureSize)
545 if _, err = io.ReadFull(r, ev.Sig); err != nil {
546 return nil, err
547 }
548
549 return ev, nil
550 }
551
552 // repairSerialResolver implements SerialResolver for repair operations.
553 type repairSerialResolver struct {
554 d *D
555 }
556
557 func (r *repairSerialResolver) GetOrCreatePubkeySerial(pubkey []byte) (uint64, error) {
558 return 0, fmt.Errorf("not supported in repair mode")
559 }
560
561 func (r *repairSerialResolver) GetPubkeyBySerial(serial uint64) ([]byte, error) {
562 return r.d.getPubkeyBySerial(serial)
563 }
564
565 func (r *repairSerialResolver) GetEventSerialById(eventId []byte) (uint64, bool, error) {
566 return 0, false, nil // Not needed for decoding
567 }
568
569 func (r *repairSerialResolver) GetEventIdBySerial(serial uint64) ([]byte, error) {
570 return r.d.getEventIdBySerial(serial)
571 }
572
573 // getEventIdBySerial returns the event ID for a given serial.
574 // This is a helper for compact event decoding.
575 func (d *D) getEventIdBySerial(serial uint64) ([]byte, error) {
576 var eventID []byte
577 err := d.View(func(txn *badger.Txn) error {
578 seiKey := buildSeiKey(serial)
579 item, err := txn.Get(seiKey)
580 if err != nil {
581 return err
582 }
583 return item.Value(func(val []byte) error {
584 eventID = make([]byte, len(val))
585 copy(eventID, val)
586 return nil
587 })
588 })
589 return eventID, err
590 }
591
592 // getPubkeyBySerial returns the pubkey for a given serial.
593 // This is a helper for compact event decoding.
594 func (d *D) getPubkeyBySerial(serial uint64) ([]byte, error) {
595 var pubkey []byte
596 err := d.View(func(txn *badger.Txn) error {
597 ser := new(types.Uint40)
598 ser.Set(serial)
599 spkKey := new(bytes.Buffer)
600 indexes.SerialPubkeyEnc(ser).MarshalWrite(spkKey)
601
602 item, err := txn.Get(spkKey.Bytes())
603 if err != nil {
604 return err
605 }
606 return item.Value(func(val []byte) error {
607 pubkey = make([]byte, len(val))
608 copy(pubkey, val)
609 return nil
610 })
611 })
612 return pubkey, err
613 }
614