server.go raw

   1  package server
   2  
   3  import (
   4  	"context"
   5  	"net"
   6  	"os"
   7  	"os/signal"
   8  	"syscall"
   9  	"time"
  10  
  11  	"google.golang.org/grpc"
  12  	"google.golang.org/grpc/reflection"
  13  	"next.orly.dev/pkg/lol/chk"
  14  	"next.orly.dev/pkg/lol/log"
  15  
  16  	"next.orly.dev/pkg/database"
  17  	orlydbv1 "next.orly.dev/pkg/proto/orlydb/v1"
  18  )
  19  
  20  // Server wraps a gRPC database server.
  21  type Server struct {
  22  	grpcServer *grpc.Server
  23  	db         database.Database
  24  	cfg        *Config
  25  	listener   net.Listener
  26  }
  27  
  28  // New creates a new database gRPC server.
  29  func New(db database.Database, cfg *Config) *Server {
  30  	// Create gRPC server with large message sizes for events
  31  	grpcServer := grpc.NewServer(
  32  		grpc.MaxRecvMsgSize(64<<20), // 64MB
  33  		grpc.MaxSendMsgSize(64<<20), // 64MB
  34  	)
  35  
  36  	// Register database service
  37  	service := NewDatabaseService(db, cfg)
  38  	orlydbv1.RegisterDatabaseServiceServer(grpcServer, service)
  39  
  40  	// Register reflection for debugging with grpcurl
  41  	reflection.Register(grpcServer)
  42  
  43  	return &Server{
  44  		grpcServer: grpcServer,
  45  		db:         db,
  46  		cfg:        cfg,
  47  	}
  48  }
  49  
  50  // ListenAndServe starts the gRPC server.
  51  func (s *Server) ListenAndServe(ctx context.Context, cancel context.CancelFunc) error {
  52  	// Start listening
  53  	lis, err := net.Listen("tcp", s.cfg.Listen)
  54  	if chk.E(err) {
  55  		return err
  56  	}
  57  	s.listener = lis
  58  	log.I.F("gRPC database server listening on %s", s.cfg.Listen)
  59  
  60  	// Handle graceful shutdown
  61  	go s.handleShutdown(ctx, cancel)
  62  
  63  	// Serve gRPC
  64  	return s.grpcServer.Serve(lis)
  65  }
  66  
  67  func (s *Server) handleShutdown(ctx context.Context, cancel context.CancelFunc) {
  68  	sigs := make(chan os.Signal, 1)
  69  	signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
  70  
  71  	select {
  72  	case sig := <-sigs:
  73  		log.I.F("received signal %v, shutting down...", sig)
  74  	case <-ctx.Done():
  75  		log.I.F("context cancelled, shutting down...")
  76  	}
  77  
  78  	// Cancel context to stop all operations
  79  	cancel()
  80  
  81  	// Gracefully stop gRPC server with timeout
  82  	stopped := make(chan struct{})
  83  	go func() {
  84  		s.grpcServer.GracefulStop()
  85  		close(stopped)
  86  	}()
  87  
  88  	select {
  89  	case <-stopped:
  90  		log.I.F("gRPC server stopped gracefully")
  91  	case <-time.After(5 * time.Second):
  92  		log.W.F("gRPC graceful stop timed out, forcing stop")
  93  		s.grpcServer.Stop()
  94  	}
  95  
  96  	// Sync and close database
  97  	log.I.F("syncing database...")
  98  	if err := s.db.Sync(); chk.E(err) {
  99  		log.W.F("failed to sync database: %v", err)
 100  	}
 101  	log.I.F("closing database...")
 102  	if err := s.db.Close(); chk.E(err) {
 103  		log.W.F("failed to close database: %v", err)
 104  	}
 105  	log.I.F("shutdown complete")
 106  }
 107  
 108  // Stop stops the server.
 109  func (s *Server) Stop() {
 110  	s.grpcServer.Stop()
 111  }
 112