handle-event.go raw
1 package app
2
3 import (
4 "context"
5 "strings"
6 "time"
7
8 "next.orly.dev/pkg/lol/chk"
9 "next.orly.dev/pkg/lol/log"
10 "next.orly.dev/pkg/nostr/encoders/envelopes/authenvelope"
11 "next.orly.dev/pkg/nostr/encoders/envelopes/eventenvelope"
12 "next.orly.dev/pkg/nostr/encoders/envelopes/noticeenvelope"
13 "next.orly.dev/pkg/nostr/encoders/envelopes/okenvelope"
14 "next.orly.dev/pkg/nostr/encoders/event"
15 "next.orly.dev/pkg/nostr/encoders/hex"
16 "next.orly.dev/pkg/nostr/encoders/kind"
17 "next.orly.dev/pkg/nostr/encoders/reason"
18 "next.orly.dev/pkg/acl"
19 "next.orly.dev/pkg/event/ingestion"
20 "next.orly.dev/pkg/protocol/nip43"
21 )
22
23 // HandleEvent processes incoming EVENT messages.
24 // This is a thin protocol adapter that delegates to the ingestion service.
25 func (l *Listener) HandleEvent(msg []byte) (err error) {
26 log.T.F("HandleEvent: START handling event: %s", string(msg[:min(200, len(msg))]))
27
28 // Stage 1: Raw JSON validation (before unmarshal)
29 if result := l.eventValidator.ValidateRawJSON(msg); !result.Valid {
30 log.W.F("HandleEvent: rejecting event with validation error: %s", result.Msg)
31 if noticeErr := noticeenvelope.NewFrom(result.Msg).Write(l); noticeErr != nil {
32 log.E.F("failed to send NOTICE for validation error: %v", noticeErr)
33 }
34 if err = l.sendRawValidationError(result); chk.E(err) {
35 return
36 }
37 return nil
38 }
39
40 // Stage 2: Unmarshal the envelope
41 env := eventenvelope.NewSubmission()
42 if msg, err = env.Unmarshal(msg); chk.E(err) {
43 log.E.F("HandleEvent: failed to unmarshal event: %v", err)
44 return
45 }
46 log.D.F("HandleEvent: unmarshaled event, kind: %d, pubkey: %s, id: %0x",
47 env.E.Kind, hex.Enc(env.E.Pubkey), env.E.ID)
48 defer func() {
49 if env != nil && env.E != nil {
50 env.E.Free()
51 }
52 }()
53
54 // Stage 3: Handle special kinds that need connection context
55 if handled, err := l.handleSpecialKinds(env); handled {
56 return err
57 }
58
59 // Stage 4: Progressive throttle for follows ACL mode
60 if delay := l.getFollowsThrottleDelay(env.E); delay > 0 {
61 log.D.F("HandleEvent: applying progressive throttle delay of %v", delay)
62 select {
63 case <-l.ctx.Done():
64 return l.ctx.Err()
65 case <-time.After(delay):
66 }
67 }
68
69 // Stage 5: DM stranger rate limit check
70 if l.dmRateLimiter != nil {
71 if allowed, msg := l.dmRateLimiter.CheckDM(l.ctx, env.E); !allowed {
72 if err := Ok.Blocked(l, env, msg); chk.E(err) {
73 return err
74 }
75 return nil
76 }
77 }
78
79 // Stage 6: Delegate to ingestion service
80 connCtx := &ingestion.ConnectionContext{
81 AuthedPubkey: l.authedPubkey.Load(),
82 Remote: l.remote,
83 ConnectionID: l.connectionID,
84 }
85 result := l.ingestionService.Ingest(context.Background(), env.E, connCtx)
86
87 // Post-ingestion hooks
88 if result.Saved {
89 // Invalidate channel membership cache when kind 41 (ChannelMetadata) is saved
90 if env.E.Kind == kind.ChannelMetadata.K && l.channelMembership != nil {
91 if channelID := ExtractChannelIDFromEvent(env.E); channelID != "" {
92 l.channelMembership.InvalidateChannel(channelID)
93 }
94 }
95 // Update DM bidirectional cache when a DM is saved
96 if l.dmRateLimiter != nil {
97 l.dmRateLimiter.OnDMIngested(env.E)
98 }
99 }
100
101 // Stage 7: Send response based on result
102 return l.sendIngestionResult(env, result)
103 }
104
105 // handleSpecialKinds handles event kinds that need connection context.
106 // Returns (true, err) if the event was handled, (false, nil) to continue normal processing.
107 func (l *Listener) handleSpecialKinds(env *eventenvelope.Submission) (bool, error) {
108 switch env.E.Kind {
109 case nip43.KindJoinRequest:
110 if err := l.HandleNIP43JoinRequest(env.E); chk.E(err) {
111 log.E.F("failed to process NIP-43 join request: %v", err)
112 }
113 return true, nil
114
115 case nip43.KindLeaveRequest:
116 if err := l.HandleNIP43LeaveRequest(env.E); chk.E(err) {
117 log.E.F("failed to process NIP-43 leave request: %v", err)
118 }
119 return true, nil
120
121 case kind.PolicyConfig.K:
122 if err := l.HandlePolicyConfigUpdate(env.E); chk.E(err) {
123 log.E.F("failed to process policy config update: %v", err)
124 if err = Ok.Error(l, env, err.Error()); chk.E(err) {
125 return true, err
126 }
127 return true, nil
128 }
129 if err := Ok.Ok(l, env, "policy configuration updated"); chk.E(err) {
130 return true, err
131 }
132 return true, nil
133
134 case kind.FollowList.K:
135 // Check if this is a follow list update from a policy admin
136 if l.IsPolicyAdminFollowListEvent(env.E) {
137 go func() {
138 if updateErr := l.HandlePolicyAdminFollowListUpdate(env.E); updateErr != nil {
139 log.W.F("failed to update policy follows: %v", updateErr)
140 }
141 }()
142 }
143 // Continue with normal processing
144 return false, nil
145 }
146
147 // Enforce channel membership for write operations on kinds 42-44
148 // Channel kinds always require membership check regardless of ACL mode
149 if kind.IsChannelKind(env.E.Kind) && !kind.IsDiscoverableChannelKind(env.E.Kind) {
150 if l.channelMembership != nil {
151 // Use the event's author pubkey (already signature-verified) for membership check
152 if !l.channelMembership.IsChannelMember(env.E, env.E.Pubkey, l.ctx) {
153 log.D.F("HandleEvent: channel write denied for pubkey %s (not a member)", hex.Enc(env.E.Pubkey))
154 if err := Ok.Blocked(l, env, "restricted: not a channel member"); chk.E(err) {
155 return true, err
156 }
157 return true, nil
158 }
159 }
160 }
161
162 // Enforce channel membership for non-channel events that reference channel
163 // events via e-tags (reactions, reposts, reports, zaps, deletions targeting
164 // channel messages). Two-step indirection: resolve e-tag → referenced event
165 // → channel ID → membership check.
166 if !kind.IsChannelKind(env.E.Kind) && l.channelMembership != nil {
167 if channelIDHex, isChannel := l.channelMembership.ReferencesChannelEvent(env.E, l.ctx); isChannel {
168 if !l.channelMembership.IsChannelMemberByID(channelIDHex, env.E.Kind, env.E.Pubkey, l.ctx) {
169 log.D.F("HandleEvent: channel reference write denied for pubkey %s kind %d (not a member of channel %s)",
170 hex.Enc(env.E.Pubkey), env.E.Kind, channelIDHex)
171 if err := Ok.Blocked(l, env, "restricted: not a channel member"); chk.E(err) {
172 return true, err
173 }
174 return true, nil
175 }
176 }
177 }
178
179 return false, nil
180 }
181
182 // sendIngestionResult sends the appropriate response based on the ingestion result.
183 func (l *Listener) sendIngestionResult(env *eventenvelope.Submission, result ingestion.Result) error {
184 if result.Error != nil {
185 if strings.Contains(result.Error.Error(), "already exists") {
186 log.D.F("HandleEvent: duplicate event: %v", result.Error)
187 } else {
188 log.E.F("HandleEvent: ingestion error: %v", result.Error)
189 }
190 return Ok.Error(l, env, result.Error.Error())
191 }
192
193 if result.RequireAuth {
194 // Send OK false with auth required reason
195 if err := okenvelope.NewFrom(
196 env.Id(), false,
197 reason.AuthRequired.F(result.Message),
198 ).Write(l); chk.E(err) {
199 return err
200 }
201 // Send AUTH challenge
202 return authenvelope.NewChallengeWith(l.challenge.Load()).Write(l)
203 }
204
205 if !result.Accepted {
206 return Ok.Blocked(l, env, result.Message)
207 }
208
209 // Success
210 log.D.F("HandleEvent: event %0x processed successfully", env.E.ID)
211 return Ok.Ok(l, env, result.Message)
212 }
213
214 // isPeerRelayPubkey checks if the given pubkey belongs to a peer relay
215 func (l *Listener) isPeerRelayPubkey(pubkey []byte) bool {
216 if l.syncManager == nil {
217 return false
218 }
219
220 peerPubkeyHex := hex.Enc(pubkey)
221
222 for _, peerURL := range l.syncManager.GetPeers() {
223 if l.syncManager.IsAuthorizedPeer(peerURL, peerPubkeyHex) {
224 return true
225 }
226 }
227
228 return false
229 }
230
231 // HandleCuratingConfigUpdate processes curating configuration events (kind 30078)
232 func (l *Listener) HandleCuratingConfigUpdate(ev *event.E) error {
233 if acl.Registry.Type() != "curating" {
234 return nil
235 }
236
237 for _, aclInstance := range acl.Registry.ACLs() {
238 if aclInstance.Type() == "curating" {
239 if curating, ok := aclInstance.(*acl.Curating); ok {
240 return curating.ProcessConfigEvent(ev)
241 }
242 }
243 }
244
245 return nil
246 }
247