startup.go raw

   1  //go:build !(js && wasm)
   2  
   3  // Package relay provides shared startup logic for running the ORLY relay.
   4  // This allows both the root binary and the unified binary to share
   5  // the same initialization code for monolithic deployments.
   6  package relay
   7  
   8  import (
   9  	"context"
  10  	"fmt"
  11  	"net/http"
  12  	pp "net/http/pprof"
  13  	"os"
  14  	"os/signal"
  15  	"runtime"
  16  	"runtime/debug"
  17  	"sync"
  18  	"syscall"
  19  	"time"
  20  
  21  	"github.com/pkg/profile"
  22  	"next.orly.dev/pkg/lol/chk"
  23  	"next.orly.dev/pkg/lol/log"
  24  	"next.orly.dev/app"
  25  	"next.orly.dev/app/config"
  26  	"next.orly.dev/pkg/acl"
  27  	aclgrpc "next.orly.dev/pkg/acl/grpc"
  28  	"next.orly.dev/pkg/database"
  29  	_ "next.orly.dev/pkg/database/grpc" // Import for grpc factory registration
  30  	neo4jdb "next.orly.dev/pkg/neo4j"
  31  	"next.orly.dev/pkg/ratelimit"
  32  	"next.orly.dev/pkg/sync/negentropy"
  33  	negentropygrpc "next.orly.dev/pkg/sync/negentropy/grpc"
  34  	"next.orly.dev/pkg/utils/interrupt"
  35  )
  36  
  37  // StartupResult holds the initialized components from Startup.
// StartupResult holds the initialized components from Startup.
type StartupResult struct {
	Ctx     context.Context    // root context governing all started subsystems
	Cancel  context.CancelFunc // cancels Ctx; triggers shutdown of the relay, pprof and health servers
	DB      database.Database  // initialized storage backend; caller must Close() on shutdown
	Limiter *ratelimit.Limiter // rate limiter (may be a disabled limiter when rate limiting is off)
	Quit    chan struct{}      // closed/signaled by app.Run when the relay has fully stopped
}
  45  
  46  // Startup initializes the database, ACL, rate limiter, and starts the relay server.
  47  // It returns a StartupResult containing all initialized components.
  48  func Startup(cfg *config.C) (*StartupResult, error) {
  49  	runtime.GOMAXPROCS(128)
  50  	debug.SetGCPercent(10)
  51  
  52  	// Setup profiling
  53  	profileStop := setupProfiling(cfg)
  54  
  55  	ctx, cancel := context.WithCancel(context.Background())
  56  
  57  	// Initialize database
  58  	log.I.F("initializing %s database at %s", cfg.DBType, cfg.DataDir)
  59  	db, err := database.NewDatabaseWithConfig(
  60  		ctx, cancel, cfg.DBType, MakeDatabaseConfig(cfg),
  61  	)
  62  	if chk.E(err) {
  63  		cancel()
  64  		return nil, fmt.Errorf("failed to initialize database: %w", err)
  65  	}
  66  	log.I.F("%s database initialized successfully", cfg.DBType)
  67  
  68  	// Initialize ACL
  69  	if err := initializeACL(ctx, cfg, db); err != nil {
  70  		db.Close()
  71  		cancel()
  72  		return nil, fmt.Errorf("failed to initialize ACL: %w", err)
  73  	}
  74  
  75  	// Initialize negentropy handler (embedded or gRPC client)
  76  	initializeNegentropy(ctx, cfg, db)
  77  
  78  	// Create rate limiter
  79  	limiter := createRateLimiter(cfg, db)
  80  
  81  	// Start pprof HTTP server if enabled
  82  	startPprofServer(ctx, cfg)
  83  
  84  	// Start health check server if configured
  85  	startHealthServer(ctx, cfg)
  86  
  87  	// Start the relay
  88  	quit := app.Run(ctx, cfg, db, limiter)
  89  
  90  	// Store profileStop for cleanup
  91  	result := &StartupResult{
  92  		Ctx:     ctx,
  93  		Cancel:  cancel,
  94  		DB:      db,
  95  		Limiter: limiter,
  96  		Quit:    quit,
  97  	}
  98  
  99  	// Register profile stop handler
 100  	interrupt.AddHandler(func() {
 101  		log.I.F("interrupt received: stopping profiling")
 102  		profileStop()
 103  	})
 104  
 105  	return result, nil
 106  }
 107  
 108  // RunWithSignals calls Startup and blocks on signals, handling graceful shutdown.
 109  // This is the main entry point for running the relay as a standalone process.
 110  func RunWithSignals(cfg *config.C) error {
 111  	result, err := Startup(cfg)
 112  	if err != nil {
 113  		return err
 114  	}
 115  
 116  	sigs := make(chan os.Signal, 1)
 117  	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
 118  
 119  	for {
 120  		select {
 121  		case <-sigs:
 122  			fmt.Printf("\r")
 123  			log.I.F("received shutdown signal, starting graceful shutdown")
 124  			result.Cancel()
 125  			<-result.Quit
 126  			chk.E(result.DB.Close())
 127  			log.I.F("exiting")
 128  			return nil
 129  		case <-result.Quit:
 130  			log.I.F("application quit signal received")
 131  			result.Cancel()
 132  			chk.E(result.DB.Close())
 133  			log.I.F("exiting")
 134  			return nil
 135  		}
 136  	}
 137  }
 138  
 139  // setupProfiling configures profiling based on config and returns a stop function.
 140  func setupProfiling(cfg *config.C) func() {
 141  	var profileStopOnce sync.Once
 142  	profileStop := func() {}
 143  
 144  	switch cfg.Pprof {
 145  	case "cpu":
 146  		var prof interface{ Stop() }
 147  		if cfg.PprofPath != "" {
 148  			prof = profile.Start(profile.CPUProfile, profile.ProfilePath(cfg.PprofPath))
 149  		} else {
 150  			prof = profile.Start(profile.CPUProfile)
 151  		}
 152  		profileStop = func() {
 153  			profileStopOnce.Do(func() {
 154  				prof.Stop()
 155  				log.I.F("cpu profiling stopped and flushed")
 156  			})
 157  		}
 158  
 159  	case "memory":
 160  		var prof interface{ Stop() }
 161  		if cfg.PprofPath != "" {
 162  			prof = profile.Start(profile.MemProfile, profile.MemProfileRate(32), profile.ProfilePath(cfg.PprofPath))
 163  		} else {
 164  			prof = profile.Start(profile.MemProfile)
 165  		}
 166  		profileStop = func() {
 167  			profileStopOnce.Do(func() {
 168  				prof.Stop()
 169  				log.I.F("memory profiling stopped and flushed")
 170  			})
 171  		}
 172  
 173  	case "allocation":
 174  		var prof interface{ Stop() }
 175  		if cfg.PprofPath != "" {
 176  			prof = profile.Start(profile.MemProfileAllocs, profile.MemProfileRate(32), profile.ProfilePath(cfg.PprofPath))
 177  		} else {
 178  			prof = profile.Start(profile.MemProfileAllocs)
 179  		}
 180  		profileStop = func() {
 181  			profileStopOnce.Do(func() {
 182  				prof.Stop()
 183  				log.I.F("allocation profiling stopped and flushed")
 184  			})
 185  		}
 186  
 187  	case "heap":
 188  		var prof interface{ Stop() }
 189  		if cfg.PprofPath != "" {
 190  			prof = profile.Start(profile.MemProfileHeap, profile.ProfilePath(cfg.PprofPath))
 191  		} else {
 192  			prof = profile.Start(profile.MemProfileHeap)
 193  		}
 194  		profileStop = func() {
 195  			profileStopOnce.Do(func() {
 196  				prof.Stop()
 197  				log.I.F("heap profiling stopped and flushed")
 198  			})
 199  		}
 200  
 201  	case "mutex":
 202  		var prof interface{ Stop() }
 203  		if cfg.PprofPath != "" {
 204  			prof = profile.Start(profile.MutexProfile, profile.ProfilePath(cfg.PprofPath))
 205  		} else {
 206  			prof = profile.Start(profile.MutexProfile)
 207  		}
 208  		profileStop = func() {
 209  			profileStopOnce.Do(func() {
 210  				prof.Stop()
 211  				log.I.F("mutex profiling stopped and flushed")
 212  			})
 213  		}
 214  
 215  	case "threadcreate":
 216  		var prof interface{ Stop() }
 217  		if cfg.PprofPath != "" {
 218  			prof = profile.Start(profile.ThreadcreationProfile, profile.ProfilePath(cfg.PprofPath))
 219  		} else {
 220  			prof = profile.Start(profile.ThreadcreationProfile)
 221  		}
 222  		profileStop = func() {
 223  			profileStopOnce.Do(func() {
 224  				prof.Stop()
 225  				log.I.F("threadcreate profiling stopped and flushed")
 226  			})
 227  		}
 228  
 229  	case "goroutine":
 230  		var prof interface{ Stop() }
 231  		if cfg.PprofPath != "" {
 232  			prof = profile.Start(profile.GoroutineProfile, profile.ProfilePath(cfg.PprofPath))
 233  		} else {
 234  			prof = profile.Start(profile.GoroutineProfile)
 235  		}
 236  		profileStop = func() {
 237  			profileStopOnce.Do(func() {
 238  				prof.Stop()
 239  				log.I.F("goroutine profiling stopped and flushed")
 240  			})
 241  		}
 242  
 243  	case "block":
 244  		var prof interface{ Stop() }
 245  		if cfg.PprofPath != "" {
 246  			prof = profile.Start(profile.BlockProfile, profile.ProfilePath(cfg.PprofPath))
 247  		} else {
 248  			prof = profile.Start(profile.BlockProfile)
 249  		}
 250  		profileStop = func() {
 251  			profileStopOnce.Do(func() {
 252  				prof.Stop()
 253  				log.I.F("block profiling stopped and flushed")
 254  			})
 255  		}
 256  	}
 257  
 258  	return profileStop
 259  }
 260  
 261  // initializeACL sets up ACL - either remote gRPC or in-process.
 262  func initializeACL(ctx context.Context, cfg *config.C, db database.Database) error {
 263  	aclType, aclServerAddr, aclConnTimeout := cfg.GetGRPCACLConfigValues()
 264  
 265  	if aclType == "grpc" {
 266  		// Use remote ACL server via gRPC
 267  		log.I.F("connecting to gRPC ACL server at %s", aclServerAddr)
 268  		aclClient, err := aclgrpc.New(ctx, &aclgrpc.ClientConfig{
 269  			ServerAddress:  aclServerAddr,
 270  			ConnectTimeout: aclConnTimeout,
 271  		})
 272  		if chk.E(err) {
 273  			return fmt.Errorf("failed to connect to gRPC ACL server: %w", err)
 274  		}
 275  
 276  		// Wait for ACL server to be ready
 277  		select {
 278  		case <-aclClient.Ready():
 279  			log.I.F("gRPC ACL client connected, mode: %s", aclClient.Type())
 280  		case <-time.After(30 * time.Second):
 281  			return fmt.Errorf("timeout waiting for gRPC ACL server")
 282  		}
 283  
 284  		// Register and activate the gRPC client as the ACL backend
 285  		acl.Registry.RegisterAndActivate(aclClient)
 286  	} else {
 287  		// Use in-process ACL
 288  		acl.Registry.SetMode(cfg.ACLMode)
 289  		if err := acl.Registry.Configure(cfg, db, ctx); chk.E(err) {
 290  			return err
 291  		}
 292  		acl.Registry.Syncer()
 293  	}
 294  
 295  	return nil
 296  }
 297  
 298  // initializeNegentropy sets up negentropy handling (embedded or gRPC client).
 299  func initializeNegentropy(ctx context.Context, cfg *config.C, db database.Database) {
 300  	syncType, _, _, _, negentropyAddr, syncTimeout, negentropyEnabled := cfg.GetGRPCSyncConfigValues()
 301  
 302  	if !negentropyEnabled {
 303  		log.I.F("negentropy NIP-77 disabled (set ORLY_NEGENTROPY_ENABLED=true to enable)")
 304  		return
 305  	}
 306  
 307  	if syncType == "grpc" && negentropyAddr != "" {
 308  		// Use gRPC client to connect to remote negentropy server
 309  		log.I.F("connecting to gRPC negentropy server at %s", negentropyAddr)
 310  		negClient, err := negentropygrpc.New(ctx, &negentropygrpc.ClientConfig{
 311  			ServerAddress:  negentropyAddr,
 312  			ConnectTimeout: syncTimeout,
 313  		})
 314  		if err != nil {
 315  			log.W.F("failed to connect to gRPC negentropy server at %s: %v — falling back to embedded handler", negentropyAddr, err)
 316  		} else {
 317  			// Wait for negentropy server to be ready
 318  			select {
 319  			case <-negClient.Ready():
 320  				log.I.F("gRPC negentropy client connected")
 321  				app.SetNegentropyHandler(negClient)
 322  				return
 323  			case <-time.After(30 * time.Second):
 324  				log.W.F("timeout waiting for gRPC negentropy server at %s — falling back to embedded handler", negentropyAddr)
 325  				negClient.Close()
 326  			}
 327  		}
 328  	}
 329  
 330  	// Embedded negentropy handler — used in standalone mode or as fallback
 331  	// when gRPC connection fails, so NIP-77 is never silently disabled.
 332  	log.I.F("initializing embedded negentropy handler")
 333  	negHandler := negentropy.NewEmbeddedHandler(db, &negentropy.Config{
 334  		SyncInterval:         60 * time.Second,
 335  		FrameSize:            128 * 1024,
 336  		IDSize:               16,
 337  		ClientSessionTimeout: 5 * time.Minute,
 338  	})
 339  	negHandler.Start()
 340  	app.SetNegentropyHandler(negHandler)
 341  	log.I.F("embedded negentropy handler initialized (NIP-77 enabled)")
 342  }
 343  
 344  // createRateLimiter creates and configures the rate limiter.
 345  func createRateLimiter(cfg *config.C, db database.Database) *ratelimit.Limiter {
 346  	rateLimitEnabled, targetMB,
 347  		writeKp, writeKi, writeKd,
 348  		readKp, readKi, readKd,
 349  		maxWriteMs, maxReadMs,
 350  		writeTarget, readTarget,
 351  		emergencyThreshold, recoveryThreshold,
 352  		emergencyMaxMs := cfg.GetRateLimitConfigValues()
 353  
 354  	if !rateLimitEnabled {
 355  		return ratelimit.NewDisabledLimiter()
 356  	}
 357  
 358  	// Auto-detect memory target if set to 0
 359  	if targetMB == 0 {
 360  		var err error
 361  		targetMB, err = ratelimit.CalculateTargetMemoryMB(targetMB)
 362  		if err != nil {
 363  			log.F.F("FATAL: %v", err)
 364  			log.F.F("There is not enough memory to run this relay in this environment.")
 365  			log.F.F("Available: %dMB, Required minimum: %dMB",
 366  				ratelimit.DetectAvailableMemoryMB(), ratelimit.MinimumMemoryMB)
 367  			os.Exit(1)
 368  		}
 369  		stats := ratelimit.GetMemoryStats(targetMB)
 370  		calculated66 := int(float64(stats.AvailableMB) * ratelimit.AutoDetectMemoryFraction)
 371  		if calculated66 > ratelimit.DefaultMaxMemoryMB {
 372  			log.I.F("memory auto-detected: total=%dMB, available=%dMB, target=%dMB (capped at default max, 66%% would be %dMB)",
 373  				stats.TotalMB, stats.AvailableMB, targetMB, calculated66)
 374  		} else {
 375  			log.I.F("memory auto-detected: total=%dMB, available=%dMB, target=%dMB (66%% of available)",
 376  				stats.TotalMB, stats.AvailableMB, targetMB)
 377  		}
 378  	} else {
 379  		// Validate explicitly configured target
 380  		_, err := ratelimit.CalculateTargetMemoryMB(targetMB)
 381  		if err != nil {
 382  			log.F.F("FATAL: %v", err)
 383  			log.F.F("Configured target memory %dMB is below minimum required %dMB.",
 384  				targetMB, ratelimit.MinimumMemoryMB)
 385  			os.Exit(1)
 386  		}
 387  	}
 388  
 389  	rlConfig := ratelimit.NewConfigFromValues(
 390  		rateLimitEnabled, targetMB,
 391  		writeKp, writeKi, writeKd,
 392  		readKp, readKi, readKd,
 393  		maxWriteMs, maxReadMs,
 394  		writeTarget, readTarget,
 395  		emergencyThreshold, recoveryThreshold,
 396  		emergencyMaxMs,
 397  	)
 398  
 399  	// Create appropriate monitor based on database type
 400  	if badgerDB, ok := db.(*database.D); ok {
 401  		limiter := ratelimit.NewBadgerLimiter(rlConfig, badgerDB.DB)
 402  		badgerDB.SetRateLimiter(limiter)
 403  		log.I.F("rate limiter configured for Badger backend (target: %dMB)", targetMB)
 404  		return limiter
 405  	}
 406  
 407  	if n4jDB, ok := db.(*neo4jdb.N); ok {
 408  		limiter := ratelimit.NewNeo4jLimiter(
 409  			rlConfig,
 410  			n4jDB.Driver(),
 411  			n4jDB.QuerySem(),
 412  			n4jDB.MaxConcurrentQueries(),
 413  		)
 414  		log.I.F("rate limiter configured for Neo4j backend (target: %dMB)", targetMB)
 415  		return limiter
 416  	}
 417  
 418  	// For other backends, create a disabled limiter
 419  	log.I.F("rate limiter disabled for unknown backend")
 420  	return ratelimit.NewDisabledLimiter()
 421  }
 422  
 423  // startPprofServer starts the HTTP pprof server if enabled.
 424  func startPprofServer(ctx context.Context, cfg *config.C) {
 425  	if !cfg.PprofHTTP {
 426  		return
 427  	}
 428  
 429  	pprofAddr := fmt.Sprintf("%s:%d", cfg.Listen, 6060)
 430  	pprofMux := http.NewServeMux()
 431  	pprofMux.HandleFunc("/debug/pprof/", pp.Index)
 432  	pprofMux.HandleFunc("/debug/pprof/cmdline", pp.Cmdline)
 433  	pprofMux.HandleFunc("/debug/pprof/profile", pp.Profile)
 434  	pprofMux.HandleFunc("/debug/pprof/symbol", pp.Symbol)
 435  	pprofMux.HandleFunc("/debug/pprof/trace", pp.Trace)
 436  	for _, p := range []string{"allocs", "block", "goroutine", "heap", "mutex", "threadcreate"} {
 437  		pprofMux.Handle("/debug/pprof/"+p, pp.Handler(p))
 438  	}
 439  
 440  	ppSrv := &http.Server{Addr: pprofAddr, Handler: pprofMux}
 441  	go func() {
 442  		log.I.F("pprof server listening on %s", pprofAddr)
 443  		if err := ppSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
 444  			log.E.F("pprof server error: %v", err)
 445  		}
 446  	}()
 447  
 448  	go func() {
 449  		<-ctx.Done()
 450  		shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 2*time.Second)
 451  		defer cancelShutdown()
 452  		_ = ppSrv.Shutdown(shutdownCtx)
 453  	}()
 454  }
 455  
 456  // startHealthServer starts the health check HTTP server if configured.
 457  func startHealthServer(ctx context.Context, cfg *config.C) {
 458  	if cfg.HealthPort <= 0 {
 459  		return
 460  	}
 461  
 462  	mux := http.NewServeMux()
 463  	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
 464  		w.WriteHeader(http.StatusOK)
 465  		_, _ = w.Write([]byte("ok"))
 466  		log.I.F("health check ok")
 467  	})
 468  
 469  	// Optional shutdown endpoint
 470  	if cfg.EnableShutdown {
 471  		mux.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) {
 472  			w.WriteHeader(http.StatusOK)
 473  			_, _ = w.Write([]byte("shutting down"))
 474  			log.I.F("shutdown requested via /shutdown; sending SIGINT to self")
 475  			go func() {
 476  				p, _ := os.FindProcess(os.Getpid())
 477  				_ = p.Signal(os.Interrupt)
 478  			}()
 479  		})
 480  	}
 481  
 482  	healthSrv := &http.Server{
 483  		Addr:    fmt.Sprintf("%s:%d", cfg.Listen, cfg.HealthPort),
 484  		Handler: mux,
 485  	}
 486  
 487  	go func() {
 488  		log.I.F("health check server listening on %s", healthSrv.Addr)
 489  		if err := healthSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
 490  			log.E.F("health server error: %v", err)
 491  		}
 492  	}()
 493  
 494  	go func() {
 495  		<-ctx.Done()
 496  		shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 2*time.Second)
 497  		defer cancelShutdown()
 498  		_ = healthSrv.Shutdown(shutdownCtx)
 499  	}()
 500  }
 501  
 502  // MakeDatabaseConfig creates a database.DatabaseConfig from the app config.
 503  func MakeDatabaseConfig(cfg *config.C) *database.DatabaseConfig {
 504  	dataDir, logLevel,
 505  		blockCacheMB, indexCacheMB, queryCacheSizeMB,
 506  		queryCacheMaxAge,
 507  		queryCacheDisabled,
 508  		serialCachePubkeys, serialCacheEventIds,
 509  		zstdLevel,
 510  		neo4jURI, neo4jUser, neo4jPassword,
 511  		neo4jMaxConnPoolSize, neo4jFetchSize, neo4jMaxTxRetrySeconds, neo4jQueryResultLimit := cfg.GetDatabaseConfigValues()
 512  
 513  	grpcServerAddress, grpcConnectTimeout := cfg.GetGRPCConfigValues()
 514  
 515  	return &database.DatabaseConfig{
 516  		DataDir:                dataDir,
 517  		LogLevel:               logLevel,
 518  		BlockCacheMB:           blockCacheMB,
 519  		IndexCacheMB:           indexCacheMB,
 520  		QueryCacheSizeMB:       queryCacheSizeMB,
 521  		QueryCacheMaxAge:       queryCacheMaxAge,
 522  		QueryCacheDisabled:     queryCacheDisabled,
 523  		SerialCachePubkeys:     serialCachePubkeys,
 524  		SerialCacheEventIds:    serialCacheEventIds,
 525  		ZSTDLevel:              zstdLevel,
 526  		Neo4jURI:               neo4jURI,
 527  		Neo4jUser:              neo4jUser,
 528  		Neo4jPassword:          neo4jPassword,
 529  		Neo4jMaxConnPoolSize:   neo4jMaxConnPoolSize,
 530  		Neo4jFetchSize:         neo4jFetchSize,
 531  		Neo4jMaxTxRetrySeconds: neo4jMaxTxRetrySeconds,
 532  		Neo4jQueryResultLimit:  neo4jQueryResultLimit,
 533  		GRPCServerAddress:      grpcServerAddress,
 534  		GRPCConnectTimeout:     grpcConnectTimeout,
 535  	}
 536  }
 537