handle-negentropy.go raw

   1  package app
   2  
   3  import (
   4  	"bytes"
   5  	"context"
   6  	"encoding/hex"
   7  	"encoding/json"
   8  	"fmt"
   9  
  10  	"next.orly.dev/pkg/lol/chk"
  11  	"next.orly.dev/pkg/lol/log"
  12  
  13  	"next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
  14  	"next.orly.dev/pkg/nostr/encoders/filter"
  15  	hexenc "next.orly.dev/pkg/nostr/encoders/hex"
  16  	"next.orly.dev/pkg/nostr/encoders/kind"
  17  	"next.orly.dev/pkg/nostr/encoders/tag"
  18  	"next.orly.dev/pkg/nostr/encoders/timestamp"
  19  	negentropyiface "next.orly.dev/pkg/interfaces/negentropy"
  20  	"next.orly.dev/pkg/policy"
  21  	commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
  22  )
  23  
  24  // NIP-77 Negentropy envelope constants
  25  const (
  26  	NegOpenLabel  = "NEG-OPEN"
  27  	NegMsgLabel   = "NEG-MSG"
  28  	NegCloseLabel = "NEG-CLOSE"
  29  	NegErrLabel   = "NEG-ERR"
  30  )
  31  
  32  // negentropyHandler handles NIP-77 negentropy operations
  33  // This can be either a gRPC client or an embedded handler
  34  var negentropyHandler negentropyiface.Handler
  35  
  36  // SetNegentropyHandler sets the negentropy handler for NIP-77 WebSocket handling
  37  func SetNegentropyHandler(handler negentropyiface.Handler) {
  38  	negentropyHandler = handler
  39  }
  40  
  41  // IsNegentropyEnvelope checks if a message starts with a NEG-* envelope type
  42  func IsNegentropyEnvelope(msg []byte) bool {
  43  	// Quick check: must start with '["NEG-'
  44  	if len(msg) < 8 {
  45  		return false
  46  	}
  47  	return bytes.HasPrefix(msg, []byte(`["NEG-`))
  48  }
  49  
  50  // IdentifyNegentropyEnvelope extracts the envelope type and remaining payload
  51  func IdentifyNegentropyEnvelope(msg []byte) (envelopeType string, ok bool) {
  52  	// Parse enough to get the envelope type
  53  	if !IsNegentropyEnvelope(msg) {
  54  		return "", false
  55  	}
  56  
  57  	// Find the first comma after the opening label
  58  	end := bytes.IndexByte(msg[2:], '"')
  59  	if end < 0 {
  60  		return "", false
  61  	}
  62  	envelopeType = string(msg[2 : 2+end])
  63  	return envelopeType, true
  64  }
  65  
  66  // HandleNegOpen processes NEG-OPEN messages
  67  // Format: ["NEG-OPEN", subscription_id, filter, initial_message?]
  68  func (l *Listener) HandleNegOpen(msg []byte) error {
  69  	log.D.F("HandleNegOpen called from %s", l.connectionID)
  70  	if negentropyHandler == nil {
  71  		log.E.F("negentropy handler not initialized — client sent NEG-OPEN but NIP-77 is not enabled (check ORLY_NEGENTROPY_ENABLED and startup logs)")
  72  		return l.sendNegErr("", "negentropy not enabled on this relay")
  73  	}
  74  
  75  	// Parse the message array
  76  	var parts []json.RawMessage
  77  	if err := json.Unmarshal(msg, &parts); err != nil {
  78  		return l.sendNegErr("", fmt.Sprintf("invalid NEG-OPEN format: %v", err))
  79  	}
  80  
  81  	if len(parts) < 3 {
  82  		return l.sendNegErr("", "NEG-OPEN requires at least 3 elements")
  83  	}
  84  
  85  	// Extract subscription ID
  86  	var subscriptionID string
  87  	if err := json.Unmarshal(parts[1], &subscriptionID); err != nil {
  88  		return l.sendNegErr("", fmt.Sprintf("invalid subscription_id: %v", err))
  89  	}
  90  
  91  	// Extract filter - use custom parsing because filter.F's kinds field
  92  	// doesn't support standard JSON array unmarshaling
  93  	f, err := parseNegentropyFilter(parts[2])
  94  	if err != nil {
  95  		return l.sendNegErr(subscriptionID, fmt.Sprintf("invalid filter: %v", err))
  96  	}
  97  
  98  	// Extract optional initial message (hex encoded per NIP-77)
  99  	var initialMessage []byte
 100  	if len(parts) >= 4 {
 101  		var msgStr string
 102  		if err := json.Unmarshal(parts[3], &msgStr); err == nil && msgStr != "" {
 103  			// NIP-77 uses hex encoding
 104  			if decoded, err := hex.DecodeString(msgStr); err == nil {
 105  				initialMessage = decoded
 106  			} else {
 107  				log.W.F("NEG-OPEN: invalid hex message: %v", err)
 108  			}
 109  		}
 110  	}
 111  
 112  	// Convert filter to proto format
 113  	protoFilter := filterToProto(f)
 114  
 115  	// Call gRPC service
 116  	ctx := context.Background()
 117  	respMsg, haveIDs, needIDs, complete, errStr, err := negentropyHandler.HandleNegOpen(
 118  		ctx,
 119  		l.connectionID,
 120  		subscriptionID,
 121  		protoFilter,
 122  		initialMessage,
 123  	)
 124  	if err != nil {
 125  		log.E.F("NEG-OPEN gRPC error: %v", err)
 126  		return l.sendNegErr(subscriptionID, "internal error")
 127  	}
 128  
 129  	if errStr != "" {
 130  		return l.sendNegErr(subscriptionID, errStr)
 131  	}
 132  
 133  	// Log need_ids (events client should send us)
 134  	if len(needIDs) > 0 {
 135  		log.D.F("NEG-OPEN: relay needs %d events from client", len(needIDs))
 136  	}
 137  
 138  	// Send NEG-MSG response FIRST (before events)
 139  	if err := l.sendNegMsg(subscriptionID, respMsg); err != nil {
 140  		return err
 141  	}
 142  
 143  	// If reconciliation is complete, send events we have that client needs.
 144  	// Per NIP-77: The haves/needs are only final when reconcile returns complete=true.
 145  	if complete {
 146  		log.D.F("NEG-OPEN: reconciliation complete for %s, sending %d events", subscriptionID, len(haveIDs))
 147  		if len(haveIDs) > 0 {
 148  			if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil {
 149  				log.E.F("failed to send events for NEG-OPEN: %v", err)
 150  			}
 151  		}
 152  	}
 153  
 154  	return nil
 155  }
 156  
 157  // HandleNegMsg processes NEG-MSG messages
 158  // Format: ["NEG-MSG", subscription_id, message]
 159  func (l *Listener) HandleNegMsg(msg []byte) error {
 160  	if negentropyHandler == nil {
 161  		return l.sendNegErr("", "negentropy not enabled")
 162  	}
 163  
 164  	// Parse the message array
 165  	var parts []json.RawMessage
 166  	if err := json.Unmarshal(msg, &parts); err != nil {
 167  		return l.sendNegErr("", fmt.Sprintf("invalid NEG-MSG format: %v", err))
 168  	}
 169  
 170  	if len(parts) < 3 {
 171  		return l.sendNegErr("", "NEG-MSG requires 3 elements")
 172  	}
 173  
 174  	// Extract subscription ID
 175  	var subscriptionID string
 176  	if err := json.Unmarshal(parts[1], &subscriptionID); err != nil {
 177  		return l.sendNegErr("", fmt.Sprintf("invalid subscription_id: %v", err))
 178  	}
 179  
 180  	// Extract message (hex or base64 encoded)
 181  	var msgStr string
 182  	if err := json.Unmarshal(parts[2], &msgStr); err != nil {
 183  		return l.sendNegErr(subscriptionID, fmt.Sprintf("invalid message: %v", err))
 184  	}
 185  
 186  	// Decode message (NIP-77 uses hex encoding)
 187  	negentropyMsg, err := hex.DecodeString(msgStr)
 188  	if err != nil {
 189  		return l.sendNegErr(subscriptionID, fmt.Sprintf("invalid hex message: %v", err))
 190  	}
 191  
 192  	// Call gRPC service
 193  	ctx := context.Background()
 194  	respMsg, haveIDs, needIDs, complete, errStr, err := negentropyHandler.HandleNegMsg(
 195  		ctx,
 196  		l.connectionID,
 197  		subscriptionID,
 198  		negentropyMsg,
 199  	)
 200  	if err != nil {
 201  		log.E.F("NEG-MSG gRPC error: %v", err)
 202  		return l.sendNegErr(subscriptionID, "internal error")
 203  	}
 204  
 205  	if errStr != "" {
 206  		return l.sendNegErr(subscriptionID, errStr)
 207  	}
 208  
 209  	// Log need_ids (events client should send us)
 210  	if len(needIDs) > 0 {
 211  		log.D.F("NEG-MSG: relay needs %d events from client", len(needIDs))
 212  	}
 213  
 214  	// Send NEG-MSG response FIRST (before events)
 215  	if err := l.sendNegMsg(subscriptionID, respMsg); err != nil {
 216  		return err
 217  	}
 218  
 219  	// If reconciliation is complete, send events we have that client needs.
 220  	// Per NIP-77: The haves/needs are only final when reconcile returns complete=true.
 221  	if complete {
 222  		log.D.F("NEG-MSG: reconciliation complete for %s, sending %d events", subscriptionID, len(haveIDs))
 223  		if len(haveIDs) > 0 {
 224  			if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil {
 225  				log.E.F("failed to send events for NEG-MSG: %v", err)
 226  			}
 227  		}
 228  	}
 229  
 230  	return nil
 231  }
 232  
 233  // HandleNegClose processes NEG-CLOSE messages
 234  // Format: ["NEG-CLOSE", subscription_id]
 235  func (l *Listener) HandleNegClose(msg []byte) error {
 236  	if negentropyHandler == nil {
 237  		return nil // Silently ignore if not enabled
 238  	}
 239  
 240  	// Parse the message array
 241  	var parts []json.RawMessage
 242  	if err := json.Unmarshal(msg, &parts); err != nil {
 243  		return nil // Silently ignore malformed close
 244  	}
 245  
 246  	if len(parts) < 2 {
 247  		return nil
 248  	}
 249  
 250  	// Extract subscription ID
 251  	var subscriptionID string
 252  	if err := json.Unmarshal(parts[1], &subscriptionID); err != nil {
 253  		return nil
 254  	}
 255  
 256  	// Call gRPC service to close the session
 257  	ctx := context.Background()
 258  	if err := negentropyHandler.HandleNegClose(ctx, l.connectionID, subscriptionID); err != nil {
 259  		log.E.F("NEG-CLOSE gRPC error: %v", err)
 260  	}
 261  
 262  	return nil
 263  }
 264  
 265  // sendNegMsg sends a NEG-MSG response to the client
 266  func (l *Listener) sendNegMsg(subscriptionID string, message []byte) error {
 267  	// Encode message as hex (per NIP-77)
 268  	encoded := ""
 269  	if len(message) > 0 {
 270  		encoded = hex.EncodeToString(message)
 271  	}
 272  
 273  	// Format: ["NEG-MSG", subscription_id, message]
 274  	resp, err := json.Marshal([]any{NegMsgLabel, subscriptionID, encoded})
 275  	if err != nil {
 276  		return err
 277  	}
 278  
 279  	_, err = l.Write(resp)
 280  	return err
 281  }
 282  
 283  // sendNegErr sends a NEG-ERR response to the client
 284  func (l *Listener) sendNegErr(subscriptionID, reason string) error {
 285  	// Format: ["NEG-ERR", subscription_id, reason]
 286  	resp, err := json.Marshal([]any{NegErrLabel, subscriptionID, reason})
 287  	if err != nil {
 288  		return err
 289  	}
 290  
 291  	_, err = l.Write(resp)
 292  	return err
 293  }
 294  
 295  // sendEventsForIDs fetches and sends events for the given IDs.
 296  // Auth-aware: unauthenticated clients get public events only; authenticated
 297  // clients get full delivery subject to privilege and channel membership checks.
 298  func (l *Listener) sendEventsForIDs(subscriptionID string, ids [][]byte) error {
 299  	if len(ids) == 0 {
 300  		return nil
 301  	}
 302  
 303  	log.D.F("NEG: sending %d events for subscription %s", len(ids), subscriptionID)
 304  
 305  	// Build filter with binary IDs (32 bytes each)
 306  	f := &filter.F{}
 307  	f.Ids = &tag.T{}
 308  	for _, id := range ids {
 309  		// IDs are binary (32 bytes full or 16 bytes truncated per NIP-77)
 310  		f.Ids.T = append(f.Ids.T, id)
 311  	}
 312  
 313  	// Query events by IDs
 314  	ctx := l.ctx
 315  	events, err := l.Server.db.QueryEvents(ctx, f)
 316  	if err != nil {
 317  		log.E.F("NEG: failed to query events: %v", err)
 318  		return err
 319  	}
 320  
 321  	pk := l.authedPubkey.Load()
 322  	// Full sync only for whitelisted relay pubkeys; everyone else gets public only
 323  	isFullSync := len(pk) > 0 && l.negentropyFullSyncPubkeys[hexenc.Enc(pk)]
 324  
 325  	// Send each event via EVENT envelope with subscription ID
 326  	sent, skipped := 0, 0
 327  	for _, ev := range events {
 328  		if ev == nil {
 329  			continue
 330  		}
 331  
 332  		// --- Auth-aware filtering ---
 333  
 334  		// Privileged events (DMs, gift wraps, channel kinds, etc.)
 335  		if kind.IsPrivileged(ev.Kind) {
 336  			if !isFullSync {
 337  				skipped++
 338  				continue
 339  			}
 340  
 341  			// Channel kinds: check membership
 342  			if kind.IsChannelKind(ev.Kind) {
 343  				// Discoverable kinds (40, 41) are always allowed for full-sync peers
 344  				if !kind.IsDiscoverableChannelKind(ev.Kind) {
 345  					if l.channelMembership != nil {
 346  						if !l.channelMembership.IsChannelMember(ev, pk, ctx) {
 347  							skipped++
 348  							continue
 349  						}
 350  					}
 351  				}
 352  			} else {
 353  				// Non-channel privileged: only deliver to involved parties
 354  				if !policy.IsPartyInvolved(ev, pk) {
 355  					skipped++
 356  					continue
 357  				}
 358  			}
 359  		}
 360  
 361  		// Non-privileged events that reference channel events via e-tags
 362  		// (reactions, reposts, zaps, reports, deletions targeting channel messages)
 363  		if !kind.IsChannelKind(ev.Kind) && !kind.IsPrivileged(ev.Kind) && l.channelMembership != nil {
 364  			if channelIDHex, isChannel := l.channelMembership.ReferencesChannelEvent(ev, ctx); isChannel {
 365  				if !isFullSync || !l.channelMembership.IsChannelMemberByID(channelIDHex, ev.Kind, pk, ctx) {
 366  					log.D.F(
 367  						"NEG: delivery DENIED for channel-referencing event %s kind %d (not a member of channel %s)",
 368  						hexenc.Enc(ev.ID), ev.Kind, channelIDHex,
 369  					)
 370  					skipped++
 371  					continue
 372  				}
 373  			}
 374  		}
 375  
 376  		// Private tag check (matches publisher.go logic)
 377  		if ev.Tags != nil && ev.Tags.Len() > 0 {
 378  			var privatePubkey []byte
 379  			hasPrivate := false
 380  			for _, t := range *ev.Tags {
 381  				if t.Len() >= 2 {
 382  					keyBytes := t.Key()
 383  					if len(keyBytes) == 7 && string(keyBytes) == "private" {
 384  						hasPrivate = true
 385  						privatePubkey = t.Value()
 386  						break
 387  					}
 388  				}
 389  			}
 390  			if hasPrivate {
 391  				if !l.canSeePrivateEvent(pk, privatePubkey) {
 392  					skipped++
 393  					continue
 394  				}
 395  			}
 396  		}
 397  
 398  		// --- Passed all checks, send ---
 399  		res, err := eventenvelope.NewResultWith([]byte(subscriptionID), ev)
 400  		if err != nil {
 401  			log.W.F("NEG: failed to create event envelope: %v", err)
 402  			continue
 403  		}
 404  		if err := res.Write(l); err != nil {
 405  			log.W.F("NEG: failed to send event: %v", err)
 406  			continue
 407  		}
 408  		sent++
 409  	}
 410  
 411  	log.D.F("NEG: sent %d/%d events (skipped %d auth-gated) for subscription %s",
 412  		sent, len(ids), skipped, subscriptionID)
 413  	return nil
 414  }
 415  
 416  // filterToProto converts a nostr filter to proto format
 417  func filterToProto(f *filter.F) *commonv1.Filter {
 418  	if f == nil {
 419  		return nil
 420  	}
 421  
 422  	pf := &commonv1.Filter{}
 423  
 424  	// Convert Ids
 425  	if f.Ids != nil {
 426  		for _, id := range f.Ids.T {
 427  			pf.Ids = append(pf.Ids, id)
 428  		}
 429  	}
 430  
 431  	// Convert Authors
 432  	if f.Authors != nil {
 433  		for _, author := range f.Authors.T {
 434  			pf.Authors = append(pf.Authors, author)
 435  		}
 436  	}
 437  
 438  	// Convert Kinds - kind.S has ToUint16() method
 439  	if f.Kinds != nil && f.Kinds.Len() > 0 {
 440  		for _, k := range f.Kinds.ToUint16() {
 441  			pf.Kinds = append(pf.Kinds, uint32(k))
 442  		}
 443  	}
 444  
 445  	// Convert Since/Until - timestamp.T has .V field (int64)
 446  	if f.Since != nil && f.Since.V != 0 {
 447  		since := f.Since.V
 448  		pf.Since = &since
 449  	}
 450  	if f.Until != nil && f.Until.V != 0 {
 451  		until := f.Until.V
 452  		pf.Until = &until
 453  	}
 454  
 455  	// Convert Limit
 456  	if f.Limit != nil {
 457  		limit := uint32(*f.Limit)
 458  		pf.Limit = &limit
 459  	}
 460  
 461  	// Note: Tag filters (e, p, etc.) would need more complex conversion
 462  	// This is a simplified implementation
 463  
 464  	return pf
 465  }
 466  
 467  // parseNegentropyFilter parses a NIP-01 filter from JSON.
 468  // This is needed because filter.F uses kind.S which doesn't implement
 469  // json.Unmarshaler, so we parse manually and construct the filter.
 470  func parseNegentropyFilter(data []byte) (*filter.F, error) {
 471  	// Parse into a generic map first
 472  	var raw map[string]json.RawMessage
 473  	if err := json.Unmarshal(data, &raw); err != nil {
 474  		return nil, err
 475  	}
 476  
 477  	f := filter.New()
 478  
 479  	// Parse kinds array
 480  	if kindsRaw, ok := raw["kinds"]; ok {
 481  		var kinds []int
 482  		if err := json.Unmarshal(kindsRaw, &kinds); err != nil {
 483  			return nil, fmt.Errorf("invalid kinds: %v", err)
 484  		}
 485  		f.Kinds = kind.FromIntSlice(kinds)
 486  	}
 487  
 488  	// Parse authors array (hex pubkeys)
 489  	if authorsRaw, ok := raw["authors"]; ok {
 490  		var authors []string
 491  		if err := json.Unmarshal(authorsRaw, &authors); err != nil {
 492  			return nil, fmt.Errorf("invalid authors: %v", err)
 493  		}
 494  		f.Authors = tag.NewWithCap(len(authors))
 495  		for _, a := range authors {
 496  			if decoded, err := hex.DecodeString(a); err == nil {
 497  				f.Authors.T = append(f.Authors.T, decoded)
 498  			}
 499  		}
 500  	}
 501  
 502  	// Parse ids array (hex event IDs)
 503  	if idsRaw, ok := raw["ids"]; ok {
 504  		var ids []string
 505  		if err := json.Unmarshal(idsRaw, &ids); err != nil {
 506  			return nil, fmt.Errorf("invalid ids: %v", err)
 507  		}
 508  		f.Ids = tag.NewWithCap(len(ids))
 509  		for _, id := range ids {
 510  			if decoded, err := hex.DecodeString(id); err == nil {
 511  				f.Ids.T = append(f.Ids.T, decoded)
 512  			}
 513  		}
 514  	}
 515  
 516  	// Parse since timestamp
 517  	if sinceRaw, ok := raw["since"]; ok {
 518  		var since int64
 519  		if err := json.Unmarshal(sinceRaw, &since); err == nil {
 520  			f.Since = timestamp.FromUnix(since)
 521  		}
 522  	}
 523  
 524  	// Parse until timestamp
 525  	if untilRaw, ok := raw["until"]; ok {
 526  		var until int64
 527  		if err := json.Unmarshal(untilRaw, &until); err == nil {
 528  			f.Until = timestamp.FromUnix(until)
 529  		}
 530  	}
 531  
 532  	// Parse limit
 533  	if limitRaw, ok := raw["limit"]; ok {
 534  		var limit uint
 535  		if err := json.Unmarshal(limitRaw, &limit); err == nil {
 536  			f.Limit = &limit
 537  		}
 538  	}
 539  
 540  	return f, nil
 541  }
 542  
 543  // CloseAllNegentropySessions closes all negentropy sessions for a connection
 544  // Called when a WebSocket connection is closed
 545  func (l *Listener) CloseAllNegentropySessions() {
 546  	if negentropyHandler == nil {
 547  		return
 548  	}
 549  
 550  	ctx := context.Background()
 551  	sessions, err := negentropyHandler.ListSessions(ctx)
 552  	if chk.E(err) {
 553  		return
 554  	}
 555  
 556  	for _, sess := range sessions {
 557  		if sess.ConnectionID == l.connectionID {
 558  			negentropyHandler.CloseSession(ctx, l.connectionID, sess.SubscriptionID)
 559  		}
 560  	}
 561  }
 562