dispatcher.go raw
1 package events
2
3 import (
4 "context"
5 "sync"
6 "sync/atomic"
7
8 "next.orly.dev/pkg/lol/log"
9 )
10
11 // Subscriber handles domain events.
12 type Subscriber interface {
13 // Handle processes the given domain event.
14 Handle(event DomainEvent)
15 // Supports returns true if this subscriber handles the given event type.
16 Supports(eventType string) bool
17 }
18
19 // SubscriberFunc is a function that can be used as a Subscriber.
20 type SubscriberFunc struct {
21 handleFunc func(DomainEvent)
22 supportsFunc func(string) bool
23 }
24
25 // Handle implements Subscriber.
26 func (s *SubscriberFunc) Handle(event DomainEvent) {
27 s.handleFunc(event)
28 }
29
30 // Supports implements Subscriber.
31 func (s *SubscriberFunc) Supports(eventType string) bool {
32 return s.supportsFunc(eventType)
33 }
34
35 // NewSubscriberFunc creates a new SubscriberFunc.
36 func NewSubscriberFunc(handle func(DomainEvent), supports func(string) bool) *SubscriberFunc {
37 return &SubscriberFunc{
38 handleFunc: handle,
39 supportsFunc: supports,
40 }
41 }
42
43 // NewSubscriberForTypes creates a subscriber that handles specific event types.
44 func NewSubscriberForTypes(handle func(DomainEvent), types ...string) *SubscriberFunc {
45 typeSet := make(map[string]struct{}, len(types))
46 for _, t := range types {
47 typeSet[t] = struct{}{}
48 }
49 return &SubscriberFunc{
50 handleFunc: handle,
51 supportsFunc: func(eventType string) bool {
52 _, ok := typeSet[eventType]
53 return ok
54 },
55 }
56 }
57
58 // NewSubscriberForAll creates a subscriber that handles all event types.
59 func NewSubscriberForAll(handle func(DomainEvent)) *SubscriberFunc {
60 return &SubscriberFunc{
61 handleFunc: handle,
62 supportsFunc: func(string) bool { return true },
63 }
64 }
65
66 // Dispatcher publishes domain events to subscribers.
67 type Dispatcher struct {
68 subscribers []Subscriber
69 mu sync.RWMutex
70
71 asyncChan chan DomainEvent
72 ctx context.Context
73 cancel context.CancelFunc
74 wg sync.WaitGroup
75
76 // Metrics
77 eventsPublished atomic.Int64
78 eventsDropped atomic.Int64
79 asyncQueueSize int
80 }
81
82 // DispatcherConfig configures the dispatcher.
83 type DispatcherConfig struct {
84 // AsyncBufferSize is the buffer size for async event delivery.
85 // Default: 1000
86 AsyncBufferSize int
87 }
88
89 // DefaultDispatcherConfig returns the default dispatcher configuration.
90 func DefaultDispatcherConfig() DispatcherConfig {
91 return DispatcherConfig{
92 AsyncBufferSize: 1000,
93 }
94 }
95
96 // NewDispatcher creates a new event dispatcher.
97 func NewDispatcher(cfg DispatcherConfig) *Dispatcher {
98 if cfg.AsyncBufferSize <= 0 {
99 cfg.AsyncBufferSize = 1000
100 }
101
102 ctx, cancel := context.WithCancel(context.Background())
103 d := &Dispatcher{
104 asyncChan: make(chan DomainEvent, cfg.AsyncBufferSize),
105 ctx: ctx,
106 cancel: cancel,
107 asyncQueueSize: cfg.AsyncBufferSize,
108 }
109
110 // Start async processor
111 d.wg.Add(1)
112 go d.processAsync()
113
114 return d
115 }
116
117 // Subscribe adds a subscriber to receive events.
118 func (d *Dispatcher) Subscribe(s Subscriber) {
119 d.mu.Lock()
120 defer d.mu.Unlock()
121 d.subscribers = append(d.subscribers, s)
122 }
123
124 // Unsubscribe removes a subscriber.
125 func (d *Dispatcher) Unsubscribe(s Subscriber) {
126 d.mu.Lock()
127 defer d.mu.Unlock()
128 for i, sub := range d.subscribers {
129 if sub == s {
130 d.subscribers = append(d.subscribers[:i], d.subscribers[i+1:]...)
131 return
132 }
133 }
134 }
135
136 // Publish sends an event to all matching subscribers synchronously.
137 // This blocks until all subscribers have processed the event.
138 func (d *Dispatcher) Publish(event DomainEvent) {
139 d.mu.RLock()
140 subscribers := d.subscribers
141 d.mu.RUnlock()
142
143 for _, s := range subscribers {
144 if s.Supports(event.EventType()) {
145 s.Handle(event)
146 }
147 }
148 d.eventsPublished.Add(1)
149 }
150
151 // PublishAsync sends an event to be processed asynchronously.
152 // This returns immediately and the event is processed in a background goroutine.
153 // Returns true if the event was queued, false if the queue is full.
154 func (d *Dispatcher) PublishAsync(event DomainEvent) bool {
155 select {
156 case d.asyncChan <- event:
157 return true
158 default:
159 d.eventsDropped.Add(1)
160 log.W.F("domain event dropped (queue full): %s", event.EventType())
161 return false
162 }
163 }
164
165 // processAsync handles async event delivery.
166 func (d *Dispatcher) processAsync() {
167 defer d.wg.Done()
168
169 for {
170 select {
171 case <-d.ctx.Done():
172 // Drain remaining events before exiting
173 for {
174 select {
175 case event := <-d.asyncChan:
176 d.Publish(event)
177 default:
178 return
179 }
180 }
181 case event := <-d.asyncChan:
182 d.Publish(event)
183 }
184 }
185 }
186
187 // Stop stops the dispatcher and waits for pending events to be processed.
188 func (d *Dispatcher) Stop() {
189 d.cancel()
190 d.wg.Wait()
191 }
192
193 // Stats returns dispatcher statistics.
194 func (d *Dispatcher) Stats() DispatcherStats {
195 d.mu.RLock()
196 subscriberCount := len(d.subscribers)
197 d.mu.RUnlock()
198
199 return DispatcherStats{
200 EventsPublished: d.eventsPublished.Load(),
201 EventsDropped: d.eventsDropped.Load(),
202 SubscriberCount: subscriberCount,
203 QueueSize: len(d.asyncChan),
204 QueueCapacity: d.asyncQueueSize,
205 }
206 }
207
208 // DispatcherStats contains dispatcher statistics.
209 type DispatcherStats struct {
210 EventsPublished int64
211 EventsDropped int64
212 SubscriberCount int
213 QueueSize int
214 QueueCapacity int
215 }
216
217 // =============================================================================
218 // Global Dispatcher (Optional Convenience)
219 // =============================================================================
220
221 var (
222 globalDispatcher *Dispatcher
223 globalDispatcherOnce sync.Once
224 )
225
226 // Global returns the global dispatcher instance.
227 // Creates one with default config if not already created.
228 func Global() *Dispatcher {
229 globalDispatcherOnce.Do(func() {
230 globalDispatcher = NewDispatcher(DefaultDispatcherConfig())
231 })
232 return globalDispatcher
233 }
234
235 // SetGlobal sets the global dispatcher instance.
236 // This should be called early in application startup if custom config is needed.
237 func SetGlobal(d *Dispatcher) {
238 globalDispatcher = d
239 }
240
241 // =============================================================================
242 // Typed Subscription Helpers
243 // =============================================================================
244
245 // OnEventSaved subscribes to EventSaved events.
246 func (d *Dispatcher) OnEventSaved(handler func(*EventSaved)) {
247 d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
248 if es, ok := e.(*EventSaved); ok {
249 handler(es)
250 }
251 }, EventSavedType))
252 }
253
254 // OnEventDeleted subscribes to EventDeleted events.
255 func (d *Dispatcher) OnEventDeleted(handler func(*EventDeleted)) {
256 d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
257 if ed, ok := e.(*EventDeleted); ok {
258 handler(ed)
259 }
260 }, EventDeletedType))
261 }
262
263 // OnFollowListUpdated subscribes to FollowListUpdated events.
264 func (d *Dispatcher) OnFollowListUpdated(handler func(*FollowListUpdated)) {
265 d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
266 if flu, ok := e.(*FollowListUpdated); ok {
267 handler(flu)
268 }
269 }, FollowListUpdatedType))
270 }
271
272 // OnACLMembershipChanged subscribes to ACLMembershipChanged events.
273 func (d *Dispatcher) OnACLMembershipChanged(handler func(*ACLMembershipChanged)) {
274 d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
275 if amc, ok := e.(*ACLMembershipChanged); ok {
276 handler(amc)
277 }
278 }, ACLMembershipChangedType))
279 }
280
281 // OnPolicyConfigUpdated subscribes to PolicyConfigUpdated events.
282 func (d *Dispatcher) OnPolicyConfigUpdated(handler func(*PolicyConfigUpdated)) {
283 d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
284 if pcu, ok := e.(*PolicyConfigUpdated); ok {
285 handler(pcu)
286 }
287 }, PolicyConfigUpdatedType))
288 }
289
290 // OnUserAuthenticated subscribes to UserAuthenticated events.
291 func (d *Dispatcher) OnUserAuthenticated(handler func(*UserAuthenticated)) {
292 d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
293 if ua, ok := e.(*UserAuthenticated); ok {
294 handler(ua)
295 }
296 }, UserAuthenticatedType))
297 }
298
299 // OnMemberJoined subscribes to MemberJoined events.
300 func (d *Dispatcher) OnMemberJoined(handler func(*MemberJoined)) {
301 d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
302 if mj, ok := e.(*MemberJoined); ok {
303 handler(mj)
304 }
305 }, MemberJoinedType))
306 }
307
308 // OnMemberLeft subscribes to MemberLeft events.
309 func (d *Dispatcher) OnMemberLeft(handler func(*MemberLeft)) {
310 d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
311 if ml, ok := e.(*MemberLeft); ok {
312 handler(ml)
313 }
314 }, MemberLeftType))
315 }
316