main.go raw
1 package app
2
3 import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "strings"
9 "sync"
10 "time"
11
12 "github.com/adrg/xdg"
13 "next.orly.dev/pkg/lol/chk"
14 "next.orly.dev/pkg/lol/log"
15 "next.orly.dev/app/branding"
16 "next.orly.dev/app/config"
17 "next.orly.dev/pkg/acl"
18 "next.orly.dev/pkg/nostr/crypto/keys"
19 "next.orly.dev/pkg/database"
20 "next.orly.dev/pkg/nostr/encoders/bech32encoding"
21 "next.orly.dev/pkg/nostr/encoders/hex"
22 "next.orly.dev/pkg/neo4j"
23 "next.orly.dev/pkg/policy"
24 "next.orly.dev/pkg/protocol/graph"
25 "next.orly.dev/pkg/protocol/nip43"
26 "next.orly.dev/pkg/protocol/publish"
27 "next.orly.dev/pkg/bunker"
28 "next.orly.dev/pkg/protocol/nrc"
29 "next.orly.dev/pkg/ratelimit"
30 "next.orly.dev/pkg/crawler"
31 "next.orly.dev/pkg/grapevine"
32 "next.orly.dev/pkg/spider"
33 "next.orly.dev/pkg/storage"
34 dsync "next.orly.dev/pkg/sync"
35 "next.orly.dev/pkg/transport"
36 "next.orly.dev/pkg/transport/tcp"
37 tlstransport "next.orly.dev/pkg/transport/tls"
38 tortransport "next.orly.dev/pkg/transport/tor"
39 "next.orly.dev/pkg/wireguard"
40 "next.orly.dev/pkg/archive"
41 emailbridge "next.orly.dev/pkg/bridge"
42 "next.orly.dev/pkg/httpguard"
43
44 "next.orly.dev/pkg/nostr/interfaces/signer/p8k"
45 )
46
47 func Run(
48 ctx context.Context, cfg *config.C, db database.Database, limiter *ratelimit.Limiter,
49 ) (quit chan struct{}) {
50 quit = make(chan struct{})
51 var once sync.Once
52
53 // shutdown handler
54 go func() {
55 <-ctx.Done()
56 log.I.F("shutting down")
57 once.Do(func() { close(quit) })
58 }()
59 // get the admins
60 var err error
61 var adminKeys [][]byte
62 for _, admin := range cfg.Admins {
63 if len(admin) == 0 {
64 continue
65 }
66 var pk []byte
67 if pk, err = bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(err) {
68 continue
69 }
70 adminKeys = append(adminKeys, pk)
71 }
72 // get the owners
73 var ownerKeys [][]byte
74 for _, owner := range cfg.Owners {
75 if len(owner) == 0 {
76 continue
77 }
78 var pk []byte
79 if pk, err = bech32encoding.NpubOrHexToPublicKeyBinary(owner); chk.E(err) {
80 continue
81 }
82 ownerKeys = append(ownerKeys, pk)
83 }
84 // start listener
85 channelMembership := NewChannelMembership(db)
86 dmLimiter := NewDMRateLimiter(db)
87 wsPublisher := NewPublisher(ctx)
88 wsPublisher.ChannelMembership = channelMembership
89
90 l := &Server{
91 Ctx: ctx,
92 Config: cfg,
93 DB: db,
94 publishers: publish.New(wsPublisher),
95 Admins: adminKeys,
96 Owners: ownerKeys,
97 rateLimiter: limiter,
98 cfg: cfg,
99 db: db,
100 connPerIP: make(map[string]int),
101 aclRegistry: acl.Registry, // Inject ACL registry (transitional from global)
102 channelMembership: channelMembership,
103 dmRateLimiter: dmLimiter,
104 negentropyFullSyncPubkeys: parseNegentropyFullSyncPubkeys(cfg.NegentropyFullSyncPubkeys),
105 }
106
107 // Configure connection storm mitigation limits on the rate limiter
108 if limiter != nil {
109 limiter.SetConnectionLimits(
110 cfg.MaxGlobalConnections,
111 cfg.ConnectionDelayMaxMs,
112 cfg.GoroutineWarningCount,
113 cfg.GoroutineMaxCount,
114 )
115 }
116
117 // Initialize HTTP guard (bot blocking + per-IP rate limiting)
118 if cfg.HTTPGuardEnabled {
119 l.httpGuard = httpguard.New(httpguard.Config{
120 Enabled: true,
121 BotBlock: cfg.HTTPGuardBotBlock,
122 RPM: cfg.HTTPGuardRPM,
123 WSPerMin: cfg.HTTPGuardWSPerMin,
124 IPBlacklist: cfg.IPBlacklist,
125 })
126 }
127
128 // Initialize branding/white-label manager if enabled
129 if cfg.BrandingEnabled {
130 brandingDir := cfg.BrandingDir
131 if brandingDir == "" {
132 brandingDir = filepath.Join(xdg.ConfigHome, cfg.AppName, "branding")
133 }
134 if _, err := os.Stat(brandingDir); err == nil {
135 if l.brandingMgr, err = branding.New(brandingDir); err != nil {
136 log.W.F("failed to load branding from %s: %v", brandingDir, err)
137 } else {
138 log.I.F("custom branding loaded from %s", brandingDir)
139 }
140 }
141 }
142
143 // Initialize NIP-43 invite manager if enabled
144 if cfg.NIP43Enabled {
145 l.InviteManager = nip43.NewInviteManager(cfg.NIP43InviteExpiry)
146 log.I.F("NIP-43 invite system enabled with %v expiry", cfg.NIP43InviteExpiry)
147 }
148
149 // Initialize sprocket manager
150 l.sprocketManager = NewSprocketManager(ctx, cfg.AppName, cfg.SprocketEnabled)
151
152 // Initialize policy manager
153 l.policyManager = policy.NewWithManager(ctx, cfg.AppName, cfg.PolicyEnabled, cfg.PolicyPath)
154
155 // Merge policy-defined owners with environment-defined owners
156 // This allows cloud deployments to add owners via policy.json when env vars cannot be modified
157 if l.policyManager != nil {
158 policyOwners := l.policyManager.GetOwnersBin()
159 if len(policyOwners) > 0 {
160 // Deduplicate when merging
161 existingOwners := make(map[string]struct{})
162 for _, owner := range l.Owners {
163 existingOwners[string(owner)] = struct{}{}
164 }
165 for _, policyOwner := range policyOwners {
166 if _, exists := existingOwners[string(policyOwner)]; !exists {
167 l.Owners = append(l.Owners, policyOwner)
168 existingOwners[string(policyOwner)] = struct{}{}
169 }
170 }
171 log.I.F("merged %d policy-defined owners with %d environment-defined owners (total: %d unique owners)",
172 len(policyOwners), len(ownerKeys), len(l.Owners))
173 }
174 }
175
176 // Initialize policy follows from database (load follow lists of policy admins)
177 // This must be done after policy manager initialization but before accepting connections
178 if err := l.InitializePolicyFollows(); err != nil {
179 log.W.F("failed to initialize policy follows: %v", err)
180 // Continue anyway - follows can be loaded when admins update their follow lists
181 }
182
183 // Cleanup any kind 3 events that lost their p tags (only for Badger backend)
184 if badgerDB, ok := db.(*database.D); ok {
185 if err := badgerDB.CleanupKind3WithoutPTags(ctx); chk.E(err) {
186 log.E.F("failed to cleanup kind 3 events: %v", err)
187 }
188 }
189
190 // Initialize graph query executor (Badger backend) if enabled
191 if badgerDB, ok := db.(*database.D); ok && cfg.GraphQueriesEnabled {
192 // Get relay identity key for signing graph query responses
193 relaySecretKey, err := badgerDB.GetOrCreateRelayIdentitySecret()
194 if err != nil {
195 log.E.F("failed to get relay identity key for graph executor: %v", err)
196 } else {
197 // Create the graph adapter and executor
198 graphAdapter := database.NewGraphAdapter(badgerDB)
199 if l.graphExecutor, err = graph.NewExecutor(graphAdapter, relaySecretKey); err != nil {
200 log.E.F("failed to create graph executor: %v", err)
201 } else {
202 graphEnabled, maxDepth, maxResults, rateLimitRPM := cfg.GetGraphConfigValues()
203 log.I.F("graph query executor initialized (Badger backend, enabled=%v, max_depth=%d, max_results=%d, rate_limit=%d/min)",
204 graphEnabled, maxDepth, maxResults, rateLimitRPM)
205 }
206 }
207 }
208
209 // Initialize graph query executor (Neo4j backend) if enabled
210 if neo4jDB, ok := db.(*neo4j.N); ok && cfg.GraphQueriesEnabled {
211 // Get relay identity key for signing graph query responses
212 relaySecretKey, err := neo4jDB.GetOrCreateRelayIdentitySecret()
213 if err != nil {
214 log.E.F("failed to get relay identity key for graph executor: %v", err)
215 } else {
216 // Create the graph adapter and executor
217 graphAdapter := neo4j.NewGraphAdapter(neo4jDB)
218 if l.graphExecutor, err = graph.NewExecutor(graphAdapter, relaySecretKey); err != nil {
219 log.E.F("failed to create graph executor: %v", err)
220 } else {
221 graphEnabled, maxDepth, maxResults, rateLimitRPM := cfg.GetGraphConfigValues()
222 log.I.F("graph query executor initialized (Neo4j backend, enabled=%v, max_depth=%d, max_results=%d, rate_limit=%d/min)",
223 graphEnabled, maxDepth, maxResults, rateLimitRPM)
224 }
225 }
226 }
227
228 // Initialize GrapeVine WoT scoring engine (Badger backend only)
229 if cfg.GrapeVineEnabled {
230 if badgerDB, ok := db.(*database.D); ok {
231 gvStore := database.NewGrapeVineStore(badgerDB)
232 gvSource := grapevine.NewBadgerGraphSource(badgerDB)
233 gvCfg := grapevine.Config{
234 MaxDepth: cfg.GrapeVineMaxDepth,
235 Cycles: cfg.GrapeVineCycles,
236 AttenuationFactor: cfg.GrapeVineAttenuation,
237 Rigor: cfg.GrapeVineRigor,
238 FollowConfidence: cfg.GrapeVineFollowConf,
239 }
240 l.grapeVineEngine = grapevine.NewEngine(gvSource, gvStore, gvCfg)
241 l.grapeVineScheduler = grapevine.NewScheduler(l.grapeVineEngine, cfg.GrapeVineObservers, cfg.GrapeVineRefresh)
242 if len(cfg.GrapeVineObservers) > 0 {
243 go l.grapeVineScheduler.Start(ctx)
244 }
245 log.I.F("grapevine WoT scoring enabled (depth=%d, cycles=%d, observers=%d)",
246 cfg.GrapeVineMaxDepth, cfg.GrapeVineCycles, len(cfg.GrapeVineObservers))
247 } else {
248 log.W.F("grapevine enabled but database is not Badger — grapevine requires Badger graph indexes")
249 }
250 }
251
252 // Initialize NRC (Nostr Relay Connect) bridge if enabled
253 nrcEnabled, nrcRendezvousURL, nrcAuthorizedKeys, nrcSessionTimeout := cfg.GetNRCConfigValues()
254 if nrcEnabled && nrcRendezvousURL != "" {
255 // Get relay identity for signing NRC responses
256 relaySecretKey, err := db.GetOrCreateRelayIdentitySecret()
257 if err != nil {
258 log.E.F("failed to get relay identity for NRC bridge: %v", err)
259 } else {
260 // Create signer from secret key
261 relaySigner, sigErr := p8k.New()
262 if sigErr != nil {
263 log.E.F("failed to create signer for NRC bridge: %v", sigErr)
264 } else if sigErr = relaySigner.InitSec(relaySecretKey); sigErr != nil {
265 log.E.F("failed to init signer for NRC bridge: %v", sigErr)
266 } else {
267 // Parse authorized secrets (format: secret:name,secret:name,...)
268 authorizedSecrets := make(map[string]string)
269 for _, entry := range nrcAuthorizedKeys {
270 parts := strings.SplitN(entry, ":", 2)
271 if len(parts) >= 1 {
272 secretHex := parts[0]
273 name := ""
274 if len(parts) == 2 {
275 name = parts[1]
276 }
277 // Derive pubkey from secret
278 secretBytes, decErr := hex.Dec(secretHex)
279 if decErr != nil || len(secretBytes) != 32 {
280 log.W.F("NRC: skipping invalid secret key: %s", secretHex[:8])
281 continue
282 }
283 derivedSigner, signerErr := p8k.New()
284 if signerErr != nil {
285 log.W.F("NRC: failed to create signer: %v", signerErr)
286 continue
287 }
288 if signerErr = derivedSigner.InitSec(secretBytes); signerErr != nil {
289 log.W.F("NRC: failed to init signer: %v", signerErr)
290 continue
291 }
292 derivedPubkeyHex := string(hex.Enc(derivedSigner.Pub()))
293 authorizedSecrets[derivedPubkeyHex] = name
294 }
295 }
296
297 // Construct local relay URL
298 localRelayURL := fmt.Sprintf("ws://localhost:%d", cfg.Port)
299
300 // Create bridge config
301 bridgeConfig := &nrc.BridgeConfig{
302 RendezvousURL: nrcRendezvousURL,
303 LocalRelayURL: localRelayURL,
304 Signer: relaySigner,
305 AuthorizedSecrets: authorizedSecrets,
306 SessionTimeout: nrcSessionTimeout,
307 }
308
309 // Create event-based NRC store for dynamic connection management
310 // This works with any database backend
311 l.nrcEventStore = database.NewNRCEventStore(db, relaySigner)
312 bridgeConfig.Authorizer = database.NewNRCEventAuthorizer(l.nrcEventStore)
313 log.D.F("NRC bridge using event-based authorization")
314
315 // Create and start the bridge
316 l.nrcBridge = nrc.NewBridge(bridgeConfig)
317 if err := l.nrcBridge.Start(); err != nil {
318 log.E.F("failed to start NRC bridge: %v", err)
319 l.nrcBridge = nil
320 l.nrcEventStore = nil
321 } else {
322 log.I.F("NRC bridge started (rendezvous: %s, authorized: %d env, event-store: %v)",
323 nrcRendezvousURL, len(authorizedSecrets), l.nrcEventStore != nil)
324 }
325 }
326 }
327 }
328
329 // Initialize spider manager based on mode (only for Badger backend)
330 if badgerDB, ok := db.(*database.D); ok && cfg.SpiderMode != "none" {
331 if l.spiderManager, err = spider.New(ctx, badgerDB, l.publishers, cfg.SpiderMode); chk.E(err) {
332 log.E.F("failed to create spider manager: %v", err)
333 } else {
334 // Set up callbacks for follows mode
335 if cfg.SpiderMode == "follows" {
336 l.spiderManager.SetCallbacks(
337 func() []string {
338 // Get admin relays from follows ACL if available
339 for _, aclInstance := range acl.Registry.ACLs() {
340 if aclInstance.Type() == "follows" {
341 if follows, ok := aclInstance.(*acl.Follows); ok {
342 return follows.AdminRelays()
343 }
344 }
345 }
346 return nil
347 },
348 func() [][]byte {
349 // Get followed pubkeys from follows ACL if available
350 for _, aclInstance := range acl.Registry.ACLs() {
351 if aclInstance.Type() == "follows" {
352 if follows, ok := aclInstance.(*acl.Follows); ok {
353 return follows.GetFollowedPubkeys()
354 }
355 }
356 }
357 return nil
358 },
359 )
360 }
361
362 if err = l.spiderManager.Start(); chk.E(err) {
363 log.E.F("failed to start spider manager: %v", err)
364 } else {
365 log.I.F("spider manager started successfully in '%s' mode", cfg.SpiderMode)
366
367 // Hook up follow list update notifications from ACL to spider
368 if cfg.SpiderMode == "follows" {
369 for _, aclInstance := range acl.Registry.ACLs() {
370 if aclInstance.Type() == "follows" {
371 if follows, ok := aclInstance.(*acl.Follows); ok {
372 follows.SetFollowListUpdateCallback(func() {
373 log.I.F("follow list updated, notifying spider")
374 l.spiderManager.NotifyFollowListUpdate()
375 })
376 log.I.F("spider: follow list update notifications configured")
377 }
378 }
379 }
380 }
381 }
382 }
383 }
384
385 // Initialize directory spider if enabled (only for Badger backend)
386 if badgerDB, ok := db.(*database.D); ok && cfg.DirectorySpiderEnabled {
387 if l.directorySpider, err = spider.NewDirectorySpider(
388 ctx,
389 badgerDB,
390 l.publishers,
391 cfg.DirectorySpiderInterval,
392 cfg.DirectorySpiderMaxHops,
393 ); chk.E(err) {
394 log.E.F("failed to create directory spider: %v", err)
395 } else {
396 // Set up callback to get seed pubkeys (whitelisted users)
397 l.directorySpider.SetSeedCallback(func() [][]byte {
398 var pubkeys [][]byte
399 // Get followed pubkeys from follows ACL if available
400 for _, aclInstance := range acl.Registry.ACLs() {
401 if aclInstance.Type() == "follows" {
402 if follows, ok := aclInstance.(*acl.Follows); ok {
403 pubkeys = append(pubkeys, follows.GetFollowedPubkeys()...)
404 }
405 }
406 }
407 // Fall back to admin keys if no follows ACL
408 if len(pubkeys) == 0 {
409 pubkeys = adminKeys
410 }
411 return pubkeys
412 })
413
414 if err = l.directorySpider.Start(); chk.E(err) {
415 log.E.F("failed to start directory spider: %v", err)
416 } else {
417 log.I.F("directory spider started (interval: %v, max hops: %d)",
418 cfg.DirectorySpiderInterval, cfg.DirectorySpiderMaxHops)
419 }
420 }
421 }
422
423 // Initialize corpus crawler if enabled (works with any database backend)
424 if cfg.CrawlerEnabled {
425 crawlerCfg := crawler.DefaultConfig()
426 if cfg.CrawlerDiscoveryInterval > 0 {
427 crawlerCfg.DiscoveryInterval = cfg.CrawlerDiscoveryInterval
428 }
429 if cfg.CrawlerSyncInterval > 0 {
430 crawlerCfg.SyncInterval = cfg.CrawlerSyncInterval
431 }
432 if cfg.CrawlerMaxHops > 0 {
433 crawlerCfg.MaxHops = cfg.CrawlerMaxHops
434 }
435 if cfg.CrawlerConcurrency > 0 {
436 crawlerCfg.Concurrency = cfg.CrawlerConcurrency
437 }
438
439 if l.corpusCrawler, err = crawler.New(ctx, db, l.publishers, crawlerCfg); err != nil {
440 log.E.F("failed to create corpus crawler: %v", err)
441 } else {
442 l.corpusCrawler.SetSeedCallback(func() [][]byte {
443 var pubkeys [][]byte
444 for _, aclInstance := range acl.Registry.ACLs() {
445 if aclInstance.Type() == "follows" {
446 if follows, ok := aclInstance.(*acl.Follows); ok {
447 pubkeys = append(pubkeys, follows.GetFollowedPubkeys()...)
448 }
449 }
450 }
451 if len(pubkeys) == 0 {
452 pubkeys = adminKeys
453 }
454 return pubkeys
455 })
456
457 if err = l.corpusCrawler.Start(); err != nil {
458 log.E.F("failed to start corpus crawler: %v", err)
459 } else {
460 log.I.F("corpus crawler started (discovery: %v, sync: %v, hops: %d, concurrency: %d)",
461 crawlerCfg.DiscoveryInterval, crawlerCfg.SyncInterval,
462 crawlerCfg.MaxHops, crawlerCfg.Concurrency)
463 }
464 }
465 }
466
467 // Initialize relay group manager (only for Badger backend)
468 if badgerDB, ok := db.(*database.D); ok {
469 l.relayGroupMgr = dsync.NewRelayGroupManager(badgerDB, cfg.RelayGroupAdmins)
470 } else if cfg.SpiderMode != "none" || len(cfg.RelayPeers) > 0 || len(cfg.ClusterAdmins) > 0 {
471 log.I.Ln("spider, sync, and cluster features require Badger backend (currently using alternative backend)")
472 }
473
474 // Initialize sync manager if relay peers are configured (only for Badger backend)
475 if badgerDB, ok := db.(*database.D); ok {
476 var peers []string
477 if len(cfg.RelayPeers) > 0 {
478 peers = cfg.RelayPeers
479 } else {
480 // Try to get peers from relay group configuration
481 if l.relayGroupMgr != nil {
482 if config, err := l.relayGroupMgr.FindAuthoritativeConfig(ctx); err == nil && config != nil {
483 peers = config.Relays
484 log.I.F("using relay group configuration with %d peers", len(peers))
485 }
486 }
487 }
488
489 if len(peers) > 0 {
490 // Get relay identity for node ID
491 sk, err := db.GetOrCreateRelayIdentitySecret()
492 if err != nil {
493 log.E.F("failed to get relay identity for sync: %v", err)
494 } else {
495 nodeID, err := keys.SecretBytesToPubKeyHex(sk)
496 if err != nil {
497 log.E.F("failed to derive pubkey for sync node ID: %v", err)
498 } else {
499 relayURL := cfg.RelayURL
500 if relayURL == "" {
501 relayURL = fmt.Sprintf("http://localhost:%d", cfg.Port)
502 }
503 l.syncManager = dsync.NewManager(ctx, badgerDB, nodeID, relayURL, peers, l.relayGroupMgr, l.policyManager)
504 log.I.F("distributed sync manager initialized with %d peers", len(peers))
505 }
506 }
507 }
508 }
509
510 // Initialize cluster manager for cluster replication (only for Badger backend)
511 if badgerDB, ok := db.(*database.D); ok {
512 var clusterAdminNpubs []string
513 if len(cfg.ClusterAdmins) > 0 {
514 clusterAdminNpubs = cfg.ClusterAdmins
515 } else {
516 // Default to regular admins if no cluster admins specified
517 for _, admin := range cfg.Admins {
518 clusterAdminNpubs = append(clusterAdminNpubs, admin)
519 }
520 }
521
522 if len(clusterAdminNpubs) > 0 {
523 l.clusterManager = dsync.NewClusterManager(ctx, badgerDB, clusterAdminNpubs, cfg.ClusterPropagatePrivilegedEvents, l.publishers)
524 l.clusterManager.Start()
525 log.I.F("cluster replication manager initialized with %d admin npubs", len(clusterAdminNpubs))
526 }
527 }
528
529 // Initialize Blossom blob storage server
530 // Now works with any database backend that implements blob storage methods.
531 // MUST be done before UserInterface() which registers routes.
532 if cfg.BlossomEnabled {
533 log.I.F("initializing Blossom server...")
534 if l.blossomServer, err = initializeBlossomServer(ctx, cfg, db); err != nil {
535 log.E.F("failed to initialize blossom server: %v", err)
536 // Continue without blossom server
537 } else if l.blossomServer != nil {
538 log.I.F("blossom blob storage server initialized")
539 } else {
540 log.W.F("blossom server initialization returned nil without error")
541 }
542 } else {
543 log.I.F("Blossom server disabled via ORLY_BLOSSOM_ENABLED=false")
544 }
545
546 // Initialize Nostr-Email bridge if enabled
547 bridgeEnabled, bridgeDomain, bridgeNSEC, bridgeRelayURL,
548 bridgeSMTPPort, bridgeSMTPHost, bridgeDataDir,
549 bridgeDKIMKeyPath, bridgeDKIMSelector,
550 bridgeNWCURI, bridgeMonthlyPriceSats, bridgeComposeURL,
551 bridgeSMTPRelayHost, bridgeSMTPRelayPort,
552 bridgeSMTPRelayUsername, bridgeSMTPRelayPassword,
553 bridgeACLGRPCServer, bridgeAliasPriceSats, bridgeProfilePath := cfg.GetBridgeConfigValues()
554
555 if bridgeEnabled {
556 bridgeCfg := &emailbridge.Config{
557 Domain: bridgeDomain,
558 NSEC: bridgeNSEC,
559 RelayURL: bridgeRelayURL,
560 SMTPPort: bridgeSMTPPort,
561 SMTPHost: bridgeSMTPHost,
562 DataDir: bridgeDataDir,
563 DKIMKeyPath: bridgeDKIMKeyPath,
564 DKIMSelector: bridgeDKIMSelector,
565 NWCURI: bridgeNWCURI,
566 MonthlyPriceSats: bridgeMonthlyPriceSats,
567 ComposeURL: bridgeComposeURL,
568 SMTPRelayHost: bridgeSMTPRelayHost,
569 SMTPRelayPort: bridgeSMTPRelayPort,
570 SMTPRelayUsername: bridgeSMTPRelayUsername,
571 SMTPRelayPassword: bridgeSMTPRelayPassword,
572 ACLGRPCServer: bridgeACLGRPCServer,
573 AliasPriceSats: bridgeAliasPriceSats,
574 ProfilePath: bridgeProfilePath,
575 }
576
577 // In monolithic mode, provide a database getter for identity resolution
578 dbGetter := func() ([]byte, error) {
579 return db.GetOrCreateRelayIdentitySecret()
580 }
581
582 l.emailBridge = emailbridge.New(bridgeCfg, dbGetter)
583 if err := l.emailBridge.Start(ctx); err != nil {
584 log.E.F("failed to start email bridge: %v", err)
585 l.emailBridge = nil
586 } else {
587 log.I.F("email bridge started (domain: %s, SMTP: %s:%d)",
588 bridgeDomain, bridgeSMTPHost, bridgeSMTPPort)
589 }
590 }
591
592 // Initialize WireGuard VPN and NIP-46 Bunker (only for Badger backend)
593 // Requires ACL mode 'follows' or 'managed' - no point for open relays
594 if badgerDB, ok := db.(*database.D); ok && cfg.WGEnabled && cfg.ACLMode != "none" {
595 if cfg.WGEndpoint == "" {
596 log.E.F("WireGuard enabled but ORLY_WG_ENDPOINT not set - skipping")
597 } else {
598 // Get or create the subnet pool (restores seed and allocations from DB)
599 subnetPool, err := badgerDB.GetOrCreateSubnetPool(cfg.WGNetwork)
600 if err != nil {
601 log.E.F("failed to create subnet pool: %v", err)
602 } else {
603 l.subnetPool = subnetPool
604
605 // Get or create WireGuard server key
606 wgServerKey, err := badgerDB.GetOrCreateWireGuardServerKey()
607 if err != nil {
608 log.E.F("failed to get WireGuard server key: %v", err)
609 } else {
610 // Create WireGuard server
611 wgConfig := &wireguard.Config{
612 Port: cfg.WGPort,
613 Endpoint: cfg.WGEndpoint,
614 PrivateKey: wgServerKey,
615 Network: cfg.WGNetwork,
616 ServerIP: "10.73.0.1",
617 }
618
619 l.wireguardServer, err = wireguard.New(wgConfig)
620 if err != nil {
621 log.E.F("failed to create WireGuard server: %v", err)
622 } else {
623 if err = l.wireguardServer.Start(); err != nil {
624 log.E.F("failed to start WireGuard server: %v", err)
625 } else {
626 log.I.F("WireGuard VPN server started on UDP port %d", cfg.WGPort)
627
628 // Load existing peers from database and add to server
629 peers, err := badgerDB.GetAllWireGuardPeers()
630 if err != nil {
631 log.W.F("failed to load existing WireGuard peers: %v", err)
632 } else {
633 for _, peer := range peers {
634 // Derive client IP from sequence
635 subnet := subnetPool.SubnetForSequence(peer.Sequence)
636 clientIP := subnet.ClientIP.String()
637 if err := l.wireguardServer.AddPeer(peer.NostrPubkey, peer.WGPublicKey, clientIP); err != nil {
638 log.W.F("failed to add existing peer: %v", err)
639 }
640 }
641 if len(peers) > 0 {
642 log.I.F("loaded %d existing WireGuard peers", len(peers))
643 }
644 }
645
646 // Initialize bunker if enabled
647 if cfg.BunkerEnabled {
648 // Get relay identity for signing
649 relaySecretKey, err := badgerDB.GetOrCreateRelayIdentitySecret()
650 if err != nil {
651 log.E.F("failed to get relay identity for bunker: %v", err)
652 } else {
653 // Create signer from secret key
654 relaySigner, sigErr := p8k.New()
655 if sigErr != nil {
656 log.E.F("failed to create signer for bunker: %v", sigErr)
657 } else if sigErr = relaySigner.InitSec(relaySecretKey); sigErr != nil {
658 log.E.F("failed to init signer for bunker: %v", sigErr)
659 } else {
660 relayPubkey := relaySigner.Pub()
661
662 bunkerConfig := &bunker.Config{
663 RelaySigner: relaySigner,
664 RelayPubkey: relayPubkey[:],
665 Netstack: l.wireguardServer.GetNetstack(),
666 ListenAddr: fmt.Sprintf("10.73.0.1:%d", cfg.BunkerPort),
667 }
668
669 l.bunkerServer = bunker.New(bunkerConfig)
670 if err = l.bunkerServer.Start(); err != nil {
671 log.E.F("failed to start bunker server: %v", err)
672 } else {
673 log.I.F("NIP-46 bunker server started on 10.73.0.1:%d (WireGuard only)", cfg.BunkerPort)
674 }
675 }
676 }
677 }
678 }
679 }
680 }
681 }
682 }
683 } else if cfg.WGEnabled && cfg.ACLMode == "none" {
684 log.I.F("WireGuard disabled: requires ACL mode 'follows' or 'managed' (currently: 'none')")
685 }
686
687 // Initialize event domain services (validation, routing, processing)
688 l.InitEventServices()
689
690 // Initialize the user interface (registers routes)
691 l.UserInterface()
692
693 // Start embedded Smesh web client if enabled
694 if cfg.SmeshEnabled && cfg.SmeshPort > 0 {
695 l.smeshServer = NewSmeshServer(cfg.SmeshPort)
696 if err := l.smeshServer.Start(ctx); err != nil {
697 log.E.F("failed to start smesh server: %v", err)
698 l.smeshServer = nil
699 }
700 }
701
702 // Ensure a relay identity secret key exists when subscriptions and NWC are enabled
703 if cfg.SubscriptionEnabled && cfg.NWCUri != "" {
704 if skb, e := db.GetOrCreateRelayIdentitySecret(); e != nil {
705 log.E.F("failed to ensure relay identity key: %v", e)
706 } else if pk, e2 := keys.SecretBytesToPubKeyHex(skb); e2 == nil {
707 log.I.F("relay identity loaded (pub=%s)", pk)
708 // ensure relay identity pubkey is considered an admin for ACL follows mode
709 found := false
710 for _, a := range cfg.Admins {
711 if a == pk {
712 found = true
713 break
714 }
715 }
716 if !found {
717 cfg.Admins = append(cfg.Admins, pk)
718 log.I.F("added relay identity to admins for follow-list whitelisting")
719 }
720 // also ensure relay identity pubkey is considered an owner for full control
721 found = false
722 for _, o := range cfg.Owners {
723 if o == pk {
724 found = true
725 break
726 }
727 }
728 if !found {
729 cfg.Owners = append(cfg.Owners, pk)
730 log.I.F("added relay identity to owners for full control")
731 }
732 }
733 }
734
735 // Initialize payment processor (only for Badger backend)
736 if badgerDB, ok := db.(*database.D); ok {
737 if l.paymentProcessor, err = NewPaymentProcessor(ctx, cfg, badgerDB); err != nil {
738 // log.E.F("failed to create payment processor: %v", err)
739 // Continue without payment processor
740 } else {
741 if err = l.paymentProcessor.Start(); err != nil {
742 log.E.F("failed to start payment processor: %v", err)
743 } else {
744 log.I.F("payment processor started successfully")
745 }
746 }
747 }
748
749 // Initialize access tracker for storage management (only for Badger backend)
750 if badgerDB, ok := db.(*database.D); ok {
751 l.accessTracker = storage.NewAccessTracker(badgerDB, 100000) // 100k dedup cache
752 l.accessTracker.Start()
753 log.I.F("access tracker initialized")
754
755 // Initialize garbage collector if enabled
756 maxBytes, gcEnabled, gcIntervalSec, gcBatchSize := cfg.GetStorageConfigValues()
757 if gcEnabled {
758 gcCfg := storage.GCConfig{
759 MaxStorageBytes: maxBytes,
760 Interval: time.Duration(gcIntervalSec) * time.Second,
761 BatchSize: gcBatchSize,
762 MinAgeSec: 3600, // Minimum 1 hour before eviction
763 }
764
765 // Wire WoT provider from social ACL if active
766 if acl.Registry.GetMode() == "social" {
767 for _, aclInstance := range acl.Registry.ACLs() {
768 if social, ok := aclInstance.(*acl.Social); ok {
769 wotMap := social.GetWoTDepthMap()
770 gcCfg.WoTProvider = wotMap
771 gcCfg.AuthorLookup = badgerDB
772 log.I.F("garbage collector: WoT-weighted eviction enabled via social ACL")
773 break
774 }
775 }
776 }
777
778 l.garbageCollector = storage.NewGarbageCollector(ctx, badgerDB, l.accessTracker, gcCfg)
779 l.garbageCollector.Start()
780 log.I.F("garbage collector started (interval: %ds, batch: %d)", gcIntervalSec, gcBatchSize)
781 }
782 }
783
784 // Initialize archive relay manager if enabled
785 archiveEnabled, archiveRelays, archiveTimeoutSec, archiveCacheTTLHrs := cfg.GetArchiveConfigValues()
786 if archiveEnabled && len(archiveRelays) > 0 {
787 archiveCfg := archive.Config{
788 Enabled: true,
789 Relays: archiveRelays,
790 TimeoutSec: archiveTimeoutSec,
791 CacheTTLHrs: archiveCacheTTLHrs,
792 }
793 l.archiveManager = archive.New(ctx, db, archiveCfg)
794 log.I.F("archive relay manager initialized with %d relays", len(archiveRelays))
795 }
796
797 // Build transport manager
798 l.transportMgr = transport.NewManager()
799
800 // Add Tor transport if enabled (can start before db is ready)
801 torEnabled, torPort, torDataDir, torBinary, torSOCKSPort := cfg.GetTorConfigValues()
802 if torEnabled {
803 tt := tortransport.New(&tortransport.Config{
804 Port: torPort,
805 DataDir: torDataDir,
806 Binary: torBinary,
807 SOCKSPort: torSOCKSPort,
808 Handler: l,
809 })
810 l.transportMgr.Add(tt)
811 }
812
813 // Start rate limiter if enabled
814 if limiter != nil && limiter.IsEnabled() {
815 limiter.Start()
816 log.I.F("adaptive rate limiter started")
817 }
818
819 // Wait for database to be ready before accepting requests
820 log.I.F("waiting for database warmup to complete...")
821 <-db.Ready()
822 log.I.F("database ready, starting transports")
823
824 // Add TLS or plain TCP transport (mutually exclusive)
825 if len(cfg.TLSDomains) > 0 {
826 l.transportMgr.Add(tlstransport.New(&tlstransport.Config{
827 Domains: cfg.TLSDomains,
828 Certs: cfg.Certs,
829 DataDir: cfg.DataDir,
830 Handler: l,
831 }))
832 } else {
833 l.transportMgr.Add(tcp.New(&tcp.Config{
834 Addr: fmt.Sprintf("%s:%d", cfg.Listen, cfg.Port),
835 Handler: l,
836 }))
837 }
838
839 // Start all transports
840 if err := l.transportMgr.StartAll(ctx); err != nil {
841 log.E.F("transport startup failed: %v", err)
842 }
843
844 // Graceful shutdown handler
845 go func() {
846 <-ctx.Done()
847 log.I.F("shutting down servers gracefully")
848
849 // Stop spider manager if running
850 if l.spiderManager != nil {
851 l.spiderManager.Stop()
852 log.I.F("spider manager stopped")
853 }
854
855 // Stop directory spider if running
856 if l.directorySpider != nil {
857 l.directorySpider.Stop()
858 log.I.F("directory spider stopped")
859 }
860
861 // Stop corpus crawler if running
862 if l.corpusCrawler != nil {
863 l.corpusCrawler.Stop()
864 log.I.F("corpus crawler stopped")
865 }
866
867 // Stop rate limiter if running
868 if l.rateLimiter != nil && l.rateLimiter.IsEnabled() {
869 l.rateLimiter.Stop()
870 log.I.F("rate limiter stopped")
871 }
872
873 // Stop HTTP guard
874 if l.httpGuard != nil {
875 l.httpGuard.Stop()
876 log.I.F("HTTP guard stopped")
877 }
878
879 // Stop archive manager if running
880 if l.archiveManager != nil {
881 l.archiveManager.Stop()
882 log.I.F("archive manager stopped")
883 }
884
885 // Stop garbage collector if running
886 if l.garbageCollector != nil {
887 l.garbageCollector.Stop()
888 log.I.F("garbage collector stopped")
889 }
890
891 // Stop access tracker if running
892 if l.accessTracker != nil {
893 l.accessTracker.Stop()
894 log.I.F("access tracker stopped")
895 }
896
897 // Stop bunker server if running
898 if l.bunkerServer != nil {
899 l.bunkerServer.Stop()
900 log.I.F("bunker server stopped")
901 }
902
903 // Stop email bridge if running
904 if l.emailBridge != nil {
905 l.emailBridge.Stop()
906 log.I.F("email bridge stopped")
907 }
908
909 // Stop smesh server if running
910 if l.smeshServer != nil {
911 l.smeshServer.Stop()
912 log.I.F("smesh server stopped")
913 }
914
915 // Stop NRC bridge if running
916 if l.nrcBridge != nil {
917 l.nrcBridge.Stop()
918 log.I.F("NRC bridge stopped")
919 }
920
921 // Stop WireGuard server if running
922 if l.wireguardServer != nil {
923 l.wireguardServer.Stop()
924 log.I.F("WireGuard server stopped")
925 }
926
927 // Stop all transports (TCP/TLS/Tor)
928 if l.transportMgr != nil {
929 shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)
930 defer cancelShutdown()
931 if err := l.transportMgr.StopAll(shutdownCtx); err != nil {
932 log.E.F("transport shutdown error: %v", err)
933 }
934 }
935
936 once.Do(func() { close(quit) })
937 }()
938
939 return
940 }
941
942 // parseNegentropyFullSyncPubkeys parses a comma-separated list of npubs or hex
943 // pubkeys into a set of lowercase hex pubkey strings.
944 func parseNegentropyFullSyncPubkeys(raw string) map[string]bool {
945 m := make(map[string]bool)
946 if raw == "" {
947 return m
948 }
949 for _, entry := range strings.Split(raw, ",") {
950 entry = strings.TrimSpace(entry)
951 if entry == "" {
952 continue
953 }
954 if strings.HasPrefix(entry, "npub1") {
955 _, decoded, err := bech32encoding.Decode([]byte(entry))
956 if err != nil {
957 log.W.F("ignoring invalid npub in ORLY_NEGENTROPY_FULL_SYNC_PUBKEYS: %s", entry)
958 continue
959 }
960 if pkBytes, ok := decoded.([]byte); ok {
961 m[hex.Enc(pkBytes)] = true
962 } else {
963 log.W.F("ignoring invalid npub in ORLY_NEGENTROPY_FULL_SYNC_PUBKEYS: %s", entry)
964 }
965 } else {
966 // Assume hex
967 entry = strings.ToLower(entry)
968 if len(entry) == 64 {
969 m[entry] = true
970 } else {
971 log.W.F("ignoring invalid pubkey in ORLY_NEGENTROPY_FULL_SYNC_PUBKEYS: %s", entry)
972 }
973 }
974 }
975 if len(m) > 0 {
976 log.I.F("negentropy full sync whitelist: %d pubkeys", len(m))
977 }
978 return m
979 }
980