main.go raw

   1  package app
   2  
   3  import (
   4  	"context"
   5  	"fmt"
   6  	"os"
   7  	"path/filepath"
   8  	"strings"
   9  	"sync"
  10  	"time"
  11  
  12  	"github.com/adrg/xdg"
  13  	"next.orly.dev/pkg/lol/chk"
  14  	"next.orly.dev/pkg/lol/log"
  15  	"next.orly.dev/app/branding"
  16  	"next.orly.dev/app/config"
  17  	"next.orly.dev/pkg/acl"
  18  	"next.orly.dev/pkg/nostr/crypto/keys"
  19  	"next.orly.dev/pkg/database"
  20  	"next.orly.dev/pkg/nostr/encoders/bech32encoding"
  21  	"next.orly.dev/pkg/nostr/encoders/hex"
  22  	"next.orly.dev/pkg/neo4j"
  23  	"next.orly.dev/pkg/policy"
  24  	"next.orly.dev/pkg/protocol/graph"
  25  	"next.orly.dev/pkg/protocol/nip43"
  26  	"next.orly.dev/pkg/protocol/publish"
  27  	"next.orly.dev/pkg/bunker"
  28  	"next.orly.dev/pkg/protocol/nrc"
  29  	"next.orly.dev/pkg/ratelimit"
  30  	"next.orly.dev/pkg/crawler"
  31  	"next.orly.dev/pkg/grapevine"
  32  	"next.orly.dev/pkg/spider"
  33  	"next.orly.dev/pkg/storage"
  34  	dsync "next.orly.dev/pkg/sync"
  35  	"next.orly.dev/pkg/transport"
  36  	"next.orly.dev/pkg/transport/tcp"
  37  	tlstransport "next.orly.dev/pkg/transport/tls"
  38  	tortransport "next.orly.dev/pkg/transport/tor"
  39  	"next.orly.dev/pkg/wireguard"
  40  	"next.orly.dev/pkg/archive"
  41  	emailbridge "next.orly.dev/pkg/bridge"
  42  	"next.orly.dev/pkg/httpguard"
  43  
  44  	"next.orly.dev/pkg/nostr/interfaces/signer/p8k"
  45  )
  46  
  47  func Run(
  48  	ctx context.Context, cfg *config.C, db database.Database, limiter *ratelimit.Limiter,
  49  ) (quit chan struct{}) {
  50  	quit = make(chan struct{})
  51  	var once sync.Once
  52  
  53  	// shutdown handler
  54  	go func() {
  55  		<-ctx.Done()
  56  		log.I.F("shutting down")
  57  		once.Do(func() { close(quit) })
  58  	}()
  59  	// get the admins
  60  	var err error
  61  	var adminKeys [][]byte
  62  	for _, admin := range cfg.Admins {
  63  		if len(admin) == 0 {
  64  			continue
  65  		}
  66  		var pk []byte
  67  		if pk, err = bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(err) {
  68  			continue
  69  		}
  70  		adminKeys = append(adminKeys, pk)
  71  	}
  72  	// get the owners
  73  	var ownerKeys [][]byte
  74  	for _, owner := range cfg.Owners {
  75  		if len(owner) == 0 {
  76  			continue
  77  		}
  78  		var pk []byte
  79  		if pk, err = bech32encoding.NpubOrHexToPublicKeyBinary(owner); chk.E(err) {
  80  			continue
  81  		}
  82  		ownerKeys = append(ownerKeys, pk)
  83  	}
  84  	// start listener
  85  	channelMembership := NewChannelMembership(db)
  86  	dmLimiter := NewDMRateLimiter(db)
  87  	wsPublisher := NewPublisher(ctx)
  88  	wsPublisher.ChannelMembership = channelMembership
  89  
  90  	l := &Server{
  91  		Ctx:               ctx,
  92  		Config:            cfg,
  93  		DB:                db,
  94  		publishers:        publish.New(wsPublisher),
  95  		Admins:            adminKeys,
  96  		Owners:            ownerKeys,
  97  		rateLimiter:       limiter,
  98  		cfg:               cfg,
  99  		db:                db,
 100  		connPerIP:         make(map[string]int),
 101  		aclRegistry:       acl.Registry, // Inject ACL registry (transitional from global)
 102  		channelMembership:         channelMembership,
 103  		dmRateLimiter:             dmLimiter,
 104  		negentropyFullSyncPubkeys: parseNegentropyFullSyncPubkeys(cfg.NegentropyFullSyncPubkeys),
 105  	}
 106  
 107  	// Configure connection storm mitigation limits on the rate limiter
 108  	if limiter != nil {
 109  		limiter.SetConnectionLimits(
 110  			cfg.MaxGlobalConnections,
 111  			cfg.ConnectionDelayMaxMs,
 112  			cfg.GoroutineWarningCount,
 113  			cfg.GoroutineMaxCount,
 114  		)
 115  	}
 116  
 117  	// Initialize HTTP guard (bot blocking + per-IP rate limiting)
 118  	if cfg.HTTPGuardEnabled {
 119  		l.httpGuard = httpguard.New(httpguard.Config{
 120  			Enabled:     true,
 121  			BotBlock:    cfg.HTTPGuardBotBlock,
 122  			RPM:         cfg.HTTPGuardRPM,
 123  			WSPerMin:    cfg.HTTPGuardWSPerMin,
 124  			IPBlacklist: cfg.IPBlacklist,
 125  		})
 126  	}
 127  
 128  	// Initialize branding/white-label manager if enabled
 129  	if cfg.BrandingEnabled {
 130  		brandingDir := cfg.BrandingDir
 131  		if brandingDir == "" {
 132  			brandingDir = filepath.Join(xdg.ConfigHome, cfg.AppName, "branding")
 133  		}
 134  		if _, err := os.Stat(brandingDir); err == nil {
 135  			if l.brandingMgr, err = branding.New(brandingDir); err != nil {
 136  				log.W.F("failed to load branding from %s: %v", brandingDir, err)
 137  			} else {
 138  				log.I.F("custom branding loaded from %s", brandingDir)
 139  			}
 140  		}
 141  	}
 142  
 143  	// Initialize NIP-43 invite manager if enabled
 144  	if cfg.NIP43Enabled {
 145  		l.InviteManager = nip43.NewInviteManager(cfg.NIP43InviteExpiry)
 146  		log.I.F("NIP-43 invite system enabled with %v expiry", cfg.NIP43InviteExpiry)
 147  	}
 148  
 149  	// Initialize sprocket manager
 150  	l.sprocketManager = NewSprocketManager(ctx, cfg.AppName, cfg.SprocketEnabled)
 151  
 152  	// Initialize policy manager
 153  	l.policyManager = policy.NewWithManager(ctx, cfg.AppName, cfg.PolicyEnabled, cfg.PolicyPath)
 154  
 155  	// Merge policy-defined owners with environment-defined owners
 156  	// This allows cloud deployments to add owners via policy.json when env vars cannot be modified
 157  	if l.policyManager != nil {
 158  		policyOwners := l.policyManager.GetOwnersBin()
 159  		if len(policyOwners) > 0 {
 160  			// Deduplicate when merging
 161  			existingOwners := make(map[string]struct{})
 162  			for _, owner := range l.Owners {
 163  				existingOwners[string(owner)] = struct{}{}
 164  			}
 165  			for _, policyOwner := range policyOwners {
 166  				if _, exists := existingOwners[string(policyOwner)]; !exists {
 167  					l.Owners = append(l.Owners, policyOwner)
 168  					existingOwners[string(policyOwner)] = struct{}{}
 169  				}
 170  			}
 171  			log.I.F("merged %d policy-defined owners with %d environment-defined owners (total: %d unique owners)",
 172  				len(policyOwners), len(ownerKeys), len(l.Owners))
 173  		}
 174  	}
 175  
 176  	// Initialize policy follows from database (load follow lists of policy admins)
 177  	// This must be done after policy manager initialization but before accepting connections
 178  	if err := l.InitializePolicyFollows(); err != nil {
 179  		log.W.F("failed to initialize policy follows: %v", err)
 180  		// Continue anyway - follows can be loaded when admins update their follow lists
 181  	}
 182  
 183  	// Cleanup any kind 3 events that lost their p tags (only for Badger backend)
 184  	if badgerDB, ok := db.(*database.D); ok {
 185  		if err := badgerDB.CleanupKind3WithoutPTags(ctx); chk.E(err) {
 186  			log.E.F("failed to cleanup kind 3 events: %v", err)
 187  		}
 188  	}
 189  
 190  	// Initialize graph query executor (Badger backend) if enabled
 191  	if badgerDB, ok := db.(*database.D); ok && cfg.GraphQueriesEnabled {
 192  		// Get relay identity key for signing graph query responses
 193  		relaySecretKey, err := badgerDB.GetOrCreateRelayIdentitySecret()
 194  		if err != nil {
 195  			log.E.F("failed to get relay identity key for graph executor: %v", err)
 196  		} else {
 197  			// Create the graph adapter and executor
 198  			graphAdapter := database.NewGraphAdapter(badgerDB)
 199  			if l.graphExecutor, err = graph.NewExecutor(graphAdapter, relaySecretKey); err != nil {
 200  				log.E.F("failed to create graph executor: %v", err)
 201  			} else {
 202  				graphEnabled, maxDepth, maxResults, rateLimitRPM := cfg.GetGraphConfigValues()
 203  				log.I.F("graph query executor initialized (Badger backend, enabled=%v, max_depth=%d, max_results=%d, rate_limit=%d/min)",
 204  					graphEnabled, maxDepth, maxResults, rateLimitRPM)
 205  			}
 206  		}
 207  	}
 208  
 209  	// Initialize graph query executor (Neo4j backend) if enabled
 210  	if neo4jDB, ok := db.(*neo4j.N); ok && cfg.GraphQueriesEnabled {
 211  		// Get relay identity key for signing graph query responses
 212  		relaySecretKey, err := neo4jDB.GetOrCreateRelayIdentitySecret()
 213  		if err != nil {
 214  			log.E.F("failed to get relay identity key for graph executor: %v", err)
 215  		} else {
 216  			// Create the graph adapter and executor
 217  			graphAdapter := neo4j.NewGraphAdapter(neo4jDB)
 218  			if l.graphExecutor, err = graph.NewExecutor(graphAdapter, relaySecretKey); err != nil {
 219  				log.E.F("failed to create graph executor: %v", err)
 220  			} else {
 221  				graphEnabled, maxDepth, maxResults, rateLimitRPM := cfg.GetGraphConfigValues()
 222  				log.I.F("graph query executor initialized (Neo4j backend, enabled=%v, max_depth=%d, max_results=%d, rate_limit=%d/min)",
 223  					graphEnabled, maxDepth, maxResults, rateLimitRPM)
 224  			}
 225  		}
 226  	}
 227  
 228  	// Initialize GrapeVine WoT scoring engine (Badger backend only)
 229  	if cfg.GrapeVineEnabled {
 230  		if badgerDB, ok := db.(*database.D); ok {
 231  			gvStore := database.NewGrapeVineStore(badgerDB)
 232  			gvSource := grapevine.NewBadgerGraphSource(badgerDB)
 233  			gvCfg := grapevine.Config{
 234  				MaxDepth:          cfg.GrapeVineMaxDepth,
 235  				Cycles:            cfg.GrapeVineCycles,
 236  				AttenuationFactor: cfg.GrapeVineAttenuation,
 237  				Rigor:             cfg.GrapeVineRigor,
 238  				FollowConfidence:  cfg.GrapeVineFollowConf,
 239  			}
 240  			l.grapeVineEngine = grapevine.NewEngine(gvSource, gvStore, gvCfg)
 241  			l.grapeVineScheduler = grapevine.NewScheduler(l.grapeVineEngine, cfg.GrapeVineObservers, cfg.GrapeVineRefresh)
 242  			if len(cfg.GrapeVineObservers) > 0 {
 243  				go l.grapeVineScheduler.Start(ctx)
 244  			}
 245  			log.I.F("grapevine WoT scoring enabled (depth=%d, cycles=%d, observers=%d)",
 246  				cfg.GrapeVineMaxDepth, cfg.GrapeVineCycles, len(cfg.GrapeVineObservers))
 247  		} else {
 248  			log.W.F("grapevine enabled but database is not Badger — grapevine requires Badger graph indexes")
 249  		}
 250  	}
 251  
 252  	// Initialize NRC (Nostr Relay Connect) bridge if enabled
 253  	nrcEnabled, nrcRendezvousURL, nrcAuthorizedKeys, nrcSessionTimeout := cfg.GetNRCConfigValues()
 254  	if nrcEnabled && nrcRendezvousURL != "" {
 255  		// Get relay identity for signing NRC responses
 256  		relaySecretKey, err := db.GetOrCreateRelayIdentitySecret()
 257  		if err != nil {
 258  			log.E.F("failed to get relay identity for NRC bridge: %v", err)
 259  		} else {
 260  			// Create signer from secret key
 261  			relaySigner, sigErr := p8k.New()
 262  			if sigErr != nil {
 263  				log.E.F("failed to create signer for NRC bridge: %v", sigErr)
 264  			} else if sigErr = relaySigner.InitSec(relaySecretKey); sigErr != nil {
 265  				log.E.F("failed to init signer for NRC bridge: %v", sigErr)
 266  			} else {
 267  				// Parse authorized secrets (format: secret:name,secret:name,...)
 268  				authorizedSecrets := make(map[string]string)
 269  				for _, entry := range nrcAuthorizedKeys {
 270  					parts := strings.SplitN(entry, ":", 2)
 271  					if len(parts) >= 1 {
 272  						secretHex := parts[0]
 273  						name := ""
 274  						if len(parts) == 2 {
 275  							name = parts[1]
 276  						}
 277  						// Derive pubkey from secret
 278  						secretBytes, decErr := hex.Dec(secretHex)
 279  						if decErr != nil || len(secretBytes) != 32 {
 280  							log.W.F("NRC: skipping invalid secret key: %s", secretHex[:8])
 281  							continue
 282  						}
 283  						derivedSigner, signerErr := p8k.New()
 284  						if signerErr != nil {
 285  							log.W.F("NRC: failed to create signer: %v", signerErr)
 286  							continue
 287  						}
 288  						if signerErr = derivedSigner.InitSec(secretBytes); signerErr != nil {
 289  							log.W.F("NRC: failed to init signer: %v", signerErr)
 290  							continue
 291  						}
 292  						derivedPubkeyHex := string(hex.Enc(derivedSigner.Pub()))
 293  						authorizedSecrets[derivedPubkeyHex] = name
 294  					}
 295  				}
 296  
 297  				// Construct local relay URL
 298  				localRelayURL := fmt.Sprintf("ws://localhost:%d", cfg.Port)
 299  
 300  				// Create bridge config
 301  				bridgeConfig := &nrc.BridgeConfig{
 302  					RendezvousURL:     nrcRendezvousURL,
 303  					LocalRelayURL:     localRelayURL,
 304  					Signer:            relaySigner,
 305  					AuthorizedSecrets: authorizedSecrets,
 306  					SessionTimeout:    nrcSessionTimeout,
 307  				}
 308  
 309  				// Create event-based NRC store for dynamic connection management
 310  				// This works with any database backend
 311  				l.nrcEventStore = database.NewNRCEventStore(db, relaySigner)
 312  				bridgeConfig.Authorizer = database.NewNRCEventAuthorizer(l.nrcEventStore)
 313  				log.D.F("NRC bridge using event-based authorization")
 314  
 315  				// Create and start the bridge
 316  				l.nrcBridge = nrc.NewBridge(bridgeConfig)
 317  				if err := l.nrcBridge.Start(); err != nil {
 318  					log.E.F("failed to start NRC bridge: %v", err)
 319  					l.nrcBridge = nil
 320  					l.nrcEventStore = nil
 321  				} else {
 322  					log.I.F("NRC bridge started (rendezvous: %s, authorized: %d env, event-store: %v)",
 323  						nrcRendezvousURL, len(authorizedSecrets), l.nrcEventStore != nil)
 324  				}
 325  			}
 326  		}
 327  	}
 328  
 329  	// Initialize spider manager based on mode (only for Badger backend)
 330  	if badgerDB, ok := db.(*database.D); ok && cfg.SpiderMode != "none" {
 331  		if l.spiderManager, err = spider.New(ctx, badgerDB, l.publishers, cfg.SpiderMode); chk.E(err) {
 332  			log.E.F("failed to create spider manager: %v", err)
 333  		} else {
 334  			// Set up callbacks for follows mode
 335  			if cfg.SpiderMode == "follows" {
 336  				l.spiderManager.SetCallbacks(
 337  					func() []string {
 338  						// Get admin relays from follows ACL if available
 339  						for _, aclInstance := range acl.Registry.ACLs() {
 340  							if aclInstance.Type() == "follows" {
 341  								if follows, ok := aclInstance.(*acl.Follows); ok {
 342  									return follows.AdminRelays()
 343  								}
 344  							}
 345  						}
 346  						return nil
 347  					},
 348  					func() [][]byte {
 349  						// Get followed pubkeys from follows ACL if available
 350  						for _, aclInstance := range acl.Registry.ACLs() {
 351  							if aclInstance.Type() == "follows" {
 352  								if follows, ok := aclInstance.(*acl.Follows); ok {
 353  									return follows.GetFollowedPubkeys()
 354  								}
 355  							}
 356  						}
 357  						return nil
 358  					},
 359  				)
 360  			}
 361  
 362  			if err = l.spiderManager.Start(); chk.E(err) {
 363  				log.E.F("failed to start spider manager: %v", err)
 364  			} else {
 365  				log.I.F("spider manager started successfully in '%s' mode", cfg.SpiderMode)
 366  
 367  				// Hook up follow list update notifications from ACL to spider
 368  				if cfg.SpiderMode == "follows" {
 369  					for _, aclInstance := range acl.Registry.ACLs() {
 370  						if aclInstance.Type() == "follows" {
 371  							if follows, ok := aclInstance.(*acl.Follows); ok {
 372  								follows.SetFollowListUpdateCallback(func() {
 373  									log.I.F("follow list updated, notifying spider")
 374  									l.spiderManager.NotifyFollowListUpdate()
 375  								})
 376  								log.I.F("spider: follow list update notifications configured")
 377  							}
 378  						}
 379  					}
 380  				}
 381  			}
 382  		}
 383  	}
 384  
 385  	// Initialize directory spider if enabled (only for Badger backend)
 386  	if badgerDB, ok := db.(*database.D); ok && cfg.DirectorySpiderEnabled {
 387  		if l.directorySpider, err = spider.NewDirectorySpider(
 388  			ctx,
 389  			badgerDB,
 390  			l.publishers,
 391  			cfg.DirectorySpiderInterval,
 392  			cfg.DirectorySpiderMaxHops,
 393  		); chk.E(err) {
 394  			log.E.F("failed to create directory spider: %v", err)
 395  		} else {
 396  			// Set up callback to get seed pubkeys (whitelisted users)
 397  			l.directorySpider.SetSeedCallback(func() [][]byte {
 398  				var pubkeys [][]byte
 399  				// Get followed pubkeys from follows ACL if available
 400  				for _, aclInstance := range acl.Registry.ACLs() {
 401  					if aclInstance.Type() == "follows" {
 402  						if follows, ok := aclInstance.(*acl.Follows); ok {
 403  							pubkeys = append(pubkeys, follows.GetFollowedPubkeys()...)
 404  						}
 405  					}
 406  				}
 407  				// Fall back to admin keys if no follows ACL
 408  				if len(pubkeys) == 0 {
 409  					pubkeys = adminKeys
 410  				}
 411  				return pubkeys
 412  			})
 413  
 414  			if err = l.directorySpider.Start(); chk.E(err) {
 415  				log.E.F("failed to start directory spider: %v", err)
 416  			} else {
 417  				log.I.F("directory spider started (interval: %v, max hops: %d)",
 418  					cfg.DirectorySpiderInterval, cfg.DirectorySpiderMaxHops)
 419  			}
 420  		}
 421  	}
 422  
 423  	// Initialize corpus crawler if enabled (works with any database backend)
 424  	if cfg.CrawlerEnabled {
 425  		crawlerCfg := crawler.DefaultConfig()
 426  		if cfg.CrawlerDiscoveryInterval > 0 {
 427  			crawlerCfg.DiscoveryInterval = cfg.CrawlerDiscoveryInterval
 428  		}
 429  		if cfg.CrawlerSyncInterval > 0 {
 430  			crawlerCfg.SyncInterval = cfg.CrawlerSyncInterval
 431  		}
 432  		if cfg.CrawlerMaxHops > 0 {
 433  			crawlerCfg.MaxHops = cfg.CrawlerMaxHops
 434  		}
 435  		if cfg.CrawlerConcurrency > 0 {
 436  			crawlerCfg.Concurrency = cfg.CrawlerConcurrency
 437  		}
 438  
 439  		if l.corpusCrawler, err = crawler.New(ctx, db, l.publishers, crawlerCfg); err != nil {
 440  			log.E.F("failed to create corpus crawler: %v", err)
 441  		} else {
 442  			l.corpusCrawler.SetSeedCallback(func() [][]byte {
 443  				var pubkeys [][]byte
 444  				for _, aclInstance := range acl.Registry.ACLs() {
 445  					if aclInstance.Type() == "follows" {
 446  						if follows, ok := aclInstance.(*acl.Follows); ok {
 447  							pubkeys = append(pubkeys, follows.GetFollowedPubkeys()...)
 448  						}
 449  					}
 450  				}
 451  				if len(pubkeys) == 0 {
 452  					pubkeys = adminKeys
 453  				}
 454  				return pubkeys
 455  			})
 456  
 457  			if err = l.corpusCrawler.Start(); err != nil {
 458  				log.E.F("failed to start corpus crawler: %v", err)
 459  			} else {
 460  				log.I.F("corpus crawler started (discovery: %v, sync: %v, hops: %d, concurrency: %d)",
 461  					crawlerCfg.DiscoveryInterval, crawlerCfg.SyncInterval,
 462  					crawlerCfg.MaxHops, crawlerCfg.Concurrency)
 463  			}
 464  		}
 465  	}
 466  
 467  	// Initialize relay group manager (only for Badger backend)
 468  	if badgerDB, ok := db.(*database.D); ok {
 469  		l.relayGroupMgr = dsync.NewRelayGroupManager(badgerDB, cfg.RelayGroupAdmins)
 470  	} else if cfg.SpiderMode != "none" || len(cfg.RelayPeers) > 0 || len(cfg.ClusterAdmins) > 0 {
 471  		log.I.Ln("spider, sync, and cluster features require Badger backend (currently using alternative backend)")
 472  	}
 473  
 474  	// Initialize sync manager if relay peers are configured (only for Badger backend)
 475  	if badgerDB, ok := db.(*database.D); ok {
 476  		var peers []string
 477  		if len(cfg.RelayPeers) > 0 {
 478  			peers = cfg.RelayPeers
 479  		} else {
 480  			// Try to get peers from relay group configuration
 481  			if l.relayGroupMgr != nil {
 482  				if config, err := l.relayGroupMgr.FindAuthoritativeConfig(ctx); err == nil && config != nil {
 483  					peers = config.Relays
 484  					log.I.F("using relay group configuration with %d peers", len(peers))
 485  				}
 486  			}
 487  		}
 488  
 489  		if len(peers) > 0 {
 490  			// Get relay identity for node ID
 491  			sk, err := db.GetOrCreateRelayIdentitySecret()
 492  			if err != nil {
 493  				log.E.F("failed to get relay identity for sync: %v", err)
 494  			} else {
 495  				nodeID, err := keys.SecretBytesToPubKeyHex(sk)
 496  				if err != nil {
 497  					log.E.F("failed to derive pubkey for sync node ID: %v", err)
 498  				} else {
 499  					relayURL := cfg.RelayURL
 500  					if relayURL == "" {
 501  						relayURL = fmt.Sprintf("http://localhost:%d", cfg.Port)
 502  					}
 503  					l.syncManager = dsync.NewManager(ctx, badgerDB, nodeID, relayURL, peers, l.relayGroupMgr, l.policyManager)
 504  					log.I.F("distributed sync manager initialized with %d peers", len(peers))
 505  				}
 506  			}
 507  		}
 508  	}
 509  
 510  	// Initialize cluster manager for cluster replication (only for Badger backend)
 511  	if badgerDB, ok := db.(*database.D); ok {
 512  		var clusterAdminNpubs []string
 513  		if len(cfg.ClusterAdmins) > 0 {
 514  			clusterAdminNpubs = cfg.ClusterAdmins
 515  		} else {
 516  			// Default to regular admins if no cluster admins specified
 517  			for _, admin := range cfg.Admins {
 518  				clusterAdminNpubs = append(clusterAdminNpubs, admin)
 519  			}
 520  		}
 521  
 522  		if len(clusterAdminNpubs) > 0 {
 523  			l.clusterManager = dsync.NewClusterManager(ctx, badgerDB, clusterAdminNpubs, cfg.ClusterPropagatePrivilegedEvents, l.publishers)
 524  			l.clusterManager.Start()
 525  			log.I.F("cluster replication manager initialized with %d admin npubs", len(clusterAdminNpubs))
 526  		}
 527  	}
 528  
 529  	// Initialize Blossom blob storage server
 530  	// Now works with any database backend that implements blob storage methods.
 531  	// MUST be done before UserInterface() which registers routes.
 532  	if cfg.BlossomEnabled {
 533  		log.I.F("initializing Blossom server...")
 534  		if l.blossomServer, err = initializeBlossomServer(ctx, cfg, db); err != nil {
 535  			log.E.F("failed to initialize blossom server: %v", err)
 536  			// Continue without blossom server
 537  		} else if l.blossomServer != nil {
 538  			log.I.F("blossom blob storage server initialized")
 539  		} else {
 540  			log.W.F("blossom server initialization returned nil without error")
 541  		}
 542  	} else {
 543  		log.I.F("Blossom server disabled via ORLY_BLOSSOM_ENABLED=false")
 544  	}
 545  
 546  	// Initialize Nostr-Email bridge if enabled
 547  	bridgeEnabled, bridgeDomain, bridgeNSEC, bridgeRelayURL,
 548  		bridgeSMTPPort, bridgeSMTPHost, bridgeDataDir,
 549  		bridgeDKIMKeyPath, bridgeDKIMSelector,
 550  		bridgeNWCURI, bridgeMonthlyPriceSats, bridgeComposeURL,
 551  		bridgeSMTPRelayHost, bridgeSMTPRelayPort,
 552  		bridgeSMTPRelayUsername, bridgeSMTPRelayPassword,
 553  		bridgeACLGRPCServer, bridgeAliasPriceSats, bridgeProfilePath := cfg.GetBridgeConfigValues()
 554  
 555  	if bridgeEnabled {
 556  		bridgeCfg := &emailbridge.Config{
 557  			Domain:            bridgeDomain,
 558  			NSEC:              bridgeNSEC,
 559  			RelayURL:          bridgeRelayURL,
 560  			SMTPPort:          bridgeSMTPPort,
 561  			SMTPHost:          bridgeSMTPHost,
 562  			DataDir:           bridgeDataDir,
 563  			DKIMKeyPath:       bridgeDKIMKeyPath,
 564  			DKIMSelector:      bridgeDKIMSelector,
 565  			NWCURI:            bridgeNWCURI,
 566  			MonthlyPriceSats:  bridgeMonthlyPriceSats,
 567  			ComposeURL:        bridgeComposeURL,
 568  			SMTPRelayHost:     bridgeSMTPRelayHost,
 569  			SMTPRelayPort:     bridgeSMTPRelayPort,
 570  			SMTPRelayUsername: bridgeSMTPRelayUsername,
 571  			SMTPRelayPassword: bridgeSMTPRelayPassword,
 572  			ACLGRPCServer:     bridgeACLGRPCServer,
 573  			AliasPriceSats:    bridgeAliasPriceSats,
 574  			ProfilePath:       bridgeProfilePath,
 575  		}
 576  
 577  		// In monolithic mode, provide a database getter for identity resolution
 578  		dbGetter := func() ([]byte, error) {
 579  			return db.GetOrCreateRelayIdentitySecret()
 580  		}
 581  
 582  		l.emailBridge = emailbridge.New(bridgeCfg, dbGetter)
 583  		if err := l.emailBridge.Start(ctx); err != nil {
 584  			log.E.F("failed to start email bridge: %v", err)
 585  			l.emailBridge = nil
 586  		} else {
 587  			log.I.F("email bridge started (domain: %s, SMTP: %s:%d)",
 588  				bridgeDomain, bridgeSMTPHost, bridgeSMTPPort)
 589  		}
 590  	}
 591  
 592  	// Initialize WireGuard VPN and NIP-46 Bunker (only for Badger backend)
 593  	// Requires ACL mode 'follows' or 'managed' - no point for open relays
 594  	if badgerDB, ok := db.(*database.D); ok && cfg.WGEnabled && cfg.ACLMode != "none" {
 595  		if cfg.WGEndpoint == "" {
 596  			log.E.F("WireGuard enabled but ORLY_WG_ENDPOINT not set - skipping")
 597  		} else {
 598  			// Get or create the subnet pool (restores seed and allocations from DB)
 599  			subnetPool, err := badgerDB.GetOrCreateSubnetPool(cfg.WGNetwork)
 600  			if err != nil {
 601  				log.E.F("failed to create subnet pool: %v", err)
 602  			} else {
 603  				l.subnetPool = subnetPool
 604  
 605  				// Get or create WireGuard server key
 606  				wgServerKey, err := badgerDB.GetOrCreateWireGuardServerKey()
 607  				if err != nil {
 608  					log.E.F("failed to get WireGuard server key: %v", err)
 609  				} else {
 610  					// Create WireGuard server
 611  					wgConfig := &wireguard.Config{
 612  						Port:       cfg.WGPort,
 613  						Endpoint:   cfg.WGEndpoint,
 614  						PrivateKey: wgServerKey,
 615  						Network:    cfg.WGNetwork,
 616  						ServerIP:   "10.73.0.1",
 617  					}
 618  
 619  					l.wireguardServer, err = wireguard.New(wgConfig)
 620  					if err != nil {
 621  						log.E.F("failed to create WireGuard server: %v", err)
 622  					} else {
 623  						if err = l.wireguardServer.Start(); err != nil {
 624  							log.E.F("failed to start WireGuard server: %v", err)
 625  						} else {
 626  							log.I.F("WireGuard VPN server started on UDP port %d", cfg.WGPort)
 627  
 628  							// Load existing peers from database and add to server
 629  							peers, err := badgerDB.GetAllWireGuardPeers()
 630  							if err != nil {
 631  								log.W.F("failed to load existing WireGuard peers: %v", err)
 632  							} else {
 633  								for _, peer := range peers {
 634  									// Derive client IP from sequence
 635  									subnet := subnetPool.SubnetForSequence(peer.Sequence)
 636  									clientIP := subnet.ClientIP.String()
 637  									if err := l.wireguardServer.AddPeer(peer.NostrPubkey, peer.WGPublicKey, clientIP); err != nil {
 638  										log.W.F("failed to add existing peer: %v", err)
 639  									}
 640  								}
 641  								if len(peers) > 0 {
 642  									log.I.F("loaded %d existing WireGuard peers", len(peers))
 643  								}
 644  							}
 645  
 646  						// Initialize bunker if enabled
 647  						if cfg.BunkerEnabled {
 648  							// Get relay identity for signing
 649  							relaySecretKey, err := badgerDB.GetOrCreateRelayIdentitySecret()
 650  							if err != nil {
 651  								log.E.F("failed to get relay identity for bunker: %v", err)
 652  							} else {
 653  								// Create signer from secret key
 654  								relaySigner, sigErr := p8k.New()
 655  								if sigErr != nil {
 656  									log.E.F("failed to create signer for bunker: %v", sigErr)
 657  								} else if sigErr = relaySigner.InitSec(relaySecretKey); sigErr != nil {
 658  									log.E.F("failed to init signer for bunker: %v", sigErr)
 659  								} else {
 660  								relayPubkey := relaySigner.Pub()
 661  
 662  								bunkerConfig := &bunker.Config{
 663  									RelaySigner: relaySigner,
 664  									RelayPubkey: relayPubkey[:],
 665  									Netstack:    l.wireguardServer.GetNetstack(),
 666  									ListenAddr:  fmt.Sprintf("10.73.0.1:%d", cfg.BunkerPort),
 667  								}
 668  
 669  								l.bunkerServer = bunker.New(bunkerConfig)
 670  								if err = l.bunkerServer.Start(); err != nil {
 671  									log.E.F("failed to start bunker server: %v", err)
 672  								} else {
 673  									log.I.F("NIP-46 bunker server started on 10.73.0.1:%d (WireGuard only)", cfg.BunkerPort)
 674  								}
 675  								}
 676  							}
 677  						}
 678  						}
 679  					}
 680  				}
 681  			}
 682  		}
 683  	} else if cfg.WGEnabled && cfg.ACLMode == "none" {
 684  		log.I.F("WireGuard disabled: requires ACL mode 'follows' or 'managed' (currently: 'none')")
 685  	}
 686  
 687  	// Initialize event domain services (validation, routing, processing)
 688  	l.InitEventServices()
 689  
 690  	// Initialize the user interface (registers routes)
 691  	l.UserInterface()
 692  
 693  	// Start embedded Smesh web client if enabled
 694  	if cfg.SmeshEnabled && cfg.SmeshPort > 0 {
 695  		l.smeshServer = NewSmeshServer(cfg.SmeshPort)
 696  		if err := l.smeshServer.Start(ctx); err != nil {
 697  			log.E.F("failed to start smesh server: %v", err)
 698  			l.smeshServer = nil
 699  		}
 700  	}
 701  
 702  	// Ensure a relay identity secret key exists when subscriptions and NWC are enabled
 703  	if cfg.SubscriptionEnabled && cfg.NWCUri != "" {
 704  		if skb, e := db.GetOrCreateRelayIdentitySecret(); e != nil {
 705  			log.E.F("failed to ensure relay identity key: %v", e)
 706  		} else if pk, e2 := keys.SecretBytesToPubKeyHex(skb); e2 == nil {
 707  			log.I.F("relay identity loaded (pub=%s)", pk)
 708  			// ensure relay identity pubkey is considered an admin for ACL follows mode
 709  			found := false
 710  			for _, a := range cfg.Admins {
 711  				if a == pk {
 712  					found = true
 713  					break
 714  				}
 715  			}
 716  			if !found {
 717  				cfg.Admins = append(cfg.Admins, pk)
 718  				log.I.F("added relay identity to admins for follow-list whitelisting")
 719  			}
 720  			// also ensure relay identity pubkey is considered an owner for full control
 721  			found = false
 722  			for _, o := range cfg.Owners {
 723  				if o == pk {
 724  					found = true
 725  					break
 726  				}
 727  			}
 728  			if !found {
 729  				cfg.Owners = append(cfg.Owners, pk)
 730  				log.I.F("added relay identity to owners for full control")
 731  			}
 732  		}
 733  	}
 734  
 735  	// Initialize payment processor (only for Badger backend)
 736  	if badgerDB, ok := db.(*database.D); ok {
 737  		if l.paymentProcessor, err = NewPaymentProcessor(ctx, cfg, badgerDB); err != nil {
 738  			// log.E.F("failed to create payment processor: %v", err)
 739  			// Continue without payment processor
 740  		} else {
 741  			if err = l.paymentProcessor.Start(); err != nil {
 742  				log.E.F("failed to start payment processor: %v", err)
 743  			} else {
 744  				log.I.F("payment processor started successfully")
 745  			}
 746  		}
 747  	}
 748  
 749  	// Initialize access tracker for storage management (only for Badger backend)
 750  	if badgerDB, ok := db.(*database.D); ok {
 751  		l.accessTracker = storage.NewAccessTracker(badgerDB, 100000) // 100k dedup cache
 752  		l.accessTracker.Start()
 753  		log.I.F("access tracker initialized")
 754  
 755  		// Initialize garbage collector if enabled
 756  		maxBytes, gcEnabled, gcIntervalSec, gcBatchSize := cfg.GetStorageConfigValues()
 757  		if gcEnabled {
 758  			gcCfg := storage.GCConfig{
 759  				MaxStorageBytes: maxBytes,
 760  				Interval:        time.Duration(gcIntervalSec) * time.Second,
 761  				BatchSize:       gcBatchSize,
 762  				MinAgeSec:       3600, // Minimum 1 hour before eviction
 763  			}
 764  
 765  			// Wire WoT provider from social ACL if active
 766  			if acl.Registry.GetMode() == "social" {
 767  				for _, aclInstance := range acl.Registry.ACLs() {
 768  					if social, ok := aclInstance.(*acl.Social); ok {
 769  						wotMap := social.GetWoTDepthMap()
 770  						gcCfg.WoTProvider = wotMap
 771  						gcCfg.AuthorLookup = badgerDB
 772  						log.I.F("garbage collector: WoT-weighted eviction enabled via social ACL")
 773  						break
 774  					}
 775  				}
 776  			}
 777  
 778  			l.garbageCollector = storage.NewGarbageCollector(ctx, badgerDB, l.accessTracker, gcCfg)
 779  			l.garbageCollector.Start()
 780  			log.I.F("garbage collector started (interval: %ds, batch: %d)", gcIntervalSec, gcBatchSize)
 781  		}
 782  	}
 783  
 784  	// Initialize archive relay manager if enabled
 785  	archiveEnabled, archiveRelays, archiveTimeoutSec, archiveCacheTTLHrs := cfg.GetArchiveConfigValues()
 786  	if archiveEnabled && len(archiveRelays) > 0 {
 787  		archiveCfg := archive.Config{
 788  			Enabled:     true,
 789  			Relays:      archiveRelays,
 790  			TimeoutSec:  archiveTimeoutSec,
 791  			CacheTTLHrs: archiveCacheTTLHrs,
 792  		}
 793  		l.archiveManager = archive.New(ctx, db, archiveCfg)
 794  		log.I.F("archive relay manager initialized with %d relays", len(archiveRelays))
 795  	}
 796  
 797  	// Build transport manager
 798  	l.transportMgr = transport.NewManager()
 799  
 800  	// Add Tor transport if enabled (can start before db is ready)
 801  	torEnabled, torPort, torDataDir, torBinary, torSOCKSPort := cfg.GetTorConfigValues()
 802  	if torEnabled {
 803  		tt := tortransport.New(&tortransport.Config{
 804  			Port:      torPort,
 805  			DataDir:   torDataDir,
 806  			Binary:    torBinary,
 807  			SOCKSPort: torSOCKSPort,
 808  			Handler:   l,
 809  		})
 810  		l.transportMgr.Add(tt)
 811  	}
 812  
 813  	// Start rate limiter if enabled
 814  	if limiter != nil && limiter.IsEnabled() {
 815  		limiter.Start()
 816  		log.I.F("adaptive rate limiter started")
 817  	}
 818  
 819  	// Wait for database to be ready before accepting requests
 820  	log.I.F("waiting for database warmup to complete...")
 821  	<-db.Ready()
 822  	log.I.F("database ready, starting transports")
 823  
 824  	// Add TLS or plain TCP transport (mutually exclusive)
 825  	if len(cfg.TLSDomains) > 0 {
 826  		l.transportMgr.Add(tlstransport.New(&tlstransport.Config{
 827  			Domains: cfg.TLSDomains,
 828  			Certs:   cfg.Certs,
 829  			DataDir: cfg.DataDir,
 830  			Handler: l,
 831  		}))
 832  	} else {
 833  		l.transportMgr.Add(tcp.New(&tcp.Config{
 834  			Addr:    fmt.Sprintf("%s:%d", cfg.Listen, cfg.Port),
 835  			Handler: l,
 836  		}))
 837  	}
 838  
 839  	// Start all transports
 840  	if err := l.transportMgr.StartAll(ctx); err != nil {
 841  		log.E.F("transport startup failed: %v", err)
 842  	}
 843  
 844  	// Graceful shutdown handler
 845  	go func() {
 846  		<-ctx.Done()
 847  		log.I.F("shutting down servers gracefully")
 848  
 849  		// Stop spider manager if running
 850  		if l.spiderManager != nil {
 851  			l.spiderManager.Stop()
 852  			log.I.F("spider manager stopped")
 853  		}
 854  
 855  		// Stop directory spider if running
 856  		if l.directorySpider != nil {
 857  			l.directorySpider.Stop()
 858  			log.I.F("directory spider stopped")
 859  		}
 860  
 861  		// Stop corpus crawler if running
 862  		if l.corpusCrawler != nil {
 863  			l.corpusCrawler.Stop()
 864  			log.I.F("corpus crawler stopped")
 865  		}
 866  
 867  		// Stop rate limiter if running
 868  		if l.rateLimiter != nil && l.rateLimiter.IsEnabled() {
 869  			l.rateLimiter.Stop()
 870  			log.I.F("rate limiter stopped")
 871  		}
 872  
 873  		// Stop HTTP guard
 874  		if l.httpGuard != nil {
 875  			l.httpGuard.Stop()
 876  			log.I.F("HTTP guard stopped")
 877  		}
 878  
 879  		// Stop archive manager if running
 880  		if l.archiveManager != nil {
 881  			l.archiveManager.Stop()
 882  			log.I.F("archive manager stopped")
 883  		}
 884  
 885  		// Stop garbage collector if running
 886  		if l.garbageCollector != nil {
 887  			l.garbageCollector.Stop()
 888  			log.I.F("garbage collector stopped")
 889  		}
 890  
 891  		// Stop access tracker if running
 892  		if l.accessTracker != nil {
 893  			l.accessTracker.Stop()
 894  			log.I.F("access tracker stopped")
 895  		}
 896  
 897  		// Stop bunker server if running
 898  		if l.bunkerServer != nil {
 899  			l.bunkerServer.Stop()
 900  			log.I.F("bunker server stopped")
 901  		}
 902  
 903  		// Stop email bridge if running
 904  		if l.emailBridge != nil {
 905  			l.emailBridge.Stop()
 906  			log.I.F("email bridge stopped")
 907  		}
 908  
 909  		// Stop smesh server if running
 910  		if l.smeshServer != nil {
 911  			l.smeshServer.Stop()
 912  			log.I.F("smesh server stopped")
 913  		}
 914  
 915  		// Stop NRC bridge if running
 916  		if l.nrcBridge != nil {
 917  			l.nrcBridge.Stop()
 918  			log.I.F("NRC bridge stopped")
 919  		}
 920  
 921  		// Stop WireGuard server if running
 922  		if l.wireguardServer != nil {
 923  			l.wireguardServer.Stop()
 924  			log.I.F("WireGuard server stopped")
 925  		}
 926  
 927  		// Stop all transports (TCP/TLS/Tor)
 928  		if l.transportMgr != nil {
 929  			shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)
 930  			defer cancelShutdown()
 931  			if err := l.transportMgr.StopAll(shutdownCtx); err != nil {
 932  				log.E.F("transport shutdown error: %v", err)
 933  			}
 934  		}
 935  
 936  		once.Do(func() { close(quit) })
 937  	}()
 938  
 939  	return
 940  }
 941  
 942  // parseNegentropyFullSyncPubkeys parses a comma-separated list of npubs or hex
 943  // pubkeys into a set of lowercase hex pubkey strings.
 944  func parseNegentropyFullSyncPubkeys(raw string) map[string]bool {
 945  	m := make(map[string]bool)
 946  	if raw == "" {
 947  		return m
 948  	}
 949  	for _, entry := range strings.Split(raw, ",") {
 950  		entry = strings.TrimSpace(entry)
 951  		if entry == "" {
 952  			continue
 953  		}
 954  		if strings.HasPrefix(entry, "npub1") {
 955  			_, decoded, err := bech32encoding.Decode([]byte(entry))
 956  			if err != nil {
 957  				log.W.F("ignoring invalid npub in ORLY_NEGENTROPY_FULL_SYNC_PUBKEYS: %s", entry)
 958  				continue
 959  			}
 960  			if pkBytes, ok := decoded.([]byte); ok {
 961  				m[hex.Enc(pkBytes)] = true
 962  			} else {
 963  				log.W.F("ignoring invalid npub in ORLY_NEGENTROPY_FULL_SYNC_PUBKEYS: %s", entry)
 964  			}
 965  		} else {
 966  			// Assume hex
 967  			entry = strings.ToLower(entry)
 968  			if len(entry) == 64 {
 969  				m[entry] = true
 970  			} else {
 971  				log.W.F("ignoring invalid pubkey in ORLY_NEGENTROPY_FULL_SYNC_PUBKEYS: %s", entry)
 972  			}
 973  		}
 974  	}
 975  	if len(m) > 0 {
 976  		log.I.F("negentropy full sync whitelist: %d pubkeys", len(m))
 977  	}
 978  	return m
 979  }
 980