follows.go raw

   1  package acl
   2  
   3  import (
   4  	"context"
   5  	"encoding/hex"
   6  	"net/http"
   7  	"reflect"
   8  	"strings"
   9  	"sync"
  10  	"time"
  11  
  12  	"github.com/gorilla/websocket"
  13  	"next.orly.dev/pkg/lol/chk"
  14  	"next.orly.dev/pkg/lol/errorf"
  15  	"next.orly.dev/pkg/lol/log"
  16  	"next.orly.dev/app/config"
  17  	"next.orly.dev/pkg/database"
  18  	"next.orly.dev/pkg/nostr/encoders/bech32encoding"
  19  	"next.orly.dev/pkg/nostr/encoders/envelopes"
  20  	"next.orly.dev/pkg/nostr/encoders/envelopes/eoseenvelope"
  21  	"next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
  22  	"next.orly.dev/pkg/nostr/encoders/envelopes/reqenvelope"
  23  	"next.orly.dev/pkg/nostr/encoders/event"
  24  	"next.orly.dev/pkg/nostr/encoders/filter"
  25  	"next.orly.dev/pkg/nostr/encoders/kind"
  26  	"next.orly.dev/pkg/nostr/encoders/tag"
  27  	"next.orly.dev/pkg/protocol/publish"
  28  	"next.orly.dev/pkg/nostr/utils/normalize"
  29  	"next.orly.dev/pkg/nostr/utils/values"
  30  )
  31  
  32  type Follows struct {
  33  	// Ctx holds the context for the ACL.
  34  	// Deprecated: Use Context() method instead of accessing directly.
  35  	Ctx context.Context
  36  	cfg *config.C
  37  	db  database.Database
  38  	pubs       *publish.S
  39  	followsMx  sync.RWMutex
  40  	admins     [][]byte
  41  	owners     [][]byte
  42  	follows    [][]byte
  43  	// Map-based caches for O(1) lookups (hex pubkey -> struct{})
  44  	ownersSet  map[string]struct{}
  45  	adminsSet  map[string]struct{}
  46  	followsSet map[string]struct{}
  47  	// Track last follow list fetch time
  48  	lastFollowListFetch time.Time
  49  	// Callback for external notification of follow list changes
  50  	onFollowListUpdate func()
  51  	// Progressive throttle for non-followed users (nil if disabled)
  52  	throttle *ProgressiveThrottle
  53  }
  54  
  55  // Context returns the ACL context.
  56  func (f *Follows) Context() context.Context {
  57  	return f.Ctx
  58  }
  59  
  60  func (f *Follows) Configure(cfg ...any) (err error) {
  61  	log.I.F("configuring follows ACL")
  62  	for _, ca := range cfg {
  63  		switch c := ca.(type) {
  64  		case *config.C:
  65  			// log.D.F("setting ACL config: %v", c)
  66  			f.cfg = c
  67  		case database.Database:
  68  			// log.D.F("setting ACL database: %s", c.Path())
  69  			f.db = c
  70  		case context.Context:
  71  			// log.D.F("setting ACL context: %s", c.Value("id"))
  72  			f.Ctx = c
  73  		case *publish.S:
  74  			// set publisher for dispatching new events
  75  			f.pubs = c
  76  		default:
  77  			err = errorf.E("invalid type: %T", reflect.TypeOf(ca))
  78  		}
  79  	}
  80  	if f.cfg == nil || f.db == nil {
  81  		err = errorf.E("both config and database must be set")
  82  		return
  83  	}
  84  
  85  	// Build all lists in local variables WITHOUT holding the lock
  86  	// This prevents blocking GetAccessLevel calls during slow database I/O
  87  	var newOwners [][]byte
  88  	newOwnersSet := make(map[string]struct{})
  89  	var newAdmins [][]byte
  90  	newAdminsSet := make(map[string]struct{})
  91  	var newFollows [][]byte
  92  	newFollowsSet := make(map[string]struct{})
  93  
  94  	// add owners list
  95  	for _, owner := range f.cfg.Owners {
  96  		var own []byte
  97  		if o, e := bech32encoding.NpubOrHexToPublicKeyBinary(owner); chk.E(e) {
  98  			continue
  99  		} else {
 100  			own = o
 101  		}
 102  		newOwners = append(newOwners, own)
 103  		newOwnersSet[hex.EncodeToString(own)] = struct{}{}
 104  	}
 105  
 106  	// parse admin pubkeys
 107  	var adminBinaries [][]byte
 108  	for _, admin := range f.cfg.Admins {
 109  		var adm []byte
 110  		if a, e := bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(e) {
 111  			continue
 112  		} else {
 113  			adm = a
 114  		}
 115  		newAdmins = append(newAdmins, adm)
 116  		newAdminsSet[hex.EncodeToString(adm)] = struct{}{}
 117  		adminBinaries = append(adminBinaries, adm)
 118  	}
 119  
 120  	// Batch query all admin follow lists in a single DB call
 121  	// Kind 3 is replaceable, so QueryEvents returns only the latest per author
 122  	if len(adminBinaries) > 0 {
 123  		ctx := f.Ctx
 124  		if ctx == nil {
 125  			ctx = context.Background()
 126  		}
 127  		fl := &filter.F{
 128  			Authors: tag.NewFromBytesSlice(adminBinaries...),
 129  			Kinds:   kind.NewS(kind.New(kind.FollowList.K)),
 130  			Limit:   values.ToUintPointer(uint(len(adminBinaries))),
 131  		}
 132  		var evs event.S
 133  		if evs, err = f.db.QueryEvents(ctx, fl); err != nil {
 134  			log.W.F("follows ACL: error querying admin follow lists: %v", err)
 135  			err = nil // Don't fail Configure on query error
 136  		}
 137  		for _, ev := range evs {
 138  			for _, v := range ev.Tags.GetAll([]byte("p")) {
 139  				// ValueHex() automatically handles both binary and hex storage formats
 140  				if b, e := hex.DecodeString(string(v.ValueHex())); chk.E(e) {
 141  					continue
 142  				} else {
 143  					hexKey := hex.EncodeToString(b)
 144  					if _, exists := newFollowsSet[hexKey]; !exists {
 145  						newFollows = append(newFollows, b)
 146  						newFollowsSet[hexKey] = struct{}{}
 147  					}
 148  				}
 149  			}
 150  		}
 151  	}
 152  
 153  	// Now acquire the lock ONLY for the quick swap operation
 154  	f.followsMx.Lock()
 155  	f.owners = newOwners
 156  	f.ownersSet = newOwnersSet
 157  	f.admins = newAdmins
 158  	f.adminsSet = newAdminsSet
 159  	f.follows = newFollows
 160  	f.followsSet = newFollowsSet
 161  	f.followsMx.Unlock()
 162  
 163  	log.I.F("follows ACL configured: %d owners, %d admins, %d follows",
 164  		len(newOwners), len(newAdmins), len(newFollows))
 165  
 166  	// Initialize progressive throttle if enabled
 167  	if f.cfg.FollowsThrottleEnabled {
 168  		perEvent := f.cfg.FollowsThrottlePerEvent
 169  		if perEvent == 0 {
 170  			perEvent = 25 * time.Millisecond
 171  		}
 172  		maxDelay := f.cfg.FollowsThrottleMaxDelay
 173  		if maxDelay == 0 {
 174  			maxDelay = 60 * time.Second
 175  		}
 176  		f.throttle = NewProgressiveThrottle(perEvent, maxDelay)
 177  		log.I.F("follows ACL: progressive throttle enabled (increment: %v, max: %v)",
 178  			perEvent, maxDelay)
 179  	}
 180  
 181  	return
 182  }
 183  
 184  func (f *Follows) GetAccessLevel(pub []byte, address string) (level string) {
 185  	pubHex := hex.EncodeToString(pub)
 186  
 187  	f.followsMx.RLock()
 188  	defer f.followsMx.RUnlock()
 189  
 190  	// O(1) map lookups instead of O(n) linear scans
 191  	if f.ownersSet != nil {
 192  		if _, ok := f.ownersSet[pubHex]; ok {
 193  			return "owner"
 194  		}
 195  	}
 196  	if f.adminsSet != nil {
 197  		if _, ok := f.adminsSet[pubHex]; ok {
 198  			return "admin"
 199  		}
 200  	}
 201  	if f.followsSet != nil {
 202  		if _, ok := f.followsSet[pubHex]; ok {
 203  			return "write"
 204  		}
 205  	}
 206  
 207  	if f.cfg == nil {
 208  		return "write"
 209  	}
 210  	// If throttle enabled, non-followed users get write access (with delay applied in handle-event)
 211  	if f.throttle != nil {
 212  		return "write"
 213  	}
 214  	return "read"
 215  }
 216  
 217  func (f *Follows) GetACLInfo() (name, description, documentation string) {
 218  	return "follows", "whitelist follows of admins",
 219  		`This ACL mode searches for follow lists of admins and grants all followers write access`
 220  }
 221  
 222  func (f *Follows) Type() string { return "follows" }
 223  
 224  // GetThrottleDelay returns the progressive throttle delay for this event.
 225  // Returns 0 if throttle is disabled or if the user is exempt (owner/admin/followed).
 226  func (f *Follows) GetThrottleDelay(pubkey []byte, ip string) time.Duration {
 227  	if f.throttle == nil {
 228  		return 0
 229  	}
 230  
 231  	pubkeyHex := hex.EncodeToString(pubkey)
 232  
 233  	// Check if user is exempt from throttling using O(1) map lookups
 234  	f.followsMx.RLock()
 235  	defer f.followsMx.RUnlock()
 236  
 237  	// Owners bypass throttle
 238  	if f.ownersSet != nil {
 239  		if _, ok := f.ownersSet[pubkeyHex]; ok {
 240  			return 0
 241  		}
 242  	}
 243  	// Admins bypass throttle
 244  	if f.adminsSet != nil {
 245  		if _, ok := f.adminsSet[pubkeyHex]; ok {
 246  			return 0
 247  		}
 248  	}
 249  	// Followed users bypass throttle
 250  	if f.followsSet != nil {
 251  		if _, ok := f.followsSet[pubkeyHex]; ok {
 252  			return 0
 253  		}
 254  	}
 255  
 256  	// Non-followed users get throttled
 257  	return f.throttle.GetDelay(ip, pubkeyHex)
 258  }
 259  
 260  func (f *Follows) adminRelays() (urls []string) {
 261  	f.followsMx.RLock()
 262  	admins := make([][]byte, len(f.admins))
 263  	copy(admins, f.admins)
 264  	f.followsMx.RUnlock()
 265  	seen := make(map[string]struct{})
 266  	// Build a set of normalized self relay addresses to avoid self-connections
 267  	selfSet := make(map[string]struct{})
 268  	selfHosts := make(map[string]struct{})
 269  	if f.cfg != nil && len(f.cfg.RelayAddresses) > 0 {
 270  		for _, a := range f.cfg.RelayAddresses {
 271  			n := string(normalize.URL(a))
 272  			if n == "" {
 273  				continue
 274  			}
 275  			selfSet[n] = struct{}{}
 276  			// Also record hostname (without port) for robust matching
 277  			// Accept simple splitting; normalize.URL ensures scheme://host[:port]
 278  			host := n
 279  			if i := strings.Index(host, "://"); i >= 0 {
 280  				host = host[i+3:]
 281  			}
 282  			if j := strings.Index(host, "/"); j >= 0 {
 283  				host = host[:j]
 284  			}
 285  			if k := strings.Index(host, ":"); k >= 0 {
 286  				host = host[:k]
 287  			}
 288  			if host != "" {
 289  				selfHosts[host] = struct{}{}
 290  			}
 291  		}
 292  	}
 293  
 294  	// Batch query all admin relay list events in a single DB call
 295  	// Kind 10002 is replaceable, so QueryEvents returns only the latest per author
 296  	if len(admins) > 0 {
 297  		ctx := f.Ctx
 298  		if ctx == nil {
 299  			ctx = context.Background()
 300  		}
 301  		fl := &filter.F{
 302  			Authors: tag.NewFromBytesSlice(admins...),
 303  			Kinds:   kind.NewS(kind.New(kind.RelayListMetadata.K)),
 304  			Limit:   values.ToUintPointer(uint(len(admins))),
 305  		}
 306  		evs, qerr := f.db.QueryEvents(ctx, fl)
 307  		if qerr != nil {
 308  			log.W.F("follows ACL: error querying admin relay lists: %v", qerr)
 309  		}
 310  		for _, ev := range evs {
 311  			for _, v := range ev.Tags.GetAll([]byte("r")) {
 312  				u := string(v.Value())
 313  				n := string(normalize.URL(u))
 314  				if n == "" {
 315  					continue
 316  				}
 317  				// Skip if this URL is one of our configured self relay addresses or hosts
 318  				if _, isSelf := selfSet[n]; isSelf {
 319  					log.D.F("follows syncer: skipping configured self relay address: %s", n)
 320  					continue
 321  				}
 322  				// Host match
 323  				host := n
 324  				if i := strings.Index(host, "://"); i >= 0 {
 325  					host = host[i+3:]
 326  				}
 327  				if j := strings.Index(host, "/"); j >= 0 {
 328  					host = host[:j]
 329  				}
 330  				if k := strings.Index(host, ":"); k >= 0 {
 331  					host = host[:k]
 332  				}
 333  				if _, isSelfHost := selfHosts[host]; isSelfHost {
 334  					log.D.F("follows syncer: skipping configured self relay address: %s", n)
 335  					continue
 336  				}
 337  				if _, ok := seen[n]; ok {
 338  					continue
 339  				}
 340  				seen[n] = struct{}{}
 341  				urls = append(urls, n)
 342  			}
 343  		}
 344  	}
 345  
 346  	// If no admin relays found, use bootstrap relays as fallback
 347  	if len(urls) == 0 {
 348  		log.D.F("no admin relays found in DB, checking bootstrap relays and failover relays")
 349  		if len(f.cfg.BootstrapRelays) > 0 {
 350  			log.D.F("using bootstrap relays: %v", f.cfg.BootstrapRelays)
 351  			for _, relay := range f.cfg.BootstrapRelays {
 352  				n := string(normalize.URL(relay))
 353  				if n == "" {
 354  					log.W.F("invalid bootstrap relay URL: %s", relay)
 355  					continue
 356  				}
 357  				// Skip if this URL is one of our configured self relay addresses or hosts
 358  				if _, isSelf := selfSet[n]; isSelf {
 359  					log.D.F("follows syncer: skipping configured self relay address: %s", n)
 360  					continue
 361  				}
 362  				// Host match
 363  				host := n
 364  				if i := strings.Index(host, "://"); i >= 0 {
 365  					host = host[i+3:]
 366  				}
 367  				if j := strings.Index(host, "/"); j >= 0 {
 368  					host = host[:j]
 369  				}
 370  				if k := strings.Index(host, ":"); k >= 0 {
 371  					host = host[:k]
 372  				}
 373  				if _, isSelfHost := selfHosts[host]; isSelfHost {
 374  					log.D.F("follows syncer: skipping configured self relay address: %s", n)
 375  					continue
 376  				}
 377  				if _, ok := seen[n]; ok {
 378  					continue
 379  				}
 380  				seen[n] = struct{}{}
 381  				urls = append(urls, n)
 382  			}
 383  		} else {
 384  			log.D.F("no bootstrap relays configured, using failover relays")
 385  		}
 386  
 387  		// If still no relays found, use hardcoded failover relays
 388  		// These relays will be used to fetch admin relay lists (kind 10002) and store them
 389  		// in the database so they're found next time
 390  		if len(urls) == 0 {
 391  			failoverRelays := []string{
 392  				"wss://nostr.land",
 393  				"wss://nostr.wine",
 394  				"wss://nos.lol",
 395  				"wss://relay.damus.io",
 396  			}
 397  			log.D.F("using failover relays: %v", failoverRelays)
 398  			for _, relay := range failoverRelays {
 399  				n := string(normalize.URL(relay))
 400  				if n == "" {
 401  					log.W.F("invalid failover relay URL: %s", relay)
 402  					continue
 403  				}
 404  				// Skip if this URL is one of our configured self relay addresses or hosts
 405  				if _, isSelf := selfSet[n]; isSelf {
 406  					log.D.F("follows syncer: skipping configured self relay address: %s", n)
 407  					continue
 408  				}
 409  				// Host match
 410  				host := n
 411  				if i := strings.Index(host, "://"); i >= 0 {
 412  					host = host[i+3:]
 413  				}
 414  				if j := strings.Index(host, "/"); j >= 0 {
 415  					host = host[:j]
 416  				}
 417  				if k := strings.Index(host, ":"); k >= 0 {
 418  					host = host[:k]
 419  				}
 420  				if _, isSelfHost := selfHosts[host]; isSelfHost {
 421  					log.D.F("follows syncer: skipping configured self relay address: %s", n)
 422  					continue
 423  				}
 424  				if _, ok := seen[n]; ok {
 425  					continue
 426  				}
 427  				seen[n] = struct{}{}
 428  				urls = append(urls, n)
 429  			}
 430  		}
 431  	}
 432  
 433  	return
 434  }
 435  
 436  
 437  func (f *Follows) Syncer() {
 438  	log.D.F("starting follows syncer")
 439  
 440  	// Start periodic follow list and metadata fetching
 441  	go f.startPeriodicFollowListFetching()
 442  
 443  	// Start throttle cleanup goroutine if throttle is enabled
 444  	if f.throttle != nil {
 445  		go f.throttleCleanup()
 446  	}
 447  }
 448  
 449  // throttleCleanup periodically removes fully-decayed throttle entries
 450  func (f *Follows) throttleCleanup() {
 451  	ticker := time.NewTicker(10 * time.Minute)
 452  	defer ticker.Stop()
 453  
 454  	for {
 455  		select {
 456  		case <-f.Ctx.Done():
 457  			return
 458  		case <-ticker.C:
 459  			f.throttle.Cleanup()
 460  			ipCount, pubkeyCount := f.throttle.Stats()
 461  			log.T.F("follows throttle: cleanup complete, tracking %d IPs and %d pubkeys",
 462  				ipCount, pubkeyCount)
 463  		}
 464  	}
 465  }
 466  
 467  // startPeriodicFollowListFetching starts periodic fetching of admin follow lists
 468  func (f *Follows) startPeriodicFollowListFetching() {
 469  	frequency := f.cfg.FollowListFrequency
 470  	if frequency == 0 {
 471  		frequency = time.Hour // Default to 1 hour
 472  	}
 473  
 474  	log.D.F("starting periodic follow list fetching every %v", frequency)
 475  
 476  	ticker := time.NewTicker(frequency)
 477  	defer ticker.Stop()
 478  
 479  	// Fetch immediately on startup
 480  	f.fetchAdminFollowLists()
 481  
 482  	for {
 483  		select {
 484  		case <-f.Ctx.Done():
 485  			log.D.F("periodic follow list fetching stopped due to context cancellation")
 486  			return
 487  		case <-ticker.C:
 488  			f.fetchAdminFollowLists()
 489  		}
 490  	}
 491  }
 492  
 493  // fetchAdminFollowLists fetches follow lists for admins and metadata for all follows
 494  func (f *Follows) fetchAdminFollowLists() {
 495  	log.D.F("follows syncer: fetching admin follow lists and follows metadata")
 496  
 497  	urls := f.adminRelays()
 498  	if len(urls) == 0 {
 499  		log.W.F("follows syncer: no relays available for follow list fetching (no admin relays, bootstrap relays, or failover relays)")
 500  		return
 501  	}
 502  
 503  	// build authors lists: admins for follow lists, all follows for metadata
 504  	f.followsMx.RLock()
 505  	admins := make([][]byte, len(f.admins))
 506  	copy(admins, f.admins)
 507  	allFollows := make([][]byte, 0, len(f.admins)+len(f.follows))
 508  	allFollows = append(allFollows, f.admins...)
 509  	allFollows = append(allFollows, f.follows...)
 510  	f.followsMx.RUnlock()
 511  
 512  	if len(admins) == 0 {
 513  		log.W.F("follows syncer: no admins to fetch follow lists for")
 514  		return
 515  	}
 516  
 517  	log.D.F("follows syncer: fetching from %d relays: follow lists for %d admins, metadata for %d follows",
 518  		len(urls), len(admins), len(allFollows))
 519  
 520  	for _, u := range urls {
 521  		go f.fetchFollowListsFromRelay(u, admins, allFollows)
 522  	}
 523  }
 524  
 525  // fetchFollowListsFromRelay fetches follow lists for admins and metadata for all follows from a specific relay
 526  func (f *Follows) fetchFollowListsFromRelay(relayURL string, admins [][]byte, allFollows [][]byte) {
 527  	ctx, cancel := context.WithTimeout(f.Ctx, 60*time.Second)
 528  	defer cancel()
 529  
 530  	// Create proper headers for the WebSocket connection
 531  	headers := http.Header{}
 532  	headers.Set("User-Agent", "ORLY-Relay/0.9.2")
 533  	headers.Set("Origin", "https://orly.dev")
 534  
 535  	// Use proper WebSocket dial options
 536  	dialer := websocket.Dialer{
 537  		HandshakeTimeout: 10 * time.Second,
 538  	}
 539  
 540  	c, resp, err := dialer.DialContext(ctx, relayURL, headers)
 541  	if resp != nil {
 542  		resp.Body.Close()
 543  	}
 544  	if err != nil {
 545  		log.W.F("follows syncer: failed to connect to %s for follow list fetch: %v", relayURL, err)
 546  		return
 547  	}
 548  	defer c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "follow list fetch complete"), time.Now().Add(time.Second))
 549  
 550  	log.D.F("follows syncer: fetching follow lists and metadata from relay %s", relayURL)
 551  
 552  	// Create filters:
 553  	// - kind 3 (follow lists) for admins only
 554  	// - kind 0 (metadata) + kind 10002 (relay lists) for all follows
 555  	ff := &filter.S{}
 556  
 557  	// Filter for admin follow lists (kind 3)
 558  	f1 := &filter.F{
 559  		Authors: tag.NewFromBytesSlice(admins...),
 560  		Kinds:   kind.NewS(kind.New(kind.FollowList.K)),
 561  		Limit:   values.ToUintPointer(uint(len(admins) * 2)),
 562  	}
 563  
 564  	// Filter for metadata (kind 0) for all follows
 565  	f2 := &filter.F{
 566  		Authors: tag.NewFromBytesSlice(allFollows...),
 567  		Kinds:   kind.NewS(kind.New(kind.ProfileMetadata.K)),
 568  		Limit:   values.ToUintPointer(uint(len(allFollows) * 2)),
 569  	}
 570  
 571  	// Filter for relay lists (kind 10002) for all follows
 572  	f3 := &filter.F{
 573  		Authors: tag.NewFromBytesSlice(allFollows...),
 574  		Kinds:   kind.NewS(kind.New(kind.RelayListMetadata.K)),
 575  		Limit:   values.ToUintPointer(uint(len(allFollows) * 2)),
 576  	}
 577  	*ff = append(*ff, f1, f2, f3)
 578  
 579  	// Use a specific subscription ID for follow list fetching
 580  	subID := "follow-lists-fetch"
 581  	req := reqenvelope.NewFrom([]byte(subID), ff)
 582  	reqBytes := req.Marshal(nil)
 583  	log.T.F("follows syncer: outbound REQ to %s: %s", relayURL, string(reqBytes))
 584  	c.SetWriteDeadline(time.Now().Add(10 * time.Second))
 585  	if err = c.WriteMessage(websocket.TextMessage, reqBytes); chk.E(err) {
 586  		log.W.F("follows syncer: failed to send follow list REQ to %s: %v", relayURL, err)
 587  		return
 588  	}
 589  
 590  	log.T.F("follows syncer: sent follow list, metadata, and relay list REQ to %s", relayURL)
 591  
 592  	// Collect all events before processing
 593  	var followListEvents []*event.E
 594  	var metadataEvents []*event.E
 595  	var relayListEvents []*event.E
 596  
 597  	// Read events with timeout (longer timeout for larger fetches)
 598  	timeout := time.After(30 * time.Second)
 599  	for {
 600  		select {
 601  		case <-ctx.Done():
 602  			goto processEvents
 603  		case <-timeout:
 604  			log.T.F("follows syncer: timeout reading events from %s", relayURL)
 605  			goto processEvents
 606  		default:
 607  		}
 608  
 609  		c.SetReadDeadline(time.Now().Add(30 * time.Second))
 610  		_, data, err := c.ReadMessage()
 611  		if err != nil {
 612  			log.T.F("follows syncer: error reading events from %s: %v", relayURL, err)
 613  			goto processEvents
 614  		}
 615  
 616  		label, rem, err := envelopes.Identify(data)
 617  		if chk.E(err) {
 618  			continue
 619  		}
 620  
 621  		switch label {
 622  		case eventenvelope.L:
 623  			res, _, err := eventenvelope.ParseResult(rem)
 624  			if chk.E(err) || res == nil || res.Event == nil {
 625  				continue
 626  			}
 627  
 628  			// Collect events by kind
 629  			switch res.Event.Kind {
 630  			case kind.FollowList.K:
 631  				log.T.F("follows syncer: received follow list from %s on relay %s",
 632  					hex.EncodeToString(res.Event.Pubkey), relayURL)
 633  				followListEvents = append(followListEvents, res.Event)
 634  			case kind.ProfileMetadata.K:
 635  				log.T.F("follows syncer: received metadata from %s on relay %s",
 636  					hex.EncodeToString(res.Event.Pubkey), relayURL)
 637  				metadataEvents = append(metadataEvents, res.Event)
 638  			case kind.RelayListMetadata.K:
 639  				log.T.F("follows syncer: received relay list from %s on relay %s",
 640  					hex.EncodeToString(res.Event.Pubkey), relayURL)
 641  				relayListEvents = append(relayListEvents, res.Event)
 642  			}
 643  		case eoseenvelope.L:
 644  			log.T.F("follows syncer: end of events from %s", relayURL)
 645  			goto processEvents
 646  		default:
 647  			// ignore other labels
 648  		}
 649  	}
 650  
 651  processEvents:
 652  	// Process collected events - keep only the newest per pubkey and save to database
 653  	f.processCollectedEvents(relayURL, followListEvents, metadataEvents, relayListEvents)
 654  }
 655  
 656  // processCollectedEvents processes the collected events, keeping only the newest per pubkey
 657  func (f *Follows) processCollectedEvents(relayURL string, followListEvents, metadataEvents, relayListEvents []*event.E) {
 658  	// Process follow list events (kind 3) - keep newest per pubkey
 659  	latestFollowLists := make(map[string]*event.E)
 660  	for _, ev := range followListEvents {
 661  		pubkeyHex := hex.EncodeToString(ev.Pubkey)
 662  		existing, exists := latestFollowLists[pubkeyHex]
 663  		if !exists || ev.CreatedAt > existing.CreatedAt {
 664  			latestFollowLists[pubkeyHex] = ev
 665  		}
 666  	}
 667  
 668  	// Process metadata events (kind 0) - keep newest per pubkey
 669  	latestMetadata := make(map[string]*event.E)
 670  	for _, ev := range metadataEvents {
 671  		pubkeyHex := hex.EncodeToString(ev.Pubkey)
 672  		existing, exists := latestMetadata[pubkeyHex]
 673  		if !exists || ev.CreatedAt > existing.CreatedAt {
 674  			latestMetadata[pubkeyHex] = ev
 675  		}
 676  	}
 677  
 678  	// Process relay list events (kind 10002) - keep newest per pubkey
 679  	latestRelayLists := make(map[string]*event.E)
 680  	for _, ev := range relayListEvents {
 681  		pubkeyHex := hex.EncodeToString(ev.Pubkey)
 682  		existing, exists := latestRelayLists[pubkeyHex]
 683  		if !exists || ev.CreatedAt > existing.CreatedAt {
 684  			latestRelayLists[pubkeyHex] = ev
 685  		}
 686  	}
 687  
 688  	// Save and process the newest events
 689  	savedFollowLists := 0
 690  	savedMetadata := 0
 691  	savedRelayLists := 0
 692  
 693  	// Save follow list events to database and extract follows
 694  	for pubkeyHex, ev := range latestFollowLists {
 695  		if _, err := f.db.SaveEvent(f.Ctx, ev); err != nil {
 696  			if strings.Contains(err.Error(), "blocked:") {
 697  				log.T.F("follows syncer: skipped follow list from %s (already stored): %v", pubkeyHex, err)
 698  			} else {
 699  				log.W.F("follows syncer: failed to save follow list from %s: %v", pubkeyHex, err)
 700  			}
 701  		} else {
 702  			savedFollowLists++
 703  			log.T.F("follows syncer: saved follow list from %s (created_at: %d) from relay %s",
 704  				pubkeyHex, ev.CreatedAt, relayURL)
 705  		}
 706  
 707  		// Extract followed pubkeys from admin follow lists
 708  		if f.isAdminPubkey(ev.Pubkey) {
 709  			log.D.F("follows syncer: processing admin follow list from %s", pubkeyHex)
 710  			f.extractFollowedPubkeys(ev)
 711  		}
 712  	}
 713  
 714  	// Save metadata events to database
 715  	for pubkeyHex, ev := range latestMetadata {
 716  		if _, err := f.db.SaveEvent(f.Ctx, ev); err != nil {
 717  			if strings.Contains(err.Error(), "blocked:") {
 718  				log.T.F("follows syncer: skipped metadata from %s (already stored): %v", pubkeyHex, err)
 719  			} else {
 720  				log.W.F("follows syncer: failed to save metadata from %s: %v", pubkeyHex, err)
 721  			}
 722  		} else {
 723  			savedMetadata++
 724  			log.T.F("follows syncer: saved metadata from %s (created_at: %d) from relay %s",
 725  				pubkeyHex, ev.CreatedAt, relayURL)
 726  		}
 727  	}
 728  
 729  	// Save relay list events to database
 730  	for pubkeyHex, ev := range latestRelayLists {
 731  		if _, err := f.db.SaveEvent(f.Ctx, ev); err != nil {
 732  			if strings.Contains(err.Error(), "blocked:") {
 733  				log.T.F("follows syncer: skipped relay list from %s (already stored): %v", pubkeyHex, err)
 734  			} else {
 735  				log.W.F("follows syncer: failed to save relay list from %s: %v", pubkeyHex, err)
 736  			}
 737  		} else {
 738  			savedRelayLists++
 739  			log.T.F("follows syncer: saved relay list from %s (created_at: %d) from relay %s",
 740  				pubkeyHex, ev.CreatedAt, relayURL)
 741  		}
 742  	}
 743  
 744  	log.D.F("follows syncer: from %s - received: %d follow lists, %d metadata, %d relay lists; saved: %d, %d, %d",
 745  		relayURL, len(followListEvents), len(metadataEvents), len(relayListEvents),
 746  		savedFollowLists, savedMetadata, savedRelayLists)
 747  }
 748  
 749  // GetFollowedPubkeys returns a copy of the followed pubkeys list
 750  func (f *Follows) GetFollowedPubkeys() [][]byte {
 751  	f.followsMx.RLock()
 752  	defer f.followsMx.RUnlock()
 753  
 754  	followedPubkeys := make([][]byte, len(f.follows))
 755  	copy(followedPubkeys, f.follows)
 756  	return followedPubkeys
 757  }
 758  
 759  // isAdminPubkey checks if a pubkey belongs to an admin
 760  func (f *Follows) isAdminPubkey(pubkey []byte) bool {
 761  	pubkeyHex := hex.EncodeToString(pubkey)
 762  
 763  	f.followsMx.RLock()
 764  	defer f.followsMx.RUnlock()
 765  
 766  	if f.adminsSet != nil {
 767  		_, ok := f.adminsSet[pubkeyHex]
 768  		return ok
 769  	}
 770  	return false
 771  }
 772  
 773  // extractFollowedPubkeys extracts followed pubkeys from 'p' tags in kind 3 events
 774  func (f *Follows) extractFollowedPubkeys(event *event.E) {
 775  	if event.Kind != kind.FollowList.K {
 776  		return
 777  	}
 778  
 779  	// Extract all 'p' tags (followed pubkeys) from the kind 3 event
 780  	for _, tag := range event.Tags.GetAll([]byte("p")) {
 781  		// First try binary format (optimized storage: 33 bytes = 32 hash + null)
 782  		if pubkey := tag.ValueBinary(); pubkey != nil {
 783  			f.AddFollow(pubkey)
 784  			continue
 785  		}
 786  		// Fall back to hex decoding for non-binary values
 787  		// Use ValueHex() which handles both binary and hex storage formats
 788  		if pubkey, err := hex.DecodeString(string(tag.ValueHex())); err == nil && len(pubkey) == 32 {
 789  			f.AddFollow(pubkey)
 790  		}
 791  	}
 792  }
 793  
 794  // AdminRelays returns the admin relay URLs
 795  func (f *Follows) AdminRelays() []string {
 796  	return f.adminRelays()
 797  }
 798  
 799  // SetFollowListUpdateCallback sets a callback to be called when the follow list is updated
 800  func (f *Follows) SetFollowListUpdateCallback(callback func()) {
 801  	f.followsMx.Lock()
 802  	defer f.followsMx.Unlock()
 803  	f.onFollowListUpdate = callback
 804  }
 805  
 806  // AddFollow appends a pubkey to the in-memory follows list if not already present
 807  // and signals the syncer to refresh subscriptions.
 808  func (f *Follows) AddFollow(pub []byte) {
 809  	if len(pub) == 0 {
 810  		return
 811  	}
 812  	pubHex := hex.EncodeToString(pub)
 813  
 814  	f.followsMx.Lock()
 815  	defer f.followsMx.Unlock()
 816  
 817  	// Use map for O(1) duplicate detection
 818  	if f.followsSet == nil {
 819  		f.followsSet = make(map[string]struct{})
 820  	}
 821  	if _, exists := f.followsSet[pubHex]; exists {
 822  		return
 823  	}
 824  
 825  	b := make([]byte, len(pub))
 826  	copy(b, pub)
 827  	f.follows = append(f.follows, b)
 828  	f.followsSet[pubHex] = struct{}{}
 829  
 830  	log.D.F(
 831  		"follows syncer: added new followed pubkey: %s",
 832  		pubHex,
 833  	)
 834  	// notify external listeners (e.g., spider)
 835  	if f.onFollowListUpdate != nil {
 836  		go f.onFollowListUpdate()
 837  	}
 838  }
 839  
 840  func init() {
 841  	Registry.Register(new(Follows))
 842  }
 843