service.go raw
1 // Package ingestion provides a service for orchestrating the event processing pipeline.
2 // It coordinates validation, special kind handling, authorization, routing, and processing.
3 package ingestion
4
5 import (
6 "context"
7 "fmt"
8
9 "next.orly.dev/pkg/nostr/encoders/event"
10 "next.orly.dev/pkg/event/authorization"
11 "next.orly.dev/pkg/event/processing"
12 "next.orly.dev/pkg/event/routing"
13 "next.orly.dev/pkg/event/specialkinds"
14 "next.orly.dev/pkg/event/validation"
15 )
16
17 // SprocketChecker checks events against the sprocket (external filter).
18 type SprocketChecker interface {
19 IsEnabled() bool
20 IsDisabled() bool
21 IsRunning() bool
22 ProcessEvent(*event.E) (*SprocketResponse, error)
23 }
24
// SprocketResponse carries the verdict returned by SprocketChecker.ProcessEvent.
type SprocketResponse struct {
	// Action is the verdict: "accept", "reject" or "shadowReject".
	Action string

	// Msg is the accompanying message; Ingest uses it as the rejection
	// reason when Action is "reject".
	Msg string
}
30
// ConnectionContext bundles the per-connection data that the ingestion
// pipeline needs while handling an event.
type ConnectionContext struct {
	// AuthedPubkey holds the pubkey this connection authenticated as.
	// It may be nil.
	AuthedPubkey []byte

	// Remote holds the remote address of the connection.
	Remote string

	// ConnectionID is the unique identifier of this connection.
	ConnectionID string
}
42
// Result describes what happened to an event that went through ingestion.
type Result struct {
	// Accepted is true when the event was accepted for processing.
	Accepted bool

	// Saved is true when the event was saved to the database.
	Saved bool

	// Message is an optional message for the OK response.
	Message string

	// RequireAuth is true when authentication is required.
	RequireAuth bool

	// Error holds the error that occurred, if any.
	Error error
}
60
61 // Accepted returns a successful Result.
62 func Accepted(message string) Result {
63 return Result{Accepted: true, Saved: true, Message: message}
64 }
65
66 // AcceptedNotSaved returns a Result where the event was accepted but not saved.
67 func AcceptedNotSaved(message string) Result {
68 return Result{Accepted: true, Saved: false, Message: message}
69 }
70
71 // Rejected returns a rejection Result.
72 func Rejected(message string) Result {
73 return Result{Accepted: false, Message: message}
74 }
75
76 // AuthRequired returns a Result indicating authentication is required.
77 func AuthRequired(message string) Result {
78 return Result{Accepted: false, RequireAuth: true, Message: message}
79 }
80
81 // Errored returns a Result with an error.
82 func Errored(err error) Result {
83 return Result{Accepted: false, Error: err}
84 }
85
86 // Config configures the ingestion service.
87 type Config struct {
88 // SprocketChecker is the optional sprocket checker.
89 SprocketChecker SprocketChecker
90
91 // SpecialKinds is the registry for special kind handlers.
92 SpecialKinds *specialkinds.Registry
93
94 // ACLMode is the current ACL mode (used for NIP-70 validation).
95 ACLMode func() string
96
97 // DeleteHandler handles deletion events.
98 DeleteHandler DeleteHandler
99 }
100
101 // DeleteHandler processes delete events (kind 5).
102 type DeleteHandler interface {
103 // HandleDelete processes a delete event after it's been saved.
104 HandleDelete(ctx context.Context, ev *event.E) error
105 }
106
107 // Service orchestrates the event ingestion pipeline.
108 type Service struct {
109 validator *validation.Service
110 authorizer *authorization.Service
111 router *routing.DefaultRouter
112 processor *processing.Service
113 sprocket SprocketChecker
114 specialKinds *specialkinds.Registry
115 aclMode func() string
116 deleteHandler DeleteHandler
117 }
118
119 // NewService creates a new ingestion service.
120 func NewService(
121 validator *validation.Service,
122 authorizer *authorization.Service,
123 router *routing.DefaultRouter,
124 processor *processing.Service,
125 cfg Config,
126 ) *Service {
127 return &Service{
128 validator: validator,
129 authorizer: authorizer,
130 router: router,
131 processor: processor,
132 sprocket: cfg.SprocketChecker,
133 specialKinds: cfg.SpecialKinds,
134 aclMode: cfg.ACLMode,
135 deleteHandler: cfg.DeleteHandler,
136 }
137 }
138
139 // Ingest processes an event through the full ingestion pipeline.
140 // Returns a Result indicating the outcome.
141 func (s *Service) Ingest(ctx context.Context, ev *event.E, connCtx *ConnectionContext) Result {
142 // Stage 1: Event validation (ID, timestamp, signature)
143 if result := s.validator.ValidateEvent(ev); !result.Valid {
144 return Rejected(result.Msg)
145 }
146
147 // Stage 2: Sprocket check (if enabled)
148 if s.sprocket != nil && s.sprocket.IsEnabled() {
149 if s.sprocket.IsDisabled() {
150 return Rejected("sprocket disabled - events rejected until sprocket is restored")
151 }
152 if !s.sprocket.IsRunning() {
153 return Rejected("sprocket not running - events rejected until sprocket starts")
154 }
155
156 response, err := s.sprocket.ProcessEvent(ev)
157 if err != nil {
158 return Errored(fmt.Errorf("sprocket processing failed: %w", err))
159 }
160
161 switch response.Action {
162 case "accept":
163 // Continue processing
164 case "reject":
165 return Rejected(response.Msg)
166 case "shadowReject":
167 // Accept but don't save
168 return AcceptedNotSaved("")
169 }
170 }
171
172 // Stage 3: Special kind handling
173 if s.specialKinds != nil {
174 hctx := &specialkinds.HandlerContext{
175 AuthedPubkey: connCtx.AuthedPubkey,
176 Remote: connCtx.Remote,
177 ConnectionID: connCtx.ConnectionID,
178 }
179 if result, handled := s.specialKinds.TryHandle(ctx, ev, hctx); handled {
180 if result.Error != nil {
181 return Errored(result.Error)
182 }
183 if result.Handled {
184 if result.SaveEvent {
185 // Handler wants the event saved
186 procResult := s.processor.Process(ctx, ev)
187 if procResult.Error != nil {
188 return Errored(procResult.Error)
189 }
190 return Accepted(result.Message)
191 }
192 return AcceptedNotSaved(result.Message)
193 }
194 // result.Continue - fall through to normal processing
195 }
196 }
197
198 // Stage 4: Authorization check
199 decision := s.authorizer.Authorize(ev, connCtx.AuthedPubkey, connCtx.Remote, ev.Kind)
200 if !decision.Allowed {
201 if decision.RequireAuth {
202 return AuthRequired(decision.DenyReason)
203 }
204 return Rejected(decision.DenyReason)
205 }
206
207 // Stage 5: NIP-70 protected tag validation (only when ACL is active)
208 if s.aclMode != nil && s.aclMode() != "none" {
209 if result := s.validator.ValidateProtectedTag(ev, connCtx.AuthedPubkey); !result.Valid {
210 return Rejected(result.Msg)
211 }
212 }
213
214 // Stage 6: Routing (ephemeral events, etc.)
215 if routeResult := s.router.Route(ev, connCtx.AuthedPubkey); routeResult.Action != routing.Continue {
216 if routeResult.Action == routing.Handled {
217 return AcceptedNotSaved(routeResult.Message)
218 }
219 if routeResult.Action == routing.Error {
220 return Errored(fmt.Errorf("routing error: %s", routeResult.Message))
221 }
222 }
223
224 // Stage 7: Processing (save, hooks, delivery)
225 procResult := s.processor.Process(ctx, ev)
226 if procResult.Blocked {
227 return Rejected(procResult.BlockMsg)
228 }
229 if procResult.Error != nil {
230 return Errored(procResult.Error)
231 }
232
233 // Stage 8: Delete event post-processing
234 const kindEventDeletion = 5
235 if ev.Kind == kindEventDeletion && s.deleteHandler != nil {
236 if err := s.deleteHandler.HandleDelete(ctx, ev); err != nil {
237 // Log but don't fail - the delete event is already saved
238 // Return success since the event was stored
239 }
240 }
241
242 return Accepted("")
243 }
244
245 // IngestWithRawValidation includes raw JSON validation before unmarshaling.
246 // Use this when the event hasn't been validated yet.
247 func (s *Service) IngestWithRawValidation(ctx context.Context, rawJSON []byte, ev *event.E, connCtx *ConnectionContext) Result {
248 // Stage 0: Raw JSON validation
249 if result := s.validator.ValidateRawJSON(rawJSON); !result.Valid {
250 return Rejected(result.Msg)
251 }
252
253 return s.Ingest(ctx, ev, connCtx)
254 }
255