// Package wal provides an append-only segmented value log.
//
// Event data is written sequentially into numbered segment files
// (seg-000.dat, seg-001.dat, ...). Every entry is addressed by a serial,
// segment_id<<32 | offset, so reading an entry is a positioned lookup that
// needs no separate index.
package wal

import (
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
)

// Segment and framing limits.
const (
	MaxSegSize int64 = 1 << 32 // 4GB per segment; offsets must fit in the low 32 bits of a serial
	HdrSize          = 4       // uint32 length prefix
)

// WAL is an append-only segmented value log. It performs no internal
// locking, so callers must serialize access.
type WAL struct {
	dir       string
	segs      []*os.File // segs[i] is segment id i
	cur       int        // current segment index
	off       int64      // write offset in current segment
	writes    int        // appends since last sync
	syncEvery int        // sync after this many appends (0 = manual only)
}

// Open opens or creates a WAL in dir.
//
// Existing segment files (seg-N.dat) are opened in numeric order. They must
// be numbered contiguously from 0, because a segment's position in the WAL is
// the id encoded in its serials. If the last segment ends with a torn entry
// (a header or payload cut short, e.g. by a crash mid-append), that tail is
// truncated so later appends stay reachable by ForEach.
func Open(dir string) (*WAL, error) {
	if err := os.MkdirAll(dir, 0755); err != nil {
		return nil, err
	}
	names, err := listSegments(dir)
	if err != nil {
		return nil, err
	}
	w := &WAL{dir: dir, syncEvery: 100}
	for _, name := range names {
		f, err := os.OpenFile(filepath.Join(dir, name), os.O_RDWR, 0644)
		if err != nil {
			w.closeAll()
			return nil, err
		}
		w.segs = append(w.segs, f)
	}
	if len(w.segs) == 0 {
		if err := w.newSeg(); err != nil {
			return nil, err
		}
		return w, nil
	}
	if err := w.resumeLast(); err != nil {
		w.closeAll()
		return nil, err
	}
	return w, nil
}

// listSegments returns the names of the segment files in dir, ordered by
// numeric id. It sorts numerically rather than lexically, so seg-1000.dat
// comes after seg-999.dat. It fails unless the ids are exactly 0..n-1.
func listSegments(dir string) ([]string, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	type segFile struct {
		id   int
		name string
	}
	var found []segFile
	for _, e := range entries {
		name := e.Name()
		if e.IsDir() || !strings.HasPrefix(name, "seg-") || !strings.HasSuffix(name, ".dat") {
			continue
		}
		id, err := strconv.Atoi(strings.TrimSuffix(strings.TrimPrefix(name, "seg-"), ".dat"))
		if err != nil || id < 0 {
			continue // not a segment name this package produces
		}
		found = append(found, segFile{id: id, name: name})
	}
	sort.Slice(found, func(i, j int) bool { return found[i].id < found[j].id })
	names := make([]string, len(found))
	for i, s := range found {
		// A gap or duplicate id would make serials point at the wrong file.
		// It would also let newSeg collide with an existing segment.
		if s.id != i {
			return nil, fmt.Errorf("wal: segments in %s are not contiguous: expected id %d, found %s", dir, i, s.name)
		}
		names[i] = s.name
	}
	return names, nil
}

// resumeLast makes the last segment current and truncates any torn tail
// entry. It then sets both the WAL write offset and the file offset to the
// end of the valid data.
func (w *WAL) resumeLast() error {
	w.cur = len(w.segs) - 1
	f := w.segs[w.cur]
	end, size, err := validEnd(f)
	if err != nil {
		return fmt.Errorf("wal: scan %s: %w", f.Name(), err)
	}
	if end < size {
		if err := f.Truncate(end); err != nil {
			return fmt.Errorf("wal: truncate torn tail of %s: %w", f.Name(), err)
		}
	}
	// Position the file offset at the end so sequential Write calls append
	// rather than overwrite from position 0.
	if _, err := f.Seek(end, io.SeekStart); err != nil {
		return err
	}
	w.off = end
	return nil
}

// validEnd walks f entry by entry. It returns the offset just past the last
// complete entry, together with the current file size. Any bytes between the
// two belong to an entry whose header or payload runs past end of file.
func validEnd(f *os.File) (end, size int64, err error) {
	info, err := f.Stat()
	if err != nil {
		return 0, 0, err
	}
	size = info.Size()
	var hdr [HdrSize]byte
	for end+HdrSize <= size {
		if _, err := f.ReadAt(hdr[:], end); err != nil {
			return 0, 0, err
		}
		next := end + HdrSize + int64(binary.BigEndian.Uint32(hdr[:]))
		if next > size {
			break // payload cut short
		}
		end = next
	}
	return end, size, nil
}

// closeAll closes every open segment and forgets them. It is used to release
// file handles when Open fails partway through.
func (w *WAL) closeAll() {
	for _, f := range w.segs {
		_ = f.Close() // best-effort cleanup on an error path; the original error is reported
	}
	w.segs = nil
}

// segPath returns the path of segment n, e.g. <dir>/seg-007.dat.
func (w *WAL) segPath(n int) string {
	return filepath.Join(w.dir, fmt.Sprintf("seg-%03d.dat", n))
}

// newSeg creates the next segment file and makes it current. It uses O_EXCL
// so an existing segment file is never truncated, which os.Create would do
// silently.
func (w *WAL) newSeg() error {
	n := len(w.segs)
	f, err := os.OpenFile(w.segPath(n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
	if err != nil {
		return err
	}
	w.segs = append(w.segs, f)
	w.cur = n
	w.off = 0
	return nil
}

// Append writes data and returns the serial (segment<<32 | offset).
func (w *WAL) Append(data []byte) (uint64, error) { needed := int64(HdrSize) + int64(len(data)) if w.off+needed > MaxSegSize { if err := w.newSeg(); err != nil { return 0, err } } off := w.off var hdr [HdrSize]byte binary.BigEndian.PutUint32(hdr[:], uint32(len(data))) if _, err := w.segs[w.cur].Write(hdr[:]); err != nil { return 0, err } if _, err := w.segs[w.cur].Write(data); err != nil { return 0, err } w.off += needed w.writes++ if w.syncEvery > 0 && w.writes >= w.syncEvery { w.segs[w.cur].Sync() w.writes = 0 } return (uint64(w.cur) << 32) | uint64(off), nil } // MaxEntrySize is the largest valid WAL entry (16 MB). const MaxEntrySize = 16 << 20 // Read returns the data at the given serial. func (w *WAL) Read(ser uint64) ([]byte, error) { seg := int(ser >> 32) off := int64(ser & 0xFFFFFFFF) if seg < 0 || seg >= len(w.segs) { return nil, fmt.Errorf("wal: segment %d out of range (have %d)", seg, len(w.segs)) } var hdr [HdrSize]byte if _, err := w.segs[seg].ReadAt(hdr[:], off); err != nil { return nil, fmt.Errorf("wal: read header at seg %d off %d: %w", seg, off, err) } length := binary.BigEndian.Uint32(hdr[:]) if length > MaxEntrySize { return nil, fmt.Errorf("wal: entry at seg %d off %d has impossible length %d", seg, off, length) } data := []byte{:length} if _, err := w.segs[seg].ReadAt(data, off+int64(HdrSize)); err != nil { return nil, fmt.Errorf("wal: read data at seg %d off %d len %d: %w", seg, off, length, err) } return data, nil } // ForEach iterates all entries in the WAL in order. // The callback receives the serial and raw data. Return false to stop. 
func (w *WAL) ForEach(fn func(ser uint64, data []byte) bool) error { for seg := 0; seg < len(w.segs); seg++ { info, err := w.segs[seg].Stat() if err != nil { return err } var off int64 for off < info.Size() { var hdr [HdrSize]byte if _, err := w.segs[seg].ReadAt(hdr[:], off); err != nil { break // truncated entry at end of segment } length := binary.BigEndian.Uint32(hdr[:]) if length > MaxEntrySize || off+int64(HdrSize)+int64(length) > info.Size() { break // truncated entry } data := []byte{:length} if _, err := w.segs[seg].ReadAt(data, off+int64(HdrSize)); err != nil { break } ser := (uint64(seg) << 32) | uint64(off) if !fn(ser, data) { return nil } off += int64(HdrSize) + int64(length) } } return nil } // Sync flushes all segment files. func (w *WAL) Sync() error { for _, f := range w.segs { if err := f.Sync(); err != nil { return err } } return nil } // Close syncs and closes all segments. func (w *WAL) Close() error { var firstErr error for _, f := range w.segs { f.Sync() if err := f.Close(); err != nil && firstErr == nil { firstErr = err } } return firstErr }