dispatcher_test.go raw
1 package events
2
3 import (
4 "sync"
5 "sync/atomic"
6 "testing"
7 "time"
8 )
9
10 func TestDispatcherPublish(t *testing.T) {
11 d := NewDispatcher(DefaultDispatcherConfig())
12 defer d.Stop()
13
14 var received atomic.Int32
15
16 d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
17 received.Add(1)
18 }, EventSavedType))
19
20 // Should be received
21 d.Publish(NewEventSaved(nil, 1, false, false))
22
23 // Should not be received (different type)
24 d.Publish(NewEventDeleted(nil, nil, 1))
25
26 if received.Load() != 1 {
27 t.Errorf("expected 1 event received, got %d", received.Load())
28 }
29 }
30
31 func TestDispatcherPublishAsync(t *testing.T) {
32 d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 100})
33 defer d.Stop()
34
35 var received atomic.Int32
36 var wg sync.WaitGroup
37
38 d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
39 received.Add(1)
40 wg.Done()
41 }))
42
43 wg.Add(10)
44 for i := 0; i < 10; i++ {
45 d.PublishAsync(NewEventSaved(nil, uint64(i), false, false))
46 }
47
48 // Wait for all events to be processed
49 done := make(chan struct{})
50 go func() {
51 wg.Wait()
52 close(done)
53 }()
54
55 select {
56 case <-done:
57 case <-time.After(time.Second):
58 t.Fatal("timeout waiting for async events")
59 }
60
61 if received.Load() != 10 {
62 t.Errorf("expected 10 events received, got %d", received.Load())
63 }
64 }
65
66 func TestDispatcherMultipleSubscribers(t *testing.T) {
67 d := NewDispatcher(DefaultDispatcherConfig())
68 defer d.Stop()
69
70 var count1, count2 atomic.Int32
71
72 d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
73 count1.Add(1)
74 }))
75
76 d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
77 count2.Add(1)
78 }))
79
80 d.Publish(NewEventSaved(nil, 1, false, false))
81
82 if count1.Load() != 1 || count2.Load() != 1 {
83 t.Errorf("expected both subscribers to receive event, got %d and %d", count1.Load(), count2.Load())
84 }
85 }
86
87 func TestDispatcherUnsubscribe(t *testing.T) {
88 d := NewDispatcher(DefaultDispatcherConfig())
89 defer d.Stop()
90
91 var received atomic.Int32
92
93 sub := NewSubscriberForAll(func(e DomainEvent) {
94 received.Add(1)
95 })
96
97 d.Subscribe(sub)
98 d.Publish(NewEventSaved(nil, 1, false, false))
99
100 d.Unsubscribe(sub)
101 d.Publish(NewEventSaved(nil, 2, false, false))
102
103 if received.Load() != 1 {
104 t.Errorf("expected 1 event received after unsubscribe, got %d", received.Load())
105 }
106 }
107
108 func TestDispatcherTypedSubscription(t *testing.T) {
109 d := NewDispatcher(DefaultDispatcherConfig())
110 defer d.Stop()
111
112 var savedCount atomic.Int32
113 var deletedCount atomic.Int32
114
115 d.OnEventSaved(func(e *EventSaved) {
116 savedCount.Add(1)
117 })
118
119 d.OnEventDeleted(func(e *EventDeleted) {
120 deletedCount.Add(1)
121 })
122
123 d.Publish(NewEventSaved(nil, 1, false, false))
124 d.Publish(NewEventDeleted(nil, nil, 1))
125
126 if savedCount.Load() != 1 {
127 t.Errorf("expected 1 saved event, got %d", savedCount.Load())
128 }
129 if deletedCount.Load() != 1 {
130 t.Errorf("expected 1 deleted event, got %d", deletedCount.Load())
131 }
132 }
133
134 func TestDispatcherStats(t *testing.T) {
135 d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 100})
136 defer d.Stop()
137
138 d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {}))
139
140 d.Publish(NewEventSaved(nil, 1, false, false))
141 d.Publish(NewEventSaved(nil, 2, false, false))
142
143 stats := d.Stats()
144 if stats.EventsPublished != 2 {
145 t.Errorf("expected 2 events published, got %d", stats.EventsPublished)
146 }
147 if stats.SubscriberCount != 1 {
148 t.Errorf("expected 1 subscriber, got %d", stats.SubscriberCount)
149 }
150 if stats.QueueCapacity != 100 {
151 t.Errorf("expected queue capacity 100, got %d", stats.QueueCapacity)
152 }
153 }
154
155 func TestDispatcherQueueFull(t *testing.T) {
156 d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 1})
157 defer d.Stop()
158
159 // Block the processor
160 blocker := make(chan struct{})
161 d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
162 <-blocker
163 }))
164
165 // First should succeed
166 if !d.PublishAsync(NewEventSaved(nil, 1, false, false)) {
167 t.Error("first async publish should succeed")
168 }
169
170 // Wait for it to be picked up
171 time.Sleep(10 * time.Millisecond)
172
173 // Second should fail (queue full)
174 if d.PublishAsync(NewEventSaved(nil, 2, false, false)) {
175 // might succeed if the first was already picked up
176 }
177
178 close(blocker)
179 }
180
181 func TestEventTypes(t *testing.T) {
182 types := AllEventTypes()
183 if len(types) == 0 {
184 t.Error("expected event types to be registered")
185 }
186
187 // Check all types are unique
188 seen := make(map[string]bool)
189 for _, typ := range types {
190 if seen[typ] {
191 t.Errorf("duplicate event type: %s", typ)
192 }
193 seen[typ] = true
194 }
195 }
196
197 func TestDomainEventInterface(t *testing.T) {
198 events := []DomainEvent{
199 NewEventSaved(nil, 1, true, false),
200 NewEventDeleted([]byte{1}, []byte{2}, 3),
201 NewFollowListUpdated([]byte{1}, nil, nil),
202 NewACLMembershipChanged([]byte{1}, "none", "write", "followed"),
203 NewPolicyConfigUpdated([]byte{1}, map[string]interface{}{"key": "value"}),
204 NewPolicyFollowsUpdated([]byte{1}, 10, nil),
205 NewRelayGroupConfigChanged(nil),
206 NewClusterMembershipChanged(nil, "join"),
207 NewSyncSerialUpdated(100),
208 NewUserAuthenticated([]byte{1}, "write", true, "conn-123"),
209 NewConnectionOpened("conn-123", "192.168.1.1:1234"),
210 NewConnectionClosed("conn-123", time.Hour, 100, 50),
211 NewSubscriptionCreated("sub-1", "conn-123", 3),
212 NewSubscriptionClosed("sub-1", "conn-123", 25),
213 NewMemberJoined([]byte{1}, "invite-abc"),
214 NewMemberLeft([]byte{1}),
215 }
216
217 for _, e := range events {
218 if e.OccurredAt().IsZero() {
219 t.Errorf("event %s has zero timestamp", e.EventType())
220 }
221 if e.EventType() == "" {
222 t.Error("event has empty type")
223 }
224 }
225 }
226
227 func TestSubscriberFunc(t *testing.T) {
228 var called bool
229 sf := NewSubscriberFunc(
230 func(e DomainEvent) { called = true },
231 func(typ string) bool { return typ == EventSavedType },
232 )
233
234 if !sf.Supports(EventSavedType) {
235 t.Error("should support EventSavedType")
236 }
237 if sf.Supports(EventDeletedType) {
238 t.Error("should not support EventDeletedType")
239 }
240
241 sf.Handle(NewEventSaved(nil, 1, false, false))
242 if !called {
243 t.Error("handler was not called")
244 }
245 }
246