reader.go raw

   1  // Copyright 2011 The Snappy-Go Authors. All rights reserved.
   2  // Copyright (c) 2019+ Klaus Post. All rights reserved.
   3  // Use of this source code is governed by a BSD-style
   4  // license that can be found in the LICENSE file.
   5  
   6  package s2
   7  
   8  import (
   9  	"errors"
  10  	"fmt"
  11  	"io"
  12  	"io/ioutil"
  13  	"math"
  14  	"runtime"
  15  	"sync"
  16  )
  17  
  18  // ErrCantSeek is returned if the stream cannot be seeked.
  19  type ErrCantSeek struct {
  20  	Reason string
  21  }
  22  
  23  // Error returns the error as string.
  24  func (e ErrCantSeek) Error() string {
  25  	return fmt.Sprintf("s2: Can't seek because %s", e.Reason)
  26  }
  27  
  28  // NewReader returns a new Reader that decompresses from r, using the framing
  29  // format described at
  30  // https://github.com/google/snappy/blob/master/framing_format.txt with S2 changes.
  31  func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
  32  	nr := Reader{
  33  		r:        r,
  34  		maxBlock: maxBlockSize,
  35  	}
  36  	for _, opt := range opts {
  37  		if err := opt(&nr); err != nil {
  38  			nr.err = err
  39  			return &nr
  40  		}
  41  	}
  42  	nr.maxBufSize = MaxEncodedLen(nr.maxBlock) + checksumSize
  43  	if nr.lazyBuf > 0 {
  44  		nr.buf = make([]byte, MaxEncodedLen(nr.lazyBuf)+checksumSize)
  45  	} else {
  46  		nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
  47  	}
  48  	nr.readHeader = nr.ignoreStreamID
  49  	nr.paramsOK = true
  50  	return &nr
  51  }
  52  
  53  // ReaderOption is an option for creating a decoder.
  54  type ReaderOption func(*Reader) error
  55  
  56  // ReaderMaxBlockSize allows to control allocations if the stream
  57  // has been compressed with a smaller WriterBlockSize, or with the default 1MB.
  58  // Blocks must be this size or smaller to decompress,
  59  // otherwise the decoder will return ErrUnsupported.
  60  //
  61  // For streams compressed with Snappy this can safely be set to 64KB (64 << 10).
  62  //
  63  // Default is the maximum limit of 4MB.
  64  func ReaderMaxBlockSize(blockSize int) ReaderOption {
  65  	return func(r *Reader) error {
  66  		if blockSize > maxBlockSize || blockSize <= 0 {
  67  			return errors.New("s2: block size too large. Must be <= 4MB and > 0")
  68  		}
  69  		if r.lazyBuf == 0 && blockSize < defaultBlockSize {
  70  			r.lazyBuf = blockSize
  71  		}
  72  		r.maxBlock = blockSize
  73  		return nil
  74  	}
  75  }
  76  
  77  // ReaderAllocBlock allows to control upfront stream allocations
  78  // and not allocate for frames bigger than this initially.
  79  // If frames bigger than this is seen a bigger buffer will be allocated.
  80  //
  81  // Default is 1MB, which is default output size.
  82  func ReaderAllocBlock(blockSize int) ReaderOption {
  83  	return func(r *Reader) error {
  84  		if blockSize > maxBlockSize || blockSize < 1024 {
  85  			return errors.New("s2: invalid ReaderAllocBlock. Must be <= 4MB and >= 1024")
  86  		}
  87  		r.lazyBuf = blockSize
  88  		return nil
  89  	}
  90  }
  91  
  92  // ReaderIgnoreStreamIdentifier will make the reader skip the expected
  93  // stream identifier at the beginning of the stream.
  94  // This can be used when serving a stream that has been forwarded to a specific point.
  95  func ReaderIgnoreStreamIdentifier() ReaderOption {
  96  	return func(r *Reader) error {
  97  		r.ignoreStreamID = true
  98  		return nil
  99  	}
 100  }
 101  
 102  // ReaderSkippableCB will register a callback for chuncks with the specified ID.
 103  // ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
 104  // For each chunk with the ID, the callback is called with the content.
 105  // Any returned non-nil error will abort decompression.
 106  // Only one callback per ID is supported, latest sent will be used.
 107  // You can peek the stream, triggering the callback, by doing a Read with a 0
 108  // byte buffer.
 109  func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption {
 110  	return func(r *Reader) error {
 111  		if id < 0x80 || id > 0xfd {
 112  			return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)")
 113  		}
 114  		r.skippableCB[id-0x80] = fn
 115  		return nil
 116  	}
 117  }
 118  
 119  // ReaderIgnoreCRC will make the reader skip CRC calculation and checks.
 120  func ReaderIgnoreCRC() ReaderOption {
 121  	return func(r *Reader) error {
 122  		r.ignoreCRC = true
 123  		return nil
 124  	}
 125  }
 126  
// Reader is an io.Reader that can read Snappy-compressed bytes.
// It accepts streams whose identifier is either the S2 or the Snappy magic
// body. Construct it with NewReader.
type Reader struct {
	// r is the compressed input.
	r io.Reader
	// err is the first error encountered; while set, Read and Skip return it.
	err error
	// decoded holds the decompressed contents of the current block.
	decoded []byte
	// buf is scratch space for chunk headers, checksums and compressed chunk data.
	buf []byte
	// skippableCB holds callbacks for skippable chunk types, indexed by id-0x80.
	skippableCB [0xff - 0x80]func(r io.Reader) error
	blockStart  int64 // Uncompressed offset at start of current.
	// index is the seek index loaded by ReadSeeker; nil when none is loaded.
	index *Index

	// decoded[i:j] contains decoded bytes that have not yet been passed on.
	i, j int
	// maximum block size allowed.
	maxBlock int
	// maximum expected buffer size.
	// Chunks longer than this are rejected by ensureBufferSize.
	maxBufSize int
	// alloc a buffer this size if > 0.
	// Set by ReaderAllocBlock, or by ReaderMaxBlockSize for small limits.
	lazyBuf int
	// readHeader is true once the stream identifier has been read, or from
	// the start when ignoreStreamID is set.
	readHeader bool
	// paramsOK is true when NewReader applied all options successfully;
	// Reset does nothing otherwise.
	paramsOK bool
	// snappyFrame is true when the stream identifier was the Snappy one;
	// blocks are then limited to maxSnappyBlockSize.
	snappyFrame    bool
	ignoreStreamID bool
	ignoreCRC      bool
}
 151  
 152  // GetBufferCapacity returns the capacity of the internal buffer.
 153  // This might be useful to know when reusing the same reader in combination
 154  // with the lazy buffer option.
 155  func (r *Reader) GetBufferCapacity() int {
 156  	return cap(r.buf)
 157  }
 158  
 159  // ensureBufferSize will ensure that the buffer can take at least n bytes.
 160  // If false is returned the buffer exceeds maximum allowed size.
 161  func (r *Reader) ensureBufferSize(n int) bool {
 162  	if n > r.maxBufSize {
 163  		r.err = ErrCorrupt
 164  		return false
 165  	}
 166  	if cap(r.buf) >= n {
 167  		return true
 168  	}
 169  	// Realloc buffer.
 170  	r.buf = make([]byte, n)
 171  	return true
 172  }
 173  
 174  // Reset discards any buffered data, resets all state, and switches the Snappy
 175  // reader to read from r. This permits reusing a Reader rather than allocating
 176  // a new one.
 177  func (r *Reader) Reset(reader io.Reader) {
 178  	if !r.paramsOK {
 179  		return
 180  	}
 181  	r.index = nil
 182  	r.r = reader
 183  	r.err = nil
 184  	r.i = 0
 185  	r.j = 0
 186  	r.blockStart = 0
 187  	r.readHeader = r.ignoreStreamID
 188  }
 189  
 190  func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
 191  	if _, r.err = io.ReadFull(r.r, p); r.err != nil {
 192  		if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
 193  			r.err = ErrCorrupt
 194  		}
 195  		return false
 196  	}
 197  	return true
 198  }
 199  
 200  // skippable will skip n bytes.
 201  // If the supplied reader supports seeking that is used.
 202  // tmp is used as a temporary buffer for reading.
 203  // The supplied slice does not need to be the size of the read.
 204  func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) {
 205  	if id < 0x80 {
 206  		r.err = fmt.Errorf("internal error: skippable id < 0x80")
 207  		return false
 208  	}
 209  	if fn := r.skippableCB[id-0x80]; fn != nil {
 210  		rd := io.LimitReader(r.r, int64(n))
 211  		r.err = fn(rd)
 212  		if r.err != nil {
 213  			return false
 214  		}
 215  		_, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp)
 216  		return r.err == nil
 217  	}
 218  	if rs, ok := r.r.(io.ReadSeeker); ok {
 219  		_, err := rs.Seek(int64(n), io.SeekCurrent)
 220  		if err == nil {
 221  			return true
 222  		}
 223  		if err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
 224  			r.err = ErrCorrupt
 225  			return false
 226  		}
 227  	}
 228  	for n > 0 {
 229  		if n < len(tmp) {
 230  			tmp = tmp[:n]
 231  		}
 232  		if _, r.err = io.ReadFull(r.r, tmp); r.err != nil {
 233  			if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
 234  				r.err = ErrCorrupt
 235  			}
 236  			return false
 237  		}
 238  		n -= len(tmp)
 239  	}
 240  	return true
 241  }
 242  
// Read satisfies the io.Reader interface.
//
// Decoded bytes left from the current block (decoded[i:j]) are returned first.
// When none remain, chunks are consumed from the input until a data chunk
// produces decoded output. Along the way, stream identifiers are validated and
// skippable chunks are passed to registered callbacks or discarded. A Read with
// an empty p therefore processes chunks up to and including the next data chunk.
// Any error is stored in r.err and returned by this and every later call;
// io.EOF is returned when the input ends exactly at a chunk boundary.
func (r *Reader) Read(p []byte) (int, error) {
	if r.err != nil {
		return 0, r.err
	}
	for {
		// Serve buffered decoded bytes first.
		if r.i < r.j {
			n := copy(p, r.decoded[r.i:r.j])
			r.i += n
			return n, nil
		}
		// Chunk header: 1 byte chunk type followed by a 3-byte little-endian length.
		// EOF is allowed here; it marks a clean end of stream.
		if !r.readFull(r.buf[:4], true) {
			return 0, r.err
		}
		chunkType := r.buf[0]
		if !r.readHeader {
			// The first chunk must be the stream identifier, unless
			// ignoreStreamID pre-set readHeader.
			if chunkType != chunkTypeStreamIdentifier {
				r.err = ErrCorrupt
				return 0, r.err
			}
			r.readHeader = true
		}
		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16

		// The chunk types are specified at
		// https://github.com/google/snappy/blob/master/framing_format.txt
		switch chunkType {
		case chunkTypeCompressedData:
			// The previous block has been fully consumed; move the
			// uncompressed offset past it.
			r.blockStart += int64(r.j)
			// Section 4.2. Compressed data (chunk type 0x00).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			// ensureBufferSize sets r.err itself when it fails; the
			// ErrUnsupported fallback is defensive.
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:chunkLen]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			// Little-endian checksum of the decoded data precedes the payload.
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			buf = buf[checksumSize:]

			n, err := DecodedLen(buf)
			if err != nil {
				r.err = err
				return 0, r.err
			}
			// Streams identified as Snappy are held to maxSnappyBlockSize.
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}

			// Grow the output buffer only when needed, never beyond maxBlock.
			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if _, err := Decode(r.decoded, buf); err != nil {
				r.err = err
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue

		case chunkTypeUncompressedData:
			r.blockStart += int64(r.j)
			// Section 4.3. Uncompressed data (chunk type 0x01).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			// Acts as a chunk size limit check; the payload itself is read
			// into r.decoded below (this may still grow r.buf).
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:checksumSize]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			// Read directly into r.decoded instead of via r.buf.
			n := chunkLen - checksumSize
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if !r.readFull(r.decoded[:n], false) {
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue

		case chunkTypeStreamIdentifier:
			// Section 4.1. Stream identifier (chunk type 0xff).
			if chunkLen != len(magicBody) {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.readFull(r.buf[:len(magicBody)], false) {
				return 0, r.err
			}
			// Accept either magic body; remember which kind of stream this is.
			if string(r.buf[:len(magicBody)]) != magicBody {
				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
					r.err = ErrCorrupt
					return 0, r.err
				} else {
					r.snappyFrame = true
				}
			} else {
				r.snappyFrame = false
			}
			continue
		}

		if chunkType <= 0x7f {
			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
			// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
			r.err = ErrUnsupported
			return 0, r.err
		}
		// Section 4.4 Padding (chunk type 0xfe).
		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
		if chunkLen > maxChunkSize {
			// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
			r.err = ErrUnsupported
			return 0, r.err
		}

		// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
		if !r.skippable(r.buf, chunkLen, false, chunkType) {
			return 0, r.err
		}
	}
}
 401  
 402  // DecodeConcurrent will decode the full stream to w.
 403  // This function should not be combined with reading, seeking or other operations.
 404  // Up to 'concurrent' goroutines will be used.
 405  // If <= 0, runtime.NumCPU will be used.
 406  // On success the number of bytes decompressed nil and is returned.
 407  // This is mainly intended for bigger streams.
 408  func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, err error) {
 409  	if r.i > 0 || r.j > 0 || r.blockStart > 0 {
 410  		return 0, errors.New("DecodeConcurrent called after ")
 411  	}
 412  	if concurrent <= 0 {
 413  		concurrent = runtime.NumCPU()
 414  	}
 415  
 416  	// Write to output
 417  	var errMu sync.Mutex
 418  	var aErr error
 419  	setErr := func(e error) (ok bool) {
 420  		errMu.Lock()
 421  		defer errMu.Unlock()
 422  		if e == nil {
 423  			return aErr == nil
 424  		}
 425  		if aErr == nil {
 426  			aErr = e
 427  		}
 428  		return false
 429  	}
 430  	hasErr := func() (ok bool) {
 431  		errMu.Lock()
 432  		v := aErr != nil
 433  		errMu.Unlock()
 434  		return v
 435  	}
 436  
 437  	var aWritten int64
 438  	toRead := make(chan []byte, concurrent)
 439  	writtenBlocks := make(chan []byte, concurrent)
 440  	queue := make(chan chan []byte, concurrent)
 441  	reUse := make(chan chan []byte, concurrent)
 442  	for i := 0; i < concurrent; i++ {
 443  		toRead <- make([]byte, 0, r.maxBufSize)
 444  		writtenBlocks <- make([]byte, 0, r.maxBufSize)
 445  		reUse <- make(chan []byte, 1)
 446  	}
 447  	// Writer
 448  	var wg sync.WaitGroup
 449  	wg.Add(1)
 450  	go func() {
 451  		defer wg.Done()
 452  		for toWrite := range queue {
 453  			entry := <-toWrite
 454  			reUse <- toWrite
 455  			if hasErr() || entry == nil {
 456  				if entry != nil {
 457  					writtenBlocks <- entry
 458  				}
 459  				continue
 460  			}
 461  			if hasErr() {
 462  				writtenBlocks <- entry
 463  				continue
 464  			}
 465  			n, err := w.Write(entry)
 466  			want := len(entry)
 467  			writtenBlocks <- entry
 468  			if err != nil {
 469  				setErr(err)
 470  				continue
 471  			}
 472  			if n != want {
 473  				setErr(io.ErrShortWrite)
 474  				continue
 475  			}
 476  			aWritten += int64(n)
 477  		}
 478  	}()
 479  
 480  	defer func() {
 481  		if r.err != nil {
 482  			setErr(r.err)
 483  		} else if err != nil {
 484  			setErr(err)
 485  		}
 486  		close(queue)
 487  		wg.Wait()
 488  		if err == nil {
 489  			err = aErr
 490  		}
 491  		written = aWritten
 492  	}()
 493  
 494  	// Reader
 495  	for !hasErr() {
 496  		if !r.readFull(r.buf[:4], true) {
 497  			if r.err == io.EOF {
 498  				r.err = nil
 499  			}
 500  			return 0, r.err
 501  		}
 502  		chunkType := r.buf[0]
 503  		if !r.readHeader {
 504  			if chunkType != chunkTypeStreamIdentifier {
 505  				r.err = ErrCorrupt
 506  				return 0, r.err
 507  			}
 508  			r.readHeader = true
 509  		}
 510  		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
 511  
 512  		// The chunk types are specified at
 513  		// https://github.com/google/snappy/blob/master/framing_format.txt
 514  		switch chunkType {
 515  		case chunkTypeCompressedData:
 516  			r.blockStart += int64(r.j)
 517  			// Section 4.2. Compressed data (chunk type 0x00).
 518  			if chunkLen < checksumSize {
 519  				r.err = ErrCorrupt
 520  				return 0, r.err
 521  			}
 522  			if chunkLen > r.maxBufSize {
 523  				r.err = ErrCorrupt
 524  				return 0, r.err
 525  			}
 526  			orgBuf := <-toRead
 527  			buf := orgBuf[:chunkLen]
 528  
 529  			if !r.readFull(buf, false) {
 530  				return 0, r.err
 531  			}
 532  
 533  			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
 534  			buf = buf[checksumSize:]
 535  
 536  			n, err := DecodedLen(buf)
 537  			if err != nil {
 538  				r.err = err
 539  				return 0, r.err
 540  			}
 541  			if r.snappyFrame && n > maxSnappyBlockSize {
 542  				r.err = ErrCorrupt
 543  				return 0, r.err
 544  			}
 545  
 546  			if n > r.maxBlock {
 547  				r.err = ErrCorrupt
 548  				return 0, r.err
 549  			}
 550  			wg.Add(1)
 551  
 552  			decoded := <-writtenBlocks
 553  			entry := <-reUse
 554  			queue <- entry
 555  			go func() {
 556  				defer wg.Done()
 557  				decoded = decoded[:n]
 558  				_, err := Decode(decoded, buf)
 559  				toRead <- orgBuf
 560  				if err != nil {
 561  					writtenBlocks <- decoded
 562  					setErr(err)
 563  					entry <- nil
 564  					return
 565  				}
 566  				if !r.ignoreCRC && crc(decoded) != checksum {
 567  					writtenBlocks <- decoded
 568  					setErr(ErrCRC)
 569  					entry <- nil
 570  					return
 571  				}
 572  				entry <- decoded
 573  			}()
 574  			continue
 575  
 576  		case chunkTypeUncompressedData:
 577  
 578  			// Section 4.3. Uncompressed data (chunk type 0x01).
 579  			if chunkLen < checksumSize {
 580  				r.err = ErrCorrupt
 581  				return 0, r.err
 582  			}
 583  			if chunkLen > r.maxBufSize {
 584  				r.err = ErrCorrupt
 585  				return 0, r.err
 586  			}
 587  			// Grab write buffer
 588  			orgBuf := <-writtenBlocks
 589  			buf := orgBuf[:checksumSize]
 590  			if !r.readFull(buf, false) {
 591  				return 0, r.err
 592  			}
 593  			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
 594  			// Read content.
 595  			n := chunkLen - checksumSize
 596  
 597  			if r.snappyFrame && n > maxSnappyBlockSize {
 598  				r.err = ErrCorrupt
 599  				return 0, r.err
 600  			}
 601  			if n > r.maxBlock {
 602  				r.err = ErrCorrupt
 603  				return 0, r.err
 604  			}
 605  			// Read uncompressed
 606  			buf = orgBuf[:n]
 607  			if !r.readFull(buf, false) {
 608  				return 0, r.err
 609  			}
 610  
 611  			if !r.ignoreCRC && crc(buf) != checksum {
 612  				r.err = ErrCRC
 613  				return 0, r.err
 614  			}
 615  			entry := <-reUse
 616  			queue <- entry
 617  			entry <- buf
 618  			continue
 619  
 620  		case chunkTypeStreamIdentifier:
 621  			// Section 4.1. Stream identifier (chunk type 0xff).
 622  			if chunkLen != len(magicBody) {
 623  				r.err = ErrCorrupt
 624  				return 0, r.err
 625  			}
 626  			if !r.readFull(r.buf[:len(magicBody)], false) {
 627  				return 0, r.err
 628  			}
 629  			if string(r.buf[:len(magicBody)]) != magicBody {
 630  				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
 631  					r.err = ErrCorrupt
 632  					return 0, r.err
 633  				} else {
 634  					r.snappyFrame = true
 635  				}
 636  			} else {
 637  				r.snappyFrame = false
 638  			}
 639  			continue
 640  		}
 641  
 642  		if chunkType <= 0x7f {
 643  			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
 644  			// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
 645  			r.err = ErrUnsupported
 646  			return 0, r.err
 647  		}
 648  		// Section 4.4 Padding (chunk type 0xfe).
 649  		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
 650  		if chunkLen > maxChunkSize {
 651  			// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
 652  			r.err = ErrUnsupported
 653  			return 0, r.err
 654  		}
 655  
 656  		// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
 657  		if !r.skippable(r.buf, chunkLen, false, chunkType) {
 658  			return 0, r.err
 659  		}
 660  	}
 661  	return 0, r.err
 662  }
 663  
 664  // Skip will skip n bytes forward in the decompressed output.
 665  // For larger skips this consumes less CPU and is faster than reading output and discarding it.
 666  // CRC is not checked on skipped blocks.
 667  // io.ErrUnexpectedEOF is returned if the stream ends before all bytes have been skipped.
 668  // If a decoding error is encountered subsequent calls to Read will also fail.
 669  func (r *Reader) Skip(n int64) error {
 670  	if n < 0 {
 671  		return errors.New("attempted negative skip")
 672  	}
 673  	if r.err != nil {
 674  		return r.err
 675  	}
 676  
 677  	for n > 0 {
 678  		if r.i < r.j {
 679  			// Skip in buffer.
 680  			// decoded[i:j] contains decoded bytes that have not yet been passed on.
 681  			left := int64(r.j - r.i)
 682  			if left >= n {
 683  				tmp := int64(r.i) + n
 684  				if tmp > math.MaxInt32 {
 685  					return errors.New("s2: internal overflow in skip")
 686  				}
 687  				r.i = int(tmp)
 688  				return nil
 689  			}
 690  			n -= int64(r.j - r.i)
 691  			r.i = r.j
 692  		}
 693  
 694  		// Buffer empty; read blocks until we have content.
 695  		if !r.readFull(r.buf[:4], true) {
 696  			if r.err == io.EOF {
 697  				r.err = io.ErrUnexpectedEOF
 698  			}
 699  			return r.err
 700  		}
 701  		chunkType := r.buf[0]
 702  		if !r.readHeader {
 703  			if chunkType != chunkTypeStreamIdentifier {
 704  				r.err = ErrCorrupt
 705  				return r.err
 706  			}
 707  			r.readHeader = true
 708  		}
 709  		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
 710  
 711  		// The chunk types are specified at
 712  		// https://github.com/google/snappy/blob/master/framing_format.txt
 713  		switch chunkType {
 714  		case chunkTypeCompressedData:
 715  			r.blockStart += int64(r.j)
 716  			// Section 4.2. Compressed data (chunk type 0x00).
 717  			if chunkLen < checksumSize {
 718  				r.err = ErrCorrupt
 719  				return r.err
 720  			}
 721  			if !r.ensureBufferSize(chunkLen) {
 722  				if r.err == nil {
 723  					r.err = ErrUnsupported
 724  				}
 725  				return r.err
 726  			}
 727  			buf := r.buf[:chunkLen]
 728  			if !r.readFull(buf, false) {
 729  				return r.err
 730  			}
 731  			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
 732  			buf = buf[checksumSize:]
 733  
 734  			dLen, err := DecodedLen(buf)
 735  			if err != nil {
 736  				r.err = err
 737  				return r.err
 738  			}
 739  			if dLen > r.maxBlock {
 740  				r.err = ErrCorrupt
 741  				return r.err
 742  			}
 743  			// Check if destination is within this block
 744  			if int64(dLen) > n {
 745  				if len(r.decoded) < dLen {
 746  					r.decoded = make([]byte, dLen)
 747  				}
 748  				if _, err := Decode(r.decoded, buf); err != nil {
 749  					r.err = err
 750  					return r.err
 751  				}
 752  				if crc(r.decoded[:dLen]) != checksum {
 753  					r.err = ErrCorrupt
 754  					return r.err
 755  				}
 756  			} else {
 757  				// Skip block completely
 758  				n -= int64(dLen)
 759  				r.blockStart += int64(dLen)
 760  				dLen = 0
 761  			}
 762  			r.i, r.j = 0, dLen
 763  			continue
 764  		case chunkTypeUncompressedData:
 765  			r.blockStart += int64(r.j)
 766  			// Section 4.3. Uncompressed data (chunk type 0x01).
 767  			if chunkLen < checksumSize {
 768  				r.err = ErrCorrupt
 769  				return r.err
 770  			}
 771  			if !r.ensureBufferSize(chunkLen) {
 772  				if r.err != nil {
 773  					r.err = ErrUnsupported
 774  				}
 775  				return r.err
 776  			}
 777  			buf := r.buf[:checksumSize]
 778  			if !r.readFull(buf, false) {
 779  				return r.err
 780  			}
 781  			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
 782  			// Read directly into r.decoded instead of via r.buf.
 783  			n2 := chunkLen - checksumSize
 784  			if n2 > len(r.decoded) {
 785  				if n2 > r.maxBlock {
 786  					r.err = ErrCorrupt
 787  					return r.err
 788  				}
 789  				r.decoded = make([]byte, n2)
 790  			}
 791  			if !r.readFull(r.decoded[:n2], false) {
 792  				return r.err
 793  			}
 794  			if int64(n2) < n {
 795  				if crc(r.decoded[:n2]) != checksum {
 796  					r.err = ErrCorrupt
 797  					return r.err
 798  				}
 799  			}
 800  			r.i, r.j = 0, n2
 801  			continue
 802  		case chunkTypeStreamIdentifier:
 803  			// Section 4.1. Stream identifier (chunk type 0xff).
 804  			if chunkLen != len(magicBody) {
 805  				r.err = ErrCorrupt
 806  				return r.err
 807  			}
 808  			if !r.readFull(r.buf[:len(magicBody)], false) {
 809  				return r.err
 810  			}
 811  			if string(r.buf[:len(magicBody)]) != magicBody {
 812  				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
 813  					r.err = ErrCorrupt
 814  					return r.err
 815  				}
 816  			}
 817  
 818  			continue
 819  		}
 820  
 821  		if chunkType <= 0x7f {
 822  			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
 823  			r.err = ErrUnsupported
 824  			return r.err
 825  		}
 826  		if chunkLen > maxChunkSize {
 827  			r.err = ErrUnsupported
 828  			return r.err
 829  		}
 830  		// Section 4.4 Padding (chunk type 0xfe).
 831  		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
 832  		if !r.skippable(r.buf, chunkLen, false, chunkType) {
 833  			return r.err
 834  		}
 835  	}
 836  	return nil
 837  }
 838  
// ReadSeeker provides random or forward seeking in compressed content.
// See Reader.ReadSeeker
// It embeds the originating *Reader, so reads and seeks share its state.
type ReadSeeker struct {
	*Reader
	// readAtMu serializes ReadAt calls, each of which seeks and then reads
	// on the shared Reader.
	readAtMu sync.Mutex
}
 845  
 846  // ReadSeeker will return an io.ReadSeeker and io.ReaderAt
 847  // compatible version of the reader.
 848  // If 'random' is specified the returned io.Seeker can be used for
 849  // random seeking, otherwise only forward seeking is supported.
 850  // Enabling random seeking requires the original input to support
 851  // the io.Seeker interface.
 852  // A custom index can be specified which will be used if supplied.
 853  // When using a custom index, it will not be read from the input stream.
 854  // The ReadAt position will affect regular reads and the current position of Seek.
 855  // So using Read after ReadAt will continue from where the ReadAt stopped.
 856  // No functions should be used concurrently.
 857  // The returned ReadSeeker contains a shallow reference to the existing Reader,
 858  // meaning changes performed to one is reflected in the other.
 859  func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
 860  	// Read index if provided.
 861  	if len(index) != 0 {
 862  		if r.index == nil {
 863  			r.index = &Index{}
 864  		}
 865  		if _, err := r.index.Load(index); err != nil {
 866  			return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()}
 867  		}
 868  	}
 869  
 870  	// Check if input is seekable
 871  	rs, ok := r.r.(io.ReadSeeker)
 872  	if !ok {
 873  		if !random {
 874  			return &ReadSeeker{Reader: r}, nil
 875  		}
 876  		return nil, ErrCantSeek{Reason: "input stream isn't seekable"}
 877  	}
 878  
 879  	if r.index != nil {
 880  		// Seekable and index, ok...
 881  		return &ReadSeeker{Reader: r}, nil
 882  	}
 883  
 884  	// Load from stream.
 885  	r.index = &Index{}
 886  
 887  	// Read current position.
 888  	pos, err := rs.Seek(0, io.SeekCurrent)
 889  	if err != nil {
 890  		return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
 891  	}
 892  	err = r.index.LoadStream(rs)
 893  	if err != nil {
 894  		if err == ErrUnsupported {
 895  			// If we don't require random seeking, reset input and return.
 896  			if !random {
 897  				_, err = rs.Seek(pos, io.SeekStart)
 898  				if err != nil {
 899  					return nil, ErrCantSeek{Reason: "resetting stream returned: " + err.Error()}
 900  				}
 901  				r.index = nil
 902  				return &ReadSeeker{Reader: r}, nil
 903  			}
 904  			return nil, ErrCantSeek{Reason: "input stream does not contain an index"}
 905  		}
 906  		return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()}
 907  	}
 908  
 909  	// reset position.
 910  	_, err = rs.Seek(pos, io.SeekStart)
 911  	if err != nil {
 912  		return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
 913  	}
 914  	return &ReadSeeker{Reader: r}, nil
 915  }
 916  
 917  // Seek allows seeking in compressed data.
 918  func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
 919  	if r.err != nil {
 920  		if !errors.Is(r.err, io.EOF) {
 921  			return 0, r.err
 922  		}
 923  		// Reset on EOF
 924  		r.err = nil
 925  	}
 926  
 927  	// Calculate absolute offset.
 928  	absOffset := offset
 929  
 930  	switch whence {
 931  	case io.SeekStart:
 932  	case io.SeekCurrent:
 933  		absOffset = r.blockStart + int64(r.i) + offset
 934  	case io.SeekEnd:
 935  		if r.index == nil {
 936  			return 0, ErrUnsupported
 937  		}
 938  		absOffset = r.index.TotalUncompressed + offset
 939  	default:
 940  		r.err = ErrUnsupported
 941  		return 0, r.err
 942  	}
 943  
 944  	if absOffset < 0 {
 945  		return 0, errors.New("seek before start of file")
 946  	}
 947  
 948  	if !r.readHeader {
 949  		// Make sure we read the header.
 950  		_, r.err = r.Read([]byte{})
 951  		if r.err != nil {
 952  			return 0, r.err
 953  		}
 954  	}
 955  
 956  	// If we are inside current block no need to seek.
 957  	// This includes no offset changes.
 958  	if absOffset >= r.blockStart && absOffset < r.blockStart+int64(r.j) {
 959  		r.i = int(absOffset - r.blockStart)
 960  		return r.blockStart + int64(r.i), nil
 961  	}
 962  
 963  	rs, ok := r.r.(io.ReadSeeker)
 964  	if r.index == nil || !ok {
 965  		currOffset := r.blockStart + int64(r.i)
 966  		if absOffset >= currOffset {
 967  			err := r.Skip(absOffset - currOffset)
 968  			return r.blockStart + int64(r.i), err
 969  		}
 970  		return 0, ErrUnsupported
 971  	}
 972  
 973  	// We can seek and we have an index.
 974  	c, u, err := r.index.Find(absOffset)
 975  	if err != nil {
 976  		return r.blockStart + int64(r.i), err
 977  	}
 978  
 979  	// Seek to next block
 980  	_, err = rs.Seek(c, io.SeekStart)
 981  	if err != nil {
 982  		return 0, err
 983  	}
 984  
 985  	r.i = r.j                     // Remove rest of current block.
 986  	r.blockStart = u - int64(r.j) // Adjust current block start for accounting.
 987  	if u < absOffset {
 988  		// Forward inside block
 989  		return absOffset, r.Skip(absOffset - u)
 990  	}
 991  	if u > absOffset {
 992  		return 0, fmt.Errorf("s2 seek: (internal error) u (%d) > absOffset (%d)", u, absOffset)
 993  	}
 994  	return absOffset, nil
 995  }
 996  
 997  // ReadAt reads len(p) bytes into p starting at offset off in the
 998  // underlying input source. It returns the number of bytes
 999  // read (0 <= n <= len(p)) and any error encountered.
1000  //
1001  // When ReadAt returns n < len(p), it returns a non-nil error
1002  // explaining why more bytes were not returned. In this respect,
1003  // ReadAt is stricter than Read.
1004  //
1005  // Even if ReadAt returns n < len(p), it may use all of p as scratch
1006  // space during the call. If some data is available but not len(p) bytes,
1007  // ReadAt blocks until either all the data is available or an error occurs.
1008  // In this respect ReadAt is different from Read.
1009  //
1010  // If the n = len(p) bytes returned by ReadAt are at the end of the
1011  // input source, ReadAt may return either err == EOF or err == nil.
1012  //
1013  // If ReadAt is reading from an input source with a seek offset,
1014  // ReadAt should not affect nor be affected by the underlying
1015  // seek offset.
1016  //
1017  // Clients of ReadAt can execute parallel ReadAt calls on the
1018  // same input source. This is however not recommended.
1019  func (r *ReadSeeker) ReadAt(p []byte, offset int64) (int, error) {
1020  	r.readAtMu.Lock()
1021  	defer r.readAtMu.Unlock()
1022  	_, err := r.Seek(offset, io.SeekStart)
1023  	if err != nil {
1024  		return 0, err
1025  	}
1026  	n := 0
1027  	for n < len(p) {
1028  		n2, err := r.Read(p[n:])
1029  		if err != nil {
1030  			// This will include io.EOF
1031  			return n + n2, err
1032  		}
1033  		n += n2
1034  	}
1035  	return n, nil
1036  }
1037  
1038  // ReadByte satisfies the io.ByteReader interface.
1039  func (r *Reader) ReadByte() (byte, error) {
1040  	if r.err != nil {
1041  		return 0, r.err
1042  	}
1043  	if r.i < r.j {
1044  		c := r.decoded[r.i]
1045  		r.i++
1046  		return c, nil
1047  	}
1048  	var tmp [1]byte
1049  	for range 10 {
1050  		n, err := r.Read(tmp[:])
1051  		if err != nil {
1052  			return 0, err
1053  		}
1054  		if n == 1 {
1055  			return tmp[0], nil
1056  		}
1057  	}
1058  	return 0, io.ErrNoProgress
1059  }
1060  
1061  // SkippableCB will register a callback for chunks with the specified ID.
1062  // ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
1063  // For each chunk with the ID, the callback is called with the content.
1064  // Any returned non-nil error will abort decompression.
1065  // Only one callback per ID is supported, latest sent will be used.
1066  // Sending a nil function will disable previous callbacks.
1067  // You can peek the stream, triggering the callback, by doing a Read with a 0
1068  // byte buffer.
1069  func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error {
1070  	if id < 0x80 || id >= chunkTypePadding {
1071  		return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)")
1072  	}
1073  	r.skippableCB[id-0x80] = fn
1074  	return nil
1075  }
1076