registry.go raw

   1  package find
   2  
   3  import (
   4  	"context"
   5  	"fmt"
   6  	"sync"
   7  	"time"
   8  
   9  	"next.orly.dev/pkg/lol/chk"
  10  	"next.orly.dev/pkg/database"
  11  	"next.orly.dev/pkg/nostr/encoders/event"
  12  	"next.orly.dev/pkg/nostr/encoders/hex"
  13  	"next.orly.dev/pkg/nostr/interfaces/signer"
  14  )
  15  
// RegistryService implements the FIND name registry consensus protocol.
//
// It keeps registration proposals in memory while their attestation window
// is open and hands them to the consensus engine once the window closes.
type RegistryService struct {
	ctx              context.Context           // service-lifetime context; cancelling it stops the background loops
	cancel           context.CancelFunc        // cancels ctx; called from Stop
	db               database.Database         // handed to the consensus engine at construction
	signer           signer.I                  // provides this service's public key via Pub()
	trustGraph       *TrustGraph               // created from the signer's public key in NewRegistryService
	consensus        *ConsensusEngine          // validates proposals and computes consensus
	config           *RegistryConfig           // settings supplied to NewRegistryService
	pendingProposals map[string]*ProposalState // keyed by hex-encoded proposal event ID
	mu               sync.RWMutex              // guards pendingProposals
	wg               sync.WaitGroup            // counts the goroutines launched by Start
}
  29  
  30  // RegistryConfig holds configuration for the registry service
  31  type RegistryConfig struct {
  32  	Enabled            bool
  33  	AttestationDelay   time.Duration
  34  	SparseEnabled      bool
  35  	SamplingRate       int
  36  	BootstrapServices  []string
  37  	MinimumAttesters   int
  38  }
  39  
// ProposalState tracks a proposal during its attestation window.
type ProposalState struct {
	Proposal     *RegistrationProposal // the proposal being tracked
	Attestations []*Attestation        // attestations passed to consensus for this proposal
	ReceivedAt   time.Time             // set to time.Now() when the proposal is registered
	ProcessedAt  *time.Time            // nil until processProposal has run for this proposal
	Timer        *time.Timer           // calls processProposal once the attestation delay elapses
}
  48  
  49  // NewRegistryService creates a new registry service
  50  func NewRegistryService(ctx context.Context, db database.Database, signer signer.I, config *RegistryConfig) (*RegistryService, error) {
  51  	if !config.Enabled {
  52  		return nil, nil
  53  	}
  54  
  55  	ctx, cancel := context.WithCancel(ctx)
  56  
  57  	trustGraph := NewTrustGraph(signer.Pub())
  58  	consensus := NewConsensusEngine(db, trustGraph)
  59  
  60  	rs := &RegistryService{
  61  		ctx:              ctx,
  62  		cancel:           cancel,
  63  		db:               db,
  64  		signer:           signer,
  65  		trustGraph:       trustGraph,
  66  		consensus:        consensus,
  67  		config:           config,
  68  		pendingProposals: make(map[string]*ProposalState),
  69  	}
  70  
  71  	// Bootstrap trust graph if configured
  72  	if len(config.BootstrapServices) > 0 {
  73  		if err := rs.bootstrapTrustGraph(); chk.E(err) {
  74  			fmt.Printf("failed to bootstrap trust graph: %v\n", err)
  75  		}
  76  	}
  77  
  78  	return rs, nil
  79  }
  80  
  81  // Start starts the registry service
  82  func (rs *RegistryService) Start() error {
  83  	fmt.Println("starting FIND registry service")
  84  
  85  	// Start proposal monitoring goroutine
  86  	rs.wg.Add(1)
  87  	go rs.monitorProposals()
  88  
  89  	// Start attestation collection goroutine
  90  	rs.wg.Add(1)
  91  	go rs.collectAttestations()
  92  
  93  	// Start trust graph refresh goroutine
  94  	rs.wg.Add(1)
  95  	go rs.refreshTrustGraph()
  96  
  97  	return nil
  98  }
  99  
 100  // Stop stops the registry service
 101  func (rs *RegistryService) Stop() error {
 102  	fmt.Println("stopping FIND registry service")
 103  
 104  	rs.cancel()
 105  	rs.wg.Wait()
 106  
 107  	return nil
 108  }
 109  
 110  // monitorProposals monitors for new registration proposals
 111  func (rs *RegistryService) monitorProposals() {
 112  	defer rs.wg.Done()
 113  
 114  	ticker := time.NewTicker(10 * time.Second)
 115  	defer ticker.Stop()
 116  
 117  	for {
 118  		select {
 119  		case <-rs.ctx.Done():
 120  			return
 121  		case <-ticker.C:
 122  			rs.checkForNewProposals()
 123  		}
 124  	}
 125  }
 126  
// checkForNewProposals is meant to check the database for new registration
// proposals. It is currently a stub with no effect; monitorProposals calls it
// on every tick.
func (rs *RegistryService) checkForNewProposals() {
	// Intended behaviour: query recent kind 30100 events (registration
	// proposals) through the database query API. Nothing is implemented yet.

	// TODO: Implement database query for kind 30100 events
	// TODO: Parse proposals and add to pendingProposals map
	// TODO: Start attestation timer for each new proposal
}
 137  
 138  // OnProposalReceived is called when a new proposal is received
 139  func (rs *RegistryService) OnProposalReceived(proposal *RegistrationProposal) error {
 140  	// Validate proposal
 141  	if err := rs.consensus.ValidateProposal(proposal); chk.E(err) {
 142  		fmt.Printf("invalid proposal: %v\n", err)
 143  		return err
 144  	}
 145  
 146  	proposalID := hex.Enc(proposal.Event.ID)
 147  
 148  	rs.mu.Lock()
 149  	defer rs.mu.Unlock()
 150  
 151  	// Check if already processing
 152  	if _, exists := rs.pendingProposals[proposalID]; exists {
 153  		return nil
 154  	}
 155  
 156  	fmt.Printf("received new proposal: %s name: %s\n", proposalID, proposal.Name)
 157  
 158  	// Create proposal state
 159  	state := &ProposalState{
 160  		Proposal:     proposal,
 161  		Attestations: make([]*Attestation, 0),
 162  		ReceivedAt:   time.Now(),
 163  	}
 164  
 165  	// Start attestation timer
 166  	state.Timer = time.AfterFunc(rs.config.AttestationDelay, func() {
 167  		rs.processProposal(proposalID)
 168  	})
 169  
 170  	rs.pendingProposals[proposalID] = state
 171  
 172  	// Publish attestation (if not using sparse or if dice roll succeeds)
 173  	if rs.shouldAttest(proposalID) {
 174  		go rs.publishAttestation(proposal, DecisionApprove, "valid_proposal")
 175  	}
 176  
 177  	return nil
 178  }
 179  
 180  // shouldAttest determines if this service should attest to a proposal
 181  func (rs *RegistryService) shouldAttest(proposalID string) bool {
 182  	if !rs.config.SparseEnabled {
 183  		return true
 184  	}
 185  
 186  	// Sparse attestation: use hash of (proposal_id || service_pubkey) % K == 0
 187  	// This provides deterministic but distributed attestation
 188  	hash, err := hex.Dec(proposalID)
 189  	if err != nil || len(hash) == 0 {
 190  		return false
 191  	}
 192  
 193  	// Simple modulo check using first byte of hash
 194  	return int(hash[0])%rs.config.SamplingRate == 0
 195  }
 196  
 197  // publishAttestation publishes an attestation for a proposal
 198  func (rs *RegistryService) publishAttestation(proposal *RegistrationProposal, decision string, reason string) {
 199  	attestation := &Attestation{
 200  		ProposalID: hex.Enc(proposal.Event.ID),
 201  		Decision:   decision,
 202  		Weight:     100,
 203  		Reason:     reason,
 204  		ServiceURL: "", // TODO: Get from config
 205  		Expiration: time.Now().Add(AttestationExpiry),
 206  	}
 207  
 208  	// TODO: Create and sign attestation event (kind 20100)
 209  	// TODO: Publish to database
 210  	_ = attestation
 211  
 212  	fmt.Printf("published attestation for proposal: %s decision: %s\n", proposal.Name, decision)
 213  }
 214  
 215  // collectAttestations collects attestations from other registry services
 216  func (rs *RegistryService) collectAttestations() {
 217  	defer rs.wg.Done()
 218  
 219  	ticker := time.NewTicker(5 * time.Second)
 220  	defer ticker.Stop()
 221  
 222  	for {
 223  		select {
 224  		case <-rs.ctx.Done():
 225  			return
 226  		case <-ticker.C:
 227  			rs.updateAttestations()
 228  		}
 229  	}
 230  }
 231  
 232  // updateAttestations fetches new attestations from database
 233  func (rs *RegistryService) updateAttestations() {
 234  	rs.mu.RLock()
 235  	proposalIDs := make([]string, 0, len(rs.pendingProposals))
 236  	for id := range rs.pendingProposals {
 237  		proposalIDs = append(proposalIDs, id)
 238  	}
 239  	rs.mu.RUnlock()
 240  
 241  	if len(proposalIDs) == 0 {
 242  		return
 243  	}
 244  
 245  	// TODO: Query kind 20100 events (attestations) for pending proposals
 246  	// TODO: Add attestations to proposal states
 247  }
 248  
 249  // processProposal processes a proposal after the attestation window expires
 250  func (rs *RegistryService) processProposal(proposalID string) {
 251  	rs.mu.Lock()
 252  	state, exists := rs.pendingProposals[proposalID]
 253  	if !exists {
 254  		rs.mu.Unlock()
 255  		return
 256  	}
 257  
 258  	// Mark as processed
 259  	now := time.Now()
 260  	state.ProcessedAt = &now
 261  	rs.mu.Unlock()
 262  
 263  	fmt.Printf("processing proposal: %s name: %s\n", proposalID, state.Proposal.Name)
 264  
 265  	// Check for competing proposals for the same name
 266  	competingProposals := rs.getCompetingProposals(state.Proposal.Name)
 267  
 268  	// Gather all attestations
 269  	allAttestations := make([]*Attestation, 0)
 270  	for _, p := range competingProposals {
 271  		allAttestations = append(allAttestations, p.Attestations...)
 272  	}
 273  
 274  	// Compute consensus
 275  	proposalList := make([]*RegistrationProposal, 0, len(competingProposals))
 276  	for _, p := range competingProposals {
 277  		proposalList = append(proposalList, p.Proposal)
 278  	}
 279  
 280  	result, err := rs.consensus.ComputeConsensus(proposalList, allAttestations)
 281  	if chk.E(err) {
 282  		fmt.Printf("consensus computation failed: %v\n", err)
 283  		return
 284  	}
 285  
 286  	// Log result
 287  	if result.Conflicted {
 288  		fmt.Printf("consensus conflicted for name: %s reason: %s\n", state.Proposal.Name, result.Reason)
 289  		return
 290  	}
 291  
 292  	fmt.Printf("consensus reached for name: %s winner: %s confidence: %f\n",
 293  		state.Proposal.Name,
 294  		hex.Enc(result.Winner.Event.ID),
 295  		result.Confidence)
 296  
 297  	// Publish name state (kind 30102)
 298  	if err := rs.publishNameState(result); chk.E(err) {
 299  		fmt.Printf("failed to publish name state: %v\n", err)
 300  		return
 301  	}
 302  
 303  	// Clean up processed proposals
 304  	rs.cleanupProposals(state.Proposal.Name)
 305  }
 306  
 307  // getCompetingProposals returns all pending proposals for the same name
 308  func (rs *RegistryService) getCompetingProposals(name string) []*ProposalState {
 309  	rs.mu.RLock()
 310  	defer rs.mu.RUnlock()
 311  
 312  	proposals := make([]*ProposalState, 0)
 313  	for _, state := range rs.pendingProposals {
 314  		if state.Proposal.Name == name {
 315  			proposals = append(proposals, state)
 316  		}
 317  	}
 318  
 319  	return proposals
 320  }
 321  
 322  // publishNameState publishes a name state event after consensus
 323  func (rs *RegistryService) publishNameState(result *ConsensusResult) error {
 324  	nameState, err := rs.consensus.CreateNameState(result, rs.signer.Pub())
 325  	if err != nil {
 326  		return err
 327  	}
 328  
 329  	// TODO: Create kind 30102 event
 330  	// TODO: Sign with registry service key
 331  	// TODO: Publish to database
 332  	_ = nameState
 333  
 334  	return nil
 335  }
 336  
 337  // cleanupProposals removes processed proposals from the pending map
 338  func (rs *RegistryService) cleanupProposals(name string) {
 339  	rs.mu.Lock()
 340  	defer rs.mu.Unlock()
 341  
 342  	for id, state := range rs.pendingProposals {
 343  		if state.Proposal.Name == name && state.ProcessedAt != nil {
 344  			// Cancel timer if still running
 345  			if state.Timer != nil {
 346  				state.Timer.Stop()
 347  			}
 348  			delete(rs.pendingProposals, id)
 349  		}
 350  	}
 351  }
 352  
 353  // refreshTrustGraph periodically refreshes the trust graph from other services
 354  func (rs *RegistryService) refreshTrustGraph() {
 355  	defer rs.wg.Done()
 356  
 357  	ticker := time.NewTicker(1 * time.Hour)
 358  	defer ticker.Stop()
 359  
 360  	for {
 361  		select {
 362  		case <-rs.ctx.Done():
 363  			return
 364  		case <-ticker.C:
 365  			rs.updateTrustGraph()
 366  		}
 367  	}
 368  }
 369  
// updateTrustGraph is meant to fetch trust graphs from other services. It is
// currently a stub: it prints a log line and does nothing else.
// refreshTrustGraph calls it on every tick.
func (rs *RegistryService) updateTrustGraph() {
	fmt.Println("updating trust graph")

	// TODO: Query kind 30101 events (trust graphs) from database
	// TODO: Parse and update trust graph
	// TODO: Remove expired trust graphs
}
 378  
 379  // bootstrapTrustGraph initializes trust relationships with bootstrap services
 380  func (rs *RegistryService) bootstrapTrustGraph() error {
 381  	fmt.Printf("bootstrapping trust graph with %d services\n", len(rs.config.BootstrapServices))
 382  
 383  	for _, pubkeyHex := range rs.config.BootstrapServices {
 384  		entry := TrustEntry{
 385  			Pubkey:     pubkeyHex,
 386  			ServiceURL: "",
 387  			TrustScore: 0.7, // Medium trust for bootstrap services
 388  		}
 389  
 390  		if err := rs.trustGraph.AddEntry(entry); chk.E(err) {
 391  			fmt.Printf("failed to add bootstrap trust entry: %v\n", err)
 392  			continue
 393  		}
 394  	}
 395  
 396  	return nil
 397  }
 398  
// GetTrustGraph returns the service's trust graph. The pointer held by the
// service is returned directly, not a copy.
func (rs *RegistryService) GetTrustGraph() *TrustGraph {
	return rs.trustGraph
}
 403  
 404  // GetMetrics returns registry service metrics
 405  func (rs *RegistryService) GetMetrics() *RegistryMetrics {
 406  	rs.mu.RLock()
 407  	defer rs.mu.RUnlock()
 408  
 409  	metrics := &RegistryMetrics{
 410  		PendingProposals: len(rs.pendingProposals),
 411  		TrustMetrics:     rs.trustGraph.CalculateTrustMetrics(),
 412  	}
 413  
 414  	return metrics
 415  }
 416  
// RegistryMetrics holds metrics about the registry service, as returned by
// GetMetrics.
type RegistryMetrics struct {
	PendingProposals int           // number of entries in the pending proposal map
	TrustMetrics     *TrustMetrics // result of the trust graph's CalculateTrustMetrics
}
 422  
// QueryNameOwnership queries the ownership state of a name. It delegates to
// the consensus engine's QueryNameState and returns its result unchanged.
func (rs *RegistryService) QueryNameOwnership(name string) (*NameState, error) {
	return rs.consensus.QueryNameState(name)
}
 427  
// ValidateProposal validates a proposal without adding it to the pending
// set. It delegates to the consensus engine and returns its error unchanged.
func (rs *RegistryService) ValidateProposal(proposal *RegistrationProposal) error {
	return rs.consensus.ValidateProposal(proposal)
}
 432  
 433  // HandleEvent processes incoming FIND-related events
 434  func (rs *RegistryService) HandleEvent(ev *event.E) error {
 435  	switch ev.Kind {
 436  	case KindRegistrationProposal:
 437  		// Parse proposal
 438  		proposal, err := ParseRegistrationProposal(ev)
 439  		if err != nil {
 440  			return err
 441  		}
 442  		return rs.OnProposalReceived(proposal)
 443  
 444  	case KindAttestation:
 445  		// Parse attestation
 446  		// TODO: Implement attestation parsing and handling
 447  		return nil
 448  
 449  	case KindTrustGraph:
 450  		// Parse trust graph
 451  		// TODO: Implement trust graph parsing and integration
 452  		return nil
 453  
 454  	default:
 455  		return nil
 456  	}
 457  }
 458