client.go raw
1 package nrc
2
3 import (
4 "context"
5 "encoding/base64"
6 "encoding/json"
7 "fmt"
8 "sync"
9 "time"
10
11 "next.orly.dev/pkg/nostr/crypto/encryption"
12 "next.orly.dev/pkg/nostr/encoders/event"
13 "next.orly.dev/pkg/nostr/encoders/filter"
14 "next.orly.dev/pkg/nostr/encoders/hex"
15 "next.orly.dev/pkg/nostr/encoders/kind"
16 "next.orly.dev/pkg/nostr/encoders/tag"
17 "next.orly.dev/pkg/nostr/encoders/timestamp"
18 "next.orly.dev/pkg/nostr/interfaces/signer"
19 "next.orly.dev/pkg/nostr/ws"
20 "github.com/google/uuid"
21 "next.orly.dev/pkg/lol/chk"
22 "next.orly.dev/pkg/lol/log"
23 )
24
25 // chunkBuffer holds chunks for a message being reassembled.
26 type chunkBuffer struct {
27 chunks map[int]string
28 total int
29 receivedAt time.Time
30 }
31
32 // Client connects to a private relay through the NRC tunnel.
33 type Client struct {
34 uri *ConnectionURI
35 sessionID string
36 rendezvousConn *ws.Client
37 responseSub *ws.Subscription
38 conversationKey []byte
39 clientSigner signer.I
40
41 // pending maps request event IDs to response channels.
42 pending map[string]chan *ResponseMessage
43 pendingMu sync.Mutex
44
45 // subscriptions maps subscription IDs to event channels.
46 subscriptions map[string]chan *event.E
47 subscriptionsMu sync.Mutex
48
49 // chunkBuffers holds partially received chunked messages.
50 chunkBuffers map[string]*chunkBuffer
51 chunkBuffersMu sync.Mutex
52
53 ctx context.Context
54 cancel context.CancelFunc
55 }
56
57 // NewClient creates a new NRC client from a connection URI.
58 func NewClient(connectionURI string) (*Client, error) {
59 uri, err := ParseConnectionURI(connectionURI)
60 if err != nil {
61 return nil, fmt.Errorf("invalid URI: %w", err)
62 }
63
64 if uri.AuthMode != AuthModeSecret {
65 return nil, fmt.Errorf("CAT authentication not yet supported in client")
66 }
67
68 ctx, cancel := context.WithCancel(context.Background())
69 return &Client{
70 uri: uri,
71 sessionID: uuid.New().String(),
72 conversationKey: uri.GetConversationKey(),
73 clientSigner: uri.GetClientSigner(),
74 pending: make(map[string]chan *ResponseMessage),
75 subscriptions: make(map[string]chan *event.E),
76 chunkBuffers: make(map[string]*chunkBuffer),
77 ctx: ctx,
78 cancel: cancel,
79 }, nil
80 }
81
82 // Connect establishes the connection to the rendezvous relay.
83 func (c *Client) Connect(ctx context.Context) error {
84 // Connect to rendezvous relay
85 conn, err := ws.RelayConnect(ctx, c.uri.RendezvousRelay)
86 if chk.E(err) {
87 return fmt.Errorf("%w: %v", ErrRendezvousConnectionFailed, err)
88 }
89 c.rendezvousConn = conn
90
91 // Subscribe to response events
92 clientPubkeyHex := hex.Enc(c.clientSigner.Pub())
93 sub, err := conn.Subscribe(
94 ctx,
95 filter.NewS(&filter.F{
96 Kinds: kind.NewS(kind.New(KindNRCResponse)),
97 Tags: tag.NewS(
98 tag.NewFromAny("p", clientPubkeyHex),
99 ),
100 Since: ×tamp.T{V: time.Now().Unix()},
101 }),
102 )
103 if chk.E(err) {
104 conn.Close()
105 return fmt.Errorf("subscription failed: %w", err)
106 }
107 c.responseSub = sub
108
109 // Start response handler
110 go c.handleResponses()
111
112 log.I.F("NRC client connected to %s via %s",
113 hex.Enc(c.uri.RelayPubkey), c.uri.RendezvousRelay)
114
115 return nil
116 }
117
118 // Close closes the client connection.
119 func (c *Client) Close() {
120 c.cancel()
121 if c.responseSub != nil {
122 c.responseSub.Unsub()
123 }
124 if c.rendezvousConn != nil {
125 c.rendezvousConn.Close()
126 }
127
128 // Close all pending channels
129 c.pendingMu.Lock()
130 for _, ch := range c.pending {
131 close(ch)
132 }
133 c.pending = make(map[string]chan *ResponseMessage)
134 c.pendingMu.Unlock()
135
136 // Close all subscription channels
137 c.subscriptionsMu.Lock()
138 for _, ch := range c.subscriptions {
139 close(ch)
140 }
141 c.subscriptions = make(map[string]chan *event.E)
142 c.subscriptionsMu.Unlock()
143
144 // Clear chunk buffers
145 c.chunkBuffersMu.Lock()
146 c.chunkBuffers = make(map[string]*chunkBuffer)
147 c.chunkBuffersMu.Unlock()
148 }
149
150 // handleResponses processes incoming NRC response events.
151 func (c *Client) handleResponses() {
152 for {
153 select {
154 case <-c.ctx.Done():
155 return
156 case ev := <-c.responseSub.Events:
157 if ev == nil {
158 return
159 }
160 c.processResponse(ev)
161 }
162 }
163 }
164
165 // processResponse decrypts and routes a response event.
166 func (c *Client) processResponse(ev *event.E) {
167 // Decrypt content
168 decrypted, err := encryption.Decrypt(c.conversationKey, string(ev.Content))
169 if err != nil {
170 log.W.F("NRC response decryption failed: %v", err)
171 return
172 }
173
174 // Parse response
175 var resp struct {
176 Type string `json:"type"`
177 Payload []any `json:"payload"`
178 }
179 if err := json.Unmarshal([]byte(decrypted), &resp); err != nil {
180 log.W.F("NRC response parse failed: %v", err)
181 return
182 }
183
184 // Extract request event ID for routing
185 var requestEventID string
186 eTag := ev.Tags.GetFirst([]byte("e"))
187 if eTag != nil && eTag.Len() >= 2 {
188 requestEventID = string(eTag.ValueHex())
189 }
190
191 // Route based on response type
192 switch resp.Type {
193 case "EVENT":
194 c.handleEventResponse(resp.Payload)
195 case "EOSE":
196 c.handleEOSEResponse(resp.Payload, requestEventID)
197 case "OK":
198 c.handleOKResponse(resp.Payload, requestEventID)
199 case "NOTICE":
200 c.handleNoticeResponse(resp.Payload)
201 case "CLOSED":
202 c.handleClosedResponse(resp.Payload)
203 case "COUNT":
204 c.handleCountResponse(resp.Payload, requestEventID)
205 case "AUTH":
206 c.handleAuthResponse(resp.Payload, requestEventID)
207 case "IDS":
208 c.handleIDSResponse(resp.Payload, requestEventID)
209 case "CHUNK":
210 c.handleChunkResponse(resp.Payload, requestEventID)
211 }
212 }
213
214 // handleEventResponse routes an EVENT to the appropriate subscription.
215 func (c *Client) handleEventResponse(payload []any) {
216 if len(payload) < 3 {
217 return
218 }
219 // Payload: ["EVENT", "<sub_id>", {...event...}]
220 subID, ok := payload[1].(string)
221 if !ok {
222 return
223 }
224
225 c.subscriptionsMu.Lock()
226 ch, exists := c.subscriptions[subID]
227 c.subscriptionsMu.Unlock()
228
229 if !exists {
230 return
231 }
232
233 // Parse event from payload
234 eventData, ok := payload[2].(map[string]any)
235 if !ok {
236 return
237 }
238
239 eventBytes, err := json.Marshal(eventData)
240 if err != nil {
241 return
242 }
243
244 var ev event.E
245 if err := json.Unmarshal(eventBytes, &ev); err != nil {
246 return
247 }
248
249 select {
250 case ch <- &ev:
251 default:
252 // Channel full, drop event
253 }
254 }
255
256 // handleEOSEResponse handles an EOSE response.
257 func (c *Client) handleEOSEResponse(payload []any, requestEventID string) {
258 // Route to pending request
259 c.pendingMu.Lock()
260 ch, exists := c.pending[requestEventID]
261 c.pendingMu.Unlock()
262
263 if exists {
264 resp := &ResponseMessage{Type: "EOSE", Payload: payload}
265 select {
266 case ch <- resp:
267 default:
268 }
269 }
270 }
271
272 // handleOKResponse handles an OK response.
273 func (c *Client) handleOKResponse(payload []any, requestEventID string) {
274 c.pendingMu.Lock()
275 ch, exists := c.pending[requestEventID]
276 c.pendingMu.Unlock()
277
278 if exists {
279 resp := &ResponseMessage{Type: "OK", Payload: payload}
280 select {
281 case ch <- resp:
282 default:
283 }
284 }
285 }
286
287 // handleNoticeResponse logs a NOTICE.
288 func (c *Client) handleNoticeResponse(payload []any) {
289 if len(payload) >= 2 {
290 if msg, ok := payload[1].(string); ok {
291 log.W.F("NRC NOTICE: %s", msg)
292 }
293 }
294 }
295
296 // handleClosedResponse handles a subscription close.
297 func (c *Client) handleClosedResponse(payload []any) {
298 if len(payload) >= 2 {
299 if subID, ok := payload[1].(string); ok {
300 c.subscriptionsMu.Lock()
301 if ch, exists := c.subscriptions[subID]; exists {
302 close(ch)
303 delete(c.subscriptions, subID)
304 }
305 c.subscriptionsMu.Unlock()
306 }
307 }
308 }
309
310 // handleCountResponse handles a COUNT response.
311 func (c *Client) handleCountResponse(payload []any, requestEventID string) {
312 c.pendingMu.Lock()
313 ch, exists := c.pending[requestEventID]
314 c.pendingMu.Unlock()
315
316 if exists {
317 resp := &ResponseMessage{Type: "COUNT", Payload: payload}
318 select {
319 case ch <- resp:
320 default:
321 }
322 }
323 }
324
325 // handleAuthResponse handles an AUTH challenge.
326 func (c *Client) handleAuthResponse(payload []any, requestEventID string) {
327 c.pendingMu.Lock()
328 ch, exists := c.pending[requestEventID]
329 c.pendingMu.Unlock()
330
331 if exists {
332 resp := &ResponseMessage{Type: "AUTH", Payload: payload}
333 select {
334 case ch <- resp:
335 default:
336 }
337 }
338 }
339
340 // handleIDSResponse handles an IDS response.
341 func (c *Client) handleIDSResponse(payload []any, requestEventID string) {
342 c.pendingMu.Lock()
343 ch, exists := c.pending[requestEventID]
344 c.pendingMu.Unlock()
345
346 if exists {
347 resp := &ResponseMessage{Type: "IDS", Payload: payload}
348 select {
349 case ch <- resp:
350 default:
351 }
352 }
353 }
354
355 // handleChunkResponse handles a CHUNK response and reassembles the message.
356 func (c *Client) handleChunkResponse(payload []any, requestEventID string) {
357 if len(payload) < 1 {
358 return
359 }
360
361 // Parse chunk message from payload
362 chunkData, ok := payload[0].(map[string]any)
363 if !ok {
364 log.W.F("NRC: invalid chunk payload format")
365 return
366 }
367
368 messageID, _ := chunkData["messageId"].(string)
369 indexFloat, _ := chunkData["index"].(float64)
370 totalFloat, _ := chunkData["total"].(float64)
371 data, _ := chunkData["data"].(string)
372
373 if messageID == "" || data == "" {
374 log.W.F("NRC: chunk missing required fields")
375 return
376 }
377
378 index := int(indexFloat)
379 total := int(totalFloat)
380
381 c.chunkBuffersMu.Lock()
382 defer c.chunkBuffersMu.Unlock()
383
384 // Get or create buffer for this message
385 buf, exists := c.chunkBuffers[messageID]
386 if !exists {
387 buf = &chunkBuffer{
388 chunks: make(map[int]string),
389 total: total,
390 receivedAt: time.Now(),
391 }
392 c.chunkBuffers[messageID] = buf
393 }
394
395 // Store the chunk
396 buf.chunks[index] = data
397 log.D.F("NRC: received chunk %d/%d for message %s", index+1, total, messageID[:8])
398
399 // Check if we have all chunks
400 if len(buf.chunks) == buf.total {
401 // Reassemble the message
402 var encoded string
403 for i := 0; i < buf.total; i++ {
404 part, ok := buf.chunks[i]
405 if !ok {
406 log.W.F("NRC: missing chunk %d for message %s", i, messageID)
407 delete(c.chunkBuffers, messageID)
408 return
409 }
410 encoded += part
411 }
412
413 // Decode from base64
414 decoded, err := base64.StdEncoding.DecodeString(encoded)
415 if err != nil {
416 log.W.F("NRC: failed to decode chunked message: %v", err)
417 delete(c.chunkBuffers, messageID)
418 return
419 }
420
421 // Parse the reassembled response
422 var resp struct {
423 Type string `json:"type"`
424 Payload []any `json:"payload"`
425 }
426 if err := json.Unmarshal(decoded, &resp); err != nil {
427 log.W.F("NRC: failed to parse reassembled message: %v", err)
428 delete(c.chunkBuffers, messageID)
429 return
430 }
431
432 log.D.F("NRC: reassembled chunked message: %s", resp.Type)
433
434 // Clean up buffer
435 delete(c.chunkBuffers, messageID)
436
437 // Route the reassembled response
438 c.pendingMu.Lock()
439 ch, exists := c.pending[requestEventID]
440 c.pendingMu.Unlock()
441
442 if exists {
443 respMsg := &ResponseMessage{Type: resp.Type, Payload: resp.Payload}
444 select {
445 case ch <- respMsg:
446 default:
447 }
448 }
449 }
450
451 // Clean up stale buffers (older than 60 seconds)
452 now := time.Now()
453 for id, b := range c.chunkBuffers {
454 if now.Sub(b.receivedAt) > 60*time.Second {
455 log.W.F("NRC: discarding stale chunk buffer: %s", id)
456 delete(c.chunkBuffers, id)
457 }
458 }
459 }
460
461 // sendRequest sends an NRC request and waits for response.
462 func (c *Client) sendRequest(ctx context.Context, msgType string, payload []any) (*ResponseMessage, error) {
463 // Build request content
464 reqContent := struct {
465 Type string `json:"type"`
466 Payload []any `json:"payload"`
467 }{
468 Type: msgType,
469 Payload: payload,
470 }
471
472 contentBytes, err := json.Marshal(reqContent)
473 if err != nil {
474 return nil, fmt.Errorf("marshal failed: %w", err)
475 }
476
477 // Encrypt content
478 encrypted, err := encryption.Encrypt(c.conversationKey, contentBytes, nil)
479 if err != nil {
480 return nil, fmt.Errorf("%w: %v", ErrEncryptionFailed, err)
481 }
482
483 // Build request event
484 reqEvent := &event.E{
485 Content: []byte(encrypted),
486 CreatedAt: time.Now().Unix(),
487 Kind: KindNRCRequest,
488 Tags: tag.NewS(
489 tag.NewFromAny("p", hex.Enc(c.uri.RelayPubkey)),
490 tag.NewFromAny("encryption", "nip44_v2"),
491 tag.NewFromAny("session", c.sessionID),
492 ),
493 }
494
495 // Sign with client key
496 if err := reqEvent.Sign(c.clientSigner); chk.E(err) {
497 return nil, fmt.Errorf("signing failed: %w", err)
498 }
499
500 // Set up response channel
501 responseCh := make(chan *ResponseMessage, 1)
502 requestEventID := string(hex.Enc(reqEvent.ID[:]))
503
504 c.pendingMu.Lock()
505 c.pending[requestEventID] = responseCh
506 c.pendingMu.Unlock()
507
508 defer func() {
509 c.pendingMu.Lock()
510 delete(c.pending, requestEventID)
511 c.pendingMu.Unlock()
512 }()
513
514 // Publish request
515 if err := c.rendezvousConn.Publish(ctx, reqEvent); chk.E(err) {
516 return nil, fmt.Errorf("publish failed: %w", err)
517 }
518
519 // Wait for response
520 select {
521 case <-ctx.Done():
522 return nil, ctx.Err()
523 case resp := <-responseCh:
524 if resp == nil {
525 return nil, fmt.Errorf("response channel closed")
526 }
527 return resp, nil
528 }
529 }
530
531 // Publish publishes an event to the private relay.
532 func (c *Client) Publish(ctx context.Context, ev *event.E) (bool, string, error) {
533 // Convert event to JSON for payload
534 eventBytes, err := json.Marshal(ev)
535 if err != nil {
536 return false, "", fmt.Errorf("marshal event failed: %w", err)
537 }
538
539 var eventMap map[string]any
540 if err := json.Unmarshal(eventBytes, &eventMap); err != nil {
541 return false, "", fmt.Errorf("unmarshal event failed: %w", err)
542 }
543
544 payload := []any{"EVENT", eventMap}
545
546 resp, err := c.sendRequest(ctx, "EVENT", payload)
547 if err != nil {
548 return false, "", err
549 }
550
551 // Parse OK response: ["OK", "<event_id>", <success>, "<message>"]
552 if resp.Type != "OK" || len(resp.Payload) < 4 {
553 return false, "", fmt.Errorf("unexpected response type: %s", resp.Type)
554 }
555
556 success, _ := resp.Payload[2].(bool)
557 message, _ := resp.Payload[3].(string)
558
559 return success, message, nil
560 }
561
562 // Subscribe creates a subscription to the private relay.
563 func (c *Client) Subscribe(ctx context.Context, subID string, filters ...*filter.F) (<-chan *event.E, error) {
564 // Build payload: ["REQ", "<sub_id>", filter1, filter2, ...]
565 payload := []any{"REQ", subID}
566 for _, f := range filters {
567 filterBytes, err := json.Marshal(f)
568 if err != nil {
569 return nil, fmt.Errorf("marshal filter failed: %w", err)
570 }
571 var filterMap map[string]any
572 if err := json.Unmarshal(filterBytes, &filterMap); err != nil {
573 return nil, fmt.Errorf("unmarshal filter failed: %w", err)
574 }
575 payload = append(payload, filterMap)
576 }
577
578 // Create event channel for this subscription
579 eventCh := make(chan *event.E, 100)
580
581 c.subscriptionsMu.Lock()
582 c.subscriptions[subID] = eventCh
583 c.subscriptionsMu.Unlock()
584
585 // Send request (don't wait for EOSE, events will come asynchronously)
586 go func() {
587 reqCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
588 defer cancel()
589 _, err := c.sendRequest(reqCtx, "REQ", payload)
590 if err != nil {
591 log.W.F("NRC subscribe failed: %v", err)
592 }
593 }()
594
595 return eventCh, nil
596 }
597
598 // Unsubscribe closes a subscription.
599 func (c *Client) Unsubscribe(ctx context.Context, subID string) error {
600 // Remove from local tracking
601 c.subscriptionsMu.Lock()
602 if ch, exists := c.subscriptions[subID]; exists {
603 close(ch)
604 delete(c.subscriptions, subID)
605 }
606 c.subscriptionsMu.Unlock()
607
608 // Send CLOSE to relay
609 payload := []any{"CLOSE", subID}
610 _, err := c.sendRequest(ctx, "CLOSE", payload)
611 return err
612 }
613
614 // Count sends a COUNT request to the private relay.
615 func (c *Client) Count(ctx context.Context, subID string, filters ...*filter.F) (int64, error) {
616 // Build payload: ["COUNT", "<sub_id>", filter1, filter2, ...]
617 payload := []any{"COUNT", subID}
618 for _, f := range filters {
619 filterBytes, err := json.Marshal(f)
620 if err != nil {
621 return 0, fmt.Errorf("marshal filter failed: %w", err)
622 }
623 var filterMap map[string]any
624 if err := json.Unmarshal(filterBytes, &filterMap); err != nil {
625 return 0, fmt.Errorf("unmarshal filter failed: %w", err)
626 }
627 payload = append(payload, filterMap)
628 }
629
630 resp, err := c.sendRequest(ctx, "COUNT", payload)
631 if err != nil {
632 return 0, err
633 }
634
635 // Parse COUNT response: ["COUNT", "<sub_id>", {"count": N}]
636 if resp.Type != "COUNT" || len(resp.Payload) < 3 {
637 return 0, fmt.Errorf("unexpected response type: %s", resp.Type)
638 }
639
640 countData, ok := resp.Payload[2].(map[string]any)
641 if !ok {
642 return 0, fmt.Errorf("invalid count response")
643 }
644
645 count, ok := countData["count"].(float64)
646 if !ok {
647 return 0, fmt.Errorf("missing count field")
648 }
649
650 return int64(count), nil
651 }
652
653 // RelayURL returns a pseudo-URL for this NRC connection.
654 func (c *Client) RelayURL() string {
655 return "nrc://" + string(hex.Enc(c.uri.RelayPubkey))
656 }
657
658 // RequestIDs sends an IDS request to get event manifests for diffing.
659 func (c *Client) RequestIDs(ctx context.Context, subID string, filters ...*filter.F) ([]EventManifestEntry, error) {
660 // Build payload: ["IDS", "<sub_id>", filter1, filter2, ...]
661 payload := []any{"IDS", subID}
662 for _, f := range filters {
663 filterBytes, err := json.Marshal(f)
664 if err != nil {
665 return nil, fmt.Errorf("marshal filter failed: %w", err)
666 }
667 var filterMap map[string]any
668 if err := json.Unmarshal(filterBytes, &filterMap); err != nil {
669 return nil, fmt.Errorf("unmarshal filter failed: %w", err)
670 }
671 payload = append(payload, filterMap)
672 }
673
674 resp, err := c.sendRequest(ctx, "IDS", payload)
675 if err != nil {
676 return nil, err
677 }
678
679 // Parse IDS response: ["IDS", "<sub_id>", [...manifest...]]
680 if resp.Type != "IDS" || len(resp.Payload) < 3 {
681 return nil, fmt.Errorf("unexpected response type: %s", resp.Type)
682 }
683
684 // Parse manifest entries
685 manifestData, ok := resp.Payload[2].([]any)
686 if !ok {
687 return nil, fmt.Errorf("invalid manifest response")
688 }
689
690 var manifest []EventManifestEntry
691 for _, item := range manifestData {
692 entryMap, ok := item.(map[string]any)
693 if !ok {
694 continue
695 }
696
697 entry := EventManifestEntry{}
698 if k, ok := entryMap["kind"].(float64); ok {
699 entry.Kind = int(k)
700 }
701 if id, ok := entryMap["id"].(string); ok {
702 entry.ID = id
703 }
704 if ca, ok := entryMap["created_at"].(float64); ok {
705 entry.CreatedAt = int64(ca)
706 }
707 if d, ok := entryMap["d"].(string); ok {
708 entry.D = d
709 }
710 manifest = append(manifest, entry)
711 }
712
713 return manifest, nil
714 }
715