database.go raw

   1  //go:build !(js && wasm)
   2  
   3  package database
   4  
   5  import (
   6  	"context"
   7  	"errors"
   8  	"os"
   9  	"path/filepath"
  10  	"time"
  11  
  12  	"github.com/dgraph-io/badger/v4"
  13  	"github.com/dgraph-io/badger/v4/options"
  14  	"next.orly.dev/pkg/lol"
  15  	"next.orly.dev/pkg/lol/chk"
  16  	"next.orly.dev/pkg/lol/log"
  17  	"next.orly.dev/pkg/database/querycache"
  18  	"next.orly.dev/pkg/nostr/encoders/event"
  19  	"next.orly.dev/pkg/nostr/encoders/filter"
  20  	"next.orly.dev/pkg/utils/apputil"
  21  	"next.orly.dev/pkg/nostr/utils/units"
  22  )
  23  
// DefaultMaxConcurrentQueries limits concurrent database queries to prevent memory exhaustion.
// Each query creates Badger iterators that consume significant memory. With many concurrent
// connections, unlimited queries can exhaust available memory in seconds.
// Set very low (3) because each query can internally create many iterators.
// This value sizes the D.querySem semaphore used by AcquireQuerySlot/ReleaseQuerySlot.
const DefaultMaxConcurrentQueries = 3
  29  
// RateLimiterInterface defines the minimal interface for rate limiting during import
type RateLimiterInterface interface {
	// IsEnabled reports whether rate limiting is currently active.
	IsEnabled() bool
	// Wait blocks until an operation of the given type may proceed (or ctx is
	// cancelled) and returns the duration the caller was delayed.
	// NOTE(review): opType is presumably one of the *OpType constants such as
	// WriteOpType — confirm against the limiter implementation.
	Wait(ctx context.Context, opType int) time.Duration
}
  35  
// WriteOpType is the operation type constant for write operations, passed as
// the opType argument to RateLimiterInterface.Wait.
const WriteOpType = 1
  38  
// D implements the Database interface using Badger as the storage backend
type D struct {
	ctx     context.Context    // lifetime context; cancellation stops background goroutines (e.g. expiration sweeps)
	cancel  context.CancelFunc // cancels ctx
	dataDir string             // directory holding the Badger database files
	Logger  *logger            // database-scoped logger; also installed as Badger's opts.Logger
	*badger.DB
	seq        *badger.Sequence // Sequence allocated from the "EVENTS" key for event serials
	pubkeySeq  *badger.Sequence // Sequence for pubkey serials
	ready      chan struct{}    // Closed when database is ready to serve requests
	queryCache *querycache.EventCache // optional query-result cache; nil when disabled in config

	// Serial cache for compact event storage
	// Caches pubkey and event ID serial mappings for fast compact event decoding
	serialCache *SerialCache

	// Rate limiter for controlling memory pressure during bulk operations
	rateLimiter RateLimiterInterface

	// Query semaphore limits concurrent database queries to prevent memory exhaustion.
	// Each query with iterators consumes significant memory; this prevents OOM under load.
	// Sized by DefaultMaxConcurrentQueries; see AcquireQuerySlot/ReleaseQuerySlot.
	querySem chan struct{}
}
  62  
  63  // SetRateLimiter sets the rate limiter for controlling memory during import/export
  64  func (d *D) SetRateLimiter(limiter RateLimiterInterface) {
  65  	d.rateLimiter = limiter
  66  }
  67  
// Ensure D implements Database interface at compile time; a missing method
// becomes a build error here rather than at a call site.
var _ Database = (*D)(nil)
  70  
  71  // New creates a new Badger database instance with default configuration.
  72  // This is provided for backward compatibility with existing callers.
  73  // For full configuration control, use NewWithConfig instead.
  74  func New(
  75  	ctx context.Context, cancel context.CancelFunc, dataDir, logLevel string,
  76  ) (
  77  	d *D, err error,
  78  ) {
  79  	// Create a default config for backward compatibility
  80  	cfg := &DatabaseConfig{
  81  		DataDir:          dataDir,
  82  		LogLevel:         logLevel,
  83  		BlockCacheMB:     1024,            // Default 1024 MB
  84  		IndexCacheMB:     512,             // Default 512 MB
  85  		QueryCacheSizeMB: 512,             // Default 512 MB
  86  		QueryCacheMaxAge: 5 * time.Minute, // Default 5 minutes
  87  	}
  88  	return NewWithConfig(ctx, cancel, cfg)
  89  }
  90  
  91  // NewWithConfig creates a new Badger database instance with full configuration.
  92  // This is the preferred method when you have access to DatabaseConfig.
  93  func NewWithConfig(
  94  	ctx context.Context, cancel context.CancelFunc, cfg *DatabaseConfig,
  95  ) (
  96  	d *D, err error,
  97  ) {
  98  	// Apply defaults for zero values (backward compatibility)
  99  	blockCacheMB := cfg.BlockCacheMB
 100  	if blockCacheMB == 0 {
 101  		blockCacheMB = 1024 // Default 1024 MB
 102  	}
 103  	indexCacheMB := cfg.IndexCacheMB
 104  	if indexCacheMB == 0 {
 105  		indexCacheMB = 512 // Default 512 MB
 106  	}
 107  	queryCacheSizeMB := cfg.QueryCacheSizeMB
 108  	if queryCacheSizeMB == 0 {
 109  		queryCacheSizeMB = 512 // Default 512 MB
 110  	}
 111  	queryCacheMaxAge := cfg.QueryCacheMaxAge
 112  	if queryCacheMaxAge == 0 {
 113  		queryCacheMaxAge = 5 * time.Minute // Default 5 minutes
 114  	}
 115  
 116  	// Serial cache configuration for compact event storage
 117  	serialCachePubkeys := cfg.SerialCachePubkeys
 118  	if serialCachePubkeys == 0 {
 119  		serialCachePubkeys = 100000 // Default 100k pubkeys (~3.2MB memory)
 120  	}
 121  	serialCacheEventIds := cfg.SerialCacheEventIds
 122  	if serialCacheEventIds == 0 {
 123  		serialCacheEventIds = 500000 // Default 500k event IDs (~16MB memory)
 124  	}
 125  
 126  	// ZSTD compression level configuration
 127  	// Level 0 = disabled, 1 = fast (~500 MB/s), 3 = default, 9 = best ratio
 128  	zstdLevel := cfg.ZSTDLevel
 129  	if zstdLevel < 0 {
 130  		zstdLevel = 0
 131  	} else if zstdLevel > 19 {
 132  		zstdLevel = 19 // ZSTD maximum level
 133  	}
 134  
 135  	queryCacheSize := int64(queryCacheSizeMB * 1024 * 1024)
 136  
 137  	// Create query cache only if not disabled
 138  	var qc *querycache.EventCache
 139  	if !cfg.QueryCacheDisabled {
 140  		qc = querycache.NewEventCache(queryCacheSize, queryCacheMaxAge)
 141  	}
 142  
 143  	d = &D{
 144  		ctx:         ctx,
 145  		cancel:      cancel,
 146  		dataDir:     cfg.DataDir,
 147  		Logger:      NewLogger(lol.GetLogLevel(cfg.LogLevel), cfg.DataDir),
 148  		DB:          nil,
 149  		seq:         nil,
 150  		ready:       make(chan struct{}),
 151  		queryCache:  qc,
 152  		serialCache: NewSerialCache(serialCachePubkeys, serialCacheEventIds),
 153  		querySem:    make(chan struct{}, DefaultMaxConcurrentQueries),
 154  	}
 155  
 156  	// Ensure the data directory exists
 157  	if err = os.MkdirAll(cfg.DataDir, 0755); chk.E(err) {
 158  		return
 159  	}
 160  
 161  	// Also ensure the directory exists using apputil.EnsureDir for any
 162  	// potential subdirectories
 163  	dummyFile := filepath.Join(cfg.DataDir, "dummy.sst")
 164  	if err = apputil.EnsureDir(dummyFile); chk.E(err) {
 165  		return
 166  	}
 167  
 168  	opts := badger.DefaultOptions(d.dataDir)
 169  	// Configure caches based on config to better match workload.
 170  	// Defaults aim for higher hit ratios under read-heavy workloads while remaining safe.
 171  	opts.BlockCacheSize = int64(blockCacheMB * units.Mb)
 172  	opts.IndexCacheSize = int64(indexCacheMB * units.Mb)
 173  	opts.BlockSize = 4 * units.Kb // 4 KB block size
 174  
 175  	// Reduce table sizes to lower cost-per-key in cache
 176  	// Smaller tables mean lower cache cost metric per entry
 177  	opts.BaseTableSize = 8 * units.Mb // 8 MB per table (reduced from 64 MB to lower cache cost)
 178  	opts.MemTableSize = 16 * units.Mb // 16 MB memtable (reduced from 64 MB)
 179  
 180  	// Keep value log files to a moderate size
 181  	opts.ValueLogFileSize = 128 * units.Mb // 128 MB value log files (reduced from 256 MB)
 182  
 183  	// CRITICAL: Keep small inline events in LSM tree, not value log
 184  	// VLogPercentile 0.99 means 99% of values stay in LSM (our optimized inline events!)
 185  	// This dramatically improves read performance for small events
 186  	opts.VLogPercentile = 0.99
 187  
 188  	// Optimize LSM tree structure
 189  	opts.BaseLevelSize = 64 * units.Mb // Increased from default 10 MB for fewer levels
 190  	opts.LevelSizeMultiplier = 10      // Default, good balance
 191  
 192  	opts.CompactL0OnClose = true
 193  	opts.LmaxCompaction = true
 194  
 195  	// Enable compression to reduce cache cost
 196  	// Level 0 disables compression, 1 = fast (~500 MB/s), 3 = default, 9 = best ratio
 197  	if zstdLevel == 0 {
 198  		opts.Compression = options.None
 199  	} else {
 200  		opts.Compression = options.ZSTD
 201  		opts.ZSTDCompressionLevel = zstdLevel
 202  	}
 203  
 204  	// Disable conflict detection for write-heavy relay workloads
 205  	// Nostr events are immutable, no need for transaction conflict checks
 206  	opts.DetectConflicts = false
 207  
 208  	// Performance tuning for high-throughput workloads
 209  	opts.NumCompactors = 8            // Increase from default 4 for faster compaction
 210  	opts.NumLevelZeroTables = 8       // Increase from default 5 to allow more L0 tables before compaction
 211  	opts.NumLevelZeroTablesStall = 16 // Increase from default 15 to reduce write stalls
 212  	opts.NumMemtables = 8             // Increase from default 5 to buffer more writes
 213  	opts.MaxLevels = 7                // Default is 7, keep it
 214  
 215  	opts.Logger = d.Logger
 216  	if d.DB, err = badger.Open(opts); chk.E(err) {
 217  		return
 218  	}
 219  	if d.seq, err = d.DB.GetSequence([]byte("EVENTS"), 1000); chk.E(err) {
 220  		return
 221  	}
 222  	if d.pubkeySeq, err = d.DB.GetSequence([]byte("PUBKEYS"), 1000); chk.E(err) {
 223  		return
 224  	}
 225  	// run code that updates indexes when new indexes have been added and bumps
 226  	// the version so they aren't run again.
 227  	d.RunMigrations()
 228  
 229  	// Start warmup goroutine to signal when database is ready
 230  	go d.warmup()
 231  
 232  	// start up the expiration tag processing and shut down and clean up the
 233  	// database after the context is canceled.
 234  	go func() {
 235  		expirationTicker := time.NewTicker(time.Minute * 10)
 236  		defer expirationTicker.Stop()
 237  		for {
 238  			select {
 239  			case <-expirationTicker.C:
 240  				d.DeleteExpired()
 241  			case <-d.ctx.Done():
 242  				return
 243  			}
 244  		}
 245  	}()
 246  	return
 247  }
 248  
 249  // Path returns the path where the database files are stored.
 250  func (d *D) Path() string { return d.dataDir }
 251  
 252  // Ready returns a channel that closes when the database is ready to serve requests.
 253  // This allows callers to wait for database warmup to complete.
 254  func (d *D) Ready() <-chan struct{} {
 255  	return d.ready
 256  }
 257  
 258  // warmup performs database warmup operations and closes the ready channel when complete.
 259  // Warmup criteria:
 260  // - Wait at least 2 seconds for initial compactions to settle
 261  // - Ensure cache hit ratio is reasonable (if we have metrics available)
 262  func (d *D) warmup() {
 263  	defer close(d.ready)
 264  
 265  	// Give the database time to settle after opening
 266  	// This allows:
 267  	// - Initial compactions to complete
 268  	// - Memory allocations to stabilize
 269  	// - Cache to start warming up
 270  	time.Sleep(2 * time.Second)
 271  
 272  	d.Logger.Infof("database warmup complete, ready to serve requests")
 273  }
 274  
 275  func (d *D) Wipe() (err error) {
 276  	err = errors.New("not implemented")
 277  	return
 278  }
 279  
 280  func (d *D) SetLogLevel(level string) {
 281  	d.Logger.SetLogLevel(lol.GetLogLevel(level))
 282  }
 283  
 284  func (d *D) EventIdsBySerial(start uint64, count int) (
 285  	evs []uint64, err error,
 286  ) {
 287  	err = errors.New("not implemented")
 288  	return
 289  }
 290  
 291  // Init initializes the database with the given path.
 292  func (d *D) Init(path string) (err error) {
 293  	// The database is already initialized in the New function,
 294  	// so we just need to ensure the path is set correctly.
 295  	d.dataDir = path
 296  	return nil
 297  }
 298  
 299  // Sync flushes the database buffers to disk.
 300  func (d *D) Sync() (err error) {
 301  	d.DB.RunValueLogGC(0.5)
 302  	return d.DB.Sync()
 303  }
 304  
 305  // QueryCacheStats returns statistics about the query cache
 306  func (d *D) QueryCacheStats() querycache.CacheStats {
 307  	if d.queryCache == nil {
 308  		return querycache.CacheStats{}
 309  	}
 310  	return d.queryCache.Stats()
 311  }
 312  
 313  // InvalidateQueryCache clears all entries from the query cache
 314  func (d *D) InvalidateQueryCache() {
 315  	if d.queryCache != nil {
 316  		d.queryCache.Invalidate()
 317  	}
 318  }
 319  
 320  // GetCachedJSON retrieves cached marshaled JSON for a filter
 321  // Returns nil, false if not found
 322  func (d *D) GetCachedJSON(f *filter.F) ([][]byte, bool) {
 323  	if d.queryCache == nil {
 324  		return nil, false
 325  	}
 326  	return d.queryCache.Get(f)
 327  }
 328  
 329  // CacheMarshaledJSON stores marshaled JSON event envelopes for a filter
 330  func (d *D) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {
 331  	if d.queryCache != nil && len(marshaledJSON) > 0 {
 332  		// Store the serialized JSON directly - this is already in envelope format
 333  		// We create a wrapper to store it with the right structure
 334  		d.queryCache.PutJSON(f, marshaledJSON)
 335  	}
 336  }
 337  
 338  // GetCachedEvents retrieves cached events for a filter (without subscription ID)
 339  // Returns nil, false if not found
 340  func (d *D) GetCachedEvents(f *filter.F) (event.S, bool) {
 341  	if d.queryCache == nil {
 342  		return nil, false
 343  	}
 344  	return d.queryCache.GetEvents(f)
 345  }
 346  
 347  // CacheEvents stores events for a filter (without subscription ID)
 348  func (d *D) CacheEvents(f *filter.F, events event.S) {
 349  	if d.queryCache != nil && len(events) > 0 {
 350  		d.queryCache.PutEvents(f, events)
 351  	}
 352  }
 353  
 354  // Close releases resources and closes the database.
 355  func (d *D) Close() (err error) {
 356  	if d.seq != nil {
 357  		if err = d.seq.Release(); chk.E(err) {
 358  			return
 359  		}
 360  	}
 361  	if d.pubkeySeq != nil {
 362  		if err = d.pubkeySeq.Release(); chk.E(err) {
 363  			return
 364  		}
 365  	}
 366  	if d.DB != nil {
 367  		if err = d.DB.Close(); chk.E(err) {
 368  			return
 369  		}
 370  	}
 371  	return
 372  }
 373  
 374  // AcquireQuerySlot acquires a slot from the query semaphore to limit concurrent queries.
 375  // This blocks until a slot is available or the context is cancelled.
 376  // Returns true if slot was acquired, false if context cancelled.
 377  func (d *D) AcquireQuerySlot(ctx context.Context) bool {
 378  	select {
 379  	case d.querySem <- struct{}{}:
 380  		return true
 381  	case <-ctx.Done():
 382  		return false
 383  	}
 384  }
 385  
 386  // ReleaseQuerySlot releases a previously acquired query slot.
 387  func (d *D) ReleaseQuerySlot() {
 388  	select {
 389  	case <-d.querySem:
 390  	default:
 391  		log.W.F("ReleaseQuerySlot called without matching AcquireQuerySlot")
 392  	}
 393  }
 394