wal.mx raw
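// Package wal provides an append-only segmented value log.
// Entries are written sequentially, each as a 4-byte big-endian length
// prefix followed by the payload. Every entry is addressed by a serial,
// (segment_index << 32) | offset, so Read can fetch it directly without
// scanning the log.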
4 package wal
5
import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"strconv"
)
14
const (
	// MaxSegSize caps a segment file at 4 GiB. Entry offsets are packed into
	// the low 32 bits of a serial, so this must not be raised above 1<<32.
	MaxSegSize = int64(1) << 32

	// HdrSize is the width, in bytes, of the big-endian uint32 length prefix
	// written in front of every entry.
	HdrSize = 4
)
19
// WAL is an append-only segmented value log. Entries are appended to the
// newest segment file and addressed by serials of the form
// (segment index << 32) | byte offset.
//
// WAL has no internal locking; callers must serialize access themselves.
type WAL struct {
	// dir is the directory holding the seg-NNN.dat files.
	dir string
	// segs lists the open segment files; a segment's index in this slice is
	// the segment number encoded in the high half of a serial.
	segs []*os.File

	// Write position.
	cur int   // index into segs of the segment currently appended to
	off int64 // next write offset within segs[cur]

	// Periodic sync policy.
	writes    int // appends not yet covered by a periodic sync
	syncEvery int // sync after this many appends; 0 leaves syncing to the caller
}
29
30 // Open opens or creates a WAL in dir.
31 func Open(dir string) (*WAL, error) {
32 if err := os.MkdirAll(dir, 0755); err != nil {
33 return nil, err
34 }
35 w := &WAL{dir: dir, syncEvery: 100}
36 entries, err := os.ReadDir(dir)
37 if err != nil {
38 return nil, err
39 }
40 var names []string
41 for _, e := range entries {
42 if bytes.HasPrefix(e.Name(), "seg-") && bytes.HasSuffix(e.Name(), ".dat") {
43 names = append(names, e.Name())
44 }
45 }
46 sort.Strings(names)
47 for _, name := range names {
48 f, err := os.OpenFile(filepath.Join(dir, name), os.O_RDWR, 0644)
49 if err != nil {
50 return nil, err
51 }
52 w.segs = append(w.segs, f)
53 }
54 if len(w.segs) == 0 {
55 if err := w.newSeg(); err != nil {
56 return nil, err
57 }
58 } else {
59 w.cur = len(w.segs) - 1
60 info, err := w.segs[w.cur].Stat()
61 if err != nil {
62 return nil, err
63 }
64 w.off = info.Size()
65 // Seek to end so Write() appends rather than overwriting from position 0.
66 if _, err := w.segs[w.cur].Seek(0, 2); err != nil {
67 return nil, err
68 }
69 }
70 return w, nil
71 }
72
73 func (w *WAL) segPath(n int) string {
74 return filepath.Join(w.dir, string(append([]byte(nil), fmt.Sprintf("seg-%03d.dat", n)...)))
75 }
76
77 func (w *WAL) newSeg() error {
78 n := len(w.segs)
79 f, err := os.Create(w.segPath(n))
80 if err != nil {
81 return err
82 }
83 w.segs = append(w.segs, f)
84 w.cur = n
85 w.off = 0
86 return nil
87 }
88
89 // Append writes data and returns the serial (segment<<32 | offset).
90 func (w *WAL) Append(data []byte) (uint64, error) {
91 needed := int64(HdrSize) + int64(len(data))
92 if w.off+needed > MaxSegSize {
93 if err := w.newSeg(); err != nil {
94 return 0, err
95 }
96 }
97 off := w.off
98 var hdr [HdrSize]byte
99 binary.BigEndian.PutUint32(hdr[:], uint32(len(data)))
100 if _, err := w.segs[w.cur].Write(hdr[:]); err != nil {
101 return 0, err
102 }
103 if _, err := w.segs[w.cur].Write(data); err != nil {
104 return 0, err
105 }
106 w.off += needed
107 w.writes++
108 if w.syncEvery > 0 && w.writes >= w.syncEvery {
109 w.segs[w.cur].Sync()
110 w.writes = 0
111 }
112 return (uint64(w.cur) << 32) | uint64(off), nil
113 }
114
// MaxEntrySize bounds, in bytes, the payload length that Read and ForEach
// accept as a valid WAL entry: 16 MiB.
const MaxEntrySize = 16 * 1024 * 1024
117
118 // Read returns the data at the given serial.
119 func (w *WAL) Read(ser uint64) ([]byte, error) {
120 seg := int(ser >> 32)
121 off := int64(ser & 0xFFFFFFFF)
122 if seg < 0 || seg >= len(w.segs) {
123 return nil, fmt.Errorf("wal: segment %d out of range (have %d)", seg, len(w.segs))
124 }
125 var hdr [HdrSize]byte
126 if _, err := w.segs[seg].ReadAt(hdr[:], off); err != nil {
127 return nil, fmt.Errorf("wal: read header at seg %d off %d: %w", seg, off, err)
128 }
129 length := binary.BigEndian.Uint32(hdr[:])
130 if length > MaxEntrySize {
131 return nil, fmt.Errorf("wal: entry at seg %d off %d has impossible length %d", seg, off, length)
132 }
133 data := []byte{:length}
134 if _, err := w.segs[seg].ReadAt(data, off+int64(HdrSize)); err != nil {
135 return nil, fmt.Errorf("wal: read data at seg %d off %d len %d: %w", seg, off, length, err)
136 }
137 return data, nil
138 }
139
140 // ForEach iterates all entries in the WAL in order.
141 // The callback receives the serial and raw data. Return false to stop.
142 func (w *WAL) ForEach(fn func(ser uint64, data []byte) bool) error {
143 for seg := 0; seg < len(w.segs); seg++ {
144 info, err := w.segs[seg].Stat()
145 if err != nil {
146 return err
147 }
148 var off int64
149 for off < info.Size() {
150 var hdr [HdrSize]byte
151 if _, err := w.segs[seg].ReadAt(hdr[:], off); err != nil {
152 break // truncated entry at end of segment
153 }
154 length := binary.BigEndian.Uint32(hdr[:])
155 if length > MaxEntrySize || off+int64(HdrSize)+int64(length) > info.Size() {
156 break // truncated entry
157 }
158 data := []byte{:length}
159 if _, err := w.segs[seg].ReadAt(data, off+int64(HdrSize)); err != nil {
160 break
161 }
162 ser := (uint64(seg) << 32) | uint64(off)
163 if !fn(ser, data) {
164 return nil
165 }
166 off += int64(HdrSize) + int64(length)
167 }
168 }
169 return nil
170 }
171
172 // Sync flushes all segment files.
173 func (w *WAL) Sync() error {
174 for _, f := range w.segs {
175 if err := f.Sync(); err != nil {
176 return err
177 }
178 }
179 return nil
180 }
181
182 // Close syncs and closes all segments.
183 func (w *WAL) Close() error {
184 var firstErr error
185 for _, f := range w.segs {
186 f.Sync()
187 if err := f.Close(); err != nil && firstErr == nil {
188 firstErr = err
189 }
190 }
191 return firstErr
192 }
193