manager.go raw

   1  // Package cluster provides cluster replication with persistent state
   2  package cluster
   3  
   4  import (
   5  	"context"
   6  	"encoding/binary"
   7  	"encoding/json"
   8  	"fmt"
   9  	"net/http"
  10  	gosync "sync"
  11  	"time"
  12  
  13  	"github.com/dgraph-io/badger/v4"
  14  	"next.orly.dev/pkg/nostr/crypto/keys"
  15  	"next.orly.dev/pkg/nostr/encoders/event"
  16  	"next.orly.dev/pkg/nostr/encoders/hex"
  17  	"next.orly.dev/pkg/nostr/encoders/kind"
  18  	"next.orly.dev/pkg/lol/log"
  19  	"next.orly.dev/pkg/database"
  20  	"next.orly.dev/pkg/database/indexes/types"
  21  	"next.orly.dev/pkg/sync/common"
  22  )
  23  
  24  // EventPublisher is an interface for publishing events
  25  type EventPublisher interface {
  26  	Deliver(*event.E)
  27  }
  28  
  29  // Manager handles cluster replication between relay instances
  30  type Manager struct {
  31  	ctx                       context.Context
  32  	cancel                    context.CancelFunc
  33  	db                        *database.D
  34  	adminNpubs                []string
  35  	relayIdentityPubkey       string                // Our relay's identity pubkey (hex)
  36  	selfURLs                  map[string]bool       // URLs discovered to be ourselves (for fast lookups)
  37  	members                   map[string]*Member    // keyed by relay URL
  38  	membersMux                gosync.RWMutex
  39  	pollTicker                *time.Ticker
  40  	pollDone                  chan struct{}
  41  	httpClient                *http.Client
  42  	propagatePrivilegedEvents bool
  43  	publisher                 EventPublisher
  44  	nip11Cache                *common.NIP11Cache
  45  }
  46  
  47  // Member represents a cluster member
  48  type Member struct {
  49  	HTTPURL      string
  50  	WebSocketURL string
  51  	LastSerial   uint64
  52  	LastPoll     time.Time
  53  	Status       string // "active", "error", "unknown"
  54  	ErrorCount   int
  55  }
  56  
  57  // LatestSerialResponse returns the latest serial
  58  type LatestSerialResponse struct {
  59  	Serial    uint64 `json:"serial"`
  60  	Timestamp int64  `json:"timestamp"`
  61  }
  62  
  63  // EventsRangeResponse contains events in a range
  64  type EventsRangeResponse struct {
  65  	Events   []EventInfo `json:"events"`
  66  	HasMore  bool        `json:"has_more"`
  67  	NextFrom uint64      `json:"next_from,omitempty"`
  68  }
  69  
  70  // EventInfo contains metadata about an event
  71  type EventInfo struct {
  72  	Serial    uint64 `json:"serial"`
  73  	ID        string `json:"id"`
  74  	Timestamp int64  `json:"timestamp"`
  75  }
  76  
  77  // Config holds configuration for the cluster manager
  78  type Config struct {
  79  	AdminNpubs                []string
  80  	PropagatePrivilegedEvents bool
  81  	PollInterval              time.Duration
  82  	NIP11CacheTTL             time.Duration
  83  }
  84  
  85  // DefaultConfig returns default configuration
  86  func DefaultConfig() *Config {
  87  	return &Config{
  88  		PropagatePrivilegedEvents: true,
  89  		PollInterval:              5 * time.Second,
  90  		NIP11CacheTTL:             30 * time.Minute,
  91  	}
  92  }
  93  
  94  // NewManager creates a new cluster manager
  95  func NewManager(ctx context.Context, db *database.D, cfg *Config, publisher EventPublisher) *Manager {
  96  	ctx, cancel := context.WithCancel(ctx)
  97  
  98  	if cfg == nil {
  99  		cfg = DefaultConfig()
 100  	}
 101  
 102  	// Get our relay identity pubkey
 103  	var relayPubkey string
 104  	if skb, err := db.GetRelayIdentitySecret(); err == nil && len(skb) == 32 {
 105  		if pk, err := keys.SecretBytesToPubKeyHex(skb); err == nil {
 106  			relayPubkey = pk
 107  		}
 108  	}
 109  
 110  	cm := &Manager{
 111  		ctx:                       ctx,
 112  		cancel:                    cancel,
 113  		db:                        db,
 114  		adminNpubs:                cfg.AdminNpubs,
 115  		relayIdentityPubkey:       relayPubkey,
 116  		selfURLs:                  make(map[string]bool),
 117  		members:                   make(map[string]*Member),
 118  		pollDone:                  make(chan struct{}),
 119  		propagatePrivilegedEvents: cfg.PropagatePrivilegedEvents,
 120  		publisher:                 publisher,
 121  		httpClient: &http.Client{
 122  			Timeout: 30 * time.Second,
 123  		},
 124  		nip11Cache: common.NewNIP11Cache(cfg.NIP11CacheTTL),
 125  	}
 126  
 127  	return cm
 128  }
 129  
 130  // Start starts the cluster polling loop
 131  func (cm *Manager) Start() {
 132  	log.I.Ln("starting cluster replication manager")
 133  
 134  	// Load persisted peer state from database
 135  	if err := cm.loadPeerState(); err != nil {
 136  		log.W.F("failed to load cluster peer state: %v", err)
 137  	}
 138  
 139  	cm.pollTicker = time.NewTicker(5 * time.Second)
 140  	go cm.pollingLoop()
 141  }
 142  
 143  // Stop stops the cluster polling loop
 144  func (cm *Manager) Stop() {
 145  	log.I.Ln("stopping cluster replication manager")
 146  	cm.cancel()
 147  	if cm.pollTicker != nil {
 148  		cm.pollTicker.Stop()
 149  	}
 150  	<-cm.pollDone
 151  }
 152  
 153  // GetRelayIdentityPubkey returns the relay's identity pubkey
 154  func (cm *Manager) GetRelayIdentityPubkey() string {
 155  	return cm.relayIdentityPubkey
 156  }
 157  
 158  // GetMembers returns a copy of the current members
 159  func (cm *Manager) GetMembers() []*Member {
 160  	cm.membersMux.RLock()
 161  	defer cm.membersMux.RUnlock()
 162  	members := make([]*Member, 0, len(cm.members))
 163  	for _, m := range cm.members {
 164  		memberCopy := *m
 165  		members = append(members, &memberCopy)
 166  	}
 167  	return members
 168  }
 169  
 170  // GetMember returns a specific member by URL or nil if not found
 171  func (cm *Manager) GetMember(httpURL string) *Member {
 172  	cm.membersMux.RLock()
 173  	defer cm.membersMux.RUnlock()
 174  	if m, ok := cm.members[httpURL]; ok {
 175  		memberCopy := *m
 176  		return &memberCopy
 177  	}
 178  	return nil
 179  }
 180  
 181  // GetLatestSerial returns the latest serial and timestamp from the database
 182  func (cm *Manager) GetLatestSerial() (uint64, int64) {
 183  	serial, err := cm.getLatestSerialFromDB()
 184  	if err != nil {
 185  		return 0, time.Now().Unix()
 186  	}
 187  	return serial, time.Now().Unix()
 188  }
 189  
 190  // PropagatePrivilegedEvents returns whether privileged events should be propagated
 191  func (cm *Manager) PropagatePrivilegedEvents() bool {
 192  	return cm.propagatePrivilegedEvents
 193  }
 194  
 195  // GetEventsInRange returns events in a serial range with pagination
 196  func (cm *Manager) GetEventsInRange(from, to uint64, limit int) ([]EventInfo, bool, uint64) {
 197  	events, hasMore, nextFrom, err := cm.getEventsInRangeFromDB(from, to, limit)
 198  	if err != nil {
 199  		return nil, false, 0
 200  	}
 201  	return events, hasMore, nextFrom
 202  }
 203  
 204  // IsSelfURL checks if a URL is our own relay
 205  func (cm *Manager) IsSelfURL(url string) bool {
 206  	cm.membersMux.RLock()
 207  	result := cm.selfURLs[url]
 208  	cm.membersMux.RUnlock()
 209  	return result
 210  }
 211  
 212  // MarkSelfURL marks a URL as belonging to us
 213  func (cm *Manager) MarkSelfURL(url string) {
 214  	cm.membersMux.Lock()
 215  	cm.selfURLs[url] = true
 216  	cm.membersMux.Unlock()
 217  }
 218  
 219  func (cm *Manager) pollingLoop() {
 220  	defer close(cm.pollDone)
 221  
 222  	for {
 223  		select {
 224  		case <-cm.ctx.Done():
 225  			return
 226  		case <-cm.pollTicker.C:
 227  			cm.pollAllMembers()
 228  		}
 229  	}
 230  }
 231  
 232  func (cm *Manager) pollAllMembers() {
 233  	cm.membersMux.RLock()
 234  	members := make([]*Member, 0, len(cm.members))
 235  	for _, member := range cm.members {
 236  		members = append(members, member)
 237  	}
 238  	cm.membersMux.RUnlock()
 239  
 240  	for _, member := range members {
 241  		go cm.pollMember(member)
 242  	}
 243  }
 244  
 245  func (cm *Manager) pollMember(member *Member) {
 246  	// Get latest serial from peer
 247  	latestResp, err := cm.getLatestSerial(member.HTTPURL)
 248  	if err != nil {
 249  		log.W.F("failed to get latest serial from %s: %v", member.HTTPURL, err)
 250  		cm.updateMemberStatus(member, "error")
 251  		return
 252  	}
 253  
 254  	cm.updateMemberStatus(member, "active")
 255  	member.LastPoll = time.Now()
 256  
 257  	// Check if we need to fetch new events
 258  	if latestResp.Serial <= member.LastSerial {
 259  		return // No new events
 260  	}
 261  
 262  	// Fetch events in range
 263  	from := member.LastSerial + 1
 264  	to := latestResp.Serial
 265  
 266  	eventsResp, err := cm.getEventsInRange(member.HTTPURL, from, to, 1000)
 267  	if err != nil {
 268  		log.W.F("failed to get events from %s: %v", member.HTTPURL, err)
 269  		return
 270  	}
 271  
 272  	// Process fetched events
 273  	for _, eventInfo := range eventsResp.Events {
 274  		if cm.shouldFetchEvent(eventInfo) {
 275  			// Fetch full event via WebSocket and store it
 276  			if err := cm.fetchAndStoreEvent(member.WebSocketURL, eventInfo.ID, cm.publisher); err != nil {
 277  				log.W.F("failed to fetch/store event %s from %s: %v", eventInfo.ID, member.HTTPURL, err)
 278  			} else {
 279  				log.D.F("successfully replicated event %s from %s", eventInfo.ID, member.HTTPURL)
 280  			}
 281  		}
 282  	}
 283  
 284  	// Update last serial if we processed all events
 285  	if !eventsResp.HasMore && member.LastSerial != to {
 286  		member.LastSerial = to
 287  		// Persist the updated serial to database
 288  		if err := cm.savePeerState(member.HTTPURL, to); err != nil {
 289  			log.W.F("failed to persist serial %d for peer %s: %v", to, member.HTTPURL, err)
 290  		}
 291  	}
 292  }
 293  
 294  func (cm *Manager) getLatestSerial(peerURL string) (*LatestSerialResponse, error) {
 295  	url := fmt.Sprintf("%s/cluster/latest", peerURL)
 296  	resp, err := cm.httpClient.Get(url)
 297  	if err != nil {
 298  		return nil, err
 299  	}
 300  	defer resp.Body.Close()
 301  
 302  	if resp.StatusCode != http.StatusOK {
 303  		return nil, fmt.Errorf("HTTP %d", resp.StatusCode)
 304  	}
 305  
 306  	var result LatestSerialResponse
 307  	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
 308  		return nil, err
 309  	}
 310  
 311  	return &result, nil
 312  }
 313  
 314  func (cm *Manager) getEventsInRange(peerURL string, from, to uint64, limit int) (*EventsRangeResponse, error) {
 315  	url := fmt.Sprintf("%s/cluster/events?from=%d&to=%d&limit=%d", peerURL, from, to, limit)
 316  	resp, err := cm.httpClient.Get(url)
 317  	if err != nil {
 318  		return nil, err
 319  	}
 320  	defer resp.Body.Close()
 321  
 322  	if resp.StatusCode != http.StatusOK {
 323  		return nil, fmt.Errorf("HTTP %d", resp.StatusCode)
 324  	}
 325  
 326  	var result EventsRangeResponse
 327  	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
 328  		return nil, err
 329  	}
 330  
 331  	return &result, nil
 332  }
 333  
 334  func (cm *Manager) shouldFetchEvent(eventInfo EventInfo) bool {
 335  	// Relays MAY choose not to store every event they receive
 336  	// For now, accept all events
 337  	return true
 338  }
 339  
 340  func (cm *Manager) updateMemberStatus(member *Member, status string) {
 341  	member.Status = status
 342  	if status == "error" {
 343  		member.ErrorCount++
 344  	} else {
 345  		member.ErrorCount = 0
 346  	}
 347  }
 348  
 349  // UpdateMembership updates the cluster membership
 350  func (cm *Manager) UpdateMembership(relayURLs []string) {
 351  	cm.membersMux.Lock()
 352  	defer cm.membersMux.Unlock()
 353  
 354  	// Remove members not in the new list
 355  	for url := range cm.members {
 356  		found := false
 357  		for _, newURL := range relayURLs {
 358  			if newURL == url {
 359  				found = true
 360  				break
 361  			}
 362  		}
 363  		if !found {
 364  			delete(cm.members, url)
 365  			// Remove persisted state for removed peer
 366  			if err := cm.removePeerState(url); err != nil {
 367  				log.W.F("failed to remove persisted state for peer %s: %v", url, err)
 368  			}
 369  			log.D.F("removed cluster member: %s", url)
 370  		}
 371  	}
 372  
 373  	// Add new members (filter out self once at this point)
 374  	for _, url := range relayURLs {
 375  		// Skip if already exists
 376  		if _, exists := cm.members[url]; exists {
 377  			continue
 378  		}
 379  
 380  		// Fast path: check if we already know this URL is ours
 381  		if cm.selfURLs[url] {
 382  			log.D.F("removed self from cluster members (known URL): %s", url)
 383  			continue
 384  		}
 385  
 386  		// Slow path: check via NIP-11 pubkey
 387  		if cm.relayIdentityPubkey != "" {
 388  			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 389  			peerPubkey, err := cm.nip11Cache.GetPubkey(ctx, url)
 390  			cancel()
 391  
 392  			if err != nil {
 393  				log.D.F("couldn't fetch NIP-11 for %s, adding to cluster anyway: %v", url, err)
 394  			} else if peerPubkey == cm.relayIdentityPubkey {
 395  				log.D.F("removed self from cluster members (discovered): %s (pubkey: %s)", url, cm.relayIdentityPubkey)
 396  				// Cache this URL as ours for future fast lookups
 397  				cm.selfURLs[url] = true
 398  				continue
 399  			}
 400  		}
 401  
 402  		// Add member
 403  		member := &Member{
 404  			HTTPURL:      url,
 405  			WebSocketURL: url, // TODO: Convert to WebSocket URL
 406  			LastSerial:   0,
 407  			Status:       "unknown",
 408  		}
 409  		cm.members[url] = member
 410  		log.D.F("added cluster member: %s", url)
 411  	}
 412  }
 413  
 414  // HandleMembershipEvent processes a cluster membership event (Kind 39108)
 415  func (cm *Manager) HandleMembershipEvent(ev *event.E) error {
 416  	// Verify the event is signed by a cluster admin
 417  	adminFound := false
 418  	for _, adminNpub := range cm.adminNpubs {
 419  		// TODO: Convert adminNpub to pubkey and verify signature
 420  		// For now, accept all events (this should be properly validated)
 421  		_ = adminNpub // Mark as used to avoid compiler warning
 422  		adminFound = true
 423  		break
 424  	}
 425  
 426  	if !adminFound {
 427  		return fmt.Errorf("event not signed by cluster admin")
 428  	}
 429  
 430  	// Parse the relay URLs from the tags
 431  	var relayURLs []string
 432  	for _, tag := range *ev.Tags {
 433  		if len(tag.T) >= 2 && string(tag.T[0]) == "relay" {
 434  			relayURLs = append(relayURLs, string(tag.T[1]))
 435  		}
 436  	}
 437  
 438  	if len(relayURLs) == 0 {
 439  		return fmt.Errorf("no relay URLs found in membership event")
 440  	}
 441  
 442  	// Update cluster membership
 443  	cm.UpdateMembership(relayURLs)
 444  
 445  	log.D.F("updated cluster membership with %d relays from event %x", len(relayURLs), ev.ID)
 446  
 447  	return nil
 448  }
 449  
 450  // HTTP Handlers
 451  
 452  // HandleLatestSerial handles GET /cluster/latest
 453  func (cm *Manager) HandleLatestSerial(w http.ResponseWriter, r *http.Request) {
 454  	if r.Method != http.MethodGet {
 455  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 456  		return
 457  	}
 458  
 459  	// Check if request is from ourselves by examining the Referer or Origin header
 460  	origin := r.Header.Get("Origin")
 461  	referer := r.Header.Get("Referer")
 462  
 463  	if cm.relayIdentityPubkey != "" && (origin != "" || referer != "") {
 464  		checkURL := origin
 465  		if checkURL == "" {
 466  			checkURL = referer
 467  		}
 468  
 469  		// Fast path: check known self-URLs
 470  		if cm.selfURLs[checkURL] {
 471  			log.D.F("rejecting cluster latest request from self (known URL): %s", checkURL)
 472  			http.Error(w, "Cannot sync with self", http.StatusBadRequest)
 473  			return
 474  		}
 475  
 476  		// Slow path: verify via NIP-11
 477  		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 478  		peerPubkey, err := cm.nip11Cache.GetPubkey(ctx, checkURL)
 479  		cancel()
 480  
 481  		if err == nil && peerPubkey == cm.relayIdentityPubkey {
 482  			log.D.F("rejecting cluster latest request from self (discovered): %s", checkURL)
 483  			// Cache for future fast lookups
 484  			cm.membersMux.Lock()
 485  			cm.selfURLs[checkURL] = true
 486  			cm.membersMux.Unlock()
 487  			http.Error(w, "Cannot sync with self", http.StatusBadRequest)
 488  			return
 489  		}
 490  	}
 491  
 492  	// Get the latest serial from database
 493  	latestSerial, err := cm.getLatestSerialFromDB()
 494  	if err != nil {
 495  		log.W.F("failed to get latest serial: %v", err)
 496  		http.Error(w, "Internal server error", http.StatusInternalServerError)
 497  		return
 498  	}
 499  
 500  	response := LatestSerialResponse{
 501  		Serial:    latestSerial,
 502  		Timestamp: time.Now().Unix(),
 503  	}
 504  
 505  	w.Header().Set("Content-Type", "application/json")
 506  	json.NewEncoder(w).Encode(response)
 507  }
 508  
 509  // HandleEventsRange handles GET /cluster/events
 510  func (cm *Manager) HandleEventsRange(w http.ResponseWriter, r *http.Request) {
 511  	if r.Method != http.MethodGet {
 512  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 513  		return
 514  	}
 515  
 516  	// Check if request is from ourselves
 517  	origin := r.Header.Get("Origin")
 518  	referer := r.Header.Get("Referer")
 519  
 520  	if cm.relayIdentityPubkey != "" && (origin != "" || referer != "") {
 521  		checkURL := origin
 522  		if checkURL == "" {
 523  			checkURL = referer
 524  		}
 525  
 526  		// Fast path: check known self-URLs
 527  		if cm.selfURLs[checkURL] {
 528  			log.D.F("rejecting cluster events request from self (known URL): %s", checkURL)
 529  			http.Error(w, "Cannot sync with self", http.StatusBadRequest)
 530  			return
 531  		}
 532  
 533  		// Slow path: verify via NIP-11
 534  		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 535  		peerPubkey, err := cm.nip11Cache.GetPubkey(ctx, checkURL)
 536  		cancel()
 537  
 538  		if err == nil && peerPubkey == cm.relayIdentityPubkey {
 539  			log.D.F("rejecting cluster events request from self (discovered): %s", checkURL)
 540  			cm.membersMux.Lock()
 541  			cm.selfURLs[checkURL] = true
 542  			cm.membersMux.Unlock()
 543  			http.Error(w, "Cannot sync with self", http.StatusBadRequest)
 544  			return
 545  		}
 546  	}
 547  
 548  	// Parse query parameters
 549  	fromStr := r.URL.Query().Get("from")
 550  	toStr := r.URL.Query().Get("to")
 551  	limitStr := r.URL.Query().Get("limit")
 552  
 553  	from := uint64(0)
 554  	to := uint64(0)
 555  	limit := 1000
 556  
 557  	if fromStr != "" {
 558  		fmt.Sscanf(fromStr, "%d", &from)
 559  	}
 560  	if toStr != "" {
 561  		fmt.Sscanf(toStr, "%d", &to)
 562  	}
 563  	if limitStr != "" {
 564  		fmt.Sscanf(limitStr, "%d", &limit)
 565  		if limit > 10000 {
 566  			limit = 10000
 567  		}
 568  	}
 569  
 570  	// Get events in range
 571  	events, hasMore, nextFrom, err := cm.getEventsInRangeFromDB(from, to, limit)
 572  	if err != nil {
 573  		log.W.F("failed to get events in range: %v", err)
 574  		http.Error(w, "Internal server error", http.StatusInternalServerError)
 575  		return
 576  	}
 577  
 578  	response := EventsRangeResponse{
 579  		Events:   events,
 580  		HasMore:  hasMore,
 581  		NextFrom: nextFrom,
 582  	}
 583  
 584  	w.Header().Set("Content-Type", "application/json")
 585  	json.NewEncoder(w).Encode(response)
 586  }
 587  
 588  func (cm *Manager) getLatestSerialFromDB() (uint64, error) {
 589  	var maxSerial uint64 = 0
 590  
 591  	err := cm.db.View(func(txn *badger.Txn) error {
 592  		it := txn.NewIterator(badger.IteratorOptions{
 593  			Reverse: true,
 594  			Prefix:  []byte{0},
 595  		})
 596  		defer it.Close()
 597  
 598  		it.Seek([]byte{0})
 599  		if it.Valid() {
 600  			key := it.Item().Key()
 601  			if len(key) >= 5 {
 602  				serial := binary.BigEndian.Uint64(key[len(key)-8:]) >> 24
 603  				if serial > maxSerial {
 604  					maxSerial = serial
 605  				}
 606  			}
 607  		}
 608  
 609  		return nil
 610  	})
 611  
 612  	return maxSerial, err
 613  }
 614  
 615  func (cm *Manager) getEventsInRangeFromDB(from, to uint64, limit int) ([]EventInfo, bool, uint64, error) {
 616  	var events []EventInfo
 617  	var hasMore bool
 618  	var nextFrom uint64
 619  
 620  	fromSerial := &types.Uint40{}
 621  	toSerial := &types.Uint40{}
 622  
 623  	if err := fromSerial.Set(from); err != nil {
 624  		return nil, false, 0, err
 625  	}
 626  	if err := toSerial.Set(to); err != nil {
 627  		return nil, false, 0, err
 628  	}
 629  
 630  	err := cm.db.View(func(txn *badger.Txn) error {
 631  		it := txn.NewIterator(badger.IteratorOptions{
 632  			Prefix: []byte{0},
 633  		})
 634  		defer it.Close()
 635  
 636  		count := 0
 637  		it.Seek([]byte{0})
 638  
 639  		for it.Valid() && count < limit {
 640  			key := it.Item().Key()
 641  
 642  			if len(key) >= 8 && key[0] == 0 && key[1] == 0 && key[2] == 0 {
 643  				if len(key) >= 8 {
 644  					serial := binary.BigEndian.Uint64(key[len(key)-8:]) >> 24
 645  
 646  					if serial >= from && serial <= to {
 647  						serial40 := &types.Uint40{}
 648  						if err := serial40.Set(serial); err != nil {
 649  							continue
 650  						}
 651  
 652  						ev, err := cm.db.FetchEventBySerial(serial40)
 653  						if err != nil {
 654  							continue
 655  						}
 656  
 657  						shouldPropagate := true
 658  						if !cm.propagatePrivilegedEvents && kind.IsPrivileged(ev.Kind) {
 659  							shouldPropagate = false
 660  						}
 661  
 662  						if shouldPropagate {
 663  							events = append(events, EventInfo{
 664  								Serial:    serial,
 665  								ID:        hex.Enc(ev.ID),
 666  								Timestamp: ev.CreatedAt,
 667  							})
 668  							count++
 669  						}
 670  
 671  						ev.Free()
 672  					}
 673  				}
 674  			}
 675  
 676  			it.Next()
 677  		}
 678  
 679  		if it.Valid() {
 680  			hasMore = true
 681  			nextKey := it.Item().Key()
 682  			if len(nextKey) >= 8 && nextKey[0] == 0 && nextKey[1] == 0 && nextKey[2] == 0 {
 683  				nextSerial := binary.BigEndian.Uint64(nextKey[len(nextKey)-8:]) >> 24
 684  				nextFrom = nextSerial
 685  			}
 686  		}
 687  
 688  		return nil
 689  	})
 690  
 691  	return events, hasMore, nextFrom, err
 692  }
 693  
 694  func (cm *Manager) fetchAndStoreEvent(wsURL, eventID string, publisher EventPublisher) error {
 695  	// TODO: Implement WebSocket connection and event fetching
 696  	log.D.F("fetchAndStoreEvent called for %s from %s (placeholder implementation)", eventID, wsURL)
 697  	return nil
 698  }
 699  
 700  // Database key prefixes for cluster state persistence
 701  const (
 702  	clusterPeerStatePrefix = "cluster:peer:"
 703  )
 704  
 705  func (cm *Manager) loadPeerState() error {
 706  	cm.membersMux.Lock()
 707  	defer cm.membersMux.Unlock()
 708  
 709  	prefix := []byte(clusterPeerStatePrefix)
 710  	return cm.db.View(func(txn *badger.Txn) error {
 711  		it := txn.NewIterator(badger.IteratorOptions{
 712  			Prefix: prefix,
 713  		})
 714  		defer it.Close()
 715  
 716  		for it.Rewind(); it.Valid(); it.Next() {
 717  			item := it.Item()
 718  			key := item.Key()
 719  
 720  			peerURL := string(key[len(prefix):])
 721  
 722  			var serial uint64
 723  			err := item.Value(func(val []byte) error {
 724  				if len(val) == 8 {
 725  					serial = binary.BigEndian.Uint64(val)
 726  				}
 727  				return nil
 728  			})
 729  			if err != nil {
 730  				log.W.F("failed to read peer state for %s: %v", peerURL, err)
 731  				continue
 732  			}
 733  
 734  			if member, exists := cm.members[peerURL]; exists {
 735  				member.LastSerial = serial
 736  				log.D.F("loaded persisted serial %d for existing peer %s", serial, peerURL)
 737  			} else {
 738  				member := &Member{
 739  					HTTPURL:      peerURL,
 740  					WebSocketURL: peerURL,
 741  					LastSerial:   serial,
 742  					Status:       "unknown",
 743  				}
 744  				cm.members[peerURL] = member
 745  				log.D.F("loaded persisted serial %d for new peer %s", serial, peerURL)
 746  			}
 747  		}
 748  		return nil
 749  	})
 750  }
 751  
 752  func (cm *Manager) savePeerState(peerURL string, serial uint64) error {
 753  	key := []byte(clusterPeerStatePrefix + peerURL)
 754  	value := make([]byte, 8)
 755  	binary.BigEndian.PutUint64(value, serial)
 756  
 757  	return cm.db.Update(func(txn *badger.Txn) error {
 758  		return txn.Set(key, value)
 759  	})
 760  }
 761  
 762  func (cm *Manager) removePeerState(peerURL string) error {
 763  	key := []byte(clusterPeerStatePrefix + peerURL)
 764  
 765  	return cm.db.Update(func(txn *badger.Txn) error {
 766  		return txn.Delete(key)
 767  	})
 768  }
 769