handle-req.go raw
1 package app
2
3 import (
4 "context"
5 "encoding/hex"
6 "errors"
7 "fmt"
8 "strings"
9 "time"
10
11 "github.com/dgraph-io/badger/v4"
12 "next.orly.dev/pkg/lol/chk"
13 "next.orly.dev/pkg/lol/log"
14 "next.orly.dev/pkg/acl"
15 "next.orly.dev/pkg/nostr/encoders/bech32encoding"
16 "next.orly.dev/pkg/nostr/encoders/envelopes/authenvelope"
17 "next.orly.dev/pkg/nostr/encoders/envelopes/closedenvelope"
18 "next.orly.dev/pkg/nostr/encoders/envelopes/eoseenvelope"
19 "next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
20 "next.orly.dev/pkg/nostr/encoders/envelopes/reqenvelope"
21 "next.orly.dev/pkg/nostr/encoders/event"
22 "next.orly.dev/pkg/nostr/encoders/filter"
23 hexenc "next.orly.dev/pkg/nostr/encoders/hex"
24 "next.orly.dev/pkg/nostr/encoders/kind"
25 "next.orly.dev/pkg/nostr/encoders/reason"
26 "next.orly.dev/pkg/nostr/encoders/tag"
27 "next.orly.dev/pkg/policy"
28 "next.orly.dev/pkg/protocol/graph"
29 "next.orly.dev/pkg/protocol/nip43"
30 "next.orly.dev/pkg/protocol/publish"
31 "next.orly.dev/pkg/ratelimit"
32 "next.orly.dev/pkg/nostr/utils/normalize"
33 "next.orly.dev/pkg/nostr/utils/pointers"
34 )
35
36 func (l *Listener) HandleReq(msg []byte) (err error) {
37 log.D.F("handling REQ: %s", msg)
38 // var rem []byte
39 env := reqenvelope.New()
40 if _, err = env.Unmarshal(msg); chk.E(err) {
41 // Provide more specific error context for JSON parsing failures
42 if strings.Contains(err.Error(), "invalid character") {
43 log.E.F("REQ JSON parsing failed from %s: %v", l.remote, err)
44 log.T.F("REQ malformed message from %s: %q", l.remote, string(msg))
45 return normalize.Error.Errorf("malformed REQ message: %s", err.Error())
46 }
47 return normalize.Error.Errorf(err.Error())
48 }
49 log.T.C(
50 func() string {
51 return fmt.Sprintf(
52 "REQ sub=%s filters=%d", env.Subscription, len(*env.Filters),
53 )
54 },
55 )
56
57 // Classify query cost for adaptive rate limiting
58 var totalAuthors, totalKinds, totalIds int
59 var hasLimit bool
60 var limitVal int
61 for _, f := range *env.Filters {
62 if f == nil {
63 continue
64 }
65 if f.Authors != nil {
66 totalAuthors += f.Authors.Len()
67 }
68 if f.Kinds != nil {
69 totalKinds += f.Kinds.Len()
70 }
71 if f.Ids != nil {
72 totalIds += f.Ids.Len()
73 }
74 if f.Limit != nil {
75 hasLimit = true
76 limitVal = int(*f.Limit)
77 }
78 }
79 qCost := ratelimit.ClassifyQuery(totalAuthors, totalKinds, totalIds, hasLimit, limitVal)
80 log.D.F("REQ %s: query cost=%s (authors=%d, kinds=%d, ids=%d, limit=%v)",
81 env.Subscription, qCost.Level, totalAuthors, totalKinds, totalIds, limitVal)
82
83 // Track accumulated cost per connection (units: multiplier * 100)
84 l.queryCostAccumulator.Add(int64(qCost.Multiplier * 100))
85
86 // Adaptive query deferral: apply cost-weighted delay under load
87 if l.rateLimiter != nil && l.rateLimiter.IsEnabled() {
88 baseDelay := l.rateLimiter.ComputeDelay(ratelimit.Read)
89 if baseDelay > 0 {
90 costDelay := time.Duration(float64(baseDelay) * qCost.Multiplier)
91 if costDelay > 0 {
92 log.D.F("REQ %s: cost-weighted delay %v (cost=%s, base=%v)",
93 env.Subscription, costDelay, qCost.Level, baseDelay)
94 select {
95 case <-l.ctx.Done():
96 return nil
97 case <-time.After(costDelay):
98 }
99 }
100 }
101
102 // In emergency mode, reject expensive queries outright
103 if l.rateLimiter.InEmergencyMode() && qCost.Level >= ratelimit.CostHeavy {
104 log.W.F("REQ %s: rejecting expensive query (cost=%s) during emergency mode",
105 env.Subscription, qCost.Level)
106 if err = closedenvelope.NewFrom(
107 env.Subscription,
108 reason.Error.F("server overloaded, please retry later"),
109 ).Write(l); chk.E(err) {
110 return
111 }
112 return nil
113 }
114 }
115
116 // NIP-46 signer-based authentication:
117 // If client is not authenticated and requests kind 24133 with exactly one #p tag,
118 // check if there's an active signer subscription for that pubkey.
119 // If so, authenticate the client as that pubkey.
120 const kindNIP46 = 24133
121 if len(l.authedPubkey.Load()) == 0 && len(*env.Filters) == 1 {
122 f := (*env.Filters)[0]
123 if f != nil && f.Kinds != nil && f.Kinds.Len() == 1 {
124 isNIP46Kind := false
125 for _, k := range f.Kinds.K {
126 if k.K == kindNIP46 {
127 isNIP46Kind = true
128 break
129 }
130 }
131 if isNIP46Kind && f.Tags != nil {
132 pTag := f.Tags.GetFirst([]byte("p"))
133 // Must have exactly one pubkey in the #p tag
134 if pTag != nil && pTag.Len() == 2 {
135 signerPubkey := pTag.Value()
136 // Convert to binary if hex
137 var signerPubkeyBin []byte
138 if len(signerPubkey) == 64 {
139 signerPubkeyBin, _ = hexenc.Dec(string(signerPubkey))
140 } else if len(signerPubkey) == 32 {
141 signerPubkeyBin = signerPubkey
142 }
143 if len(signerPubkeyBin) == 32 {
144 // Check if there's an active signer for this pubkey
145 if socketPub := l.publishers.GetSocketPublisher(); socketPub != nil {
146 if checker, ok := socketPub.(publish.NIP46SignerChecker); ok {
147 if checker.HasActiveNIP46Signer(signerPubkeyBin) {
148 log.I.F("NIP-46 auth: client %s authenticated via active signer %s",
149 l.remote, hexenc.Enc(signerPubkeyBin))
150 l.authedPubkey.Store(signerPubkeyBin)
151 }
152 }
153 }
154 }
155 }
156 }
157 }
158 }
159
160 // send a challenge to the client to auth if an ACL is active, auth is required, or AuthToWrite is enabled
161 if len(l.authedPubkey.Load()) == 0 && (acl.Registry.GetMode() != "none" || l.Config.AuthRequired || l.Config.AuthToWrite) {
162 if err = authenvelope.NewChallengeWith(l.challenge.Load()).
163 Write(l); chk.E(err) {
164 return
165 }
166 }
167 // check permissions of user
168 accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load(), l.remote)
169
170 // If auth is required but user is not authenticated, deny access
171 if l.Config.AuthRequired && len(l.authedPubkey.Load()) == 0 {
172 if err = closedenvelope.NewFrom(
173 env.Subscription,
174 reason.AuthRequired.F("authentication required"),
175 ).Write(l); chk.E(err) {
176 return
177 }
178 return
179 }
180
181 // If AuthToWrite is enabled, allow REQ without auth (but still check ACL)
182 // Skip the auth requirement check for REQ when AuthToWrite is true
183 if l.Config.AuthToWrite && len(l.authedPubkey.Load()) == 0 {
184 // Allow unauthenticated REQ when AuthToWrite is enabled
185 // but still respect ACL access levels if ACL is active
186 if acl.Registry.GetMode() != "none" {
187 switch accessLevel {
188 case "none", "blocked", "banned":
189 if err = closedenvelope.NewFrom(
190 env.Subscription,
191 reason.AuthRequired.F("user not authed or has no read access"),
192 ).Write(l); chk.E(err) {
193 return
194 }
195 return
196 }
197 }
198 // Allow the request to proceed without authentication
199 }
200
201 // Only check ACL access level if not already handled by AuthToWrite
202 if !l.Config.AuthToWrite || len(l.authedPubkey.Load()) > 0 {
203 switch accessLevel {
204 case "none":
205 // For REQ denial, send a CLOSED with auth-required reason (NIP-01)
206 if err = closedenvelope.NewFrom(
207 env.Subscription,
208 reason.AuthRequired.F("user not authed or has no read access"),
209 ).Write(l); chk.E(err) {
210 return
211 }
212 return
213 default:
214 // user has read access or better, continue
215 }
216 }
217
218 // Privileged kinds (DMs, gift-wrap, seals, channels, etc.) always require
219 // authentication regardless of ACL mode. Discoverable channel kinds (40, 41)
220 // are exempt since they're needed for channel listing.
221 if len(l.authedPubkey.Load()) == 0 {
222 hasPrivilegedKinds := false
223 for _, f := range *env.Filters {
224 if f != nil && f.Kinds != nil {
225 for _, k := range f.Kinds.K {
226 if kind.IsPrivileged(k.K) && !kind.IsDiscoverableChannelKind(k.K) {
227 hasPrivilegedKinds = true
228 break
229 }
230 }
231 }
232 if hasPrivilegedKinds {
233 break
234 }
235 }
236 if hasPrivilegedKinds {
237 // Send AUTH challenge so client can authenticate
238 if err = authenvelope.NewChallengeWith(l.challenge.Load()).
239 Write(l); chk.E(err) {
240 return
241 }
242 if err = closedenvelope.NewFrom(
243 env.Subscription,
244 reason.AuthRequired.F("authentication required for access to private events"),
245 ).Write(l); chk.E(err) {
246 return
247 }
248 return
249 }
250 }
251
252 // Handle NIP-43 invite request (kind 28935) - ephemeral event
253 // Check if any filter requests kind 28935
254 for _, f := range *env.Filters {
255 if f != nil && f.Kinds != nil {
256 if f.Kinds.Contains(nip43.KindInviteReq) {
257 // Generate and send invite event
258 inviteEvent, err := l.Server.HandleNIP43InviteRequest(l.authedPubkey.Load())
259 if err != nil {
260 log.W.F("failed to generate NIP-43 invite: %v", err)
261 // Send EOSE and return
262 if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
263 return err
264 }
265 return nil
266 }
267
268 // Send the invite event
269 evEnv, _ := eventenvelope.NewResultWith(env.Subscription, inviteEvent)
270 if err = evEnv.Write(l); chk.E(err) {
271 return err
272 }
273
274 // Send EOSE
275 if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
276 return err
277 }
278
279 log.D.F("sent NIP-43 invite event to %s", l.remote)
280 return nil
281 }
282 }
283 }
284
285 // Check for NIP-XX graph queries in filters
286 // Graph queries use the _graph filter extension to traverse the social graph
287 for _, f := range *env.Filters {
288 if f != nil && graph.IsGraphQuery(f) {
289 graphQuery, graphErr := graph.ExtractFromFilter(f)
290 if graphErr != nil {
291 log.W.F("invalid _graph query from %s: %v", l.remote, graphErr)
292 if err = closedenvelope.NewFrom(
293 env.Subscription,
294 reason.Error.F("invalid _graph query: %s", graphErr.Error()),
295 ).Write(l); chk.E(err) {
296 return
297 }
298 return
299 }
300 if graphQuery != nil {
301 log.D.F("graph query from %s: edge=%s dir=%s seed=%s depth=%d",
302 l.remote, graphQuery.Edge, graphQuery.Direction, graphQuery.Pubkey, graphQuery.Depth)
303
304 // Check if graph executor is available
305 if l.graphExecutor == nil {
306 log.W.F("graph query received but executor not initialized")
307 if err = closedenvelope.NewFrom(
308 env.Subscription,
309 reason.Error.F("graph queries not supported on this relay"),
310 ).Write(l); chk.E(err) {
311 return
312 }
313 return
314 }
315
316 // Execute the graph query
317 resultEvent, execErr := l.graphExecutor.Execute(graphQuery)
318 if execErr != nil {
319 log.W.F("graph query execution failed from %s: %v", l.remote, execErr)
320 if err = closedenvelope.NewFrom(
321 env.Subscription,
322 reason.Error.F("graph query failed: %s", execErr.Error()),
323 ).Write(l); chk.E(err) {
324 return
325 }
326 return
327 }
328
329 // Send the result event
330 var res *eventenvelope.Result
331 if res, err = eventenvelope.NewResultWith(env.Subscription, resultEvent); chk.E(err) {
332 return
333 }
334 if err = res.Write(l); chk.E(err) {
335 return
336 }
337
338 // Send EOSE to signal completion
339 if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
340 return
341 }
342
343 log.D.F("graph query completed for %s: edge=%s dir=%s, returned event kind %d",
344 l.remote, graphQuery.Edge, graphQuery.Direction, resultEvent.Kind)
345 return
346 }
347 }
348 }
349
350 // Filter out policy config events (kind 12345) for non-policy-admin users
351 // Policy config events should only be visible to policy administrators
352 if l.policyManager != nil && l.policyManager.IsEnabled() {
353 isPolicyAdmin := l.policyManager.IsPolicyAdmin(l.authedPubkey.Load())
354 if !isPolicyAdmin {
355 // Remove kind 12345 from all filters
356 for _, f := range *env.Filters {
357 if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 {
358 // Create a new kinds list without PolicyConfig
359 var filteredKinds []*kind.K
360 for _, k := range f.Kinds.K {
361 if k.K != kind.PolicyConfig.K {
362 filteredKinds = append(filteredKinds, k)
363 }
364 }
365 f.Kinds.K = filteredKinds
366 }
367 }
368 }
369 }
370
371 var events event.S
372 // Create a single context for all filter queries, isolated from the connection context
373 // to prevent query timeouts from affecting the long-lived websocket connection
374 queryCtx, queryCancel := context.WithTimeout(
375 context.Background(), 30*time.Second,
376 )
377 defer queryCancel()
378
379 // Check cache first for single-filter queries (most common case)
380 // Multi-filter queries are not cached as they're more complex
381 if len(*env.Filters) == 1 && env.Filters != nil {
382 f := (*env.Filters)[0]
383 if cachedEvents, found := l.DB.GetCachedEvents(f); found {
384 log.D.F("REQ %s: cache HIT, sending %d cached events", env.Subscription, len(cachedEvents))
385 // Wrap cached events with current subscription ID
386 for _, ev := range cachedEvents {
387 var res *eventenvelope.Result
388 if res, err = eventenvelope.NewResultWith(env.Subscription, ev); chk.E(err) {
389 return
390 }
391 if err = res.Write(l); err != nil {
392 if !strings.Contains(err.Error(), "context canceled") {
393 chk.E(err)
394 }
395 return
396 }
397 }
398 // Send EOSE
399 if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
400 return
401 }
402 // Don't create subscription for cached results with satisfied limits
403 if f.Limit != nil && len(cachedEvents) >= int(*f.Limit) {
404 log.D.F("REQ %s: limit satisfied by cache, not creating subscription", env.Subscription)
405 return
406 }
407 // Fall through to create subscription for ongoing updates
408 }
409 }
410
411 // Collect all events from all filters
412 var allEvents event.S
413
414 // Server-side query result limit to prevent memory exhaustion
415 serverLimit := l.Config.QueryResultLimit
416 if serverLimit <= 0 {
417 serverLimit = 256 // Default if not configured
418 }
419
420 for _, f := range *env.Filters {
421 if f != nil {
422 // Enforce server-side limit on each filter
423 if serverLimit > 0 {
424 if f.Limit == nil {
425 // No client limit - apply server limit
426 limitVal := uint(serverLimit)
427 f.Limit = &limitVal
428 } else if int(*f.Limit) > serverLimit {
429 // Client limit exceeds server limit - cap it
430 limitVal := uint(serverLimit)
431 f.Limit = &limitVal
432 }
433 }
434 // Summarize filter details for diagnostics (avoid internal fields)
435 var kindsLen int
436 if f.Kinds != nil {
437 kindsLen = f.Kinds.Len()
438 }
439 var authorsLen int
440 if f.Authors != nil {
441 authorsLen = f.Authors.Len()
442 }
443 var idsLen int
444 if f.Ids != nil {
445 idsLen = f.Ids.Len()
446 }
447 var dtag string
448 if f.Tags != nil {
449 if d := f.Tags.GetFirst([]byte("d")); d != nil {
450 dtag = string(d.Value())
451 }
452 }
453 var lim any
454 if f.Limit != nil {
455 lim = *f.Limit
456 }
457 var since any
458 if f.Since != nil {
459 since = f.Since.Int()
460 }
461 var until any
462 if f.Until != nil {
463 until = f.Until.Int()
464 }
465 log.T.C(
466 func() string {
467 return fmt.Sprintf(
468 "REQ %s filter: kinds.len=%d authors.len=%d ids.len=%d d=%q limit=%v since=%v until=%v",
469 env.Subscription, kindsLen, authorsLen, idsLen, dtag,
470 lim, since, until,
471 )
472 },
473 )
474
475 // Process large author lists by breaking them into chunks
476 if f.Authors != nil && f.Authors.Len() > 1000 {
477 log.W.F("REQ %s: breaking down large author list (%d authors) into chunks", env.Subscription, f.Authors.Len())
478
479 // Calculate chunk size to stay under message size limits
480 // Each pubkey is 64 hex chars, plus JSON overhead, so ~100 bytes per author
481 // Target ~50MB per chunk to stay well under 100MB limit
482 chunkSize := ClientMessageSizeLimit / 200 // ~500KB per chunk
483 if f.Kinds != nil && f.Kinds.Len() > 0 {
484 // Reduce chunk size if there are multiple kinds to prevent too many index ranges
485 chunkSize = chunkSize / f.Kinds.Len()
486 if chunkSize < 100 {
487 chunkSize = 100 // Minimum chunk size
488 }
489 }
490
491 // Process authors in chunks
492 for i := 0; i < f.Authors.Len(); i += chunkSize {
493 end := i + chunkSize
494 if end > f.Authors.Len() {
495 end = f.Authors.Len()
496 }
497
498 // Create a chunk filter
499 chunkAuthors := tag.NewFromBytesSlice(f.Authors.T[i:end]...)
500 chunkFilter := &filter.F{
501 Kinds: f.Kinds,
502 Authors: chunkAuthors,
503 Ids: f.Ids,
504 Tags: f.Tags,
505 Since: f.Since,
506 Until: f.Until,
507 Limit: f.Limit,
508 Search: f.Search,
509 }
510
511 log.T.F("REQ %s: processing chunk %d-%d of %d authors", env.Subscription, i+1, end, f.Authors.Len())
512
513 // Process this chunk
514 var chunkEvents event.S
515 if chunkEvents, err = l.QueryEvents(queryCtx, chunkFilter); chk.E(err) {
516 if errors.Is(err, badger.ErrDBClosed) {
517 return
518 }
519 log.E.F("QueryEvents failed for chunk filter: %v", err)
520 err = nil
521 continue
522 }
523
524 // Add chunk results to overall results
525 allEvents = append(allEvents, chunkEvents...)
526
527 // Check if we've hit the limit
528 if f.Limit != nil && len(allEvents) >= int(*f.Limit) {
529 log.T.F("REQ %s: reached limit of %d events, stopping chunk processing", env.Subscription, *f.Limit)
530 break
531 }
532 }
533
534 // Skip the normal processing since we handled it in chunks
535 continue
536 }
537 }
538 if f != nil && pointers.Present(f.Limit) {
539 if *f.Limit == 0 {
540 continue
541 }
542 }
543 var filterEvents event.S
544 if filterEvents, err = l.QueryEvents(queryCtx, f); chk.E(err) {
545 if errors.Is(err, badger.ErrDBClosed) {
546 return
547 }
548 log.E.F("QueryEvents failed for filter: %v", err)
549 err = nil
550 continue
551 }
552 // Append events from this filter to the overall collection
553 allEvents = append(allEvents, filterEvents...)
554 }
555 events = allEvents
556 defer func() {
557 for _, ev := range events {
558 ev.Free()
559 }
560 }()
561 var tmp event.S
562 for _, ev := range events {
563 // Check for private tag first
564 privateTags := ev.Tags.GetAll([]byte("private"))
565 if len(privateTags) > 0 && accessLevel != "admin" {
566 pk := l.authedPubkey.Load()
567 if pk == nil {
568 continue // no auth, can't access private events
569 }
570
571 // Convert authenticated pubkey to npub for comparison
572 authedNpub, err := bech32encoding.BinToNpub(pk)
573 if err != nil {
574 continue // couldn't convert pubkey, skip
575 }
576
577 // Check if authenticated npub is in any private tag
578 authorized := false
579 for _, privateTag := range privateTags {
580 authorizedNpubs := strings.Split(
581 string(privateTag.Value()), ",",
582 )
583 for _, npub := range authorizedNpubs {
584 if strings.TrimSpace(npub) == string(authedNpub) {
585 authorized = true
586 break
587 }
588 }
589 if authorized {
590 break
591 }
592 }
593
594 if !authorized {
595 continue // not authorized to see this private event
596 }
597 // Event has private tag and user is authorized - continue to privileged check
598 }
599
600 // Filter privileged events based on kind.
601 // Privileged kinds always require auth and party-involvement checks,
602 // regardless of ACL mode. This protects DM metadata even on open relays.
603 if kind.IsPrivileged(ev.Kind) && accessLevel != "admin" {
604 log.T.C(
605 func() string {
606 return fmt.Sprintf(
607 "checking privileged event %0x", ev.ID,
608 )
609 },
610 )
611 pk := l.authedPubkey.Load()
612
613 // Channel kinds (40-44) use channel membership instead of p-tag involvement
614 var allowed bool
615 if kind.IsChannelKind(ev.Kind) && l.channelMembership != nil {
616 allowed = l.channelMembership.IsChannelMember(ev, pk, l.ctx)
617 } else {
618 // Use centralized IsPartyInvolved function for consistent privilege checking
619 allowed = policy.IsPartyInvolved(ev, pk)
620 }
621
622 if allowed {
623 log.T.C(
624 func() string {
625 return fmt.Sprintf(
626 "privileged event %s allowed for logged in pubkey %0x",
627 ev.ID, pk,
628 )
629 },
630 )
631 tmp = append(tmp, ev)
632 } else {
633 log.T.C(
634 func() string {
635 return fmt.Sprintf(
636 "privileged event %s denied for pubkey %0x (not authenticated or not a party involved)",
637 ev.ID, pk,
638 )
639 },
640 )
641 }
642 } else {
643 // Check if this non-privileged event references a channel event via e-tags.
644 // Reactions, reposts, zaps, etc. that target channel messages must be
645 // filtered based on the channel's access control.
646 if l.channelMembership != nil {
647 if channelIDHex, isChannel := l.channelMembership.ReferencesChannelEvent(ev, l.ctx); isChannel {
648 pk := l.authedPubkey.Load()
649 if !l.channelMembership.IsChannelMemberByID(channelIDHex, ev.Kind, pk, l.ctx) {
650 log.T.C(func() string {
651 return fmt.Sprintf(
652 "channel-referencing event %0x kind %d denied for pubkey %0x (not a member of channel %s)",
653 ev.ID, ev.Kind, pk, channelIDHex,
654 )
655 })
656 continue
657 }
658 }
659 }
660 tmp = append(tmp, ev)
661 }
662 }
663 events = tmp
664
665 // Apply policy filtering for read access if policy is enabled
666 if l.policyManager.IsEnabled() {
667 var policyFilteredEvents event.S
668 for _, ev := range events {
669 allowed, policyErr := l.policyManager.CheckPolicy("read", ev, l.authedPubkey.Load(), l.remote)
670 if chk.E(policyErr) {
671 log.E.F("policy check failed for read: %v", policyErr)
672 // Default to allow on policy error
673 policyFilteredEvents = append(policyFilteredEvents, ev)
674 continue
675 }
676
677 if allowed {
678 policyFilteredEvents = append(policyFilteredEvents, ev)
679 } else {
680 log.D.F("policy filtered out event %0x for read access", ev.ID)
681 }
682 }
683 events = policyFilteredEvents
684 }
685
686 // Deduplicate events (in case chunk processing returned duplicates)
687 // Use events (already filtered for privileged/policy) instead of allEvents
688 if len(events) > 0 {
689 seen := make(map[string]struct{})
690 var deduplicatedEvents event.S
691 originalCount := len(events)
692 for _, ev := range events {
693 eventID := hexenc.Enc(ev.ID)
694 if _, exists := seen[eventID]; !exists {
695 seen[eventID] = struct{}{}
696 deduplicatedEvents = append(deduplicatedEvents, ev)
697 }
698 }
699 events = deduplicatedEvents
700 if originalCount != len(events) {
701 log.T.F("REQ %s: deduplicated %d events to %d unique events", env.Subscription, originalCount, len(events))
702 }
703 }
704
705 // Apply managed ACL filtering for read access if managed ACL is active
706 if acl.Registry.GetMode() == "managed" {
707 var aclFilteredEvents event.S
708 for _, ev := range events {
709 // Check if event is banned
710 eventID := hex.EncodeToString(ev.ID)
711 if banned, err := l.getManagedACL().IsEventBanned(eventID); err == nil && banned {
712 log.D.F("managed ACL filtered out banned event %s", hexenc.Enc(ev.ID))
713 continue
714 }
715
716 // Check if event author is banned
717 authorHex := hex.EncodeToString(ev.Pubkey)
718 if banned, err := l.getManagedACL().IsPubkeyBanned(authorHex); err == nil && banned {
719 log.D.F("managed ACL filtered out event %s from banned pubkey %s", hexenc.Enc(ev.ID), authorHex)
720 continue
721 }
722
723 // Check if event kind is allowed (only if allowed kinds are configured)
724 if allowed, err := l.getManagedACL().IsKindAllowed(int(ev.Kind)); err == nil && !allowed {
725 allowedKinds, err := l.getManagedACL().ListAllowedKinds()
726 if err == nil && len(allowedKinds) > 0 {
727 log.D.F("managed ACL filtered out event %s with disallowed kind %d", hexenc.Enc(ev.ID), ev.Kind)
728 continue
729 }
730 }
731
732 aclFilteredEvents = append(aclFilteredEvents, ev)
733 }
734 events = aclFilteredEvents
735 }
736
737 // Apply curating ACL filtering for read access if curating ACL is active
738 if acl.Registry.GetMode() == "curating" {
739 // Find the curating ACL instance
740 for _, aclInstance := range acl.Registry.ACLs() {
741 if aclInstance.Type() == "curating" {
742 if curatingACL, ok := aclInstance.(*acl.Curating); ok {
743 var curatingFilteredEvents event.S
744 for _, ev := range events {
745 if curatingACL.IsEventVisible(ev, accessLevel) {
746 curatingFilteredEvents = append(curatingFilteredEvents, ev)
747 } else {
748 log.D.F("curating ACL filtered out event %s from blacklisted pubkey", hexenc.Enc(ev.ID))
749 }
750 }
751 events = curatingFilteredEvents
752 }
753 break
754 }
755 }
756 }
757
758 // Apply private tag filtering - only show events with "private" tags to authorized users
759 var privateFilteredEvents event.S
760 authedPubkey := l.authedPubkey.Load()
761 for _, ev := range events {
762 // Check if event has private tags
763 hasPrivateTag := false
764 var privatePubkey []byte
765
766 if ev.Tags != nil && ev.Tags.Len() > 0 {
767 for _, t := range *ev.Tags {
768 if t.Len() >= 2 {
769 keyBytes := t.Key()
770 if len(keyBytes) == 7 && string(keyBytes) == "private" {
771 hasPrivateTag = true
772 privatePubkey = t.Value()
773 break
774 }
775 }
776 }
777 }
778
779 // If no private tag, include the event
780 if !hasPrivateTag {
781 privateFilteredEvents = append(privateFilteredEvents, ev)
782 continue
783 }
784
785 // Event has private tag - check if user is authorized to see it
786 canSeePrivate := l.canSeePrivateEvent(authedPubkey, privatePubkey)
787 if canSeePrivate {
788 privateFilteredEvents = append(privateFilteredEvents, ev)
789 log.D.F("private tag: allowing event %s for authorized user", hexenc.Enc(ev.ID))
790 } else {
791 log.D.F("private tag: filtering out event %s from unauthorized user", hexenc.Enc(ev.ID))
792 }
793 }
794 events = privateFilteredEvents
795
796 seen := make(map[string]struct{})
797 // Cache events for single-filter queries (without subscription ID)
798 shouldCache := len(*env.Filters) == 1 && len(events) > 0
799
800 for _, ev := range events {
801 log.T.C(
802 func() string {
803 return fmt.Sprintf(
804 "REQ %s: sending EVENT id=%s kind=%d", env.Subscription,
805 hexenc.Enc(ev.ID), ev.Kind,
806 )
807 },
808 )
809 log.T.C(
810 func() string {
811 return fmt.Sprintf("event:\n%s\n", ev.Serialize())
812 },
813 )
814 var res *eventenvelope.Result
815 if res, err = eventenvelope.NewResultWith(
816 env.Subscription, ev,
817 ); chk.E(err) {
818 return
819 }
820
821 if err = res.Write(l); err != nil {
822 // Don't log context canceled errors as they're expected during shutdown
823 if !strings.Contains(err.Error(), "context canceled") {
824 chk.E(err)
825 }
826 return
827 }
828 // track the IDs we've sent (use hex encoding for stable key)
829 seen[hexenc.Enc(ev.ID)] = struct{}{}
830 }
831
832 // Populate cache after successfully sending all events
833 // Cache the events themselves (not marshaled JSON with subscription ID)
834 if shouldCache && len(events) > 0 {
835 f := (*env.Filters)[0]
836 l.DB.CacheEvents(f, events)
837 log.D.F("REQ %s: cached %d events", env.Subscription, len(events))
838 }
839 // write the EOSE to signal to the client that all events found have been
840 // sent.
841 log.T.F("sending EOSE to %s", l.remote)
842 if err = eoseenvelope.NewFrom(env.Subscription).
843 Write(l); chk.E(err) {
844 return
845 }
846
847 // Record access for returned events (for GC access-based ranking).
848 // Copy event IDs before launching the goroutine because the deferred
849 // ev.Free() above will release the events when HandleReq returns.
850 if l.accessTracker != nil && len(events) > 0 {
851 eventIDs := make([][]byte, 0, len(events))
852 for _, ev := range events {
853 if len(ev.ID) == 32 {
854 id := make([]byte, 32)
855 copy(id, ev.ID)
856 eventIDs = append(eventIDs, id)
857 }
858 }
859 go func(ids [][]byte, connID string) {
860 defer func() {
861 if r := recover(); r != nil {
862 log.W.F("access tracker panic (recovered): %v", r)
863 }
864 }()
865 for _, id := range ids {
866 if ser, err := l.DB.GetSerialById(id); err == nil && ser != nil {
867 l.accessTracker.RecordAccess(ser.Get(), connID)
868 }
869 }
870 }(eventIDs, l.connectionID)
871 }
872
873 // Trigger archive relay query if enabled (background fetch + stream results)
874 if l.archiveManager != nil && l.archiveManager.IsEnabled() && len(*env.Filters) > 0 {
875 // Use first filter for archive query
876 f := (*env.Filters)[0]
877 go l.archiveManager.QueryArchive(
878 string(env.Subscription),
879 l.connectionID,
880 f,
881 seen,
882 l, // implements EventDeliveryChannel
883 )
884 }
885
886 // if the query was for just Ids, we know there can't be any more results,
887 // so cancel the subscription.
888 cancel := true
889 log.T.F(
890 "REQ %s: computing cancel/subscription; events_sent=%d",
891 env.Subscription, len(events),
892 )
893 var subbedFilters filter.S
894 for _, f := range *env.Filters {
895 // Check if this filter's limit was satisfied
896 limitSatisfied := false
897 if pointers.Present(f.Limit) {
898 if len(events) >= int(*f.Limit) {
899 limitSatisfied = true
900 }
901 }
902
903 if f.Ids.Len() < 1 {
904 // Filter has no IDs - keep subscription open unless limit was satisfied
905 if !limitSatisfied {
906 cancel = false
907 subbedFilters = append(subbedFilters, f)
908 }
909 } else {
910 // remove the IDs that we already sent, as it's one less
911 // comparison we have to make.
912 var notFounds [][]byte
913 for _, id := range f.Ids.T {
914 if _, ok := seen[hexenc.Enc(id)]; ok {
915 continue
916 }
917 notFounds = append(notFounds, id)
918 }
919 log.T.F(
920 "REQ %s: ids outstanding=%d of %d", env.Subscription,
921 len(notFounds), f.Ids.Len(),
922 )
923 // if all were found, don't add to subbedFilters
924 if len(notFounds) == 0 {
925 continue
926 }
927 // Check if limit was satisfied
928 if limitSatisfied {
929 continue
930 }
931 // rewrite the filter Ids to remove the ones we already sent
932 f.Ids = tag.NewFromBytesSlice(notFounds...)
933 // add the filter to the list of filters we're subscribing to
934 cancel = false
935 subbedFilters = append(subbedFilters, f)
936 }
937 }
938 receiver := make(event.C, 32)
939 // if the subscription should be cancelled, do so
940 if !cancel {
941 // Check global subscription limit (reduced in emergency mode)
942 maxSubs := int64(l.Config.MaxSubscriptions)
943 if maxSubs <= 0 {
944 maxSubs = 10000
945 }
946 if l.rateLimiter != nil && l.rateLimiter.InEmergencyMode() {
947 maxSubs = maxSubs / 10 // Restrict to 10% during emergency
948 if maxSubs < 100 {
949 maxSubs = 100
950 }
951 }
952 if l.activeSubscriptionCount.Load() >= maxSubs {
953 log.W.F("REQ %s: rejecting subscription (active=%d, max=%d)",
954 env.Subscription, l.activeSubscriptionCount.Load(), maxSubs)
955 // Send EOSE without creating subscription
956 if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
957 return
958 }
959 return nil
960 }
961 l.activeSubscriptionCount.Add(1)
962
963 // Create a dedicated context for this subscription that's independent of query context
964 // but is child of the listener context so it gets cancelled when connection closes
965 subCtx, subCancel := context.WithCancel(l.ctx)
966
967 // Track this subscription so we can cancel it on CLOSE or connection close
968 subID := string(env.Subscription)
969 l.subscriptionsMu.Lock()
970 if l.subscriptions == nil {
971 l.subscriptions = make(map[string]context.CancelFunc)
972 }
973 l.subscriptions[subID] = subCancel
974 l.subscriptionsMu.Unlock()
975
976 // Register subscription with publisher
977 // AuthRequired is set when ACL is active OR when the subscription includes
978 // non-discoverable channel kinds (42-44 require auth regardless of ACL mode)
979 authRequired := acl.Registry.GetMode() != "none"
980 if !authRequired {
981 for _, f := range subbedFilters {
982 if f != nil && f.Kinds != nil {
983 for _, k := range f.Kinds.K {
984 if kind.IsChannelKind(k.K) && !kind.IsDiscoverableChannelKind(k.K) {
985 authRequired = true
986 break
987 }
988 }
989 }
990 if authRequired {
991 break
992 }
993 }
994 }
995 l.publishers.Receive(
996 &W{
997 Conn: l.conn,
998 remote: l.remote,
999 Id: subID,
1000 Receiver: receiver,
1001 Filters: &subbedFilters,
1002 AuthedPubkey: l.authedPubkey.Load(),
1003 AuthRequired: authRequired,
1004 },
1005 )
1006
1007 // Launch goroutine to consume from receiver channel and forward to client
1008 // This is the critical missing piece - without this, the receiver channel fills up
1009 // and the publisher times out trying to send, causing subscription to be removed
1010 go func() {
1011 defer func() {
1012 // Clean up when subscription ends
1013 l.activeSubscriptionCount.Add(-1)
1014 l.subscriptionsMu.Lock()
1015 delete(l.subscriptions, subID)
1016 l.subscriptionsMu.Unlock()
1017 log.D.F("subscription goroutine exiting for %s @ %s", subID, l.remote)
1018 }()
1019
1020 for {
1021 select {
1022 case <-subCtx.Done():
1023 // Subscription cancelled (CLOSE message or connection closing)
1024 log.D.F("subscription %s cancelled for %s", subID, l.remote)
1025 return
1026 case ev, ok := <-receiver:
1027 if !ok {
1028 // Channel closed - subscription ended
1029 log.D.F("subscription %s receiver channel closed for %s", subID, l.remote)
1030 return
1031 }
1032
1033 // Forward event to client via write channel
1034 var res *eventenvelope.Result
1035 var err error
1036 if res, err = eventenvelope.NewResultWith(subID, ev); chk.E(err) {
1037 log.E.F("failed to create event envelope for subscription %s: %v", subID, err)
1038 continue
1039 }
1040
1041 // Write to client - this goes through the write worker
1042 if err = res.Write(l); err != nil {
1043 if !strings.Contains(err.Error(), "context canceled") {
1044 log.E.F("failed to write event to subscription %s @ %s: %v", subID, l.remote, err)
1045 }
1046 // Don't return here - write errors shouldn't kill the subscription
1047 // The connection cleanup will handle removing the subscription
1048 continue
1049 }
1050
1051 log.D.F("delivered real-time event %s to subscription %s @ %s",
1052 hexenc.Enc(ev.ID), subID, l.remote)
1053 }
1054 }
1055 }()
1056
1057 log.D.F("subscription %s created and goroutine launched for %s", subID, l.remote)
1058 } else {
1059 // suppress server-sent CLOSED; client will close subscription if desired
1060 log.D.F("subscription request cancelled immediately (all IDs found or limit satisfied)")
1061 }
1062 log.T.F("HandleReq: COMPLETED processing from %s", l.remote)
1063 return
1064 }
1065