// startup.go
1 //go:build !(js && wasm)
2
3 // Package relay provides shared startup logic for running the ORLY relay.
4 // This allows both the root binary and the unified binary to share
5 // the same initialization code for monolithic deployments.
6 package relay
7
8 import (
9 "context"
10 "fmt"
11 "net/http"
12 pp "net/http/pprof"
13 "os"
14 "os/signal"
15 "runtime"
16 "runtime/debug"
17 "sync"
18 "syscall"
19 "time"
20
21 "github.com/pkg/profile"
22 "next.orly.dev/pkg/lol/chk"
23 "next.orly.dev/pkg/lol/log"
24 "next.orly.dev/app"
25 "next.orly.dev/app/config"
26 "next.orly.dev/pkg/acl"
27 aclgrpc "next.orly.dev/pkg/acl/grpc"
28 "next.orly.dev/pkg/database"
29 _ "next.orly.dev/pkg/database/grpc" // Import for grpc factory registration
30 neo4jdb "next.orly.dev/pkg/neo4j"
31 "next.orly.dev/pkg/ratelimit"
32 "next.orly.dev/pkg/sync/negentropy"
33 negentropygrpc "next.orly.dev/pkg/sync/negentropy/grpc"
34 "next.orly.dev/pkg/utils/interrupt"
35 )
36
37 // StartupResult holds the initialized components from Startup.
// StartupResult holds the initialized components from Startup.
type StartupResult struct {
	Ctx     context.Context    // root context governing all started components
	Cancel  context.CancelFunc // cancels Ctx to begin shutdown
	DB      database.Database  // initialized storage backend; caller must Close on shutdown
	Limiter *ratelimit.Limiter // configured (possibly disabled) rate limiter
	Quit    chan struct{}      // closed/signalled by app.Run when the relay has fully stopped
}
45
46 // Startup initializes the database, ACL, rate limiter, and starts the relay server.
47 // It returns a StartupResult containing all initialized components.
48 func Startup(cfg *config.C) (*StartupResult, error) {
49 runtime.GOMAXPROCS(128)
50 debug.SetGCPercent(10)
51
52 // Setup profiling
53 profileStop := setupProfiling(cfg)
54
55 ctx, cancel := context.WithCancel(context.Background())
56
57 // Initialize database
58 log.I.F("initializing %s database at %s", cfg.DBType, cfg.DataDir)
59 db, err := database.NewDatabaseWithConfig(
60 ctx, cancel, cfg.DBType, MakeDatabaseConfig(cfg),
61 )
62 if chk.E(err) {
63 cancel()
64 return nil, fmt.Errorf("failed to initialize database: %w", err)
65 }
66 log.I.F("%s database initialized successfully", cfg.DBType)
67
68 // Initialize ACL
69 if err := initializeACL(ctx, cfg, db); err != nil {
70 db.Close()
71 cancel()
72 return nil, fmt.Errorf("failed to initialize ACL: %w", err)
73 }
74
75 // Initialize negentropy handler (embedded or gRPC client)
76 initializeNegentropy(ctx, cfg, db)
77
78 // Create rate limiter
79 limiter := createRateLimiter(cfg, db)
80
81 // Start pprof HTTP server if enabled
82 startPprofServer(ctx, cfg)
83
84 // Start health check server if configured
85 startHealthServer(ctx, cfg)
86
87 // Start the relay
88 quit := app.Run(ctx, cfg, db, limiter)
89
90 // Store profileStop for cleanup
91 result := &StartupResult{
92 Ctx: ctx,
93 Cancel: cancel,
94 DB: db,
95 Limiter: limiter,
96 Quit: quit,
97 }
98
99 // Register profile stop handler
100 interrupt.AddHandler(func() {
101 log.I.F("interrupt received: stopping profiling")
102 profileStop()
103 })
104
105 return result, nil
106 }
107
108 // RunWithSignals calls Startup and blocks on signals, handling graceful shutdown.
109 // This is the main entry point for running the relay as a standalone process.
110 func RunWithSignals(cfg *config.C) error {
111 result, err := Startup(cfg)
112 if err != nil {
113 return err
114 }
115
116 sigs := make(chan os.Signal, 1)
117 signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
118
119 for {
120 select {
121 case <-sigs:
122 fmt.Printf("\r")
123 log.I.F("received shutdown signal, starting graceful shutdown")
124 result.Cancel()
125 <-result.Quit
126 chk.E(result.DB.Close())
127 log.I.F("exiting")
128 return nil
129 case <-result.Quit:
130 log.I.F("application quit signal received")
131 result.Cancel()
132 chk.E(result.DB.Close())
133 log.I.F("exiting")
134 return nil
135 }
136 }
137 }
138
139 // setupProfiling configures profiling based on config and returns a stop function.
140 func setupProfiling(cfg *config.C) func() {
141 var profileStopOnce sync.Once
142 profileStop := func() {}
143
144 switch cfg.Pprof {
145 case "cpu":
146 var prof interface{ Stop() }
147 if cfg.PprofPath != "" {
148 prof = profile.Start(profile.CPUProfile, profile.ProfilePath(cfg.PprofPath))
149 } else {
150 prof = profile.Start(profile.CPUProfile)
151 }
152 profileStop = func() {
153 profileStopOnce.Do(func() {
154 prof.Stop()
155 log.I.F("cpu profiling stopped and flushed")
156 })
157 }
158
159 case "memory":
160 var prof interface{ Stop() }
161 if cfg.PprofPath != "" {
162 prof = profile.Start(profile.MemProfile, profile.MemProfileRate(32), profile.ProfilePath(cfg.PprofPath))
163 } else {
164 prof = profile.Start(profile.MemProfile)
165 }
166 profileStop = func() {
167 profileStopOnce.Do(func() {
168 prof.Stop()
169 log.I.F("memory profiling stopped and flushed")
170 })
171 }
172
173 case "allocation":
174 var prof interface{ Stop() }
175 if cfg.PprofPath != "" {
176 prof = profile.Start(profile.MemProfileAllocs, profile.MemProfileRate(32), profile.ProfilePath(cfg.PprofPath))
177 } else {
178 prof = profile.Start(profile.MemProfileAllocs)
179 }
180 profileStop = func() {
181 profileStopOnce.Do(func() {
182 prof.Stop()
183 log.I.F("allocation profiling stopped and flushed")
184 })
185 }
186
187 case "heap":
188 var prof interface{ Stop() }
189 if cfg.PprofPath != "" {
190 prof = profile.Start(profile.MemProfileHeap, profile.ProfilePath(cfg.PprofPath))
191 } else {
192 prof = profile.Start(profile.MemProfileHeap)
193 }
194 profileStop = func() {
195 profileStopOnce.Do(func() {
196 prof.Stop()
197 log.I.F("heap profiling stopped and flushed")
198 })
199 }
200
201 case "mutex":
202 var prof interface{ Stop() }
203 if cfg.PprofPath != "" {
204 prof = profile.Start(profile.MutexProfile, profile.ProfilePath(cfg.PprofPath))
205 } else {
206 prof = profile.Start(profile.MutexProfile)
207 }
208 profileStop = func() {
209 profileStopOnce.Do(func() {
210 prof.Stop()
211 log.I.F("mutex profiling stopped and flushed")
212 })
213 }
214
215 case "threadcreate":
216 var prof interface{ Stop() }
217 if cfg.PprofPath != "" {
218 prof = profile.Start(profile.ThreadcreationProfile, profile.ProfilePath(cfg.PprofPath))
219 } else {
220 prof = profile.Start(profile.ThreadcreationProfile)
221 }
222 profileStop = func() {
223 profileStopOnce.Do(func() {
224 prof.Stop()
225 log.I.F("threadcreate profiling stopped and flushed")
226 })
227 }
228
229 case "goroutine":
230 var prof interface{ Stop() }
231 if cfg.PprofPath != "" {
232 prof = profile.Start(profile.GoroutineProfile, profile.ProfilePath(cfg.PprofPath))
233 } else {
234 prof = profile.Start(profile.GoroutineProfile)
235 }
236 profileStop = func() {
237 profileStopOnce.Do(func() {
238 prof.Stop()
239 log.I.F("goroutine profiling stopped and flushed")
240 })
241 }
242
243 case "block":
244 var prof interface{ Stop() }
245 if cfg.PprofPath != "" {
246 prof = profile.Start(profile.BlockProfile, profile.ProfilePath(cfg.PprofPath))
247 } else {
248 prof = profile.Start(profile.BlockProfile)
249 }
250 profileStop = func() {
251 profileStopOnce.Do(func() {
252 prof.Stop()
253 log.I.F("block profiling stopped and flushed")
254 })
255 }
256 }
257
258 return profileStop
259 }
260
261 // initializeACL sets up ACL - either remote gRPC or in-process.
262 func initializeACL(ctx context.Context, cfg *config.C, db database.Database) error {
263 aclType, aclServerAddr, aclConnTimeout := cfg.GetGRPCACLConfigValues()
264
265 if aclType == "grpc" {
266 // Use remote ACL server via gRPC
267 log.I.F("connecting to gRPC ACL server at %s", aclServerAddr)
268 aclClient, err := aclgrpc.New(ctx, &aclgrpc.ClientConfig{
269 ServerAddress: aclServerAddr,
270 ConnectTimeout: aclConnTimeout,
271 })
272 if chk.E(err) {
273 return fmt.Errorf("failed to connect to gRPC ACL server: %w", err)
274 }
275
276 // Wait for ACL server to be ready
277 select {
278 case <-aclClient.Ready():
279 log.I.F("gRPC ACL client connected, mode: %s", aclClient.Type())
280 case <-time.After(30 * time.Second):
281 return fmt.Errorf("timeout waiting for gRPC ACL server")
282 }
283
284 // Register and activate the gRPC client as the ACL backend
285 acl.Registry.RegisterAndActivate(aclClient)
286 } else {
287 // Use in-process ACL
288 acl.Registry.SetMode(cfg.ACLMode)
289 if err := acl.Registry.Configure(cfg, db, ctx); chk.E(err) {
290 return err
291 }
292 acl.Registry.Syncer()
293 }
294
295 return nil
296 }
297
298 // initializeNegentropy sets up negentropy handling (embedded or gRPC client).
299 func initializeNegentropy(ctx context.Context, cfg *config.C, db database.Database) {
300 syncType, _, _, _, negentropyAddr, syncTimeout, negentropyEnabled := cfg.GetGRPCSyncConfigValues()
301
302 if !negentropyEnabled {
303 log.I.F("negentropy NIP-77 disabled (set ORLY_NEGENTROPY_ENABLED=true to enable)")
304 return
305 }
306
307 if syncType == "grpc" && negentropyAddr != "" {
308 // Use gRPC client to connect to remote negentropy server
309 log.I.F("connecting to gRPC negentropy server at %s", negentropyAddr)
310 negClient, err := negentropygrpc.New(ctx, &negentropygrpc.ClientConfig{
311 ServerAddress: negentropyAddr,
312 ConnectTimeout: syncTimeout,
313 })
314 if err != nil {
315 log.W.F("failed to connect to gRPC negentropy server at %s: %v — falling back to embedded handler", negentropyAddr, err)
316 } else {
317 // Wait for negentropy server to be ready
318 select {
319 case <-negClient.Ready():
320 log.I.F("gRPC negentropy client connected")
321 app.SetNegentropyHandler(negClient)
322 return
323 case <-time.After(30 * time.Second):
324 log.W.F("timeout waiting for gRPC negentropy server at %s — falling back to embedded handler", negentropyAddr)
325 negClient.Close()
326 }
327 }
328 }
329
330 // Embedded negentropy handler — used in standalone mode or as fallback
331 // when gRPC connection fails, so NIP-77 is never silently disabled.
332 log.I.F("initializing embedded negentropy handler")
333 negHandler := negentropy.NewEmbeddedHandler(db, &negentropy.Config{
334 SyncInterval: 60 * time.Second,
335 FrameSize: 128 * 1024,
336 IDSize: 16,
337 ClientSessionTimeout: 5 * time.Minute,
338 })
339 negHandler.Start()
340 app.SetNegentropyHandler(negHandler)
341 log.I.F("embedded negentropy handler initialized (NIP-77 enabled)")
342 }
343
344 // createRateLimiter creates and configures the rate limiter.
345 func createRateLimiter(cfg *config.C, db database.Database) *ratelimit.Limiter {
346 rateLimitEnabled, targetMB,
347 writeKp, writeKi, writeKd,
348 readKp, readKi, readKd,
349 maxWriteMs, maxReadMs,
350 writeTarget, readTarget,
351 emergencyThreshold, recoveryThreshold,
352 emergencyMaxMs := cfg.GetRateLimitConfigValues()
353
354 if !rateLimitEnabled {
355 return ratelimit.NewDisabledLimiter()
356 }
357
358 // Auto-detect memory target if set to 0
359 if targetMB == 0 {
360 var err error
361 targetMB, err = ratelimit.CalculateTargetMemoryMB(targetMB)
362 if err != nil {
363 log.F.F("FATAL: %v", err)
364 log.F.F("There is not enough memory to run this relay in this environment.")
365 log.F.F("Available: %dMB, Required minimum: %dMB",
366 ratelimit.DetectAvailableMemoryMB(), ratelimit.MinimumMemoryMB)
367 os.Exit(1)
368 }
369 stats := ratelimit.GetMemoryStats(targetMB)
370 calculated66 := int(float64(stats.AvailableMB) * ratelimit.AutoDetectMemoryFraction)
371 if calculated66 > ratelimit.DefaultMaxMemoryMB {
372 log.I.F("memory auto-detected: total=%dMB, available=%dMB, target=%dMB (capped at default max, 66%% would be %dMB)",
373 stats.TotalMB, stats.AvailableMB, targetMB, calculated66)
374 } else {
375 log.I.F("memory auto-detected: total=%dMB, available=%dMB, target=%dMB (66%% of available)",
376 stats.TotalMB, stats.AvailableMB, targetMB)
377 }
378 } else {
379 // Validate explicitly configured target
380 _, err := ratelimit.CalculateTargetMemoryMB(targetMB)
381 if err != nil {
382 log.F.F("FATAL: %v", err)
383 log.F.F("Configured target memory %dMB is below minimum required %dMB.",
384 targetMB, ratelimit.MinimumMemoryMB)
385 os.Exit(1)
386 }
387 }
388
389 rlConfig := ratelimit.NewConfigFromValues(
390 rateLimitEnabled, targetMB,
391 writeKp, writeKi, writeKd,
392 readKp, readKi, readKd,
393 maxWriteMs, maxReadMs,
394 writeTarget, readTarget,
395 emergencyThreshold, recoveryThreshold,
396 emergencyMaxMs,
397 )
398
399 // Create appropriate monitor based on database type
400 if badgerDB, ok := db.(*database.D); ok {
401 limiter := ratelimit.NewBadgerLimiter(rlConfig, badgerDB.DB)
402 badgerDB.SetRateLimiter(limiter)
403 log.I.F("rate limiter configured for Badger backend (target: %dMB)", targetMB)
404 return limiter
405 }
406
407 if n4jDB, ok := db.(*neo4jdb.N); ok {
408 limiter := ratelimit.NewNeo4jLimiter(
409 rlConfig,
410 n4jDB.Driver(),
411 n4jDB.QuerySem(),
412 n4jDB.MaxConcurrentQueries(),
413 )
414 log.I.F("rate limiter configured for Neo4j backend (target: %dMB)", targetMB)
415 return limiter
416 }
417
418 // For other backends, create a disabled limiter
419 log.I.F("rate limiter disabled for unknown backend")
420 return ratelimit.NewDisabledLimiter()
421 }
422
423 // startPprofServer starts the HTTP pprof server if enabled.
424 func startPprofServer(ctx context.Context, cfg *config.C) {
425 if !cfg.PprofHTTP {
426 return
427 }
428
429 pprofAddr := fmt.Sprintf("%s:%d", cfg.Listen, 6060)
430 pprofMux := http.NewServeMux()
431 pprofMux.HandleFunc("/debug/pprof/", pp.Index)
432 pprofMux.HandleFunc("/debug/pprof/cmdline", pp.Cmdline)
433 pprofMux.HandleFunc("/debug/pprof/profile", pp.Profile)
434 pprofMux.HandleFunc("/debug/pprof/symbol", pp.Symbol)
435 pprofMux.HandleFunc("/debug/pprof/trace", pp.Trace)
436 for _, p := range []string{"allocs", "block", "goroutine", "heap", "mutex", "threadcreate"} {
437 pprofMux.Handle("/debug/pprof/"+p, pp.Handler(p))
438 }
439
440 ppSrv := &http.Server{Addr: pprofAddr, Handler: pprofMux}
441 go func() {
442 log.I.F("pprof server listening on %s", pprofAddr)
443 if err := ppSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
444 log.E.F("pprof server error: %v", err)
445 }
446 }()
447
448 go func() {
449 <-ctx.Done()
450 shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 2*time.Second)
451 defer cancelShutdown()
452 _ = ppSrv.Shutdown(shutdownCtx)
453 }()
454 }
455
456 // startHealthServer starts the health check HTTP server if configured.
457 func startHealthServer(ctx context.Context, cfg *config.C) {
458 if cfg.HealthPort <= 0 {
459 return
460 }
461
462 mux := http.NewServeMux()
463 mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
464 w.WriteHeader(http.StatusOK)
465 _, _ = w.Write([]byte("ok"))
466 log.I.F("health check ok")
467 })
468
469 // Optional shutdown endpoint
470 if cfg.EnableShutdown {
471 mux.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) {
472 w.WriteHeader(http.StatusOK)
473 _, _ = w.Write([]byte("shutting down"))
474 log.I.F("shutdown requested via /shutdown; sending SIGINT to self")
475 go func() {
476 p, _ := os.FindProcess(os.Getpid())
477 _ = p.Signal(os.Interrupt)
478 }()
479 })
480 }
481
482 healthSrv := &http.Server{
483 Addr: fmt.Sprintf("%s:%d", cfg.Listen, cfg.HealthPort),
484 Handler: mux,
485 }
486
487 go func() {
488 log.I.F("health check server listening on %s", healthSrv.Addr)
489 if err := healthSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
490 log.E.F("health server error: %v", err)
491 }
492 }()
493
494 go func() {
495 <-ctx.Done()
496 shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 2*time.Second)
497 defer cancelShutdown()
498 _ = healthSrv.Shutdown(shutdownCtx)
499 }()
500 }
501
502 // MakeDatabaseConfig creates a database.DatabaseConfig from the app config.
503 func MakeDatabaseConfig(cfg *config.C) *database.DatabaseConfig {
504 dataDir, logLevel,
505 blockCacheMB, indexCacheMB, queryCacheSizeMB,
506 queryCacheMaxAge,
507 queryCacheDisabled,
508 serialCachePubkeys, serialCacheEventIds,
509 zstdLevel,
510 neo4jURI, neo4jUser, neo4jPassword,
511 neo4jMaxConnPoolSize, neo4jFetchSize, neo4jMaxTxRetrySeconds, neo4jQueryResultLimit := cfg.GetDatabaseConfigValues()
512
513 grpcServerAddress, grpcConnectTimeout := cfg.GetGRPCConfigValues()
514
515 return &database.DatabaseConfig{
516 DataDir: dataDir,
517 LogLevel: logLevel,
518 BlockCacheMB: blockCacheMB,
519 IndexCacheMB: indexCacheMB,
520 QueryCacheSizeMB: queryCacheSizeMB,
521 QueryCacheMaxAge: queryCacheMaxAge,
522 QueryCacheDisabled: queryCacheDisabled,
523 SerialCachePubkeys: serialCachePubkeys,
524 SerialCacheEventIds: serialCacheEventIds,
525 ZSTDLevel: zstdLevel,
526 Neo4jURI: neo4jURI,
527 Neo4jUser: neo4jUser,
528 Neo4jPassword: neo4jPassword,
529 Neo4jMaxConnPoolSize: neo4jMaxConnPoolSize,
530 Neo4jFetchSize: neo4jFetchSize,
531 Neo4jMaxTxRetrySeconds: neo4jMaxTxRetrySeconds,
532 Neo4jQueryResultLimit: neo4jQueryResultLimit,
533 GRPCServerAddress: grpcServerAddress,
534 GRPCConnectTimeout: grpcConnectTimeout,
535 }
536 }
537