migrations.go raw
1 package neo4j
2
3 import (
4 "context"
5 "encoding/hex"
6 "fmt"
7
8 "next.orly.dev/pkg/nostr/encoders/tag"
9 "next.orly.dev/pkg/database"
10 )
11
// Migration represents a single versioned database migration.
type Migration struct {
	// Version uniquely identifies the migration (e.g. "v1"); it is the key
	// recorded in a Migration marker node once the migration has run.
	Version string
	// Description is a human-readable summary used for logging.
	Description string
	// Migrate performs the migration against the database handle n and
	// returns a non-nil error on failure.
	Migrate func(ctx context.Context, n *N) error
}
18
// migrations is the ordered list of database migrations.
// Migrations are applied in listed order by RunMigrations and tracked via
// Migration marker nodes so each version runs at most once.
var migrations = []Migration{
	{
		Version:     "v1",
		Description: "Merge Author nodes into NostrUser nodes",
		Migrate:     migrateAuthorToNostrUser,
	},
	{
		Version:     "v2",
		Description: "Clean up binary-encoded pubkeys and event IDs to lowercase hex",
		Migrate:     migrateBinaryToHex,
	},
	{
		Version:     "v3",
		Description: "Convert direct REFERENCES/MENTIONS relationships to Tag-based model",
		Migrate:     migrateToTagBasedReferences,
	},
	{
		Version:     "v4",
		Description: "Deduplicate REPORTS relationships by (reporter, reported, report_type)",
		Migrate:     migrateDeduplicateReports,
	},
	{
		Version:     "v5",
		Description: "Add naddr property to addressable Event nodes (kinds 30000-39999)",
		Migrate:     migrateAddNaddr,
	},
	{
		Version:     "v6",
		Description: "Backfill Word nodes and HAS_WORD relationships for NIP-50 search",
		Migrate:     migrateBackfillWords,
	},
}
53
54 // RunMigrations executes all pending migrations
55 func (n *N) RunMigrations() {
56 ctx := context.Background()
57
58 for _, migration := range migrations {
59 // Check if migration has already been applied
60 if n.migrationApplied(ctx, migration.Version) {
61 n.Logger.Infof("migration %s already applied, skipping", migration.Version)
62 continue
63 }
64
65 n.Logger.Infof("applying migration %s: %s", migration.Version, migration.Description)
66
67 if err := migration.Migrate(ctx, n); err != nil {
68 n.Logger.Errorf("migration %s failed: %v", migration.Version, err)
69 // Continue to next migration - don't fail startup
70 continue
71 }
72
73 // Mark migration as complete
74 if err := n.markMigrationComplete(ctx, migration.Version, migration.Description); err != nil {
75 n.Logger.Warningf("failed to mark migration %s as complete: %v", migration.Version, err)
76 }
77
78 n.Logger.Infof("migration %s completed successfully", migration.Version)
79 }
80 }
81
82 // migrationApplied checks if a migration has already been applied
83 func (n *N) migrationApplied(ctx context.Context, version string) bool {
84 cypher := `
85 MATCH (m:Migration {version: $version})
86 RETURN m.version
87 `
88 result, err := n.ExecuteRead(ctx, cypher, map[string]any{"version": version})
89 if err != nil {
90 return false
91 }
92 return result.Next(ctx)
93 }
94
95 // markMigrationComplete marks a migration as applied
96 func (n *N) markMigrationComplete(ctx context.Context, version, description string) error {
97 cypher := `
98 CREATE (m:Migration {
99 version: $version,
100 description: $description,
101 applied_at: timestamp()
102 })
103 `
104 _, err := n.ExecuteWrite(ctx, cypher, map[string]any{
105 "version": version,
106 "description": description,
107 })
108 return err
109 }
110
// migrateAuthorToNostrUser migrates Author nodes to NostrUser nodes.
//
// This consolidates the separate Author (NIP-01) and NostrUser (WoT) labels
// into a unified NostrUser label for the social graph. Steps:
//  1. Count Author nodes and bail out early if there is nothing to do.
//  2. MERGE a NostrUser for every Author pubkey.
//  3. Re-point AUTHORED_BY relationships from Author to NostrUser.
//  4. Re-point MENTIONS relationships from Author to NostrUser.
//  5. Delete Author nodes with no remaining inbound relationships.
//  6. Drop the old Author uniqueness constraint (best effort).
func migrateAuthorToNostrUser(ctx context.Context, n *N) error {
	// Step 1: Check if there are any Author nodes to migrate
	countCypher := `MATCH (a:Author) RETURN count(a) AS count`
	countResult, err := n.ExecuteRead(ctx, countCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to count Author nodes: %w", err)
	}

	var authorCount int64
	if countResult.Next(ctx) {
		record := countResult.Record()
		// Neo4j integers arrive as int64; a failed assertion leaves the
		// count at zero, which safely short-circuits the migration below.
		if count, ok := record.Values[0].(int64); ok {
			authorCount = count
		}
	}

	if authorCount == 0 {
		n.Logger.Infof("no Author nodes to migrate")
		return nil
	}

	n.Logger.Infof("migrating %d Author nodes to NostrUser", authorCount)

	// Step 2: For each Author node, merge into NostrUser with same pubkey
	// This uses MERGE to either match existing NostrUser or create new one
	// Then copies any relationships from Author to NostrUser
	mergeCypher := `
	// Match all Author nodes
	MATCH (a:Author)

	// For each Author, merge into NostrUser (creates if doesn't exist)
	MERGE (u:NostrUser {pubkey: a.pubkey})
	ON CREATE SET u.created_at = timestamp(), u.migrated_from_author = true

	// Return count for logging
	RETURN count(DISTINCT a) AS migrated
	`

	result, err := n.ExecuteWrite(ctx, mergeCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to merge Author nodes to NostrUser: %w", err)
	}

	// Log result (result consumption happens within the session)
	_ = result

	// Step 3: Migrate AUTHORED_BY relationships from Author to NostrUser
	// Events should now point to NostrUser instead of Author.
	// NOTE(review): the inner MATCH (not MERGE) on NostrUser relies on step 2
	// having created a NostrUser for every Author pubkey — confirm steps run
	// in the same process without interleaved writes.
	relationshipCypher := `
	// Find events linked to Author via AUTHORED_BY
	MATCH (e:Event)-[r:AUTHORED_BY]->(a:Author)

	// Get or create the corresponding NostrUser
	MATCH (u:NostrUser {pubkey: a.pubkey})

	// Create new relationship to NostrUser if it doesn't exist
	MERGE (e)-[:AUTHORED_BY]->(u)

	// Delete old relationship to Author
	DELETE r

	RETURN count(r) AS migrated_relationships
	`

	_, err = n.ExecuteWrite(ctx, relationshipCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to migrate AUTHORED_BY relationships: %w", err)
	}

	// Step 4: Migrate MENTIONS relationships from Author to NostrUser
	// (same re-pointing pattern as step 3).
	mentionsCypher := `
	// Find events with MENTIONS to Author
	MATCH (e:Event)-[r:MENTIONS]->(a:Author)

	// Get or create the corresponding NostrUser
	MATCH (u:NostrUser {pubkey: a.pubkey})

	// Create new relationship to NostrUser if it doesn't exist
	MERGE (e)-[:MENTIONS]->(u)

	// Delete old relationship to Author
	DELETE r

	RETURN count(r) AS migrated_mentions
	`

	_, err = n.ExecuteWrite(ctx, mentionsCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to migrate MENTIONS relationships: %w", err)
	}

	// Step 5: Delete orphaned Author nodes (no longer needed)
	deleteCypher := `
	// Find Author nodes with no remaining relationships
	MATCH (a:Author)
	WHERE NOT (a)<-[:AUTHORED_BY]-() AND NOT (a)<-[:MENTIONS]-()
	DETACH DELETE a
	RETURN count(a) AS deleted
	`

	_, err = n.ExecuteWrite(ctx, deleteCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to delete orphaned Author nodes: %w", err)
	}

	// Step 6: Drop the old Author constraint if it exists.
	// Error deliberately ignored: the constraint may never have existed.
	dropConstraintCypher := `DROP CONSTRAINT author_pubkey_unique IF EXISTS`
	_, _ = n.ExecuteWrite(ctx, dropConstraintCypher, nil)

	n.Logger.Infof("completed Author to NostrUser migration")
	return nil
}
227
// migrateBinaryToHex cleans up any binary-encoded pubkeys and event IDs.
//
// The nostr library stores e/p tag values in binary format (33 bytes with
// null terminator), but Neo4j should store them as lowercase hex strings for
// consistent querying. A value is considered valid iff it is exactly 64
// lowercase hex characters. This migration:
//  1. Deletes NostrUser nodes with invalid (non-hex) pubkeys.
//  2. Deletes Event nodes with invalid pubkeys or IDs.
//  3. Deletes Tag nodes (type 'e' or 'p') with invalid values.
//  4. Best-effort deletes ProcessedSocialEvent nodes with invalid pubkeys.
//
// All deletes use DETACH DELETE, so attached relationships go with the nodes.
func migrateBinaryToHex(ctx context.Context, n *N) error {
	// Step 1: Count problematic nodes before cleanup
	n.Logger.Infof("scanning for binary-encoded values in Neo4j...")

	// Check for NostrUser nodes with invalid pubkeys (not 64 char hex)
	// A valid hex pubkey is exactly 64 lowercase hex characters
	countInvalidUsersCypher := `
	MATCH (u:NostrUser)
	WHERE size(u.pubkey) <> 64
	OR NOT u.pubkey =~ '^[0-9a-f]{64}$'
	RETURN count(u) AS count
	`
	result, err := n.ExecuteRead(ctx, countInvalidUsersCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to count invalid NostrUser nodes: %w", err)
	}

	var invalidUserCount int64
	if result.Next(ctx) {
		if count, ok := result.Record().Values[0].(int64); ok {
			invalidUserCount = count
		}
	}
	n.Logger.Infof("found %d NostrUser nodes with invalid pubkeys", invalidUserCount)

	// Check for Event nodes with invalid pubkeys or IDs
	countInvalidEventsCypher := `
	MATCH (e:Event)
	WHERE (size(e.pubkey) <> 64 OR NOT e.pubkey =~ '^[0-9a-f]{64}$')
	OR (size(e.id) <> 64 OR NOT e.id =~ '^[0-9a-f]{64}$')
	RETURN count(e) AS count
	`
	result, err = n.ExecuteRead(ctx, countInvalidEventsCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to count invalid Event nodes: %w", err)
	}

	var invalidEventCount int64
	if result.Next(ctx) {
		if count, ok := result.Record().Values[0].(int64); ok {
			invalidEventCount = count
		}
	}
	n.Logger.Infof("found %d Event nodes with invalid pubkeys or IDs", invalidEventCount)

	// Check for Tag nodes (e/p type) with invalid values.
	// Only e/p tags are checked; other tag types legitimately hold
	// arbitrary-length values.
	countInvalidTagsCypher := `
	MATCH (t:Tag)
	WHERE t.type IN ['e', 'p']
	AND (size(t.value) <> 64 OR NOT t.value =~ '^[0-9a-f]{64}$')
	RETURN count(t) AS count
	`
	result, err = n.ExecuteRead(ctx, countInvalidTagsCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to count invalid Tag nodes: %w", err)
	}

	var invalidTagCount int64
	if result.Next(ctx) {
		if count, ok := result.Record().Values[0].(int64); ok {
			invalidTagCount = count
		}
	}
	n.Logger.Infof("found %d Tag nodes (e/p type) with invalid values", invalidTagCount)

	// If nothing to clean up, we're done
	if invalidUserCount == 0 && invalidEventCount == 0 && invalidTagCount == 0 {
		n.Logger.Infof("no binary-encoded values found, migration complete")
		return nil
	}

	// Step 2: Delete invalid NostrUser nodes and their relationships
	if invalidUserCount > 0 {
		n.Logger.Infof("deleting %d invalid NostrUser nodes...", invalidUserCount)
		deleteInvalidUsersCypher := `
		MATCH (u:NostrUser)
		WHERE size(u.pubkey) <> 64
		OR NOT u.pubkey =~ '^[0-9a-f]{64}$'
		DETACH DELETE u
		`
		_, err = n.ExecuteWrite(ctx, deleteInvalidUsersCypher, nil)
		if err != nil {
			return fmt.Errorf("failed to delete invalid NostrUser nodes: %w", err)
		}
		n.Logger.Infof("deleted %d invalid NostrUser nodes", invalidUserCount)
	}

	// Step 3: Delete invalid Event nodes and their relationships
	if invalidEventCount > 0 {
		n.Logger.Infof("deleting %d invalid Event nodes...", invalidEventCount)
		deleteInvalidEventsCypher := `
		MATCH (e:Event)
		WHERE (size(e.pubkey) <> 64 OR NOT e.pubkey =~ '^[0-9a-f]{64}$')
		OR (size(e.id) <> 64 OR NOT e.id =~ '^[0-9a-f]{64}$')
		DETACH DELETE e
		`
		_, err = n.ExecuteWrite(ctx, deleteInvalidEventsCypher, nil)
		if err != nil {
			return fmt.Errorf("failed to delete invalid Event nodes: %w", err)
		}
		n.Logger.Infof("deleted %d invalid Event nodes", invalidEventCount)
	}

	// Step 4: Delete invalid Tag nodes (e/p type) and their relationships
	if invalidTagCount > 0 {
		n.Logger.Infof("deleting %d invalid Tag nodes...", invalidTagCount)
		deleteInvalidTagsCypher := `
		MATCH (t:Tag)
		WHERE t.type IN ['e', 'p']
		AND (size(t.value) <> 64 OR NOT t.value =~ '^[0-9a-f]{64}$')
		DETACH DELETE t
		`
		_, err = n.ExecuteWrite(ctx, deleteInvalidTagsCypher, nil)
		if err != nil {
			return fmt.Errorf("failed to delete invalid Tag nodes: %w", err)
		}
		n.Logger.Infof("deleted %d invalid Tag nodes", invalidTagCount)
	}

	// Step 5: Clean up any orphaned MENTIONS/REFERENCES relationships
	// These would be relationships pointing to nodes we just deleted.
	// Errors are deliberately ignored - best effort cleanup.
	cleanupOrphanedCypher := `
	// Clean up any ProcessedSocialEvent nodes with invalid pubkeys
	MATCH (p:ProcessedSocialEvent)
	WHERE size(p.pubkey) <> 64
	OR NOT p.pubkey =~ '^[0-9a-f]{64}$'
	DETACH DELETE p
	`
	_, _ = n.ExecuteWrite(ctx, cleanupOrphanedCypher, nil)

	n.Logger.Infof("binary-to-hex migration completed successfully")
	return nil
}
370
// migrateToTagBasedReferences converts direct REFERENCES and MENTIONS
// relationships to the new Tag-based model where:
//   - Event-[:REFERENCES]->Event becomes Event-[:TAGGED_WITH]->Tag-[:REFERENCES]->Event
//   - Event-[:MENTIONS]->NostrUser becomes Event-[:TAGGED_WITH]->Tag-[:REFERENCES]->NostrUser
//
// This enables unified tag querying via #e and #p filters while maintaining
// graph traversal. Both conversions run in batches of 1000 relationships and
// loop until a batch converts nothing, so progress is bounded per transaction.
func migrateToTagBasedReferences(ctx context.Context, n *N) error {
	// Step 1: Count existing direct REFERENCES relationships (Event->Event)
	countRefCypher := `
	MATCH (source:Event)-[r:REFERENCES]->(target:Event)
	RETURN count(r) AS count
	`
	result, err := n.ExecuteRead(ctx, countRefCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to count REFERENCES relationships: %w", err)
	}

	var refCount int64
	if result.Next(ctx) {
		if count, ok := result.Record().Values[0].(int64); ok {
			refCount = count
		}
	}
	n.Logger.Infof("found %d direct Event-[:REFERENCES]->Event relationships to migrate", refCount)

	// Step 2: Count existing direct MENTIONS relationships (Event->NostrUser)
	countMentionsCypher := `
	MATCH (source:Event)-[r:MENTIONS]->(target:NostrUser)
	RETURN count(r) AS count
	`
	result, err = n.ExecuteRead(ctx, countMentionsCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to count MENTIONS relationships: %w", err)
	}

	var mentionsCount int64
	if result.Next(ctx) {
		if count, ok := result.Record().Values[0].(int64); ok {
			mentionsCount = count
		}
	}
	n.Logger.Infof("found %d direct Event-[:MENTIONS]->NostrUser relationships to migrate", mentionsCount)

	// If nothing to migrate, we're done
	if refCount == 0 && mentionsCount == 0 {
		n.Logger.Infof("no direct relationships to migrate, migration complete")
		return nil
	}

	// Step 3: Migrate REFERENCES relationships to Tag-based model
	// Process in batches to avoid memory issues with large datasets
	if refCount > 0 {
		n.Logger.Infof("migrating %d REFERENCES relationships to Tag-based model...", refCount)

		// This query:
		// 1. Finds Event->Event REFERENCES relationships
		// 2. Creates/merges Tag node with type='e' and value=target event ID
		// 3. Creates TAGGED_WITH from source Event to Tag
		// 4. Creates REFERENCES from Tag to target Event
		// 5. Deletes the old direct REFERENCES relationship
		// NOTE(review): TAGGED_WITH uses CREATE, not MERGE — if this batch is
		// ever re-run against already-converted data (e.g. after a crash
		// between CREATE and DELETE), duplicate TAGGED_WITH edges could
		// result; confirm whether that window matters in practice.
		migrateRefCypher := `
		MATCH (source:Event)-[r:REFERENCES]->(target:Event)
		WITH source, r, target LIMIT 1000
		MERGE (t:Tag {type: 'e', value: target.id})
		CREATE (source)-[:TAGGED_WITH]->(t)
		MERGE (t)-[:REFERENCES]->(target)
		DELETE r
		RETURN count(r) AS migrated
		`

		// Run migration in batches until no more relationships exist.
		// Termination: each successful batch deletes the relationships it
		// matched, so the remaining set strictly shrinks.
		totalMigrated := int64(0)
		for {
			result, err := n.ExecuteWrite(ctx, migrateRefCypher, nil)
			if err != nil {
				return fmt.Errorf("failed to migrate REFERENCES batch: %w", err)
			}

			var batchMigrated int64
			if result.Next(ctx) {
				if count, ok := result.Record().Values[0].(int64); ok {
					batchMigrated = count
				}
			}

			if batchMigrated == 0 {
				break
			}
			totalMigrated += batchMigrated
			n.Logger.Infof("migrated %d REFERENCES relationships (total: %d)", batchMigrated, totalMigrated)
		}

		n.Logger.Infof("completed migrating %d REFERENCES relationships", totalMigrated)
	}

	// Step 4: Migrate MENTIONS relationships to Tag-based model
	// (same batching pattern as step 3, with type='p' keyed on pubkey).
	if mentionsCount > 0 {
		n.Logger.Infof("migrating %d MENTIONS relationships to Tag-based model...", mentionsCount)

		// This query:
		// 1. Finds Event->NostrUser MENTIONS relationships
		// 2. Creates/merges Tag node with type='p' and value=target pubkey
		// 3. Creates TAGGED_WITH from source Event to Tag
		// 4. Creates REFERENCES from Tag to target NostrUser
		// 5. Deletes the old direct MENTIONS relationship
		migrateMentionsCypher := `
		MATCH (source:Event)-[r:MENTIONS]->(target:NostrUser)
		WITH source, r, target LIMIT 1000
		MERGE (t:Tag {type: 'p', value: target.pubkey})
		CREATE (source)-[:TAGGED_WITH]->(t)
		MERGE (t)-[:REFERENCES]->(target)
		DELETE r
		RETURN count(r) AS migrated
		`

		// Run migration in batches until no more relationships exist
		totalMigrated := int64(0)
		for {
			result, err := n.ExecuteWrite(ctx, migrateMentionsCypher, nil)
			if err != nil {
				return fmt.Errorf("failed to migrate MENTIONS batch: %w", err)
			}

			var batchMigrated int64
			if result.Next(ctx) {
				if count, ok := result.Record().Values[0].(int64); ok {
					batchMigrated = count
				}
			}

			if batchMigrated == 0 {
				break
			}
			totalMigrated += batchMigrated
			n.Logger.Infof("migrated %d MENTIONS relationships (total: %d)", batchMigrated, totalMigrated)
		}

		n.Logger.Infof("completed migrating %d MENTIONS relationships", totalMigrated)
	}

	n.Logger.Infof("Tag-based references migration completed successfully")
	return nil
}
514
515 // migrateDeduplicateReports removes duplicate REPORTS relationships
516 // Prior to this migration, processReport() used CREATE which allowed multiple
517 // REPORTS relationships with the same report_type between the same two users.
518 // This migration keeps only the most recent report (by created_at) for each
519 // (reporter, reported, report_type) combination.
520 func migrateDeduplicateReports(ctx context.Context, n *N) error {
521 // Step 1: Count duplicate REPORTS relationships
522 // Duplicates are defined as multiple REPORTS with the same (reporter, reported, report_type)
523 countDuplicatesCypher := `
524 MATCH (reporter:NostrUser)-[r:REPORTS]->(reported:NostrUser)
525 WITH reporter, reported, r.report_type AS type, collect(r) AS rels
526 WHERE size(rels) > 1
527 RETURN sum(size(rels) - 1) AS duplicate_count
528 `
529 result, err := n.ExecuteRead(ctx, countDuplicatesCypher, nil)
530 if err != nil {
531 return fmt.Errorf("failed to count duplicate REPORTS: %w", err)
532 }
533
534 var duplicateCount int64
535 if result.Next(ctx) {
536 if count, ok := result.Record().Values[0].(int64); ok {
537 duplicateCount = count
538 }
539 }
540
541 if duplicateCount == 0 {
542 n.Logger.Infof("no duplicate REPORTS relationships found, migration complete")
543 return nil
544 }
545
546 n.Logger.Infof("found %d duplicate REPORTS relationships to remove", duplicateCount)
547
548 // Step 2: Delete duplicate REPORTS, keeping the one with the highest created_at
549 // This query:
550 // 1. Groups REPORTS by (reporter, reported, report_type)
551 // 2. Finds the maximum created_at for each group
552 // 3. Deletes all relationships in the group except the newest one
553 deleteDuplicatesCypher := `
554 MATCH (reporter:NostrUser)-[r:REPORTS]->(reported:NostrUser)
555 WITH reporter, reported, r.report_type AS type,
556 collect(r) AS rels, max(r.created_at) AS maxCreatedAt
557 WHERE size(rels) > 1
558 UNWIND rels AS rel
559 WITH rel, maxCreatedAt
560 WHERE rel.created_at < maxCreatedAt
561 DELETE rel
562 RETURN count(*) AS deleted
563 `
564
565 writeResult, err := n.ExecuteWrite(ctx, deleteDuplicatesCypher, nil)
566 if err != nil {
567 return fmt.Errorf("failed to delete duplicate REPORTS: %w", err)
568 }
569
570 var deletedCount int64
571 if writeResult.Next(ctx) {
572 if count, ok := writeResult.Record().Values[0].(int64); ok {
573 deletedCount = count
574 }
575 }
576
577 n.Logger.Infof("deleted %d duplicate REPORTS relationships", deletedCount)
578
579 // Step 3: Mark superseded ProcessedSocialEvent nodes for deleted reports
580 // Find ProcessedSocialEvent nodes (kind 1984) whose event IDs are no longer
581 // referenced by any REPORTS relationship's created_by_event
582 markSupersededCypher := `
583 MATCH (evt:ProcessedSocialEvent {event_kind: 1984})
584 WHERE evt.superseded_by IS NULL
585 AND NOT EXISTS {
586 MATCH ()-[r:REPORTS]->()
587 WHERE r.created_by_event = evt.event_id
588 }
589 SET evt.superseded_by = 'migration_v4_dedupe'
590 RETURN count(evt) AS superseded
591 `
592
593 markResult, err := n.ExecuteWrite(ctx, markSupersededCypher, nil)
594 if err != nil {
595 // Non-fatal - just log warning
596 n.Logger.Warningf("failed to mark superseded ProcessedSocialEvent nodes: %v", err)
597 } else {
598 var supersededCount int64
599 if markResult.Next(ctx) {
600 if count, ok := markResult.Record().Values[0].(int64); ok {
601 supersededCount = count
602 }
603 }
604 if supersededCount > 0 {
605 n.Logger.Infof("marked %d ProcessedSocialEvent nodes as superseded", supersededCount)
606 }
607 }
608
609 n.Logger.Infof("REPORTS deduplication migration completed successfully")
610 return nil
611 }
612
// migrateAddNaddr adds the naddr property to existing addressable Event nodes
// (kinds 30000-39999).
//
// The naddr stored here is the colon-delimited coordinate string
// "pubkey:kind:dtag" (not the NIP-19 bech32 naddr encoding). Events with no
// d tag get an empty dtag component. This enables direct lookups and
// uniqueness constraints for parameterized replaceable events. Updates run
// in batches of 1000 until no un-annotated addressable events remain.
func migrateAddNaddr(ctx context.Context, n *N) error {
	// Step 1: Count addressable events without naddr
	countCypher := `
	MATCH (e:Event)
	WHERE e.kind >= 30000 AND e.kind < 40000
	AND e.naddr IS NULL
	RETURN count(e) AS count
	`
	result, err := n.ExecuteRead(ctx, countCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to count addressable events: %w", err)
	}

	var eventCount int64
	if result.Next(ctx) {
		if count, ok := result.Record().Values[0].(int64); ok {
			eventCount = count
		}
	}

	if eventCount == 0 {
		n.Logger.Infof("no addressable events without naddr found, migration complete")
		return nil
	}

	n.Logger.Infof("found %d addressable events to update with naddr", eventCount)

	// Step 2: Update events in batches
	// For each event, compute naddr from pubkey + kind + d-tag
	// The d-tag value is obtained from the Tag node via TAGGED_WITH relationship
	updateCypher := `
	MATCH (e:Event)
	WHERE e.kind >= 30000 AND e.kind < 40000
	AND e.naddr IS NULL
	WITH e LIMIT 1000

	// Get d-tag value via TAGGED_WITH relationship
	OPTIONAL MATCH (e)-[:TAGGED_WITH]->(t:Tag {type: 'd'})
	WITH e, COALESCE(t.value, '') AS dValue

	// Build naddr: pubkey:kind:dValue
	SET e.naddr = e.pubkey + ':' + toString(e.kind) + ':' + dValue

	RETURN count(e) AS updated
	`

	// Run migration in batches until no more events to update.
	// Termination: every batch SETs naddr on the events it matched, so those
	// events no longer satisfy the "naddr IS NULL" filter.
	totalUpdated := int64(0)
	for {
		writeResult, err := n.ExecuteWrite(ctx, updateCypher, nil)
		if err != nil {
			return fmt.Errorf("failed to update addressable events batch: %w", err)
		}

		var batchUpdated int64
		if writeResult.Next(ctx) {
			if count, ok := writeResult.Record().Values[0].(int64); ok {
				batchUpdated = count
			}
		}

		if batchUpdated == 0 {
			break
		}
		totalUpdated += batchUpdated
		n.Logger.Infof("updated %d addressable events with naddr (total: %d)", batchUpdated, totalUpdated)
	}

	n.Logger.Infof("naddr migration completed: updated %d addressable events", totalUpdated)
	return nil
}
687
688 // migrateBackfillWords creates Word nodes and HAS_WORD relationships for all
689 // existing Event nodes that don't have them yet. This enables NIP-50 word search
690 // for events saved before word indexing was added to SaveEvent.
691 func migrateBackfillWords(ctx context.Context, n *N) error {
692 // Count events without word index
693 countCypher := `
694 MATCH (e:Event)
695 WHERE NOT (e)-[:HAS_WORD]->(:Word)
696 RETURN count(e) AS count
697 `
698 result, err := n.ExecuteRead(ctx, countCypher, nil)
699 if err != nil {
700 return fmt.Errorf("failed to count events: %w", err)
701 }
702
703 var eventCount int64
704 if result.Next(ctx) {
705 if count, ok := result.Record().Values[0].(int64); ok {
706 eventCount = count
707 }
708 }
709
710 if eventCount == 0 {
711 n.Logger.Infof("no events to backfill word index for")
712 return nil
713 }
714
715 n.Logger.Infof("backfilling word index for %d events...", eventCount)
716
717 // Process events in batches: fetch content+tags, tokenize in Go, write Word nodes back
718 const fetchBatchSize = 500
719 totalProcessed := int64(0)
720
721 for {
722 fetchCypher := `
723 MATCH (e:Event)
724 WHERE NOT (e)-[:HAS_WORD]->(:Word)
725 RETURN e.id AS id, e.content AS content, e.tags AS tags
726 LIMIT $batchSize
727 `
728 batchResult, err := n.ExecuteRead(ctx, fetchCypher, map[string]any{
729 "batchSize": fetchBatchSize,
730 })
731 if err != nil {
732 return fmt.Errorf("failed to fetch events batch: %w", err)
733 }
734
735 var batchCount int64
736 for batchResult.Next(ctx) {
737 record := batchResult.Record()
738 if record == nil {
739 continue
740 }
741
742 idRaw, _ := record.Get("id")
743 contentRaw, _ := record.Get("content")
744 tagsRaw, _ := record.Get("tags")
745
746 eventID, _ := idRaw.(string)
747 content, _ := contentRaw.(string)
748 tagsStr, _ := tagsRaw.(string)
749
750 // Collect unique word tokens from content and tags
751 seen := make(map[string]struct{})
752 var words []database.WordToken
753
754 if len(content) > 0 {
755 for _, wt := range database.TokenWords([]byte(content)) {
756 hashHex := hex.EncodeToString(wt.Hash)
757 if _, ok := seen[hashHex]; !ok {
758 seen[hashHex] = struct{}{}
759 words = append(words, wt)
760 }
761 }
762 }
763
764 if tagsStr != "" {
765 tags := tag.NewS()
766 if err := tags.UnmarshalJSON([]byte(tagsStr)); err == nil && tags != nil {
767 for _, t := range *tags {
768 for _, field := range t.T {
769 if len(field) == 0 {
770 continue
771 }
772 for _, wt := range database.TokenWords(field) {
773 hashHex := hex.EncodeToString(wt.Hash)
774 if _, ok := seen[hashHex]; !ok {
775 seen[hashHex] = struct{}{}
776 words = append(words, wt)
777 }
778 }
779 }
780 }
781 }
782 }
783
784 if len(words) > 0 {
785 if err := n.addWordsInBatches(ctx, eventID, words); err != nil {
786 n.Logger.Warningf("failed to backfill words for event %s: %v",
787 safePrefix(eventID, 16), err)
788 }
789 }
790
791 batchCount++
792 }
793
794 if batchCount == 0 {
795 break
796 }
797
798 totalProcessed += batchCount
799 n.Logger.Infof("backfilled word index: %d/%d events processed", totalProcessed, eventCount)
800 }
801
802 n.Logger.Infof("word index backfill complete: %d events processed", totalProcessed)
803 return nil
804 }
805