manager.go raw
1 // Package distributed provides serial-based peer-to-peer synchronization
2 package distributed
3
4 import (
5 "bytes"
6 "context"
7 "encoding/json"
8 "fmt"
9 "net/http"
10 "strings"
11 gosync "sync"
12 "time"
13
14 "next.orly.dev/pkg/nostr/encoders/event"
15 "next.orly.dev/pkg/nostr/encoders/filter"
16 "next.orly.dev/pkg/nostr/encoders/hex"
17 "next.orly.dev/pkg/nostr/encoders/tag"
18 "next.orly.dev/pkg/lol/log"
19 "next.orly.dev/pkg/database"
20 "next.orly.dev/pkg/sync/common"
21 )
22
23 // PolicyChecker is an interface for checking event policies
24 type PolicyChecker interface {
25 CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error)
26 }
27
// RelayGroupConfigProvider is a source of relay group configuration.
//
// FindAuthoritativeConfig returns a list of strings (presumably peer relay
// URLs suitable for Manager.UpdatePeers — confirm with the implementation) or
// an error. The supplied context bounds the lookup.
type RelayGroupConfigProvider interface {
	FindAuthoritativeConfig(ctx context.Context) ([]string, error)
}
32
33 // Manager handles distributed synchronization between relay peers using serial numbers as clocks
34 type Manager struct {
35 ctx context.Context
36 cancel context.CancelFunc
37 db *database.D
38 nodeID string
39 relayURL string
40 peers []string
41 selfURLs map[string]bool // URLs discovered to be ourselves (for fast lookups)
42 currentSerial uint64
43 peerSerials map[string]uint64 // peer URL -> latest serial seen
44 nip11Cache *common.NIP11Cache
45 policyManager PolicyChecker
46 mutex gosync.RWMutex
47 }
48
// CurrentRequest is the JSON body POSTed to a peer's /api/sync/current
// endpoint to ask for its current serial number. NodeID and RelayURL identify
// the sender.
type CurrentRequest struct {
	NodeID   string `json:"node_id"`
	RelayURL string `json:"relay_url"`
}
54
// CurrentResponse is the JSON reply to a CurrentRequest. It carries the
// responder's identity (NodeID, RelayURL) and its current serial number.
type CurrentResponse struct {
	NodeID   string `json:"node_id"`
	RelayURL string `json:"relay_url"`
	Serial   uint64 `json:"serial"`
}
61
// EventIDsRequest is the JSON body POSTed to a peer's /api/sync/event-ids
// endpoint. NodeID and RelayURL identify the sender; From and To delimit the
// requested serial range (both ends inclusive).
type EventIDsRequest struct {
	NodeID   string `json:"node_id"`
	RelayURL string `json:"relay_url"`
	From     uint64 `json:"from"`
	To       uint64 `json:"to"`
}
69
// EventIDsResponse is the JSON reply to an EventIDsRequest.
type EventIDsResponse struct {
	// EventMap maps event_id -> serial for the requested range.
	EventMap map[string]uint64 `json:"event_map"`
}
74
// Config carries the settings consumed by NewManager.
type Config struct {
	// NodeID and RelayURL identify this relay. NewManager compares NodeID
	// against peers' NIP-11 pubkeys to detect itself in the peer list.
	NodeID, RelayURL string

	// Peers lists the base URLs of the relays to synchronize with.
	Peers []string

	// SyncInterval is the intended pause between sync rounds. The code in
	// this file does not read it; the sync loop uses a fixed 5 seconds.
	// NIP11CacheTTL is passed to the NIP-11 cache constructor.
	SyncInterval, NIP11CacheTTL time.Duration
}
83
84 // DefaultConfig returns default configuration
85 func DefaultConfig() *Config {
86 return &Config{
87 SyncInterval: 5 * time.Second,
88 NIP11CacheTTL: 30 * time.Minute,
89 }
90 }
91
92 // NewManager creates a new sync manager
93 func NewManager(ctx context.Context, db *database.D, cfg *Config, policyManager PolicyChecker) *Manager {
94 ctx, cancel := context.WithCancel(ctx)
95
96 if cfg == nil {
97 cfg = DefaultConfig()
98 }
99
100 m := &Manager{
101 ctx: ctx,
102 cancel: cancel,
103 db: db,
104 nodeID: cfg.NodeID,
105 relayURL: cfg.RelayURL,
106 peers: cfg.Peers,
107 selfURLs: make(map[string]bool),
108 currentSerial: 0,
109 peerSerials: make(map[string]uint64),
110 nip11Cache: common.NewNIP11Cache(cfg.NIP11CacheTTL),
111 policyManager: policyManager,
112 }
113
114 // Add our configured relay URL to self-URLs cache if provided
115 if m.relayURL != "" {
116 m.selfURLs[m.relayURL] = true
117 }
118
119 // Remove self from peer list once at startup if we have a nodeID
120 if m.nodeID != "" {
121 filteredPeers := make([]string, 0, len(m.peers))
122 for _, peerURL := range m.peers {
123 // Fast path: check if we already know this URL is ours
124 if m.selfURLs[peerURL] {
125 log.D.F("removed self from sync peer list (known URL): %s", peerURL)
126 continue
127 }
128
129 // Slow path: check via NIP-11 pubkey
130 pctx, pcancel := context.WithTimeout(context.Background(), 5*time.Second)
131 peerPubkey, err := m.nip11Cache.GetPubkey(pctx, peerURL)
132 pcancel()
133
134 if err != nil {
135 log.D.F("couldn't fetch NIP-11 for %s, keeping in peer list: %v", peerURL, err)
136 filteredPeers = append(filteredPeers, peerURL)
137 continue
138 }
139
140 if peerPubkey == m.nodeID {
141 log.D.F("removed self from sync peer list (discovered): %s (pubkey: %s)", peerURL, m.nodeID)
142 // Cache this URL as ours for future fast lookups
143 m.selfURLs[peerURL] = true
144 continue
145 }
146
147 filteredPeers = append(filteredPeers, peerURL)
148 }
149 m.peers = filteredPeers
150 }
151
152 // Start sync routine
153 go m.syncRoutine()
154
155 return m
156 }
157
158 // Stop stops the sync manager
159 func (m *Manager) Stop() {
160 m.cancel()
161 }
162
163 // UpdatePeers updates the peer list from relay group configuration
164 func (m *Manager) UpdatePeers(newPeers []string) {
165 m.mutex.Lock()
166 defer m.mutex.Unlock()
167 m.peers = newPeers
168 log.D.F("updated peer list to %d peers", len(newPeers))
169 }
170
171 // IsAuthorizedPeer checks if a peer is authorized by validating its NIP-11 pubkey
172 func (m *Manager) IsAuthorizedPeer(peerURL string, expectedPubkey string) bool {
173 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
174 defer cancel()
175
176 peerPubkey, err := m.nip11Cache.GetPubkey(ctx, peerURL)
177 if err != nil {
178 log.D.F("failed to fetch NIP-11 pubkey for %s: %v", peerURL, err)
179 return false
180 }
181
182 return peerPubkey == expectedPubkey
183 }
184
185 // GetPeerPubkey fetches and caches the pubkey for a peer relay
186 func (m *Manager) GetPeerPubkey(peerURL string) (string, error) {
187 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
188 defer cancel()
189
190 return m.nip11Cache.GetPubkey(ctx, peerURL)
191 }
192
193 // GetCurrentSerial returns the current serial number
194 func (m *Manager) GetCurrentSerial() uint64 {
195 m.mutex.RLock()
196 defer m.mutex.RUnlock()
197 return m.currentSerial
198 }
199
200 // GetPeers returns a copy of the current peer list
201 func (m *Manager) GetPeers() []string {
202 m.mutex.RLock()
203 defer m.mutex.RUnlock()
204 peers := make([]string, len(m.peers))
205 copy(peers, m.peers)
206 return peers
207 }
208
209 // GetNodeID returns the node's identity
210 func (m *Manager) GetNodeID() string {
211 return m.nodeID
212 }
213
214 // GetRelayURL returns the relay's URL
215 func (m *Manager) GetRelayURL() string {
216 return m.relayURL
217 }
218
219 // UpdateSerial updates the current serial number when a new event is stored
220 func (m *Manager) UpdateSerial() {
221 m.mutex.Lock()
222 defer m.mutex.Unlock()
223
224 // Get the latest serial from database
225 if latest, err := m.getLatestSerial(); err == nil {
226 m.currentSerial = latest
227 }
228 }
229
230 // NotifyNewEvent notifies the manager of a new event
231 func (m *Manager) NotifyNewEvent(eventID []byte, serial uint64) {
232 m.mutex.Lock()
233 defer m.mutex.Unlock()
234 if serial > m.currentSerial {
235 m.currentSerial = serial
236 }
237 }
238
239 // IsSelfURL checks if a URL is our own relay
240 func (m *Manager) IsSelfURL(url string) bool {
241 m.mutex.RLock()
242 if m.selfURLs[url] {
243 m.mutex.RUnlock()
244 return true
245 }
246 m.mutex.RUnlock()
247 return false
248 }
249
250 // MarkSelfURL marks a URL as belonging to us
251 func (m *Manager) MarkSelfURL(url string) {
252 m.mutex.Lock()
253 m.selfURLs[url] = true
254 m.mutex.Unlock()
255 }
256
257 // IsSelfNodeID checks if a node ID matches ours
258 func (m *Manager) IsSelfNodeID(nodeID string) bool {
259 return nodeID != "" && nodeID == m.nodeID
260 }
261
262 // getLatestSerial gets the latest serial number from the database
263 func (m *Manager) getLatestSerial() (uint64, error) {
264 // This is a simplified implementation
265 // In practice, you'd want to track the highest serial number
266 // For now, return the current serial
267 return m.currentSerial, nil
268 }
269
270 // syncRoutine periodically syncs with peers sequentially
271 func (m *Manager) syncRoutine() {
272 ticker := time.NewTicker(5 * time.Second) // Sync every 5 seconds
273 defer ticker.Stop()
274
275 for {
276 select {
277 case <-m.ctx.Done():
278 return
279 case <-ticker.C:
280 m.syncWithPeersSequentially()
281 }
282 }
283 }
284
285 // syncWithPeersSequentially syncs with all configured peers one at a time
286 func (m *Manager) syncWithPeersSequentially() {
287 for _, peerURL := range m.peers {
288 // Self-peers are already filtered out during initialization/update
289 m.syncWithPeer(peerURL)
290 // Small delay between peers to avoid overwhelming
291 time.Sleep(100 * time.Millisecond)
292 }
293 }
294
295 // syncWithPeer syncs with a specific peer
296 func (m *Manager) syncWithPeer(peerURL string) {
297 // Get the peer's current serial
298 currentReq := CurrentRequest{
299 NodeID: m.nodeID,
300 RelayURL: m.relayURL,
301 }
302
303 jsonData, err := json.Marshal(currentReq)
304 if err != nil {
305 log.E.F("failed to marshal current request: %v", err)
306 return
307 }
308
309 resp, err := http.Post(peerURL+"/api/sync/current", "application/json", bytes.NewBuffer(jsonData))
310 if err != nil {
311 log.D.F("failed to get current serial from %s: %v", peerURL, err)
312 return
313 }
314 defer resp.Body.Close()
315
316 if resp.StatusCode != http.StatusOK {
317 log.D.F("current request failed with %s: status %d", peerURL, resp.StatusCode)
318 return
319 }
320
321 var currentResp CurrentResponse
322 if err := json.NewDecoder(resp.Body).Decode(¤tResp); err != nil {
323 log.E.F("failed to decode current response from %s: %v", peerURL, err)
324 return
325 }
326
327 // Check if we need to sync
328 peerSerial := currentResp.Serial
329 ourLastSeen := m.peerSerials[peerURL]
330
331 if peerSerial > ourLastSeen {
332 // Request event IDs for the missing range
333 m.requestEventIDs(peerURL, ourLastSeen+1, peerSerial)
334 // Update our knowledge of peer's serial
335 m.mutex.Lock()
336 m.peerSerials[peerURL] = peerSerial
337 m.mutex.Unlock()
338 }
339 }
340
341 // requestEventIDs requests event IDs for a serial range from a peer
342 func (m *Manager) requestEventIDs(peerURL string, from, to uint64) {
343 req := EventIDsRequest{
344 NodeID: m.nodeID,
345 RelayURL: m.relayURL,
346 From: from,
347 To: to,
348 }
349
350 jsonData, err := json.Marshal(req)
351 if err != nil {
352 log.E.F("failed to marshal event-ids request: %v", err)
353 return
354 }
355
356 resp, err := http.Post(peerURL+"/api/sync/event-ids", "application/json", bytes.NewBuffer(jsonData))
357 if err != nil {
358 log.E.F("failed to request event IDs from %s: %v", peerURL, err)
359 return
360 }
361 defer resp.Body.Close()
362
363 if resp.StatusCode != http.StatusOK {
364 log.E.F("event-ids request failed with %s: status %d", peerURL, resp.StatusCode)
365 return
366 }
367
368 var eventIDsResp EventIDsResponse
369 if err := json.NewDecoder(resp.Body).Decode(&eventIDsResp); err != nil {
370 log.E.F("failed to decode event-ids response from %s: %v", peerURL, err)
371 return
372 }
373
374 // Check which events we don't have and request them via websocket
375 missingEventIDs := m.findMissingEventIDs(eventIDsResp.EventMap)
376 if len(missingEventIDs) > 0 {
377 m.requestEventsViaWebsocket(missingEventIDs)
378 log.D.F("requested %d missing events from peer %s", len(missingEventIDs), peerURL)
379 }
380 }
381
382 // findMissingEventIDs checks which event IDs we don't have locally
383 func (m *Manager) findMissingEventIDs(eventMap map[string]uint64) []string {
384 var missing []string
385
386 for eventID := range eventMap {
387 // Check if we have this event locally
388 // This is a simplified check - in practice you'd query the database
389 if !m.hasEventLocally(eventID) {
390 missing = append(missing, eventID)
391 }
392 }
393
394 return missing
395 }
396
397 // hasEventLocally checks if we have a specific event
398 func (m *Manager) hasEventLocally(eventID string) bool {
399 // Convert hex event ID to bytes
400 eventIDBytes, err := hex.Dec(eventID)
401 if err != nil {
402 log.D.F("invalid event ID format: %s", eventID)
403 return false
404 }
405
406 // Query for the event
407 f := &filter.F{
408 Ids: tag.NewFromBytesSlice(eventIDBytes),
409 }
410
411 events, err := m.db.QueryEvents(context.Background(), f)
412 if err != nil {
413 log.D.F("error querying for event %s: %v", eventID, err)
414 return false
415 }
416
417 return len(events) > 0
418 }
419
420 // requestEventsViaWebsocket requests specific events via websocket from peers
421 func (m *Manager) requestEventsViaWebsocket(eventIDs []string) {
422 if len(eventIDs) == 0 {
423 return
424 }
425
426 // Convert hex event IDs to bytes for websocket requests
427 var eventIDBytes [][]byte
428 for _, eventID := range eventIDs {
429 if evBytes, err := hex.Dec(eventID); err == nil {
430 eventIDBytes = append(eventIDBytes, evBytes)
431 }
432 }
433
434 if len(eventIDBytes) == 0 {
435 return
436 }
437
438 // TODO: Implement websocket connection and REQ message sending
439 // For now, try to request from our peers via their websocket endpoints
440 for _, peerURL := range m.peers {
441 // Convert HTTP URL to WebSocket URL
442 wsURL := strings.Replace(peerURL, "http://", "ws://", 1)
443 wsURL = strings.Replace(wsURL, "https://", "wss://", 1)
444
445 log.D.F("would connect to %s and request %d events", wsURL, len(eventIDBytes))
446 // Here we would:
447 // 1. Establish websocket connection to peer
448 // 2. Send NIP-98 auth if required
449 // 3. Send REQ message with the filter for specific event IDs
450 // 4. Receive and process EVENT messages
451 // 5. Import received events
452 }
453
454 limit := 5
455 if len(eventIDs) < limit {
456 limit = len(eventIDs)
457 }
458 log.D.F("requested %d events via websocket: %v", len(eventIDs), eventIDs[:limit])
459 }
460
461 // GetEventsWithIDs retrieves events with their IDs by serial range
462 func (m *Manager) GetEventsWithIDs(from, to uint64) (map[string]uint64, error) {
463 eventMap := make(map[string]uint64)
464
465 // Get event serials by serial range
466 serials, err := m.db.EventIdsBySerial(from, int(to-from+1))
467 if err != nil {
468 return nil, err
469 }
470
471 // For each serial, we need to map it to an event ID
472 // This is a simplified implementation - in practice we'd need to query events by serial
473 for i, serial := range serials {
474 // TODO: Implement actual event ID retrieval by serial
475 // For now, create placeholder event IDs based on serial
476 eventID := fmt.Sprintf("event_%d", serial)
477 eventMap[eventID] = serial
478 _ = i // avoid unused variable warning
479 }
480
481 return eventMap, nil
482 }
483
484 // GetPeerStatus returns the sync status for all peers
485 func (m *Manager) GetPeerStatus() map[string]uint64 {
486 m.mutex.RLock()
487 defer m.mutex.RUnlock()
488 result := make(map[string]uint64)
489 for k, v := range m.peerSerials {
490 result[k] = v
491 }
492 return result
493 }
494
495 // HandleCurrentRequest handles requests for current serial number
496 func (m *Manager) HandleCurrentRequest(w http.ResponseWriter, r *http.Request) {
497 if r.Method != http.MethodPost {
498 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
499 return
500 }
501
502 var req CurrentRequest
503 if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
504 http.Error(w, "Invalid JSON", http.StatusBadRequest)
505 return
506 }
507
508 // Reject requests from ourselves (same nodeID)
509 if req.NodeID != "" && req.NodeID == m.nodeID {
510 log.D.F("rejecting sync current request from self (nodeID: %s)", req.NodeID)
511 // Cache the requesting relay URL as ours for future fast lookups
512 if req.RelayURL != "" {
513 m.mutex.Lock()
514 m.selfURLs[req.RelayURL] = true
515 m.mutex.Unlock()
516 log.D.F("cached self-URL from inbound request: %s", req.RelayURL)
517 }
518 http.Error(w, "Cannot sync with self", http.StatusBadRequest)
519 return
520 }
521
522 resp := CurrentResponse{
523 NodeID: m.nodeID,
524 RelayURL: m.relayURL,
525 Serial: m.GetCurrentSerial(),
526 }
527
528 w.Header().Set("Content-Type", "application/json")
529 json.NewEncoder(w).Encode(resp)
530 }
531
532 // HandleEventIDsRequest handles requests for event IDs with their serial numbers
533 func (m *Manager) HandleEventIDsRequest(w http.ResponseWriter, r *http.Request) {
534 if r.Method != http.MethodPost {
535 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
536 return
537 }
538
539 var req EventIDsRequest
540 if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
541 http.Error(w, "Invalid JSON", http.StatusBadRequest)
542 return
543 }
544
545 // Reject requests from ourselves (same nodeID)
546 if req.NodeID != "" && req.NodeID == m.nodeID {
547 log.D.F("rejecting sync event-ids request from self (nodeID: %s)", req.NodeID)
548 // Cache the requesting relay URL as ours for future fast lookups
549 if req.RelayURL != "" {
550 m.mutex.Lock()
551 m.selfURLs[req.RelayURL] = true
552 m.mutex.Unlock()
553 log.D.F("cached self-URL from inbound request: %s", req.RelayURL)
554 }
555 http.Error(w, "Cannot sync with self", http.StatusBadRequest)
556 return
557 }
558
559 // Get events with IDs in the requested range
560 eventMap, err := m.GetEventsWithIDs(req.From, req.To)
561 if err != nil {
562 http.Error(w, fmt.Sprintf("Failed to get event IDs: %v", err), http.StatusInternalServerError)
563 return
564 }
565
566 resp := EventIDsResponse{
567 EventMap: eventMap,
568 }
569
570 w.Header().Set("Content-Type", "application/json")
571 json.NewEncoder(w).Encode(resp)
572 }
573