registry.go raw
1 package find
2
3 import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8
9 "next.orly.dev/pkg/lol/chk"
10 "next.orly.dev/pkg/database"
11 "next.orly.dev/pkg/nostr/encoders/event"
12 "next.orly.dev/pkg/nostr/encoders/hex"
13 "next.orly.dev/pkg/nostr/interfaces/signer"
14 )
15
16 // RegistryService implements the FIND name registry consensus protocol
17 type RegistryService struct {
18 ctx context.Context
19 cancel context.CancelFunc
20 db database.Database
21 signer signer.I
22 trustGraph *TrustGraph
23 consensus *ConsensusEngine
24 config *RegistryConfig
25 pendingProposals map[string]*ProposalState
26 mu sync.RWMutex
27 wg sync.WaitGroup
28 }
29
30 // RegistryConfig holds configuration for the registry service
31 type RegistryConfig struct {
32 Enabled bool
33 AttestationDelay time.Duration
34 SparseEnabled bool
35 SamplingRate int
36 BootstrapServices []string
37 MinimumAttesters int
38 }
39
40 // ProposalState tracks a proposal during its attestation window
41 type ProposalState struct {
42 Proposal *RegistrationProposal
43 Attestations []*Attestation
44 ReceivedAt time.Time
45 ProcessedAt *time.Time
46 Timer *time.Timer
47 }
48
49 // NewRegistryService creates a new registry service
50 func NewRegistryService(ctx context.Context, db database.Database, signer signer.I, config *RegistryConfig) (*RegistryService, error) {
51 if !config.Enabled {
52 return nil, nil
53 }
54
55 ctx, cancel := context.WithCancel(ctx)
56
57 trustGraph := NewTrustGraph(signer.Pub())
58 consensus := NewConsensusEngine(db, trustGraph)
59
60 rs := &RegistryService{
61 ctx: ctx,
62 cancel: cancel,
63 db: db,
64 signer: signer,
65 trustGraph: trustGraph,
66 consensus: consensus,
67 config: config,
68 pendingProposals: make(map[string]*ProposalState),
69 }
70
71 // Bootstrap trust graph if configured
72 if len(config.BootstrapServices) > 0 {
73 if err := rs.bootstrapTrustGraph(); chk.E(err) {
74 fmt.Printf("failed to bootstrap trust graph: %v\n", err)
75 }
76 }
77
78 return rs, nil
79 }
80
81 // Start starts the registry service
82 func (rs *RegistryService) Start() error {
83 fmt.Println("starting FIND registry service")
84
85 // Start proposal monitoring goroutine
86 rs.wg.Add(1)
87 go rs.monitorProposals()
88
89 // Start attestation collection goroutine
90 rs.wg.Add(1)
91 go rs.collectAttestations()
92
93 // Start trust graph refresh goroutine
94 rs.wg.Add(1)
95 go rs.refreshTrustGraph()
96
97 return nil
98 }
99
100 // Stop stops the registry service
101 func (rs *RegistryService) Stop() error {
102 fmt.Println("stopping FIND registry service")
103
104 rs.cancel()
105 rs.wg.Wait()
106
107 return nil
108 }
109
110 // monitorProposals monitors for new registration proposals
111 func (rs *RegistryService) monitorProposals() {
112 defer rs.wg.Done()
113
114 ticker := time.NewTicker(10 * time.Second)
115 defer ticker.Stop()
116
117 for {
118 select {
119 case <-rs.ctx.Done():
120 return
121 case <-ticker.C:
122 rs.checkForNewProposals()
123 }
124 }
125 }
126
127 // checkForNewProposals checks database for new registration proposals
128 func (rs *RegistryService) checkForNewProposals() {
129 // Query recent kind 30100 events (registration proposals)
130 // This would use the actual database query API
131 // For now, this is a stub
132
133 // TODO: Implement database query for kind 30100 events
134 // TODO: Parse proposals and add to pendingProposals map
135 // TODO: Start attestation timer for each new proposal
136 }
137
138 // OnProposalReceived is called when a new proposal is received
139 func (rs *RegistryService) OnProposalReceived(proposal *RegistrationProposal) error {
140 // Validate proposal
141 if err := rs.consensus.ValidateProposal(proposal); chk.E(err) {
142 fmt.Printf("invalid proposal: %v\n", err)
143 return err
144 }
145
146 proposalID := hex.Enc(proposal.Event.ID)
147
148 rs.mu.Lock()
149 defer rs.mu.Unlock()
150
151 // Check if already processing
152 if _, exists := rs.pendingProposals[proposalID]; exists {
153 return nil
154 }
155
156 fmt.Printf("received new proposal: %s name: %s\n", proposalID, proposal.Name)
157
158 // Create proposal state
159 state := &ProposalState{
160 Proposal: proposal,
161 Attestations: make([]*Attestation, 0),
162 ReceivedAt: time.Now(),
163 }
164
165 // Start attestation timer
166 state.Timer = time.AfterFunc(rs.config.AttestationDelay, func() {
167 rs.processProposal(proposalID)
168 })
169
170 rs.pendingProposals[proposalID] = state
171
172 // Publish attestation (if not using sparse or if dice roll succeeds)
173 if rs.shouldAttest(proposalID) {
174 go rs.publishAttestation(proposal, DecisionApprove, "valid_proposal")
175 }
176
177 return nil
178 }
179
180 // shouldAttest determines if this service should attest to a proposal
181 func (rs *RegistryService) shouldAttest(proposalID string) bool {
182 if !rs.config.SparseEnabled {
183 return true
184 }
185
186 // Sparse attestation: use hash of (proposal_id || service_pubkey) % K == 0
187 // This provides deterministic but distributed attestation
188 hash, err := hex.Dec(proposalID)
189 if err != nil || len(hash) == 0 {
190 return false
191 }
192
193 // Simple modulo check using first byte of hash
194 return int(hash[0])%rs.config.SamplingRate == 0
195 }
196
197 // publishAttestation publishes an attestation for a proposal
198 func (rs *RegistryService) publishAttestation(proposal *RegistrationProposal, decision string, reason string) {
199 attestation := &Attestation{
200 ProposalID: hex.Enc(proposal.Event.ID),
201 Decision: decision,
202 Weight: 100,
203 Reason: reason,
204 ServiceURL: "", // TODO: Get from config
205 Expiration: time.Now().Add(AttestationExpiry),
206 }
207
208 // TODO: Create and sign attestation event (kind 20100)
209 // TODO: Publish to database
210 _ = attestation
211
212 fmt.Printf("published attestation for proposal: %s decision: %s\n", proposal.Name, decision)
213 }
214
215 // collectAttestations collects attestations from other registry services
216 func (rs *RegistryService) collectAttestations() {
217 defer rs.wg.Done()
218
219 ticker := time.NewTicker(5 * time.Second)
220 defer ticker.Stop()
221
222 for {
223 select {
224 case <-rs.ctx.Done():
225 return
226 case <-ticker.C:
227 rs.updateAttestations()
228 }
229 }
230 }
231
232 // updateAttestations fetches new attestations from database
233 func (rs *RegistryService) updateAttestations() {
234 rs.mu.RLock()
235 proposalIDs := make([]string, 0, len(rs.pendingProposals))
236 for id := range rs.pendingProposals {
237 proposalIDs = append(proposalIDs, id)
238 }
239 rs.mu.RUnlock()
240
241 if len(proposalIDs) == 0 {
242 return
243 }
244
245 // TODO: Query kind 20100 events (attestations) for pending proposals
246 // TODO: Add attestations to proposal states
247 }
248
249 // processProposal processes a proposal after the attestation window expires
250 func (rs *RegistryService) processProposal(proposalID string) {
251 rs.mu.Lock()
252 state, exists := rs.pendingProposals[proposalID]
253 if !exists {
254 rs.mu.Unlock()
255 return
256 }
257
258 // Mark as processed
259 now := time.Now()
260 state.ProcessedAt = &now
261 rs.mu.Unlock()
262
263 fmt.Printf("processing proposal: %s name: %s\n", proposalID, state.Proposal.Name)
264
265 // Check for competing proposals for the same name
266 competingProposals := rs.getCompetingProposals(state.Proposal.Name)
267
268 // Gather all attestations
269 allAttestations := make([]*Attestation, 0)
270 for _, p := range competingProposals {
271 allAttestations = append(allAttestations, p.Attestations...)
272 }
273
274 // Compute consensus
275 proposalList := make([]*RegistrationProposal, 0, len(competingProposals))
276 for _, p := range competingProposals {
277 proposalList = append(proposalList, p.Proposal)
278 }
279
280 result, err := rs.consensus.ComputeConsensus(proposalList, allAttestations)
281 if chk.E(err) {
282 fmt.Printf("consensus computation failed: %v\n", err)
283 return
284 }
285
286 // Log result
287 if result.Conflicted {
288 fmt.Printf("consensus conflicted for name: %s reason: %s\n", state.Proposal.Name, result.Reason)
289 return
290 }
291
292 fmt.Printf("consensus reached for name: %s winner: %s confidence: %f\n",
293 state.Proposal.Name,
294 hex.Enc(result.Winner.Event.ID),
295 result.Confidence)
296
297 // Publish name state (kind 30102)
298 if err := rs.publishNameState(result); chk.E(err) {
299 fmt.Printf("failed to publish name state: %v\n", err)
300 return
301 }
302
303 // Clean up processed proposals
304 rs.cleanupProposals(state.Proposal.Name)
305 }
306
307 // getCompetingProposals returns all pending proposals for the same name
308 func (rs *RegistryService) getCompetingProposals(name string) []*ProposalState {
309 rs.mu.RLock()
310 defer rs.mu.RUnlock()
311
312 proposals := make([]*ProposalState, 0)
313 for _, state := range rs.pendingProposals {
314 if state.Proposal.Name == name {
315 proposals = append(proposals, state)
316 }
317 }
318
319 return proposals
320 }
321
322 // publishNameState publishes a name state event after consensus
323 func (rs *RegistryService) publishNameState(result *ConsensusResult) error {
324 nameState, err := rs.consensus.CreateNameState(result, rs.signer.Pub())
325 if err != nil {
326 return err
327 }
328
329 // TODO: Create kind 30102 event
330 // TODO: Sign with registry service key
331 // TODO: Publish to database
332 _ = nameState
333
334 return nil
335 }
336
337 // cleanupProposals removes processed proposals from the pending map
338 func (rs *RegistryService) cleanupProposals(name string) {
339 rs.mu.Lock()
340 defer rs.mu.Unlock()
341
342 for id, state := range rs.pendingProposals {
343 if state.Proposal.Name == name && state.ProcessedAt != nil {
344 // Cancel timer if still running
345 if state.Timer != nil {
346 state.Timer.Stop()
347 }
348 delete(rs.pendingProposals, id)
349 }
350 }
351 }
352
353 // refreshTrustGraph periodically refreshes the trust graph from other services
354 func (rs *RegistryService) refreshTrustGraph() {
355 defer rs.wg.Done()
356
357 ticker := time.NewTicker(1 * time.Hour)
358 defer ticker.Stop()
359
360 for {
361 select {
362 case <-rs.ctx.Done():
363 return
364 case <-ticker.C:
365 rs.updateTrustGraph()
366 }
367 }
368 }
369
370 // updateTrustGraph fetches trust graphs from other services
371 func (rs *RegistryService) updateTrustGraph() {
372 fmt.Println("updating trust graph")
373
374 // TODO: Query kind 30101 events (trust graphs) from database
375 // TODO: Parse and update trust graph
376 // TODO: Remove expired trust graphs
377 }
378
379 // bootstrapTrustGraph initializes trust relationships with bootstrap services
380 func (rs *RegistryService) bootstrapTrustGraph() error {
381 fmt.Printf("bootstrapping trust graph with %d services\n", len(rs.config.BootstrapServices))
382
383 for _, pubkeyHex := range rs.config.BootstrapServices {
384 entry := TrustEntry{
385 Pubkey: pubkeyHex,
386 ServiceURL: "",
387 TrustScore: 0.7, // Medium trust for bootstrap services
388 }
389
390 if err := rs.trustGraph.AddEntry(entry); chk.E(err) {
391 fmt.Printf("failed to add bootstrap trust entry: %v\n", err)
392 continue
393 }
394 }
395
396 return nil
397 }
398
399 // GetTrustGraph returns the current trust graph
400 func (rs *RegistryService) GetTrustGraph() *TrustGraph {
401 return rs.trustGraph
402 }
403
404 // GetMetrics returns registry service metrics
405 func (rs *RegistryService) GetMetrics() *RegistryMetrics {
406 rs.mu.RLock()
407 defer rs.mu.RUnlock()
408
409 metrics := &RegistryMetrics{
410 PendingProposals: len(rs.pendingProposals),
411 TrustMetrics: rs.trustGraph.CalculateTrustMetrics(),
412 }
413
414 return metrics
415 }
416
417 // RegistryMetrics holds metrics about the registry service
418 type RegistryMetrics struct {
419 PendingProposals int
420 TrustMetrics *TrustMetrics
421 }
422
423 // QueryNameOwnership queries the ownership state of a name
424 func (rs *RegistryService) QueryNameOwnership(name string) (*NameState, error) {
425 return rs.consensus.QueryNameState(name)
426 }
427
428 // ValidateProposal validates a proposal without adding it to pending
429 func (rs *RegistryService) ValidateProposal(proposal *RegistrationProposal) error {
430 return rs.consensus.ValidateProposal(proposal)
431 }
432
433 // HandleEvent processes incoming FIND-related events
434 func (rs *RegistryService) HandleEvent(ev *event.E) error {
435 switch ev.Kind {
436 case KindRegistrationProposal:
437 // Parse proposal
438 proposal, err := ParseRegistrationProposal(ev)
439 if err != nil {
440 return err
441 }
442 return rs.OnProposalReceived(proposal)
443
444 case KindAttestation:
445 // Parse attestation
446 // TODO: Implement attestation parsing and handling
447 return nil
448
449 case KindTrustGraph:
450 // Parse trust graph
451 // TODO: Implement trust graph parsing and integration
452 return nil
453
454 default:
455 return nil
456 }
457 }
458