server.go raw
1 package server
2
3 import (
4 "context"
5 "net"
6 "os"
7 "os/signal"
8 "syscall"
9 "time"
10
11 "google.golang.org/grpc"
12 "google.golang.org/grpc/reflection"
13 "next.orly.dev/pkg/lol/chk"
14 "next.orly.dev/pkg/lol/log"
15
16 "next.orly.dev/app/config"
17 "next.orly.dev/pkg/acl"
18 "next.orly.dev/pkg/database"
19 orlyaclv1 "next.orly.dev/pkg/proto/orlyacl/v1"
20 )
21
22 // Server wraps a gRPC ACL server.
23 type Server struct {
24 grpcServer *grpc.Server
25 db database.Database
26 cfg *Config
27 listener net.Listener
28 ownsDB bool // Whether we own the database and should close it
29 }
30
31 // New creates a new ACL gRPC server.
32 func New(db database.Database, cfg *Config, ownsDB bool) *Server {
33 // Create gRPC server
34 grpcServer := grpc.NewServer(
35 grpc.MaxRecvMsgSize(16<<20), // 16MB
36 grpc.MaxSendMsgSize(16<<20), // 16MB
37 )
38
39 // Register ACL service
40 service := NewACLService(cfg, db)
41 orlyaclv1.RegisterACLServiceServer(grpcServer, service)
42
43 // Register reflection for debugging with grpcurl
44 reflection.Register(grpcServer)
45
46 return &Server{
47 grpcServer: grpcServer,
48 db: db,
49 cfg: cfg,
50 ownsDB: ownsDB,
51 }
52 }
53
54 // ConfigureACL sets up the ACL mode and configures the registry.
55 func (s *Server) ConfigureACL(ctx context.Context) error {
56 // Create app config for ACL configuration
57 appCfg := &config.C{
58 Owners: s.cfg.Owners,
59 Admins: s.cfg.Admins,
60 BootstrapRelays: s.cfg.BootstrapRelays,
61 RelayAddresses: s.cfg.RelayAddresses,
62 FollowListFrequency: s.cfg.FollowListFrequency,
63 FollowsThrottleEnabled: s.cfg.FollowsThrottleEnabled,
64 FollowsThrottlePerEvent: s.cfg.FollowsThrottlePerEvent,
65 FollowsThrottleMaxDelay: s.cfg.FollowsThrottleMaxDelay,
66 }
67
68 // Set ACL mode and configure the registry
69 acl.Registry.SetMode(s.cfg.ACLMode)
70 if err := acl.Registry.Configure(appCfg, s.db, ctx); chk.E(err) {
71 return err
72 }
73
74 // Start the syncer goroutine for background operations
75 acl.Registry.Syncer()
76 log.I.F("ACL syncer started for mode: %s", s.cfg.ACLMode)
77
78 return nil
79 }
80
81 // ListenAndServe starts the gRPC server.
82 func (s *Server) ListenAndServe(ctx context.Context, cancel context.CancelFunc) error {
83 // Start listening
84 lis, err := net.Listen("tcp", s.cfg.Listen)
85 if chk.E(err) {
86 return err
87 }
88 s.listener = lis
89 log.I.F("gRPC ACL server listening on %s", s.cfg.Listen)
90
91 // Handle graceful shutdown
92 go s.handleShutdown(ctx, cancel)
93
94 // Serve gRPC
95 return s.grpcServer.Serve(lis)
96 }
97
98 func (s *Server) handleShutdown(ctx context.Context, cancel context.CancelFunc) {
99 sigs := make(chan os.Signal, 1)
100 signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
101
102 select {
103 case sig := <-sigs:
104 log.I.F("received signal %v, shutting down...", sig)
105 case <-ctx.Done():
106 log.I.F("context cancelled, shutting down...")
107 }
108
109 // Cancel context to stop all operations
110 cancel()
111
112 // Gracefully stop gRPC server with timeout
113 stopped := make(chan struct{})
114 go func() {
115 s.grpcServer.GracefulStop()
116 close(stopped)
117 }()
118
119 select {
120 case <-stopped:
121 log.I.F("gRPC server stopped gracefully")
122 case <-time.After(5 * time.Second):
123 log.W.F("gRPC graceful stop timed out, forcing stop")
124 s.grpcServer.Stop()
125 }
126
127 // Sync and close database if we own it
128 if s.ownsDB {
129 log.I.F("syncing database...")
130 if err := s.db.Sync(); chk.E(err) {
131 log.W.F("failed to sync database: %v", err)
132 }
133 log.I.F("closing database...")
134 if err := s.db.Close(); chk.E(err) {
135 log.W.F("failed to close database: %v", err)
136 }
137 }
138 log.I.F("shutdown complete")
139 }
140
141 // Stop stops the server.
142 func (s *Server) Stop() {
143 s.grpcServer.Stop()
144 }
145