// Package sorted provides a sorted flat-file index with binary search. // Records are fixed-width. Inserts batch into a write buffer that is // periodically merge-sorted into the main file on Flush. package sorted import ( "bytes" "os" "sort" ) const scanChunk = 256 // records per buffered read // File is a sorted flat-file index. type File struct { path string recLen int // total record length in bytes cmpLen int // bytes to compare for ordering/search f *os.File count int64 // records on disk buf []byte // write buffer (concatenated records) bufN int // records in buffer sorted bool del [][]byte // deleted keys (cmpLen bytes each) } // Open opens or creates a sorted index file. func Open(path string, recLen, cmpLen int) (*File, error) { os.Remove(path + ".tmp") // clean up interrupted flush f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) if err != nil { return nil, err } info, err := f.Stat() if err != nil { f.Close() return nil, err } return &File{ path: path, recLen: recLen, cmpLen: cmpLen, f: f, count: info.Size() / int64(recLen), sorted: true, }, nil } // Count returns total records (disk + buffer). func (s *File) Count() int64 { return s.count + int64(s.bufN) } // Clear removes all records (disk and buffer). func (s *File) Clear() error { s.buf = s.buf[:0] s.bufN = 0 s.del = nil s.sorted = true if err := s.f.Truncate(0); err != nil { return err } s.count = 0 return nil } // Put adds a record to the write buffer. func (s *File) Put(rec []byte) { s.buf = append(s.buf, rec...) s.bufN++ s.sorted = false } // Delete marks a key for deletion. The record is excluded from Scan results // and removed on the next Flush. func (s *File) Delete(key []byte) { k := []byte{:s.cmpLen} copy(k, key[:s.cmpLen]) s.del = append(s.del, k) } func (s *File) isDeleted(rec []byte) bool { for _, d := range s.del { if bytes.Equal(rec[:s.cmpLen], d) { return true } } return false } func (s *File) ensureSorted() { if s.sorted || s.bufN <= 1 { s.sorted = true return } tmp := []byte{:s.recLen} rs := &recSorter{s.buf, s.recLen, s.cmpLen, tmp} sort.Sort(rs) s.sorted = true } func (s *File) readAt(dst []byte, idx int64) { s.f.ReadAt(dst[:s.recLen], idx*int64(s.recLen)) } // Get returns the first record matching key (cmpLen bytes). func (s *File) Get(key []byte) ([]byte, bool) { rec := []byte{:s.recLen} lo, hi := int64(0), s.count-1 for lo <= hi { mid := lo + (hi-lo)/2 s.readAt(rec, mid) cmp := bytes.Compare(rec[:s.cmpLen], key[:s.cmpLen]) if cmp < 0 { lo = mid + 1 } else if cmp > 0 { hi = mid - 1 } else { return rec, true } } s.ensureSorted() for i := 0; i < s.bufN; i++ { off := i * s.recLen if bytes.Equal(s.buf[off:off+s.cmpLen], key[:s.cmpLen]) { result := []byte{:s.recLen} copy(result, s.buf[off:off+s.recLen]) return result, true } } return nil, false } // Last returns the record with the largest key. func (s *File) Last() ([]byte, bool) { s.ensureSorted() var lastFile, lastBuf []byte if s.count > 0 { lastFile = []byte{:s.recLen} s.readAt(lastFile, s.count-1) } if s.bufN > 0 { off := (s.bufN - 1) * s.recLen lastBuf = s.buf[off : off+s.recLen] } if lastFile == nil && lastBuf == nil { return nil, false } if lastFile == nil { result := []byte{:s.recLen} copy(result, lastBuf) return result, true } if lastBuf == nil { return lastFile, true } if bytes.Compare(lastBuf[:s.cmpLen], lastFile[:s.cmpLen]) >= 0 { result := []byte{:s.recLen} copy(result, lastBuf) return result, true } return lastFile, true } func (s *File) lowerBound(key []byte) int64 { lo, hi := int64(0), s.count rec := []byte{:s.recLen} for lo < hi { mid := lo + (hi-lo)/2 s.readAt(rec, mid) if bytes.Compare(rec[:s.cmpLen], key) < 0 { lo = mid + 1 } else { hi = mid } } return lo } func (s *File) lowerBoundBuf(key []byte) int { lo, hi := 0, s.bufN for lo < hi { mid := lo + (hi-lo)/2 off := mid * s.recLen if bytes.Compare(s.buf[off:off+s.cmpLen], key) < 0 { lo = mid + 1 } else { hi = mid } } return lo } // Scan iterates records where start <= key <= end. // fn receives each record; return false to stop. func (s *File) Scan(start, end []byte, fn func(rec []byte) bool) { s.ensureSorted() fi := s.lowerBound(start) bi := s.lowerBoundBuf(start) // Buffered reads from the on-disk file. chunk := []byte{:scanChunk*s.recLen} chunkStart := int64(-1) // first index in chunk chunkEnd := int64(-1) // one past last index getFile := func(idx int64) []byte { if idx < chunkStart || idx >= chunkEnd { chunkStart = idx n := s.count - idx if n > scanChunk { n = scanChunk } s.f.ReadAt(chunk[:n*int64(s.recLen)], idx*int64(s.recLen)) chunkEnd = idx + n } off := (idx - chunkStart) * int64(s.recLen) return chunk[off : off+int64(s.recLen)] } for { var haveFile, haveBuf bool var fileRec, bufRec []byte if fi < s.count { fileRec = getFile(fi) haveFile = bytes.Compare(fileRec[:s.cmpLen], end) <= 0 } if bi < s.bufN { boff := bi * s.recLen bufRec = s.buf[boff : boff+s.recLen] haveBuf = bytes.Compare(bufRec[:s.cmpLen], end) <= 0 } if !haveFile && !haveBuf { break } var rec []byte if haveFile && haveBuf { cmp := bytes.Compare(fileRec[:s.cmpLen], bufRec[:s.cmpLen]) if cmp <= 0 { rec = fileRec fi++ if cmp == 0 { bi++ } } else { rec = bufRec bi++ } } else if haveFile { rec = fileRec fi++ } else { rec = bufRec bi++ } if len(s.del) > 0 && s.isDeleted(rec) { continue } if !fn(rec) { break } } } // Flush merge-sorts the write buffer into the on-disk file. func (s *File) Flush() error { if s.bufN == 0 { return nil } s.ensureSorted() // Read entire file into memory (fine for <100MB indexes). var fileData []byte if s.count > 0 { fileData = []byte{:s.count*int64(s.recLen)} if _, err := s.f.ReadAt(fileData, 0); err != nil { return err } } tmpPath := s.path + ".tmp" tmp, err := os.Create(tmpPath) if err != nil { return err } // Merge-sort file data and buffer into tmp. fi, bi := int64(0), 0 for fi < s.count || bi < s.bufN { var fileRec, bufRec []byte haveFile := fi < s.count haveBuf := bi < s.bufN if haveFile { off := fi * int64(s.recLen) fileRec = fileData[off : off+int64(s.recLen)] } if haveBuf { boff := bi * s.recLen bufRec = s.buf[boff : boff+s.recLen] } var rec []byte if haveFile && haveBuf { cmp := bytes.Compare(fileRec[:s.cmpLen], bufRec[:s.cmpLen]) if cmp < 0 { rec = fileRec fi++ } else if cmp > 0 { rec = bufRec bi++ } else { rec = bufRec // buffer wins on tie fi++ bi++ } } else if haveFile { rec = fileRec fi++ } else { rec = bufRec bi++ } if len(s.del) > 0 && s.isDeleted(rec) { continue } if _, err := tmp.Write(rec); err != nil { tmp.Close() os.Remove(tmpPath) return err } } s.del = s.del[:0] info, err := tmp.Stat() if err != nil { tmp.Close() os.Remove(tmpPath) return err } newCount := info.Size() / int64(s.recLen) tmp.Close() s.f.Close() if err := os.Rename(tmpPath, s.path); err != nil { return err } s.f, err = os.OpenFile(s.path, os.O_RDWR, 0644) if err != nil { return err } s.count = newCount s.buf = s.buf[:0] s.bufN = 0 s.sorted = true return nil } // Close flushes and closes the file. func (s *File) Close() error { if err := s.Flush(); err != nil { return err } return s.f.Close() } // recSorter sorts fixed-width records in a flat byte slice. type recSorter struct { data []byte recLen int cmpLen int tmp []byte } func (r *recSorter) Len() int { return len(r.data) / r.recLen } func (r *recSorter) Less(i, j int) bool { return bytes.Compare( r.data[i*r.recLen:i*r.recLen+r.cmpLen], r.data[j*r.recLen:j*r.recLen+r.cmpLen], ) < 0 } func (r *recSorter) Swap(i, j int) { a := r.data[i*r.recLen : (i+1)*r.recLen] b := r.data[j*r.recLen : (j+1)*r.recLen] copy(r.tmp, a) copy(a, b) copy(b, r.tmp) }