dispatcher_test.go raw

   1  package events
   2  
   3  import (
   4  	"sync"
   5  	"sync/atomic"
   6  	"testing"
   7  	"time"
   8  )
   9  
  10  func TestDispatcherPublish(t *testing.T) {
  11  	d := NewDispatcher(DefaultDispatcherConfig())
  12  	defer d.Stop()
  13  
  14  	var received atomic.Int32
  15  
  16  	d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
  17  		received.Add(1)
  18  	}, EventSavedType))
  19  
  20  	// Should be received
  21  	d.Publish(NewEventSaved(nil, 1, false, false))
  22  
  23  	// Should not be received (different type)
  24  	d.Publish(NewEventDeleted(nil, nil, 1))
  25  
  26  	if received.Load() != 1 {
  27  		t.Errorf("expected 1 event received, got %d", received.Load())
  28  	}
  29  }
  30  
  31  func TestDispatcherPublishAsync(t *testing.T) {
  32  	d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 100})
  33  	defer d.Stop()
  34  
  35  	var received atomic.Int32
  36  	var wg sync.WaitGroup
  37  
  38  	d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
  39  		received.Add(1)
  40  		wg.Done()
  41  	}))
  42  
  43  	wg.Add(10)
  44  	for i := 0; i < 10; i++ {
  45  		d.PublishAsync(NewEventSaved(nil, uint64(i), false, false))
  46  	}
  47  
  48  	// Wait for all events to be processed
  49  	done := make(chan struct{})
  50  	go func() {
  51  		wg.Wait()
  52  		close(done)
  53  	}()
  54  
  55  	select {
  56  	case <-done:
  57  	case <-time.After(time.Second):
  58  		t.Fatal("timeout waiting for async events")
  59  	}
  60  
  61  	if received.Load() != 10 {
  62  		t.Errorf("expected 10 events received, got %d", received.Load())
  63  	}
  64  }
  65  
  66  func TestDispatcherMultipleSubscribers(t *testing.T) {
  67  	d := NewDispatcher(DefaultDispatcherConfig())
  68  	defer d.Stop()
  69  
  70  	var count1, count2 atomic.Int32
  71  
  72  	d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
  73  		count1.Add(1)
  74  	}))
  75  
  76  	d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
  77  		count2.Add(1)
  78  	}))
  79  
  80  	d.Publish(NewEventSaved(nil, 1, false, false))
  81  
  82  	if count1.Load() != 1 || count2.Load() != 1 {
  83  		t.Errorf("expected both subscribers to receive event, got %d and %d", count1.Load(), count2.Load())
  84  	}
  85  }
  86  
  87  func TestDispatcherUnsubscribe(t *testing.T) {
  88  	d := NewDispatcher(DefaultDispatcherConfig())
  89  	defer d.Stop()
  90  
  91  	var received atomic.Int32
  92  
  93  	sub := NewSubscriberForAll(func(e DomainEvent) {
  94  		received.Add(1)
  95  	})
  96  
  97  	d.Subscribe(sub)
  98  	d.Publish(NewEventSaved(nil, 1, false, false))
  99  
 100  	d.Unsubscribe(sub)
 101  	d.Publish(NewEventSaved(nil, 2, false, false))
 102  
 103  	if received.Load() != 1 {
 104  		t.Errorf("expected 1 event received after unsubscribe, got %d", received.Load())
 105  	}
 106  }
 107  
 108  func TestDispatcherTypedSubscription(t *testing.T) {
 109  	d := NewDispatcher(DefaultDispatcherConfig())
 110  	defer d.Stop()
 111  
 112  	var savedCount atomic.Int32
 113  	var deletedCount atomic.Int32
 114  
 115  	d.OnEventSaved(func(e *EventSaved) {
 116  		savedCount.Add(1)
 117  	})
 118  
 119  	d.OnEventDeleted(func(e *EventDeleted) {
 120  		deletedCount.Add(1)
 121  	})
 122  
 123  	d.Publish(NewEventSaved(nil, 1, false, false))
 124  	d.Publish(NewEventDeleted(nil, nil, 1))
 125  
 126  	if savedCount.Load() != 1 {
 127  		t.Errorf("expected 1 saved event, got %d", savedCount.Load())
 128  	}
 129  	if deletedCount.Load() != 1 {
 130  		t.Errorf("expected 1 deleted event, got %d", deletedCount.Load())
 131  	}
 132  }
 133  
 134  func TestDispatcherStats(t *testing.T) {
 135  	d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 100})
 136  	defer d.Stop()
 137  
 138  	d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {}))
 139  
 140  	d.Publish(NewEventSaved(nil, 1, false, false))
 141  	d.Publish(NewEventSaved(nil, 2, false, false))
 142  
 143  	stats := d.Stats()
 144  	if stats.EventsPublished != 2 {
 145  		t.Errorf("expected 2 events published, got %d", stats.EventsPublished)
 146  	}
 147  	if stats.SubscriberCount != 1 {
 148  		t.Errorf("expected 1 subscriber, got %d", stats.SubscriberCount)
 149  	}
 150  	if stats.QueueCapacity != 100 {
 151  		t.Errorf("expected queue capacity 100, got %d", stats.QueueCapacity)
 152  	}
 153  }
 154  
 155  func TestDispatcherQueueFull(t *testing.T) {
 156  	d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 1})
 157  	defer d.Stop()
 158  
 159  	// Block the processor
 160  	blocker := make(chan struct{})
 161  	d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
 162  		<-blocker
 163  	}))
 164  
 165  	// First should succeed
 166  	if !d.PublishAsync(NewEventSaved(nil, 1, false, false)) {
 167  		t.Error("first async publish should succeed")
 168  	}
 169  
 170  	// Wait for it to be picked up
 171  	time.Sleep(10 * time.Millisecond)
 172  
 173  	// Second should fail (queue full)
 174  	if d.PublishAsync(NewEventSaved(nil, 2, false, false)) {
 175  		// might succeed if the first was already picked up
 176  	}
 177  
 178  	close(blocker)
 179  }
 180  
 181  func TestEventTypes(t *testing.T) {
 182  	types := AllEventTypes()
 183  	if len(types) == 0 {
 184  		t.Error("expected event types to be registered")
 185  	}
 186  
 187  	// Check all types are unique
 188  	seen := make(map[string]bool)
 189  	for _, typ := range types {
 190  		if seen[typ] {
 191  			t.Errorf("duplicate event type: %s", typ)
 192  		}
 193  		seen[typ] = true
 194  	}
 195  }
 196  
 197  func TestDomainEventInterface(t *testing.T) {
 198  	events := []DomainEvent{
 199  		NewEventSaved(nil, 1, true, false),
 200  		NewEventDeleted([]byte{1}, []byte{2}, 3),
 201  		NewFollowListUpdated([]byte{1}, nil, nil),
 202  		NewACLMembershipChanged([]byte{1}, "none", "write", "followed"),
 203  		NewPolicyConfigUpdated([]byte{1}, map[string]interface{}{"key": "value"}),
 204  		NewPolicyFollowsUpdated([]byte{1}, 10, nil),
 205  		NewRelayGroupConfigChanged(nil),
 206  		NewClusterMembershipChanged(nil, "join"),
 207  		NewSyncSerialUpdated(100),
 208  		NewUserAuthenticated([]byte{1}, "write", true, "conn-123"),
 209  		NewConnectionOpened("conn-123", "192.168.1.1:1234"),
 210  		NewConnectionClosed("conn-123", time.Hour, 100, 50),
 211  		NewSubscriptionCreated("sub-1", "conn-123", 3),
 212  		NewSubscriptionClosed("sub-1", "conn-123", 25),
 213  		NewMemberJoined([]byte{1}, "invite-abc"),
 214  		NewMemberLeft([]byte{1}),
 215  	}
 216  
 217  	for _, e := range events {
 218  		if e.OccurredAt().IsZero() {
 219  			t.Errorf("event %s has zero timestamp", e.EventType())
 220  		}
 221  		if e.EventType() == "" {
 222  			t.Error("event has empty type")
 223  		}
 224  	}
 225  }
 226  
 227  func TestSubscriberFunc(t *testing.T) {
 228  	var called bool
 229  	sf := NewSubscriberFunc(
 230  		func(e DomainEvent) { called = true },
 231  		func(typ string) bool { return typ == EventSavedType },
 232  	)
 233  
 234  	if !sf.Supports(EventSavedType) {
 235  		t.Error("should support EventSavedType")
 236  	}
 237  	if sf.Supports(EventDeletedType) {
 238  		t.Error("should not support EventDeletedType")
 239  	}
 240  
 241  	sf.Handle(NewEventSaved(nil, 1, false, false))
 242  	if !called {
 243  		t.Error("handler was not called")
 244  	}
 245  }
 246