writer.go raw

   1  // Copyright 2011 The Snappy-Go Authors. All rights reserved.
   2  // Copyright (c) 2019+ Klaus Post. All rights reserved.
   3  // Use of this source code is governed by a BSD-style
   4  // license that can be found in the LICENSE file.
   5  
   6  package s2
   7  
   8  import (
   9  	"crypto/rand"
  10  	"encoding/binary"
  11  	"errors"
  12  	"fmt"
  13  	"io"
  14  	"runtime"
  15  	"sync"
  16  
  17  	"github.com/klauspost/compress/internal/race"
  18  )
  19  
  20  const (
  21  	levelUncompressed = iota + 1
  22  	levelFast
  23  	levelBetter
  24  	levelBest
  25  )
  26  
  27  // NewWriter returns a new Writer that compresses to w, using the
  28  // framing format described at
  29  // https://github.com/google/snappy/blob/master/framing_format.txt
  30  //
  31  // Users must call Close to guarantee all data has been forwarded to
  32  // the underlying io.Writer and that resources are released.
  33  // They may also call Flush zero or more times before calling Close.
  34  func NewWriter(w io.Writer, opts ...WriterOption) *Writer {
  35  	w2 := Writer{
  36  		blockSize:   defaultBlockSize,
  37  		concurrency: runtime.GOMAXPROCS(0),
  38  		randSrc:     rand.Reader,
  39  		level:       levelFast,
  40  	}
  41  	for _, opt := range opts {
  42  		if err := opt(&w2); err != nil {
  43  			w2.errState = err
  44  			return &w2
  45  		}
  46  	}
  47  	w2.obufLen = obufHeaderLen + MaxEncodedLen(w2.blockSize)
  48  	w2.paramsOK = true
  49  	w2.ibuf = make([]byte, 0, w2.blockSize)
  50  	w2.buffers.New = func() any {
  51  		return make([]byte, w2.obufLen)
  52  	}
  53  	w2.Reset(w)
  54  	return &w2
  55  }
  56  
// Writer is an io.Writer that can write Snappy-compressed bytes.
// Create it with NewWriter; options are applied at construction time.
type Writer struct {
	// errMu guards errState; use the err method rather than touching
	// errState directly. errState is the first error recorded and is
	// reported by every later call that checks it.
	errMu    sync.Mutex
	errState error

	// ibuf is a buffer for the incoming (uncompressed) bytes.
	ibuf []byte

	// blockSize is the maximum number of uncompressed bytes per block.
	// obufLen is obufHeaderLen + MaxEncodedLen(blockSize), the length of
	// buffers created by the pool.
	// concurrency == 1 means everything is compressed and written on the
	// calling goroutine; otherwise it sizes the output queue.
	// written counts bytes written to the underlying writer.
	// output queues one channel per block; the writer goroutine drains it
	// in order. It is nil when no writer goroutine is running.
	// buffers pools byte slices of length obufLen.
	// pad is the padding multiple applied on Close; values <= 1 mean none.
	blockSize     int
	obufLen       int
	concurrency   int
	written       int64
	uncompWritten int64 // Bytes sent to compression
	output        chan chan result
	buffers       sync.Pool
	pad           int

	// writer is the destination stream; randSrc fills padding frames;
	// writerWg waits for the writer goroutine started by Reset;
	// index records block offsets for CloseIndex / WriterAddIndex;
	// customEnc, if set, is tried before the built-in encoders.
	writer    io.Writer
	randSrc   io.Reader
	writerWg  sync.WaitGroup
	index     Index
	customEnc func(dst, src []byte) int

	// wroteStreamHeader is whether we have written the stream header.
	// paramsOK is set by NewWriter once all options succeeded.
	// bufferCB is the WriterBufferDone callback; level is one of the
	// level* constants.
	wroteStreamHeader bool
	paramsOK          bool
	snappy            bool
	flushOnWrite      bool
	appendIndex       bool
	bufferCB          func([]byte)
	level             uint8
}
  89  
// result is one unit of output handed to the writer goroutine through a
// per-block channel that was queued on Writer.output.
type result struct {
	// b holds the bytes to write. An empty b writes nothing; Flush sends
	// such a result to wait until all earlier output has been written.
	b []byte
	// ret, if non-nil, is passed to the WriterBufferDone callback when this
	// result is picked up by the writer goroutine.
	ret []byte
	// Uncompressed start offset
	startOffset int64
}
  97  
  98  // err returns the previously set error.
  99  // If no error has been set it is set to err if not nil.
 100  func (w *Writer) err(err error) error {
 101  	w.errMu.Lock()
 102  	errSet := w.errState
 103  	if errSet == nil && err != nil {
 104  		w.errState = err
 105  		errSet = err
 106  	}
 107  	w.errMu.Unlock()
 108  	return errSet
 109  }
 110  
 111  // Reset discards the writer's state and switches the Snappy writer to write to w.
 112  // This permits reusing a Writer rather than allocating a new one.
 113  func (w *Writer) Reset(writer io.Writer) {
 114  	if !w.paramsOK {
 115  		return
 116  	}
 117  	// Close previous writer, if any.
 118  	if w.output != nil {
 119  		close(w.output)
 120  		w.writerWg.Wait()
 121  		w.output = nil
 122  	}
 123  	w.errState = nil
 124  	w.ibuf = w.ibuf[:0]
 125  	w.wroteStreamHeader = false
 126  	w.written = 0
 127  	w.writer = writer
 128  	w.uncompWritten = 0
 129  	w.index.reset(w.blockSize)
 130  
 131  	// If we didn't get a writer, stop here.
 132  	if writer == nil {
 133  		return
 134  	}
 135  	// If no concurrency requested, don't spin up writer goroutine.
 136  	if w.concurrency == 1 {
 137  		return
 138  	}
 139  
 140  	toWrite := make(chan chan result, w.concurrency)
 141  	w.output = toWrite
 142  	w.writerWg.Add(1)
 143  
 144  	// Start a writer goroutine that will write all output in order.
 145  	go func() {
 146  		defer w.writerWg.Done()
 147  
 148  		// Get a queued write.
 149  		for write := range toWrite {
 150  			// Wait for the data to be available.
 151  			input := <-write
 152  			if input.ret != nil && w.bufferCB != nil {
 153  				w.bufferCB(input.ret)
 154  				input.ret = nil
 155  			}
 156  			in := input.b
 157  			if len(in) > 0 {
 158  				if w.err(nil) == nil {
 159  					// Don't expose data from previous buffers.
 160  					toWrite := in[:len(in):len(in)]
 161  					// Write to output.
 162  					n, err := writer.Write(toWrite)
 163  					if err == nil && n != len(toWrite) {
 164  						err = io.ErrShortBuffer
 165  					}
 166  					_ = w.err(err)
 167  					w.err(w.index.add(w.written, input.startOffset))
 168  					w.written += int64(n)
 169  				}
 170  			}
 171  			if cap(in) >= w.obufLen {
 172  				w.buffers.Put(in)
 173  			}
 174  			// close the incoming write request.
 175  			// This can be used for synchronizing flushes.
 176  			close(write)
 177  		}
 178  	}()
 179  }
 180  
// Write satisfies the io.Writer interface.
//
// Unless WriterFlushOnWrite is set, input is collected in w.ibuf and
// submitted one full block at a time. Any remainder stays buffered until a
// later flush (AsyncFlush, Flush, Close, ...). nRet counts bytes accepted
// (buffered or submitted), not bytes written to the underlying writer.
func (w *Writer) Write(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if w.flushOnWrite {
		return w.write(p)
	}
	// If we exceed the input buffer size, start writing
	for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err(nil) == nil {
		var n int
		if len(w.ibuf) == 0 {
			// Large write, empty buffer.
			// Write directly from p to avoid copy.
			// Failures are recorded as the sticky error, which the loop
			// condition and the check below pick up.
			n, _ = w.write(p)
		} else {
			// Top up the partially filled buffer and submit it as one block.
			// write either copies its input (concurrent path) or consumes it
			// before returning (writeSync), so ibuf can be reused right away.
			n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
			w.ibuf = w.ibuf[:len(w.ibuf)+n]
			w.write(w.ibuf)
			w.ibuf = w.ibuf[:0]
		}
		nRet += n
		p = p[n:]
	}
	if err := w.err(nil); err != nil {
		return nRet, err
	}
	// p should always be able to fit into w.ibuf now.
	n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
	w.ibuf = w.ibuf[:len(w.ibuf)+n]
	nRet += n
	return nRet, nil
}
 214  
// ReadFrom implements the io.ReaderFrom interface.
// Using this is typically more efficient since it avoids a memory copy.
// ReadFrom reads data from r until EOF or error.
// The return value n is the number of bytes read.
// Any error except io.EOF encountered during the read is also returned.
func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	// Submit anything buffered by Write first so output order is preserved.
	if len(w.ibuf) > 0 {
		err := w.AsyncFlush()
		if err != nil {
			return 0, err
		}
	}
	// Fast path: readers exposing their content via Bytes() (the byter
	// interface is declared elsewhere in the package) are encoded in place.
	// NOTE(review): with concurrency != 1 EncodeBuffer reads that slice
	// asynchronously and AsyncFlush does not wait, so the caller must not
	// modify r's underlying storage until Flush or Close.
	if br, ok := r.(byter); ok {
		buf := br.Bytes()
		if err := w.EncodeBuffer(buf); err != nil {
			return 0, err
		}
		return int64(len(buf)), w.AsyncFlush()
	}
	for {
		// Read each block straight into a pooled buffer, leaving room for the
		// chunk header, so writeFull can use it without another copy.
		inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
		n2, err := io.ReadFull(r, inbuf[obufHeaderLen:])
		if err != nil {
			if err == io.ErrUnexpectedEOF {
				err = io.EOF
			}
			if err != io.EOF {
				// NOTE(review): bytes from a partial read and inbuf itself are
				// dropped here (inbuf is not returned to the pool).
				return n, w.err(err)
			}
		}
		if n2 == 0 {
			if cap(inbuf) >= w.obufLen {
				w.buffers.Put(inbuf)
			}
			break
		}
		n += int64(n2)
		// writeFull takes ownership of inbuf.
		err2 := w.writeFull(inbuf[:n2+obufHeaderLen])
		if w.err(err2) != nil {
			break
		}

		if err != nil {
			// We got EOF and wrote everything
			break
		}
	}

	return n, w.err(nil)
}
 268  
// AddSkippableBlock will add a skippable block to the stream.
// The ID must be 0x80-0xfe (inclusive).
// Length of the skippable block must be <= 16777215 bytes.
//
// Empty data is a no-op. The stream header is written first if needed.
// The block is emitted after any block already submitted; data still
// buffered by Write (in w.ibuf) is not flushed first.
func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
	if err := w.err(nil); err != nil {
		return err
	}
	if len(data) == 0 {
		return nil
	}
	if id < 0x80 || id > chunkTypePadding {
		return fmt.Errorf("invalid skippable block id %x", id)
	}
	if len(data) > maxChunkSize {
		return fmt.Errorf("skippable block excessed maximum size")
	}
	// 4-byte chunk header: type id followed by 24-bit little-endian length.
	var header [4]byte
	chunkLen := len(data)
	header[0] = id
	header[1] = uint8(chunkLen >> 0)
	header[2] = uint8(chunkLen >> 8)
	header[3] = uint8(chunkLen >> 16)
	if w.concurrency == 1 {
		// Synchronous mode: write directly to the underlying writer.
		write := func(b []byte) error {
			n, err := w.writer.Write(b)
			if err = w.err(err); err != nil {
				return err
			}
			if n != len(b) {
				return w.err(io.ErrShortWrite)
			}
			w.written += int64(n)
			return w.err(nil)
		}
		if !w.wroteStreamHeader {
			w.wroteStreamHeader = true
			if w.snappy {
				if err := write([]byte(magicChunkSnappy)); err != nil {
					return err
				}
			} else {
				if err := write([]byte(magicChunk)); err != nil {
					return err
				}
			}
		}
		if err := write(header[:]); err != nil {
			return err
		}
		return write(data)
	}

	// Create output...
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		hWriter := make(chan result)
		w.output <- hWriter
		if w.snappy {
			hWriter <- result{startOffset: w.uncompWritten, b: magicChunkSnappyBytes}
		} else {
			hWriter <- result{startOffset: w.uncompWritten, b: magicChunkBytes}
		}
	}

	// Copy input.
	// header+data are copied into a pooled buffer, so the caller may reuse
	// data as soon as this returns. append reallocates if it does not fit.
	inbuf := w.buffers.Get().([]byte)[:4]
	copy(inbuf, header[:])
	inbuf = append(inbuf, data...)

	output := make(chan result, 1)
	// Queue output.
	w.output <- output
	// Skippable data is not counted in uncompWritten.
	output <- result{startOffset: w.uncompWritten, b: inbuf}

	return nil
}
 345  
// EncodeBuffer will add a buffer to the stream.
// This is the fastest way to encode a stream,
// but the input buffer cannot be written to by the caller
// until Flush or Close has been called when concurrency != 1.
//
// Use the WriterBufferDone to receive a callback when the buffer is done
// Processing.
//
// Note that input is not buffered.
// This means that each write will result in discrete blocks being created.
// For buffered writes, use the regular Write function.
func (w *Writer) EncodeBuffer(buf []byte) (err error) {
	if err := w.err(nil); err != nil {
		return err
	}

	// NOTE(review): on this path the WriterBufferDone callback is not invoked.
	if w.flushOnWrite {
		_, err := w.write(buf)
		return err
	}
	// Flush queued data first.
	if len(w.ibuf) > 0 {
		err := w.AsyncFlush()
		if err != nil {
			return err
		}
	}
	if w.concurrency == 1 {
		// writeSync is finished with buf when it returns, so the callback can
		// run immediately (also on error).
		_, err := w.writeSync(buf)
		if w.bufferCB != nil {
			w.bufferCB(buf)
		}
		return err
	}

	// Spawn goroutine and write block to output channel.
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		hWriter := make(chan result)
		w.output <- hWriter
		if w.snappy {
			hWriter <- result{startOffset: w.uncompWritten, b: magicChunkSnappyBytes}
		} else {
			hWriter <- result{startOffset: w.uncompWritten, b: magicChunkBytes}
		}
	}
	orgBuf := buf
	for len(buf) > 0 {
		// Cut input.
		uncompressed := buf
		if len(uncompressed) > w.blockSize {
			uncompressed = uncompressed[:w.blockSize]
		}
		buf = buf[len(uncompressed):]
		// Get an output buffer.
		// It is only as long as the input plus the chunk header; the
		// incompressible fallback below copies the raw block into it.
		obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
		race.WriteSlice(obuf)

		output := make(chan result)
		// Queue output now, so we keep order.
		w.output <- output
		res := result{
			startOffset: w.uncompWritten,
		}
		w.uncompWritten += int64(len(uncompressed))
		// Attach the caller's buffer only to the last block, so the callback
		// fires after every block of buf has been written.
		if len(buf) == 0 && w.bufferCB != nil {
			res.ret = orgBuf
		}
		// The goroutine reads the caller's memory directly (no copy).
		go func() {
			race.ReadSlice(uncompressed)

			checksum := crc(uncompressed)

			// Set to uncompressed.
			chunkType := uint8(chunkTypeUncompressedData)
			chunkLen := 4 + len(uncompressed)

			// Attempt compressing.
			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
			n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

			// Check if we should use this, or store as uncompressed instead.
			if n2 > 0 {
				chunkType = uint8(chunkTypeCompressedData)
				chunkLen = 4 + n + n2
				obuf = obuf[:obufHeaderLen+n+n2]
			} else {
				// copy uncompressed
				copy(obuf[obufHeaderLen:], uncompressed)
			}

			// Fill in the per-chunk header that comes before the body.
			obuf[0] = chunkType
			obuf[1] = uint8(chunkLen >> 0)
			obuf[2] = uint8(chunkLen >> 8)
			obuf[3] = uint8(chunkLen >> 16)
			obuf[4] = uint8(checksum >> 0)
			obuf[5] = uint8(checksum >> 8)
			obuf[6] = uint8(checksum >> 16)
			obuf[7] = uint8(checksum >> 24)

			// Queue final output.
			res.b = obuf
			output <- res
		}()
	}
	return nil
}
 454  
 455  func (w *Writer) encodeBlock(obuf, uncompressed []byte) int {
 456  	if w.customEnc != nil {
 457  		if ret := w.customEnc(obuf, uncompressed); ret >= 0 {
 458  			return ret
 459  		}
 460  	}
 461  	if w.snappy {
 462  		switch w.level {
 463  		case levelFast:
 464  			return encodeBlockSnappy(obuf, uncompressed)
 465  		case levelBetter:
 466  			return encodeBlockBetterSnappy(obuf, uncompressed)
 467  		case levelBest:
 468  			return encodeBlockBestSnappy(obuf, uncompressed)
 469  		}
 470  		return 0
 471  	}
 472  	switch w.level {
 473  	case levelFast:
 474  		return encodeBlock(obuf, uncompressed)
 475  	case levelBetter:
 476  		return encodeBlockBetter(obuf, uncompressed)
 477  	case levelBest:
 478  		return encodeBlockBest(obuf, uncompressed, nil)
 479  	}
 480  	return 0
 481  }
 482  
// write splits p into blocks of at most blockSize and submits them for
// compression. With concurrency == 1 it delegates to writeSync. Otherwise
// each block is copied into a pooled buffer (so the caller may reuse p
// immediately), its output slot is queued on w.output to preserve order,
// and a goroutine compresses it. Errors from compression or writing are
// recorded as the sticky error rather than returned here.
func (w *Writer) write(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if w.concurrency == 1 {
		return w.writeSync(p)
	}

	// Spawn goroutine and write block to output channel.
	for len(p) > 0 {
		if !w.wroteStreamHeader {
			w.wroteStreamHeader = true
			hWriter := make(chan result)
			w.output <- hWriter
			if w.snappy {
				hWriter <- result{startOffset: w.uncompWritten, b: magicChunkSnappyBytes}
			} else {
				hWriter <- result{startOffset: w.uncompWritten, b: magicChunkBytes}
			}
		}

		var uncompressed []byte
		if len(p) > w.blockSize {
			uncompressed, p = p[:w.blockSize], p[w.blockSize:]
		} else {
			uncompressed, p = p, nil
		}

		// Copy input.
		// If the block is incompressible, this is used for the result.
		// inbuf keeps obufHeaderLen bytes free in front for the chunk header.
		inbuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
		obuf := w.buffers.Get().([]byte)[:w.obufLen]
		copy(inbuf[obufHeaderLen:], uncompressed)
		uncompressed = inbuf[obufHeaderLen:]

		output := make(chan result)
		// Queue output now, so we keep order.
		w.output <- output
		res := result{
			startOffset: w.uncompWritten,
		}
		w.uncompWritten += int64(len(uncompressed))

		go func() {
			checksum := crc(uncompressed)

			// Set to uncompressed.
			chunkType := uint8(chunkTypeUncompressedData)
			chunkLen := 4 + len(uncompressed)

			// Attempt compressing.
			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
			n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

			// Check if we should use this, or store as uncompressed instead.
			if n2 > 0 {
				chunkType = uint8(chunkTypeCompressedData)
				chunkLen = 4 + n + n2
				obuf = obuf[:obufHeaderLen+n+n2]
			} else {
				// Use input as output.
				// After the swap, inbuf names the unused buffer returned below.
				obuf, inbuf = inbuf, obuf
			}

			// Fill in the per-chunk header that comes before the body.
			obuf[0] = chunkType
			obuf[1] = uint8(chunkLen >> 0)
			obuf[2] = uint8(chunkLen >> 8)
			obuf[3] = uint8(chunkLen >> 16)
			obuf[4] = uint8(checksum >> 0)
			obuf[5] = uint8(checksum >> 8)
			obuf[6] = uint8(checksum >> 16)
			obuf[7] = uint8(checksum >> 24)

			// Queue final output.
			res.b = obuf
			output <- res

			// Put unused buffer back in pool.
			w.buffers.Put(inbuf)
		}()
		nRet += len(uncompressed)
	}
	return nRet, nil
}
 568  
// writeFull is a special version of write that will always write the full buffer.
// Data to be compressed should start at offset obufHeaderLen and fill the remainder of the buffer.
// The data will be written as a single block.
// The caller is not allowed to use inbuf after this function has been called.
//
// inbuf is expected to come from w.buffers (ReadFrom does this); it is
// returned to the pool when no longer needed.
func (w *Writer) writeFull(inbuf []byte) (errRet error) {
	if err := w.err(nil); err != nil {
		return err
	}

	if w.concurrency == 1 {
		_, err := w.writeSync(inbuf[obufHeaderLen:])
		if cap(inbuf) >= w.obufLen {
			w.buffers.Put(inbuf)
		}
		return err
	}

	// Spawn goroutine and write block to output channel.
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		hWriter := make(chan result)
		w.output <- hWriter
		if w.snappy {
			hWriter <- result{startOffset: w.uncompWritten, b: magicChunkSnappyBytes}
		} else {
			hWriter <- result{startOffset: w.uncompWritten, b: magicChunkBytes}
		}
	}

	// Get an output buffer.
	obuf := w.buffers.Get().([]byte)[:w.obufLen]
	uncompressed := inbuf[obufHeaderLen:]

	output := make(chan result)
	// Queue output now, so we keep order.
	w.output <- output
	res := result{
		startOffset: w.uncompWritten,
	}
	w.uncompWritten += int64(len(uncompressed))

	go func() {
		checksum := crc(uncompressed)

		// Set to uncompressed.
		chunkType := uint8(chunkTypeUncompressedData)
		chunkLen := 4 + len(uncompressed)

		// Attempt compressing.
		n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
		n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

		// Check if we should use this, or store as uncompressed instead.
		if n2 > 0 {
			chunkType = uint8(chunkTypeCompressedData)
			chunkLen = 4 + n + n2
			obuf = obuf[:obufHeaderLen+n+n2]
		} else {
			// Use input as output.
			// inbuf already holds header space followed by the raw data.
			obuf, inbuf = inbuf, obuf
		}

		// Fill in the per-chunk header that comes before the body.
		obuf[0] = chunkType
		obuf[1] = uint8(chunkLen >> 0)
		obuf[2] = uint8(chunkLen >> 8)
		obuf[3] = uint8(chunkLen >> 16)
		obuf[4] = uint8(checksum >> 0)
		obuf[5] = uint8(checksum >> 8)
		obuf[6] = uint8(checksum >> 16)
		obuf[7] = uint8(checksum >> 24)

		// Queue final output.
		res.b = obuf
		output <- res

		// Put unused buffer back in pool.
		w.buffers.Put(inbuf)
	}()
	return nil
}
 650  
 651  func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
 652  	if err := w.err(nil); err != nil {
 653  		return 0, err
 654  	}
 655  	if !w.wroteStreamHeader {
 656  		w.wroteStreamHeader = true
 657  		var n int
 658  		var err error
 659  		if w.snappy {
 660  			n, err = w.writer.Write(magicChunkSnappyBytes)
 661  		} else {
 662  			n, err = w.writer.Write(magicChunkBytes)
 663  		}
 664  		if err != nil {
 665  			return 0, w.err(err)
 666  		}
 667  		if n != len(magicChunk) {
 668  			return 0, w.err(io.ErrShortWrite)
 669  		}
 670  		w.written += int64(n)
 671  	}
 672  
 673  	for len(p) > 0 {
 674  		var uncompressed []byte
 675  		if len(p) > w.blockSize {
 676  			uncompressed, p = p[:w.blockSize], p[w.blockSize:]
 677  		} else {
 678  			uncompressed, p = p, nil
 679  		}
 680  
 681  		obuf := w.buffers.Get().([]byte)[:w.obufLen]
 682  		checksum := crc(uncompressed)
 683  
 684  		// Set to uncompressed.
 685  		chunkType := uint8(chunkTypeUncompressedData)
 686  		chunkLen := 4 + len(uncompressed)
 687  
 688  		// Attempt compressing.
 689  		n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
 690  		n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
 691  
 692  		if n2 > 0 {
 693  			chunkType = uint8(chunkTypeCompressedData)
 694  			chunkLen = 4 + n + n2
 695  			obuf = obuf[:obufHeaderLen+n+n2]
 696  		} else {
 697  			obuf = obuf[:8]
 698  		}
 699  
 700  		// Fill in the per-chunk header that comes before the body.
 701  		obuf[0] = chunkType
 702  		obuf[1] = uint8(chunkLen >> 0)
 703  		obuf[2] = uint8(chunkLen >> 8)
 704  		obuf[3] = uint8(chunkLen >> 16)
 705  		obuf[4] = uint8(checksum >> 0)
 706  		obuf[5] = uint8(checksum >> 8)
 707  		obuf[6] = uint8(checksum >> 16)
 708  		obuf[7] = uint8(checksum >> 24)
 709  
 710  		n, err := w.writer.Write(obuf)
 711  		if err != nil {
 712  			return 0, w.err(err)
 713  		}
 714  		if n != len(obuf) {
 715  			return 0, w.err(io.ErrShortWrite)
 716  		}
 717  		w.err(w.index.add(w.written, w.uncompWritten))
 718  		w.written += int64(n)
 719  		w.uncompWritten += int64(len(uncompressed))
 720  
 721  		if chunkType == chunkTypeUncompressedData {
 722  			// Write uncompressed data.
 723  			n, err := w.writer.Write(uncompressed)
 724  			if err != nil {
 725  				return 0, w.err(err)
 726  			}
 727  			if n != len(uncompressed) {
 728  				return 0, w.err(io.ErrShortWrite)
 729  			}
 730  			w.written += int64(n)
 731  		}
 732  		w.buffers.Put(obuf)
 733  		// Queue final output.
 734  		nRet += len(uncompressed)
 735  	}
 736  	return nRet, nil
 737  }
 738  
 739  // AsyncFlush writes any buffered bytes to a block and starts compressing it.
 740  // It does not wait for the output has been written as Flush() does.
 741  func (w *Writer) AsyncFlush() error {
 742  	if err := w.err(nil); err != nil {
 743  		return err
 744  	}
 745  
 746  	// Queue any data still in input buffer.
 747  	if len(w.ibuf) != 0 {
 748  		if !w.wroteStreamHeader {
 749  			_, err := w.writeSync(w.ibuf)
 750  			w.ibuf = w.ibuf[:0]
 751  			return w.err(err)
 752  		} else {
 753  			_, err := w.write(w.ibuf)
 754  			w.ibuf = w.ibuf[:0]
 755  			err = w.err(err)
 756  			if err != nil {
 757  				return err
 758  			}
 759  		}
 760  	}
 761  	return w.err(nil)
 762  }
 763  
 764  // Flush flushes the Writer to its underlying io.Writer.
 765  // This does not apply padding.
 766  func (w *Writer) Flush() error {
 767  	if err := w.AsyncFlush(); err != nil {
 768  		return err
 769  	}
 770  	if w.output == nil {
 771  		return w.err(nil)
 772  	}
 773  
 774  	// Send empty buffer
 775  	res := make(chan result)
 776  	w.output <- res
 777  	// Block until this has been picked up.
 778  	res <- result{b: nil, startOffset: w.uncompWritten}
 779  	// When it is closed, we have flushed.
 780  	<-res
 781  	return w.err(nil)
 782  }
 783  
 784  // Close calls Flush and then closes the Writer.
 785  // Calling Close multiple times is ok,
 786  // but calling CloseIndex after this will make it not return the index.
 787  func (w *Writer) Close() error {
 788  	_, err := w.closeIndex(w.appendIndex)
 789  	return err
 790  }
 791  
 792  // CloseIndex calls Close and returns an index on first call.
 793  // This is not required if you are only adding index to a stream.
 794  func (w *Writer) CloseIndex() ([]byte, error) {
 795  	return w.closeIndex(true)
 796  }
 797  
// closeIndex flushes pending data, stops the writer goroutine and finishes
// the stream. When idx is true an index is built and returned on success,
// and written to the stream if WriterAddIndex is set. With pad > 1 a padding
// frame is written so the total output size becomes a multiple of pad.
// Finally errClosed is recorded; later calls that check the sticky error
// report it.
func (w *Writer) closeIndex(idx bool) ([]byte, error) {
	err := w.Flush()
	// Stop the writer goroutine; after Wait, w.written is final.
	if w.output != nil {
		close(w.output)
		w.writerWg.Wait()
		w.output = nil
	}

	var index []byte
	if w.err(err) == nil && w.writer != nil {
		// Create index.
		if idx {
			// With padding the final compressed size is not known yet, since
			// the padding frame is computed afterwards; -1 is passed then.
			compSize := int64(-1)
			if w.pad <= 1 {
				compSize = w.written
			}
			// w.ibuf is empty after a successful Flush, so its storage is reused.
			index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize)
			// Count as written for padding.
			if w.appendIndex {
				w.written += int64(len(index))
			}
		}

		if w.pad > 1 {
			tmp := w.ibuf[:0]
			if len(index) > 0 {
				// Allocate another buffer.
				// index already occupies w.ibuf's storage.
				tmp = w.buffers.Get().([]byte)[:0]
				defer w.buffers.Put(tmp)
			}
			add := calcSkippableFrame(w.written, int64(w.pad))
			frame, err := skippableFrame(tmp, add, w.randSrc)
			if err = w.err(err); err != nil {
				return nil, err
			}
			n, err2 := w.writer.Write(frame)
			if err2 == nil && n != len(frame) {
				err2 = io.ErrShortWrite
			}
			_ = w.err(err2)
		}
		// The index goes after the padding frame; its length was already
		// included in w.written when the padding size was computed.
		if len(index) > 0 && w.appendIndex {
			n, err2 := w.writer.Write(index)
			if err2 == nil && n != len(index) {
				err2 = io.ErrShortWrite
			}
			_ = w.err(err2)
		}
	}
	// Mark the Writer closed. If a different error was recorded earlier,
	// that error is returned instead and the index is dropped.
	err = w.err(errClosed)
	if err == errClosed {
		return index, nil
	}
	return nil, err
}
 853  
 854  // calcSkippableFrame will return a total size to be added for written
 855  // to be divisible by multiple.
 856  // The value will always be > skippableFrameHeader.
 857  // The function will panic if written < 0 or wantMultiple <= 0.
 858  func calcSkippableFrame(written, wantMultiple int64) int {
 859  	if wantMultiple <= 0 {
 860  		panic("wantMultiple <= 0")
 861  	}
 862  	if written < 0 {
 863  		panic("written < 0")
 864  	}
 865  	leftOver := written % wantMultiple
 866  	if leftOver == 0 {
 867  		return 0
 868  	}
 869  	toAdd := wantMultiple - leftOver
 870  	for toAdd < skippableFrameHeader {
 871  		toAdd += wantMultiple
 872  	}
 873  	return int(toAdd)
 874  }
 875  
 876  // skippableFrame will add a skippable frame with a total size of bytes.
 877  // total should be >= skippableFrameHeader and < maxBlockSize + skippableFrameHeader
 878  func skippableFrame(dst []byte, total int, r io.Reader) ([]byte, error) {
 879  	if total == 0 {
 880  		return dst, nil
 881  	}
 882  	if total < skippableFrameHeader {
 883  		return dst, fmt.Errorf("s2: requested skippable frame (%d) < 4", total)
 884  	}
 885  	if int64(total) >= maxBlockSize+skippableFrameHeader {
 886  		return dst, fmt.Errorf("s2: requested skippable frame (%d) >= max 1<<24", total)
 887  	}
 888  	// Chunk type 0xfe "Section 4.4 Padding (chunk type 0xfe)"
 889  	dst = append(dst, chunkTypePadding)
 890  	f := uint32(total - skippableFrameHeader)
 891  	// Add chunk length.
 892  	dst = append(dst, uint8(f), uint8(f>>8), uint8(f>>16))
 893  	// Add data
 894  	start := len(dst)
 895  	dst = append(dst, make([]byte, f)...)
 896  	_, err := io.ReadFull(r, dst[start:])
 897  	return dst, err
 898  }
 899  
// errClosed is recorded as the sticky error when the Writer is closed, so
// later calls that check the sticky error report it.
var errClosed = errors.New("s2: Writer is closed")

// WriterOption is an option for creating an encoder.
// Options are passed to NewWriter and applied in order; a non-nil error
// stops option processing and is recorded on the returned Writer.
type WriterOption func(*Writer) error
 904  
 905  // WriterConcurrency will set the concurrency,
 906  // meaning the maximum number of decoders to run concurrently.
 907  // The value supplied must be at least 1.
 908  // By default this will be set to GOMAXPROCS.
 909  func WriterConcurrency(n int) WriterOption {
 910  	return func(w *Writer) error {
 911  		if n <= 0 {
 912  			return errors.New("concurrency must be at least 1")
 913  		}
 914  		w.concurrency = n
 915  		return nil
 916  	}
 917  }
 918  
 919  // WriterAddIndex will append an index to the end of a stream
 920  // when it is closed.
 921  func WriterAddIndex() WriterOption {
 922  	return func(w *Writer) error {
 923  		w.appendIndex = true
 924  		return nil
 925  	}
 926  }
 927  
 928  // WriterBetterCompression will enable better compression.
 929  // EncodeBetter compresses better than Encode but typically with a
 930  // 10-40% speed decrease on both compression and decompression.
 931  func WriterBetterCompression() WriterOption {
 932  	return func(w *Writer) error {
 933  		w.level = levelBetter
 934  		return nil
 935  	}
 936  }
 937  
 938  // WriterBestCompression will enable better compression.
 939  // EncodeBest compresses better than Encode but typically with a
 940  // big speed decrease on compression.
 941  func WriterBestCompression() WriterOption {
 942  	return func(w *Writer) error {
 943  		w.level = levelBest
 944  		return nil
 945  	}
 946  }
 947  
 948  // WriterUncompressed will bypass compression.
 949  // The stream will be written as uncompressed blocks only.
 950  // If concurrency is > 1 CRC and output will still be done async.
 951  func WriterUncompressed() WriterOption {
 952  	return func(w *Writer) error {
 953  		w.level = levelUncompressed
 954  		return nil
 955  	}
 956  }
 957  
 958  // WriterBufferDone will perform a callback when EncodeBuffer has finished
 959  // writing a buffer to the output and the buffer can safely be reused.
 960  // If the buffer was split into several blocks, it will be sent after the last block.
 961  // Callbacks will not be done concurrently.
 962  func WriterBufferDone(fn func(b []byte)) WriterOption {
 963  	return func(w *Writer) error {
 964  		w.bufferCB = fn
 965  		return nil
 966  	}
 967  }
 968  
 969  // WriterBlockSize allows to override the default block size.
 970  // Blocks will be this size or smaller.
 971  // Minimum size is 4KB and maximum size is 4MB.
 972  //
 973  // Bigger blocks may give bigger throughput on systems with many cores,
 974  // and will increase compression slightly, but it will limit the possible
 975  // concurrency for smaller payloads for both encoding and decoding.
 976  // Default block size is 1MB.
 977  //
 978  // When writing Snappy compatible output using WriterSnappyCompat,
 979  // the maximum block size is 64KB.
 980  func WriterBlockSize(n int) WriterOption {
 981  	return func(w *Writer) error {
 982  		if w.snappy && n > maxSnappyBlockSize || n < minBlockSize {
 983  			return errors.New("s2: block size too large. Must be <= 64K and >=4KB on for snappy compatible output")
 984  		}
 985  		if n > maxBlockSize || n < minBlockSize {
 986  			return errors.New("s2: block size too large. Must be <= 4MB and >=4KB")
 987  		}
 988  		w.blockSize = n
 989  		return nil
 990  	}
 991  }
 992  
 993  // WriterPadding will add padding to all output so the size will be a multiple of n.
 994  // This can be used to obfuscate the exact output size or make blocks of a certain size.
 995  // The contents will be a skippable frame, so it will be invisible by the decoder.
 996  // n must be > 0 and <= 4MB.
 997  // The padded area will be filled with data from crypto/rand.Reader.
 998  // The padding will be applied whenever Close is called on the writer.
 999  func WriterPadding(n int) WriterOption {
1000  	return func(w *Writer) error {
1001  		if n <= 0 {
1002  			return fmt.Errorf("s2: padding must be at least 1")
1003  		}
1004  		// No need to waste our time.
1005  		if n == 1 {
1006  			w.pad = 0
1007  		}
1008  		if n > maxBlockSize {
1009  			return fmt.Errorf("s2: padding must less than 4MB")
1010  		}
1011  		w.pad = n
1012  		return nil
1013  	}
1014  }
1015  
1016  // WriterPaddingSrc will get random data for padding from the supplied source.
1017  // By default crypto/rand is used.
1018  func WriterPaddingSrc(reader io.Reader) WriterOption {
1019  	return func(w *Writer) error {
1020  		w.randSrc = reader
1021  		return nil
1022  	}
1023  }
1024  
1025  // WriterSnappyCompat will write snappy compatible output.
1026  // The output can be decompressed using either snappy or s2.
1027  // If block size is more than 64KB it is set to that.
1028  func WriterSnappyCompat() WriterOption {
1029  	return func(w *Writer) error {
1030  		w.snappy = true
1031  		if w.blockSize > 64<<10 {
1032  			// We choose 8 bytes less than 64K, since that will make literal emits slightly more effective.
1033  			// And allows us to skip some size checks.
1034  			w.blockSize = (64 << 10) - 8
1035  		}
1036  		return nil
1037  	}
1038  }
1039  
1040  // WriterFlushOnWrite will compress blocks on each call to the Write function.
1041  //
1042  // This is quite inefficient as blocks size will depend on the write size.
1043  //
1044  // Use WriterConcurrency(1) to also make sure that output is flushed.
1045  // When Write calls return, otherwise they will be written when compression is done.
1046  func WriterFlushOnWrite() WriterOption {
1047  	return func(w *Writer) error {
1048  		w.flushOnWrite = true
1049  		return nil
1050  	}
1051  }
1052  
1053  // WriterCustomEncoder allows to override the encoder for blocks on the stream.
1054  // The function must compress 'src' into 'dst' and return the bytes used in dst as an integer.
1055  // Block size (initial varint) should not be added by the encoder.
1056  // Returning value 0 indicates the block could not be compressed.
1057  // Returning a negative value indicates that compression should be attempted.
1058  // The function should expect to be called concurrently.
1059  func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption {
1060  	return func(w *Writer) error {
1061  		w.customEnc = fn
1062  		return nil
1063  	}
1064  }
1065