Goal: constrain import memory usage to ≤1.5 GB so that the system's disk cache can flush adequately before the import continues.
Observed memory during import of `wot_reference.jsonl` (2.7 GB, ~2.16 million events):

| Time | Memory (RSS) | Events | Notes |
|---|---|---|---|
| Start | 95 MB | 0 | Initial state |
| +10 min | 2.7 GB | 283k | Warming up |
| +20 min | 4.1 GB | 475k | Memory growing |
| +30 min | 5.2 GB | 720k | Peak approaching |
| +35 min | 5.9 GB | 485k | Near peak |
| +40 min | 5.6 GB | 1.3M | GC recovered memory |
| +48 min | 6.4 GB | 2.1M | Final (42% of RAM) |
- database.go)
- `debug.FreeOSMemory()` only runs every 5 seconds
- `SaveEvent` creates a transaction per event

Add import-specific configuration options in `app/config/config.go`:
```go
// Fields added to the config struct in app/config/config.go
ImportBlockCacheMB int `env:"ORLY_IMPORT_BLOCK_CACHE_MB" default:"256"`
ImportIndexCacheMB int `env:"ORLY_IMPORT_INDEX_CACHE_MB" default:"128"`
ImportMemTableSize int `env:"ORLY_IMPORT_MEMTABLE_SIZE_MB" default:"8"`
```
Memory budget for a 1.5 GB target:
| Component | Size | Notes |
|---|---|---|
| Block cache | 256 MB | Reduced from 1024 MB |
| Index cache | 128 MB | Reduced from 512 MB |
| Memtables | 4 × 8 MB = 32 MB | Reduced from 8 × 16 MB |
| Serial cache | ~20 MB | Unchanged |
| Working memory | ~200 MB | Buffer for processing |
| Total | ~636 MB | Leaves headroom for 1.5GB target |
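The budget is plain addition. The sketch below (hypothetical helper names) recomputes it and, for comparison, the same sum at the pre-reduction sizes from the Notes column, keeping the serial cache and working memory unchanged. It treats 1.5 GB as 1500 MB.

```go
package main

import "fmt"

// budgetItem is one row of the import memory budget (sizes in MB).
type budgetItem struct {
	name string
	mb   int
}

// importBudget builds the budget rows; the serial-cache and working-memory
// entries are the approximate figures from the table.
func importBudget(blockMB, indexMB, memtables, memtableMB int) []budgetItem {
	return []budgetItem{
		{"block cache", blockMB},
		{"index cache", indexMB},
		{"memtables", memtables * memtableMB},
		{"serial cache", 20},
		{"working memory", 200},
	}
}

// totalMB sums all rows of a budget.
func totalMB(items []budgetItem) int {
	sum := 0
	for _, it := range items {
		sum += it.mb
	}
	return sum
}

func main() {
	const targetMB = 1500 // 1.5 GB target, in decimal MB
	planned := totalMB(importBudget(256, 128, 4, 8))
	before := totalMB(importBudget(1024, 512, 8, 16))
	fmt.Printf("planned %d MB (headroom %d MB), previous settings %d MB\n",
		planned, targetMB-planned, before)
}
```

With the planned settings the fixed allocations total 636 MB, versus 1884 MB at the previous cache and memtable sizes, which on its own would exceed the 1.5 GB target.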
Modify `import_utils.go` to batch writes and force syncs:

```go
const (
	importBatchSize      = 500  // Events per batch
	importSyncInterval   = 2000 // Events before forcing a sync
	importMemCheckEvents = 1000 // Events between memory checks
	importMaxMemoryMB    = 1400 // Target max heap (MB)
)

// In processJSONLEventsWithPolicy (requires imports: runtime, runtime/debug, time):
var batchCount int
for scan.Scan() {
	// ... existing event processing ...
	batchCount++
	count++

	// Periodically force buffered writes to disk.
	if batchCount >= importSyncInterval {
		if err := d.DB.Sync(); err != nil {
			return err
		}
		batchCount = 0
	}

	// Memory pressure check.
	if count%importMemCheckEvents == 0 {
		var m runtime.MemStats
		runtime.ReadMemStats(&m)
		heapMB := m.HeapAlloc / 1024 / 1024
		if heapMB > importMaxMemoryMB {
			// Apply backpressure: sync, then release freed memory.
			if err := d.DB.Sync(); err != nil {
				return err
			}
			debug.FreeOSMemory() // forces a GC, then returns memory to the OS
			// Give background compaction time to catch up.
			time.Sleep(100 * time.Millisecond)
		}
	}
}
```
Instead of one transaction per event, batch multiple events:
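```go
// Accumulate events and write them in batched transactions.
// Keep batches modest: Badger rejects oversized transactions with
// badger.ErrTxnTooBig, and that limit scales with the memtable size.
const txnBatchSize = 100

type pendingWrite struct {
	idxs       [][]byte // index keys (stored with empty values)
	compactKey []byte   // compact event key
	compactVal []byte   // compact event encoding
	graphKeys  [][]byte // graph keys (stored with empty values)
}

var pendingWrites []pendingWrite

// flushPending writes all queued events in a single transaction.
flushPending := func() error {
	if len(pendingWrites) == 0 {
		return nil
	}
	err := d.Update(func(txn *badger.Txn) error {
		for _, w := range pendingWrites {
			for _, key := range w.idxs {
				if err := txn.Set(key, nil); err != nil {
					return err
				}
			}
			if err := txn.Set(w.compactKey, w.compactVal); err != nil {
				return err
			}
			for _, gk := range w.graphKeys {
				if err := txn.Set(gk, nil); err != nil {
					return err
				}
			}
		}
		return nil
	})
	pendingWrites = pendingWrites[:0]
	return err
}

// In the event processing loop (pw holds the current event's keys):
pendingWrites = append(pendingWrites, pw)
if len(pendingWrites) >= txnBatchSize {
	if err := flushPending(); err != nil {
		return err
	}
}

// After the loop, flush the final partial batch:
if err := flushPending(); err != nil {
	return err
}
```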
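The batching pattern is easy to get subtly wrong, most often by forgetting the final partial batch. A hypothetical generic helper (`batcher` is an illustrative name, not project code) isolates the logic so it can be tested on its own; in the import loop, `flush` would run the `d.Update` transaction.

```go
package main

import "fmt"

// batcher accumulates items and hands them to flush in groups of at most size.
// flush must not retain the slice it receives: the buffer is reused afterwards.
type batcher[T any] struct {
	size  int
	buf   []T
	flush func([]T) error
}

func newBatcher[T any](size int, flush func([]T) error) *batcher[T] {
	return &batcher[T]{size: size, buf: make([]T, 0, size), flush: flush}
}

// Add queues one item and flushes once the batch is full.
func (b *batcher[T]) Add(item T) error {
	b.buf = append(b.buf, item)
	if len(b.buf) >= b.size {
		return b.Flush()
	}
	return nil
}

// Flush writes whatever is queued; call it once more after the input ends.
func (b *batcher[T]) Flush() error {
	if len(b.buf) == 0 {
		return nil
	}
	err := b.flush(b.buf)
	b.buf = b.buf[:0]
	return err
}

func main() {
	var sizes []int
	b := newBatcher(100, func(items []int) error {
		sizes = append(sizes, len(items))
		return nil
	})
	for i := 0; i < 250; i++ {
		if err := b.Add(i); err != nil {
			panic(err)
		}
	}
	if err := b.Flush(); err != nil { // final partial batch
		panic(err)
	}
	fmt.Println(sizes) // [100 100 50]
}
```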
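For adaptive throttling, a rate limiter can scale its delay with how far heap usage exceeds the target:

```go
import (
	"runtime"
	"runtime/debug"
	"time"
)

// importRateLimiter throttles the import when heap usage exceeds targetMemMB.
type importRateLimiter struct {
	targetMemMB   uint64        // heap target in MB
	checkInterval int           // events between memory checks
	baseDelay     time.Duration // delay when just over target
	maxDelay      time.Duration // upper bound on any single delay
}

// maybeThrottle is called once per event; every checkInterval events it
// samples the heap and, if over target, collects garbage and sleeps.
func (r *importRateLimiter) maybeThrottle(eventCount int) {
	if r.checkInterval <= 0 || eventCount%r.checkInterval != 0 {
		return
	}
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	heapMB := m.HeapAlloc / 1024 / 1024
	if heapMB <= r.targetMemMB {
		return
	}
	// Delay grows linearly with the overage: 10% over target doubles it.
	overage := float64(heapMB-r.targetMemMB) / float64(r.targetMemMB)
	delay := time.Duration(float64(r.baseDelay) * (1 + overage*10))
	if delay > r.maxDelay {
		delay = r.maxDelay
	}
	// FreeOSMemory forces a GC and returns freed memory to the OS.
	debug.FreeOSMemory()
	time.Sleep(delay)
}
```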
- `d.DB.Sync()` call every N events in import loop

| Approach | Memory (peak or target) | Throughput |
|---|---|---|
| Current | ~6 GB peak | 736 events/sec |
| Phase 1 (cache reduction) | ~2 GB | ~700 events/sec |
| Phase 2 (sync + GC) | ~1.5 GB | ~500 events/sec |
| Phase 3 (batching) | ~1.5 GB | ~600 events/sec |
| Phase 4 (adaptive) | ~1.4 GB | Variable |
Files to modify:

- `app/config/config.go` - Add import-specific config options
- `pkg/database/database.go` - Add import mode with reduced caches
- `pkg/database/import_utils.go` - Add batching, sync, and rate limiting
- `pkg/database/save-event.go` - Add batch save method (optional, for Phase 3)

Proposed environment variables:

```bash
# Import-specific cache settings (only apply during import operations)
ORLY_IMPORT_BLOCK_CACHE_MB=256    # Block cache size during import
ORLY_IMPORT_INDEX_CACHE_MB=128    # Index cache size during import
ORLY_IMPORT_MEMTABLE_SIZE_MB=8    # Memtable size during import

# Import rate limiting
ORLY_IMPORT_SYNC_INTERVAL=2000    # Events between forced syncs
ORLY_IMPORT_MAX_MEMORY_MB=1400    # Target max memory during import
ORLY_IMPORT_BATCH_SIZE=100        # Events per transaction batch
```
SaveEvent

To re-run the import test with memory monitoring:
```bash
# Start relay with import-optimized settings
export ORLY_DATA_DIR=/tmp/orly-import-test
export ORLY_ACL_MODE=none
export ORLY_PORT=10548
export ORLY_LOG_LEVEL=info
./orly &

# Upload test file
curl -X POST \
  -F "file=@/path/to/wot_reference.jsonl" \
  http://localhost:10548/api/import

# Monitor memory (RSS in kB and %MEM); -x matches the process name exactly
watch -n 5 'ps -p $(pgrep -x orly) -o pid,rss,pmem --no-headers'
```