
# Import Memory Optimization Plan

## Goal

Constrain import memory utilization to ≤1.5 GB, leaving enough headroom for the system disk cache to flush writes to disk before the import continues.

## Test Results (Baseline)

### Memory Timeline (Baseline)

| Time | Memory (RSS) | Events | Notes |
|------|--------------|--------|-------|
| Start | 95 MB | 0 | Initial state |
| +10 min | 2.7 GB | 283k | Warming up |
| +20 min | 4.1 GB | 475k | Memory growing |
| +30 min | 5.2 GB | 720k | Peak approaching |
| +35 min | 5.9 GB | 485k | Near peak |
| +40 min | 5.6 GB | 1.3M | GC recovered memory |
| +48 min | 6.4 GB | 2.1M | Final (42% of RAM) |

## Root Causes of Memory Growth

1. Badger internal caches (block cache and index cache, configured in `database.go`)

2. Badger write buffers (8 × 16 MB memtables by default)

3. No backpressure in the import loop: events are written as fast as they are parsed

4. Transaction overhead: one transaction per event

## Proposed Mitigations

### Phase 1: Reduce Badger Cache Configuration for Import

Add import-specific configuration options in `app/config/config.go`:

```go
ImportBlockCacheMB  int `env:"ORLY_IMPORT_BLOCK_CACHE_MB" default:"256"`
ImportIndexCacheMB  int `env:"ORLY_IMPORT_INDEX_CACHE_MB" default:"128"`
ImportMemTableSize  int `env:"ORLY_IMPORT_MEMTABLE_SIZE_MB" default:"8"`
```

For a 1.5 GB target:

| Component | Size | Notes |
|-----------|------|-------|
| Block cache | 256 MB | Reduced from 1024 MB |
| Index cache | 128 MB | Reduced from 512 MB |
| Memtables | 4 × 8 MB = 32 MB | Reduced from 8 × 16 MB |
| Serial cache | ~20 MB | Unchanged |
| Working memory | ~200 MB | Buffer for processing |
| Total | ~636 MB | Leaves headroom under the 1.5 GB target |
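
Wiring these values into Badger might look roughly like the sketch below. This is an assumption, not existing code: the option names are from the badger v4 builder API, and `Config` field names follow the proposed additions above.

```go
// Sketch: build import-tuned Badger options from the proposed config fields.
// importBadgerOptions and cfg are hypothetical names for illustration.
func importBadgerOptions(path string, cfg *Config) badger.Options {
    mb := int64(1) << 20
    return badger.DefaultOptions(path).
        WithBlockCacheSize(int64(cfg.ImportBlockCacheMB) * mb). // 256 MB instead of 1024 MB
        WithIndexCacheSize(int64(cfg.ImportIndexCacheMB) * mb). // 128 MB instead of 512 MB
        WithMemTableSize(int64(cfg.ImportMemTableSize) * mb).   // 8 MB memtables
        WithNumMemtables(4)                                     // 4 × 8 MB = 32 MB total
}
```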

### Phase 2: Add Batching with Sync to Import Loop

Modify `import_utils.go` to batch writes and periodically force a sync:

```go
const (
    importBatchSize      = 500  // Events per batch
    importSyncInterval   = 2000 // Events before forcing sync
    importMemCheckEvents = 1000 // Events between memory checks
    importMaxMemoryMB    = 1400 // Target max memory (MB)
)

// In processJSONLEventsWithPolicy (needs runtime, runtime/debug, time):
var batchCount int
for scan.Scan() {
    // ... existing event processing ...

    batchCount++
    count++

    // Force sync periodically to flush writes to disk
    if batchCount >= importSyncInterval {
        _ = d.DB.Sync() // Best-effort flush; an error here is non-fatal
        batchCount = 0
    }

    // Memory pressure check
    if count%importMemCheckEvents == 0 {
        var m runtime.MemStats
        runtime.ReadMemStats(&m)
        heapMB := m.HeapAlloc / 1024 / 1024

        if heapMB > importMaxMemoryMB {
            // Apply backpressure: flush, collect, return memory to the OS
            _ = d.DB.Sync()
            runtime.GC()
            debug.FreeOSMemory()

            // Wait for compaction to catch up
            time.Sleep(100 * time.Millisecond)
        }
    }
}
```

### Phase 3: Use Batch Transactions

Instead of one transaction per event, batch multiple events into a single transaction:

```go
// Accumulate events for batch write
const txnBatchSize = 100

type pendingWrite struct {
    idxs       [][]byte
    compactKey []byte
    compactVal []byte
    graphKeys  [][]byte
}

var pendingWrites []pendingWrite

// In the event processing loop:
pendingWrites = append(pendingWrites, pw)

if len(pendingWrites) >= txnBatchSize {
    err = d.Update(func(txn *badger.Txn) error {
        for _, pw := range pendingWrites {
            for _, key := range pw.idxs {
                if err := txn.Set(key, nil); err != nil {
                    return err
                }
            }
            if err := txn.Set(pw.compactKey, pw.compactVal); err != nil {
                return err
            }
            for _, gk := range pw.graphKeys {
                if err := txn.Set(gk, nil); err != nil {
                    return err
                }
            }
        }
        return nil
    })
    if err != nil {
        return err
    }
    pendingWrites = pendingWrites[:0]
}

// After the loop: flush any remaining pendingWrites with one final Update.
```
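
The accumulate-and-flush pattern above can be factored into a small helper that is easy to unit-test independently of Badger. The names below are hypothetical, not part of the existing codebase; with `size: 100` and a flush function that wraps `d.Update`, it reproduces Phase 3 while guaranteeing the trailing partial batch is written.

```go
// batcher collects items and invokes flush whenever size items have
// accumulated. Call Flush once more after the loop to drain the remainder.
type batcher[T any] struct {
    size  int
    items []T
    flush func([]T) error
}

// Add appends an item, flushing automatically when the batch is full.
func (b *batcher[T]) Add(item T) error {
    b.items = append(b.items, item)
    if len(b.items) >= b.size {
        return b.Flush()
    }
    return nil
}

// Flush writes out any buffered items and resets the buffer.
func (b *batcher[T]) Flush() error {
    if len(b.items) == 0 {
        return nil
    }
    err := b.flush(b.items)
    b.items = b.items[:0]
    return err
}
```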

### Phase 4: Implement Adaptive Rate Limiting

```go
type importRateLimiter struct {
    targetMemMB   uint64
    checkInterval int
    baseDelay     time.Duration
    maxDelay      time.Duration
}

func (r *importRateLimiter) maybeThrottle(eventCount int) {
    if eventCount%r.checkInterval != 0 {
        return
    }

    var m runtime.MemStats
    runtime.ReadMemStats(&m)
    heapMB := m.HeapAlloc / 1024 / 1024

    if heapMB > r.targetMemMB {
        // Calculate delay proportional to overage
        overage := float64(heapMB-r.targetMemMB) / float64(r.targetMemMB)
        delay := time.Duration(float64(r.baseDelay) * (1 + overage*10))
        if delay > r.maxDelay {
            delay = r.maxDelay
        }

        // Force GC and wait
        runtime.GC()
        debug.FreeOSMemory()
        time.Sleep(delay)
    }
}
```

## Implementation Order

  1. Quick Win: Add a `d.DB.Sync()` call every N events in the import loop
  2. Configuration: Add environment variables for import-specific cache sizes
  3. Batching: Implement batch transactions to reduce per-event overhead
  4. Adaptive: Add memory-aware rate limiting

## Expected Results

| Approach | Memory Target | Throughput Impact |
|----------|---------------|-------------------|
| Current | ~6 GB peak | 736 events/sec |
| Phase 1 (cache reduction) | ~2 GB | ~700 events/sec |
| Phase 2 (sync + GC) | ~1.5 GB | ~500 events/sec |
| Phase 3 (batching) | ~1.5 GB | ~600 events/sec |
| Phase 4 (adaptive) | ~1.4 GB | Variable |

## Files to Modify

  1. `app/config/config.go` - Add import-specific config options
  2. `pkg/database/database.go` - Add import mode with reduced caches
  3. `pkg/database/import_utils.go` - Add batching, sync, and rate limiting
  4. `pkg/database/save-event.go` - Add batch save method (optional, for Phase 3)

## Environment Variables (Proposed)

```bash
# Import-specific cache settings (only apply during import operations)
ORLY_IMPORT_BLOCK_CACHE_MB=256      # Block cache size during import
ORLY_IMPORT_INDEX_CACHE_MB=128      # Index cache size during import
ORLY_IMPORT_MEMTABLE_SIZE_MB=8      # Memtable size during import

# Import rate limiting
ORLY_IMPORT_SYNC_INTERVAL=2000      # Events between forced syncs
ORLY_IMPORT_MAX_MEMORY_MB=1400      # Target max memory during import
ORLY_IMPORT_BATCH_SIZE=100          # Events per transaction batch
```

## Notes

### Test Command

To re-run the import test with memory monitoring:

```bash
# Start relay with import-optimized settings
export ORLY_DATA_DIR=/tmp/orly-import-test
export ORLY_ACL_MODE=none
export ORLY_PORT=10548
export ORLY_LOG_LEVEL=info
./orly &

# Upload test file
curl -X POST \
  -F "file=@/path/to/wot_reference.jsonl" \
  http://localhost:10548/api/import

# Monitor memory
watch -n 5 'ps -p $(pgrep orly) -o pid,rss,pmem --no-headers'
```