// store.mx raw
1 package lattice
2
3 import (
4 "fmt"
5 "os"
6 )
7
// cacheEntry is the in-memory copy of one on-disk Node plus the clock
// algorithm's reference bit (set on every access, cleared by evict to
// give the entry a "second chance").
type cacheEntry struct {
	N   Node
	Ref bool
}
12
// recEntry is the in-memory copy of one on-disk Record plus a reference
// bit. Note: records are currently never evicted, so Ref is set but not
// consumed by any eviction pass.
type recEntry struct {
	R   Record
	Ref bool
}
17
// PageCache is a write-back cache over the store file. The file layout is:
// header | node region (NodeCap * nodeSize bytes) | record region
// (RecCap * 48 bytes), with a small pending journal written after the
// used records at Flush time.
type PageCache struct {
	F         *os.File
	NodeOff   int64 // file offset of the node region (just past the header)
	RecOff    int64 // file offset of the record region (just past the nodes)
	NodeCap   uint32 // node slots reserved on disk
	RecCap    uint32 // record slots reserved on disk
	Loaded    map[uint32]*cacheEntry // cached nodes by index
	Dirty     map[uint32]bool        // node indices pending write-back
	RecLoaded map[uint32]*recEntry   // cached records by index (never evicted)
	RecDirty  map[uint32]bool        // record indices pending write-back
	MaxNodes  int      // soft cap on len(Loaded) before evict runs
	Clock     []uint32 // clock order of cached node indices
	ClockHand int      // current hand position within Clock
}
32
// defaultMaxNodes bounds the number of nodes held in memory before the
// clock evictor starts reclaiming clean entries.
const defaultMaxNodes = 4096
34
35 func newPageCache(f *os.File, nodeCap, recCap uint32) *PageCache {
36 return &PageCache{
37 F: f,
38 NodeOff: int64(headerSize),
39 RecOff: int64(headerSize) + int64(nodeCap)*nodeSize,
40 NodeCap: nodeCap,
41 RecCap: recCap,
42 Loaded: map[uint32]*cacheEntry{},
43 Dirty: map[uint32]bool{},
44 RecLoaded: map[uint32]*recEntry{},
45 RecDirty: map[uint32]bool{},
46 MaxNodes: defaultMaxNodes,
47 Clock: []uint32{:0:64},
48 }
49 }
50
// CreateStore creates a fresh store file at path: it writes the header,
// one root node per branch (the first C node slots), reserves space for
// recCap records, and fsyncs before returning so a crash afterwards sees
// a complete, empty store.
func CreateStore(path string) (*PageCache, error) {
	f, err := os.Create(path)
	if err != nil {
		return nil, err
	}
	// initial fixed capacities; grown on demand by growNodes/growRecords
	nodeCap := uint32(64)
	recCap := uint32(128)
	pc := newPageCache(f, nodeCap, recCap)

	// fresh tree: node indices 0..C-1 are the per-branch roots
	t := &Tree{
		nCount: C,
		FreeHead: NullIndex,
		FreeCount: 0,
	}
	for i := 0; i < C; i++ {
		t.Roots[i] = uint32(i)
	}
	buf := writeHeaderBuf(t, nodeCap, recCap)
	if _, err := f.WriteAt(buf[:], 0); err != nil {
		f.Close()
		return nil, fmt.Errorf("write header: %w", err)
	}
	// write each root node straight to disk; the cache isn't needed here
	for i := 0; i < C; i++ {
		var n Node
		initNodeChildren(&n)
		n.SetBranch(Branch(i))
		n.Mult = 1
		nbuf := writeNodeBuf(&n)
		off := pc.NodeOff + int64(i)*nodeSize
		if _, err := f.WriteAt(nbuf[:], off); err != nil {
			f.Close()
			return nil, fmt.Errorf("write root %d: %w", i, err)
		}
	}
	// reserve the full record region (48 bytes per record)
	totalSize := pc.RecOff + int64(recCap)*48
	if err := f.Truncate(totalSize); err != nil {
		f.Close()
		return nil, fmt.Errorf("truncate: %w", err)
	}
	if err := f.Sync(); err != nil {
		f.Close()
		return nil, fmt.Errorf("sync: %w", err)
	}
	return pc, nil
}
96
97 func OpenStore(path string) (*PageCache, *iskrHeader, error) {
98 f, err := os.OpenFile(path, os.O_RDWR, 0)
99 if err != nil {
100 return nil, nil, err
101 }
102 var buf [headerSize]byte
103 if _, err := f.ReadAt(buf[:], 0); err != nil {
104 f.Close()
105 return nil, nil, fmt.Errorf("read header: %w", err)
106 }
107 hdr, err := readHeaderBuf(buf)
108 if err != nil {
109 f.Close()
110 return nil, nil, err
111 }
112 pc := newPageCache(f, hdr.nodeCap, hdr.recCap)
113 return pc, &hdr, nil
114 }
115
// GetNode returns a pointer into the cache for node idx, loading it from
// disk on a miss and marking the entry referenced. The pointer is valid
// only until the entry is evicted, so callers should not hold it across
// further cache calls.
func (pc *PageCache) GetNode(idx uint32) *Node {
	if e, ok := pc.Loaded[idx]; ok {
		e.Ref = true
		return &e.N
	}
	var buf [nodeSize]byte
	off := pc.NodeOff + int64(idx)*nodeSize
	// NOTE(review): the read error is silently ignored — a failed read
	// yields a zero-valued node. Consider surfacing it to callers.
	pc.F.ReadAt(buf[:], off)
	n := readNodeBuf(buf)
	e := &cacheEntry{N: n, Ref: true}
	pc.Loaded[idx] = e
	pc.Clock = append(pc.Clock, idx)
	// keep the cache bounded once it crosses the soft cap
	if len(pc.Loaded) > pc.MaxNodes {
		pc.evict()
	}
	return &e.N
}
133
// MarkNodeDirty flags node idx for write-back on the next Flush. It also
// pins the node: evict never removes dirty entries.
func (pc *PageCache) MarkNodeDirty(idx uint32) {
	pc.Dirty[idx] = true
}
137
// GetRecord returns a pointer into the cache for record idx, loading it
// from disk on a miss (48 bytes per record).
// NOTE(review): the record cache has no eviction, so RecLoaded grows
// without bound; the read error is silently ignored, so a failed read
// yields a zero-valued record.
func (pc *PageCache) GetRecord(idx uint32) *Record {
	if e, ok := pc.RecLoaded[idx]; ok {
		e.Ref = true
		return &e.R
	}
	var buf [48]byte
	off := pc.RecOff + int64(idx)*48
	pc.F.ReadAt(buf[:], off)
	r := readRecBuf(buf)
	e := &recEntry{R: r, Ref: true}
	pc.RecLoaded[idx] = e
	return &e.R
}
151
// MarkRecDirty flags record idx for write-back on the next Flush.
func (pc *PageCache) MarkRecDirty(idx uint32) {
	pc.RecDirty[idx] = true
}
155
156 func (pc *PageCache) AllocNode(nCount *uint32) uint32 {
157 if *nCount >= pc.NodeCap {
158 pc.growNodes()
159 }
160 idx := *nCount
161 *nCount++
162 var n Node
163 initNodeChildren(&n)
164 e := &cacheEntry{N: n, Ref: true}
165 pc.Loaded[idx] = e
166 pc.Dirty[idx] = true
167 pc.Clock = append(pc.Clock, idx)
168 return idx
169 }
170
// AllocRecord appends a fresh zero record at index *rCount, increments
// the count, and returns the new index. The record is created directly in
// the cache and marked dirty, reaching disk on the next Flush. The
// on-disk record region is grown first if it is full.
func (pc *PageCache) AllocRecord(rCount *uint32) uint32 {
	if *rCount >= pc.RecCap {
		pc.growRecords()
	}
	idx := *rCount
	*rCount++
	e := &recEntry{R: Record{}, Ref: true}
	pc.RecLoaded[idx] = e
	pc.RecDirty[idx] = true
	return idx
}
182
// Flush writes all dirty nodes and records, appends the pending journal
// after the used record region, then writes the header and fsyncs. The
// header write is the commit point: a crash before it leaves the old
// header (and thus the old counts) in effect. Dirty sets are cleared only
// after a fully successful flush.
func (pc *PageCache) Flush(t *Tree) error {
	for idx := range pc.Dirty {
		e, ok := pc.Loaded[idx]
		if !ok {
			// dirty index without a cached entry: nothing to write
			continue
		}
		buf := writeNodeBuf(&e.N)
		off := pc.NodeOff + int64(idx)*nodeSize
		if _, err := pc.F.WriteAt(buf[:], off); err != nil {
			return fmt.Errorf("write node %d: %w", idx, err)
		}
	}
	for idx := range pc.RecDirty {
		e, ok := pc.RecLoaded[idx]
		if !ok {
			continue
		}
		buf := writeRecBuf(&e.R)
		off := pc.RecOff + int64(idx)*48
		if _, err := pc.F.WriteAt(buf[:], off); err != nil {
			return fmt.Errorf("write record %d: %w", idx, err)
		}
	}
	// write pending journal after used records
	// (4 bytes per entry: branch byte, little-endian uint16 depth, factor byte)
	pendOff := pc.RecOff + int64(t.rCount)*48
	for i := range t.Pending {
		var buf [4]byte
		buf[0] = uint8(t.Pending[i].Branch)
		le.PutUint16(buf[1:3], t.Pending[i].Depth)
		buf[3] = t.Pending[i].Factor
		if _, err := pc.F.WriteAt(buf[:], pendOff+int64(i)*4); err != nil {
			return fmt.Errorf("write pending %d: %w", i, err)
		}
	}
	// commit point: write header
	hbuf := writeHeaderBuf(t, pc.NodeCap, pc.RecCap)
	if _, err := pc.F.WriteAt(hbuf[:], 0); err != nil {
		return fmt.Errorf("write header: %w", err)
	}
	if err := pc.F.Sync(); err != nil {
		return fmt.Errorf("sync: %w", err)
	}
	// clear dirty maps
	for k := range pc.Dirty {
		delete(pc.Dirty, k)
	}
	for k := range pc.RecDirty {
		delete(pc.RecDirty, k)
	}
	return nil
}
234
// evict runs the clock (second-chance) algorithm and removes at most one
// clean, unreferenced node from the cache. Dirty nodes are skipped (their
// only up-to-date copy is in memory until Flush); referenced nodes get a
// second chance with Ref cleared. The pass budget (2x the clock length)
// guarantees termination even when nothing is currently evictable.
func (pc *PageCache) evict() {
	passes := len(pc.Clock) * 2
	evicted := 0
	for i := 0; i < passes && evicted == 0; i++ {
		if len(pc.Clock) == 0 {
			return
		}
		// wrap the hand; also re-normalizes after removals below
		pc.ClockHand = pc.ClockHand % len(pc.Clock)
		idx := pc.Clock[pc.ClockHand]
		e, ok := pc.Loaded[idx]
		if !ok {
			// stale clock entry (node already gone): drop it in place;
			// the hand now points at the next entry without advancing
			pc.Clock = append(pc.Clock[:pc.ClockHand], pc.Clock[pc.ClockHand+1:]...)
			continue
		}
		if pc.Dirty[idx] {
			pc.ClockHand++
			continue
		}
		if e.Ref {
			// second chance: clear the reference bit and move on
			e.Ref = false
			pc.ClockHand++
			continue
		}
		delete(pc.Loaded, idx)
		pc.Clock = append(pc.Clock[:pc.ClockHand], pc.Clock[pc.ClockHand+1:]...)
		evicted++
	}
}
263
264 func (pc *PageCache) Close(t *Tree) error {
265 if err := pc.Flush(t); err != nil {
266 return err
267 }
268 return pc.F.Close()
269 }
270
271 // growNodes doubles the node region capacity.
272 // Records must be moved backward (high addresses first) to avoid corruption
273 // on crash. A mid-copy crash with backward movement leaves either the old
274 // layout (header not yet updated) or the new layout (header updated after
275 // move), both consistent.
276 func (pc *PageCache) growNodes() {
277 // flush dirty records before moving them
278 for idx := range pc.RecDirty {
279 e, ok := pc.RecLoaded[idx]
280 if !ok {
281 continue
282 }
283 buf := writeRecBuf(&e.R)
284 off := pc.RecOff + int64(idx)*48
285 pc.F.WriteAt(buf[:], off)
286 }
287 for k := range pc.RecDirty {
288 delete(pc.RecDirty, k)
289 }
290
291 newCap := pc.NodeCap * 2
292 if newCap < pc.NodeCap+64 {
293 newCap = pc.NodeCap + 64
294 }
295 oldRecOff := pc.RecOff
296 newRecOff := int64(headerSize) + int64(newCap)*80
297 recBytes := int64(pc.RecCap) * 48
298
299 // extend file first
300 totalSize := newRecOff + recBytes
301 pc.F.Truncate(totalSize)
302
303 // move records backward (high to low source offset -> high to low dest offset)
304 var chunk [4096]byte
305 remaining := recBytes
306 for remaining > 0 {
307 n := remaining
308 if n > 4096 {
309 n = 4096
310 }
311 remaining -= n
312 srcOff := oldRecOff + remaining
313 dstOff := newRecOff + remaining
314 pc.F.ReadAt(chunk[:n], srcOff)
315 pc.F.WriteAt(chunk[:n], dstOff)
316 }
317
318 pc.NodeCap = newCap
319 pc.RecOff = newRecOff
320
321 // invalidate record cache (file offsets changed)
322 for k := range pc.RecLoaded {
323 delete(pc.RecLoaded, k)
324 }
325 }
326
327 func (pc *PageCache) growRecords() {
328 newCap := pc.RecCap * 2
329 if newCap < pc.RecCap+128 {
330 newCap = pc.RecCap + 128
331 }
332 totalSize := pc.RecOff + int64(newCap)*48
333 pc.F.Truncate(totalSize)
334 pc.RecCap = newCap
335 }
336