bulkmeta.mx raw
1 package iskra
2
3 import (
4 "os"
5 )
6
7 const bulkMetaCacheMax = 262144 // 256K entries * 32 bytes = 8MB
8
9 type BulkMeta struct {
10 f *os.File
11 count uint32
12 cache map[uint32]*MetaEntry
13 dirty map[uint32]bool
14 }
15
16 func OpenBulkMeta(path string) (*BulkMeta, error) {
17 f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
18 if err != nil {
19 return nil, err
20 }
21 info, _ := f.Stat()
22 count := uint32(info.Size() / metaEntrySize)
23 return &BulkMeta{
24 f: f,
25 count: count,
26 cache: map[uint32]*MetaEntry{},
27 dirty: map[uint32]bool{},
28 }, nil
29 }
30
31 func (bm *BulkMeta) Has(idx uint32) bool {
32 return idx < bm.count
33 }
34
35 func (bm *BulkMeta) Get(idx uint32) *MetaEntry {
36 if idx >= bm.count {
37 return nil
38 }
39 if e, ok := bm.cache[idx]; ok {
40 return e
41 }
42 var buf [metaEntrySize]byte
43 bm.f.ReadAt(buf[:], int64(idx)*metaEntrySize)
44 m := decodeBulkEntry(buf)
45 bm.cache[idx] = &m
46 if len(bm.cache) > bulkMetaCacheMax {
47 bm.evict()
48 }
49 return bm.cache[idx]
50 }
51
52 func (bm *BulkMeta) Set(idx uint32, m MetaEntry) {
53 for bm.count <= idx {
54 var zero [metaEntrySize]byte
55 bm.f.WriteAt(zero[:], int64(bm.count)*metaEntrySize)
56 bm.count++
57 }
58 cp := m
59 bm.cache[idx] = &cp
60 bm.dirty[idx] = true
61 if len(bm.cache) > bulkMetaCacheMax {
62 bm.evict()
63 }
64 }
65
66 func (bm *BulkMeta) Inc(idx uint32) {
67 e := bm.Get(idx)
68 if e != nil {
69 e.Count++
70 bm.dirty[idx] = true
71 }
72 }
73
74 func (bm *BulkMeta) Count_() uint32 { return bm.count }
75
76 func (bm *BulkMeta) evict() {
77 target := bulkMetaCacheMax / 2
78 evicted := 0
79 for idx := range bm.cache {
80 if evicted >= target {
81 break
82 }
83 if bm.dirty[idx] {
84 bm.writeEntry(idx)
85 delete(bm.dirty, idx)
86 }
87 delete(bm.cache, idx)
88 evicted++
89 }
90 }
91
92 func (bm *BulkMeta) Flush() error {
93 for idx := range bm.dirty {
94 bm.writeEntry(idx)
95 }
96 for k := range bm.dirty {
97 delete(bm.dirty, k)
98 }
99 return bm.f.Sync()
100 }
101
102 func (bm *BulkMeta) Close() error {
103 bm.Flush()
104 return bm.f.Close()
105 }
106
107 func (bm *BulkMeta) writeEntry(idx uint32) {
108 e := bm.cache[idx]
109 if e == nil {
110 return
111 }
112 buf := encodeBulkEntry(e)
113 bm.f.WriteAt(buf[:], int64(idx)*metaEntrySize)
114 }
115
116 func encodeBulkEntry(m *MetaEntry) [metaEntrySize]byte {
117 var buf [metaEntrySize]byte
118 sle().PutUint32(buf[0:], m.Count)
119 sle().PutUint16(buf[4:], uint16(m.Kind))
120 buf[6] = m.StageTag
121 buf[7] = m.RefKind
122 copy(buf[8:], m.Extra[:])
123 return buf
124 }
125
126 func decodeBulkEntry(buf [metaEntrySize]byte) MetaEntry {
127 var m MetaEntry
128 m.Count = sle().Uint32(buf[0:])
129 m.Kind = NodeKind(sle().Uint16(buf[4:]))
130 m.StageTag = buf[6]
131 m.RefKind = buf[7]
132 copy(m.Extra[:], buf[8:])
133 return m
134 }
135
136 // ExportToSlice reads all entries from disk into a slice for final .meta serialization.
137 func (bm *BulkMeta) ExportToSlice() []MetaEntry {
138 bm.Flush()
139 out := []MetaEntry{:int32(bm.count):int32(bm.count)}
140 for i := uint32(0); i < bm.count; i++ {
141 var buf [metaEntrySize]byte
142 bm.f.ReadAt(buf[:], int64(i)*metaEntrySize)
143 out[i] = decodeBulkEntry(buf)
144 }
145 return out
146 }
147