manager.go raw

   1  // Package distributed provides serial-based peer-to-peer synchronization
   2  package distributed
   3  
   4  import (
   5  	"bytes"
   6  	"context"
   7  	"encoding/json"
   8  	"fmt"
   9  	"net/http"
  10  	"strings"
  11  	gosync "sync"
  12  	"time"
  13  
  14  	"next.orly.dev/pkg/nostr/encoders/event"
  15  	"next.orly.dev/pkg/nostr/encoders/filter"
  16  	"next.orly.dev/pkg/nostr/encoders/hex"
  17  	"next.orly.dev/pkg/nostr/encoders/tag"
  18  	"next.orly.dev/pkg/lol/log"
  19  	"next.orly.dev/pkg/database"
  20  	"next.orly.dev/pkg/sync/common"
  21  )
  22  
  23  // PolicyChecker is an interface for checking event policies
  24  type PolicyChecker interface {
  25  	CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error)
  26  }
  27  
  // RelayGroupConfigProvider supplies the relay-group configuration that
// decides which peers this relay synchronizes with.
//
// FindAuthoritativeConfig returns a list of strings, presumably peer relay
// URLs suitable for Manager.UpdatePeers (TODO confirm against implementations),
// or an error when no configuration can be resolved.
type RelayGroupConfigProvider interface {
	FindAuthoritativeConfig(ctx context.Context) (peers []string, err error)
}
  32  
  33  // Manager handles distributed synchronization between relay peers using serial numbers as clocks
  34  type Manager struct {
  35  	ctx           context.Context
  36  	cancel        context.CancelFunc
  37  	db            *database.D
  38  	nodeID        string
  39  	relayURL      string
  40  	peers         []string
  41  	selfURLs      map[string]bool   // URLs discovered to be ourselves (for fast lookups)
  42  	currentSerial uint64
  43  	peerSerials   map[string]uint64 // peer URL -> latest serial seen
  44  	nip11Cache    *common.NIP11Cache
  45  	policyManager PolicyChecker
  46  	mutex         gosync.RWMutex
  47  }
  48  
  // CurrentRequest is the JSON body POSTed to a peer's /api/sync/current
// endpoint to ask for its current serial. The caller identifies itself so the
// peer can refuse requests that originate from itself.
type CurrentRequest struct {
	NodeID   string `json:"node_id"`   // caller's node identity
	RelayURL string `json:"relay_url"` // caller's own relay URL
}
  54  
  // CurrentResponse answers a CurrentRequest with the responding relay's
// identity and the latest serial it has recorded.
type CurrentResponse struct {
	NodeID   string `json:"node_id"`   // responder's node identity
	RelayURL string `json:"relay_url"` // responder's relay URL
	Serial   uint64 `json:"serial"`    // responder's current serial
}
  61  
  // EventIDsRequest is the JSON body POSTed to a peer's /api/sync/event-ids
// endpoint to ask for the event IDs stored under serials From through To
// (both inclusive). NodeID and RelayURL identify the caller.
type EventIDsRequest struct {
	NodeID   string `json:"node_id"`
	RelayURL string `json:"relay_url"`
	From     uint64 `json:"from"` // first serial wanted (inclusive)
	To       uint64 `json:"to"`   // last serial wanted (inclusive)
}
  69  
  // EventIDsResponse answers an EventIDsRequest. EventMap maps each event ID
// to the serial it is stored under on the responding relay; the requesting
// side decodes the keys as hex event IDs.
type EventIDsResponse struct {
	EventMap map[string]uint64 `json:"event_map"` // event_id -> serial
}
  74  
  75  // Config holds configuration for the distributed sync manager
  76  type Config struct {
  77  	NodeID        string
  78  	RelayURL      string
  79  	Peers         []string
  80  	SyncInterval  time.Duration
  81  	NIP11CacheTTL time.Duration
  82  }
  83  
  84  // DefaultConfig returns default configuration
  85  func DefaultConfig() *Config {
  86  	return &Config{
  87  		SyncInterval:  5 * time.Second,
  88  		NIP11CacheTTL: 30 * time.Minute,
  89  	}
  90  }
  91  
  92  // NewManager creates a new sync manager
  93  func NewManager(ctx context.Context, db *database.D, cfg *Config, policyManager PolicyChecker) *Manager {
  94  	ctx, cancel := context.WithCancel(ctx)
  95  
  96  	if cfg == nil {
  97  		cfg = DefaultConfig()
  98  	}
  99  
 100  	m := &Manager{
 101  		ctx:           ctx,
 102  		cancel:        cancel,
 103  		db:            db,
 104  		nodeID:        cfg.NodeID,
 105  		relayURL:      cfg.RelayURL,
 106  		peers:         cfg.Peers,
 107  		selfURLs:      make(map[string]bool),
 108  		currentSerial: 0,
 109  		peerSerials:   make(map[string]uint64),
 110  		nip11Cache:    common.NewNIP11Cache(cfg.NIP11CacheTTL),
 111  		policyManager: policyManager,
 112  	}
 113  
 114  	// Add our configured relay URL to self-URLs cache if provided
 115  	if m.relayURL != "" {
 116  		m.selfURLs[m.relayURL] = true
 117  	}
 118  
 119  	// Remove self from peer list once at startup if we have a nodeID
 120  	if m.nodeID != "" {
 121  		filteredPeers := make([]string, 0, len(m.peers))
 122  		for _, peerURL := range m.peers {
 123  			// Fast path: check if we already know this URL is ours
 124  			if m.selfURLs[peerURL] {
 125  				log.D.F("removed self from sync peer list (known URL): %s", peerURL)
 126  				continue
 127  			}
 128  
 129  			// Slow path: check via NIP-11 pubkey
 130  			pctx, pcancel := context.WithTimeout(context.Background(), 5*time.Second)
 131  			peerPubkey, err := m.nip11Cache.GetPubkey(pctx, peerURL)
 132  			pcancel()
 133  
 134  			if err != nil {
 135  				log.D.F("couldn't fetch NIP-11 for %s, keeping in peer list: %v", peerURL, err)
 136  				filteredPeers = append(filteredPeers, peerURL)
 137  				continue
 138  			}
 139  
 140  			if peerPubkey == m.nodeID {
 141  				log.D.F("removed self from sync peer list (discovered): %s (pubkey: %s)", peerURL, m.nodeID)
 142  				// Cache this URL as ours for future fast lookups
 143  				m.selfURLs[peerURL] = true
 144  				continue
 145  			}
 146  
 147  			filteredPeers = append(filteredPeers, peerURL)
 148  		}
 149  		m.peers = filteredPeers
 150  	}
 151  
 152  	// Start sync routine
 153  	go m.syncRoutine()
 154  
 155  	return m
 156  }
 157  
 158  // Stop stops the sync manager
 159  func (m *Manager) Stop() {
 160  	m.cancel()
 161  }
 162  
 163  // UpdatePeers updates the peer list from relay group configuration
 164  func (m *Manager) UpdatePeers(newPeers []string) {
 165  	m.mutex.Lock()
 166  	defer m.mutex.Unlock()
 167  	m.peers = newPeers
 168  	log.D.F("updated peer list to %d peers", len(newPeers))
 169  }
 170  
 171  // IsAuthorizedPeer checks if a peer is authorized by validating its NIP-11 pubkey
 172  func (m *Manager) IsAuthorizedPeer(peerURL string, expectedPubkey string) bool {
 173  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 174  	defer cancel()
 175  
 176  	peerPubkey, err := m.nip11Cache.GetPubkey(ctx, peerURL)
 177  	if err != nil {
 178  		log.D.F("failed to fetch NIP-11 pubkey for %s: %v", peerURL, err)
 179  		return false
 180  	}
 181  
 182  	return peerPubkey == expectedPubkey
 183  }
 184  
 185  // GetPeerPubkey fetches and caches the pubkey for a peer relay
 186  func (m *Manager) GetPeerPubkey(peerURL string) (string, error) {
 187  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 188  	defer cancel()
 189  
 190  	return m.nip11Cache.GetPubkey(ctx, peerURL)
 191  }
 192  
 193  // GetCurrentSerial returns the current serial number
 194  func (m *Manager) GetCurrentSerial() uint64 {
 195  	m.mutex.RLock()
 196  	defer m.mutex.RUnlock()
 197  	return m.currentSerial
 198  }
 199  
 200  // GetPeers returns a copy of the current peer list
 201  func (m *Manager) GetPeers() []string {
 202  	m.mutex.RLock()
 203  	defer m.mutex.RUnlock()
 204  	peers := make([]string, len(m.peers))
 205  	copy(peers, m.peers)
 206  	return peers
 207  }
 208  
 209  // GetNodeID returns the node's identity
 210  func (m *Manager) GetNodeID() string {
 211  	return m.nodeID
 212  }
 213  
 214  // GetRelayURL returns the relay's URL
 215  func (m *Manager) GetRelayURL() string {
 216  	return m.relayURL
 217  }
 218  
 219  // UpdateSerial updates the current serial number when a new event is stored
 220  func (m *Manager) UpdateSerial() {
 221  	m.mutex.Lock()
 222  	defer m.mutex.Unlock()
 223  
 224  	// Get the latest serial from database
 225  	if latest, err := m.getLatestSerial(); err == nil {
 226  		m.currentSerial = latest
 227  	}
 228  }
 229  
 230  // NotifyNewEvent notifies the manager of a new event
 231  func (m *Manager) NotifyNewEvent(eventID []byte, serial uint64) {
 232  	m.mutex.Lock()
 233  	defer m.mutex.Unlock()
 234  	if serial > m.currentSerial {
 235  		m.currentSerial = serial
 236  	}
 237  }
 238  
 239  // IsSelfURL checks if a URL is our own relay
 240  func (m *Manager) IsSelfURL(url string) bool {
 241  	m.mutex.RLock()
 242  	if m.selfURLs[url] {
 243  		m.mutex.RUnlock()
 244  		return true
 245  	}
 246  	m.mutex.RUnlock()
 247  	return false
 248  }
 249  
 250  // MarkSelfURL marks a URL as belonging to us
 251  func (m *Manager) MarkSelfURL(url string) {
 252  	m.mutex.Lock()
 253  	m.selfURLs[url] = true
 254  	m.mutex.Unlock()
 255  }
 256  
 257  // IsSelfNodeID checks if a node ID matches ours
 258  func (m *Manager) IsSelfNodeID(nodeID string) bool {
 259  	return nodeID != "" && nodeID == m.nodeID
 260  }
 261  
 262  // getLatestSerial gets the latest serial number from the database
 263  func (m *Manager) getLatestSerial() (uint64, error) {
 264  	// This is a simplified implementation
 265  	// In practice, you'd want to track the highest serial number
 266  	// For now, return the current serial
 267  	return m.currentSerial, nil
 268  }
 269  
 270  // syncRoutine periodically syncs with peers sequentially
 271  func (m *Manager) syncRoutine() {
 272  	ticker := time.NewTicker(5 * time.Second) // Sync every 5 seconds
 273  	defer ticker.Stop()
 274  
 275  	for {
 276  		select {
 277  		case <-m.ctx.Done():
 278  			return
 279  		case <-ticker.C:
 280  			m.syncWithPeersSequentially()
 281  		}
 282  	}
 283  }
 284  
 285  // syncWithPeersSequentially syncs with all configured peers one at a time
 286  func (m *Manager) syncWithPeersSequentially() {
 287  	for _, peerURL := range m.peers {
 288  		// Self-peers are already filtered out during initialization/update
 289  		m.syncWithPeer(peerURL)
 290  		// Small delay between peers to avoid overwhelming
 291  		time.Sleep(100 * time.Millisecond)
 292  	}
 293  }
 294  
 295  // syncWithPeer syncs with a specific peer
 296  func (m *Manager) syncWithPeer(peerURL string) {
 297  	// Get the peer's current serial
 298  	currentReq := CurrentRequest{
 299  		NodeID:   m.nodeID,
 300  		RelayURL: m.relayURL,
 301  	}
 302  
 303  	jsonData, err := json.Marshal(currentReq)
 304  	if err != nil {
 305  		log.E.F("failed to marshal current request: %v", err)
 306  		return
 307  	}
 308  
 309  	resp, err := http.Post(peerURL+"/api/sync/current", "application/json", bytes.NewBuffer(jsonData))
 310  	if err != nil {
 311  		log.D.F("failed to get current serial from %s: %v", peerURL, err)
 312  		return
 313  	}
 314  	defer resp.Body.Close()
 315  
 316  	if resp.StatusCode != http.StatusOK {
 317  		log.D.F("current request failed with %s: status %d", peerURL, resp.StatusCode)
 318  		return
 319  	}
 320  
 321  	var currentResp CurrentResponse
 322  	if err := json.NewDecoder(resp.Body).Decode(&currentResp); err != nil {
 323  		log.E.F("failed to decode current response from %s: %v", peerURL, err)
 324  		return
 325  	}
 326  
 327  	// Check if we need to sync
 328  	peerSerial := currentResp.Serial
 329  	ourLastSeen := m.peerSerials[peerURL]
 330  
 331  	if peerSerial > ourLastSeen {
 332  		// Request event IDs for the missing range
 333  		m.requestEventIDs(peerURL, ourLastSeen+1, peerSerial)
 334  		// Update our knowledge of peer's serial
 335  		m.mutex.Lock()
 336  		m.peerSerials[peerURL] = peerSerial
 337  		m.mutex.Unlock()
 338  	}
 339  }
 340  
 341  // requestEventIDs requests event IDs for a serial range from a peer
 342  func (m *Manager) requestEventIDs(peerURL string, from, to uint64) {
 343  	req := EventIDsRequest{
 344  		NodeID:   m.nodeID,
 345  		RelayURL: m.relayURL,
 346  		From:     from,
 347  		To:       to,
 348  	}
 349  
 350  	jsonData, err := json.Marshal(req)
 351  	if err != nil {
 352  		log.E.F("failed to marshal event-ids request: %v", err)
 353  		return
 354  	}
 355  
 356  	resp, err := http.Post(peerURL+"/api/sync/event-ids", "application/json", bytes.NewBuffer(jsonData))
 357  	if err != nil {
 358  		log.E.F("failed to request event IDs from %s: %v", peerURL, err)
 359  		return
 360  	}
 361  	defer resp.Body.Close()
 362  
 363  	if resp.StatusCode != http.StatusOK {
 364  		log.E.F("event-ids request failed with %s: status %d", peerURL, resp.StatusCode)
 365  		return
 366  	}
 367  
 368  	var eventIDsResp EventIDsResponse
 369  	if err := json.NewDecoder(resp.Body).Decode(&eventIDsResp); err != nil {
 370  		log.E.F("failed to decode event-ids response from %s: %v", peerURL, err)
 371  		return
 372  	}
 373  
 374  	// Check which events we don't have and request them via websocket
 375  	missingEventIDs := m.findMissingEventIDs(eventIDsResp.EventMap)
 376  	if len(missingEventIDs) > 0 {
 377  		m.requestEventsViaWebsocket(missingEventIDs)
 378  		log.D.F("requested %d missing events from peer %s", len(missingEventIDs), peerURL)
 379  	}
 380  }
 381  
 382  // findMissingEventIDs checks which event IDs we don't have locally
 383  func (m *Manager) findMissingEventIDs(eventMap map[string]uint64) []string {
 384  	var missing []string
 385  
 386  	for eventID := range eventMap {
 387  		// Check if we have this event locally
 388  		// This is a simplified check - in practice you'd query the database
 389  		if !m.hasEventLocally(eventID) {
 390  			missing = append(missing, eventID)
 391  		}
 392  	}
 393  
 394  	return missing
 395  }
 396  
 397  // hasEventLocally checks if we have a specific event
 398  func (m *Manager) hasEventLocally(eventID string) bool {
 399  	// Convert hex event ID to bytes
 400  	eventIDBytes, err := hex.Dec(eventID)
 401  	if err != nil {
 402  		log.D.F("invalid event ID format: %s", eventID)
 403  		return false
 404  	}
 405  
 406  	// Query for the event
 407  	f := &filter.F{
 408  		Ids: tag.NewFromBytesSlice(eventIDBytes),
 409  	}
 410  
 411  	events, err := m.db.QueryEvents(context.Background(), f)
 412  	if err != nil {
 413  		log.D.F("error querying for event %s: %v", eventID, err)
 414  		return false
 415  	}
 416  
 417  	return len(events) > 0
 418  }
 419  
 420  // requestEventsViaWebsocket requests specific events via websocket from peers
 421  func (m *Manager) requestEventsViaWebsocket(eventIDs []string) {
 422  	if len(eventIDs) == 0 {
 423  		return
 424  	}
 425  
 426  	// Convert hex event IDs to bytes for websocket requests
 427  	var eventIDBytes [][]byte
 428  	for _, eventID := range eventIDs {
 429  		if evBytes, err := hex.Dec(eventID); err == nil {
 430  			eventIDBytes = append(eventIDBytes, evBytes)
 431  		}
 432  	}
 433  
 434  	if len(eventIDBytes) == 0 {
 435  		return
 436  	}
 437  
 438  	// TODO: Implement websocket connection and REQ message sending
 439  	// For now, try to request from our peers via their websocket endpoints
 440  	for _, peerURL := range m.peers {
 441  		// Convert HTTP URL to WebSocket URL
 442  		wsURL := strings.Replace(peerURL, "http://", "ws://", 1)
 443  		wsURL = strings.Replace(wsURL, "https://", "wss://", 1)
 444  
 445  		log.D.F("would connect to %s and request %d events", wsURL, len(eventIDBytes))
 446  		// Here we would:
 447  		// 1. Establish websocket connection to peer
 448  		// 2. Send NIP-98 auth if required
 449  		// 3. Send REQ message with the filter for specific event IDs
 450  		// 4. Receive and process EVENT messages
 451  		// 5. Import received events
 452  	}
 453  
 454  	limit := 5
 455  	if len(eventIDs) < limit {
 456  		limit = len(eventIDs)
 457  	}
 458  	log.D.F("requested %d events via websocket: %v", len(eventIDs), eventIDs[:limit])
 459  }
 460  
 461  // GetEventsWithIDs retrieves events with their IDs by serial range
 462  func (m *Manager) GetEventsWithIDs(from, to uint64) (map[string]uint64, error) {
 463  	eventMap := make(map[string]uint64)
 464  
 465  	// Get event serials by serial range
 466  	serials, err := m.db.EventIdsBySerial(from, int(to-from+1))
 467  	if err != nil {
 468  		return nil, err
 469  	}
 470  
 471  	// For each serial, we need to map it to an event ID
 472  	// This is a simplified implementation - in practice we'd need to query events by serial
 473  	for i, serial := range serials {
 474  		// TODO: Implement actual event ID retrieval by serial
 475  		// For now, create placeholder event IDs based on serial
 476  		eventID := fmt.Sprintf("event_%d", serial)
 477  		eventMap[eventID] = serial
 478  		_ = i // avoid unused variable warning
 479  	}
 480  
 481  	return eventMap, nil
 482  }
 483  
 484  // GetPeerStatus returns the sync status for all peers
 485  func (m *Manager) GetPeerStatus() map[string]uint64 {
 486  	m.mutex.RLock()
 487  	defer m.mutex.RUnlock()
 488  	result := make(map[string]uint64)
 489  	for k, v := range m.peerSerials {
 490  		result[k] = v
 491  	}
 492  	return result
 493  }
 494  
 495  // HandleCurrentRequest handles requests for current serial number
 496  func (m *Manager) HandleCurrentRequest(w http.ResponseWriter, r *http.Request) {
 497  	if r.Method != http.MethodPost {
 498  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 499  		return
 500  	}
 501  
 502  	var req CurrentRequest
 503  	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
 504  		http.Error(w, "Invalid JSON", http.StatusBadRequest)
 505  		return
 506  	}
 507  
 508  	// Reject requests from ourselves (same nodeID)
 509  	if req.NodeID != "" && req.NodeID == m.nodeID {
 510  		log.D.F("rejecting sync current request from self (nodeID: %s)", req.NodeID)
 511  		// Cache the requesting relay URL as ours for future fast lookups
 512  		if req.RelayURL != "" {
 513  			m.mutex.Lock()
 514  			m.selfURLs[req.RelayURL] = true
 515  			m.mutex.Unlock()
 516  			log.D.F("cached self-URL from inbound request: %s", req.RelayURL)
 517  		}
 518  		http.Error(w, "Cannot sync with self", http.StatusBadRequest)
 519  		return
 520  	}
 521  
 522  	resp := CurrentResponse{
 523  		NodeID:   m.nodeID,
 524  		RelayURL: m.relayURL,
 525  		Serial:   m.GetCurrentSerial(),
 526  	}
 527  
 528  	w.Header().Set("Content-Type", "application/json")
 529  	json.NewEncoder(w).Encode(resp)
 530  }
 531  
 532  // HandleEventIDsRequest handles requests for event IDs with their serial numbers
 533  func (m *Manager) HandleEventIDsRequest(w http.ResponseWriter, r *http.Request) {
 534  	if r.Method != http.MethodPost {
 535  		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 536  		return
 537  	}
 538  
 539  	var req EventIDsRequest
 540  	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
 541  		http.Error(w, "Invalid JSON", http.StatusBadRequest)
 542  		return
 543  	}
 544  
 545  	// Reject requests from ourselves (same nodeID)
 546  	if req.NodeID != "" && req.NodeID == m.nodeID {
 547  		log.D.F("rejecting sync event-ids request from self (nodeID: %s)", req.NodeID)
 548  		// Cache the requesting relay URL as ours for future fast lookups
 549  		if req.RelayURL != "" {
 550  			m.mutex.Lock()
 551  			m.selfURLs[req.RelayURL] = true
 552  			m.mutex.Unlock()
 553  			log.D.F("cached self-URL from inbound request: %s", req.RelayURL)
 554  		}
 555  		http.Error(w, "Cannot sync with self", http.StatusBadRequest)
 556  		return
 557  	}
 558  
 559  	// Get events with IDs in the requested range
 560  	eventMap, err := m.GetEventsWithIDs(req.From, req.To)
 561  	if err != nil {
 562  		http.Error(w, fmt.Sprintf("Failed to get event IDs: %v", err), http.StatusInternalServerError)
 563  		return
 564  	}
 565  
 566  	resp := EventIDsResponse{
 567  		EventMap: eventMap,
 568  	}
 569  
 570  	w.Header().Set("Content-Type", "application/json")
 571  	json.NewEncoder(w).Encode(resp)
 572  }
 573