server.go raw
1 package server
2
3 import (
4 "context"
5 "net"
6 "os"
7 "os/signal"
8 "syscall"
9 "time"
10
11 "google.golang.org/grpc"
12 "google.golang.org/grpc/reflection"
13 "next.orly.dev/pkg/lol/chk"
14 "next.orly.dev/pkg/lol/log"
15
16 "next.orly.dev/pkg/database"
17 orlydbv1 "next.orly.dev/pkg/proto/orlydb/v1"
18 )
19
20 // Server wraps a gRPC database server.
21 type Server struct {
22 grpcServer *grpc.Server
23 db database.Database
24 cfg *Config
25 listener net.Listener
26 }
27
28 // New creates a new database gRPC server.
29 func New(db database.Database, cfg *Config) *Server {
30 // Create gRPC server with large message sizes for events
31 grpcServer := grpc.NewServer(
32 grpc.MaxRecvMsgSize(64<<20), // 64MB
33 grpc.MaxSendMsgSize(64<<20), // 64MB
34 )
35
36 // Register database service
37 service := NewDatabaseService(db, cfg)
38 orlydbv1.RegisterDatabaseServiceServer(grpcServer, service)
39
40 // Register reflection for debugging with grpcurl
41 reflection.Register(grpcServer)
42
43 return &Server{
44 grpcServer: grpcServer,
45 db: db,
46 cfg: cfg,
47 }
48 }
49
50 // ListenAndServe starts the gRPC server.
51 func (s *Server) ListenAndServe(ctx context.Context, cancel context.CancelFunc) error {
52 // Start listening
53 lis, err := net.Listen("tcp", s.cfg.Listen)
54 if chk.E(err) {
55 return err
56 }
57 s.listener = lis
58 log.I.F("gRPC database server listening on %s", s.cfg.Listen)
59
60 // Handle graceful shutdown
61 go s.handleShutdown(ctx, cancel)
62
63 // Serve gRPC
64 return s.grpcServer.Serve(lis)
65 }
66
67 func (s *Server) handleShutdown(ctx context.Context, cancel context.CancelFunc) {
68 sigs := make(chan os.Signal, 1)
69 signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
70
71 select {
72 case sig := <-sigs:
73 log.I.F("received signal %v, shutting down...", sig)
74 case <-ctx.Done():
75 log.I.F("context cancelled, shutting down...")
76 }
77
78 // Cancel context to stop all operations
79 cancel()
80
81 // Gracefully stop gRPC server with timeout
82 stopped := make(chan struct{})
83 go func() {
84 s.grpcServer.GracefulStop()
85 close(stopped)
86 }()
87
88 select {
89 case <-stopped:
90 log.I.F("gRPC server stopped gracefully")
91 case <-time.After(5 * time.Second):
92 log.W.F("gRPC graceful stop timed out, forcing stop")
93 s.grpcServer.Stop()
94 }
95
96 // Sync and close database
97 log.I.F("syncing database...")
98 if err := s.db.Sync(); chk.E(err) {
99 log.W.F("failed to sync database: %v", err)
100 }
101 log.I.F("closing database...")
102 if err := s.db.Close(); chk.E(err) {
103 log.W.F("failed to close database: %v", err)
104 }
105 log.I.F("shutdown complete")
106 }
107
108 // Stop stops the server.
109 func (s *Server) Stop() {
110 s.grpcServer.Stop()
111 }
112