main.go raw
1 // orly-sync-negentropy is a standalone gRPC negentropy sync service for ORLY.
2 // It provides NIP-77 negentropy-based set reconciliation for both
3 // relay-to-relay sync and client-facing WebSocket operations.
4 package main
5
6 import (
7 "context"
8 "net"
9 "os"
10 "os/signal"
11 "syscall"
12 "time"
13
14 "google.golang.org/grpc"
15 "google.golang.org/grpc/reflection"
16 "next.orly.dev/pkg/lol"
17 "next.orly.dev/pkg/lol/chk"
18 "next.orly.dev/pkg/lol/log"
19
20 "next.orly.dev/pkg/database"
21 databasegrpc "next.orly.dev/pkg/database/grpc"
22 negentropyv1 "next.orly.dev/pkg/proto/orlysync/negentropy/v1"
23 "next.orly.dev/pkg/sync/negentropy"
24 negentropyserver "next.orly.dev/pkg/sync/negentropy/server"
25 )
26
27 func main() {
28 cfg := loadConfig()
29
30 // Set log level
31 lol.SetLogLevel(cfg.LogLevel)
32 log.I.F("orly-sync-negentropy starting with log level: %s", cfg.LogLevel)
33
34 ctx, cancel := context.WithCancel(context.Background())
35 defer cancel()
36
37 // Initialize database connection
38 var db database.Database
39 var dbCloser func()
40
41 if cfg.DBType == "grpc" {
42 log.I.F("connecting to gRPC database server at %s", cfg.GRPCDBServer)
43 dbClient, err := databasegrpc.New(ctx, &databasegrpc.ClientConfig{
44 ServerAddress: cfg.GRPCDBServer,
45 ConnectTimeout: 30 * time.Second,
46 })
47 if chk.E(err) {
48 log.E.F("failed to connect to database server: %v", err)
49 os.Exit(1)
50 }
51 db = dbClient
52 dbCloser = func() { dbClient.Close() }
53 } else {
54 log.E.F("badger mode not yet implemented for sync-negentropy")
55 os.Exit(1)
56 }
57
58 // Wait for database to be ready
59 log.I.F("waiting for database to be ready...")
60 <-db.Ready()
61 log.I.F("database ready")
62
63 log.I.F("initializing negentropy sync manager with %d peers", len(cfg.Peers))
64
65 // Create negentropy manager
66 mgrConfig := &negentropy.Config{
67 Peers: cfg.Peers,
68 SyncInterval: cfg.SyncInterval,
69 FrameSize: cfg.FrameSize,
70 IDSize: cfg.IDSize,
71 ClientSessionTimeout: cfg.ClientSessionTimeout,
72 Filter: cfg.Filter,
73 }
74 negentropyMgr := negentropy.NewManager(db, mgrConfig)
75
76 // Start background sync loop if we have peers
77 if len(cfg.Peers) > 0 {
78 log.I.F("starting background sync with %d peers", len(cfg.Peers))
79 negentropyMgr.Start()
80 }
81
82 // Create gRPC server
83 grpcServer := grpc.NewServer(
84 grpc.MaxRecvMsgSize(16<<20), // 16MB
85 grpc.MaxSendMsgSize(16<<20), // 16MB
86 )
87
88 // Register negentropy service
89 service := negentropyserver.NewService(db, negentropyMgr)
90 negentropyv1.RegisterNegentropyServiceServer(grpcServer, service)
91
92 // Register reflection for debugging with grpcurl
93 reflection.Register(grpcServer)
94
95 // Start listening
96 lis, err := net.Listen("tcp", cfg.Listen)
97 if chk.E(err) {
98 log.E.F("failed to listen on %s: %v", cfg.Listen, err)
99 os.Exit(1)
100 }
101 log.I.F("gRPC server listening on %s", cfg.Listen)
102
103 // Handle graceful shutdown
104 go func() {
105 sigs := make(chan os.Signal, 1)
106 signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
107 sig := <-sigs
108 log.I.F("received signal %v, shutting down...", sig)
109
110 cancel()
111
112 stopped := make(chan struct{})
113 go func() {
114 grpcServer.GracefulStop()
115 close(stopped)
116 }()
117
118 select {
119 case <-stopped:
120 log.I.F("gRPC server stopped gracefully")
121 case <-time.After(5 * time.Second):
122 log.W.F("gRPC graceful stop timed out, forcing stop")
123 grpcServer.Stop()
124 }
125
126 // Stop the negentropy manager
127 log.I.F("stopping negentropy manager...")
128 negentropyMgr.Stop()
129
130 if dbCloser != nil {
131 log.I.F("closing database connection...")
132 dbCloser()
133 }
134 log.I.F("shutdown complete")
135 }()
136
137 // Serve gRPC
138 if err := grpcServer.Serve(lis); err != nil {
139 log.E.F("gRPC server error: %v", err)
140 }
141 }
142