decoder.go raw

   1  // Copyright 2019+ Klaus Post. All rights reserved.
   2  // License information can be found in the LICENSE file.
   3  // Based on work by Yann Collet, released under BSD License.
   4  
   5  package zstd
   6  
   7  import (
   8  	"context"
   9  	"encoding/binary"
  10  	"io"
  11  	"sync"
  12  
  13  	"github.com/klauspost/compress/zstd/internal/xxhash"
  14  )
  15  
// Decoder provides decoding of zstandard streams.
// The decoder has been designed to operate without allocations after a warmup.
// This means that you should store the decoder for best performance.
// To re-use a stream decoder, use the Reset(r io.Reader) error to switch to another stream.
// A decoder can safely be re-used even if the previous stream failed.
// To release the resources, you must call the Close() function on a decoder.
type Decoder struct {
	// o holds the options the decoder was created with (see NewReader).
	o decoderOptions

	// Unreferenced decoders, ready for use.
	// Block decoders are received from this channel while in use and sent
	// back when done. Close closes the channel and sets the field to nil.
	decoders chan *blockDec

	// Current read position used for Reader functionality.
	current decoderState

	// sync stream decoding
	//
	// State of the synchronous stream decoder started by startSyncDecoder
	// (used by Reset when the configured concurrency is 1).
	syncStream struct {
		decodedFrame uint64        // bytes decoded so far in the current frame
		br           readerWrapper // wraps the reader of the current stream
		enabled      bool          // set by startSyncDecoder; makes nextBlock decode synchronously
		inFrame      bool          // false when the next frame header must be read first
		dstBuf       []byte        // output buffer kept by Reset's in-memory fast path for re-use
	}

	// frame is the frame decoder used for stream decoding.
	// It is created lazily by Reset.
	frame *frameDec

	// Custom dictionaries.
	// Keyed by dictionary ID; filled from the options in NewReader.
	dicts map[uint32]*dict

	// streamWg is the waitgroup for all streams
	// Reset adds to it before starting startStreamDecoder, which marks it done on return.
	streamWg sync.WaitGroup
}
  48  
// decoderState is used for maintaining state when the decoder
// is used for streaming.
type decoderState struct {
	// current block being written to stream.
	// Embedded, so its fields are reachable as current.d, current.b and current.err.
	decodeOutput

	// output in order to be written to stream.
	// Created by Reset for asynchronous decoding; set to nil by drainOutput.
	output chan decodeOutput

	// cancel remaining output.
	// Cancels the context handed to startStreamDecoder.
	cancel context.CancelFunc

	// crc of current frame
	// Updated and checked by nextBlock on the asynchronous path.
	crc *xxhash.Digest

	// flushed is true when there is no pending output left to drain.
	flushed bool
}
  66  
  67  var (
  68  	// Check the interfaces we want to support.
  69  	_ = io.WriterTo(&Decoder{})
  70  	_ = io.Reader(&Decoder{})
  71  )
  72  
  73  // NewReader creates a new decoder.
  74  // A nil Reader can be provided in which case Reset can be used to start a decode.
  75  //
  76  // A Decoder can be used in two modes:
  77  //
  78  // 1) As a stream, or
  79  // 2) For stateless decoding using DecodeAll.
  80  //
  81  // Only a single stream can be decoded concurrently, but the same decoder
  82  // can run multiple concurrent stateless decodes. It is even possible to
  83  // use stateless decodes while a stream is being decoded.
  84  //
  85  // The Reset function can be used to initiate a new stream, which will considerably
  86  // reduce the allocations normally caused by NewReader.
  87  func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
  88  	initPredefined()
  89  	var d Decoder
  90  	d.o.setDefault()
  91  	for _, o := range opts {
  92  		err := o(&d.o)
  93  		if err != nil {
  94  			return nil, err
  95  		}
  96  	}
  97  	d.current.crc = xxhash.New()
  98  	d.current.flushed = true
  99  
 100  	if r == nil {
 101  		d.current.err = ErrDecoderNilInput
 102  	}
 103  
 104  	// Transfer option dicts.
 105  	d.dicts = make(map[uint32]*dict, len(d.o.dicts))
 106  	for _, dc := range d.o.dicts {
 107  		d.dicts[dc.id] = dc
 108  	}
 109  	d.o.dicts = nil
 110  
 111  	// Create decoders
 112  	d.decoders = make(chan *blockDec, d.o.concurrent)
 113  	for i := 0; i < d.o.concurrent; i++ {
 114  		dec := newBlockDec(d.o.lowMem)
 115  		dec.localFrame = newFrameDec(d.o)
 116  		d.decoders <- dec
 117  	}
 118  
 119  	if r == nil {
 120  		return &d, nil
 121  	}
 122  	return &d, d.Reset(r)
 123  }
 124  
 125  // Read bytes from the decompressed stream into p.
 126  // Returns the number of bytes read and any error that occurred.
 127  // When the stream is done, io.EOF will be returned.
 128  func (d *Decoder) Read(p []byte) (int, error) {
 129  	var n int
 130  	for {
 131  		if len(d.current.b) > 0 {
 132  			filled := copy(p, d.current.b)
 133  			p = p[filled:]
 134  			d.current.b = d.current.b[filled:]
 135  			n += filled
 136  		}
 137  		if len(p) == 0 {
 138  			break
 139  		}
 140  		if len(d.current.b) == 0 {
 141  			// We have an error and no more data
 142  			if d.current.err != nil {
 143  				break
 144  			}
 145  			if !d.nextBlock(n == 0) {
 146  				return n, d.current.err
 147  			}
 148  		}
 149  	}
 150  	if len(d.current.b) > 0 {
 151  		if debugDecoder {
 152  			println("returning", n, "still bytes left:", len(d.current.b))
 153  		}
 154  		// Only return error at end of block
 155  		return n, nil
 156  	}
 157  	if d.current.err != nil {
 158  		d.drainOutput()
 159  	}
 160  	if debugDecoder {
 161  		println("returning", n, d.current.err, len(d.decoders))
 162  	}
 163  	return n, d.current.err
 164  }
 165  
 166  // Reset will reset the decoder the supplied stream after the current has finished processing.
 167  // Note that this functionality cannot be used after Close has been called.
 168  // Reset can be called with a nil reader to release references to the previous reader.
 169  // After being called with a nil reader, no other operations than Reset or DecodeAll or Close
 170  // should be used.
 171  func (d *Decoder) Reset(r io.Reader) error {
 172  	if d.current.err == ErrDecoderClosed {
 173  		return d.current.err
 174  	}
 175  
 176  	d.drainOutput()
 177  
 178  	d.syncStream.br.r = nil
 179  	if r == nil {
 180  		d.current.err = ErrDecoderNilInput
 181  		if len(d.current.b) > 0 {
 182  			d.current.b = d.current.b[:0]
 183  		}
 184  		d.current.flushed = true
 185  		return nil
 186  	}
 187  
 188  	// If bytes buffer and < 5MB, do sync decoding anyway.
 189  	if bb, ok := r.(byter); ok && bb.Len() < d.o.decodeBufsBelow && !d.o.limitToCap {
 190  		bb2 := bb
 191  		if debugDecoder {
 192  			println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
 193  		}
 194  		b := bb2.Bytes()
 195  		var dst []byte
 196  		if cap(d.syncStream.dstBuf) > 0 {
 197  			dst = d.syncStream.dstBuf[:0]
 198  		}
 199  
 200  		dst, err := d.DecodeAll(b, dst)
 201  		if err == nil {
 202  			err = io.EOF
 203  		}
 204  		// Save output buffer
 205  		d.syncStream.dstBuf = dst
 206  		d.current.b = dst
 207  		d.current.err = err
 208  		d.current.flushed = true
 209  		if debugDecoder {
 210  			println("sync decode to", len(dst), "bytes, err:", err)
 211  		}
 212  		return nil
 213  	}
 214  	// Remove current block.
 215  	d.stashDecoder()
 216  	d.current.decodeOutput = decodeOutput{}
 217  	d.current.err = nil
 218  	d.current.flushed = false
 219  	d.current.d = nil
 220  	d.syncStream.dstBuf = nil
 221  
 222  	// Ensure no-one else is still running...
 223  	d.streamWg.Wait()
 224  	if d.frame == nil {
 225  		d.frame = newFrameDec(d.o)
 226  	}
 227  
 228  	if d.o.concurrent == 1 {
 229  		return d.startSyncDecoder(r)
 230  	}
 231  
 232  	d.current.output = make(chan decodeOutput, d.o.concurrent)
 233  	ctx, cancel := context.WithCancel(context.Background())
 234  	d.current.cancel = cancel
 235  	d.streamWg.Add(1)
 236  	go d.startStreamDecoder(ctx, r, d.current.output)
 237  
 238  	return nil
 239  }
 240  
 241  // drainOutput will drain the output until errEndOfStream is sent.
 242  func (d *Decoder) drainOutput() {
 243  	if d.current.cancel != nil {
 244  		if debugDecoder {
 245  			println("cancelling current")
 246  		}
 247  		d.current.cancel()
 248  		d.current.cancel = nil
 249  	}
 250  	if d.current.d != nil {
 251  		if debugDecoder {
 252  			printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
 253  		}
 254  		d.decoders <- d.current.d
 255  		d.current.d = nil
 256  		d.current.b = nil
 257  	}
 258  	if d.current.output == nil || d.current.flushed {
 259  		println("current already flushed")
 260  		return
 261  	}
 262  	for v := range d.current.output {
 263  		if v.d != nil {
 264  			if debugDecoder {
 265  				printf("re-adding decoder %p", v.d)
 266  			}
 267  			d.decoders <- v.d
 268  		}
 269  	}
 270  	d.current.output = nil
 271  	d.current.flushed = true
 272  }
 273  
 274  // WriteTo writes data to w until there's no more data to write or when an error occurs.
 275  // The return value n is the number of bytes written.
 276  // Any error encountered during the write is also returned.
 277  func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
 278  	var n int64
 279  	for {
 280  		if len(d.current.b) > 0 {
 281  			n2, err2 := w.Write(d.current.b)
 282  			n += int64(n2)
 283  			if err2 != nil && (d.current.err == nil || d.current.err == io.EOF) {
 284  				d.current.err = err2
 285  			} else if n2 != len(d.current.b) {
 286  				d.current.err = io.ErrShortWrite
 287  			}
 288  		}
 289  		if d.current.err != nil {
 290  			break
 291  		}
 292  		d.nextBlock(true)
 293  	}
 294  	err := d.current.err
 295  	if err != nil {
 296  		d.drainOutput()
 297  	}
 298  	if err == io.EOF {
 299  		err = nil
 300  	}
 301  	return n, err
 302  }
 303  
// DecodeAll allows stateless decoding of a blob of bytes.
// Output will be appended to dst, so if the destination size is known
// you can pre-allocate the destination slice to avoid allocations.
// DecodeAll can be used concurrently.
// The Decoder concurrency limits will be respected.
//
// Frames are decoded one after another until the input is used up.
// ErrDecoderClosed is returned when the decoder has been closed.
// ErrWindowSizeExceeded and ErrDecoderSizeExceeded are returned when a frame
// exceeds the configured window size or decoded size limits.
func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
	if d.decoders == nil {
		return dst, ErrDecoderClosed
	}

	// Grab a block decoder and frame decoder.
	// This blocks until one is available in the pool.
	block := <-d.decoders
	frame := block.localFrame
	// Size limits below apply to what this call adds, not to what dst already held.
	initialSize := len(dst)
	defer func() {
		if debugDecoder {
			printf("re-adding decoder: %p", block)
		}
		// Drop references to the input before handing the decoder back.
		frame.rawInput = nil
		frame.bBuf = nil
		if frame.history.decoders.br != nil {
			frame.history.decoders.br.in = nil
			frame.history.decoders.br.cursor = 0
		}
		d.decoders <- block
	}()
	frame.bBuf = input

	for {
		frame.history.reset()
		err := frame.reset(&frame.bBuf)
		if err != nil {
			// io.EOF from reset ends decoding without an error.
			if err == io.EOF {
				if debugDecoder {
					println("frame reset return EOF")
				}
				return dst, nil
			}
			return dst, err
		}
		// NOTE(review): this is the only error path returning nil instead of dst — confirm intended.
		if err = d.setDict(frame); err != nil {
			return nil, err
		}
		if frame.WindowSize > d.o.maxWindowSize {
			if debugDecoder {
				println("window size exceeded:", frame.WindowSize, ">", d.o.maxWindowSize)
			}
			return dst, ErrWindowSizeExceeded
		}
		if frame.FrameContentSize != fcsUnknown {
			// The frame declares its decoded size: check limits up front.
			if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)-initialSize) {
				if debugDecoder {
					println("decoder size exceeded; fcs:", frame.FrameContentSize, "> mcs:", d.o.maxDecodedSize-uint64(len(dst)-initialSize), "len:", len(dst))
				}
				return dst, ErrDecoderSizeExceeded
			}
			if d.o.limitToCap && frame.FrameContentSize > uint64(cap(dst)-len(dst)) {
				if debugDecoder {
					println("decoder size exceeded; fcs:", frame.FrameContentSize, "> (cap-len)", cap(dst)-len(dst))
				}
				return dst, ErrDecoderSizeExceeded
			}
			// Grow dst once so the whole frame fits.
			if cap(dst)-len(dst) < int(frame.FrameContentSize) {
				dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize)+compressedBlockOverAlloc)
				copy(dst2, dst)
				dst = dst2
			}
		}

		if cap(dst) == 0 && !d.o.limitToCap {
			// Allocate len(input) * 2 by default if nothing is provided
			// and we didn't get frame content size.
			size := min(
				// Cap to 1 MB.
				len(input)*2, 1<<20)
			if uint64(size) > d.o.maxDecodedSize {
				size = int(d.o.maxDecodedSize)
			}
			dst = make([]byte, 0, size)
		}

		dst, err = frame.runDecoder(dst, block)
		if err != nil {
			return dst, err
		}
		// Re-check the limit after decoding; the up-front check only runs when the size is declared.
		if uint64(len(dst)-initialSize) > d.o.maxDecodedSize {
			return dst, ErrDecoderSizeExceeded
		}
		// No input left: all frames have been decoded.
		if len(frame.bBuf) == 0 {
			if debugDecoder {
				println("frame dbuf empty")
			}
			break
		}
	}
	return dst, nil
}
 401  
// nextBlock returns the next block.
// If an error occurs d.current.err will be set.
// Optionally the function can block for new output.
// If non-blocking mode is used the returned boolean will be false
// if no data was available without blocking.
// The boolean is also false when the decoder already is in an error state
// or when the output channel has been closed.
func (d *Decoder) nextBlock(blocking bool) (ok bool) {
	if d.current.err != nil {
		// Keep error state.
		return false
	}
	d.current.b = d.current.b[:0]

	// SYNC:
	if d.syncStream.enabled {
		// Synchronous decoding has to do the work here, so a non-blocking
		// call never yields data.
		if !blocking {
			return false
		}
		ok = d.nextBlockSync()
		if !ok {
			// Nothing more to decode for now; hand the block decoder back.
			d.stashDecoder()
		}
		return ok
	}

	//ASYNC:
	// Return the block decoder of the previous output before fetching the next.
	d.stashDecoder()
	if blocking {
		d.current.decodeOutput, ok = <-d.current.output
	} else {
		select {
		case d.current.decodeOutput, ok = <-d.current.output:
		default:
			return false
		}
	}
	if !ok {
		// This should not happen, so signal error state...
		d.current.err = io.ErrUnexpectedEOF
		return false
	}
	next := d.current.decodeOutput
	// A block carrying a new history is the first block of a frame:
	// start its checksum from scratch.
	if next.d != nil && next.d.async.newHist != nil {
		d.current.crc.Reset()
	}
	if debugDecoder {
		var tmp [4]byte
		binary.LittleEndian.PutUint32(tmp[:], uint32(xxhash.Sum64(next.b)))
		println("got", len(d.current.b), "bytes, error:", d.current.err, "data crc:", tmp)
	}

	if d.o.ignoreChecksum {
		return true
	}

	if len(next.b) > 0 {
		d.current.crc.Write(next.b)
	}
	// hasCRC is set by the stream decoder on the last block of a frame with a checksum.
	if next.err == nil && next.d != nil && next.d.hasCRC {
		// Compare the low 32 bits of the running hash with the value read from the stream.
		got := uint32(d.current.crc.Sum64())
		if got != next.d.checkCRC {
			if debugDecoder {
				printf("CRC Check Failed: %08x (got) != %08x (on stream)\n", got, next.d.checkCRC)
			}
			d.current.err = ErrCRCMismatch
		} else {
			if debugDecoder {
				printf("CRC ok %08x\n", got)
			}
		}
	}

	return true
}
 475  
// nextBlockSync decodes blocks on the calling goroutine until one produces
// output, which is left in d.current.b.
// It returns false when decoding stopped with an error; the error (which
// may be an end-of-stream condition reported by the frame decoder) is
// stored in d.current.err.
func (d *Decoder) nextBlockSync() (ok bool) {
	// Take a block decoder from the pool unless one is still held.
	if d.current.d == nil {
		d.current.d = <-d.decoders
	}
	for len(d.current.b) == 0 {
		if !d.syncStream.inFrame {
			// Start of a new frame: read the header and apply the dictionary.
			d.frame.history.reset()
			d.current.err = d.frame.reset(&d.syncStream.br)
			if d.current.err == nil {
				d.current.err = d.setDict(d.frame)
			}
			if d.current.err != nil {
				return false
			}
			// NOTE(review): DecodeAll reports ErrWindowSizeExceeded for the window limit — confirm the difference is intended.
			if d.frame.WindowSize > d.o.maxDecodedSize || d.frame.WindowSize > d.o.maxWindowSize {
				d.current.err = ErrDecoderSizeExceeded
				return false
			}

			d.syncStream.decodedFrame = 0
			d.syncStream.inFrame = true
		}
		d.current.err = d.frame.next(d.current.d)
		if d.current.err != nil {
			return false
		}
		d.frame.history.ensureBlock()
		if debugDecoder {
			println("History trimmed:", len(d.frame.history.b), "decoded already:", d.syncStream.decodedFrame)
		}
		// Whatever is added to the history beyond this point is the block's output.
		histBefore := len(d.frame.history.b)
		d.current.err = d.current.d.decodeBuf(&d.frame.history)

		if d.current.err != nil {
			println("error after:", d.current.err)
			return false
		}
		d.current.b = d.frame.history.b[histBefore:]
		if debugDecoder {
			println("history after:", len(d.frame.history.b))
		}

		// Check frame size (before CRC)
		// presumably fcsUnknown is larger than any real size, so this only
		// triggers for frames with a declared size — verify.
		d.syncStream.decodedFrame += uint64(len(d.current.b))
		if d.syncStream.decodedFrame > d.frame.FrameContentSize {
			if debugDecoder {
				printf("DecodedFrame (%d) > FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
			}
			d.current.err = ErrFrameSizeExceeded
			return false
		}

		// Check FCS
		// On the last block the total must match the declared size exactly.
		if d.current.d.Last && d.frame.FrameContentSize != fcsUnknown && d.syncStream.decodedFrame != d.frame.FrameContentSize {
			if debugDecoder {
				printf("DecodedFrame (%d) != FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
			}
			d.current.err = ErrFrameSizeMismatch
			return false
		}

		// Update/Check CRC
		if d.frame.HasCheckSum {
			if !d.o.ignoreChecksum {
				d.frame.crc.Write(d.current.b)
			}
			if d.current.d.Last {
				// With checksums ignored, consumeCRC is called in place of checkCRC.
				if !d.o.ignoreChecksum {
					d.current.err = d.frame.checkCRC()
				} else {
					d.current.err = d.frame.consumeCRC()
				}
				if d.current.err != nil {
					println("CRC error:", d.current.err)
					return false
				}
			}
		}
		// After the last block the next iteration starts with a new frame header.
		d.syncStream.inFrame = !d.current.d.Last
	}
	return true
}
 558  
 559  func (d *Decoder) stashDecoder() {
 560  	if d.current.d != nil {
 561  		if debugDecoder {
 562  			printf("re-adding current decoder %p", d.current.d)
 563  		}
 564  		d.decoders <- d.current.d
 565  		d.current.d = nil
 566  	}
 567  }
 568  
 569  // Close will release all resources.
 570  // It is NOT possible to reuse the decoder after this.
 571  func (d *Decoder) Close() {
 572  	if d.current.err == ErrDecoderClosed {
 573  		return
 574  	}
 575  	d.drainOutput()
 576  	if d.current.cancel != nil {
 577  		d.current.cancel()
 578  		d.streamWg.Wait()
 579  		d.current.cancel = nil
 580  	}
 581  	if d.decoders != nil {
 582  		close(d.decoders)
 583  		for dec := range d.decoders {
 584  			dec.Close()
 585  		}
 586  		d.decoders = nil
 587  	}
 588  	if d.current.d != nil {
 589  		d.current.d.Close()
 590  		d.current.d = nil
 591  	}
 592  	d.current.err = ErrDecoderClosed
 593  }
 594  
 595  // IOReadCloser returns the decoder as an io.ReadCloser for convenience.
 596  // Any changes to the decoder will be reflected, so the returned ReadCloser
 597  // can be reused along with the decoder.
 598  // io.WriterTo is also supported by the returned ReadCloser.
 599  func (d *Decoder) IOReadCloser() io.ReadCloser {
 600  	return closeWrapper{d: d}
 601  }
 602  
 603  // closeWrapper wraps a function call as a closer.
 604  type closeWrapper struct {
 605  	d *Decoder
 606  }
 607  
 608  // WriteTo forwards WriteTo calls to the decoder.
 609  func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) {
 610  	return c.d.WriteTo(w)
 611  }
 612  
 613  // Read forwards read calls to the decoder.
 614  func (c closeWrapper) Read(p []byte) (n int, err error) {
 615  	return c.d.Read(p)
 616  }
 617  
 618  // Close closes the decoder.
 619  func (c closeWrapper) Close() error {
 620  	c.d.Close()
 621  	return nil
 622  }
 623  
// decodeOutput is one unit of decoded output handed to the reader side.
//
// d is the block decoder that produced the output, if any; the consumer
// sends it back to the decoder pool when done.
// b holds the decoded bytes.
// err is the error associated with this output, if any.
type decodeOutput struct {
	d   *blockDec
	b   []byte
	err error
}
 629  
 630  func (d *Decoder) startSyncDecoder(r io.Reader) error {
 631  	d.frame.history.reset()
 632  	d.syncStream.br = readerWrapper{r: r}
 633  	d.syncStream.inFrame = false
 634  	d.syncStream.enabled = true
 635  	d.syncStream.decodedFrame = 0
 636  	return nil
 637  }
 638  
// startStreamDecoder runs the asynchronous decoding pipeline for the stream r
// and returns when the stream is finished, fails, or ctx is cancelled.
// Results are sent on output, which is closed when the pipeline is done.
// d.streamWg is marked done on return.
//
// Create Decoder:
// ASYNC:
// Spawn 3 go routines.
// 0: Read frames and decode block literals.
// 1: Decode sequences.
// 2: Execute sequences, send to output.
//
// Stage 0 is this function itself; stages 1 and 2 are the two goroutines
// started below. Blocks travel 0 -> seqDecode -> 1 -> seqExecute -> 2 -> output.
func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) {
	defer d.streamWg.Done()
	br := readerWrapper{r: r}

	var seqDecode = make(chan *blockDec, d.o.concurrent)
	var seqExecute = make(chan *blockDec, d.o.concurrent)

	// Async 1: Decode sequences...
	go func() {
		var hist history
		var hasErr bool

		for block := range seqDecode {
			// After an error blocks are only passed on, so their decoders
			// still reach the consumer.
			if hasErr {
				if block != nil {
					seqExecute <- block
				}
				continue
			}
			// First block of a frame: take over the state sent along with it.
			if block.async.newHist != nil {
				if debugDecoder {
					println("Async 1: new history, recent:", block.async.newHist.recentOffsets)
				}
				hist.reset()
				hist.decoders = block.async.newHist.decoders
				hist.recentOffsets = block.async.newHist.recentOffsets
				hist.windowSize = block.async.newHist.windowSize
				if block.async.newHist.dict != nil {
					hist.setDict(block.async.newHist.dict)
				}
			}
			// Only compressed blocks have sequences to decode.
			if block.err != nil || block.Type != blockTypeCompressed {
				hasErr = block.err != nil
				seqExecute <- block
				continue
			}

			hist.decoders.literals = block.async.literals
			block.err = block.prepareSequences(block.async.seqData, &hist)
			if debugDecoder && block.err != nil {
				println("prepareSequences returned:", block.err)
			}
			hasErr = block.err != nil
			if block.err == nil {
				block.err = block.decodeSequences(&hist)
				if debugDecoder && block.err != nil {
					println("decodeSequences returned:", block.err)
				}
				hasErr = block.err != nil
				//				block.async.sequence = hist.decoders.seq[:hist.decoders.nSeqs]
				block.async.seqSize = hist.decoders.seqSize
			}
			seqExecute <- block
		}
		close(seqExecute)
		hist.reset()
	}()

	// wg is released by the execute goroutine once it has closed output.
	var wg sync.WaitGroup
	wg.Add(1)

	// Async 2: Execute sequences...
	// frameHistCache carries the history buffer of d.frame into the execute
	// goroutine for re-use; it is stored back at the end of this function.
	frameHistCache := d.frame.history.b
	go func() {
		var hist history
		var decodedFrame uint64
		var fcs uint64
		var hasErr bool
		for block := range seqExecute {
			out := decodeOutput{err: block.err, d: block}
			// After an error blocks are forwarded unprocessed.
			if block.err != nil || hasErr {
				hasErr = true
				output <- out
				continue
			}
			// First block of a frame: set up history for the new frame.
			if block.async.newHist != nil {
				if debugDecoder {
					println("Async 2: new history")
				}
				hist.reset()
				hist.windowSize = block.async.newHist.windowSize
				hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer
				if block.async.newHist.dict != nil {
					hist.setDict(block.async.newHist.dict)
				}

				// Prefer the cached buffer over a new allocation when it is large enough.
				if cap(hist.b) < hist.allocFrameBuffer {
					if cap(frameHistCache) >= hist.allocFrameBuffer {
						hist.b = frameHistCache
					} else {
						hist.b = make([]byte, 0, hist.allocFrameBuffer)
						println("Alloc history sized", hist.allocFrameBuffer)
					}
				}
				hist.b = hist.b[:0]
				fcs = block.async.fcs
				decodedFrame = 0
			}
			do := decodeOutput{err: block.err, d: block}
			switch block.Type {
			case blockTypeRLE:
				if debugDecoder {
					println("add rle block length:", block.RLESize)
				}

				if cap(block.dst) < int(block.RLESize) {
					if block.lowMem {
						block.dst = make([]byte, block.RLESize)
					} else {
						block.dst = make([]byte, maxCompressedBlockSize)
					}
				}
				// An RLE block is its single data byte repeated RLESize times.
				block.dst = block.dst[:block.RLESize]
				v := block.data[0]
				for i := range block.dst {
					block.dst[i] = v
				}
				hist.append(block.dst)
				do.b = block.dst
			case blockTypeRaw:
				if debugDecoder {
					println("add raw block length:", len(block.data))
				}
				hist.append(block.data)
				do.b = block.data
			case blockTypeCompressed:
				if debugDecoder {
					println("execute with history length:", len(hist.b), "window:", hist.windowSize)
				}
				hist.decoders.seqSize = block.async.seqSize
				hist.decoders.literals = block.async.literals
				do.err = block.executeSequences(&hist)
				hasErr = do.err != nil
				if debugDecoder && hasErr {
					println("executeSequences returned:", do.err)
				}
				do.b = block.dst
			}
			// Check the decoded size against the frame content size sent with the frame.
			if !hasErr {
				decodedFrame += uint64(len(do.b))
				if decodedFrame > fcs {
					println("fcs exceeded", block.Last, fcs, decodedFrame)
					do.err = ErrFrameSizeExceeded
					hasErr = true
				} else if block.Last && fcs != fcsUnknown && decodedFrame != fcs {
					do.err = ErrFrameSizeMismatch
					hasErr = true
				} else {
					if debugDecoder {
						println("fcs ok", block.Last, fcs, decodedFrame)
					}
				}
			}
			output <- do
		}
		close(output)
		frameHistCache = hist.b
		wg.Done()
		if debugDecoder {
			println("decoder goroutines finished")
		}
		hist.reset()
	}()

	// Stage 0: read frames and blocks, decode literals.
	var hist history
decodeStream:
	for {
		var hasErr bool
		hist.reset()
		// decodeBlock decodes the literals of a compressed block and passes
		// the block on to the sequence decoder. After an error, blocks are
		// only passed on.
		decodeBlock := func(block *blockDec) {
			if hasErr {
				if block != nil {
					seqDecode <- block
				}
				return
			}
			if block.err != nil || block.Type != blockTypeCompressed {
				hasErr = block.err != nil
				seqDecode <- block
				return
			}

			remain, err := block.decodeLiterals(block.data, &hist)
			block.err = err
			hasErr = block.err != nil
			if err == nil {
				block.async.literals = hist.decoders.literals
				block.async.seqData = remain
			} else if debugDecoder {
				println("decodeLiterals error:", err)
			}
			seqDecode <- block
		}
		frame := d.frame
		if debugDecoder {
			println("New frame...")
		}
		var historySent bool
		frame.history.reset()
		err := frame.reset(&br)
		if debugDecoder && err != nil {
			println("Frame decoder returned", err)
		}
		if err == nil {
			err = d.setDict(frame)
		}
		if err == nil && d.frame.WindowSize > d.o.maxWindowSize {
			if debugDecoder {
				println("decoder size exceeded, fws:", d.frame.WindowSize, "> mws:", d.o.maxWindowSize)
			}

			err = ErrDecoderSizeExceeded
		}
		if err != nil {
			// Use a block decoder to carry the error through the pipeline
			// to the consumer, unless the stream has been cancelled.
			select {
			case <-ctx.Done():
			case dec := <-d.decoders:
				dec.sendErr(err)
				decodeBlock(dec)
			}
			break decodeStream
		}

		// Go through all blocks of the frame.
		for {
			var dec *blockDec
			select {
			case <-ctx.Done():
				break decodeStream
			case dec = <-d.decoders:
				// Once we have a decoder, we MUST return it.
			}
			err := frame.next(dec)
			if !historySent {
				// The first block of a frame carries a copy of the frame
				// history and the frame content size to the later stages.
				h := frame.history
				if debugDecoder {
					println("Alloc History:", h.allocFrameBuffer)
				}
				hist.reset()
				if h.dict != nil {
					hist.setDict(h.dict)
				}
				dec.async.newHist = &h
				dec.async.fcs = frame.FrameContentSize
				historySent = true
			} else {
				dec.async.newHist = nil
			}
			if debugDecoder && err != nil {
				println("next block returned error:", err)
			}
			dec.err = err
			dec.hasCRC = false
			// The last block of a frame with a checksum is followed by 4 CRC
			// bytes; they are stored on the block for the consumer to check.
			if dec.Last && frame.HasCheckSum && err == nil {
				// err is shadowed here; failures are recorded in dec.err.
				crc, err := frame.rawInput.readSmall(4)
				if len(crc) < 4 {
					if err == nil {
						err = io.ErrUnexpectedEOF

					}
					println("CRC missing?", err)
					dec.err = err
				} else {
					dec.checkCRC = binary.LittleEndian.Uint32(crc)
					dec.hasCRC = true
					if debugDecoder {
						printf("found crc to check: %08x\n", dec.checkCRC)
					}
				}
			}
			// Read what is needed from dec before handing it to the next stage.
			err = dec.err
			last := dec.Last
			decodeBlock(dec)
			if err != nil {
				break decodeStream
			}
			if last {
				break
			}
		}
	}
	// Shut down the pipeline and wait until the execute stage has closed output.
	close(seqDecode)
	wg.Wait()
	hist.reset()
	// Keep the history buffer for the next stream.
	d.frame.history.b = frameHistCache
}
 931  
 932  func (d *Decoder) setDict(frame *frameDec) (err error) {
 933  	dict, ok := d.dicts[frame.DictionaryID]
 934  	if ok {
 935  		if debugDecoder {
 936  			println("setting dict", frame.DictionaryID)
 937  		}
 938  		frame.history.setDict(dict)
 939  	} else if frame.DictionaryID != 0 {
 940  		// A zero or missing dictionary id is ambiguous:
 941  		// either dictionary zero, or no dictionary. In particular,
 942  		// zstd --patch-from uses this id for the source file,
 943  		// so only return an error if the dictionary id is not zero.
 944  		err = ErrUnknownDictionary
 945  	}
 946  	return err
 947  }
 948