negentropy_test.go raw
1 package negentropy
2
3 import (
4 "testing"
5 )
6
7 func TestVectorBasic(t *testing.T) {
8 v := NewVector()
9 v.Insert(1000, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
10 v.Insert(2000, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
11 v.Insert(3000, "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc")
12 v.Seal()
13
14 if v.Size() != 3 {
15 t.Errorf("expected size 3, got %d", v.Size())
16 }
17
18 // Test fingerprint
19 fp := v.Fingerprint(0, 3)
20 if fp == EmptyFingerprint {
21 t.Error("expected non-empty fingerprint")
22 }
23 }
24
25 func TestNegentropy(t *testing.T) {
26 // Create two storages with overlapping data
27 v1 := NewVector()
28 v1.Insert(1000, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
29 v1.Insert(2000, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
30 v1.Insert(3000, "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc")
31 v1.Seal()
32
33 v2 := NewVector()
34 v2.Insert(1000, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
35 v2.Insert(2000, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
36 v2.Insert(4000, "dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd")
37 v2.Seal()
38
39 // Client initiates
40 client := New(v1, DefaultFrameSizeLimit)
41 server := New(v2, DefaultFrameSizeLimit)
42
43 msg, err := client.Start()
44 if err != nil {
45 t.Fatalf("client.Start failed: %v", err)
46 }
47
48 // Server responds
49 response, complete, err := server.Reconcile(msg)
50 if err != nil {
51 t.Fatalf("server.Reconcile failed: %v", err)
52 }
53
54 t.Logf("Round 1: complete=%v, response len=%d", complete, len(response))
55
56 // Continue until complete
57 for !complete {
58 response, complete, err = client.Reconcile(response)
59 if err != nil {
60 t.Fatalf("reconcile failed: %v", err)
61 }
62 t.Logf("Round: complete=%v, response len=%d", complete, len(response))
63
64 if !complete {
65 response, complete, err = server.Reconcile(response)
66 if err != nil {
67 t.Fatalf("reconcile failed: %v", err)
68 }
69 t.Logf("Round: complete=%v, response len=%d", complete, len(response))
70 }
71 }
72
73 // Check results
74 clientHaves := client.CollectHaves()
75 clientHaveNots := client.CollectHaveNots()
76 serverHaves := server.CollectHaves()
77 serverHaveNots := server.CollectHaveNots()
78
79 t.Logf("Client haves: %v", clientHaves)
80 t.Logf("Client haveNots: %v", clientHaveNots)
81 t.Logf("Server haves: %v", serverHaves)
82 t.Logf("Server haveNots: %v", serverHaveNots)
83 }
84
85 func TestEncoding(t *testing.T) {
86 // Test VarInt encoding/decoding
87 testCases := []uint64{0, 1, 127, 128, 16383, 16384, 1<<21 - 1, 1 << 21, 1<<63 - 1}
88
89 for _, n := range testCases {
90 encoded := EncodeVarInt(n)
91 dec := NewDecoder(encoded)
92 decoded, err := dec.ReadVarInt()
93 if err != nil {
94 t.Errorf("failed to decode %d: %v", n, err)
95 continue
96 }
97 if decoded != n {
98 t.Errorf("expected %d, got %d", n, decoded)
99 }
100 }
101 }
102
103 // TestConvergenceLargeSets tests that negentropy converges correctly with larger
104 // sets where each side has unique events the other doesn't.
105 func TestConvergenceLargeSets(t *testing.T) {
106 sharedCount := 200
107 clientOnlyCount := 50
108 serverOnlyCount := 150
109
110 v1 := NewVector()
111 v2 := NewVector()
112
113 for i := 0; i < sharedCount; i++ {
114 ts := int64(1000 + i)
115 id := makeTestID(i)
116 v1.Insert(ts, id)
117 v2.Insert(ts, id)
118 }
119
120 clientOnlyIDs := make(map[string]bool)
121 for i := 0; i < clientOnlyCount; i++ {
122 ts := int64(2000 + i)
123 id := makeTestID(10000 + i)
124 v1.Insert(ts, id)
125 clientOnlyIDs[id] = true
126 }
127
128 serverOnlyIDs := make(map[string]bool)
129 for i := 0; i < serverOnlyCount; i++ {
130 ts := int64(500 + i)
131 id := makeTestID(20000 + i)
132 v2.Insert(ts, id)
133 serverOnlyIDs[id] = true
134 }
135
136 v1.Seal()
137 v2.Seal()
138
139 runConvergenceTest(t, "basic", v1, v2, clientOnlyIDs, serverOnlyIDs,
140 clientOnlyCount, serverOnlyCount)
141 }
142
143 // TestConvergenceInterspersed tests the critical case where client-only and
144 // server-only events are interspersed among shared events. This exercises the
145 // bug where splitRange used GetBound(bucketEnd) instead of upperBound for the
146 // last bucket, causing range misalignment when boundary events don't exist in
147 // the responder's storage.
148 func TestConvergenceInterspersed(t *testing.T) {
149 v1 := NewVector()
150 v2 := NewVector()
151
152 clientOnlyIDs := make(map[string]bool)
153 serverOnlyIDs := make(map[string]bool)
154 clientOnlyCount := 0
155 serverOnlyCount := 0
156
157 // Create events at timestamps 0..499. Every 5th timestamp has a
158 // client-only event, every 7th has a server-only event, others are shared.
159 for i := 0; i < 500; i++ {
160 ts := int64(1000 + i)
161 if i%5 == 0 && i%7 != 0 {
162 // Client-only: interspersed among shared events
163 id := makeTestID(30000 + i)
164 v1.Insert(ts, id)
165 clientOnlyIDs[id] = true
166 clientOnlyCount++
167 } else if i%7 == 0 && i%5 != 0 {
168 // Server-only: interspersed among shared events
169 id := makeTestID(40000 + i)
170 v2.Insert(ts, id)
171 serverOnlyIDs[id] = true
172 serverOnlyCount++
173 } else {
174 // Shared event
175 id := makeTestID(50000 + i)
176 v1.Insert(ts, id)
177 v2.Insert(ts, id)
178 }
179 }
180
181 v1.Seal()
182 v2.Seal()
183
184 runConvergenceTest(t, "interspersed", v1, v2, clientOnlyIDs, serverOnlyIDs,
185 clientOnlyCount, serverOnlyCount)
186 }
187
188 // runConvergenceTest runs the full negentropy protocol and verifies correctness.
189 func runConvergenceTest(t *testing.T, name string, v1, v2 *Vector,
190 clientOnlyIDs, serverOnlyIDs map[string]bool,
191 expectedClientHaves, expectedClientHaveNots int) {
192 t.Helper()
193
194 t.Logf("[%s] Client has %d items, Server has %d items", name, v1.Size(), v2.Size())
195 t.Logf("[%s] Expected client-only: %d, server-only: %d",
196 name, expectedClientHaves, expectedClientHaveNots)
197
198 client := New(v1, DefaultFrameSizeLimit)
199 server := New(v2, DefaultFrameSizeLimit)
200
201 msg, err := client.Start()
202 if err != nil {
203 t.Fatalf("client.Start failed: %v", err)
204 }
205
206 rounds := 0
207 complete := false
208 for !complete {
209 rounds++
210 if rounds > 20 {
211 t.Fatalf("[%s] too many rounds (%d), protocol did not converge", name, rounds)
212 }
213
214 response, serverComplete, err := server.Reconcile(msg)
215 if err != nil {
216 t.Fatalf("[%s] server.Reconcile round %d failed: %v", name, rounds, err)
217 }
218
219 if serverComplete {
220 msg, complete, err = client.Reconcile(response)
221 if err != nil {
222 t.Fatalf("[%s] client.Reconcile final round %d failed: %v", name, rounds, err)
223 }
224 break
225 }
226
227 msg, complete, err = client.Reconcile(response)
228 if err != nil {
229 t.Fatalf("[%s] client.Reconcile round %d failed: %v", name, rounds, err)
230 }
231 }
232
233 t.Logf("[%s] Converged in %d rounds", name, rounds)
234
235 clientHaves := client.CollectHaves()
236 clientHaveNots := client.CollectHaveNots()
237
238 t.Logf("[%s] Client haves: %d (expect %d), haveNots: %d (expect %d)",
239 name, len(clientHaves), expectedClientHaves,
240 len(clientHaveNots), expectedClientHaveNots)
241
242 if len(clientHaves) != expectedClientHaves {
243 t.Errorf("[%s] expected %d client haves, got %d",
244 name, expectedClientHaves, len(clientHaves))
245 }
246 for _, id := range clientHaves {
247 if !clientOnlyIDs[id] {
248 t.Errorf("[%s] unexpected client have: %s", name, truncateID(id))
249 }
250 }
251
252 if len(clientHaveNots) != expectedClientHaveNots {
253 t.Errorf("[%s] expected %d client haveNots, got %d",
254 name, expectedClientHaveNots, len(clientHaveNots))
255 }
256 for _, id := range clientHaveNots {
257 if !serverOnlyIDs[id] {
258 t.Errorf("[%s] unexpected client haveNot: %s", name, truncateID(id))
259 }
260 }
261 }
262
263 // makeTestID creates a deterministic 64-char hex ID from an integer.
264 func makeTestID(n int) string {
265 // Create a 32-byte ID by repeating a pattern derived from n
266 var buf [32]byte
267 for i := 0; i < 32; i++ {
268 buf[i] = byte((n >> (8 * (i % 4))) & 0xFF)
269 }
270 return encodeHex(buf[:])
271 }
272
273 func TestBoundEncoding(t *testing.T) {
274 bounds := []Bound{
275 MinBound(),
276 MaxBound(),
277 {Item{Timestamp: 1234567890, ID: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}},
278 }
279
280 for _, b := range bounds {
281 enc := NewEncoder(256)
282 enc.WriteBound(b)
283
284 dec := NewDecoder(enc.Bytes())
285 decoded, err := dec.ReadBound()
286 if err != nil {
287 t.Errorf("failed to decode bound %v: %v", b, err)
288 continue
289 }
290
291 // For MaxBound, we only compare timestamps since ID is empty
292 if b.IsMax() {
293 if !decoded.IsMax() {
294 t.Errorf("expected MaxBound, got %v", decoded)
295 }
296 } else if b.Timestamp != decoded.Timestamp {
297 t.Errorf("timestamp mismatch: expected %d, got %d", b.Timestamp, decoded.Timestamp)
298 }
299 }
300 }
301