server.go raw

   1  package server
   2  
   3  import (
   4  	"context"
   5  	"net"
   6  	"os"
   7  	"os/signal"
   8  	"syscall"
   9  	"time"
  10  
  11  	"google.golang.org/grpc"
  12  	"google.golang.org/grpc/reflection"
  13  	"next.orly.dev/pkg/lol/chk"
  14  	"next.orly.dev/pkg/lol/log"
  15  
  16  	"next.orly.dev/app/config"
  17  	"next.orly.dev/pkg/acl"
  18  	"next.orly.dev/pkg/database"
  19  	orlyaclv1 "next.orly.dev/pkg/proto/orlyacl/v1"
  20  )
  21  
  22  // Server wraps a gRPC ACL server.
  23  type Server struct {
  24  	grpcServer *grpc.Server
  25  	db         database.Database
  26  	cfg        *Config
  27  	listener   net.Listener
  28  	ownsDB     bool // Whether we own the database and should close it
  29  }
  30  
  31  // New creates a new ACL gRPC server.
  32  func New(db database.Database, cfg *Config, ownsDB bool) *Server {
  33  	// Create gRPC server
  34  	grpcServer := grpc.NewServer(
  35  		grpc.MaxRecvMsgSize(16<<20), // 16MB
  36  		grpc.MaxSendMsgSize(16<<20), // 16MB
  37  	)
  38  
  39  	// Register ACL service
  40  	service := NewACLService(cfg, db)
  41  	orlyaclv1.RegisterACLServiceServer(grpcServer, service)
  42  
  43  	// Register reflection for debugging with grpcurl
  44  	reflection.Register(grpcServer)
  45  
  46  	return &Server{
  47  		grpcServer: grpcServer,
  48  		db:         db,
  49  		cfg:        cfg,
  50  		ownsDB:     ownsDB,
  51  	}
  52  }
  53  
  54  // ConfigureACL sets up the ACL mode and configures the registry.
  55  func (s *Server) ConfigureACL(ctx context.Context) error {
  56  	// Create app config for ACL configuration
  57  	appCfg := &config.C{
  58  		Owners:                  s.cfg.Owners,
  59  		Admins:                  s.cfg.Admins,
  60  		BootstrapRelays:         s.cfg.BootstrapRelays,
  61  		RelayAddresses:          s.cfg.RelayAddresses,
  62  		FollowListFrequency:     s.cfg.FollowListFrequency,
  63  		FollowsThrottleEnabled:  s.cfg.FollowsThrottleEnabled,
  64  		FollowsThrottlePerEvent: s.cfg.FollowsThrottlePerEvent,
  65  		FollowsThrottleMaxDelay: s.cfg.FollowsThrottleMaxDelay,
  66  	}
  67  
  68  	// Set ACL mode and configure the registry
  69  	acl.Registry.SetMode(s.cfg.ACLMode)
  70  	if err := acl.Registry.Configure(appCfg, s.db, ctx); chk.E(err) {
  71  		return err
  72  	}
  73  
  74  	// Start the syncer goroutine for background operations
  75  	acl.Registry.Syncer()
  76  	log.I.F("ACL syncer started for mode: %s", s.cfg.ACLMode)
  77  
  78  	return nil
  79  }
  80  
  81  // ListenAndServe starts the gRPC server.
  82  func (s *Server) ListenAndServe(ctx context.Context, cancel context.CancelFunc) error {
  83  	// Start listening
  84  	lis, err := net.Listen("tcp", s.cfg.Listen)
  85  	if chk.E(err) {
  86  		return err
  87  	}
  88  	s.listener = lis
  89  	log.I.F("gRPC ACL server listening on %s", s.cfg.Listen)
  90  
  91  	// Handle graceful shutdown
  92  	go s.handleShutdown(ctx, cancel)
  93  
  94  	// Serve gRPC
  95  	return s.grpcServer.Serve(lis)
  96  }
  97  
  98  func (s *Server) handleShutdown(ctx context.Context, cancel context.CancelFunc) {
  99  	sigs := make(chan os.Signal, 1)
 100  	signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
 101  
 102  	select {
 103  	case sig := <-sigs:
 104  		log.I.F("received signal %v, shutting down...", sig)
 105  	case <-ctx.Done():
 106  		log.I.F("context cancelled, shutting down...")
 107  	}
 108  
 109  	// Cancel context to stop all operations
 110  	cancel()
 111  
 112  	// Gracefully stop gRPC server with timeout
 113  	stopped := make(chan struct{})
 114  	go func() {
 115  		s.grpcServer.GracefulStop()
 116  		close(stopped)
 117  	}()
 118  
 119  	select {
 120  	case <-stopped:
 121  		log.I.F("gRPC server stopped gracefully")
 122  	case <-time.After(5 * time.Second):
 123  		log.W.F("gRPC graceful stop timed out, forcing stop")
 124  		s.grpcServer.Stop()
 125  	}
 126  
 127  	// Sync and close database if we own it
 128  	if s.ownsDB {
 129  		log.I.F("syncing database...")
 130  		if err := s.db.Sync(); chk.E(err) {
 131  			log.W.F("failed to sync database: %v", err)
 132  		}
 133  		log.I.F("closing database...")
 134  		if err := s.db.Close(); chk.E(err) {
 135  			log.W.F("failed to close database: %v", err)
 136  		}
 137  	}
 138  	log.I.F("shutdown complete")
 139  }
 140  
 141  // Stop stops the server.
 142  func (s *Server) Stop() {
 143  	s.grpcServer.Stop()
 144  }
 145