// migrations.go

   1  package neo4j
   2  
import (
	"context"
	"encoding/hex"
	"fmt"

	"next.orly.dev/pkg/database"
	"next.orly.dev/pkg/nostr/encoders/tag"
)
  11  
  12  // Migration represents a database migration with a version identifier
  13  type Migration struct {
  14  	Version     string
  15  	Description string
  16  	Migrate     func(ctx context.Context, n *N) error
  17  }
  18  
  19  // migrations is the ordered list of database migrations
  20  // Migrations are applied in order and tracked via Marker nodes
  21  var migrations = []Migration{
  22  	{
  23  		Version:     "v1",
  24  		Description: "Merge Author nodes into NostrUser nodes",
  25  		Migrate:     migrateAuthorToNostrUser,
  26  	},
  27  	{
  28  		Version:     "v2",
  29  		Description: "Clean up binary-encoded pubkeys and event IDs to lowercase hex",
  30  		Migrate:     migrateBinaryToHex,
  31  	},
  32  	{
  33  		Version:     "v3",
  34  		Description: "Convert direct REFERENCES/MENTIONS relationships to Tag-based model",
  35  		Migrate:     migrateToTagBasedReferences,
  36  	},
  37  	{
  38  		Version:     "v4",
  39  		Description: "Deduplicate REPORTS relationships by (reporter, reported, report_type)",
  40  		Migrate:     migrateDeduplicateReports,
  41  	},
  42  	{
  43  		Version:     "v5",
  44  		Description: "Add naddr property to addressable Event nodes (kinds 30000-39999)",
  45  		Migrate:     migrateAddNaddr,
  46  	},
  47  	{
  48  		Version:     "v6",
  49  		Description: "Backfill Word nodes and HAS_WORD relationships for NIP-50 search",
  50  		Migrate:     migrateBackfillWords,
  51  	},
  52  }
  53  
  54  // RunMigrations executes all pending migrations
  55  func (n *N) RunMigrations() {
  56  	ctx := context.Background()
  57  
  58  	for _, migration := range migrations {
  59  		// Check if migration has already been applied
  60  		if n.migrationApplied(ctx, migration.Version) {
  61  			n.Logger.Infof("migration %s already applied, skipping", migration.Version)
  62  			continue
  63  		}
  64  
  65  		n.Logger.Infof("applying migration %s: %s", migration.Version, migration.Description)
  66  
  67  		if err := migration.Migrate(ctx, n); err != nil {
  68  			n.Logger.Errorf("migration %s failed: %v", migration.Version, err)
  69  			// Continue to next migration - don't fail startup
  70  			continue
  71  		}
  72  
  73  		// Mark migration as complete
  74  		if err := n.markMigrationComplete(ctx, migration.Version, migration.Description); err != nil {
  75  			n.Logger.Warningf("failed to mark migration %s as complete: %v", migration.Version, err)
  76  		}
  77  
  78  		n.Logger.Infof("migration %s completed successfully", migration.Version)
  79  	}
  80  }
  81  
  82  // migrationApplied checks if a migration has already been applied
  83  func (n *N) migrationApplied(ctx context.Context, version string) bool {
  84  	cypher := `
  85  		MATCH (m:Migration {version: $version})
  86  		RETURN m.version
  87  	`
  88  	result, err := n.ExecuteRead(ctx, cypher, map[string]any{"version": version})
  89  	if err != nil {
  90  		return false
  91  	}
  92  	return result.Next(ctx)
  93  }
  94  
  95  // markMigrationComplete marks a migration as applied
  96  func (n *N) markMigrationComplete(ctx context.Context, version, description string) error {
  97  	cypher := `
  98  		CREATE (m:Migration {
  99  			version: $version,
 100  			description: $description,
 101  			applied_at: timestamp()
 102  		})
 103  	`
 104  	_, err := n.ExecuteWrite(ctx, cypher, map[string]any{
 105  		"version":     version,
 106  		"description": description,
 107  	})
 108  	return err
 109  }
 110  
 111  // migrateAuthorToNostrUser migrates Author nodes to NostrUser nodes
 112  // This consolidates the separate Author (NIP-01) and NostrUser (WoT) labels
 113  // into a unified NostrUser label for the social graph
 114  func migrateAuthorToNostrUser(ctx context.Context, n *N) error {
 115  	// Step 1: Check if there are any Author nodes to migrate
 116  	countCypher := `MATCH (a:Author) RETURN count(a) AS count`
 117  	countResult, err := n.ExecuteRead(ctx, countCypher, nil)
 118  	if err != nil {
 119  		return fmt.Errorf("failed to count Author nodes: %w", err)
 120  	}
 121  
 122  	var authorCount int64
 123  	if countResult.Next(ctx) {
 124  		record := countResult.Record()
 125  		if count, ok := record.Values[0].(int64); ok {
 126  			authorCount = count
 127  		}
 128  	}
 129  
 130  	if authorCount == 0 {
 131  		n.Logger.Infof("no Author nodes to migrate")
 132  		return nil
 133  	}
 134  
 135  	n.Logger.Infof("migrating %d Author nodes to NostrUser", authorCount)
 136  
 137  	// Step 2: For each Author node, merge into NostrUser with same pubkey
 138  	// This uses MERGE to either match existing NostrUser or create new one
 139  	// Then copies any relationships from Author to NostrUser
 140  	mergeCypher := `
 141  		// Match all Author nodes
 142  		MATCH (a:Author)
 143  
 144  		// For each Author, merge into NostrUser (creates if doesn't exist)
 145  		MERGE (u:NostrUser {pubkey: a.pubkey})
 146  		ON CREATE SET u.created_at = timestamp(), u.migrated_from_author = true
 147  
 148  		// Return count for logging
 149  		RETURN count(DISTINCT a) AS migrated
 150  	`
 151  
 152  	result, err := n.ExecuteWrite(ctx, mergeCypher, nil)
 153  	if err != nil {
 154  		return fmt.Errorf("failed to merge Author nodes to NostrUser: %w", err)
 155  	}
 156  
 157  	// Log result (result consumption happens within the session)
 158  	_ = result
 159  
 160  	// Step 3: Migrate AUTHORED_BY relationships from Author to NostrUser
 161  	// Events should now point to NostrUser instead of Author
 162  	relationshipCypher := `
 163  		// Find events linked to Author via AUTHORED_BY
 164  		MATCH (e:Event)-[r:AUTHORED_BY]->(a:Author)
 165  
 166  		// Get or create the corresponding NostrUser
 167  		MATCH (u:NostrUser {pubkey: a.pubkey})
 168  
 169  		// Create new relationship to NostrUser if it doesn't exist
 170  		MERGE (e)-[:AUTHORED_BY]->(u)
 171  
 172  		// Delete old relationship to Author
 173  		DELETE r
 174  
 175  		RETURN count(r) AS migrated_relationships
 176  	`
 177  
 178  	_, err = n.ExecuteWrite(ctx, relationshipCypher, nil)
 179  	if err != nil {
 180  		return fmt.Errorf("failed to migrate AUTHORED_BY relationships: %w", err)
 181  	}
 182  
 183  	// Step 4: Migrate MENTIONS relationships from Author to NostrUser
 184  	mentionsCypher := `
 185  		// Find events with MENTIONS to Author
 186  		MATCH (e:Event)-[r:MENTIONS]->(a:Author)
 187  
 188  		// Get or create the corresponding NostrUser
 189  		MATCH (u:NostrUser {pubkey: a.pubkey})
 190  
 191  		// Create new relationship to NostrUser if it doesn't exist
 192  		MERGE (e)-[:MENTIONS]->(u)
 193  
 194  		// Delete old relationship to Author
 195  		DELETE r
 196  
 197  		RETURN count(r) AS migrated_mentions
 198  	`
 199  
 200  	_, err = n.ExecuteWrite(ctx, mentionsCypher, nil)
 201  	if err != nil {
 202  		return fmt.Errorf("failed to migrate MENTIONS relationships: %w", err)
 203  	}
 204  
 205  	// Step 5: Delete orphaned Author nodes (no longer needed)
 206  	deleteCypher := `
 207  		// Find Author nodes with no remaining relationships
 208  		MATCH (a:Author)
 209  		WHERE NOT (a)<-[:AUTHORED_BY]-() AND NOT (a)<-[:MENTIONS]-()
 210  		DETACH DELETE a
 211  		RETURN count(a) AS deleted
 212  	`
 213  
 214  	_, err = n.ExecuteWrite(ctx, deleteCypher, nil)
 215  	if err != nil {
 216  		return fmt.Errorf("failed to delete orphaned Author nodes: %w", err)
 217  	}
 218  
 219  	// Step 6: Drop the old Author constraint if it exists
 220  	dropConstraintCypher := `DROP CONSTRAINT author_pubkey_unique IF EXISTS`
 221  	_, _ = n.ExecuteWrite(ctx, dropConstraintCypher, nil)
 222  	// Ignore error as constraint may not exist
 223  
 224  	n.Logger.Infof("completed Author to NostrUser migration")
 225  	return nil
 226  }
 227  
 228  // migrateBinaryToHex cleans up any binary-encoded pubkeys and event IDs
 229  // The nostr library stores e/p tag values in binary format (33 bytes with null terminator),
 230  // but Neo4j should store them as lowercase hex strings for consistent querying.
 231  // This migration:
 232  // 1. Finds NostrUser nodes with invalid (non-hex) pubkeys and deletes them
 233  // 2. Finds Event nodes with invalid pubkeys/IDs and deletes them
 234  // 3. Finds Tag nodes (type 'e' or 'p') with invalid values and deletes them
 235  // 4. Cleans up MENTIONS relationships pointing to invalid NostrUser nodes
 236  func migrateBinaryToHex(ctx context.Context, n *N) error {
 237  	// Step 1: Count problematic nodes before cleanup
 238  	n.Logger.Infof("scanning for binary-encoded values in Neo4j...")
 239  
 240  	// Check for NostrUser nodes with invalid pubkeys (not 64 char hex)
 241  	// A valid hex pubkey is exactly 64 lowercase hex characters
 242  	countInvalidUsersCypher := `
 243  		MATCH (u:NostrUser)
 244  		WHERE size(u.pubkey) <> 64
 245  		   OR NOT u.pubkey =~ '^[0-9a-f]{64}$'
 246  		RETURN count(u) AS count
 247  	`
 248  	result, err := n.ExecuteRead(ctx, countInvalidUsersCypher, nil)
 249  	if err != nil {
 250  		return fmt.Errorf("failed to count invalid NostrUser nodes: %w", err)
 251  	}
 252  
 253  	var invalidUserCount int64
 254  	if result.Next(ctx) {
 255  		if count, ok := result.Record().Values[0].(int64); ok {
 256  			invalidUserCount = count
 257  		}
 258  	}
 259  	n.Logger.Infof("found %d NostrUser nodes with invalid pubkeys", invalidUserCount)
 260  
 261  	// Check for Event nodes with invalid pubkeys or IDs
 262  	countInvalidEventsCypher := `
 263  		MATCH (e:Event)
 264  		WHERE (size(e.pubkey) <> 64 OR NOT e.pubkey =~ '^[0-9a-f]{64}$')
 265  		   OR (size(e.id) <> 64 OR NOT e.id =~ '^[0-9a-f]{64}$')
 266  		RETURN count(e) AS count
 267  	`
 268  	result, err = n.ExecuteRead(ctx, countInvalidEventsCypher, nil)
 269  	if err != nil {
 270  		return fmt.Errorf("failed to count invalid Event nodes: %w", err)
 271  	}
 272  
 273  	var invalidEventCount int64
 274  	if result.Next(ctx) {
 275  		if count, ok := result.Record().Values[0].(int64); ok {
 276  			invalidEventCount = count
 277  		}
 278  	}
 279  	n.Logger.Infof("found %d Event nodes with invalid pubkeys or IDs", invalidEventCount)
 280  
 281  	// Check for Tag nodes (e/p type) with invalid values
 282  	countInvalidTagsCypher := `
 283  		MATCH (t:Tag)
 284  		WHERE t.type IN ['e', 'p']
 285  		  AND (size(t.value) <> 64 OR NOT t.value =~ '^[0-9a-f]{64}$')
 286  		RETURN count(t) AS count
 287  	`
 288  	result, err = n.ExecuteRead(ctx, countInvalidTagsCypher, nil)
 289  	if err != nil {
 290  		return fmt.Errorf("failed to count invalid Tag nodes: %w", err)
 291  	}
 292  
 293  	var invalidTagCount int64
 294  	if result.Next(ctx) {
 295  		if count, ok := result.Record().Values[0].(int64); ok {
 296  			invalidTagCount = count
 297  		}
 298  	}
 299  	n.Logger.Infof("found %d Tag nodes (e/p type) with invalid values", invalidTagCount)
 300  
 301  	// If nothing to clean up, we're done
 302  	if invalidUserCount == 0 && invalidEventCount == 0 && invalidTagCount == 0 {
 303  		n.Logger.Infof("no binary-encoded values found, migration complete")
 304  		return nil
 305  	}
 306  
 307  	// Step 2: Delete invalid NostrUser nodes and their relationships
 308  	if invalidUserCount > 0 {
 309  		n.Logger.Infof("deleting %d invalid NostrUser nodes...", invalidUserCount)
 310  		deleteInvalidUsersCypher := `
 311  			MATCH (u:NostrUser)
 312  			WHERE size(u.pubkey) <> 64
 313  			   OR NOT u.pubkey =~ '^[0-9a-f]{64}$'
 314  			DETACH DELETE u
 315  		`
 316  		_, err = n.ExecuteWrite(ctx, deleteInvalidUsersCypher, nil)
 317  		if err != nil {
 318  			return fmt.Errorf("failed to delete invalid NostrUser nodes: %w", err)
 319  		}
 320  		n.Logger.Infof("deleted %d invalid NostrUser nodes", invalidUserCount)
 321  	}
 322  
 323  	// Step 3: Delete invalid Event nodes and their relationships
 324  	if invalidEventCount > 0 {
 325  		n.Logger.Infof("deleting %d invalid Event nodes...", invalidEventCount)
 326  		deleteInvalidEventsCypher := `
 327  			MATCH (e:Event)
 328  			WHERE (size(e.pubkey) <> 64 OR NOT e.pubkey =~ '^[0-9a-f]{64}$')
 329  			   OR (size(e.id) <> 64 OR NOT e.id =~ '^[0-9a-f]{64}$')
 330  			DETACH DELETE e
 331  		`
 332  		_, err = n.ExecuteWrite(ctx, deleteInvalidEventsCypher, nil)
 333  		if err != nil {
 334  			return fmt.Errorf("failed to delete invalid Event nodes: %w", err)
 335  		}
 336  		n.Logger.Infof("deleted %d invalid Event nodes", invalidEventCount)
 337  	}
 338  
 339  	// Step 4: Delete invalid Tag nodes (e/p type) and their relationships
 340  	if invalidTagCount > 0 {
 341  		n.Logger.Infof("deleting %d invalid Tag nodes...", invalidTagCount)
 342  		deleteInvalidTagsCypher := `
 343  			MATCH (t:Tag)
 344  			WHERE t.type IN ['e', 'p']
 345  			  AND (size(t.value) <> 64 OR NOT t.value =~ '^[0-9a-f]{64}$')
 346  			DETACH DELETE t
 347  		`
 348  		_, err = n.ExecuteWrite(ctx, deleteInvalidTagsCypher, nil)
 349  		if err != nil {
 350  			return fmt.Errorf("failed to delete invalid Tag nodes: %w", err)
 351  		}
 352  		n.Logger.Infof("deleted %d invalid Tag nodes", invalidTagCount)
 353  	}
 354  
 355  	// Step 5: Clean up any orphaned MENTIONS/REFERENCES relationships
 356  	// These would be relationships pointing to nodes we just deleted
 357  	cleanupOrphanedCypher := `
 358  		// Clean up any ProcessedSocialEvent nodes with invalid pubkeys
 359  		MATCH (p:ProcessedSocialEvent)
 360  		WHERE size(p.pubkey) <> 64
 361  		   OR NOT p.pubkey =~ '^[0-9a-f]{64}$'
 362  		DETACH DELETE p
 363  	`
 364  	_, _ = n.ExecuteWrite(ctx, cleanupOrphanedCypher, nil)
 365  	// Ignore errors - best effort cleanup
 366  
 367  	n.Logger.Infof("binary-to-hex migration completed successfully")
 368  	return nil
 369  }
 370  
 371  // migrateToTagBasedReferences converts direct REFERENCES and MENTIONS relationships
 372  // to the new Tag-based model where:
 373  // - Event-[:REFERENCES]->Event becomes Event-[:TAGGED_WITH]->Tag-[:REFERENCES]->Event
 374  // - Event-[:MENTIONS]->NostrUser becomes Event-[:TAGGED_WITH]->Tag-[:REFERENCES]->NostrUser
 375  //
 376  // This enables unified tag querying via #e and #p filters while maintaining graph traversal.
 377  func migrateToTagBasedReferences(ctx context.Context, n *N) error {
 378  	// Step 1: Count existing direct REFERENCES relationships (Event->Event)
 379  	countRefCypher := `
 380  		MATCH (source:Event)-[r:REFERENCES]->(target:Event)
 381  		RETURN count(r) AS count
 382  	`
 383  	result, err := n.ExecuteRead(ctx, countRefCypher, nil)
 384  	if err != nil {
 385  		return fmt.Errorf("failed to count REFERENCES relationships: %w", err)
 386  	}
 387  
 388  	var refCount int64
 389  	if result.Next(ctx) {
 390  		if count, ok := result.Record().Values[0].(int64); ok {
 391  			refCount = count
 392  		}
 393  	}
 394  	n.Logger.Infof("found %d direct Event-[:REFERENCES]->Event relationships to migrate", refCount)
 395  
 396  	// Step 2: Count existing direct MENTIONS relationships (Event->NostrUser)
 397  	countMentionsCypher := `
 398  		MATCH (source:Event)-[r:MENTIONS]->(target:NostrUser)
 399  		RETURN count(r) AS count
 400  	`
 401  	result, err = n.ExecuteRead(ctx, countMentionsCypher, nil)
 402  	if err != nil {
 403  		return fmt.Errorf("failed to count MENTIONS relationships: %w", err)
 404  	}
 405  
 406  	var mentionsCount int64
 407  	if result.Next(ctx) {
 408  		if count, ok := result.Record().Values[0].(int64); ok {
 409  			mentionsCount = count
 410  		}
 411  	}
 412  	n.Logger.Infof("found %d direct Event-[:MENTIONS]->NostrUser relationships to migrate", mentionsCount)
 413  
 414  	// If nothing to migrate, we're done
 415  	if refCount == 0 && mentionsCount == 0 {
 416  		n.Logger.Infof("no direct relationships to migrate, migration complete")
 417  		return nil
 418  	}
 419  
 420  	// Step 3: Migrate REFERENCES relationships to Tag-based model
 421  	// Process in batches to avoid memory issues with large datasets
 422  	if refCount > 0 {
 423  		n.Logger.Infof("migrating %d REFERENCES relationships to Tag-based model...", refCount)
 424  
 425  		// This query:
 426  		// 1. Finds Event->Event REFERENCES relationships
 427  		// 2. Creates/merges Tag node with type='e' and value=target event ID
 428  		// 3. Creates TAGGED_WITH from source Event to Tag
 429  		// 4. Creates REFERENCES from Tag to target Event
 430  		// 5. Deletes the old direct REFERENCES relationship
 431  		migrateRefCypher := `
 432  			MATCH (source:Event)-[r:REFERENCES]->(target:Event)
 433  			WITH source, r, target LIMIT 1000
 434  			MERGE (t:Tag {type: 'e', value: target.id})
 435  			CREATE (source)-[:TAGGED_WITH]->(t)
 436  			MERGE (t)-[:REFERENCES]->(target)
 437  			DELETE r
 438  			RETURN count(r) AS migrated
 439  		`
 440  
 441  		// Run migration in batches until no more relationships exist
 442  		totalMigrated := int64(0)
 443  		for {
 444  			result, err := n.ExecuteWrite(ctx, migrateRefCypher, nil)
 445  			if err != nil {
 446  				return fmt.Errorf("failed to migrate REFERENCES batch: %w", err)
 447  			}
 448  
 449  			var batchMigrated int64
 450  			if result.Next(ctx) {
 451  				if count, ok := result.Record().Values[0].(int64); ok {
 452  					batchMigrated = count
 453  				}
 454  			}
 455  
 456  			if batchMigrated == 0 {
 457  				break
 458  			}
 459  			totalMigrated += batchMigrated
 460  			n.Logger.Infof("migrated %d REFERENCES relationships (total: %d)", batchMigrated, totalMigrated)
 461  		}
 462  
 463  		n.Logger.Infof("completed migrating %d REFERENCES relationships", totalMigrated)
 464  	}
 465  
 466  	// Step 4: Migrate MENTIONS relationships to Tag-based model
 467  	if mentionsCount > 0 {
 468  		n.Logger.Infof("migrating %d MENTIONS relationships to Tag-based model...", mentionsCount)
 469  
 470  		// This query:
 471  		// 1. Finds Event->NostrUser MENTIONS relationships
 472  		// 2. Creates/merges Tag node with type='p' and value=target pubkey
 473  		// 3. Creates TAGGED_WITH from source Event to Tag
 474  		// 4. Creates REFERENCES from Tag to target NostrUser
 475  		// 5. Deletes the old direct MENTIONS relationship
 476  		migrateMentionsCypher := `
 477  			MATCH (source:Event)-[r:MENTIONS]->(target:NostrUser)
 478  			WITH source, r, target LIMIT 1000
 479  			MERGE (t:Tag {type: 'p', value: target.pubkey})
 480  			CREATE (source)-[:TAGGED_WITH]->(t)
 481  			MERGE (t)-[:REFERENCES]->(target)
 482  			DELETE r
 483  			RETURN count(r) AS migrated
 484  		`
 485  
 486  		// Run migration in batches until no more relationships exist
 487  		totalMigrated := int64(0)
 488  		for {
 489  			result, err := n.ExecuteWrite(ctx, migrateMentionsCypher, nil)
 490  			if err != nil {
 491  				return fmt.Errorf("failed to migrate MENTIONS batch: %w", err)
 492  			}
 493  
 494  			var batchMigrated int64
 495  			if result.Next(ctx) {
 496  				if count, ok := result.Record().Values[0].(int64); ok {
 497  					batchMigrated = count
 498  				}
 499  			}
 500  
 501  			if batchMigrated == 0 {
 502  				break
 503  			}
 504  			totalMigrated += batchMigrated
 505  			n.Logger.Infof("migrated %d MENTIONS relationships (total: %d)", batchMigrated, totalMigrated)
 506  		}
 507  
 508  		n.Logger.Infof("completed migrating %d MENTIONS relationships", totalMigrated)
 509  	}
 510  
 511  	n.Logger.Infof("Tag-based references migration completed successfully")
 512  	return nil
 513  }
 514  
 515  // migrateDeduplicateReports removes duplicate REPORTS relationships
 516  // Prior to this migration, processReport() used CREATE which allowed multiple
 517  // REPORTS relationships with the same report_type between the same two users.
 518  // This migration keeps only the most recent report (by created_at) for each
 519  // (reporter, reported, report_type) combination.
 520  func migrateDeduplicateReports(ctx context.Context, n *N) error {
 521  	// Step 1: Count duplicate REPORTS relationships
 522  	// Duplicates are defined as multiple REPORTS with the same (reporter, reported, report_type)
 523  	countDuplicatesCypher := `
 524  		MATCH (reporter:NostrUser)-[r:REPORTS]->(reported:NostrUser)
 525  		WITH reporter, reported, r.report_type AS type, collect(r) AS rels
 526  		WHERE size(rels) > 1
 527  		RETURN sum(size(rels) - 1) AS duplicate_count
 528  	`
 529  	result, err := n.ExecuteRead(ctx, countDuplicatesCypher, nil)
 530  	if err != nil {
 531  		return fmt.Errorf("failed to count duplicate REPORTS: %w", err)
 532  	}
 533  
 534  	var duplicateCount int64
 535  	if result.Next(ctx) {
 536  		if count, ok := result.Record().Values[0].(int64); ok {
 537  			duplicateCount = count
 538  		}
 539  	}
 540  
 541  	if duplicateCount == 0 {
 542  		n.Logger.Infof("no duplicate REPORTS relationships found, migration complete")
 543  		return nil
 544  	}
 545  
 546  	n.Logger.Infof("found %d duplicate REPORTS relationships to remove", duplicateCount)
 547  
 548  	// Step 2: Delete duplicate REPORTS, keeping the one with the highest created_at
 549  	// This query:
 550  	// 1. Groups REPORTS by (reporter, reported, report_type)
 551  	// 2. Finds the maximum created_at for each group
 552  	// 3. Deletes all relationships in the group except the newest one
 553  	deleteDuplicatesCypher := `
 554  		MATCH (reporter:NostrUser)-[r:REPORTS]->(reported:NostrUser)
 555  		WITH reporter, reported, r.report_type AS type,
 556  		     collect(r) AS rels, max(r.created_at) AS maxCreatedAt
 557  		WHERE size(rels) > 1
 558  		UNWIND rels AS rel
 559  		WITH rel, maxCreatedAt
 560  		WHERE rel.created_at < maxCreatedAt
 561  		DELETE rel
 562  		RETURN count(*) AS deleted
 563  	`
 564  
 565  	writeResult, err := n.ExecuteWrite(ctx, deleteDuplicatesCypher, nil)
 566  	if err != nil {
 567  		return fmt.Errorf("failed to delete duplicate REPORTS: %w", err)
 568  	}
 569  
 570  	var deletedCount int64
 571  	if writeResult.Next(ctx) {
 572  		if count, ok := writeResult.Record().Values[0].(int64); ok {
 573  			deletedCount = count
 574  		}
 575  	}
 576  
 577  	n.Logger.Infof("deleted %d duplicate REPORTS relationships", deletedCount)
 578  
 579  	// Step 3: Mark superseded ProcessedSocialEvent nodes for deleted reports
 580  	// Find ProcessedSocialEvent nodes (kind 1984) whose event IDs are no longer
 581  	// referenced by any REPORTS relationship's created_by_event
 582  	markSupersededCypher := `
 583  		MATCH (evt:ProcessedSocialEvent {event_kind: 1984})
 584  		WHERE evt.superseded_by IS NULL
 585  		  AND NOT EXISTS {
 586  		      MATCH ()-[r:REPORTS]->()
 587  		      WHERE r.created_by_event = evt.event_id
 588  		  }
 589  		SET evt.superseded_by = 'migration_v4_dedupe'
 590  		RETURN count(evt) AS superseded
 591  	`
 592  
 593  	markResult, err := n.ExecuteWrite(ctx, markSupersededCypher, nil)
 594  	if err != nil {
 595  		// Non-fatal - just log warning
 596  		n.Logger.Warningf("failed to mark superseded ProcessedSocialEvent nodes: %v", err)
 597  	} else {
 598  		var supersededCount int64
 599  		if markResult.Next(ctx) {
 600  			if count, ok := markResult.Record().Values[0].(int64); ok {
 601  				supersededCount = count
 602  			}
 603  		}
 604  		if supersededCount > 0 {
 605  			n.Logger.Infof("marked %d ProcessedSocialEvent nodes as superseded", supersededCount)
 606  		}
 607  	}
 608  
 609  	n.Logger.Infof("REPORTS deduplication migration completed successfully")
 610  	return nil
 611  }
 612  
 613  // migrateAddNaddr adds the naddr property to existing addressable Event nodes (kinds 30000-39999).
 614  // The naddr format is: pubkey:kind:dtag (colon-delimited coordinate)
 615  // This enables direct lookups and uniqueness constraints for parameterized replaceable events.
 616  func migrateAddNaddr(ctx context.Context, n *N) error {
 617  	// Step 1: Count addressable events without naddr
 618  	countCypher := `
 619  		MATCH (e:Event)
 620  		WHERE e.kind >= 30000 AND e.kind < 40000
 621  		  AND e.naddr IS NULL
 622  		RETURN count(e) AS count
 623  	`
 624  	result, err := n.ExecuteRead(ctx, countCypher, nil)
 625  	if err != nil {
 626  		return fmt.Errorf("failed to count addressable events: %w", err)
 627  	}
 628  
 629  	var eventCount int64
 630  	if result.Next(ctx) {
 631  		if count, ok := result.Record().Values[0].(int64); ok {
 632  			eventCount = count
 633  		}
 634  	}
 635  
 636  	if eventCount == 0 {
 637  		n.Logger.Infof("no addressable events without naddr found, migration complete")
 638  		return nil
 639  	}
 640  
 641  	n.Logger.Infof("found %d addressable events to update with naddr", eventCount)
 642  
 643  	// Step 2: Update events in batches
 644  	// For each event, compute naddr from pubkey + kind + d-tag
 645  	// The d-tag value is obtained from the Tag node via TAGGED_WITH relationship
 646  	updateCypher := `
 647  		MATCH (e:Event)
 648  		WHERE e.kind >= 30000 AND e.kind < 40000
 649  		  AND e.naddr IS NULL
 650  		WITH e LIMIT 1000
 651  
 652  		// Get d-tag value via TAGGED_WITH relationship
 653  		OPTIONAL MATCH (e)-[:TAGGED_WITH]->(t:Tag {type: 'd'})
 654  		WITH e, COALESCE(t.value, '') AS dValue
 655  
 656  		// Build naddr: pubkey:kind:dValue
 657  		SET e.naddr = e.pubkey + ':' + toString(e.kind) + ':' + dValue
 658  
 659  		RETURN count(e) AS updated
 660  	`
 661  
 662  	// Run migration in batches until no more events to update
 663  	totalUpdated := int64(0)
 664  	for {
 665  		writeResult, err := n.ExecuteWrite(ctx, updateCypher, nil)
 666  		if err != nil {
 667  			return fmt.Errorf("failed to update addressable events batch: %w", err)
 668  		}
 669  
 670  		var batchUpdated int64
 671  		if writeResult.Next(ctx) {
 672  			if count, ok := writeResult.Record().Values[0].(int64); ok {
 673  				batchUpdated = count
 674  			}
 675  		}
 676  
 677  		if batchUpdated == 0 {
 678  			break
 679  		}
 680  		totalUpdated += batchUpdated
 681  		n.Logger.Infof("updated %d addressable events with naddr (total: %d)", batchUpdated, totalUpdated)
 682  	}
 683  
 684  	n.Logger.Infof("naddr migration completed: updated %d addressable events", totalUpdated)
 685  	return nil
 686  }
 687  
 688  // migrateBackfillWords creates Word nodes and HAS_WORD relationships for all
 689  // existing Event nodes that don't have them yet. This enables NIP-50 word search
 690  // for events saved before word indexing was added to SaveEvent.
 691  func migrateBackfillWords(ctx context.Context, n *N) error {
 692  	// Count events without word index
 693  	countCypher := `
 694  		MATCH (e:Event)
 695  		WHERE NOT (e)-[:HAS_WORD]->(:Word)
 696  		RETURN count(e) AS count
 697  	`
 698  	result, err := n.ExecuteRead(ctx, countCypher, nil)
 699  	if err != nil {
 700  		return fmt.Errorf("failed to count events: %w", err)
 701  	}
 702  
 703  	var eventCount int64
 704  	if result.Next(ctx) {
 705  		if count, ok := result.Record().Values[0].(int64); ok {
 706  			eventCount = count
 707  		}
 708  	}
 709  
 710  	if eventCount == 0 {
 711  		n.Logger.Infof("no events to backfill word index for")
 712  		return nil
 713  	}
 714  
 715  	n.Logger.Infof("backfilling word index for %d events...", eventCount)
 716  
 717  	// Process events in batches: fetch content+tags, tokenize in Go, write Word nodes back
 718  	const fetchBatchSize = 500
 719  	totalProcessed := int64(0)
 720  
 721  	for {
 722  		fetchCypher := `
 723  			MATCH (e:Event)
 724  			WHERE NOT (e)-[:HAS_WORD]->(:Word)
 725  			RETURN e.id AS id, e.content AS content, e.tags AS tags
 726  			LIMIT $batchSize
 727  		`
 728  		batchResult, err := n.ExecuteRead(ctx, fetchCypher, map[string]any{
 729  			"batchSize": fetchBatchSize,
 730  		})
 731  		if err != nil {
 732  			return fmt.Errorf("failed to fetch events batch: %w", err)
 733  		}
 734  
 735  		var batchCount int64
 736  		for batchResult.Next(ctx) {
 737  			record := batchResult.Record()
 738  			if record == nil {
 739  				continue
 740  			}
 741  
 742  			idRaw, _ := record.Get("id")
 743  			contentRaw, _ := record.Get("content")
 744  			tagsRaw, _ := record.Get("tags")
 745  
 746  			eventID, _ := idRaw.(string)
 747  			content, _ := contentRaw.(string)
 748  			tagsStr, _ := tagsRaw.(string)
 749  
 750  			// Collect unique word tokens from content and tags
 751  			seen := make(map[string]struct{})
 752  			var words []database.WordToken
 753  
 754  			if len(content) > 0 {
 755  				for _, wt := range database.TokenWords([]byte(content)) {
 756  					hashHex := hex.EncodeToString(wt.Hash)
 757  					if _, ok := seen[hashHex]; !ok {
 758  						seen[hashHex] = struct{}{}
 759  						words = append(words, wt)
 760  					}
 761  				}
 762  			}
 763  
 764  			if tagsStr != "" {
 765  				tags := tag.NewS()
 766  				if err := tags.UnmarshalJSON([]byte(tagsStr)); err == nil && tags != nil {
 767  					for _, t := range *tags {
 768  						for _, field := range t.T {
 769  							if len(field) == 0 {
 770  								continue
 771  							}
 772  							for _, wt := range database.TokenWords(field) {
 773  								hashHex := hex.EncodeToString(wt.Hash)
 774  								if _, ok := seen[hashHex]; !ok {
 775  									seen[hashHex] = struct{}{}
 776  									words = append(words, wt)
 777  								}
 778  							}
 779  						}
 780  					}
 781  				}
 782  			}
 783  
 784  			if len(words) > 0 {
 785  				if err := n.addWordsInBatches(ctx, eventID, words); err != nil {
 786  					n.Logger.Warningf("failed to backfill words for event %s: %v",
 787  						safePrefix(eventID, 16), err)
 788  				}
 789  			}
 790  
 791  			batchCount++
 792  		}
 793  
 794  		if batchCount == 0 {
 795  			break
 796  		}
 797  
 798  		totalProcessed += batchCount
 799  		n.Logger.Infof("backfilled word index: %d/%d events processed", totalProcessed, eventCount)
 800  	}
 801  
 802  	n.Logger.Infof("word index backfill complete: %d events processed", totalProcessed)
 803  	return nil
 804  }
 805