// handle-req.go

   1  package app
   2  
   3  import (
   4  	"context"
   5  	"encoding/hex"
   6  	"errors"
   7  	"fmt"
   8  	"strings"
   9  	"time"
  10  
  11  	"github.com/dgraph-io/badger/v4"
  12  	"next.orly.dev/pkg/lol/chk"
  13  	"next.orly.dev/pkg/lol/log"
  14  	"next.orly.dev/pkg/acl"
  15  	"next.orly.dev/pkg/nostr/encoders/bech32encoding"
  16  	"next.orly.dev/pkg/nostr/encoders/envelopes/authenvelope"
  17  	"next.orly.dev/pkg/nostr/encoders/envelopes/closedenvelope"
  18  	"next.orly.dev/pkg/nostr/encoders/envelopes/eoseenvelope"
  19  	"next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
  20  	"next.orly.dev/pkg/nostr/encoders/envelopes/reqenvelope"
  21  	"next.orly.dev/pkg/nostr/encoders/event"
  22  	"next.orly.dev/pkg/nostr/encoders/filter"
  23  	hexenc "next.orly.dev/pkg/nostr/encoders/hex"
  24  	"next.orly.dev/pkg/nostr/encoders/kind"
  25  	"next.orly.dev/pkg/nostr/encoders/reason"
  26  	"next.orly.dev/pkg/nostr/encoders/tag"
  27  	"next.orly.dev/pkg/policy"
  28  	"next.orly.dev/pkg/protocol/graph"
  29  	"next.orly.dev/pkg/protocol/nip43"
  30  	"next.orly.dev/pkg/protocol/publish"
  31  	"next.orly.dev/pkg/ratelimit"
  32  	"next.orly.dev/pkg/nostr/utils/normalize"
  33  	"next.orly.dev/pkg/nostr/utils/pointers"
  34  )
  35  
  36  func (l *Listener) HandleReq(msg []byte) (err error) {
  37  	log.D.F("handling REQ: %s", msg)
  38  	// var rem []byte
  39  	env := reqenvelope.New()
  40  	if _, err = env.Unmarshal(msg); chk.E(err) {
  41  		// Provide more specific error context for JSON parsing failures
  42  		if strings.Contains(err.Error(), "invalid character") {
  43  			log.E.F("REQ JSON parsing failed from %s: %v", l.remote, err)
  44  			log.T.F("REQ malformed message from %s: %q", l.remote, string(msg))
  45  			return normalize.Error.Errorf("malformed REQ message: %s", err.Error())
  46  		}
  47  		return normalize.Error.Errorf(err.Error())
  48  	}
  49  	log.T.C(
  50  		func() string {
  51  			return fmt.Sprintf(
  52  				"REQ sub=%s filters=%d", env.Subscription, len(*env.Filters),
  53  			)
  54  		},
  55  	)
  56  
  57  	// Classify query cost for adaptive rate limiting
  58  	var totalAuthors, totalKinds, totalIds int
  59  	var hasLimit bool
  60  	var limitVal int
  61  	for _, f := range *env.Filters {
  62  		if f == nil {
  63  			continue
  64  		}
  65  		if f.Authors != nil {
  66  			totalAuthors += f.Authors.Len()
  67  		}
  68  		if f.Kinds != nil {
  69  			totalKinds += f.Kinds.Len()
  70  		}
  71  		if f.Ids != nil {
  72  			totalIds += f.Ids.Len()
  73  		}
  74  		if f.Limit != nil {
  75  			hasLimit = true
  76  			limitVal = int(*f.Limit)
  77  		}
  78  	}
  79  	qCost := ratelimit.ClassifyQuery(totalAuthors, totalKinds, totalIds, hasLimit, limitVal)
  80  	log.D.F("REQ %s: query cost=%s (authors=%d, kinds=%d, ids=%d, limit=%v)",
  81  		env.Subscription, qCost.Level, totalAuthors, totalKinds, totalIds, limitVal)
  82  
  83  	// Track accumulated cost per connection (units: multiplier * 100)
  84  	l.queryCostAccumulator.Add(int64(qCost.Multiplier * 100))
  85  
  86  	// Adaptive query deferral: apply cost-weighted delay under load
  87  	if l.rateLimiter != nil && l.rateLimiter.IsEnabled() {
  88  		baseDelay := l.rateLimiter.ComputeDelay(ratelimit.Read)
  89  		if baseDelay > 0 {
  90  			costDelay := time.Duration(float64(baseDelay) * qCost.Multiplier)
  91  			if costDelay > 0 {
  92  				log.D.F("REQ %s: cost-weighted delay %v (cost=%s, base=%v)",
  93  					env.Subscription, costDelay, qCost.Level, baseDelay)
  94  				select {
  95  				case <-l.ctx.Done():
  96  					return nil
  97  				case <-time.After(costDelay):
  98  				}
  99  			}
 100  		}
 101  
 102  		// In emergency mode, reject expensive queries outright
 103  		if l.rateLimiter.InEmergencyMode() && qCost.Level >= ratelimit.CostHeavy {
 104  			log.W.F("REQ %s: rejecting expensive query (cost=%s) during emergency mode",
 105  				env.Subscription, qCost.Level)
 106  			if err = closedenvelope.NewFrom(
 107  				env.Subscription,
 108  				reason.Error.F("server overloaded, please retry later"),
 109  			).Write(l); chk.E(err) {
 110  				return
 111  			}
 112  			return nil
 113  		}
 114  	}
 115  
 116  	// NIP-46 signer-based authentication:
 117  	// If client is not authenticated and requests kind 24133 with exactly one #p tag,
 118  	// check if there's an active signer subscription for that pubkey.
 119  	// If so, authenticate the client as that pubkey.
 120  	const kindNIP46 = 24133
 121  	if len(l.authedPubkey.Load()) == 0 && len(*env.Filters) == 1 {
 122  		f := (*env.Filters)[0]
 123  		if f != nil && f.Kinds != nil && f.Kinds.Len() == 1 {
 124  			isNIP46Kind := false
 125  			for _, k := range f.Kinds.K {
 126  				if k.K == kindNIP46 {
 127  					isNIP46Kind = true
 128  					break
 129  				}
 130  			}
 131  			if isNIP46Kind && f.Tags != nil {
 132  				pTag := f.Tags.GetFirst([]byte("p"))
 133  				// Must have exactly one pubkey in the #p tag
 134  				if pTag != nil && pTag.Len() == 2 {
 135  					signerPubkey := pTag.Value()
 136  					// Convert to binary if hex
 137  					var signerPubkeyBin []byte
 138  					if len(signerPubkey) == 64 {
 139  						signerPubkeyBin, _ = hexenc.Dec(string(signerPubkey))
 140  					} else if len(signerPubkey) == 32 {
 141  						signerPubkeyBin = signerPubkey
 142  					}
 143  					if len(signerPubkeyBin) == 32 {
 144  						// Check if there's an active signer for this pubkey
 145  						if socketPub := l.publishers.GetSocketPublisher(); socketPub != nil {
 146  							if checker, ok := socketPub.(publish.NIP46SignerChecker); ok {
 147  								if checker.HasActiveNIP46Signer(signerPubkeyBin) {
 148  									log.I.F("NIP-46 auth: client %s authenticated via active signer %s",
 149  										l.remote, hexenc.Enc(signerPubkeyBin))
 150  									l.authedPubkey.Store(signerPubkeyBin)
 151  								}
 152  							}
 153  						}
 154  					}
 155  				}
 156  			}
 157  		}
 158  	}
 159  
 160  	// send a challenge to the client to auth if an ACL is active, auth is required, or AuthToWrite is enabled
 161  	if len(l.authedPubkey.Load()) == 0 && (acl.Registry.GetMode() != "none" || l.Config.AuthRequired || l.Config.AuthToWrite) {
 162  		if err = authenvelope.NewChallengeWith(l.challenge.Load()).
 163  			Write(l); chk.E(err) {
 164  			return
 165  		}
 166  	}
 167  	// check permissions of user
 168  	accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load(), l.remote)
 169  
 170  	// If auth is required but user is not authenticated, deny access
 171  	if l.Config.AuthRequired && len(l.authedPubkey.Load()) == 0 {
 172  		if err = closedenvelope.NewFrom(
 173  			env.Subscription,
 174  			reason.AuthRequired.F("authentication required"),
 175  		).Write(l); chk.E(err) {
 176  			return
 177  		}
 178  		return
 179  	}
 180  
 181  	// If AuthToWrite is enabled, allow REQ without auth (but still check ACL)
 182  	// Skip the auth requirement check for REQ when AuthToWrite is true
 183  	if l.Config.AuthToWrite && len(l.authedPubkey.Load()) == 0 {
 184  		// Allow unauthenticated REQ when AuthToWrite is enabled
 185  		// but still respect ACL access levels if ACL is active
 186  		if acl.Registry.GetMode() != "none" {
 187  			switch accessLevel {
 188  			case "none", "blocked", "banned":
 189  				if err = closedenvelope.NewFrom(
 190  					env.Subscription,
 191  					reason.AuthRequired.F("user not authed or has no read access"),
 192  				).Write(l); chk.E(err) {
 193  					return
 194  				}
 195  				return
 196  			}
 197  		}
 198  		// Allow the request to proceed without authentication
 199  	}
 200  
 201  	// Only check ACL access level if not already handled by AuthToWrite
 202  	if !l.Config.AuthToWrite || len(l.authedPubkey.Load()) > 0 {
 203  		switch accessLevel {
 204  		case "none":
 205  			// For REQ denial, send a CLOSED with auth-required reason (NIP-01)
 206  			if err = closedenvelope.NewFrom(
 207  				env.Subscription,
 208  				reason.AuthRequired.F("user not authed or has no read access"),
 209  			).Write(l); chk.E(err) {
 210  				return
 211  			}
 212  			return
 213  		default:
 214  			// user has read access or better, continue
 215  		}
 216  	}
 217  
 218  	// Privileged kinds (DMs, gift-wrap, seals, channels, etc.) always require
 219  	// authentication regardless of ACL mode. Discoverable channel kinds (40, 41)
 220  	// are exempt since they're needed for channel listing.
 221  	if len(l.authedPubkey.Load()) == 0 {
 222  		hasPrivilegedKinds := false
 223  		for _, f := range *env.Filters {
 224  			if f != nil && f.Kinds != nil {
 225  				for _, k := range f.Kinds.K {
 226  					if kind.IsPrivileged(k.K) && !kind.IsDiscoverableChannelKind(k.K) {
 227  						hasPrivilegedKinds = true
 228  						break
 229  					}
 230  				}
 231  			}
 232  			if hasPrivilegedKinds {
 233  				break
 234  			}
 235  		}
 236  		if hasPrivilegedKinds {
 237  			// Send AUTH challenge so client can authenticate
 238  			if err = authenvelope.NewChallengeWith(l.challenge.Load()).
 239  				Write(l); chk.E(err) {
 240  				return
 241  			}
 242  			if err = closedenvelope.NewFrom(
 243  				env.Subscription,
 244  				reason.AuthRequired.F("authentication required for access to private events"),
 245  			).Write(l); chk.E(err) {
 246  				return
 247  			}
 248  			return
 249  		}
 250  	}
 251  
 252  	// Handle NIP-43 invite request (kind 28935) - ephemeral event
 253  	// Check if any filter requests kind 28935
 254  	for _, f := range *env.Filters {
 255  		if f != nil && f.Kinds != nil {
 256  			if f.Kinds.Contains(nip43.KindInviteReq) {
 257  				// Generate and send invite event
 258  				inviteEvent, err := l.Server.HandleNIP43InviteRequest(l.authedPubkey.Load())
 259  				if err != nil {
 260  					log.W.F("failed to generate NIP-43 invite: %v", err)
 261  					// Send EOSE and return
 262  					if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
 263  						return err
 264  					}
 265  					return nil
 266  				}
 267  
 268  				// Send the invite event
 269  				evEnv, _ := eventenvelope.NewResultWith(env.Subscription, inviteEvent)
 270  				if err = evEnv.Write(l); chk.E(err) {
 271  					return err
 272  				}
 273  
 274  				// Send EOSE
 275  				if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
 276  					return err
 277  				}
 278  
 279  				log.D.F("sent NIP-43 invite event to %s", l.remote)
 280  				return nil
 281  			}
 282  		}
 283  	}
 284  
 285  	// Check for NIP-XX graph queries in filters
 286  	// Graph queries use the _graph filter extension to traverse the social graph
 287  	for _, f := range *env.Filters {
 288  		if f != nil && graph.IsGraphQuery(f) {
 289  			graphQuery, graphErr := graph.ExtractFromFilter(f)
 290  			if graphErr != nil {
 291  				log.W.F("invalid _graph query from %s: %v", l.remote, graphErr)
 292  				if err = closedenvelope.NewFrom(
 293  					env.Subscription,
 294  					reason.Error.F("invalid _graph query: %s", graphErr.Error()),
 295  				).Write(l); chk.E(err) {
 296  					return
 297  				}
 298  				return
 299  			}
 300  			if graphQuery != nil {
 301  				log.D.F("graph query from %s: edge=%s dir=%s seed=%s depth=%d",
 302  					l.remote, graphQuery.Edge, graphQuery.Direction, graphQuery.Pubkey, graphQuery.Depth)
 303  
 304  				// Check if graph executor is available
 305  				if l.graphExecutor == nil {
 306  					log.W.F("graph query received but executor not initialized")
 307  					if err = closedenvelope.NewFrom(
 308  						env.Subscription,
 309  						reason.Error.F("graph queries not supported on this relay"),
 310  					).Write(l); chk.E(err) {
 311  						return
 312  					}
 313  					return
 314  				}
 315  
 316  				// Execute the graph query
 317  				resultEvent, execErr := l.graphExecutor.Execute(graphQuery)
 318  				if execErr != nil {
 319  					log.W.F("graph query execution failed from %s: %v", l.remote, execErr)
 320  					if err = closedenvelope.NewFrom(
 321  						env.Subscription,
 322  						reason.Error.F("graph query failed: %s", execErr.Error()),
 323  					).Write(l); chk.E(err) {
 324  						return
 325  					}
 326  					return
 327  				}
 328  
 329  				// Send the result event
 330  				var res *eventenvelope.Result
 331  				if res, err = eventenvelope.NewResultWith(env.Subscription, resultEvent); chk.E(err) {
 332  					return
 333  				}
 334  				if err = res.Write(l); chk.E(err) {
 335  					return
 336  				}
 337  
 338  				// Send EOSE to signal completion
 339  				if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
 340  					return
 341  				}
 342  
 343  				log.D.F("graph query completed for %s: edge=%s dir=%s, returned event kind %d",
 344  					l.remote, graphQuery.Edge, graphQuery.Direction, resultEvent.Kind)
 345  				return
 346  			}
 347  		}
 348  	}
 349  
 350  	// Filter out policy config events (kind 12345) for non-policy-admin users
 351  	// Policy config events should only be visible to policy administrators
 352  	if l.policyManager != nil && l.policyManager.IsEnabled() {
 353  		isPolicyAdmin := l.policyManager.IsPolicyAdmin(l.authedPubkey.Load())
 354  		if !isPolicyAdmin {
 355  			// Remove kind 12345 from all filters
 356  			for _, f := range *env.Filters {
 357  				if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 {
 358  					// Create a new kinds list without PolicyConfig
 359  					var filteredKinds []*kind.K
 360  					for _, k := range f.Kinds.K {
 361  						if k.K != kind.PolicyConfig.K {
 362  							filteredKinds = append(filteredKinds, k)
 363  						}
 364  					}
 365  					f.Kinds.K = filteredKinds
 366  				}
 367  			}
 368  		}
 369  	}
 370  
 371  	var events event.S
 372  	// Create a single context for all filter queries, isolated from the connection context
 373  	// to prevent query timeouts from affecting the long-lived websocket connection
 374  	queryCtx, queryCancel := context.WithTimeout(
 375  		context.Background(), 30*time.Second,
 376  	)
 377  	defer queryCancel()
 378  
 379  	// Check cache first for single-filter queries (most common case)
 380  	// Multi-filter queries are not cached as they're more complex
 381  	if len(*env.Filters) == 1 && env.Filters != nil {
 382  		f := (*env.Filters)[0]
 383  		if cachedEvents, found := l.DB.GetCachedEvents(f); found {
 384  			log.D.F("REQ %s: cache HIT, sending %d cached events", env.Subscription, len(cachedEvents))
 385  			// Wrap cached events with current subscription ID
 386  			for _, ev := range cachedEvents {
 387  				var res *eventenvelope.Result
 388  				if res, err = eventenvelope.NewResultWith(env.Subscription, ev); chk.E(err) {
 389  					return
 390  				}
 391  				if err = res.Write(l); err != nil {
 392  					if !strings.Contains(err.Error(), "context canceled") {
 393  						chk.E(err)
 394  					}
 395  					return
 396  				}
 397  			}
 398  			// Send EOSE
 399  			if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
 400  				return
 401  			}
 402  			// Don't create subscription for cached results with satisfied limits
 403  			if f.Limit != nil && len(cachedEvents) >= int(*f.Limit) {
 404  				log.D.F("REQ %s: limit satisfied by cache, not creating subscription", env.Subscription)
 405  				return
 406  			}
 407  			// Fall through to create subscription for ongoing updates
 408  		}
 409  	}
 410  
 411  	// Collect all events from all filters
 412  	var allEvents event.S
 413  
 414  	// Server-side query result limit to prevent memory exhaustion
 415  	serverLimit := l.Config.QueryResultLimit
 416  	if serverLimit <= 0 {
 417  		serverLimit = 256 // Default if not configured
 418  	}
 419  
 420  	for _, f := range *env.Filters {
 421  		if f != nil {
 422  			// Enforce server-side limit on each filter
 423  			if serverLimit > 0 {
 424  				if f.Limit == nil {
 425  					// No client limit - apply server limit
 426  					limitVal := uint(serverLimit)
 427  					f.Limit = &limitVal
 428  				} else if int(*f.Limit) > serverLimit {
 429  					// Client limit exceeds server limit - cap it
 430  					limitVal := uint(serverLimit)
 431  					f.Limit = &limitVal
 432  				}
 433  			}
 434  			// Summarize filter details for diagnostics (avoid internal fields)
 435  			var kindsLen int
 436  			if f.Kinds != nil {
 437  				kindsLen = f.Kinds.Len()
 438  			}
 439  			var authorsLen int
 440  			if f.Authors != nil {
 441  				authorsLen = f.Authors.Len()
 442  			}
 443  			var idsLen int
 444  			if f.Ids != nil {
 445  				idsLen = f.Ids.Len()
 446  			}
 447  			var dtag string
 448  			if f.Tags != nil {
 449  				if d := f.Tags.GetFirst([]byte("d")); d != nil {
 450  					dtag = string(d.Value())
 451  				}
 452  			}
 453  			var lim any
 454  			if f.Limit != nil {
 455  				lim = *f.Limit
 456  			}
 457  			var since any
 458  			if f.Since != nil {
 459  				since = f.Since.Int()
 460  			}
 461  			var until any
 462  			if f.Until != nil {
 463  				until = f.Until.Int()
 464  			}
 465  			log.T.C(
 466  				func() string {
 467  					return fmt.Sprintf(
 468  						"REQ %s filter: kinds.len=%d authors.len=%d ids.len=%d d=%q limit=%v since=%v until=%v",
 469  						env.Subscription, kindsLen, authorsLen, idsLen, dtag,
 470  						lim, since, until,
 471  					)
 472  				},
 473  			)
 474  
 475  			// Process large author lists by breaking them into chunks
 476  			if f.Authors != nil && f.Authors.Len() > 1000 {
 477  				log.W.F("REQ %s: breaking down large author list (%d authors) into chunks", env.Subscription, f.Authors.Len())
 478  
 479  				// Calculate chunk size to stay under message size limits
 480  				// Each pubkey is 64 hex chars, plus JSON overhead, so ~100 bytes per author
 481  				// Target ~50MB per chunk to stay well under 100MB limit
 482  				chunkSize := ClientMessageSizeLimit / 200 // ~500KB per chunk
 483  				if f.Kinds != nil && f.Kinds.Len() > 0 {
 484  					// Reduce chunk size if there are multiple kinds to prevent too many index ranges
 485  					chunkSize = chunkSize / f.Kinds.Len()
 486  					if chunkSize < 100 {
 487  						chunkSize = 100 // Minimum chunk size
 488  					}
 489  				}
 490  
 491  				// Process authors in chunks
 492  				for i := 0; i < f.Authors.Len(); i += chunkSize {
 493  					end := i + chunkSize
 494  					if end > f.Authors.Len() {
 495  						end = f.Authors.Len()
 496  					}
 497  
 498  					// Create a chunk filter
 499  					chunkAuthors := tag.NewFromBytesSlice(f.Authors.T[i:end]...)
 500  					chunkFilter := &filter.F{
 501  						Kinds:   f.Kinds,
 502  						Authors: chunkAuthors,
 503  						Ids:     f.Ids,
 504  						Tags:    f.Tags,
 505  						Since:   f.Since,
 506  						Until:   f.Until,
 507  						Limit:   f.Limit,
 508  						Search:  f.Search,
 509  					}
 510  
 511  					log.T.F("REQ %s: processing chunk %d-%d of %d authors", env.Subscription, i+1, end, f.Authors.Len())
 512  
 513  					// Process this chunk
 514  					var chunkEvents event.S
 515  					if chunkEvents, err = l.QueryEvents(queryCtx, chunkFilter); chk.E(err) {
 516  						if errors.Is(err, badger.ErrDBClosed) {
 517  							return
 518  						}
 519  						log.E.F("QueryEvents failed for chunk filter: %v", err)
 520  						err = nil
 521  						continue
 522  					}
 523  
 524  					// Add chunk results to overall results
 525  					allEvents = append(allEvents, chunkEvents...)
 526  
 527  					// Check if we've hit the limit
 528  					if f.Limit != nil && len(allEvents) >= int(*f.Limit) {
 529  						log.T.F("REQ %s: reached limit of %d events, stopping chunk processing", env.Subscription, *f.Limit)
 530  						break
 531  					}
 532  				}
 533  
 534  				// Skip the normal processing since we handled it in chunks
 535  				continue
 536  			}
 537  		}
 538  		if f != nil && pointers.Present(f.Limit) {
 539  			if *f.Limit == 0 {
 540  				continue
 541  			}
 542  		}
 543  		var filterEvents event.S
 544  		if filterEvents, err = l.QueryEvents(queryCtx, f); chk.E(err) {
 545  			if errors.Is(err, badger.ErrDBClosed) {
 546  				return
 547  			}
 548  			log.E.F("QueryEvents failed for filter: %v", err)
 549  			err = nil
 550  			continue
 551  		}
 552  		// Append events from this filter to the overall collection
 553  		allEvents = append(allEvents, filterEvents...)
 554  	}
 555  	events = allEvents
 556  	defer func() {
 557  		for _, ev := range events {
 558  			ev.Free()
 559  		}
 560  	}()
 561  	var tmp event.S
 562  	for _, ev := range events {
 563  		// Check for private tag first
 564  		privateTags := ev.Tags.GetAll([]byte("private"))
 565  		if len(privateTags) > 0 && accessLevel != "admin" {
 566  			pk := l.authedPubkey.Load()
 567  			if pk == nil {
 568  				continue // no auth, can't access private events
 569  			}
 570  
 571  			// Convert authenticated pubkey to npub for comparison
 572  			authedNpub, err := bech32encoding.BinToNpub(pk)
 573  			if err != nil {
 574  				continue // couldn't convert pubkey, skip
 575  			}
 576  
 577  			// Check if authenticated npub is in any private tag
 578  			authorized := false
 579  			for _, privateTag := range privateTags {
 580  				authorizedNpubs := strings.Split(
 581  					string(privateTag.Value()), ",",
 582  				)
 583  				for _, npub := range authorizedNpubs {
 584  					if strings.TrimSpace(npub) == string(authedNpub) {
 585  						authorized = true
 586  						break
 587  					}
 588  				}
 589  				if authorized {
 590  					break
 591  				}
 592  			}
 593  
 594  			if !authorized {
 595  				continue // not authorized to see this private event
 596  			}
 597  			// Event has private tag and user is authorized - continue to privileged check
 598  		}
 599  
 600  		// Filter privileged events based on kind.
 601  		// Privileged kinds always require auth and party-involvement checks,
 602  		// regardless of ACL mode. This protects DM metadata even on open relays.
 603  		if kind.IsPrivileged(ev.Kind) && accessLevel != "admin" {
 604  			log.T.C(
 605  				func() string {
 606  					return fmt.Sprintf(
 607  						"checking privileged event %0x", ev.ID,
 608  					)
 609  				},
 610  			)
 611  			pk := l.authedPubkey.Load()
 612  
 613  			// Channel kinds (40-44) use channel membership instead of p-tag involvement
 614  			var allowed bool
 615  			if kind.IsChannelKind(ev.Kind) && l.channelMembership != nil {
 616  				allowed = l.channelMembership.IsChannelMember(ev, pk, l.ctx)
 617  			} else {
 618  				// Use centralized IsPartyInvolved function for consistent privilege checking
 619  				allowed = policy.IsPartyInvolved(ev, pk)
 620  			}
 621  
 622  			if allowed {
 623  				log.T.C(
 624  					func() string {
 625  						return fmt.Sprintf(
 626  							"privileged event %s allowed for logged in pubkey %0x",
 627  							ev.ID, pk,
 628  						)
 629  					},
 630  				)
 631  				tmp = append(tmp, ev)
 632  			} else {
 633  				log.T.C(
 634  					func() string {
 635  						return fmt.Sprintf(
 636  							"privileged event %s denied for pubkey %0x (not authenticated or not a party involved)",
 637  							ev.ID, pk,
 638  						)
 639  					},
 640  				)
 641  			}
 642  		} else {
 643  			// Check if this non-privileged event references a channel event via e-tags.
 644  			// Reactions, reposts, zaps, etc. that target channel messages must be
 645  			// filtered based on the channel's access control.
 646  			if l.channelMembership != nil {
 647  				if channelIDHex, isChannel := l.channelMembership.ReferencesChannelEvent(ev, l.ctx); isChannel {
 648  					pk := l.authedPubkey.Load()
 649  					if !l.channelMembership.IsChannelMemberByID(channelIDHex, ev.Kind, pk, l.ctx) {
 650  						log.T.C(func() string {
 651  							return fmt.Sprintf(
 652  								"channel-referencing event %0x kind %d denied for pubkey %0x (not a member of channel %s)",
 653  								ev.ID, ev.Kind, pk, channelIDHex,
 654  							)
 655  						})
 656  						continue
 657  					}
 658  				}
 659  			}
 660  			tmp = append(tmp, ev)
 661  		}
 662  	}
 663  	events = tmp
 664  
 665  	// Apply policy filtering for read access if policy is enabled
 666  	if l.policyManager.IsEnabled() {
 667  		var policyFilteredEvents event.S
 668  		for _, ev := range events {
 669  			allowed, policyErr := l.policyManager.CheckPolicy("read", ev, l.authedPubkey.Load(), l.remote)
 670  			if chk.E(policyErr) {
 671  				log.E.F("policy check failed for read: %v", policyErr)
 672  				// Default to allow on policy error
 673  				policyFilteredEvents = append(policyFilteredEvents, ev)
 674  				continue
 675  			}
 676  
 677  			if allowed {
 678  				policyFilteredEvents = append(policyFilteredEvents, ev)
 679  			} else {
 680  				log.D.F("policy filtered out event %0x for read access", ev.ID)
 681  			}
 682  		}
 683  		events = policyFilteredEvents
 684  	}
 685  
 686  	// Deduplicate events (in case chunk processing returned duplicates)
 687  	// Use events (already filtered for privileged/policy) instead of allEvents
 688  	if len(events) > 0 {
 689  		seen := make(map[string]struct{})
 690  		var deduplicatedEvents event.S
 691  		originalCount := len(events)
 692  		for _, ev := range events {
 693  			eventID := hexenc.Enc(ev.ID)
 694  			if _, exists := seen[eventID]; !exists {
 695  				seen[eventID] = struct{}{}
 696  				deduplicatedEvents = append(deduplicatedEvents, ev)
 697  			}
 698  		}
 699  		events = deduplicatedEvents
 700  		if originalCount != len(events) {
 701  			log.T.F("REQ %s: deduplicated %d events to %d unique events", env.Subscription, originalCount, len(events))
 702  		}
 703  	}
 704  
 705  	// Apply managed ACL filtering for read access if managed ACL is active
 706  	if acl.Registry.GetMode() == "managed" {
 707  		var aclFilteredEvents event.S
 708  		for _, ev := range events {
 709  			// Check if event is banned
 710  			eventID := hex.EncodeToString(ev.ID)
 711  			if banned, err := l.getManagedACL().IsEventBanned(eventID); err == nil && banned {
 712  				log.D.F("managed ACL filtered out banned event %s", hexenc.Enc(ev.ID))
 713  				continue
 714  			}
 715  
 716  			// Check if event author is banned
 717  			authorHex := hex.EncodeToString(ev.Pubkey)
 718  			if banned, err := l.getManagedACL().IsPubkeyBanned(authorHex); err == nil && banned {
 719  				log.D.F("managed ACL filtered out event %s from banned pubkey %s", hexenc.Enc(ev.ID), authorHex)
 720  				continue
 721  			}
 722  
 723  			// Check if event kind is allowed (only if allowed kinds are configured)
 724  			if allowed, err := l.getManagedACL().IsKindAllowed(int(ev.Kind)); err == nil && !allowed {
 725  				allowedKinds, err := l.getManagedACL().ListAllowedKinds()
 726  				if err == nil && len(allowedKinds) > 0 {
 727  					log.D.F("managed ACL filtered out event %s with disallowed kind %d", hexenc.Enc(ev.ID), ev.Kind)
 728  					continue
 729  				}
 730  			}
 731  
 732  			aclFilteredEvents = append(aclFilteredEvents, ev)
 733  		}
 734  		events = aclFilteredEvents
 735  	}
 736  
 737  	// Apply curating ACL filtering for read access if curating ACL is active
 738  	if acl.Registry.GetMode() == "curating" {
 739  		// Find the curating ACL instance
 740  		for _, aclInstance := range acl.Registry.ACLs() {
 741  			if aclInstance.Type() == "curating" {
 742  				if curatingACL, ok := aclInstance.(*acl.Curating); ok {
 743  					var curatingFilteredEvents event.S
 744  					for _, ev := range events {
 745  						if curatingACL.IsEventVisible(ev, accessLevel) {
 746  							curatingFilteredEvents = append(curatingFilteredEvents, ev)
 747  						} else {
 748  							log.D.F("curating ACL filtered out event %s from blacklisted pubkey", hexenc.Enc(ev.ID))
 749  						}
 750  					}
 751  					events = curatingFilteredEvents
 752  				}
 753  				break
 754  			}
 755  		}
 756  	}
 757  
 758  	// Apply private tag filtering - only show events with "private" tags to authorized users
 759  	var privateFilteredEvents event.S
 760  	authedPubkey := l.authedPubkey.Load()
 761  	for _, ev := range events {
 762  		// Check if event has private tags
 763  		hasPrivateTag := false
 764  		var privatePubkey []byte
 765  
 766  		if ev.Tags != nil && ev.Tags.Len() > 0 {
 767  			for _, t := range *ev.Tags {
 768  				if t.Len() >= 2 {
 769  					keyBytes := t.Key()
 770  					if len(keyBytes) == 7 && string(keyBytes) == "private" {
 771  						hasPrivateTag = true
 772  						privatePubkey = t.Value()
 773  						break
 774  					}
 775  				}
 776  			}
 777  		}
 778  
 779  		// If no private tag, include the event
 780  		if !hasPrivateTag {
 781  			privateFilteredEvents = append(privateFilteredEvents, ev)
 782  			continue
 783  		}
 784  
 785  		// Event has private tag - check if user is authorized to see it
 786  		canSeePrivate := l.canSeePrivateEvent(authedPubkey, privatePubkey)
 787  		if canSeePrivate {
 788  			privateFilteredEvents = append(privateFilteredEvents, ev)
 789  			log.D.F("private tag: allowing event %s for authorized user", hexenc.Enc(ev.ID))
 790  		} else {
 791  			log.D.F("private tag: filtering out event %s from unauthorized user", hexenc.Enc(ev.ID))
 792  		}
 793  	}
 794  	events = privateFilteredEvents
 795  
 796  	seen := make(map[string]struct{})
 797  	// Cache events for single-filter queries (without subscription ID)
 798  	shouldCache := len(*env.Filters) == 1 && len(events) > 0
 799  
 800  	for _, ev := range events {
 801  		log.T.C(
 802  			func() string {
 803  				return fmt.Sprintf(
 804  					"REQ %s: sending EVENT id=%s kind=%d", env.Subscription,
 805  					hexenc.Enc(ev.ID), ev.Kind,
 806  				)
 807  			},
 808  		)
 809  		log.T.C(
 810  			func() string {
 811  				return fmt.Sprintf("event:\n%s\n", ev.Serialize())
 812  			},
 813  		)
 814  		var res *eventenvelope.Result
 815  		if res, err = eventenvelope.NewResultWith(
 816  			env.Subscription, ev,
 817  		); chk.E(err) {
 818  			return
 819  		}
 820  
 821  		if err = res.Write(l); err != nil {
 822  			// Don't log context canceled errors as they're expected during shutdown
 823  			if !strings.Contains(err.Error(), "context canceled") {
 824  				chk.E(err)
 825  			}
 826  			return
 827  		}
 828  		// track the IDs we've sent (use hex encoding for stable key)
 829  		seen[hexenc.Enc(ev.ID)] = struct{}{}
 830  	}
 831  
 832  	// Populate cache after successfully sending all events
 833  	// Cache the events themselves (not marshaled JSON with subscription ID)
 834  	if shouldCache && len(events) > 0 {
 835  		f := (*env.Filters)[0]
 836  		l.DB.CacheEvents(f, events)
 837  		log.D.F("REQ %s: cached %d events", env.Subscription, len(events))
 838  	}
 839  	// write the EOSE to signal to the client that all events found have been
 840  	// sent.
 841  	log.T.F("sending EOSE to %s", l.remote)
 842  	if err = eoseenvelope.NewFrom(env.Subscription).
 843  		Write(l); chk.E(err) {
 844  		return
 845  	}
 846  
 847  	// Record access for returned events (for GC access-based ranking).
 848  	// Copy event IDs before launching the goroutine because the deferred
 849  	// ev.Free() above will release the events when HandleReq returns.
 850  	if l.accessTracker != nil && len(events) > 0 {
 851  		eventIDs := make([][]byte, 0, len(events))
 852  		for _, ev := range events {
 853  			if len(ev.ID) == 32 {
 854  				id := make([]byte, 32)
 855  				copy(id, ev.ID)
 856  				eventIDs = append(eventIDs, id)
 857  			}
 858  		}
 859  		go func(ids [][]byte, connID string) {
 860  			defer func() {
 861  				if r := recover(); r != nil {
 862  					log.W.F("access tracker panic (recovered): %v", r)
 863  				}
 864  			}()
 865  			for _, id := range ids {
 866  				if ser, err := l.DB.GetSerialById(id); err == nil && ser != nil {
 867  					l.accessTracker.RecordAccess(ser.Get(), connID)
 868  				}
 869  			}
 870  		}(eventIDs, l.connectionID)
 871  	}
 872  
 873  	// Trigger archive relay query if enabled (background fetch + stream results)
 874  	if l.archiveManager != nil && l.archiveManager.IsEnabled() && len(*env.Filters) > 0 {
 875  		// Use first filter for archive query
 876  		f := (*env.Filters)[0]
 877  		go l.archiveManager.QueryArchive(
 878  			string(env.Subscription),
 879  			l.connectionID,
 880  			f,
 881  			seen,
 882  			l, // implements EventDeliveryChannel
 883  		)
 884  	}
 885  
	// If the query was for just IDs, we know there can't be any more results,
	// so cancel the subscription.
 888  	cancel := true
 889  	log.T.F(
 890  		"REQ %s: computing cancel/subscription; events_sent=%d",
 891  		env.Subscription, len(events),
 892  	)
 893  	var subbedFilters filter.S
 894  	for _, f := range *env.Filters {
 895  		// Check if this filter's limit was satisfied
 896  		limitSatisfied := false
 897  		if pointers.Present(f.Limit) {
 898  			if len(events) >= int(*f.Limit) {
 899  				limitSatisfied = true
 900  			}
 901  		}
 902  
 903  		if f.Ids.Len() < 1 {
 904  			// Filter has no IDs - keep subscription open unless limit was satisfied
 905  			if !limitSatisfied {
 906  				cancel = false
 907  				subbedFilters = append(subbedFilters, f)
 908  			}
 909  		} else {
 910  			// remove the IDs that we already sent, as it's one less
 911  			// comparison we have to make.
 912  			var notFounds [][]byte
 913  			for _, id := range f.Ids.T {
 914  				if _, ok := seen[hexenc.Enc(id)]; ok {
 915  					continue
 916  				}
 917  				notFounds = append(notFounds, id)
 918  			}
 919  			log.T.F(
 920  				"REQ %s: ids outstanding=%d of %d", env.Subscription,
 921  				len(notFounds), f.Ids.Len(),
 922  			)
 923  			// if all were found, don't add to subbedFilters
 924  			if len(notFounds) == 0 {
 925  				continue
 926  			}
 927  			// Check if limit was satisfied
 928  			if limitSatisfied {
 929  				continue
 930  			}
 931  			// rewrite the filter Ids to remove the ones we already sent
 932  			f.Ids = tag.NewFromBytesSlice(notFounds...)
 933  			// add the filter to the list of filters we're subscribing to
 934  			cancel = false
 935  			subbedFilters = append(subbedFilters, f)
 936  		}
 937  	}
 938  	receiver := make(event.C, 32)
 939  	// if the subscription should be cancelled, do so
 940  	if !cancel {
 941  		// Check global subscription limit (reduced in emergency mode)
 942  		maxSubs := int64(l.Config.MaxSubscriptions)
 943  		if maxSubs <= 0 {
 944  			maxSubs = 10000
 945  		}
 946  		if l.rateLimiter != nil && l.rateLimiter.InEmergencyMode() {
 947  			maxSubs = maxSubs / 10 // Restrict to 10% during emergency
 948  			if maxSubs < 100 {
 949  				maxSubs = 100
 950  			}
 951  		}
 952  		if l.activeSubscriptionCount.Load() >= maxSubs {
 953  			log.W.F("REQ %s: rejecting subscription (active=%d, max=%d)",
 954  				env.Subscription, l.activeSubscriptionCount.Load(), maxSubs)
 955  			// Send EOSE without creating subscription
 956  			if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
 957  				return
 958  			}
 959  			return nil
 960  		}
 961  		l.activeSubscriptionCount.Add(1)
 962  
 963  		// Create a dedicated context for this subscription that's independent of query context
 964  		// but is child of the listener context so it gets cancelled when connection closes
 965  		subCtx, subCancel := context.WithCancel(l.ctx)
 966  
 967  		// Track this subscription so we can cancel it on CLOSE or connection close
 968  		subID := string(env.Subscription)
 969  		l.subscriptionsMu.Lock()
 970  		if l.subscriptions == nil {
 971  			l.subscriptions = make(map[string]context.CancelFunc)
 972  		}
 973  		l.subscriptions[subID] = subCancel
 974  		l.subscriptionsMu.Unlock()
 975  
 976  		// Register subscription with publisher
 977  		// AuthRequired is set when ACL is active OR when the subscription includes
 978  		// non-discoverable channel kinds (42-44 require auth regardless of ACL mode)
 979  		authRequired := acl.Registry.GetMode() != "none"
 980  		if !authRequired {
 981  			for _, f := range subbedFilters {
 982  				if f != nil && f.Kinds != nil {
 983  					for _, k := range f.Kinds.K {
 984  						if kind.IsChannelKind(k.K) && !kind.IsDiscoverableChannelKind(k.K) {
 985  							authRequired = true
 986  							break
 987  						}
 988  					}
 989  				}
 990  				if authRequired {
 991  					break
 992  				}
 993  			}
 994  		}
 995  		l.publishers.Receive(
 996  			&W{
 997  				Conn:         l.conn,
 998  				remote:       l.remote,
 999  				Id:           subID,
1000  				Receiver:     receiver,
1001  				Filters:      &subbedFilters,
1002  				AuthedPubkey: l.authedPubkey.Load(),
1003  				AuthRequired: authRequired,
1004  			},
1005  		)
1006  
		// Launch a goroutine that consumes from the receiver channel and
		// forwards events to the client. Without this consumer the receiver
		// channel fills up and the publisher times out trying to send, which
		// causes the subscription to be removed.
1010  		go func() {
1011  			defer func() {
1012  				// Clean up when subscription ends
1013  				l.activeSubscriptionCount.Add(-1)
1014  				l.subscriptionsMu.Lock()
1015  				delete(l.subscriptions, subID)
1016  				l.subscriptionsMu.Unlock()
1017  				log.D.F("subscription goroutine exiting for %s @ %s", subID, l.remote)
1018  			}()
1019  
1020  			for {
1021  				select {
1022  				case <-subCtx.Done():
1023  					// Subscription cancelled (CLOSE message or connection closing)
1024  					log.D.F("subscription %s cancelled for %s", subID, l.remote)
1025  					return
1026  				case ev, ok := <-receiver:
1027  					if !ok {
1028  						// Channel closed - subscription ended
1029  						log.D.F("subscription %s receiver channel closed for %s", subID, l.remote)
1030  						return
1031  					}
1032  
1033  					// Forward event to client via write channel
1034  					var res *eventenvelope.Result
1035  					var err error
1036  					if res, err = eventenvelope.NewResultWith(subID, ev); chk.E(err) {
1037  						log.E.F("failed to create event envelope for subscription %s: %v", subID, err)
1038  						continue
1039  					}
1040  
1041  					// Write to client - this goes through the write worker
1042  					if err = res.Write(l); err != nil {
1043  						if !strings.Contains(err.Error(), "context canceled") {
1044  							log.E.F("failed to write event to subscription %s @ %s: %v", subID, l.remote, err)
1045  						}
1046  						// Don't return here - write errors shouldn't kill the subscription
1047  						// The connection cleanup will handle removing the subscription
1048  						continue
1049  					}
1050  
1051  					log.D.F("delivered real-time event %s to subscription %s @ %s",
1052  						hexenc.Enc(ev.ID), subID, l.remote)
1053  				}
1054  			}
1055  		}()
1056  
1057  		log.D.F("subscription %s created and goroutine launched for %s", subID, l.remote)
1058  	} else {
1059  		// suppress server-sent CLOSED; client will close subscription if desired
1060  		log.D.F("subscription request cancelled immediately (all IDs found or limit satisfied)")
1061  	}
1062  	log.T.F("HandleReq: COMPLETED processing from %s", l.remote)
1063  	return
1064  }
1065