main.go raw
1 // orly-sync-cluster is a standalone gRPC cluster sync service for ORLY.
2 // It provides cluster replication with persistent state between relay instances.
3 package main
4
5 import (
6 "context"
7 "net"
8 "os"
9 "os/signal"
10 "syscall"
11 "time"
12
13 "google.golang.org/grpc"
14 "google.golang.org/grpc/reflection"
15 "next.orly.dev/pkg/lol"
16 "next.orly.dev/pkg/lol/chk"
17 "next.orly.dev/pkg/lol/log"
18
19 "next.orly.dev/pkg/database"
20 databasegrpc "next.orly.dev/pkg/database/grpc"
21 "next.orly.dev/pkg/sync/cluster"
22 clusterv1 "next.orly.dev/pkg/proto/orlysync/cluster/v1"
23 )
24
25 func main() {
26 cfg := loadConfig()
27
28 // Set log level
29 lol.SetLogLevel(cfg.LogLevel)
30 log.I.F("orly-sync-cluster starting with log level: %s", cfg.LogLevel)
31
32 ctx, cancel := context.WithCancel(context.Background())
33 defer cancel()
34
35 // Initialize database connection
36 var db database.Database
37 var dbCloser func()
38
39 if cfg.DBType == "grpc" {
40 log.I.F("connecting to gRPC database server at %s", cfg.GRPCDBServer)
41 dbClient, err := databasegrpc.New(ctx, &databasegrpc.ClientConfig{
42 ServerAddress: cfg.GRPCDBServer,
43 ConnectTimeout: 30 * time.Second,
44 })
45 if chk.E(err) {
46 log.E.F("failed to connect to database server: %v", err)
47 os.Exit(1)
48 }
49 db = dbClient
50 dbCloser = func() { dbClient.Close() }
51 } else {
52 log.E.F("badger mode not yet implemented for sync-cluster")
53 os.Exit(1)
54 }
55
56 // Wait for database to be ready
57 log.I.F("waiting for database to be ready...")
58 <-db.Ready()
59 log.I.F("database ready")
60
61 // Create cluster manager configuration
62 clusterCfg := &cluster.Config{
63 AdminNpubs: cfg.AdminNpubs,
64 PropagatePrivilegedEvents: cfg.PropagatePrivilegedEvents,
65 PollInterval: cfg.PollInterval,
66 NIP11CacheTTL: cfg.NIP11CacheTTL,
67 }
68
69 log.I.F("initializing cluster sync manager with %d admins", len(cfg.AdminNpubs))
70
71 // Create gRPC server
72 grpcServer := grpc.NewServer(
73 grpc.MaxRecvMsgSize(16<<20), // 16MB
74 grpc.MaxSendMsgSize(16<<20), // 16MB
75 )
76
77 // Register cluster sync service
78 // TODO: Create and register service once server implementation is done
79 // service := cluster.NewService(clusterMgr, cfg)
80 // clusterv1.RegisterClusterSyncServiceServer(grpcServer, service)
81 _ = clusterCfg
82 _ = db
83 _ = clusterv1.ClusterSyncService_ServiceDesc
84
85 // Register reflection for debugging with grpcurl
86 reflection.Register(grpcServer)
87
88 // Start listening
89 lis, err := net.Listen("tcp", cfg.Listen)
90 if chk.E(err) {
91 log.E.F("failed to listen on %s: %v", cfg.Listen, err)
92 os.Exit(1)
93 }
94 log.I.F("gRPC server listening on %s", cfg.Listen)
95
96 // Handle graceful shutdown
97 go func() {
98 sigs := make(chan os.Signal, 1)
99 signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
100 sig := <-sigs
101 log.I.F("received signal %v, shutting down...", sig)
102
103 cancel()
104
105 stopped := make(chan struct{})
106 go func() {
107 grpcServer.GracefulStop()
108 close(stopped)
109 }()
110
111 select {
112 case <-stopped:
113 log.I.F("gRPC server stopped gracefully")
114 case <-time.After(5 * time.Second):
115 log.W.F("gRPC graceful stop timed out, forcing stop")
116 grpcServer.Stop()
117 }
118
119 if dbCloser != nil {
120 log.I.F("closing database connection...")
121 dbCloser()
122 }
123 log.I.F("shutdown complete")
124 }()
125
126 // Serve gRPC
127 if err := grpcServer.Serve(lis); err != nil {
128 log.E.F("gRPC server error: %v", err)
129 }
130 }
131