main.go raw

   1  // orly-sync-negentropy is a standalone gRPC negentropy sync service for ORLY.
   2  // It provides NIP-77 negentropy-based set reconciliation for both
   3  // relay-to-relay sync and client-facing WebSocket operations.
   4  package main
   5  
   6  import (
   7  	"context"
   8  	"net"
   9  	"os"
  10  	"os/signal"
  11  	"syscall"
  12  	"time"
  13  
  14  	"google.golang.org/grpc"
  15  	"google.golang.org/grpc/reflection"
  16  	"next.orly.dev/pkg/lol"
  17  	"next.orly.dev/pkg/lol/chk"
  18  	"next.orly.dev/pkg/lol/log"
  19  
  20  	"next.orly.dev/pkg/database"
  21  	databasegrpc "next.orly.dev/pkg/database/grpc"
  22  	negentropyv1 "next.orly.dev/pkg/proto/orlysync/negentropy/v1"
  23  	"next.orly.dev/pkg/sync/negentropy"
  24  	negentropyserver "next.orly.dev/pkg/sync/negentropy/server"
  25  )
  26  
  27  func main() {
  28  	cfg := loadConfig()
  29  
  30  	// Set log level
  31  	lol.SetLogLevel(cfg.LogLevel)
  32  	log.I.F("orly-sync-negentropy starting with log level: %s", cfg.LogLevel)
  33  
  34  	ctx, cancel := context.WithCancel(context.Background())
  35  	defer cancel()
  36  
  37  	// Initialize database connection
  38  	var db database.Database
  39  	var dbCloser func()
  40  
  41  	if cfg.DBType == "grpc" {
  42  		log.I.F("connecting to gRPC database server at %s", cfg.GRPCDBServer)
  43  		dbClient, err := databasegrpc.New(ctx, &databasegrpc.ClientConfig{
  44  			ServerAddress:  cfg.GRPCDBServer,
  45  			ConnectTimeout: 30 * time.Second,
  46  		})
  47  		if chk.E(err) {
  48  			log.E.F("failed to connect to database server: %v", err)
  49  			os.Exit(1)
  50  		}
  51  		db = dbClient
  52  		dbCloser = func() { dbClient.Close() }
  53  	} else {
  54  		log.E.F("badger mode not yet implemented for sync-negentropy")
  55  		os.Exit(1)
  56  	}
  57  
  58  	// Wait for database to be ready
  59  	log.I.F("waiting for database to be ready...")
  60  	<-db.Ready()
  61  	log.I.F("database ready")
  62  
  63  	log.I.F("initializing negentropy sync manager with %d peers", len(cfg.Peers))
  64  
  65  	// Create negentropy manager
  66  	mgrConfig := &negentropy.Config{
  67  		Peers:                cfg.Peers,
  68  		SyncInterval:         cfg.SyncInterval,
  69  		FrameSize:            cfg.FrameSize,
  70  		IDSize:               cfg.IDSize,
  71  		ClientSessionTimeout: cfg.ClientSessionTimeout,
  72  		Filter:               cfg.Filter,
  73  	}
  74  	negentropyMgr := negentropy.NewManager(db, mgrConfig)
  75  
  76  	// Start background sync loop if we have peers
  77  	if len(cfg.Peers) > 0 {
  78  		log.I.F("starting background sync with %d peers", len(cfg.Peers))
  79  		negentropyMgr.Start()
  80  	}
  81  
  82  	// Create gRPC server
  83  	grpcServer := grpc.NewServer(
  84  		grpc.MaxRecvMsgSize(16<<20), // 16MB
  85  		grpc.MaxSendMsgSize(16<<20), // 16MB
  86  	)
  87  
  88  	// Register negentropy service
  89  	service := negentropyserver.NewService(db, negentropyMgr)
  90  	negentropyv1.RegisterNegentropyServiceServer(grpcServer, service)
  91  
  92  	// Register reflection for debugging with grpcurl
  93  	reflection.Register(grpcServer)
  94  
  95  	// Start listening
  96  	lis, err := net.Listen("tcp", cfg.Listen)
  97  	if chk.E(err) {
  98  		log.E.F("failed to listen on %s: %v", cfg.Listen, err)
  99  		os.Exit(1)
 100  	}
 101  	log.I.F("gRPC server listening on %s", cfg.Listen)
 102  
 103  	// Handle graceful shutdown
 104  	go func() {
 105  		sigs := make(chan os.Signal, 1)
 106  		signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
 107  		sig := <-sigs
 108  		log.I.F("received signal %v, shutting down...", sig)
 109  
 110  		cancel()
 111  
 112  		stopped := make(chan struct{})
 113  		go func() {
 114  			grpcServer.GracefulStop()
 115  			close(stopped)
 116  		}()
 117  
 118  		select {
 119  		case <-stopped:
 120  			log.I.F("gRPC server stopped gracefully")
 121  		case <-time.After(5 * time.Second):
 122  			log.W.F("gRPC graceful stop timed out, forcing stop")
 123  			grpcServer.Stop()
 124  		}
 125  
 126  		// Stop the negentropy manager
 127  		log.I.F("stopping negentropy manager...")
 128  		negentropyMgr.Stop()
 129  
 130  		if dbCloser != nil {
 131  			log.I.F("closing database connection...")
 132  			dbCloser()
 133  		}
 134  		log.I.F("shutdown complete")
 135  	}()
 136  
 137  	// Serve gRPC
 138  	if err := grpcServer.Serve(lis); err != nil {
 139  		log.E.F("gRPC server error: %v", err)
 140  	}
 141  }
 142