handle-event.go raw

   1  package app
   2  
   3  import (
   4  	"context"
   5  	"strings"
   6  	"time"
   7  
   8  	"next.orly.dev/pkg/lol/chk"
   9  	"next.orly.dev/pkg/lol/log"
  10  	"next.orly.dev/pkg/nostr/encoders/envelopes/authenvelope"
  11  	"next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
  12  	"next.orly.dev/pkg/nostr/encoders/envelopes/noticeenvelope"
  13  	"next.orly.dev/pkg/nostr/encoders/envelopes/okenvelope"
  14  	"next.orly.dev/pkg/nostr/encoders/event"
  15  	"next.orly.dev/pkg/nostr/encoders/hex"
  16  	"next.orly.dev/pkg/nostr/encoders/kind"
  17  	"next.orly.dev/pkg/nostr/encoders/reason"
  18  	"next.orly.dev/pkg/acl"
  19  	"next.orly.dev/pkg/event/ingestion"
  20  	"next.orly.dev/pkg/protocol/nip43"
  21  )
  22  
  23  // HandleEvent processes incoming EVENT messages.
  24  // This is a thin protocol adapter that delegates to the ingestion service.
  25  func (l *Listener) HandleEvent(msg []byte) (err error) {
  26  	log.T.F("HandleEvent: START handling event: %s", string(msg[:min(200, len(msg))]))
  27  
  28  	// Stage 1: Raw JSON validation (before unmarshal)
  29  	if result := l.eventValidator.ValidateRawJSON(msg); !result.Valid {
  30  		log.W.F("HandleEvent: rejecting event with validation error: %s", result.Msg)
  31  		if noticeErr := noticeenvelope.NewFrom(result.Msg).Write(l); noticeErr != nil {
  32  			log.E.F("failed to send NOTICE for validation error: %v", noticeErr)
  33  		}
  34  		if err = l.sendRawValidationError(result); chk.E(err) {
  35  			return
  36  		}
  37  		return nil
  38  	}
  39  
  40  	// Stage 2: Unmarshal the envelope
  41  	env := eventenvelope.NewSubmission()
  42  	if msg, err = env.Unmarshal(msg); chk.E(err) {
  43  		log.E.F("HandleEvent: failed to unmarshal event: %v", err)
  44  		return
  45  	}
  46  	log.D.F("HandleEvent: unmarshaled event, kind: %d, pubkey: %s, id: %0x",
  47  		env.E.Kind, hex.Enc(env.E.Pubkey), env.E.ID)
  48  	defer func() {
  49  		if env != nil && env.E != nil {
  50  			env.E.Free()
  51  		}
  52  	}()
  53  
  54  	// Stage 3: Handle special kinds that need connection context
  55  	if handled, err := l.handleSpecialKinds(env); handled {
  56  		return err
  57  	}
  58  
  59  	// Stage 4: Progressive throttle for follows ACL mode
  60  	if delay := l.getFollowsThrottleDelay(env.E); delay > 0 {
  61  		log.D.F("HandleEvent: applying progressive throttle delay of %v", delay)
  62  		select {
  63  		case <-l.ctx.Done():
  64  			return l.ctx.Err()
  65  		case <-time.After(delay):
  66  		}
  67  	}
  68  
  69  	// Stage 5: DM stranger rate limit check
  70  	if l.dmRateLimiter != nil {
  71  		if allowed, msg := l.dmRateLimiter.CheckDM(l.ctx, env.E); !allowed {
  72  			if err := Ok.Blocked(l, env, msg); chk.E(err) {
  73  				return err
  74  			}
  75  			return nil
  76  		}
  77  	}
  78  
  79  	// Stage 6: Delegate to ingestion service
  80  	connCtx := &ingestion.ConnectionContext{
  81  		AuthedPubkey: l.authedPubkey.Load(),
  82  		Remote:       l.remote,
  83  		ConnectionID: l.connectionID,
  84  	}
  85  	result := l.ingestionService.Ingest(context.Background(), env.E, connCtx)
  86  
  87  	// Post-ingestion hooks
  88  	if result.Saved {
  89  		// Invalidate channel membership cache when kind 41 (ChannelMetadata) is saved
  90  		if env.E.Kind == kind.ChannelMetadata.K && l.channelMembership != nil {
  91  			if channelID := ExtractChannelIDFromEvent(env.E); channelID != "" {
  92  				l.channelMembership.InvalidateChannel(channelID)
  93  			}
  94  		}
  95  		// Update DM bidirectional cache when a DM is saved
  96  		if l.dmRateLimiter != nil {
  97  			l.dmRateLimiter.OnDMIngested(env.E)
  98  		}
  99  	}
 100  
 101  	// Stage 7: Send response based on result
 102  	return l.sendIngestionResult(env, result)
 103  }
 104  
 105  // handleSpecialKinds handles event kinds that need connection context.
 106  // Returns (true, err) if the event was handled, (false, nil) to continue normal processing.
 107  func (l *Listener) handleSpecialKinds(env *eventenvelope.Submission) (bool, error) {
 108  	switch env.E.Kind {
 109  	case nip43.KindJoinRequest:
 110  		if err := l.HandleNIP43JoinRequest(env.E); chk.E(err) {
 111  			log.E.F("failed to process NIP-43 join request: %v", err)
 112  		}
 113  		return true, nil
 114  
 115  	case nip43.KindLeaveRequest:
 116  		if err := l.HandleNIP43LeaveRequest(env.E); chk.E(err) {
 117  			log.E.F("failed to process NIP-43 leave request: %v", err)
 118  		}
 119  		return true, nil
 120  
 121  	case kind.PolicyConfig.K:
 122  		if err := l.HandlePolicyConfigUpdate(env.E); chk.E(err) {
 123  			log.E.F("failed to process policy config update: %v", err)
 124  			if err = Ok.Error(l, env, err.Error()); chk.E(err) {
 125  				return true, err
 126  			}
 127  			return true, nil
 128  		}
 129  		if err := Ok.Ok(l, env, "policy configuration updated"); chk.E(err) {
 130  			return true, err
 131  		}
 132  		return true, nil
 133  
 134  	case kind.FollowList.K:
 135  		// Check if this is a follow list update from a policy admin
 136  		if l.IsPolicyAdminFollowListEvent(env.E) {
 137  			go func() {
 138  				if updateErr := l.HandlePolicyAdminFollowListUpdate(env.E); updateErr != nil {
 139  					log.W.F("failed to update policy follows: %v", updateErr)
 140  				}
 141  			}()
 142  		}
 143  		// Continue with normal processing
 144  		return false, nil
 145  	}
 146  
 147  	// Enforce channel membership for write operations on kinds 42-44
 148  	// Channel kinds always require membership check regardless of ACL mode
 149  	if kind.IsChannelKind(env.E.Kind) && !kind.IsDiscoverableChannelKind(env.E.Kind) {
 150  		if l.channelMembership != nil {
 151  			// Use the event's author pubkey (already signature-verified) for membership check
 152  			if !l.channelMembership.IsChannelMember(env.E, env.E.Pubkey, l.ctx) {
 153  				log.D.F("HandleEvent: channel write denied for pubkey %s (not a member)", hex.Enc(env.E.Pubkey))
 154  				if err := Ok.Blocked(l, env, "restricted: not a channel member"); chk.E(err) {
 155  					return true, err
 156  				}
 157  				return true, nil
 158  			}
 159  		}
 160  	}
 161  
 162  	// Enforce channel membership for non-channel events that reference channel
 163  	// events via e-tags (reactions, reposts, reports, zaps, deletions targeting
 164  	// channel messages). Two-step indirection: resolve e-tag → referenced event
 165  	// → channel ID → membership check.
 166  	if !kind.IsChannelKind(env.E.Kind) && l.channelMembership != nil {
 167  		if channelIDHex, isChannel := l.channelMembership.ReferencesChannelEvent(env.E, l.ctx); isChannel {
 168  			if !l.channelMembership.IsChannelMemberByID(channelIDHex, env.E.Kind, env.E.Pubkey, l.ctx) {
 169  				log.D.F("HandleEvent: channel reference write denied for pubkey %s kind %d (not a member of channel %s)",
 170  					hex.Enc(env.E.Pubkey), env.E.Kind, channelIDHex)
 171  				if err := Ok.Blocked(l, env, "restricted: not a channel member"); chk.E(err) {
 172  					return true, err
 173  				}
 174  				return true, nil
 175  			}
 176  		}
 177  	}
 178  
 179  	return false, nil
 180  }
 181  
 182  // sendIngestionResult sends the appropriate response based on the ingestion result.
 183  func (l *Listener) sendIngestionResult(env *eventenvelope.Submission, result ingestion.Result) error {
 184  	if result.Error != nil {
 185  		if strings.Contains(result.Error.Error(), "already exists") {
 186  			log.D.F("HandleEvent: duplicate event: %v", result.Error)
 187  		} else {
 188  			log.E.F("HandleEvent: ingestion error: %v", result.Error)
 189  		}
 190  		return Ok.Error(l, env, result.Error.Error())
 191  	}
 192  
 193  	if result.RequireAuth {
 194  		// Send OK false with auth required reason
 195  		if err := okenvelope.NewFrom(
 196  			env.Id(), false,
 197  			reason.AuthRequired.F(result.Message),
 198  		).Write(l); chk.E(err) {
 199  			return err
 200  		}
 201  		// Send AUTH challenge
 202  		return authenvelope.NewChallengeWith(l.challenge.Load()).Write(l)
 203  	}
 204  
 205  	if !result.Accepted {
 206  		return Ok.Blocked(l, env, result.Message)
 207  	}
 208  
 209  	// Success
 210  	log.D.F("HandleEvent: event %0x processed successfully", env.E.ID)
 211  	return Ok.Ok(l, env, result.Message)
 212  }
 213  
 214  // isPeerRelayPubkey checks if the given pubkey belongs to a peer relay
 215  func (l *Listener) isPeerRelayPubkey(pubkey []byte) bool {
 216  	if l.syncManager == nil {
 217  		return false
 218  	}
 219  
 220  	peerPubkeyHex := hex.Enc(pubkey)
 221  
 222  	for _, peerURL := range l.syncManager.GetPeers() {
 223  		if l.syncManager.IsAuthorizedPeer(peerURL, peerPubkeyHex) {
 224  			return true
 225  		}
 226  	}
 227  
 228  	return false
 229  }
 230  
 231  // HandleCuratingConfigUpdate processes curating configuration events (kind 30078)
 232  func (l *Listener) HandleCuratingConfigUpdate(ev *event.E) error {
 233  	if acl.Registry.Type() != "curating" {
 234  		return nil
 235  	}
 236  
 237  	for _, aclInstance := range acl.Registry.ACLs() {
 238  		if aclInstance.Type() == "curating" {
 239  			if curating, ok := aclInstance.(*acl.Curating); ok {
 240  				return curating.ProcessConfigEvent(ev)
 241  			}
 242  		}
 243  	}
 244  
 245  	return nil
 246  }
 247