main.go raw

   1  // orly-sync-distributed is a standalone gRPC distributed sync service for ORLY.
   2  // It provides serial-based peer-to-peer synchronization between relay instances.
   3  package main
   4  
   5  import (
   6  	"context"
   7  	"net"
   8  	"os"
   9  	"os/signal"
  10  	"syscall"
  11  	"time"
  12  
  13  	"google.golang.org/grpc"
  14  	"google.golang.org/grpc/reflection"
  15  	"next.orly.dev/pkg/lol"
  16  	"next.orly.dev/pkg/lol/chk"
  17  	"next.orly.dev/pkg/lol/log"
  18  
  19  	"next.orly.dev/pkg/database"
  20  	databasegrpc "next.orly.dev/pkg/database/grpc"
  21  	"next.orly.dev/pkg/sync/distributed"
  22  	distributedv1 "next.orly.dev/pkg/proto/orlysync/distributed/v1"
  23  )
  24  
  25  func main() {
  26  	cfg := loadConfig()
  27  
  28  	// Set log level
  29  	lol.SetLogLevel(cfg.LogLevel)
  30  	log.I.F("orly-sync-distributed starting with log level: %s", cfg.LogLevel)
  31  
  32  	ctx, cancel := context.WithCancel(context.Background())
  33  	defer cancel()
  34  
  35  	// Initialize database connection
  36  	var db database.Database
  37  	var dbCloser func()
  38  
  39  	if cfg.DBType == "grpc" {
  40  		log.I.F("connecting to gRPC database server at %s", cfg.GRPCDBServer)
  41  		dbClient, err := databasegrpc.New(ctx, &databasegrpc.ClientConfig{
  42  			ServerAddress:  cfg.GRPCDBServer,
  43  			ConnectTimeout: 30 * time.Second,
  44  		})
  45  		if chk.E(err) {
  46  			log.E.F("failed to connect to database server: %v", err)
  47  			os.Exit(1)
  48  		}
  49  		db = dbClient
  50  		dbCloser = func() { dbClient.Close() }
  51  	} else {
  52  		log.E.F("badger mode not yet implemented for sync-distributed")
  53  		os.Exit(1)
  54  	}
  55  
  56  	// Wait for database to be ready
  57  	log.I.F("waiting for database to be ready...")
  58  	<-db.Ready()
  59  	log.I.F("database ready")
  60  
  61  	// Create sync manager configuration
  62  	syncCfg := &distributed.Config{
  63  		NodeID:        cfg.NodeID,
  64  		RelayURL:      cfg.RelayURL,
  65  		Peers:         cfg.Peers,
  66  		SyncInterval:  cfg.SyncInterval,
  67  		NIP11CacheTTL: cfg.NIP11CacheTTL,
  68  	}
  69  
  70  	// The distributed.Manager currently requires *database.D, not the interface.
  71  	// For gRPC mode, we need the manager to be updated to use the database.Database interface.
  72  	// For now, log that this mode requires refactoring.
  73  	if cfg.DBType == "grpc" {
  74  		log.W.F("distributed sync manager needs refactoring to use database.Database interface")
  75  		log.W.F("currently only stub service is available in gRPC mode")
  76  	}
  77  
  78  	log.I.F("initializing distributed sync manager with %d peers", len(cfg.Peers))
  79  
  80  	// Create gRPC server
  81  	grpcServer := grpc.NewServer(
  82  		grpc.MaxRecvMsgSize(16<<20), // 16MB
  83  		grpc.MaxSendMsgSize(16<<20), // 16MB
  84  	)
  85  
  86  	// Register distributed sync service
  87  	// Note: The manager needs refactoring to use the interface before this works fully
  88  	// For now, register a stub service that will return appropriate responses
  89  	_ = syncCfg
  90  	_ = distributedv1.DistributedSyncService_ServiceDesc
  91  	// TODO: Once distributed.Manager uses database.Database interface:
  92  	// syncMgr := distributed.NewManager(ctx, db, syncCfg, nil)
  93  	// service := distributedserver.NewService(db, syncMgr)
  94  	// distributedv1.RegisterDistributedSyncServiceServer(grpcServer, service)
  95  
  96  	// Register reflection for debugging with grpcurl
  97  	reflection.Register(grpcServer)
  98  
  99  	// Start listening
 100  	lis, err := net.Listen("tcp", cfg.Listen)
 101  	if chk.E(err) {
 102  		log.E.F("failed to listen on %s: %v", cfg.Listen, err)
 103  		os.Exit(1)
 104  	}
 105  	log.I.F("gRPC server listening on %s", cfg.Listen)
 106  
 107  	// Handle graceful shutdown
 108  	go func() {
 109  		sigs := make(chan os.Signal, 1)
 110  		signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
 111  		sig := <-sigs
 112  		log.I.F("received signal %v, shutting down...", sig)
 113  
 114  		// Cancel context to stop all operations
 115  		cancel()
 116  
 117  		// Gracefully stop gRPC server with timeout
 118  		stopped := make(chan struct{})
 119  		go func() {
 120  			grpcServer.GracefulStop()
 121  			close(stopped)
 122  		}()
 123  
 124  		select {
 125  		case <-stopped:
 126  			log.I.F("gRPC server stopped gracefully")
 127  		case <-time.After(5 * time.Second):
 128  			log.W.F("gRPC graceful stop timed out, forcing stop")
 129  			grpcServer.Stop()
 130  		}
 131  
 132  		// Close database connection
 133  		if dbCloser != nil {
 134  			log.I.F("closing database connection...")
 135  			dbCloser()
 136  		}
 137  		log.I.F("shutdown complete")
 138  	}()
 139  
 140  	// Serve gRPC
 141  	if err := grpcServer.Serve(lis); err != nil {
 142  		log.E.F("gRPC server error: %v", err)
 143  	}
 144  }
 145