decoder.go raw
1 // Copyright 2019+ Klaus Post. All rights reserved.
2 // License information can be found in the LICENSE file.
3 // Based on work by Yann Collet, released under BSD License.
4
5 package zstd
6
7 import (
8 "context"
9 "encoding/binary"
10 "io"
11 "sync"
12
13 "github.com/klauspost/compress/zstd/internal/xxhash"
14 )
15
16 // Decoder provides decoding of zstandard streams.
17 // The decoder has been designed to operate without allocations after a warmup.
18 // This means that you should store the decoder for best performance.
19 // To re-use a stream decoder, use the Reset(r io.Reader) error to switch to another stream.
20 // A decoder can safely be re-used even if the previous stream failed.
21 // To release the resources, you must call the Close() function on a decoder.
22 type Decoder struct {
23 o decoderOptions
24
25 // Unreferenced decoders, ready for use.
26 decoders chan *blockDec
27
28 // Current read position used for Reader functionality.
29 current decoderState
30
31 // sync stream decoding
32 syncStream struct {
33 decodedFrame uint64
34 br readerWrapper
35 enabled bool
36 inFrame bool
37 dstBuf []byte
38 }
39
40 frame *frameDec
41
42 // Custom dictionaries.
43 dicts map[uint32]*dict
44
45 // streamWg is the waitgroup for all streams
46 streamWg sync.WaitGroup
47 }
48
49 // decoderState is used for maintaining state when the decoder
50 // is used for streaming.
51 type decoderState struct {
52 // current block being written to stream.
53 decodeOutput
54
55 // output in order to be written to stream.
56 output chan decodeOutput
57
58 // cancel remaining output.
59 cancel context.CancelFunc
60
61 // crc of current frame
62 crc *xxhash.Digest
63
64 flushed bool
65 }
66
67 var (
68 // Check the interfaces we want to support.
69 _ = io.WriterTo(&Decoder{})
70 _ = io.Reader(&Decoder{})
71 )
72
73 // NewReader creates a new decoder.
74 // A nil Reader can be provided in which case Reset can be used to start a decode.
75 //
76 // A Decoder can be used in two modes:
77 //
78 // 1) As a stream, or
79 // 2) For stateless decoding using DecodeAll.
80 //
81 // Only a single stream can be decoded concurrently, but the same decoder
82 // can run multiple concurrent stateless decodes. It is even possible to
83 // use stateless decodes while a stream is being decoded.
84 //
85 // The Reset function can be used to initiate a new stream, which will considerably
86 // reduce the allocations normally caused by NewReader.
87 func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
88 initPredefined()
89 var d Decoder
90 d.o.setDefault()
91 for _, o := range opts {
92 err := o(&d.o)
93 if err != nil {
94 return nil, err
95 }
96 }
97 d.current.crc = xxhash.New()
98 d.current.flushed = true
99
100 if r == nil {
101 d.current.err = ErrDecoderNilInput
102 }
103
104 // Transfer option dicts.
105 d.dicts = make(map[uint32]*dict, len(d.o.dicts))
106 for _, dc := range d.o.dicts {
107 d.dicts[dc.id] = dc
108 }
109 d.o.dicts = nil
110
111 // Create decoders
112 d.decoders = make(chan *blockDec, d.o.concurrent)
113 for i := 0; i < d.o.concurrent; i++ {
114 dec := newBlockDec(d.o.lowMem)
115 dec.localFrame = newFrameDec(d.o)
116 d.decoders <- dec
117 }
118
119 if r == nil {
120 return &d, nil
121 }
122 return &d, d.Reset(r)
123 }
124
125 // Read bytes from the decompressed stream into p.
126 // Returns the number of bytes read and any error that occurred.
127 // When the stream is done, io.EOF will be returned.
128 func (d *Decoder) Read(p []byte) (int, error) {
129 var n int
130 for {
131 if len(d.current.b) > 0 {
132 filled := copy(p, d.current.b)
133 p = p[filled:]
134 d.current.b = d.current.b[filled:]
135 n += filled
136 }
137 if len(p) == 0 {
138 break
139 }
140 if len(d.current.b) == 0 {
141 // We have an error and no more data
142 if d.current.err != nil {
143 break
144 }
145 if !d.nextBlock(n == 0) {
146 return n, d.current.err
147 }
148 }
149 }
150 if len(d.current.b) > 0 {
151 if debugDecoder {
152 println("returning", n, "still bytes left:", len(d.current.b))
153 }
154 // Only return error at end of block
155 return n, nil
156 }
157 if d.current.err != nil {
158 d.drainOutput()
159 }
160 if debugDecoder {
161 println("returning", n, d.current.err, len(d.decoders))
162 }
163 return n, d.current.err
164 }
165
166 // Reset will reset the decoder the supplied stream after the current has finished processing.
167 // Note that this functionality cannot be used after Close has been called.
168 // Reset can be called with a nil reader to release references to the previous reader.
169 // After being called with a nil reader, no other operations than Reset or DecodeAll or Close
170 // should be used.
171 func (d *Decoder) Reset(r io.Reader) error {
172 if d.current.err == ErrDecoderClosed {
173 return d.current.err
174 }
175
176 d.drainOutput()
177
178 d.syncStream.br.r = nil
179 if r == nil {
180 d.current.err = ErrDecoderNilInput
181 if len(d.current.b) > 0 {
182 d.current.b = d.current.b[:0]
183 }
184 d.current.flushed = true
185 return nil
186 }
187
188 // If bytes buffer and < 5MB, do sync decoding anyway.
189 if bb, ok := r.(byter); ok && bb.Len() < d.o.decodeBufsBelow && !d.o.limitToCap {
190 bb2 := bb
191 if debugDecoder {
192 println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
193 }
194 b := bb2.Bytes()
195 var dst []byte
196 if cap(d.syncStream.dstBuf) > 0 {
197 dst = d.syncStream.dstBuf[:0]
198 }
199
200 dst, err := d.DecodeAll(b, dst)
201 if err == nil {
202 err = io.EOF
203 }
204 // Save output buffer
205 d.syncStream.dstBuf = dst
206 d.current.b = dst
207 d.current.err = err
208 d.current.flushed = true
209 if debugDecoder {
210 println("sync decode to", len(dst), "bytes, err:", err)
211 }
212 return nil
213 }
214 // Remove current block.
215 d.stashDecoder()
216 d.current.decodeOutput = decodeOutput{}
217 d.current.err = nil
218 d.current.flushed = false
219 d.current.d = nil
220 d.syncStream.dstBuf = nil
221
222 // Ensure no-one else is still running...
223 d.streamWg.Wait()
224 if d.frame == nil {
225 d.frame = newFrameDec(d.o)
226 }
227
228 if d.o.concurrent == 1 {
229 return d.startSyncDecoder(r)
230 }
231
232 d.current.output = make(chan decodeOutput, d.o.concurrent)
233 ctx, cancel := context.WithCancel(context.Background())
234 d.current.cancel = cancel
235 d.streamWg.Add(1)
236 go d.startStreamDecoder(ctx, r, d.current.output)
237
238 return nil
239 }
240
241 // drainOutput will drain the output until errEndOfStream is sent.
242 func (d *Decoder) drainOutput() {
243 if d.current.cancel != nil {
244 if debugDecoder {
245 println("cancelling current")
246 }
247 d.current.cancel()
248 d.current.cancel = nil
249 }
250 if d.current.d != nil {
251 if debugDecoder {
252 printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
253 }
254 d.decoders <- d.current.d
255 d.current.d = nil
256 d.current.b = nil
257 }
258 if d.current.output == nil || d.current.flushed {
259 println("current already flushed")
260 return
261 }
262 for v := range d.current.output {
263 if v.d != nil {
264 if debugDecoder {
265 printf("re-adding decoder %p", v.d)
266 }
267 d.decoders <- v.d
268 }
269 }
270 d.current.output = nil
271 d.current.flushed = true
272 }
273
274 // WriteTo writes data to w until there's no more data to write or when an error occurs.
275 // The return value n is the number of bytes written.
276 // Any error encountered during the write is also returned.
277 func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
278 var n int64
279 for {
280 if len(d.current.b) > 0 {
281 n2, err2 := w.Write(d.current.b)
282 n += int64(n2)
283 if err2 != nil && (d.current.err == nil || d.current.err == io.EOF) {
284 d.current.err = err2
285 } else if n2 != len(d.current.b) {
286 d.current.err = io.ErrShortWrite
287 }
288 }
289 if d.current.err != nil {
290 break
291 }
292 d.nextBlock(true)
293 }
294 err := d.current.err
295 if err != nil {
296 d.drainOutput()
297 }
298 if err == io.EOF {
299 err = nil
300 }
301 return n, err
302 }
303
304 // DecodeAll allows stateless decoding of a blob of bytes.
305 // Output will be appended to dst, so if the destination size is known
306 // you can pre-allocate the destination slice to avoid allocations.
307 // DecodeAll can be used concurrently.
308 // The Decoder concurrency limits will be respected.
309 func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
310 if d.decoders == nil {
311 return dst, ErrDecoderClosed
312 }
313
314 // Grab a block decoder and frame decoder.
315 block := <-d.decoders
316 frame := block.localFrame
317 initialSize := len(dst)
318 defer func() {
319 if debugDecoder {
320 printf("re-adding decoder: %p", block)
321 }
322 frame.rawInput = nil
323 frame.bBuf = nil
324 if frame.history.decoders.br != nil {
325 frame.history.decoders.br.in = nil
326 frame.history.decoders.br.cursor = 0
327 }
328 d.decoders <- block
329 }()
330 frame.bBuf = input
331
332 for {
333 frame.history.reset()
334 err := frame.reset(&frame.bBuf)
335 if err != nil {
336 if err == io.EOF {
337 if debugDecoder {
338 println("frame reset return EOF")
339 }
340 return dst, nil
341 }
342 return dst, err
343 }
344 if err = d.setDict(frame); err != nil {
345 return nil, err
346 }
347 if frame.WindowSize > d.o.maxWindowSize {
348 if debugDecoder {
349 println("window size exceeded:", frame.WindowSize, ">", d.o.maxWindowSize)
350 }
351 return dst, ErrWindowSizeExceeded
352 }
353 if frame.FrameContentSize != fcsUnknown {
354 if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)-initialSize) {
355 if debugDecoder {
356 println("decoder size exceeded; fcs:", frame.FrameContentSize, "> mcs:", d.o.maxDecodedSize-uint64(len(dst)-initialSize), "len:", len(dst))
357 }
358 return dst, ErrDecoderSizeExceeded
359 }
360 if d.o.limitToCap && frame.FrameContentSize > uint64(cap(dst)-len(dst)) {
361 if debugDecoder {
362 println("decoder size exceeded; fcs:", frame.FrameContentSize, "> (cap-len)", cap(dst)-len(dst))
363 }
364 return dst, ErrDecoderSizeExceeded
365 }
366 if cap(dst)-len(dst) < int(frame.FrameContentSize) {
367 dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize)+compressedBlockOverAlloc)
368 copy(dst2, dst)
369 dst = dst2
370 }
371 }
372
373 if cap(dst) == 0 && !d.o.limitToCap {
374 // Allocate len(input) * 2 by default if nothing is provided
375 // and we didn't get frame content size.
376 size := min(
377 // Cap to 1 MB.
378 len(input)*2, 1<<20)
379 if uint64(size) > d.o.maxDecodedSize {
380 size = int(d.o.maxDecodedSize)
381 }
382 dst = make([]byte, 0, size)
383 }
384
385 dst, err = frame.runDecoder(dst, block)
386 if err != nil {
387 return dst, err
388 }
389 if uint64(len(dst)-initialSize) > d.o.maxDecodedSize {
390 return dst, ErrDecoderSizeExceeded
391 }
392 if len(frame.bBuf) == 0 {
393 if debugDecoder {
394 println("frame dbuf empty")
395 }
396 break
397 }
398 }
399 return dst, nil
400 }
401
402 // nextBlock returns the next block.
403 // If an error occurs d.err will be set.
404 // Optionally the function can block for new output.
405 // If non-blocking mode is used the returned boolean will be false
406 // if no data was available without blocking.
407 func (d *Decoder) nextBlock(blocking bool) (ok bool) {
408 if d.current.err != nil {
409 // Keep error state.
410 return false
411 }
412 d.current.b = d.current.b[:0]
413
414 // SYNC:
415 if d.syncStream.enabled {
416 if !blocking {
417 return false
418 }
419 ok = d.nextBlockSync()
420 if !ok {
421 d.stashDecoder()
422 }
423 return ok
424 }
425
426 //ASYNC:
427 d.stashDecoder()
428 if blocking {
429 d.current.decodeOutput, ok = <-d.current.output
430 } else {
431 select {
432 case d.current.decodeOutput, ok = <-d.current.output:
433 default:
434 return false
435 }
436 }
437 if !ok {
438 // This should not happen, so signal error state...
439 d.current.err = io.ErrUnexpectedEOF
440 return false
441 }
442 next := d.current.decodeOutput
443 if next.d != nil && next.d.async.newHist != nil {
444 d.current.crc.Reset()
445 }
446 if debugDecoder {
447 var tmp [4]byte
448 binary.LittleEndian.PutUint32(tmp[:], uint32(xxhash.Sum64(next.b)))
449 println("got", len(d.current.b), "bytes, error:", d.current.err, "data crc:", tmp)
450 }
451
452 if d.o.ignoreChecksum {
453 return true
454 }
455
456 if len(next.b) > 0 {
457 d.current.crc.Write(next.b)
458 }
459 if next.err == nil && next.d != nil && next.d.hasCRC {
460 got := uint32(d.current.crc.Sum64())
461 if got != next.d.checkCRC {
462 if debugDecoder {
463 printf("CRC Check Failed: %08x (got) != %08x (on stream)\n", got, next.d.checkCRC)
464 }
465 d.current.err = ErrCRCMismatch
466 } else {
467 if debugDecoder {
468 printf("CRC ok %08x\n", got)
469 }
470 }
471 }
472
473 return true
474 }
475
476 func (d *Decoder) nextBlockSync() (ok bool) {
477 if d.current.d == nil {
478 d.current.d = <-d.decoders
479 }
480 for len(d.current.b) == 0 {
481 if !d.syncStream.inFrame {
482 d.frame.history.reset()
483 d.current.err = d.frame.reset(&d.syncStream.br)
484 if d.current.err == nil {
485 d.current.err = d.setDict(d.frame)
486 }
487 if d.current.err != nil {
488 return false
489 }
490 if d.frame.WindowSize > d.o.maxDecodedSize || d.frame.WindowSize > d.o.maxWindowSize {
491 d.current.err = ErrDecoderSizeExceeded
492 return false
493 }
494
495 d.syncStream.decodedFrame = 0
496 d.syncStream.inFrame = true
497 }
498 d.current.err = d.frame.next(d.current.d)
499 if d.current.err != nil {
500 return false
501 }
502 d.frame.history.ensureBlock()
503 if debugDecoder {
504 println("History trimmed:", len(d.frame.history.b), "decoded already:", d.syncStream.decodedFrame)
505 }
506 histBefore := len(d.frame.history.b)
507 d.current.err = d.current.d.decodeBuf(&d.frame.history)
508
509 if d.current.err != nil {
510 println("error after:", d.current.err)
511 return false
512 }
513 d.current.b = d.frame.history.b[histBefore:]
514 if debugDecoder {
515 println("history after:", len(d.frame.history.b))
516 }
517
518 // Check frame size (before CRC)
519 d.syncStream.decodedFrame += uint64(len(d.current.b))
520 if d.syncStream.decodedFrame > d.frame.FrameContentSize {
521 if debugDecoder {
522 printf("DecodedFrame (%d) > FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
523 }
524 d.current.err = ErrFrameSizeExceeded
525 return false
526 }
527
528 // Check FCS
529 if d.current.d.Last && d.frame.FrameContentSize != fcsUnknown && d.syncStream.decodedFrame != d.frame.FrameContentSize {
530 if debugDecoder {
531 printf("DecodedFrame (%d) != FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
532 }
533 d.current.err = ErrFrameSizeMismatch
534 return false
535 }
536
537 // Update/Check CRC
538 if d.frame.HasCheckSum {
539 if !d.o.ignoreChecksum {
540 d.frame.crc.Write(d.current.b)
541 }
542 if d.current.d.Last {
543 if !d.o.ignoreChecksum {
544 d.current.err = d.frame.checkCRC()
545 } else {
546 d.current.err = d.frame.consumeCRC()
547 }
548 if d.current.err != nil {
549 println("CRC error:", d.current.err)
550 return false
551 }
552 }
553 }
554 d.syncStream.inFrame = !d.current.d.Last
555 }
556 return true
557 }
558
559 func (d *Decoder) stashDecoder() {
560 if d.current.d != nil {
561 if debugDecoder {
562 printf("re-adding current decoder %p", d.current.d)
563 }
564 d.decoders <- d.current.d
565 d.current.d = nil
566 }
567 }
568
569 // Close will release all resources.
570 // It is NOT possible to reuse the decoder after this.
571 func (d *Decoder) Close() {
572 if d.current.err == ErrDecoderClosed {
573 return
574 }
575 d.drainOutput()
576 if d.current.cancel != nil {
577 d.current.cancel()
578 d.streamWg.Wait()
579 d.current.cancel = nil
580 }
581 if d.decoders != nil {
582 close(d.decoders)
583 for dec := range d.decoders {
584 dec.Close()
585 }
586 d.decoders = nil
587 }
588 if d.current.d != nil {
589 d.current.d.Close()
590 d.current.d = nil
591 }
592 d.current.err = ErrDecoderClosed
593 }
594
595 // IOReadCloser returns the decoder as an io.ReadCloser for convenience.
596 // Any changes to the decoder will be reflected, so the returned ReadCloser
597 // can be reused along with the decoder.
598 // io.WriterTo is also supported by the returned ReadCloser.
599 func (d *Decoder) IOReadCloser() io.ReadCloser {
600 return closeWrapper{d: d}
601 }
602
603 // closeWrapper wraps a function call as a closer.
604 type closeWrapper struct {
605 d *Decoder
606 }
607
608 // WriteTo forwards WriteTo calls to the decoder.
609 func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) {
610 return c.d.WriteTo(w)
611 }
612
613 // Read forwards read calls to the decoder.
614 func (c closeWrapper) Read(p []byte) (n int, err error) {
615 return c.d.Read(p)
616 }
617
618 // Close closes the decoder.
619 func (c closeWrapper) Close() error {
620 c.d.Close()
621 return nil
622 }
623
624 type decodeOutput struct {
625 d *blockDec
626 b []byte
627 err error
628 }
629
630 func (d *Decoder) startSyncDecoder(r io.Reader) error {
631 d.frame.history.reset()
632 d.syncStream.br = readerWrapper{r: r}
633 d.syncStream.inFrame = false
634 d.syncStream.enabled = true
635 d.syncStream.decodedFrame = 0
636 return nil
637 }
638
639 // Create Decoder:
640 // ASYNC:
641 // Spawn 3 go routines.
642 // 0: Read frames and decode block literals.
643 // 1: Decode sequences.
644 // 2: Execute sequences, send to output.
645 func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) {
646 defer d.streamWg.Done()
647 br := readerWrapper{r: r}
648
649 var seqDecode = make(chan *blockDec, d.o.concurrent)
650 var seqExecute = make(chan *blockDec, d.o.concurrent)
651
652 // Async 1: Decode sequences...
653 go func() {
654 var hist history
655 var hasErr bool
656
657 for block := range seqDecode {
658 if hasErr {
659 if block != nil {
660 seqExecute <- block
661 }
662 continue
663 }
664 if block.async.newHist != nil {
665 if debugDecoder {
666 println("Async 1: new history, recent:", block.async.newHist.recentOffsets)
667 }
668 hist.reset()
669 hist.decoders = block.async.newHist.decoders
670 hist.recentOffsets = block.async.newHist.recentOffsets
671 hist.windowSize = block.async.newHist.windowSize
672 if block.async.newHist.dict != nil {
673 hist.setDict(block.async.newHist.dict)
674 }
675 }
676 if block.err != nil || block.Type != blockTypeCompressed {
677 hasErr = block.err != nil
678 seqExecute <- block
679 continue
680 }
681
682 hist.decoders.literals = block.async.literals
683 block.err = block.prepareSequences(block.async.seqData, &hist)
684 if debugDecoder && block.err != nil {
685 println("prepareSequences returned:", block.err)
686 }
687 hasErr = block.err != nil
688 if block.err == nil {
689 block.err = block.decodeSequences(&hist)
690 if debugDecoder && block.err != nil {
691 println("decodeSequences returned:", block.err)
692 }
693 hasErr = block.err != nil
694 // block.async.sequence = hist.decoders.seq[:hist.decoders.nSeqs]
695 block.async.seqSize = hist.decoders.seqSize
696 }
697 seqExecute <- block
698 }
699 close(seqExecute)
700 hist.reset()
701 }()
702
703 var wg sync.WaitGroup
704 wg.Add(1)
705
706 // Async 3: Execute sequences...
707 frameHistCache := d.frame.history.b
708 go func() {
709 var hist history
710 var decodedFrame uint64
711 var fcs uint64
712 var hasErr bool
713 for block := range seqExecute {
714 out := decodeOutput{err: block.err, d: block}
715 if block.err != nil || hasErr {
716 hasErr = true
717 output <- out
718 continue
719 }
720 if block.async.newHist != nil {
721 if debugDecoder {
722 println("Async 2: new history")
723 }
724 hist.reset()
725 hist.windowSize = block.async.newHist.windowSize
726 hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer
727 if block.async.newHist.dict != nil {
728 hist.setDict(block.async.newHist.dict)
729 }
730
731 if cap(hist.b) < hist.allocFrameBuffer {
732 if cap(frameHistCache) >= hist.allocFrameBuffer {
733 hist.b = frameHistCache
734 } else {
735 hist.b = make([]byte, 0, hist.allocFrameBuffer)
736 println("Alloc history sized", hist.allocFrameBuffer)
737 }
738 }
739 hist.b = hist.b[:0]
740 fcs = block.async.fcs
741 decodedFrame = 0
742 }
743 do := decodeOutput{err: block.err, d: block}
744 switch block.Type {
745 case blockTypeRLE:
746 if debugDecoder {
747 println("add rle block length:", block.RLESize)
748 }
749
750 if cap(block.dst) < int(block.RLESize) {
751 if block.lowMem {
752 block.dst = make([]byte, block.RLESize)
753 } else {
754 block.dst = make([]byte, maxCompressedBlockSize)
755 }
756 }
757 block.dst = block.dst[:block.RLESize]
758 v := block.data[0]
759 for i := range block.dst {
760 block.dst[i] = v
761 }
762 hist.append(block.dst)
763 do.b = block.dst
764 case blockTypeRaw:
765 if debugDecoder {
766 println("add raw block length:", len(block.data))
767 }
768 hist.append(block.data)
769 do.b = block.data
770 case blockTypeCompressed:
771 if debugDecoder {
772 println("execute with history length:", len(hist.b), "window:", hist.windowSize)
773 }
774 hist.decoders.seqSize = block.async.seqSize
775 hist.decoders.literals = block.async.literals
776 do.err = block.executeSequences(&hist)
777 hasErr = do.err != nil
778 if debugDecoder && hasErr {
779 println("executeSequences returned:", do.err)
780 }
781 do.b = block.dst
782 }
783 if !hasErr {
784 decodedFrame += uint64(len(do.b))
785 if decodedFrame > fcs {
786 println("fcs exceeded", block.Last, fcs, decodedFrame)
787 do.err = ErrFrameSizeExceeded
788 hasErr = true
789 } else if block.Last && fcs != fcsUnknown && decodedFrame != fcs {
790 do.err = ErrFrameSizeMismatch
791 hasErr = true
792 } else {
793 if debugDecoder {
794 println("fcs ok", block.Last, fcs, decodedFrame)
795 }
796 }
797 }
798 output <- do
799 }
800 close(output)
801 frameHistCache = hist.b
802 wg.Done()
803 if debugDecoder {
804 println("decoder goroutines finished")
805 }
806 hist.reset()
807 }()
808
809 var hist history
810 decodeStream:
811 for {
812 var hasErr bool
813 hist.reset()
814 decodeBlock := func(block *blockDec) {
815 if hasErr {
816 if block != nil {
817 seqDecode <- block
818 }
819 return
820 }
821 if block.err != nil || block.Type != blockTypeCompressed {
822 hasErr = block.err != nil
823 seqDecode <- block
824 return
825 }
826
827 remain, err := block.decodeLiterals(block.data, &hist)
828 block.err = err
829 hasErr = block.err != nil
830 if err == nil {
831 block.async.literals = hist.decoders.literals
832 block.async.seqData = remain
833 } else if debugDecoder {
834 println("decodeLiterals error:", err)
835 }
836 seqDecode <- block
837 }
838 frame := d.frame
839 if debugDecoder {
840 println("New frame...")
841 }
842 var historySent bool
843 frame.history.reset()
844 err := frame.reset(&br)
845 if debugDecoder && err != nil {
846 println("Frame decoder returned", err)
847 }
848 if err == nil {
849 err = d.setDict(frame)
850 }
851 if err == nil && d.frame.WindowSize > d.o.maxWindowSize {
852 if debugDecoder {
853 println("decoder size exceeded, fws:", d.frame.WindowSize, "> mws:", d.o.maxWindowSize)
854 }
855
856 err = ErrDecoderSizeExceeded
857 }
858 if err != nil {
859 select {
860 case <-ctx.Done():
861 case dec := <-d.decoders:
862 dec.sendErr(err)
863 decodeBlock(dec)
864 }
865 break decodeStream
866 }
867
868 // Go through all blocks of the frame.
869 for {
870 var dec *blockDec
871 select {
872 case <-ctx.Done():
873 break decodeStream
874 case dec = <-d.decoders:
875 // Once we have a decoder, we MUST return it.
876 }
877 err := frame.next(dec)
878 if !historySent {
879 h := frame.history
880 if debugDecoder {
881 println("Alloc History:", h.allocFrameBuffer)
882 }
883 hist.reset()
884 if h.dict != nil {
885 hist.setDict(h.dict)
886 }
887 dec.async.newHist = &h
888 dec.async.fcs = frame.FrameContentSize
889 historySent = true
890 } else {
891 dec.async.newHist = nil
892 }
893 if debugDecoder && err != nil {
894 println("next block returned error:", err)
895 }
896 dec.err = err
897 dec.hasCRC = false
898 if dec.Last && frame.HasCheckSum && err == nil {
899 crc, err := frame.rawInput.readSmall(4)
900 if len(crc) < 4 {
901 if err == nil {
902 err = io.ErrUnexpectedEOF
903
904 }
905 println("CRC missing?", err)
906 dec.err = err
907 } else {
908 dec.checkCRC = binary.LittleEndian.Uint32(crc)
909 dec.hasCRC = true
910 if debugDecoder {
911 printf("found crc to check: %08x\n", dec.checkCRC)
912 }
913 }
914 }
915 err = dec.err
916 last := dec.Last
917 decodeBlock(dec)
918 if err != nil {
919 break decodeStream
920 }
921 if last {
922 break
923 }
924 }
925 }
926 close(seqDecode)
927 wg.Wait()
928 hist.reset()
929 d.frame.history.b = frameHistCache
930 }
931
932 func (d *Decoder) setDict(frame *frameDec) (err error) {
933 dict, ok := d.dicts[frame.DictionaryID]
934 if ok {
935 if debugDecoder {
936 println("setting dict", frame.DictionaryID)
937 }
938 frame.history.setDict(dict)
939 } else if frame.DictionaryID != 0 {
940 // A zero or missing dictionary id is ambiguous:
941 // either dictionary zero, or no dictionary. In particular,
942 // zstd --patch-from uses this id for the source file,
943 // so only return an error if the dictionary id is not zero.
944 err = ErrUnknownDictionary
945 }
946 return err
947 }
948