wal.mx raw

   1  // Package wal provides an append-only segmented value log.
   2  // Event data is written sequentially. The serial encoding
   3  // (segment_id << 32 | offset) provides O(1) reads.
   4  package wal
   5  
   6  import (
   7  	"encoding/binary"
   8  	"fmt"
   9  	"os"
  10  	"path/filepath"
  11  	"sort"
  12  	"bytes"
  13  )
  14  
  15  const (
  16  	MaxSegSize int64 = 1 << 32 // 4GB per segment
  17  	HdrSize          = 4       // uint32 length prefix
  18  )
  19  
  20  // WAL is an append-only segmented value log.
  21  type WAL struct {
  22  	dir      string
  23  	segs     []*os.File
  24  	cur      int   // current segment index
  25  	off      int64 // write offset in current segment
  26  	writes   int   // appends since last sync
  27  	syncEvery int  // sync after this many appends (0 = manual only)
  28  }
  29  
  30  // Open opens or creates a WAL in dir.
  31  func Open(dir string) (*WAL, error) {
  32  	if err := os.MkdirAll(dir, 0755); err != nil {
  33  		return nil, err
  34  	}
  35  	w := &WAL{dir: dir, syncEvery: 100}
  36  	entries, err := os.ReadDir(dir)
  37  	if err != nil {
  38  		return nil, err
  39  	}
  40  	var names []string
  41  	for _, e := range entries {
  42  		if bytes.HasPrefix(e.Name(), "seg-") && bytes.HasSuffix(e.Name(), ".dat") {
  43  			names = append(names, e.Name())
  44  		}
  45  	}
  46  	sort.Strings(names)
  47  	for _, name := range names {
  48  		f, err := os.OpenFile(filepath.Join(dir, name), os.O_RDWR, 0644)
  49  		if err != nil {
  50  			return nil, err
  51  		}
  52  		w.segs = append(w.segs, f)
  53  	}
  54  	if len(w.segs) == 0 {
  55  		if err := w.newSeg(); err != nil {
  56  			return nil, err
  57  		}
  58  	} else {
  59  		w.cur = len(w.segs) - 1
  60  		info, err := w.segs[w.cur].Stat()
  61  		if err != nil {
  62  			return nil, err
  63  		}
  64  		w.off = info.Size()
  65  		// Seek to end so Write() appends rather than overwriting from position 0.
  66  		if _, err := w.segs[w.cur].Seek(0, 2); err != nil {
  67  			return nil, err
  68  		}
  69  	}
  70  	return w, nil
  71  }
  72  
  73  func (w *WAL) segPath(n int) string {
  74  	return filepath.Join(w.dir, string(append([]byte(nil), fmt.Sprintf("seg-%03d.dat", n)...)))
  75  }
  76  
  77  func (w *WAL) newSeg() error {
  78  	n := len(w.segs)
  79  	f, err := os.Create(w.segPath(n))
  80  	if err != nil {
  81  		return err
  82  	}
  83  	w.segs = append(w.segs, f)
  84  	w.cur = n
  85  	w.off = 0
  86  	return nil
  87  }
  88  
  89  // Append writes data and returns the serial (segment<<32 | offset).
  90  func (w *WAL) Append(data []byte) (uint64, error) {
  91  	needed := int64(HdrSize) + int64(len(data))
  92  	if w.off+needed > MaxSegSize {
  93  		if err := w.newSeg(); err != nil {
  94  			return 0, err
  95  		}
  96  	}
  97  	off := w.off
  98  	var hdr [HdrSize]byte
  99  	binary.BigEndian.PutUint32(hdr[:], uint32(len(data)))
 100  	if _, err := w.segs[w.cur].Write(hdr[:]); err != nil {
 101  		return 0, err
 102  	}
 103  	if _, err := w.segs[w.cur].Write(data); err != nil {
 104  		return 0, err
 105  	}
 106  	w.off += needed
 107  	w.writes++
 108  	if w.syncEvery > 0 && w.writes >= w.syncEvery {
 109  		w.segs[w.cur].Sync()
 110  		w.writes = 0
 111  	}
 112  	return (uint64(w.cur) << 32) | uint64(off), nil
 113  }
 114  
 115  // MaxEntrySize is the largest valid WAL entry (16 MB).
 116  const MaxEntrySize = 16 << 20
 117  
 118  // Read returns the data at the given serial.
 119  func (w *WAL) Read(ser uint64) ([]byte, error) {
 120  	seg := int(ser >> 32)
 121  	off := int64(ser & 0xFFFFFFFF)
 122  	if seg < 0 || seg >= len(w.segs) {
 123  		return nil, fmt.Errorf("wal: segment %d out of range (have %d)", seg, len(w.segs))
 124  	}
 125  	var hdr [HdrSize]byte
 126  	if _, err := w.segs[seg].ReadAt(hdr[:], off); err != nil {
 127  		return nil, fmt.Errorf("wal: read header at seg %d off %d: %w", seg, off, err)
 128  	}
 129  	length := binary.BigEndian.Uint32(hdr[:])
 130  	if length > MaxEntrySize {
 131  		return nil, fmt.Errorf("wal: entry at seg %d off %d has impossible length %d", seg, off, length)
 132  	}
 133  	data := []byte{:length}
 134  	if _, err := w.segs[seg].ReadAt(data, off+int64(HdrSize)); err != nil {
 135  		return nil, fmt.Errorf("wal: read data at seg %d off %d len %d: %w", seg, off, length, err)
 136  	}
 137  	return data, nil
 138  }
 139  
 140  // ForEach iterates all entries in the WAL in order.
 141  // The callback receives the serial and raw data. Return false to stop.
 142  func (w *WAL) ForEach(fn func(ser uint64, data []byte) bool) error {
 143  	for seg := 0; seg < len(w.segs); seg++ {
 144  		info, err := w.segs[seg].Stat()
 145  		if err != nil {
 146  			return err
 147  		}
 148  		var off int64
 149  		for off < info.Size() {
 150  			var hdr [HdrSize]byte
 151  			if _, err := w.segs[seg].ReadAt(hdr[:], off); err != nil {
 152  				break // truncated entry at end of segment
 153  			}
 154  			length := binary.BigEndian.Uint32(hdr[:])
 155  			if length > MaxEntrySize || off+int64(HdrSize)+int64(length) > info.Size() {
 156  				break // truncated entry
 157  			}
 158  			data := []byte{:length}
 159  			if _, err := w.segs[seg].ReadAt(data, off+int64(HdrSize)); err != nil {
 160  				break
 161  			}
 162  			ser := (uint64(seg) << 32) | uint64(off)
 163  			if !fn(ser, data) {
 164  				return nil
 165  			}
 166  			off += int64(HdrSize) + int64(length)
 167  		}
 168  	}
 169  	return nil
 170  }
 171  
 172  // Sync flushes all segment files.
 173  func (w *WAL) Sync() error {
 174  	for _, f := range w.segs {
 175  		if err := f.Sync(); err != nil {
 176  			return err
 177  		}
 178  	}
 179  	return nil
 180  }
 181  
 182  // Close syncs and closes all segments.
 183  func (w *WAL) Close() error {
 184  	var firstErr error
 185  	for _, f := range w.segs {
 186  		f.Sync()
 187  		if err := f.Close(); err != nil && firstErr == nil {
 188  			firstErr = err
 189  		}
 190  	}
 191  	return firstErr
 192  }
 193