// follows.go
1 package acl
2
3 import (
4 "context"
5 "encoding/hex"
6 "net/http"
7 "reflect"
8 "strings"
9 "sync"
10 "time"
11
12 "github.com/gorilla/websocket"
13 "next.orly.dev/pkg/lol/chk"
14 "next.orly.dev/pkg/lol/errorf"
15 "next.orly.dev/pkg/lol/log"
16 "next.orly.dev/app/config"
17 "next.orly.dev/pkg/database"
18 "next.orly.dev/pkg/nostr/encoders/bech32encoding"
19 "next.orly.dev/pkg/nostr/encoders/envelopes"
20 "next.orly.dev/pkg/nostr/encoders/envelopes/eoseenvelope"
21 "next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
22 "next.orly.dev/pkg/nostr/encoders/envelopes/reqenvelope"
23 "next.orly.dev/pkg/nostr/encoders/event"
24 "next.orly.dev/pkg/nostr/encoders/filter"
25 "next.orly.dev/pkg/nostr/encoders/kind"
26 "next.orly.dev/pkg/nostr/encoders/tag"
27 "next.orly.dev/pkg/protocol/publish"
28 "next.orly.dev/pkg/nostr/utils/normalize"
29 "next.orly.dev/pkg/nostr/utils/values"
30 )
31
// Follows is an ACL implementation that grants write access to every pubkey
// appearing in a configured admin's follow list (kind 3), with owners and
// admins taken directly from the relay configuration.
type Follows struct {
	// Ctx holds the context for the ACL.
	// Deprecated: Use Context() method instead of accessing directly.
	Ctx context.Context
	// cfg is the relay configuration (owners, admins, throttle settings).
	cfg *config.C
	// db is the event store used to read/write follow and relay lists.
	db database.Database
	// pubs is the publisher used for dispatching new events.
	pubs *publish.S
	// followsMx guards admins/owners/follows and their set counterparts below.
	followsMx sync.RWMutex
	admins [][]byte
	owners [][]byte
	follows [][]byte
	// Map-based caches for O(1) lookups (hex pubkey -> struct{})
	ownersSet map[string]struct{}
	adminsSet map[string]struct{}
	followsSet map[string]struct{}
	// Track last follow list fetch time
	lastFollowListFetch time.Time
	// Callback for external notification of follow list changes
	onFollowListUpdate func()
	// Progressive throttle for non-followed users (nil if disabled)
	throttle *ProgressiveThrottle
}
54
55 // Context returns the ACL context.
56 func (f *Follows) Context() context.Context {
57 return f.Ctx
58 }
59
// Configure accepts any mix of *config.C, database.Database, context.Context
// and *publish.S arguments (in any order) and (re)builds the owner, admin and
// follow lists. Both config and database must be provided or already set;
// otherwise an error is returned. The new lists are assembled without holding
// the lock so concurrent GetAccessLevel calls are not blocked by database I/O,
// then swapped in under a brief write lock. Finally, the progressive throttle
// is initialized if enabled in config.
func (f *Follows) Configure(cfg ...any) (err error) {
	log.I.F("configuring follows ACL")
	// Dispatch each argument by type; unknown types produce an error but do
	// not stop processing of the remaining arguments.
	for _, ca := range cfg {
		switch c := ca.(type) {
		case *config.C:
			// log.D.F("setting ACL config: %v", c)
			f.cfg = c
		case database.Database:
			// log.D.F("setting ACL database: %s", c.Path())
			f.db = c
		case context.Context:
			// log.D.F("setting ACL context: %s", c.Value("id"))
			f.Ctx = c
		case *publish.S:
			// set publisher for dispatching new events
			f.pubs = c
		default:
			err = errorf.E("invalid type: %T", reflect.TypeOf(ca))
		}
	}
	if f.cfg == nil || f.db == nil {
		err = errorf.E("both config and database must be set")
		return
	}

	// Build all lists in local variables WITHOUT holding the lock
	// This prevents blocking GetAccessLevel calls during slow database I/O
	var newOwners [][]byte
	newOwnersSet := make(map[string]struct{})
	var newAdmins [][]byte
	newAdminsSet := make(map[string]struct{})
	var newFollows [][]byte
	newFollowsSet := make(map[string]struct{})

	// add owners list; entries may be npub or hex, invalid ones are skipped
	for _, owner := range f.cfg.Owners {
		var own []byte
		if o, e := bech32encoding.NpubOrHexToPublicKeyBinary(owner); chk.E(e) {
			continue
		} else {
			own = o
		}
		newOwners = append(newOwners, own)
		newOwnersSet[hex.EncodeToString(own)] = struct{}{}
	}

	// parse admin pubkeys (npub or hex); invalid entries are skipped
	var adminBinaries [][]byte
	for _, admin := range f.cfg.Admins {
		var adm []byte
		if a, e := bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(e) {
			continue
		} else {
			adm = a
		}
		newAdmins = append(newAdmins, adm)
		newAdminsSet[hex.EncodeToString(adm)] = struct{}{}
		adminBinaries = append(adminBinaries, adm)
	}

	// Batch query all admin follow lists in a single DB call
	// Kind 3 is replaceable, so QueryEvents returns only the latest per author
	if len(adminBinaries) > 0 {
		ctx := f.Ctx
		if ctx == nil {
			ctx = context.Background()
		}
		fl := &filter.F{
			Authors: tag.NewFromBytesSlice(adminBinaries...),
			Kinds:   kind.NewS(kind.New(kind.FollowList.K)),
			Limit:   values.ToUintPointer(uint(len(adminBinaries))),
		}
		var evs event.S
		if evs, err = f.db.QueryEvents(ctx, fl); err != nil {
			log.W.F("follows ACL: error querying admin follow lists: %v", err)
			err = nil // Don't fail Configure on query error
		}
		// Collect the followed pubkeys from each admin's 'p' tags,
		// deduplicating across admins via newFollowsSet.
		for _, ev := range evs {
			for _, v := range ev.Tags.GetAll([]byte("p")) {
				// ValueHex() automatically handles both binary and hex storage formats
				if b, e := hex.DecodeString(string(v.ValueHex())); chk.E(e) {
					continue
				} else {
					hexKey := hex.EncodeToString(b)
					if _, exists := newFollowsSet[hexKey]; !exists {
						newFollows = append(newFollows, b)
						newFollowsSet[hexKey] = struct{}{}
					}
				}
			}
		}
	}

	// Now acquire the lock ONLY for the quick swap operation
	f.followsMx.Lock()
	f.owners = newOwners
	f.ownersSet = newOwnersSet
	f.admins = newAdmins
	f.adminsSet = newAdminsSet
	f.follows = newFollows
	f.followsSet = newFollowsSet
	f.followsMx.Unlock()

	log.I.F("follows ACL configured: %d owners, %d admins, %d follows",
		len(newOwners), len(newAdmins), len(newFollows))

	// Initialize progressive throttle if enabled; zero-valued durations in
	// config fall back to 25ms per-event increment and 60s max delay.
	if f.cfg.FollowsThrottleEnabled {
		perEvent := f.cfg.FollowsThrottlePerEvent
		if perEvent == 0 {
			perEvent = 25 * time.Millisecond
		}
		maxDelay := f.cfg.FollowsThrottleMaxDelay
		if maxDelay == 0 {
			maxDelay = 60 * time.Second
		}
		f.throttle = NewProgressiveThrottle(perEvent, maxDelay)
		log.I.F("follows ACL: progressive throttle enabled (increment: %v, max: %v)",
			perEvent, maxDelay)
	}

	return
}
183
184 func (f *Follows) GetAccessLevel(pub []byte, address string) (level string) {
185 pubHex := hex.EncodeToString(pub)
186
187 f.followsMx.RLock()
188 defer f.followsMx.RUnlock()
189
190 // O(1) map lookups instead of O(n) linear scans
191 if f.ownersSet != nil {
192 if _, ok := f.ownersSet[pubHex]; ok {
193 return "owner"
194 }
195 }
196 if f.adminsSet != nil {
197 if _, ok := f.adminsSet[pubHex]; ok {
198 return "admin"
199 }
200 }
201 if f.followsSet != nil {
202 if _, ok := f.followsSet[pubHex]; ok {
203 return "write"
204 }
205 }
206
207 if f.cfg == nil {
208 return "write"
209 }
210 // If throttle enabled, non-followed users get write access (with delay applied in handle-event)
211 if f.throttle != nil {
212 return "write"
213 }
214 return "read"
215 }
216
217 func (f *Follows) GetACLInfo() (name, description, documentation string) {
218 return "follows", "whitelist follows of admins",
219 `This ACL mode searches for follow lists of admins and grants all followers write access`
220 }
221
222 func (f *Follows) Type() string { return "follows" }
223
224 // GetThrottleDelay returns the progressive throttle delay for this event.
225 // Returns 0 if throttle is disabled or if the user is exempt (owner/admin/followed).
226 func (f *Follows) GetThrottleDelay(pubkey []byte, ip string) time.Duration {
227 if f.throttle == nil {
228 return 0
229 }
230
231 pubkeyHex := hex.EncodeToString(pubkey)
232
233 // Check if user is exempt from throttling using O(1) map lookups
234 f.followsMx.RLock()
235 defer f.followsMx.RUnlock()
236
237 // Owners bypass throttle
238 if f.ownersSet != nil {
239 if _, ok := f.ownersSet[pubkeyHex]; ok {
240 return 0
241 }
242 }
243 // Admins bypass throttle
244 if f.adminsSet != nil {
245 if _, ok := f.adminsSet[pubkeyHex]; ok {
246 return 0
247 }
248 }
249 // Followed users bypass throttle
250 if f.followsSet != nil {
251 if _, ok := f.followsSet[pubkeyHex]; ok {
252 return 0
253 }
254 }
255
256 // Non-followed users get throttled
257 return f.throttle.GetDelay(ip, pubkeyHex)
258 }
259
260 func (f *Follows) adminRelays() (urls []string) {
261 f.followsMx.RLock()
262 admins := make([][]byte, len(f.admins))
263 copy(admins, f.admins)
264 f.followsMx.RUnlock()
265 seen := make(map[string]struct{})
266 // Build a set of normalized self relay addresses to avoid self-connections
267 selfSet := make(map[string]struct{})
268 selfHosts := make(map[string]struct{})
269 if f.cfg != nil && len(f.cfg.RelayAddresses) > 0 {
270 for _, a := range f.cfg.RelayAddresses {
271 n := string(normalize.URL(a))
272 if n == "" {
273 continue
274 }
275 selfSet[n] = struct{}{}
276 // Also record hostname (without port) for robust matching
277 // Accept simple splitting; normalize.URL ensures scheme://host[:port]
278 host := n
279 if i := strings.Index(host, "://"); i >= 0 {
280 host = host[i+3:]
281 }
282 if j := strings.Index(host, "/"); j >= 0 {
283 host = host[:j]
284 }
285 if k := strings.Index(host, ":"); k >= 0 {
286 host = host[:k]
287 }
288 if host != "" {
289 selfHosts[host] = struct{}{}
290 }
291 }
292 }
293
294 // Batch query all admin relay list events in a single DB call
295 // Kind 10002 is replaceable, so QueryEvents returns only the latest per author
296 if len(admins) > 0 {
297 ctx := f.Ctx
298 if ctx == nil {
299 ctx = context.Background()
300 }
301 fl := &filter.F{
302 Authors: tag.NewFromBytesSlice(admins...),
303 Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
304 Limit: values.ToUintPointer(uint(len(admins))),
305 }
306 evs, qerr := f.db.QueryEvents(ctx, fl)
307 if qerr != nil {
308 log.W.F("follows ACL: error querying admin relay lists: %v", qerr)
309 }
310 for _, ev := range evs {
311 for _, v := range ev.Tags.GetAll([]byte("r")) {
312 u := string(v.Value())
313 n := string(normalize.URL(u))
314 if n == "" {
315 continue
316 }
317 // Skip if this URL is one of our configured self relay addresses or hosts
318 if _, isSelf := selfSet[n]; isSelf {
319 log.D.F("follows syncer: skipping configured self relay address: %s", n)
320 continue
321 }
322 // Host match
323 host := n
324 if i := strings.Index(host, "://"); i >= 0 {
325 host = host[i+3:]
326 }
327 if j := strings.Index(host, "/"); j >= 0 {
328 host = host[:j]
329 }
330 if k := strings.Index(host, ":"); k >= 0 {
331 host = host[:k]
332 }
333 if _, isSelfHost := selfHosts[host]; isSelfHost {
334 log.D.F("follows syncer: skipping configured self relay address: %s", n)
335 continue
336 }
337 if _, ok := seen[n]; ok {
338 continue
339 }
340 seen[n] = struct{}{}
341 urls = append(urls, n)
342 }
343 }
344 }
345
346 // If no admin relays found, use bootstrap relays as fallback
347 if len(urls) == 0 {
348 log.D.F("no admin relays found in DB, checking bootstrap relays and failover relays")
349 if len(f.cfg.BootstrapRelays) > 0 {
350 log.D.F("using bootstrap relays: %v", f.cfg.BootstrapRelays)
351 for _, relay := range f.cfg.BootstrapRelays {
352 n := string(normalize.URL(relay))
353 if n == "" {
354 log.W.F("invalid bootstrap relay URL: %s", relay)
355 continue
356 }
357 // Skip if this URL is one of our configured self relay addresses or hosts
358 if _, isSelf := selfSet[n]; isSelf {
359 log.D.F("follows syncer: skipping configured self relay address: %s", n)
360 continue
361 }
362 // Host match
363 host := n
364 if i := strings.Index(host, "://"); i >= 0 {
365 host = host[i+3:]
366 }
367 if j := strings.Index(host, "/"); j >= 0 {
368 host = host[:j]
369 }
370 if k := strings.Index(host, ":"); k >= 0 {
371 host = host[:k]
372 }
373 if _, isSelfHost := selfHosts[host]; isSelfHost {
374 log.D.F("follows syncer: skipping configured self relay address: %s", n)
375 continue
376 }
377 if _, ok := seen[n]; ok {
378 continue
379 }
380 seen[n] = struct{}{}
381 urls = append(urls, n)
382 }
383 } else {
384 log.D.F("no bootstrap relays configured, using failover relays")
385 }
386
387 // If still no relays found, use hardcoded failover relays
388 // These relays will be used to fetch admin relay lists (kind 10002) and store them
389 // in the database so they're found next time
390 if len(urls) == 0 {
391 failoverRelays := []string{
392 "wss://nostr.land",
393 "wss://nostr.wine",
394 "wss://nos.lol",
395 "wss://relay.damus.io",
396 }
397 log.D.F("using failover relays: %v", failoverRelays)
398 for _, relay := range failoverRelays {
399 n := string(normalize.URL(relay))
400 if n == "" {
401 log.W.F("invalid failover relay URL: %s", relay)
402 continue
403 }
404 // Skip if this URL is one of our configured self relay addresses or hosts
405 if _, isSelf := selfSet[n]; isSelf {
406 log.D.F("follows syncer: skipping configured self relay address: %s", n)
407 continue
408 }
409 // Host match
410 host := n
411 if i := strings.Index(host, "://"); i >= 0 {
412 host = host[i+3:]
413 }
414 if j := strings.Index(host, "/"); j >= 0 {
415 host = host[:j]
416 }
417 if k := strings.Index(host, ":"); k >= 0 {
418 host = host[:k]
419 }
420 if _, isSelfHost := selfHosts[host]; isSelfHost {
421 log.D.F("follows syncer: skipping configured self relay address: %s", n)
422 continue
423 }
424 if _, ok := seen[n]; ok {
425 continue
426 }
427 seen[n] = struct{}{}
428 urls = append(urls, n)
429 }
430 }
431 }
432
433 return
434 }
435
436
437 func (f *Follows) Syncer() {
438 log.D.F("starting follows syncer")
439
440 // Start periodic follow list and metadata fetching
441 go f.startPeriodicFollowListFetching()
442
443 // Start throttle cleanup goroutine if throttle is enabled
444 if f.throttle != nil {
445 go f.throttleCleanup()
446 }
447 }
448
449 // throttleCleanup periodically removes fully-decayed throttle entries
450 func (f *Follows) throttleCleanup() {
451 ticker := time.NewTicker(10 * time.Minute)
452 defer ticker.Stop()
453
454 for {
455 select {
456 case <-f.Ctx.Done():
457 return
458 case <-ticker.C:
459 f.throttle.Cleanup()
460 ipCount, pubkeyCount := f.throttle.Stats()
461 log.T.F("follows throttle: cleanup complete, tracking %d IPs and %d pubkeys",
462 ipCount, pubkeyCount)
463 }
464 }
465 }
466
467 // startPeriodicFollowListFetching starts periodic fetching of admin follow lists
468 func (f *Follows) startPeriodicFollowListFetching() {
469 frequency := f.cfg.FollowListFrequency
470 if frequency == 0 {
471 frequency = time.Hour // Default to 1 hour
472 }
473
474 log.D.F("starting periodic follow list fetching every %v", frequency)
475
476 ticker := time.NewTicker(frequency)
477 defer ticker.Stop()
478
479 // Fetch immediately on startup
480 f.fetchAdminFollowLists()
481
482 for {
483 select {
484 case <-f.Ctx.Done():
485 log.D.F("periodic follow list fetching stopped due to context cancellation")
486 return
487 case <-ticker.C:
488 f.fetchAdminFollowLists()
489 }
490 }
491 }
492
493 // fetchAdminFollowLists fetches follow lists for admins and metadata for all follows
494 func (f *Follows) fetchAdminFollowLists() {
495 log.D.F("follows syncer: fetching admin follow lists and follows metadata")
496
497 urls := f.adminRelays()
498 if len(urls) == 0 {
499 log.W.F("follows syncer: no relays available for follow list fetching (no admin relays, bootstrap relays, or failover relays)")
500 return
501 }
502
503 // build authors lists: admins for follow lists, all follows for metadata
504 f.followsMx.RLock()
505 admins := make([][]byte, len(f.admins))
506 copy(admins, f.admins)
507 allFollows := make([][]byte, 0, len(f.admins)+len(f.follows))
508 allFollows = append(allFollows, f.admins...)
509 allFollows = append(allFollows, f.follows...)
510 f.followsMx.RUnlock()
511
512 if len(admins) == 0 {
513 log.W.F("follows syncer: no admins to fetch follow lists for")
514 return
515 }
516
517 log.D.F("follows syncer: fetching from %d relays: follow lists for %d admins, metadata for %d follows",
518 len(urls), len(admins), len(allFollows))
519
520 for _, u := range urls {
521 go f.fetchFollowListsFromRelay(u, admins, allFollows)
522 }
523 }
524
525 // fetchFollowListsFromRelay fetches follow lists for admins and metadata for all follows from a specific relay
526 func (f *Follows) fetchFollowListsFromRelay(relayURL string, admins [][]byte, allFollows [][]byte) {
527 ctx, cancel := context.WithTimeout(f.Ctx, 60*time.Second)
528 defer cancel()
529
530 // Create proper headers for the WebSocket connection
531 headers := http.Header{}
532 headers.Set("User-Agent", "ORLY-Relay/0.9.2")
533 headers.Set("Origin", "https://orly.dev")
534
535 // Use proper WebSocket dial options
536 dialer := websocket.Dialer{
537 HandshakeTimeout: 10 * time.Second,
538 }
539
540 c, resp, err := dialer.DialContext(ctx, relayURL, headers)
541 if resp != nil {
542 resp.Body.Close()
543 }
544 if err != nil {
545 log.W.F("follows syncer: failed to connect to %s for follow list fetch: %v", relayURL, err)
546 return
547 }
548 defer c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "follow list fetch complete"), time.Now().Add(time.Second))
549
550 log.D.F("follows syncer: fetching follow lists and metadata from relay %s", relayURL)
551
552 // Create filters:
553 // - kind 3 (follow lists) for admins only
554 // - kind 0 (metadata) + kind 10002 (relay lists) for all follows
555 ff := &filter.S{}
556
557 // Filter for admin follow lists (kind 3)
558 f1 := &filter.F{
559 Authors: tag.NewFromBytesSlice(admins...),
560 Kinds: kind.NewS(kind.New(kind.FollowList.K)),
561 Limit: values.ToUintPointer(uint(len(admins) * 2)),
562 }
563
564 // Filter for metadata (kind 0) for all follows
565 f2 := &filter.F{
566 Authors: tag.NewFromBytesSlice(allFollows...),
567 Kinds: kind.NewS(kind.New(kind.ProfileMetadata.K)),
568 Limit: values.ToUintPointer(uint(len(allFollows) * 2)),
569 }
570
571 // Filter for relay lists (kind 10002) for all follows
572 f3 := &filter.F{
573 Authors: tag.NewFromBytesSlice(allFollows...),
574 Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
575 Limit: values.ToUintPointer(uint(len(allFollows) * 2)),
576 }
577 *ff = append(*ff, f1, f2, f3)
578
579 // Use a specific subscription ID for follow list fetching
580 subID := "follow-lists-fetch"
581 req := reqenvelope.NewFrom([]byte(subID), ff)
582 reqBytes := req.Marshal(nil)
583 log.T.F("follows syncer: outbound REQ to %s: %s", relayURL, string(reqBytes))
584 c.SetWriteDeadline(time.Now().Add(10 * time.Second))
585 if err = c.WriteMessage(websocket.TextMessage, reqBytes); chk.E(err) {
586 log.W.F("follows syncer: failed to send follow list REQ to %s: %v", relayURL, err)
587 return
588 }
589
590 log.T.F("follows syncer: sent follow list, metadata, and relay list REQ to %s", relayURL)
591
592 // Collect all events before processing
593 var followListEvents []*event.E
594 var metadataEvents []*event.E
595 var relayListEvents []*event.E
596
597 // Read events with timeout (longer timeout for larger fetches)
598 timeout := time.After(30 * time.Second)
599 for {
600 select {
601 case <-ctx.Done():
602 goto processEvents
603 case <-timeout:
604 log.T.F("follows syncer: timeout reading events from %s", relayURL)
605 goto processEvents
606 default:
607 }
608
609 c.SetReadDeadline(time.Now().Add(30 * time.Second))
610 _, data, err := c.ReadMessage()
611 if err != nil {
612 log.T.F("follows syncer: error reading events from %s: %v", relayURL, err)
613 goto processEvents
614 }
615
616 label, rem, err := envelopes.Identify(data)
617 if chk.E(err) {
618 continue
619 }
620
621 switch label {
622 case eventenvelope.L:
623 res, _, err := eventenvelope.ParseResult(rem)
624 if chk.E(err) || res == nil || res.Event == nil {
625 continue
626 }
627
628 // Collect events by kind
629 switch res.Event.Kind {
630 case kind.FollowList.K:
631 log.T.F("follows syncer: received follow list from %s on relay %s",
632 hex.EncodeToString(res.Event.Pubkey), relayURL)
633 followListEvents = append(followListEvents, res.Event)
634 case kind.ProfileMetadata.K:
635 log.T.F("follows syncer: received metadata from %s on relay %s",
636 hex.EncodeToString(res.Event.Pubkey), relayURL)
637 metadataEvents = append(metadataEvents, res.Event)
638 case kind.RelayListMetadata.K:
639 log.T.F("follows syncer: received relay list from %s on relay %s",
640 hex.EncodeToString(res.Event.Pubkey), relayURL)
641 relayListEvents = append(relayListEvents, res.Event)
642 }
643 case eoseenvelope.L:
644 log.T.F("follows syncer: end of events from %s", relayURL)
645 goto processEvents
646 default:
647 // ignore other labels
648 }
649 }
650
651 processEvents:
652 // Process collected events - keep only the newest per pubkey and save to database
653 f.processCollectedEvents(relayURL, followListEvents, metadataEvents, relayListEvents)
654 }
655
656 // processCollectedEvents processes the collected events, keeping only the newest per pubkey
657 func (f *Follows) processCollectedEvents(relayURL string, followListEvents, metadataEvents, relayListEvents []*event.E) {
658 // Process follow list events (kind 3) - keep newest per pubkey
659 latestFollowLists := make(map[string]*event.E)
660 for _, ev := range followListEvents {
661 pubkeyHex := hex.EncodeToString(ev.Pubkey)
662 existing, exists := latestFollowLists[pubkeyHex]
663 if !exists || ev.CreatedAt > existing.CreatedAt {
664 latestFollowLists[pubkeyHex] = ev
665 }
666 }
667
668 // Process metadata events (kind 0) - keep newest per pubkey
669 latestMetadata := make(map[string]*event.E)
670 for _, ev := range metadataEvents {
671 pubkeyHex := hex.EncodeToString(ev.Pubkey)
672 existing, exists := latestMetadata[pubkeyHex]
673 if !exists || ev.CreatedAt > existing.CreatedAt {
674 latestMetadata[pubkeyHex] = ev
675 }
676 }
677
678 // Process relay list events (kind 10002) - keep newest per pubkey
679 latestRelayLists := make(map[string]*event.E)
680 for _, ev := range relayListEvents {
681 pubkeyHex := hex.EncodeToString(ev.Pubkey)
682 existing, exists := latestRelayLists[pubkeyHex]
683 if !exists || ev.CreatedAt > existing.CreatedAt {
684 latestRelayLists[pubkeyHex] = ev
685 }
686 }
687
688 // Save and process the newest events
689 savedFollowLists := 0
690 savedMetadata := 0
691 savedRelayLists := 0
692
693 // Save follow list events to database and extract follows
694 for pubkeyHex, ev := range latestFollowLists {
695 if _, err := f.db.SaveEvent(f.Ctx, ev); err != nil {
696 if strings.Contains(err.Error(), "blocked:") {
697 log.T.F("follows syncer: skipped follow list from %s (already stored): %v", pubkeyHex, err)
698 } else {
699 log.W.F("follows syncer: failed to save follow list from %s: %v", pubkeyHex, err)
700 }
701 } else {
702 savedFollowLists++
703 log.T.F("follows syncer: saved follow list from %s (created_at: %d) from relay %s",
704 pubkeyHex, ev.CreatedAt, relayURL)
705 }
706
707 // Extract followed pubkeys from admin follow lists
708 if f.isAdminPubkey(ev.Pubkey) {
709 log.D.F("follows syncer: processing admin follow list from %s", pubkeyHex)
710 f.extractFollowedPubkeys(ev)
711 }
712 }
713
714 // Save metadata events to database
715 for pubkeyHex, ev := range latestMetadata {
716 if _, err := f.db.SaveEvent(f.Ctx, ev); err != nil {
717 if strings.Contains(err.Error(), "blocked:") {
718 log.T.F("follows syncer: skipped metadata from %s (already stored): %v", pubkeyHex, err)
719 } else {
720 log.W.F("follows syncer: failed to save metadata from %s: %v", pubkeyHex, err)
721 }
722 } else {
723 savedMetadata++
724 log.T.F("follows syncer: saved metadata from %s (created_at: %d) from relay %s",
725 pubkeyHex, ev.CreatedAt, relayURL)
726 }
727 }
728
729 // Save relay list events to database
730 for pubkeyHex, ev := range latestRelayLists {
731 if _, err := f.db.SaveEvent(f.Ctx, ev); err != nil {
732 if strings.Contains(err.Error(), "blocked:") {
733 log.T.F("follows syncer: skipped relay list from %s (already stored): %v", pubkeyHex, err)
734 } else {
735 log.W.F("follows syncer: failed to save relay list from %s: %v", pubkeyHex, err)
736 }
737 } else {
738 savedRelayLists++
739 log.T.F("follows syncer: saved relay list from %s (created_at: %d) from relay %s",
740 pubkeyHex, ev.CreatedAt, relayURL)
741 }
742 }
743
744 log.D.F("follows syncer: from %s - received: %d follow lists, %d metadata, %d relay lists; saved: %d, %d, %d",
745 relayURL, len(followListEvents), len(metadataEvents), len(relayListEvents),
746 savedFollowLists, savedMetadata, savedRelayLists)
747 }
748
749 // GetFollowedPubkeys returns a copy of the followed pubkeys list
750 func (f *Follows) GetFollowedPubkeys() [][]byte {
751 f.followsMx.RLock()
752 defer f.followsMx.RUnlock()
753
754 followedPubkeys := make([][]byte, len(f.follows))
755 copy(followedPubkeys, f.follows)
756 return followedPubkeys
757 }
758
759 // isAdminPubkey checks if a pubkey belongs to an admin
760 func (f *Follows) isAdminPubkey(pubkey []byte) bool {
761 pubkeyHex := hex.EncodeToString(pubkey)
762
763 f.followsMx.RLock()
764 defer f.followsMx.RUnlock()
765
766 if f.adminsSet != nil {
767 _, ok := f.adminsSet[pubkeyHex]
768 return ok
769 }
770 return false
771 }
772
773 // extractFollowedPubkeys extracts followed pubkeys from 'p' tags in kind 3 events
774 func (f *Follows) extractFollowedPubkeys(event *event.E) {
775 if event.Kind != kind.FollowList.K {
776 return
777 }
778
779 // Extract all 'p' tags (followed pubkeys) from the kind 3 event
780 for _, tag := range event.Tags.GetAll([]byte("p")) {
781 // First try binary format (optimized storage: 33 bytes = 32 hash + null)
782 if pubkey := tag.ValueBinary(); pubkey != nil {
783 f.AddFollow(pubkey)
784 continue
785 }
786 // Fall back to hex decoding for non-binary values
787 // Use ValueHex() which handles both binary and hex storage formats
788 if pubkey, err := hex.DecodeString(string(tag.ValueHex())); err == nil && len(pubkey) == 32 {
789 f.AddFollow(pubkey)
790 }
791 }
792 }
793
794 // AdminRelays returns the admin relay URLs
795 func (f *Follows) AdminRelays() []string {
796 return f.adminRelays()
797 }
798
799 // SetFollowListUpdateCallback sets a callback to be called when the follow list is updated
800 func (f *Follows) SetFollowListUpdateCallback(callback func()) {
801 f.followsMx.Lock()
802 defer f.followsMx.Unlock()
803 f.onFollowListUpdate = callback
804 }
805
806 // AddFollow appends a pubkey to the in-memory follows list if not already present
807 // and signals the syncer to refresh subscriptions.
808 func (f *Follows) AddFollow(pub []byte) {
809 if len(pub) == 0 {
810 return
811 }
812 pubHex := hex.EncodeToString(pub)
813
814 f.followsMx.Lock()
815 defer f.followsMx.Unlock()
816
817 // Use map for O(1) duplicate detection
818 if f.followsSet == nil {
819 f.followsSet = make(map[string]struct{})
820 }
821 if _, exists := f.followsSet[pubHex]; exists {
822 return
823 }
824
825 b := make([]byte, len(pub))
826 copy(b, pub)
827 f.follows = append(f.follows, b)
828 f.followsSet[pubHex] = struct{}{}
829
830 log.D.F(
831 "follows syncer: added new followed pubkey: %s",
832 pubHex,
833 )
834 // notify external listeners (e.g., spider)
835 if f.onFollowListUpdate != nil {
836 go f.onFollowListUpdate()
837 }
838 }
839
// init registers the follows ACL implementation in the global ACL registry so
// it can be selected by name at startup.
func init() {
	Registry.Register(new(Follows))
}
843