1 /*
2 *
3 * Copyright 2024 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 // Package mem provides utilities that facilitate memory reuse in byte slices
20 // that are used as buffers.
21 //
22 // # Experimental
23 //
24 // Notice: All APIs in this package are EXPERIMENTAL and may be changed or
25 // removed in a later release.
26 package mem
27 28 import (
29 "fmt"
30 "sync"
31 "sync/atomic"
32 )
// Buffer is a reference-counted view over a sequence of bytes. Instances are
// obtained from NewBuffer() or Copy(). Every reference is released with
// Free(); the underlying data is only given back once all references have
// been released.
//
// A Buffer is not safe for concurrent access. Each goroutine should instead
// work with its own reference to the data, acquired through Ref().
//
// The underlying data must not be accessed after the reference to the Buffer
// has been released; doing so may panic.
type Buffer interface {
	// ReadOnlyData exposes the underlying byte slice. Modifying the contents
	// of the returned slice in any way is undefined behavior.
	ReadOnlyData() []byte
	// Ref adds one to this Buffer's reference counter.
	Ref()
	// Free subtracts one from this Buffer's reference counter. If this call
	// brings the counter to 0, the underlying byte slice is freed.
	Free()
	// Len reports the size of the Buffer in bytes.
	Len() int

	// split divides the Buffer at index n and returns one Buffer for the
	// first n bytes and one for the remainder.
	split(n int) (left, right Buffer)
	// read copies bytes from the Buffer into buf. It returns the number of
	// bytes copied together with a Buffer for whatever was not consumed.
	read(buf []byte) (int, Buffer)
}
60 61 var (
62 bufferPoolingThreshold = 1 << 10
63 64 bufferObjectPool = sync.Pool{New: func() any { return new(buffer) }}
65 refObjectPool = sync.Pool{New: func() any { return new(atomic.Int32) }}
66 )
67 68 // IsBelowBufferPoolingThreshold returns true if the given size is less than or
69 // equal to the threshold for buffer pooling. This is used to determine whether
70 // to pool buffers or allocate them directly.
71 func IsBelowBufferPoolingThreshold(size int) bool {
72 return size <= bufferPoolingThreshold
73 }
74 75 type buffer struct {
76 origData *[]byte
77 data []byte
78 refs *atomic.Int32
79 pool BufferPool
80 }
81 82 func newBuffer() *buffer {
83 return bufferObjectPool.Get().(*buffer)
84 }
85 86 // NewBuffer creates a new Buffer from the given data, initializing the reference
87 // counter to 1. The data will then be returned to the given pool when all
88 // references to the returned Buffer are released. As a special case to avoid
89 // additional allocations, if the given buffer pool is nil, the returned buffer
90 // will be a "no-op" Buffer where invoking Buffer.Free() does nothing and the
91 // underlying data is never freed.
92 //
93 // Note that the backing array of the given data is not copied.
94 func NewBuffer(data *[]byte, pool BufferPool) Buffer {
95 // Use the buffer's capacity instead of the length, otherwise buffers may
96 // not be reused under certain conditions. For example, if a large buffer
97 // is acquired from the pool, but fewer bytes than the buffering threshold
98 // are written to it, the buffer will not be returned to the pool.
99 if pool == nil || IsBelowBufferPoolingThreshold(cap(*data)) {
100 return (SliceBuffer)(*data)
101 }
102 b := newBuffer()
103 b.origData = data
104 b.data = *data
105 b.pool = pool
106 b.refs = refObjectPool.Get().(*atomic.Int32)
107 b.refs.Add(1)
108 return b
109 }
110 111 // Copy creates a new Buffer from the given data, initializing the reference
112 // counter to 1.
113 //
114 // It acquires a []byte from the given pool and copies over the backing array
115 // of the given data. The []byte acquired from the pool is returned to the
116 // pool when all references to the returned Buffer are released.
117 func Copy(data []byte, pool BufferPool) Buffer {
118 if IsBelowBufferPoolingThreshold(len(data)) {
119 buf := make(SliceBuffer, len(data))
120 copy(buf, data)
121 return buf
122 }
123 124 buf := pool.Get(len(data))
125 copy(*buf, data)
126 return NewBuffer(buf, pool)
127 }
128 129 func (b *buffer) ReadOnlyData() []byte {
130 if b.refs == nil {
131 panic("Cannot read freed buffer")
132 }
133 return b.data
134 }
135 136 func (b *buffer) Ref() {
137 if b.refs == nil {
138 panic("Cannot ref freed buffer")
139 }
140 b.refs.Add(1)
141 }
142 143 func (b *buffer) Free() {
144 if b.refs == nil {
145 panic("Cannot free freed buffer")
146 }
147 148 refs := b.refs.Add(-1)
149 switch {
150 case refs > 0:
151 return
152 case refs == 0:
153 if b.pool != nil {
154 b.pool.Put(b.origData)
155 }
156 157 refObjectPool.Put(b.refs)
158 b.origData = nil
159 b.data = nil
160 b.refs = nil
161 b.pool = nil
162 bufferObjectPool.Put(b)
163 default:
164 panic("Cannot free freed buffer")
165 }
166 }
167 168 func (b *buffer) Len() int {
169 return len(b.ReadOnlyData())
170 }
171 172 func (b *buffer) split(n int) (Buffer, Buffer) {
173 if b.refs == nil {
174 panic("Cannot split freed buffer")
175 }
176 177 b.refs.Add(1)
178 split := newBuffer()
179 split.origData = b.origData
180 split.data = b.data[n:]
181 split.refs = b.refs
182 split.pool = b.pool
183 184 b.data = b.data[:n]
185 186 return b, split
187 }
188 189 func (b *buffer) read(buf []byte) (int, Buffer) {
190 if b.refs == nil {
191 panic("Cannot read freed buffer")
192 }
193 194 n := copy(buf, b.data)
195 if n == len(b.data) {
196 b.Free()
197 return n, nil
198 }
199 200 b.data = b.data[n:]
201 return n, b
202 }
203 204 func (b *buffer) String() string {
205 return fmt.Sprintf("mem.Buffer(%p, data: %p, length: %d)", b, b.ReadOnlyData(), len(b.ReadOnlyData()))
206 }
207 208 // ReadUnsafe reads bytes from the given Buffer into the provided slice.
209 // It does not perform safety checks.
210 func ReadUnsafe(dst []byte, buf Buffer) (int, Buffer) {
211 return buf.read(dst)
212 }
213 214 // SplitUnsafe modifies the receiver to point to the first n bytes while it
215 // returns a new reference to the remaining bytes. The returned Buffer
216 // functions just like a normal reference acquired using Ref().
217 func SplitUnsafe(buf Buffer, n int) (left, right Buffer) {
218 return buf.split(n)
219 }
220 221 type emptyBuffer struct{}
222 223 func (e emptyBuffer) ReadOnlyData() []byte {
224 return nil
225 }
226 227 func (e emptyBuffer) Ref() {}
228 func (e emptyBuffer) Free() {}
229 230 func (e emptyBuffer) Len() int {
231 return 0
232 }
233 234 func (e emptyBuffer) split(int) (left, right Buffer) {
235 return e, e
236 }
237 238 func (e emptyBuffer) read([]byte) (int, Buffer) {
239 return 0, e
240 }
241 242 // SliceBuffer is a Buffer implementation that wraps a byte slice. It provides
243 // methods for reading, splitting, and managing the byte slice.
244 type SliceBuffer []byte
245 246 // ReadOnlyData returns the byte slice.
247 func (s SliceBuffer) ReadOnlyData() []byte { return s }
248 249 // Ref is a noop implementation of Ref.
250 func (s SliceBuffer) Ref() {}
251 252 // Free is a noop implementation of Free.
253 func (s SliceBuffer) Free() {}
254 255 // Len is a noop implementation of Len.
256 func (s SliceBuffer) Len() int { return len(s) }
257 258 func (s SliceBuffer) split(n int) (left, right Buffer) {
259 return s[:n], s[n:]
260 }
261 262 func (s SliceBuffer) read(buf []byte) (int, Buffer) {
263 n := copy(buf, s)
264 if n == len(s) {
265 return n, nil
266 }
267 return n, s[n:]
268 }
269