client.go raw

   1  package nrc
   2  
   3  import (
   4  	"context"
   5  	"encoding/base64"
   6  	"encoding/json"
   7  	"fmt"
   8  	"sync"
   9  	"time"
  10  
  11  	"next.orly.dev/pkg/nostr/crypto/encryption"
  12  	"next.orly.dev/pkg/nostr/encoders/event"
  13  	"next.orly.dev/pkg/nostr/encoders/filter"
  14  	"next.orly.dev/pkg/nostr/encoders/hex"
  15  	"next.orly.dev/pkg/nostr/encoders/kind"
  16  	"next.orly.dev/pkg/nostr/encoders/tag"
  17  	"next.orly.dev/pkg/nostr/encoders/timestamp"
  18  	"next.orly.dev/pkg/nostr/interfaces/signer"
  19  	"next.orly.dev/pkg/nostr/ws"
  20  	"github.com/google/uuid"
  21  	"next.orly.dev/pkg/lol/chk"
  22  	"next.orly.dev/pkg/lol/log"
  23  )
  24  
  25  // chunkBuffer holds chunks for a message being reassembled.
  26  type chunkBuffer struct {
  27  	chunks     map[int]string
  28  	total      int
  29  	receivedAt time.Time
  30  }
  31  
  32  // Client connects to a private relay through the NRC tunnel.
  33  type Client struct {
  34  	uri             *ConnectionURI
  35  	sessionID       string
  36  	rendezvousConn  *ws.Client
  37  	responseSub     *ws.Subscription
  38  	conversationKey []byte
  39  	clientSigner    signer.I
  40  
  41  	// pending maps request event IDs to response channels.
  42  	pending   map[string]chan *ResponseMessage
  43  	pendingMu sync.Mutex
  44  
  45  	// subscriptions maps subscription IDs to event channels.
  46  	subscriptions   map[string]chan *event.E
  47  	subscriptionsMu sync.Mutex
  48  
  49  	// chunkBuffers holds partially received chunked messages.
  50  	chunkBuffers   map[string]*chunkBuffer
  51  	chunkBuffersMu sync.Mutex
  52  
  53  	ctx    context.Context
  54  	cancel context.CancelFunc
  55  }
  56  
  57  // NewClient creates a new NRC client from a connection URI.
  58  func NewClient(connectionURI string) (*Client, error) {
  59  	uri, err := ParseConnectionURI(connectionURI)
  60  	if err != nil {
  61  		return nil, fmt.Errorf("invalid URI: %w", err)
  62  	}
  63  
  64  	if uri.AuthMode != AuthModeSecret {
  65  		return nil, fmt.Errorf("CAT authentication not yet supported in client")
  66  	}
  67  
  68  	ctx, cancel := context.WithCancel(context.Background())
  69  	return &Client{
  70  		uri:             uri,
  71  		sessionID:       uuid.New().String(),
  72  		conversationKey: uri.GetConversationKey(),
  73  		clientSigner:    uri.GetClientSigner(),
  74  		pending:         make(map[string]chan *ResponseMessage),
  75  		subscriptions:   make(map[string]chan *event.E),
  76  		chunkBuffers:    make(map[string]*chunkBuffer),
  77  		ctx:             ctx,
  78  		cancel:          cancel,
  79  	}, nil
  80  }
  81  
  82  // Connect establishes the connection to the rendezvous relay.
  83  func (c *Client) Connect(ctx context.Context) error {
  84  	// Connect to rendezvous relay
  85  	conn, err := ws.RelayConnect(ctx, c.uri.RendezvousRelay)
  86  	if chk.E(err) {
  87  		return fmt.Errorf("%w: %v", ErrRendezvousConnectionFailed, err)
  88  	}
  89  	c.rendezvousConn = conn
  90  
  91  	// Subscribe to response events
  92  	clientPubkeyHex := hex.Enc(c.clientSigner.Pub())
  93  	sub, err := conn.Subscribe(
  94  		ctx,
  95  		filter.NewS(&filter.F{
  96  			Kinds: kind.NewS(kind.New(KindNRCResponse)),
  97  			Tags: tag.NewS(
  98  				tag.NewFromAny("p", clientPubkeyHex),
  99  			),
 100  			Since: &timestamp.T{V: time.Now().Unix()},
 101  		}),
 102  	)
 103  	if chk.E(err) {
 104  		conn.Close()
 105  		return fmt.Errorf("subscription failed: %w", err)
 106  	}
 107  	c.responseSub = sub
 108  
 109  	// Start response handler
 110  	go c.handleResponses()
 111  
 112  	log.I.F("NRC client connected to %s via %s",
 113  		hex.Enc(c.uri.RelayPubkey), c.uri.RendezvousRelay)
 114  
 115  	return nil
 116  }
 117  
 118  // Close closes the client connection.
 119  func (c *Client) Close() {
 120  	c.cancel()
 121  	if c.responseSub != nil {
 122  		c.responseSub.Unsub()
 123  	}
 124  	if c.rendezvousConn != nil {
 125  		c.rendezvousConn.Close()
 126  	}
 127  
 128  	// Close all pending channels
 129  	c.pendingMu.Lock()
 130  	for _, ch := range c.pending {
 131  		close(ch)
 132  	}
 133  	c.pending = make(map[string]chan *ResponseMessage)
 134  	c.pendingMu.Unlock()
 135  
 136  	// Close all subscription channels
 137  	c.subscriptionsMu.Lock()
 138  	for _, ch := range c.subscriptions {
 139  		close(ch)
 140  	}
 141  	c.subscriptions = make(map[string]chan *event.E)
 142  	c.subscriptionsMu.Unlock()
 143  
 144  	// Clear chunk buffers
 145  	c.chunkBuffersMu.Lock()
 146  	c.chunkBuffers = make(map[string]*chunkBuffer)
 147  	c.chunkBuffersMu.Unlock()
 148  }
 149  
 150  // handleResponses processes incoming NRC response events.
 151  func (c *Client) handleResponses() {
 152  	for {
 153  		select {
 154  		case <-c.ctx.Done():
 155  			return
 156  		case ev := <-c.responseSub.Events:
 157  			if ev == nil {
 158  				return
 159  			}
 160  			c.processResponse(ev)
 161  		}
 162  	}
 163  }
 164  
 165  // processResponse decrypts and routes a response event.
 166  func (c *Client) processResponse(ev *event.E) {
 167  	// Decrypt content
 168  	decrypted, err := encryption.Decrypt(c.conversationKey, string(ev.Content))
 169  	if err != nil {
 170  		log.W.F("NRC response decryption failed: %v", err)
 171  		return
 172  	}
 173  
 174  	// Parse response
 175  	var resp struct {
 176  		Type    string `json:"type"`
 177  		Payload []any  `json:"payload"`
 178  	}
 179  	if err := json.Unmarshal([]byte(decrypted), &resp); err != nil {
 180  		log.W.F("NRC response parse failed: %v", err)
 181  		return
 182  	}
 183  
 184  	// Extract request event ID for routing
 185  	var requestEventID string
 186  	eTag := ev.Tags.GetFirst([]byte("e"))
 187  	if eTag != nil && eTag.Len() >= 2 {
 188  		requestEventID = string(eTag.ValueHex())
 189  	}
 190  
 191  	// Route based on response type
 192  	switch resp.Type {
 193  	case "EVENT":
 194  		c.handleEventResponse(resp.Payload)
 195  	case "EOSE":
 196  		c.handleEOSEResponse(resp.Payload, requestEventID)
 197  	case "OK":
 198  		c.handleOKResponse(resp.Payload, requestEventID)
 199  	case "NOTICE":
 200  		c.handleNoticeResponse(resp.Payload)
 201  	case "CLOSED":
 202  		c.handleClosedResponse(resp.Payload)
 203  	case "COUNT":
 204  		c.handleCountResponse(resp.Payload, requestEventID)
 205  	case "AUTH":
 206  		c.handleAuthResponse(resp.Payload, requestEventID)
 207  	case "IDS":
 208  		c.handleIDSResponse(resp.Payload, requestEventID)
 209  	case "CHUNK":
 210  		c.handleChunkResponse(resp.Payload, requestEventID)
 211  	}
 212  }
 213  
 214  // handleEventResponse routes an EVENT to the appropriate subscription.
 215  func (c *Client) handleEventResponse(payload []any) {
 216  	if len(payload) < 3 {
 217  		return
 218  	}
 219  	// Payload: ["EVENT", "<sub_id>", {...event...}]
 220  	subID, ok := payload[1].(string)
 221  	if !ok {
 222  		return
 223  	}
 224  
 225  	c.subscriptionsMu.Lock()
 226  	ch, exists := c.subscriptions[subID]
 227  	c.subscriptionsMu.Unlock()
 228  
 229  	if !exists {
 230  		return
 231  	}
 232  
 233  	// Parse event from payload
 234  	eventData, ok := payload[2].(map[string]any)
 235  	if !ok {
 236  		return
 237  	}
 238  
 239  	eventBytes, err := json.Marshal(eventData)
 240  	if err != nil {
 241  		return
 242  	}
 243  
 244  	var ev event.E
 245  	if err := json.Unmarshal(eventBytes, &ev); err != nil {
 246  		return
 247  	}
 248  
 249  	select {
 250  	case ch <- &ev:
 251  	default:
 252  		// Channel full, drop event
 253  	}
 254  }
 255  
 256  // handleEOSEResponse handles an EOSE response.
 257  func (c *Client) handleEOSEResponse(payload []any, requestEventID string) {
 258  	// Route to pending request
 259  	c.pendingMu.Lock()
 260  	ch, exists := c.pending[requestEventID]
 261  	c.pendingMu.Unlock()
 262  
 263  	if exists {
 264  		resp := &ResponseMessage{Type: "EOSE", Payload: payload}
 265  		select {
 266  		case ch <- resp:
 267  		default:
 268  		}
 269  	}
 270  }
 271  
 272  // handleOKResponse handles an OK response.
 273  func (c *Client) handleOKResponse(payload []any, requestEventID string) {
 274  	c.pendingMu.Lock()
 275  	ch, exists := c.pending[requestEventID]
 276  	c.pendingMu.Unlock()
 277  
 278  	if exists {
 279  		resp := &ResponseMessage{Type: "OK", Payload: payload}
 280  		select {
 281  		case ch <- resp:
 282  		default:
 283  		}
 284  	}
 285  }
 286  
 287  // handleNoticeResponse logs a NOTICE.
 288  func (c *Client) handleNoticeResponse(payload []any) {
 289  	if len(payload) >= 2 {
 290  		if msg, ok := payload[1].(string); ok {
 291  			log.W.F("NRC NOTICE: %s", msg)
 292  		}
 293  	}
 294  }
 295  
 296  // handleClosedResponse handles a subscription close.
 297  func (c *Client) handleClosedResponse(payload []any) {
 298  	if len(payload) >= 2 {
 299  		if subID, ok := payload[1].(string); ok {
 300  			c.subscriptionsMu.Lock()
 301  			if ch, exists := c.subscriptions[subID]; exists {
 302  				close(ch)
 303  				delete(c.subscriptions, subID)
 304  			}
 305  			c.subscriptionsMu.Unlock()
 306  		}
 307  	}
 308  }
 309  
 310  // handleCountResponse handles a COUNT response.
 311  func (c *Client) handleCountResponse(payload []any, requestEventID string) {
 312  	c.pendingMu.Lock()
 313  	ch, exists := c.pending[requestEventID]
 314  	c.pendingMu.Unlock()
 315  
 316  	if exists {
 317  		resp := &ResponseMessage{Type: "COUNT", Payload: payload}
 318  		select {
 319  		case ch <- resp:
 320  		default:
 321  		}
 322  	}
 323  }
 324  
 325  // handleAuthResponse handles an AUTH challenge.
 326  func (c *Client) handleAuthResponse(payload []any, requestEventID string) {
 327  	c.pendingMu.Lock()
 328  	ch, exists := c.pending[requestEventID]
 329  	c.pendingMu.Unlock()
 330  
 331  	if exists {
 332  		resp := &ResponseMessage{Type: "AUTH", Payload: payload}
 333  		select {
 334  		case ch <- resp:
 335  		default:
 336  		}
 337  	}
 338  }
 339  
 340  // handleIDSResponse handles an IDS response.
 341  func (c *Client) handleIDSResponse(payload []any, requestEventID string) {
 342  	c.pendingMu.Lock()
 343  	ch, exists := c.pending[requestEventID]
 344  	c.pendingMu.Unlock()
 345  
 346  	if exists {
 347  		resp := &ResponseMessage{Type: "IDS", Payload: payload}
 348  		select {
 349  		case ch <- resp:
 350  		default:
 351  		}
 352  	}
 353  }
 354  
 355  // handleChunkResponse handles a CHUNK response and reassembles the message.
 356  func (c *Client) handleChunkResponse(payload []any, requestEventID string) {
 357  	if len(payload) < 1 {
 358  		return
 359  	}
 360  
 361  	// Parse chunk message from payload
 362  	chunkData, ok := payload[0].(map[string]any)
 363  	if !ok {
 364  		log.W.F("NRC: invalid chunk payload format")
 365  		return
 366  	}
 367  
 368  	messageID, _ := chunkData["messageId"].(string)
 369  	indexFloat, _ := chunkData["index"].(float64)
 370  	totalFloat, _ := chunkData["total"].(float64)
 371  	data, _ := chunkData["data"].(string)
 372  
 373  	if messageID == "" || data == "" {
 374  		log.W.F("NRC: chunk missing required fields")
 375  		return
 376  	}
 377  
 378  	index := int(indexFloat)
 379  	total := int(totalFloat)
 380  
 381  	c.chunkBuffersMu.Lock()
 382  	defer c.chunkBuffersMu.Unlock()
 383  
 384  	// Get or create buffer for this message
 385  	buf, exists := c.chunkBuffers[messageID]
 386  	if !exists {
 387  		buf = &chunkBuffer{
 388  			chunks:     make(map[int]string),
 389  			total:      total,
 390  			receivedAt: time.Now(),
 391  		}
 392  		c.chunkBuffers[messageID] = buf
 393  	}
 394  
 395  	// Store the chunk
 396  	buf.chunks[index] = data
 397  	log.D.F("NRC: received chunk %d/%d for message %s", index+1, total, messageID[:8])
 398  
 399  	// Check if we have all chunks
 400  	if len(buf.chunks) == buf.total {
 401  		// Reassemble the message
 402  		var encoded string
 403  		for i := 0; i < buf.total; i++ {
 404  			part, ok := buf.chunks[i]
 405  			if !ok {
 406  				log.W.F("NRC: missing chunk %d for message %s", i, messageID)
 407  				delete(c.chunkBuffers, messageID)
 408  				return
 409  			}
 410  			encoded += part
 411  		}
 412  
 413  		// Decode from base64
 414  		decoded, err := base64.StdEncoding.DecodeString(encoded)
 415  		if err != nil {
 416  			log.W.F("NRC: failed to decode chunked message: %v", err)
 417  			delete(c.chunkBuffers, messageID)
 418  			return
 419  		}
 420  
 421  		// Parse the reassembled response
 422  		var resp struct {
 423  			Type    string `json:"type"`
 424  			Payload []any  `json:"payload"`
 425  		}
 426  		if err := json.Unmarshal(decoded, &resp); err != nil {
 427  			log.W.F("NRC: failed to parse reassembled message: %v", err)
 428  			delete(c.chunkBuffers, messageID)
 429  			return
 430  		}
 431  
 432  		log.D.F("NRC: reassembled chunked message: %s", resp.Type)
 433  
 434  		// Clean up buffer
 435  		delete(c.chunkBuffers, messageID)
 436  
 437  		// Route the reassembled response
 438  		c.pendingMu.Lock()
 439  		ch, exists := c.pending[requestEventID]
 440  		c.pendingMu.Unlock()
 441  
 442  		if exists {
 443  			respMsg := &ResponseMessage{Type: resp.Type, Payload: resp.Payload}
 444  			select {
 445  			case ch <- respMsg:
 446  			default:
 447  			}
 448  		}
 449  	}
 450  
 451  	// Clean up stale buffers (older than 60 seconds)
 452  	now := time.Now()
 453  	for id, b := range c.chunkBuffers {
 454  		if now.Sub(b.receivedAt) > 60*time.Second {
 455  			log.W.F("NRC: discarding stale chunk buffer: %s", id)
 456  			delete(c.chunkBuffers, id)
 457  		}
 458  	}
 459  }
 460  
 461  // sendRequest sends an NRC request and waits for response.
 462  func (c *Client) sendRequest(ctx context.Context, msgType string, payload []any) (*ResponseMessage, error) {
 463  	// Build request content
 464  	reqContent := struct {
 465  		Type    string `json:"type"`
 466  		Payload []any  `json:"payload"`
 467  	}{
 468  		Type:    msgType,
 469  		Payload: payload,
 470  	}
 471  
 472  	contentBytes, err := json.Marshal(reqContent)
 473  	if err != nil {
 474  		return nil, fmt.Errorf("marshal failed: %w", err)
 475  	}
 476  
 477  	// Encrypt content
 478  	encrypted, err := encryption.Encrypt(c.conversationKey, contentBytes, nil)
 479  	if err != nil {
 480  		return nil, fmt.Errorf("%w: %v", ErrEncryptionFailed, err)
 481  	}
 482  
 483  	// Build request event
 484  	reqEvent := &event.E{
 485  		Content:   []byte(encrypted),
 486  		CreatedAt: time.Now().Unix(),
 487  		Kind:      KindNRCRequest,
 488  		Tags: tag.NewS(
 489  			tag.NewFromAny("p", hex.Enc(c.uri.RelayPubkey)),
 490  			tag.NewFromAny("encryption", "nip44_v2"),
 491  			tag.NewFromAny("session", c.sessionID),
 492  		),
 493  	}
 494  
 495  	// Sign with client key
 496  	if err := reqEvent.Sign(c.clientSigner); chk.E(err) {
 497  		return nil, fmt.Errorf("signing failed: %w", err)
 498  	}
 499  
 500  	// Set up response channel
 501  	responseCh := make(chan *ResponseMessage, 1)
 502  	requestEventID := string(hex.Enc(reqEvent.ID[:]))
 503  
 504  	c.pendingMu.Lock()
 505  	c.pending[requestEventID] = responseCh
 506  	c.pendingMu.Unlock()
 507  
 508  	defer func() {
 509  		c.pendingMu.Lock()
 510  		delete(c.pending, requestEventID)
 511  		c.pendingMu.Unlock()
 512  	}()
 513  
 514  	// Publish request
 515  	if err := c.rendezvousConn.Publish(ctx, reqEvent); chk.E(err) {
 516  		return nil, fmt.Errorf("publish failed: %w", err)
 517  	}
 518  
 519  	// Wait for response
 520  	select {
 521  	case <-ctx.Done():
 522  		return nil, ctx.Err()
 523  	case resp := <-responseCh:
 524  		if resp == nil {
 525  			return nil, fmt.Errorf("response channel closed")
 526  		}
 527  		return resp, nil
 528  	}
 529  }
 530  
 531  // Publish publishes an event to the private relay.
 532  func (c *Client) Publish(ctx context.Context, ev *event.E) (bool, string, error) {
 533  	// Convert event to JSON for payload
 534  	eventBytes, err := json.Marshal(ev)
 535  	if err != nil {
 536  		return false, "", fmt.Errorf("marshal event failed: %w", err)
 537  	}
 538  
 539  	var eventMap map[string]any
 540  	if err := json.Unmarshal(eventBytes, &eventMap); err != nil {
 541  		return false, "", fmt.Errorf("unmarshal event failed: %w", err)
 542  	}
 543  
 544  	payload := []any{"EVENT", eventMap}
 545  
 546  	resp, err := c.sendRequest(ctx, "EVENT", payload)
 547  	if err != nil {
 548  		return false, "", err
 549  	}
 550  
 551  	// Parse OK response: ["OK", "<event_id>", <success>, "<message>"]
 552  	if resp.Type != "OK" || len(resp.Payload) < 4 {
 553  		return false, "", fmt.Errorf("unexpected response type: %s", resp.Type)
 554  	}
 555  
 556  	success, _ := resp.Payload[2].(bool)
 557  	message, _ := resp.Payload[3].(string)
 558  
 559  	return success, message, nil
 560  }
 561  
 562  // Subscribe creates a subscription to the private relay.
 563  func (c *Client) Subscribe(ctx context.Context, subID string, filters ...*filter.F) (<-chan *event.E, error) {
 564  	// Build payload: ["REQ", "<sub_id>", filter1, filter2, ...]
 565  	payload := []any{"REQ", subID}
 566  	for _, f := range filters {
 567  		filterBytes, err := json.Marshal(f)
 568  		if err != nil {
 569  			return nil, fmt.Errorf("marshal filter failed: %w", err)
 570  		}
 571  		var filterMap map[string]any
 572  		if err := json.Unmarshal(filterBytes, &filterMap); err != nil {
 573  			return nil, fmt.Errorf("unmarshal filter failed: %w", err)
 574  		}
 575  		payload = append(payload, filterMap)
 576  	}
 577  
 578  	// Create event channel for this subscription
 579  	eventCh := make(chan *event.E, 100)
 580  
 581  	c.subscriptionsMu.Lock()
 582  	c.subscriptions[subID] = eventCh
 583  	c.subscriptionsMu.Unlock()
 584  
 585  	// Send request (don't wait for EOSE, events will come asynchronously)
 586  	go func() {
 587  		reqCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
 588  		defer cancel()
 589  		_, err := c.sendRequest(reqCtx, "REQ", payload)
 590  		if err != nil {
 591  			log.W.F("NRC subscribe failed: %v", err)
 592  		}
 593  	}()
 594  
 595  	return eventCh, nil
 596  }
 597  
 598  // Unsubscribe closes a subscription.
 599  func (c *Client) Unsubscribe(ctx context.Context, subID string) error {
 600  	// Remove from local tracking
 601  	c.subscriptionsMu.Lock()
 602  	if ch, exists := c.subscriptions[subID]; exists {
 603  		close(ch)
 604  		delete(c.subscriptions, subID)
 605  	}
 606  	c.subscriptionsMu.Unlock()
 607  
 608  	// Send CLOSE to relay
 609  	payload := []any{"CLOSE", subID}
 610  	_, err := c.sendRequest(ctx, "CLOSE", payload)
 611  	return err
 612  }
 613  
 614  // Count sends a COUNT request to the private relay.
 615  func (c *Client) Count(ctx context.Context, subID string, filters ...*filter.F) (int64, error) {
 616  	// Build payload: ["COUNT", "<sub_id>", filter1, filter2, ...]
 617  	payload := []any{"COUNT", subID}
 618  	for _, f := range filters {
 619  		filterBytes, err := json.Marshal(f)
 620  		if err != nil {
 621  			return 0, fmt.Errorf("marshal filter failed: %w", err)
 622  		}
 623  		var filterMap map[string]any
 624  		if err := json.Unmarshal(filterBytes, &filterMap); err != nil {
 625  			return 0, fmt.Errorf("unmarshal filter failed: %w", err)
 626  		}
 627  		payload = append(payload, filterMap)
 628  	}
 629  
 630  	resp, err := c.sendRequest(ctx, "COUNT", payload)
 631  	if err != nil {
 632  		return 0, err
 633  	}
 634  
 635  	// Parse COUNT response: ["COUNT", "<sub_id>", {"count": N}]
 636  	if resp.Type != "COUNT" || len(resp.Payload) < 3 {
 637  		return 0, fmt.Errorf("unexpected response type: %s", resp.Type)
 638  	}
 639  
 640  	countData, ok := resp.Payload[2].(map[string]any)
 641  	if !ok {
 642  		return 0, fmt.Errorf("invalid count response")
 643  	}
 644  
 645  	count, ok := countData["count"].(float64)
 646  	if !ok {
 647  		return 0, fmt.Errorf("missing count field")
 648  	}
 649  
 650  	return int64(count), nil
 651  }
 652  
 653  // RelayURL returns a pseudo-URL for this NRC connection.
 654  func (c *Client) RelayURL() string {
 655  	return "nrc://" + string(hex.Enc(c.uri.RelayPubkey))
 656  }
 657  
 658  // RequestIDs sends an IDS request to get event manifests for diffing.
 659  func (c *Client) RequestIDs(ctx context.Context, subID string, filters ...*filter.F) ([]EventManifestEntry, error) {
 660  	// Build payload: ["IDS", "<sub_id>", filter1, filter2, ...]
 661  	payload := []any{"IDS", subID}
 662  	for _, f := range filters {
 663  		filterBytes, err := json.Marshal(f)
 664  		if err != nil {
 665  			return nil, fmt.Errorf("marshal filter failed: %w", err)
 666  		}
 667  		var filterMap map[string]any
 668  		if err := json.Unmarshal(filterBytes, &filterMap); err != nil {
 669  			return nil, fmt.Errorf("unmarshal filter failed: %w", err)
 670  		}
 671  		payload = append(payload, filterMap)
 672  	}
 673  
 674  	resp, err := c.sendRequest(ctx, "IDS", payload)
 675  	if err != nil {
 676  		return nil, err
 677  	}
 678  
 679  	// Parse IDS response: ["IDS", "<sub_id>", [...manifest...]]
 680  	if resp.Type != "IDS" || len(resp.Payload) < 3 {
 681  		return nil, fmt.Errorf("unexpected response type: %s", resp.Type)
 682  	}
 683  
 684  	// Parse manifest entries
 685  	manifestData, ok := resp.Payload[2].([]any)
 686  	if !ok {
 687  		return nil, fmt.Errorf("invalid manifest response")
 688  	}
 689  
 690  	var manifest []EventManifestEntry
 691  	for _, item := range manifestData {
 692  		entryMap, ok := item.(map[string]any)
 693  		if !ok {
 694  			continue
 695  		}
 696  
 697  		entry := EventManifestEntry{}
 698  		if k, ok := entryMap["kind"].(float64); ok {
 699  			entry.Kind = int(k)
 700  		}
 701  		if id, ok := entryMap["id"].(string); ok {
 702  			entry.ID = id
 703  		}
 704  		if ca, ok := entryMap["created_at"].(float64); ok {
 705  			entry.CreatedAt = int64(ca)
 706  		}
 707  		if d, ok := entryMap["d"].(string); ok {
 708  			entry.D = d
 709  		}
 710  		manifest = append(manifest, entry)
 711  	}
 712  
 713  	return manifest, nil
 714  }
 715