server.go raw

   1  package app
   2  
   3  import (
   4  	"context"
   5  	"encoding/json"
   6  	"fmt"
   7  	"io"
   8  	"log"
   9  	"net/http"
  10  	"net/http/httputil"
  11  	"net/url"
  12  	"strconv"
  13  	"strings"
  14  	"sync"
  15  	"sync/atomic"
  16  	"time"
  17  
  18  	"next.orly.dev/pkg/lol/chk"
  19  	"next.orly.dev/app/branding"
  20  	"next.orly.dev/app/config"
  21  	"next.orly.dev/pkg/acl"
  22  	acliface "next.orly.dev/pkg/interfaces/acl"
  23  	"next.orly.dev/pkg/blossom"
  24  	"next.orly.dev/pkg/database"
  25  	domainevents "next.orly.dev/pkg/domain/events"
  26  	"next.orly.dev/pkg/domain/events/subscribers"
  27  	"next.orly.dev/pkg/event/authorization"
  28  	"next.orly.dev/pkg/event/ingestion"
  29  	"next.orly.dev/pkg/event/processing"
  30  	"next.orly.dev/pkg/event/routing"
  31  	"next.orly.dev/pkg/event/specialkinds"
  32  	"next.orly.dev/pkg/event/validation"
  33  	"next.orly.dev/pkg/nostr/encoders/event"
  34  	"next.orly.dev/pkg/nostr/encoders/filter"
  35  	"next.orly.dev/pkg/nostr/encoders/hex"
  36  	"next.orly.dev/pkg/nostr/encoders/tag"
  37  	"next.orly.dev/pkg/policy"
  38  	"next.orly.dev/pkg/nostr/protocol/auth"
  39  	"next.orly.dev/pkg/nostr/httpauth"
  40  	"next.orly.dev/pkg/grapevine"
  41  	"next.orly.dev/pkg/protocol/graph"
  42  	"next.orly.dev/pkg/protocol/nip43"
  43  	"next.orly.dev/pkg/protocol/publish"
  44  	"next.orly.dev/pkg/bunker"
  45  	"next.orly.dev/pkg/protocol/nrc"
  46  	"next.orly.dev/pkg/ratelimit"
  47  	"next.orly.dev/pkg/spider"
  48  	"next.orly.dev/pkg/storage"
  49  	dsync "next.orly.dev/pkg/sync"
  50  	"next.orly.dev/pkg/wireguard"
  51  	"next.orly.dev/pkg/archive"
  52  	emailbridge "next.orly.dev/pkg/bridge"
  53  	"next.orly.dev/pkg/crawler"
  54  	"next.orly.dev/pkg/httpguard"
  55  	"next.orly.dev/pkg/transport"
  56  )
  57  
// Server is the relay's HTTP entry point and the container for the services it
// wires together. It serves requests through ServeHTTP and registers the web
// UI and API routes on mux in UserInterface.
//
// The struct embeds mutexes and atomic counters by value, so it must be used
// through a pointer and never copied.
//
// NOTE(review): cfg and db appear to duplicate the exported Config and DB
// fields; which pair is authoritative is not visible from this declaration.
type Server struct {
	// mux routes ordinary HTTP requests; UserInterface creates it on demand and
	// ServeHTTP forwards to it when a request is not handled directly.
	mux *http.ServeMux
	// Config holds the relay configuration.
	// Deprecated: Use GetConfig() method instead of accessing directly.
	Config *config.C
	// Ctx holds the server context.
	// Deprecated: Use Context() method instead of accessing directly.
	Ctx context.Context
	publishers *publish.S
	// Admins holds the admin pubkeys.
	// Deprecated: Use IsAdmin() method instead of accessing directly.
	Admins [][]byte
	// Owners holds the owner pubkeys.
	// Deprecated: Use IsOwner() method instead of accessing directly.
	Owners [][]byte
	// DB holds the database instance.
	// Deprecated: Use Database() method instead of accessing directly.
	DB database.Database

	// Optional reverse proxy to a frontend dev server. UserInterface sets it
	// when Config.WebDisableEmbedded is true and Config.WebDevProxyURL parses
	// to a URL with both a scheme and a host.
	devProxy *httputil.ReverseProxy

	// Per-IP connection tracking to prevent resource exhaustion
	connPerIPMu sync.RWMutex
	connPerIP   map[string]int

	// Global connection and subscription counters for adaptive rate limiting
	activeConnCount         atomic.Int64
	activeSubscriptionCount atomic.Int64

	// Challenge storage for HTTP UI authentication.
	// challenges is keyed by the hex-encoded challenge and guarded by
	// challengeMutex.
	challengeMutex sync.RWMutex
	challenges     map[string][]byte

	// Message processing pause mutex for policy/follow list updates
	// Use RLock() for normal message processing, Lock() for updates
	messagePauseMutex sync.RWMutex

	// Optional subsystems. UserInterface checks syncManager, blossomServer and
	// clusterManager for nil before registering their routes.
	paymentProcessor  *PaymentProcessor
	sprocketManager   *SprocketManager
	policyManager     *policy.P
	spiderManager     *spider.Spider
	directorySpider   *spider.DirectorySpider
	syncManager       *dsync.Manager
	relayGroupMgr     *dsync.RelayGroupManager
	clusterManager    *dsync.ClusterManager
	blossomServer     *blossom.Server
	InviteManager     *nip43.InviteManager
	graphExecutor        *graph.Executor
	grapeVineEngine      *grapevine.Engine
	grapeVineScheduler   *grapevine.Scheduler
	rateLimiter          *ratelimit.Limiter
	cfg               *config.C
	db                database.Database // Changed from *database.D to interface

	// Domain services for event handling
	eventValidator    *validation.Service
	eventAuthorizer   *authorization.Service
	eventRouter       *routing.DefaultRouter
	eventProcessor    *processing.Service
	eventDispatcher   *domainevents.Dispatcher
	ingestionService  *ingestion.Service
	specialKinds      *specialkinds.Registry
	// aclRegistry is the injected ACL registry returned by ACLRegistry().
	aclRegistry       acliface.Registry

	// WireGuard VPN and NIP-46 Bunker
	wireguardServer *wireguard.Server
	bunkerServer    *bunker.Server
	subnetPool      *wireguard.SubnetPool

	// Channel membership enforcement for NIRC (kinds 40-44)
	channelMembership *ChannelMembership

	// Negentropy full sync whitelist: hex pubkeys allowed full event delivery.
	// Empty map means nobody gets full sync (public only for everyone).
	negentropyFullSyncPubkeys map[string]bool

	// DM rate limiter (stranger 3-message cap)
	dmRateLimiter *DMRateLimiter

	// NRC (Nostr Relay Connect) bridge for remote relay access
	nrcBridge     *nrc.Bridge
	nrcEventStore *database.NRCEventStore

	// Archive relay and storage management
	archiveManager   *archive.Manager
	accessTracker    *storage.AccessTracker
	garbageCollector *storage.GarbageCollector

	// Transport manager for network transports (TCP, TLS, Tor, etc.)
	transportMgr *transport.Manager

	// Branding/white-label customization. When nil, the branding handlers fall
	// back to the embedded assets or answer 404.
	brandingMgr *branding.Manager

	// HTTP guard (bot blocking + rate limiting). ServeHTTP consults it before
	// any routing when it is non-nil.
	httpGuard *httpguard.Guard

	// Email bridge (Marmot DM to SMTP)
	emailBridge *emailbridge.Bridge

	// Corpus crawler (relay discovery + negentropy sync)
	corpusCrawler *crawler.Crawler

	// Smesh embedded web client
	smeshServer *SmeshServer
}
 165  
 166  // =============================================================================
 167  // Server Accessor Methods
 168  // =============================================================================
 169  
// GetConfig returns the relay configuration stored in the Config field.
// It is the accessor that the deprecation note on Server.Config refers to;
// the result is nil when Config has not been set.
func (s *Server) GetConfig() *config.C {
	return s.Config
}
 174  
// Context returns the server context stored in the Ctx field.
// It is the accessor that the deprecation note on Server.Ctx refers to.
func (s *Server) Context() context.Context {
	return s.Ctx
}
 179  
// Database returns the database instance stored in the DB field.
// It is the accessor that the deprecation note on Server.DB refers to.
func (s *Server) Database() database.Database {
	return s.DB
}
 184  
 185  // IsAdmin returns true if the given pubkey is an admin.
 186  func (s *Server) IsAdmin(pubkey []byte) bool {
 187  	pubHex := string(hex.Enc(pubkey))
 188  	for _, admin := range s.Admins {
 189  		if string(hex.Enc(admin)) == pubHex {
 190  			return true
 191  		}
 192  	}
 193  	return false
 194  }
 195  
 196  // IsOwner returns true if the given pubkey is an owner.
 197  func (s *Server) IsOwner(pubkey []byte) bool {
 198  	pubHex := string(hex.Enc(pubkey))
 199  	for _, owner := range s.Owners {
 200  		if string(hex.Enc(owner)) == pubHex {
 201  			return true
 202  		}
 203  	}
 204  	return false
 205  }
 206  
// ACLRegistry returns the ACL registry instance held in the aclRegistry field.
// This enables dependency injection for testing and removes reliance on global state.
// The result is nil when no registry has been assigned.
func (s *Server) ACLRegistry() acliface.Registry {
	return s.aclRegistry
}
 212  
 213  // isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system
 214  func (s *Server) isIPBlacklisted(remote string) bool {
 215  	// Extract IP from remote address (e.g., "192.168.1.1:12345" -> "192.168.1.1")
 216  	remoteIP := strings.Split(remote, ":")[0]
 217  
 218  	// Check static IP blacklist from config first
 219  	if len(s.Config.IPBlacklist) > 0 {
 220  		for _, blocked := range s.Config.IPBlacklist {
 221  			// Allow simple prefix matching for subnets (e.g., "192.168" matches 192.168.0.0/16)
 222  			if blocked != "" && strings.HasPrefix(remoteIP, blocked) {
 223  				return true
 224  			}
 225  		}
 226  	}
 227  
 228  	// Check if managed ACL is available and active
 229  	if s.Config.ACLMode == "managed" {
 230  		for _, aclInstance := range acl.Registry.ACLs() {
 231  			if aclInstance.Type() == "managed" {
 232  				if managed, ok := aclInstance.(*acl.Managed); ok {
 233  					return managed.IsIPBlocked(remoteIP)
 234  				}
 235  			}
 236  		}
 237  	}
 238  
 239  	return false
 240  }
 241  
 242  // isAllowedCORSOrigin checks if the given origin is allowed for CORS requests.
 243  // Returns true if:
 244  //   - CORSOrigins contains "*" (allow all)
 245  //   - CORSOrigins is empty (allow all when CORS is enabled)
 246  //   - The origin matches one of the configured origins
 247  func (s *Server) isAllowedCORSOrigin(origin string) bool {
 248  	if origin == "" {
 249  		return false
 250  	}
 251  
 252  	// If no specific origins configured, allow all
 253  	if len(s.Config.CORSOrigins) == 0 {
 254  		return true
 255  	}
 256  
 257  	for _, allowed := range s.Config.CORSOrigins {
 258  		if allowed == "*" {
 259  			return true
 260  		}
 261  		if allowed == origin {
 262  			return true
 263  		}
 264  	}
 265  	return false
 266  }
 267  
 268  func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 269  	// HTTP guard: bot blocking + per-IP rate limiting (before any routing)
 270  	if s.httpGuard != nil && !s.httpGuard.Allow(w, r) {
 271  		return
 272  	}
 273  
 274  	// Check if this is a blossom-related path (needs CORS headers)
 275  	path := r.URL.Path
 276  	isBlossomPath := path == "/upload" || path == "/media" ||
 277  		path == "/mirror" || path == "/report" ||
 278  		strings.HasPrefix(path, "/list/") ||
 279  		strings.HasPrefix(path, "/blossom/") ||
 280  		(len(path) == 65 && path[0] == '/') // /<sha256> blob downloads
 281  
 282  	// Set CORS headers for all blossom-related requests
 283  	if isBlossomPath {
 284  		w.Header().Set("Access-Control-Allow-Origin", "*")
 285  		w.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, PUT, DELETE, OPTIONS")
 286  		w.Header().Set("Access-Control-Allow-Headers", "Authorization, authorization, Content-Type, content-type, X-SHA-256, x-sha-256, X-Content-Length, x-content-length, X-Content-Type, x-content-type, Accept, accept")
 287  		w.Header().Set("Access-Control-Expose-Headers", "X-Reason, Content-Length, Content-Type, Accept-Ranges")
 288  		w.Header().Set("Access-Control-Max-Age", "86400")
 289  
 290  		// Handle preflight OPTIONS requests for blossom paths
 291  		if r.Method == "OPTIONS" {
 292  			w.WriteHeader(http.StatusOK)
 293  			return
 294  		}
 295  	}
 296  
 297  	// Set CORS headers for API endpoints when enabled (for standalone dashboard mode)
 298  	// Also allow root path for NIP-11 relay info requests
 299  	isAPIPath := strings.HasPrefix(path, "/api/")
 300  	isRelayInfoRequest := path == "/" && r.Header.Get("Accept") == "application/nostr+json"
 301  	if s.Config != nil && s.Config.CORSEnabled && (isAPIPath || isRelayInfoRequest) {
 302  		origin := r.Header.Get("Origin")
 303  		if s.isAllowedCORSOrigin(origin) {
 304  			w.Header().Set("Access-Control-Allow-Origin", origin)
 305  			w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
 306  			w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, Accept")
 307  			w.Header().Set("Access-Control-Allow-Credentials", "true")
 308  			w.Header().Set("Access-Control-Max-Age", "86400")
 309  		}
 310  
 311  		// Handle preflight OPTIONS requests for API paths
 312  		if r.Method == "OPTIONS" {
 313  			w.WriteHeader(http.StatusOK)
 314  			return
 315  		}
 316  	}
 317  
 318  	if r.Method == "OPTIONS" && !isBlossomPath && !isAPIPath {
 319  		// Handle OPTIONS for other paths
 320  		if s.mux != nil {
 321  			s.mux.ServeHTTP(w, r)
 322  			return
 323  		}
 324  		w.WriteHeader(http.StatusOK)
 325  		return
 326  	}
 327  
 328  	// Log proxy information for debugging (only for WebSocket requests to avoid spam)
 329  	if r.Header.Get("Upgrade") == "websocket" {
 330  		LogProxyInfo(r, "HTTP request")
 331  	}
 332  
 333  	// If this is a websocket request, only intercept the relay root path.
 334  	// This allows other websocket paths (e.g., Vite HMR) to be handled by the dev proxy when enabled.
 335  	if r.Header.Get("Upgrade") == "websocket" {
 336  		if s.mux != nil && s.Config != nil && s.Config.WebDisableEmbedded && s.Config.WebDevProxyURL != "" && r.URL.Path != "/" {
 337  			// forward to mux (which will proxy to dev server)
 338  			s.mux.ServeHTTP(w, r)
 339  			return
 340  		}
 341  		s.HandleWebsocket(w, r)
 342  		return
 343  	}
 344  
 345  	if r.Header.Get("Accept") == "application/nostr+json" {
 346  		s.HandleRelayInfo(w, r)
 347  		return
 348  	}
 349  
 350  	if s.mux == nil {
 351  		http.Error(w, "Upgrade required", http.StatusUpgradeRequired)
 352  		return
 353  	}
 354  	s.mux.ServeHTTP(w, r)
 355  }
 356  
 357  func (s *Server) ServiceURL(req *http.Request) (url string) {
 358  	// Use configured RelayURL if available
 359  	if s.Config != nil && s.Config.RelayURL != "" {
 360  		relayURL := strings.TrimSuffix(s.Config.RelayURL, "/")
 361  		// Ensure it has a protocol
 362  		if !strings.HasPrefix(relayURL, "http://") && !strings.HasPrefix(relayURL, "https://") {
 363  			relayURL = "http://" + relayURL
 364  		}
 365  		return relayURL
 366  	}
 367  
 368  	proto := req.Header.Get("X-Forwarded-Proto")
 369  	if proto == "" {
 370  		if req.TLS != nil {
 371  			proto = "https"
 372  		} else {
 373  			proto = "http"
 374  		}
 375  	}
 376  	host := req.Header.Get("X-Forwarded-Host")
 377  	if host == "" {
 378  		host = req.Host
 379  	}
 380  	return proto + "://" + host
 381  }
 382  
 383  func (s *Server) WebSocketURL(req *http.Request) (url string) {
 384  	proto := req.Header.Get("X-Forwarded-Proto")
 385  	if proto == "" {
 386  		if req.TLS != nil {
 387  			proto = "wss"
 388  		} else {
 389  			proto = "ws"
 390  		}
 391  	} else {
 392  		// Convert HTTP scheme to WebSocket scheme
 393  		if proto == "https" {
 394  			proto = "wss"
 395  		} else if proto == "http" {
 396  			proto = "ws"
 397  		}
 398  	}
 399  	host := req.Header.Get("X-Forwarded-Host")
 400  	if host == "" {
 401  		host = req.Host
 402  	}
 403  	return proto + "://" + strings.TrimRight(host, "/") + "/"
 404  }
 405  
// DashboardURL returns the URL of the web dashboard for req: the result of
// ServiceURL(req) with a trailing "/" appended.
func (s *Server) DashboardURL(req *http.Request) (url string) {
	return s.ServiceURL(req) + "/"
}
 409  
 410  // UserInterface sets up a basic Nostr NDK interface that allows users to log into the relay user interface
 411  func (s *Server) UserInterface() {
 412  	if s.mux == nil {
 413  		s.mux = http.NewServeMux()
 414  	}
 415  
 416  	// If dev proxy is configured, initialize it
 417  	if s.Config != nil && s.Config.WebDisableEmbedded && s.Config.WebDevProxyURL != "" {
 418  		proxyURL := s.Config.WebDevProxyURL
 419  		// Add default scheme if missing to avoid: proxy error: unsupported protocol scheme ""
 420  		if !strings.Contains(proxyURL, "://") {
 421  			proxyURL = "http://" + proxyURL
 422  		}
 423  		if target, err := url.Parse(proxyURL); !chk.E(err) {
 424  			if target.Scheme == "" || target.Host == "" {
 425  				// invalid URL, disable proxy
 426  				log.Printf(
 427  					"invalid ORLY_WEB_DEV_PROXY_URL: %q — disabling dev proxy\n",
 428  					s.Config.WebDevProxyURL,
 429  				)
 430  			} else {
 431  				s.devProxy = httputil.NewSingleHostReverseProxy(target)
 432  				// Ensure Host header points to upstream for dev servers that care
 433  				origDirector := s.devProxy.Director
 434  				s.devProxy.Director = func(req *http.Request) {
 435  					origDirector(req)
 436  					req.Host = target.Host
 437  				}
 438  				// Suppress noisy "context canceled" errors from browser navigation
 439  				s.devProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
 440  					if r.Context().Err() == context.Canceled {
 441  						// Browser canceled the request - this is normal, don't log it
 442  						return
 443  					}
 444  					log.Printf("proxy error: %v", err)
 445  					http.Error(w, "Bad Gateway", http.StatusBadGateway)
 446  				}
 447  			}
 448  		}
 449  	}
 450  
 451  	// Initialize challenge storage if not already done
 452  	if s.challenges == nil {
 453  		s.challengeMutex.Lock()
 454  		s.challenges = make(map[string][]byte)
 455  		s.challengeMutex.Unlock()
 456  	}
 457  
 458  	// Serve favicon.ico by serving favicon.png
 459  	s.mux.HandleFunc("/favicon.ico", s.handleFavicon)
 460  
 461  	// Branding/white-label endpoints (custom assets, CSS, manifest)
 462  	s.mux.HandleFunc("/branding/", s.handleBrandingAsset)
 463  
 464  	// Intercept /orly.png to serve custom logo if branding is active
 465  	s.mux.HandleFunc("/orly.png", s.handleLogo)
 466  
 467  	// Serve the main login interface (and static assets) or proxy in dev mode
 468  	s.mux.HandleFunc("/", s.handleLoginInterface)
 469  
 470  	// API endpoints for authentication
 471  	s.mux.HandleFunc("/api/auth/challenge", s.handleAuthChallenge)
 472  	s.mux.HandleFunc("/api/auth/login", s.handleAuthLogin)
 473  	s.mux.HandleFunc("/api/auth/status", s.handleAuthStatus)
 474  	s.mux.HandleFunc("/api/auth/logout", s.handleAuthLogout)
 475  	s.mux.HandleFunc("/api/permissions/", s.handlePermissions)
 476  	s.mux.HandleFunc("/api/role", s.handleRole)
 477  	// Export endpoint
 478  	s.mux.HandleFunc("/api/export", s.handleExport)
 479  	// Events endpoints
 480  	s.mux.HandleFunc("/api/events/mine", s.handleEventsMine)
 481  	// Import endpoint (admin only)
 482  	s.mux.HandleFunc("/api/import", s.handleImport)
 483  	// Sprocket endpoints (owner only)
 484  	s.mux.HandleFunc("/api/sprocket/status", s.handleSprocketStatus)
 485  	s.mux.HandleFunc("/api/sprocket/update", s.handleSprocketUpdate)
 486  	s.mux.HandleFunc("/api/sprocket/restart", s.handleSprocketRestart)
 487  	s.mux.HandleFunc("/api/sprocket/versions", s.handleSprocketVersions)
 488  	s.mux.HandleFunc(
 489  		"/api/sprocket/delete-version", s.handleSprocketDeleteVersion,
 490  	)
 491  	s.mux.HandleFunc("/api/sprocket/config", s.handleSprocketConfig)
 492  	// NIP-86 management endpoint
 493  	s.mux.HandleFunc("/api/nip86", s.handleNIP86Management)
 494  	// ACL mode endpoint
 495  	s.mux.HandleFunc("/api/acl-mode", s.handleACLMode)
 496  	// Log viewer endpoints (owner only)
 497  	s.mux.HandleFunc("/api/logs", s.handleGetLogs)
 498  	s.mux.HandleFunc("/api/logs/clear", s.handleClearLogs)
 499  	s.mux.HandleFunc("/api/logs/level", s.handleLogLevel)
 500  
 501  	// Sync endpoints for distributed synchronization
 502  	if s.syncManager != nil {
 503  		s.mux.HandleFunc("/api/sync/current", s.handleSyncCurrent)
 504  		s.mux.HandleFunc("/api/sync/event-ids", s.handleSyncEventIDs)
 505  		log.Printf("Distributed sync API enabled at /api/sync")
 506  	}
 507  
 508  	// Blossom blob storage API endpoint
 509  	if s.blossomServer != nil {
 510  		// Primary routes under /blossom/
 511  		s.mux.HandleFunc("/blossom/", s.blossomHandler)
 512  		// Root-level routes for clients that expect blossom at root (like Jumble)
 513  		s.mux.HandleFunc("/upload", s.blossomRootHandler)
 514  		s.mux.HandleFunc("/list/", s.blossomRootHandler)
 515  		s.mux.HandleFunc("/media", s.blossomRootHandler)
 516  		s.mux.HandleFunc("/mirror", s.blossomRootHandler)
 517  		s.mux.HandleFunc("/report", s.blossomRootHandler)
 518  		log.Printf("Blossom blob storage API enabled at /blossom and root")
 519  	} else {
 520  		log.Printf("WARNING: Blossom server is nil, routes not registered")
 521  	}
 522  
 523  	// Cluster replication API endpoints
 524  	if s.clusterManager != nil {
 525  		s.mux.HandleFunc("/cluster/latest", s.clusterManager.HandleLatestSerial)
 526  		s.mux.HandleFunc("/cluster/events", s.clusterManager.HandleEventsRange)
 527  		log.Printf("Cluster replication API enabled at /cluster")
 528  	}
 529  
 530  	// WireGuard VPN and Bunker API endpoints
 531  	// These are always registered but will return errors if not enabled
 532  	s.mux.HandleFunc("/api/wireguard/config", s.handleWireGuardConfig)
 533  	s.mux.HandleFunc("/api/wireguard/regenerate", s.handleWireGuardRegenerate)
 534  	s.mux.HandleFunc("/api/wireguard/status", s.handleWireGuardStatus)
 535  	s.mux.HandleFunc("/api/wireguard/audit", s.handleWireGuardAudit)
 536  	s.mux.HandleFunc("/api/bunker/url", s.handleBunkerURL)
 537  	s.mux.HandleFunc("/api/bunker/info", s.handleBunkerInfo)
 538  
 539  	// NRC (Nostr Relay Connect) management endpoints
 540  	s.mux.HandleFunc("/api/nrc/connections", s.handleNRCConnectionsRouter)
 541  	s.mux.HandleFunc("/api/nrc/connections/", s.handleNRCConnectionsRouter)
 542  	s.mux.HandleFunc("/api/nrc/config", s.handleNRCConfig)
 543  
 544  	// Neo4j configuration status endpoint
 545  	s.mux.HandleFunc("/api/neo4j/config", s.handleNeo4jConfig)
 546  
 547  	// Neo4j Cypher query proxy (NIP-98 owner-gated)
 548  	s.mux.HandleFunc("/api/neo4j/cypher", s.handleNeo4jCypher)
 549  
 550  	// GrapeVine WoT influence scoring API (NIP-98 authenticated)
 551  	s.mux.HandleFunc("/api/grapevine/scores", s.handleGrapeVineScores)
 552  	s.mux.HandleFunc("/api/grapevine/score", s.handleGrapeVineScore)
 553  	s.mux.HandleFunc("/api/grapevine/recalculate", s.handleGrapeVineRecalculate)
 554  
 555  	// Email bridge compose and decrypt pages
 556  	if s.emailBridge != nil {
 557  		s.mux.HandleFunc("/compose", emailbridge.ComposeHandler())
 558  		s.mux.HandleFunc("/decrypt", emailbridge.DecryptHandler())
 559  		log.Printf("Email bridge compose/decrypt pages enabled at /compose and /decrypt")
 560  	}
 561  }
 562  
 563  // handleFavicon serves favicon.png as favicon.ico
 564  func (s *Server) handleFavicon(w http.ResponseWriter, r *http.Request) {
 565  	// In dev mode with proxy configured, forward to dev server
 566  	if s.devProxy != nil {
 567  		s.devProxy.ServeHTTP(w, r)
 568  		return
 569  	}
 570  
 571  	// If web UI is disabled without a proxy, return 404
 572  	if s.Config != nil && s.Config.WebDisableEmbedded {
 573  		http.NotFound(w, r)
 574  		return
 575  	}
 576  
 577  	// Check for custom branding favicon first
 578  	if s.brandingMgr != nil {
 579  		if data, mimeType, ok := s.brandingMgr.GetAsset("favicon"); ok {
 580  			w.Header().Set("Content-Type", mimeType)
 581  			w.Header().Set("Cache-Control", "public, max-age=86400")
 582  			w.Write(data)
 583  			return
 584  		}
 585  	}
 586  
 587  	// Serve favicon.png as favicon.ico from embedded web app
 588  	w.Header().Set("Content-Type", "image/png")
 589  	w.Header().Set("Cache-Control", "public, max-age=86400") // Cache for 1 day
 590  
 591  	// Create a request for favicon.png and serve it
 592  	faviconReq := &http.Request{
 593  		Method: "GET",
 594  		URL:    &url.URL{Path: "/favicon.png"},
 595  	}
 596  	ServeEmbeddedWeb(w, faviconReq)
 597  }
 598  
 599  // handleLogo serves the logo image, using custom branding if available
 600  func (s *Server) handleLogo(w http.ResponseWriter, r *http.Request) {
 601  	// In dev mode with proxy configured, forward to dev server
 602  	if s.devProxy != nil {
 603  		s.devProxy.ServeHTTP(w, r)
 604  		return
 605  	}
 606  
 607  	// Check for custom branding logo first
 608  	if s.brandingMgr != nil {
 609  		if data, mimeType, ok := s.brandingMgr.GetAsset("logo"); ok {
 610  			w.Header().Set("Content-Type", mimeType)
 611  			w.Header().Set("Cache-Control", "public, max-age=86400")
 612  			w.Write(data)
 613  			return
 614  		}
 615  	}
 616  
 617  	// Fall back to embedded orly.png
 618  	w.Header().Set("Content-Type", "image/png")
 619  	w.Header().Set("Cache-Control", "public, max-age=86400")
 620  	ServeEmbeddedWeb(w, r)
 621  }
 622  
 623  // handleLoginInterface serves the main user interface for login
 624  func (s *Server) handleLoginInterface(w http.ResponseWriter, r *http.Request) {
 625  	// In dev mode with proxy configured, forward to dev server
 626  	if s.devProxy != nil {
 627  		s.devProxy.ServeHTTP(w, r)
 628  		return
 629  	}
 630  
 631  	// If web UI is disabled without a proxy, return 404
 632  	if s.Config != nil && s.Config.WebDisableEmbedded {
 633  		http.NotFound(w, r)
 634  		return
 635  	}
 636  
 637  	// If branding is enabled and this is the index page, inject customizations
 638  	if s.brandingMgr != nil && (r.URL.Path == "/" || r.URL.Path == "/index.html") {
 639  		s.serveModifiedIndex(w, r)
 640  		return
 641  	}
 642  
 643  	// Serve embedded web interface
 644  	ServeEmbeddedWeb(w, r)
 645  }
 646  
 647  // serveModifiedIndex serves the index.html with branding modifications injected
 648  func (s *Server) serveModifiedIndex(w http.ResponseWriter, r *http.Request) {
 649  	// Read the embedded index.html
 650  	fs := GetReactAppFS()
 651  	file, err := fs.Open("index.html")
 652  	if err != nil {
 653  		// Fallback to embedded serving
 654  		ServeEmbeddedWeb(w, r)
 655  		return
 656  	}
 657  	defer file.Close()
 658  
 659  	originalHTML, err := io.ReadAll(file)
 660  	if err != nil {
 661  		ServeEmbeddedWeb(w, r)
 662  		return
 663  	}
 664  
 665  	// Apply branding modifications
 666  	modifiedHTML, err := s.brandingMgr.ModifyIndexHTML(originalHTML)
 667  	if err != nil {
 668  		ServeEmbeddedWeb(w, r)
 669  		return
 670  	}
 671  
 672  	w.Header().Set("Content-Type", "text/html; charset=utf-8")
 673  	w.Header().Set("Cache-Control", "no-cache")
 674  	w.Write(modifiedHTML)
 675  }
 676  
 677  // handleBrandingAsset serves custom branding assets (logo, icons, CSS, manifest)
 678  func (s *Server) handleBrandingAsset(w http.ResponseWriter, r *http.Request) {
 679  	// Extract asset name from path: /branding/logo.png -> logo.png
 680  	path := strings.TrimPrefix(r.URL.Path, "/branding/")
 681  
 682  	// If no branding manager, return 404
 683  	if s.brandingMgr == nil {
 684  		http.NotFound(w, r)
 685  		return
 686  	}
 687  
 688  	switch path {
 689  	case "custom.css":
 690  		// Serve combined custom CSS
 691  		css, err := s.brandingMgr.GetCustomCSS()
 692  		if err != nil {
 693  			http.NotFound(w, r)
 694  			return
 695  		}
 696  		w.Header().Set("Content-Type", "text/css; charset=utf-8")
 697  		w.Header().Set("Cache-Control", "public, max-age=3600")
 698  		w.Write(css)
 699  
 700  	case "manifest.json":
 701  		// Serve customized manifest.json
 702  		// First read the embedded manifest
 703  		fs := GetReactAppFS()
 704  		file, err := fs.Open("manifest.json")
 705  		if err != nil {
 706  			http.NotFound(w, r)
 707  			return
 708  		}
 709  		defer file.Close()
 710  
 711  		originalManifest, err := io.ReadAll(file)
 712  		if err != nil {
 713  			http.NotFound(w, r)
 714  			return
 715  		}
 716  
 717  		manifest, err := s.brandingMgr.GetManifest(originalManifest)
 718  		if err != nil {
 719  			// Fallback to original
 720  			w.Header().Set("Content-Type", "application/manifest+json")
 721  			w.Write(originalManifest)
 722  			return
 723  		}
 724  
 725  		w.Header().Set("Content-Type", "application/manifest+json")
 726  		w.Header().Set("Cache-Control", "public, max-age=3600")
 727  		w.Write(manifest)
 728  
 729  	case "logo.png":
 730  		s.serveBrandingAsset(w, "logo")
 731  
 732  	case "favicon.png":
 733  		s.serveBrandingAsset(w, "favicon")
 734  
 735  	case "icon-192.png":
 736  		s.serveBrandingAsset(w, "icon-192")
 737  
 738  	case "icon-512.png":
 739  		s.serveBrandingAsset(w, "icon-512")
 740  
 741  	default:
 742  		http.NotFound(w, r)
 743  	}
 744  }
 745  
 746  // serveBrandingAsset serves a specific branding asset by name
 747  func (s *Server) serveBrandingAsset(w http.ResponseWriter, name string) {
 748  	if s.brandingMgr == nil {
 749  		http.NotFound(w, nil)
 750  		return
 751  	}
 752  
 753  	data, mimeType, ok := s.brandingMgr.GetAsset(name)
 754  	if !ok {
 755  		http.NotFound(w, nil)
 756  		return
 757  	}
 758  
 759  	w.Header().Set("Content-Type", mimeType)
 760  	w.Header().Set("Cache-Control", "public, max-age=86400")
 761  	w.Write(data)
 762  }
 763  
 764  // handleAuthChallenge generates a new authentication challenge
 765  func (s *Server) handleAuthChallenge(w http.ResponseWriter, r *http.Request) {
 766  	if r.Method != http.MethodGet {
 767  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 768  		return
 769  	}
 770  
 771  	w.Header().Set("Content-Type", "application/json")
 772  
 773  	// Generate a new challenge
 774  	challenge := auth.GenerateChallenge()
 775  	challengeHex := hex.Enc(challenge)
 776  
 777  	// Store the challenge with expiration (5 minutes)
 778  	s.challengeMutex.Lock()
 779  	if s.challenges == nil {
 780  		s.challenges = make(map[string][]byte)
 781  	}
 782  	s.challenges[challengeHex] = challenge
 783  	s.challengeMutex.Unlock()
 784  
 785  	// Clean up expired challenges
 786  	go func() {
 787  		time.Sleep(5 * time.Minute)
 788  		s.challengeMutex.Lock()
 789  		delete(s.challenges, challengeHex)
 790  		s.challengeMutex.Unlock()
 791  	}()
 792  
 793  	// Return the challenge
 794  	response := struct {
 795  		Challenge string `json:"challenge"`
 796  	}{
 797  		Challenge: challengeHex,
 798  	}
 799  
 800  	jsonData, err := json.Marshal(response)
 801  	if chk.E(err) {
 802  		http.Error(
 803  			w, "Error generating challenge", http.StatusInternalServerError,
 804  		)
 805  		return
 806  	}
 807  
 808  	w.Write(jsonData)
 809  }
 810  
 811  // handleAuthLogin processes authentication requests
 812  func (s *Server) handleAuthLogin(w http.ResponseWriter, r *http.Request) {
 813  	if r.Method != http.MethodPost {
 814  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 815  		return
 816  	}
 817  
 818  	w.Header().Set("Content-Type", "application/json")
 819  
 820  	// Read the request body
 821  	body, err := io.ReadAll(r.Body)
 822  	if chk.E(err) {
 823  		w.Write([]byte(`{"success": false, "error": "Failed to read request body"}`))
 824  		return
 825  	}
 826  
 827  	// Parse the signed event
 828  	var evt event.E
 829  	if err = json.Unmarshal(body, &evt); chk.E(err) {
 830  		w.Write([]byte(`{"success": false, "error": "Invalid event format"}`))
 831  		return
 832  	}
 833  
 834  	// Extract the challenge from the event to look up the stored challenge
 835  	challengeTag := evt.Tags.GetFirst([]byte("challenge"))
 836  	if challengeTag == nil {
 837  		w.Write([]byte(`{"success": false, "error": "Challenge tag missing from event"}`))
 838  		return
 839  	}
 840  
 841  	challengeHex := string(challengeTag.Value())
 842  
 843  	// Retrieve the stored challenge
 844  	s.challengeMutex.RLock()
 845  	_, exists := s.challenges[challengeHex]
 846  	s.challengeMutex.RUnlock()
 847  
 848  	if !exists {
 849  		w.Write([]byte(`{"success": false, "error": "Invalid or expired challenge"}`))
 850  		return
 851  	}
 852  
 853  	// Clean up the used challenge
 854  	s.challengeMutex.Lock()
 855  	delete(s.challenges, challengeHex)
 856  	s.challengeMutex.Unlock()
 857  
 858  	relayURL := s.WebSocketURL(r)
 859  
 860  	// Validate the authentication event with the correct challenge
 861  	// The challenge in the event tag is hex-encoded, so we need to pass the hex string as bytes
 862  	ok, err := auth.Validate(&evt, []byte(challengeHex), relayURL)
 863  	if chk.E(err) || !ok {
 864  		errorMsg := "Authentication validation failed"
 865  		if err != nil {
 866  			errorMsg = err.Error()
 867  		}
 868  		w.Write([]byte(`{"success": false, "error": "` + errorMsg + `"}`))
 869  		return
 870  	}
 871  
 872  	// Authentication successful: set a simple session cookie with the pubkey
 873  	cookie := &http.Cookie{
 874  		Name:     "orly_auth",
 875  		Value:    hex.Enc(evt.Pubkey),
 876  		Path:     "/",
 877  		HttpOnly: true,
 878  		SameSite: http.SameSiteLaxMode,
 879  		MaxAge:   60 * 60 * 24 * 30, // 30 days
 880  	}
 881  	http.SetCookie(w, cookie)
 882  
 883  	w.Write([]byte(`{"success": true}`))
 884  }
 885  
 886  // handleAuthStatus checks if the user is authenticated
 887  func (s *Server) handleAuthStatus(w http.ResponseWriter, r *http.Request) {
 888  	if r.Method != http.MethodGet {
 889  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 890  		return
 891  	}
 892  
 893  	w.Header().Set("Content-Type", "application/json")
 894  
 895  	// Check for auth cookie
 896  	c, err := r.Cookie("orly_auth")
 897  	if err != nil || c.Value == "" {
 898  		w.Write([]byte(`{"authenticated": false}`))
 899  		return
 900  	}
 901  
 902  	// Validate the pubkey format
 903  	pubkey, err := hex.Dec(c.Value)
 904  	if chk.E(err) {
 905  		w.Write([]byte(`{"authenticated": false}`))
 906  		return
 907  	}
 908  
 909  	// Get user permissions
 910  	permission := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
 911  
 912  	response := struct {
 913  		Authenticated bool   `json:"authenticated"`
 914  		Pubkey        string `json:"pubkey"`
 915  		Permission    string `json:"permission"`
 916  	}{
 917  		Authenticated: true,
 918  		Pubkey:        c.Value,
 919  		Permission:    permission,
 920  	}
 921  
 922  	jsonData, err := json.Marshal(response)
 923  	if chk.E(err) {
 924  		w.Write([]byte(`{"authenticated": false}`))
 925  		return
 926  	}
 927  
 928  	w.Write(jsonData)
 929  }
 930  
 931  // handleAuthLogout clears the authentication cookie
 932  func (s *Server) handleAuthLogout(w http.ResponseWriter, r *http.Request) {
 933  	if r.Method != http.MethodPost {
 934  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 935  		return
 936  	}
 937  
 938  	w.Header().Set("Content-Type", "application/json")
 939  
 940  	// Clear the auth cookie
 941  	cookie := &http.Cookie{
 942  		Name:     "orly_auth",
 943  		Value:    "",
 944  		Path:     "/",
 945  		HttpOnly: true,
 946  		SameSite: http.SameSiteLaxMode,
 947  		MaxAge:   -1, // Expire immediately
 948  	}
 949  	http.SetCookie(w, cookie)
 950  
 951  	w.Write([]byte(`{"success": true}`))
 952  }
 953  
 954  // handlePermissions returns the permission level for a given pubkey
 955  func (s *Server) handlePermissions(w http.ResponseWriter, r *http.Request) {
 956  	if r.Method != http.MethodGet {
 957  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 958  		return
 959  	}
 960  
 961  	// Extract pubkey from URL path
 962  	pubkeyHex := strings.TrimPrefix(r.URL.Path, "/api/permissions/")
 963  	if pubkeyHex == "" || pubkeyHex == "/" {
 964  		http.Error(w, "Invalid pubkey", http.StatusBadRequest)
 965  		return
 966  	}
 967  
 968  	// Convert hex to binary pubkey
 969  	pubkey, err := hex.Dec(pubkeyHex)
 970  	if chk.E(err) {
 971  		http.Error(w, "Invalid pubkey format", http.StatusBadRequest)
 972  		return
 973  	}
 974  
 975  	// Get access level using acl registry
 976  	permission := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
 977  
 978  	// Set content type and write JSON response
 979  	w.Header().Set("Content-Type", "application/json")
 980  
 981  	// Format response as proper JSON
 982  	response := struct {
 983  		Permission string `json:"permission"`
 984  	}{
 985  		Permission: permission,
 986  	}
 987  
 988  	// Marshal and write the response
 989  	jsonData, err := json.Marshal(response)
 990  	if chk.E(err) {
 991  		http.Error(
 992  			w, "Error generating response", http.StatusInternalServerError,
 993  		)
 994  		return
 995  	}
 996  
 997  	w.Write(jsonData)
 998  }
 999  
1000  // handleRole returns the caller's access level using NIP-98 authentication.
1001  // Used by the smesh admin UI to determine whether to show admin controls.
1002  func (s *Server) handleRole(w http.ResponseWriter, r *http.Request) {
1003  	if r.Method != http.MethodGet {
1004  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
1005  		return
1006  	}
1007  
1008  	w.Header().Set("Content-Type", "application/json")
1009  
1010  	valid, pubkey, err := httpauth.CheckAuth(r)
1011  	if chk.E(err) || !valid {
1012  		w.Write([]byte(`{"role":""}`))
1013  		return
1014  	}
1015  
1016  	role := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
1017  	jsonData, err := json.Marshal(struct {
1018  		Role string `json:"role"`
1019  	}{Role: role})
1020  	if chk.E(err) {
1021  		w.Write([]byte(`{"role":""}`))
1022  		return
1023  	}
1024  	w.Write(jsonData)
1025  }
1026  
1027  // handleExport streams events as JSONL (NDJSON) using NIP-98 authentication.
1028  // Supports both GET (query params) and POST (JSON body) for pubkey filtering.
1029  func (s *Server) handleExport(w http.ResponseWriter, r *http.Request) {
1030  	if r.Method != http.MethodGet && r.Method != http.MethodPost {
1031  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
1032  		return
1033  	}
1034  
1035  	// Skip authentication and permission checks when ACL is "none" (open relay mode)
1036  	if acl.Registry.GetMode() != "none" {
1037  		// Validate NIP-98 authentication
1038  		valid, pubkey, err := httpauth.CheckAuth(r)
1039  		if chk.E(err) || !valid {
1040  			errorMsg := "NIP-98 authentication validation failed"
1041  			if err != nil {
1042  				errorMsg = err.Error()
1043  			}
1044  			http.Error(w, errorMsg, http.StatusUnauthorized)
1045  			return
1046  		}
1047  
1048  		// Check permissions - require write, admin, or owner level
1049  		accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
1050  		if accessLevel != "write" && accessLevel != "admin" && accessLevel != "owner" {
1051  			http.Error(
1052  				w, "Write, admin, or owner permission required",
1053  				http.StatusForbidden,
1054  			)
1055  			return
1056  		}
1057  	}
1058  
1059  	// Parse pubkeys from request
1060  	var pks [][]byte
1061  
1062  	if r.Method == http.MethodPost {
1063  		// Parse JSON body for pubkeys
1064  		var requestBody struct {
1065  			Pubkeys []string `json:"pubkeys"`
1066  		}
1067  
1068  		if err := json.NewDecoder(r.Body).Decode(&requestBody); err == nil {
1069  			// If JSON parsing succeeds, use pubkeys from body
1070  			for _, pkHex := range requestBody.Pubkeys {
1071  				if pkHex == "" {
1072  					continue
1073  				}
1074  				if pk, err := hex.Dec(pkHex); !chk.E(err) {
1075  					pks = append(pks, pk)
1076  				}
1077  			}
1078  		}
1079  		// If JSON parsing fails, fall back to empty pubkeys (export all)
1080  	} else {
1081  		// GET method - parse query parameters
1082  		q := r.URL.Query()
1083  		for _, pkHex := range q["pubkey"] {
1084  			if pkHex == "" {
1085  				continue
1086  			}
1087  			if pk, err := hex.Dec(pkHex); !chk.E(err) {
1088  				pks = append(pks, pk)
1089  			}
1090  		}
1091  	}
1092  
1093  	// Determine filename based on whether filtering by pubkeys
1094  	var filename string
1095  	if len(pks) == 0 {
1096  		filename = "all-events-" + time.Now().UTC().Format("20060102-150405Z") + ".jsonl"
1097  	} else if len(pks) == 1 {
1098  		filename = "my-events-" + time.Now().UTC().Format("20060102-150405Z") + ".jsonl"
1099  	} else {
1100  		filename = "filtered-events-" + time.Now().UTC().Format("20060102-150405Z") + ".jsonl"
1101  	}
1102  
1103  	w.Header().Set("Content-Type", "application/x-ndjson")
1104  	w.Header().Set(
1105  		"Content-Disposition", "attachment; filename=\""+filename+"\"",
1106  	)
1107  	w.Header().Set("X-Content-Type-Options", "nosniff")
1108  
1109  	// Flush headers to start streaming immediately
1110  	if flusher, ok := w.(http.Flusher); ok {
1111  		flusher.Flush()
1112  	}
1113  
1114  	// Stream export
1115  	s.DB.Export(s.Ctx, w, pks...)
1116  }
1117  
1118  // handleEventsMine returns the authenticated user's events in JSON format with pagination using NIP-98 authentication.
1119  func (s *Server) handleEventsMine(w http.ResponseWriter, r *http.Request) {
1120  	if r.Method != http.MethodGet {
1121  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
1122  		return
1123  	}
1124  
1125  	// Validate NIP-98 authentication
1126  	valid, pubkey, err := httpauth.CheckAuth(r)
1127  	if chk.E(err) || !valid {
1128  		errorMsg := "NIP-98 authentication validation failed"
1129  		if err != nil {
1130  			errorMsg = err.Error()
1131  		}
1132  		http.Error(w, errorMsg, http.StatusUnauthorized)
1133  		return
1134  	}
1135  
1136  	// Parse pagination parameters
1137  	query := r.URL.Query()
1138  	limit := 50 // default limit
1139  	if l := query.Get("limit"); l != "" {
1140  		if parsed, err := strconv.Atoi(l); err == nil && parsed > 0 && parsed <= 100 {
1141  			limit = parsed
1142  		}
1143  	}
1144  
1145  	offset := 0
1146  	if o := query.Get("offset"); o != "" {
1147  		if parsed, err := strconv.Atoi(o); err == nil && parsed >= 0 {
1148  			offset = parsed
1149  		}
1150  	}
1151  
1152  	// Use QueryEvents with filter for this user's events
1153  	f := &filter.F{
1154  		Authors: tag.NewFromBytesSlice(pubkey),
1155  	}
1156  
1157  	log.Printf("DEBUG: Querying events for pubkey: %s", hex.Enc(pubkey))
1158  	events, err := s.DB.QueryEvents(s.Ctx, f)
1159  	if chk.E(err) {
1160  		log.Printf("DEBUG: QueryEvents failed: %v", err)
1161  		http.Error(w, "Failed to query events", http.StatusInternalServerError)
1162  		return
1163  	}
1164  	log.Printf("DEBUG: QueryEvents returned %d events", len(events))
1165  
1166  	// Apply pagination
1167  	totalEvents := len(events)
1168  	if offset >= totalEvents {
1169  		events = event.S{} // Empty slice
1170  	} else {
1171  		end := offset + limit
1172  		if end > totalEvents {
1173  			end = totalEvents
1174  		}
1175  		events = events[offset:end]
1176  	}
1177  
1178  	// Set content type and write JSON response
1179  	w.Header().Set("Content-Type", "application/json")
1180  
1181  	// Format response as proper JSON
1182  	response := struct {
1183  		Events []*event.E `json:"events"`
1184  		Total  int        `json:"total"`
1185  		Limit  int        `json:"limit"`
1186  		Offset int        `json:"offset"`
1187  	}{
1188  		Events: events,
1189  		Total:  totalEvents,
1190  		Limit:  limit,
1191  		Offset: offset,
1192  	}
1193  
1194  	// Marshal and write the response
1195  	jsonData, err := json.Marshal(response)
1196  	if chk.E(err) {
1197  		http.Error(
1198  			w, "Error generating response", http.StatusInternalServerError,
1199  		)
1200  		return
1201  	}
1202  
1203  	w.Write(jsonData)
1204  }
1205  
1206  // handleImport receives a JSONL/NDJSON file or body and enqueues an async import using NIP-98 authentication. Admins only.
1207  func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) {
1208  	if r.Method != http.MethodPost {
1209  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
1210  		return
1211  	}
1212  
1213  	// Skip authentication and permission checks when ACL is "none" (open relay mode)
1214  	if acl.Registry.GetMode() != "none" {
1215  		// Validate NIP-98 authentication
1216  		valid, pubkey, err := httpauth.CheckAuth(r)
1217  		if chk.E(err) || !valid {
1218  			errorMsg := "NIP-98 authentication validation failed"
1219  			if err != nil {
1220  				errorMsg = err.Error()
1221  			}
1222  			http.Error(w, errorMsg, http.StatusUnauthorized)
1223  			return
1224  		}
1225  
1226  		// Check permissions - require admin or owner level
1227  		accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
1228  		if accessLevel != "admin" && accessLevel != "owner" {
1229  			http.Error(
1230  				w, "Admin or owner permission required", http.StatusForbidden,
1231  			)
1232  			return
1233  		}
1234  	}
1235  
1236  	ct := r.Header.Get("Content-Type")
1237  	if strings.HasPrefix(ct, "multipart/form-data") {
1238  		if err := r.ParseMultipartForm(32 << 20); chk.E(err) { // 32MB memory, rest to temp files
1239  			http.Error(w, "Failed to parse form", http.StatusBadRequest)
1240  			return
1241  		}
1242  		file, _, err := r.FormFile("file")
1243  		if chk.E(err) {
1244  			http.Error(w, "Missing file", http.StatusBadRequest)
1245  			return
1246  		}
1247  		defer file.Close()
1248  		s.DB.Import(file)
1249  	} else {
1250  		if r.Body == nil {
1251  			http.Error(w, "Empty request body", http.StatusBadRequest)
1252  			return
1253  		}
1254  		s.DB.Import(r.Body)
1255  	}
1256  
1257  	w.Header().Set("Content-Type", "application/json")
1258  	w.WriteHeader(http.StatusAccepted)
1259  	w.Write([]byte(`{"success": true, "message": "Import started"}`))
1260  }
1261  
1262  // handleSprocketStatus returns the current status of the sprocket script
1263  func (s *Server) handleSprocketStatus(w http.ResponseWriter, r *http.Request) {
1264  	if r.Method != http.MethodGet {
1265  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
1266  		return
1267  	}
1268  
1269  	// Validate NIP-98 authentication
1270  	valid, pubkey, err := httpauth.CheckAuth(r)
1271  	if chk.E(err) || !valid {
1272  		errorMsg := "NIP-98 authentication validation failed"
1273  		if err != nil {
1274  			errorMsg = err.Error()
1275  		}
1276  		http.Error(w, errorMsg, http.StatusUnauthorized)
1277  		return
1278  	}
1279  
1280  	// Check permissions - require owner level
1281  	accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
1282  	if accessLevel != "owner" {
1283  		http.Error(w, "Owner permission required", http.StatusForbidden)
1284  		return
1285  	}
1286  
1287  	status := s.sprocketManager.GetSprocketStatus()
1288  
1289  	w.Header().Set("Content-Type", "application/json")
1290  	jsonData, err := json.Marshal(status)
1291  	if chk.E(err) {
1292  		http.Error(
1293  			w, "Error generating response", http.StatusInternalServerError,
1294  		)
1295  		return
1296  	}
1297  
1298  	w.Write(jsonData)
1299  }
1300  
1301  // handleSprocketUpdate updates the sprocket script and restarts it
1302  func (s *Server) handleSprocketUpdate(w http.ResponseWriter, r *http.Request) {
1303  	if r.Method != http.MethodPost {
1304  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
1305  		return
1306  	}
1307  
1308  	// Validate NIP-98 authentication
1309  	valid, pubkey, err := httpauth.CheckAuth(r)
1310  	if chk.E(err) || !valid {
1311  		errorMsg := "NIP-98 authentication validation failed"
1312  		if err != nil {
1313  			errorMsg = err.Error()
1314  		}
1315  		http.Error(w, errorMsg, http.StatusUnauthorized)
1316  		return
1317  	}
1318  
1319  	// Check permissions - require owner level
1320  	accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
1321  	if accessLevel != "owner" {
1322  		http.Error(w, "Owner permission required", http.StatusForbidden)
1323  		return
1324  	}
1325  
1326  	// Read the request body
1327  	body, err := io.ReadAll(r.Body)
1328  	if chk.E(err) {
1329  		http.Error(w, "Failed to read request body", http.StatusBadRequest)
1330  		return
1331  	}
1332  
1333  	// Update the sprocket script
1334  	if err := s.sprocketManager.UpdateSprocket(string(body)); chk.E(err) {
1335  		http.Error(
1336  			w, fmt.Sprintf("Failed to update sprocket: %v", err),
1337  			http.StatusInternalServerError,
1338  		)
1339  		return
1340  	}
1341  
1342  	w.Header().Set("Content-Type", "application/json")
1343  	w.Write([]byte(`{"success": true, "message": "Sprocket updated successfully"}`))
1344  }
1345  
1346  // handleSprocketRestart restarts the sprocket script
1347  func (s *Server) handleSprocketRestart(w http.ResponseWriter, r *http.Request) {
1348  	if r.Method != http.MethodPost {
1349  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
1350  		return
1351  	}
1352  
1353  	// Validate NIP-98 authentication
1354  	valid, pubkey, err := httpauth.CheckAuth(r)
1355  	if chk.E(err) || !valid {
1356  		errorMsg := "NIP-98 authentication validation failed"
1357  		if err != nil {
1358  			errorMsg = err.Error()
1359  		}
1360  		http.Error(w, errorMsg, http.StatusUnauthorized)
1361  		return
1362  	}
1363  
1364  	// Check permissions - require owner level
1365  	accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
1366  	if accessLevel != "owner" {
1367  		http.Error(w, "Owner permission required", http.StatusForbidden)
1368  		return
1369  	}
1370  
1371  	// Restart the sprocket script
1372  	if err := s.sprocketManager.RestartSprocket(); chk.E(err) {
1373  		http.Error(
1374  			w, fmt.Sprintf("Failed to restart sprocket: %v", err),
1375  			http.StatusInternalServerError,
1376  		)
1377  		return
1378  	}
1379  
1380  	w.Header().Set("Content-Type", "application/json")
1381  	w.Write([]byte(`{"success": true, "message": "Sprocket restarted successfully"}`))
1382  }
1383  
1384  // handleSprocketVersions returns all sprocket script versions
1385  func (s *Server) handleSprocketVersions(
1386  	w http.ResponseWriter, r *http.Request,
1387  ) {
1388  	if r.Method != http.MethodGet {
1389  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
1390  		return
1391  	}
1392  
1393  	// Validate NIP-98 authentication
1394  	valid, pubkey, err := httpauth.CheckAuth(r)
1395  	if chk.E(err) || !valid {
1396  		errorMsg := "NIP-98 authentication validation failed"
1397  		if err != nil {
1398  			errorMsg = err.Error()
1399  		}
1400  		http.Error(w, errorMsg, http.StatusUnauthorized)
1401  		return
1402  	}
1403  
1404  	// Check permissions - require owner level
1405  	accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
1406  	if accessLevel != "owner" {
1407  		http.Error(w, "Owner permission required", http.StatusForbidden)
1408  		return
1409  	}
1410  
1411  	versions, err := s.sprocketManager.GetSprocketVersions()
1412  	if chk.E(err) {
1413  		http.Error(
1414  			w, fmt.Sprintf("Failed to get sprocket versions: %v", err),
1415  			http.StatusInternalServerError,
1416  		)
1417  		return
1418  	}
1419  
1420  	w.Header().Set("Content-Type", "application/json")
1421  	jsonData, err := json.Marshal(versions)
1422  	if chk.E(err) {
1423  		http.Error(
1424  			w, "Error generating response", http.StatusInternalServerError,
1425  		)
1426  		return
1427  	}
1428  
1429  	w.Write(jsonData)
1430  }
1431  
1432  // handleSprocketDeleteVersion deletes a specific sprocket version
1433  func (s *Server) handleSprocketDeleteVersion(
1434  	w http.ResponseWriter, r *http.Request,
1435  ) {
1436  	if r.Method != http.MethodPost {
1437  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
1438  		return
1439  	}
1440  
1441  	// Validate NIP-98 authentication
1442  	valid, pubkey, err := httpauth.CheckAuth(r)
1443  	if chk.E(err) || !valid {
1444  		errorMsg := "NIP-98 authentication validation failed"
1445  		if err != nil {
1446  			errorMsg = err.Error()
1447  		}
1448  		http.Error(w, errorMsg, http.StatusUnauthorized)
1449  		return
1450  	}
1451  
1452  	// Check permissions - require owner level
1453  	accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
1454  	if accessLevel != "owner" {
1455  		http.Error(w, "Owner permission required", http.StatusForbidden)
1456  		return
1457  	}
1458  
1459  	// Read the request body
1460  	body, err := io.ReadAll(r.Body)
1461  	if chk.E(err) {
1462  		http.Error(w, "Failed to read request body", http.StatusBadRequest)
1463  		return
1464  	}
1465  
1466  	var request struct {
1467  		Filename string `json:"filename"`
1468  	}
1469  	if err := json.Unmarshal(body, &request); chk.E(err) {
1470  		http.Error(w, "Invalid JSON in request body", http.StatusBadRequest)
1471  		return
1472  	}
1473  
1474  	if request.Filename == "" {
1475  		http.Error(w, "Filename is required", http.StatusBadRequest)
1476  		return
1477  	}
1478  
1479  	// Delete the sprocket version
1480  	if err := s.sprocketManager.DeleteSprocketVersion(request.Filename); chk.E(err) {
1481  		http.Error(
1482  			w, fmt.Sprintf("Failed to delete sprocket version: %v", err),
1483  			http.StatusInternalServerError,
1484  		)
1485  		return
1486  	}
1487  
1488  	w.Header().Set("Content-Type", "application/json")
1489  	w.Write([]byte(`{"success": true, "message": "Sprocket version deleted successfully"}`))
1490  }
1491  
1492  // handleSprocketConfig returns the sprocket configuration status
1493  func (s *Server) handleSprocketConfig(w http.ResponseWriter, r *http.Request) {
1494  	if r.Method != http.MethodGet {
1495  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
1496  		return
1497  	}
1498  
1499  	w.Header().Set("Content-Type", "application/json")
1500  
1501  	response := struct {
1502  		Enabled bool `json:"enabled"`
1503  	}{
1504  		Enabled: s.Config.SprocketEnabled,
1505  	}
1506  
1507  	jsonData, err := json.Marshal(response)
1508  	if chk.E(err) {
1509  		http.Error(
1510  			w, "Error generating response", http.StatusInternalServerError,
1511  		)
1512  		return
1513  	}
1514  
1515  	w.Write(jsonData)
1516  }
1517  
1518  // handleACLMode returns the current ACL mode
1519  func (s *Server) handleACLMode(w http.ResponseWriter, r *http.Request) {
1520  	if r.Method != http.MethodGet {
1521  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
1522  		return
1523  	}
1524  
1525  	w.Header().Set("Content-Type", "application/json")
1526  
1527  	response := struct {
1528  		ACLMode string `json:"acl_mode"`
1529  	}{
1530  		ACLMode: acl.Registry.Type(),
1531  	}
1532  
1533  	jsonData, err := json.Marshal(response)
1534  	if chk.E(err) {
1535  		http.Error(
1536  			w, "Error generating response", http.StatusInternalServerError,
1537  		)
1538  		return
1539  	}
1540  
1541  	w.Write(jsonData)
1542  }
1543  
1544  // handleSyncCurrent handles requests for the current serial number
1545  func (s *Server) handleSyncCurrent(w http.ResponseWriter, r *http.Request) {
1546  	if s.syncManager == nil {
1547  		http.Error(
1548  			w, "Sync manager not initialized", http.StatusServiceUnavailable,
1549  		)
1550  		return
1551  	}
1552  
1553  	// Validate NIP-98 authentication and check peer authorization
1554  	if !s.validatePeerRequest(w, r) {
1555  		return
1556  	}
1557  
1558  	s.syncManager.HandleCurrentRequest(w, r)
1559  }
1560  
1561  // handleSyncEventIDs handles requests for event IDs with their serial numbers
1562  func (s *Server) handleSyncEventIDs(w http.ResponseWriter, r *http.Request) {
1563  	if s.syncManager == nil {
1564  		http.Error(
1565  			w, "Sync manager not initialized", http.StatusServiceUnavailable,
1566  		)
1567  		return
1568  	}
1569  
1570  	// Validate NIP-98 authentication and check peer authorization
1571  	if !s.validatePeerRequest(w, r) {
1572  		return
1573  	}
1574  
1575  	s.syncManager.HandleEventIDsRequest(w, r)
1576  }
1577  
1578  // validatePeerRequest validates NIP-98 authentication and checks if the requesting peer is authorized
1579  func (s *Server) validatePeerRequest(
1580  	w http.ResponseWriter, r *http.Request,
1581  ) bool {
1582  	// Validate NIP-98 authentication
1583  	valid, pubkey, err := httpauth.CheckAuth(r)
1584  	if err != nil {
1585  		log.Printf("NIP-98 auth validation error: %v", err)
1586  		http.Error(
1587  			w, "Authentication validation failed", http.StatusUnauthorized,
1588  		)
1589  		return false
1590  	}
1591  	if !valid {
1592  		http.Error(w, "NIP-98 authentication required", http.StatusUnauthorized)
1593  		return false
1594  	}
1595  
1596  	if s.syncManager == nil {
1597  		log.Printf("Sync manager not available for peer validation")
1598  		http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
1599  		return false
1600  	}
1601  
1602  	// Extract the relay URL from the request (this should be in the request body)
1603  	// For now, we'll check against all configured peers
1604  	peerPubkeyHex := hex.Enc(pubkey)
1605  
1606  	// Check if this pubkey matches any of our configured peer relays' NIP-11 pubkeys
1607  	for _, peerURL := range s.syncManager.GetPeers() {
1608  		if s.syncManager.IsAuthorizedPeer(peerURL, peerPubkeyHex) {
1609  			// Also update ACL to grant admin access to this peer pubkey
1610  			s.updatePeerAdminACL(pubkey)
1611  			return true
1612  		}
1613  	}
1614  
1615  	log.Printf("Unauthorized sync request from pubkey: %s", peerPubkeyHex)
1616  	http.Error(w, "Unauthorized peer", http.StatusForbidden)
1617  	return false
1618  }
1619  
1620  // updatePeerAdminACL grants admin access to peer relay identity pubkeys
1621  func (s *Server) updatePeerAdminACL(peerPubkey []byte) {
1622  	// Find the managed ACL instance and update peer admins
1623  	for _, aclInstance := range acl.Registry.ACLs() {
1624  		if aclInstance.Type() == "managed" {
1625  			if managed, ok := aclInstance.(*acl.Managed); ok {
1626  				// Collect all current peer pubkeys
1627  				var peerPubkeys [][]byte
1628  				for _, peerURL := range s.syncManager.GetPeers() {
1629  					if pubkey, err := s.syncManager.GetPeerPubkey(peerURL); err == nil {
1630  						peerPubkeys = append(peerPubkeys, []byte(pubkey))
1631  					}
1632  				}
1633  				managed.UpdatePeerAdmins(peerPubkeys)
1634  				break
1635  			}
1636  		}
1637  	}
1638  }
1639  
1640  // =============================================================================
1641  // Event Service Initialization
1642  // =============================================================================
1643  
1644  // InitEventServices initializes the domain services for event handling.
1645  // This should be called after the Server is created but before accepting connections.
1646  func (s *Server) InitEventServices() {
1647  	// Initialize validation service
1648  	s.eventValidator = validation.NewWithConfig(&validation.Config{
1649  		MaxFutureSeconds: 3600, // 1 hour
1650  	})
1651  
1652  	// Initialize authorization service
1653  	authCfg := &authorization.Config{
1654  		AuthRequired:    s.Config.AuthRequired,
1655  		AuthToWrite:     s.Config.AuthToWrite,
1656  		NIP46BypassAuth: s.Config.NIP46BypassAuth,
1657  		Admins:          s.Admins,
1658  		Owners:          s.Owners,
1659  	}
1660  	s.eventAuthorizer = authorization.New(
1661  		authCfg,
1662  		s.wrapAuthACLRegistry(),
1663  		s.wrapAuthPolicyManager(),
1664  		s.wrapAuthSyncManager(),
1665  	)
1666  
1667  	// Initialize router with handlers for special event kinds
1668  	s.eventRouter = routing.New()
1669  
1670  	// Register ephemeral event handler (kinds 20000-29999)
1671  	s.eventRouter.RegisterKindCheck(
1672  		"ephemeral",
1673  		routing.IsEphemeral,
1674  		routing.MakeEphemeralHandler(s.publishers),
1675  	)
1676  
1677  	// Initialize processing service
1678  	procCfg := &processing.Config{
1679  		Admins:       s.Admins,
1680  		Owners:       s.Owners,
1681  		WriteTimeout: 30 * time.Second,
1682  	}
1683  	s.eventProcessor = processing.New(procCfg, s.wrapDB(), s.publishers)
1684  
1685  	// Wire up optional dependencies to processing service
1686  	if s.rateLimiter != nil {
1687  		s.eventProcessor.SetRateLimiter(s.wrapRateLimiter())
1688  	}
1689  	if s.syncManager != nil {
1690  		s.eventProcessor.SetSyncManager(s.wrapSyncManager())
1691  	}
1692  	if s.relayGroupMgr != nil {
1693  		s.eventProcessor.SetRelayGroupManager(s.wrapRelayGroupManager())
1694  	}
1695  	if s.clusterManager != nil {
1696  		s.eventProcessor.SetClusterManager(s.wrapClusterManager())
1697  	}
1698  	s.eventProcessor.SetACLRegistry(s.wrapACLRegistry())
1699  
1700  	// Initialize domain event dispatcher
1701  	s.eventDispatcher = domainevents.NewDispatcher(domainevents.DefaultDispatcherConfig())
1702  
1703  	// Register logging subscriber for analytics
1704  	logLevel := "debug"
1705  	if s.Config.LogLevel == "trace" {
1706  		logLevel = "trace"
1707  	} else if s.Config.LogLevel == "info" || s.Config.LogLevel == "warn" || s.Config.LogLevel == "error" {
1708  		logLevel = "info"
1709  	}
1710  	s.eventDispatcher.Subscribe(subscribers.NewLoggingSubscriber(logLevel))
1711  
1712  	// Wire dispatcher to processing service
1713  	s.eventProcessor.SetEventDispatcher(s.eventDispatcher)
1714  
1715  	// Initialize special kinds registry and register handlers
1716  	s.specialKinds = specialkinds.NewRegistry()
1717  	s.registerSpecialKindHandlers()
1718  
1719  	// Initialize ingestion service
1720  	s.ingestionService = ingestion.NewService(
1721  		s.eventValidator,
1722  		s.eventAuthorizer,
1723  		s.eventRouter,
1724  		s.eventProcessor,
1725  		ingestion.Config{
1726  			SprocketChecker: s.wrapSprocketChecker(),
1727  			SpecialKinds:    s.specialKinds,
1728  			ACLMode:         acl.Registry.GetMode,
1729  		},
1730  	)
1731  }
1732  
1733  // SprocketChecker wrapper for ingestion.SprocketChecker interface
1734  type sprocketCheckerWrapper struct {
1735  	sm *SprocketManager
1736  }
1737  
1738  func (s *Server) wrapSprocketChecker() ingestion.SprocketChecker {
1739  	if s.sprocketManager == nil {
1740  		return nil
1741  	}
1742  	return &sprocketCheckerWrapper{sm: s.sprocketManager}
1743  }
1744  
1745  func (w *sprocketCheckerWrapper) IsEnabled() bool {
1746  	return w.sm != nil && w.sm.IsEnabled()
1747  }
1748  
1749  func (w *sprocketCheckerWrapper) IsDisabled() bool {
1750  	return w.sm.IsDisabled()
1751  }
1752  
1753  func (w *sprocketCheckerWrapper) IsRunning() bool {
1754  	return w.sm.IsRunning()
1755  }
1756  
1757  func (w *sprocketCheckerWrapper) ProcessEvent(ev *event.E) (*ingestion.SprocketResponse, error) {
1758  	resp, err := w.sm.ProcessEvent(ev)
1759  	if err != nil {
1760  		return nil, err
1761  	}
1762  	return &ingestion.SprocketResponse{
1763  		Action: resp.Action,
1764  		Msg:    resp.Msg,
1765  	}, nil
1766  }
1767  
1768  // Database wrapper for processing.Database interface
1769  type processingDBWrapper struct {
1770  	db database.Database
1771  }
1772  
1773  func (s *Server) wrapDB() processing.Database {
1774  	return &processingDBWrapper{db: s.DB}
1775  }
1776  
1777  func (w *processingDBWrapper) SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error) {
1778  	return w.db.SaveEvent(ctx, ev)
1779  }
1780  
1781  func (w *processingDBWrapper) CheckForDeleted(ev *event.E, adminOwners [][]byte) error {
1782  	return w.db.CheckForDeleted(ev, adminOwners)
1783  }
1784  
1785  // RateLimiter wrapper for processing.RateLimiter interface
1786  type processingRateLimiterWrapper struct {
1787  	rl *ratelimit.Limiter
1788  }
1789  
1790  func (s *Server) wrapRateLimiter() processing.RateLimiter {
1791  	return &processingRateLimiterWrapper{rl: s.rateLimiter}
1792  }
1793  
1794  func (w *processingRateLimiterWrapper) IsEnabled() bool {
1795  	return w.rl.IsEnabled()
1796  }
1797  
1798  func (w *processingRateLimiterWrapper) Wait(ctx context.Context, opType int) error {
1799  	w.rl.Wait(ctx, opType)
1800  	return nil
1801  }
1802  
1803  // SyncManager wrapper for processing.SyncManager interface
1804  type processingSyncManagerWrapper struct {
1805  	sm *dsync.Manager
1806  }
1807  
1808  func (s *Server) wrapSyncManager() processing.SyncManager {
1809  	return &processingSyncManagerWrapper{sm: s.syncManager}
1810  }
1811  
1812  func (w *processingSyncManagerWrapper) UpdateSerial() {
1813  	w.sm.UpdateSerial()
1814  }
1815  
1816  // RelayGroupManager wrapper for processing.RelayGroupManager interface
1817  type processingRelayGroupManagerWrapper struct {
1818  	rgm *dsync.RelayGroupManager
1819  }
1820  
1821  func (s *Server) wrapRelayGroupManager() processing.RelayGroupManager {
1822  	return &processingRelayGroupManagerWrapper{rgm: s.relayGroupMgr}
1823  }
1824  
1825  func (w *processingRelayGroupManagerWrapper) ValidateRelayGroupEvent(ev *event.E) error {
1826  	return w.rgm.ValidateRelayGroupEvent(ev)
1827  }
1828  
1829  func (w *processingRelayGroupManagerWrapper) HandleRelayGroupEvent(ev *event.E, syncMgr any) {
1830  	if sm, ok := syncMgr.(*dsync.Manager); ok {
1831  		w.rgm.HandleRelayGroupEvent(ev, sm)
1832  	}
1833  }
1834  
1835  // ClusterManager wrapper for processing.ClusterManager interface
1836  type processingClusterManagerWrapper struct {
1837  	cm *dsync.ClusterManager
1838  }
1839  
1840  func (s *Server) wrapClusterManager() processing.ClusterManager {
1841  	return &processingClusterManagerWrapper{cm: s.clusterManager}
1842  }
1843  
1844  func (w *processingClusterManagerWrapper) HandleMembershipEvent(ev *event.E) error {
1845  	return w.cm.HandleMembershipEvent(ev)
1846  }
1847  
// processingACLRegistryWrapper implements processing.ACLRegistry on top of the
// package-level acl.Registry. It carries no state of its own.
type processingACLRegistryWrapper struct{}
1850  
1851  func (s *Server) wrapACLRegistry() processing.ACLRegistry {
1852  	return &processingACLRegistryWrapper{}
1853  }
1854  
1855  func (w *processingACLRegistryWrapper) Configure(cfg ...any) error {
1856  	return acl.Registry.Configure(cfg...)
1857  }
1858  
1859  func (w *processingACLRegistryWrapper) Active() string {
1860  	return acl.Registry.GetMode()
1861  }
1862  
1863  // =============================================================================
1864  // Authorization Service Wrappers
1865  // =============================================================================
1866  
// authACLRegistryWrapper implements authorization.ACLRegistry on top of the
// package-level acl.Registry. It carries no state of its own.
type authACLRegistryWrapper struct{}
1869  
1870  func (s *Server) wrapAuthACLRegistry() authorization.ACLRegistry {
1871  	return &authACLRegistryWrapper{}
1872  }
1873  
1874  func (w *authACLRegistryWrapper) GetAccessLevel(pub []byte, address string) string {
1875  	return acl.Registry.GetAccessLevel(pub, address)
1876  }
1877  
1878  func (w *authACLRegistryWrapper) CheckPolicy(ev *event.E) (bool, error) {
1879  	return acl.Registry.CheckPolicy(ev)
1880  }
1881  
1882  func (w *authACLRegistryWrapper) Active() string {
1883  	return acl.Registry.GetMode()
1884  }
1885  
1886  // PolicyManager wrapper for authorization.PolicyManager interface
1887  type authPolicyManagerWrapper struct {
1888  	pm *policy.P
1889  }
1890  
1891  func (s *Server) wrapAuthPolicyManager() authorization.PolicyManager {
1892  	if s.policyManager == nil {
1893  		return nil
1894  	}
1895  	return &authPolicyManagerWrapper{pm: s.policyManager}
1896  }
1897  
1898  func (w *authPolicyManagerWrapper) IsEnabled() bool {
1899  	return w.pm.IsEnabled()
1900  }
1901  
1902  func (w *authPolicyManagerWrapper) CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error) {
1903  	return w.pm.CheckPolicy(action, ev, pubkey, remote)
1904  }
1905  
1906  // SyncManager wrapper for authorization.SyncManager interface
1907  type authSyncManagerWrapper struct {
1908  	sm *dsync.Manager
1909  }
1910  
1911  func (s *Server) wrapAuthSyncManager() authorization.SyncManager {
1912  	if s.syncManager == nil {
1913  		return nil
1914  	}
1915  	return &authSyncManagerWrapper{sm: s.syncManager}
1916  }
1917  
1918  func (w *authSyncManagerWrapper) GetPeers() []string {
1919  	return w.sm.GetPeers()
1920  }
1921  
1922  func (w *authSyncManagerWrapper) IsAuthorizedPeer(url, pubkey string) bool {
1923  	return w.sm.IsAuthorizedPeer(url, pubkey)
1924  }
1925  
1926  // =============================================================================
1927  // Message Processing Pause/Resume for Policy and Follow List Updates
1928  // =============================================================================
1929  
1930  // PauseMessageProcessing acquires an exclusive lock to pause all message processing.
1931  // This should be called before updating policy configuration or follow lists.
1932  // Call ResumeMessageProcessing to release the lock after updates are complete.
1933  func (s *Server) PauseMessageProcessing() {
1934  	s.messagePauseMutex.Lock()
1935  }
1936  
1937  // ResumeMessageProcessing releases the exclusive lock to resume message processing.
1938  // This should be called after policy configuration or follow list updates are complete.
1939  func (s *Server) ResumeMessageProcessing() {
1940  	s.messagePauseMutex.Unlock()
1941  }
1942  
1943  // AcquireMessageProcessingLock acquires a read lock for normal message processing.
1944  // This allows concurrent message processing while blocking during policy updates.
1945  // Call ReleaseMessageProcessingLock when message processing is complete.
1946  func (s *Server) AcquireMessageProcessingLock() {
1947  	s.messagePauseMutex.RLock()
1948  }
1949  
1950  // ReleaseMessageProcessingLock releases the read lock after message processing.
1951  func (s *Server) ReleaseMessageProcessingLock() {
1952  	s.messagePauseMutex.RUnlock()
1953  }
1954