store.mx raw

   1  package lattice
   2  
   3  import (
   4  	"fmt"
   5  	"os"
   6  )
   7  
// cacheEntry is the in-memory copy of one on-disk node, plus the CLOCK
// reference bit consulted by eviction.
type cacheEntry struct {
	N   Node // decoded node contents; written back via writeNodeBuf when dirty
	Ref bool // set on every access; cleared by one pass of the eviction clock
}
  12  
// recEntry is the in-memory copy of one on-disk record, plus a reference
// bit. Records are never evicted (see GetRecord), so Ref is currently
// only set, never swept.
type recEntry struct {
	R   Record // decoded record contents; written back via writeRecBuf when dirty
	Ref bool   // set on access; no record eviction consumes it today
}
  17  
// PageCache is a write-back cache over a single store file laid out as
// [header][node region][record region]. Nodes live in fixed nodeSize
// slots starting at NodeOff; records in fixed 48-byte slots starting at
// RecOff. Dirty entries are persisted only by Flush.
type PageCache struct {
	F         *os.File              // backing store file
	NodeOff   int64                 // file offset of node slot 0 (== headerSize)
	RecOff    int64                 // file offset of record slot 0 (after NodeCap nodes)
	NodeCap   uint32                // node slots currently reserved on disk
	RecCap    uint32                // record slots currently reserved on disk
	Loaded    map[uint32]*cacheEntry // node cache, keyed by node index
	Dirty     map[uint32]bool       // node indexes modified since last Flush
	RecLoaded map[uint32]*recEntry  // record cache, keyed by record index
	RecDirty  map[uint32]bool       // record indexes modified since last Flush
	MaxNodes  int                   // soft cap on len(Loaded); exceeded triggers evict
	Clock     []uint32              // CLOCK ring of cached node indexes (may hold stale entries)
	ClockHand int                   // current sweep position within Clock
}
  32  
  33  const defaultMaxNodes = 4096
  34  
  35  func newPageCache(f *os.File, nodeCap, recCap uint32) *PageCache {
  36  	return &PageCache{
  37  		F:         f,
  38  		NodeOff:   int64(headerSize),
  39  		RecOff:    int64(headerSize) + int64(nodeCap)*nodeSize,
  40  		NodeCap:   nodeCap,
  41  		RecCap:    recCap,
  42  		Loaded:    map[uint32]*cacheEntry{},
  43  		Dirty:     map[uint32]bool{},
  44  		RecLoaded: map[uint32]*recEntry{},
  45  		RecDirty:  map[uint32]bool{},
  46  		MaxNodes:  defaultMaxNodes,
  47  		Clock:     []uint32{:0:64},
  48  	}
  49  }
  50  
// CreateStore creates a fresh store file at path: a header describing an
// empty tree, C root nodes (one per branch) in the first C node slots,
// and a zero-filled record region. Initial capacities are 64 nodes and
// 128 records; both regions grow on demand later. On any failure the
// file handle is closed and the error is wrapped with the failing step.
func CreateStore(path string) (*PageCache, error) {
	f, err := os.Create(path)
	if err != nil {
		return nil, err
	}
	nodeCap := uint32(64)
	recCap := uint32(128)
	pc := newPageCache(f, nodeCap, recCap)

	// Fresh tree: slot i is the root of branch i, free list empty.
	t := &Tree{
		nCount:    C,
		FreeHead:  NullIndex,
		FreeCount: 0,
	}
	for i := 0; i < C; i++ {
		t.Roots[i] = uint32(i)
	}
	buf := writeHeaderBuf(t, nodeCap, recCap)
	if _, err := f.WriteAt(buf[:], 0); err != nil {
		f.Close()
		return nil, fmt.Errorf("write header: %w", err)
	}
	// Materialize each root node on disk, tagged with its branch and Mult=1.
	for i := 0; i < C; i++ {
		var n Node
		initNodeChildren(&n)
		n.SetBranch(Branch(i))
		n.Mult = 1
		nbuf := writeNodeBuf(&n)
		off := pc.NodeOff + int64(i)*nodeSize
		if _, err := f.WriteAt(nbuf[:], off); err != nil {
			f.Close()
			return nil, fmt.Errorf("write root %d: %w", i, err)
		}
	}
	// Reserve the full record region (48 bytes per record) up front so
	// later WriteAt calls never have to extend the file.
	// NOTE(review): 48 is used as a literal record size here and
	// throughout — presumably it matches writeRecBuf's buffer; confirm.
	totalSize := pc.RecOff + int64(recCap)*48
	if err := f.Truncate(totalSize); err != nil {
		f.Close()
		return nil, fmt.Errorf("truncate: %w", err)
	}
	if err := f.Sync(); err != nil {
		f.Close()
		return nil, fmt.Errorf("sync: %w", err)
	}
	return pc, nil
}
  96  
  97  func OpenStore(path string) (*PageCache, *iskrHeader, error) {
  98  	f, err := os.OpenFile(path, os.O_RDWR, 0)
  99  	if err != nil {
 100  		return nil, nil, err
 101  	}
 102  	var buf [headerSize]byte
 103  	if _, err := f.ReadAt(buf[:], 0); err != nil {
 104  		f.Close()
 105  		return nil, nil, fmt.Errorf("read header: %w", err)
 106  	}
 107  	hdr, err := readHeaderBuf(buf)
 108  	if err != nil {
 109  		f.Close()
 110  		return nil, nil, err
 111  	}
 112  	pc := newPageCache(f, hdr.nodeCap, hdr.recCap)
 113  	return pc, &hdr, nil
 114  }
 115  
 116  func (pc *PageCache) GetNode(idx uint32) *Node {
 117  	if e, ok := pc.Loaded[idx]; ok {
 118  		e.Ref = true
 119  		return &e.N
 120  	}
 121  	var buf [nodeSize]byte
 122  	off := pc.NodeOff + int64(idx)*nodeSize
 123  	pc.F.ReadAt(buf[:], off)
 124  	n := readNodeBuf(buf)
 125  	e := &cacheEntry{N: n, Ref: true}
 126  	pc.Loaded[idx] = e
 127  	pc.Clock = append(pc.Clock, idx)
 128  	if len(pc.Loaded) > pc.MaxNodes {
 129  		pc.evict()
 130  	}
 131  	return &e.N
 132  }
 133  
// MarkNodeDirty records that node idx was modified in memory and must be
// written back on the next Flush. Dirty nodes are never evicted (see evict).
func (pc *PageCache) MarkNodeDirty(idx uint32) {
	pc.Dirty[idx] = true
}
 137  
 138  func (pc *PageCache) GetRecord(idx uint32) *Record {
 139  	if e, ok := pc.RecLoaded[idx]; ok {
 140  		e.Ref = true
 141  		return &e.R
 142  	}
 143  	var buf [48]byte
 144  	off := pc.RecOff + int64(idx)*48
 145  	pc.F.ReadAt(buf[:], off)
 146  	r := readRecBuf(buf)
 147  	e := &recEntry{R: r, Ref: true}
 148  	pc.RecLoaded[idx] = e
 149  	return &e.R
 150  }
 151  
// MarkRecDirty records that record idx was modified in memory and must
// be written back on the next Flush.
func (pc *PageCache) MarkRecDirty(idx uint32) {
	pc.RecDirty[idx] = true
}
 155  
 156  func (pc *PageCache) AllocNode(nCount *uint32) uint32 {
 157  	if *nCount >= pc.NodeCap {
 158  		pc.growNodes()
 159  	}
 160  	idx := *nCount
 161  	*nCount++
 162  	var n Node
 163  	initNodeChildren(&n)
 164  	e := &cacheEntry{N: n, Ref: true}
 165  	pc.Loaded[idx] = e
 166  	pc.Dirty[idx] = true
 167  	pc.Clock = append(pc.Clock, idx)
 168  	return idx
 169  }
 170  
 171  func (pc *PageCache) AllocRecord(rCount *uint32) uint32 {
 172  	if *rCount >= pc.RecCap {
 173  		pc.growRecords()
 174  	}
 175  	idx := *rCount
 176  	*rCount++
 177  	e := &recEntry{R: Record{}, Ref: true}
 178  	pc.RecLoaded[idx] = e
 179  	pc.RecDirty[idx] = true
 180  	return idx
 181  }
 182  
// Flush persists all in-memory state for t in crash-safe order: dirty
// nodes, dirty records, the pending journal, and last the header. The
// header write is the commit point — a crash before it leaves the old
// header describing the old, still-intact contents. A single Sync after
// the header ensures everything reaches disk before the dirty sets are
// cleared.
func (pc *PageCache) Flush(t *Tree) error {
	// Write back every dirty node image to its fixed slot.
	for idx := range pc.Dirty {
		e, ok := pc.Loaded[idx]
		if !ok {
			// Dirty flag without a cached entry: nothing in memory to write.
			continue
		}
		buf := writeNodeBuf(&e.N)
		off := pc.NodeOff + int64(idx)*nodeSize
		if _, err := pc.F.WriteAt(buf[:], off); err != nil {
			return fmt.Errorf("write node %d: %w", idx, err)
		}
	}
	// Write back every dirty record image (48-byte slots).
	for idx := range pc.RecDirty {
		e, ok := pc.RecLoaded[idx]
		if !ok {
			continue
		}
		buf := writeRecBuf(&e.R)
		off := pc.RecOff + int64(idx)*48
		if _, err := pc.F.WriteAt(buf[:], off); err != nil {
			return fmt.Errorf("write record %d: %w", idx, err)
		}
	}
	// write pending journal after used records
	// Each entry is 4 bytes: branch, little-endian uint16 depth, factor.
	pendOff := pc.RecOff + int64(t.rCount)*48
	for i := range t.Pending {
		var buf [4]byte
		buf[0] = uint8(t.Pending[i].Branch)
		le.PutUint16(buf[1:3], t.Pending[i].Depth)
		buf[3] = t.Pending[i].Factor
		if _, err := pc.F.WriteAt(buf[:], pendOff+int64(i)*4); err != nil {
			return fmt.Errorf("write pending %d: %w", i, err)
		}
	}
	// commit point: write header
	hbuf := writeHeaderBuf(t, pc.NodeCap, pc.RecCap)
	if _, err := pc.F.WriteAt(hbuf[:], 0); err != nil {
		return fmt.Errorf("write header: %w", err)
	}
	if err := pc.F.Sync(); err != nil {
		return fmt.Errorf("sync: %w", err)
	}
	// clear dirty maps
	for k := range pc.Dirty {
		delete(pc.Dirty, k)
	}
	for k := range pc.RecDirty {
		delete(pc.RecDirty, k)
	}
	return nil
}
 234  
// evict removes at most one clean, unreferenced node entry using a CLOCK
// (second-chance) sweep over pc.Clock. Dirty entries are skipped outright
// — their only up-to-date copy is in memory until Flush. Referenced
// entries have Ref cleared and survive one more revolution. The sweep is
// bounded at two full revolutions so it terminates even when every entry
// is dirty or referenced (in which case nothing is evicted).
func (pc *PageCache) evict() {
	passes := len(pc.Clock) * 2
	evicted := 0
	for i := 0; i < passes && evicted == 0; i++ {
		if len(pc.Clock) == 0 {
			return
		}
		// Re-wrap the hand each iteration; Clock may have shrunk below it.
		pc.ClockHand = pc.ClockHand % len(pc.Clock)
		idx := pc.Clock[pc.ClockHand]
		e, ok := pc.Loaded[idx]
		if !ok {
			// Stale ring slot (entry already gone): compact it out. The
			// hand stays put and now points at the successor.
			pc.Clock = append(pc.Clock[:pc.ClockHand], pc.Clock[pc.ClockHand+1:]...)
			continue
		}
		if pc.Dirty[idx] {
			// Never evict a dirty node.
			pc.ClockHand++
			continue
		}
		if e.Ref {
			// Second chance: clear the reference bit and advance.
			e.Ref = false
			pc.ClockHand++
			continue
		}
		delete(pc.Loaded, idx)
		pc.Clock = append(pc.Clock[:pc.ClockHand], pc.Clock[pc.ClockHand+1:]...)
		evicted++
	}
}
 263  
 264  func (pc *PageCache) Close(t *Tree) error {
 265  	if err := pc.Flush(t); err != nil {
 266  		return err
 267  	}
 268  	return pc.F.Close()
 269  }
 270  
 271  // growNodes doubles the node region capacity.
 272  // Records must be moved backward (high addresses first) to avoid corruption
 273  // on crash. A mid-copy crash with backward movement leaves either the old
 274  // layout (header not yet updated) or the new layout (header updated after
 275  // move), both consistent.
 276  func (pc *PageCache) growNodes() {
 277  	// flush dirty records before moving them
 278  	for idx := range pc.RecDirty {
 279  		e, ok := pc.RecLoaded[idx]
 280  		if !ok {
 281  			continue
 282  		}
 283  		buf := writeRecBuf(&e.R)
 284  		off := pc.RecOff + int64(idx)*48
 285  		pc.F.WriteAt(buf[:], off)
 286  	}
 287  	for k := range pc.RecDirty {
 288  		delete(pc.RecDirty, k)
 289  	}
 290  
 291  	newCap := pc.NodeCap * 2
 292  	if newCap < pc.NodeCap+64 {
 293  		newCap = pc.NodeCap + 64
 294  	}
 295  	oldRecOff := pc.RecOff
 296  	newRecOff := int64(headerSize) + int64(newCap)*80
 297  	recBytes := int64(pc.RecCap) * 48
 298  
 299  	// extend file first
 300  	totalSize := newRecOff + recBytes
 301  	pc.F.Truncate(totalSize)
 302  
 303  	// move records backward (high to low source offset -> high to low dest offset)
 304  	var chunk [4096]byte
 305  	remaining := recBytes
 306  	for remaining > 0 {
 307  		n := remaining
 308  		if n > 4096 {
 309  			n = 4096
 310  		}
 311  		remaining -= n
 312  		srcOff := oldRecOff + remaining
 313  		dstOff := newRecOff + remaining
 314  		pc.F.ReadAt(chunk[:n], srcOff)
 315  		pc.F.WriteAt(chunk[:n], dstOff)
 316  	}
 317  
 318  	pc.NodeCap = newCap
 319  	pc.RecOff = newRecOff
 320  
 321  	// invalidate record cache (file offsets changed)
 322  	for k := range pc.RecLoaded {
 323  		delete(pc.RecLoaded, k)
 324  	}
 325  }
 326  
 327  func (pc *PageCache) growRecords() {
 328  	newCap := pc.RecCap * 2
 329  	if newCap < pc.RecCap+128 {
 330  		newCap = pc.RecCap + 128
 331  	}
 332  	totalSize := pc.RecOff + int64(newCap)*48
 333  	pc.F.Truncate(totalSize)
 334  	pc.RecCap = newCap
 335  }
 336