sorted.mx raw

// Package sorted provides a sorted flat-file index with binary search.
// Records are fixed-width. New records are collected in an in-memory write
// buffer and merge-sorted into the main file when Flush (or Close) is called.
   4  package sorted
   5  
import (
	"bufio"
	"bytes"
	"os"
	"sort"
)
  11  
  12  const scanChunk = 256 // records per buffered read
  13  
  14  // File is a sorted flat-file index.
  15  type File struct {
  16  	path   string
  17  	recLen int      // total record length in bytes
  18  	cmpLen int      // bytes to compare for ordering/search
  19  	f      *os.File
  20  	count  int64    // records on disk
  21  	buf    []byte   // write buffer (concatenated records)
  22  	bufN   int      // records in buffer
  23  	sorted bool
  24  	del    [][]byte // deleted keys (cmpLen bytes each)
  25  }
  26  
  27  // Open opens or creates a sorted index file.
  28  func Open(path string, recLen, cmpLen int) (*File, error) {
  29  	os.Remove(path + ".tmp") // clean up interrupted flush
  30  	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
  31  	if err != nil {
  32  		return nil, err
  33  	}
  34  	info, err := f.Stat()
  35  	if err != nil {
  36  		f.Close()
  37  		return nil, err
  38  	}
  39  	return &File{
  40  		path:   path,
  41  		recLen: recLen,
  42  		cmpLen: cmpLen,
  43  		f:      f,
  44  		count:  info.Size() / int64(recLen),
  45  		sorted: true,
  46  	}, nil
  47  }
  48  
  49  // Count returns total records (disk + buffer).
  50  func (s *File) Count() int64 { return s.count + int64(s.bufN) }
  51  
  52  // Clear removes all records (disk and buffer).
  53  func (s *File) Clear() error {
  54  	s.buf = s.buf[:0]
  55  	s.bufN = 0
  56  	s.del = nil
  57  	s.sorted = true
  58  	if err := s.f.Truncate(0); err != nil {
  59  		return err
  60  	}
  61  	s.count = 0
  62  	return nil
  63  }
  64  
  65  // Put adds a record to the write buffer.
  66  func (s *File) Put(rec []byte) {
  67  	s.buf = append(s.buf, rec...)
  68  	s.bufN++
  69  	s.sorted = false
  70  }
  71  
  72  // Delete marks a key for deletion. The record is excluded from Scan results
  73  // and removed on the next Flush.
  74  func (s *File) Delete(key []byte) {
  75  	k := []byte{:s.cmpLen}
  76  	copy(k, key[:s.cmpLen])
  77  	s.del = append(s.del, k)
  78  }
  79  
  80  func (s *File) isDeleted(rec []byte) bool {
  81  	for _, d := range s.del {
  82  		if bytes.Equal(rec[:s.cmpLen], d) {
  83  			return true
  84  		}
  85  	}
  86  	return false
  87  }
  88  
  89  func (s *File) ensureSorted() {
  90  	if s.sorted || s.bufN <= 1 {
  91  		s.sorted = true
  92  		return
  93  	}
  94  	tmp := []byte{:s.recLen}
  95  	rs := &recSorter{s.buf, s.recLen, s.cmpLen, tmp}
  96  	sort.Sort(rs)
  97  	s.sorted = true
  98  }
  99  
 100  func (s *File) readAt(dst []byte, idx int64) {
 101  	s.f.ReadAt(dst[:s.recLen], idx*int64(s.recLen))
 102  }
 103  
 104  // Get returns the first record matching key (cmpLen bytes).
 105  func (s *File) Get(key []byte) ([]byte, bool) {
 106  	rec := []byte{:s.recLen}
 107  	lo, hi := int64(0), s.count-1
 108  	for lo <= hi {
 109  		mid := lo + (hi-lo)/2
 110  		s.readAt(rec, mid)
 111  		cmp := bytes.Compare(rec[:s.cmpLen], key[:s.cmpLen])
 112  		if cmp < 0 {
 113  			lo = mid + 1
 114  		} else if cmp > 0 {
 115  			hi = mid - 1
 116  		} else {
 117  			return rec, true
 118  		}
 119  	}
 120  	s.ensureSorted()
 121  	for i := 0; i < s.bufN; i++ {
 122  		off := i * s.recLen
 123  		if bytes.Equal(s.buf[off:off+s.cmpLen], key[:s.cmpLen]) {
 124  			result := []byte{:s.recLen}
 125  			copy(result, s.buf[off:off+s.recLen])
 126  			return result, true
 127  		}
 128  	}
 129  	return nil, false
 130  }
 131  
 132  // Last returns the record with the largest key.
 133  func (s *File) Last() ([]byte, bool) {
 134  	s.ensureSorted()
 135  	var lastFile, lastBuf []byte
 136  	if s.count > 0 {
 137  		lastFile = []byte{:s.recLen}
 138  		s.readAt(lastFile, s.count-1)
 139  	}
 140  	if s.bufN > 0 {
 141  		off := (s.bufN - 1) * s.recLen
 142  		lastBuf = s.buf[off : off+s.recLen]
 143  	}
 144  	if lastFile == nil && lastBuf == nil {
 145  		return nil, false
 146  	}
 147  	if lastFile == nil {
 148  		result := []byte{:s.recLen}
 149  		copy(result, lastBuf)
 150  		return result, true
 151  	}
 152  	if lastBuf == nil {
 153  		return lastFile, true
 154  	}
 155  	if bytes.Compare(lastBuf[:s.cmpLen], lastFile[:s.cmpLen]) >= 0 {
 156  		result := []byte{:s.recLen}
 157  		copy(result, lastBuf)
 158  		return result, true
 159  	}
 160  	return lastFile, true
 161  }
 162  
 163  func (s *File) lowerBound(key []byte) int64 {
 164  	lo, hi := int64(0), s.count
 165  	rec := []byte{:s.recLen}
 166  	for lo < hi {
 167  		mid := lo + (hi-lo)/2
 168  		s.readAt(rec, mid)
 169  		if bytes.Compare(rec[:s.cmpLen], key) < 0 {
 170  			lo = mid + 1
 171  		} else {
 172  			hi = mid
 173  		}
 174  	}
 175  	return lo
 176  }
 177  
 178  func (s *File) lowerBoundBuf(key []byte) int {
 179  	lo, hi := 0, s.bufN
 180  	for lo < hi {
 181  		mid := lo + (hi-lo)/2
 182  		off := mid * s.recLen
 183  		if bytes.Compare(s.buf[off:off+s.cmpLen], key) < 0 {
 184  			lo = mid + 1
 185  		} else {
 186  			hi = mid
 187  		}
 188  	}
 189  	return lo
 190  }
 191  
 192  // Scan iterates records where start <= key <= end.
 193  // fn receives each record; return false to stop.
 194  func (s *File) Scan(start, end []byte, fn func(rec []byte) bool) {
 195  	s.ensureSorted()
 196  	fi := s.lowerBound(start)
 197  	bi := s.lowerBoundBuf(start)
 198  
 199  	// Buffered reads from the on-disk file.
 200  	chunk := []byte{:scanChunk*s.recLen}
 201  	chunkStart := int64(-1) // first index in chunk
 202  	chunkEnd := int64(-1)   // one past last index
 203  
 204  	getFile := func(idx int64) []byte {
 205  		if idx < chunkStart || idx >= chunkEnd {
 206  			chunkStart = idx
 207  			n := s.count - idx
 208  			if n > scanChunk {
 209  				n = scanChunk
 210  			}
 211  			s.f.ReadAt(chunk[:n*int64(s.recLen)], idx*int64(s.recLen))
 212  			chunkEnd = idx + n
 213  		}
 214  		off := (idx - chunkStart) * int64(s.recLen)
 215  		return chunk[off : off+int64(s.recLen)]
 216  	}
 217  
 218  	for {
 219  		var haveFile, haveBuf bool
 220  		var fileRec, bufRec []byte
 221  
 222  		if fi < s.count {
 223  			fileRec = getFile(fi)
 224  			haveFile = bytes.Compare(fileRec[:s.cmpLen], end) <= 0
 225  		}
 226  		if bi < s.bufN {
 227  			boff := bi * s.recLen
 228  			bufRec = s.buf[boff : boff+s.recLen]
 229  			haveBuf = bytes.Compare(bufRec[:s.cmpLen], end) <= 0
 230  		}
 231  		if !haveFile && !haveBuf {
 232  			break
 233  		}
 234  
 235  		var rec []byte
 236  		if haveFile && haveBuf {
 237  			cmp := bytes.Compare(fileRec[:s.cmpLen], bufRec[:s.cmpLen])
 238  			if cmp <= 0 {
 239  				rec = fileRec
 240  				fi++
 241  				if cmp == 0 {
 242  					bi++
 243  				}
 244  			} else {
 245  				rec = bufRec
 246  				bi++
 247  			}
 248  		} else if haveFile {
 249  			rec = fileRec
 250  			fi++
 251  		} else {
 252  			rec = bufRec
 253  			bi++
 254  		}
 255  
 256  		if len(s.del) > 0 && s.isDeleted(rec) {
 257  			continue
 258  		}
 259  		if !fn(rec) {
 260  			break
 261  		}
 262  	}
 263  }
 264  
 265  // Flush merge-sorts the write buffer into the on-disk file.
 266  func (s *File) Flush() error {
 267  	if s.bufN == 0 {
 268  		return nil
 269  	}
 270  	s.ensureSorted()
 271  
 272  	// Read entire file into memory (fine for <100MB indexes).
 273  	var fileData []byte
 274  	if s.count > 0 {
 275  		fileData = []byte{:s.count*int64(s.recLen)}
 276  		if _, err := s.f.ReadAt(fileData, 0); err != nil {
 277  			return err
 278  		}
 279  	}
 280  
 281  	tmpPath := s.path + ".tmp"
 282  	tmp, err := os.Create(tmpPath)
 283  	if err != nil {
 284  		return err
 285  	}
 286  
 287  	// Merge-sort file data and buffer into tmp.
 288  	fi, bi := int64(0), 0
 289  	for fi < s.count || bi < s.bufN {
 290  		var fileRec, bufRec []byte
 291  		haveFile := fi < s.count
 292  		haveBuf := bi < s.bufN
 293  		if haveFile {
 294  			off := fi * int64(s.recLen)
 295  			fileRec = fileData[off : off+int64(s.recLen)]
 296  		}
 297  		if haveBuf {
 298  			boff := bi * s.recLen
 299  			bufRec = s.buf[boff : boff+s.recLen]
 300  		}
 301  		var rec []byte
 302  		if haveFile && haveBuf {
 303  			cmp := bytes.Compare(fileRec[:s.cmpLen], bufRec[:s.cmpLen])
 304  			if cmp < 0 {
 305  				rec = fileRec
 306  				fi++
 307  			} else if cmp > 0 {
 308  				rec = bufRec
 309  				bi++
 310  			} else {
 311  				rec = bufRec // buffer wins on tie
 312  				fi++
 313  				bi++
 314  			}
 315  		} else if haveFile {
 316  			rec = fileRec
 317  			fi++
 318  		} else {
 319  			rec = bufRec
 320  			bi++
 321  		}
 322  		if len(s.del) > 0 && s.isDeleted(rec) {
 323  			continue
 324  		}
 325  		if _, err := tmp.Write(rec); err != nil {
 326  			tmp.Close()
 327  			os.Remove(tmpPath)
 328  			return err
 329  		}
 330  	}
 331  	s.del = s.del[:0]
 332  
 333  	info, err := tmp.Stat()
 334  	if err != nil {
 335  		tmp.Close()
 336  		os.Remove(tmpPath)
 337  		return err
 338  	}
 339  	newCount := info.Size() / int64(s.recLen)
 340  	tmp.Close()
 341  	s.f.Close()
 342  
 343  	if err := os.Rename(tmpPath, s.path); err != nil {
 344  		return err
 345  	}
 346  	s.f, err = os.OpenFile(s.path, os.O_RDWR, 0644)
 347  	if err != nil {
 348  		return err
 349  	}
 350  	s.count = newCount
 351  	s.buf = s.buf[:0]
 352  	s.bufN = 0
 353  	s.sorted = true
 354  	return nil
 355  }
 356  
 357  // Close flushes and closes the file.
 358  func (s *File) Close() error {
 359  	if err := s.Flush(); err != nil {
 360  		return err
 361  	}
 362  	return s.f.Close()
 363  }
 364  
 365  // recSorter sorts fixed-width records in a flat byte slice.
 366  type recSorter struct {
 367  	data   []byte
 368  	recLen int
 369  	cmpLen int
 370  	tmp    []byte
 371  }
 372  
 373  func (r *recSorter) Len() int { return len(r.data) / r.recLen }
 374  func (r *recSorter) Less(i, j int) bool {
 375  	return bytes.Compare(
 376  		r.data[i*r.recLen:i*r.recLen+r.cmpLen],
 377  		r.data[j*r.recLen:j*r.recLen+r.cmpLen],
 378  	) < 0
 379  }
 380  func (r *recSorter) Swap(i, j int) {
 381  	a := r.data[i*r.recLen : (i+1)*r.recLen]
 382  	b := r.data[j*r.recLen : (j+1)*r.recLen]
 383  	copy(r.tmp, a)
 384  	copy(a, b)
 385  	copy(b, r.tmp)
 386  }
 387