reader.go raw
1 // Copyright 2011 The Snappy-Go Authors. All rights reserved.
2 // Copyright (c) 2019+ Klaus Post. All rights reserved.
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file.
5
6 package s2
7
8 import (
9 "errors"
10 "fmt"
11 "io"
12 "io/ioutil"
13 "math"
14 "runtime"
15 "sync"
16 )
17
// ErrCantSeek is the error type reported when a stream cannot be seeked.
// Reason holds a human readable explanation of why seeking is impossible.
type ErrCantSeek struct {
	Reason string
}

// Error implements the error interface by embedding Reason in a fixed message.
func (e ErrCantSeek) Error() string {
	const format = "s2: Can't seek because %s"
	msg := fmt.Sprintf(format, e.Reason)
	return msg
}
27
28 // NewReader returns a new Reader that decompresses from r, using the framing
29 // format described at
30 // https://github.com/google/snappy/blob/master/framing_format.txt with S2 changes.
31 func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
32 nr := Reader{
33 r: r,
34 maxBlock: maxBlockSize,
35 }
36 for _, opt := range opts {
37 if err := opt(&nr); err != nil {
38 nr.err = err
39 return &nr
40 }
41 }
42 nr.maxBufSize = MaxEncodedLen(nr.maxBlock) + checksumSize
43 if nr.lazyBuf > 0 {
44 nr.buf = make([]byte, MaxEncodedLen(nr.lazyBuf)+checksumSize)
45 } else {
46 nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
47 }
48 nr.readHeader = nr.ignoreStreamID
49 nr.paramsOK = true
50 return &nr
51 }
52
// ReaderOption is an option for creating a decoder.
// It is a function that receives the Reader being configured and returns a
// non-nil error if the option value is invalid; NewReader stores that error
// on the Reader and stops applying further options.
type ReaderOption func(*Reader) error
55
56 // ReaderMaxBlockSize allows to control allocations if the stream
57 // has been compressed with a smaller WriterBlockSize, or with the default 1MB.
58 // Blocks must be this size or smaller to decompress,
59 // otherwise the decoder will return ErrUnsupported.
60 //
61 // For streams compressed with Snappy this can safely be set to 64KB (64 << 10).
62 //
63 // Default is the maximum limit of 4MB.
64 func ReaderMaxBlockSize(blockSize int) ReaderOption {
65 return func(r *Reader) error {
66 if blockSize > maxBlockSize || blockSize <= 0 {
67 return errors.New("s2: block size too large. Must be <= 4MB and > 0")
68 }
69 if r.lazyBuf == 0 && blockSize < defaultBlockSize {
70 r.lazyBuf = blockSize
71 }
72 r.maxBlock = blockSize
73 return nil
74 }
75 }
76
77 // ReaderAllocBlock allows to control upfront stream allocations
78 // and not allocate for frames bigger than this initially.
79 // If frames bigger than this is seen a bigger buffer will be allocated.
80 //
81 // Default is 1MB, which is default output size.
82 func ReaderAllocBlock(blockSize int) ReaderOption {
83 return func(r *Reader) error {
84 if blockSize > maxBlockSize || blockSize < 1024 {
85 return errors.New("s2: invalid ReaderAllocBlock. Must be <= 4MB and >= 1024")
86 }
87 r.lazyBuf = blockSize
88 return nil
89 }
90 }
91
92 // ReaderIgnoreStreamIdentifier will make the reader skip the expected
93 // stream identifier at the beginning of the stream.
94 // This can be used when serving a stream that has been forwarded to a specific point.
95 func ReaderIgnoreStreamIdentifier() ReaderOption {
96 return func(r *Reader) error {
97 r.ignoreStreamID = true
98 return nil
99 }
100 }
101
102 // ReaderSkippableCB will register a callback for chuncks with the specified ID.
103 // ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
104 // For each chunk with the ID, the callback is called with the content.
105 // Any returned non-nil error will abort decompression.
106 // Only one callback per ID is supported, latest sent will be used.
107 // You can peek the stream, triggering the callback, by doing a Read with a 0
108 // byte buffer.
109 func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption {
110 return func(r *Reader) error {
111 if id < 0x80 || id > 0xfd {
112 return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)")
113 }
114 r.skippableCB[id-0x80] = fn
115 return nil
116 }
117 }
118
119 // ReaderIgnoreCRC will make the reader skip CRC calculation and checks.
120 func ReaderIgnoreCRC() ReaderOption {
121 return func(r *Reader) error {
122 r.ignoreCRC = true
123 return nil
124 }
125 }
126
// Reader is an io.Reader that can read Snappy-compressed bytes.
// It is created with NewReader and can be reused with Reset.
type Reader struct {
	// r is the underlying compressed input.
	r io.Reader
	// err is the sticky error; once set, Read and Skip return it immediately.
	err error
	// decoded holds the most recently decoded block.
	decoded []byte
	// buf is the scratch buffer used for chunk headers and compressed chunk bodies.
	buf []byte
	// skippableCB holds the registered skippable chunk callbacks,
	// indexed by chunk ID minus 0x80.
	skippableCB [0xff - 0x80]func(r io.Reader) error
	blockStart  int64 // Uncompressed offset at start of current.
	// index is the seek index, if one has been loaded by ReadSeeker.
	index *Index

	// decoded[i:j] contains decoded bytes that have not yet been passed on.
	i, j int
	// maximum block size allowed.
	maxBlock int
	// maximum expected buffer size.
	maxBufSize int
	// alloc a buffer this size if > 0.
	lazyBuf int
	// readHeader is true once the stream identifier has been seen,
	// or from the start when ignoreStreamID is set.
	readHeader bool
	// paramsOK is set by NewReader when all options were applied without error.
	paramsOK bool
	// snappyFrame is true when the last stream identifier carried the Snappy magic.
	snappyFrame bool
	// ignoreStreamID is set by ReaderIgnoreStreamIdentifier.
	ignoreStreamID bool
	// ignoreCRC is set by ReaderIgnoreCRC.
	ignoreCRC bool
}
151
152 // GetBufferCapacity returns the capacity of the internal buffer.
153 // This might be useful to know when reusing the same reader in combination
154 // with the lazy buffer option.
155 func (r *Reader) GetBufferCapacity() int {
156 return cap(r.buf)
157 }
158
159 // ensureBufferSize will ensure that the buffer can take at least n bytes.
160 // If false is returned the buffer exceeds maximum allowed size.
161 func (r *Reader) ensureBufferSize(n int) bool {
162 if n > r.maxBufSize {
163 r.err = ErrCorrupt
164 return false
165 }
166 if cap(r.buf) >= n {
167 return true
168 }
169 // Realloc buffer.
170 r.buf = make([]byte, n)
171 return true
172 }
173
174 // Reset discards any buffered data, resets all state, and switches the Snappy
175 // reader to read from r. This permits reusing a Reader rather than allocating
176 // a new one.
177 func (r *Reader) Reset(reader io.Reader) {
178 if !r.paramsOK {
179 return
180 }
181 r.index = nil
182 r.r = reader
183 r.err = nil
184 r.i = 0
185 r.j = 0
186 r.blockStart = 0
187 r.readHeader = r.ignoreStreamID
188 }
189
190 func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
191 if _, r.err = io.ReadFull(r.r, p); r.err != nil {
192 if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
193 r.err = ErrCorrupt
194 }
195 return false
196 }
197 return true
198 }
199
200 // skippable will skip n bytes.
201 // If the supplied reader supports seeking that is used.
202 // tmp is used as a temporary buffer for reading.
203 // The supplied slice does not need to be the size of the read.
204 func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) {
205 if id < 0x80 {
206 r.err = fmt.Errorf("internal error: skippable id < 0x80")
207 return false
208 }
209 if fn := r.skippableCB[id-0x80]; fn != nil {
210 rd := io.LimitReader(r.r, int64(n))
211 r.err = fn(rd)
212 if r.err != nil {
213 return false
214 }
215 _, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp)
216 return r.err == nil
217 }
218 if rs, ok := r.r.(io.ReadSeeker); ok {
219 _, err := rs.Seek(int64(n), io.SeekCurrent)
220 if err == nil {
221 return true
222 }
223 if err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
224 r.err = ErrCorrupt
225 return false
226 }
227 }
228 for n > 0 {
229 if n < len(tmp) {
230 tmp = tmp[:n]
231 }
232 if _, r.err = io.ReadFull(r.r, tmp); r.err != nil {
233 if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
234 r.err = ErrCorrupt
235 }
236 return false
237 }
238 n -= len(tmp)
239 }
240 return true
241 }
242
// Read satisfies the io.Reader interface.
// It returns buffered decoded bytes when available; otherwise it parses
// chunks from the underlying reader until a data block has been decoded.
// Any error is stored on the reader and returned by all later calls.
// A Read with a zero length p still parses chunks up to and including the
// next data block, and then returns 0, nil.
func (r *Reader) Read(p []byte) (int, error) {
	if r.err != nil {
		return 0, r.err
	}
	for {
		// Serve from the current decoded block first.
		if r.i < r.j {
			n := copy(p, r.decoded[r.i:r.j])
			r.i += n
			return n, nil
		}
		// Chunk header: 1 byte type followed by a 3 byte little endian length.
		// A clean EOF is allowed here, since it falls on a chunk boundary.
		if !r.readFull(r.buf[:4], true) {
			return 0, r.err
		}
		chunkType := r.buf[0]
		if !r.readHeader {
			// The first chunk must be the stream identifier.
			if chunkType != chunkTypeStreamIdentifier {
				r.err = ErrCorrupt
				return 0, r.err
			}
			r.readHeader = true
		}
		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16

		// The chunk types are specified at
		// https://github.com/google/snappy/blob/master/framing_format.txt
		switch chunkType {
		case chunkTypeCompressedData:
			// The previous block is fully consumed; advance the uncompressed offset.
			r.blockStart += int64(r.j)
			// Section 4.2. Compressed data (chunk type 0x00).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:chunkLen]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			// The chunk body starts with a little endian checksum.
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			buf = buf[checksumSize:]

			n, err := DecodedLen(buf)
			if err != nil {
				r.err = err
				return 0, r.err
			}
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}

			// Grow the output buffer if needed, within the configured limit.
			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if _, err := Decode(r.decoded, buf); err != nil {
				r.err = err
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue

		case chunkTypeUncompressedData:
			// The previous block is fully consumed; advance the uncompressed offset.
			r.blockStart += int64(r.j)
			// Section 4.3. Uncompressed data (chunk type 0x01).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:checksumSize]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			// Read directly into r.decoded instead of via r.buf.
			n := chunkLen - checksumSize
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if !r.readFull(r.decoded[:n], false) {
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue

		case chunkTypeStreamIdentifier:
			// Section 4.1. Stream identifier (chunk type 0xff).
			if chunkLen != len(magicBody) {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.readFull(r.buf[:len(magicBody)], false) {
				return 0, r.err
			}
			// Accept either magic body and remember whether it was the Snappy one.
			if string(r.buf[:len(magicBody)]) != magicBody {
				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
					r.err = ErrCorrupt
					return 0, r.err
				} else {
					r.snappyFrame = true
				}
			} else {
				r.snappyFrame = false
			}
			continue
		}

		if chunkType <= 0x7f {
			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
			// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
			r.err = ErrUnsupported
			return 0, r.err
		}
		// Section 4.4 Padding (chunk type 0xfe).
		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
		if chunkLen > maxChunkSize {
			// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
			r.err = ErrUnsupported
			return 0, r.err
		}

		// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
		if !r.skippable(r.buf, chunkLen, false, chunkType) {
			return 0, r.err
		}
	}
}
401
402 // DecodeConcurrent will decode the full stream to w.
403 // This function should not be combined with reading, seeking or other operations.
404 // Up to 'concurrent' goroutines will be used.
405 // If <= 0, runtime.NumCPU will be used.
406 // On success the number of bytes decompressed nil and is returned.
407 // This is mainly intended for bigger streams.
408 func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, err error) {
409 if r.i > 0 || r.j > 0 || r.blockStart > 0 {
410 return 0, errors.New("DecodeConcurrent called after ")
411 }
412 if concurrent <= 0 {
413 concurrent = runtime.NumCPU()
414 }
415
416 // Write to output
417 var errMu sync.Mutex
418 var aErr error
419 setErr := func(e error) (ok bool) {
420 errMu.Lock()
421 defer errMu.Unlock()
422 if e == nil {
423 return aErr == nil
424 }
425 if aErr == nil {
426 aErr = e
427 }
428 return false
429 }
430 hasErr := func() (ok bool) {
431 errMu.Lock()
432 v := aErr != nil
433 errMu.Unlock()
434 return v
435 }
436
437 var aWritten int64
438 toRead := make(chan []byte, concurrent)
439 writtenBlocks := make(chan []byte, concurrent)
440 queue := make(chan chan []byte, concurrent)
441 reUse := make(chan chan []byte, concurrent)
442 for i := 0; i < concurrent; i++ {
443 toRead <- make([]byte, 0, r.maxBufSize)
444 writtenBlocks <- make([]byte, 0, r.maxBufSize)
445 reUse <- make(chan []byte, 1)
446 }
447 // Writer
448 var wg sync.WaitGroup
449 wg.Add(1)
450 go func() {
451 defer wg.Done()
452 for toWrite := range queue {
453 entry := <-toWrite
454 reUse <- toWrite
455 if hasErr() || entry == nil {
456 if entry != nil {
457 writtenBlocks <- entry
458 }
459 continue
460 }
461 if hasErr() {
462 writtenBlocks <- entry
463 continue
464 }
465 n, err := w.Write(entry)
466 want := len(entry)
467 writtenBlocks <- entry
468 if err != nil {
469 setErr(err)
470 continue
471 }
472 if n != want {
473 setErr(io.ErrShortWrite)
474 continue
475 }
476 aWritten += int64(n)
477 }
478 }()
479
480 defer func() {
481 if r.err != nil {
482 setErr(r.err)
483 } else if err != nil {
484 setErr(err)
485 }
486 close(queue)
487 wg.Wait()
488 if err == nil {
489 err = aErr
490 }
491 written = aWritten
492 }()
493
494 // Reader
495 for !hasErr() {
496 if !r.readFull(r.buf[:4], true) {
497 if r.err == io.EOF {
498 r.err = nil
499 }
500 return 0, r.err
501 }
502 chunkType := r.buf[0]
503 if !r.readHeader {
504 if chunkType != chunkTypeStreamIdentifier {
505 r.err = ErrCorrupt
506 return 0, r.err
507 }
508 r.readHeader = true
509 }
510 chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
511
512 // The chunk types are specified at
513 // https://github.com/google/snappy/blob/master/framing_format.txt
514 switch chunkType {
515 case chunkTypeCompressedData:
516 r.blockStart += int64(r.j)
517 // Section 4.2. Compressed data (chunk type 0x00).
518 if chunkLen < checksumSize {
519 r.err = ErrCorrupt
520 return 0, r.err
521 }
522 if chunkLen > r.maxBufSize {
523 r.err = ErrCorrupt
524 return 0, r.err
525 }
526 orgBuf := <-toRead
527 buf := orgBuf[:chunkLen]
528
529 if !r.readFull(buf, false) {
530 return 0, r.err
531 }
532
533 checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
534 buf = buf[checksumSize:]
535
536 n, err := DecodedLen(buf)
537 if err != nil {
538 r.err = err
539 return 0, r.err
540 }
541 if r.snappyFrame && n > maxSnappyBlockSize {
542 r.err = ErrCorrupt
543 return 0, r.err
544 }
545
546 if n > r.maxBlock {
547 r.err = ErrCorrupt
548 return 0, r.err
549 }
550 wg.Add(1)
551
552 decoded := <-writtenBlocks
553 entry := <-reUse
554 queue <- entry
555 go func() {
556 defer wg.Done()
557 decoded = decoded[:n]
558 _, err := Decode(decoded, buf)
559 toRead <- orgBuf
560 if err != nil {
561 writtenBlocks <- decoded
562 setErr(err)
563 entry <- nil
564 return
565 }
566 if !r.ignoreCRC && crc(decoded) != checksum {
567 writtenBlocks <- decoded
568 setErr(ErrCRC)
569 entry <- nil
570 return
571 }
572 entry <- decoded
573 }()
574 continue
575
576 case chunkTypeUncompressedData:
577
578 // Section 4.3. Uncompressed data (chunk type 0x01).
579 if chunkLen < checksumSize {
580 r.err = ErrCorrupt
581 return 0, r.err
582 }
583 if chunkLen > r.maxBufSize {
584 r.err = ErrCorrupt
585 return 0, r.err
586 }
587 // Grab write buffer
588 orgBuf := <-writtenBlocks
589 buf := orgBuf[:checksumSize]
590 if !r.readFull(buf, false) {
591 return 0, r.err
592 }
593 checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
594 // Read content.
595 n := chunkLen - checksumSize
596
597 if r.snappyFrame && n > maxSnappyBlockSize {
598 r.err = ErrCorrupt
599 return 0, r.err
600 }
601 if n > r.maxBlock {
602 r.err = ErrCorrupt
603 return 0, r.err
604 }
605 // Read uncompressed
606 buf = orgBuf[:n]
607 if !r.readFull(buf, false) {
608 return 0, r.err
609 }
610
611 if !r.ignoreCRC && crc(buf) != checksum {
612 r.err = ErrCRC
613 return 0, r.err
614 }
615 entry := <-reUse
616 queue <- entry
617 entry <- buf
618 continue
619
620 case chunkTypeStreamIdentifier:
621 // Section 4.1. Stream identifier (chunk type 0xff).
622 if chunkLen != len(magicBody) {
623 r.err = ErrCorrupt
624 return 0, r.err
625 }
626 if !r.readFull(r.buf[:len(magicBody)], false) {
627 return 0, r.err
628 }
629 if string(r.buf[:len(magicBody)]) != magicBody {
630 if string(r.buf[:len(magicBody)]) != magicBodySnappy {
631 r.err = ErrCorrupt
632 return 0, r.err
633 } else {
634 r.snappyFrame = true
635 }
636 } else {
637 r.snappyFrame = false
638 }
639 continue
640 }
641
642 if chunkType <= 0x7f {
643 // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
644 // fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
645 r.err = ErrUnsupported
646 return 0, r.err
647 }
648 // Section 4.4 Padding (chunk type 0xfe).
649 // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
650 if chunkLen > maxChunkSize {
651 // fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
652 r.err = ErrUnsupported
653 return 0, r.err
654 }
655
656 // fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
657 if !r.skippable(r.buf, chunkLen, false, chunkType) {
658 return 0, r.err
659 }
660 }
661 return 0, r.err
662 }
663
664 // Skip will skip n bytes forward in the decompressed output.
665 // For larger skips this consumes less CPU and is faster than reading output and discarding it.
666 // CRC is not checked on skipped blocks.
667 // io.ErrUnexpectedEOF is returned if the stream ends before all bytes have been skipped.
668 // If a decoding error is encountered subsequent calls to Read will also fail.
669 func (r *Reader) Skip(n int64) error {
670 if n < 0 {
671 return errors.New("attempted negative skip")
672 }
673 if r.err != nil {
674 return r.err
675 }
676
677 for n > 0 {
678 if r.i < r.j {
679 // Skip in buffer.
680 // decoded[i:j] contains decoded bytes that have not yet been passed on.
681 left := int64(r.j - r.i)
682 if left >= n {
683 tmp := int64(r.i) + n
684 if tmp > math.MaxInt32 {
685 return errors.New("s2: internal overflow in skip")
686 }
687 r.i = int(tmp)
688 return nil
689 }
690 n -= int64(r.j - r.i)
691 r.i = r.j
692 }
693
694 // Buffer empty; read blocks until we have content.
695 if !r.readFull(r.buf[:4], true) {
696 if r.err == io.EOF {
697 r.err = io.ErrUnexpectedEOF
698 }
699 return r.err
700 }
701 chunkType := r.buf[0]
702 if !r.readHeader {
703 if chunkType != chunkTypeStreamIdentifier {
704 r.err = ErrCorrupt
705 return r.err
706 }
707 r.readHeader = true
708 }
709 chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
710
711 // The chunk types are specified at
712 // https://github.com/google/snappy/blob/master/framing_format.txt
713 switch chunkType {
714 case chunkTypeCompressedData:
715 r.blockStart += int64(r.j)
716 // Section 4.2. Compressed data (chunk type 0x00).
717 if chunkLen < checksumSize {
718 r.err = ErrCorrupt
719 return r.err
720 }
721 if !r.ensureBufferSize(chunkLen) {
722 if r.err == nil {
723 r.err = ErrUnsupported
724 }
725 return r.err
726 }
727 buf := r.buf[:chunkLen]
728 if !r.readFull(buf, false) {
729 return r.err
730 }
731 checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
732 buf = buf[checksumSize:]
733
734 dLen, err := DecodedLen(buf)
735 if err != nil {
736 r.err = err
737 return r.err
738 }
739 if dLen > r.maxBlock {
740 r.err = ErrCorrupt
741 return r.err
742 }
743 // Check if destination is within this block
744 if int64(dLen) > n {
745 if len(r.decoded) < dLen {
746 r.decoded = make([]byte, dLen)
747 }
748 if _, err := Decode(r.decoded, buf); err != nil {
749 r.err = err
750 return r.err
751 }
752 if crc(r.decoded[:dLen]) != checksum {
753 r.err = ErrCorrupt
754 return r.err
755 }
756 } else {
757 // Skip block completely
758 n -= int64(dLen)
759 r.blockStart += int64(dLen)
760 dLen = 0
761 }
762 r.i, r.j = 0, dLen
763 continue
764 case chunkTypeUncompressedData:
765 r.blockStart += int64(r.j)
766 // Section 4.3. Uncompressed data (chunk type 0x01).
767 if chunkLen < checksumSize {
768 r.err = ErrCorrupt
769 return r.err
770 }
771 if !r.ensureBufferSize(chunkLen) {
772 if r.err != nil {
773 r.err = ErrUnsupported
774 }
775 return r.err
776 }
777 buf := r.buf[:checksumSize]
778 if !r.readFull(buf, false) {
779 return r.err
780 }
781 checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
782 // Read directly into r.decoded instead of via r.buf.
783 n2 := chunkLen - checksumSize
784 if n2 > len(r.decoded) {
785 if n2 > r.maxBlock {
786 r.err = ErrCorrupt
787 return r.err
788 }
789 r.decoded = make([]byte, n2)
790 }
791 if !r.readFull(r.decoded[:n2], false) {
792 return r.err
793 }
794 if int64(n2) < n {
795 if crc(r.decoded[:n2]) != checksum {
796 r.err = ErrCorrupt
797 return r.err
798 }
799 }
800 r.i, r.j = 0, n2
801 continue
802 case chunkTypeStreamIdentifier:
803 // Section 4.1. Stream identifier (chunk type 0xff).
804 if chunkLen != len(magicBody) {
805 r.err = ErrCorrupt
806 return r.err
807 }
808 if !r.readFull(r.buf[:len(magicBody)], false) {
809 return r.err
810 }
811 if string(r.buf[:len(magicBody)]) != magicBody {
812 if string(r.buf[:len(magicBody)]) != magicBodySnappy {
813 r.err = ErrCorrupt
814 return r.err
815 }
816 }
817
818 continue
819 }
820
821 if chunkType <= 0x7f {
822 // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
823 r.err = ErrUnsupported
824 return r.err
825 }
826 if chunkLen > maxChunkSize {
827 r.err = ErrUnsupported
828 return r.err
829 }
830 // Section 4.4 Padding (chunk type 0xfe).
831 // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
832 if !r.skippable(r.buf, chunkLen, false, chunkType) {
833 return r.err
834 }
835 }
836 return nil
837 }
838
// ReadSeeker provides random or forward seeking in compressed content.
// See Reader.ReadSeeker
type ReadSeeker struct {
	// Reader is the wrapped reader; it is shared, not copied.
	*Reader
	// readAtMu is held for the duration of each ReadAt call.
	readAtMu sync.Mutex
}
845
846 // ReadSeeker will return an io.ReadSeeker and io.ReaderAt
847 // compatible version of the reader.
848 // If 'random' is specified the returned io.Seeker can be used for
849 // random seeking, otherwise only forward seeking is supported.
850 // Enabling random seeking requires the original input to support
851 // the io.Seeker interface.
852 // A custom index can be specified which will be used if supplied.
853 // When using a custom index, it will not be read from the input stream.
854 // The ReadAt position will affect regular reads and the current position of Seek.
855 // So using Read after ReadAt will continue from where the ReadAt stopped.
856 // No functions should be used concurrently.
857 // The returned ReadSeeker contains a shallow reference to the existing Reader,
858 // meaning changes performed to one is reflected in the other.
859 func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
860 // Read index if provided.
861 if len(index) != 0 {
862 if r.index == nil {
863 r.index = &Index{}
864 }
865 if _, err := r.index.Load(index); err != nil {
866 return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()}
867 }
868 }
869
870 // Check if input is seekable
871 rs, ok := r.r.(io.ReadSeeker)
872 if !ok {
873 if !random {
874 return &ReadSeeker{Reader: r}, nil
875 }
876 return nil, ErrCantSeek{Reason: "input stream isn't seekable"}
877 }
878
879 if r.index != nil {
880 // Seekable and index, ok...
881 return &ReadSeeker{Reader: r}, nil
882 }
883
884 // Load from stream.
885 r.index = &Index{}
886
887 // Read current position.
888 pos, err := rs.Seek(0, io.SeekCurrent)
889 if err != nil {
890 return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
891 }
892 err = r.index.LoadStream(rs)
893 if err != nil {
894 if err == ErrUnsupported {
895 // If we don't require random seeking, reset input and return.
896 if !random {
897 _, err = rs.Seek(pos, io.SeekStart)
898 if err != nil {
899 return nil, ErrCantSeek{Reason: "resetting stream returned: " + err.Error()}
900 }
901 r.index = nil
902 return &ReadSeeker{Reader: r}, nil
903 }
904 return nil, ErrCantSeek{Reason: "input stream does not contain an index"}
905 }
906 return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()}
907 }
908
909 // reset position.
910 _, err = rs.Seek(pos, io.SeekStart)
911 if err != nil {
912 return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
913 }
914 return &ReadSeeker{Reader: r}, nil
915 }
916
917 // Seek allows seeking in compressed data.
918 func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
919 if r.err != nil {
920 if !errors.Is(r.err, io.EOF) {
921 return 0, r.err
922 }
923 // Reset on EOF
924 r.err = nil
925 }
926
927 // Calculate absolute offset.
928 absOffset := offset
929
930 switch whence {
931 case io.SeekStart:
932 case io.SeekCurrent:
933 absOffset = r.blockStart + int64(r.i) + offset
934 case io.SeekEnd:
935 if r.index == nil {
936 return 0, ErrUnsupported
937 }
938 absOffset = r.index.TotalUncompressed + offset
939 default:
940 r.err = ErrUnsupported
941 return 0, r.err
942 }
943
944 if absOffset < 0 {
945 return 0, errors.New("seek before start of file")
946 }
947
948 if !r.readHeader {
949 // Make sure we read the header.
950 _, r.err = r.Read([]byte{})
951 if r.err != nil {
952 return 0, r.err
953 }
954 }
955
956 // If we are inside current block no need to seek.
957 // This includes no offset changes.
958 if absOffset >= r.blockStart && absOffset < r.blockStart+int64(r.j) {
959 r.i = int(absOffset - r.blockStart)
960 return r.blockStart + int64(r.i), nil
961 }
962
963 rs, ok := r.r.(io.ReadSeeker)
964 if r.index == nil || !ok {
965 currOffset := r.blockStart + int64(r.i)
966 if absOffset >= currOffset {
967 err := r.Skip(absOffset - currOffset)
968 return r.blockStart + int64(r.i), err
969 }
970 return 0, ErrUnsupported
971 }
972
973 // We can seek and we have an index.
974 c, u, err := r.index.Find(absOffset)
975 if err != nil {
976 return r.blockStart + int64(r.i), err
977 }
978
979 // Seek to next block
980 _, err = rs.Seek(c, io.SeekStart)
981 if err != nil {
982 return 0, err
983 }
984
985 r.i = r.j // Remove rest of current block.
986 r.blockStart = u - int64(r.j) // Adjust current block start for accounting.
987 if u < absOffset {
988 // Forward inside block
989 return absOffset, r.Skip(absOffset - u)
990 }
991 if u > absOffset {
992 return 0, fmt.Errorf("s2 seek: (internal error) u (%d) > absOffset (%d)", u, absOffset)
993 }
994 return absOffset, nil
995 }
996
997 // ReadAt reads len(p) bytes into p starting at offset off in the
998 // underlying input source. It returns the number of bytes
999 // read (0 <= n <= len(p)) and any error encountered.
1000 //
1001 // When ReadAt returns n < len(p), it returns a non-nil error
1002 // explaining why more bytes were not returned. In this respect,
1003 // ReadAt is stricter than Read.
1004 //
1005 // Even if ReadAt returns n < len(p), it may use all of p as scratch
1006 // space during the call. If some data is available but not len(p) bytes,
1007 // ReadAt blocks until either all the data is available or an error occurs.
1008 // In this respect ReadAt is different from Read.
1009 //
1010 // If the n = len(p) bytes returned by ReadAt are at the end of the
1011 // input source, ReadAt may return either err == EOF or err == nil.
1012 //
1013 // If ReadAt is reading from an input source with a seek offset,
1014 // ReadAt should not affect nor be affected by the underlying
1015 // seek offset.
1016 //
1017 // Clients of ReadAt can execute parallel ReadAt calls on the
1018 // same input source. This is however not recommended.
1019 func (r *ReadSeeker) ReadAt(p []byte, offset int64) (int, error) {
1020 r.readAtMu.Lock()
1021 defer r.readAtMu.Unlock()
1022 _, err := r.Seek(offset, io.SeekStart)
1023 if err != nil {
1024 return 0, err
1025 }
1026 n := 0
1027 for n < len(p) {
1028 n2, err := r.Read(p[n:])
1029 if err != nil {
1030 // This will include io.EOF
1031 return n + n2, err
1032 }
1033 n += n2
1034 }
1035 return n, nil
1036 }
1037
1038 // ReadByte satisfies the io.ByteReader interface.
1039 func (r *Reader) ReadByte() (byte, error) {
1040 if r.err != nil {
1041 return 0, r.err
1042 }
1043 if r.i < r.j {
1044 c := r.decoded[r.i]
1045 r.i++
1046 return c, nil
1047 }
1048 var tmp [1]byte
1049 for range 10 {
1050 n, err := r.Read(tmp[:])
1051 if err != nil {
1052 return 0, err
1053 }
1054 if n == 1 {
1055 return tmp[0], nil
1056 }
1057 }
1058 return 0, io.ErrNoProgress
1059 }
1060
1061 // SkippableCB will register a callback for chunks with the specified ID.
1062 // ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
1063 // For each chunk with the ID, the callback is called with the content.
1064 // Any returned non-nil error will abort decompression.
1065 // Only one callback per ID is supported, latest sent will be used.
1066 // Sending a nil function will disable previous callbacks.
1067 // You can peek the stream, triggering the callback, by doing a Read with a 0
1068 // byte buffer.
1069 func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error {
1070 if id < 0x80 || id >= chunkTypePadding {
1071 return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)")
1072 }
1073 r.skippableCB[id-0x80] = fn
1074 return nil
1075 }
1076