manager.go

   1  // Package negentropy provides NIP-77 negentropy-based set reconciliation
   2  // for both relay-to-relay sync and client-facing WebSocket operations.
   3  package negentropy
   4  
   5  import (
   6  	"context"
   7  	"encoding/hex"
   8  	"encoding/json"
   9  	"fmt"
  10  	"net/http"
  11  	"strings"
  12  	gosync "sync"
  13  	"time"
  14  
  15  	"github.com/gorilla/websocket"
  16  	"next.orly.dev/pkg/lol/chk"
  17  	"next.orly.dev/pkg/lol/log"
  18  
  19  	"next.orly.dev/pkg/database"
  20  	"next.orly.dev/pkg/nostr/encoders/event"
  21  	"next.orly.dev/pkg/nostr/encoders/filter"
  22  	"next.orly.dev/pkg/nostr/encoders/kind"
  23  	"next.orly.dev/pkg/nostr/encoders/tag"
  24  	"next.orly.dev/pkg/nostr/negentropy"
  25  	"next.orly.dev/pkg/ratelimit"
  26  )
  27  
  28  // PeerState represents the sync state for a peer relay.
  29  type PeerState struct {
  30  	URL                 string
  31  	LastSync            time.Time
  32  	EventsSynced        int64
  33  	Status              string // "idle", "syncing", "error"
  34  	LastError           string
  35  	ConsecutiveFailures int32
  36  }
  37  
  38  // ClientSession represents an active client negentropy session.
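      // A session is typically created by OpenSession when a NEG-OPEN frame arrives,
      // has its reconciliation state attached via SetNegentropy, is refreshed by
      // UpdateSessionActivity on each NEG-MSG round, and is released by CloseSession,
      // CloseSessionsByConnection, or CleanupExpiredSessions.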
  39  type ClientSession struct {
  40  	SubscriptionID string
  41  	ConnectionID   string
  42  	CreatedAt      time.Time
  43  	LastActivity   time.Time
  44  	RoundCount     int32
  45  	neg            *negentropy.Negentropy
  46  	storage        *negentropy.Vector
  47  }
  48  
  49  // SetNegentropy sets the negentropy instance and storage for this session.
  50  func (s *ClientSession) SetNegentropy(neg *negentropy.Negentropy, storage *negentropy.Vector) {
  51  	s.neg = neg
  52  	s.storage = storage
  53  }
  54  
  55  // GetNegentropy returns the negentropy instance for this session.
  56  func (s *ClientSession) GetNegentropy() *negentropy.Negentropy {
  57  	return s.neg
  58  }
  59  
  60  // Config holds configuration for the negentropy manager.
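      // Note that NewManager fills in defaults only when the Config pointer itself is
      // nil; a partially populated Config is used as-is, so callers should set
      // SyncInterval explicitly (time.NewTicker panics on a non-positive interval).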
  61  type Config struct {
  62  	Peers                []string
  63  	SyncInterval         time.Duration
  64  	FrameSize            int
  65  	IDSize               int
  66  	ClientSessionTimeout time.Duration
  67  	Filter               *filter.F // Optional filter for selective sync
  68  	MaxEvents            uint      // Max events to sync per cycle (0 = unlimited)
  69  	MemoryTargetMB       int       // Memory target for backpressure (0 = disabled)
  70  }
  71  
  72  // Manager handles negentropy sync operations.
  73  type Manager struct {
  74  	db     database.Database
  75  	config *Config
  76  
  77  	mu             gosync.RWMutex
  78  	peers          map[string]*PeerState
  79  	sessions       map[string]*ClientSession // keyed by connectionID:subscriptionID
  80  	active         bool
  81  	lastSync       time.Time
  82  	stopChan       chan struct{}
  83  	syncWg         gosync.WaitGroup
  84  	memoryMonitor  *ratelimit.MemoryMonitor // nil if backpressure disabled
  85  }
  86  
  87  // NewManager creates a new negentropy manager.
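      //
      // Illustrative usage sketch (the peer URL and tuning values below are
      // placeholders, not defaults imposed by this package):
      //
      //	mgr := NewManager(db, &Config{
      //		Peers:                []string{"wss://peer.example.com"},
      //		SyncInterval:         time.Minute,
      //		FrameSize:            128 * 1024,
      //		IDSize:               16,
      //		ClientSessionTimeout: 5 * time.Minute,
      //	})
      //	mgr.Start()
      //	defer mgr.Stop()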
  88  func NewManager(db database.Database, cfg *Config) *Manager {
  89  	if cfg == nil {
  90  		cfg = &Config{
  91  			SyncInterval:         60 * time.Second,
  92  			FrameSize:            128 * 1024,
  93  			IDSize:               16,
  94  			ClientSessionTimeout: 5 * time.Minute,
  95  		}
  96  	}
  97  
  98  	m := &Manager{
  99  		db:       db,
 100  		config:   cfg,
 101  		peers:    make(map[string]*PeerState),
 102  		sessions: make(map[string]*ClientSession),
 103  	}
 104  
 105  	// Initialize memory monitor for backpressure if configured
 106  	if cfg.MemoryTargetMB > 0 {
 107  		m.memoryMonitor = ratelimit.NewMemoryMonitor(500 * time.Millisecond)
 108  		m.memoryMonitor.SetMemoryTarget(uint64(cfg.MemoryTargetMB) * 1024 * 1024)
 109  		m.memoryMonitor.Start()
 110  		log.I.F("negentropy: backpressure enabled (target %dMB)", cfg.MemoryTargetMB)
 111  	}
 112  
 113  	// Initialize peers from config
 114  	for _, peerURL := range cfg.Peers {
 115  		m.peers[peerURL] = &PeerState{
 116  			URL:    peerURL,
 117  			Status: "idle",
 118  		}
 119  	}
 120  
 121  	return m
 122  }
 123  
 124  // Start starts the background sync loop.
 125  func (m *Manager) Start() {
 126  	m.mu.Lock()
 127  	if m.active {
 128  		m.mu.Unlock()
 129  		return
 130  	}
 131  	m.active = true
 132  	m.stopChan = make(chan struct{})
 133  	m.mu.Unlock()
 134  
 135  	log.I.F("negentropy manager starting background sync")
 136  
 137  	m.syncWg.Add(1)
 138  	go m.syncLoop()
 139  }
 140  
 141  // Stop stops the background sync loop.
 142  func (m *Manager) Stop() {
 143  	m.mu.Lock()
 144  	if !m.active {
 145  		m.mu.Unlock()
 146  		return
 147  	}
 148  	m.active = false
 149  	close(m.stopChan)
 150  	m.mu.Unlock()
 151  
 152  	m.syncWg.Wait()
 153  
 154  	if m.memoryMonitor != nil {
 155  		m.memoryMonitor.Stop()
 156  	}
 157  
 158  	log.I.F("negentropy manager stopped")
 159  }
 160  
 161  // checkBackpressure applies progressive delays when memory pressure is high.
 162  // Returns nil normally, ctx.Err() if the context is cancelled during a pause.
 163  func (m *Manager) checkBackpressure(ctx context.Context) error {
 164  	if m.memoryMonitor == nil {
 165  		return nil
 166  	}
 167  	metrics := m.memoryMonitor.GetMetrics()
 168  
 169  	// Emergency mode: pause 10s to let the system recover
 170  	if metrics.InEmergencyMode {
 171  		log.W.F("negentropy: pausing sync — emergency mode (memory pressure %.1f%%)",
 172  			metrics.MemoryPressure*100)
 173  		select {
 174  		case <-time.After(10 * time.Second):
 175  			return nil
 176  		case <-ctx.Done():
 177  			return ctx.Err()
 178  		}
 179  	}
 180  
 181  	// Progressive delay: 0ms at 70% pressure, scaling to 5s at 100%
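      	// (for example, at 85% pressure fraction = (0.85-0.70)/0.30 = 0.5, giving a 2.5s pause)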
 182  	if metrics.MemoryPressure > 0.7 {
 183  		fraction := (metrics.MemoryPressure - 0.7) / 0.3
 184  		if fraction > 1.0 {
 185  			fraction = 1.0
 186  		}
 187  		delay := time.Duration(fraction*5000) * time.Millisecond
 188  		log.D.F("negentropy: backpressure %v (memory pressure %.1f%%)",
 189  			delay, metrics.MemoryPressure*100)
 190  		select {
 191  		case <-time.After(delay):
 192  		case <-ctx.Done():
 193  			return ctx.Err()
 194  		}
 195  	}
 196  
 197  	return nil
 198  }
 199  
 200  func (m *Manager) syncLoop() {
 201  	defer m.syncWg.Done()
 202  
 203  	// Do an initial sync after a short delay, unless Stop is called first
 204  	select {
      	case <-time.After(5 * time.Second):
      	case <-m.stopChan:
      		return
      	}
 205  	m.syncAllPeers()
 206  
 207  	ticker := time.NewTicker(m.config.SyncInterval)
 208  	defer ticker.Stop()
 209  
 210  	for {
 211  		select {
 212  		case <-m.stopChan:
 213  			return
 214  		case <-ticker.C:
 215  			m.syncAllPeers()
 216  		}
 217  	}
 218  }
 219  
 220  func (m *Manager) syncAllPeers() {
 221  	m.mu.RLock()
 222  	peers := make([]string, 0, len(m.peers))
 223  	for url := range m.peers {
 224  		peers = append(peers, url)
 225  	}
 226  	m.mu.RUnlock()
 227  
 228  	for _, peerURL := range peers {
 229  		m.syncWithPeer(context.Background(), peerURL)
 230  	}
 231  
 232  	m.mu.Lock()
 233  	m.lastSync = time.Now()
 234  	m.mu.Unlock()
 235  }
 236  
 237  func (m *Manager) syncWithPeer(ctx context.Context, peerURL string) {
 238  	m.mu.Lock()
 239  	peer, ok := m.peers[peerURL]
 240  	if !ok {
 241  		m.mu.Unlock()
 242  		return
 243  	}
 244  	peer.Status = "syncing"
 245  	m.mu.Unlock()
 246  
 247  	log.D.F("negentropy sync starting with %s", peerURL)
 248  
 249  	eventsSynced, err := m.performNegentropy(ctx, peerURL)
 250  
 251  	m.mu.Lock()
 252  	peer.LastSync = time.Now()
 253  	if err != nil {
 254  		peer.Status = "error"
 255  		peer.LastError = err.Error()
 256  		peer.ConsecutiveFailures++
 257  		log.E.F("negentropy sync with %s failed: %v", peerURL, err)
 258  	} else {
 259  		peer.Status = "idle"
 260  		peer.LastError = ""
 261  		peer.ConsecutiveFailures = 0
 262  		peer.EventsSynced += eventsSynced
 263  		log.D.F("negentropy sync with %s complete: %d events synced", peerURL, eventsSynced)
 264  	}
 265  	m.mu.Unlock()
 266  }
 267  
 268  // performNegentropy performs the actual NIP-77 negentropy sync with a peer.
 269  func (m *Manager) performNegentropy(ctx context.Context, peerURL string) (int64, error) {
 270  	// Build local storage from our events
 271  	storage, err := m.buildStorage(ctx)
 272  	if err != nil {
 273  		return 0, fmt.Errorf("failed to build storage: %w", err)
 274  	}
 275  
 276  	log.D.F("built negentropy storage with %d events", storage.Size())
 277  
 278  	// Create negentropy instance
 279  	neg := negentropy.New(storage, m.config.FrameSize)
 280  	defer neg.Close()
 281  
 282  	// Connect to peer WebSocket
 283  	wsURL := strings.Replace(peerURL, "https://", "wss://", 1)
 284  	wsURL = strings.Replace(wsURL, "http://", "ws://", 1)
 285  	if !strings.HasPrefix(wsURL, "ws") {
 286  		wsURL = "wss://" + wsURL
 287  	}
 288  
 289  	dialer := websocket.Dialer{
 290  		HandshakeTimeout: 30 * time.Second,
 291  	}
 292  
 293  	conn, _, err := dialer.DialContext(ctx, wsURL, http.Header{})
 294  	if err != nil {
 295  		return 0, fmt.Errorf("failed to connect to peer: %w", err)
 296  	}
 297  	defer conn.Close()
 298  
 299  	// Generate subscription ID
 300  	subID := fmt.Sprintf("neg-%d", time.Now().UnixNano())
 301  
 302  	// Start negentropy protocol
 303  	initialMsg, err := neg.Start()
 304  	if err != nil {
 305  		return 0, fmt.Errorf("failed to start negentropy: %w", err)
 306  	}
 307  
 308  	// Send NEG-OPEN: ["NEG-OPEN", subscription_id, filter, initial_message]
 309  	// Use configured filter or empty filter for all events
 310  	negFilter := m.filterToMap()
 311  	negOpen := []any{"NEG-OPEN", subID, negFilter, hex.EncodeToString(initialMsg)}
 312  	if err := conn.WriteJSON(negOpen); err != nil {
 313  		return 0, fmt.Errorf("failed to send NEG-OPEN: %w", err)
 314  	}
 315  
 316  	var eventsSynced int64
 317  	var needIDs []string
 318  	var haveIDs []string
 319  
 320  	// Phase 1: Reconciliation - exchange NEG-MSG until complete
 321  	for i := 0; i < 20; i++ { // Max 20 reconciliation rounds
 322  		_, msgBytes, err := conn.ReadMessage()
 323  		if err != nil {
 324  			return eventsSynced, fmt.Errorf("failed to read message during reconciliation: %w", err)
 325  		}
 326  
 327  		var msg []json.RawMessage
 328  		if err := json.Unmarshal(msgBytes, &msg); err != nil {
 329  			return eventsSynced, fmt.Errorf("failed to parse message: %w", err)
 330  		}
 331  
 332  		if len(msg) < 2 {
 333  			continue
 334  		}
 335  
 336  		var msgType string
 337  		if err := json.Unmarshal(msg[0], &msgType); err != nil {
 338  			continue
 339  		}
 340  
 341  		switch msgType {
 342  		case "NEG-MSG":
 343  			if len(msg) < 3 {
 344  				continue
 345  			}
 346  			var hexMsg string
 347  			if err := json.Unmarshal(msg[2], &hexMsg); err != nil {
 348  				continue
 349  			}
 350  
 351  			negMsg, err := hex.DecodeString(hexMsg)
 352  			if err != nil {
 353  				continue
 354  			}
 355  
 356  			response, complete, err := neg.Reconcile(negMsg)
 357  			if err != nil {
 358  				return eventsSynced, fmt.Errorf("reconcile failed: %w", err)
 359  			}
 360  
 361  			// Collect IDs we need and IDs we have
 362  			needIDs = append(needIDs, neg.CollectHaveNots()...)
 363  			haveIDs = append(haveIDs, neg.CollectHaves()...)
 364  
 365  			// Always send the response to the server, even when complete.
 366  			// The server needs this to finalize its own reconciliation and send events.
 367  			if len(response) > 0 {
 368  				negMsgResp := []any{"NEG-MSG", subID, hex.EncodeToString(response)}
 369  				if err := conn.WriteJSON(negMsgResp); err != nil {
 370  					return eventsSynced, fmt.Errorf("failed to send NEG-MSG: %w", err)
 371  				}
 372  			}
 373  
 374  			if complete {
 375  				log.D.F("negentropy: reconciliation complete, need %d events, have %d to push", len(needIDs), len(haveIDs))
 376  				goto fetchAndPush
 377  			}
 378  
 379  		case "NEG-ERR":
 380  			var errMsg string
 381  			if len(msg) >= 3 {
 382  				json.Unmarshal(msg[2], &errMsg)
 383  			}
 384  			return eventsSynced, fmt.Errorf("peer returned error: %s", errMsg)
 385  		}
 386  	}
 387  
 388  fetchAndPush:
 389  	// Send NEG-CLOSE to end the negentropy session
 390  	{
 391  		negClose := []any{"NEG-CLOSE", subID}
 392  		conn.WriteJSON(negClose)
 393  	}
 394  	// Clear any read deadline from the negotiation phase
 395  	conn.SetReadDeadline(time.Time{})
 396  
 397  	log.D.F("negentropy: need %d events, have %d events to send", len(needIDs), len(haveIDs))
 398  
 399  	// Phase 2: Fetch events we need from the peer via REQ
 400  	// The negentropy library only populates haves/haveNots on the initiator (client) side.
 401  	// The server (responder) does not know which events to push. The client must
 402  	// actively fetch needed events using standard NIP-01 REQ with ID prefixes.
 403  	if len(needIDs) > 0 {
 404  		fetched, err := m.fetchEventsFromPeer(ctx, conn, subID, needIDs)
 405  		if err != nil {
 406  			log.W.F("negentropy: failed to fetch events: %v", err)
 407  		} else {
 408  			log.D.F("negentropy: fetched %d events from peer", fetched)
 409  			eventsSynced += int64(fetched)
 410  		}
 411  	}
 412  
 413  	// Phase 3: Push events we have to the peer
 414  	if len(haveIDs) > 0 {
 415  		pushed, err := m.pushEventsToPeer(ctx, conn, haveIDs)
 416  		if err != nil {
 417  			log.W.F("failed to push events to peer: %v", err)
 418  		} else {
 419  			log.D.F("negentropy: pushed %d events to peer", pushed)
 420  			eventsSynced += int64(pushed)
 421  		}
 422  	}
 423  
 424  	return eventsSynced, nil
 425  }
 426  
 427  // buildStorage creates a negentropy Vector from local events.
 428  func (m *Manager) buildStorage(ctx context.Context) (*negentropy.Vector, error) {
 429  	storage := negentropy.NewVector()
 430  
 431  	// Build filter - start with configured filter or empty
 432  	// Use configured MaxEvents or default to 1,000,000
 433  	limit := m.config.MaxEvents
 434  	if limit == 0 {
 435  		limit = 1000000 // Default to 1M events
 436  	}
 437  	var f *filter.F
 438  	if m.config.Filter != nil {
 439  		// Copy the configured filter so the shared config is not mutated
 440  		fc := *m.config.Filter
 441  		fc.Limit = &limit
      		f = &fc
 442  	} else {
 443  		f = &filter.F{
 444  			Limit: &limit,
 445  		}
 446  	}
 447  
 448  	idPkTs, err := m.db.QueryForIds(ctx, f)
 449  	if err != nil {
 450  		return nil, fmt.Errorf("failed to query events: %w", err)
 451  	}
 452  
 453  	for _, item := range idPkTs {
 454  		// IDHex() returns lowercase hex string of the event ID
 455  		storage.Insert(item.Ts, item.IDHex())
 456  	}
 457  
 458  	storage.Seal()
 459  	return storage, nil
 460  }
 461  
 462  // filterToMap converts the configured filter to a map for the NEG-OPEN message.
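      // For example, a filter carrying kinds [1, 30023] and a since timestamp would
      // yield {"kinds": [1, 30023], "since": <since value>}; keys are only added for
      // fields that are actually set.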
 463  func (m *Manager) filterToMap() map[string]any {
 464  	result := map[string]any{}
 465  
 466  	if m.config.Filter == nil {
 467  		return result
 468  	}
 469  
 470  	f := m.config.Filter
 471  
 472  	// Add kinds if present
 473  	if f.Kinds != nil && f.Kinds.Len() > 0 {
 474  		kinds := make([]int, 0, f.Kinds.Len())
 475  		for _, k := range f.Kinds.K {
 476  			kinds = append(kinds, k.ToInt())
 477  		}
 478  		result["kinds"] = kinds
 479  	}
 480  
 481  	// Add authors if present
 482  	if f.Authors != nil && f.Authors.Len() > 0 {
 483  		authors := make([]string, 0, f.Authors.Len())
 484  		for _, a := range f.Authors.T {
 485  			authors = append(authors, hex.EncodeToString(a))
 486  		}
 487  		result["authors"] = authors
 488  	}
 489  
 490  	// Add IDs if present
 491  	if f.Ids != nil && f.Ids.Len() > 0 {
 492  		ids := make([]string, 0, f.Ids.Len())
 493  		for _, id := range f.Ids.T {
 494  			ids = append(ids, hex.EncodeToString(id))
 495  		}
 496  		result["ids"] = ids
 497  	}
 498  
 499  	// Add since if present
 500  	if f.Since != nil && f.Since.V != 0 {
 501  		result["since"] = f.Since.V
 502  	}
 503  
 504  	// Add until if present
 505  	if f.Until != nil && f.Until.V != 0 {
 506  		result["until"] = f.Until.V
 507  	}
 508  
 509  	// Add limit if present
 510  	if f.Limit != nil && *f.Limit > 0 {
 511  		result["limit"] = *f.Limit
 512  	}
 513  
 514  	return result
 515  }
 516  
 517  // pushEventsToPeer sends events we have to the peer.
 518  // The truncated IDs are 32-char hex prefixes, so we query our local DB and push matching events.
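      // Events of privileged kinds (DMs, gift wraps, etc.) are skipped and never pushed.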
 519  func (m *Manager) pushEventsToPeer(ctx context.Context, conn *websocket.Conn, truncatedIDs []string) (int, error) {
 520  	if len(truncatedIDs) == 0 {
 521  		return 0, nil
 522  	}
 523  	log.D.F("pushEventsToPeer: looking up %d events to push", len(truncatedIDs))
 524  
 525  	pushed := 0
 526  	for _, truncID := range truncatedIDs {
 527  		// Apply backpressure before each push
 528  		if err := m.checkBackpressure(ctx); err != nil {
 529  			return pushed, err
 530  		}
 531  
 532  		// Query local database for events matching this ID prefix
 533  		// Use QueryByIDPrefix if available, otherwise fall back to broader query
 534  		events, err := m.queryEventsByIDPrefix(ctx, truncID)
 535  		if err != nil {
 536  			log.D.F("failed to query event with prefix %s: %v", truncID, err)
 537  			continue
 538  		}
 539  
 540  		for _, ev := range events {
 541  			// Never push privileged or channel events (DMs, gift wraps, NIRC
 542  			// messages) to peers. These stay on the hosting relay only.
 543  			if kind.IsPrivileged(ev.Kind) {
 544  				continue
 545  			}
 546  			// Send event to peer
 547  			eventMsg := []any{"EVENT", ev}
 548  			if err := conn.WriteJSON(eventMsg); err != nil {
 549  				log.W.F("failed to push event %s: %v", truncID, err)
 550  				continue
 551  			}
 552  			pushed++
 553  		}
 554  	}
 555  
 556  	return pushed, nil
 557  }
 558  
 559  // queryEventsByIDPrefix queries local database for events matching an ID prefix.
 560  func (m *Manager) queryEventsByIDPrefix(ctx context.Context, idPrefix string) ([]*event.E, error) {
 561  	// For now, scan recent event IDs and match the prefix in memory; Badger's
 562  	// prefix iteration could serve this directly. The ID prefix is 32 hex chars = 16 bytes.
 563  	limit := uint(10000) // upper bound on how many recent IDs to scan for matches
 564  
 565  	// Query IDs and filter by prefix
 566  	f := &filter.F{
 567  		Limit: &limit,
 568  	}
 569  
 570  	idPkTs, err := m.db.QueryForIds(ctx, f)
 571  	if err != nil {
 572  		return nil, err
 573  	}
 574  
 575  	var results []*event.E
 576  	for _, item := range idPkTs {
 577  		fullID := item.IDHex()
 578  		if strings.HasPrefix(fullID, idPrefix) {
 579  			// Found a match - decode the full ID and fetch the event
 580  			idBytes, err := hex.DecodeString(fullID)
 581  			if err != nil {
 582  				log.D.F("failed to decode ID %s: %v", fullID, err)
 583  				continue
 584  			}
 585  
 586  			// Create filter with the full ID
 587  			idTag := tag.NewFromBytesSlice(idBytes)
 588  			evs, err := m.db.QueryEvents(ctx, &filter.F{
 589  				Ids: idTag,
 590  			})
 591  			if err != nil {
 592  				log.D.F("failed to fetch event %s: %v", fullID, err)
 593  				continue
 594  			}
 595  			if len(evs) > 0 {
 596  				results = append(results, evs[0])
 597  			}
 598  		}
 599  	}
 600  
 601  	return results, nil
 602  }
 603  
 604  // fetchEventsFromPeer fetches specific events from a peer by ID (IDs may be truncated prefixes),
 605  // using standard NIP-01 REQ/EVENT/EOSE exchanges over the existing connection.
 606  func (m *Manager) fetchEventsFromPeer(ctx context.Context, conn *websocket.Conn, baseSubID string, ids []string) (int, error) {
 607  	if len(ids) == 0 {
 608  		return 0, nil
 609  	}
 610  	log.D.F("fetchEventsFromPeer: fetching %d events with IDs (first 3): %v", len(ids), ids[:min(3, len(ids))])
 611  
 612  	// Batch IDs into chunks of 100
 613  	const batchSize = 100
 614  	fetched := 0
 615  
 616  	for i := 0; i < len(ids); i += batchSize {
 617  		// Apply backpressure between batches
 618  		if err := m.checkBackpressure(ctx); err != nil {
 619  			return fetched, err
 620  		}
 621  
 622  		end := i + batchSize
 623  		if end > len(ids) {
 624  			end = len(ids)
 625  		}
 626  		batch := ids[i:end]
 627  
 628  		subID := fmt.Sprintf("%s-fetch-%d", baseSubID, i/batchSize)
 629  		log.D.F("fetchEventsFromPeer: sending REQ %s for batch of %d IDs", subID, len(batch))
 630  
 631  		// Send REQ for these IDs
 632  		reqFilter := map[string]any{
 633  			"ids": batch,
 634  		}
 635  		req := []any{"REQ", subID, reqFilter}
 636  		reqJSON, _ := json.Marshal(req)
 637  		log.D.F("fetchEventsFromPeer: REQ message: %s", string(reqJSON)[:min(500, len(reqJSON))])
 638  
 639  		if err := conn.WriteJSON(req); err != nil {
 640  			log.E.F("fetchEventsFromPeer: failed to send REQ: %v", err)
 641  			return fetched, fmt.Errorf("failed to send REQ: %w", err)
 642  		}
 643  
 644  		// Read events until EOSE
 645  		messageCount := 0
 646  		for {
 647  			_, msgBytes, err := conn.ReadMessage()
 648  			if err != nil {
 649  				log.E.F("fetchEventsFromPeer: failed to read after %d messages: %v", messageCount, err)
 650  				return fetched, fmt.Errorf("failed to read: %w", err)
 651  			}
 652  			messageCount++
 653  
 654  			var msg []json.RawMessage
 655  			if err := json.Unmarshal(msgBytes, &msg); err != nil {
 656  				log.D.F("fetchEventsFromPeer: failed to unmarshal message: %v", err)
 657  				continue
 658  			}
 659  
 660  			if len(msg) < 2 {
 661  				log.D.F("fetchEventsFromPeer: message too short: %d elements", len(msg))
 662  				continue
 663  			}
 664  
 665  			var msgType string
 666  			if err := json.Unmarshal(msg[0], &msgType); err != nil {
 667  				log.D.F("fetchEventsFromPeer: failed to unmarshal message type: %v", err)
 668  				continue
 669  			}
 670  
 671  			switch msgType {
 672  			case "EVENT":
 673  				if len(msg) >= 3 {
 674  					// Apply backpressure before writing each event
 675  					if err := m.checkBackpressure(ctx); err != nil {
 676  						return fetched, err
 677  					}
 678  					// Store the event
 679  					if err := m.storeEventFromJSON(ctx, msg[2]); err != nil {
 680  						log.W.F("fetchEventsFromPeer: failed to store event: %v", err)
 681  					} else {
 682  						fetched++
 683  						if fetched%10 == 0 {
 684  							log.D.F("fetchEventsFromPeer: stored %d events so far", fetched)
 685  						}
 686  					}
 687  				}
 688  			case "EOSE":
 689  				log.D.F("fetchEventsFromPeer: received EOSE for %s after %d messages, %d events fetched so far", subID, messageCount, fetched)
 690  				goto nextBatch
 691  			case "CLOSED":
 692  				var reason string
 693  				if len(msg) >= 3 {
 694  					json.Unmarshal(msg[2], &reason)
 695  				}
 696  				log.W.F("fetchEventsFromPeer: subscription %s closed: %s", subID, reason)
 697  				goto nextBatch
 698  			case "NOTICE":
 699  				var notice string
 700  				if len(msg) >= 2 {
 701  					json.Unmarshal(msg[1], &notice)
 702  				}
 703  				log.W.F("fetchEventsFromPeer: NOTICE from peer: %s", notice)
 704  			default:
 705  				log.D.F("fetchEventsFromPeer: unknown message type: %s", msgType)
 706  			}
 707  		}
 708  	nextBatch:
 709  		// Send CLOSE for this subscription
 710  		closeMsg := []any{"CLOSE", subID}
 711  		conn.WriteJSON(closeMsg)
 712  	}
 713  
 714  	log.D.F("fetchEventsFromPeer: completed, total fetched: %d", fetched)
 715  	return fetched, nil
 716  }
 717  
 718  // storeEventFromJSON stores an event from raw JSON.
 719  func (m *Manager) storeEventFromJSON(ctx context.Context, eventJSON json.RawMessage) error {
 720  	// Parse the event using the nostr event encoder
 721  	ev := &event.E{}
 722  	if err := ev.UnmarshalJSON(eventJSON); err != nil {
 723  		return fmt.Errorf("failed to unmarshal event: %w", err)
 724  	}
 725  
 726  	// Verify the event ID and signature before storing
 727  	if ok, err := ev.Verify(); err != nil {
 728  		return fmt.Errorf("event verification error: %w", err)
      	} else if !ok {
      		return fmt.Errorf("event signature verification failed")
 729  	}
 730  
 731  	// Store via database using the standard SaveEvent method
 732  	_, err := m.db.SaveEvent(ctx, ev)
 733  	return err
 734  }
 735  
 736  // IsActive returns whether background sync is running.
 737  func (m *Manager) IsActive() bool {
 738  	m.mu.RLock()
 739  	defer m.mu.RUnlock()
 740  	return m.active
 741  }
 742  
 743  // LastSync returns the timestamp of the last sync cycle.
 744  func (m *Manager) LastSync() time.Time {
 745  	m.mu.RLock()
 746  	defer m.mu.RUnlock()
 747  	return m.lastSync
 748  }
 749  
 750  // GetPeers returns the list of peer URLs.
 751  func (m *Manager) GetPeers() []string {
 752  	m.mu.RLock()
 753  	defer m.mu.RUnlock()
 754  	peers := make([]string, 0, len(m.peers))
 755  	for url := range m.peers {
 756  		peers = append(peers, url)
 757  	}
 758  	return peers
 759  }
 760  
 761  // GetPeerStates returns the sync state for all peers.
 762  func (m *Manager) GetPeerStates() []*PeerState {
 763  	m.mu.RLock()
 764  	defer m.mu.RUnlock()
 765  	states := make([]*PeerState, 0, len(m.peers))
 766  	for _, peer := range m.peers {
 767  		states = append(states, &PeerState{
 768  			URL:                 peer.URL,
 769  			LastSync:            peer.LastSync,
 770  			EventsSynced:        peer.EventsSynced,
 771  			Status:              peer.Status,
 772  			LastError:           peer.LastError,
 773  			ConsecutiveFailures: peer.ConsecutiveFailures,
 774  		})
 775  	}
 776  	return states
 777  }
 778  
 779  // GetPeerState returns the sync state for a specific peer.
 780  func (m *Manager) GetPeerState(peerURL string) (*PeerState, bool) {
 781  	m.mu.RLock()
 782  	defer m.mu.RUnlock()
 783  	peer, ok := m.peers[peerURL]
 784  	if !ok {
 785  		return nil, false
 786  	}
 787  	return &PeerState{
 788  		URL:                 peer.URL,
 789  		LastSync:            peer.LastSync,
 790  		EventsSynced:        peer.EventsSynced,
 791  		Status:              peer.Status,
 792  		LastError:           peer.LastError,
 793  		ConsecutiveFailures: peer.ConsecutiveFailures,
 794  	}, true
 795  }
 796  
 797  // AddPeer adds a peer for negentropy sync.
 798  func (m *Manager) AddPeer(peerURL string) {
 799  	m.mu.Lock()
 800  	defer m.mu.Unlock()
 801  	if _, ok := m.peers[peerURL]; !ok {
 802  		m.peers[peerURL] = &PeerState{
 803  			URL:    peerURL,
 804  			Status: "idle",
 805  		}
 806  	}
 807  }
 808  
 809  // RemovePeer removes a peer from negentropy sync.
 810  func (m *Manager) RemovePeer(peerURL string) {
 811  	m.mu.Lock()
 812  	defer m.mu.Unlock()
 813  	delete(m.peers, peerURL)
 814  }
 815  
 816  // TriggerSync manually triggers sync with a specific peer or all peers.
 817  func (m *Manager) TriggerSync(ctx context.Context, peerURL string) {
 818  	if peerURL == "" {
 819  		m.syncAllPeers()
 820  	} else {
 821  		m.syncWithPeer(ctx, peerURL)
 822  	}
 823  }
 824  
 825  // sessionKey creates a unique key for a session.
 826  func sessionKey(connectionID, subscriptionID string) string {
 827  	return connectionID + ":" + subscriptionID
 828  }
 829  
 830  // OpenSession opens a new client negentropy session.
 831  func (m *Manager) OpenSession(connectionID, subscriptionID string) *ClientSession {
 832  	m.mu.Lock()
 833  	defer m.mu.Unlock()
 834  
 835  	key := sessionKey(connectionID, subscriptionID)
 836  	session := &ClientSession{
 837  		SubscriptionID: subscriptionID,
 838  		ConnectionID:   connectionID,
 839  		CreatedAt:      time.Now(),
 840  		LastActivity:   time.Now(),
 841  		RoundCount:     0,
 842  	}
 843  	m.sessions[key] = session
 844  	return session
 845  }
 846  
 847  // GetSession retrieves an existing session.
 848  func (m *Manager) GetSession(connectionID, subscriptionID string) (*ClientSession, bool) {
 849  	m.mu.RLock()
 850  	defer m.mu.RUnlock()
 851  	key := sessionKey(connectionID, subscriptionID)
 852  	session, ok := m.sessions[key]
 853  	return session, ok
 854  }
 855  
 856  // UpdateSessionActivity updates the last activity time for a session.
 857  func (m *Manager) UpdateSessionActivity(connectionID, subscriptionID string) {
 858  	m.mu.Lock()
 859  	defer m.mu.Unlock()
 860  	key := sessionKey(connectionID, subscriptionID)
 861  	if session, ok := m.sessions[key]; ok {
 862  		session.LastActivity = time.Now()
 863  		session.RoundCount++
 864  	}
 865  }
 866  
 867  // CloseSession closes a client session.
 868  func (m *Manager) CloseSession(connectionID, subscriptionID string) {
 869  	m.mu.Lock()
 870  	defer m.mu.Unlock()
 871  	key := sessionKey(connectionID, subscriptionID)
 872  	if session, ok := m.sessions[key]; ok {
 873  		if session.neg != nil {
 874  			session.neg.Close()
 875  		}
 876  	}
 877  	delete(m.sessions, key)
 878  }
 879  
 880  // CloseSessionsByConnection closes all sessions for a connection.
 881  func (m *Manager) CloseSessionsByConnection(connectionID string) {
 882  	m.mu.Lock()
 883  	defer m.mu.Unlock()
 884  	for key, session := range m.sessions {
 885  		if session.ConnectionID == connectionID {
 886  			if session.neg != nil {
 887  				session.neg.Close()
 888  			}
 889  			delete(m.sessions, key)
 890  		}
 891  	}
 892  }
 893  
 894  // ListSessions returns all active sessions.
 895  func (m *Manager) ListSessions() []*ClientSession {
 896  	m.mu.RLock()
 897  	defer m.mu.RUnlock()
 898  	sessions := make([]*ClientSession, 0, len(m.sessions))
 899  	for _, session := range m.sessions {
 900  		sessions = append(sessions, &ClientSession{
 901  			SubscriptionID: session.SubscriptionID,
 902  			ConnectionID:   session.ConnectionID,
 903  			CreatedAt:      session.CreatedAt,
 904  			LastActivity:   session.LastActivity,
 905  			RoundCount:     session.RoundCount,
 906  		})
 907  	}
 908  	return sessions
 909  }
 910  
 911  // CleanupExpiredSessions removes sessions that have been inactive beyond timeout.
 912  func (m *Manager) CleanupExpiredSessions() int {
 913  	m.mu.Lock()
 914  	defer m.mu.Unlock()
 915  
 916  	cutoff := time.Now().Add(-m.config.ClientSessionTimeout)
 917  	removed := 0
 918  	for key, session := range m.sessions {
 919  		if session.LastActivity.Before(cutoff) {
 920  			if session.neg != nil {
 921  				session.neg.Close()
 922  			}
 923  			delete(m.sessions, key)
 924  			removed++
 925  		}
 926  	}
 927  	return removed
 928  }
 929  
 930  // Ensure chk is used
 931  var _ = chk.E
 932