social-event-processor.go raw

   1  package neo4j
   2  
   3  import (
   4  	"context"
   5  	"encoding/json"
   6  	"fmt"
   7  	"sort"
   8  
   9  	"next.orly.dev/pkg/nostr/encoders/event"
  10  	"next.orly.dev/pkg/nostr/encoders/hex"
  11  )
  12  
// SocialEventProcessor handles kind 0, 3, 1984, 10000 events for social graph management.
// Each supported kind is routed to a dedicated handler that reads from and
// writes to the graph through db.
type SocialEventProcessor struct {
	db *N // database handle used for Cypher reads/writes and for logging
}
  17  
  18  // NewSocialEventProcessor creates a new social event processor
  19  func NewSocialEventProcessor(db *N) *SocialEventProcessor {
  20  	return &SocialEventProcessor{db: db}
  21  }
  22  
  23  // ProcessedSocialEvent represents a processed social graph event in Neo4j
  24  type ProcessedSocialEvent struct {
  25  	EventID           string
  26  	EventKind         int
  27  	Pubkey            string
  28  	CreatedAt         int64
  29  	ProcessedAt       int64
  30  	RelationshipCount int
  31  	SupersededBy      *string // nil if still active
  32  }
  33  
  34  // ProcessSocialEvent routes events to appropriate handlers based on kind
  35  func (p *SocialEventProcessor) ProcessSocialEvent(ctx context.Context, ev *event.E) error {
  36  	switch ev.Kind {
  37  	case 0:
  38  		return p.processProfileMetadata(ctx, ev)
  39  	case 3:
  40  		return p.processContactList(ctx, ev)
  41  	case 1984:
  42  		return p.processReport(ctx, ev)
  43  	case 10000:
  44  		return p.processMuteList(ctx, ev)
  45  	default:
  46  		return fmt.Errorf("unsupported social event kind: %d", ev.Kind)
  47  	}
  48  }
  49  
  50  // processProfileMetadata handles kind 0 events (profile metadata)
  51  func (p *SocialEventProcessor) processProfileMetadata(ctx context.Context, ev *event.E) error {
  52  	pubkey := hex.Enc(ev.Pubkey[:])
  53  	eventID := hex.Enc(ev.ID[:])
  54  
  55  	// Parse profile JSON from content
  56  	var profile map[string]interface{}
  57  	if err := json.Unmarshal(ev.Content, &profile); err != nil {
  58  		p.db.Logger.Warningf("invalid profile JSON in event %s: %v", eventID, err)
  59  		return nil // Don't fail, just skip profile update
  60  	}
  61  
  62  	// Update NostrUser node with profile data
  63  	cypher := `
  64  		MERGE (user:NostrUser {pubkey: $pubkey})
  65  		ON CREATE SET
  66  			user.created_at = timestamp(),
  67  			user.first_seen_event = $event_id
  68  		ON MATCH SET
  69  			user.last_profile_update = $created_at
  70  		SET
  71  			user.name = $name,
  72  			user.about = $about,
  73  			user.picture = $picture,
  74  			user.nip05 = $nip05,
  75  			user.lud16 = $lud16,
  76  			user.display_name = $display_name,
  77  			user.npub = $npub
  78  	`
  79  
  80  	params := map[string]any{
  81  		"pubkey":       pubkey,
  82  		"event_id":     eventID,
  83  		"created_at":   ev.CreatedAt,
  84  		"name":         getStringFromMap(profile, "name"),
  85  		"about":        getStringFromMap(profile, "about"),
  86  		"picture":      getStringFromMap(profile, "picture"),
  87  		"nip05":        getStringFromMap(profile, "nip05"),
  88  		"lud16":        getStringFromMap(profile, "lud16"),
  89  		"display_name": getStringFromMap(profile, "display_name"),
  90  		"npub":         "", // TODO: compute npub from pubkey
  91  	}
  92  
  93  	_, err := p.db.ExecuteWrite(ctx, cypher, params)
  94  	if err != nil {
  95  		return fmt.Errorf("failed to update profile: %w", err)
  96  	}
  97  
  98  	p.db.Logger.Infof("updated profile for user %s", safePrefix(pubkey, 16))
  99  	return nil
 100  }
 101  
 102  // processContactList handles kind 3 events (follow lists)
 103  func (p *SocialEventProcessor) processContactList(ctx context.Context, ev *event.E) error {
 104  	authorPubkey := hex.Enc(ev.Pubkey[:])
 105  	eventID := hex.Enc(ev.ID[:])
 106  
 107  	// 1. Check for existing contact list
 108  	existingEvent, err := p.getLatestSocialEvent(ctx, authorPubkey, 3)
 109  	if err != nil {
 110  		return fmt.Errorf("failed to check existing contact list: %w", err)
 111  	}
 112  
 113  	// 2. Reject if this event is older than existing
 114  	if existingEvent != nil && existingEvent.CreatedAt >= ev.CreatedAt {
 115  		p.db.Logger.Infof("rejecting older contact list event %s (existing: %s)",
 116  			safePrefix(eventID, 16), safePrefix(existingEvent.EventID, 16))
 117  		return nil // Not an error, just skip
 118  	}
 119  
 120  	// 3. Extract p-tags to get new follows list
 121  	newFollows := extractPTags(ev)
 122  
 123  	// 4. Get old follows list if replacing an existing event
 124  	var oldFollows []string
 125  	var oldEventID string
 126  	if existingEvent != nil {
 127  		oldEventID = existingEvent.EventID
 128  		oldFollows, err = p.getFollowsForEvent(ctx, oldEventID)
 129  		if err != nil {
 130  			return fmt.Errorf("failed to get old follows: %w", err)
 131  		}
 132  	}
 133  
 134  	// 5. Compute diff
 135  	added, removed := diffStringSlices(oldFollows, newFollows)
 136  
 137  	// 6. Update graph in transaction
 138  	err = p.updateContactListGraph(ctx, UpdateContactListParams{
 139  		AuthorPubkey:   authorPubkey,
 140  		NewEventID:     eventID,
 141  		OldEventID:     oldEventID,
 142  		CreatedAt:      ev.CreatedAt,
 143  		AddedFollows:   added,
 144  		RemovedFollows: removed,
 145  		TotalFollows:   len(newFollows),
 146  	})
 147  
 148  	if err != nil {
 149  		return fmt.Errorf("failed to update contact list graph: %w", err)
 150  	}
 151  
 152  	p.db.Logger.Infof("processed contact list: author=%s, event=%s, added=%d, removed=%d, total=%d",
 153  		safePrefix(authorPubkey, 16), safePrefix(eventID, 16), len(added), len(removed), len(newFollows))
 154  
 155  	return nil
 156  }
 157  
 158  // processMuteList handles kind 10000 events (mute lists)
 159  func (p *SocialEventProcessor) processMuteList(ctx context.Context, ev *event.E) error {
 160  	authorPubkey := hex.Enc(ev.Pubkey[:])
 161  	eventID := hex.Enc(ev.ID[:])
 162  
 163  	// Check for existing mute list
 164  	existingEvent, err := p.getLatestSocialEvent(ctx, authorPubkey, 10000)
 165  	if err != nil {
 166  		return fmt.Errorf("failed to check existing mute list: %w", err)
 167  	}
 168  
 169  	// Reject if older
 170  	if existingEvent != nil && existingEvent.CreatedAt >= ev.CreatedAt {
 171  		p.db.Logger.Infof("rejecting older mute list event %s", safePrefix(eventID, 16))
 172  		return nil
 173  	}
 174  
 175  	// Extract p-tags
 176  	newMutes := extractPTags(ev)
 177  
 178  	// Get old mutes
 179  	var oldMutes []string
 180  	var oldEventID string
 181  	if existingEvent != nil {
 182  		oldEventID = existingEvent.EventID
 183  		oldMutes, err = p.getMutesForEvent(ctx, oldEventID)
 184  		if err != nil {
 185  			return fmt.Errorf("failed to get old mutes: %w", err)
 186  		}
 187  	}
 188  
 189  	// Compute diff
 190  	added, removed := diffStringSlices(oldMutes, newMutes)
 191  
 192  	// Update graph
 193  	err = p.updateMuteListGraph(ctx, UpdateMuteListParams{
 194  		AuthorPubkey:  authorPubkey,
 195  		NewEventID:    eventID,
 196  		OldEventID:    oldEventID,
 197  		CreatedAt:     ev.CreatedAt,
 198  		AddedMutes:    added,
 199  		RemovedMutes:  removed,
 200  		TotalMutes:    len(newMutes),
 201  	})
 202  
 203  	if err != nil {
 204  		return fmt.Errorf("failed to update mute list graph: %w", err)
 205  	}
 206  
 207  	p.db.Logger.Infof("processed mute list: author=%s, event=%s, added=%d, removed=%d",
 208  		safePrefix(authorPubkey, 16), safePrefix(eventID, 16), len(added), len(removed))
 209  
 210  	return nil
 211  }
 212  
 213  // processReport handles kind 1984 events (reports)
 214  // Deduplicates by (reporter, reported, report_type) - only one REPORTS relationship
 215  // per combination, with the most recent event's data preserved.
 216  func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) error {
 217  	reporterPubkey := hex.Enc(ev.Pubkey[:])
 218  	eventID := hex.Enc(ev.ID[:])
 219  
 220  	// Extract report target and type from tags
 221  	// Format: ["p", "reported_pubkey", "report_type"]
 222  	var reportedPubkey string
 223  	var reportType string = "other" // default
 224  
 225  	for _, t := range *ev.Tags {
 226  		if len(t.T) >= 2 && string(t.T[0]) == "p" {
 227  			// Use ExtractPTagValue to handle binary encoding and normalize to lowercase
 228  			reportedPubkey = ExtractPTagValue(t)
 229  			if len(t.T) >= 3 {
 230  				reportType = string(t.T[2])
 231  			}
 232  			break // Use first p-tag
 233  		}
 234  	}
 235  
 236  	if reportedPubkey == "" {
 237  		p.db.Logger.Warningf("report event %s has no p-tag, skipping", safePrefix(eventID, 16))
 238  		return nil
 239  	}
 240  
 241  	// Check for existing report of the same type to determine if this is an update
 242  	existingEventID, err := p.getExistingReportEvent(ctx, reporterPubkey, reportedPubkey, reportType)
 243  	if err != nil {
 244  		return fmt.Errorf("failed to check existing report: %w", err)
 245  	}
 246  
 247  	// Create REPORTS relationship with MERGE to deduplicate
 248  	// MERGE on (reporter, reported, report_type) ensures only one relationship per combination
 249  	cypher := `
 250  		// Create event tracking node
 251  		CREATE (evt:ProcessedSocialEvent {
 252  			event_id: $event_id,
 253  			event_kind: 1984,
 254  			pubkey: $reporter_pubkey,
 255  			created_at: $created_at,
 256  			processed_at: timestamp(),
 257  			relationship_count: 1,
 258  			superseded_by: null
 259  		})
 260  
 261  		// WITH required to transition from CREATE to MERGE
 262  		WITH evt
 263  
 264  		// Create or get reporter and reported users
 265  		MERGE (reporter:NostrUser {pubkey: $reporter_pubkey})
 266  		MERGE (reported:NostrUser {pubkey: $reported_pubkey})
 267  
 268  		// MERGE on (reporter, reported, report_type) - deduplicate!
 269  		MERGE (reporter)-[r:REPORTS {report_type: $report_type}]->(reported)
 270  		ON CREATE SET
 271  			r.created_by_event = $event_id,
 272  			r.created_at = $created_at,
 273  			r.relay_received_at = timestamp()
 274  		ON MATCH SET
 275  			// Only update if this event is newer
 276  			r.created_by_event = CASE WHEN $created_at > r.created_at
 277  			                          THEN $event_id ELSE r.created_by_event END,
 278  			r.created_at = CASE WHEN $created_at > r.created_at
 279  			                    THEN $created_at ELSE r.created_at END
 280  	`
 281  
 282  	params := map[string]any{
 283  		"event_id":        eventID,
 284  		"reporter_pubkey": reporterPubkey,
 285  		"reported_pubkey": reportedPubkey,
 286  		"created_at":      ev.CreatedAt,
 287  		"report_type":     reportType,
 288  	}
 289  
 290  	_, err = p.db.ExecuteWrite(ctx, cypher, params)
 291  	if err != nil {
 292  		return fmt.Errorf("failed to create/update report: %w", err)
 293  	}
 294  
 295  	// Mark old ProcessedSocialEvent as superseded if this is an update with newer data
 296  	if existingEventID != "" && existingEventID != eventID {
 297  		p.markReportEventSuperseded(ctx, existingEventID, eventID)
 298  	}
 299  
 300  	p.db.Logger.Infof("processed report: reporter=%s, reported=%s, type=%s",
 301  		safePrefix(reporterPubkey, 16), safePrefix(reportedPubkey, 16), reportType)
 302  
 303  	return nil
 304  }
 305  
 306  // getExistingReportEvent checks if a REPORTS relationship already exists for this combination
 307  // Returns the event ID that created the relationship, or empty string if none exists
 308  func (p *SocialEventProcessor) getExistingReportEvent(ctx context.Context, reporterPubkey, reportedPubkey, reportType string) (string, error) {
 309  	cypher := `
 310  		MATCH (reporter:NostrUser {pubkey: $reporter_pubkey})-[r:REPORTS {report_type: $report_type}]->(reported:NostrUser {pubkey: $reported_pubkey})
 311  		RETURN r.created_by_event AS event_id
 312  		LIMIT 1
 313  	`
 314  
 315  	params := map[string]any{
 316  		"reporter_pubkey": reporterPubkey,
 317  		"reported_pubkey": reportedPubkey,
 318  		"report_type":     reportType,
 319  	}
 320  
 321  	result, err := p.db.ExecuteRead(ctx, cypher, params)
 322  	if err != nil {
 323  		return "", err
 324  	}
 325  
 326  	if result.Next(ctx) {
 327  		record := result.Record()
 328  		if eventID, ok := record.Values[0].(string); ok {
 329  			return eventID, nil
 330  		}
 331  	}
 332  
 333  	return "", nil
 334  }
 335  
 336  // markReportEventSuperseded marks an older ProcessedSocialEvent as superseded by a newer one
 337  func (p *SocialEventProcessor) markReportEventSuperseded(ctx context.Context, oldEventID, newEventID string) {
 338  	cypher := `
 339  		MATCH (old:ProcessedSocialEvent {event_id: $old_event_id, event_kind: 1984})
 340  		SET old.superseded_by = $new_event_id
 341  	`
 342  
 343  	params := map[string]any{
 344  		"old_event_id": oldEventID,
 345  		"new_event_id": newEventID,
 346  	}
 347  
 348  	// Ignore errors - old event may not exist
 349  	p.db.ExecuteWrite(ctx, cypher, params)
 350  }
 351  
 352  // UpdateContactListParams holds parameters for contact list graph update
 353  type UpdateContactListParams struct {
 354  	AuthorPubkey   string
 355  	NewEventID     string
 356  	OldEventID     string
 357  	CreatedAt      int64
 358  	AddedFollows   []string
 359  	RemovedFollows []string
 360  	TotalFollows   int
 361  }
 362  
 363  // updateContactListGraph performs atomic graph update for contact list changes
 364  func (p *SocialEventProcessor) updateContactListGraph(ctx context.Context, params UpdateContactListParams) error {
 365  	// We need to break this into separate operations because Neo4j's UNWIND
 366  	// produces zero rows for empty arrays, which stops query execution.
 367  	// Also, complex query chains with OPTIONAL MATCH can have issues.
 368  
 369  	// Step 1: Create the ProcessedSocialEvent and NostrUser nodes
 370  	createCypher := `
 371  		// Get or create author node first
 372  		MERGE (author:NostrUser {pubkey: $author_pubkey})
 373  		ON CREATE SET author.created_at = timestamp()
 374  
 375  		// Create new ProcessedSocialEvent tracking node
 376  		CREATE (new:ProcessedSocialEvent {
 377  			event_id: $new_event_id,
 378  			event_kind: 3,
 379  			pubkey: $author_pubkey,
 380  			created_at: $created_at,
 381  			processed_at: timestamp(),
 382  			relationship_count: $total_follows,
 383  			superseded_by: null
 384  		})
 385  
 386  		RETURN author.pubkey AS author_pubkey
 387  	`
 388  
 389  	createParams := map[string]any{
 390  		"author_pubkey": params.AuthorPubkey,
 391  		"new_event_id":  params.NewEventID,
 392  		"created_at":    params.CreatedAt,
 393  		"total_follows": params.TotalFollows,
 394  	}
 395  
 396  	_, err := p.db.ExecuteWrite(ctx, createCypher, createParams)
 397  	if err != nil {
 398  		return fmt.Errorf("failed to create ProcessedSocialEvent: %w", err)
 399  	}
 400  
 401  	// Step 2: Mark old event as superseded (if it exists)
 402  	if params.OldEventID != "" {
 403  		supersedeCypher := `
 404  			MATCH (old:ProcessedSocialEvent {event_id: $old_event_id})
 405  			SET old.superseded_by = $new_event_id
 406  		`
 407  		supersedeParams := map[string]any{
 408  			"old_event_id": params.OldEventID,
 409  			"new_event_id": params.NewEventID,
 410  		}
 411  		// Ignore errors - old event may not exist
 412  		p.db.ExecuteWrite(ctx, supersedeCypher, supersedeParams)
 413  
 414  		// Step 3: Update unchanged FOLLOWS to point to new event
 415  		// Always update relationships that aren't being removed
 416  		updateCypher := `
 417  			MATCH (author:NostrUser {pubkey: $author_pubkey})-[f:FOLLOWS]->(followed:NostrUser)
 418  			WHERE f.created_by_event = $old_event_id
 419  			  AND NOT followed.pubkey IN $removed_follows
 420  			SET f.created_by_event = $new_event_id,
 421  			    f.created_at = $created_at
 422  		`
 423  		updateParams := map[string]any{
 424  			"author_pubkey":   params.AuthorPubkey,
 425  			"old_event_id":    params.OldEventID,
 426  			"new_event_id":    params.NewEventID,
 427  			"created_at":      params.CreatedAt,
 428  			"removed_follows": params.RemovedFollows,
 429  		}
 430  		p.db.ExecuteWrite(ctx, updateCypher, updateParams)
 431  
 432  		// Step 4: Remove FOLLOWS for removed follows
 433  		if len(params.RemovedFollows) > 0 {
 434  			removeCypher := `
 435  				MATCH (author:NostrUser {pubkey: $author_pubkey})-[f:FOLLOWS]->(followed:NostrUser)
 436  				WHERE f.created_by_event = $old_event_id
 437  				  AND followed.pubkey IN $removed_follows
 438  				DELETE f
 439  			`
 440  			removeParams := map[string]any{
 441  				"author_pubkey":   params.AuthorPubkey,
 442  				"old_event_id":    params.OldEventID,
 443  				"removed_follows": params.RemovedFollows,
 444  			}
 445  			p.db.ExecuteWrite(ctx, removeCypher, removeParams)
 446  		}
 447  	}
 448  
 449  	// Step 5: Create new FOLLOWS relationships for added follows
 450  	// Process in batches to avoid memory issues
 451  	const batchSize = 500
 452  	for i := 0; i < len(params.AddedFollows); i += batchSize {
 453  		end := i + batchSize
 454  		if end > len(params.AddedFollows) {
 455  			end = len(params.AddedFollows)
 456  		}
 457  		batch := params.AddedFollows[i:end]
 458  
 459  		followsCypher := `
 460  			MATCH (author:NostrUser {pubkey: $author_pubkey})
 461  			UNWIND $added_follows AS followed_pubkey
 462  			MERGE (followed:NostrUser {pubkey: followed_pubkey})
 463  			ON CREATE SET followed.created_at = timestamp()
 464  			MERGE (author)-[f:FOLLOWS]->(followed)
 465  			ON CREATE SET
 466  				f.created_by_event = $new_event_id,
 467  				f.created_at = $created_at,
 468  				f.relay_received_at = timestamp()
 469  			ON MATCH SET
 470  				f.created_by_event = $new_event_id,
 471  				f.created_at = $created_at
 472  		`
 473  
 474  		followsParams := map[string]any{
 475  			"author_pubkey": params.AuthorPubkey,
 476  			"new_event_id":  params.NewEventID,
 477  			"created_at":    params.CreatedAt,
 478  			"added_follows": batch,
 479  		}
 480  
 481  		if _, err := p.db.ExecuteWrite(ctx, followsCypher, followsParams); err != nil {
 482  			return fmt.Errorf("failed to create FOLLOWS batch %d-%d: %w", i, end, err)
 483  		}
 484  	}
 485  
 486  	return nil
 487  }
 488  
 489  // UpdateMuteListParams holds parameters for mute list graph update
 490  type UpdateMuteListParams struct {
 491  	AuthorPubkey  string
 492  	NewEventID    string
 493  	OldEventID    string
 494  	CreatedAt     int64
 495  	AddedMutes    []string
 496  	RemovedMutes  []string
 497  	TotalMutes    int
 498  }
 499  
 500  // updateMuteListGraph performs atomic graph update for mute list changes
 501  func (p *SocialEventProcessor) updateMuteListGraph(ctx context.Context, params UpdateMuteListParams) error {
 502  	// We need to break this into separate operations because Neo4j's UNWIND
 503  	// produces zero rows for empty arrays, which stops query execution.
 504  
 505  	// Step 1: Create the ProcessedSocialEvent and NostrUser nodes
 506  	createCypher := `
 507  		// Get or create author node first
 508  		MERGE (author:NostrUser {pubkey: $author_pubkey})
 509  		ON CREATE SET author.created_at = timestamp()
 510  
 511  		// Create new ProcessedSocialEvent tracking node
 512  		CREATE (new:ProcessedSocialEvent {
 513  			event_id: $new_event_id,
 514  			event_kind: 10000,
 515  			pubkey: $author_pubkey,
 516  			created_at: $created_at,
 517  			processed_at: timestamp(),
 518  			relationship_count: $total_mutes,
 519  			superseded_by: null
 520  		})
 521  
 522  		RETURN author.pubkey AS author_pubkey
 523  	`
 524  
 525  	createParams := map[string]any{
 526  		"author_pubkey": params.AuthorPubkey,
 527  		"new_event_id":  params.NewEventID,
 528  		"created_at":    params.CreatedAt,
 529  		"total_mutes":   params.TotalMutes,
 530  	}
 531  
 532  	_, err := p.db.ExecuteWrite(ctx, createCypher, createParams)
 533  	if err != nil {
 534  		return fmt.Errorf("failed to create ProcessedSocialEvent: %w", err)
 535  	}
 536  
 537  	// Step 2: Mark old event as superseded (if it exists)
 538  	if params.OldEventID != "" {
 539  		supersedeCypher := `
 540  			MATCH (old:ProcessedSocialEvent {event_id: $old_event_id})
 541  			SET old.superseded_by = $new_event_id
 542  		`
 543  		supersedeParams := map[string]any{
 544  			"old_event_id": params.OldEventID,
 545  			"new_event_id": params.NewEventID,
 546  		}
 547  		p.db.ExecuteWrite(ctx, supersedeCypher, supersedeParams)
 548  
 549  		// Step 3: Update unchanged MUTES to point to new event
 550  		// Always update relationships that aren't being removed
 551  		updateCypher := `
 552  			MATCH (author:NostrUser {pubkey: $author_pubkey})-[m:MUTES]->(muted:NostrUser)
 553  			WHERE m.created_by_event = $old_event_id
 554  			  AND NOT muted.pubkey IN $removed_mutes
 555  			SET m.created_by_event = $new_event_id,
 556  			    m.created_at = $created_at
 557  		`
 558  		updateParams := map[string]any{
 559  			"author_pubkey": params.AuthorPubkey,
 560  			"old_event_id":  params.OldEventID,
 561  			"new_event_id":  params.NewEventID,
 562  			"created_at":    params.CreatedAt,
 563  			"removed_mutes": params.RemovedMutes,
 564  		}
 565  		p.db.ExecuteWrite(ctx, updateCypher, updateParams)
 566  
 567  		// Step 4: Remove MUTES for removed mutes
 568  		if len(params.RemovedMutes) > 0 {
 569  			removeCypher := `
 570  				MATCH (author:NostrUser {pubkey: $author_pubkey})-[m:MUTES]->(muted:NostrUser)
 571  				WHERE m.created_by_event = $old_event_id
 572  				  AND muted.pubkey IN $removed_mutes
 573  				DELETE m
 574  			`
 575  			removeParams := map[string]any{
 576  				"author_pubkey": params.AuthorPubkey,
 577  				"old_event_id":  params.OldEventID,
 578  				"removed_mutes": params.RemovedMutes,
 579  			}
 580  			p.db.ExecuteWrite(ctx, removeCypher, removeParams)
 581  		}
 582  	}
 583  
 584  	// Step 5: Create new MUTES relationships for added mutes
 585  	// Process in batches to avoid memory issues
 586  	const batchSize = 500
 587  	for i := 0; i < len(params.AddedMutes); i += batchSize {
 588  		end := i + batchSize
 589  		if end > len(params.AddedMutes) {
 590  			end = len(params.AddedMutes)
 591  		}
 592  		batch := params.AddedMutes[i:end]
 593  
 594  		mutesCypher := `
 595  			MATCH (author:NostrUser {pubkey: $author_pubkey})
 596  			UNWIND $added_mutes AS muted_pubkey
 597  			MERGE (muted:NostrUser {pubkey: muted_pubkey})
 598  			ON CREATE SET muted.created_at = timestamp()
 599  			MERGE (author)-[m:MUTES]->(muted)
 600  			ON CREATE SET
 601  				m.created_by_event = $new_event_id,
 602  				m.created_at = $created_at,
 603  				m.relay_received_at = timestamp()
 604  			ON MATCH SET
 605  				m.created_by_event = $new_event_id,
 606  				m.created_at = $created_at
 607  		`
 608  
 609  		mutesParams := map[string]any{
 610  			"author_pubkey": params.AuthorPubkey,
 611  			"new_event_id":  params.NewEventID,
 612  			"created_at":    params.CreatedAt,
 613  			"added_mutes":   batch,
 614  		}
 615  
 616  		if _, err := p.db.ExecuteWrite(ctx, mutesCypher, mutesParams); err != nil {
 617  			return fmt.Errorf("failed to create MUTES batch %d-%d: %w", i, end, err)
 618  		}
 619  	}
 620  
 621  	return nil
 622  }
 623  
 624  // getLatestSocialEvent retrieves the most recent non-superseded event of a given kind for a pubkey
 625  func (p *SocialEventProcessor) getLatestSocialEvent(ctx context.Context, pubkey string, kind int) (*ProcessedSocialEvent, error) {
 626  	cypher := `
 627  		MATCH (evt:ProcessedSocialEvent {pubkey: $pubkey, event_kind: $kind})
 628  		WHERE evt.superseded_by IS NULL
 629  		RETURN evt.event_id AS event_id,
 630  		       evt.created_at AS created_at,
 631  		       evt.relationship_count AS relationship_count
 632  		ORDER BY evt.created_at DESC
 633  		LIMIT 1
 634  	`
 635  
 636  	params := map[string]any{
 637  		"pubkey": pubkey,
 638  		"kind":   kind,
 639  	}
 640  
 641  	result, err := p.db.ExecuteRead(ctx, cypher, params)
 642  	if err != nil {
 643  		return nil, err
 644  	}
 645  
 646  	if result.Next(ctx) {
 647  		record := result.Record()
 648  		return &ProcessedSocialEvent{
 649  			EventID:           record.Values[0].(string),
 650  			CreatedAt:         record.Values[1].(int64),
 651  			RelationshipCount: int(record.Values[2].(int64)),
 652  		}, nil
 653  	}
 654  
 655  	return nil, nil // No existing event
 656  }
 657  
 658  // getFollowsForEvent retrieves the list of followed pubkeys for a specific event
 659  func (p *SocialEventProcessor) getFollowsForEvent(ctx context.Context, eventID string) ([]string, error) {
 660  	cypher := `
 661  		MATCH (author:NostrUser)-[f:FOLLOWS]->(followed:NostrUser)
 662  		WHERE f.created_by_event = $event_id
 663  		RETURN collect(followed.pubkey) AS pubkeys
 664  	`
 665  
 666  	params := map[string]any{
 667  		"event_id": eventID,
 668  	}
 669  
 670  	result, err := p.db.ExecuteRead(ctx, cypher, params)
 671  	if err != nil {
 672  		return nil, err
 673  	}
 674  
 675  	if result.Next(ctx) {
 676  		record := result.Record()
 677  		pubkeysRaw := record.Values[0].([]interface{})
 678  		pubkeys := make([]string, len(pubkeysRaw))
 679  		for i, p := range pubkeysRaw {
 680  			pubkeys[i] = p.(string)
 681  		}
 682  		return pubkeys, nil
 683  	}
 684  
 685  	return []string{}, nil
 686  }
 687  
 688  // getMutesForEvent retrieves the list of muted pubkeys for a specific event
 689  func (p *SocialEventProcessor) getMutesForEvent(ctx context.Context, eventID string) ([]string, error) {
 690  	cypher := `
 691  		MATCH (author:NostrUser)-[m:MUTES]->(muted:NostrUser)
 692  		WHERE m.created_by_event = $event_id
 693  		RETURN collect(muted.pubkey) AS pubkeys
 694  	`
 695  
 696  	params := map[string]any{
 697  		"event_id": eventID,
 698  	}
 699  
 700  	result, err := p.db.ExecuteRead(ctx, cypher, params)
 701  	if err != nil {
 702  		return nil, err
 703  	}
 704  
 705  	if result.Next(ctx) {
 706  		record := result.Record()
 707  		pubkeysRaw := record.Values[0].([]interface{})
 708  		pubkeys := make([]string, len(pubkeysRaw))
 709  		for i, p := range pubkeysRaw {
 710  			pubkeys[i] = p.(string)
 711  		}
 712  		return pubkeys, nil
 713  	}
 714  
 715  	return []string{}, nil
 716  }
 717  
 718  // BatchProcessContactLists processes multiple contact list events in order
 719  func (p *SocialEventProcessor) BatchProcessContactLists(ctx context.Context, events []*event.E) error {
 720  	// Group by author
 721  	byAuthor := make(map[string][]*event.E)
 722  	for _, ev := range events {
 723  		if ev.Kind != 3 {
 724  			continue
 725  		}
 726  		pubkey := hex.Enc(ev.Pubkey[:])
 727  		byAuthor[pubkey] = append(byAuthor[pubkey], ev)
 728  	}
 729  
 730  	// Process each author's events in chronological order
 731  	for pubkey, authorEvents := range byAuthor {
 732  		// Sort by created_at (oldest first)
 733  		sort.Slice(authorEvents, func(i, j int) bool {
 734  			return authorEvents[i].CreatedAt < authorEvents[j].CreatedAt
 735  		})
 736  
 737  		// Process in order
 738  		for _, ev := range authorEvents {
 739  			if err := p.processContactList(ctx, ev); err != nil {
 740  				return fmt.Errorf("batch process failed for %s: %w", pubkey, err)
 741  			}
 742  		}
 743  	}
 744  
 745  	return nil
 746  }
 747  
 748  // Helper functions
 749  
 750  // extractPTags extracts unique pubkeys from p-tags
 751  // Uses ExtractPTagValue to properly handle binary-encoded tag values
 752  // and normalizes to lowercase hex for consistent Neo4j storage
 753  func extractPTags(ev *event.E) []string {
 754  	seen := make(map[string]bool)
 755  	var pubkeys []string
 756  
 757  	for _, t := range *ev.Tags {
 758  		if len(t.T) >= 2 && string(t.T[0]) == "p" {
 759  			// Use ExtractPTagValue to handle binary encoding and normalize to lowercase
 760  			pubkey := ExtractPTagValue(t)
 761  			if IsValidHexPubkey(pubkey) && !seen[pubkey] {
 762  				seen[pubkey] = true
 763  				pubkeys = append(pubkeys, pubkey)
 764  			}
 765  		}
 766  	}
 767  
 768  	return pubkeys
 769  }
 770  
 771  // diffStringSlices computes added and removed elements between old and new slices
 772  func diffStringSlices(old, new []string) (added, removed []string) {
 773  	oldSet := make(map[string]bool)
 774  	for _, s := range old {
 775  		oldSet[s] = true
 776  	}
 777  
 778  	newSet := make(map[string]bool)
 779  	for _, s := range new {
 780  		newSet[s] = true
 781  		if !oldSet[s] {
 782  			added = append(added, s)
 783  		}
 784  	}
 785  
 786  	for _, s := range old {
 787  		if !newSet[s] {
 788  			removed = append(removed, s)
 789  		}
 790  	}
 791  
 792  	return
 793  }
 794  
 795  // getStringFromMap safely extracts a string value from a map
 796  func getStringFromMap(m map[string]interface{}, key string) string {
 797  	if val, ok := m[key]; ok {
 798  		if str, ok := val.(string); ok {
 799  			return str
 800  		}
 801  	}
 802  	return ""
 803  }
 804