1 /*
2 *
3 * Copyright 2024 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 package mem
20 21 import (
22 "fmt"
23 "io"
24 )
// readAllBufSize is the buffer size ReadAll requests from the pool for each
// chunk. 32 KiB is what io.Copy uses.
const readAllBufSize = 32 << 10
// BufferSlice offers a means to represent data that spans one or more Buffer
// instances. A BufferSlice is meant to be immutable after creation: its methods
// never modify the slice itself, and methods like Ref and Free act on the
// underlying buffers rather than on the slice. This is why all methods have
// value receivers rather than pointer receivers.
//
// Note that any of the methods that read the underlying buffers such as Ref,
// Len or CopyTo etc., will panic if any underlying buffers have already been
// freed. It is recommended not to interact with any of the underlying buffers
// directly; rather, such interactions should be mediated through the various
// methods on this type.
//
// By convention, any APIs that return (mem.BufferSlice, error) should reduce
// the burden on the caller by never returning a mem.BufferSlice that needs to
// be freed if the error is non-nil, unless explicitly stated.
type BufferSlice []Buffer
46 47 // Len returns the sum of the length of all the Buffers in this slice.
48 //
49 // # Warning
50 //
51 // Invoking the built-in len on a BufferSlice will return the number of buffers
52 // in the slice, and *not* the value returned by this function.
53 func (s BufferSlice) Len() int {
54 var length int
55 for _, b := range s {
56 length += b.Len()
57 }
58 return length
59 }
60 61 // Ref invokes Ref on each buffer in the slice.
62 func (s BufferSlice) Ref() {
63 for _, b := range s {
64 b.Ref()
65 }
66 }
67 68 // Free invokes Buffer.Free() on each Buffer in the slice.
69 func (s BufferSlice) Free() {
70 for _, b := range s {
71 b.Free()
72 }
73 }
74 75 // CopyTo copies each of the underlying Buffer's data into the given buffer,
76 // returning the number of bytes copied. Has the same semantics as the copy
77 // builtin in that it will copy as many bytes as it can, stopping when either dst
78 // is full or s runs out of data, returning the minimum of s.Len() and len(dst).
79 func (s BufferSlice) CopyTo(dst []byte) int {
80 off := 0
81 for _, b := range s {
82 off += copy(dst[off:], b.ReadOnlyData())
83 }
84 return off
85 }
86 87 // Materialize concatenates all the underlying Buffer's data into a single
88 // contiguous buffer using CopyTo.
89 func (s BufferSlice) Materialize() []byte {
90 l := s.Len()
91 if l == 0 {
92 return nil
93 }
94 out := make([]byte, l)
95 s.CopyTo(out)
96 return out
97 }
98 99 // MaterializeToBuffer functions like Materialize except that it writes the data
100 // to a single Buffer pulled from the given BufferPool.
101 //
102 // As a special case, if the input BufferSlice only actually has one Buffer, this
103 // function simply increases the refcount before returning said Buffer. Freeing this
104 // buffer won't release it until the BufferSlice is itself released.
105 func (s BufferSlice) MaterializeToBuffer(pool BufferPool) Buffer {
106 if len(s) == 1 {
107 s[0].Ref()
108 return s[0]
109 }
110 sLen := s.Len()
111 if sLen == 0 {
112 return emptyBuffer{}
113 }
114 buf := pool.Get(sLen)
115 s.CopyTo(*buf)
116 return NewBuffer(buf, pool)
117 }
118 119 // Reader returns a new Reader for the input slice after taking references to
120 // each underlying buffer.
121 func (s BufferSlice) Reader() *Reader {
122 s.Ref()
123 return &Reader{
124 data: s,
125 len: s.Len(),
126 }
127 }
// Reader exposes a BufferSlice's data as an io.Reader, allowing it to interface
// with other systems.
//
// Buffers will be freed as they are read.
//
// A Reader can be constructed from a BufferSlice; alternatively the zero value
// of a Reader may be used after calling Reset on it.
type Reader struct {
	// data holds the buffers that have not been fully consumed yet; data[0]
	// is the buffer currently being read from.
	data BufferSlice
	// len is the number of unread bytes remaining; it is what Remaining
	// reports.
	len int
	// The index into data[0].ReadOnlyData().
	bufferIdx int
}
// Remaining returns the number of unread bytes remaining in the slice.
//
// The count shrinks as Read, ReadByte and Discard consume data, and is zero
// after Close.
func (r *Reader) Remaining() int {
	return r.len
}
147 148 // Reset frees the currently held buffer slice and starts reading from the
149 // provided slice. This allows reusing the reader object.
150 func (r *Reader) Reset(s BufferSlice) {
151 r.data.Free()
152 s.Ref()
153 r.data = s
154 r.len = s.Len()
155 r.bufferIdx = 0
156 }
157 158 // Close frees the underlying BufferSlice and never returns an error. Subsequent
159 // calls to Read will return (0, io.EOF).
160 func (r *Reader) Close() error {
161 r.data.Free()
162 r.data = nil
163 r.len = 0
164 return nil
165 }
166 167 func (r *Reader) freeFirstBufferIfEmpty() bool {
168 if len(r.data) == 0 || r.bufferIdx != len(r.data[0].ReadOnlyData()) {
169 return false
170 }
171 172 r.data[0].Free()
173 r.data = r.data[1:]
174 r.bufferIdx = 0
175 return true
176 }
177 178 func (r *Reader) Read(buf []byte) (n int, _ error) {
179 if r.len == 0 {
180 return 0, io.EOF
181 }
182 183 for len(buf) != 0 && r.len != 0 {
184 // Copy as much as possible from the first Buffer in the slice into the
185 // given byte slice.
186 data := r.data[0].ReadOnlyData()
187 copied := copy(buf, data[r.bufferIdx:])
188 r.len -= copied // Reduce len by the number of bytes copied.
189 r.bufferIdx += copied // Increment the buffer index.
190 n += copied // Increment the total number of bytes read.
191 buf = buf[copied:] // Shrink the given byte slice.
192 193 // If we have copied all the data from the first Buffer, free it and advance to
194 // the next in the slice.
195 r.freeFirstBufferIfEmpty()
196 }
197 198 return n, nil
199 }
200 201 // ReadByte reads a single byte.
202 func (r *Reader) ReadByte() (byte, error) {
203 if r.len == 0 {
204 return 0, io.EOF
205 }
206 207 // There may be any number of empty buffers in the slice, clear them all until a
208 // non-empty buffer is reached. This is guaranteed to exit since r.len is not 0.
209 for r.freeFirstBufferIfEmpty() {
210 }
211 212 b := r.data[0].ReadOnlyData()[r.bufferIdx]
213 r.len--
214 r.bufferIdx++
215 // Free the first buffer in the slice if the last byte was read
216 r.freeFirstBufferIfEmpty()
217 return b, nil
218 }
// Compile-time check that *writer satisfies io.Writer.
var _ io.Writer = (*writer)(nil)

// writer is the io.Writer implementation returned by NewWriter.
type writer struct {
	// buffers points at the BufferSlice that Write appends to.
	buffers *BufferSlice
	// pool is handed to Copy when Write copies the incoming bytes.
	pool BufferPool
}
226 227 func (w *writer) Write(p []byte) (n int, err error) {
228 b := Copy(p, w.pool)
229 *w.buffers = append(*w.buffers, b)
230 return b.Len(), nil
231 }
232 233 // NewWriter wraps the given BufferSlice and BufferPool to implement the
234 // io.Writer interface. Every call to Write copies the contents of the given
235 // buffer into a new Buffer pulled from the given pool and the Buffer is
236 // added to the given BufferSlice.
237 func NewWriter(buffers *BufferSlice, pool BufferPool) io.Writer {
238 return &writer{buffers: buffers, pool: pool}
239 }
240 241 // ReadAll reads from r until an error or EOF and returns the data it read.
242 // A successful call returns err == nil, not err == EOF. Because ReadAll is
243 // defined to read from src until EOF, it does not treat an EOF from Read
244 // as an error to be reported.
245 //
246 // Important: A failed call returns a non-nil error and may also return
247 // partially read buffers. It is the responsibility of the caller to free the
248 // BufferSlice returned, or its memory will not be reused.
249 func ReadAll(r io.Reader, pool BufferPool) (BufferSlice, error) {
250 var result BufferSlice
251 if wt, ok := r.(io.WriterTo); ok {
252 // This is more optimal since wt knows the size of chunks it wants to
253 // write and, hence, we can allocate buffers of an optimal size to fit
254 // them. E.g. might be a single big chunk, and we wouldn't chop it
255 // into pieces.
256 w := NewWriter(&result, pool)
257 _, err := wt.WriteTo(w)
258 return result, err
259 }
260 nextBuffer:
261 for {
262 buf := pool.Get(readAllBufSize)
263 // We asked for 32KiB but may have been given a bigger buffer.
264 // Use all of it if that's the case.
265 *buf = (*buf)[:cap(*buf)]
266 usedCap := 0
267 for {
268 n, err := r.Read((*buf)[usedCap:])
269 usedCap += n
270 if err != nil {
271 if usedCap == 0 {
272 // Nothing in this buf, put it back
273 pool.Put(buf)
274 } else {
275 *buf = (*buf)[:usedCap]
276 result = append(result, NewBuffer(buf, pool))
277 }
278 if err == io.EOF {
279 err = nil
280 }
281 return result, err
282 }
283 if len(*buf) == usedCap {
284 result = append(result, NewBuffer(buf, pool))
285 continue nextBuffer
286 }
287 }
288 }
289 }
290 291 // Discard skips the next n bytes, returning the number of bytes discarded.
292 //
293 // It frees buffers as they are fully consumed.
294 //
295 // If Discard skips fewer than n bytes, it also returns an error.
296 func (r *Reader) Discard(n int) (discarded int, err error) {
297 total := n
298 for n > 0 && r.len > 0 {
299 curData := r.data[0].ReadOnlyData()
300 curSize := min(n, len(curData)-r.bufferIdx)
301 n -= curSize
302 r.len -= curSize
303 r.bufferIdx += curSize
304 if r.bufferIdx >= len(curData) {
305 r.data[0].Free()
306 r.data = r.data[1:]
307 r.bufferIdx = 0
308 }
309 }
310 discarded = total - n
311 if n > 0 {
312 return discarded, fmt.Errorf("insufficient bytes in reader")
313 }
314 return discarded, nil
315 }
316 317 // Peek returns the next n bytes without advancing the reader.
318 //
319 // Peek appends results to the provided res slice and returns the updated slice.
320 // This pattern allows re-using the storage of res if it has sufficient
321 // capacity.
322 //
323 // The returned subslices are views into the underlying buffers and are only
324 // valid until the reader is advanced past the corresponding buffer.
325 //
326 // If Peek returns fewer than n bytes, it also returns an error.
327 func (r *Reader) Peek(n int, res [][]byte) ([][]byte, error) {
328 for i := 0; n > 0 && i < len(r.data); i++ {
329 curData := r.data[i].ReadOnlyData()
330 start := 0
331 if i == 0 {
332 start = r.bufferIdx
333 }
334 curSize := min(n, len(curData)-start)
335 if curSize == 0 {
336 continue
337 }
338 res = append(res, curData[start:start+curSize])
339 n -= curSize
340 }
341 if n > 0 {
342 return nil, fmt.Errorf("insufficient bytes in reader")
343 }
344 return res, nil
345 }
346