main.go raw

   1  // orly-sync-cluster is a standalone gRPC cluster sync service for ORLY.
   2  // It provides cluster replication with persistent state between relay instances.
   3  package main
   4  
   5  import (
   6  	"context"
   7  	"net"
   8  	"os"
   9  	"os/signal"
  10  	"syscall"
  11  	"time"
  12  
  13  	"google.golang.org/grpc"
  14  	"google.golang.org/grpc/reflection"
  15  	"next.orly.dev/pkg/lol"
  16  	"next.orly.dev/pkg/lol/chk"
  17  	"next.orly.dev/pkg/lol/log"
  18  
  19  	"next.orly.dev/pkg/database"
  20  	databasegrpc "next.orly.dev/pkg/database/grpc"
  21  	"next.orly.dev/pkg/sync/cluster"
  22  	clusterv1 "next.orly.dev/pkg/proto/orlysync/cluster/v1"
  23  )
  24  
  25  func main() {
  26  	cfg := loadConfig()
  27  
  28  	// Set log level
  29  	lol.SetLogLevel(cfg.LogLevel)
  30  	log.I.F("orly-sync-cluster starting with log level: %s", cfg.LogLevel)
  31  
  32  	ctx, cancel := context.WithCancel(context.Background())
  33  	defer cancel()
  34  
  35  	// Initialize database connection
  36  	var db database.Database
  37  	var dbCloser func()
  38  
  39  	if cfg.DBType == "grpc" {
  40  		log.I.F("connecting to gRPC database server at %s", cfg.GRPCDBServer)
  41  		dbClient, err := databasegrpc.New(ctx, &databasegrpc.ClientConfig{
  42  			ServerAddress:  cfg.GRPCDBServer,
  43  			ConnectTimeout: 30 * time.Second,
  44  		})
  45  		if chk.E(err) {
  46  			log.E.F("failed to connect to database server: %v", err)
  47  			os.Exit(1)
  48  		}
  49  		db = dbClient
  50  		dbCloser = func() { dbClient.Close() }
  51  	} else {
  52  		log.E.F("badger mode not yet implemented for sync-cluster")
  53  		os.Exit(1)
  54  	}
  55  
  56  	// Wait for database to be ready
  57  	log.I.F("waiting for database to be ready...")
  58  	<-db.Ready()
  59  	log.I.F("database ready")
  60  
  61  	// Create cluster manager configuration
  62  	clusterCfg := &cluster.Config{
  63  		AdminNpubs:                cfg.AdminNpubs,
  64  		PropagatePrivilegedEvents: cfg.PropagatePrivilegedEvents,
  65  		PollInterval:              cfg.PollInterval,
  66  		NIP11CacheTTL:             cfg.NIP11CacheTTL,
  67  	}
  68  
  69  	log.I.F("initializing cluster sync manager with %d admins", len(cfg.AdminNpubs))
  70  
  71  	// Create gRPC server
  72  	grpcServer := grpc.NewServer(
  73  		grpc.MaxRecvMsgSize(16<<20), // 16MB
  74  		grpc.MaxSendMsgSize(16<<20), // 16MB
  75  	)
  76  
  77  	// Register cluster sync service
  78  	// TODO: Create and register service once server implementation is done
  79  	// service := cluster.NewService(clusterMgr, cfg)
  80  	// clusterv1.RegisterClusterSyncServiceServer(grpcServer, service)
  81  	_ = clusterCfg
  82  	_ = db
  83  	_ = clusterv1.ClusterSyncService_ServiceDesc
  84  
  85  	// Register reflection for debugging with grpcurl
  86  	reflection.Register(grpcServer)
  87  
  88  	// Start listening
  89  	lis, err := net.Listen("tcp", cfg.Listen)
  90  	if chk.E(err) {
  91  		log.E.F("failed to listen on %s: %v", cfg.Listen, err)
  92  		os.Exit(1)
  93  	}
  94  	log.I.F("gRPC server listening on %s", cfg.Listen)
  95  
  96  	// Handle graceful shutdown
  97  	go func() {
  98  		sigs := make(chan os.Signal, 1)
  99  		signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
 100  		sig := <-sigs
 101  		log.I.F("received signal %v, shutting down...", sig)
 102  
 103  		cancel()
 104  
 105  		stopped := make(chan struct{})
 106  		go func() {
 107  			grpcServer.GracefulStop()
 108  			close(stopped)
 109  		}()
 110  
 111  		select {
 112  		case <-stopped:
 113  			log.I.F("gRPC server stopped gracefully")
 114  		case <-time.After(5 * time.Second):
 115  			log.W.F("gRPC graceful stop timed out, forcing stop")
 116  			grpcServer.Stop()
 117  		}
 118  
 119  		if dbCloser != nil {
 120  			log.I.F("closing database connection...")
 121  			dbCloser()
 122  		}
 123  		log.I.F("shutdown complete")
 124  	}()
 125  
 126  	// Serve gRPC
 127  	if err := grpcServer.Serve(lis); err != nil {
 128  		log.E.F("gRPC server error: %v", err)
 129  	}
 130  }
 131