dispatcher.go raw

   1  package events
   2  
   3  import (
   4  	"context"
   5  	"sync"
   6  	"sync/atomic"
   7  
   8  	"next.orly.dev/pkg/lol/log"
   9  )
  10  
  11  // Subscriber handles domain events.
  12  type Subscriber interface {
  13  	// Handle processes the given domain event.
  14  	Handle(event DomainEvent)
  15  	// Supports returns true if this subscriber handles the given event type.
  16  	Supports(eventType string) bool
  17  }
  18  
  19  // SubscriberFunc is a function that can be used as a Subscriber.
  20  type SubscriberFunc struct {
  21  	handleFunc   func(DomainEvent)
  22  	supportsFunc func(string) bool
  23  }
  24  
  25  // Handle implements Subscriber.
  26  func (s *SubscriberFunc) Handle(event DomainEvent) {
  27  	s.handleFunc(event)
  28  }
  29  
  30  // Supports implements Subscriber.
  31  func (s *SubscriberFunc) Supports(eventType string) bool {
  32  	return s.supportsFunc(eventType)
  33  }
  34  
  35  // NewSubscriberFunc creates a new SubscriberFunc.
  36  func NewSubscriberFunc(handle func(DomainEvent), supports func(string) bool) *SubscriberFunc {
  37  	return &SubscriberFunc{
  38  		handleFunc:   handle,
  39  		supportsFunc: supports,
  40  	}
  41  }
  42  
  43  // NewSubscriberForTypes creates a subscriber that handles specific event types.
  44  func NewSubscriberForTypes(handle func(DomainEvent), types ...string) *SubscriberFunc {
  45  	typeSet := make(map[string]struct{}, len(types))
  46  	for _, t := range types {
  47  		typeSet[t] = struct{}{}
  48  	}
  49  	return &SubscriberFunc{
  50  		handleFunc: handle,
  51  		supportsFunc: func(eventType string) bool {
  52  			_, ok := typeSet[eventType]
  53  			return ok
  54  		},
  55  	}
  56  }
  57  
  58  // NewSubscriberForAll creates a subscriber that handles all event types.
  59  func NewSubscriberForAll(handle func(DomainEvent)) *SubscriberFunc {
  60  	return &SubscriberFunc{
  61  		handleFunc:   handle,
  62  		supportsFunc: func(string) bool { return true },
  63  	}
  64  }
  65  
  66  // Dispatcher publishes domain events to subscribers.
  67  type Dispatcher struct {
  68  	subscribers []Subscriber
  69  	mu          sync.RWMutex
  70  
  71  	asyncChan chan DomainEvent
  72  	ctx       context.Context
  73  	cancel    context.CancelFunc
  74  	wg        sync.WaitGroup
  75  
  76  	// Metrics
  77  	eventsPublished atomic.Int64
  78  	eventsDropped   atomic.Int64
  79  	asyncQueueSize  int
  80  }
  81  
  82  // DispatcherConfig configures the dispatcher.
  83  type DispatcherConfig struct {
  84  	// AsyncBufferSize is the buffer size for async event delivery.
  85  	// Default: 1000
  86  	AsyncBufferSize int
  87  }
  88  
  89  // DefaultDispatcherConfig returns the default dispatcher configuration.
  90  func DefaultDispatcherConfig() DispatcherConfig {
  91  	return DispatcherConfig{
  92  		AsyncBufferSize: 1000,
  93  	}
  94  }
  95  
  96  // NewDispatcher creates a new event dispatcher.
  97  func NewDispatcher(cfg DispatcherConfig) *Dispatcher {
  98  	if cfg.AsyncBufferSize <= 0 {
  99  		cfg.AsyncBufferSize = 1000
 100  	}
 101  
 102  	ctx, cancel := context.WithCancel(context.Background())
 103  	d := &Dispatcher{
 104  		asyncChan:      make(chan DomainEvent, cfg.AsyncBufferSize),
 105  		ctx:            ctx,
 106  		cancel:         cancel,
 107  		asyncQueueSize: cfg.AsyncBufferSize,
 108  	}
 109  
 110  	// Start async processor
 111  	d.wg.Add(1)
 112  	go d.processAsync()
 113  
 114  	return d
 115  }
 116  
 117  // Subscribe adds a subscriber to receive events.
 118  func (d *Dispatcher) Subscribe(s Subscriber) {
 119  	d.mu.Lock()
 120  	defer d.mu.Unlock()
 121  	d.subscribers = append(d.subscribers, s)
 122  }
 123  
 124  // Unsubscribe removes a subscriber.
 125  func (d *Dispatcher) Unsubscribe(s Subscriber) {
 126  	d.mu.Lock()
 127  	defer d.mu.Unlock()
 128  	for i, sub := range d.subscribers {
 129  		if sub == s {
 130  			d.subscribers = append(d.subscribers[:i], d.subscribers[i+1:]...)
 131  			return
 132  		}
 133  	}
 134  }
 135  
 136  // Publish sends an event to all matching subscribers synchronously.
 137  // This blocks until all subscribers have processed the event.
 138  func (d *Dispatcher) Publish(event DomainEvent) {
 139  	d.mu.RLock()
 140  	subscribers := d.subscribers
 141  	d.mu.RUnlock()
 142  
 143  	for _, s := range subscribers {
 144  		if s.Supports(event.EventType()) {
 145  			s.Handle(event)
 146  		}
 147  	}
 148  	d.eventsPublished.Add(1)
 149  }
 150  
 151  // PublishAsync sends an event to be processed asynchronously.
 152  // This returns immediately and the event is processed in a background goroutine.
 153  // Returns true if the event was queued, false if the queue is full.
 154  func (d *Dispatcher) PublishAsync(event DomainEvent) bool {
 155  	select {
 156  	case d.asyncChan <- event:
 157  		return true
 158  	default:
 159  		d.eventsDropped.Add(1)
 160  		log.W.F("domain event dropped (queue full): %s", event.EventType())
 161  		return false
 162  	}
 163  }
 164  
 165  // processAsync handles async event delivery.
 166  func (d *Dispatcher) processAsync() {
 167  	defer d.wg.Done()
 168  
 169  	for {
 170  		select {
 171  		case <-d.ctx.Done():
 172  			// Drain remaining events before exiting
 173  			for {
 174  				select {
 175  				case event := <-d.asyncChan:
 176  					d.Publish(event)
 177  				default:
 178  					return
 179  				}
 180  			}
 181  		case event := <-d.asyncChan:
 182  			d.Publish(event)
 183  		}
 184  	}
 185  }
 186  
 187  // Stop stops the dispatcher and waits for pending events to be processed.
 188  func (d *Dispatcher) Stop() {
 189  	d.cancel()
 190  	d.wg.Wait()
 191  }
 192  
 193  // Stats returns dispatcher statistics.
 194  func (d *Dispatcher) Stats() DispatcherStats {
 195  	d.mu.RLock()
 196  	subscriberCount := len(d.subscribers)
 197  	d.mu.RUnlock()
 198  
 199  	return DispatcherStats{
 200  		EventsPublished: d.eventsPublished.Load(),
 201  		EventsDropped:   d.eventsDropped.Load(),
 202  		SubscriberCount: subscriberCount,
 203  		QueueSize:       len(d.asyncChan),
 204  		QueueCapacity:   d.asyncQueueSize,
 205  	}
 206  }
 207  
 208  // DispatcherStats contains dispatcher statistics.
 209  type DispatcherStats struct {
 210  	EventsPublished int64
 211  	EventsDropped   int64
 212  	SubscriberCount int
 213  	QueueSize       int
 214  	QueueCapacity   int
 215  }
 216  
 217  // =============================================================================
 218  // Global Dispatcher (Optional Convenience)
 219  // =============================================================================
 220  
 221  var (
 222  	globalDispatcher     *Dispatcher
 223  	globalDispatcherOnce sync.Once
 224  )
 225  
 226  // Global returns the global dispatcher instance.
 227  // Creates one with default config if not already created.
 228  func Global() *Dispatcher {
 229  	globalDispatcherOnce.Do(func() {
 230  		globalDispatcher = NewDispatcher(DefaultDispatcherConfig())
 231  	})
 232  	return globalDispatcher
 233  }
 234  
 235  // SetGlobal sets the global dispatcher instance.
 236  // This should be called early in application startup if custom config is needed.
 237  func SetGlobal(d *Dispatcher) {
 238  	globalDispatcher = d
 239  }
 240  
 241  // =============================================================================
 242  // Typed Subscription Helpers
 243  // =============================================================================
 244  
 245  // OnEventSaved subscribes to EventSaved events.
 246  func (d *Dispatcher) OnEventSaved(handler func(*EventSaved)) {
 247  	d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
 248  		if es, ok := e.(*EventSaved); ok {
 249  			handler(es)
 250  		}
 251  	}, EventSavedType))
 252  }
 253  
 254  // OnEventDeleted subscribes to EventDeleted events.
 255  func (d *Dispatcher) OnEventDeleted(handler func(*EventDeleted)) {
 256  	d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
 257  		if ed, ok := e.(*EventDeleted); ok {
 258  			handler(ed)
 259  		}
 260  	}, EventDeletedType))
 261  }
 262  
 263  // OnFollowListUpdated subscribes to FollowListUpdated events.
 264  func (d *Dispatcher) OnFollowListUpdated(handler func(*FollowListUpdated)) {
 265  	d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
 266  		if flu, ok := e.(*FollowListUpdated); ok {
 267  			handler(flu)
 268  		}
 269  	}, FollowListUpdatedType))
 270  }
 271  
 272  // OnACLMembershipChanged subscribes to ACLMembershipChanged events.
 273  func (d *Dispatcher) OnACLMembershipChanged(handler func(*ACLMembershipChanged)) {
 274  	d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
 275  		if amc, ok := e.(*ACLMembershipChanged); ok {
 276  			handler(amc)
 277  		}
 278  	}, ACLMembershipChangedType))
 279  }
 280  
 281  // OnPolicyConfigUpdated subscribes to PolicyConfigUpdated events.
 282  func (d *Dispatcher) OnPolicyConfigUpdated(handler func(*PolicyConfigUpdated)) {
 283  	d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
 284  		if pcu, ok := e.(*PolicyConfigUpdated); ok {
 285  			handler(pcu)
 286  		}
 287  	}, PolicyConfigUpdatedType))
 288  }
 289  
 290  // OnUserAuthenticated subscribes to UserAuthenticated events.
 291  func (d *Dispatcher) OnUserAuthenticated(handler func(*UserAuthenticated)) {
 292  	d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
 293  		if ua, ok := e.(*UserAuthenticated); ok {
 294  			handler(ua)
 295  		}
 296  	}, UserAuthenticatedType))
 297  }
 298  
 299  // OnMemberJoined subscribes to MemberJoined events.
 300  func (d *Dispatcher) OnMemberJoined(handler func(*MemberJoined)) {
 301  	d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
 302  		if mj, ok := e.(*MemberJoined); ok {
 303  			handler(mj)
 304  		}
 305  	}, MemberJoinedType))
 306  }
 307  
 308  // OnMemberLeft subscribes to MemberLeft events.
 309  func (d *Dispatcher) OnMemberLeft(handler func(*MemberLeft)) {
 310  	d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
 311  		if ml, ok := e.(*MemberLeft); ok {
 312  			handler(ml)
 313  		}
 314  	}, MemberLeftType))
 315  }
 316