storage.go raw

   1  package negentropy
   2  
   3  import (
   4  	"crypto/sha256"
   5  	"encoding/binary"
   6  	"encoding/hex"
   7  	"iter"
   8  	"math/bits"
   9  	"sort"
  10  )
  11  
  12  // Storage is the interface for negentropy item storage.
  13  // Implementations must maintain items sorted by (timestamp, id).
  14  type Storage interface {
  15  	// Size returns the number of items in storage.
  16  	Size() int
  17  
  18  	// Range returns an iterator over items in the index range [begin, end).
  19  	Range(begin, end int) iter.Seq2[int, Item]
  20  
  21  	// FindLowerBound returns the index of the first item >= bound in range [begin, end).
  22  	// If no such item exists, returns end.
  23  	FindLowerBound(begin, end int, bound Bound) int
  24  
  25  	// GetBound returns the bound at the given index.
  26  	// If idx == Size(), returns MaxBound.
  27  	GetBound(idx int) Bound
  28  
  29  	// Fingerprint computes the fingerprint of items in range [begin, end).
  30  	Fingerprint(begin, end int) Fingerprint
  31  }
  32  
  33  // Vector is an in-memory Storage implementation backed by a sorted slice.
  34  type Vector struct {
  35  	items  []Item
  36  	sealed bool
  37  }
  38  
  39  // NewVector creates a new empty Vector.
  40  func NewVector() *Vector {
  41  	return &Vector{
  42  		items: make([]Item, 0),
  43  	}
  44  }
  45  
  46  // Insert adds an item to the vector.
  47  // Items can only be inserted before Seal() is called.
  48  func (v *Vector) Insert(timestamp int64, id string) {
  49  	if v.sealed {
  50  		panic("cannot insert into sealed vector")
  51  	}
  52  	v.items = append(v.items, Item{Timestamp: timestamp, ID: id})
  53  }
  54  
  55  // Seal sorts the items and marks the vector as read-only.
  56  // Must be called before using the vector for reconciliation.
  57  func (v *Vector) Seal() {
  58  	if v.sealed {
  59  		return
  60  	}
  61  	sort.Slice(v.items, func(i, j int) bool {
  62  		return v.items[i].Compare(v.items[j]) < 0
  63  	})
  64  	v.sealed = true
  65  }
  66  
  67  // Size returns the number of items.
  68  func (v *Vector) Size() int {
  69  	return len(v.items)
  70  }
  71  
  72  // Range returns an iterator over items in range [begin, end).
  73  func (v *Vector) Range(begin, end int) iter.Seq2[int, Item] {
  74  	return func(yield func(int, Item) bool) {
  75  		for i := begin; i < end && i < len(v.items); i++ {
  76  			if !yield(i, v.items[i]) {
  77  				return
  78  			}
  79  		}
  80  	}
  81  }
  82  
  83  // FindLowerBound returns the index of the first item >= bound in range [begin, end).
  84  func (v *Vector) FindLowerBound(begin, end int, bound Bound) int {
  85  	if begin >= end || begin >= len(v.items) {
  86  		return end
  87  	}
  88  	if end > len(v.items) {
  89  		end = len(v.items)
  90  	}
  91  
  92  	// Binary search for the lower bound
  93  	idx := sort.Search(end-begin, func(i int) bool {
  94  		item := v.items[begin+i]
  95  		cmp := item.Compare(bound.Item)
  96  		return cmp >= 0
  97  	})
  98  
  99  	return begin + idx
 100  }
 101  
 102  // GetBound returns the bound at the given index.
 103  func (v *Vector) GetBound(idx int) Bound {
 104  	if idx >= len(v.items) {
 105  		return MaxBound()
 106  	}
 107  	return Bound{v.items[idx]}
 108  }
 109  
 110  // Fingerprint computes the fingerprint of items in range [begin, end).
 111  // Per the negentropy protocol v1 spec:
 112  // 1. Sum all 32-byte IDs as 256-bit integers (modular addition mod 2^256)
 113  // 2. Append the element count as a varint
 114  // 3. SHA-256 hash the result
 115  // 4. Take the first 16 bytes
 116  func (v *Vector) Fingerprint(begin, end int) Fingerprint {
 117  	if end > len(v.items) {
 118  		end = len(v.items)
 119  	}
 120  
 121  	// Accumulate sum of all IDs as a 256-bit integer (4 little-endian uint64s)
 122  	var accum [4]uint64
 123  
 124  	for i := begin; i < end; i++ {
 125  		idBytes, err := hex.DecodeString(v.items[i].ID)
 126  		if err != nil || len(idBytes) != FullIDSize {
 127  			continue
 128  		}
 129  
 130  		// Add this ID to the accumulator with carry propagation
 131  		var carry uint64
 132  		for j := 0; j < 4; j++ {
 133  			val := binary.LittleEndian.Uint64(idBytes[j*8 : j*8+8])
 134  			accum[j], carry = bits.Add64(accum[j], val, carry)
 135  		}
 136  	}
 137  
 138  	// Convert accumulator back to bytes (little-endian)
 139  	var accumBytes [FullIDSize]byte
 140  	for j := 0; j < 4; j++ {
 141  		binary.LittleEndian.PutUint64(accumBytes[j*8:j*8+8], accum[j])
 142  	}
 143  
 144  	// Append element count as varint, then SHA-256 hash
 145  	count := end - begin
 146  	if count < 0 {
 147  		count = 0
 148  	}
 149  	countBytes := EncodeVarInt(uint64(count))
 150  
 151  	hasher := sha256.New()
 152  	hasher.Write(accumBytes[:])
 153  	hasher.Write(countBytes)
 154  	hash := hasher.Sum(nil)
 155  
 156  	var fp Fingerprint
 157  	copy(fp[:], hash[:DefaultIDSize])
 158  	return fp
 159  }
 160  
 161  // FingerprintWithHash is an alias for Fingerprint (which now uses the correct
 162  // spec-compliant computation: modular addition + count + SHA-256).
 163  // Deprecated: use Fingerprint() directly.
 164  func (v *Vector) FingerprintWithHash(begin, end int) Fingerprint {
 165  	return v.Fingerprint(begin, end)
 166  }
 167  
 168  // GetItems returns a copy of all items (for testing).
 169  func (v *Vector) GetItems() []Item {
 170  	result := make([]Item, len(v.items))
 171  	copy(result, v.items)
 172  	return result
 173  }
 174  
 175  // GetItem returns the item at the given index.
 176  func (v *Vector) GetItem(idx int) Item {
 177  	if idx >= len(v.items) {
 178  		return Item{}
 179  	}
 180  	return v.items[idx]
 181  }
 182