supervisor.go raw
1 package main
2
3 import (
4 "context"
5 "fmt"
6 "net"
7 "os"
8 "os/exec"
9 "sync"
10 "syscall"
11 "time"
12
13 "google.golang.org/grpc"
14 "google.golang.org/grpc/credentials/insecure"
15 "next.orly.dev/pkg/lol/chk"
16 "next.orly.dev/pkg/lol/log"
17
18 orlyaclv1 "next.orly.dev/pkg/proto/orlyacl/v1"
19 orlynitsv1 "next.orly.dev/pkg/proto/orlynits/v1"
20 )
21
22 // Supervisor manages the database, ACL, sync, and relay processes.
23 type Supervisor struct {
24 cfg *Config
25 ctx context.Context
26 cancel context.CancelFunc
27
28 dbProc *Process
29 aclProc *Process
30 relayProc *Process
31
32 // Sync service processes
33 distributedSyncProc *Process
34 clusterSyncProc *Process
35 relayGroupProc *Process
36 negentropyProc *Process
37
38 // Certificate service process
39 certsProc *Process
40
41 // Bitcoin node (nits) process
42 nitsProc *Process
43
44 // Lightning node (luk) process
45 lukProc *Process
46
47 // Wallet (strela) process
48 strelaProc *Process
49
50 wg sync.WaitGroup
51 mu sync.Mutex
52 closed bool
53 }
54
// Process represents a managed subprocess.
//
// The Supervisor start functions build one of these after the command has
// been started, together with a goroutine that waits on the command and then
// closes exited.
type Process struct {
	name     string        // label used in log messages
	cmd      *exec.Cmd     // the started command
	restarts int           // incremented by monitorProcess on each unexpected exit
	exited   chan struct{} // closed when process exits
	mu       sync.Mutex    // held by stopProcess while it checks cmd and cmd.Process
}
63
64 // NewSupervisor creates a new process supervisor.
65 func NewSupervisor(ctx context.Context, cancel context.CancelFunc, cfg *Config) *Supervisor {
66 return &Supervisor{
67 cfg: cfg,
68 ctx: ctx,
69 cancel: cancel,
70 }
71 }
72
73 // IsRunning returns true if any managed processes are running.
74 func (s *Supervisor) IsRunning() bool {
75 s.mu.Lock()
76 defer s.mu.Unlock()
77
78 // Check if any process is running
79 if s.dbProc != nil {
80 select {
81 case <-s.dbProc.exited:
82 // Process has exited
83 default:
84 return true
85 }
86 }
87 if s.relayProc != nil {
88 select {
89 case <-s.relayProc.exited:
90 // Process has exited
91 default:
92 return true
93 }
94 }
95 return false
96 }
97
98 // Start starts the database, optional ACL server, sync services, and relay processes.
99 func (s *Supervisor) Start() error {
100 s.mu.Lock()
101 s.closed = false
102 s.mu.Unlock()
103
104 // Phase 1: Start Bitcoin node if enabled (nits → luk depends on it)
105 if s.cfg.NitsEnabled {
106 if err := s.startNits(); err != nil {
107 return fmt.Errorf("failed to start nits: %w", err)
108 }
109
110 if err := s.waitForNitsReady(s.cfg.NitsReadyTimeout); err != nil {
111 s.stopProcess(s.nitsProc, 30*time.Second)
112 return fmt.Errorf("nits not ready: %w", err)
113 }
114
115 log.I.F("bitcoin node is ready")
116 }
117
118 // Phase 2: Start Lightning node if enabled (luk → strela depends on it)
119 if s.cfg.LukEnabled {
120 if err := s.startLuk(); err != nil {
121 if s.cfg.NitsEnabled {
122 s.stopProcess(s.nitsProc, 30*time.Second)
123 }
124 return fmt.Errorf("failed to start luk: %w", err)
125 }
126
127 if err := s.waitForServiceReady(s.cfg.LukRPCListen, s.cfg.LukReadyTimeout); err != nil {
128 s.stopProcess(s.lukProc, 10*time.Second)
129 if s.cfg.NitsEnabled {
130 s.stopProcess(s.nitsProc, 30*time.Second)
131 }
132 return fmt.Errorf("luk not ready: %w", err)
133 }
134
135 log.I.F("lightning node is ready")
136 }
137
138 // Phase 3: Start database server
139 if err := s.startDB(); err != nil {
140 return fmt.Errorf("failed to start database: %w", err)
141 }
142
143 if err := s.waitForDBReady(s.cfg.DBReadyTimeout); err != nil {
144 s.stopDB()
145 return fmt.Errorf("database not ready: %w", err)
146 }
147
148 log.I.F("database is ready")
149
150 // Phase 4: Start ACL server if enabled
151 if s.cfg.ACLEnabled {
152 if err := s.startACL(); err != nil {
153 s.stopDB()
154 return fmt.Errorf("failed to start ACL server: %w", err)
155 }
156
157 if err := s.waitForACLReady(s.cfg.ACLReadyTimeout); err != nil {
158 s.stopACL()
159 s.stopDB()
160 return fmt.Errorf("ACL server not ready: %w", err)
161 }
162
163 log.I.F("ACL server is ready")
164 }
165
166 // Start sync services in parallel (they all depend on DB)
167 if err := s.startSyncServices(); err != nil {
168 s.stopSyncServices()
169 if s.cfg.ACLEnabled {
170 s.stopACL()
171 }
172 s.stopDB()
173 return fmt.Errorf("failed to start sync services: %w", err)
174 }
175
176 // Phase 5: Start strela (needs luk) + relay (needs db/acl) in parallel
177 if s.cfg.StrelaEnabled {
178 if err := s.startStrela(); err != nil {
179 log.W.F("failed to start strela: %v", err)
180 } else {
181 log.I.F("strela wallet started")
182 }
183 }
184
185 if err := s.startRelay(); err != nil {
186 s.stopSyncServices()
187 if s.cfg.ACLEnabled {
188 s.stopACL()
189 }
190 s.stopDB()
191 return fmt.Errorf("failed to start relay: %w", err)
192 }
193
194 // Phase 6: Start certificate service if enabled (independent)
195 if s.cfg.CertsEnabled {
196 if err := s.startCerts(); err != nil {
197 log.W.F("failed to start certificate service: %v", err)
198 } else {
199 log.I.F("certificate service started")
200 }
201 }
202
203 // Start monitoring goroutines
204 monitorCount := 2 // db + relay
205 if s.cfg.NitsEnabled {
206 monitorCount++
207 }
208 if s.cfg.LukEnabled {
209 monitorCount++
210 }
211 if s.cfg.StrelaEnabled {
212 monitorCount++
213 }
214 if s.cfg.ACLEnabled {
215 monitorCount++
216 }
217 if s.cfg.DistributedSyncEnabled {
218 monitorCount++
219 }
220 if s.cfg.ClusterSyncEnabled {
221 monitorCount++
222 }
223 if s.cfg.RelayGroupEnabled {
224 monitorCount++
225 }
226 if s.cfg.NegentropyEnabled {
227 monitorCount++
228 }
229 if s.cfg.CertsEnabled {
230 monitorCount++
231 }
232
233 s.wg.Add(monitorCount)
234 if s.cfg.NitsEnabled {
235 go s.monitorProcess(s.nitsProc, "nits", s.startNits)
236 }
237 if s.cfg.LukEnabled {
238 go s.monitorProcess(s.lukProc, "luk", s.startLuk)
239 }
240 go s.monitorProcess(s.dbProc, "db", s.startDB)
241 if s.cfg.ACLEnabled {
242 go s.monitorProcess(s.aclProc, "acl", s.startACL)
243 }
244 if s.cfg.DistributedSyncEnabled {
245 go s.monitorProcess(s.distributedSyncProc, "distributed-sync", s.startDistributedSync)
246 }
247 if s.cfg.ClusterSyncEnabled {
248 go s.monitorProcess(s.clusterSyncProc, "cluster-sync", s.startClusterSync)
249 }
250 if s.cfg.RelayGroupEnabled {
251 go s.monitorProcess(s.relayGroupProc, "relaygroup", s.startRelayGroup)
252 }
253 if s.cfg.NegentropyEnabled {
254 go s.monitorProcess(s.negentropyProc, "negentropy", s.startNegentropy)
255 }
256 if s.cfg.StrelaEnabled {
257 go s.monitorProcess(s.strelaProc, "strela", s.startStrela)
258 }
259 if s.cfg.CertsEnabled {
260 go s.monitorProcess(s.certsProc, "certs", s.startCerts)
261 }
262 go s.monitorProcess(s.relayProc, "relay", s.startRelay)
263
264 return nil
265 }
266
267 // Stop stops all managed processes gracefully.
268 func (s *Supervisor) Stop() error {
269 s.mu.Lock()
270 if s.closed {
271 s.mu.Unlock()
272 return nil
273 }
274 s.closed = true
275 s.mu.Unlock()
276
277 // Stop in reverse startup order: certs → relay, strela → acl, sync → db → luk → nits
278
279 if s.cfg.CertsEnabled && s.certsProc != nil {
280 log.I.F("stopping certificate service...")
281 s.stopProcess(s.certsProc, 5*time.Second)
282 }
283
284 log.I.F("stopping relay...")
285 s.stopProcess(s.relayProc, 5*time.Second)
286
287 if s.cfg.StrelaEnabled && s.strelaProc != nil {
288 log.I.F("stopping strela...")
289 s.stopProcess(s.strelaProc, 5*time.Second)
290 }
291
292 log.I.F("stopping sync services...")
293 s.stopSyncServices()
294
295 if s.cfg.ACLEnabled && s.aclProc != nil {
296 log.I.F("stopping ACL server...")
297 s.stopProcess(s.aclProc, 5*time.Second)
298 }
299
300 log.I.F("stopping database...")
301 s.stopProcess(s.dbProc, s.cfg.StopTimeout)
302
303 if s.cfg.LukEnabled && s.lukProc != nil {
304 log.I.F("stopping lightning node...")
305 s.stopProcess(s.lukProc, 10*time.Second)
306 }
307
308 if s.cfg.NitsEnabled && s.nitsProc != nil {
309 log.I.F("stopping bitcoin node...")
310 s.stopProcess(s.nitsProc, 30*time.Second)
311 }
312
313 // Wait for monitor goroutines to exit (they will exit when they see closed=true)
314 s.wg.Wait()
315
316 return nil
317 }
318
319 func (s *Supervisor) startDB() error {
320 s.mu.Lock()
321 defer s.mu.Unlock()
322
323 // Build environment for database process
324 env := os.Environ()
325 env = append(env, fmt.Sprintf("ORLY_DB_LISTEN=%s", s.cfg.DBListen))
326 env = append(env, fmt.Sprintf("ORLY_DATA_DIR=%s", s.cfg.DataDir))
327 env = append(env, fmt.Sprintf("ORLY_DB_LOG_LEVEL=%s", s.cfg.LogLevel))
328
329 cmd := exec.CommandContext(s.ctx, s.cfg.DBBinary)
330 cmd.Env = env
331 cmd.Stdout = os.Stdout
332 cmd.Stderr = os.Stderr
333
334 if err := cmd.Start(); chk.E(err) {
335 return err
336 }
337
338 exited := make(chan struct{})
339 s.dbProc = &Process{
340 name: "orly-db",
341 cmd: cmd,
342 exited: exited,
343 }
344
345 // Start a goroutine to wait for the process and close the exited channel
346 go func() {
347 cmd.Wait()
348 close(exited)
349 }()
350
351 log.I.F("started database server (pid %d)", cmd.Process.Pid)
352 return nil
353 }
354
355 func (s *Supervisor) startACL() error {
356 s.mu.Lock()
357 defer s.mu.Unlock()
358
359 // Build environment for ACL process
360 env := os.Environ()
361 env = append(env, fmt.Sprintf("ORLY_ACL_LISTEN=%s", s.cfg.ACLListen))
362 env = append(env, "ORLY_ACL_DB_TYPE=grpc")
363 env = append(env, fmt.Sprintf("ORLY_ACL_GRPC_DB_SERVER=%s", s.cfg.DBListen))
364 env = append(env, fmt.Sprintf("ORLY_ACL_MODE=%s", s.cfg.ACLMode))
365 env = append(env, fmt.Sprintf("ORLY_ACL_LOG_LEVEL=%s", s.cfg.LogLevel))
366
367 cmd := exec.CommandContext(s.ctx, s.cfg.ACLBinary)
368 cmd.Env = env
369 cmd.Stdout = os.Stdout
370 cmd.Stderr = os.Stderr
371
372 if err := cmd.Start(); chk.E(err) {
373 return err
374 }
375
376 exited := make(chan struct{})
377 s.aclProc = &Process{
378 name: "orly-acl",
379 cmd: cmd,
380 exited: exited,
381 }
382
383 // Start a goroutine to wait for the process and close the exited channel
384 go func() {
385 cmd.Wait()
386 close(exited)
387 }()
388
389 log.I.F("started ACL server (pid %d)", cmd.Process.Pid)
390 return nil
391 }
392
393 func (s *Supervisor) waitForACLReady(timeout time.Duration) error {
394 deadline := time.Now().Add(timeout)
395 ticker := time.NewTicker(250 * time.Millisecond)
396 defer ticker.Stop()
397
398 var grpcConn *grpc.ClientConn
399 var aclClient orlyaclv1.ACLServiceClient
400
401 for {
402 select {
403 case <-s.ctx.Done():
404 if grpcConn != nil {
405 grpcConn.Close()
406 }
407 return s.ctx.Err()
408 case <-ticker.C:
409 if time.Now().After(deadline) {
410 if grpcConn != nil {
411 grpcConn.Close()
412 }
413 return fmt.Errorf("timeout waiting for ACL server")
414 }
415
416 // First, check if TCP port is open
417 conn, err := net.DialTimeout("tcp", s.cfg.ACLListen, time.Second)
418 if err != nil {
419 continue // Port not open yet
420 }
421 conn.Close()
422
423 // Port is open, now check gRPC Ready() endpoint
424 if grpcConn == nil {
425 grpcConn, err = grpc.DialContext(s.ctx, s.cfg.ACLListen,
426 grpc.WithTransportCredentials(insecure.NewCredentials()),
427 )
428 if err != nil {
429 continue // Failed to connect
430 }
431 aclClient = orlyaclv1.NewACLServiceClient(grpcConn)
432 }
433
434 // Call Ready() to check if service is fully configured
435 ctx, cancel := context.WithTimeout(s.ctx, time.Second)
436 resp, err := aclClient.Ready(ctx, &orlyaclv1.Empty{})
437 cancel()
438 if err == nil && resp.Ready {
439 grpcConn.Close()
440 return nil // ACL server is fully ready
441 }
442 // Not ready yet, keep polling
443 }
444 }
445 }
446
447 func (s *Supervisor) stopACL() {
448 s.stopProcess(s.aclProc, 5*time.Second)
449 }
450
451 func (s *Supervisor) startRelay() error {
452 s.mu.Lock()
453 defer s.mu.Unlock()
454
455 // Build environment for relay process
456 env := os.Environ()
457 env = append(env, "ORLY_DB_TYPE=grpc")
458 env = append(env, fmt.Sprintf("ORLY_GRPC_SERVER=%s", s.cfg.DBListen))
459 env = append(env, fmt.Sprintf("ORLY_LOG_LEVEL=%s", s.cfg.LogLevel))
460
461 // If ACL is enabled, configure relay to use gRPC ACL
462 // Otherwise, run in open mode (no ACL restrictions)
463 if s.cfg.ACLEnabled {
464 env = append(env, "ORLY_ACL_TYPE=grpc")
465 env = append(env, fmt.Sprintf("ORLY_GRPC_ACL_SERVER=%s", s.cfg.ACLListen))
466 env = append(env, fmt.Sprintf("ORLY_ACL_MODE=%s", s.cfg.ACLMode))
467 } else {
468 // Open relay - no ACL restrictions
469 env = append(env, "ORLY_ACL_TYPE=local")
470 env = append(env, "ORLY_ACL_MODE=none")
471 }
472
473 // Configure sync service connections
474 if s.cfg.DistributedSyncEnabled {
475 env = append(env, "ORLY_SYNC_TYPE=grpc")
476 env = append(env, fmt.Sprintf("ORLY_GRPC_SYNC_DISTRIBUTED=%s", s.cfg.DistributedSyncListen))
477 }
478 if s.cfg.ClusterSyncEnabled {
479 env = append(env, fmt.Sprintf("ORLY_GRPC_SYNC_CLUSTER=%s", s.cfg.ClusterSyncListen))
480 }
481 if s.cfg.RelayGroupEnabled {
482 env = append(env, fmt.Sprintf("ORLY_GRPC_SYNC_RELAYGROUP=%s", s.cfg.RelayGroupListen))
483 }
484 if s.cfg.NegentropyEnabled {
485 env = append(env, "ORLY_NEGENTROPY_ENABLED=true")
486 env = append(env, fmt.Sprintf("ORLY_GRPC_SYNC_NEGENTROPY=%s", s.cfg.NegentropyListen))
487 }
488
489 cmd := exec.CommandContext(s.ctx, s.cfg.RelayBinary)
490 cmd.Env = env
491 cmd.Stdout = os.Stdout
492 cmd.Stderr = os.Stderr
493
494 if err := cmd.Start(); chk.E(err) {
495 return err
496 }
497
498 exited := make(chan struct{})
499 s.relayProc = &Process{
500 name: "orly",
501 cmd: cmd,
502 exited: exited,
503 }
504
505 // Start a goroutine to wait for the process and close the exited channel
506 go func() {
507 cmd.Wait()
508 close(exited)
509 }()
510
511 log.I.F("started relay server (pid %d)", cmd.Process.Pid)
512 return nil
513 }
514
515 func (s *Supervisor) waitForDBReady(timeout time.Duration) error {
516 deadline := time.Now().Add(timeout)
517 ticker := time.NewTicker(250 * time.Millisecond)
518 defer ticker.Stop()
519
520 for {
521 select {
522 case <-s.ctx.Done():
523 return s.ctx.Err()
524 case <-ticker.C:
525 if time.Now().After(deadline) {
526 return fmt.Errorf("timeout waiting for database")
527 }
528
529 // Try to connect to the gRPC port
530 conn, err := net.DialTimeout("tcp", s.cfg.DBListen, time.Second)
531 if err == nil {
532 conn.Close()
533 return nil // Database is accepting connections
534 }
535 }
536 }
537 }
538
539 func (s *Supervisor) stopDB() {
540 s.stopProcess(s.dbProc, s.cfg.StopTimeout)
541 }
542
543 func (s *Supervisor) stopProcess(p *Process, timeout time.Duration) {
544 if p == nil {
545 return
546 }
547
548 p.mu.Lock()
549 if p.cmd == nil || p.cmd.Process == nil {
550 p.mu.Unlock()
551 return
552 }
553 p.mu.Unlock()
554
555 // Send SIGTERM for graceful shutdown
556 if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil {
557 // Process may have already exited
558 log.D.F("%s already exited: %v", p.name, err)
559 return
560 }
561
562 // Wait for process to exit using the exited channel
563 select {
564 case <-p.exited:
565 log.I.F("%s stopped gracefully", p.name)
566 case <-time.After(timeout):
567 log.W.F("%s did not stop in time, killing", p.name)
568 p.cmd.Process.Kill()
569 <-p.exited // Wait for the kill to complete
570 }
571 }
572
573 func (s *Supervisor) monitorProcess(p *Process, procType string, restart func() error) {
574 defer s.wg.Done()
575
576 for {
577 // Check if we're shutting down
578 s.mu.Lock()
579 closed := s.closed
580 s.mu.Unlock()
581
582 if closed {
583 return
584 }
585
586 select {
587 case <-s.ctx.Done():
588 return
589 default:
590 }
591
592 if p == nil || p.exited == nil {
593 return
594 }
595
596 // Wait for process to exit
597 select {
598 case <-p.exited:
599 // Process exited
600 case <-s.ctx.Done():
601 return
602 }
603
604 // Check again if we're shutting down (process may have been stopped intentionally)
605 s.mu.Lock()
606 closed = s.closed
607 s.mu.Unlock()
608
609 if closed {
610 return
611 }
612
613 // Process exited unexpectedly
614 p.restarts++
615 log.W.F("%s exited unexpectedly, restart count: %d", p.name, p.restarts)
616
617 // Backoff before restart
618 backoff := time.Duration(p.restarts) * time.Second
619 if backoff > 30*time.Second {
620 backoff = 30 * time.Second
621 }
622
623 select {
624 case <-s.ctx.Done():
625 return
626 case <-time.After(backoff):
627 }
628
629 // Check one more time before restarting
630 s.mu.Lock()
631 closed = s.closed
632 s.mu.Unlock()
633
634 if closed {
635 return
636 }
637
638 if err := restart(); err != nil {
639 log.E.F("failed to restart %s: %v", p.name, err)
640 } else {
641 // Update p to point to the new process
642 s.mu.Lock()
643 switch procType {
644 case "db":
645 p = s.dbProc
646 case "acl":
647 p = s.aclProc
648 case "distributed-sync":
649 p = s.distributedSyncProc
650 case "cluster-sync":
651 p = s.clusterSyncProc
652 case "relaygroup":
653 p = s.relayGroupProc
654 case "negentropy":
655 p = s.negentropyProc
656 case "certs":
657 p = s.certsProc
658 case "nits":
659 p = s.nitsProc
660 case "luk":
661 p = s.lukProc
662 case "strela":
663 p = s.strelaProc
664 default:
665 p = s.relayProc
666 }
667 s.mu.Unlock()
668 }
669 }
670 }
671
672 // startSyncServices starts all enabled sync services in parallel.
673 func (s *Supervisor) startSyncServices() error {
674 var wg sync.WaitGroup
675 var mu sync.Mutex
676 var errs []error
677
678 // Start each enabled service in a goroutine
679 if s.cfg.DistributedSyncEnabled {
680 wg.Add(1)
681 go func() {
682 defer wg.Done()
683 if err := s.startDistributedSync(); err != nil {
684 mu.Lock()
685 errs = append(errs, fmt.Errorf("distributed sync: %w", err))
686 mu.Unlock()
687 return
688 }
689 if err := s.waitForServiceReady(s.cfg.DistributedSyncListen, s.cfg.SyncReadyTimeout); err != nil {
690 mu.Lock()
691 errs = append(errs, fmt.Errorf("distributed sync not ready: %w", err))
692 mu.Unlock()
693 return
694 }
695 log.I.F("distributed sync service is ready")
696 }()
697 }
698
699 if s.cfg.ClusterSyncEnabled {
700 wg.Add(1)
701 go func() {
702 defer wg.Done()
703 if err := s.startClusterSync(); err != nil {
704 mu.Lock()
705 errs = append(errs, fmt.Errorf("cluster sync: %w", err))
706 mu.Unlock()
707 return
708 }
709 if err := s.waitForServiceReady(s.cfg.ClusterSyncListen, s.cfg.SyncReadyTimeout); err != nil {
710 mu.Lock()
711 errs = append(errs, fmt.Errorf("cluster sync not ready: %w", err))
712 mu.Unlock()
713 return
714 }
715 log.I.F("cluster sync service is ready")
716 }()
717 }
718
719 if s.cfg.RelayGroupEnabled {
720 wg.Add(1)
721 go func() {
722 defer wg.Done()
723 if err := s.startRelayGroup(); err != nil {
724 mu.Lock()
725 errs = append(errs, fmt.Errorf("relaygroup: %w", err))
726 mu.Unlock()
727 return
728 }
729 if err := s.waitForServiceReady(s.cfg.RelayGroupListen, s.cfg.SyncReadyTimeout); err != nil {
730 mu.Lock()
731 errs = append(errs, fmt.Errorf("relaygroup not ready: %w", err))
732 mu.Unlock()
733 return
734 }
735 log.I.F("relaygroup service is ready")
736 }()
737 }
738
739 if s.cfg.NegentropyEnabled {
740 wg.Add(1)
741 go func() {
742 defer wg.Done()
743 if err := s.startNegentropy(); err != nil {
744 mu.Lock()
745 errs = append(errs, fmt.Errorf("negentropy: %w", err))
746 mu.Unlock()
747 return
748 }
749 if err := s.waitForServiceReady(s.cfg.NegentropyListen, s.cfg.SyncReadyTimeout); err != nil {
750 mu.Lock()
751 errs = append(errs, fmt.Errorf("negentropy not ready: %w", err))
752 mu.Unlock()
753 return
754 }
755 log.I.F("negentropy service is ready")
756 }()
757 }
758
759 wg.Wait()
760
761 if len(errs) > 0 {
762 return errs[0]
763 }
764 return nil
765 }
766
767 // stopSyncServices stops all sync services.
768 func (s *Supervisor) stopSyncServices() {
769 // Stop all in parallel using a WaitGroup
770 var wg sync.WaitGroup
771
772 if s.distributedSyncProc != nil {
773 wg.Add(1)
774 go func() {
775 defer wg.Done()
776 s.stopProcess(s.distributedSyncProc, 5*time.Second)
777 }()
778 }
779
780 if s.clusterSyncProc != nil {
781 wg.Add(1)
782 go func() {
783 defer wg.Done()
784 s.stopProcess(s.clusterSyncProc, 5*time.Second)
785 }()
786 }
787
788 if s.relayGroupProc != nil {
789 wg.Add(1)
790 go func() {
791 defer wg.Done()
792 s.stopProcess(s.relayGroupProc, 5*time.Second)
793 }()
794 }
795
796 if s.negentropyProc != nil {
797 wg.Add(1)
798 go func() {
799 defer wg.Done()
800 s.stopProcess(s.negentropyProc, 5*time.Second)
801 }()
802 }
803
804 wg.Wait()
805 }
806
807 // waitForServiceReady waits for a gRPC service to be accepting connections.
808 func (s *Supervisor) waitForServiceReady(address string, timeout time.Duration) error {
809 deadline := time.Now().Add(timeout)
810 ticker := time.NewTicker(250 * time.Millisecond)
811 defer ticker.Stop()
812
813 for {
814 select {
815 case <-s.ctx.Done():
816 return s.ctx.Err()
817 case <-ticker.C:
818 if time.Now().After(deadline) {
819 return fmt.Errorf("timeout waiting for service at %s", address)
820 }
821
822 conn, err := net.DialTimeout("tcp", address, time.Second)
823 if err == nil {
824 conn.Close()
825 return nil
826 }
827 }
828 }
829 }
830
831 func (s *Supervisor) startDistributedSync() error {
832 s.mu.Lock()
833 defer s.mu.Unlock()
834
835 env := os.Environ()
836 env = append(env, fmt.Sprintf("ORLY_SYNC_DISTRIBUTED_LISTEN=%s", s.cfg.DistributedSyncListen))
837 env = append(env, "ORLY_SYNC_DISTRIBUTED_DB_TYPE=grpc")
838 env = append(env, fmt.Sprintf("ORLY_SYNC_DISTRIBUTED_DB_SERVER=%s", s.cfg.DBListen))
839 env = append(env, fmt.Sprintf("ORLY_SYNC_DISTRIBUTED_LOG_LEVEL=%s", s.cfg.LogLevel))
840
841 cmd := exec.CommandContext(s.ctx, s.cfg.DistributedSyncBinary)
842 cmd.Env = env
843 cmd.Stdout = os.Stdout
844 cmd.Stderr = os.Stderr
845
846 if err := cmd.Start(); chk.E(err) {
847 return err
848 }
849
850 exited := make(chan struct{})
851 s.distributedSyncProc = &Process{
852 name: "orly-sync-distributed",
853 cmd: cmd,
854 exited: exited,
855 }
856
857 go func() {
858 cmd.Wait()
859 close(exited)
860 }()
861
862 log.I.F("started distributed sync service (pid %d)", cmd.Process.Pid)
863 return nil
864 }
865
866 func (s *Supervisor) startClusterSync() error {
867 s.mu.Lock()
868 defer s.mu.Unlock()
869
870 env := os.Environ()
871 env = append(env, fmt.Sprintf("ORLY_SYNC_CLUSTER_LISTEN=%s", s.cfg.ClusterSyncListen))
872 env = append(env, "ORLY_SYNC_CLUSTER_DB_TYPE=grpc")
873 env = append(env, fmt.Sprintf("ORLY_SYNC_CLUSTER_DB_SERVER=%s", s.cfg.DBListen))
874 env = append(env, fmt.Sprintf("ORLY_SYNC_CLUSTER_LOG_LEVEL=%s", s.cfg.LogLevel))
875
876 cmd := exec.CommandContext(s.ctx, s.cfg.ClusterSyncBinary)
877 cmd.Env = env
878 cmd.Stdout = os.Stdout
879 cmd.Stderr = os.Stderr
880
881 if err := cmd.Start(); chk.E(err) {
882 return err
883 }
884
885 exited := make(chan struct{})
886 s.clusterSyncProc = &Process{
887 name: "orly-sync-cluster",
888 cmd: cmd,
889 exited: exited,
890 }
891
892 go func() {
893 cmd.Wait()
894 close(exited)
895 }()
896
897 log.I.F("started cluster sync service (pid %d)", cmd.Process.Pid)
898 return nil
899 }
900
901 func (s *Supervisor) startRelayGroup() error {
902 s.mu.Lock()
903 defer s.mu.Unlock()
904
905 env := os.Environ()
906 env = append(env, fmt.Sprintf("ORLY_SYNC_RELAYGROUP_LISTEN=%s", s.cfg.RelayGroupListen))
907 env = append(env, "ORLY_SYNC_RELAYGROUP_DB_TYPE=grpc")
908 env = append(env, fmt.Sprintf("ORLY_SYNC_RELAYGROUP_DB_SERVER=%s", s.cfg.DBListen))
909 env = append(env, fmt.Sprintf("ORLY_SYNC_RELAYGROUP_LOG_LEVEL=%s", s.cfg.LogLevel))
910
911 cmd := exec.CommandContext(s.ctx, s.cfg.RelayGroupBinary)
912 cmd.Env = env
913 cmd.Stdout = os.Stdout
914 cmd.Stderr = os.Stderr
915
916 if err := cmd.Start(); chk.E(err) {
917 return err
918 }
919
920 exited := make(chan struct{})
921 s.relayGroupProc = &Process{
922 name: "orly-sync-relaygroup",
923 cmd: cmd,
924 exited: exited,
925 }
926
927 go func() {
928 cmd.Wait()
929 close(exited)
930 }()
931
932 log.I.F("started relaygroup service (pid %d)", cmd.Process.Pid)
933 return nil
934 }
935
936 func (s *Supervisor) startNegentropy() error {
937 s.mu.Lock()
938 defer s.mu.Unlock()
939
940 env := os.Environ()
941 env = append(env, fmt.Sprintf("ORLY_SYNC_NEGENTROPY_LISTEN=%s", s.cfg.NegentropyListen))
942 env = append(env, "ORLY_SYNC_NEGENTROPY_DB_TYPE=grpc")
943 env = append(env, fmt.Sprintf("ORLY_SYNC_NEGENTROPY_DB_SERVER=%s", s.cfg.DBListen))
944 env = append(env, fmt.Sprintf("ORLY_SYNC_NEGENTROPY_LOG_LEVEL=%s", s.cfg.LogLevel))
945
946 cmd := exec.CommandContext(s.ctx, s.cfg.NegentropyBinary)
947 cmd.Env = env
948 cmd.Stdout = os.Stdout
949 cmd.Stderr = os.Stderr
950
951 if err := cmd.Start(); chk.E(err) {
952 return err
953 }
954
955 exited := make(chan struct{})
956 s.negentropyProc = &Process{
957 name: "orly-sync-negentropy",
958 cmd: cmd,
959 exited: exited,
960 }
961
962 go func() {
963 cmd.Wait()
964 close(exited)
965 }()
966
967 log.I.F("started negentropy service (pid %d)", cmd.Process.Pid)
968 return nil
969 }
970
971 func (s *Supervisor) startCerts() error {
972 s.mu.Lock()
973 defer s.mu.Unlock()
974
975 // Certificate service uses its own environment variables
976 // ORLY_CERTS_DOMAIN, ORLY_CERTS_EMAIL, ORLY_CERTS_DNS_PROVIDER, etc.
977 env := os.Environ()
978 env = append(env, fmt.Sprintf("ORLY_CERTS_LOG_LEVEL=%s", s.cfg.LogLevel))
979
980 cmd := exec.CommandContext(s.ctx, s.cfg.CertsBinary)
981 cmd.Env = env
982 cmd.Stdout = os.Stdout
983 cmd.Stderr = os.Stderr
984
985 if err := cmd.Start(); chk.E(err) {
986 return err
987 }
988
989 exited := make(chan struct{})
990 s.certsProc = &Process{
991 name: "orly-certs",
992 cmd: cmd,
993 exited: exited,
994 }
995
996 go func() {
997 cmd.Wait()
998 close(exited)
999 }()
1000
1001 log.I.F("started certificate service (pid %d)", cmd.Process.Pid)
1002 return nil
1003 }
1004
1005 func (s *Supervisor) startNits() error {
1006 s.mu.Lock()
1007 defer s.mu.Unlock()
1008
1009 env := os.Environ()
1010 env = append(env, fmt.Sprintf("ORLY_NITS_LISTEN=%s", s.cfg.NitsListen))
1011 env = append(env, fmt.Sprintf("ORLY_NITS_BITCOIND=%s", s.cfg.NitsBinary))
1012 env = append(env, fmt.Sprintf("ORLY_NITS_RPC_PORT=%d", s.cfg.NitsRPCPort))
1013 env = append(env, fmt.Sprintf("ORLY_NITS_DATA_DIR=%s", s.cfg.NitsDataDir))
1014 env = append(env, fmt.Sprintf("ORLY_NITS_PRUNE_MB=%d", s.cfg.NitsPruneMB))
1015 env = append(env, fmt.Sprintf("ORLY_NITS_NETWORK=%s", s.cfg.NitsNetwork))
1016 env = append(env, fmt.Sprintf("ORLY_NITS_LOG_LEVEL=%s", s.cfg.LogLevel))
1017
1018 cmd := exec.CommandContext(s.ctx, s.cfg.NitsShimBinary)
1019 cmd.Env = env
1020 cmd.Stdout = os.Stdout
1021 cmd.Stderr = os.Stderr
1022
1023 if err := cmd.Start(); chk.E(err) {
1024 return err
1025 }
1026
1027 exited := make(chan struct{})
1028 s.nitsProc = &Process{
1029 name: "orly-nits",
1030 cmd: cmd,
1031 exited: exited,
1032 }
1033
1034 go func() {
1035 cmd.Wait()
1036 close(exited)
1037 }()
1038
1039 log.I.F("started bitcoin node shim (pid %d)", cmd.Process.Pid)
1040 return nil
1041 }
1042
1043 func (s *Supervisor) waitForNitsReady(timeout time.Duration) error {
1044 deadline := time.Now().Add(timeout)
1045 ticker := time.NewTicker(2 * time.Second)
1046 defer ticker.Stop()
1047
1048 var grpcConn *grpc.ClientConn
1049 var nitsClient orlynitsv1.NitsServiceClient
1050
1051 for {
1052 select {
1053 case <-s.ctx.Done():
1054 if grpcConn != nil {
1055 grpcConn.Close()
1056 }
1057 return s.ctx.Err()
1058 case <-ticker.C:
1059 if time.Now().After(deadline) {
1060 if grpcConn != nil {
1061 grpcConn.Close()
1062 }
1063 return fmt.Errorf("timeout waiting for bitcoin node")
1064 }
1065
1066 // Check TCP first
1067 conn, err := net.DialTimeout("tcp", s.cfg.NitsListen, time.Second)
1068 if err != nil {
1069 continue
1070 }
1071 conn.Close()
1072
1073 // Then check gRPC Ready()
1074 if grpcConn == nil {
1075 grpcConn, err = grpc.DialContext(s.ctx, s.cfg.NitsListen,
1076 grpc.WithTransportCredentials(insecure.NewCredentials()),
1077 )
1078 if err != nil {
1079 continue
1080 }
1081 nitsClient = orlynitsv1.NewNitsServiceClient(grpcConn)
1082 }
1083
1084 ctx, cancel := context.WithTimeout(s.ctx, 2*time.Second)
1085 resp, err := nitsClient.Ready(ctx, &orlynitsv1.Empty{})
1086 cancel()
1087 if err == nil && resp.Ready {
1088 grpcConn.Close()
1089 return nil
1090 }
1091 // Bitcoind responsive but still syncing is fine for luk to start
1092 if err == nil && resp.Status == "syncing" {
1093 grpcConn.Close()
1094 log.I.F("bitcoin node responsive (syncing %d%%), proceeding", resp.SyncProgressPercent)
1095 return nil
1096 }
1097 }
1098 }
1099 }
1100
1101 func (s *Supervisor) startLuk() error {
1102 s.mu.Lock()
1103 defer s.mu.Unlock()
1104
1105 // luk (LND fork) uses command-line flags
1106 args := []string{
1107 fmt.Sprintf("--lnddir=%s", s.cfg.LukDataDir),
1108 fmt.Sprintf("--rpclisten=%s", s.cfg.LukRPCListen),
1109 fmt.Sprintf("--listen=%s", s.cfg.LukPeerListen),
1110 fmt.Sprintf("--bitcoind.rpchost=127.0.0.1:%d", s.cfg.NitsRPCPort),
1111 }
1112
1113 // Set network flag if not mainnet
1114 switch s.cfg.NitsNetwork {
1115 case "testnet":
1116 args = append(args, "--bitcoin.testnet")
1117 case "signet":
1118 args = append(args, "--bitcoin.signet")
1119 case "regtest":
1120 args = append(args, "--bitcoin.regtest")
1121 }
1122
1123 cmd := exec.CommandContext(s.ctx, s.cfg.LukBinary, args...)
1124 cmd.Env = os.Environ()
1125 cmd.Stdout = os.Stdout
1126 cmd.Stderr = os.Stderr
1127
1128 if err := cmd.Start(); chk.E(err) {
1129 return err
1130 }
1131
1132 exited := make(chan struct{})
1133 s.lukProc = &Process{
1134 name: "luk",
1135 cmd: cmd,
1136 exited: exited,
1137 }
1138
1139 go func() {
1140 cmd.Wait()
1141 close(exited)
1142 }()
1143
1144 log.I.F("started lightning node (pid %d)", cmd.Process.Pid)
1145 return nil
1146 }
1147
1148 func (s *Supervisor) startStrela() error {
1149 s.mu.Lock()
1150 defer s.mu.Unlock()
1151
1152 env := os.Environ()
1153 env = append(env, "LN_BACKEND_TYPE=LND")
1154 env = append(env, fmt.Sprintf("LND_ADDRESS=%s", s.cfg.LukRPCListen))
1155 env = append(env, fmt.Sprintf("LND_CERT_FILE=%s/tls.cert", s.cfg.LukDataDir))
1156 env = append(env, fmt.Sprintf("LND_MACAROON_FILE=%s/data/chain/bitcoin/mainnet/admin.macaroon", s.cfg.LukDataDir))
1157 env = append(env, fmt.Sprintf("PORT=%d", s.cfg.StrelaPort))
1158 env = append(env, fmt.Sprintf("WORK_DIR=%s", s.cfg.StrelaDataDir))
1159 env = append(env, fmt.Sprintf("LOG_LEVEL=%s", s.cfg.LogLevel))
1160
1161 cmd := exec.CommandContext(s.ctx, s.cfg.StrelaBinary)
1162 cmd.Env = env
1163 cmd.Stdout = os.Stdout
1164 cmd.Stderr = os.Stderr
1165
1166 if err := cmd.Start(); chk.E(err) {
1167 return err
1168 }
1169
1170 exited := make(chan struct{})
1171 s.strelaProc = &Process{
1172 name: "strela",
1173 cmd: cmd,
1174 exited: exited,
1175 }
1176
1177 go func() {
1178 cmd.Wait()
1179 close(exited)
1180 }()
1181
1182 log.I.F("started strela wallet (pid %d)", cmd.Process.Pid)
1183 return nil
1184 }
1185
1186 // GetProcessStatuses returns the status of all available modules with categories.
1187 // Modules are grouped by category, and some categories are mutually exclusive.
1188 func (s *Supervisor) GetProcessStatuses() []ProcessStatus {
1189 s.mu.Lock()
1190 defer s.mu.Unlock()
1191
1192 var statuses []ProcessStatus
1193
1194 // Database backends (mutually exclusive - only one can be active)
1195 isBadger := s.cfg.DBBackend == "badger"
1196 isNeo4j := s.cfg.DBBackend == "neo4j"
1197
1198 statuses = append(statuses, s.getProcessStatusFull(
1199 s.dbProc, "orly-db-badger", "orly-db-badger",
1200 isBadger, "database", "Badger embedded key-value store (default)",
1201 ))
1202 statuses = append(statuses, s.getProcessStatusFull(
1203 nil, "orly-db-neo4j", "orly-db-neo4j",
1204 isNeo4j, "database", "Neo4j graph database for WoT queries",
1205 ))
1206
1207 // ACL backends (mutually exclusive - only one can be active, can be disabled)
1208 isFollows := s.cfg.ACLEnabled && s.cfg.ACLMode == "follows"
1209 isManaged := s.cfg.ACLEnabled && s.cfg.ACLMode == "managed"
1210 isCuration := s.cfg.ACLEnabled && (s.cfg.ACLMode == "curation" || s.cfg.ACLMode == "curating")
1211
1212 statuses = append(statuses, s.getProcessStatusFull(
1213 s.aclProc, "orly-acl-follows", "orly-acl-follows",
1214 isFollows, "acl", "Whitelist based on admin's follow list",
1215 ))
1216 statuses = append(statuses, s.getProcessStatusFull(
1217 nil, "orly-acl-managed", "orly-acl-managed",
1218 isManaged, "acl", "NIP-86 fine-grained access control",
1219 ))
1220 statuses = append(statuses, s.getProcessStatusFull(
1221 nil, "orly-acl-curation", "orly-acl-curation",
1222 isCuration, "acl", "Rate-limited trust tiers for curation",
1223 ))
1224
1225 // Sync services (independent - multiple can be enabled)
1226 statuses = append(statuses, s.getProcessStatusFull(
1227 s.distributedSyncProc, "orly-sync-distributed", s.cfg.DistributedSyncBinary,
1228 s.cfg.DistributedSyncEnabled, "sync", "Distributed event synchronization",
1229 ))
1230 statuses = append(statuses, s.getProcessStatusFull(
1231 s.clusterSyncProc, "orly-sync-cluster", s.cfg.ClusterSyncBinary,
1232 s.cfg.ClusterSyncEnabled, "sync", "Cluster synchronization for HA",
1233 ))
1234 statuses = append(statuses, s.getProcessStatusFull(
1235 s.relayGroupProc, "orly-sync-relaygroup", s.cfg.RelayGroupBinary,
1236 s.cfg.RelayGroupEnabled, "sync", "NIP-29 relay group synchronization",
1237 ))
1238 statuses = append(statuses, s.getProcessStatusFull(
1239 s.negentropyProc, "orly-sync-negentropy", s.cfg.NegentropyBinary,
1240 s.cfg.NegentropyEnabled, "sync", "NIP-77 negentropy reconciliation",
1241 ))
1242
1243 // Certificate service (standalone)
1244 statuses = append(statuses, s.getProcessStatusFull(
1245 s.certsProc, "orly-certs", s.cfg.CertsBinary,
1246 s.cfg.CertsEnabled, "certs", "Let's Encrypt certificate management",
1247 ))
1248
1249 // Bitcoin node (nits)
1250 statuses = append(statuses, s.getProcessStatusFull(
1251 s.nitsProc, "orly-nits", s.cfg.NitsShimBinary,
1252 s.cfg.NitsEnabled, "bitcoin", "Bitcoin node manager (bitcoind)",
1253 ))
1254
1255 // Lightning node (luk)
1256 statuses = append(statuses, s.getProcessStatusFull(
1257 s.lukProc, "luk", s.cfg.LukBinary,
1258 s.cfg.LukEnabled, "lightning", "Lightning network node (LND fork)",
1259 ))
1260
1261 // Wallet (strela)
1262 statuses = append(statuses, s.getProcessStatusFull(
1263 s.strelaProc, "strela", s.cfg.StrelaBinary,
1264 s.cfg.StrelaEnabled, "wallet", "Lightning wallet web UI",
1265 ))
1266
1267 // Relay process - always enabled
1268 statuses = append(statuses, s.getProcessStatusFull(
1269 s.relayProc, "orly", s.cfg.RelayBinary,
1270 true, "relay", "Main Nostr relay server",
1271 ))
1272
1273 return statuses
1274 }
1275
1276 func (s *Supervisor) getProcessStatus(p *Process, binaryPath string, enabled bool) ProcessStatus {
1277 status := "stopped"
1278 pid := 0
1279
1280 p.mu.Lock()
1281 defer p.mu.Unlock()
1282
1283 if p.cmd != nil && p.cmd.Process != nil {
1284 // Check if process is still running
1285 select {
1286 case <-p.exited:
1287 status = "stopped"
1288 default:
1289 status = "running"
1290 pid = p.cmd.Process.Pid
1291 }
1292 }
1293
1294 return ProcessStatus{
1295 Name: p.name,
1296 Binary: binaryPath,
1297 Version: "", // Will be filled by caller if needed
1298 Status: status,
1299 Enabled: enabled,
1300 PID: pid,
1301 Restarts: p.restarts,
1302 }
1303 }
1304
1305 // getProcessStatusOrDisabled returns the status of a process, or a disabled status if the process is nil.
1306 func (s *Supervisor) getProcessStatusOrDisabled(p *Process, name, binaryPath string, enabled bool) ProcessStatus {
1307 if p != nil {
1308 return s.getProcessStatus(p, binaryPath, enabled)
1309 }
1310
1311 // Process doesn't exist - show as disabled or stopped
1312 status := "stopped"
1313 if !enabled {
1314 status = "disabled"
1315 }
1316
1317 return ProcessStatus{
1318 Name: name,
1319 Binary: binaryPath,
1320 Status: status,
1321 Enabled: enabled,
1322 }
1323 }
1324
1325 // getProcessStatusFull returns the status of a process with category and description.
1326 func (s *Supervisor) getProcessStatusFull(p *Process, name, binaryPath string, enabled bool, category, description string) ProcessStatus {
1327 status := "disabled"
1328 pid := 0
1329 restarts := 0
1330
1331 if enabled {
1332 status = "stopped"
1333 }
1334
1335 if p != nil {
1336 p.mu.Lock()
1337 defer p.mu.Unlock()
1338
1339 if p.cmd != nil && p.cmd.Process != nil {
1340 select {
1341 case <-p.exited:
1342 status = "stopped"
1343 default:
1344 status = "running"
1345 pid = p.cmd.Process.Pid
1346 }
1347 }
1348 restarts = p.restarts
1349 }
1350
1351 return ProcessStatus{
1352 Name: name,
1353 Binary: binaryPath,
1354 Status: status,
1355 Enabled: enabled,
1356 Category: category,
1357 Description: description,
1358 PID: pid,
1359 Restarts: restarts,
1360 }
1361 }
1362
1363 // RestartService restarts a specific service with dependency handling.
1364 // If a service's dependencies need to restart, they are handled appropriately.
1365 // Returns the list of services that were restarted.
1366 func (s *Supervisor) RestartService(serviceName string) ([]string, error) {
1367 log.I.F("restarting service: %s", serviceName)
1368
1369 var restarted []string
1370
1371 // Determine which services need to restart based on dependencies
1372 // db → acl, sync services, relay all depend on db
1373 // acl → relay depends on acl
1374 // sync services → relay may depend on sync services
1375 // relay → nothing depends on relay
1376
1377 switch serviceName {
1378 case "orly-db", "db":
1379 // Restart db and all dependent services
1380 // First stop in reverse order: relay, sync, acl, db
1381 s.stopProcess(s.relayProc, 5*time.Second)
1382 s.stopSyncServices()
1383 if s.cfg.ACLEnabled && s.aclProc != nil {
1384 s.stopProcess(s.aclProc, 5*time.Second)
1385 }
1386 s.stopProcess(s.dbProc, s.cfg.StopTimeout)
1387
1388 time.Sleep(500 * time.Millisecond)
1389
1390 // Start in dependency order
1391 if err := s.startDB(); err != nil {
1392 return restarted, fmt.Errorf("failed to restart db: %w", err)
1393 }
1394 restarted = append(restarted, "orly-db")
1395
1396 if err := s.waitForDBReady(s.cfg.DBReadyTimeout); err != nil {
1397 return restarted, fmt.Errorf("db not ready: %w", err)
1398 }
1399
1400 if s.cfg.ACLEnabled {
1401 if err := s.startACL(); err != nil {
1402 return restarted, fmt.Errorf("failed to restart acl: %w", err)
1403 }
1404 restarted = append(restarted, "orly-acl")
1405 if err := s.waitForACLReady(s.cfg.ACLReadyTimeout); err != nil {
1406 return restarted, fmt.Errorf("acl not ready: %w", err)
1407 }
1408 }
1409
1410 if err := s.startSyncServices(); err != nil {
1411 return restarted, fmt.Errorf("failed to restart sync services: %w", err)
1412 }
1413 if s.cfg.DistributedSyncEnabled {
1414 restarted = append(restarted, "orly-sync-distributed")
1415 }
1416 if s.cfg.ClusterSyncEnabled {
1417 restarted = append(restarted, "orly-sync-cluster")
1418 }
1419 if s.cfg.RelayGroupEnabled {
1420 restarted = append(restarted, "orly-sync-relaygroup")
1421 }
1422 if s.cfg.NegentropyEnabled {
1423 restarted = append(restarted, "orly-sync-negentropy")
1424 }
1425
1426 if err := s.startRelay(); err != nil {
1427 return restarted, fmt.Errorf("failed to restart relay: %w", err)
1428 }
1429 restarted = append(restarted, "orly")
1430
1431 case "orly-acl", "acl":
1432 if !s.cfg.ACLEnabled {
1433 return restarted, fmt.Errorf("ACL is not enabled")
1434 }
1435 // Restart acl and relay (relay depends on acl)
1436 s.stopProcess(s.relayProc, 5*time.Second)
1437 s.stopProcess(s.aclProc, 5*time.Second)
1438
1439 time.Sleep(500 * time.Millisecond)
1440
1441 if err := s.startACL(); err != nil {
1442 return restarted, fmt.Errorf("failed to restart acl: %w", err)
1443 }
1444 restarted = append(restarted, "orly-acl")
1445
1446 if err := s.waitForACLReady(s.cfg.ACLReadyTimeout); err != nil {
1447 return restarted, fmt.Errorf("acl not ready: %w", err)
1448 }
1449
1450 if err := s.startRelay(); err != nil {
1451 return restarted, fmt.Errorf("failed to restart relay: %w", err)
1452 }
1453 restarted = append(restarted, "orly")
1454
1455 case "orly-sync-distributed", "distributed-sync":
1456 if !s.cfg.DistributedSyncEnabled {
1457 return restarted, fmt.Errorf("distributed sync is not enabled")
1458 }
1459 s.stopProcess(s.distributedSyncProc, 5*time.Second)
1460 time.Sleep(500 * time.Millisecond)
1461 if err := s.startDistributedSync(); err != nil {
1462 return restarted, fmt.Errorf("failed to restart distributed sync: %w", err)
1463 }
1464 restarted = append(restarted, "orly-sync-distributed")
1465
1466 case "orly-sync-cluster", "cluster-sync":
1467 if !s.cfg.ClusterSyncEnabled {
1468 return restarted, fmt.Errorf("cluster sync is not enabled")
1469 }
1470 s.stopProcess(s.clusterSyncProc, 5*time.Second)
1471 time.Sleep(500 * time.Millisecond)
1472 if err := s.startClusterSync(); err != nil {
1473 return restarted, fmt.Errorf("failed to restart cluster sync: %w", err)
1474 }
1475 restarted = append(restarted, "orly-sync-cluster")
1476
1477 case "orly-sync-relaygroup", "relaygroup":
1478 if !s.cfg.RelayGroupEnabled {
1479 return restarted, fmt.Errorf("relaygroup is not enabled")
1480 }
1481 s.stopProcess(s.relayGroupProc, 5*time.Second)
1482 time.Sleep(500 * time.Millisecond)
1483 if err := s.startRelayGroup(); err != nil {
1484 return restarted, fmt.Errorf("failed to restart relaygroup: %w", err)
1485 }
1486 restarted = append(restarted, "orly-sync-relaygroup")
1487
1488 case "orly-sync-negentropy", "negentropy":
1489 if !s.cfg.NegentropyEnabled {
1490 return restarted, fmt.Errorf("negentropy is not enabled")
1491 }
1492 s.stopProcess(s.negentropyProc, 5*time.Second)
1493 time.Sleep(500 * time.Millisecond)
1494 if err := s.startNegentropy(); err != nil {
1495 return restarted, fmt.Errorf("failed to restart negentropy: %w", err)
1496 }
1497 restarted = append(restarted, "orly-sync-negentropy")
1498
1499 case "orly-certs", "certs":
1500 if !s.cfg.CertsEnabled {
1501 return restarted, fmt.Errorf("certificate service is not enabled")
1502 }
1503 s.stopProcess(s.certsProc, 5*time.Second)
1504 time.Sleep(500 * time.Millisecond)
1505 if err := s.startCerts(); err != nil {
1506 return restarted, fmt.Errorf("failed to restart certificate service: %w", err)
1507 }
1508 restarted = append(restarted, "orly-certs")
1509
1510 case "orly", "relay":
1511 // Just restart relay
1512 s.stopProcess(s.relayProc, 5*time.Second)
1513 time.Sleep(500 * time.Millisecond)
1514 if err := s.startRelay(); err != nil {
1515 return restarted, fmt.Errorf("failed to restart relay: %w", err)
1516 }
1517 restarted = append(restarted, "orly")
1518
1519 case "orly-nits", "nits":
1520 if !s.cfg.NitsEnabled {
1521 return restarted, fmt.Errorf("nits is not enabled")
1522 }
1523 // nits restart cascades to luk → strela
1524 if s.cfg.StrelaEnabled && s.strelaProc != nil {
1525 s.stopProcess(s.strelaProc, 5*time.Second)
1526 }
1527 if s.cfg.LukEnabled && s.lukProc != nil {
1528 s.stopProcess(s.lukProc, 10*time.Second)
1529 }
1530 s.stopProcess(s.nitsProc, 30*time.Second)
1531 time.Sleep(500 * time.Millisecond)
1532
1533 if err := s.startNits(); err != nil {
1534 return restarted, fmt.Errorf("failed to restart nits: %w", err)
1535 }
1536 restarted = append(restarted, "orly-nits")
1537
1538 if err := s.waitForNitsReady(s.cfg.NitsReadyTimeout); err != nil {
1539 return restarted, fmt.Errorf("nits not ready: %w", err)
1540 }
1541
1542 if s.cfg.LukEnabled {
1543 if err := s.startLuk(); err != nil {
1544 return restarted, fmt.Errorf("failed to restart luk: %w", err)
1545 }
1546 restarted = append(restarted, "luk")
1547 if err := s.waitForServiceReady(s.cfg.LukRPCListen, s.cfg.LukReadyTimeout); err != nil {
1548 return restarted, fmt.Errorf("luk not ready: %w", err)
1549 }
1550 }
1551
1552 if s.cfg.StrelaEnabled {
1553 if err := s.startStrela(); err != nil {
1554 return restarted, fmt.Errorf("failed to restart strela: %w", err)
1555 }
1556 restarted = append(restarted, "strela")
1557 }
1558
1559 case "luk", "lightning":
1560 if !s.cfg.LukEnabled {
1561 return restarted, fmt.Errorf("luk is not enabled")
1562 }
1563 // luk restart cascades to strela
1564 if s.cfg.StrelaEnabled && s.strelaProc != nil {
1565 s.stopProcess(s.strelaProc, 5*time.Second)
1566 }
1567 s.stopProcess(s.lukProc, 10*time.Second)
1568 time.Sleep(500 * time.Millisecond)
1569
1570 if err := s.startLuk(); err != nil {
1571 return restarted, fmt.Errorf("failed to restart luk: %w", err)
1572 }
1573 restarted = append(restarted, "luk")
1574
1575 if err := s.waitForServiceReady(s.cfg.LukRPCListen, s.cfg.LukReadyTimeout); err != nil {
1576 return restarted, fmt.Errorf("luk not ready: %w", err)
1577 }
1578
1579 if s.cfg.StrelaEnabled {
1580 if err := s.startStrela(); err != nil {
1581 return restarted, fmt.Errorf("failed to restart strela: %w", err)
1582 }
1583 restarted = append(restarted, "strela")
1584 }
1585
1586 case "strela", "wallet":
1587 if !s.cfg.StrelaEnabled {
1588 return restarted, fmt.Errorf("strela is not enabled")
1589 }
1590 // strela is a leaf node, no cascade
1591 s.stopProcess(s.strelaProc, 5*time.Second)
1592 time.Sleep(500 * time.Millisecond)
1593 if err := s.startStrela(); err != nil {
1594 return restarted, fmt.Errorf("failed to restart strela: %w", err)
1595 }
1596 restarted = append(restarted, "strela")
1597
1598 default:
1599 return restarted, fmt.Errorf("unknown service: %s", serviceName)
1600 }
1601
1602 log.I.F("restarted services: %v", restarted)
1603 return restarted, nil
1604 }
1605
1606 // StartService starts a specific service if it's not already running.
1607 func (s *Supervisor) StartService(serviceName string) error {
1608 log.I.F("starting service: %s", serviceName)
1609
1610 switch serviceName {
1611 case "orly-db", "db":
1612 if err := s.startDB(); err != nil {
1613 return fmt.Errorf("failed to start db: %w", err)
1614 }
1615 if err := s.waitForDBReady(s.cfg.DBReadyTimeout); err != nil {
1616 return fmt.Errorf("db not ready: %w", err)
1617 }
1618
1619 case "orly-acl", "acl":
1620 if !s.cfg.ACLEnabled {
1621 return fmt.Errorf("ACL is not enabled in configuration")
1622 }
1623 if err := s.startACL(); err != nil {
1624 return fmt.Errorf("failed to start acl: %w", err)
1625 }
1626 if err := s.waitForACLReady(s.cfg.ACLReadyTimeout); err != nil {
1627 return fmt.Errorf("acl not ready: %w", err)
1628 }
1629
1630 case "orly-sync-distributed", "distributed-sync":
1631 if !s.cfg.DistributedSyncEnabled {
1632 return fmt.Errorf("distributed sync is not enabled in configuration")
1633 }
1634 if err := s.startDistributedSync(); err != nil {
1635 return fmt.Errorf("failed to start distributed sync: %w", err)
1636 }
1637
1638 case "orly-sync-cluster", "cluster-sync":
1639 if !s.cfg.ClusterSyncEnabled {
1640 return fmt.Errorf("cluster sync is not enabled in configuration")
1641 }
1642 if err := s.startClusterSync(); err != nil {
1643 return fmt.Errorf("failed to start cluster sync: %w", err)
1644 }
1645
1646 case "orly-sync-relaygroup", "relaygroup":
1647 if !s.cfg.RelayGroupEnabled {
1648 return fmt.Errorf("relaygroup is not enabled in configuration")
1649 }
1650 if err := s.startRelayGroup(); err != nil {
1651 return fmt.Errorf("failed to start relaygroup: %w", err)
1652 }
1653
1654 case "orly-sync-negentropy", "negentropy":
1655 if !s.cfg.NegentropyEnabled {
1656 return fmt.Errorf("negentropy is not enabled in configuration")
1657 }
1658 if err := s.startNegentropy(); err != nil {
1659 return fmt.Errorf("failed to start negentropy: %w", err)
1660 }
1661
1662 case "orly-certs", "certs":
1663 if !s.cfg.CertsEnabled {
1664 return fmt.Errorf("certificate service is not enabled in configuration")
1665 }
1666 if err := s.startCerts(); err != nil {
1667 return fmt.Errorf("failed to start certificate service: %w", err)
1668 }
1669
1670 case "orly", "relay":
1671 if err := s.startRelay(); err != nil {
1672 return fmt.Errorf("failed to start relay: %w", err)
1673 }
1674
1675 case "orly-nits", "nits":
1676 if !s.cfg.NitsEnabled {
1677 return fmt.Errorf("nits is not enabled in configuration")
1678 }
1679 if err := s.startNits(); err != nil {
1680 return fmt.Errorf("failed to start nits: %w", err)
1681 }
1682 if err := s.waitForNitsReady(s.cfg.NitsReadyTimeout); err != nil {
1683 return fmt.Errorf("nits not ready: %w", err)
1684 }
1685
1686 case "luk", "lightning":
1687 if !s.cfg.LukEnabled {
1688 return fmt.Errorf("luk is not enabled in configuration")
1689 }
1690 if err := s.startLuk(); err != nil {
1691 return fmt.Errorf("failed to start luk: %w", err)
1692 }
1693 if err := s.waitForServiceReady(s.cfg.LukRPCListen, s.cfg.LukReadyTimeout); err != nil {
1694 return fmt.Errorf("luk not ready: %w", err)
1695 }
1696
1697 case "strela", "wallet":
1698 if !s.cfg.StrelaEnabled {
1699 return fmt.Errorf("strela is not enabled in configuration")
1700 }
1701 if err := s.startStrela(); err != nil {
1702 return fmt.Errorf("failed to start strela: %w", err)
1703 }
1704
1705 default:
1706 return fmt.Errorf("unknown service: %s", serviceName)
1707 }
1708
1709 log.I.F("started service: %s", serviceName)
1710 return nil
1711 }
1712
1713 // StopService stops a specific service and its dependents.
1714 // Dependency chain: db → acl, sync services → relay
1715 // Stopping a service will first stop all services that depend on it.
1716 func (s *Supervisor) StopService(serviceName string) error {
1717 log.I.F("stopping service: %s", serviceName)
1718
1719 switch serviceName {
1720 case "orly-db", "db":
1721 // DB is the root - everything depends on it
1722 // Stop in reverse dependency order: relay, sync, acl, then db
1723 log.I.F("stopping relay (depends on db)")
1724 s.stopProcess(s.relayProc, 5*time.Second)
1725
1726 log.I.F("stopping sync services (depend on db)")
1727 s.stopSyncServices()
1728
1729 if s.cfg.ACLEnabled && s.aclProc != nil {
1730 log.I.F("stopping acl (depends on db)")
1731 s.stopProcess(s.aclProc, 5*time.Second)
1732 }
1733
1734 log.I.F("stopping db")
1735 s.stopProcess(s.dbProc, s.cfg.StopTimeout)
1736
1737 case "orly-acl", "acl":
1738 // Relay depends on ACL when ACL is enabled
1739 if s.cfg.ACLEnabled {
1740 log.I.F("stopping relay (depends on acl)")
1741 s.stopProcess(s.relayProc, 5*time.Second)
1742 }
1743 if s.aclProc != nil {
1744 s.stopProcess(s.aclProc, 5*time.Second)
1745 }
1746
1747 case "orly-sync-distributed", "distributed-sync":
1748 // Relay may depend on sync services
1749 if s.distributedSyncProc != nil {
1750 s.stopProcess(s.distributedSyncProc, 5*time.Second)
1751 }
1752
1753 case "orly-sync-cluster", "cluster-sync":
1754 if s.clusterSyncProc != nil {
1755 s.stopProcess(s.clusterSyncProc, 5*time.Second)
1756 }
1757
1758 case "orly-sync-relaygroup", "relaygroup":
1759 if s.relayGroupProc != nil {
1760 s.stopProcess(s.relayGroupProc, 5*time.Second)
1761 }
1762
1763 case "orly-sync-negentropy", "negentropy":
1764 if s.negentropyProc != nil {
1765 s.stopProcess(s.negentropyProc, 5*time.Second)
1766 }
1767
1768 case "orly-certs", "certs":
1769 // Certs is independent
1770 if s.certsProc != nil {
1771 s.stopProcess(s.certsProc, 5*time.Second)
1772 }
1773
1774 case "orly", "relay":
1775 // Relay is a leaf - nothing depends on it
1776 s.stopProcess(s.relayProc, 5*time.Second)
1777
1778 case "orly-nits", "nits":
1779 // nits is root of bitcoin stack: strela → luk → nits
1780 if s.cfg.StrelaEnabled && s.strelaProc != nil {
1781 log.I.F("stopping strela (depends on luk)")
1782 s.stopProcess(s.strelaProc, 5*time.Second)
1783 }
1784 if s.cfg.LukEnabled && s.lukProc != nil {
1785 log.I.F("stopping luk (depends on nits)")
1786 s.stopProcess(s.lukProc, 10*time.Second)
1787 }
1788 if s.nitsProc != nil {
1789 s.stopProcess(s.nitsProc, 30*time.Second)
1790 }
1791
1792 case "luk", "lightning":
1793 // strela depends on luk
1794 if s.cfg.StrelaEnabled && s.strelaProc != nil {
1795 log.I.F("stopping strela (depends on luk)")
1796 s.stopProcess(s.strelaProc, 5*time.Second)
1797 }
1798 if s.lukProc != nil {
1799 s.stopProcess(s.lukProc, 10*time.Second)
1800 }
1801
1802 case "strela", "wallet":
1803 // strela is a leaf
1804 if s.strelaProc != nil {
1805 s.stopProcess(s.strelaProc, 5*time.Second)
1806 }
1807
1808 default:
1809 return fmt.Errorf("unknown service: %s", serviceName)
1810 }
1811
1812 log.I.F("stopped service: %s", serviceName)
1813 return nil
1814 }
1815
1816 // RestartAll stops all processes and starts them again.
1817 func (s *Supervisor) RestartAll() error {
1818 log.I.F("restarting all processes...")
1819
1820 // Stop in reverse dependency order: certs, relay, strela, sync, acl, db, luk, nits
1821 s.mu.Lock()
1822 if s.certsProc != nil && s.cfg.CertsEnabled {
1823 s.mu.Unlock()
1824 s.stopProcess(s.certsProc, 5*time.Second)
1825 s.mu.Lock()
1826 }
1827
1828 if s.relayProc != nil {
1829 s.mu.Unlock()
1830 s.stopProcess(s.relayProc, 5*time.Second)
1831 s.mu.Lock()
1832 }
1833
1834 if s.strelaProc != nil && s.cfg.StrelaEnabled {
1835 s.mu.Unlock()
1836 s.stopProcess(s.strelaProc, 5*time.Second)
1837 s.mu.Lock()
1838 }
1839
1840 s.mu.Unlock()
1841 s.stopSyncServices()
1842 s.mu.Lock()
1843
1844 if s.cfg.ACLEnabled && s.aclProc != nil {
1845 s.mu.Unlock()
1846 s.stopProcess(s.aclProc, 5*time.Second)
1847 s.mu.Lock()
1848 }
1849
1850 if s.dbProc != nil {
1851 s.mu.Unlock()
1852 s.stopProcess(s.dbProc, s.cfg.StopTimeout)
1853 s.mu.Lock()
1854 }
1855
1856 if s.lukProc != nil && s.cfg.LukEnabled {
1857 s.mu.Unlock()
1858 s.stopProcess(s.lukProc, 10*time.Second)
1859 s.mu.Lock()
1860 }
1861
1862 if s.nitsProc != nil && s.cfg.NitsEnabled {
1863 s.mu.Unlock()
1864 s.stopProcess(s.nitsProc, 30*time.Second)
1865 s.mu.Lock()
1866 }
1867 s.mu.Unlock()
1868
1869 time.Sleep(500 * time.Millisecond)
1870
1871 // Start again in dependency order: nits → luk → db → acl → sync → relay, strela → certs
1872 if s.cfg.NitsEnabled {
1873 if err := s.startNits(); err != nil {
1874 return fmt.Errorf("failed to restart nits: %w", err)
1875 }
1876 if err := s.waitForNitsReady(s.cfg.NitsReadyTimeout); err != nil {
1877 return fmt.Errorf("nits not ready after restart: %w", err)
1878 }
1879 }
1880
1881 if s.cfg.LukEnabled {
1882 if err := s.startLuk(); err != nil {
1883 return fmt.Errorf("failed to restart luk: %w", err)
1884 }
1885 if err := s.waitForServiceReady(s.cfg.LukRPCListen, s.cfg.LukReadyTimeout); err != nil {
1886 return fmt.Errorf("luk not ready after restart: %w", err)
1887 }
1888 }
1889
1890 if err := s.startDB(); err != nil {
1891 return fmt.Errorf("failed to restart database: %w", err)
1892 }
1893
1894 if err := s.waitForDBReady(s.cfg.DBReadyTimeout); err != nil {
1895 return fmt.Errorf("database not ready after restart: %w", err)
1896 }
1897
1898 if s.cfg.ACLEnabled {
1899 if err := s.startACL(); err != nil {
1900 return fmt.Errorf("failed to restart ACL: %w", err)
1901 }
1902 if err := s.waitForACLReady(s.cfg.ACLReadyTimeout); err != nil {
1903 return fmt.Errorf("ACL not ready after restart: %w", err)
1904 }
1905 }
1906
1907 if err := s.startSyncServices(); err != nil {
1908 return fmt.Errorf("failed to restart sync services: %w", err)
1909 }
1910
1911 if s.cfg.StrelaEnabled {
1912 if err := s.startStrela(); err != nil {
1913 log.W.F("failed to restart strela: %v", err)
1914 }
1915 }
1916
1917 if err := s.startRelay(); err != nil {
1918 return fmt.Errorf("failed to restart relay: %w", err)
1919 }
1920
1921 if s.cfg.CertsEnabled {
1922 if err := s.startCerts(); err != nil {
1923 log.W.F("failed to restart certificate service: %v", err)
1924 }
1925 }
1926
1927 log.I.F("all processes restarted successfully")
1928 return nil
1929 }
1930