processing.go raw
1 // Package processing provides event processing services for the ORLY relay.
2 // It handles event persistence, delivery to subscribers, and post-save hooks.
3 package processing
4
5 import (
6 "context"
7 "strings"
8 "time"
9
10 "next.orly.dev/pkg/nostr/encoders/event"
11 "next.orly.dev/pkg/nostr/encoders/kind"
12
13 "next.orly.dev/pkg/domain/events"
14 )
15
16 // Result contains the outcome of event processing.
17 type Result struct {
18 Saved bool
19 Duplicate bool
20 Blocked bool
21 BlockMsg string
22 Error error
23 }
24
25 // OK returns a successful processing result.
26 func OK() Result {
27 return Result{Saved: true}
28 }
29
30 // Blocked returns a blocked processing result.
31 func Blocked(msg string) Result {
32 return Result{Blocked: true, BlockMsg: msg}
33 }
34
35 // Failed returns an error processing result.
36 func Failed(err error) Result {
37 return Result{Error: err}
38 }
39
40 // Database abstracts database operations for event processing.
41 type Database interface {
42 // SaveEvent saves an event to the database.
43 SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error)
44 // CheckForDeleted checks if an event has been deleted.
45 CheckForDeleted(ev *event.E, adminOwners [][]byte) error
46 }
47
48 // Publisher abstracts event delivery to subscribers.
49 type Publisher interface {
50 // Deliver sends an event to all matching subscribers.
51 Deliver(ev *event.E)
52 }
53
54 // RateLimiter abstracts rate limiting for write operations.
55 type RateLimiter interface {
56 // IsEnabled returns whether rate limiting is enabled.
57 IsEnabled() bool
58 // Wait blocks until the rate limit allows the operation.
59 Wait(ctx context.Context, opType int) error
60 }
61
62 // SyncManager abstracts sync manager for serial updates.
63 type SyncManager interface {
64 // UpdateSerial updates the serial number after saving an event.
65 UpdateSerial()
66 }
67
68 // ACLRegistry abstracts ACL registry for reconfiguration.
69 type ACLRegistry interface {
70 // Configure reconfigures the ACL system.
71 Configure(cfg ...any) error
72 // Active returns the active ACL mode.
73 Active() string
74 }
75
76 // RelayGroupManager handles relay group configuration events.
77 type RelayGroupManager interface {
78 // ValidateRelayGroupEvent validates a relay group config event.
79 ValidateRelayGroupEvent(ev *event.E) error
80 // HandleRelayGroupEvent processes a relay group event.
81 HandleRelayGroupEvent(ev *event.E, syncMgr any)
82 }
83
84 // ClusterManager handles cluster membership events.
85 type ClusterManager interface {
86 // HandleMembershipEvent processes a cluster membership event.
87 HandleMembershipEvent(ev *event.E) error
88 }
89
90 // DomainEventDispatcher abstracts domain event publishing.
91 type DomainEventDispatcher interface {
92 // PublishAsync queues an event for asynchronous processing.
93 PublishAsync(event events.DomainEvent) bool
94 }
95
96 // Config holds configuration for the processing service.
97 type Config struct {
98 Admins [][]byte
99 Owners [][]byte
100 WriteTimeout time.Duration
101 }
102
103 // DefaultConfig returns the default processing configuration.
104 func DefaultConfig() *Config {
105 return &Config{
106 WriteTimeout: 30 * time.Second,
107 }
108 }
109
110 // Service implements event processing.
111 type Service struct {
112 cfg *Config
113 db Database
114 publisher Publisher
115 rateLimiter RateLimiter
116 syncManager SyncManager
117 aclRegistry ACLRegistry
118 relayGroupMgr RelayGroupManager
119 clusterManager ClusterManager
120 eventDispatcher DomainEventDispatcher
121 }
122
123 // New creates a new processing service.
124 func New(cfg *Config, db Database, publisher Publisher) *Service {
125 if cfg == nil {
126 cfg = DefaultConfig()
127 }
128 return &Service{
129 cfg: cfg,
130 db: db,
131 publisher: publisher,
132 }
133 }
134
135 // SetRateLimiter sets the rate limiter.
136 func (s *Service) SetRateLimiter(rl RateLimiter) {
137 s.rateLimiter = rl
138 }
139
140 // SetSyncManager sets the sync manager.
141 func (s *Service) SetSyncManager(sm SyncManager) {
142 s.syncManager = sm
143 }
144
145 // SetACLRegistry sets the ACL registry.
146 func (s *Service) SetACLRegistry(acl ACLRegistry) {
147 s.aclRegistry = acl
148 }
149
150 // SetRelayGroupManager sets the relay group manager.
151 func (s *Service) SetRelayGroupManager(rgm RelayGroupManager) {
152 s.relayGroupMgr = rgm
153 }
154
155 // SetClusterManager sets the cluster manager.
156 func (s *Service) SetClusterManager(cm ClusterManager) {
157 s.clusterManager = cm
158 }
159
160 // SetEventDispatcher sets the domain event dispatcher.
161 func (s *Service) SetEventDispatcher(d DomainEventDispatcher) {
162 s.eventDispatcher = d
163 }
164
165 // Process saves an event and triggers delivery.
166 func (s *Service) Process(ctx context.Context, ev *event.E) Result {
167 // Check if event was previously deleted (skip for "none" ACL mode and delete events)
168 // Delete events (kind 5) shouldn't be blocked by existing deletes
169 if ev.Kind != kind.EventDeletion.K && s.aclRegistry != nil && s.aclRegistry.Active() != "none" {
170 adminOwners := append(s.cfg.Admins, s.cfg.Owners...)
171 if err := s.db.CheckForDeleted(ev, adminOwners); err != nil {
172 if strings.HasPrefix(err.Error(), "blocked:") {
173 errStr := err.Error()[len("blocked: "):]
174 return Blocked(errStr)
175 }
176 }
177 }
178
179 // Save the event
180 result := s.saveEvent(ctx, ev)
181 if !result.Saved {
182 return result
183 }
184
185 // Run post-save hooks
186 s.runPostSaveHooks(ev)
187
188 // Deliver the event to subscribers
189 s.deliver(ev)
190
191 return OK()
192 }
193
194 // saveEvent handles rate limiting and database persistence.
195 func (s *Service) saveEvent(ctx context.Context, ev *event.E) Result {
196 // Create timeout context
197 saveCtx, cancel := context.WithTimeout(ctx, s.cfg.WriteTimeout)
198 defer cancel()
199
200 // Apply rate limiting (skip for NIP-46 bunker events which need realtime priority)
201 const kindNIP46 = 24133
202 if s.rateLimiter != nil && s.rateLimiter.IsEnabled() && ev.Kind != uint16(kindNIP46) {
203 const writeOpType = 1 // ratelimit.Write
204 s.rateLimiter.Wait(saveCtx, writeOpType)
205 }
206
207 // Save to database
208 _, err := s.db.SaveEvent(saveCtx, ev)
209 if err != nil {
210 if strings.HasPrefix(err.Error(), "blocked:") {
211 errStr := err.Error()[len("blocked: "):]
212 return Blocked(errStr)
213 }
214 return Failed(err)
215 }
216
217 return OK()
218 }
219
220 // deliver sends event to subscribers.
221 func (s *Service) deliver(ev *event.E) {
222 cloned := ev.Clone()
223 go s.publisher.Deliver(cloned)
224 }
225
226 // runPostSaveHooks handles side effects after event persistence.
227 func (s *Service) runPostSaveHooks(ev *event.E) {
228 isAdmin := s.isAdminPubkey(ev.Pubkey)
229 isOwner := s.isOwnerPubkey(ev.Pubkey)
230
231 // Dispatch domain event for saved event
232 if s.eventDispatcher != nil {
233 s.eventDispatcher.PublishAsync(events.NewEventSaved(ev, 0, isAdmin, isOwner))
234 }
235
236 // Handle relay group configuration events
237 if s.relayGroupMgr != nil {
238 if err := s.relayGroupMgr.ValidateRelayGroupEvent(ev); err == nil {
239 if s.syncManager != nil {
240 s.relayGroupMgr.HandleRelayGroupEvent(ev, s.syncManager)
241 }
242 }
243 }
244
245 // Handle cluster membership events (Kind 39108)
246 if ev.Kind == 39108 && s.clusterManager != nil {
247 s.clusterManager.HandleMembershipEvent(ev)
248 }
249
250 // Update serial for distributed synchronization
251 if s.syncManager != nil {
252 s.syncManager.UpdateSerial()
253 }
254
255 // ACL reconfiguration for admin events
256 if isAdmin || isOwner {
257 if ev.Kind == kind.FollowList.K || ev.Kind == kind.RelayListMetadata.K {
258 if s.aclRegistry != nil {
259 go s.aclRegistry.Configure()
260 }
261 }
262 }
263 }
264
265 // isAdminPubkey checks if pubkey is an admin.
266 func (s *Service) isAdminPubkey(pubkey []byte) bool {
267 for _, admin := range s.cfg.Admins {
268 if fastEqual(admin, pubkey) {
269 return true
270 }
271 }
272 return false
273 }
274
275 // isOwnerPubkey checks if pubkey is an owner.
276 func (s *Service) isOwnerPubkey(pubkey []byte) bool {
277 for _, owner := range s.cfg.Owners {
278 if fastEqual(owner, pubkey) {
279 return true
280 }
281 }
282 return false
283 }
284
285 // fastEqual compares two byte slices for equality.
286 func fastEqual(a, b []byte) bool {
287 if len(a) != len(b) {
288 return false
289 }
290 for i := range a {
291 if a[i] != b[i] {
292 return false
293 }
294 }
295 return true
296 }
297