index.go raw
1 // Copyright (c) 2022+ Klaus Post. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package s2
6
7 import (
8 "bytes"
9 "encoding/binary"
10 "encoding/json"
11 "fmt"
12 "io"
13 "sort"
14 )
15
const (
	// S2IndexHeader is the marker written at the start of a serialized index,
	// directly after the 4-byte chunk header.
	S2IndexHeader = "s2idx\x00"
	// S2IndexTrailer is the marker that terminates a serialized index.
	S2IndexTrailer = "\x00xdi2s"
	// maxIndexEntries is the largest number of entries an index may hold.
	// allocInfos panics above this value and Load rejects larger counts.
	maxIndexEntries = 1 << 16
	// If distance is less than this, we do not add the entry.
	minIndexDist = 1 << 20
)
23
// Index represents an S2/Snappy index.
type Index struct {
	TotalUncompressed int64 // Total Uncompressed size if known. Will be -1 if unknown.
	TotalCompressed   int64 // Total Compressed size if known. Will be -1 if unknown.
	// info holds the recorded offset pairs in the order they were added.
	info []struct {
		compressedOffset   int64 // Offset in the compressed stream.
		uncompressedOffset int64 // Matching offset in the uncompressed data.
	}
	// estBlockUncomp is the estimated uncompressed block size.
	// Serialization stores offsets as deltas against this estimate.
	estBlockUncomp int64
}
34
35 func (i *Index) reset(maxBlock int) {
36 i.estBlockUncomp = int64(maxBlock)
37 i.TotalCompressed = -1
38 i.TotalUncompressed = -1
39 if len(i.info) > 0 {
40 i.info = i.info[:0]
41 }
42 }
43
44 // allocInfos will allocate an empty slice of infos.
45 func (i *Index) allocInfos(n int) {
46 if n > maxIndexEntries {
47 panic("n > maxIndexEntries")
48 }
49 i.info = make([]struct {
50 compressedOffset int64
51 uncompressedOffset int64
52 }, 0, n)
53 }
54
55 // add an uncompressed and compressed pair.
56 // Entries must be sent in order.
57 func (i *Index) add(compressedOffset, uncompressedOffset int64) error {
58 if i == nil {
59 return nil
60 }
61 lastIdx := len(i.info) - 1
62 if lastIdx >= 0 {
63 latest := i.info[lastIdx]
64 if latest.uncompressedOffset == uncompressedOffset {
65 // Uncompressed didn't change, don't add entry,
66 // but update start index.
67 latest.compressedOffset = compressedOffset
68 i.info[lastIdx] = latest
69 return nil
70 }
71 if latest.uncompressedOffset > uncompressedOffset {
72 return fmt.Errorf("internal error: Earlier uncompressed received (%d > %d)", latest.uncompressedOffset, uncompressedOffset)
73 }
74 if latest.compressedOffset > compressedOffset {
75 return fmt.Errorf("internal error: Earlier compressed received (%d > %d)", latest.compressedOffset, compressedOffset)
76 }
77 if latest.uncompressedOffset+minIndexDist > uncompressedOffset {
78 // Only add entry if distance is large enough.
79 return nil
80 }
81 }
82 i.info = append(i.info, struct {
83 compressedOffset int64
84 uncompressedOffset int64
85 }{compressedOffset: compressedOffset, uncompressedOffset: uncompressedOffset})
86 return nil
87 }
88
89 // Find the offset at or before the wanted (uncompressed) offset.
90 // If offset is 0 or positive it is the offset from the beginning of the file.
91 // If the uncompressed size is known, the offset must be within the file.
92 // If an offset outside the file is requested io.ErrUnexpectedEOF is returned.
93 // If the offset is negative, it is interpreted as the distance from the end of the file,
94 // where -1 represents the last byte.
95 // If offset from the end of the file is requested, but size is unknown,
96 // ErrUnsupported will be returned.
97 func (i *Index) Find(offset int64) (compressedOff, uncompressedOff int64, err error) {
98 if i.TotalUncompressed < 0 {
99 return 0, 0, ErrCorrupt
100 }
101 if offset < 0 {
102 offset = i.TotalUncompressed + offset
103 if offset < 0 {
104 return 0, 0, io.ErrUnexpectedEOF
105 }
106 }
107 if offset > i.TotalUncompressed {
108 return 0, 0, io.ErrUnexpectedEOF
109 }
110 if len(i.info) > 200 {
111 n := sort.Search(len(i.info), func(n int) bool {
112 return i.info[n].uncompressedOffset > offset
113 })
114 if n == 0 {
115 n = 1
116 }
117 return i.info[n-1].compressedOffset, i.info[n-1].uncompressedOffset, nil
118 }
119 for _, info := range i.info {
120 if info.uncompressedOffset > offset {
121 break
122 }
123 compressedOff = info.compressedOffset
124 uncompressedOff = info.uncompressedOffset
125 }
126 return compressedOff, uncompressedOff, nil
127 }
128
129 // reduce to stay below maxIndexEntries
130 func (i *Index) reduce() {
131 if len(i.info) < maxIndexEntries && i.estBlockUncomp >= minIndexDist {
132 return
133 }
134
135 // Algorithm, keep 1, remove removeN entries...
136 removeN := (len(i.info) + 1) / maxIndexEntries
137 src := i.info
138 j := 0
139
140 // Each block should be at least 1MB, but don't reduce below 1000 entries.
141 for i.estBlockUncomp*(int64(removeN)+1) < minIndexDist && len(i.info)/(removeN+1) > 1000 {
142 removeN++
143 }
144 for idx := 0; idx < len(src); idx++ {
145 i.info[j] = src[idx]
146 j++
147 idx += removeN
148 }
149 i.info = i.info[:j]
150 // Update maxblock estimate.
151 i.estBlockUncomp += i.estBlockUncomp * int64(removeN)
152 }
153
154 func (i *Index) appendTo(b []byte, uncompTotal, compTotal int64) []byte {
155 i.reduce()
156 var tmp [binary.MaxVarintLen64]byte
157
158 initSize := len(b)
159 // We make the start a skippable header+size.
160 b = append(b, ChunkTypeIndex, 0, 0, 0)
161 b = append(b, []byte(S2IndexHeader)...)
162 // Total Uncompressed size
163 n := binary.PutVarint(tmp[:], uncompTotal)
164 b = append(b, tmp[:n]...)
165 // Total Compressed size
166 n = binary.PutVarint(tmp[:], compTotal)
167 b = append(b, tmp[:n]...)
168 // Put EstBlockUncomp size
169 n = binary.PutVarint(tmp[:], i.estBlockUncomp)
170 b = append(b, tmp[:n]...)
171 // Put length
172 n = binary.PutVarint(tmp[:], int64(len(i.info)))
173 b = append(b, tmp[:n]...)
174
175 // Check if we should add uncompressed offsets
176 var hasUncompressed byte
177 for idx, info := range i.info {
178 if idx == 0 {
179 if info.uncompressedOffset != 0 {
180 hasUncompressed = 1
181 break
182 }
183 continue
184 }
185 if info.uncompressedOffset != i.info[idx-1].uncompressedOffset+i.estBlockUncomp {
186 hasUncompressed = 1
187 break
188 }
189 }
190 b = append(b, hasUncompressed)
191
192 // Add each entry
193 if hasUncompressed == 1 {
194 for idx, info := range i.info {
195 uOff := info.uncompressedOffset
196 if idx > 0 {
197 prev := i.info[idx-1]
198 uOff -= prev.uncompressedOffset + (i.estBlockUncomp)
199 }
200 n = binary.PutVarint(tmp[:], uOff)
201 b = append(b, tmp[:n]...)
202 }
203 }
204
205 // Initial compressed size estimate.
206 cPredict := i.estBlockUncomp / 2
207
208 for idx, info := range i.info {
209 cOff := info.compressedOffset
210 if idx > 0 {
211 prev := i.info[idx-1]
212 cOff -= prev.compressedOffset + cPredict
213 // Update compressed size prediction, with half the error.
214 cPredict += cOff / 2
215 }
216 n = binary.PutVarint(tmp[:], cOff)
217 b = append(b, tmp[:n]...)
218 }
219
220 // Add Total Size.
221 // Stored as fixed size for easier reading.
222 binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)-initSize+4+len(S2IndexTrailer)))
223 b = append(b, tmp[:4]...)
224 // Trailer
225 b = append(b, []byte(S2IndexTrailer)...)
226
227 // Update size
228 chunkLen := len(b) - initSize - skippableFrameHeader
229 b[initSize+1] = uint8(chunkLen >> 0)
230 b[initSize+2] = uint8(chunkLen >> 8)
231 b[initSize+3] = uint8(chunkLen >> 16)
232 //fmt.Printf("chunklen: 0x%x Uncomp:%d, Comp:%d\n", chunkLen, uncompTotal, compTotal)
233 return b
234 }
235
236 // Load a binary index.
237 // A zero value Index can be used or a previous one can be reused.
238 func (i *Index) Load(b []byte) ([]byte, error) {
239 if len(b) <= 4+len(S2IndexHeader)+len(S2IndexTrailer) {
240 return b, io.ErrUnexpectedEOF
241 }
242 if b[0] != ChunkTypeIndex {
243 return b, ErrCorrupt
244 }
245 chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16
246 b = b[4:]
247
248 // Validate we have enough...
249 if len(b) < chunkLen {
250 return b, io.ErrUnexpectedEOF
251 }
252 if !bytes.Equal(b[:len(S2IndexHeader)], []byte(S2IndexHeader)) {
253 return b, ErrUnsupported
254 }
255 b = b[len(S2IndexHeader):]
256
257 // Total Uncompressed
258 if v, n := binary.Varint(b); n <= 0 || v < 0 {
259 return b, ErrCorrupt
260 } else {
261 i.TotalUncompressed = v
262 b = b[n:]
263 }
264
265 // Total Compressed
266 if v, n := binary.Varint(b); n <= 0 {
267 return b, ErrCorrupt
268 } else {
269 i.TotalCompressed = v
270 b = b[n:]
271 }
272
273 // Read EstBlockUncomp
274 if v, n := binary.Varint(b); n <= 0 {
275 return b, ErrCorrupt
276 } else {
277 if v < 0 {
278 return b, ErrCorrupt
279 }
280 i.estBlockUncomp = v
281 b = b[n:]
282 }
283
284 var entries int
285 if v, n := binary.Varint(b); n <= 0 {
286 return b, ErrCorrupt
287 } else {
288 if v < 0 || v > maxIndexEntries {
289 return b, ErrCorrupt
290 }
291 entries = int(v)
292 b = b[n:]
293 }
294 if cap(i.info) < entries {
295 i.allocInfos(entries)
296 }
297 i.info = i.info[:entries]
298
299 if len(b) < 1 {
300 return b, io.ErrUnexpectedEOF
301 }
302 hasUncompressed := b[0]
303 b = b[1:]
304 if hasUncompressed&1 != hasUncompressed {
305 return b, ErrCorrupt
306 }
307
308 // Add each uncompressed entry
309 for idx := range i.info {
310 var uOff int64
311 if hasUncompressed != 0 {
312 // Load delta
313 if v, n := binary.Varint(b); n <= 0 {
314 return b, ErrCorrupt
315 } else {
316 uOff = v
317 b = b[n:]
318 }
319 }
320
321 if idx > 0 {
322 prev := i.info[idx-1].uncompressedOffset
323 uOff += prev + (i.estBlockUncomp)
324 if uOff <= prev {
325 return b, ErrCorrupt
326 }
327 }
328 if uOff < 0 {
329 return b, ErrCorrupt
330 }
331 i.info[idx].uncompressedOffset = uOff
332 }
333
334 // Initial compressed size estimate.
335 cPredict := i.estBlockUncomp / 2
336
337 // Add each compressed entry
338 for idx := range i.info {
339 var cOff int64
340 if v, n := binary.Varint(b); n <= 0 {
341 return b, ErrCorrupt
342 } else {
343 cOff = v
344 b = b[n:]
345 }
346
347 if idx > 0 {
348 // Update compressed size prediction, with half the error.
349 cPredictNew := cPredict + cOff/2
350
351 prev := i.info[idx-1].compressedOffset
352 cOff += prev + cPredict
353 if cOff <= prev {
354 return b, ErrCorrupt
355 }
356 cPredict = cPredictNew
357 }
358 if cOff < 0 {
359 return b, ErrCorrupt
360 }
361 i.info[idx].compressedOffset = cOff
362 }
363 if len(b) < 4+len(S2IndexTrailer) {
364 return b, io.ErrUnexpectedEOF
365 }
366 // Skip size...
367 b = b[4:]
368
369 // Check trailer...
370 if !bytes.Equal(b[:len(S2IndexTrailer)], []byte(S2IndexTrailer)) {
371 return b, ErrCorrupt
372 }
373 return b[len(S2IndexTrailer):], nil
374 }
375
376 // LoadStream will load an index from the end of the supplied stream.
377 // ErrUnsupported will be returned if the signature cannot be found.
378 // ErrCorrupt will be returned if unexpected values are found.
379 // io.ErrUnexpectedEOF is returned if there are too few bytes.
380 // IO errors are returned as-is.
381 func (i *Index) LoadStream(rs io.ReadSeeker) error {
382 // Go to end.
383 _, err := rs.Seek(-10, io.SeekEnd)
384 if err != nil {
385 return err
386 }
387 var tmp [10]byte
388 _, err = io.ReadFull(rs, tmp[:])
389 if err != nil {
390 return err
391 }
392 // Check trailer...
393 if !bytes.Equal(tmp[4:4+len(S2IndexTrailer)], []byte(S2IndexTrailer)) {
394 return ErrUnsupported
395 }
396 sz := binary.LittleEndian.Uint32(tmp[:4])
397 if sz > maxChunkSize+skippableFrameHeader {
398 return ErrCorrupt
399 }
400 _, err = rs.Seek(-int64(sz), io.SeekEnd)
401 if err != nil {
402 return err
403 }
404
405 // Read index.
406 buf := make([]byte, sz)
407 _, err = io.ReadFull(rs, buf)
408 if err != nil {
409 return err
410 }
411 _, err = i.Load(buf)
412 return err
413 }
414
415 // IndexStream will return an index for a stream.
416 // The stream structure will be checked, but
417 // data within blocks is not verified.
418 // The returned index can either be appended to the end of the stream
419 // or stored separately.
420 func IndexStream(r io.Reader) ([]byte, error) {
421 var i Index
422 var buf [maxChunkSize]byte
423 var readHeader bool
424 for {
425 _, err := io.ReadFull(r, buf[:4])
426 if err != nil {
427 if err == io.EOF {
428 return i.appendTo(nil, i.TotalUncompressed, i.TotalCompressed), nil
429 }
430 return nil, err
431 }
432 // Start of this chunk.
433 startChunk := i.TotalCompressed
434 i.TotalCompressed += 4
435
436 chunkType := buf[0]
437 if !readHeader {
438 if chunkType != chunkTypeStreamIdentifier {
439 return nil, ErrCorrupt
440 }
441 readHeader = true
442 }
443 chunkLen := int(buf[1]) | int(buf[2])<<8 | int(buf[3])<<16
444 if chunkLen < checksumSize {
445 return nil, ErrCorrupt
446 }
447
448 i.TotalCompressed += int64(chunkLen)
449 _, err = io.ReadFull(r, buf[:chunkLen])
450 if err != nil {
451 return nil, io.ErrUnexpectedEOF
452 }
453 // The chunk types are specified at
454 // https://github.com/google/snappy/blob/master/framing_format.txt
455 switch chunkType {
456 case chunkTypeCompressedData:
457 // Section 4.2. Compressed data (chunk type 0x00).
458 // Skip checksum.
459 dLen, err := DecodedLen(buf[checksumSize:])
460 if err != nil {
461 return nil, err
462 }
463 if dLen > maxBlockSize {
464 return nil, ErrCorrupt
465 }
466 if i.estBlockUncomp == 0 {
467 // Use first block for estimate...
468 i.estBlockUncomp = int64(dLen)
469 }
470 err = i.add(startChunk, i.TotalUncompressed)
471 if err != nil {
472 return nil, err
473 }
474 i.TotalUncompressed += int64(dLen)
475 continue
476 case chunkTypeUncompressedData:
477 n2 := chunkLen - checksumSize
478 if n2 > maxBlockSize {
479 return nil, ErrCorrupt
480 }
481 if i.estBlockUncomp == 0 {
482 // Use first block for estimate...
483 i.estBlockUncomp = int64(n2)
484 }
485 err = i.add(startChunk, i.TotalUncompressed)
486 if err != nil {
487 return nil, err
488 }
489 i.TotalUncompressed += int64(n2)
490 continue
491 case chunkTypeStreamIdentifier:
492 // Section 4.1. Stream identifier (chunk type 0xff).
493 if chunkLen != len(magicBody) {
494 return nil, ErrCorrupt
495 }
496
497 if string(buf[:len(magicBody)]) != magicBody {
498 if string(buf[:len(magicBody)]) != magicBodySnappy {
499 return nil, ErrCorrupt
500 }
501 }
502
503 continue
504 }
505
506 if chunkType <= 0x7f {
507 // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
508 return nil, ErrUnsupported
509 }
510 if chunkLen > maxChunkSize {
511 return nil, ErrUnsupported
512 }
513 // Section 4.4 Padding (chunk type 0xfe).
514 // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
515 }
516 }
517
518 // JSON returns the index as JSON text.
519 func (i *Index) JSON() []byte {
520 type offset struct {
521 CompressedOffset int64 `json:"compressed"`
522 UncompressedOffset int64 `json:"uncompressed"`
523 }
524 x := struct {
525 TotalUncompressed int64 `json:"total_uncompressed"` // Total Uncompressed size if known. Will be -1 if unknown.
526 TotalCompressed int64 `json:"total_compressed"` // Total Compressed size if known. Will be -1 if unknown.
527 Offsets []offset `json:"offsets"`
528 EstBlockUncomp int64 `json:"est_block_uncompressed"`
529 }{
530 TotalUncompressed: i.TotalUncompressed,
531 TotalCompressed: i.TotalCompressed,
532 EstBlockUncomp: i.estBlockUncomp,
533 }
534 for _, v := range i.info {
535 x.Offsets = append(x.Offsets, offset{CompressedOffset: v.compressedOffset, UncompressedOffset: v.uncompressedOffset})
536 }
537 b, _ := json.MarshalIndent(x, "", " ")
538 return b
539 }
540
541 // RemoveIndexHeaders will trim all headers and trailers from a given index.
542 // This is expected to save 20 bytes.
543 // These can be restored using RestoreIndexHeaders.
544 // This removes a layer of security, but is the most compact representation.
545 // Returns nil if headers contains errors.
546 // The returned slice references the provided slice.
547 func RemoveIndexHeaders(b []byte) []byte {
548 const save = 4 + len(S2IndexHeader) + len(S2IndexTrailer) + 4
549 if len(b) <= save {
550 return nil
551 }
552 if b[0] != ChunkTypeIndex {
553 return nil
554 }
555 chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16
556 b = b[4:]
557
558 // Validate we have enough...
559 if len(b) < chunkLen {
560 return nil
561 }
562 b = b[:chunkLen]
563
564 if !bytes.Equal(b[:len(S2IndexHeader)], []byte(S2IndexHeader)) {
565 return nil
566 }
567 b = b[len(S2IndexHeader):]
568 if !bytes.HasSuffix(b, []byte(S2IndexTrailer)) {
569 return nil
570 }
571 b = bytes.TrimSuffix(b, []byte(S2IndexTrailer))
572
573 if len(b) < 4 {
574 return nil
575 }
576 return b[:len(b)-4]
577 }
578
579 // RestoreIndexHeaders will index restore headers removed by RemoveIndexHeaders.
580 // No error checking is performed on the input.
581 // If a 0 length slice is sent, it is returned without modification.
582 func RestoreIndexHeaders(in []byte) []byte {
583 if len(in) == 0 {
584 return in
585 }
586 b := make([]byte, 0, 4+len(S2IndexHeader)+len(in)+len(S2IndexTrailer)+4)
587 b = append(b, ChunkTypeIndex, 0, 0, 0)
588 b = append(b, []byte(S2IndexHeader)...)
589 b = append(b, in...)
590
591 var tmp [4]byte
592 binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)+4+len(S2IndexTrailer)))
593 b = append(b, tmp[:4]...)
594 // Trailer
595 b = append(b, []byte(S2IndexTrailer)...)
596
597 chunkLen := len(b) - skippableFrameHeader
598 b[1] = uint8(chunkLen >> 0)
599 b[2] = uint8(chunkLen >> 8)
600 b[3] = uint8(chunkLen >> 16)
601 return b
602 }
603