// database.go
1 //go:build !(js && wasm)
2
3 package database
4
5 import (
6 "context"
7 "errors"
8 "os"
9 "path/filepath"
10 "time"
11
12 "github.com/dgraph-io/badger/v4"
13 "github.com/dgraph-io/badger/v4/options"
14 "next.orly.dev/pkg/lol"
15 "next.orly.dev/pkg/lol/chk"
16 "next.orly.dev/pkg/lol/log"
17 "next.orly.dev/pkg/database/querycache"
18 "next.orly.dev/pkg/nostr/encoders/event"
19 "next.orly.dev/pkg/nostr/encoders/filter"
20 "next.orly.dev/pkg/utils/apputil"
21 "next.orly.dev/pkg/nostr/utils/units"
22 )
23
// DefaultMaxConcurrentQueries limits concurrent database queries to prevent
// memory exhaustion. Each query creates Badger iterators that consume
// significant memory; with many concurrent connections, unlimited queries can
// exhaust available memory in seconds. Set very low (3) because each query
// can internally create many iterators.
const DefaultMaxConcurrentQueries = 3

// RateLimiterInterface defines the minimal interface for rate limiting during
// import.
type RateLimiterInterface interface {
	// IsEnabled reports whether rate limiting is currently active.
	IsEnabled() bool
	// Wait blocks until an operation of the given type may proceed and
	// returns how long it waited. opType is an operation-class constant such
	// as WriteOpType. NOTE(review): behavior when ctx is cancelled mid-wait
	// is defined by the implementation — confirm with the limiter in use.
	Wait(ctx context.Context, opType int) time.Duration
}

// WriteOpType is the operation type constant for write operations, passed as
// the opType argument to RateLimiterInterface.Wait.
const WriteOpType = 1
38
// D implements the Database interface using Badger as the storage backend.
type D struct {
	// ctx/cancel control the lifetime of background goroutines (warmup,
	// expiration sweeps).
	ctx     context.Context
	cancel  context.CancelFunc
	dataDir string  // directory holding the Badger files
	Logger  *logger // Badger-compatible logger, also used for app messages
	*badger.DB
	seq       *badger.Sequence // sequence allocator for event serials
	pubkeySeq *badger.Sequence // Sequence for pubkey serials
	ready     chan struct{}    // Closed when database is ready to serve requests
	// queryCache caches query results; nil when QueryCacheDisabled is set.
	queryCache *querycache.EventCache

	// Serial cache for compact event storage.
	// Caches pubkey and event ID serial mappings for fast compact event
	// decoding.
	serialCache *SerialCache

	// Rate limiter for controlling memory pressure during bulk operations;
	// nil until SetRateLimiter is called.
	rateLimiter RateLimiterInterface

	// Query semaphore limits concurrent database queries to prevent memory
	// exhaustion. Each query with iterators consumes significant memory;
	// this prevents OOM under load. Capacity is DefaultMaxConcurrentQueries.
	querySem chan struct{}
}
62
// SetRateLimiter sets the rate limiter for controlling memory during
// import/export. Passing nil disables rate limiting.
func (d *D) SetRateLimiter(limiter RateLimiterInterface) {
	d.rateLimiter = limiter
}
67
// Compile-time assertion that *D satisfies the Database interface.
var _ Database = (*D)(nil)
70
71 // New creates a new Badger database instance with default configuration.
72 // This is provided for backward compatibility with existing callers.
73 // For full configuration control, use NewWithConfig instead.
74 func New(
75 ctx context.Context, cancel context.CancelFunc, dataDir, logLevel string,
76 ) (
77 d *D, err error,
78 ) {
79 // Create a default config for backward compatibility
80 cfg := &DatabaseConfig{
81 DataDir: dataDir,
82 LogLevel: logLevel,
83 BlockCacheMB: 1024, // Default 1024 MB
84 IndexCacheMB: 512, // Default 512 MB
85 QueryCacheSizeMB: 512, // Default 512 MB
86 QueryCacheMaxAge: 5 * time.Minute, // Default 5 minutes
87 }
88 return NewWithConfig(ctx, cancel, cfg)
89 }
90
91 // NewWithConfig creates a new Badger database instance with full configuration.
92 // This is the preferred method when you have access to DatabaseConfig.
93 func NewWithConfig(
94 ctx context.Context, cancel context.CancelFunc, cfg *DatabaseConfig,
95 ) (
96 d *D, err error,
97 ) {
98 // Apply defaults for zero values (backward compatibility)
99 blockCacheMB := cfg.BlockCacheMB
100 if blockCacheMB == 0 {
101 blockCacheMB = 1024 // Default 1024 MB
102 }
103 indexCacheMB := cfg.IndexCacheMB
104 if indexCacheMB == 0 {
105 indexCacheMB = 512 // Default 512 MB
106 }
107 queryCacheSizeMB := cfg.QueryCacheSizeMB
108 if queryCacheSizeMB == 0 {
109 queryCacheSizeMB = 512 // Default 512 MB
110 }
111 queryCacheMaxAge := cfg.QueryCacheMaxAge
112 if queryCacheMaxAge == 0 {
113 queryCacheMaxAge = 5 * time.Minute // Default 5 minutes
114 }
115
116 // Serial cache configuration for compact event storage
117 serialCachePubkeys := cfg.SerialCachePubkeys
118 if serialCachePubkeys == 0 {
119 serialCachePubkeys = 100000 // Default 100k pubkeys (~3.2MB memory)
120 }
121 serialCacheEventIds := cfg.SerialCacheEventIds
122 if serialCacheEventIds == 0 {
123 serialCacheEventIds = 500000 // Default 500k event IDs (~16MB memory)
124 }
125
126 // ZSTD compression level configuration
127 // Level 0 = disabled, 1 = fast (~500 MB/s), 3 = default, 9 = best ratio
128 zstdLevel := cfg.ZSTDLevel
129 if zstdLevel < 0 {
130 zstdLevel = 0
131 } else if zstdLevel > 19 {
132 zstdLevel = 19 // ZSTD maximum level
133 }
134
135 queryCacheSize := int64(queryCacheSizeMB * 1024 * 1024)
136
137 // Create query cache only if not disabled
138 var qc *querycache.EventCache
139 if !cfg.QueryCacheDisabled {
140 qc = querycache.NewEventCache(queryCacheSize, queryCacheMaxAge)
141 }
142
143 d = &D{
144 ctx: ctx,
145 cancel: cancel,
146 dataDir: cfg.DataDir,
147 Logger: NewLogger(lol.GetLogLevel(cfg.LogLevel), cfg.DataDir),
148 DB: nil,
149 seq: nil,
150 ready: make(chan struct{}),
151 queryCache: qc,
152 serialCache: NewSerialCache(serialCachePubkeys, serialCacheEventIds),
153 querySem: make(chan struct{}, DefaultMaxConcurrentQueries),
154 }
155
156 // Ensure the data directory exists
157 if err = os.MkdirAll(cfg.DataDir, 0755); chk.E(err) {
158 return
159 }
160
161 // Also ensure the directory exists using apputil.EnsureDir for any
162 // potential subdirectories
163 dummyFile := filepath.Join(cfg.DataDir, "dummy.sst")
164 if err = apputil.EnsureDir(dummyFile); chk.E(err) {
165 return
166 }
167
168 opts := badger.DefaultOptions(d.dataDir)
169 // Configure caches based on config to better match workload.
170 // Defaults aim for higher hit ratios under read-heavy workloads while remaining safe.
171 opts.BlockCacheSize = int64(blockCacheMB * units.Mb)
172 opts.IndexCacheSize = int64(indexCacheMB * units.Mb)
173 opts.BlockSize = 4 * units.Kb // 4 KB block size
174
175 // Reduce table sizes to lower cost-per-key in cache
176 // Smaller tables mean lower cache cost metric per entry
177 opts.BaseTableSize = 8 * units.Mb // 8 MB per table (reduced from 64 MB to lower cache cost)
178 opts.MemTableSize = 16 * units.Mb // 16 MB memtable (reduced from 64 MB)
179
180 // Keep value log files to a moderate size
181 opts.ValueLogFileSize = 128 * units.Mb // 128 MB value log files (reduced from 256 MB)
182
183 // CRITICAL: Keep small inline events in LSM tree, not value log
184 // VLogPercentile 0.99 means 99% of values stay in LSM (our optimized inline events!)
185 // This dramatically improves read performance for small events
186 opts.VLogPercentile = 0.99
187
188 // Optimize LSM tree structure
189 opts.BaseLevelSize = 64 * units.Mb // Increased from default 10 MB for fewer levels
190 opts.LevelSizeMultiplier = 10 // Default, good balance
191
192 opts.CompactL0OnClose = true
193 opts.LmaxCompaction = true
194
195 // Enable compression to reduce cache cost
196 // Level 0 disables compression, 1 = fast (~500 MB/s), 3 = default, 9 = best ratio
197 if zstdLevel == 0 {
198 opts.Compression = options.None
199 } else {
200 opts.Compression = options.ZSTD
201 opts.ZSTDCompressionLevel = zstdLevel
202 }
203
204 // Disable conflict detection for write-heavy relay workloads
205 // Nostr events are immutable, no need for transaction conflict checks
206 opts.DetectConflicts = false
207
208 // Performance tuning for high-throughput workloads
209 opts.NumCompactors = 8 // Increase from default 4 for faster compaction
210 opts.NumLevelZeroTables = 8 // Increase from default 5 to allow more L0 tables before compaction
211 opts.NumLevelZeroTablesStall = 16 // Increase from default 15 to reduce write stalls
212 opts.NumMemtables = 8 // Increase from default 5 to buffer more writes
213 opts.MaxLevels = 7 // Default is 7, keep it
214
215 opts.Logger = d.Logger
216 if d.DB, err = badger.Open(opts); chk.E(err) {
217 return
218 }
219 if d.seq, err = d.DB.GetSequence([]byte("EVENTS"), 1000); chk.E(err) {
220 return
221 }
222 if d.pubkeySeq, err = d.DB.GetSequence([]byte("PUBKEYS"), 1000); chk.E(err) {
223 return
224 }
225 // run code that updates indexes when new indexes have been added and bumps
226 // the version so they aren't run again.
227 d.RunMigrations()
228
229 // Start warmup goroutine to signal when database is ready
230 go d.warmup()
231
232 // start up the expiration tag processing and shut down and clean up the
233 // database after the context is canceled.
234 go func() {
235 expirationTicker := time.NewTicker(time.Minute * 10)
236 defer expirationTicker.Stop()
237 for {
238 select {
239 case <-expirationTicker.C:
240 d.DeleteExpired()
241 case <-d.ctx.Done():
242 return
243 }
244 }
245 }()
246 return
247 }
248
249 // Path returns the path where the database files are stored.
250 func (d *D) Path() string { return d.dataDir }
251
252 // Ready returns a channel that closes when the database is ready to serve requests.
253 // This allows callers to wait for database warmup to complete.
254 func (d *D) Ready() <-chan struct{} {
255 return d.ready
256 }
257
258 // warmup performs database warmup operations and closes the ready channel when complete.
259 // Warmup criteria:
260 // - Wait at least 2 seconds for initial compactions to settle
261 // - Ensure cache hit ratio is reasonable (if we have metrics available)
262 func (d *D) warmup() {
263 defer close(d.ready)
264
265 // Give the database time to settle after opening
266 // This allows:
267 // - Initial compactions to complete
268 // - Memory allocations to stabilize
269 // - Cache to start warming up
270 time.Sleep(2 * time.Second)
271
272 d.Logger.Infof("database warmup complete, ready to serve requests")
273 }
274
275 func (d *D) Wipe() (err error) {
276 err = errors.New("not implemented")
277 return
278 }
279
280 func (d *D) SetLogLevel(level string) {
281 d.Logger.SetLogLevel(lol.GetLogLevel(level))
282 }
283
284 func (d *D) EventIdsBySerial(start uint64, count int) (
285 evs []uint64, err error,
286 ) {
287 err = errors.New("not implemented")
288 return
289 }
290
291 // Init initializes the database with the given path.
292 func (d *D) Init(path string) (err error) {
293 // The database is already initialized in the New function,
294 // so we just need to ensure the path is set correctly.
295 d.dataDir = path
296 return nil
297 }
298
299 // Sync flushes the database buffers to disk.
300 func (d *D) Sync() (err error) {
301 d.DB.RunValueLogGC(0.5)
302 return d.DB.Sync()
303 }
304
305 // QueryCacheStats returns statistics about the query cache
306 func (d *D) QueryCacheStats() querycache.CacheStats {
307 if d.queryCache == nil {
308 return querycache.CacheStats{}
309 }
310 return d.queryCache.Stats()
311 }
312
313 // InvalidateQueryCache clears all entries from the query cache
314 func (d *D) InvalidateQueryCache() {
315 if d.queryCache != nil {
316 d.queryCache.Invalidate()
317 }
318 }
319
320 // GetCachedJSON retrieves cached marshaled JSON for a filter
321 // Returns nil, false if not found
322 func (d *D) GetCachedJSON(f *filter.F) ([][]byte, bool) {
323 if d.queryCache == nil {
324 return nil, false
325 }
326 return d.queryCache.Get(f)
327 }
328
329 // CacheMarshaledJSON stores marshaled JSON event envelopes for a filter
330 func (d *D) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {
331 if d.queryCache != nil && len(marshaledJSON) > 0 {
332 // Store the serialized JSON directly - this is already in envelope format
333 // We create a wrapper to store it with the right structure
334 d.queryCache.PutJSON(f, marshaledJSON)
335 }
336 }
337
338 // GetCachedEvents retrieves cached events for a filter (without subscription ID)
339 // Returns nil, false if not found
340 func (d *D) GetCachedEvents(f *filter.F) (event.S, bool) {
341 if d.queryCache == nil {
342 return nil, false
343 }
344 return d.queryCache.GetEvents(f)
345 }
346
347 // CacheEvents stores events for a filter (without subscription ID)
348 func (d *D) CacheEvents(f *filter.F, events event.S) {
349 if d.queryCache != nil && len(events) > 0 {
350 d.queryCache.PutEvents(f, events)
351 }
352 }
353
354 // Close releases resources and closes the database.
355 func (d *D) Close() (err error) {
356 if d.seq != nil {
357 if err = d.seq.Release(); chk.E(err) {
358 return
359 }
360 }
361 if d.pubkeySeq != nil {
362 if err = d.pubkeySeq.Release(); chk.E(err) {
363 return
364 }
365 }
366 if d.DB != nil {
367 if err = d.DB.Close(); chk.E(err) {
368 return
369 }
370 }
371 return
372 }
373
374 // AcquireQuerySlot acquires a slot from the query semaphore to limit concurrent queries.
375 // This blocks until a slot is available or the context is cancelled.
376 // Returns true if slot was acquired, false if context cancelled.
377 func (d *D) AcquireQuerySlot(ctx context.Context) bool {
378 select {
379 case d.querySem <- struct{}{}:
380 return true
381 case <-ctx.Done():
382 return false
383 }
384 }
385
386 // ReleaseQuerySlot releases a previously acquired query slot.
387 func (d *D) ReleaseQuerySlot() {
388 select {
389 case <-d.querySem:
390 default:
391 log.W.F("ReleaseQuerySlot called without matching AcquireQuerySlot")
392 }
393 }
394