repair.go raw

   1  //go:build !(js && wasm)
   2  
   3  package database
   4  
   5  import (
   6  	"bytes"
   7  	"context"
   8  	"encoding/binary"
   9  	"fmt"
  10  	"io"
  11  	"time"
  12  
  13  	"github.com/dgraph-io/badger/v4"
  14  	"next.orly.dev/pkg/nostr/crypto/ec/schnorr"
  15  	"next.orly.dev/pkg/nostr/encoders/event"
  16  	"next.orly.dev/pkg/nostr/encoders/tag"
  17  	"next.orly.dev/pkg/nostr/encoders/varint"
  18  	"next.orly.dev/pkg/lol/log"
  19  	"next.orly.dev/pkg/database/indexes"
  20  	"next.orly.dev/pkg/database/indexes/types"
  21  )
  22  
  23  // RepairReport contains the results of a database repair operation.
  24  type RepairReport struct {
  25  	// Operation metadata
  26  	Started  time.Time
  27  	Duration time.Duration
  28  	DryRun   bool
  29  
  30  	// Events scanned
  31  	CompactEventsScanned int64
  32  	LegacyEventsScanned  int64
  33  
  34  	// Repairs performed
  35  	SeiEntriesCreated    int64 // sei mappings rebuilt from compact events
  36  	SeiEntriesRemoved    int64 // orphaned sei entries removed
  37  	PubkeyMappingsFixed  int64 // pubkey serial mappings fixed
  38  	OrphanedIndexesFixed int64 // orphaned index entries removed
  39  
  40  	// Errors encountered
  41  	Errors []string
  42  }
  43  
  44  // String returns a human-readable repair report.
  45  func (r *RepairReport) String() string {
  46  	var buf bytes.Buffer
  47  	fmt.Fprintln(&buf, "Database Repair Report")
  48  	fmt.Fprintln(&buf, "======================")
  49  	fmt.Fprintf(&buf, "Duration: %v\n", r.Duration)
  50  	if r.DryRun {
  51  		fmt.Fprintln(&buf, "Mode: DRY RUN (no changes made)")
  52  	} else {
  53  		fmt.Fprintln(&buf, "Mode: REPAIR (changes applied)")
  54  	}
  55  	fmt.Fprintln(&buf)
  56  
  57  	fmt.Fprintln(&buf, "Events Scanned:")
  58  	fmt.Fprintf(&buf, "  Compact events:     %d\n", r.CompactEventsScanned)
  59  	fmt.Fprintf(&buf, "  Legacy events:      %d\n\n", r.LegacyEventsScanned)
  60  
  61  	fmt.Fprintln(&buf, "Repairs:")
  62  	fmt.Fprintf(&buf, "  sei entries created:  %d\n", r.SeiEntriesCreated)
  63  	fmt.Fprintf(&buf, "  sei entries removed:  %d\n", r.SeiEntriesRemoved)
  64  	fmt.Fprintf(&buf, "  pubkey mappings fixed: %d\n", r.PubkeyMappingsFixed)
  65  	fmt.Fprintf(&buf, "  orphaned indexes:     %d\n\n", r.OrphanedIndexesFixed)
  66  
  67  	if len(r.Errors) > 0 {
  68  		fmt.Fprintln(&buf, "Errors encountered:")
  69  		for _, e := range r.Errors {
  70  			fmt.Fprintf(&buf, "  - %s\n", e)
  71  		}
  72  	}
  73  
  74  	return buf.String()
  75  }
  76  
  77  // RepairOptions configures the repair operation.
  78  type RepairOptions struct {
  79  	// DryRun if true, only reports what would be fixed without making changes
  80  	DryRun bool
  81  
  82  	// FixMissingSei if true, rebuilds missing sei entries from compact events
  83  	FixMissingSei bool
  84  
  85  	// RemoveOrphanedSei if true, removes sei entries without corresponding events
  86  	RemoveOrphanedSei bool
  87  
  88  	// FixPubkeyMappings if true, fixes inconsistent pubkey serial mappings
  89  	FixPubkeyMappings bool
  90  
  91  	// Progress writer for progress updates
  92  	Progress io.Writer
  93  }
  94  
  95  // DefaultRepairOptions returns the default repair options.
  96  func DefaultRepairOptions() *RepairOptions {
  97  	return &RepairOptions{
  98  		DryRun:            false,
  99  		FixMissingSei:     true,
 100  		RemoveOrphanedSei: true,
 101  		FixPubkeyMappings: true,
 102  	}
 103  }
 104  
 105  // Repair performs database repair operations based on the provided options.
 106  // It fixes integrity issues found by HealthCheck.
 107  func (d *D) Repair(ctx context.Context, opts *RepairOptions) (report *RepairReport, err error) {
 108  	if opts == nil {
 109  		opts = DefaultRepairOptions()
 110  	}
 111  
 112  	report = &RepairReport{
 113  		Started: time.Now(),
 114  		DryRun:  opts.DryRun,
 115  		Errors:  make([]string, 0),
 116  	}
 117  
 118  	progress := opts.Progress
 119  
 120  	if progress != nil {
 121  		if opts.DryRun {
 122  			fmt.Fprintln(progress, "Starting database repair (DRY RUN)...")
 123  		} else {
 124  			fmt.Fprintln(progress, "Starting database repair...")
 125  		}
 126  	}
 127  
 128  	// Phase 1: Fix missing sei entries
 129  	if opts.FixMissingSei {
 130  		if progress != nil {
 131  			fmt.Fprintln(progress, "\nPhase 1: Rebuilding missing sei entries from compact events...")
 132  		}
 133  
 134  		if err = d.repairMissingSei(ctx, opts, report); err != nil {
 135  			report.Errors = append(report.Errors, fmt.Sprintf("sei repair error: %v", err))
 136  			log.E.F("sei repair error: %v", err)
 137  		}
 138  	}
 139  
 140  	// Phase 2: Remove orphaned sei entries
 141  	if opts.RemoveOrphanedSei {
 142  		if progress != nil {
 143  			fmt.Fprintln(progress, "\nPhase 2: Removing orphaned sei entries...")
 144  		}
 145  
 146  		if err = d.repairOrphanedSei(ctx, opts, report); err != nil {
 147  			report.Errors = append(report.Errors, fmt.Sprintf("orphaned sei repair error: %v", err))
 148  			log.E.F("orphaned sei repair error: %v", err)
 149  		}
 150  	}
 151  
 152  	// Phase 3: Fix pubkey serial mappings
 153  	if opts.FixPubkeyMappings {
 154  		if progress != nil {
 155  			fmt.Fprintln(progress, "\nPhase 3: Checking pubkey serial mappings...")
 156  		}
 157  
 158  		if err = d.repairPubkeyMappings(ctx, opts, report); err != nil {
 159  			report.Errors = append(report.Errors, fmt.Sprintf("pubkey mapping repair error: %v", err))
 160  			log.E.F("pubkey mapping repair error: %v", err)
 161  		}
 162  	}
 163  
 164  	report.Duration = time.Since(report.Started)
 165  
 166  	if progress != nil {
 167  		fmt.Fprintf(progress, "\nRepair complete in %v\n", report.Duration)
 168  		fmt.Fprintf(progress, "  sei entries created: %d\n", report.SeiEntriesCreated)
 169  		fmt.Fprintf(progress, "  sei entries removed: %d\n", report.SeiEntriesRemoved)
 170  		fmt.Fprintf(progress, "  pubkey mappings fixed: %d\n", report.PubkeyMappingsFixed)
 171  		if opts.DryRun {
 172  			fmt.Fprintln(progress, "\n(DRY RUN - no changes were made)")
 173  		}
 174  	}
 175  
 176  	return report, nil
 177  }
 178  
 179  // repairMissingSei rebuilds sei entries for compact events that are missing them.
 180  func (d *D) repairMissingSei(ctx context.Context, opts *RepairOptions, report *RepairReport) error {
 181  	progress := opts.Progress
 182  	cmpPrf := buildPrefix(indexes.CompactEventEnc(nil))
 183  	seiPrf := buildPrefix(indexes.SerialEventIdEnc(nil))
 184  
 185  	// First pass: collect all serials that need sei entries
 186  	type repairItem struct {
 187  		serial  uint64
 188  		eventID []byte
 189  	}
 190  	var toRepair []repairItem
 191  
 192  	err := d.View(func(txn *badger.Txn) error {
 193  		it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpPrf})
 194  		defer it.Close()
 195  
 196  		for it.Rewind(); it.Valid(); it.Next() {
 197  			select {
 198  			case <-ctx.Done():
 199  				return ctx.Err()
 200  			default:
 201  			}
 202  
 203  			item := it.Item()
 204  			key := item.Key()
 205  			if len(key) < 8 {
 206  				continue
 207  			}
 208  
 209  			serial := extractSerial(key[3:8])
 210  			report.CompactEventsScanned++
 211  
 212  			// Check if sei entry exists
 213  			seiKey := buildSeiKey(serial)
 214  			_, err := txn.Get(seiKey)
 215  			if err == badger.ErrKeyNotFound {
 216  				// Need to repair - extract event ID from compact event
 217  				var eventData []byte
 218  				err = item.Value(func(val []byte) error {
 219  					eventData = make([]byte, len(val))
 220  					copy(eventData, val)
 221  					return nil
 222  				})
 223  				if err != nil {
 224  					continue
 225  				}
 226  
 227  				// Decode compact event to get the event ID
 228  				eventID, decodeErr := extractEventIDFromCompact(eventData, serial, d)
 229  				if decodeErr != nil {
 230  					if progress != nil && report.CompactEventsScanned%10000 == 0 {
 231  						log.D.F("could not extract event ID for serial %d: %v", serial, decodeErr)
 232  					}
 233  					continue
 234  				}
 235  
 236  				toRepair = append(toRepair, repairItem{serial: serial, eventID: eventID})
 237  			}
 238  
 239  			if progress != nil && report.CompactEventsScanned%100000 == 0 {
 240  				fmt.Fprintf(progress, "  Scanned %d compact events, found %d needing sei repair...\n",
 241  					report.CompactEventsScanned, len(toRepair))
 242  			}
 243  		}
 244  
 245  		return nil
 246  	})
 247  	if err != nil {
 248  		return err
 249  	}
 250  
 251  	if progress != nil {
 252  		fmt.Fprintf(progress, "  Found %d compact events needing sei repair\n", len(toRepair))
 253  	}
 254  
 255  	// Also count legacy events
 256  	err = d.View(func(txn *badger.Txn) error {
 257  		evtPrf := buildPrefix(indexes.EventEnc(nil))
 258  		it := txn.NewIterator(badger.IteratorOptions{Prefix: evtPrf, PrefetchValues: false})
 259  		defer it.Close()
 260  		for it.Rewind(); it.Valid(); it.Next() {
 261  			report.LegacyEventsScanned++
 262  		}
 263  		return nil
 264  	})
 265  	if err != nil {
 266  		log.W.F("error counting legacy events: %v", err)
 267  	}
 268  
 269  	// Second pass: create missing sei entries
 270  	if opts.DryRun {
 271  		report.SeiEntriesCreated = int64(len(toRepair))
 272  		if progress != nil {
 273  			fmt.Fprintf(progress, "  Would create %d sei entries (dry run)\n", len(toRepair))
 274  		}
 275  		return nil
 276  	}
 277  
 278  	// Write in batches
 279  	batchSize := 1000
 280  	for i := 0; i < len(toRepair); i += batchSize {
 281  		end := i + batchSize
 282  		if end > len(toRepair) {
 283  			end = len(toRepair)
 284  		}
 285  		batch := toRepair[i:end]
 286  
 287  		wb := d.DB.NewWriteBatch()
 288  		for _, item := range batch {
 289  			seiKey := buildSeiKey(item.serial)
 290  			if err := wb.Set(seiKey, item.eventID); err != nil {
 291  				wb.Cancel()
 292  				return err
 293  			}
 294  		}
 295  
 296  		if err := wb.Flush(); err != nil {
 297  			return err
 298  		}
 299  
 300  		report.SeiEntriesCreated += int64(len(batch))
 301  
 302  		if progress != nil && i+batchSize < len(toRepair) {
 303  			fmt.Fprintf(progress, "  Created %d/%d sei entries...\n", report.SeiEntriesCreated, len(toRepair))
 304  		}
 305  	}
 306  
 307  	_ = seiPrf // prevent unused warning
 308  	return nil
 309  }
 310  
 311  // repairOrphanedSei removes sei entries that don't have corresponding events.
 312  func (d *D) repairOrphanedSei(ctx context.Context, opts *RepairOptions, report *RepairReport) error {
 313  	progress := opts.Progress
 314  	seiPrf := buildPrefix(indexes.SerialEventIdEnc(nil))
 315  	cmpPrf := buildPrefix(indexes.CompactEventEnc(nil))
 316  	evtPrf := buildPrefix(indexes.EventEnc(nil))
 317  
 318  	var orphanedSerials []uint64
 319  
 320  	err := d.View(func(txn *badger.Txn) error {
 321  		it := txn.NewIterator(badger.IteratorOptions{Prefix: seiPrf, PrefetchValues: false})
 322  		defer it.Close()
 323  
 324  		checked := int64(0)
 325  		for it.Rewind(); it.Valid(); it.Next() {
 326  			select {
 327  			case <-ctx.Done():
 328  				return ctx.Err()
 329  			default:
 330  			}
 331  
 332  			key := it.Item().Key()
 333  			if len(key) < 8 {
 334  				continue
 335  			}
 336  
 337  			serial := extractSerial(key[3:8])
 338  
 339  			// Check if cmp entry exists
 340  			cmpKey := buildCmpKey(serial)
 341  			_, err := txn.Get(cmpKey)
 342  			if err == badger.ErrKeyNotFound {
 343  				// Also check legacy evt
 344  				evtKey := buildEvtKey(serial)
 345  				_, err2 := txn.Get(evtKey)
 346  				if err2 == badger.ErrKeyNotFound {
 347  					orphanedSerials = append(orphanedSerials, serial)
 348  				}
 349  			}
 350  
 351  			checked++
 352  			if progress != nil && checked%100000 == 0 {
 353  				fmt.Fprintf(progress, "  Checked %d sei entries, found %d orphaned...\n",
 354  					checked, len(orphanedSerials))
 355  			}
 356  		}
 357  
 358  		return nil
 359  	})
 360  	if err != nil {
 361  		return err
 362  	}
 363  
 364  	if progress != nil {
 365  		fmt.Fprintf(progress, "  Found %d orphaned sei entries\n", len(orphanedSerials))
 366  	}
 367  
 368  	if opts.DryRun {
 369  		report.SeiEntriesRemoved = int64(len(orphanedSerials))
 370  		if progress != nil {
 371  			fmt.Fprintf(progress, "  Would remove %d orphaned sei entries (dry run)\n", len(orphanedSerials))
 372  		}
 373  		return nil
 374  	}
 375  
 376  	// Remove orphaned entries in batches
 377  	batchSize := 1000
 378  	for i := 0; i < len(orphanedSerials); i += batchSize {
 379  		end := i + batchSize
 380  		if end > len(orphanedSerials) {
 381  			end = len(orphanedSerials)
 382  		}
 383  		batch := orphanedSerials[i:end]
 384  
 385  		wb := d.DB.NewWriteBatch()
 386  		for _, serial := range batch {
 387  			seiKey := buildSeiKey(serial)
 388  			if err := wb.Delete(seiKey); err != nil {
 389  				wb.Cancel()
 390  				return err
 391  			}
 392  		}
 393  
 394  		if err := wb.Flush(); err != nil {
 395  			return err
 396  		}
 397  
 398  		report.SeiEntriesRemoved += int64(len(batch))
 399  	}
 400  
 401  	_ = cmpPrf // prevent unused
 402  	_ = evtPrf
 403  	return nil
 404  }
 405  
 406  // repairPubkeyMappings fixes inconsistent pubkey serial mappings.
 407  func (d *D) repairPubkeyMappings(ctx context.Context, opts *RepairOptions, report *RepairReport) error {
 408  	progress := opts.Progress
 409  	pksPrf := buildPrefix(indexes.PubkeySerialEnc(nil, nil))
 410  	spkPrf := buildPrefix(indexes.SerialPubkeyEnc(nil))
 411  
 412  	// Count entries
 413  	var pksCount, spkCount int64
 414  	err := d.View(func(txn *badger.Txn) error {
 415  		pksCount = countPrefix(txn, pksPrf)
 416  		spkCount = countPrefix(txn, spkPrf)
 417  		return nil
 418  	})
 419  	if err != nil {
 420  		return err
 421  	}
 422  
 423  	if progress != nil {
 424  		fmt.Fprintf(progress, "  pks entries: %d, spk entries: %d\n", pksCount, spkCount)
 425  	}
 426  
 427  	// For now, just report the mismatch
 428  	// Full repair would require scanning all events and rebuilding mappings
 429  	diff := pksCount - spkCount
 430  	if diff < 0 {
 431  		diff = -diff
 432  	}
 433  
 434  	threshold := pksCount / 100
 435  	if threshold < 10 {
 436  		threshold = 10
 437  	}
 438  
 439  	if diff > threshold {
 440  		report.PubkeyMappingsFixed = diff
 441  		if progress != nil {
 442  			fmt.Fprintf(progress, "  Found %d pubkey mapping inconsistencies\n", diff)
 443  			fmt.Fprintln(progress, "  Note: Full pubkey mapping repair requires event rescan (not yet implemented)")
 444  		}
 445  	} else {
 446  		if progress != nil {
 447  			fmt.Fprintln(progress, "  Pubkey mappings are consistent")
 448  		}
 449  	}
 450  
 451  	return nil
 452  }
 453  
 454  // extractEventIDFromCompact decodes a compact event and computes its ID.
 455  // This is used during repair when the sei entry is missing.
 456  // The event ID is computed by hashing the serialized event content.
 457  func extractEventIDFromCompact(data []byte, serial uint64, d *D) ([]byte, error) {
 458  	// Decode the compact event without the ID
 459  	ev, err := unmarshalCompactForRepair(data, d)
 460  	if err != nil {
 461  		return nil, fmt.Errorf("failed to decode compact event: %w", err)
 462  	}
 463  
 464  	// Compute the event ID by hashing the serialized event
 465  	eventID := ev.GetIDBytes()
 466  	if eventID == nil || len(eventID) != 32 {
 467  		return nil, fmt.Errorf("failed to compute event ID")
 468  	}
 469  
 470  	return eventID, nil
 471  }
 472  
 473  // unmarshalCompactForRepair decodes a compact event for repair purposes.
 474  // Unlike UnmarshalCompactEvent, this doesn't require the event ID upfront.
 475  func unmarshalCompactForRepair(data []byte, d *D) (*event.E, error) {
 476  	r := bytes.NewReader(data)
 477  	ev := new(event.E)
 478  
 479  	// Version byte
 480  	version, err := r.ReadByte()
 481  	if err != nil {
 482  		return nil, err
 483  	}
 484  	if version != CompactFormatVersion {
 485  		return nil, fmt.Errorf("unsupported compact format version: %d", version)
 486  	}
 487  
 488  	// Author pubkey serial (5 bytes) -> full pubkey
 489  	authorSerial, err := readUint40(r)
 490  	if err != nil {
 491  		return nil, err
 492  	}
 493  	ev.Pubkey, err = d.getPubkeyBySerial(authorSerial)
 494  	if err != nil {
 495  		return nil, fmt.Errorf("failed to get pubkey for serial %d: %w", authorSerial, err)
 496  	}
 497  
 498  	// CreatedAt (varint)
 499  	ca, err := varint.Decode(r)
 500  	if err != nil {
 501  		return nil, err
 502  	}
 503  	ev.CreatedAt = int64(ca)
 504  
 505  	// Kind (2 bytes big-endian)
 506  	if err = binary.Read(r, binary.BigEndian, &ev.Kind); err != nil {
 507  		return nil, err
 508  	}
 509  
 510  	// Tags
 511  	nTags, err := varint.Decode(r)
 512  	if err != nil {
 513  		return nil, err
 514  	}
 515  	if nTags > MaxTagsPerEvent {
 516  		return nil, ErrTooManyTags
 517  	}
 518  	if nTags > 0 {
 519  		ev.Tags = tag.NewSWithCap(int(nTags))
 520  		resolver := &repairSerialResolver{d: d}
 521  		for i := uint64(0); i < nTags; i++ {
 522  			t, err := decodeCompactTag(r, resolver)
 523  			if err != nil {
 524  				return nil, err
 525  			}
 526  			*ev.Tags = append(*ev.Tags, t)
 527  		}
 528  	}
 529  
 530  	// Content
 531  	contentLen, err := varint.Decode(r)
 532  	if err != nil {
 533  		return nil, err
 534  	}
 535  	if contentLen > MaxContentLength {
 536  		return nil, ErrContentTooLarge
 537  	}
 538  	ev.Content = make([]byte, contentLen)
 539  	if _, err = io.ReadFull(r, ev.Content); err != nil {
 540  		return nil, err
 541  	}
 542  
 543  	// Signature (64 bytes)
 544  	ev.Sig = make([]byte, schnorr.SignatureSize)
 545  	if _, err = io.ReadFull(r, ev.Sig); err != nil {
 546  		return nil, err
 547  	}
 548  
 549  	return ev, nil
 550  }
 551  
// repairSerialResolver implements SerialResolver for repair operations.
// It only supports the read-side lookups needed while decoding compact
// events; it never allocates new serials.
type repairSerialResolver struct {
	// d is the database that serial lookups are delegated to.
	d *D
}

// GetOrCreatePubkeySerial always fails: the repair resolver does not create
// pubkey serials.
func (r *repairSerialResolver) GetOrCreatePubkeySerial(pubkey []byte) (uint64, error) {
	return 0, fmt.Errorf("not supported in repair mode")
}

// GetPubkeyBySerial returns the pubkey stored for serial by delegating to the
// database's getPubkeyBySerial.
func (r *repairSerialResolver) GetPubkeyBySerial(serial uint64) ([]byte, error) {
	return r.d.getPubkeyBySerial(serial)
}

// GetEventSerialById is a stub: it always reports "not found" (serial 0,
// false) with a nil error and performs no lookup.
func (r *repairSerialResolver) GetEventSerialById(eventId []byte) (uint64, bool, error) {
	return 0, false, nil // Not needed for decoding
}

// GetEventIdBySerial returns the event ID stored for serial by delegating to
// the database's getEventIdBySerial.
func (r *repairSerialResolver) GetEventIdBySerial(serial uint64) ([]byte, error) {
	return r.d.getEventIdBySerial(serial)
}
 572  
 573  // getEventIdBySerial returns the event ID for a given serial.
 574  // This is a helper for compact event decoding.
 575  func (d *D) getEventIdBySerial(serial uint64) ([]byte, error) {
 576  	var eventID []byte
 577  	err := d.View(func(txn *badger.Txn) error {
 578  		seiKey := buildSeiKey(serial)
 579  		item, err := txn.Get(seiKey)
 580  		if err != nil {
 581  			return err
 582  		}
 583  		return item.Value(func(val []byte) error {
 584  			eventID = make([]byte, len(val))
 585  			copy(eventID, val)
 586  			return nil
 587  		})
 588  	})
 589  	return eventID, err
 590  }
 591  
 592  // getPubkeyBySerial returns the pubkey for a given serial.
 593  // This is a helper for compact event decoding.
 594  func (d *D) getPubkeyBySerial(serial uint64) ([]byte, error) {
 595  	var pubkey []byte
 596  	err := d.View(func(txn *badger.Txn) error {
 597  		ser := new(types.Uint40)
 598  		ser.Set(serial)
 599  		spkKey := new(bytes.Buffer)
 600  		indexes.SerialPubkeyEnc(ser).MarshalWrite(spkKey)
 601  
 602  		item, err := txn.Get(spkKey.Bytes())
 603  		if err != nil {
 604  			return err
 605  		}
 606  		return item.Value(func(val []byte) error {
 607  			pubkey = make([]byte, len(val))
 608  			copy(pubkey, val)
 609  			return nil
 610  		})
 611  	})
 612  	return pubkey, err
 613  }
 614