negentropy_test.go raw

   1  package negentropy
   2  
   3  import (
   4  	"testing"
   5  )
   6  
   7  func TestVectorBasic(t *testing.T) {
   8  	v := NewVector()
   9  	v.Insert(1000, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
  10  	v.Insert(2000, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
  11  	v.Insert(3000, "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc")
  12  	v.Seal()
  13  
  14  	if v.Size() != 3 {
  15  		t.Errorf("expected size 3, got %d", v.Size())
  16  	}
  17  
  18  	// Test fingerprint
  19  	fp := v.Fingerprint(0, 3)
  20  	if fp == EmptyFingerprint {
  21  		t.Error("expected non-empty fingerprint")
  22  	}
  23  }
  24  
  25  func TestNegentropy(t *testing.T) {
  26  	// Create two storages with overlapping data
  27  	v1 := NewVector()
  28  	v1.Insert(1000, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
  29  	v1.Insert(2000, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
  30  	v1.Insert(3000, "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc")
  31  	v1.Seal()
  32  
  33  	v2 := NewVector()
  34  	v2.Insert(1000, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
  35  	v2.Insert(2000, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
  36  	v2.Insert(4000, "dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd")
  37  	v2.Seal()
  38  
  39  	// Client initiates
  40  	client := New(v1, DefaultFrameSizeLimit)
  41  	server := New(v2, DefaultFrameSizeLimit)
  42  
  43  	msg, err := client.Start()
  44  	if err != nil {
  45  		t.Fatalf("client.Start failed: %v", err)
  46  	}
  47  
  48  	// Server responds
  49  	response, complete, err := server.Reconcile(msg)
  50  	if err != nil {
  51  		t.Fatalf("server.Reconcile failed: %v", err)
  52  	}
  53  
  54  	t.Logf("Round 1: complete=%v, response len=%d", complete, len(response))
  55  
  56  	// Continue until complete
  57  	for !complete {
  58  		response, complete, err = client.Reconcile(response)
  59  		if err != nil {
  60  			t.Fatalf("reconcile failed: %v", err)
  61  		}
  62  		t.Logf("Round: complete=%v, response len=%d", complete, len(response))
  63  
  64  		if !complete {
  65  			response, complete, err = server.Reconcile(response)
  66  			if err != nil {
  67  				t.Fatalf("reconcile failed: %v", err)
  68  			}
  69  			t.Logf("Round: complete=%v, response len=%d", complete, len(response))
  70  		}
  71  	}
  72  
  73  	// Check results
  74  	clientHaves := client.CollectHaves()
  75  	clientHaveNots := client.CollectHaveNots()
  76  	serverHaves := server.CollectHaves()
  77  	serverHaveNots := server.CollectHaveNots()
  78  
  79  	t.Logf("Client haves: %v", clientHaves)
  80  	t.Logf("Client haveNots: %v", clientHaveNots)
  81  	t.Logf("Server haves: %v", serverHaves)
  82  	t.Logf("Server haveNots: %v", serverHaveNots)
  83  }
  84  
  85  func TestEncoding(t *testing.T) {
  86  	// Test VarInt encoding/decoding
  87  	testCases := []uint64{0, 1, 127, 128, 16383, 16384, 1<<21 - 1, 1 << 21, 1<<63 - 1}
  88  
  89  	for _, n := range testCases {
  90  		encoded := EncodeVarInt(n)
  91  		dec := NewDecoder(encoded)
  92  		decoded, err := dec.ReadVarInt()
  93  		if err != nil {
  94  			t.Errorf("failed to decode %d: %v", n, err)
  95  			continue
  96  		}
  97  		if decoded != n {
  98  			t.Errorf("expected %d, got %d", n, decoded)
  99  		}
 100  	}
 101  }
 102  
 103  // TestConvergenceLargeSets tests that negentropy converges correctly with larger
 104  // sets where each side has unique events the other doesn't.
 105  func TestConvergenceLargeSets(t *testing.T) {
 106  	sharedCount := 200
 107  	clientOnlyCount := 50
 108  	serverOnlyCount := 150
 109  
 110  	v1 := NewVector()
 111  	v2 := NewVector()
 112  
 113  	for i := 0; i < sharedCount; i++ {
 114  		ts := int64(1000 + i)
 115  		id := makeTestID(i)
 116  		v1.Insert(ts, id)
 117  		v2.Insert(ts, id)
 118  	}
 119  
 120  	clientOnlyIDs := make(map[string]bool)
 121  	for i := 0; i < clientOnlyCount; i++ {
 122  		ts := int64(2000 + i)
 123  		id := makeTestID(10000 + i)
 124  		v1.Insert(ts, id)
 125  		clientOnlyIDs[id] = true
 126  	}
 127  
 128  	serverOnlyIDs := make(map[string]bool)
 129  	for i := 0; i < serverOnlyCount; i++ {
 130  		ts := int64(500 + i)
 131  		id := makeTestID(20000 + i)
 132  		v2.Insert(ts, id)
 133  		serverOnlyIDs[id] = true
 134  	}
 135  
 136  	v1.Seal()
 137  	v2.Seal()
 138  
 139  	runConvergenceTest(t, "basic", v1, v2, clientOnlyIDs, serverOnlyIDs,
 140  		clientOnlyCount, serverOnlyCount)
 141  }
 142  
 143  // TestConvergenceInterspersed tests the critical case where client-only and
 144  // server-only events are interspersed among shared events. This exercises the
 145  // bug where splitRange used GetBound(bucketEnd) instead of upperBound for the
 146  // last bucket, causing range misalignment when boundary events don't exist in
 147  // the responder's storage.
 148  func TestConvergenceInterspersed(t *testing.T) {
 149  	v1 := NewVector()
 150  	v2 := NewVector()
 151  
 152  	clientOnlyIDs := make(map[string]bool)
 153  	serverOnlyIDs := make(map[string]bool)
 154  	clientOnlyCount := 0
 155  	serverOnlyCount := 0
 156  
 157  	// Create events at timestamps 0..499. Every 5th timestamp has a
 158  	// client-only event, every 7th has a server-only event, others are shared.
 159  	for i := 0; i < 500; i++ {
 160  		ts := int64(1000 + i)
 161  		if i%5 == 0 && i%7 != 0 {
 162  			// Client-only: interspersed among shared events
 163  			id := makeTestID(30000 + i)
 164  			v1.Insert(ts, id)
 165  			clientOnlyIDs[id] = true
 166  			clientOnlyCount++
 167  		} else if i%7 == 0 && i%5 != 0 {
 168  			// Server-only: interspersed among shared events
 169  			id := makeTestID(40000 + i)
 170  			v2.Insert(ts, id)
 171  			serverOnlyIDs[id] = true
 172  			serverOnlyCount++
 173  		} else {
 174  			// Shared event
 175  			id := makeTestID(50000 + i)
 176  			v1.Insert(ts, id)
 177  			v2.Insert(ts, id)
 178  		}
 179  	}
 180  
 181  	v1.Seal()
 182  	v2.Seal()
 183  
 184  	runConvergenceTest(t, "interspersed", v1, v2, clientOnlyIDs, serverOnlyIDs,
 185  		clientOnlyCount, serverOnlyCount)
 186  }
 187  
 188  // runConvergenceTest runs the full negentropy protocol and verifies correctness.
 189  func runConvergenceTest(t *testing.T, name string, v1, v2 *Vector,
 190  	clientOnlyIDs, serverOnlyIDs map[string]bool,
 191  	expectedClientHaves, expectedClientHaveNots int) {
 192  	t.Helper()
 193  
 194  	t.Logf("[%s] Client has %d items, Server has %d items", name, v1.Size(), v2.Size())
 195  	t.Logf("[%s] Expected client-only: %d, server-only: %d",
 196  		name, expectedClientHaves, expectedClientHaveNots)
 197  
 198  	client := New(v1, DefaultFrameSizeLimit)
 199  	server := New(v2, DefaultFrameSizeLimit)
 200  
 201  	msg, err := client.Start()
 202  	if err != nil {
 203  		t.Fatalf("client.Start failed: %v", err)
 204  	}
 205  
 206  	rounds := 0
 207  	complete := false
 208  	for !complete {
 209  		rounds++
 210  		if rounds > 20 {
 211  			t.Fatalf("[%s] too many rounds (%d), protocol did not converge", name, rounds)
 212  		}
 213  
 214  		response, serverComplete, err := server.Reconcile(msg)
 215  		if err != nil {
 216  			t.Fatalf("[%s] server.Reconcile round %d failed: %v", name, rounds, err)
 217  		}
 218  
 219  		if serverComplete {
 220  			msg, complete, err = client.Reconcile(response)
 221  			if err != nil {
 222  				t.Fatalf("[%s] client.Reconcile final round %d failed: %v", name, rounds, err)
 223  			}
 224  			break
 225  		}
 226  
 227  		msg, complete, err = client.Reconcile(response)
 228  		if err != nil {
 229  			t.Fatalf("[%s] client.Reconcile round %d failed: %v", name, rounds, err)
 230  		}
 231  	}
 232  
 233  	t.Logf("[%s] Converged in %d rounds", name, rounds)
 234  
 235  	clientHaves := client.CollectHaves()
 236  	clientHaveNots := client.CollectHaveNots()
 237  
 238  	t.Logf("[%s] Client haves: %d (expect %d), haveNots: %d (expect %d)",
 239  		name, len(clientHaves), expectedClientHaves,
 240  		len(clientHaveNots), expectedClientHaveNots)
 241  
 242  	if len(clientHaves) != expectedClientHaves {
 243  		t.Errorf("[%s] expected %d client haves, got %d",
 244  			name, expectedClientHaves, len(clientHaves))
 245  	}
 246  	for _, id := range clientHaves {
 247  		if !clientOnlyIDs[id] {
 248  			t.Errorf("[%s] unexpected client have: %s", name, truncateID(id))
 249  		}
 250  	}
 251  
 252  	if len(clientHaveNots) != expectedClientHaveNots {
 253  		t.Errorf("[%s] expected %d client haveNots, got %d",
 254  			name, expectedClientHaveNots, len(clientHaveNots))
 255  	}
 256  	for _, id := range clientHaveNots {
 257  		if !serverOnlyIDs[id] {
 258  			t.Errorf("[%s] unexpected client haveNot: %s", name, truncateID(id))
 259  		}
 260  	}
 261  }
 262  
 263  // makeTestID creates a deterministic 64-char hex ID from an integer.
 264  func makeTestID(n int) string {
 265  	// Create a 32-byte ID by repeating a pattern derived from n
 266  	var buf [32]byte
 267  	for i := 0; i < 32; i++ {
 268  		buf[i] = byte((n >> (8 * (i % 4))) & 0xFF)
 269  	}
 270  	return encodeHex(buf[:])
 271  }
 272  
 273  func TestBoundEncoding(t *testing.T) {
 274  	bounds := []Bound{
 275  		MinBound(),
 276  		MaxBound(),
 277  		{Item{Timestamp: 1234567890, ID: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}},
 278  	}
 279  
 280  	for _, b := range bounds {
 281  		enc := NewEncoder(256)
 282  		enc.WriteBound(b)
 283  
 284  		dec := NewDecoder(enc.Bytes())
 285  		decoded, err := dec.ReadBound()
 286  		if err != nil {
 287  			t.Errorf("failed to decode bound %v: %v", b, err)
 288  			continue
 289  		}
 290  
 291  		// For MaxBound, we only compare timestamps since ID is empty
 292  		if b.IsMax() {
 293  			if !decoded.IsMax() {
 294  				t.Errorf("expected MaxBound, got %v", decoded)
 295  			}
 296  		} else if b.Timestamp != decoded.Timestamp {
 297  			t.Errorf("timestamp mismatch: expected %d, got %d", b.Timestamp, decoded.Timestamp)
 298  		}
 299  	}
 300  }
 301