level_handler.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package badger
7
8 import (
9 "fmt"
10 "sort"
11 "sync"
12
13 "github.com/dgraph-io/badger/v4/table"
14 "github.com/dgraph-io/badger/v4/y"
15 )
16
17 type levelHandler struct {
18 // Guards tables, totalSize.
19 sync.RWMutex
20
21 // For level >= 1, tables are sorted by key ranges, which do not overlap.
22 // For level 0, tables are sorted by time.
23 // For level 0, newest table are at the back. Compact the oldest one first, which is at the front.
24 tables []*table.Table
25 totalSize int64
26 totalStaleSize int64
27
28 // The following are initialized once and const.
29 level int
30 strLevel string
31 db *DB
32 }
33
34 func (s *levelHandler) isLastLevel() bool {
35 return s.level == s.db.opt.MaxLevels-1
36 }
37
38 func (s *levelHandler) getTotalStaleSize() int64 {
39 s.RLock()
40 defer s.RUnlock()
41 return s.totalStaleSize
42 }
43
44 func (s *levelHandler) getTotalSize() int64 {
45 s.RLock()
46 defer s.RUnlock()
47 return s.totalSize
48 }
49
50 // initTables replaces s.tables with given tables. This is done during loading.
51 func (s *levelHandler) initTables(tables []*table.Table) {
52 s.Lock()
53 defer s.Unlock()
54
55 s.tables = tables
56 s.totalSize = 0
57 s.totalStaleSize = 0
58 for _, t := range tables {
59 s.addSize(t)
60 }
61
62 if s.level == 0 {
63 // Key range will overlap. Just sort by fileID in ascending order
64 // because newer tables are at the end of level 0.
65 sort.Slice(s.tables, func(i, j int) bool {
66 return s.tables[i].ID() < s.tables[j].ID()
67 })
68 } else {
69 // Sort tables by keys.
70 sort.Slice(s.tables, func(i, j int) bool {
71 return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
72 })
73 }
74 }
75
76 // deleteTables remove tables idx0, ..., idx1-1.
77 func (s *levelHandler) deleteTables(toDel []*table.Table) error {
78 s.Lock() // s.Unlock() below
79
80 toDelMap := make(map[uint64]struct{})
81 for _, t := range toDel {
82 toDelMap[t.ID()] = struct{}{}
83 }
84
85 // Make a copy as iterators might be keeping a slice of tables.
86 var newTables []*table.Table
87 for _, t := range s.tables {
88 _, found := toDelMap[t.ID()]
89 if !found {
90 newTables = append(newTables, t)
91 continue
92 }
93 s.subtractSize(t)
94 }
95 s.tables = newTables
96
97 s.Unlock() // Unlock s _before_ we DecrRef our tables, which can be slow.
98
99 return decrRefs(toDel)
100 }
101
102 // replaceTables will replace tables[left:right] with newTables. Note this EXCLUDES tables[right].
103 // You must call decr() to delete the old tables _after_ writing the update to the manifest.
104 func (s *levelHandler) replaceTables(toDel, toAdd []*table.Table) error {
105 // Need to re-search the range of tables in this level to be replaced as other goroutines might
106 // be changing it as well. (They can't touch our tables, but if they add/remove other tables,
107 // the indices get shifted around.)
108 s.Lock() // We s.Unlock() below.
109
110 toDelMap := make(map[uint64]struct{})
111 for _, t := range toDel {
112 toDelMap[t.ID()] = struct{}{}
113 }
114 var newTables []*table.Table
115 for _, t := range s.tables {
116 _, found := toDelMap[t.ID()]
117 if !found {
118 newTables = append(newTables, t)
119 continue
120 }
121 s.subtractSize(t)
122 }
123
124 // Increase totalSize first.
125 for _, t := range toAdd {
126 s.addSize(t)
127 t.IncrRef()
128 newTables = append(newTables, t)
129 }
130
131 // Assign tables.
132 s.tables = newTables
133 sort.Slice(s.tables, func(i, j int) bool {
134 return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
135 })
136 s.Unlock() // s.Unlock before we DecrRef tables -- that can be slow.
137 return decrRefs(toDel)
138 }
139
140 // addTable adds toAdd table to levelHandler. Normally when we add tables to levelHandler, we sort
141 // tables based on table.Smallest. This is required for correctness of the system. But in case of
142 // stream writer this can be avoided. We can just add tables to levelHandler's table list
143 // and after all addTable calls, we can sort table list(check sortTable method).
144 // NOTE: levelHandler.sortTables() should be called after call addTable calls are done.
145 func (s *levelHandler) addTable(t *table.Table) {
146 s.Lock()
147 defer s.Unlock()
148
149 s.addSize(t) // Increase totalSize first.
150 t.IncrRef()
151 s.tables = append(s.tables, t)
152 }
153
154 // sortTables sorts tables of levelHandler based on table.Smallest.
155 // Normally it should be called after all addTable calls.
156 func (s *levelHandler) sortTables() {
157 s.Lock()
158 defer s.Unlock()
159
160 sort.Slice(s.tables, func(i, j int) bool {
161 return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
162 })
163 }
164
165 func decrRefs(tables []*table.Table) error {
166 for _, table := range tables {
167 if err := table.DecrRef(); err != nil {
168 return err
169 }
170 }
171 return nil
172 }
173
174 func newLevelHandler(db *DB, level int) *levelHandler {
175 return &levelHandler{
176 level: level,
177 strLevel: fmt.Sprintf("l%d", level),
178 db: db,
179 }
180 }
181
182 // tryAddLevel0Table returns true if ok and no stalling.
183 func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
184 y.AssertTrue(s.level == 0)
185 // Need lock as we may be deleting the first table during a level 0 compaction.
186 s.Lock()
187 defer s.Unlock()
188 // Stall (by returning false) if we are above the specified stall setting for L0.
189 if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
190 return false
191 }
192
193 s.tables = append(s.tables, t)
194 t.IncrRef()
195 s.addSize(t)
196
197 return true
198 }
199
200 // This should be called while holding the lock on the level.
201 func (s *levelHandler) addSize(t *table.Table) {
202 s.totalSize += t.Size()
203 s.totalStaleSize += int64(t.StaleDataSize())
204 }
205
206 // This should be called while holding the lock on the level.
207 func (s *levelHandler) subtractSize(t *table.Table) {
208 s.totalSize -= t.Size()
209 s.totalStaleSize -= int64(t.StaleDataSize())
210 }
211 func (s *levelHandler) numTables() int {
212 s.RLock()
213 defer s.RUnlock()
214 return len(s.tables)
215 }
216
217 func (s *levelHandler) close() error {
218 s.RLock()
219 defer s.RUnlock()
220 var err error
221 for _, t := range s.tables {
222 if closeErr := t.Close(-1); closeErr != nil && err == nil {
223 err = closeErr
224 }
225 }
226 return y.Wrap(err, "levelHandler.close")
227 }
228
229 // getTableForKey acquires a read-lock to access s.tables. It returns a list of tableHandlers.
230 func (s *levelHandler) getTableForKey(key []byte) ([]*table.Table, func() error) {
231 s.RLock()
232 defer s.RUnlock()
233
234 if s.level == 0 {
235 // For level 0, we need to check every table. Remember to make a copy as s.tables may change
236 // once we exit this function, and we don't want to lock s.tables while seeking in tables.
237 // CAUTION: Reverse the tables.
238 out := make([]*table.Table, 0, len(s.tables))
239 for i := len(s.tables) - 1; i >= 0; i-- {
240 out = append(out, s.tables[i])
241 s.tables[i].IncrRef()
242 }
243 return out, func() error {
244 for _, t := range out {
245 if err := t.DecrRef(); err != nil {
246 return err
247 }
248 }
249 return nil
250 }
251 }
252 // For level >= 1, we can do a binary search as key range does not overlap.
253 idx := sort.Search(len(s.tables), func(i int) bool {
254 return y.CompareKeys(s.tables[i].Biggest(), key) >= 0
255 })
256 if idx >= len(s.tables) {
257 // Given key is strictly > than every element we have.
258 return nil, func() error { return nil }
259 }
260 tbl := s.tables[idx]
261 tbl.IncrRef()
262 return []*table.Table{tbl}, tbl.DecrRef
263 }
264
265 // get returns value for a given key or the key after that. If not found, return nil.
266 func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
267 tables, decr := s.getTableForKey(key)
268 keyNoTs := y.ParseKey(key)
269
270 hash := y.Hash(keyNoTs)
271 var maxVs y.ValueStruct
272 for _, th := range tables {
273 if th.DoesNotHave(hash) {
274 y.NumLSMBloomHitsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)
275 continue
276 }
277
278 it := th.NewIterator(0)
279 defer it.Close()
280
281 y.NumLSMGetsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)
282 it.Seek(key)
283 if !it.Valid() {
284 continue
285 }
286 if y.SameKey(key, it.Key()) {
287 if version := y.ParseTs(it.Key()); maxVs.Version < version {
288 maxVs = it.ValueCopy()
289 maxVs.Version = version
290 }
291 }
292 }
293 return maxVs, decr()
294 }
295
296 // appendIterators appends iterators to an array of iterators, for merging.
297 // Note: This obtains references for the table handlers. Remember to close these iterators.
298 func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
299 s.RLock()
300 defer s.RUnlock()
301
302 var topt int
303 if opt.Reverse {
304 topt = table.REVERSED
305 }
306 if s.level == 0 {
307 // Remember to add in reverse order!
308 // The newer table at the end of s.tables should be added first as it takes precedence.
309 // Level 0 tables are not in key sorted order, so we need to consider them one by one.
310 var out []*table.Table
311 for _, t := range s.tables {
312 if opt.pickTable(t) {
313 out = append(out, t)
314 }
315 }
316 return appendIteratorsReversed(iters, out, topt)
317 }
318
319 tables := opt.pickTables(s.tables)
320 if len(tables) == 0 {
321 return iters
322 }
323 return append(iters, table.NewConcatIterator(tables, topt))
324 }
325
326 type levelHandlerRLocked struct{}
327
328 // overlappingTables returns the tables that intersect with key range. Returns a half-interval.
329 // This function should already have acquired a read lock, and this is so important the caller must
330 // pass an empty parameter declaring such.
331 func (s *levelHandler) overlappingTables(_ levelHandlerRLocked, kr keyRange) (int, int) {
332 if len(kr.left) == 0 || len(kr.right) == 0 {
333 return 0, 0
334 }
335 left := sort.Search(len(s.tables), func(i int) bool {
336 return y.CompareKeys(kr.left, s.tables[i].Biggest()) <= 0
337 })
338 right := sort.Search(len(s.tables), func(i int) bool {
339 return y.CompareKeys(kr.right, s.tables[i].Smallest()) < 0
340 })
341 return left, right
342 }
343