1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package y
7
8 import (
9 "bytes"
10 "encoding/binary"
11 stderrors "errors"
12 "fmt"
13 "hash/crc32"
14 "io"
15 "math"
16 "os"
17 "reflect"
18 "strconv"
19 "sync"
20 "time"
21 "unsafe"
22
23 "github.com/dgraph-io/badger/v4/pb"
24 "github.com/dgraph-io/ristretto/v2/z"
25 )
26
// ErrEOF indicates an end of file when trying to read from a memory mapped file
// and encountering the end of slice.
var ErrEOF = stderrors.New("ErrEOF: End of file")

// ErrCommitAfterFinish indicates that write batch commit was called after
// finish.
var ErrCommitAfterFinish = stderrors.New("Batch commit not permitted after finish")
36
// Flags is a bit set of options understood by OpenExistingFile. Individual
// options are combined with bitwise OR.
type Flags int

const (
	// Sync indicates that O_DSYNC should be set on the underlying file,
	// ensuring that data writes do not return until the data is flushed
	// to disk.
	Sync Flags = 1 << 0
	// ReadOnly opens the underlying file on a read-only basis.
	ReadOnly Flags = 1 << 1
)
47
// datasyncFileFlag is OR-ed into the open flags whenever a synced file is
// requested. This is O_DSYNC (datasync) on platforms that support it -- see
// file_unix.go. Its default value here is zero, i.e. no extra flag.
var datasyncFileFlag = 0x0

// CastagnoliCrcTable is a CRC32 polynomial table built for the Castagnoli
// polynomial.
var CastagnoliCrcTable = crc32.MakeTable(crc32.Castagnoli)
55
56 // OpenExistingFile opens an existing file, errors if it doesn't exist.
57 func OpenExistingFile(filename string, flags Flags) (*os.File, error) {
58 openFlags := os.O_RDWR
59 if flags&ReadOnly != 0 {
60 openFlags = os.O_RDONLY
61 }
62
63 if flags&Sync != 0 {
64 openFlags |= datasyncFileFlag
65 }
66 return os.OpenFile(filename, openFlags, 0)
67 }
68
69 // CreateSyncedFile creates a new file (using O_EXCL), errors if it already existed.
70 func CreateSyncedFile(filename string, sync bool) (*os.File, error) {
71 flags := os.O_RDWR | os.O_CREATE | os.O_EXCL
72 if sync {
73 flags |= datasyncFileFlag
74 }
75 return os.OpenFile(filename, flags, 0600)
76 }
77
78 // OpenSyncedFile creates the file if one doesn't exist.
79 func OpenSyncedFile(filename string, sync bool) (*os.File, error) {
80 flags := os.O_RDWR | os.O_CREATE
81 if sync {
82 flags |= datasyncFileFlag
83 }
84 return os.OpenFile(filename, flags, 0600)
85 }
86
87 // OpenTruncFile opens the file with O_RDWR | O_CREATE | O_TRUNC
88 func OpenTruncFile(filename string, sync bool) (*os.File, error) {
89 flags := os.O_RDWR | os.O_CREATE | os.O_TRUNC
90 if sync {
91 flags |= datasyncFileFlag
92 }
93 return os.OpenFile(filename, flags, 0600)
94 }
95
// SafeCopy does append(a[:0], src...): it overwrites a from its start with the
// contents of src, reusing a's backing array when its capacity is large enough,
// and returns the resulting slice.
func SafeCopy(a, src []byte) []byte {
	reused := a[:0]
	return append(reused, src...)
}
100
// Copy copies a byte slice and returns the copied slice. The result never shares
// memory with a and is non-nil even when a is nil or empty.
func Copy(a []byte) []byte {
	return append(make([]byte, 0, len(a)), a...)
}
107
// KeyWithTs generates a new key by appending ts to key. The timestamp is stored
// in the trailing 8 bytes as big-endian math.MaxUint64-ts. The input key is not
// modified.
func KeyWithTs(key []byte, ts uint64) []byte {
	buf := make([]byte, len(key)+8)
	n := copy(buf, key)
	binary.BigEndian.PutUint64(buf[n:], math.MaxUint64-ts)
	return buf
}
115
// ParseTs parses the timestamp from the key bytes. The trailing 8 bytes are read
// as a big-endian value v and math.MaxUint64-v is returned. Keys of 8 bytes or
// fewer yield 0.
func ParseTs(key []byte) uint64 {
	n := len(key)
	if n > 8 {
		return math.MaxUint64 - binary.BigEndian.Uint64(key[n-8:])
	}
	return 0
}
123
// CompareKeys checks the key without timestamp and checks the timestamp if keyNoTs
// is same.
// a<timestamp> would be sorted higher than aa<timestamp> if we use bytes.compare
// All keys should have timestamp: both inputs are sliced 8 bytes from their end,
// so a shorter key causes a runtime panic.
func CompareKeys(key1, key2 []byte) int {
	split1, split2 := len(key1)-8, len(key2)-8
	cmp := bytes.Compare(key1[:split1], key2[:split2])
	if cmp == 0 {
		// Same user key: order by the 8-byte timestamp suffix.
		cmp = bytes.Compare(key1[split1:], key2[split2:])
	}
	return cmp
}
134
// ParseKey parses the actual key from the key bytes by dropping the trailing 8
// bytes. A nil key yields nil. The returned slice shares memory with key.
func ParseKey(key []byte) []byte {
	if key != nil {
		return key[:len(key)-8]
	}
	return nil
}
143
144 // SameKey checks for key equality ignoring the version timestamp suffix.
145 func SameKey(src, dst []byte) bool {
146 if len(src) != len(dst) {
147 return false
148 }
149 return bytes.Equal(ParseKey(src), ParseKey(dst))
150 }
151
// Slice holds a reusable buf, will reallocate if you request a larger size than ever before.
// One problem is with n distinct sizes in random order it'll reallocate log(n) times.
type Slice struct {
	buf []byte
}

// Resize returns a slice of length sz backed by the Slice's buffer. The existing
// buffer is reused when its capacity suffices; otherwise a new buffer of exactly
// sz bytes replaces it. Slices returned by earlier calls may share memory with
// the result.
func (s *Slice) Resize(sz int) []byte {
	if sz > cap(s.buf) {
		s.buf = make([]byte, sz)
	}
	return s.buf[:sz]
}
166
167 // FixedDuration returns a string representation of the given duration with the
168 // hours, minutes, and seconds.
169 func FixedDuration(d time.Duration) string {
170 str := fmt.Sprintf("%02ds", int(d.Seconds())%60)
171 if d >= time.Minute {
172 str = fmt.Sprintf("%02dm", int(d.Minutes())%60) + str
173 }
174 if d >= time.Hour {
175 str = fmt.Sprintf("%02dh", int(d.Hours())) + str
176 }
177 return str
178 }
179
// Throttle allows a limited number of workers to run at a time. It also
// provides a mechanism to check for errors encountered by workers and wait for
// them to finish.
type Throttle struct {
	once      sync.Once
	wg        sync.WaitGroup
	ch        chan struct{} // One token per running worker; capacity is the worker limit.
	errCh     chan error    // Pending worker errors; capacity is the worker limit.
	finishErr error         // First error observed by Finish, replayed on later calls.
}

// NewThrottle creates a new throttle with a max number of workers.
func NewThrottle(max int) *Throttle {
	return &Throttle{
		ch:    make(chan struct{}, max),
		errCh: make(chan error, max),
	}
}

// Do should be called by workers before they start working. It blocks if there
// are already maximum number of workers working. If it detects an error from
// previously Done workers, it would return it. When Do returns an error no
// worker slot has been taken, so Done must not be called for it.
func (t *Throttle) Do() error {
	for {
		select {
		case t.ch <- struct{}{}:
			t.wg.Add(1)
			return nil
		case err := <-t.errCh:
			if err != nil {
				return err
			}
		}
	}
}

// Done should be called by workers when they finish working. They can also
// pass the error status of work done.
//
// Done never blocks. If the error buffer is already full, err is dropped: in
// that case earlier errors are still pending and one of them will be reported
// by Do or Finish. Done panics if it is called without a matching Do.
func (t *Throttle) Done(err error) {
	if err != nil {
		// A blocking send here would deadlock once the buffer is full and the
		// caller has stopped calling Do (e.g. it is already waiting in Finish):
		// the worker could never reach wg.Done below.
		select {
		case t.errCh <- err:
		default:
		}
	}
	select {
	case <-t.ch:
	default:
		panic("Throttle Do Done mismatch")
	}
	t.wg.Done()
}

// Finish waits until all workers have finished working. It would return any error passed by Done.
// If Finish is called multiple time, it will wait for workers to finish only once(first time).
// From next calls, it will return same error as found on first call.
func (t *Throttle) Finish() error {
	t.once.Do(func() {
		t.wg.Wait()
		close(t.ch)
		close(t.errCh)
		// Ranging over the closed channel drains whatever errors are buffered.
		for err := range t.errCh {
			if err != nil {
				t.finishErr = err
				return
			}
		}
	})

	return t.finishErr
}
248
// U16ToBytes converts the given Uint16 to bytes: a freshly allocated 2-byte
// slice holding v in big-endian order.
func U16ToBytes(v uint16) []byte {
	out := make([]byte, 2)
	binary.BigEndian.PutUint16(out, v)
	return out
}
255
// BytesToU16 converts the given byte slice to uint16 by decoding its first two
// bytes as a big-endian value. Any further bytes are ignored.
func BytesToU16(b []byte) uint16 {
	v := binary.BigEndian.Uint16(b)
	return v
}
260
// U32ToBytes converts the given Uint32 to bytes: a freshly allocated 4-byte
// slice holding v in big-endian order.
func U32ToBytes(v uint32) []byte {
	out := make([]byte, 4)
	binary.BigEndian.PutUint32(out, v)
	return out
}
267
// BytesToU32 converts the given byte slice to uint32 by decoding its first four
// bytes as a big-endian value. Any further bytes are ignored.
func BytesToU32(b []byte) uint32 {
	v := binary.BigEndian.Uint32(b)
	return v
}
272
// U32SliceToBytes converts the given Uint32 slice to byte slice without copying:
// the result has length and capacity len(u32s)*4 and points at the first element
// of u32s, so both slices share memory and the bytes appear in the elements'
// in-memory layout. An empty input yields nil.
func U32SliceToBytes(u32s []uint32) []byte {
	count := len(u32s)
	if count == 0 {
		return nil
	}
	var out []byte
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&out))
	sh.Len = count * 4
	sh.Cap = sh.Len
	// The pointer-to-uintptr conversion is stored into the header in this single
	// statement, with nothing in between.
	sh.Data = uintptr(unsafe.Pointer(&u32s[0]))
	return out
}
285
// BytesToU32Slice converts the given byte slice to uint32 slice without copying:
// the result has length and capacity len(b)/4 (trailing bytes that do not fill a
// whole element are left out) and points at the first byte of b, so both slices
// share memory. An empty input yields nil.
func BytesToU32Slice(b []byte) []uint32 {
	size := len(b)
	if size == 0 {
		return nil
	}
	var out []uint32
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&out))
	sh.Len = size / 4
	sh.Cap = sh.Len
	// The pointer-to-uintptr conversion is stored into the header in this single
	// statement, with nothing in between.
	sh.Data = uintptr(unsafe.Pointer(&b[0]))
	return out
}
298
// U64ToBytes converts the given Uint64 to bytes: a freshly allocated 8-byte
// slice holding v in big-endian order.
func U64ToBytes(v uint64) []byte {
	out := make([]byte, 8)
	binary.BigEndian.PutUint64(out, v)
	return out
}
305
// BytesToU64 converts the given byte slice to uint64 by decoding its first eight
// bytes as a big-endian value. Any further bytes are ignored.
func BytesToU64(b []byte) uint64 {
	v := binary.BigEndian.Uint64(b)
	return v
}
310
// U64SliceToBytes converts the given Uint64 slice to byte slice without copying:
// the result has length and capacity len(u64s)*8 and points at the first element
// of u64s, so both slices share memory and the bytes appear in the elements'
// in-memory layout. An empty input yields nil.
func U64SliceToBytes(u64s []uint64) []byte {
	count := len(u64s)
	if count == 0 {
		return nil
	}
	var out []byte
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&out))
	sh.Len = count * 8
	sh.Cap = sh.Len
	// The pointer-to-uintptr conversion is stored into the header in this single
	// statement, with nothing in between.
	sh.Data = uintptr(unsafe.Pointer(&u64s[0]))
	return out
}
323
// BytesToU64Slice converts the given byte slice to uint64 slice without copying:
// the result has length and capacity len(b)/8 (trailing bytes that do not fill a
// whole element are left out) and points at the first byte of b, so both slices
// share memory. An empty input yields nil.
func BytesToU64Slice(b []byte) []uint64 {
	size := len(b)
	if size == 0 {
		return nil
	}
	var out []uint64
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&out))
	sh.Len = size / 8
	sh.Cap = sh.Len
	// The pointer-to-uintptr conversion is stored into the header in this single
	// statement, with nothing in between.
	sh.Data = uintptr(unsafe.Pointer(&b[0]))
	return out
}
336
// page struct contains one underlying buffer.
type page struct {
	buf []byte
}

// PageBuffer consists of many pages. A page is a wrapper over []byte. PageBuffer can act as a
// replacement of bytes.Buffer. Instead of having single underlying buffer, it has multiple
// underlying buffers. Hence it avoids any copy during relocation(as happens in bytes.Buffer).
// PageBuffer allocates memory in pages. Once a page is full, it will allocate page with double the
// size of previous page. Its function are not thread safe.
type PageBuffer struct {
	pages []*page

	length       int // Length of PageBuffer.
	nextPageSize int // Size of next page to be allocated.
}

// NewPageBuffer returns a new PageBuffer with first page having size pageSize.
// A pageSize of zero is treated as 1.
func NewPageBuffer(pageSize int) *PageBuffer {
	if pageSize == 0 {
		// A zero-capacity page can never take data, and doubling zero stays zero,
		// so Write would keep allocating empty pages forever.
		pageSize = 1
	}
	b := &PageBuffer{}
	b.pages = append(b.pages, &page{buf: make([]byte, 0, pageSize)})
	b.nextPageSize = pageSize * 2
	return b
}

// Write writes data to PageBuffer b. It returns number of bytes written and any error encountered.
// The returned count is always len(data) and the returned error is always nil.
func (b *PageBuffer) Write(data []byte) (int, error) {
	dataLen := len(data)
	for {
		cp := b.pages[len(b.pages)-1] // Current page.

		// Fill whatever room is left between the page's length and its capacity.
		n := copy(cp.buf[len(cp.buf):cap(cp.buf)], data)
		cp.buf = cp.buf[:len(cp.buf)+n]
		b.length += n

		if len(data) == n {
			break
		}
		data = data[n:]

		// The current page is full: continue in a new page and double the size
		// used for the page after that.
		b.pages = append(b.pages, &page{buf: make([]byte, 0, b.nextPageSize)})
		b.nextPageSize *= 2
	}

	return dataLen, nil
}

// WriteByte writes data byte to PageBuffer and returns any encountered error.
func (b *PageBuffer) WriteByte(data byte) error {
	_, err := b.Write([]byte{data})
	return err
}

// Len returns length of PageBuffer, i.e. the total number of bytes it holds
// across all pages.
func (b *PageBuffer) Len() int {
	return b.length
}
394
395 // pageForOffset returns pageIdx and startIdx for the offset.
396 func (b *PageBuffer) pageForOffset(offset int) (int, int) {
397 AssertTrue(offset < b.length)
398
399 var pageIdx, startIdx, sizeNow int
400 for i := 0; i < len(b.pages); i++ {
401 cp := b.pages[i]
402
403 if sizeNow+len(cp.buf)-1 < offset {
404 sizeNow += len(cp.buf)
405 } else {
406 pageIdx = i
407 startIdx = offset - sizeNow
408 break
409 }
410 }
411
412 return pageIdx, startIdx
413 }
414
415 // Truncate truncates PageBuffer to length n.
416 func (b *PageBuffer) Truncate(n int) {
417 pageIdx, startIdx := b.pageForOffset(n)
418 // For simplicity of the code reject extra pages. These pages can be kept.
419 b.pages = b.pages[:pageIdx+1]
420 cp := b.pages[len(b.pages)-1]
421 cp.buf = cp.buf[:startIdx]
422 b.length = n
423 }
424
425 // Bytes returns whole Buffer data as single []byte.
426 func (b *PageBuffer) Bytes() []byte {
427 buf := make([]byte, b.length)
428 written := 0
429 for i := 0; i < len(b.pages); i++ {
430 written += copy(buf[written:], b.pages[i].buf)
431 }
432
433 return buf
434 }
435
436 // WriteTo writes whole buffer to w. It returns number of bytes written and any error encountered.
437 func (b *PageBuffer) WriteTo(w io.Writer) (int64, error) {
438 written := int64(0)
439 for i := 0; i < len(b.pages); i++ {
440 n, err := w.Write(b.pages[i].buf)
441 written += int64(n)
442 if err != nil {
443 return written, err
444 }
445 }
446
447 return written, nil
448 }
449
450 // NewReaderAt returns a reader which starts reading from offset in page buffer.
451 func (b *PageBuffer) NewReaderAt(offset int) *PageBufferReader {
452 pageIdx, startIdx := b.pageForOffset(offset)
453
454 return &PageBufferReader{
455 buf: b,
456 pageIdx: pageIdx,
457 startIdx: startIdx,
458 }
459 }
460
461 // PageBufferReader is a reader for PageBuffer.
462 type PageBufferReader struct {
463 buf *PageBuffer // Underlying page buffer.
464 pageIdx int // Idx of page from where it will start reading.
465 startIdx int // Idx inside page - buf.pages[pageIdx] from where it will start reading.
466 }
467
468 // Read reads upto len(p) bytes. It returns number of bytes read and any error encountered.
469 func (r *PageBufferReader) Read(p []byte) (int, error) {
470 // Check if there is enough to Read.
471 pc := len(r.buf.pages)
472
473 read := 0
474 for r.pageIdx < pc && read < len(p) {
475 cp := r.buf.pages[r.pageIdx] // Current Page.
476 endIdx := len(cp.buf) // Last Idx up to which we can read from this page.
477
478 n := copy(p[read:], cp.buf[r.startIdx:endIdx])
479 read += n
480 r.startIdx += n
481
482 // Instead of len(cp.buf), we comparing with cap(cp.buf). This ensures that we move to next
483 // page only when we have read all data. Reading from last page is an edge case. We don't
484 // want to move to next page until last page is full to its capacity.
485 if r.startIdx >= cap(cp.buf) {
486 // We should move to next page.
487 r.pageIdx++
488 r.startIdx = 0
489 continue
490 }
491
492 // When last page in not full to its capacity and we have read all data up to its
493 // length, just break out of the loop.
494 if r.pageIdx == pc-1 {
495 break
496 }
497 }
498
499 if read == 0 && len(p) > 0 {
500 return read, io.EOF
501 }
502
503 return read, nil
504 }
505
506 const kvsz = int(unsafe.Sizeof(pb.KV{}))
507
508 func NewKV(alloc *z.Allocator) *pb.KV {
509 if alloc == nil {
510 return &pb.KV{}
511 }
512 b := alloc.AllocateAligned(kvsz)
513 return (*pb.KV)(unsafe.Pointer(&b[0]))
514 }
515
// IBytesToString converts size in bytes to human readable format.
// The code is taken from humanize library and changed to provide
// value upto custom decimal precision.
// IBytesToString(12312412, 1) -> 11.7 MiB
//
// Sizes below 10 are printed as a plain integer followed by " B". Larger sizes
// are divided by the largest power of 1024 selected via a floating-point
// logarithm and printed with precision digits after the decimal point.
func IBytesToString(size uint64, precision int) string {
	if size < 10 {
		return fmt.Sprintf("%d B", size)
	}

	units := [...]string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"}
	unitBase := float64(1024)

	exp := math.Floor(math.Log(float64(size)) / math.Log(unitBase))
	scaled := float64(size) / math.Pow(unitBase, exp)

	layout := "%." + strconv.Itoa(precision) + "f %s"
	return fmt.Sprintf(layout, scaled, units[int(exp)])
}
533
// RateMonitor keeps a ring of the most recent transfer-rate samples (bytes per
// second) and reports their average.
type RateMonitor struct {
	start       time.Time // Time at which the monitor was created.
	lastSent    uint64    // Byte count passed to the most recent recorded Capture.
	lastCapture time.Time // Time of the most recent recorded Capture (creation time initially).
	rates       []float64 // Ring buffer of samples; zero marks a slot not yet filled.
	idx         int       // Slot in rates that the next sample is written to.
}

// NewRateMonitor returns a RateMonitor that averages over the last numSamples
// captures. The first capture is measured against the creation time.
func NewRateMonitor(numSamples int) *RateMonitor {
	now := time.Now()
	return &RateMonitor{
		start: now,
		// Without this the first sample would be measured against the zero time,
		// producing a near-zero rate that skews the average.
		lastCapture: now,
		rates:       make([]float64, numSamples),
	}
}

// minRate is the smallest rate ever stored by Capture; it lets Rate tell real
// samples apart from ring slots that are still zero.
const minRate = 0.0001

// Capture captures the current number of sent bytes. This number should be monotonically
// increasing.
//
// The sample stored is the number of bytes sent since the previous recorded
// capture divided by the time elapsed since then, clamped to at least minRate.
// If no time has elapsed the call records nothing, so those bytes are counted by
// the next capture instead of producing an infinite or undefined rate.
func (rm *RateMonitor) Capture(sent uint64) {
	now := time.Now() // Single clock read, used for both the interval and the new mark.
	dur := now.Sub(rm.lastCapture)
	if dur <= 0 {
		return
	}
	diff := sent - rm.lastSent
	rm.lastCapture, rm.lastSent = now, sent

	rate := float64(diff) / dur.Seconds()
	if rate < minRate {
		rate = minRate
	}
	rm.rates[rm.idx] = rate
	rm.idx = (rm.idx + 1) % len(rm.rates)
}

// Rate returns the average rate of transmission smoothed out by the number of samples.
// Slots that have not been filled yet are left out of the average; zero is
// returned when there are no samples at all.
func (rm *RateMonitor) Rate() uint64 {
	var total float64
	var den float64
	for _, r := range rm.rates {
		if r < minRate {
			// Ignore this. We always set minRate, so this is a zero.
			// Typically at the start of the rate monitor, we'd have zeros.
			continue
		}
		total += r
		den += 1.0
	}
	if den < minRate {
		return 0
	}
	return uint64(total / den)
}
584