bulkmeta.mx raw

   1  package iskra
   2  
   3  import (
   4  	"os"
   5  )
   6  
   7  const bulkMetaCacheMax = 262144 // 256K entries * 32 bytes = 8MB
   8  
// BulkMeta is a file-backed table of fixed-size MetaEntry records addressed
// by index, with an in-memory cache of decoded entries in front of the file.
type BulkMeta struct {
	// f is the backing file; entry i lives at byte offset i*metaEntrySize.
	f     *os.File
	// count is the number of entries; indices at or above it are out of range.
	count uint32
	// cache holds decoded entries keyed by index.
	cache map[uint32]*MetaEntry
	// dirty marks indices whose cached entry still has to be written to f.
	dirty map[uint32]bool
}
  15  
  16  func OpenBulkMeta(path string) (*BulkMeta, error) {
  17  	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
  18  	if err != nil {
  19  		return nil, err
  20  	}
  21  	info, _ := f.Stat()
  22  	count := uint32(info.Size() / metaEntrySize)
  23  	return &BulkMeta{
  24  		f:     f,
  25  		count: count,
  26  		cache: map[uint32]*MetaEntry{},
  27  		dirty: map[uint32]bool{},
  28  	}, nil
  29  }
  30  
  31  func (bm *BulkMeta) Has(idx uint32) bool {
  32  	return idx < bm.count
  33  }
  34  
  35  func (bm *BulkMeta) Get(idx uint32) *MetaEntry {
  36  	if idx >= bm.count {
  37  		return nil
  38  	}
  39  	if e, ok := bm.cache[idx]; ok {
  40  		return e
  41  	}
  42  	var buf [metaEntrySize]byte
  43  	bm.f.ReadAt(buf[:], int64(idx)*metaEntrySize)
  44  	m := decodeBulkEntry(buf)
  45  	bm.cache[idx] = &m
  46  	if len(bm.cache) > bulkMetaCacheMax {
  47  		bm.evict()
  48  	}
  49  	return bm.cache[idx]
  50  }
  51  
  52  func (bm *BulkMeta) Set(idx uint32, m MetaEntry) {
  53  	for bm.count <= idx {
  54  		var zero [metaEntrySize]byte
  55  		bm.f.WriteAt(zero[:], int64(bm.count)*metaEntrySize)
  56  		bm.count++
  57  	}
  58  	cp := m
  59  	bm.cache[idx] = &cp
  60  	bm.dirty[idx] = true
  61  	if len(bm.cache) > bulkMetaCacheMax {
  62  		bm.evict()
  63  	}
  64  }
  65  
  66  func (bm *BulkMeta) Inc(idx uint32) {
  67  	e := bm.Get(idx)
  68  	if e != nil {
  69  		e.Count++
  70  		bm.dirty[idx] = true
  71  	}
  72  }
  73  
  74  func (bm *BulkMeta) Count_() uint32 { return bm.count }
  75  
  76  func (bm *BulkMeta) evict() {
  77  	target := bulkMetaCacheMax / 2
  78  	evicted := 0
  79  	for idx := range bm.cache {
  80  		if evicted >= target {
  81  			break
  82  		}
  83  		if bm.dirty[idx] {
  84  			bm.writeEntry(idx)
  85  			delete(bm.dirty, idx)
  86  		}
  87  		delete(bm.cache, idx)
  88  		evicted++
  89  	}
  90  }
  91  
  92  func (bm *BulkMeta) Flush() error {
  93  	for idx := range bm.dirty {
  94  		bm.writeEntry(idx)
  95  	}
  96  	for k := range bm.dirty {
  97  		delete(bm.dirty, k)
  98  	}
  99  	return bm.f.Sync()
 100  }
 101  
 102  func (bm *BulkMeta) Close() error {
 103  	bm.Flush()
 104  	return bm.f.Close()
 105  }
 106  
 107  func (bm *BulkMeta) writeEntry(idx uint32) {
 108  	e := bm.cache[idx]
 109  	if e == nil {
 110  		return
 111  	}
 112  	buf := encodeBulkEntry(e)
 113  	bm.f.WriteAt(buf[:], int64(idx)*metaEntrySize)
 114  }
 115  
 116  func encodeBulkEntry(m *MetaEntry) [metaEntrySize]byte {
 117  	var buf [metaEntrySize]byte
 118  	sle().PutUint32(buf[0:], m.Count)
 119  	sle().PutUint16(buf[4:], uint16(m.Kind))
 120  	buf[6] = m.StageTag
 121  	buf[7] = m.RefKind
 122  	copy(buf[8:], m.Extra[:])
 123  	return buf
 124  }
 125  
 126  func decodeBulkEntry(buf [metaEntrySize]byte) MetaEntry {
 127  	var m MetaEntry
 128  	m.Count = sle().Uint32(buf[0:])
 129  	m.Kind = NodeKind(sle().Uint16(buf[4:]))
 130  	m.StageTag = buf[6]
 131  	m.RefKind = buf[7]
 132  	copy(m.Extra[:], buf[8:])
 133  	return m
 134  }
 135  
// ExportToSlice reads all entries from disk into a slice for final .meta serialization.
//
// It calls Flush first so that dirty cached entries are written before the
// file is read; the error returned by Flush is discarded. The result has one
// element per index below bm.count, each decoded from its own record in the
// file (the cache is not consulted).
func (bm *BulkMeta) ExportToSlice() []MetaEntry {
	bm.Flush()
	// NOTE(review): this allocation form is not standard Go; presumably it is
	// the dialect's way of creating a slice with length and capacity
	// bm.count — confirm before porting.
	out := []MetaEntry{:int32(bm.count):int32(bm.count)}
	for i := uint32(0); i < bm.count; i++ {
		// A fresh zeroed buffer per record; the ReadAt result is not checked,
		// so a failed read is decoded from whatever the buffer then holds.
		var buf [metaEntrySize]byte
		bm.f.ReadAt(buf[:], int64(i)*metaEntrySize)
		out[i] = decodeBulkEntry(buf)
	}
	return out
}
 147