allocator.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package z
7
8 import (
9 "bytes"
10 "fmt"
11 "math"
12 "math/bits"
13 "math/rand"
14 "strings"
15 "sync"
16 "sync/atomic"
17 "time"
18 "unsafe"
19
20 "github.com/dustin/go-humanize"
21 )
22
// Allocator hands out small byte slices carved from larger chunks, so that the
// cost of asking the underlying allocator for memory is amortized over many
// requests. The chunks come from z.Calloc and are never moved once obtained,
// which makes it safe to unsafe-cast the returned bytes to Go struct pointers.
// No freelist is kept (maintaining one is slow): memory is only ever handed
// out, and the whole Allocator is meant to be released in one go at the end.
type Allocator struct {
	sync.Mutex

	// compIdx is the packed allocation cursor: the index of the current buffer
	// in the 32 most significant bits and the position inside that buffer in
	// the 32 least significant bits.
	compIdx uint64
	// buffers holds the chunks that allocations are carved from.
	buffers [][]byte

	// Ref is the key under which this Allocator is stored in the package-level
	// allocs map.
	Ref uint64
	// Tag is the label handed to Calloc for this Allocator's chunks.
	Tag string
}
36
37 // allocs keeps references to all Allocators, so we can safely discard them later.
38 var allocsMu *sync.Mutex
39 var allocRef uint64
40 var allocs map[uint64]*Allocator
41 var calculatedLog2 []int
42
43 func init() {
44 allocsMu = new(sync.Mutex)
45 allocs = make(map[uint64]*Allocator)
46
47 // Set up a unique Ref per process.
48 allocRef = uint64(rand.Int63n(1<<16)) << 48
49 calculatedLog2 = make([]int, 1025)
50 for i := 1; i <= 1024; i++ {
51 calculatedLog2[i] = int(math.Log2(float64(i)))
52 }
53 }
54
55 // NewAllocator creates an allocator starting with the given size.
56 func NewAllocator(sz int, tag string) *Allocator {
57 ref := atomic.AddUint64(&allocRef, 1)
58 // We should not allow a zero sized page because addBufferWithMinSize
59 // will run into an infinite loop trying to double the pagesize.
60 if sz < 512 {
61 sz = 512
62 }
63 a := &Allocator{
64 Ref: ref,
65 buffers: make([][]byte, 64),
66 Tag: tag,
67 }
68 l2 := uint64(log2(sz))
69 if bits.OnesCount64(uint64(sz)) > 1 {
70 l2 += 1
71 }
72 a.buffers[0] = Calloc(1<<l2, a.Tag)
73
74 allocsMu.Lock()
75 allocs[ref] = a
76 allocsMu.Unlock()
77 return a
78 }
79
80 func (a *Allocator) Reset() {
81 atomic.StoreUint64(&a.compIdx, 0)
82 }
83
84 func Allocators() string {
85 allocsMu.Lock()
86 tags := make(map[string]uint64)
87 num := make(map[string]int)
88 for _, ac := range allocs {
89 tags[ac.Tag] += ac.Allocated()
90 num[ac.Tag] += 1
91 }
92
93 var buf bytes.Buffer
94 for tag, sz := range tags {
95 fmt.Fprintf(&buf, "Tag: %s Num: %d Size: %s . ", tag, num[tag], humanize.IBytes(sz))
96 }
97 allocsMu.Unlock()
98 return buf.String()
99 }
100
101 func (a *Allocator) String() string {
102 var s strings.Builder
103 s.WriteString(fmt.Sprintf("Allocator: %x\n", a.Ref))
104 var cum int
105 for i, b := range a.buffers {
106 cum += len(b)
107 if len(b) == 0 {
108 break
109 }
110 s.WriteString(fmt.Sprintf("idx: %d len: %d cum: %d\n", i, len(b), cum))
111 }
112 pos := atomic.LoadUint64(&a.compIdx)
113 bi, pi := parse(pos)
114 s.WriteString(fmt.Sprintf("bi: %d pi: %d\n", bi, pi))
115 s.WriteString(fmt.Sprintf("Size: %d\n", a.Size()))
116 return s.String()
117 }
118
119 // AllocatorFrom would return the allocator corresponding to the ref.
120 func AllocatorFrom(ref uint64) *Allocator {
121 allocsMu.Lock()
122 a := allocs[ref]
123 allocsMu.Unlock()
124 return a
125 }
126
// parse splits a packed cursor value into its two halves: bufIdx comes from
// the upper 32 bits and posIdx from the lower 32 bits.
func parse(pos uint64) (bufIdx, posIdx int) {
	const lowMask = 1<<32 - 1
	bufIdx = int(pos >> 32)
	posIdx = int(pos & lowMask)
	return bufIdx, posIdx
}
130
131 // Size returns the size of the allocations so far.
132 func (a *Allocator) Size() int {
133 pos := atomic.LoadUint64(&a.compIdx)
134 bi, pi := parse(pos)
135 var sz int
136 for i, b := range a.buffers {
137 if i < bi {
138 sz += len(b)
139 continue
140 }
141 sz += pi
142 return sz
143 }
144 panic("Size should not reach here")
145 }
146
// log2 returns floor(log2(sz)) for sz >= 1, i.e. the index of the highest set
// bit. Values below 1 have no logarithm; 0 is returned for them instead of
// indexing a table with a negative number.
//
// The result is computed directly from the bit length, so it does not depend
// on the calculatedLog2 table having been filled in by init.
func log2(sz int) int {
	if sz <= 1 {
		return 0
	}
	return bits.Len(uint(sz)) - 1
}
159
160 func (a *Allocator) Allocated() uint64 {
161 var alloc int
162 for _, b := range a.buffers {
163 alloc += cap(b)
164 }
165 return uint64(alloc)
166 }
167
168 func (a *Allocator) TrimTo(max int) {
169 var alloc int
170 for i, b := range a.buffers {
171 if len(b) == 0 {
172 break
173 }
174 alloc += len(b)
175 if alloc < max {
176 continue
177 }
178 Free(b)
179 a.buffers[i] = nil
180 }
181 }
182
183 // Release would release the memory back. Remember to make this call to avoid memory leaks.
184 func (a *Allocator) Release() {
185 if a == nil {
186 return
187 }
188
189 var alloc int
190 for _, b := range a.buffers {
191 if len(b) == 0 {
192 break
193 }
194 alloc += len(b)
195 Free(b)
196 }
197
198 allocsMu.Lock()
199 delete(allocs, a.Ref)
200 allocsMu.Unlock()
201 }
202
203 const maxAlloc = 1 << 30
204
205 func (a *Allocator) MaxAlloc() int {
206 return maxAlloc
207 }
208
209 const nodeAlign = unsafe.Sizeof(uint64(0)) - 1
210
211 func (a *Allocator) AllocateAligned(sz int) []byte {
212 tsz := sz + int(nodeAlign)
213 out := a.Allocate(tsz)
214 // We are reusing allocators. In that case, it's important to zero out the memory allocated
215 // here. We don't always zero it out (in Allocate), because other functions would be immediately
216 // overwriting the allocated slices anyway (see Copy).
217 ZeroOut(out, 0, len(out))
218
219 addr := uintptr(unsafe.Pointer(&out[0]))
220 aligned := (addr + nodeAlign) & ^nodeAlign
221 start := int(aligned - addr)
222
223 return out[start : start+sz]
224 }
225
226 func (a *Allocator) Copy(buf []byte) []byte {
227 if a == nil {
228 return append([]byte{}, buf...)
229 }
230 out := a.Allocate(len(buf))
231 copy(out, buf)
232 return out
233 }
234
235 func (a *Allocator) addBufferAt(bufIdx, minSz int) {
236 for {
237 if bufIdx >= len(a.buffers) {
238 panic(fmt.Sprintf("Allocator can not allocate more than %d buffers", len(a.buffers)))
239 }
240 if len(a.buffers[bufIdx]) == 0 {
241 break
242 }
243 if minSz <= len(a.buffers[bufIdx]) {
244 // No need to do anything. We already have a buffer which can satisfy minSz.
245 return
246 }
247 bufIdx++
248 }
249 assert(bufIdx > 0)
250 // We need to allocate a new buffer.
251 // Make pageSize double of the last allocation.
252 pageSize := 2 * len(a.buffers[bufIdx-1])
253 // Ensure pageSize is bigger than sz.
254 for pageSize < minSz {
255 pageSize *= 2
256 }
257 // If bigger than maxAlloc, trim to maxAlloc.
258 if pageSize > maxAlloc {
259 pageSize = maxAlloc
260 }
261
262 buf := Calloc(pageSize, a.Tag)
263 assert(len(a.buffers[bufIdx]) == 0)
264 a.buffers[bufIdx] = buf
265 }
266
267 func (a *Allocator) Allocate(sz int) []byte {
268 if a == nil {
269 return make([]byte, sz)
270 }
271 if sz > maxAlloc {
272 panic(fmt.Sprintf("Unable to allocate more than %d\n", maxAlloc))
273 }
274 if sz == 0 {
275 return nil
276 }
277 for {
278 pos := atomic.AddUint64(&a.compIdx, uint64(sz))
279 bufIdx, posIdx := parse(pos)
280 buf := a.buffers[bufIdx]
281 if posIdx > len(buf) {
282 a.Lock()
283 newPos := atomic.LoadUint64(&a.compIdx)
284 newBufIdx, _ := parse(newPos)
285 if newBufIdx != bufIdx {
286 a.Unlock()
287 continue
288 }
289 a.addBufferAt(bufIdx+1, sz)
290 atomic.StoreUint64(&a.compIdx, uint64((bufIdx+1)<<32))
291 a.Unlock()
292 // We added a new buffer. Let's acquire slice the right way by going back to the top.
293 continue
294 }
295 data := buf[posIdx-sz : posIdx]
296 return data
297 }
298 }
299
300 type AllocatorPool struct {
301 numGets int64
302 allocCh chan *Allocator
303 closer *Closer
304 }
305
306 func NewAllocatorPool(sz int) *AllocatorPool {
307 a := &AllocatorPool{
308 allocCh: make(chan *Allocator, sz),
309 closer: NewCloser(1),
310 }
311 go a.freeupAllocators()
312 return a
313 }
314
315 func (p *AllocatorPool) Get(sz int, tag string) *Allocator {
316 if p == nil {
317 return NewAllocator(sz, tag)
318 }
319 atomic.AddInt64(&p.numGets, 1)
320 select {
321 case alloc := <-p.allocCh:
322 alloc.Reset()
323 alloc.Tag = tag
324 return alloc
325 default:
326 return NewAllocator(sz, tag)
327 }
328 }
329 func (p *AllocatorPool) Return(a *Allocator) {
330 if a == nil {
331 return
332 }
333 if p == nil {
334 a.Release()
335 return
336 }
337 a.TrimTo(400 << 20)
338
339 select {
340 case p.allocCh <- a:
341 return
342 default:
343 a.Release()
344 }
345 }
346
347 func (p *AllocatorPool) Release() {
348 if p == nil {
349 return
350 }
351 p.closer.SignalAndWait()
352 }
353
354 func (p *AllocatorPool) freeupAllocators() {
355 defer p.closer.Done()
356
357 ticker := time.NewTicker(2 * time.Second)
358 defer ticker.Stop()
359
360 releaseOne := func() bool {
361 select {
362 case alloc := <-p.allocCh:
363 alloc.Release()
364 return true
365 default:
366 return false
367 }
368 }
369
370 var last int64
371 for {
372 select {
373 case <-p.closer.HasBeenClosed():
374 close(p.allocCh)
375 for alloc := range p.allocCh {
376 alloc.Release()
377 }
378 return
379
380 case <-ticker.C:
381 gets := atomic.LoadInt64(&p.numGets)
382 if gets != last {
383 // Some retrievals were made since the last time. So, let's avoid doing a release.
384 last = gets
385 continue
386 }
387 releaseOne()
388 }
389 }
390 }
391