main.go raw
1 // orly-sync-distributed is a standalone gRPC distributed sync service for ORLY.
2 // It provides serial-based peer-to-peer synchronization between relay instances.
3 package main
4
5 import (
6 "context"
7 "net"
8 "os"
9 "os/signal"
10 "syscall"
11 "time"
12
13 "google.golang.org/grpc"
14 "google.golang.org/grpc/reflection"
15 "next.orly.dev/pkg/lol"
16 "next.orly.dev/pkg/lol/chk"
17 "next.orly.dev/pkg/lol/log"
18
19 "next.orly.dev/pkg/database"
20 databasegrpc "next.orly.dev/pkg/database/grpc"
21 "next.orly.dev/pkg/sync/distributed"
22 distributedv1 "next.orly.dev/pkg/proto/orlysync/distributed/v1"
23 )
24
25 func main() {
26 cfg := loadConfig()
27
28 // Set log level
29 lol.SetLogLevel(cfg.LogLevel)
30 log.I.F("orly-sync-distributed starting with log level: %s", cfg.LogLevel)
31
32 ctx, cancel := context.WithCancel(context.Background())
33 defer cancel()
34
35 // Initialize database connection
36 var db database.Database
37 var dbCloser func()
38
39 if cfg.DBType == "grpc" {
40 log.I.F("connecting to gRPC database server at %s", cfg.GRPCDBServer)
41 dbClient, err := databasegrpc.New(ctx, &databasegrpc.ClientConfig{
42 ServerAddress: cfg.GRPCDBServer,
43 ConnectTimeout: 30 * time.Second,
44 })
45 if chk.E(err) {
46 log.E.F("failed to connect to database server: %v", err)
47 os.Exit(1)
48 }
49 db = dbClient
50 dbCloser = func() { dbClient.Close() }
51 } else {
52 log.E.F("badger mode not yet implemented for sync-distributed")
53 os.Exit(1)
54 }
55
56 // Wait for database to be ready
57 log.I.F("waiting for database to be ready...")
58 <-db.Ready()
59 log.I.F("database ready")
60
61 // Create sync manager configuration
62 syncCfg := &distributed.Config{
63 NodeID: cfg.NodeID,
64 RelayURL: cfg.RelayURL,
65 Peers: cfg.Peers,
66 SyncInterval: cfg.SyncInterval,
67 NIP11CacheTTL: cfg.NIP11CacheTTL,
68 }
69
70 // The distributed.Manager currently requires *database.D, not the interface.
71 // For gRPC mode, we need the manager to be updated to use the database.Database interface.
72 // For now, log that this mode requires refactoring.
73 if cfg.DBType == "grpc" {
74 log.W.F("distributed sync manager needs refactoring to use database.Database interface")
75 log.W.F("currently only stub service is available in gRPC mode")
76 }
77
78 log.I.F("initializing distributed sync manager with %d peers", len(cfg.Peers))
79
80 // Create gRPC server
81 grpcServer := grpc.NewServer(
82 grpc.MaxRecvMsgSize(16<<20), // 16MB
83 grpc.MaxSendMsgSize(16<<20), // 16MB
84 )
85
86 // Register distributed sync service
87 // Note: The manager needs refactoring to use the interface before this works fully
88 // For now, register a stub service that will return appropriate responses
89 _ = syncCfg
90 _ = distributedv1.DistributedSyncService_ServiceDesc
91 // TODO: Once distributed.Manager uses database.Database interface:
92 // syncMgr := distributed.NewManager(ctx, db, syncCfg, nil)
93 // service := distributedserver.NewService(db, syncMgr)
94 // distributedv1.RegisterDistributedSyncServiceServer(grpcServer, service)
95
96 // Register reflection for debugging with grpcurl
97 reflection.Register(grpcServer)
98
99 // Start listening
100 lis, err := net.Listen("tcp", cfg.Listen)
101 if chk.E(err) {
102 log.E.F("failed to listen on %s: %v", cfg.Listen, err)
103 os.Exit(1)
104 }
105 log.I.F("gRPC server listening on %s", cfg.Listen)
106
107 // Handle graceful shutdown
108 go func() {
109 sigs := make(chan os.Signal, 1)
110 signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
111 sig := <-sigs
112 log.I.F("received signal %v, shutting down...", sig)
113
114 // Cancel context to stop all operations
115 cancel()
116
117 // Gracefully stop gRPC server with timeout
118 stopped := make(chan struct{})
119 go func() {
120 grpcServer.GracefulStop()
121 close(stopped)
122 }()
123
124 select {
125 case <-stopped:
126 log.I.F("gRPC server stopped gracefully")
127 case <-time.After(5 * time.Second):
128 log.W.F("gRPC graceful stop timed out, forcing stop")
129 grpcServer.Stop()
130 }
131
132 // Close database connection
133 if dbCloser != nil {
134 log.I.F("closing database connection...")
135 dbCloser()
136 }
137 log.I.F("shutdown complete")
138 }()
139
140 // Serve gRPC
141 if err := grpcServer.Serve(lis); err != nil {
142 log.E.F("gRPC server error: %v", err)
143 }
144 }
145