handle-negentropy.go raw
1 package app
2
3 import (
4 "bytes"
5 "context"
6 "encoding/hex"
7 "encoding/json"
8 "fmt"
9
10 "next.orly.dev/pkg/lol/chk"
11 "next.orly.dev/pkg/lol/log"
12
13 "next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
14 "next.orly.dev/pkg/nostr/encoders/filter"
15 hexenc "next.orly.dev/pkg/nostr/encoders/hex"
16 "next.orly.dev/pkg/nostr/encoders/kind"
17 "next.orly.dev/pkg/nostr/encoders/tag"
18 "next.orly.dev/pkg/nostr/encoders/timestamp"
19 negentropyiface "next.orly.dev/pkg/interfaces/negentropy"
20 "next.orly.dev/pkg/policy"
21 commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
22 )
23
// Envelope labels used by the NIP-77 negentropy messages handled in this file.
const (
	// NegOpenLabel is the label of the envelope processed by HandleNegOpen.
	NegOpenLabel = "NEG-OPEN"
	// NegMsgLabel is the label of the envelope processed by HandleNegMsg and
	// emitted by sendNegMsg.
	NegMsgLabel = "NEG-MSG"
	// NegCloseLabel is the label of the envelope processed by HandleNegClose.
	NegCloseLabel = "NEG-CLOSE"
	// NegErrLabel is the label of the error envelope emitted by sendNegErr.
	NegErrLabel = "NEG-ERR"
)
31
32 // negentropyHandler handles NIP-77 negentropy operations
33 // This can be either a gRPC client or an embedded handler
34 var negentropyHandler negentropyiface.Handler
35
36 // SetNegentropyHandler sets the negentropy handler for NIP-77 WebSocket handling
37 func SetNegentropyHandler(handler negentropyiface.Handler) {
38 negentropyHandler = handler
39 }
40
// IsNegentropyEnvelope reports whether msg looks like a NEG-* envelope, i.e.
// it is at least eight bytes long and begins with the literal bytes `["NEG-`.
// No JSON parsing is done; this is only a cheap prefix test.
func IsNegentropyEnvelope(msg []byte) bool {
	const (
		minLen = 8
		prefix = `["NEG-`
	)
	return len(msg) >= minLen && bytes.HasPrefix(msg, []byte(prefix))
}
49
50 // IdentifyNegentropyEnvelope extracts the envelope type and remaining payload
51 func IdentifyNegentropyEnvelope(msg []byte) (envelopeType string, ok bool) {
52 // Parse enough to get the envelope type
53 if !IsNegentropyEnvelope(msg) {
54 return "", false
55 }
56
57 // Find the first comma after the opening label
58 end := bytes.IndexByte(msg[2:], '"')
59 if end < 0 {
60 return "", false
61 }
62 envelopeType = string(msg[2 : 2+end])
63 return envelopeType, true
64 }
65
66 // HandleNegOpen processes NEG-OPEN messages
67 // Format: ["NEG-OPEN", subscription_id, filter, initial_message?]
68 func (l *Listener) HandleNegOpen(msg []byte) error {
69 log.D.F("HandleNegOpen called from %s", l.connectionID)
70 if negentropyHandler == nil {
71 log.E.F("negentropy handler not initialized — client sent NEG-OPEN but NIP-77 is not enabled (check ORLY_NEGENTROPY_ENABLED and startup logs)")
72 return l.sendNegErr("", "negentropy not enabled on this relay")
73 }
74
75 // Parse the message array
76 var parts []json.RawMessage
77 if err := json.Unmarshal(msg, &parts); err != nil {
78 return l.sendNegErr("", fmt.Sprintf("invalid NEG-OPEN format: %v", err))
79 }
80
81 if len(parts) < 3 {
82 return l.sendNegErr("", "NEG-OPEN requires at least 3 elements")
83 }
84
85 // Extract subscription ID
86 var subscriptionID string
87 if err := json.Unmarshal(parts[1], &subscriptionID); err != nil {
88 return l.sendNegErr("", fmt.Sprintf("invalid subscription_id: %v", err))
89 }
90
91 // Extract filter - use custom parsing because filter.F's kinds field
92 // doesn't support standard JSON array unmarshaling
93 f, err := parseNegentropyFilter(parts[2])
94 if err != nil {
95 return l.sendNegErr(subscriptionID, fmt.Sprintf("invalid filter: %v", err))
96 }
97
98 // Extract optional initial message (hex encoded per NIP-77)
99 var initialMessage []byte
100 if len(parts) >= 4 {
101 var msgStr string
102 if err := json.Unmarshal(parts[3], &msgStr); err == nil && msgStr != "" {
103 // NIP-77 uses hex encoding
104 if decoded, err := hex.DecodeString(msgStr); err == nil {
105 initialMessage = decoded
106 } else {
107 log.W.F("NEG-OPEN: invalid hex message: %v", err)
108 }
109 }
110 }
111
112 // Convert filter to proto format
113 protoFilter := filterToProto(f)
114
115 // Call gRPC service
116 ctx := context.Background()
117 respMsg, haveIDs, needIDs, complete, errStr, err := negentropyHandler.HandleNegOpen(
118 ctx,
119 l.connectionID,
120 subscriptionID,
121 protoFilter,
122 initialMessage,
123 )
124 if err != nil {
125 log.E.F("NEG-OPEN gRPC error: %v", err)
126 return l.sendNegErr(subscriptionID, "internal error")
127 }
128
129 if errStr != "" {
130 return l.sendNegErr(subscriptionID, errStr)
131 }
132
133 // Log need_ids (events client should send us)
134 if len(needIDs) > 0 {
135 log.D.F("NEG-OPEN: relay needs %d events from client", len(needIDs))
136 }
137
138 // Send NEG-MSG response FIRST (before events)
139 if err := l.sendNegMsg(subscriptionID, respMsg); err != nil {
140 return err
141 }
142
143 // If reconciliation is complete, send events we have that client needs.
144 // Per NIP-77: The haves/needs are only final when reconcile returns complete=true.
145 if complete {
146 log.D.F("NEG-OPEN: reconciliation complete for %s, sending %d events", subscriptionID, len(haveIDs))
147 if len(haveIDs) > 0 {
148 if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil {
149 log.E.F("failed to send events for NEG-OPEN: %v", err)
150 }
151 }
152 }
153
154 return nil
155 }
156
157 // HandleNegMsg processes NEG-MSG messages
158 // Format: ["NEG-MSG", subscription_id, message]
159 func (l *Listener) HandleNegMsg(msg []byte) error {
160 if negentropyHandler == nil {
161 return l.sendNegErr("", "negentropy not enabled")
162 }
163
164 // Parse the message array
165 var parts []json.RawMessage
166 if err := json.Unmarshal(msg, &parts); err != nil {
167 return l.sendNegErr("", fmt.Sprintf("invalid NEG-MSG format: %v", err))
168 }
169
170 if len(parts) < 3 {
171 return l.sendNegErr("", "NEG-MSG requires 3 elements")
172 }
173
174 // Extract subscription ID
175 var subscriptionID string
176 if err := json.Unmarshal(parts[1], &subscriptionID); err != nil {
177 return l.sendNegErr("", fmt.Sprintf("invalid subscription_id: %v", err))
178 }
179
180 // Extract message (hex or base64 encoded)
181 var msgStr string
182 if err := json.Unmarshal(parts[2], &msgStr); err != nil {
183 return l.sendNegErr(subscriptionID, fmt.Sprintf("invalid message: %v", err))
184 }
185
186 // Decode message (NIP-77 uses hex encoding)
187 negentropyMsg, err := hex.DecodeString(msgStr)
188 if err != nil {
189 return l.sendNegErr(subscriptionID, fmt.Sprintf("invalid hex message: %v", err))
190 }
191
192 // Call gRPC service
193 ctx := context.Background()
194 respMsg, haveIDs, needIDs, complete, errStr, err := negentropyHandler.HandleNegMsg(
195 ctx,
196 l.connectionID,
197 subscriptionID,
198 negentropyMsg,
199 )
200 if err != nil {
201 log.E.F("NEG-MSG gRPC error: %v", err)
202 return l.sendNegErr(subscriptionID, "internal error")
203 }
204
205 if errStr != "" {
206 return l.sendNegErr(subscriptionID, errStr)
207 }
208
209 // Log need_ids (events client should send us)
210 if len(needIDs) > 0 {
211 log.D.F("NEG-MSG: relay needs %d events from client", len(needIDs))
212 }
213
214 // Send NEG-MSG response FIRST (before events)
215 if err := l.sendNegMsg(subscriptionID, respMsg); err != nil {
216 return err
217 }
218
219 // If reconciliation is complete, send events we have that client needs.
220 // Per NIP-77: The haves/needs are only final when reconcile returns complete=true.
221 if complete {
222 log.D.F("NEG-MSG: reconciliation complete for %s, sending %d events", subscriptionID, len(haveIDs))
223 if len(haveIDs) > 0 {
224 if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil {
225 log.E.F("failed to send events for NEG-MSG: %v", err)
226 }
227 }
228 }
229
230 return nil
231 }
232
233 // HandleNegClose processes NEG-CLOSE messages
234 // Format: ["NEG-CLOSE", subscription_id]
235 func (l *Listener) HandleNegClose(msg []byte) error {
236 if negentropyHandler == nil {
237 return nil // Silently ignore if not enabled
238 }
239
240 // Parse the message array
241 var parts []json.RawMessage
242 if err := json.Unmarshal(msg, &parts); err != nil {
243 return nil // Silently ignore malformed close
244 }
245
246 if len(parts) < 2 {
247 return nil
248 }
249
250 // Extract subscription ID
251 var subscriptionID string
252 if err := json.Unmarshal(parts[1], &subscriptionID); err != nil {
253 return nil
254 }
255
256 // Call gRPC service to close the session
257 ctx := context.Background()
258 if err := negentropyHandler.HandleNegClose(ctx, l.connectionID, subscriptionID); err != nil {
259 log.E.F("NEG-CLOSE gRPC error: %v", err)
260 }
261
262 return nil
263 }
264
265 // sendNegMsg sends a NEG-MSG response to the client
266 func (l *Listener) sendNegMsg(subscriptionID string, message []byte) error {
267 // Encode message as hex (per NIP-77)
268 encoded := ""
269 if len(message) > 0 {
270 encoded = hex.EncodeToString(message)
271 }
272
273 // Format: ["NEG-MSG", subscription_id, message]
274 resp, err := json.Marshal([]any{NegMsgLabel, subscriptionID, encoded})
275 if err != nil {
276 return err
277 }
278
279 _, err = l.Write(resp)
280 return err
281 }
282
283 // sendNegErr sends a NEG-ERR response to the client
284 func (l *Listener) sendNegErr(subscriptionID, reason string) error {
285 // Format: ["NEG-ERR", subscription_id, reason]
286 resp, err := json.Marshal([]any{NegErrLabel, subscriptionID, reason})
287 if err != nil {
288 return err
289 }
290
291 _, err = l.Write(resp)
292 return err
293 }
294
295 // sendEventsForIDs fetches and sends events for the given IDs.
296 // Auth-aware: unauthenticated clients get public events only; authenticated
297 // clients get full delivery subject to privilege and channel membership checks.
298 func (l *Listener) sendEventsForIDs(subscriptionID string, ids [][]byte) error {
299 if len(ids) == 0 {
300 return nil
301 }
302
303 log.D.F("NEG: sending %d events for subscription %s", len(ids), subscriptionID)
304
305 // Build filter with binary IDs (32 bytes each)
306 f := &filter.F{}
307 f.Ids = &tag.T{}
308 for _, id := range ids {
309 // IDs are binary (32 bytes full or 16 bytes truncated per NIP-77)
310 f.Ids.T = append(f.Ids.T, id)
311 }
312
313 // Query events by IDs
314 ctx := l.ctx
315 events, err := l.Server.db.QueryEvents(ctx, f)
316 if err != nil {
317 log.E.F("NEG: failed to query events: %v", err)
318 return err
319 }
320
321 pk := l.authedPubkey.Load()
322 // Full sync only for whitelisted relay pubkeys; everyone else gets public only
323 isFullSync := len(pk) > 0 && l.negentropyFullSyncPubkeys[hexenc.Enc(pk)]
324
325 // Send each event via EVENT envelope with subscription ID
326 sent, skipped := 0, 0
327 for _, ev := range events {
328 if ev == nil {
329 continue
330 }
331
332 // --- Auth-aware filtering ---
333
334 // Privileged events (DMs, gift wraps, channel kinds, etc.)
335 if kind.IsPrivileged(ev.Kind) {
336 if !isFullSync {
337 skipped++
338 continue
339 }
340
341 // Channel kinds: check membership
342 if kind.IsChannelKind(ev.Kind) {
343 // Discoverable kinds (40, 41) are always allowed for full-sync peers
344 if !kind.IsDiscoverableChannelKind(ev.Kind) {
345 if l.channelMembership != nil {
346 if !l.channelMembership.IsChannelMember(ev, pk, ctx) {
347 skipped++
348 continue
349 }
350 }
351 }
352 } else {
353 // Non-channel privileged: only deliver to involved parties
354 if !policy.IsPartyInvolved(ev, pk) {
355 skipped++
356 continue
357 }
358 }
359 }
360
361 // Non-privileged events that reference channel events via e-tags
362 // (reactions, reposts, zaps, reports, deletions targeting channel messages)
363 if !kind.IsChannelKind(ev.Kind) && !kind.IsPrivileged(ev.Kind) && l.channelMembership != nil {
364 if channelIDHex, isChannel := l.channelMembership.ReferencesChannelEvent(ev, ctx); isChannel {
365 if !isFullSync || !l.channelMembership.IsChannelMemberByID(channelIDHex, ev.Kind, pk, ctx) {
366 log.D.F(
367 "NEG: delivery DENIED for channel-referencing event %s kind %d (not a member of channel %s)",
368 hexenc.Enc(ev.ID), ev.Kind, channelIDHex,
369 )
370 skipped++
371 continue
372 }
373 }
374 }
375
376 // Private tag check (matches publisher.go logic)
377 if ev.Tags != nil && ev.Tags.Len() > 0 {
378 var privatePubkey []byte
379 hasPrivate := false
380 for _, t := range *ev.Tags {
381 if t.Len() >= 2 {
382 keyBytes := t.Key()
383 if len(keyBytes) == 7 && string(keyBytes) == "private" {
384 hasPrivate = true
385 privatePubkey = t.Value()
386 break
387 }
388 }
389 }
390 if hasPrivate {
391 if !l.canSeePrivateEvent(pk, privatePubkey) {
392 skipped++
393 continue
394 }
395 }
396 }
397
398 // --- Passed all checks, send ---
399 res, err := eventenvelope.NewResultWith([]byte(subscriptionID), ev)
400 if err != nil {
401 log.W.F("NEG: failed to create event envelope: %v", err)
402 continue
403 }
404 if err := res.Write(l); err != nil {
405 log.W.F("NEG: failed to send event: %v", err)
406 continue
407 }
408 sent++
409 }
410
411 log.D.F("NEG: sent %d/%d events (skipped %d auth-gated) for subscription %s",
412 sent, len(ids), skipped, subscriptionID)
413 return nil
414 }
415
416 // filterToProto converts a nostr filter to proto format
417 func filterToProto(f *filter.F) *commonv1.Filter {
418 if f == nil {
419 return nil
420 }
421
422 pf := &commonv1.Filter{}
423
424 // Convert Ids
425 if f.Ids != nil {
426 for _, id := range f.Ids.T {
427 pf.Ids = append(pf.Ids, id)
428 }
429 }
430
431 // Convert Authors
432 if f.Authors != nil {
433 for _, author := range f.Authors.T {
434 pf.Authors = append(pf.Authors, author)
435 }
436 }
437
438 // Convert Kinds - kind.S has ToUint16() method
439 if f.Kinds != nil && f.Kinds.Len() > 0 {
440 for _, k := range f.Kinds.ToUint16() {
441 pf.Kinds = append(pf.Kinds, uint32(k))
442 }
443 }
444
445 // Convert Since/Until - timestamp.T has .V field (int64)
446 if f.Since != nil && f.Since.V != 0 {
447 since := f.Since.V
448 pf.Since = &since
449 }
450 if f.Until != nil && f.Until.V != 0 {
451 until := f.Until.V
452 pf.Until = &until
453 }
454
455 // Convert Limit
456 if f.Limit != nil {
457 limit := uint32(*f.Limit)
458 pf.Limit = &limit
459 }
460
461 // Note: Tag filters (e, p, etc.) would need more complex conversion
462 // This is a simplified implementation
463
464 return pf
465 }
466
467 // parseNegentropyFilter parses a NIP-01 filter from JSON.
468 // This is needed because filter.F uses kind.S which doesn't implement
469 // json.Unmarshaler, so we parse manually and construct the filter.
470 func parseNegentropyFilter(data []byte) (*filter.F, error) {
471 // Parse into a generic map first
472 var raw map[string]json.RawMessage
473 if err := json.Unmarshal(data, &raw); err != nil {
474 return nil, err
475 }
476
477 f := filter.New()
478
479 // Parse kinds array
480 if kindsRaw, ok := raw["kinds"]; ok {
481 var kinds []int
482 if err := json.Unmarshal(kindsRaw, &kinds); err != nil {
483 return nil, fmt.Errorf("invalid kinds: %v", err)
484 }
485 f.Kinds = kind.FromIntSlice(kinds)
486 }
487
488 // Parse authors array (hex pubkeys)
489 if authorsRaw, ok := raw["authors"]; ok {
490 var authors []string
491 if err := json.Unmarshal(authorsRaw, &authors); err != nil {
492 return nil, fmt.Errorf("invalid authors: %v", err)
493 }
494 f.Authors = tag.NewWithCap(len(authors))
495 for _, a := range authors {
496 if decoded, err := hex.DecodeString(a); err == nil {
497 f.Authors.T = append(f.Authors.T, decoded)
498 }
499 }
500 }
501
502 // Parse ids array (hex event IDs)
503 if idsRaw, ok := raw["ids"]; ok {
504 var ids []string
505 if err := json.Unmarshal(idsRaw, &ids); err != nil {
506 return nil, fmt.Errorf("invalid ids: %v", err)
507 }
508 f.Ids = tag.NewWithCap(len(ids))
509 for _, id := range ids {
510 if decoded, err := hex.DecodeString(id); err == nil {
511 f.Ids.T = append(f.Ids.T, decoded)
512 }
513 }
514 }
515
516 // Parse since timestamp
517 if sinceRaw, ok := raw["since"]; ok {
518 var since int64
519 if err := json.Unmarshal(sinceRaw, &since); err == nil {
520 f.Since = timestamp.FromUnix(since)
521 }
522 }
523
524 // Parse until timestamp
525 if untilRaw, ok := raw["until"]; ok {
526 var until int64
527 if err := json.Unmarshal(untilRaw, &until); err == nil {
528 f.Until = timestamp.FromUnix(until)
529 }
530 }
531
532 // Parse limit
533 if limitRaw, ok := raw["limit"]; ok {
534 var limit uint
535 if err := json.Unmarshal(limitRaw, &limit); err == nil {
536 f.Limit = &limit
537 }
538 }
539
540 return f, nil
541 }
542
543 // CloseAllNegentropySessions closes all negentropy sessions for a connection
544 // Called when a WebSocket connection is closed
545 func (l *Listener) CloseAllNegentropySessions() {
546 if negentropyHandler == nil {
547 return
548 }
549
550 ctx := context.Background()
551 sessions, err := negentropyHandler.ListSessions(ctx)
552 if chk.E(err) {
553 return
554 }
555
556 for _, sess := range sessions {
557 if sess.ConnectionID == l.connectionID {
558 negentropyHandler.CloseSession(ctx, l.connectionID, sess.SubscriptionID)
559 }
560 }
561 }
562