social-event-processor.go raw
1 package neo4j
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "sort"
8
9 "next.orly.dev/pkg/nostr/encoders/event"
10 "next.orly.dev/pkg/nostr/encoders/hex"
11 )
12
13 // SocialEventProcessor handles kind 0, 3, 1984, 10000 events for social graph management
14 type SocialEventProcessor struct {
15 db *N
16 }
17
18 // NewSocialEventProcessor creates a new social event processor
19 func NewSocialEventProcessor(db *N) *SocialEventProcessor {
20 return &SocialEventProcessor{db: db}
21 }
22
// ProcessedSocialEvent is the Go-side view of a ProcessedSocialEvent tracking
// node stored in Neo4j for each social graph event that has been applied.
type ProcessedSocialEvent struct {
	// EventID is the hex-encoded ID of the processed event.
	EventID string
	// EventKind is the Nostr kind of the event (3, 1984 or 10000 in this file).
	EventKind int
	// Pubkey is the hex-encoded pubkey of the event author.
	Pubkey string
	// CreatedAt is the event's own created_at value.
	CreatedAt int64
	// ProcessedAt mirrors the node's processed_at property, which the queries
	// in this file set with Cypher timestamp().
	ProcessedAt int64
	// RelationshipCount is the number of relationships the event described.
	RelationshipCount int
	// SupersededBy holds the ID of the event that replaced this one; it is nil
	// while the event is still the active one.
	SupersededBy *string
}
33
34 // ProcessSocialEvent routes events to appropriate handlers based on kind
35 func (p *SocialEventProcessor) ProcessSocialEvent(ctx context.Context, ev *event.E) error {
36 switch ev.Kind {
37 case 0:
38 return p.processProfileMetadata(ctx, ev)
39 case 3:
40 return p.processContactList(ctx, ev)
41 case 1984:
42 return p.processReport(ctx, ev)
43 case 10000:
44 return p.processMuteList(ctx, ev)
45 default:
46 return fmt.Errorf("unsupported social event kind: %d", ev.Kind)
47 }
48 }
49
50 // processProfileMetadata handles kind 0 events (profile metadata)
51 func (p *SocialEventProcessor) processProfileMetadata(ctx context.Context, ev *event.E) error {
52 pubkey := hex.Enc(ev.Pubkey[:])
53 eventID := hex.Enc(ev.ID[:])
54
55 // Parse profile JSON from content
56 var profile map[string]interface{}
57 if err := json.Unmarshal(ev.Content, &profile); err != nil {
58 p.db.Logger.Warningf("invalid profile JSON in event %s: %v", eventID, err)
59 return nil // Don't fail, just skip profile update
60 }
61
62 // Update NostrUser node with profile data
63 cypher := `
64 MERGE (user:NostrUser {pubkey: $pubkey})
65 ON CREATE SET
66 user.created_at = timestamp(),
67 user.first_seen_event = $event_id
68 ON MATCH SET
69 user.last_profile_update = $created_at
70 SET
71 user.name = $name,
72 user.about = $about,
73 user.picture = $picture,
74 user.nip05 = $nip05,
75 user.lud16 = $lud16,
76 user.display_name = $display_name,
77 user.npub = $npub
78 `
79
80 params := map[string]any{
81 "pubkey": pubkey,
82 "event_id": eventID,
83 "created_at": ev.CreatedAt,
84 "name": getStringFromMap(profile, "name"),
85 "about": getStringFromMap(profile, "about"),
86 "picture": getStringFromMap(profile, "picture"),
87 "nip05": getStringFromMap(profile, "nip05"),
88 "lud16": getStringFromMap(profile, "lud16"),
89 "display_name": getStringFromMap(profile, "display_name"),
90 "npub": "", // TODO: compute npub from pubkey
91 }
92
93 _, err := p.db.ExecuteWrite(ctx, cypher, params)
94 if err != nil {
95 return fmt.Errorf("failed to update profile: %w", err)
96 }
97
98 p.db.Logger.Infof("updated profile for user %s", safePrefix(pubkey, 16))
99 return nil
100 }
101
102 // processContactList handles kind 3 events (follow lists)
103 func (p *SocialEventProcessor) processContactList(ctx context.Context, ev *event.E) error {
104 authorPubkey := hex.Enc(ev.Pubkey[:])
105 eventID := hex.Enc(ev.ID[:])
106
107 // 1. Check for existing contact list
108 existingEvent, err := p.getLatestSocialEvent(ctx, authorPubkey, 3)
109 if err != nil {
110 return fmt.Errorf("failed to check existing contact list: %w", err)
111 }
112
113 // 2. Reject if this event is older than existing
114 if existingEvent != nil && existingEvent.CreatedAt >= ev.CreatedAt {
115 p.db.Logger.Infof("rejecting older contact list event %s (existing: %s)",
116 safePrefix(eventID, 16), safePrefix(existingEvent.EventID, 16))
117 return nil // Not an error, just skip
118 }
119
120 // 3. Extract p-tags to get new follows list
121 newFollows := extractPTags(ev)
122
123 // 4. Get old follows list if replacing an existing event
124 var oldFollows []string
125 var oldEventID string
126 if existingEvent != nil {
127 oldEventID = existingEvent.EventID
128 oldFollows, err = p.getFollowsForEvent(ctx, oldEventID)
129 if err != nil {
130 return fmt.Errorf("failed to get old follows: %w", err)
131 }
132 }
133
134 // 5. Compute diff
135 added, removed := diffStringSlices(oldFollows, newFollows)
136
137 // 6. Update graph in transaction
138 err = p.updateContactListGraph(ctx, UpdateContactListParams{
139 AuthorPubkey: authorPubkey,
140 NewEventID: eventID,
141 OldEventID: oldEventID,
142 CreatedAt: ev.CreatedAt,
143 AddedFollows: added,
144 RemovedFollows: removed,
145 TotalFollows: len(newFollows),
146 })
147
148 if err != nil {
149 return fmt.Errorf("failed to update contact list graph: %w", err)
150 }
151
152 p.db.Logger.Infof("processed contact list: author=%s, event=%s, added=%d, removed=%d, total=%d",
153 safePrefix(authorPubkey, 16), safePrefix(eventID, 16), len(added), len(removed), len(newFollows))
154
155 return nil
156 }
157
158 // processMuteList handles kind 10000 events (mute lists)
159 func (p *SocialEventProcessor) processMuteList(ctx context.Context, ev *event.E) error {
160 authorPubkey := hex.Enc(ev.Pubkey[:])
161 eventID := hex.Enc(ev.ID[:])
162
163 // Check for existing mute list
164 existingEvent, err := p.getLatestSocialEvent(ctx, authorPubkey, 10000)
165 if err != nil {
166 return fmt.Errorf("failed to check existing mute list: %w", err)
167 }
168
169 // Reject if older
170 if existingEvent != nil && existingEvent.CreatedAt >= ev.CreatedAt {
171 p.db.Logger.Infof("rejecting older mute list event %s", safePrefix(eventID, 16))
172 return nil
173 }
174
175 // Extract p-tags
176 newMutes := extractPTags(ev)
177
178 // Get old mutes
179 var oldMutes []string
180 var oldEventID string
181 if existingEvent != nil {
182 oldEventID = existingEvent.EventID
183 oldMutes, err = p.getMutesForEvent(ctx, oldEventID)
184 if err != nil {
185 return fmt.Errorf("failed to get old mutes: %w", err)
186 }
187 }
188
189 // Compute diff
190 added, removed := diffStringSlices(oldMutes, newMutes)
191
192 // Update graph
193 err = p.updateMuteListGraph(ctx, UpdateMuteListParams{
194 AuthorPubkey: authorPubkey,
195 NewEventID: eventID,
196 OldEventID: oldEventID,
197 CreatedAt: ev.CreatedAt,
198 AddedMutes: added,
199 RemovedMutes: removed,
200 TotalMutes: len(newMutes),
201 })
202
203 if err != nil {
204 return fmt.Errorf("failed to update mute list graph: %w", err)
205 }
206
207 p.db.Logger.Infof("processed mute list: author=%s, event=%s, added=%d, removed=%d",
208 safePrefix(authorPubkey, 16), safePrefix(eventID, 16), len(added), len(removed))
209
210 return nil
211 }
212
213 // processReport handles kind 1984 events (reports)
214 // Deduplicates by (reporter, reported, report_type) - only one REPORTS relationship
215 // per combination, with the most recent event's data preserved.
216 func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) error {
217 reporterPubkey := hex.Enc(ev.Pubkey[:])
218 eventID := hex.Enc(ev.ID[:])
219
220 // Extract report target and type from tags
221 // Format: ["p", "reported_pubkey", "report_type"]
222 var reportedPubkey string
223 var reportType string = "other" // default
224
225 for _, t := range *ev.Tags {
226 if len(t.T) >= 2 && string(t.T[0]) == "p" {
227 // Use ExtractPTagValue to handle binary encoding and normalize to lowercase
228 reportedPubkey = ExtractPTagValue(t)
229 if len(t.T) >= 3 {
230 reportType = string(t.T[2])
231 }
232 break // Use first p-tag
233 }
234 }
235
236 if reportedPubkey == "" {
237 p.db.Logger.Warningf("report event %s has no p-tag, skipping", safePrefix(eventID, 16))
238 return nil
239 }
240
241 // Check for existing report of the same type to determine if this is an update
242 existingEventID, err := p.getExistingReportEvent(ctx, reporterPubkey, reportedPubkey, reportType)
243 if err != nil {
244 return fmt.Errorf("failed to check existing report: %w", err)
245 }
246
247 // Create REPORTS relationship with MERGE to deduplicate
248 // MERGE on (reporter, reported, report_type) ensures only one relationship per combination
249 cypher := `
250 // Create event tracking node
251 CREATE (evt:ProcessedSocialEvent {
252 event_id: $event_id,
253 event_kind: 1984,
254 pubkey: $reporter_pubkey,
255 created_at: $created_at,
256 processed_at: timestamp(),
257 relationship_count: 1,
258 superseded_by: null
259 })
260
261 // WITH required to transition from CREATE to MERGE
262 WITH evt
263
264 // Create or get reporter and reported users
265 MERGE (reporter:NostrUser {pubkey: $reporter_pubkey})
266 MERGE (reported:NostrUser {pubkey: $reported_pubkey})
267
268 // MERGE on (reporter, reported, report_type) - deduplicate!
269 MERGE (reporter)-[r:REPORTS {report_type: $report_type}]->(reported)
270 ON CREATE SET
271 r.created_by_event = $event_id,
272 r.created_at = $created_at,
273 r.relay_received_at = timestamp()
274 ON MATCH SET
275 // Only update if this event is newer
276 r.created_by_event = CASE WHEN $created_at > r.created_at
277 THEN $event_id ELSE r.created_by_event END,
278 r.created_at = CASE WHEN $created_at > r.created_at
279 THEN $created_at ELSE r.created_at END
280 `
281
282 params := map[string]any{
283 "event_id": eventID,
284 "reporter_pubkey": reporterPubkey,
285 "reported_pubkey": reportedPubkey,
286 "created_at": ev.CreatedAt,
287 "report_type": reportType,
288 }
289
290 _, err = p.db.ExecuteWrite(ctx, cypher, params)
291 if err != nil {
292 return fmt.Errorf("failed to create/update report: %w", err)
293 }
294
295 // Mark old ProcessedSocialEvent as superseded if this is an update with newer data
296 if existingEventID != "" && existingEventID != eventID {
297 p.markReportEventSuperseded(ctx, existingEventID, eventID)
298 }
299
300 p.db.Logger.Infof("processed report: reporter=%s, reported=%s, type=%s",
301 safePrefix(reporterPubkey, 16), safePrefix(reportedPubkey, 16), reportType)
302
303 return nil
304 }
305
306 // getExistingReportEvent checks if a REPORTS relationship already exists for this combination
307 // Returns the event ID that created the relationship, or empty string if none exists
308 func (p *SocialEventProcessor) getExistingReportEvent(ctx context.Context, reporterPubkey, reportedPubkey, reportType string) (string, error) {
309 cypher := `
310 MATCH (reporter:NostrUser {pubkey: $reporter_pubkey})-[r:REPORTS {report_type: $report_type}]->(reported:NostrUser {pubkey: $reported_pubkey})
311 RETURN r.created_by_event AS event_id
312 LIMIT 1
313 `
314
315 params := map[string]any{
316 "reporter_pubkey": reporterPubkey,
317 "reported_pubkey": reportedPubkey,
318 "report_type": reportType,
319 }
320
321 result, err := p.db.ExecuteRead(ctx, cypher, params)
322 if err != nil {
323 return "", err
324 }
325
326 if result.Next(ctx) {
327 record := result.Record()
328 if eventID, ok := record.Values[0].(string); ok {
329 return eventID, nil
330 }
331 }
332
333 return "", nil
334 }
335
336 // markReportEventSuperseded marks an older ProcessedSocialEvent as superseded by a newer one
337 func (p *SocialEventProcessor) markReportEventSuperseded(ctx context.Context, oldEventID, newEventID string) {
338 cypher := `
339 MATCH (old:ProcessedSocialEvent {event_id: $old_event_id, event_kind: 1984})
340 SET old.superseded_by = $new_event_id
341 `
342
343 params := map[string]any{
344 "old_event_id": oldEventID,
345 "new_event_id": newEventID,
346 }
347
348 // Ignore errors - old event may not exist
349 p.db.ExecuteWrite(ctx, cypher, params)
350 }
351
// UpdateContactListParams carries everything updateContactListGraph needs to
// replace one contact list (kind 3) event with another.
type UpdateContactListParams struct {
	// AuthorPubkey is the hex pubkey of the contact list author.
	AuthorPubkey string
	// NewEventID is the hex ID of the event being applied.
	NewEventID string
	// OldEventID is the hex ID of the event being replaced; "" when there is none.
	OldEventID string
	// CreatedAt is the created_at value of the new event.
	CreatedAt int64
	// AddedFollows lists pubkeys followed by the new event but not the old one.
	AddedFollows []string
	// RemovedFollows lists pubkeys followed by the old event but not the new one.
	RemovedFollows []string
	// TotalFollows is the number of follows in the new event.
	TotalFollows int
}
362
363 // updateContactListGraph performs atomic graph update for contact list changes
364 func (p *SocialEventProcessor) updateContactListGraph(ctx context.Context, params UpdateContactListParams) error {
365 // We need to break this into separate operations because Neo4j's UNWIND
366 // produces zero rows for empty arrays, which stops query execution.
367 // Also, complex query chains with OPTIONAL MATCH can have issues.
368
369 // Step 1: Create the ProcessedSocialEvent and NostrUser nodes
370 createCypher := `
371 // Get or create author node first
372 MERGE (author:NostrUser {pubkey: $author_pubkey})
373 ON CREATE SET author.created_at = timestamp()
374
375 // Create new ProcessedSocialEvent tracking node
376 CREATE (new:ProcessedSocialEvent {
377 event_id: $new_event_id,
378 event_kind: 3,
379 pubkey: $author_pubkey,
380 created_at: $created_at,
381 processed_at: timestamp(),
382 relationship_count: $total_follows,
383 superseded_by: null
384 })
385
386 RETURN author.pubkey AS author_pubkey
387 `
388
389 createParams := map[string]any{
390 "author_pubkey": params.AuthorPubkey,
391 "new_event_id": params.NewEventID,
392 "created_at": params.CreatedAt,
393 "total_follows": params.TotalFollows,
394 }
395
396 _, err := p.db.ExecuteWrite(ctx, createCypher, createParams)
397 if err != nil {
398 return fmt.Errorf("failed to create ProcessedSocialEvent: %w", err)
399 }
400
401 // Step 2: Mark old event as superseded (if it exists)
402 if params.OldEventID != "" {
403 supersedeCypher := `
404 MATCH (old:ProcessedSocialEvent {event_id: $old_event_id})
405 SET old.superseded_by = $new_event_id
406 `
407 supersedeParams := map[string]any{
408 "old_event_id": params.OldEventID,
409 "new_event_id": params.NewEventID,
410 }
411 // Ignore errors - old event may not exist
412 p.db.ExecuteWrite(ctx, supersedeCypher, supersedeParams)
413
414 // Step 3: Update unchanged FOLLOWS to point to new event
415 // Always update relationships that aren't being removed
416 updateCypher := `
417 MATCH (author:NostrUser {pubkey: $author_pubkey})-[f:FOLLOWS]->(followed:NostrUser)
418 WHERE f.created_by_event = $old_event_id
419 AND NOT followed.pubkey IN $removed_follows
420 SET f.created_by_event = $new_event_id,
421 f.created_at = $created_at
422 `
423 updateParams := map[string]any{
424 "author_pubkey": params.AuthorPubkey,
425 "old_event_id": params.OldEventID,
426 "new_event_id": params.NewEventID,
427 "created_at": params.CreatedAt,
428 "removed_follows": params.RemovedFollows,
429 }
430 p.db.ExecuteWrite(ctx, updateCypher, updateParams)
431
432 // Step 4: Remove FOLLOWS for removed follows
433 if len(params.RemovedFollows) > 0 {
434 removeCypher := `
435 MATCH (author:NostrUser {pubkey: $author_pubkey})-[f:FOLLOWS]->(followed:NostrUser)
436 WHERE f.created_by_event = $old_event_id
437 AND followed.pubkey IN $removed_follows
438 DELETE f
439 `
440 removeParams := map[string]any{
441 "author_pubkey": params.AuthorPubkey,
442 "old_event_id": params.OldEventID,
443 "removed_follows": params.RemovedFollows,
444 }
445 p.db.ExecuteWrite(ctx, removeCypher, removeParams)
446 }
447 }
448
449 // Step 5: Create new FOLLOWS relationships for added follows
450 // Process in batches to avoid memory issues
451 const batchSize = 500
452 for i := 0; i < len(params.AddedFollows); i += batchSize {
453 end := i + batchSize
454 if end > len(params.AddedFollows) {
455 end = len(params.AddedFollows)
456 }
457 batch := params.AddedFollows[i:end]
458
459 followsCypher := `
460 MATCH (author:NostrUser {pubkey: $author_pubkey})
461 UNWIND $added_follows AS followed_pubkey
462 MERGE (followed:NostrUser {pubkey: followed_pubkey})
463 ON CREATE SET followed.created_at = timestamp()
464 MERGE (author)-[f:FOLLOWS]->(followed)
465 ON CREATE SET
466 f.created_by_event = $new_event_id,
467 f.created_at = $created_at,
468 f.relay_received_at = timestamp()
469 ON MATCH SET
470 f.created_by_event = $new_event_id,
471 f.created_at = $created_at
472 `
473
474 followsParams := map[string]any{
475 "author_pubkey": params.AuthorPubkey,
476 "new_event_id": params.NewEventID,
477 "created_at": params.CreatedAt,
478 "added_follows": batch,
479 }
480
481 if _, err := p.db.ExecuteWrite(ctx, followsCypher, followsParams); err != nil {
482 return fmt.Errorf("failed to create FOLLOWS batch %d-%d: %w", i, end, err)
483 }
484 }
485
486 return nil
487 }
488
// UpdateMuteListParams carries everything updateMuteListGraph needs to replace
// one mute list (kind 10000) event with another.
type UpdateMuteListParams struct {
	// AuthorPubkey is the hex pubkey of the mute list author.
	AuthorPubkey string
	// NewEventID is the hex ID of the event being applied.
	NewEventID string
	// OldEventID is the hex ID of the event being replaced; "" when there is none.
	OldEventID string
	// CreatedAt is the created_at value of the new event.
	CreatedAt int64
	// AddedMutes lists pubkeys muted by the new event but not the old one.
	AddedMutes []string
	// RemovedMutes lists pubkeys muted by the old event but not the new one.
	RemovedMutes []string
	// TotalMutes is the number of mutes in the new event.
	TotalMutes int
}
499
500 // updateMuteListGraph performs atomic graph update for mute list changes
501 func (p *SocialEventProcessor) updateMuteListGraph(ctx context.Context, params UpdateMuteListParams) error {
502 // We need to break this into separate operations because Neo4j's UNWIND
503 // produces zero rows for empty arrays, which stops query execution.
504
505 // Step 1: Create the ProcessedSocialEvent and NostrUser nodes
506 createCypher := `
507 // Get or create author node first
508 MERGE (author:NostrUser {pubkey: $author_pubkey})
509 ON CREATE SET author.created_at = timestamp()
510
511 // Create new ProcessedSocialEvent tracking node
512 CREATE (new:ProcessedSocialEvent {
513 event_id: $new_event_id,
514 event_kind: 10000,
515 pubkey: $author_pubkey,
516 created_at: $created_at,
517 processed_at: timestamp(),
518 relationship_count: $total_mutes,
519 superseded_by: null
520 })
521
522 RETURN author.pubkey AS author_pubkey
523 `
524
525 createParams := map[string]any{
526 "author_pubkey": params.AuthorPubkey,
527 "new_event_id": params.NewEventID,
528 "created_at": params.CreatedAt,
529 "total_mutes": params.TotalMutes,
530 }
531
532 _, err := p.db.ExecuteWrite(ctx, createCypher, createParams)
533 if err != nil {
534 return fmt.Errorf("failed to create ProcessedSocialEvent: %w", err)
535 }
536
537 // Step 2: Mark old event as superseded (if it exists)
538 if params.OldEventID != "" {
539 supersedeCypher := `
540 MATCH (old:ProcessedSocialEvent {event_id: $old_event_id})
541 SET old.superseded_by = $new_event_id
542 `
543 supersedeParams := map[string]any{
544 "old_event_id": params.OldEventID,
545 "new_event_id": params.NewEventID,
546 }
547 p.db.ExecuteWrite(ctx, supersedeCypher, supersedeParams)
548
549 // Step 3: Update unchanged MUTES to point to new event
550 // Always update relationships that aren't being removed
551 updateCypher := `
552 MATCH (author:NostrUser {pubkey: $author_pubkey})-[m:MUTES]->(muted:NostrUser)
553 WHERE m.created_by_event = $old_event_id
554 AND NOT muted.pubkey IN $removed_mutes
555 SET m.created_by_event = $new_event_id,
556 m.created_at = $created_at
557 `
558 updateParams := map[string]any{
559 "author_pubkey": params.AuthorPubkey,
560 "old_event_id": params.OldEventID,
561 "new_event_id": params.NewEventID,
562 "created_at": params.CreatedAt,
563 "removed_mutes": params.RemovedMutes,
564 }
565 p.db.ExecuteWrite(ctx, updateCypher, updateParams)
566
567 // Step 4: Remove MUTES for removed mutes
568 if len(params.RemovedMutes) > 0 {
569 removeCypher := `
570 MATCH (author:NostrUser {pubkey: $author_pubkey})-[m:MUTES]->(muted:NostrUser)
571 WHERE m.created_by_event = $old_event_id
572 AND muted.pubkey IN $removed_mutes
573 DELETE m
574 `
575 removeParams := map[string]any{
576 "author_pubkey": params.AuthorPubkey,
577 "old_event_id": params.OldEventID,
578 "removed_mutes": params.RemovedMutes,
579 }
580 p.db.ExecuteWrite(ctx, removeCypher, removeParams)
581 }
582 }
583
584 // Step 5: Create new MUTES relationships for added mutes
585 // Process in batches to avoid memory issues
586 const batchSize = 500
587 for i := 0; i < len(params.AddedMutes); i += batchSize {
588 end := i + batchSize
589 if end > len(params.AddedMutes) {
590 end = len(params.AddedMutes)
591 }
592 batch := params.AddedMutes[i:end]
593
594 mutesCypher := `
595 MATCH (author:NostrUser {pubkey: $author_pubkey})
596 UNWIND $added_mutes AS muted_pubkey
597 MERGE (muted:NostrUser {pubkey: muted_pubkey})
598 ON CREATE SET muted.created_at = timestamp()
599 MERGE (author)-[m:MUTES]->(muted)
600 ON CREATE SET
601 m.created_by_event = $new_event_id,
602 m.created_at = $created_at,
603 m.relay_received_at = timestamp()
604 ON MATCH SET
605 m.created_by_event = $new_event_id,
606 m.created_at = $created_at
607 `
608
609 mutesParams := map[string]any{
610 "author_pubkey": params.AuthorPubkey,
611 "new_event_id": params.NewEventID,
612 "created_at": params.CreatedAt,
613 "added_mutes": batch,
614 }
615
616 if _, err := p.db.ExecuteWrite(ctx, mutesCypher, mutesParams); err != nil {
617 return fmt.Errorf("failed to create MUTES batch %d-%d: %w", i, end, err)
618 }
619 }
620
621 return nil
622 }
623
624 // getLatestSocialEvent retrieves the most recent non-superseded event of a given kind for a pubkey
625 func (p *SocialEventProcessor) getLatestSocialEvent(ctx context.Context, pubkey string, kind int) (*ProcessedSocialEvent, error) {
626 cypher := `
627 MATCH (evt:ProcessedSocialEvent {pubkey: $pubkey, event_kind: $kind})
628 WHERE evt.superseded_by IS NULL
629 RETURN evt.event_id AS event_id,
630 evt.created_at AS created_at,
631 evt.relationship_count AS relationship_count
632 ORDER BY evt.created_at DESC
633 LIMIT 1
634 `
635
636 params := map[string]any{
637 "pubkey": pubkey,
638 "kind": kind,
639 }
640
641 result, err := p.db.ExecuteRead(ctx, cypher, params)
642 if err != nil {
643 return nil, err
644 }
645
646 if result.Next(ctx) {
647 record := result.Record()
648 return &ProcessedSocialEvent{
649 EventID: record.Values[0].(string),
650 CreatedAt: record.Values[1].(int64),
651 RelationshipCount: int(record.Values[2].(int64)),
652 }, nil
653 }
654
655 return nil, nil // No existing event
656 }
657
658 // getFollowsForEvent retrieves the list of followed pubkeys for a specific event
659 func (p *SocialEventProcessor) getFollowsForEvent(ctx context.Context, eventID string) ([]string, error) {
660 cypher := `
661 MATCH (author:NostrUser)-[f:FOLLOWS]->(followed:NostrUser)
662 WHERE f.created_by_event = $event_id
663 RETURN collect(followed.pubkey) AS pubkeys
664 `
665
666 params := map[string]any{
667 "event_id": eventID,
668 }
669
670 result, err := p.db.ExecuteRead(ctx, cypher, params)
671 if err != nil {
672 return nil, err
673 }
674
675 if result.Next(ctx) {
676 record := result.Record()
677 pubkeysRaw := record.Values[0].([]interface{})
678 pubkeys := make([]string, len(pubkeysRaw))
679 for i, p := range pubkeysRaw {
680 pubkeys[i] = p.(string)
681 }
682 return pubkeys, nil
683 }
684
685 return []string{}, nil
686 }
687
688 // getMutesForEvent retrieves the list of muted pubkeys for a specific event
689 func (p *SocialEventProcessor) getMutesForEvent(ctx context.Context, eventID string) ([]string, error) {
690 cypher := `
691 MATCH (author:NostrUser)-[m:MUTES]->(muted:NostrUser)
692 WHERE m.created_by_event = $event_id
693 RETURN collect(muted.pubkey) AS pubkeys
694 `
695
696 params := map[string]any{
697 "event_id": eventID,
698 }
699
700 result, err := p.db.ExecuteRead(ctx, cypher, params)
701 if err != nil {
702 return nil, err
703 }
704
705 if result.Next(ctx) {
706 record := result.Record()
707 pubkeysRaw := record.Values[0].([]interface{})
708 pubkeys := make([]string, len(pubkeysRaw))
709 for i, p := range pubkeysRaw {
710 pubkeys[i] = p.(string)
711 }
712 return pubkeys, nil
713 }
714
715 return []string{}, nil
716 }
717
718 // BatchProcessContactLists processes multiple contact list events in order
719 func (p *SocialEventProcessor) BatchProcessContactLists(ctx context.Context, events []*event.E) error {
720 // Group by author
721 byAuthor := make(map[string][]*event.E)
722 for _, ev := range events {
723 if ev.Kind != 3 {
724 continue
725 }
726 pubkey := hex.Enc(ev.Pubkey[:])
727 byAuthor[pubkey] = append(byAuthor[pubkey], ev)
728 }
729
730 // Process each author's events in chronological order
731 for pubkey, authorEvents := range byAuthor {
732 // Sort by created_at (oldest first)
733 sort.Slice(authorEvents, func(i, j int) bool {
734 return authorEvents[i].CreatedAt < authorEvents[j].CreatedAt
735 })
736
737 // Process in order
738 for _, ev := range authorEvents {
739 if err := p.processContactList(ctx, ev); err != nil {
740 return fmt.Errorf("batch process failed for %s: %w", pubkey, err)
741 }
742 }
743 }
744
745 return nil
746 }
747
748 // Helper functions
749
750 // extractPTags extracts unique pubkeys from p-tags
751 // Uses ExtractPTagValue to properly handle binary-encoded tag values
752 // and normalizes to lowercase hex for consistent Neo4j storage
753 func extractPTags(ev *event.E) []string {
754 seen := make(map[string]bool)
755 var pubkeys []string
756
757 for _, t := range *ev.Tags {
758 if len(t.T) >= 2 && string(t.T[0]) == "p" {
759 // Use ExtractPTagValue to handle binary encoding and normalize to lowercase
760 pubkey := ExtractPTagValue(t)
761 if IsValidHexPubkey(pubkey) && !seen[pubkey] {
762 seen[pubkey] = true
763 pubkeys = append(pubkeys, pubkey)
764 }
765 }
766 }
767
768 return pubkeys
769 }
770
// diffStringSlices treats both slices as sets and returns the elements that
// are only in current (added) and only in previous (removed).
//
// Each result lists every element at most once, in order of first appearance
// in its source slice, even when the inputs contain duplicates. Either result
// is nil when it has no elements.
func diffStringSlices(previous, current []string) (added, removed []string) {
	previousSet := make(map[string]struct{}, len(previous))
	for _, s := range previous {
		previousSet[s] = struct{}{}
	}

	currentSet := make(map[string]struct{}, len(current))
	for _, s := range current {
		if _, dup := currentSet[s]; dup {
			continue // already handled on first appearance
		}
		currentSet[s] = struct{}{}
		if _, had := previousSet[s]; !had {
			added = append(added, s)
		}
	}

	reported := make(map[string]struct{})
	for _, s := range previous {
		if _, kept := currentSet[s]; kept {
			continue
		}
		if _, dup := reported[s]; dup {
			continue // report each removed element once
		}
		reported[s] = struct{}{}
		removed = append(removed, s)
	}

	return added, removed
}
794
// getStringFromMap returns m[key] when it holds a string, and "" when the key
// is missing or its value has any other type. A nil map is treated as empty.
func getStringFromMap(m map[string]interface{}, key string) string {
	// Indexing a missing key yields a nil interface, for which the assertion
	// fails and text stays "".
	text, _ := m[key].(string)
	return text
}
804