writer.go raw
1 // Copyright 2011 The Snappy-Go Authors. All rights reserved.
2 // Copyright (c) 2019+ Klaus Post. All rights reserved.
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file.
5
6 package s2
7
8 import (
9 "crypto/rand"
10 "encoding/binary"
11 "errors"
12 "fmt"
13 "io"
14 "runtime"
15 "sync"
16
17 "github.com/klauspost/compress/internal/race"
18 )
19
// Compression levels stored in Writer.level and consulted by
// Writer.encodeBlock. Values start at 1; encodeBlock treats any level it
// does not recognise (including levelUncompressed and 0) as "store the block
// uncompressed".
const (
	levelUncompressed = 1 // never compress blocks
	levelFast         = 2 // default level
	levelBetter       = 3 // WriterBetterCompression
	levelBest         = 4 // WriterBestCompression
)
26
// NewWriter returns a new Writer that compresses to w, using the
// framing format described at
// https://github.com/google/snappy/blob/master/framing_format.txt
//
// Options are applied in the order given. If one of them fails, the error is
// stored on the returned Writer and reported by subsequent calls such as
// Write, Flush and Close; Reset is then a no-op.
//
// Users must call Close to guarantee all data has been forwarded to
// the underlying io.Writer and that resources are released.
// They may also call Flush zero or more times before calling Close.
func NewWriter(w io.Writer, opts ...WriterOption) *Writer {
	wr := &Writer{
		blockSize:   defaultBlockSize,
		concurrency: runtime.GOMAXPROCS(0),
		randSrc:     rand.Reader,
		level:       levelFast,
	}
	for _, apply := range opts {
		if err := apply(wr); err != nil {
			wr.errState = err
			return wr
		}
	}

	// Every pooled buffer has room for the chunk header plus
	// MaxEncodedLen(blockSize) bytes of payload.
	wr.obufLen = obufHeaderLen + MaxEncodedLen(wr.blockSize)
	wr.paramsOK = true
	wr.ibuf = make([]byte, 0, wr.blockSize)
	wr.buffers.New = func() any {
		return make([]byte, wr.obufLen)
	}
	wr.Reset(w)
	return wr
}
56
57 // Writer is an io.Writer that can write Snappy-compressed bytes.
58 type Writer struct {
59 errMu sync.Mutex
60 errState error
61
62 // ibuf is a buffer for the incoming (uncompressed) bytes.
63 ibuf []byte
64
65 blockSize int
66 obufLen int
67 concurrency int
68 written int64
69 uncompWritten int64 // Bytes sent to compression
70 output chan chan result
71 buffers sync.Pool
72 pad int
73
74 writer io.Writer
75 randSrc io.Reader
76 writerWg sync.WaitGroup
77 index Index
78 customEnc func(dst, src []byte) int
79
80 // wroteStreamHeader is whether we have written the stream header.
81 wroteStreamHeader bool
82 paramsOK bool
83 snappy bool
84 flushOnWrite bool
85 appendIndex bool
86 bufferCB func([]byte)
87 level uint8
88 }
89
// result is one unit of output delivered to the writer goroutine on a
// per-block channel that was queued on Writer.output.
type result struct {
	// b holds the bytes to write to the underlying writer; may be empty.
	b []byte
	// ret, when non-nil, is handed to the WriterBufferDone callback
	// before b is written.
	ret []byte
	// startOffset is the uncompressed stream offset this output starts at;
	// it is passed to the index together with the compressed offset.
	startOffset int64
}
97
98 // err returns the previously set error.
99 // If no error has been set it is set to err if not nil.
100 func (w *Writer) err(err error) error {
101 w.errMu.Lock()
102 errSet := w.errState
103 if errSet == nil && err != nil {
104 w.errState = err
105 errSet = err
106 }
107 w.errMu.Unlock()
108 return errSet
109 }
110
// Reset discards the writer's state and switches the Snappy writer to write to w.
// This permits reusing a Writer rather than allocating a new one.
//
// A writer goroutine left over from the previous stream is stopped and
// waited for first. Reset is a no-op when NewWriter failed to apply its
// options. A new writer goroutine is started only when writer is non-nil
// and concurrency is not 1; it writes queued blocks to writer in queue order.
func (w *Writer) Reset(writer io.Writer) {
	if !w.paramsOK {
		return
	}
	// Close previous writer, if any.
	if w.output != nil {
		close(w.output)
		w.writerWg.Wait()
		w.output = nil
	}
	w.errState = nil
	w.ibuf = w.ibuf[:0]
	w.wroteStreamHeader = false
	w.written = 0
	w.writer = writer
	w.uncompWritten = 0
	w.index.reset(w.blockSize)

	// Without a destination, or when encoding synchronously, no writer
	// goroutine is needed.
	if writer == nil || w.concurrency == 1 {
		return
	}

	toWrite := make(chan chan result, w.concurrency)
	w.output = toWrite
	w.writerWg.Add(1)

	// Start a writer goroutine that will write all output in order.
	go func() {
		defer w.writerWg.Done()

		// Each queued channel delivers exactly one result. Receiving them in
		// queue order keeps the output in submission order even though the
		// blocks are compressed concurrently.
		for write := range toWrite {
			// Wait for the data to be available.
			input := <-write
			if input.ret != nil && w.bufferCB != nil {
				w.bufferCB(input.ret)
			}
			in := input.b
			if len(in) > 0 && w.err(nil) == nil {
				// Don't expose data from previous buffers.
				chunk := in[:len(in):len(in)]
				n, err := writer.Write(chunk)
				if err == nil && n != len(chunk) {
					// Report a short write with the same sentinel as the
					// synchronous paths (io.ErrShortBuffer is a read error).
					err = io.ErrShortWrite
				}
				_ = w.err(err)
				w.err(w.index.add(w.written, input.startOffset))
				w.written += int64(n)
			}
			if cap(in) >= w.obufLen {
				w.buffers.Put(in)
			}
			// Closing the request signals completion; Flush waits on it.
			close(write)
		}
	}()
}
180
181 // Write satisfies the io.Writer interface.
182 func (w *Writer) Write(p []byte) (nRet int, errRet error) {
183 if err := w.err(nil); err != nil {
184 return 0, err
185 }
186 if w.flushOnWrite {
187 return w.write(p)
188 }
189 // If we exceed the input buffer size, start writing
190 for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err(nil) == nil {
191 var n int
192 if len(w.ibuf) == 0 {
193 // Large write, empty buffer.
194 // Write directly from p to avoid copy.
195 n, _ = w.write(p)
196 } else {
197 n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
198 w.ibuf = w.ibuf[:len(w.ibuf)+n]
199 w.write(w.ibuf)
200 w.ibuf = w.ibuf[:0]
201 }
202 nRet += n
203 p = p[n:]
204 }
205 if err := w.err(nil); err != nil {
206 return nRet, err
207 }
208 // p should always be able to fit into w.ibuf now.
209 n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
210 w.ibuf = w.ibuf[:len(w.ibuf)+n]
211 nRet += n
212 return nRet, nil
213 }
214
// ReadFrom implements the io.ReaderFrom interface.
// Using this is typically more efficient since it avoids a memory copy.
// ReadFrom reads data from r until EOF or error.
// The return value n is the number of bytes read.
// Any error except io.EOF encountered during the read is also returned.
//
// Data buffered by earlier Write calls is flushed first. Each read fills a
// pooled buffer with up to blockSize bytes, which becomes one block.
// If r implements byter, its Bytes() are passed to EncodeBuffer instead.
// NOTE(review): that fast path never calls r.Read and only runs AsyncFlush,
// so with concurrency != 1 the slice from Bytes() may still be read after
// ReadFrom returns — confirm callers leave r untouched until Flush/Close.
func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if len(w.ibuf) > 0 {
		if err := w.AsyncFlush(); err != nil {
			return 0, err
		}
	}
	if br, ok := r.(byter); ok {
		buf := br.Bytes()
		if err := w.EncodeBuffer(buf); err != nil {
			return 0, err
		}
		return int64(len(buf)), w.AsyncFlush()
	}

	for {
		block := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
		got, readErr := io.ReadFull(r, block[obufHeaderLen:])
		// A short final read is an ordinary end of stream.
		if readErr == io.ErrUnexpectedEOF {
			readErr = io.EOF
		}
		if readErr != nil && readErr != io.EOF {
			return n, w.err(readErr)
		}
		if got == 0 {
			if cap(block) >= w.obufLen {
				w.buffers.Put(block)
			}
			break
		}
		n += int64(got)
		if w.err(w.writeFull(block[:got+obufHeaderLen])) != nil {
			break
		}
		if readErr != nil {
			// EOF reached and the final block has been handed off.
			break
		}
	}
	return n, w.err(nil)
}
268
// AddSkippableBlock will add a skippable block to the stream.
// The ID must be 0x80-0xfe (inclusive).
// Length of the skippable block must be <= 16777215 bytes.
//
// Data passed to Write that is still buffered is flushed first, so the
// skippable block appears after it in the stream, matching EncodeBuffer.
// An empty data slice is a no-op. data is copied (or written) before
// returning, so the caller may reuse it.
func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
	if err := w.err(nil); err != nil {
		return err
	}
	if len(data) == 0 {
		return nil
	}
	if id < 0x80 || id > chunkTypePadding {
		return fmt.Errorf("invalid skippable block id %x", id)
	}
	if len(data) > maxChunkSize {
		return fmt.Errorf("skippable block exceeds maximum size")
	}
	// Keep stream order: bytes already given to Write must precede this block.
	if len(w.ibuf) > 0 {
		if err := w.AsyncFlush(); err != nil {
			return err
		}
	}
	var header [4]byte
	chunkLen := len(data)
	header[0] = id
	header[1] = uint8(chunkLen >> 0)
	header[2] = uint8(chunkLen >> 8)
	header[3] = uint8(chunkLen >> 16)
	if w.concurrency == 1 {
		// Synchronous mode: write stream header, chunk header and data directly.
		write := func(b []byte) error {
			n, err := w.writer.Write(b)
			if err = w.err(err); err != nil {
				return err
			}
			if n != len(b) {
				return w.err(io.ErrShortWrite)
			}
			w.written += int64(n)
			return w.err(nil)
		}
		if !w.wroteStreamHeader {
			w.wroteStreamHeader = true
			if w.snappy {
				if err := write([]byte(magicChunkSnappy)); err != nil {
					return err
				}
			} else {
				if err := write([]byte(magicChunk)); err != nil {
					return err
				}
			}
		}
		if err := write(header[:]); err != nil {
			return err
		}
		return write(data)
	}

	// Queue the stream header ahead of this block if it is not out yet.
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		hWriter := make(chan result)
		w.output <- hWriter
		if w.snappy {
			hWriter <- result{startOffset: w.uncompWritten, b: magicChunkSnappyBytes}
		} else {
			hWriter <- result{startOffset: w.uncompWritten, b: magicChunkBytes}
		}
	}

	// Copy header and data into one buffer owned by the writer goroutine.
	inbuf := w.buffers.Get().([]byte)[:4]
	copy(inbuf, header[:])
	inbuf = append(inbuf, data...)

	output := make(chan result, 1)
	// Queue output.
	w.output <- output
	output <- result{startOffset: w.uncompWritten, b: inbuf}

	return nil
}
345
// EncodeBuffer will add a buffer to the stream.
// This is the fastest way to encode a stream,
// but the input buffer cannot be written to by the caller
// until Flush or Close has been called when concurrency != 1.
//
// Use the WriterBufferDone to receive a callback when the buffer is done
// processing. The callback receives buf synchronously, before EncodeBuffer
// returns, when concurrency is 1 or WriterFlushOnWrite is set. Otherwise it
// is called from the output goroutine once the last block of buf has been
// written; an empty buf produces no callback in that case.
//
// Note that input is not buffered.
// This means that each write will result in discrete blocks being created.
// For buffered writes, use the regular Write function.
func (w *Writer) EncodeBuffer(buf []byte) (err error) {
	if err := w.err(nil); err != nil {
		return err
	}

	if w.flushOnWrite {
		// write either encodes synchronously or copies buf into pooled
		// buffers before returning, so buf is free for reuse now. No result
		// carries a ret buffer in this mode, so the output goroutine never
		// invokes the callback concurrently with this call.
		_, err := w.write(buf)
		if w.bufferCB != nil {
			w.bufferCB(buf)
		}
		return err
	}
	// Flush queued data first.
	if len(w.ibuf) > 0 {
		err := w.AsyncFlush()
		if err != nil {
			return err
		}
	}
	if w.concurrency == 1 {
		_, err := w.writeSync(buf)
		if w.bufferCB != nil {
			w.bufferCB(buf)
		}
		return err
	}

	// Spawn goroutine and write block to output channel.
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		hWriter := make(chan result)
		w.output <- hWriter
		if w.snappy {
			hWriter <- result{startOffset: w.uncompWritten, b: magicChunkSnappyBytes}
		} else {
			hWriter <- result{startOffset: w.uncompWritten, b: magicChunkBytes}
		}
	}
	orgBuf := buf
	for len(buf) > 0 {
		// Cut input.
		uncompressed := buf
		if len(uncompressed) > w.blockSize {
			uncompressed = uncompressed[:w.blockSize]
		}
		buf = buf[len(uncompressed):]
		// Get an output buffer.
		obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
		race.WriteSlice(obuf)

		output := make(chan result)
		// Queue output now, so we keep order.
		w.output <- output
		res := result{
			startOffset: w.uncompWritten,
		}
		w.uncompWritten += int64(len(uncompressed))
		if len(buf) == 0 && w.bufferCB != nil {
			// Attach the caller's buffer to the last block so the callback
			// fires only after every block of buf has been written.
			res.ret = orgBuf
		}
		go func() {
			race.ReadSlice(uncompressed)

			checksum := crc(uncompressed)

			// Set to uncompressed.
			chunkType := uint8(chunkTypeUncompressedData)
			chunkLen := 4 + len(uncompressed)

			// Attempt compressing.
			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
			n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

			// Check if we should use this, or store as uncompressed instead.
			if n2 > 0 {
				chunkType = uint8(chunkTypeCompressedData)
				chunkLen = 4 + n + n2
				obuf = obuf[:obufHeaderLen+n+n2]
			} else {
				// copy uncompressed
				copy(obuf[obufHeaderLen:], uncompressed)
			}

			// Fill in the per-chunk header that comes before the body.
			obuf[0] = chunkType
			obuf[1] = uint8(chunkLen >> 0)
			obuf[2] = uint8(chunkLen >> 8)
			obuf[3] = uint8(chunkLen >> 16)
			obuf[4] = uint8(checksum >> 0)
			obuf[5] = uint8(checksum >> 8)
			obuf[6] = uint8(checksum >> 16)
			obuf[7] = uint8(checksum >> 24)

			// Queue final output.
			res.b = obuf
			output <- res
		}()
	}
	return nil
}
454
455 func (w *Writer) encodeBlock(obuf, uncompressed []byte) int {
456 if w.customEnc != nil {
457 if ret := w.customEnc(obuf, uncompressed); ret >= 0 {
458 return ret
459 }
460 }
461 if w.snappy {
462 switch w.level {
463 case levelFast:
464 return encodeBlockSnappy(obuf, uncompressed)
465 case levelBetter:
466 return encodeBlockBetterSnappy(obuf, uncompressed)
467 case levelBest:
468 return encodeBlockBestSnappy(obuf, uncompressed)
469 }
470 return 0
471 }
472 switch w.level {
473 case levelFast:
474 return encodeBlock(obuf, uncompressed)
475 case levelBetter:
476 return encodeBlockBetter(obuf, uncompressed)
477 case levelBest:
478 return encodeBlockBest(obuf, uncompressed, nil)
479 }
480 return 0
481 }
482
// write encodes p as blocks of at most blockSize bytes.
// With concurrency 1 it delegates to writeSync. Otherwise each block is
// copied into a pooled buffer, a result channel is queued on w.output to fix
// its position in the stream, and a goroutine compresses the block and
// delivers it on that channel. p is no longer referenced once write returns.
// It returns len(p), or 0 and the stored error if the Writer already failed.
func (w *Writer) write(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if w.concurrency == 1 {
		return w.writeSync(p)
	}

	for len(p) > 0 {
		if !w.wroteStreamHeader {
			w.wroteStreamHeader = true
			var magic []byte
			if w.snappy {
				magic = magicChunkSnappyBytes
			} else {
				magic = magicChunkBytes
			}
			hdr := make(chan result)
			w.output <- hdr
			hdr <- result{startOffset: w.uncompWritten, b: magic}
		}

		// Split off the next block.
		take := len(p)
		if take > w.blockSize {
			take = w.blockSize
		}
		block := p[:take]
		p = p[take:]

		// Stage the input after the header gap of a pooled buffer; if the
		// block turns out incompressible, that buffer becomes the output.
		src := w.buffers.Get().([]byte)[:take+obufHeaderLen]
		dst := w.buffers.Get().([]byte)[:w.obufLen]
		copy(src[obufHeaderLen:], block)
		block = src[obufHeaderLen:]

		// Reserve the block's slot in the output order before compressing.
		slot := make(chan result)
		w.output <- slot
		res := result{startOffset: w.uncompWritten}
		w.uncompWritten += int64(len(block))

		go func() {
			checksum := crc(block)
			chunkType := uint8(chunkTypeUncompressedData)
			chunkLen := 4 + len(block)

			n := binary.PutUvarint(dst[obufHeaderLen:], uint64(len(block)))
			if n2 := w.encodeBlock(dst[obufHeaderLen+n:], block); n2 > 0 {
				chunkType = uint8(chunkTypeCompressedData)
				chunkLen = 4 + n + n2
				dst = dst[:obufHeaderLen+n+n2]
			} else {
				// Incompressible: emit the staged input, recycle dst instead.
				dst, src = src, dst
			}

			// Chunk header: type, 3-byte length, 4-byte checksum.
			dst[0], dst[1], dst[2], dst[3] = chunkType, uint8(chunkLen), uint8(chunkLen>>8), uint8(chunkLen>>16)
			dst[4], dst[5], dst[6], dst[7] = uint8(checksum), uint8(checksum>>8), uint8(checksum>>16), uint8(checksum>>24)

			res.b = dst
			slot <- res

			// Whichever buffer was not sent goes back to the pool.
			w.buffers.Put(src)
		}()
		nRet += len(block)
	}
	return nRet, nil
}
568
569 // writeFull is a special version of write that will always write the full buffer.
570 // Data to be compressed should start at offset obufHeaderLen and fill the remainder of the buffer.
571 // The data will be written as a single block.
572 // The caller is not allowed to use inbuf after this function has been called.
573 func (w *Writer) writeFull(inbuf []byte) (errRet error) {
574 if err := w.err(nil); err != nil {
575 return err
576 }
577
578 if w.concurrency == 1 {
579 _, err := w.writeSync(inbuf[obufHeaderLen:])
580 if cap(inbuf) >= w.obufLen {
581 w.buffers.Put(inbuf)
582 }
583 return err
584 }
585
586 // Spawn goroutine and write block to output channel.
587 if !w.wroteStreamHeader {
588 w.wroteStreamHeader = true
589 hWriter := make(chan result)
590 w.output <- hWriter
591 if w.snappy {
592 hWriter <- result{startOffset: w.uncompWritten, b: magicChunkSnappyBytes}
593 } else {
594 hWriter <- result{startOffset: w.uncompWritten, b: magicChunkBytes}
595 }
596 }
597
598 // Get an output buffer.
599 obuf := w.buffers.Get().([]byte)[:w.obufLen]
600 uncompressed := inbuf[obufHeaderLen:]
601
602 output := make(chan result)
603 // Queue output now, so we keep order.
604 w.output <- output
605 res := result{
606 startOffset: w.uncompWritten,
607 }
608 w.uncompWritten += int64(len(uncompressed))
609
610 go func() {
611 checksum := crc(uncompressed)
612
613 // Set to uncompressed.
614 chunkType := uint8(chunkTypeUncompressedData)
615 chunkLen := 4 + len(uncompressed)
616
617 // Attempt compressing.
618 n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
619 n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
620
621 // Check if we should use this, or store as uncompressed instead.
622 if n2 > 0 {
623 chunkType = uint8(chunkTypeCompressedData)
624 chunkLen = 4 + n + n2
625 obuf = obuf[:obufHeaderLen+n+n2]
626 } else {
627 // Use input as output.
628 obuf, inbuf = inbuf, obuf
629 }
630
631 // Fill in the per-chunk header that comes before the body.
632 obuf[0] = chunkType
633 obuf[1] = uint8(chunkLen >> 0)
634 obuf[2] = uint8(chunkLen >> 8)
635 obuf[3] = uint8(chunkLen >> 16)
636 obuf[4] = uint8(checksum >> 0)
637 obuf[5] = uint8(checksum >> 8)
638 obuf[6] = uint8(checksum >> 16)
639 obuf[7] = uint8(checksum >> 24)
640
641 // Queue final output.
642 res.b = obuf
643 output <- res
644
645 // Put unused buffer back in pool.
646 w.buffers.Put(inbuf)
647 }()
648 return nil
649 }
650
// writeSync encodes p in blocks of at most blockSize bytes and writes each
// one to w.writer on the calling goroutine, writing the stream header first
// if needed. An index entry is recorded after each chunk header (with its
// compressed body, if any) is written. On a write error the error is stored
// and 0 is returned, even if earlier blocks were already written.
func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		var magic []byte
		if w.snappy {
			magic = magicChunkSnappyBytes
		} else {
			magic = magicChunkBytes
		}
		n, err := w.writer.Write(magic)
		if err != nil {
			return 0, w.err(err)
		}
		if n != len(magicChunk) {
			return 0, w.err(io.ErrShortWrite)
		}
		w.written += int64(n)
	}

	for len(p) > 0 {
		take := len(p)
		if take > w.blockSize {
			take = w.blockSize
		}
		block := p[:take]
		p = p[take:]

		obuf := w.buffers.Get().([]byte)[:w.obufLen]
		checksum := crc(block)
		chunkType := uint8(chunkTypeUncompressedData)
		chunkLen := 4 + len(block)

		n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(block)))
		if n2 := w.encodeBlock(obuf[obufHeaderLen+n:], block); n2 > 0 {
			chunkType = uint8(chunkTypeCompressedData)
			chunkLen = 4 + n + n2
			obuf = obuf[:obufHeaderLen+n+n2]
		} else {
			// Only the 8-byte chunk header comes from obuf; the body is
			// written straight from block below.
			obuf = obuf[:8]
		}

		// Chunk header: type, 3-byte length, 4-byte checksum.
		obuf[0], obuf[1], obuf[2], obuf[3] = chunkType, uint8(chunkLen), uint8(chunkLen>>8), uint8(chunkLen>>16)
		obuf[4], obuf[5], obuf[6], obuf[7] = uint8(checksum), uint8(checksum>>8), uint8(checksum>>16), uint8(checksum>>24)

		wrote, err := w.writer.Write(obuf)
		if err != nil {
			return 0, w.err(err)
		}
		if wrote != len(obuf) {
			return 0, w.err(io.ErrShortWrite)
		}
		w.err(w.index.add(w.written, w.uncompWritten))
		w.written += int64(wrote)
		w.uncompWritten += int64(len(block))

		if chunkType == chunkTypeUncompressedData {
			bodyWrote, err := w.writer.Write(block)
			if err != nil {
				return 0, w.err(err)
			}
			if bodyWrote != len(block) {
				return 0, w.err(io.ErrShortWrite)
			}
			w.written += int64(bodyWrote)
		}
		w.buffers.Put(obuf)
		nRet += len(block)
	}
	return nRet, nil
}
738
739 // AsyncFlush writes any buffered bytes to a block and starts compressing it.
740 // It does not wait for the output has been written as Flush() does.
741 func (w *Writer) AsyncFlush() error {
742 if err := w.err(nil); err != nil {
743 return err
744 }
745
746 // Queue any data still in input buffer.
747 if len(w.ibuf) != 0 {
748 if !w.wroteStreamHeader {
749 _, err := w.writeSync(w.ibuf)
750 w.ibuf = w.ibuf[:0]
751 return w.err(err)
752 } else {
753 _, err := w.write(w.ibuf)
754 w.ibuf = w.ibuf[:0]
755 err = w.err(err)
756 if err != nil {
757 return err
758 }
759 }
760 }
761 return w.err(nil)
762 }
763
764 // Flush flushes the Writer to its underlying io.Writer.
765 // This does not apply padding.
766 func (w *Writer) Flush() error {
767 if err := w.AsyncFlush(); err != nil {
768 return err
769 }
770 if w.output == nil {
771 return w.err(nil)
772 }
773
774 // Send empty buffer
775 res := make(chan result)
776 w.output <- res
777 // Block until this has been picked up.
778 res <- result{b: nil, startOffset: w.uncompWritten}
779 // When it is closed, we have flushed.
780 <-res
781 return w.err(nil)
782 }
783
784 // Close calls Flush and then closes the Writer.
785 // Calling Close multiple times is ok,
786 // but calling CloseIndex after this will make it not return the index.
787 func (w *Writer) Close() error {
788 _, err := w.closeIndex(w.appendIndex)
789 return err
790 }
791
792 // CloseIndex calls Close and returns an index on first call.
793 // This is not required if you are only adding index to a stream.
794 func (w *Writer) CloseIndex() ([]byte, error) {
795 return w.closeIndex(true)
796 }
797
// closeIndex flushes pending data, stops the writer goroutine and marks the
// Writer closed by recording errClosed.
//
// When idx is true the stream index is built. It is written to the output
// only if WriterAddIndex was set, but is returned either way. With padding
// configured (pad > 1), a skippable padding frame is written so the total
// output size is a multiple of pad; in that case the index records an
// unknown compressed size (-1). Only the first call returns the index; later
// calls return nil and no error. An error recorded earlier is returned in
// place of errClosed.
func (w *Writer) closeIndex(idx bool) ([]byte, error) {
	flushErr := w.Flush()
	if w.output != nil {
		close(w.output)
		w.writerWg.Wait()
		w.output = nil
	}

	// writeAll writes b to the destination and records any failure,
	// treating a short write as io.ErrShortWrite.
	writeAll := func(b []byte) {
		n, err := w.writer.Write(b)
		if err == nil && n != len(b) {
			err = io.ErrShortWrite
		}
		_ = w.err(err)
	}

	var index []byte
	if w.err(flushErr) == nil && w.writer != nil {
		if idx {
			compSize := int64(-1)
			if w.pad <= 1 {
				compSize = w.written
			}
			index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize)
			if w.appendIndex {
				// Count the index as written so padding accounts for it.
				w.written += int64(len(index))
			}
		}

		if w.pad > 1 {
			scratch := w.ibuf[:0]
			if len(index) > 0 {
				// The index may live in ibuf; build the frame elsewhere.
				scratch = w.buffers.Get().([]byte)[:0]
				defer w.buffers.Put(scratch)
			}
			frame, err := skippableFrame(scratch, calcSkippableFrame(w.written, int64(w.pad)), w.randSrc)
			if err = w.err(err); err != nil {
				return nil, err
			}
			writeAll(frame)
		}

		if len(index) > 0 && w.appendIndex {
			writeAll(index)
		}
	}

	if err := w.err(errClosed); err != errClosed {
		return nil, err
	}
	return index, nil
}
853
854 // calcSkippableFrame will return a total size to be added for written
855 // to be divisible by multiple.
856 // The value will always be > skippableFrameHeader.
857 // The function will panic if written < 0 or wantMultiple <= 0.
858 func calcSkippableFrame(written, wantMultiple int64) int {
859 if wantMultiple <= 0 {
860 panic("wantMultiple <= 0")
861 }
862 if written < 0 {
863 panic("written < 0")
864 }
865 leftOver := written % wantMultiple
866 if leftOver == 0 {
867 return 0
868 }
869 toAdd := wantMultiple - leftOver
870 for toAdd < skippableFrameHeader {
871 toAdd += wantMultiple
872 }
873 return int(toAdd)
874 }
875
// skippableFrame appends to dst a padding chunk whose encoded size, header
// included, is exactly total bytes, filling its body from r.
// A total of 0 returns dst unchanged. total must otherwise be at least
// skippableFrameHeader and below maxBlockSize+skippableFrameHeader; if not,
// dst is returned unmodified together with an error. Any error from reading
// r is returned along with the extended dst.
func skippableFrame(dst []byte, total int, r io.Reader) ([]byte, error) {
	switch {
	case total == 0:
		return dst, nil
	case total < skippableFrameHeader:
		return dst, fmt.Errorf("s2: requested skippable frame (%d) < 4", total)
	case int64(total) >= maxBlockSize+skippableFrameHeader:
		return dst, fmt.Errorf("s2: requested skippable frame (%d) >= max 1<<24", total)
	}
	bodyLen := uint32(total - skippableFrameHeader)
	// Chunk type 0xfe "Section 4.4 Padding (chunk type 0xfe)", followed by
	// the body length as 3 little-endian bytes.
	dst = append(dst, chunkTypePadding, uint8(bodyLen), uint8(bodyLen>>8), uint8(bodyLen>>16))
	bodyStart := len(dst)
	dst = append(dst, make([]byte, bodyLen)...)
	_, err := io.ReadFull(r, dst[bodyStart:])
	return dst, err
}
899
// errClosed is recorded as the Writer's error when it is closed, so later
// calls report it until the Writer is Reset.
var (
	errClosed = errors.New("s2: Writer is closed")
)
901
902 // WriterOption is an option for creating a encoder.
903 type WriterOption func(*Writer) error
904
// WriterConcurrency sets the concurrency of the encoder. It sizes the queue
// of blocks that may be pending compression and output at once. A value of 1
// encodes synchronously on the caller's goroutine without a background
// writer. The value supplied must be at least 1.
// By default this will be set to GOMAXPROCS.
func WriterConcurrency(n int) WriterOption {
	setConcurrency := func(w *Writer) error {
		if n < 1 {
			return errors.New("concurrency must be at least 1")
		}
		w.concurrency = n
		return nil
	}
	return setConcurrency
}
918
919 // WriterAddIndex will append an index to the end of a stream
920 // when it is closed.
921 func WriterAddIndex() WriterOption {
922 return func(w *Writer) error {
923 w.appendIndex = true
924 return nil
925 }
926 }
927
928 // WriterBetterCompression will enable better compression.
929 // EncodeBetter compresses better than Encode but typically with a
930 // 10-40% speed decrease on both compression and decompression.
931 func WriterBetterCompression() WriterOption {
932 return func(w *Writer) error {
933 w.level = levelBetter
934 return nil
935 }
936 }
937
938 // WriterBestCompression will enable better compression.
939 // EncodeBest compresses better than Encode but typically with a
940 // big speed decrease on compression.
941 func WriterBestCompression() WriterOption {
942 return func(w *Writer) error {
943 w.level = levelBest
944 return nil
945 }
946 }
947
948 // WriterUncompressed will bypass compression.
949 // The stream will be written as uncompressed blocks only.
950 // If concurrency is > 1 CRC and output will still be done async.
951 func WriterUncompressed() WriterOption {
952 return func(w *Writer) error {
953 w.level = levelUncompressed
954 return nil
955 }
956 }
957
958 // WriterBufferDone will perform a callback when EncodeBuffer has finished
959 // writing a buffer to the output and the buffer can safely be reused.
960 // If the buffer was split into several blocks, it will be sent after the last block.
961 // Callbacks will not be done concurrently.
962 func WriterBufferDone(fn func(b []byte)) WriterOption {
963 return func(w *Writer) error {
964 w.bufferCB = fn
965 return nil
966 }
967 }
968
// WriterBlockSize allows to override the default block size.
// Blocks will be this size or smaller.
// Minimum size is 4KB and maximum size is 4MB.
//
// Bigger blocks may give bigger throughput on systems with many cores,
// and will increase compression slightly, but it will limit the possible
// concurrency for smaller payloads for both encoding and decoding.
// Default block size is 1MB.
//
// When writing Snappy compatible output using WriterSnappyCompat,
// the maximum block size is 64KB. This limit is checked only if
// WriterSnappyCompat appears before this option; applied afterwards,
// WriterSnappyCompat reduces a larger block size itself.
func WriterBlockSize(n int) WriterOption {
	return func(w *Writer) error {
		// Parenthesised so that only Snappy writers get the Snappy-specific
		// message; previously a too-small size on any writer matched here.
		if w.snappy && (n > maxSnappyBlockSize || n < minBlockSize) {
			return errors.New("s2: block size out of range. Must be <= 64K and >=4KB for snappy compatible output")
		}
		if n > maxBlockSize || n < minBlockSize {
			return errors.New("s2: block size out of range. Must be <= 4MB and >=4KB")
		}
		w.blockSize = n
		return nil
	}
}
992
// WriterPadding will add padding to all output so the size will be a multiple of n.
// This can be used to obfuscate the exact output size or make blocks of a certain size.
// The contents will be a skippable frame, so it will be invisible by the decoder.
// n must be > 0 and <= 4MB. A value of 1 disables padding.
// The padded area will be filled with data from crypto/rand.Reader
// (see WriterPaddingSrc to change this).
// The padding will be applied whenever Close is called on the writer.
func WriterPadding(n int) WriterOption {
	return func(w *Writer) error {
		if n <= 0 {
			return fmt.Errorf("s2: padding must be at least 1")
		}
		if n > maxBlockSize {
			return fmt.Errorf("s2: padding must be <= 4MB")
		}
		// Every size is a multiple of 1, so no padding is needed.
		// Previously this assignment was immediately overwritten.
		if n == 1 {
			w.pad = 0
			return nil
		}
		w.pad = n
		return nil
	}
}
1015
// WriterPaddingSrc sets the reader that supplies the bytes filling padding
// frames added by WriterPadding. By default crypto/rand.Reader is used.
func WriterPaddingSrc(reader io.Reader) WriterOption {
	useSource := func(w *Writer) error {
		w.randSrc = reader
		return nil
	}
	return useSource
}
1024
1025 // WriterSnappyCompat will write snappy compatible output.
1026 // The output can be decompressed using either snappy or s2.
1027 // If block size is more than 64KB it is set to that.
1028 func WriterSnappyCompat() WriterOption {
1029 return func(w *Writer) error {
1030 w.snappy = true
1031 if w.blockSize > 64<<10 {
1032 // We choose 8 bytes less than 64K, since that will make literal emits slightly more effective.
1033 // And allows us to skip some size checks.
1034 w.blockSize = (64 << 10) - 8
1035 }
1036 return nil
1037 }
1038 }
1039
1040 // WriterFlushOnWrite will compress blocks on each call to the Write function.
1041 //
1042 // This is quite inefficient as blocks size will depend on the write size.
1043 //
1044 // Use WriterConcurrency(1) to also make sure that output is flushed.
1045 // When Write calls return, otherwise they will be written when compression is done.
1046 func WriterFlushOnWrite() WriterOption {
1047 return func(w *Writer) error {
1048 w.flushOnWrite = true
1049 return nil
1050 }
1051 }
1052
1053 // WriterCustomEncoder allows to override the encoder for blocks on the stream.
1054 // The function must compress 'src' into 'dst' and return the bytes used in dst as an integer.
1055 // Block size (initial varint) should not be added by the encoder.
1056 // Returning value 0 indicates the block could not be compressed.
1057 // Returning a negative value indicates that compression should be attempted.
1058 // The function should expect to be called concurrently.
1059 func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption {
1060 return func(w *Writer) error {
1061 w.customEnc = fn
1062 return nil
1063 }
1064 }
1065