manager.go
1 // Package negentropy provides NIP-77 negentropy-based set reconciliation
2 // for both relay-to-relay sync and client-facing WebSocket operations.
3 package negentropy
4
5 import (
6 "context"
7 "encoding/hex"
8 "encoding/json"
9 "fmt"
10 "net/http"
11 "strings"
12 gosync "sync"
13 "time"
14
15 "github.com/gorilla/websocket"
16 "next.orly.dev/pkg/lol/chk"
17 "next.orly.dev/pkg/lol/log"
18
19 "next.orly.dev/pkg/nostr/encoders/event"
20 "next.orly.dev/pkg/nostr/encoders/filter"
21 "next.orly.dev/pkg/nostr/encoders/kind"
22 "next.orly.dev/pkg/nostr/encoders/tag"
23 "next.orly.dev/pkg/nostr/negentropy"
24 "next.orly.dev/pkg/database"
25 "next.orly.dev/pkg/ratelimit"
26 )
27
28 // PeerState represents the sync state for a peer relay.
29 type PeerState struct {
30 URL string
31 LastSync time.Time
32 EventsSynced int64
33 Status string // "idle", "syncing", "error"
34 LastError string
35 ConsecutiveFailures int32
36 }
37
38 // ClientSession represents an active client negentropy session.
39 type ClientSession struct {
40 SubscriptionID string
41 ConnectionID string
42 CreatedAt time.Time
43 LastActivity time.Time
44 RoundCount int32
45 neg *negentropy.Negentropy
46 storage *negentropy.Vector
47 }
48
49 // SetNegentropy sets the negentropy instance and storage for this session.
50 func (s *ClientSession) SetNegentropy(neg *negentropy.Negentropy, storage *negentropy.Vector) {
51 s.neg = neg
52 s.storage = storage
53 }
54
55 // GetNegentropy returns the negentropy instance for this session.
56 func (s *ClientSession) GetNegentropy() *negentropy.Negentropy {
57 return s.neg
58 }
59
60 // Config holds configuration for the negentropy manager.
61 type Config struct {
62 Peers []string
63 SyncInterval time.Duration
64 FrameSize int
65 IDSize int
66 ClientSessionTimeout time.Duration
67 Filter *filter.F // Optional filter for selective sync
68 MaxEvents uint // Max events to sync per cycle (0 = unlimited)
69 MemoryTargetMB int // Memory target for backpressure (0 = disabled)
70 }
71
72 // Manager handles negentropy sync operations.
73 type Manager struct {
74 db database.Database
75 config *Config
76
77 mu gosync.RWMutex
78 peers map[string]*PeerState
79 sessions map[string]*ClientSession // keyed by connectionID:subscriptionID
80 active bool
81 lastSync time.Time
82 stopChan chan struct{}
83 syncWg gosync.WaitGroup
84 memoryMonitor *ratelimit.MemoryMonitor // nil if backpressure disabled
85 }
86
87 // NewManager creates a new negentropy manager.
88 func NewManager(db database.Database, cfg *Config) *Manager {
89 if cfg == nil {
90 cfg = &Config{
91 SyncInterval: 60 * time.Second,
92 FrameSize: 128 * 1024,
93 IDSize: 16,
94 ClientSessionTimeout: 5 * time.Minute,
95 }
96 }
97
98 m := &Manager{
99 db: db,
100 config: cfg,
101 peers: make(map[string]*PeerState),
102 sessions: make(map[string]*ClientSession),
103 }
104
105 // Initialize memory monitor for backpressure if configured
106 if cfg.MemoryTargetMB > 0 {
107 m.memoryMonitor = ratelimit.NewMemoryMonitor(500 * time.Millisecond)
108 m.memoryMonitor.SetMemoryTarget(uint64(cfg.MemoryTargetMB) * 1024 * 1024)
109 m.memoryMonitor.Start()
110 log.I.F("negentropy: backpressure enabled (target %dMB)", cfg.MemoryTargetMB)
111 }
112
113 // Initialize peers from config
114 for _, peerURL := range cfg.Peers {
115 m.peers[peerURL] = &PeerState{
116 URL: peerURL,
117 Status: "idle",
118 }
119 }
120
121 return m
122 }
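// A minimal usage sketch (hypothetical wiring; db is a database.Database
// value supplied by the caller, and the peer URL is a placeholder):
//
//	mgr := NewManager(db, &Config{
//		Peers:                []string{"wss://relay.example.com"},
//		SyncInterval:         60 * time.Second,
//		FrameSize:            128 * 1024,
//		IDSize:               16,
//		ClientSessionTimeout: 5 * time.Minute,
//	})
//	mgr.Start()
//	defer mgr.Stop()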
123
124 // Start starts the background sync loop.
125 func (m *Manager) Start() {
126 m.mu.Lock()
127 if m.active {
128 m.mu.Unlock()
129 return
130 }
131 m.active = true
132 m.stopChan = make(chan struct{})
133 m.mu.Unlock()
134
135 log.I.F("negentropy manager starting background sync")
136
137 m.syncWg.Add(1)
138 go m.syncLoop()
139 }
140
141 // Stop stops the background sync loop.
142 func (m *Manager) Stop() {
143 m.mu.Lock()
144 if !m.active {
145 m.mu.Unlock()
146 return
147 }
148 m.active = false
149 close(m.stopChan)
150 m.mu.Unlock()
151
152 m.syncWg.Wait()
153
154 if m.memoryMonitor != nil {
155 m.memoryMonitor.Stop()
156 }
157
158 log.I.F("negentropy manager stopped")
159 }
160
161 // checkBackpressure applies progressive delays when memory pressure is high.
162 // Returns nil normally, or ctx.Err() if the context is cancelled during a pause.
163 func (m *Manager) checkBackpressure(ctx context.Context) error {
164 if m.memoryMonitor == nil {
165 return nil
166 }
167 metrics := m.memoryMonitor.GetMetrics()
168
169 // Emergency mode: pause 10s to let the system recover
170 if metrics.InEmergencyMode {
171 log.W.F("negentropy: pausing sync — emergency mode (memory pressure %.1f%%)",
172 metrics.MemoryPressure*100)
173 select {
174 case <-time.After(10 * time.Second):
175 return nil
176 case <-ctx.Done():
177 return ctx.Err()
178 }
179 }
180
181 // Progressive delay: 0ms at 70% pressure, scaling to 5s at 100%
182 if metrics.MemoryPressure > 0.7 {
183 fraction := (metrics.MemoryPressure - 0.7) / 0.3
184 if fraction > 1.0 {
185 fraction = 1.0
186 }
187 delay := time.Duration(fraction*5000) * time.Millisecond
188 log.D.F("negentropy: backpressure %v (memory pressure %.1f%%)",
189 delay, metrics.MemoryPressure*100)
190 select {
191 case <-time.After(delay):
192 case <-ctx.Done():
193 return ctx.Err()
194 }
195 }
196
197 return nil
198 }
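// Worked example of the progressive delay above: at 85% memory pressure,
// fraction = (0.85 - 0.7) / 0.3 = 0.5, so the loop pauses for
// 0.5 * 5000ms = 2.5s; at or below 70% there is no delay, while at 100%
// pressure and in emergency mode the pauses are 5s and 10s respectively.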
199
200 func (m *Manager) syncLoop() {
201 defer m.syncWg.Done()
202
203 // Do initial sync after a short delay
204 time.Sleep(5 * time.Second)
205 m.syncAllPeers()
206
207 ticker := time.NewTicker(m.config.SyncInterval)
208 defer ticker.Stop()
209
210 for {
211 select {
212 case <-m.stopChan:
213 return
214 case <-ticker.C:
215 m.syncAllPeers()
216 }
217 }
218 }
219
220 func (m *Manager) syncAllPeers() {
221 m.mu.RLock()
222 peers := make([]string, 0, len(m.peers))
223 for url := range m.peers {
224 peers = append(peers, url)
225 }
226 m.mu.RUnlock()
227
228 for _, peerURL := range peers {
229 m.syncWithPeer(context.Background(), peerURL)
230 }
231
232 m.mu.Lock()
233 m.lastSync = time.Now()
234 m.mu.Unlock()
235 }
236
237 func (m *Manager) syncWithPeer(ctx context.Context, peerURL string) {
238 m.mu.Lock()
239 peer, ok := m.peers[peerURL]
240 if !ok {
241 m.mu.Unlock()
242 return
243 }
244 peer.Status = "syncing"
245 m.mu.Unlock()
246
247 log.D.F("negentropy sync starting with %s", peerURL)
248
249 eventsSynced, err := m.performNegentropy(ctx, peerURL)
250
251 m.mu.Lock()
252 peer.LastSync = time.Now()
253 if err != nil {
254 peer.Status = "error"
255 peer.LastError = err.Error()
256 peer.ConsecutiveFailures++
257 log.E.F("negentropy sync with %s failed: %v", peerURL, err)
258 } else {
259 peer.Status = "idle"
260 peer.LastError = ""
261 peer.ConsecutiveFailures = 0
262 peer.EventsSynced += eventsSynced
263 log.D.F("negentropy sync with %s complete: %d events synced", peerURL, eventsSynced)
264 }
265 m.mu.Unlock()
266 }
267
268 // performNegentropy performs the actual NIP-77 negentropy sync with a peer.
269 func (m *Manager) performNegentropy(ctx context.Context, peerURL string) (int64, error) {
270 // Build local storage from our events
271 storage, err := m.buildStorage(ctx)
272 if err != nil {
273 return 0, fmt.Errorf("failed to build storage: %w", err)
274 }
275
276 log.D.F("built negentropy storage with %d events", storage.Size())
277
278 // Create negentropy instance
279 neg := negentropy.New(storage, m.config.FrameSize)
280 defer neg.Close()
281
282 // Connect to peer WebSocket, converting http(s):// URLs to ws(s)://
283 wsURL := strings.Replace(peerURL, "https://", "wss://", 1)
284 wsURL = strings.Replace(wsURL, "http://", "ws://", 1)
285 if !strings.HasPrefix(wsURL, "ws") {
286 wsURL = "wss://" + wsURL
287 }
288
289 dialer := websocket.Dialer{
290 HandshakeTimeout: 30 * time.Second,
291 }
292
293 conn, _, err := dialer.DialContext(ctx, wsURL, http.Header{})
294 if err != nil {
295 return 0, fmt.Errorf("failed to connect to peer: %w", err)
296 }
297 defer conn.Close()
298
299 // Generate subscription ID
300 subID := fmt.Sprintf("neg-%d", time.Now().UnixNano())
301
302 // Start negentropy protocol
303 initialMsg, err := neg.Start()
304 if err != nil {
305 return 0, fmt.Errorf("failed to start negentropy: %w", err)
306 }
307
308 // Send NEG-OPEN: ["NEG-OPEN", subscription_id, filter, initial_message]
309 // Use configured filter or empty filter for all events
310 negFilter := m.filterToMap()
311 negOpen := []any{"NEG-OPEN", subID, negFilter, hex.EncodeToString(initialMsg)}
312 if err := conn.WriteJSON(negOpen); err != nil {
313 return 0, fmt.Errorf("failed to send NEG-OPEN: %w", err)
314 }
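// For illustration, the NEG-OPEN frame written above has this shape
// (subscription ID, filter, and hex payload shown with placeholder values):
//
//	["NEG-OPEN","neg-1700000000000000000",{"kinds":[1]},"<hex initial message>"]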
315
316 var eventsSynced int64
317 var needIDs []string
318 var haveIDs []string
319
320 // Phase 1: Reconciliation - exchange NEG-MSG until complete
321 for i := 0; i < 20; i++ { // Max 20 reconciliation rounds
322 _, msgBytes, err := conn.ReadMessage()
323 if err != nil {
324 return eventsSynced, fmt.Errorf("failed to read message during reconciliation: %w", err)
325 }
326
327 var msg []json.RawMessage
328 if err := json.Unmarshal(msgBytes, &msg); err != nil {
329 return eventsSynced, fmt.Errorf("failed to parse message: %w", err)
330 }
331
332 if len(msg) < 2 {
333 continue
334 }
335
336 var msgType string
337 if err := json.Unmarshal(msg[0], &msgType); err != nil {
338 continue
339 }
340
341 switch msgType {
342 case "NEG-MSG":
343 if len(msg) < 3 {
344 continue
345 }
346 var hexMsg string
347 if err := json.Unmarshal(msg[2], &hexMsg); err != nil {
348 continue
349 }
350
351 negMsg, err := hex.DecodeString(hexMsg)
352 if err != nil {
353 continue
354 }
355
356 response, complete, err := neg.Reconcile(negMsg)
357 if err != nil {
358 return eventsSynced, fmt.Errorf("reconcile failed: %w", err)
359 }
360
361 // Collect IDs we need and IDs we have
362 needIDs = append(needIDs, neg.CollectHaveNots()...)
363 haveIDs = append(haveIDs, neg.CollectHaves()...)
364
365 // Send any non-empty response to the server, even on the final round.
366 // The server needs it to finalize its own reconciliation and send events.
367 if len(response) > 0 {
368 negMsgResp := []any{"NEG-MSG", subID, hex.EncodeToString(response)}
369 if err := conn.WriteJSON(negMsgResp); err != nil {
370 return eventsSynced, fmt.Errorf("failed to send NEG-MSG: %w", err)
371 }
372 }
373
374 if complete {
375 log.D.F("negentropy: reconciliation complete, need %d events, have %d to push", len(needIDs), len(haveIDs))
376 goto fetchAndPush
377 }
378
379 case "NEG-ERR":
380 var errMsg string
381 if len(msg) >= 3 {
382 json.Unmarshal(msg[2], &errMsg)
383 }
384 return eventsSynced, fmt.Errorf("peer returned error: %s", errMsg)
385 }
386 }
387
388 fetchAndPush:
389 // Send NEG-CLOSE to end the negentropy session
390 {
391 negClose := []any{"NEG-CLOSE", subID}
392 conn.WriteJSON(negClose)
393 }
394 // Clear any read deadline from the negotiation phase
395 conn.SetReadDeadline(time.Time{})
396
397 log.D.F("negentropy: need %d events, have %d events to send", len(needIDs), len(haveIDs))
398
399 // Phase 2: Fetch events we need from the peer via REQ
400 // The negentropy library only populates haves/haveNots on the initiator (client) side.
401 // The server (responder) does not know which events to push. The client must
402 // actively fetch needed events using standard NIP-01 REQ with ID prefixes.
403 if len(needIDs) > 0 {
404 fetched, err := m.fetchEventsFromPeer(ctx, conn, subID, needIDs)
405 if err != nil {
406 log.W.F("negentropy: failed to fetch events: %v", err)
407 } else {
408 log.D.F("negentropy: fetched %d events from peer", fetched)
409 eventsSynced += int64(fetched)
410 }
411 }
412
413 // Phase 3: Push events we have to the peer
414 if len(haveIDs) > 0 {
415 pushed, err := m.pushEventsToPeer(ctx, conn, haveIDs)
416 if err != nil {
417 log.W.F("failed to push events to peer: %v", err)
418 } else {
419 log.D.F("negentropy: pushed %d events to peer", pushed)
420 eventsSynced += int64(pushed)
421 }
422 }
423
424 return eventsSynced, nil
425 }
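// Summary of the exchange performNegentropy drives, initiator on the left
// (a sketch of the happy path, not an exhaustive state machine):
//
//	-> ["NEG-OPEN", subID, filter, <hex>]
//	<- ["NEG-MSG", subID, <hex>]                 // repeated until reconciliation completes
//	-> ["NEG-MSG", subID, <hex>]
//	-> ["NEG-CLOSE", subID]
//	-> ["REQ", subID+"-fetch-N", {"ids": [...]}] // fetch events we need
//	<- ["EVENT", ...], ..., ["EOSE", ...]
//	-> ["EVENT", <event>]                        // push events the peer lacks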
426
427 // buildStorage creates a negentropy Vector from local events.
428 func (m *Manager) buildStorage(ctx context.Context) (*negentropy.Vector, error) {
429 storage := negentropy.NewVector()
430
431 // Build filter - start with configured filter or empty
432 // Use configured MaxEvents or default to 1,000,000
433 limit := m.config.MaxEvents
434 if limit == 0 {
435 limit = 1000000 // Default to 1M events
436 }
437 var f *filter.F
438 if m.config.Filter != nil {
439 // Use configured filter with our limit
440 f = m.config.Filter
441 f.Limit = &limit
442 } else {
443 f = &filter.F{
444 Limit: &limit,
445 }
446 }
447
448 idPkTs, err := m.db.QueryForIds(ctx, f)
449 if err != nil {
450 return nil, fmt.Errorf("failed to query events: %w", err)
451 }
452
453 for _, item := range idPkTs {
454 // IDHex() returns lowercase hex string of the event ID
455 storage.Insert(item.Ts, item.IDHex())
456 }
457
458 storage.Seal()
459 return storage, nil
460 }
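// The Vector lifecycle used above, in isolation (a sketch; ts and idHex stand
// for the per-event timestamp and lowercase hex ID from the query results):
//
//	storage := negentropy.NewVector()
//	storage.Insert(ts, idHex) // once per event
//	storage.Seal()            // finalize before use
//	neg := negentropy.New(storage, frameSize)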
461
462 // filterToMap converts the configured filter to a map for NEG-OPEN message.
463 func (m *Manager) filterToMap() map[string]any {
464 result := map[string]any{}
465
466 if m.config.Filter == nil {
467 return result
468 }
469
470 f := m.config.Filter
471
472 // Add kinds if present
473 if f.Kinds != nil && f.Kinds.Len() > 0 {
474 kinds := make([]int, 0, f.Kinds.Len())
475 for _, k := range f.Kinds.K {
476 kinds = append(kinds, k.ToInt())
477 }
478 result["kinds"] = kinds
479 }
480
481 // Add authors if present
482 if f.Authors != nil && f.Authors.Len() > 0 {
483 authors := make([]string, 0, f.Authors.Len())
484 for _, a := range f.Authors.T {
485 authors = append(authors, hex.EncodeToString(a))
486 }
487 result["authors"] = authors
488 }
489
490 // Add IDs if present
491 if f.Ids != nil && f.Ids.Len() > 0 {
492 ids := make([]string, 0, f.Ids.Len())
493 for _, id := range f.Ids.T {
494 ids = append(ids, hex.EncodeToString(id))
495 }
496 result["ids"] = ids
497 }
498
499 // Add since if present
500 if f.Since != nil && f.Since.V != 0 {
501 result["since"] = f.Since.V
502 }
503
504 // Add until if present
505 if f.Until != nil && f.Until.V != 0 {
506 result["until"] = f.Until.V
507 }
508
509 // Add limit if present
510 if f.Limit != nil && *f.Limit > 0 {
511 result["limit"] = *f.Limit
512 }
513
514 return result
515 }
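// For example, a configured filter with kinds [1, 30023], a since timestamp,
// and a limit would serialize into the NEG-OPEN filter roughly as
// (values are illustrative):
//
//	{"kinds":[1,30023],"since":1700000000,"limit":1000000}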
516
517 // pushEventsToPeer sends events we have to the peer.
518 // The truncated IDs are 32-char hex prefixes, so we query our local DB and push matching events.
519 func (m *Manager) pushEventsToPeer(ctx context.Context, conn *websocket.Conn, truncatedIDs []string) (int, error) {
520 if len(truncatedIDs) == 0 {
521 return 0, nil
522 }
523 log.D.F("pushEventsToPeer: looking up %d events to push", len(truncatedIDs))
524
525 pushed := 0
526 for _, truncID := range truncatedIDs {
527 // Apply backpressure before each push
528 if err := m.checkBackpressure(ctx); err != nil {
529 return pushed, err
530 }
531
532 // Query local database for events matching this ID prefix
533 // queryEventsByIDPrefix scans recent IDs and matches the prefix in memory
534 events, err := m.queryEventsByIDPrefix(ctx, truncID)
535 if err != nil {
536 log.D.F("failed to query event with prefix %s: %v", truncID, err)
537 continue
538 }
539
540 for _, ev := range events {
541 // Never push privileged or channel events (DMs, gift wraps, NIRC
542 // messages) to peers. These stay on the hosting relay only.
543 if kind.IsPrivileged(ev.Kind) {
544 continue
545 }
546 // Send event to peer
547 eventMsg := []any{"EVENT", ev}
548 if err := conn.WriteJSON(eventMsg); err != nil {
549 log.W.F("failed to push event %s: %v", truncID, err)
550 continue
551 }
552 pushed++
553 }
554 }
555
556 return pushed, nil
557 }
558
559 // queryEventsByIDPrefix queries local database for events matching an ID prefix.
560 func (m *Manager) queryEventsByIDPrefix(ctx context.Context, idPrefix string) ([]*event.E, error) {
561 // Badger supports prefix iteration, but for now we scan recent event IDs
562 // and match the 32-hex-char (16-byte) prefix in memory.
563 limit := uint(10000) // Get enough events to find our prefix matches
564
565 // Query IDs and filter by prefix
566 f := &filter.F{
567 Limit: &limit,
568 }
569
570 idPkTs, err := m.db.QueryForIds(ctx, f)
571 if err != nil {
572 return nil, err
573 }
574
575 var results []*event.E
576 for _, item := range idPkTs {
577 fullID := item.IDHex()
578 if len(fullID) >= len(idPrefix) && fullID[:len(idPrefix)] == idPrefix {
579 // Found a match - decode the full ID and fetch the event
580 idBytes, err := hex.DecodeString(fullID)
581 if err != nil {
582 log.D.F("failed to decode ID %s: %v", fullID, err)
583 continue
584 }
585
586 // Create filter with the full ID
587 idTag := tag.NewFromBytesSlice(idBytes)
588 evs, err := m.db.QueryEvents(ctx, &filter.F{
589 Ids: idTag,
590 })
591 if err != nil {
592 log.D.F("failed to fetch event %s: %v", fullID, err)
593 continue
594 }
595 if len(evs) > 0 {
596 results = append(results, evs[0])
597 }
598 }
599 }
600
601 return results, nil
602 }
603
604 // fetchEventsFromPeer fetches specific events from a peer by ID (can be prefixes).
605 // It is called from performNegentropy after reconciliation to pull events this relay is missing.
606 func (m *Manager) fetchEventsFromPeer(ctx context.Context, conn *websocket.Conn, baseSubID string, ids []string) (int, error) {
607 if len(ids) == 0 {
608 return 0, nil
609 }
610 log.D.F("fetchEventsFromPeer: fetching %d events with IDs (first 3): %v", len(ids), ids[:min(3, len(ids))])
611
612 // Batch IDs into chunks of 100
613 const batchSize = 100
614 fetched := 0
615
616 for i := 0; i < len(ids); i += batchSize {
617 // Apply backpressure between batches
618 if err := m.checkBackpressure(ctx); err != nil {
619 return fetched, err
620 }
621
622 end := i + batchSize
623 if end > len(ids) {
624 end = len(ids)
625 }
626 batch := ids[i:end]
627
628 subID := fmt.Sprintf("%s-fetch-%d", baseSubID, i/batchSize)
629 log.D.F("fetchEventsFromPeer: sending REQ %s for batch of %d IDs", subID, len(batch))
630
631 // Send REQ for these IDs
632 filter := map[string]any{
633 "ids": batch,
634 }
635 req := []any{"REQ", subID, filter}
636 reqJSON, _ := json.Marshal(req)
637 log.D.F("fetchEventsFromPeer: REQ message: %s", string(reqJSON)[:min(500, len(reqJSON))])
638
639 if err := conn.WriteJSON(req); err != nil {
640 log.E.F("fetchEventsFromPeer: failed to send REQ: %v", err)
641 return fetched, fmt.Errorf("failed to send REQ: %w", err)
642 }
643
644 // Read events until EOSE
645 messageCount := 0
646 for {
647 _, msgBytes, err := conn.ReadMessage()
648 if err != nil {
649 log.E.F("fetchEventsFromPeer: failed to read after %d messages: %v", messageCount, err)
650 return fetched, fmt.Errorf("failed to read: %w", err)
651 }
652 messageCount++
653
654 var msg []json.RawMessage
655 if err := json.Unmarshal(msgBytes, &msg); err != nil {
656 log.D.F("fetchEventsFromPeer: failed to unmarshal message: %v", err)
657 continue
658 }
659
660 if len(msg) < 2 {
661 log.D.F("fetchEventsFromPeer: message too short: %d elements", len(msg))
662 continue
663 }
664
665 var msgType string
666 if err := json.Unmarshal(msg[0], &msgType); err != nil {
667 log.D.F("fetchEventsFromPeer: failed to unmarshal message type: %v", err)
668 continue
669 }
670
671 switch msgType {
672 case "EVENT":
673 if len(msg) >= 3 {
674 // Apply backpressure before writing each event
675 if err := m.checkBackpressure(ctx); err != nil {
676 return fetched, err
677 }
678 // Store the event
679 if err := m.storeEventFromJSON(ctx, msg[2]); err != nil {
680 log.W.F("fetchEventsFromPeer: failed to store event: %v", err)
681 } else {
682 fetched++
683 if fetched%10 == 0 {
684 log.D.F("fetchEventsFromPeer: stored %d events so far", fetched)
685 }
686 }
687 }
688 case "EOSE":
689 log.D.F("fetchEventsFromPeer: received EOSE for %s after %d messages, fetched %d events in batch", subID, messageCount, fetched)
690 goto nextBatch
691 case "CLOSED":
692 var reason string
693 if len(msg) >= 3 {
694 json.Unmarshal(msg[2], &reason)
695 }
696 log.W.F("fetchEventsFromPeer: subscription %s closed: %s", subID, reason)
697 goto nextBatch
698 case "NOTICE":
699 var notice string
700 if len(msg) >= 2 {
701 json.Unmarshal(msg[1], &notice)
702 }
703 log.W.F("fetchEventsFromPeer: NOTICE from peer: %s", notice)
704 default:
705 log.D.F("fetchEventsFromPeer: unknown message type: %s", msgType)
706 }
707 }
708 nextBatch:
709 // Send CLOSE for this subscription
710 closeMsg := []any{"CLOSE", subID}
711 conn.WriteJSON(closeMsg)
712 }
713
714 log.D.F("fetchEventsFromPeer: completed, total fetched: %d", fetched)
715 return fetched, nil
716 }
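// One fetch batch as it appears on the wire (a sketch with placeholder IDs;
// the IDs may be full 64-char hex IDs or truncated prefixes):
//
//	-> ["REQ","neg-...-fetch-0",{"ids":["<hex id or prefix>", ...]}]
//	<- ["EVENT","neg-...-fetch-0",{...}]   // stored via storeEventFromJSON
//	<- ["EOSE","neg-...-fetch-0"]
//	-> ["CLOSE","neg-...-fetch-0"]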
717
718 // storeEventFromJSON stores an event from raw JSON.
719 func (m *Manager) storeEventFromJSON(ctx context.Context, eventJSON json.RawMessage) error {
720 // Parse the event using the nostr event encoder
721 ev := &event.E{}
722 if err := ev.UnmarshalJSON(eventJSON); err != nil {
723 return fmt.Errorf("failed to unmarshal event: %w", err)
724 }
725
726 // Verify the event signature
727 if ok, err := ev.Verify(); err != nil || !ok {
728 return fmt.Errorf("event verification failed")
729 }
730
731 // Store via database using the standard SaveEvent method
732 _, err := m.db.SaveEvent(ctx, ev)
733 return err
734 }
735
736 // IsActive returns whether background sync is running.
737 func (m *Manager) IsActive() bool {
738 m.mu.RLock()
739 defer m.mu.RUnlock()
740 return m.active
741 }
742
743 // LastSync returns the timestamp of the last sync cycle.
744 func (m *Manager) LastSync() time.Time {
745 m.mu.RLock()
746 defer m.mu.RUnlock()
747 return m.lastSync
748 }
749
750 // GetPeers returns the list of peer URLs.
751 func (m *Manager) GetPeers() []string {
752 m.mu.RLock()
753 defer m.mu.RUnlock()
754 peers := make([]string, 0, len(m.peers))
755 for url := range m.peers {
756 peers = append(peers, url)
757 }
758 return peers
759 }
760
761 // GetPeerStates returns the sync state for all peers.
762 func (m *Manager) GetPeerStates() []*PeerState {
763 m.mu.RLock()
764 defer m.mu.RUnlock()
765 states := make([]*PeerState, 0, len(m.peers))
766 for _, peer := range m.peers {
767 states = append(states, &PeerState{
768 URL: peer.URL,
769 LastSync: peer.LastSync,
770 EventsSynced: peer.EventsSynced,
771 Status: peer.Status,
772 LastError: peer.LastError,
773 ConsecutiveFailures: peer.ConsecutiveFailures,
774 })
775 }
776 return states
777 }
778
779 // GetPeerState returns the sync state for a specific peer.
780 func (m *Manager) GetPeerState(peerURL string) (*PeerState, bool) {
781 m.mu.RLock()
782 defer m.mu.RUnlock()
783 peer, ok := m.peers[peerURL]
784 if !ok {
785 return nil, false
786 }
787 return &PeerState{
788 URL: peer.URL,
789 LastSync: peer.LastSync,
790 EventsSynced: peer.EventsSynced,
791 Status: peer.Status,
792 LastError: peer.LastError,
793 ConsecutiveFailures: peer.ConsecutiveFailures,
794 }, true
795 }
796
797 // AddPeer adds a peer for negentropy sync.
798 func (m *Manager) AddPeer(peerURL string) {
799 m.mu.Lock()
800 defer m.mu.Unlock()
801 if _, ok := m.peers[peerURL]; !ok {
802 m.peers[peerURL] = &PeerState{
803 URL: peerURL,
804 Status: "idle",
805 }
806 }
807 }
808
809 // RemovePeer removes a peer from negentropy sync.
810 func (m *Manager) RemovePeer(peerURL string) {
811 m.mu.Lock()
812 defer m.mu.Unlock()
813 delete(m.peers, peerURL)
814 }
815
816 // TriggerSync manually triggers sync with a specific peer or all peers.
817 func (m *Manager) TriggerSync(ctx context.Context, peerURL string) {
818 if peerURL == "" {
819 m.syncAllPeers()
820 } else {
821 m.syncWithPeer(ctx, peerURL)
822 }
823 }
824
825 // sessionKey creates a unique key for a session.
826 func sessionKey(connectionID, subscriptionID string) string {
827 return connectionID + ":" + subscriptionID
828 }
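// For example, sessionKey("conn-1", "sub-A") returns "conn-1:sub-A".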
829
830 // OpenSession opens a new client negentropy session.
831 func (m *Manager) OpenSession(connectionID, subscriptionID string) *ClientSession {
832 m.mu.Lock()
833 defer m.mu.Unlock()
834
835 key := sessionKey(connectionID, subscriptionID)
836 session := &ClientSession{
837 SubscriptionID: subscriptionID,
838 ConnectionID: connectionID,
839 CreatedAt: time.Now(),
840 LastActivity: time.Now(),
841 RoundCount: 0,
842 }
843 m.sessions[key] = session
844 return session
845 }
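// A hypothetical handler-side lifecycle (connID, subID, storage, frameSize,
// and clientMsg are placeholders owned by the WebSocket handler, not this
// package):
//
//	session := mgr.OpenSession(connID, subID)
//	session.SetNegentropy(negentropy.New(storage, frameSize), storage)
//	// for each NEG-MSG round from the client:
//	mgr.UpdateSessionActivity(connID, subID)
//	resp, done, err := session.GetNegentropy().Reconcile(clientMsg)
//	// on NEG-CLOSE or disconnect:
//	mgr.CloseSession(connID, subID)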
846
847 // GetSession retrieves an existing session.
848 func (m *Manager) GetSession(connectionID, subscriptionID string) (*ClientSession, bool) {
849 m.mu.RLock()
850 defer m.mu.RUnlock()
851 key := sessionKey(connectionID, subscriptionID)
852 session, ok := m.sessions[key]
853 return session, ok
854 }
855
856 // UpdateSessionActivity updates the last activity time for a session.
857 func (m *Manager) UpdateSessionActivity(connectionID, subscriptionID string) {
858 m.mu.Lock()
859 defer m.mu.Unlock()
860 key := sessionKey(connectionID, subscriptionID)
861 if session, ok := m.sessions[key]; ok {
862 session.LastActivity = time.Now()
863 session.RoundCount++
864 }
865 }
866
867 // CloseSession closes a client session.
868 func (m *Manager) CloseSession(connectionID, subscriptionID string) {
869 m.mu.Lock()
870 defer m.mu.Unlock()
871 key := sessionKey(connectionID, subscriptionID)
872 if session, ok := m.sessions[key]; ok {
873 if session.neg != nil {
874 session.neg.Close()
875 }
876 }
877 delete(m.sessions, key)
878 }
879
880 // CloseSessionsByConnection closes all sessions for a connection.
881 func (m *Manager) CloseSessionsByConnection(connectionID string) {
882 m.mu.Lock()
883 defer m.mu.Unlock()
884 for key, session := range m.sessions {
885 if session.ConnectionID == connectionID {
886 if session.neg != nil {
887 session.neg.Close()
888 }
889 delete(m.sessions, key)
890 }
891 }
892 }
893
894 // ListSessions returns all active sessions.
895 func (m *Manager) ListSessions() []*ClientSession {
896 m.mu.RLock()
897 defer m.mu.RUnlock()
898 sessions := make([]*ClientSession, 0, len(m.sessions))
899 for _, session := range m.sessions {
900 sessions = append(sessions, &ClientSession{
901 SubscriptionID: session.SubscriptionID,
902 ConnectionID: session.ConnectionID,
903 CreatedAt: session.CreatedAt,
904 LastActivity: session.LastActivity,
905 RoundCount: session.RoundCount,
906 })
907 }
908 return sessions
909 }
910
911 // CleanupExpiredSessions removes sessions that have been inactive beyond timeout.
912 func (m *Manager) CleanupExpiredSessions() int {
913 m.mu.Lock()
914 defer m.mu.Unlock()
915
916 cutoff := time.Now().Add(-m.config.ClientSessionTimeout)
917 removed := 0
918 for key, session := range m.sessions {
919 if session.LastActivity.Before(cutoff) {
920 if session.neg != nil {
921 session.neg.Close()
922 }
923 delete(m.sessions, key)
924 removed++
925 }
926 }
927 return removed
928 }
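// Callers are expected to invoke this periodically; a minimal sketch (the
// one-minute interval is an assumption, not enforced by this package):
//
//	go func() {
//		t := time.NewTicker(time.Minute)
//		defer t.Stop()
//		for range t.C {
//			mgr.CleanupExpiredSessions()
//		}
//	}()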
929
930 // Ensure chk is used
931 var _ = chk.E
932