sorted.mx raw
1 // Package sorted provides a sorted flat-file index with binary search.
2 // Records are fixed-width. Inserts batch into a write buffer that is
3 // periodically merge-sorted into the main file on Flush.
4 package sorted
5
6 import (
7 "bytes"
8 "os"
9 "sort"
10 )
11
const (
	// scanChunk is the number of on-disk records Scan fetches per ReadAt call.
	scanChunk = 256
)
13
14 // File is a sorted flat-file index.
15 type File struct {
16 path string
17 recLen int // total record length in bytes
18 cmpLen int // bytes to compare for ordering/search
19 f *os.File
20 count int64 // records on disk
21 buf []byte // write buffer (concatenated records)
22 bufN int // records in buffer
23 sorted bool
24 del [][]byte // deleted keys (cmpLen bytes each)
25 }
26
// Open opens the index stored at path, creating the file if it does not
// exist.
//
// recLen is the fixed size of every record in bytes. cmpLen is the length of
// the key prefix used for ordering and lookups. The lengths must satisfy
// recLen > 0 and 0 <= cmpLen <= recLen. Otherwise Open returns an
// *os.PathError wrapping os.ErrInvalid instead of failing later with a
// division by zero or a slice-bounds panic.
//
// A leftover path+".tmp" from an interrupted Flush is removed. A trailing
// partial record in an existing file is ignored when counting records.
func Open(path string, recLen, cmpLen int) (*File, error) {
	if recLen <= 0 || cmpLen < 0 || cmpLen > recLen {
		return nil, &os.PathError{Op: "open", Path: path, Err: os.ErrInvalid}
	}
	// Best effort: usually there is no temp file, so the error is expected.
	_ = os.Remove(path + ".tmp")

	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return nil, err
	}
	info, err := f.Stat()
	if err != nil {
		f.Close()
		return nil, err
	}
	return &File{
		path:   path,
		recLen: recLen,
		cmpLen: cmpLen,
		f:      f,
		count:  info.Size() / int64(recLen),
		sorted: true,
	}, nil
}
48
49 // Count returns total records (disk + buffer).
50 func (s *File) Count() int64 { return s.count + int64(s.bufN) }
51
52 // Clear removes all records (disk and buffer).
53 func (s *File) Clear() error {
54 s.buf = s.buf[:0]
55 s.bufN = 0
56 s.del = nil
57 s.sorted = true
58 if err := s.f.Truncate(0); err != nil {
59 return err
60 }
61 s.count = 0
62 return nil
63 }
64
65 // Put adds a record to the write buffer.
66 func (s *File) Put(rec []byte) {
67 s.buf = append(s.buf, rec...)
68 s.bufN++
69 s.sorted = false
70 }
71
72 // Delete marks a key for deletion. The record is excluded from Scan results
73 // and removed on the next Flush.
74 func (s *File) Delete(key []byte) {
75 k := []byte{:s.cmpLen}
76 copy(k, key[:s.cmpLen])
77 s.del = append(s.del, k)
78 }
79
// isDeleted reports whether the key of rec (its first cmpLen bytes) has been
// marked by Delete. It is a linear scan over the pending deletions.
func (s *File) isDeleted(rec []byte) bool {
	if len(s.del) == 0 {
		return false
	}
	key := rec[:s.cmpLen]
	for i := range s.del {
		if bytes.Equal(key, s.del[i]) {
			return true
		}
	}
	return false
}
88
// ensureSorted orders the write buffer by key if records were added since the
// last sort. sort.Sort is not stable, so buffered records with equal keys end
// up in unspecified relative order.
func (s *File) ensureSorted() {
	if !s.sorted && s.bufN > 1 {
		sort.Sort(&recSorter{
			data:   s.buf,
			recLen: s.recLen,
			cmpLen: s.cmpLen,
			tmp:    []byte{:s.recLen},
		})
	}
	s.sorted = true
}
99
100 func (s *File) readAt(dst []byte, idx int64) {
101 s.f.ReadAt(dst[:s.recLen], idx*int64(s.recLen))
102 }
103
// Get returns a copy of the first record whose key (first cmpLen bytes)
// equals key[:cmpLen]. key must be at least cmpLen bytes long.
//
// Keys marked by Delete are reported as missing. The write buffer is
// consulted before the file because it holds the newer data: on a key tie,
// Flush keeps the buffered record and drops the on-disk one. Within each
// source, the lowest index with a matching key is returned, not an arbitrary
// match among duplicates.
func (s *File) Get(key []byte) ([]byte, bool) {
	k := key[:s.cmpLen]
	if len(s.del) > 0 && s.isDeleted(k) {
		return nil, false
	}

	s.ensureSorted()
	if i := s.lowerBoundBuf(k); i < s.bufN {
		off := i * s.recLen
		if bytes.Equal(s.buf[off:off+s.cmpLen], k) {
			result := []byte{:s.recLen}
			copy(result, s.buf[off:off+s.recLen])
			return result, true
		}
	}

	if i := s.lowerBound(k); i < s.count {
		rec := []byte{:s.recLen}
		s.readAt(rec, i)
		if bytes.Equal(rec[:s.cmpLen], k) {
			return rec, true
		}
	}
	return nil, false
}
131
// Last returns a copy of the record with the largest key, considering both
// the file and the write buffer.
//
// Records whose key was marked by Delete are skipped, as Scan skips them, so
// a deleted maximum is not reported. When the largest key is present both on
// disk and in the buffer, the buffered record is returned, because Flush
// keeps the buffered record on such ties. The boolean result is false when
// no record qualifies.
func (s *File) Last() ([]byte, bool) {
	s.ensureSorted()
	fi, bi := s.count-1, s.bufN-1
	var fileRec []byte // on-disk record fi, loaded lazily; nil when not loaded
	for fi >= 0 || bi >= 0 {
		if fi >= 0 && fileRec == nil {
			fileRec = []byte{:s.recLen}
			s.readAt(fileRec, fi)
		}
		if bi >= 0 {
			off := bi * s.recLen
			bufRec := s.buf[off : off+s.recLen]
			if fi < 0 || bytes.Compare(bufRec[:s.cmpLen], fileRec[:s.cmpLen]) >= 0 {
				bi--
				if len(s.del) > 0 && s.isDeleted(bufRec) {
					continue
				}
				result := []byte{:s.recLen}
				copy(result, bufRec)
				return result, true
			}
		}
		// Here fi >= 0: either the buffer is exhausted or its current
		// record sorts below the on-disk one.
		rec := fileRec
		fi--
		fileRec = nil
		if len(s.del) > 0 && s.isDeleted(rec) {
			continue
		}
		return rec, true
	}
	return nil, false
}
162
// lowerBound returns the index of the first on-disk record whose key is not
// less than key, or s.count if every key is smaller. It binary-searches the
// file with one readAt per probe.
func (s *File) lowerBound(key []byte) int64 {
	probe := []byte{:s.recLen}
	first, remaining := int64(0), s.count
	for remaining > 0 {
		half := remaining / 2
		mid := first + half
		s.readAt(probe, mid)
		if bytes.Compare(probe[:s.cmpLen], key) < 0 {
			first = mid + 1
			remaining -= half + 1
		} else {
			remaining = half
		}
	}
	return first
}
177
// lowerBoundBuf returns the index of the first buffered record whose key is
// not less than key, or s.bufN if every key is smaller. The buffer must
// already be sorted (see ensureSorted).
func (s *File) lowerBoundBuf(key []byte) int {
	return sort.Search(s.bufN, func(i int) bool {
		start := i * s.recLen
		return bytes.Compare(s.buf[start:start+s.cmpLen], key) >= 0
	})
}
191
// Scan calls fn, in ascending key order, for every record whose key (first
// cmpLen bytes) satisfies start <= key <= end. It merges the on-disk file
// with the write buffer and stops early if fn returns false.
//
// Records whose key was marked by Delete are skipped. When the same key is
// at the front of both the file and the buffer, only the buffered record is
// reported and the on-disk one is consumed. This matches how Flush resolves
// the tie, so Scan shows the same data before and after a Flush.
//
// The slice passed to fn aliases internal storage and is valid only until fn
// returns; copy it to keep it. fn should not modify s (Put, Delete, Flush,
// Clear), because the scan's cursors index directly into the buffer and file
// state that those methods change.
func (s *File) Scan(start, end []byte, fn func(rec []byte) bool) {
	s.ensureSorted()
	fi := s.lowerBound(start)
	bi := s.lowerBoundBuf(start)

	// On-disk records are read scanChunk at a time; chunk holds records
	// [chunkStart, chunkEnd).
	chunk := []byte{:scanChunk*s.recLen}
	chunkStart := int64(-1)
	chunkEnd := int64(-1)

	getFile := func(idx int64) []byte {
		if idx < chunkStart || idx >= chunkEnd {
			chunkStart = idx
			n := s.count - idx
			if n > scanChunk {
				n = scanChunk
			}
			// NOTE(review): the read error is ignored (Scan has no error
			// result), so a failed read yields stale chunk contents.
			s.f.ReadAt(chunk[:n*int64(s.recLen)], idx*int64(s.recLen))
			chunkEnd = idx + n
		}
		off := (idx - chunkStart) * int64(s.recLen)
		return chunk[off : off+int64(s.recLen)]
	}

	for {
		var haveFile, haveBuf bool
		var fileRec, bufRec []byte

		if fi < s.count {
			fileRec = getFile(fi)
			haveFile = bytes.Compare(fileRec[:s.cmpLen], end) <= 0
		}
		if bi < s.bufN {
			boff := bi * s.recLen
			bufRec = s.buf[boff : boff+s.recLen]
			haveBuf = bytes.Compare(bufRec[:s.cmpLen], end) <= 0
		}
		if !haveFile && !haveBuf {
			return
		}

		var rec []byte
		switch {
		case haveFile && haveBuf:
			cmp := bytes.Compare(fileRec[:s.cmpLen], bufRec[:s.cmpLen])
			switch {
			case cmp < 0:
				rec = fileRec
				fi++
			case cmp > 0:
				rec = bufRec
				bi++
			default:
				// Same key in both sources: the buffered record supersedes
				// the on-disk one, exactly as in Flush.
				rec = bufRec
				fi++
				bi++
			}
		case haveFile:
			rec = fileRec
			fi++
		default:
			rec = bufRec
			bi++
		}

		if len(s.del) > 0 && s.isDeleted(rec) {
			continue
		}
		if !fn(rec) {
			return
		}
	}
}
264
// Flush merge-sorts the write buffer into the on-disk file and applies
// pending deletions. It runs whenever there are buffered records or pending
// deletions, so a Delete with nothing buffered is still persisted.
//
// The merged contents go to path+".tmp". The temp file is synced and closed,
// then renamed over path, so the previous file stays intact if anything
// fails before the rename. The whole file is read into memory, and the
// merged output is built in memory too (intended for indexes under ~100MB).
// In-memory state (count, buffer, deletions) is updated only once the rename
// has committed the new contents.
func (s *File) Flush() error {
	if s.bufN == 0 && len(s.del) == 0 {
		return nil
	}
	s.ensureSorted()

	var fileData []byte
	if s.count > 0 {
		fileData = []byte{:s.count*int64(s.recLen)}
		if _, err := s.f.ReadAt(fileData, 0); err != nil {
			return err
		}
	}
	out := s.mergeWithBuffer(fileData)

	tmpPath := s.path + ".tmp"
	tmp, err := os.Create(tmpPath)
	if err != nil {
		return err
	}
	_, err = tmp.Write(out)
	if err == nil {
		// Make the new contents durable before the rename publishes them.
		err = tmp.Sync()
	}
	if cerr := tmp.Close(); err == nil {
		err = cerr
	}
	if err != nil {
		os.Remove(tmpPath)
		return err
	}

	// Release the old handle before replacing the file; Windows, for one,
	// refuses to rename over a file that is still open.
	s.f.Close()
	if err := os.Rename(tmpPath, s.path); err != nil {
		os.Remove(tmpPath)
		// The original file is untouched; reopen it so s stays usable.
		if f, oerr := os.OpenFile(s.path, os.O_RDWR, 0644); oerr == nil {
			s.f = f
		}
		return err
	}

	// The rename committed the merge: bring in-memory state in line with
	// the new file before anything else can fail.
	s.count = int64(len(out) / s.recLen)
	s.buf = s.buf[:0]
	s.bufN = 0
	s.sorted = true
	s.del = s.del[:0]

	f, err := os.OpenFile(s.path, os.O_RDWR, 0644)
	if err != nil {
		return err
	}
	s.f = f
	return nil
}

// mergeWithBuffer returns the sorted on-disk records in fileData merged with
// the sorted write buffer, omitting records whose key is marked deleted.
// When both merge fronts hold the same key, the buffered record is kept and
// the on-disk one dropped.
func (s *File) mergeWithBuffer(fileData []byte) []byte {
	n := s.recLen
	fileN := len(fileData) / n
	out := []byte{:len(fileData) + s.bufN*n}
	out = out[:0]

	fi, bi := 0, 0
	for fi < fileN || bi < s.bufN {
		haveFile, haveBuf := fi < fileN, bi < s.bufN
		var fileRec, bufRec, rec []byte
		if haveFile {
			fileRec = fileData[fi*n : (fi+1)*n]
		}
		if haveBuf {
			bufRec = s.buf[bi*n : (bi+1)*n]
		}

		switch {
		case haveFile && haveBuf:
			cmp := bytes.Compare(fileRec[:s.cmpLen], bufRec[:s.cmpLen])
			switch {
			case cmp < 0:
				rec = fileRec
				fi++
			case cmp > 0:
				rec = bufRec
				bi++
			default: // buffer wins on tie
				rec = bufRec
				fi++
				bi++
			}
		case haveFile:
			rec = fileRec
			fi++
		default:
			rec = bufRec
			bi++
		}

		if len(s.del) > 0 && s.isDeleted(rec) {
			continue
		}
		out = append(out, rec...)
	}
	return out
}
356
357 // Close flushes and closes the file.
358 func (s *File) Close() error {
359 if err := s.Flush(); err != nil {
360 return err
361 }
362 return s.f.Close()
363 }
364
365 // recSorter sorts fixed-width records in a flat byte slice.
366 type recSorter struct {
367 data []byte
368 recLen int
369 cmpLen int
370 tmp []byte
371 }
372
373 func (r *recSorter) Len() int { return len(r.data) / r.recLen }
374 func (r *recSorter) Less(i, j int) bool {
375 return bytes.Compare(
376 r.data[i*r.recLen:i*r.recLen+r.cmpLen],
377 r.data[j*r.recLen:j*r.recLen+r.cmpLen],
378 ) < 0
379 }
380 func (r *recSorter) Swap(i, j int) {
381 a := r.data[i*r.recLen : (i+1)*r.recLen]
382 b := r.data[j*r.recLen : (j+1)*r.recLen]
383 copy(r.tmp, a)
384 copy(a, b)
385 copy(b, r.tmp)
386 }
387