index.go raw

   1  // Copyright (c) 2022+ Klaus Post. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package s2
   6  
   7  import (
   8  	"bytes"
   9  	"encoding/binary"
  10  	"encoding/json"
  11  	"fmt"
  12  	"io"
  13  	"sort"
  14  )
  15  
  16  const (
  17  	S2IndexHeader   = "s2idx\x00"
  18  	S2IndexTrailer  = "\x00xdi2s"
  19  	maxIndexEntries = 1 << 16
  20  	// If distance is less than this, we do not add the entry.
  21  	minIndexDist = 1 << 20
  22  )
  23  
  24  // Index represents an S2/Snappy index.
  25  type Index struct {
  26  	TotalUncompressed int64 // Total Uncompressed size if known. Will be -1 if unknown.
  27  	TotalCompressed   int64 // Total Compressed size if known. Will be -1 if unknown.
  28  	info              []struct {
  29  		compressedOffset   int64
  30  		uncompressedOffset int64
  31  	}
  32  	estBlockUncomp int64
  33  }
  34  
  35  func (i *Index) reset(maxBlock int) {
  36  	i.estBlockUncomp = int64(maxBlock)
  37  	i.TotalCompressed = -1
  38  	i.TotalUncompressed = -1
  39  	if len(i.info) > 0 {
  40  		i.info = i.info[:0]
  41  	}
  42  }
  43  
  44  // allocInfos will allocate an empty slice of infos.
  45  func (i *Index) allocInfos(n int) {
  46  	if n > maxIndexEntries {
  47  		panic("n > maxIndexEntries")
  48  	}
  49  	i.info = make([]struct {
  50  		compressedOffset   int64
  51  		uncompressedOffset int64
  52  	}, 0, n)
  53  }
  54  
  55  // add an uncompressed and compressed pair.
  56  // Entries must be sent in order.
  57  func (i *Index) add(compressedOffset, uncompressedOffset int64) error {
  58  	if i == nil {
  59  		return nil
  60  	}
  61  	lastIdx := len(i.info) - 1
  62  	if lastIdx >= 0 {
  63  		latest := i.info[lastIdx]
  64  		if latest.uncompressedOffset == uncompressedOffset {
  65  			// Uncompressed didn't change, don't add entry,
  66  			// but update start index.
  67  			latest.compressedOffset = compressedOffset
  68  			i.info[lastIdx] = latest
  69  			return nil
  70  		}
  71  		if latest.uncompressedOffset > uncompressedOffset {
  72  			return fmt.Errorf("internal error: Earlier uncompressed received (%d > %d)", latest.uncompressedOffset, uncompressedOffset)
  73  		}
  74  		if latest.compressedOffset > compressedOffset {
  75  			return fmt.Errorf("internal error: Earlier compressed received (%d > %d)", latest.compressedOffset, compressedOffset)
  76  		}
  77  		if latest.uncompressedOffset+minIndexDist > uncompressedOffset {
  78  			// Only add entry if distance is large enough.
  79  			return nil
  80  		}
  81  	}
  82  	i.info = append(i.info, struct {
  83  		compressedOffset   int64
  84  		uncompressedOffset int64
  85  	}{compressedOffset: compressedOffset, uncompressedOffset: uncompressedOffset})
  86  	return nil
  87  }
  88  
  89  // Find the offset at or before the wanted (uncompressed) offset.
  90  // If offset is 0 or positive it is the offset from the beginning of the file.
  91  // If the uncompressed size is known, the offset must be within the file.
  92  // If an offset outside the file is requested io.ErrUnexpectedEOF is returned.
  93  // If the offset is negative, it is interpreted as the distance from the end of the file,
  94  // where -1 represents the last byte.
  95  // If offset from the end of the file is requested, but size is unknown,
  96  // ErrUnsupported will be returned.
  97  func (i *Index) Find(offset int64) (compressedOff, uncompressedOff int64, err error) {
  98  	if i.TotalUncompressed < 0 {
  99  		return 0, 0, ErrCorrupt
 100  	}
 101  	if offset < 0 {
 102  		offset = i.TotalUncompressed + offset
 103  		if offset < 0 {
 104  			return 0, 0, io.ErrUnexpectedEOF
 105  		}
 106  	}
 107  	if offset > i.TotalUncompressed {
 108  		return 0, 0, io.ErrUnexpectedEOF
 109  	}
 110  	if len(i.info) > 200 {
 111  		n := sort.Search(len(i.info), func(n int) bool {
 112  			return i.info[n].uncompressedOffset > offset
 113  		})
 114  		if n == 0 {
 115  			n = 1
 116  		}
 117  		return i.info[n-1].compressedOffset, i.info[n-1].uncompressedOffset, nil
 118  	}
 119  	for _, info := range i.info {
 120  		if info.uncompressedOffset > offset {
 121  			break
 122  		}
 123  		compressedOff = info.compressedOffset
 124  		uncompressedOff = info.uncompressedOffset
 125  	}
 126  	return compressedOff, uncompressedOff, nil
 127  }
 128  
 129  // reduce to stay below maxIndexEntries
 130  func (i *Index) reduce() {
 131  	if len(i.info) < maxIndexEntries && i.estBlockUncomp >= minIndexDist {
 132  		return
 133  	}
 134  
 135  	// Algorithm, keep 1, remove removeN entries...
 136  	removeN := (len(i.info) + 1) / maxIndexEntries
 137  	src := i.info
 138  	j := 0
 139  
 140  	// Each block should be at least 1MB, but don't reduce below 1000 entries.
 141  	for i.estBlockUncomp*(int64(removeN)+1) < minIndexDist && len(i.info)/(removeN+1) > 1000 {
 142  		removeN++
 143  	}
 144  	for idx := 0; idx < len(src); idx++ {
 145  		i.info[j] = src[idx]
 146  		j++
 147  		idx += removeN
 148  	}
 149  	i.info = i.info[:j]
 150  	// Update maxblock estimate.
 151  	i.estBlockUncomp += i.estBlockUncomp * int64(removeN)
 152  }
 153  
// appendTo serializes the index as one chunk of type ChunkTypeIndex, appends
// it to b and returns the extended slice. uncompTotal and compTotal are
// written as the stream totals.
//
// It calls reduce first, so it may drop entries from i and change
// i.estBlockUncomp.
//
// Layout after the 4-byte chunk header (type byte + 24-bit little-endian
// length, patched in at the end):
//   - S2IndexHeader
//   - varints: uncompTotal, compTotal, estBlockUncomp, number of entries
//   - one byte: 1 if uncompressed offsets are stored, 0 if they are implied
//     (first entry at 0 and each following one exactly estBlockUncomp later)
//   - if stored, one varint per entry: entry 0 is absolute, later entries are
//     the difference from prev.uncompressedOffset + estBlockUncomp
//   - one varint per entry for compressed offsets: entry 0 is absolute, later
//     entries are the difference from prev.compressedOffset + a running
//     prediction (starts at estBlockUncomp/2, adjusted by half of each delta)
//   - 4-byte little-endian total size of the whole chunk, header included
//   - S2IndexTrailer
func (i *Index) appendTo(b []byte, uncompTotal, compTotal int64) []byte {
	i.reduce()
	var tmp [binary.MaxVarintLen64]byte

	initSize := len(b)
	// We make the start a skippable header+size.
	// The three zero bytes are a placeholder for the chunk length, filled in below.
	b = append(b, ChunkTypeIndex, 0, 0, 0)
	b = append(b, []byte(S2IndexHeader)...)
	// Total Uncompressed size
	n := binary.PutVarint(tmp[:], uncompTotal)
	b = append(b, tmp[:n]...)
	// Total Compressed size
	n = binary.PutVarint(tmp[:], compTotal)
	b = append(b, tmp[:n]...)
	// Put EstBlockUncomp size
	n = binary.PutVarint(tmp[:], i.estBlockUncomp)
	b = append(b, tmp[:n]...)
	// Put length
	n = binary.PutVarint(tmp[:], int64(len(i.info)))
	b = append(b, tmp[:n]...)

	// Check if we should add uncompressed offsets.
	// They can be omitted only if every offset equals its predicted value:
	// 0 for the first entry, and prev + estBlockUncomp for every later one.
	var hasUncompressed byte
	for idx, info := range i.info {
		if idx == 0 {
			if info.uncompressedOffset != 0 {
				hasUncompressed = 1
				break
			}
			continue
		}
		if info.uncompressedOffset != i.info[idx-1].uncompressedOffset+i.estBlockUncomp {
			hasUncompressed = 1
			break
		}
	}
	b = append(b, hasUncompressed)

	// Add each entry.
	// Each value is stored as the difference from its predicted offset (the
	// first entry is stored as-is).
	if hasUncompressed == 1 {
		for idx, info := range i.info {
			uOff := info.uncompressedOffset
			if idx > 0 {
				prev := i.info[idx-1]
				uOff -= prev.uncompressedOffset + (i.estBlockUncomp)
			}
			n = binary.PutVarint(tmp[:], uOff)
			b = append(b, tmp[:n]...)
		}
	}

	// Initial compressed size estimate.
	cPredict := i.estBlockUncomp / 2

	// Compressed offsets are always stored, as the difference from
	// prev + cPredict. Load performs the same prediction updates to decode them.
	for idx, info := range i.info {
		cOff := info.compressedOffset
		if idx > 0 {
			prev := i.info[idx-1]
			cOff -= prev.compressedOffset + cPredict
			// Update compressed size prediction, with half the error.
			cPredict += cOff / 2
		}
		n = binary.PutVarint(tmp[:], cOff)
		b = append(b, tmp[:n]...)
	}

	// Add Total Size.
	// Stored as fixed size for easier reading.
	// The value covers the whole chunk: everything written so far, plus this
	// 4-byte field and the trailer.
	binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)-initSize+4+len(S2IndexTrailer)))
	b = append(b, tmp[:4]...)
	// Trailer
	b = append(b, []byte(S2IndexTrailer)...)

	// Update size.
	// The chunk length excludes skippableFrameHeader bytes and is stored as
	// 24 bits; higher bits are not written.
	chunkLen := len(b) - initSize - skippableFrameHeader
	b[initSize+1] = uint8(chunkLen >> 0)
	b[initSize+2] = uint8(chunkLen >> 8)
	b[initSize+3] = uint8(chunkLen >> 16)
	return b
}
 235  
// Load decodes a binary index (the format written by appendTo) from the start
// of b into i.
// A zero value Index can be used or a previous one can be reused. Fields of i
// are overwritten as decoding proceeds, so after an error i may be left
// partially updated.
//
// On success the bytes following the index trailer are returned.
// On error the returned slice is the unparsed remainder at the point of
// failure. Errors:
//   - io.ErrUnexpectedEOF if b is too short,
//   - ErrUnsupported if the index header signature does not match,
//   - ErrCorrupt for a wrong chunk type, out-of-range values, offsets that do
//     not strictly increase, or a bad trailer.
//
// NOTE(review): the chunk length from the header is only used to check that
// enough bytes are available; parsing itself is not limited to that length.
func (i *Index) Load(b []byte) ([]byte, error) {
	if len(b) <= 4+len(S2IndexHeader)+len(S2IndexTrailer) {
		return b, io.ErrUnexpectedEOF
	}
	if b[0] != ChunkTypeIndex {
		return b, ErrCorrupt
	}
	// 24-bit little-endian chunk length following the type byte.
	chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16
	b = b[4:]

	// Validate we have enough...
	if len(b) < chunkLen {
		return b, io.ErrUnexpectedEOF
	}
	if !bytes.Equal(b[:len(S2IndexHeader)], []byte(S2IndexHeader)) {
		return b, ErrUnsupported
	}
	b = b[len(S2IndexHeader):]

	// Total Uncompressed (must be non-negative).
	if v, n := binary.Varint(b); n <= 0 || v < 0 {
		return b, ErrCorrupt
	} else {
		i.TotalUncompressed = v
		b = b[n:]
	}

	// Total Compressed
	if v, n := binary.Varint(b); n <= 0 {
		return b, ErrCorrupt
	} else {
		i.TotalCompressed = v
		b = b[n:]
	}

	// Read EstBlockUncomp
	if v, n := binary.Varint(b); n <= 0 {
		return b, ErrCorrupt
	} else {
		if v < 0 {
			return b, ErrCorrupt
		}
		i.estBlockUncomp = v
		b = b[n:]
	}

	// Number of entries, bounded by maxIndexEntries so allocInfos cannot panic.
	var entries int
	if v, n := binary.Varint(b); n <= 0 {
		return b, ErrCorrupt
	} else {
		if v < 0 || v > maxIndexEntries {
			return b, ErrCorrupt
		}
		entries = int(v)
		b = b[n:]
	}
	// Reuse the existing backing array when it is large enough.
	if cap(i.info) < entries {
		i.allocInfos(entries)
	}
	i.info = i.info[:entries]

	if len(b) < 1 {
		return b, io.ErrUnexpectedEOF
	}
	// Flag byte: only 0 (offsets implied) or 1 (offsets stored) are valid.
	hasUncompressed := b[0]
	b = b[1:]
	if hasUncompressed&1 != hasUncompressed {
		return b, ErrCorrupt
	}

	// Add each uncompressed entry.
	// Stored values are deltas from prev + estBlockUncomp (entry 0 is absolute).
	// When no values are stored, every delta is 0.
	for idx := range i.info {
		var uOff int64
		if hasUncompressed != 0 {
			// Load delta
			if v, n := binary.Varint(b); n <= 0 {
				return b, ErrCorrupt
			} else {
				uOff = v
				b = b[n:]
			}
		}

		if idx > 0 {
			prev := i.info[idx-1].uncompressedOffset
			uOff += prev + (i.estBlockUncomp)
			// Offsets must strictly increase.
			if uOff <= prev {
				return b, ErrCorrupt
			}
		}
		if uOff < 0 {
			return b, ErrCorrupt
		}
		i.info[idx].uncompressedOffset = uOff
	}

	// Initial compressed size estimate.
	cPredict := i.estBlockUncomp / 2

	// Add each compressed entry.
	// Values are deltas from prev + cPredict (entry 0 is absolute). cPredict is
	// updated with half of each raw delta, mirroring the encoder in appendTo.
	for idx := range i.info {
		var cOff int64
		if v, n := binary.Varint(b); n <= 0 {
			return b, ErrCorrupt
		} else {
			cOff = v
			b = b[n:]
		}

		if idx > 0 {
			// Update compressed size prediction, with half the error.
			cPredictNew := cPredict + cOff/2

			prev := i.info[idx-1].compressedOffset
			cOff += prev + cPredict
			// Offsets must strictly increase.
			if cOff <= prev {
				return b, ErrCorrupt
			}
			cPredict = cPredictNew
		}
		if cOff < 0 {
			return b, ErrCorrupt
		}
		i.info[idx].compressedOffset = cOff
	}
	if len(b) < 4+len(S2IndexTrailer) {
		return b, io.ErrUnexpectedEOF
	}
	// Skip size...
	// The 4-byte total-size field is not validated here.
	b = b[4:]

	// Check trailer...
	if !bytes.Equal(b[:len(S2IndexTrailer)], []byte(S2IndexTrailer)) {
		return b, ErrCorrupt
	}
	return b[len(S2IndexTrailer):], nil
}
 375  
 376  // LoadStream will load an index from the end of the supplied stream.
 377  // ErrUnsupported will be returned if the signature cannot be found.
 378  // ErrCorrupt will be returned if unexpected values are found.
 379  // io.ErrUnexpectedEOF is returned if there are too few bytes.
 380  // IO errors are returned as-is.
 381  func (i *Index) LoadStream(rs io.ReadSeeker) error {
 382  	// Go to end.
 383  	_, err := rs.Seek(-10, io.SeekEnd)
 384  	if err != nil {
 385  		return err
 386  	}
 387  	var tmp [10]byte
 388  	_, err = io.ReadFull(rs, tmp[:])
 389  	if err != nil {
 390  		return err
 391  	}
 392  	// Check trailer...
 393  	if !bytes.Equal(tmp[4:4+len(S2IndexTrailer)], []byte(S2IndexTrailer)) {
 394  		return ErrUnsupported
 395  	}
 396  	sz := binary.LittleEndian.Uint32(tmp[:4])
 397  	if sz > maxChunkSize+skippableFrameHeader {
 398  		return ErrCorrupt
 399  	}
 400  	_, err = rs.Seek(-int64(sz), io.SeekEnd)
 401  	if err != nil {
 402  		return err
 403  	}
 404  
 405  	// Read index.
 406  	buf := make([]byte, sz)
 407  	_, err = io.ReadFull(rs, buf)
 408  	if err != nil {
 409  		return err
 410  	}
 411  	_, err = i.Load(buf)
 412  	return err
 413  }
 414  
 415  // IndexStream will return an index for a stream.
 416  // The stream structure will be checked, but
 417  // data within blocks is not verified.
 418  // The returned index can either be appended to the end of the stream
 419  // or stored separately.
 420  func IndexStream(r io.Reader) ([]byte, error) {
 421  	var i Index
 422  	var buf [maxChunkSize]byte
 423  	var readHeader bool
 424  	for {
 425  		_, err := io.ReadFull(r, buf[:4])
 426  		if err != nil {
 427  			if err == io.EOF {
 428  				return i.appendTo(nil, i.TotalUncompressed, i.TotalCompressed), nil
 429  			}
 430  			return nil, err
 431  		}
 432  		// Start of this chunk.
 433  		startChunk := i.TotalCompressed
 434  		i.TotalCompressed += 4
 435  
 436  		chunkType := buf[0]
 437  		if !readHeader {
 438  			if chunkType != chunkTypeStreamIdentifier {
 439  				return nil, ErrCorrupt
 440  			}
 441  			readHeader = true
 442  		}
 443  		chunkLen := int(buf[1]) | int(buf[2])<<8 | int(buf[3])<<16
 444  		if chunkLen < checksumSize {
 445  			return nil, ErrCorrupt
 446  		}
 447  
 448  		i.TotalCompressed += int64(chunkLen)
 449  		_, err = io.ReadFull(r, buf[:chunkLen])
 450  		if err != nil {
 451  			return nil, io.ErrUnexpectedEOF
 452  		}
 453  		// The chunk types are specified at
 454  		// https://github.com/google/snappy/blob/master/framing_format.txt
 455  		switch chunkType {
 456  		case chunkTypeCompressedData:
 457  			// Section 4.2. Compressed data (chunk type 0x00).
 458  			// Skip checksum.
 459  			dLen, err := DecodedLen(buf[checksumSize:])
 460  			if err != nil {
 461  				return nil, err
 462  			}
 463  			if dLen > maxBlockSize {
 464  				return nil, ErrCorrupt
 465  			}
 466  			if i.estBlockUncomp == 0 {
 467  				// Use first block for estimate...
 468  				i.estBlockUncomp = int64(dLen)
 469  			}
 470  			err = i.add(startChunk, i.TotalUncompressed)
 471  			if err != nil {
 472  				return nil, err
 473  			}
 474  			i.TotalUncompressed += int64(dLen)
 475  			continue
 476  		case chunkTypeUncompressedData:
 477  			n2 := chunkLen - checksumSize
 478  			if n2 > maxBlockSize {
 479  				return nil, ErrCorrupt
 480  			}
 481  			if i.estBlockUncomp == 0 {
 482  				// Use first block for estimate...
 483  				i.estBlockUncomp = int64(n2)
 484  			}
 485  			err = i.add(startChunk, i.TotalUncompressed)
 486  			if err != nil {
 487  				return nil, err
 488  			}
 489  			i.TotalUncompressed += int64(n2)
 490  			continue
 491  		case chunkTypeStreamIdentifier:
 492  			// Section 4.1. Stream identifier (chunk type 0xff).
 493  			if chunkLen != len(magicBody) {
 494  				return nil, ErrCorrupt
 495  			}
 496  
 497  			if string(buf[:len(magicBody)]) != magicBody {
 498  				if string(buf[:len(magicBody)]) != magicBodySnappy {
 499  					return nil, ErrCorrupt
 500  				}
 501  			}
 502  
 503  			continue
 504  		}
 505  
 506  		if chunkType <= 0x7f {
 507  			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
 508  			return nil, ErrUnsupported
 509  		}
 510  		if chunkLen > maxChunkSize {
 511  			return nil, ErrUnsupported
 512  		}
 513  		// Section 4.4 Padding (chunk type 0xfe).
 514  		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
 515  	}
 516  }
 517  
 518  // JSON returns the index as JSON text.
 519  func (i *Index) JSON() []byte {
 520  	type offset struct {
 521  		CompressedOffset   int64 `json:"compressed"`
 522  		UncompressedOffset int64 `json:"uncompressed"`
 523  	}
 524  	x := struct {
 525  		TotalUncompressed int64    `json:"total_uncompressed"` // Total Uncompressed size if known. Will be -1 if unknown.
 526  		TotalCompressed   int64    `json:"total_compressed"`   // Total Compressed size if known. Will be -1 if unknown.
 527  		Offsets           []offset `json:"offsets"`
 528  		EstBlockUncomp    int64    `json:"est_block_uncompressed"`
 529  	}{
 530  		TotalUncompressed: i.TotalUncompressed,
 531  		TotalCompressed:   i.TotalCompressed,
 532  		EstBlockUncomp:    i.estBlockUncomp,
 533  	}
 534  	for _, v := range i.info {
 535  		x.Offsets = append(x.Offsets, offset{CompressedOffset: v.compressedOffset, UncompressedOffset: v.uncompressedOffset})
 536  	}
 537  	b, _ := json.MarshalIndent(x, "", "  ")
 538  	return b
 539  }
 540  
 541  // RemoveIndexHeaders will trim all headers and trailers from a given index.
 542  // This is expected to save 20 bytes.
 543  // These can be restored using RestoreIndexHeaders.
 544  // This removes a layer of security, but is the most compact representation.
 545  // Returns nil if headers contains errors.
 546  // The returned slice references the provided slice.
 547  func RemoveIndexHeaders(b []byte) []byte {
 548  	const save = 4 + len(S2IndexHeader) + len(S2IndexTrailer) + 4
 549  	if len(b) <= save {
 550  		return nil
 551  	}
 552  	if b[0] != ChunkTypeIndex {
 553  		return nil
 554  	}
 555  	chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16
 556  	b = b[4:]
 557  
 558  	// Validate we have enough...
 559  	if len(b) < chunkLen {
 560  		return nil
 561  	}
 562  	b = b[:chunkLen]
 563  
 564  	if !bytes.Equal(b[:len(S2IndexHeader)], []byte(S2IndexHeader)) {
 565  		return nil
 566  	}
 567  	b = b[len(S2IndexHeader):]
 568  	if !bytes.HasSuffix(b, []byte(S2IndexTrailer)) {
 569  		return nil
 570  	}
 571  	b = bytes.TrimSuffix(b, []byte(S2IndexTrailer))
 572  
 573  	if len(b) < 4 {
 574  		return nil
 575  	}
 576  	return b[:len(b)-4]
 577  }
 578  
 579  // RestoreIndexHeaders will index restore headers removed by RemoveIndexHeaders.
 580  // No error checking is performed on the input.
 581  // If a 0 length slice is sent, it is returned without modification.
 582  func RestoreIndexHeaders(in []byte) []byte {
 583  	if len(in) == 0 {
 584  		return in
 585  	}
 586  	b := make([]byte, 0, 4+len(S2IndexHeader)+len(in)+len(S2IndexTrailer)+4)
 587  	b = append(b, ChunkTypeIndex, 0, 0, 0)
 588  	b = append(b, []byte(S2IndexHeader)...)
 589  	b = append(b, in...)
 590  
 591  	var tmp [4]byte
 592  	binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)+4+len(S2IndexTrailer)))
 593  	b = append(b, tmp[:4]...)
 594  	// Trailer
 595  	b = append(b, []byte(S2IndexTrailer)...)
 596  
 597  	chunkLen := len(b) - skippableFrameHeader
 598  	b[1] = uint8(chunkLen >> 0)
 599  	b[2] = uint8(chunkLen >> 8)
 600  	b[3] = uint8(chunkLen >> 16)
 601  	return b
 602  }
 603