buffer.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package z
7
8 import (
9 "encoding/binary"
10 "errors"
11 "fmt"
12 "log"
13 "os"
14 "sort"
15 "sync/atomic"
16 )
17
const (
	// defaultCapacity is the smallest capacity, in bytes, that NewBuffer and
	// newBufferFile will allocate; smaller requests are rounded up to it.
	defaultCapacity = 64
	// defaultTag is the tag NewBuffer substitutes when it is given an empty tag.
	defaultTag = "buffer"
)
22
// Buffer is the equivalent of bytes.Buffer without the ability to read. It is NOT thread-safe.
//
// In UseCalloc mode, z.Calloc is used to allocate memory, which depending upon how the code is
// compiled could use jemalloc for allocations.
//
// In UseMmap mode, Buffer uses file mmap to allocate memory. This allows us to store big data
// structures without using physical memory.
//
// MaxSize can be set (see WithMaxSize) to limit the memory usage; Grow panics when a request
// would push the used length past it.
type Buffer struct {
	padding       uint64     // number of starting bytes used for padding
	offset        uint64     // used length of the buffer (includes the padding)
	buf           []byte     // backing slice for the buffer
	bufType       BufferType // type of the underlying buffer
	curSz         int        // capacity of the buffer
	maxSz         int        // causes a panic if the buffer grows beyond this size; values <= 0 disable the check
	mmapFile      *MmapFile  // optional mmap backing for the buffer
	autoMmapAfter int        // Calloc falls back to an mmaped tmpfile after crossing this size
	autoMmapDir   string     // directory for autoMmap to create a tempfile in
	persistent    bool       // when enabled, Release will not delete the underlying mmap file
	tag           string     // used for jemalloc stats
}
45
46 func NewBuffer(capacity int, tag string) *Buffer {
47 if capacity < defaultCapacity {
48 capacity = defaultCapacity
49 }
50 if tag == "" {
51 tag = defaultTag
52 }
53 return &Buffer{
54 buf: Calloc(capacity, tag),
55 bufType: UseCalloc,
56 curSz: capacity,
57 offset: 8,
58 padding: 8,
59 tag: tag,
60 }
61 }
62
63 // It is the caller's responsibility to set offset after this, because Buffer
64 // doesn't remember what it was.
65 func NewBufferPersistent(path string, capacity int) (*Buffer, error) {
66 file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
67 if err != nil {
68 return nil, err
69 }
70 buffer, err := newBufferFile(file, capacity)
71 if err != nil {
72 return nil, err
73 }
74 buffer.persistent = true
75 return buffer, nil
76 }
77
78 func NewBufferTmp(dir string, capacity int) (*Buffer, error) {
79 if dir == "" {
80 dir = tmpDir
81 }
82 file, err := os.CreateTemp(dir, "buffer")
83 if err != nil {
84 return nil, err
85 }
86 return newBufferFile(file, capacity)
87 }
88
89 func newBufferFile(file *os.File, capacity int) (*Buffer, error) {
90 if capacity < defaultCapacity {
91 capacity = defaultCapacity
92 }
93 mmapFile, err := OpenMmapFileUsing(file, capacity, true)
94 if err != nil && err != NewFile {
95 return nil, err
96 }
97 buf := &Buffer{
98 buf: mmapFile.Data,
99 bufType: UseMmap,
100 curSz: len(mmapFile.Data),
101 mmapFile: mmapFile,
102 offset: 8,
103 padding: 8,
104 }
105 return buf, nil
106 }
107
108 func NewBufferSlice(slice []byte) *Buffer {
109 return &Buffer{
110 offset: uint64(len(slice)),
111 buf: slice,
112 bufType: UseInvalid,
113 }
114 }
115
116 func (b *Buffer) WithAutoMmap(threshold int, path string) *Buffer {
117 if b.bufType != UseCalloc {
118 panic("can only autoMmap with UseCalloc")
119 }
120 b.autoMmapAfter = threshold
121 if path == "" {
122 b.autoMmapDir = tmpDir
123 } else {
124 b.autoMmapDir = path
125 }
126 return b
127 }
128
129 func (b *Buffer) WithMaxSize(size int) *Buffer {
130 b.maxSz = size
131 return b
132 }
133
134 func (b *Buffer) IsEmpty() bool {
135 return int(b.offset) == b.StartOffset()
136 }
137
138 // LenWithPadding would return the number of bytes written to the buffer so far
139 // plus the padding at the start of the buffer.
140 func (b *Buffer) LenWithPadding() int {
141 return int(atomic.LoadUint64(&b.offset))
142 }
143
144 // LenNoPadding would return the number of bytes written to the buffer so far
145 // (without the padding).
146 func (b *Buffer) LenNoPadding() int {
147 return int(atomic.LoadUint64(&b.offset) - b.padding)
148 }
149
150 // Bytes would return all the written bytes as a slice.
151 func (b *Buffer) Bytes() []byte {
152 off := atomic.LoadUint64(&b.offset)
153 return b.buf[b.padding:off]
154 }
155
156 // Grow would grow the buffer to have at least n more bytes. In case the buffer is at capacity, it
157 // would reallocate twice the size of current capacity + n, to ensure n bytes can be written to the
158 // buffer without further allocation. In UseMmap mode, this might result in underlying file
159 // expansion.
160 func (b *Buffer) Grow(n int) {
161 if b.buf == nil {
162 panic("z.Buffer needs to be initialized before using")
163 }
164 if b.maxSz > 0 && int(b.offset)+n > b.maxSz {
165 err := fmt.Errorf(
166 "z.Buffer max size exceeded: %d offset: %d grow: %d", b.maxSz, b.offset, n)
167 panic(err)
168 }
169 if int(b.offset)+n < b.curSz {
170 return
171 }
172
173 // Calculate new capacity.
174 growBy := b.curSz + n
175 // Don't allocate more than 1GB at a time.
176 if growBy > 1<<30 {
177 growBy = 1 << 30
178 }
179 // Allocate at least n, even if it exceeds the 1GB limit above.
180 if n > growBy {
181 growBy = n
182 }
183 b.curSz += growBy
184
185 switch b.bufType {
186 case UseCalloc:
187 // If autoMmap gets triggered, copy the slice over to an mmaped file.
188 if b.autoMmapAfter > 0 && b.curSz > b.autoMmapAfter {
189 b.bufType = UseMmap
190 file, err := os.CreateTemp(b.autoMmapDir, "")
191 if err != nil {
192 panic(err)
193 }
194 mmapFile, err := OpenMmapFileUsing(file, b.curSz, true)
195 if err != nil && err != NewFile {
196 panic(err)
197 }
198 assert(int(b.offset) == copy(mmapFile.Data, b.buf[:b.offset]))
199 Free(b.buf)
200 b.mmapFile = mmapFile
201 b.buf = mmapFile.Data
202 break
203 }
204
205 // Else, reallocate the slice.
206 newBuf := Calloc(b.curSz, b.tag)
207 assert(int(b.offset) == copy(newBuf, b.buf[:b.offset]))
208 Free(b.buf)
209 b.buf = newBuf
210
211 case UseMmap:
212 // Truncate and remap the underlying file.
213 if err := b.mmapFile.Truncate(int64(b.curSz)); err != nil {
214 err = errors.Join(err,
215 fmt.Errorf("while trying to truncate file: %s to size: %d", b.mmapFile.Fd.Name(), b.curSz))
216 panic(err)
217 }
218 b.buf = b.mmapFile.Data
219
220 default:
221 panic("can only use Grow on UseCalloc and UseMmap buffers")
222 }
223 }
224
225 // Allocate is a way to get a slice of size n back from the buffer. This slice can be directly
226 // written to. Warning: Allocate is not thread-safe. The byte slice returned MUST be used before
227 // further calls to Buffer.
228 func (b *Buffer) Allocate(n int) []byte {
229 b.Grow(n)
230 off := b.offset
231 b.offset += uint64(n)
232 return b.buf[off:int(b.offset)]
233 }
234
235 // AllocateOffset works the same way as allocate, but instead of returning a byte slice, it returns
236 // the offset of the allocation.
237 func (b *Buffer) AllocateOffset(n int) int {
238 b.Grow(n)
239 b.offset += uint64(n)
240 return int(b.offset) - n
241 }
242
243 func (b *Buffer) writeLen(sz int) {
244 buf := b.Allocate(8)
245 binary.BigEndian.PutUint64(buf, uint64(sz))
246 }
247
248 // SliceAllocate would encode the size provided into the buffer, followed by a call to Allocate,
249 // hence returning the slice of size sz. This can be used to allocate a lot of small buffers into
250 // this big buffer.
251 // Note that SliceAllocate should NOT be mixed with normal calls to Write.
252 func (b *Buffer) SliceAllocate(sz int) []byte {
253 b.Grow(8 + sz)
254 b.writeLen(sz)
255 return b.Allocate(sz)
256 }
257
258 func (b *Buffer) StartOffset() int {
259 return int(b.padding)
260 }
261
262 func (b *Buffer) WriteSlice(slice []byte) {
263 dst := b.SliceAllocate(len(slice))
264 assert(len(slice) == copy(dst, slice))
265 }
266
267 func (b *Buffer) SliceIterate(f func(slice []byte) error) error {
268 if b.IsEmpty() {
269 return nil
270 }
271
272 next := b.StartOffset()
273 var slice []byte
274 for next >= 0 {
275 slice, next = b.Slice(next)
276 if len(slice) == 0 {
277 continue
278 }
279 if err := f(slice); err != nil {
280 return err
281 }
282 }
283
284 return nil
285 }
286
// BufferType identifies the kind of memory backing a Buffer.
type BufferType int

// The supported buffer types.
const (
	UseCalloc BufferType = iota
	UseMmap
	UseInvalid
)

// String returns the name of the buffer type. Any value other than UseCalloc
// or UseMmap is reported as "UseInvalid".
func (t BufferType) String() string {
	if t == UseCalloc {
		return "UseCalloc"
	}
	if t == UseMmap {
		return "UseMmap"
	}
	return "UseInvalid"
}
305
// LessFunc reports whether slice a should sort before slice b.
type LessFunc func(a, b []byte) bool

// sortHelper carries the state used while sorting the length-prefixed slices
// of a Buffer.
type sortHelper struct {
	offsets []int    // buffer offsets marking the chunk boundaries that sort recurses over
	b       *Buffer  // buffer whose contents are being sorted in place
	tmp     *Buffer  // scratch buffer used by sortSmall and merge
	less    LessFunc // ordering applied to the slice payloads
	small   []int    // reusable list of slice offsets for sortSmall
}
314
315 func (s *sortHelper) sortSmall(start, end int) {
316 s.tmp.Reset()
317 s.small = s.small[:0]
318 next := start
319 for next >= 0 && next < end {
320 s.small = append(s.small, next)
321 _, next = s.b.Slice(next)
322 }
323
324 // We are sorting the slices pointed to by s.small offsets, but only moving the offsets around.
325 sort.Slice(s.small, func(i, j int) bool {
326 left, _ := s.b.Slice(s.small[i])
327 right, _ := s.b.Slice(s.small[j])
328 return s.less(left, right)
329 })
330 // Now we iterate over the s.small offsets and copy over the slices. The result is now in order.
331 for _, off := range s.small {
332 _, _ = s.tmp.Write(rawSlice(s.b.buf[off:]))
333 }
334 assert(end-start == copy(s.b.buf[start:end], s.tmp.Bytes()))
335 }
336
// assert terminates the program via log.Fatalf when b is false.
func assert(b bool) {
	if b {
		return
	}
	log.Fatalf("%+v", errors.New("Assertion failure"))
}
// check terminates the program via log.Fatalf when err is non-nil.
func check(err error) {
	if err == nil {
		return
	}
	log.Fatalf("%+v", err)
}
347 func check2(_ interface{}, err error) {
348 check(err)
349 }
350
// merge merges the two runs of length-prefixed entries left and right into
// s.b.buf[start:end], picking at each step the entry that s.less orders
// first. It does nothing if either run is empty.
//
// left is first copied into s.tmp and read from there — presumably because
// the merged output is written over the buffer region left came from (see
// sort, which passes adjacent regions of s.b.buf).
func (s *sortHelper) merge(left, right []byte, start, end int) {
	if len(left) == 0 || len(right) == 0 {
		return
	}
	s.tmp.Reset()
	check2(s.tmp.Write(left))
	left = s.tmp.Bytes()

	// ls and rs hold the current head entry (header + payload) of each run.
	var ls, rs []byte

	// copyLeft emits the head of left at start and advances both cursors.
	copyLeft := func() {
		assert(len(ls) == copy(s.b.buf[start:], ls))
		left = left[len(ls):]
		start += len(ls)
	}
	// copyRight emits the head of right at start and advances both cursors.
	copyRight := func() {
		assert(len(rs) == copy(s.b.buf[start:], rs))
		right = right[len(rs):]
		start += len(rs)
	}

	for start < end {
		// Once one run is exhausted, the remainder of the other fills the
		// rest of the output range.
		if len(left) == 0 {
			assert(len(right) == copy(s.b.buf[start:end], right))
			return
		}
		if len(right) == 0 {
			assert(len(left) == copy(s.b.buf[start:end], left))
			return
		}
		ls = rawSlice(left)
		rs = rawSlice(right)

		// We skip the first 8 bytes in the rawSlice, because that stores the length.
		if s.less(ls[8:], rs[8:]) {
			copyLeft()
		} else {
			copyRight()
		}
	}
}
392
393 func (s *sortHelper) sort(lo, hi int) []byte {
394 assert(lo <= hi)
395
396 mid := lo + (hi-lo)/2
397 loff, hoff := s.offsets[lo], s.offsets[hi]
398 if lo == mid {
399 // No need to sort, just return the buffer.
400 return s.b.buf[loff:hoff]
401 }
402
403 // lo, mid would sort from [offset[lo], offset[mid]) .
404 left := s.sort(lo, mid)
405 // Typically we'd use mid+1, but here mid represents an offset in the buffer. Each offset
406 // contains a thousand entries. So, if we do mid+1, we'd skip over those entries.
407 right := s.sort(mid, hi)
408
409 s.merge(left, right, loff, hoff)
410 return s.b.buf[loff:hoff]
411 }
412
413 // SortSlice is like SortSliceBetween but sorting over the entire buffer.
414 func (b *Buffer) SortSlice(less func(left, right []byte) bool) {
415 b.SortSliceBetween(b.StartOffset(), int(b.offset), less)
416 }
417 func (b *Buffer) SortSliceBetween(start, end int, less LessFunc) {
418 if start >= end {
419 return
420 }
421 if start == 0 {
422 panic("start can never be zero")
423 }
424
425 var offsets []int
426 next, count := start, 0
427 for next >= 0 && next < end {
428 if count%1024 == 0 {
429 offsets = append(offsets, next)
430 }
431 _, next = b.Slice(next)
432 count++
433 }
434 assert(len(offsets) > 0)
435 if offsets[len(offsets)-1] != end {
436 offsets = append(offsets, end)
437 }
438
439 szTmp := int(float64((end-start)/2) * 1.1)
440 s := &sortHelper{
441 offsets: offsets,
442 b: b,
443 less: less,
444 small: make([]int, 0, 1024),
445 tmp: NewBuffer(szTmp, b.tag),
446 }
447 defer func() { _ = s.tmp.Release() }()
448
449 left := offsets[0]
450 for _, off := range offsets[1:] {
451 s.sortSmall(left, off)
452 left = off
453 }
454 s.sort(0, len(offsets)-1)
455 }
456
// rawSlice returns the first length-prefixed entry in buf, header included:
// the 8-byte big-endian length followed by that many payload bytes.
func rawSlice(buf []byte) []byte {
	payloadLen := int(binary.BigEndian.Uint64(buf))
	return buf[:8+payloadLen]
}
461
462 // Slice would return the slice written at offset.
463 func (b *Buffer) Slice(offset int) ([]byte, int) {
464 if offset >= int(b.offset) {
465 return nil, -1
466 }
467
468 sz := binary.BigEndian.Uint64(b.buf[offset:])
469 start := offset + 8
470 next := start + int(sz)
471 res := b.buf[start:next]
472 if next >= int(b.offset) {
473 next = -1
474 }
475 return res, next
476 }
477
478 // SliceOffsets is an expensive function. Use sparingly.
479 func (b *Buffer) SliceOffsets() []int {
480 next := b.StartOffset()
481 var offsets []int
482 for next >= 0 {
483 offsets = append(offsets, next)
484 _, next = b.Slice(next)
485 }
486 return offsets
487 }
488
489 func (b *Buffer) Data(offset int) []byte {
490 if offset > b.curSz {
491 panic("offset beyond current size")
492 }
493 return b.buf[offset:b.curSz]
494 }
495
496 // Write would write p bytes to the buffer.
497 func (b *Buffer) Write(p []byte) (n int, err error) {
498 n = len(p)
499 b.Grow(n)
500 assert(n == copy(b.buf[b.offset:], p))
501 b.offset += uint64(n)
502 return n, nil
503 }
504
505 // Reset would reset the buffer to be reused.
506 func (b *Buffer) Reset() {
507 b.offset = uint64(b.StartOffset())
508 }
509
510 // Release would free up the memory allocated by the buffer. Once the usage of buffer is done, it is
511 // important to call Release, otherwise a memory leak can happen.
512 func (b *Buffer) Release() error {
513 if b == nil {
514 return nil
515 }
516 switch b.bufType {
517 case UseCalloc:
518 Free(b.buf)
519 case UseMmap:
520 if b.mmapFile == nil {
521 return nil
522 }
523 path := b.mmapFile.Fd.Name()
524 if err := b.mmapFile.Close(-1); err != nil {
525 return errors.Join(err, fmt.Errorf("while closing file: %s", path))
526 }
527 if !b.persistent {
528 if err := os.Remove(path); err != nil {
529 return errors.Join(err, fmt.Errorf("while deleting file %s", path))
530 }
531 }
532 }
533 return nil
534 }
535