main.go raw

   1  // orly-db is a standalone gRPC database server for the ORLY relay.
   2  // It wraps the Badger database implementation and exposes it via gRPC.
   3  package main
   4  
   5  import (
   6  	"context"
   7  	"net"
   8  	"os"
   9  	"os/signal"
  10  	"syscall"
  11  	"time"
  12  
  13  	"google.golang.org/grpc"
  14  	"google.golang.org/grpc/reflection"
  15  	"next.orly.dev/pkg/lol"
  16  	"next.orly.dev/pkg/lol/chk"
  17  	"next.orly.dev/pkg/lol/log"
  18  
  19  	"next.orly.dev/pkg/database"
  20  	orlydbv1 "next.orly.dev/pkg/proto/orlydb/v1"
  21  )
  22  
  23  func main() {
  24  	cfg := loadConfig()
  25  
  26  	// Set log level
  27  	lol.SetLogLevel(cfg.LogLevel)
  28  	log.I.F("orly-db starting with log level: %s", cfg.LogLevel)
  29  
  30  	ctx, cancel := context.WithCancel(context.Background())
  31  	defer cancel()
  32  
  33  	// Create database configuration
  34  	dbCfg := &database.DatabaseConfig{
  35  		DataDir:            cfg.DataDir,
  36  		LogLevel:           cfg.LogLevel,
  37  		BlockCacheMB:       cfg.BlockCacheMB,
  38  		IndexCacheMB:       cfg.IndexCacheMB,
  39  		QueryCacheSizeMB:   cfg.QueryCacheSizeMB,
  40  		QueryCacheMaxAge:   cfg.QueryCacheMaxAge,
  41  		QueryCacheDisabled: cfg.QueryCacheDisabled,
  42  		SerialCachePubkeys: cfg.SerialCachePubkeys,
  43  		SerialCacheEventIds: cfg.SerialCacheEventIds,
  44  		ZSTDLevel:          cfg.ZSTDLevel,
  45  	}
  46  
  47  	// Initialize database using existing Badger implementation
  48  	log.I.F("initializing database at %s", cfg.DataDir)
  49  	db, err := database.NewWithConfig(ctx, cancel, dbCfg)
  50  	if chk.E(err) {
  51  		log.E.F("failed to initialize database: %v", err)
  52  		os.Exit(1)
  53  	}
  54  
  55  	// Wait for database to be ready
  56  	log.I.F("waiting for database to be ready...")
  57  	<-db.Ready()
  58  	log.I.F("database ready")
  59  
  60  	// Create gRPC server with large message sizes for events
  61  	grpcServer := grpc.NewServer(
  62  		grpc.MaxRecvMsgSize(64<<20), // 64MB
  63  		grpc.MaxSendMsgSize(64<<20), // 64MB
  64  	)
  65  
  66  	// Register database service
  67  	service := NewDatabaseService(db, cfg)
  68  	orlydbv1.RegisterDatabaseServiceServer(grpcServer, service)
  69  
  70  	// Register reflection for debugging with grpcurl
  71  	reflection.Register(grpcServer)
  72  
  73  	// Start listening
  74  	lis, err := net.Listen("tcp", cfg.Listen)
  75  	if chk.E(err) {
  76  		log.E.F("failed to listen on %s: %v", cfg.Listen, err)
  77  		os.Exit(1)
  78  	}
  79  	log.I.F("gRPC server listening on %s", cfg.Listen)
  80  
  81  	// Handle graceful shutdown
  82  	go func() {
  83  		sigs := make(chan os.Signal, 1)
  84  		signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
  85  		sig := <-sigs
  86  		log.I.F("received signal %v, shutting down...", sig)
  87  
  88  		// Cancel context to stop all operations
  89  		cancel()
  90  
  91  		// Gracefully stop gRPC server with timeout
  92  		stopped := make(chan struct{})
  93  		go func() {
  94  			grpcServer.GracefulStop()
  95  			close(stopped)
  96  		}()
  97  
  98  		select {
  99  		case <-stopped:
 100  			log.I.F("gRPC server stopped gracefully")
 101  		case <-time.After(5 * time.Second):
 102  			log.W.F("gRPC graceful stop timed out, forcing stop")
 103  			grpcServer.Stop()
 104  		}
 105  
 106  		// Sync and close database
 107  		log.I.F("syncing database...")
 108  		if err := db.Sync(); chk.E(err) {
 109  			log.W.F("failed to sync database: %v", err)
 110  		}
 111  		log.I.F("closing database...")
 112  		if err := db.Close(); chk.E(err) {
 113  			log.W.F("failed to close database: %v", err)
 114  		}
 115  		log.I.F("shutdown complete")
 116  	}()
 117  
 118  	// Serve gRPC
 119  	if err := grpcServer.Serve(lis); err != nil {
 120  		log.E.F("gRPC server error: %v", err)
 121  	}
 122  }
 123