level_handler.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package badger
   7  
   8  import (
   9  	"fmt"
  10  	"sort"
  11  	"sync"
  12  
  13  	"github.com/dgraph-io/badger/v4/table"
  14  	"github.com/dgraph-io/badger/v4/y"
  15  )
  16  
// levelHandler holds the tables that belong to a single level, together with
// running size totals for those tables.
type levelHandler struct {
	// Guards tables, totalSize and totalStaleSize.
	sync.RWMutex

	// For level >= 1, tables are sorted by key ranges, which do not overlap.
	// For level 0, tables are sorted by time.
	// For level 0, the newest tables are at the back. Compact the oldest one first, which is at the front.
	tables         []*table.Table
	totalSize      int64 // Running sum of t.Size() for the tables above (see addSize/subtractSize).
	totalStaleSize int64 // Running sum of t.StaleDataSize() for the tables above.

	// The following are initialized once and const.
	level    int    // Index of this level; level 0 is special-cased throughout.
	strLevel string // "l<level>", passed as the level label to the metrics helpers.
	db       *DB    // Owning DB; its opt field is consulted for limits and metrics flags.
}
  33  
  34  func (s *levelHandler) isLastLevel() bool {
  35  	return s.level == s.db.opt.MaxLevels-1
  36  }
  37  
  38  func (s *levelHandler) getTotalStaleSize() int64 {
  39  	s.RLock()
  40  	defer s.RUnlock()
  41  	return s.totalStaleSize
  42  }
  43  
  44  func (s *levelHandler) getTotalSize() int64 {
  45  	s.RLock()
  46  	defer s.RUnlock()
  47  	return s.totalSize
  48  }
  49  
  50  // initTables replaces s.tables with given tables. This is done during loading.
  51  func (s *levelHandler) initTables(tables []*table.Table) {
  52  	s.Lock()
  53  	defer s.Unlock()
  54  
  55  	s.tables = tables
  56  	s.totalSize = 0
  57  	s.totalStaleSize = 0
  58  	for _, t := range tables {
  59  		s.addSize(t)
  60  	}
  61  
  62  	if s.level == 0 {
  63  		// Key range will overlap. Just sort by fileID in ascending order
  64  		// because newer tables are at the end of level 0.
  65  		sort.Slice(s.tables, func(i, j int) bool {
  66  			return s.tables[i].ID() < s.tables[j].ID()
  67  		})
  68  	} else {
  69  		// Sort tables by keys.
  70  		sort.Slice(s.tables, func(i, j int) bool {
  71  			return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
  72  		})
  73  	}
  74  }
  75  
  76  // deleteTables remove tables idx0, ..., idx1-1.
  77  func (s *levelHandler) deleteTables(toDel []*table.Table) error {
  78  	s.Lock() // s.Unlock() below
  79  
  80  	toDelMap := make(map[uint64]struct{})
  81  	for _, t := range toDel {
  82  		toDelMap[t.ID()] = struct{}{}
  83  	}
  84  
  85  	// Make a copy as iterators might be keeping a slice of tables.
  86  	var newTables []*table.Table
  87  	for _, t := range s.tables {
  88  		_, found := toDelMap[t.ID()]
  89  		if !found {
  90  			newTables = append(newTables, t)
  91  			continue
  92  		}
  93  		s.subtractSize(t)
  94  	}
  95  	s.tables = newTables
  96  
  97  	s.Unlock() // Unlock s _before_ we DecrRef our tables, which can be slow.
  98  
  99  	return decrRefs(toDel)
 100  }
 101  
 102  // replaceTables will replace tables[left:right] with newTables. Note this EXCLUDES tables[right].
 103  // You must call decr() to delete the old tables _after_ writing the update to the manifest.
 104  func (s *levelHandler) replaceTables(toDel, toAdd []*table.Table) error {
 105  	// Need to re-search the range of tables in this level to be replaced as other goroutines might
 106  	// be changing it as well.  (They can't touch our tables, but if they add/remove other tables,
 107  	// the indices get shifted around.)
 108  	s.Lock() // We s.Unlock() below.
 109  
 110  	toDelMap := make(map[uint64]struct{})
 111  	for _, t := range toDel {
 112  		toDelMap[t.ID()] = struct{}{}
 113  	}
 114  	var newTables []*table.Table
 115  	for _, t := range s.tables {
 116  		_, found := toDelMap[t.ID()]
 117  		if !found {
 118  			newTables = append(newTables, t)
 119  			continue
 120  		}
 121  		s.subtractSize(t)
 122  	}
 123  
 124  	// Increase totalSize first.
 125  	for _, t := range toAdd {
 126  		s.addSize(t)
 127  		t.IncrRef()
 128  		newTables = append(newTables, t)
 129  	}
 130  
 131  	// Assign tables.
 132  	s.tables = newTables
 133  	sort.Slice(s.tables, func(i, j int) bool {
 134  		return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
 135  	})
 136  	s.Unlock() // s.Unlock before we DecrRef tables -- that can be slow.
 137  	return decrRefs(toDel)
 138  }
 139  
 140  // addTable adds toAdd table to levelHandler. Normally when we add tables to levelHandler, we sort
 141  // tables based on table.Smallest. This is required for correctness of the system. But in case of
 142  // stream writer this can be avoided. We can just add tables to levelHandler's table list
 143  // and after all addTable calls, we can sort table list(check sortTable method).
 144  // NOTE: levelHandler.sortTables() should be called after call addTable calls are done.
 145  func (s *levelHandler) addTable(t *table.Table) {
 146  	s.Lock()
 147  	defer s.Unlock()
 148  
 149  	s.addSize(t) // Increase totalSize first.
 150  	t.IncrRef()
 151  	s.tables = append(s.tables, t)
 152  }
 153  
 154  // sortTables sorts tables of levelHandler based on table.Smallest.
 155  // Normally it should be called after all addTable calls.
 156  func (s *levelHandler) sortTables() {
 157  	s.Lock()
 158  	defer s.Unlock()
 159  
 160  	sort.Slice(s.tables, func(i, j int) bool {
 161  		return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
 162  	})
 163  }
 164  
 165  func decrRefs(tables []*table.Table) error {
 166  	for _, table := range tables {
 167  		if err := table.DecrRef(); err != nil {
 168  			return err
 169  		}
 170  	}
 171  	return nil
 172  }
 173  
 174  func newLevelHandler(db *DB, level int) *levelHandler {
 175  	return &levelHandler{
 176  		level:    level,
 177  		strLevel: fmt.Sprintf("l%d", level),
 178  		db:       db,
 179  	}
 180  }
 181  
 182  // tryAddLevel0Table returns true if ok and no stalling.
 183  func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
 184  	y.AssertTrue(s.level == 0)
 185  	// Need lock as we may be deleting the first table during a level 0 compaction.
 186  	s.Lock()
 187  	defer s.Unlock()
 188  	// Stall (by returning false) if we are above the specified stall setting for L0.
 189  	if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
 190  		return false
 191  	}
 192  
 193  	s.tables = append(s.tables, t)
 194  	t.IncrRef()
 195  	s.addSize(t)
 196  
 197  	return true
 198  }
 199  
 200  // This should be called while holding the lock on the level.
 201  func (s *levelHandler) addSize(t *table.Table) {
 202  	s.totalSize += t.Size()
 203  	s.totalStaleSize += int64(t.StaleDataSize())
 204  }
 205  
 206  // This should be called while holding the lock on the level.
 207  func (s *levelHandler) subtractSize(t *table.Table) {
 208  	s.totalSize -= t.Size()
 209  	s.totalStaleSize -= int64(t.StaleDataSize())
 210  }
 211  func (s *levelHandler) numTables() int {
 212  	s.RLock()
 213  	defer s.RUnlock()
 214  	return len(s.tables)
 215  }
 216  
 217  func (s *levelHandler) close() error {
 218  	s.RLock()
 219  	defer s.RUnlock()
 220  	var err error
 221  	for _, t := range s.tables {
 222  		if closeErr := t.Close(-1); closeErr != nil && err == nil {
 223  			err = closeErr
 224  		}
 225  	}
 226  	return y.Wrap(err, "levelHandler.close")
 227  }
 228  
 229  // getTableForKey acquires a read-lock to access s.tables. It returns a list of tableHandlers.
 230  func (s *levelHandler) getTableForKey(key []byte) ([]*table.Table, func() error) {
 231  	s.RLock()
 232  	defer s.RUnlock()
 233  
 234  	if s.level == 0 {
 235  		// For level 0, we need to check every table. Remember to make a copy as s.tables may change
 236  		// once we exit this function, and we don't want to lock s.tables while seeking in tables.
 237  		// CAUTION: Reverse the tables.
 238  		out := make([]*table.Table, 0, len(s.tables))
 239  		for i := len(s.tables) - 1; i >= 0; i-- {
 240  			out = append(out, s.tables[i])
 241  			s.tables[i].IncrRef()
 242  		}
 243  		return out, func() error {
 244  			for _, t := range out {
 245  				if err := t.DecrRef(); err != nil {
 246  					return err
 247  				}
 248  			}
 249  			return nil
 250  		}
 251  	}
 252  	// For level >= 1, we can do a binary search as key range does not overlap.
 253  	idx := sort.Search(len(s.tables), func(i int) bool {
 254  		return y.CompareKeys(s.tables[i].Biggest(), key) >= 0
 255  	})
 256  	if idx >= len(s.tables) {
 257  		// Given key is strictly > than every element we have.
 258  		return nil, func() error { return nil }
 259  	}
 260  	tbl := s.tables[idx]
 261  	tbl.IncrRef()
 262  	return []*table.Table{tbl}, tbl.DecrRef
 263  }
 264  
 265  // get returns value for a given key or the key after that. If not found, return nil.
 266  func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
 267  	tables, decr := s.getTableForKey(key)
 268  	keyNoTs := y.ParseKey(key)
 269  
 270  	hash := y.Hash(keyNoTs)
 271  	var maxVs y.ValueStruct
 272  	for _, th := range tables {
 273  		if th.DoesNotHave(hash) {
 274  			y.NumLSMBloomHitsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)
 275  			continue
 276  		}
 277  
 278  		it := th.NewIterator(0)
 279  		defer it.Close()
 280  
 281  		y.NumLSMGetsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)
 282  		it.Seek(key)
 283  		if !it.Valid() {
 284  			continue
 285  		}
 286  		if y.SameKey(key, it.Key()) {
 287  			if version := y.ParseTs(it.Key()); maxVs.Version < version {
 288  				maxVs = it.ValueCopy()
 289  				maxVs.Version = version
 290  			}
 291  		}
 292  	}
 293  	return maxVs, decr()
 294  }
 295  
 296  // appendIterators appends iterators to an array of iterators, for merging.
 297  // Note: This obtains references for the table handlers. Remember to close these iterators.
 298  func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
 299  	s.RLock()
 300  	defer s.RUnlock()
 301  
 302  	var topt int
 303  	if opt.Reverse {
 304  		topt = table.REVERSED
 305  	}
 306  	if s.level == 0 {
 307  		// Remember to add in reverse order!
 308  		// The newer table at the end of s.tables should be added first as it takes precedence.
 309  		// Level 0 tables are not in key sorted order, so we need to consider them one by one.
 310  		var out []*table.Table
 311  		for _, t := range s.tables {
 312  			if opt.pickTable(t) {
 313  				out = append(out, t)
 314  			}
 315  		}
 316  		return appendIteratorsReversed(iters, out, topt)
 317  	}
 318  
 319  	tables := opt.pickTables(s.tables)
 320  	if len(tables) == 0 {
 321  		return iters
 322  	}
 323  	return append(iters, table.NewConcatIterator(tables, topt))
 324  }
 325  
// levelHandlerRLocked is an empty marker type. overlappingTables takes a value
// of it as a parameter so that callers explicitly declare they already hold
// the level's read lock.
type levelHandlerRLocked struct{}
 327  
 328  // overlappingTables returns the tables that intersect with key range. Returns a half-interval.
 329  // This function should already have acquired a read lock, and this is so important the caller must
 330  // pass an empty parameter declaring such.
 331  func (s *levelHandler) overlappingTables(_ levelHandlerRLocked, kr keyRange) (int, int) {
 332  	if len(kr.left) == 0 || len(kr.right) == 0 {
 333  		return 0, 0
 334  	}
 335  	left := sort.Search(len(s.tables), func(i int) bool {
 336  		return y.CompareKeys(kr.left, s.tables[i].Biggest()) <= 0
 337  	})
 338  	right := sort.Search(len(s.tables), func(i int) bool {
 339  		return y.CompareKeys(kr.right, s.tables[i].Smallest()) < 0
 340  	})
 341  	return left, right
 342  }
 343