save-event.go raw
1 package neo4j
2
3 import (
4 "context"
5 "encoding/hex"
6 "fmt"
7 "strconv"
8
9 "next.orly.dev/pkg/nostr/encoders/event"
10 "next.orly.dev/pkg/nostr/encoders/filter"
11 nostrHex "next.orly.dev/pkg/nostr/encoders/hex"
12 "next.orly.dev/pkg/database"
13 "next.orly.dev/pkg/database/indexes/types"
14 )
15
// parseInt64 converts a base-10 string into an int64. Any error from
// strconv.ParseInt (syntax or range) is returned unchanged, together with
// the value ParseInt reports for that error.
func parseInt64(s string) (int64, error) {
	v, err := strconv.ParseInt(s, 10, 64)
	return v, err
}
20
// Upper bounds on how many items the UNWIND-based writers send to Neo4j in a
// single write. Keeping each query small prevents Neo4j stack overflow errors
// on events that carry thousands of tags or words.
const (
	// wordBatchSize is the maximum number of words per addWordsInBatches write.
	wordBatchSize = 500
	// tagBatchSize is the maximum number of tags per add*TagsInBatches write.
	tagBatchSize = 500
)
27
28 // SaveEvent stores a Nostr event in the Neo4j database.
29 // It creates event nodes and relationships for authors, tags, and references.
30 // This method leverages Neo4j's graph capabilities to model Nostr's social graph naturally.
31 //
32 // For social graph events (kinds 0, 3, 1984, 10000), it additionally processes them
33 // to maintain NostrUser nodes and FOLLOWS/MUTES/REPORTS relationships with event traceability.
34 //
35 // To prevent Neo4j stack overflow errors with events containing thousands of tags,
36 // tags are processed in batches using UNWIND instead of generating inline Cypher.
37 func (n *N) SaveEvent(c context.Context, ev *event.E) (exists bool, err error) {
38 eventID := nostrHex.Enc(ev.ID[:])
39
40 // Check if event already exists
41 checkCypher := "MATCH (e:Event {id: $id}) RETURN e.id AS id"
42 checkParams := map[string]any{"id": eventID}
43
44 result, err := n.ExecuteRead(c, checkCypher, checkParams)
45 if err != nil {
46 return false, fmt.Errorf("failed to check event existence: %w", err)
47 }
48
49 // Check if we got a result
50 ctx := context.Background()
51 if result.Next(ctx) {
52 // Event exists - check if it's a social event that needs reprocessing
53 // (in case relationships changed)
54 if ev.Kind == 0 || ev.Kind == 3 || ev.Kind == 1984 || ev.Kind == 10000 {
55 processor := NewSocialEventProcessor(n)
56 if err := processor.ProcessSocialEvent(c, ev); err != nil {
57 n.Logger.Warningf("failed to reprocess social event %s: %v", safePrefix(eventID, 16), err)
58 // Don't fail the whole save, social processing is supplementary
59 }
60 }
61 return true, nil // Event already exists
62 }
63
64 // For parameterized replaceable events (kinds 30000-39999), delete older versions
65 // before saving the new one. This ensures Neo4j only stores the latest version.
66 if ev.Kind >= 30000 && ev.Kind < 40000 {
67 if err := n.deleteOlderParameterizedReplaceable(c, ev); err != nil {
68 n.Logger.Warningf("failed to delete older replaceable events: %v", err)
69 // Continue with save - older events will be filtered at query time
70 }
71 }
72
73 // Get next serial number
74 serial, err := n.getNextSerial()
75 if err != nil {
76 return false, fmt.Errorf("failed to get serial number: %w", err)
77 }
78
79 // Step 1: Create base event with author (small, fixed-size query)
80 cypher, params := n.buildBaseEventCypher(ev, serial)
81 if _, err = n.ExecuteWrite(c, cypher, params); err != nil {
82 return false, fmt.Errorf("failed to save event: %w", err)
83 }
84
85 // Step 2: Process tags in batches to avoid stack overflow
86 if ev.Tags != nil {
87 if err := n.addTagsInBatches(c, eventID, ev); err != nil {
88 // Log but don't fail - base event is saved, tags are supplementary for queries
89 n.Logger.Errorf("failed to add tags for event %s: %v", safePrefix(eventID, 16), err)
90 }
91 }
92
93 // Step 3: Process word tokens for NIP-50 full-text search
94 if err := n.addWordsForEvent(c, eventID, ev); err != nil {
95 n.Logger.Errorf("failed to add word tokens for event %s: %v", safePrefix(eventID, 16), err)
96 }
97
98 // Process social graph events (kinds 0, 3, 1984, 10000)
99 // This creates NostrUser nodes and social relationships (FOLLOWS, MUTES, REPORTS)
100 // with event traceability for diff-based updates
101 if ev.Kind == 0 || ev.Kind == 3 || ev.Kind == 1984 || ev.Kind == 10000 {
102 processor := NewSocialEventProcessor(n)
103 if err := processor.ProcessSocialEvent(c, ev); err != nil {
104 // Log error but don't fail the whole save
105 // NIP-01 queries will still work even if social processing fails
106 n.Logger.Errorf("failed to process social event kind %d, event %s: %v",
107 ev.Kind, safePrefix(eventID, 16), err)
108 // Consider: should we fail here or continue?
109 // For now, continue - social graph is supplementary to base relay
110 }
111 }
112
113 return false, nil
114 }
115
// safePrefix returns at most the first n bytes of s. Strings no longer than
// n are returned whole, so short IDs can be logged without an index panic.
func safePrefix(s string, n int) string {
	if len(s) > n {
		s = s[:n]
	}
	return s
}
123
124 // buildNaddr creates the naddr coordinate string for addressable events.
125 // Format: pubkey:kind:dtag (colon-delimited)
126 // Returns empty string for non-addressable events.
127 // This is used for the naddr_unique constraint in Neo4j.
128 func buildNaddr(ev *event.E) string {
129 // Only for addressable events (kinds 30000-39999)
130 if ev.Kind < 30000 || ev.Kind >= 40000 {
131 return ""
132 }
133
134 pubkey := nostrHex.Enc(ev.Pubkey[:])
135 kind := strconv.FormatInt(int64(ev.Kind), 10)
136
137 // Get d-tag value (empty string if not present)
138 dValue := ""
139 if ev.Tags != nil {
140 if dTag := ev.Tags.GetFirst([]byte{'d'}); dTag != nil && len(dTag.T) >= 2 {
141 dValue = string(dTag.T[1])
142 }
143 }
144
145 return pubkey + ":" + kind + ":" + dValue
146 }
147
148 // buildBaseEventCypher constructs a Cypher query to create just the base event node and author.
149 // Tags are added separately in batches to prevent stack overflow with large tag sets.
150 // This creates:
151 // - Event node with all properties
152 // - NostrUser node and AUTHORED_BY relationship (unified author + WoT node)
153 func (n *N) buildBaseEventCypher(ev *event.E, serial uint64) (string, map[string]any) {
154 params := make(map[string]any)
155
156 // Event properties
157 eventID := nostrHex.Enc(ev.ID[:])
158 authorPubkey := nostrHex.Enc(ev.Pubkey[:])
159
160 params["eventId"] = eventID
161 params["serial"] = serial
162 params["kind"] = int64(ev.Kind)
163 params["createdAt"] = ev.CreatedAt
164 params["content"] = string(ev.Content)
165 params["sig"] = nostrHex.Enc(ev.Sig[:])
166 params["pubkey"] = authorPubkey
167
168 // Check for expiration tag (NIP-40)
169 var expirationTs int64 = 0
170 if ev.Tags != nil {
171 if expTag := ev.Tags.GetFirst([]byte("expiration")); expTag != nil && len(expTag.T) >= 2 {
172 if ts, err := parseInt64(string(expTag.T[1])); err == nil {
173 expirationTs = ts
174 }
175 }
176 }
177 params["expiration"] = expirationTs
178
179 // Compute naddr for addressable events (kinds 30000-39999)
180 // Format: pubkey:kind:dtag - NULL for non-addressable events
181 // NULL allows multiple non-addressable events while maintaining uniqueness for naddr values
182 naddr := buildNaddr(ev)
183 if naddr != "" {
184 params["naddr"] = naddr
185 } else {
186 params["naddr"] = nil
187 }
188
189 // Serialize tags as JSON string for storage
190 // Handle nil tags gracefully - nil means empty tags "[]"
191 var tagsJSON []byte
192 if ev.Tags != nil {
193 tagsJSON, _ = ev.Tags.MarshalJSON()
194 } else {
195 tagsJSON = []byte("[]")
196 }
197 params["tags"] = string(tagsJSON)
198
199 // Build Cypher query - just event + author, no tags (tags added in batches)
200 // Use MERGE to ensure idempotency for NostrUser nodes
201 // NostrUser serves both NIP-01 author tracking and WoT social graph
202 cypher := `
203 // Create or match NostrUser node (unified author + social graph)
204 MERGE (a:NostrUser {pubkey: $pubkey})
205 ON CREATE SET a.created_at = timestamp(), a.first_seen_event = $eventId
206
207 // Create event node with expiration for NIP-40 support and naddr for NIP-33 addressable events
208 CREATE (e:Event {
209 id: $eventId,
210 serial: $serial,
211 kind: $kind,
212 created_at: $createdAt,
213 content: $content,
214 sig: $sig,
215 pubkey: $pubkey,
216 tags: $tags,
217 expiration: $expiration,
218 naddr: $naddr
219 })
220
221 // Link event to author
222 CREATE (e)-[:AUTHORED_BY]->(a)
223
224 RETURN e.id AS id`
225
226 return cypher, params
227 }
228
// tagTypeValue is one (tag name, first value) pair taken from an event tag.
// It is the element type used when writing generic tags to Neo4j in batches.
type tagTypeValue struct {
	Type, Value string
}
234
235 // addTagsInBatches processes event tags in batches using UNWIND to prevent Neo4j stack overflow.
236 // This handles e-tags (event references), p-tags (pubkey mentions), and other tags separately.
237 func (n *N) addTagsInBatches(c context.Context, eventID string, ev *event.E) error {
238 if ev.Tags == nil {
239 return nil
240 }
241
242 // Collect tags by type
243 var eTags, pTags []string
244 var otherTags []tagTypeValue
245
246 for _, tagItem := range *ev.Tags {
247 if len(tagItem.T) < 2 {
248 continue
249 }
250
251 tagType := string(tagItem.T[0])
252
253 switch tagType {
254 case "e": // Event reference
255 tagValue := ExtractETagValue(tagItem)
256 if tagValue != "" {
257 eTags = append(eTags, tagValue)
258 }
259 case "p": // Pubkey mention
260 tagValue := ExtractPTagValue(tagItem)
261 if tagValue != "" {
262 pTags = append(pTags, tagValue)
263 }
264 default: // Other tags
265 tagValue := string(tagItem.T[1])
266 otherTags = append(otherTags, tagTypeValue{Type: tagType, Value: tagValue})
267 }
268 }
269
270 // Add p-tags in batches (creates MENTIONS relationships)
271 if len(pTags) > 0 {
272 if err := n.addPTagsInBatches(c, eventID, pTags); err != nil {
273 return fmt.Errorf("failed to add p-tags: %w", err)
274 }
275 }
276
277 // Add e-tags in batches (creates REFERENCES relationships)
278 if len(eTags) > 0 {
279 if err := n.addETagsInBatches(c, eventID, eTags); err != nil {
280 return fmt.Errorf("failed to add e-tags: %w", err)
281 }
282 }
283
284 // Add other tags in batches (creates TAGGED_WITH relationships)
285 if len(otherTags) > 0 {
286 if err := n.addOtherTagsInBatches(c, eventID, otherTags); err != nil {
287 return fmt.Errorf("failed to add other tags: %w", err)
288 }
289 }
290
291 return nil
292 }
293
294 // addPTagsInBatches adds p-tag (pubkey mention) relationships using UNWIND for efficiency.
295 // Creates Tag nodes with type='p' and REFERENCES relationships to NostrUser nodes.
296 // This enables unified tag querying via #p filters while maintaining the social graph.
297 func (n *N) addPTagsInBatches(c context.Context, eventID string, pTags []string) error {
298 // Process in batches to avoid memory issues
299 for i := 0; i < len(pTags); i += tagBatchSize {
300 end := i + tagBatchSize
301 if end > len(pTags) {
302 end = len(pTags)
303 }
304 batch := pTags[i:end]
305
306 // Use UNWIND to process multiple p-tags in a single query
307 // Creates Tag nodes as intermediaries, enabling unified #p filter queries
308 // Tag-[:REFERENCES]->NostrUser allows graph traversal from tag to user
309 cypher := `
310 MATCH (e:Event {id: $eventId})
311 UNWIND $pubkeys AS pubkey
312 MERGE (t:Tag {type: 'p', value: pubkey})
313 CREATE (e)-[:TAGGED_WITH]->(t)
314 WITH t, pubkey
315 MERGE (u:NostrUser {pubkey: pubkey})
316 ON CREATE SET u.created_at = timestamp()
317 MERGE (t)-[:REFERENCES]->(u)`
318
319 params := map[string]any{
320 "eventId": eventID,
321 "pubkeys": batch,
322 }
323
324 if _, err := n.ExecuteWrite(c, cypher, params); err != nil {
325 return fmt.Errorf("batch %d-%d: %w", i, end, err)
326 }
327 }
328
329 return nil
330 }
331
332 // addETagsInBatches adds e-tag (event reference) relationships using UNWIND for efficiency.
333 // Creates Tag nodes with type='e' and REFERENCES relationships to Event nodes (if they exist).
334 // This enables unified tag querying via #e filters while maintaining event graph structure.
335 func (n *N) addETagsInBatches(c context.Context, eventID string, eTags []string) error {
336 // Process in batches to avoid memory issues
337 for i := 0; i < len(eTags); i += tagBatchSize {
338 end := i + tagBatchSize
339 if end > len(eTags) {
340 end = len(eTags)
341 }
342 batch := eTags[i:end]
343
344 // Use UNWIND to process multiple e-tags in a single query
345 // Creates Tag nodes as intermediaries, enabling unified #e filter queries
346 // Tag-[:REFERENCES]->Event allows graph traversal from tag to referenced event
347 // OPTIONAL MATCH ensures we only create REFERENCES if referenced event exists
348 cypher := `
349 MATCH (e:Event {id: $eventId})
350 UNWIND $eventIds AS refId
351 MERGE (t:Tag {type: 'e', value: refId})
352 CREATE (e)-[:TAGGED_WITH]->(t)
353 WITH t, refId
354 OPTIONAL MATCH (ref:Event {id: refId})
355 WHERE ref IS NOT NULL
356 MERGE (t)-[:REFERENCES]->(ref)`
357
358 params := map[string]any{
359 "eventId": eventID,
360 "eventIds": batch,
361 }
362
363 if _, err := n.ExecuteWrite(c, cypher, params); err != nil {
364 return fmt.Errorf("batch %d-%d: %w", i, end, err)
365 }
366 }
367
368 return nil
369 }
370
371 // addOtherTagsInBatches adds generic tag relationships using UNWIND for efficiency.
372 // Creates Tag nodes with type and value, and TAGGED_WITH relationships.
373 func (n *N) addOtherTagsInBatches(c context.Context, eventID string, tags []tagTypeValue) error {
374 // Process in batches to avoid memory issues
375 for i := 0; i < len(tags); i += tagBatchSize {
376 end := i + tagBatchSize
377 if end > len(tags) {
378 end = len(tags)
379 }
380 batch := tags[i:end]
381
382 // Convert to map slice for Neo4j parameter passing
383 tagMaps := make([]map[string]string, len(batch))
384 for j, t := range batch {
385 tagMaps[j] = map[string]string{"type": t.Type, "value": t.Value}
386 }
387
388 // Use UNWIND to process multiple tags in a single query
389 cypher := `
390 MATCH (e:Event {id: $eventId})
391 UNWIND $tags AS tag
392 MERGE (t:Tag {type: tag.type, value: tag.value})
393 CREATE (e)-[:TAGGED_WITH]->(t)`
394
395 params := map[string]any{
396 "eventId": eventID,
397 "tags": tagMaps,
398 }
399
400 if _, err := n.ExecuteWrite(c, cypher, params); err != nil {
401 return fmt.Errorf("batch %d-%d: %w", i, end, err)
402 }
403 }
404
405 return nil
406 }
407
408 // addWordsForEvent tokenizes event content and all tag field values,
409 // then creates Word nodes and HAS_WORD relationships in batches.
410 func (n *N) addWordsForEvent(c context.Context, eventID string, ev *event.E) error {
411 seen := make(map[string]struct{})
412 var words []database.WordToken
413
414 // Tokenize content
415 if len(ev.Content) > 0 {
416 for _, wt := range database.TokenWords(ev.Content) {
417 hashHex := hex.EncodeToString(wt.Hash)
418 if _, ok := seen[hashHex]; !ok {
419 seen[hashHex] = struct{}{}
420 words = append(words, wt)
421 }
422 }
423 }
424
425 // Tokenize all tag field values
426 if ev.Tags != nil {
427 for _, t := range *ev.Tags {
428 for _, field := range t.T {
429 if len(field) == 0 {
430 continue
431 }
432 for _, wt := range database.TokenWords(field) {
433 hashHex := hex.EncodeToString(wt.Hash)
434 if _, ok := seen[hashHex]; !ok {
435 seen[hashHex] = struct{}{}
436 words = append(words, wt)
437 }
438 }
439 }
440 }
441 }
442
443 if len(words) == 0 {
444 return nil
445 }
446
447 return n.addWordsInBatches(c, eventID, words)
448 }
449
450 // addWordsInBatches creates Word nodes and HAS_WORD relationships using UNWIND.
451 // Word nodes are keyed by their hex-encoded 8-byte SHA-256 hash and store
452 // the normalized lowercase word text as a readable label.
453 func (n *N) addWordsInBatches(c context.Context, eventID string, words []database.WordToken) error {
454 for i := 0; i < len(words); i += wordBatchSize {
455 end := i + wordBatchSize
456 if end > len(words) {
457 end = len(words)
458 }
459 batch := words[i:end]
460
461 wordMaps := make([]map[string]string, len(batch))
462 for j, wt := range batch {
463 wordMaps[j] = map[string]string{
464 "hash": hex.EncodeToString(wt.Hash),
465 "text": wt.Word,
466 }
467 }
468
469 cypher := `
470 MATCH (e:Event {id: $eventId})
471 UNWIND $words AS word
472 MERGE (w:Word {hash: word.hash})
473 ON CREATE SET w.text = word.text
474 MERGE (e)-[:HAS_WORD]->(w)`
475
476 params := map[string]any{
477 "eventId": eventID,
478 "words": wordMaps,
479 }
480
481 if _, err := n.ExecuteWrite(c, cypher, params); err != nil {
482 return fmt.Errorf("word batch %d-%d: %w", i, end, err)
483 }
484 }
485
486 return nil
487 }
488
489 // GetSerialsFromFilter returns event serials matching a filter
490 func (n *N) GetSerialsFromFilter(f *filter.F) (serials types.Uint40s, err error) {
491 // Use QueryForSerials with background context
492 return n.QueryForSerials(context.Background(), f)
493 }
494
495 // WouldReplaceEvent checks if an event would replace existing events
496 // This handles replaceable events (kinds 0, 3, and 10000-19999)
497 // and parameterized replaceable events (kinds 30000-39999)
498 func (n *N) WouldReplaceEvent(ev *event.E) (bool, types.Uint40s, error) {
499 // Check for replaceable events (kinds 0, 3, and 10000-19999)
500 isReplaceable := ev.Kind == 0 || ev.Kind == 3 || (ev.Kind >= 10000 && ev.Kind < 20000)
501
502 // Check for parameterized replaceable events (kinds 30000-39999)
503 isParameterizedReplaceable := ev.Kind >= 30000 && ev.Kind < 40000
504
505 if !isReplaceable && !isParameterizedReplaceable {
506 return false, nil, nil
507 }
508
509 authorPubkey := nostrHex.Enc(ev.Pubkey[:])
510 ctx := context.Background()
511
512 var cypher string
513 params := map[string]any{
514 "pubkey": authorPubkey,
515 "kind": int64(ev.Kind),
516 "createdAt": ev.CreatedAt,
517 }
518
519 if isParameterizedReplaceable {
520 // For parameterized replaceable events, we need to match on d-tag as well
521 dTag := ev.Tags.GetFirst([]byte{'d'})
522 if dTag == nil {
523 return false, nil, nil
524 }
525
526 dValue := ""
527 if len(dTag.T) >= 2 {
528 dValue = string(dTag.T[1])
529 }
530
531 params["dValue"] = dValue
532
533 // Query for existing parameterized replaceable events with same kind, pubkey, and d-tag
534 cypher = `
535 MATCH (e:Event {kind: $kind, pubkey: $pubkey})-[:TAGGED_WITH]->(t:Tag {type: 'd', value: $dValue})
536 WHERE e.created_at < $createdAt
537 RETURN e.serial AS serial, e.created_at AS created_at
538 ORDER BY e.created_at DESC`
539
540 } else {
541 // Query for existing replaceable events with same kind and pubkey
542 cypher = `
543 MATCH (e:Event {kind: $kind, pubkey: $pubkey})
544 WHERE e.created_at < $createdAt
545 RETURN e.serial AS serial, e.created_at AS created_at
546 ORDER BY e.created_at DESC`
547 }
548
549 result, err := n.ExecuteRead(ctx, cypher, params)
550 if err != nil {
551 return false, nil, fmt.Errorf("failed to query replaceable events: %w", err)
552 }
553
554 // Parse results
555 var serials types.Uint40s
556 wouldReplace := false
557
558 for result.Next(ctx) {
559 record := result.Record()
560 if record == nil {
561 continue
562 }
563
564 serialRaw, found := record.Get("serial")
565 if !found {
566 continue
567 }
568
569 serialVal, ok := serialRaw.(int64)
570 if !ok {
571 continue
572 }
573
574 wouldReplace = true
575 serial := types.Uint40{}
576 serial.Set(uint64(serialVal))
577 serials = append(serials, &serial)
578 }
579
580 return wouldReplace, serials, nil
581 }
582
583 // deleteOlderParameterizedReplaceable deletes older versions of parameterized replaceable events
584 // (kinds 30000-39999) that have the same pubkey, kind, and d-tag value.
585 // This is called before saving a new event to ensure only the latest version is stored.
586 func (n *N) deleteOlderParameterizedReplaceable(c context.Context, ev *event.E) error {
587 authorPubkey := nostrHex.Enc(ev.Pubkey[:])
588
589 // Get the d-tag value
590 dTag := ev.Tags.GetFirst([]byte{'d'})
591 dValue := ""
592 if dTag != nil && len(dTag.T) >= 2 {
593 dValue = string(dTag.T[1])
594 }
595
596 // Delete older events with same pubkey, kind, and d-tag
597 // Only delete if the existing event is older than the new one
598 cypher := `
599 MATCH (e:Event {kind: $kind, pubkey: $pubkey})-[:TAGGED_WITH]->(t:Tag {type: 'd', value: $dValue})
600 WHERE e.created_at < $createdAt
601 DETACH DELETE e`
602
603 params := map[string]any{
604 "pubkey": authorPubkey,
605 "kind": int64(ev.Kind),
606 "dValue": dValue,
607 "createdAt": ev.CreatedAt,
608 }
609
610 if _, err := n.ExecuteWrite(c, cypher, params); err != nil {
611 return fmt.Errorf("failed to delete older replaceable events: %w", err)
612 }
613
614 return nil
615 }
616