package iskra

import (
	"os"
)

// bulkMetaCacheMax is the number of decoded entries the in-memory cache may
// hold before an insertion triggers an eviction pass.
const bulkMetaCacheMax = 262144 // 256K entries * 32 bytes = 8MB

// BulkMeta is a file-backed array of fixed-size MetaEntry records, fronted by
// a bounded write-back cache of decoded entries.
//
// No locking is performed here; callers that share a BulkMeta between
// goroutines must synchronize access themselves.
type BulkMeta struct {
	f     *os.File
	count uint32                // number of records addressable in the file
	cache map[uint32]*MetaEntry // decoded entries, keyed by record index
	dirty map[uint32]bool       // indices whose cached value is not yet on disk
}

// OpenBulkMeta opens the record file at path, creating it if it does not
// exist. The record count is derived from the file size; trailing bytes that
// do not form a complete record are not counted.
func OpenBulkMeta(path string) (*BulkMeta, error) {
	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return nil, err
	}
	info, err := f.Stat()
	if err != nil {
		// Do not leak the descriptor when the size cannot be determined.
		f.Close()
		return nil, err
	}
	return &BulkMeta{
		f:     f,
		count: uint32(info.Size() / metaEntrySize),
		cache: map[uint32]*MetaEntry{},
		dirty: map[uint32]bool{},
	}, nil
}

// Has reports whether idx addresses an existing record.
func (bm *BulkMeta) Has(idx uint32) bool {
	return idx < bm.count
}

// Get returns the entry at idx, loading it from the file on a cache miss, or
// nil when idx is out of range.
//
// The returned pointer aliases the cached entry. Changes made through it are
// only written back if the index is also marked dirty (as Inc does), and the
// pointer stops being tracked once the entry is evicted or replaced by Set.
func (bm *BulkMeta) Get(idx uint32) *MetaEntry {
	if idx >= bm.count {
		return nil
	}
	if e, ok := bm.cache[idx]; ok {
		return e
	}

	var buf [metaEntrySize]byte
	// Best effort: Get has no error return. On a failed or short read the
	// unread bytes stay zero and are decoded as such.
	_, _ = bm.f.ReadAt(buf[:], int64(idx)*metaEntrySize)
	m := decodeBulkEntry(buf)

	// Make room before inserting so the eviction pass, which walks the map in
	// random order, can never throw out the entry that is about to be returned.
	if len(bm.cache) >= bulkMetaCacheMax {
		bm.evict()
	}
	e := &m
	bm.cache[idx] = e
	return e
}

// Set stores a copy of m at idx and marks it dirty. If idx is beyond the
// current end, the file is first extended with zeroed records up to and
// including idx.
//
// idx must be smaller than the maximum uint32 value, because count has to be
// able to hold idx+1.
func (bm *BulkMeta) Set(idx uint32, m MetaEntry) {
	var zero [metaEntrySize]byte
	for bm.count <= idx {
		// Best effort: Set has no error return. The entry stays dirty in the
		// cache, so a persistent write failure is expected to show up again
		// when Flush writes it.
		_, _ = bm.f.WriteAt(zero[:], int64(bm.count)*metaEntrySize)
		bm.count++
	}

	// Evict before inserting (only when a new key is being added) so the cache
	// stays within its limit without immediately discarding this entry.
	if _, cached := bm.cache[idx]; !cached && len(bm.cache) >= bulkMetaCacheMax {
		bm.evict()
	}
	cp := m
	bm.cache[idx] = &cp
	bm.dirty[idx] = true
}

// Inc increments the Count field of the entry at idx and marks it dirty.
// It does nothing when idx is out of range.
func (bm *BulkMeta) Inc(idx uint32) {
	e := bm.Get(idx)
	if e != nil {
		e.Count++
		bm.dirty[idx] = true
	}
}

// Count_ returns the number of records.
func (bm *BulkMeta) Count_() uint32 {
	return bm.count
}

// evict removes up to half of bulkMetaCacheMax entries from the cache, in map
// iteration order, writing dirty ones back to the file first. A dirty entry
// whose write-back fails is kept cached and dirty so that its data is not
// lost and a later Flush can retry and report the error.
func (bm *BulkMeta) evict() {
	target := bulkMetaCacheMax / 2
	evicted := 0
	for idx := range bm.cache {
		if evicted >= target {
			break
		}
		if bm.dirty[idx] {
			if err := bm.writeEntry(idx); err != nil {
				continue
			}
			delete(bm.dirty, idx)
		}
		delete(bm.cache, idx)
		evicted++
	}
}

// Flush writes every dirty cached entry to the file and then syncs the file.
// Entries that were written are unmarked; entries that failed to write stay
// dirty. The first error encountered, from a write or from the sync, is
// returned.
func (bm *BulkMeta) Flush() error {
	var firstErr error
	for idx := range bm.dirty {
		if err := bm.writeEntry(idx); err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		delete(bm.dirty, idx)
	}
	if err := bm.f.Sync(); err != nil && firstErr == nil {
		firstErr = err
	}
	return firstErr
}

// Close flushes pending writes and closes the file. The file is closed even
// when the flush fails; the flush error takes precedence in the return value.
func (bm *BulkMeta) Close() error {
	flushErr := bm.Flush()
	closeErr := bm.f.Close()
	if flushErr != nil {
		return flushErr
	}
	return closeErr
}

// writeEntry encodes the cached entry for idx and writes it at its file
// offset. It is a no-op returning nil when idx has no cached entry.
func (bm *BulkMeta) writeEntry(idx uint32) error {
	e := bm.cache[idx]
	if e == nil {
		return nil
	}
	buf := encodeBulkEntry(e)
	_, err := bm.f.WriteAt(buf[:], int64(idx)*metaEntrySize)
	return err
}

// encodeBulkEntry serializes m into its fixed-size on-disk form:
// bytes 0-3 Count, bytes 4-5 Kind, byte 6 StageTag, byte 7 RefKind,
// bytes 8.. Extra.
func encodeBulkEntry(m *MetaEntry) [metaEntrySize]byte {
	var buf [metaEntrySize]byte
	sle().PutUint32(buf[0:], m.Count)
	sle().PutUint16(buf[4:], uint16(m.Kind))
	buf[6] = m.StageTag
	buf[7] = m.RefKind
	copy(buf[8:], m.Extra[:])
	return buf
}

// decodeBulkEntry is the inverse of encodeBulkEntry.
func decodeBulkEntry(buf [metaEntrySize]byte) MetaEntry {
	var m MetaEntry
	m.Count = sle().Uint32(buf[0:])
	m.Kind = NodeKind(sle().Uint16(buf[4:]))
	m.StageTag = buf[6]
	m.RefKind = buf[7]
	copy(m.Extra[:], buf[8:])
	return m
}

// ExportToSlice reads all entries from disk into a slice for final .meta
// serialization. Pending changes are flushed first.
//
// The signature has no error return, so flush and read failures are not
// reported: a record that cannot be read is decoded from whatever bytes were
// read, with the remainder zero.
func (bm *BulkMeta) ExportToSlice() []MetaEntry {
	_ = bm.Flush() // best effort, see the doc comment
	out := make([]MetaEntry, bm.count)
	for i := uint32(0); i < bm.count; i++ {
		var buf [metaEntrySize]byte
		_, _ = bm.f.ReadAt(buf[:], int64(i)*metaEntrySize) // best effort, see the doc comment
		out[i] = decodeBulkEntry(buf)
	}
	return out
}