allocator.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package z
   7  
   8  import (
   9  	"bytes"
  10  	"fmt"
  11  	"math"
  12  	"math/bits"
  13  	"math/rand"
  14  	"strings"
  15  	"sync"
  16  	"sync/atomic"
  17  	"time"
  18  	"unsafe"
  19  
  20  	"github.com/dustin/go-humanize"
  21  )
  22  
// Allocator amortizes the cost of small allocations by allocating memory in
// bigger chunks. Internally it uses z.Calloc to allocate memory. Once
// allocated, the memory is not moved, so it is safe to unsafe-cast the
// allocated bytes to Go struct pointers. Maintaining a freelist is slow.
// Instead, Allocator only allocates memory, with the idea that finally we
// would just release the entire Allocator.
//
// Fields:
//   - The embedded sync.Mutex is taken by Allocate while a new buffer is added.
//   - compIdx is the allocation cursor; it is read and updated atomically.
//   - buffers holds the chunks obtained from Calloc; an empty slot marks the
//     end of the chunks in use. NewAllocator creates 64 slots.
//   - Ref is the key under which the allocator is registered in the
//     package-level allocs map (see AllocatorFrom).
//   - Tag is passed to Calloc and is used by Allocators to group statistics.
type Allocator struct {
	sync.Mutex
	compIdx uint64 // Stores bufIdx in 32 MSBs and posIdx in 32 LSBs.
	buffers [][]byte
	Ref     uint64
	Tag     string
}
  36  
  37  // allocs keeps references to all Allocators, so we can safely discard them later.
  38  var allocsMu *sync.Mutex
  39  var allocRef uint64
  40  var allocs map[uint64]*Allocator
  41  var calculatedLog2 []int
  42  
  43  func init() {
  44  	allocsMu = new(sync.Mutex)
  45  	allocs = make(map[uint64]*Allocator)
  46  
  47  	// Set up a unique Ref per process.
  48  	allocRef = uint64(rand.Int63n(1<<16)) << 48
  49  	calculatedLog2 = make([]int, 1025)
  50  	for i := 1; i <= 1024; i++ {
  51  		calculatedLog2[i] = int(math.Log2(float64(i)))
  52  	}
  53  }
  54  
  55  // NewAllocator creates an allocator starting with the given size.
  56  func NewAllocator(sz int, tag string) *Allocator {
  57  	ref := atomic.AddUint64(&allocRef, 1)
  58  	// We should not allow a zero sized page because addBufferWithMinSize
  59  	// will run into an infinite loop trying to double the pagesize.
  60  	if sz < 512 {
  61  		sz = 512
  62  	}
  63  	a := &Allocator{
  64  		Ref:     ref,
  65  		buffers: make([][]byte, 64),
  66  		Tag:     tag,
  67  	}
  68  	l2 := uint64(log2(sz))
  69  	if bits.OnesCount64(uint64(sz)) > 1 {
  70  		l2 += 1
  71  	}
  72  	a.buffers[0] = Calloc(1<<l2, a.Tag)
  73  
  74  	allocsMu.Lock()
  75  	allocs[ref] = a
  76  	allocsMu.Unlock()
  77  	return a
  78  }
  79  
// Reset rewinds the allocation cursor to the start of the first buffer so the
// allocator can be reused. The buffers are kept and are not zeroed here.
func (a *Allocator) Reset() {
	atomic.StoreUint64(&a.compIdx, 0)
}
  83  
  84  func Allocators() string {
  85  	allocsMu.Lock()
  86  	tags := make(map[string]uint64)
  87  	num := make(map[string]int)
  88  	for _, ac := range allocs {
  89  		tags[ac.Tag] += ac.Allocated()
  90  		num[ac.Tag] += 1
  91  	}
  92  
  93  	var buf bytes.Buffer
  94  	for tag, sz := range tags {
  95  		fmt.Fprintf(&buf, "Tag: %s Num: %d Size: %s . ", tag, num[tag], humanize.IBytes(sz))
  96  	}
  97  	allocsMu.Unlock()
  98  	return buf.String()
  99  }
 100  
 101  func (a *Allocator) String() string {
 102  	var s strings.Builder
 103  	s.WriteString(fmt.Sprintf("Allocator: %x\n", a.Ref))
 104  	var cum int
 105  	for i, b := range a.buffers {
 106  		cum += len(b)
 107  		if len(b) == 0 {
 108  			break
 109  		}
 110  		s.WriteString(fmt.Sprintf("idx: %d len: %d cum: %d\n", i, len(b), cum))
 111  	}
 112  	pos := atomic.LoadUint64(&a.compIdx)
 113  	bi, pi := parse(pos)
 114  	s.WriteString(fmt.Sprintf("bi: %d pi: %d\n", bi, pi))
 115  	s.WriteString(fmt.Sprintf("Size: %d\n", a.Size()))
 116  	return s.String()
 117  }
 118  
 119  // AllocatorFrom would return the allocator corresponding to the ref.
 120  func AllocatorFrom(ref uint64) *Allocator {
 121  	allocsMu.Lock()
 122  	a := allocs[ref]
 123  	allocsMu.Unlock()
 124  	return a
 125  }
 126  
 127  func parse(pos uint64) (bufIdx, posIdx int) {
 128  	return int(pos >> 32), int(pos & 0xFFFFFFFF)
 129  }
 130  
 131  // Size returns the size of the allocations so far.
 132  func (a *Allocator) Size() int {
 133  	pos := atomic.LoadUint64(&a.compIdx)
 134  	bi, pi := parse(pos)
 135  	var sz int
 136  	for i, b := range a.buffers {
 137  		if i < bi {
 138  			sz += len(b)
 139  			continue
 140  		}
 141  		sz += pi
 142  		return sz
 143  	}
 144  	panic("Size should not reach here")
 145  }
 146  
 147  func log2(sz int) int {
 148  	if sz < len(calculatedLog2) {
 149  		return calculatedLog2[sz]
 150  	}
 151  	pow := 10
 152  	sz >>= 10
 153  	for sz > 1 {
 154  		sz >>= 1
 155  		pow++
 156  	}
 157  	return pow
 158  }
 159  
 160  func (a *Allocator) Allocated() uint64 {
 161  	var alloc int
 162  	for _, b := range a.buffers {
 163  		alloc += cap(b)
 164  	}
 165  	return uint64(alloc)
 166  }
 167  
 168  func (a *Allocator) TrimTo(max int) {
 169  	var alloc int
 170  	for i, b := range a.buffers {
 171  		if len(b) == 0 {
 172  			break
 173  		}
 174  		alloc += len(b)
 175  		if alloc < max {
 176  			continue
 177  		}
 178  		Free(b)
 179  		a.buffers[i] = nil
 180  	}
 181  }
 182  
 183  // Release would release the memory back. Remember to make this call to avoid memory leaks.
 184  func (a *Allocator) Release() {
 185  	if a == nil {
 186  		return
 187  	}
 188  
 189  	var alloc int
 190  	for _, b := range a.buffers {
 191  		if len(b) == 0 {
 192  			break
 193  		}
 194  		alloc += len(b)
 195  		Free(b)
 196  	}
 197  
 198  	allocsMu.Lock()
 199  	delete(allocs, a.Ref)
 200  	allocsMu.Unlock()
 201  }
 202  
// maxAlloc is the upper bound, in bytes (1 GiB), both for a single Allocate
// request and for the size of a page created by addBufferAt.
const maxAlloc = 1 << 30

// MaxAlloc returns the largest size, in bytes, that a single Allocate call
// accepts.
func (a *Allocator) MaxAlloc() int {
	return maxAlloc
}
 208  
 209  const nodeAlign = unsafe.Sizeof(uint64(0)) - 1
 210  
 211  func (a *Allocator) AllocateAligned(sz int) []byte {
 212  	tsz := sz + int(nodeAlign)
 213  	out := a.Allocate(tsz)
 214  	// We are reusing allocators. In that case, it's important to zero out the memory allocated
 215  	// here. We don't always zero it out (in Allocate), because other functions would be immediately
 216  	// overwriting the allocated slices anyway (see Copy).
 217  	ZeroOut(out, 0, len(out))
 218  
 219  	addr := uintptr(unsafe.Pointer(&out[0]))
 220  	aligned := (addr + nodeAlign) & ^nodeAlign
 221  	start := int(aligned - addr)
 222  
 223  	return out[start : start+sz]
 224  }
 225  
 226  func (a *Allocator) Copy(buf []byte) []byte {
 227  	if a == nil {
 228  		return append([]byte{}, buf...)
 229  	}
 230  	out := a.Allocate(len(buf))
 231  	copy(out, buf)
 232  	return out
 233  }
 234  
 235  func (a *Allocator) addBufferAt(bufIdx, minSz int) {
 236  	for {
 237  		if bufIdx >= len(a.buffers) {
 238  			panic(fmt.Sprintf("Allocator can not allocate more than %d buffers", len(a.buffers)))
 239  		}
 240  		if len(a.buffers[bufIdx]) == 0 {
 241  			break
 242  		}
 243  		if minSz <= len(a.buffers[bufIdx]) {
 244  			// No need to do anything. We already have a buffer which can satisfy minSz.
 245  			return
 246  		}
 247  		bufIdx++
 248  	}
 249  	assert(bufIdx > 0)
 250  	// We need to allocate a new buffer.
 251  	// Make pageSize double of the last allocation.
 252  	pageSize := 2 * len(a.buffers[bufIdx-1])
 253  	// Ensure pageSize is bigger than sz.
 254  	for pageSize < minSz {
 255  		pageSize *= 2
 256  	}
 257  	// If bigger than maxAlloc, trim to maxAlloc.
 258  	if pageSize > maxAlloc {
 259  		pageSize = maxAlloc
 260  	}
 261  
 262  	buf := Calloc(pageSize, a.Tag)
 263  	assert(len(a.buffers[bufIdx]) == 0)
 264  	a.buffers[bufIdx] = buf
 265  }
 266  
 267  func (a *Allocator) Allocate(sz int) []byte {
 268  	if a == nil {
 269  		return make([]byte, sz)
 270  	}
 271  	if sz > maxAlloc {
 272  		panic(fmt.Sprintf("Unable to allocate more than %d\n", maxAlloc))
 273  	}
 274  	if sz == 0 {
 275  		return nil
 276  	}
 277  	for {
 278  		pos := atomic.AddUint64(&a.compIdx, uint64(sz))
 279  		bufIdx, posIdx := parse(pos)
 280  		buf := a.buffers[bufIdx]
 281  		if posIdx > len(buf) {
 282  			a.Lock()
 283  			newPos := atomic.LoadUint64(&a.compIdx)
 284  			newBufIdx, _ := parse(newPos)
 285  			if newBufIdx != bufIdx {
 286  				a.Unlock()
 287  				continue
 288  			}
 289  			a.addBufferAt(bufIdx+1, sz)
 290  			atomic.StoreUint64(&a.compIdx, uint64((bufIdx+1)<<32))
 291  			a.Unlock()
 292  			// We added a new buffer. Let's acquire slice the right way by going back to the top.
 293  			continue
 294  		}
 295  		data := buf[posIdx-sz : posIdx]
 296  		return data
 297  	}
 298  }
 299  
// AllocatorPool keeps returned Allocators around for reuse. A background
// goroutine (freeupAllocators) releases pooled allocators while the pool is
// idle.
//
// Fields:
//   - numGets counts calls to Get on a non-nil pool; it is updated atomically
//     and is kept as the first field.
//   - allocCh is the buffered channel holding the pooled allocators.
//   - closer is used to stop the background goroutine and wait for it.
type AllocatorPool struct {
	numGets int64
	allocCh chan *Allocator
	closer  *Closer
}
 305  
 306  func NewAllocatorPool(sz int) *AllocatorPool {
 307  	a := &AllocatorPool{
 308  		allocCh: make(chan *Allocator, sz),
 309  		closer:  NewCloser(1),
 310  	}
 311  	go a.freeupAllocators()
 312  	return a
 313  }
 314  
 315  func (p *AllocatorPool) Get(sz int, tag string) *Allocator {
 316  	if p == nil {
 317  		return NewAllocator(sz, tag)
 318  	}
 319  	atomic.AddInt64(&p.numGets, 1)
 320  	select {
 321  	case alloc := <-p.allocCh:
 322  		alloc.Reset()
 323  		alloc.Tag = tag
 324  		return alloc
 325  	default:
 326  		return NewAllocator(sz, tag)
 327  	}
 328  }
 329  func (p *AllocatorPool) Return(a *Allocator) {
 330  	if a == nil {
 331  		return
 332  	}
 333  	if p == nil {
 334  		a.Release()
 335  		return
 336  	}
 337  	a.TrimTo(400 << 20)
 338  
 339  	select {
 340  	case p.allocCh <- a:
 341  		return
 342  	default:
 343  		a.Release()
 344  	}
 345  }
 346  
 347  func (p *AllocatorPool) Release() {
 348  	if p == nil {
 349  		return
 350  	}
 351  	p.closer.SignalAndWait()
 352  }
 353  
 354  func (p *AllocatorPool) freeupAllocators() {
 355  	defer p.closer.Done()
 356  
 357  	ticker := time.NewTicker(2 * time.Second)
 358  	defer ticker.Stop()
 359  
 360  	releaseOne := func() bool {
 361  		select {
 362  		case alloc := <-p.allocCh:
 363  			alloc.Release()
 364  			return true
 365  		default:
 366  			return false
 367  		}
 368  	}
 369  
 370  	var last int64
 371  	for {
 372  		select {
 373  		case <-p.closer.HasBeenClosed():
 374  			close(p.allocCh)
 375  			for alloc := range p.allocCh {
 376  				alloc.Release()
 377  			}
 378  			return
 379  
 380  		case <-ticker.C:
 381  			gets := atomic.LoadInt64(&p.numGets)
 382  			if gets != last {
 383  				// Some retrievals were made since the last time. So, let's avoid doing a release.
 384  				last = gets
 385  				continue
 386  			}
 387  			releaseOne()
 388  		}
 389  	}
 390  }
 391