processing.go raw

   1  // Package processing provides event processing services for the ORLY relay.
   2  // It handles event persistence, delivery to subscribers, and post-save hooks.
   3  package processing
   4  
import (
	"bytes"
	"context"
	"strings"
	"time"

	"next.orly.dev/pkg/nostr/encoders/event"
	"next.orly.dev/pkg/nostr/encoders/kind"

	"next.orly.dev/pkg/domain/events"
)
  15  
// Result contains the outcome of event processing.
//
// The constructors in this file (OK, Blocked, Failed) each populate one
// outcome: Saved, Blocked+BlockMsg, or Error respectively. Duplicate is
// declared but is not assigned anywhere in the code visible in this file.
type Result struct {
	Saved     bool   // true when the event was persisted (set by OK)
	Duplicate bool   // not set by any code in this file
	Blocked   bool   // true when the event was refused (set by Blocked)
	BlockMsg  string // reason text accompanying Blocked
	Error     error  // underlying failure (set by Failed)
}
  24  
  25  // OK returns a successful processing result.
  26  func OK() Result {
  27  	return Result{Saved: true}
  28  }
  29  
  30  // Blocked returns a blocked processing result.
  31  func Blocked(msg string) Result {
  32  	return Result{Blocked: true, BlockMsg: msg}
  33  }
  34  
  35  // Failed returns an error processing result.
  36  func Failed(err error) Result {
  37  	return Result{Error: err}
  38  }
  39  
// Database abstracts the database operations needed for event processing.
//
// Callers in this file treat an error whose text starts with "blocked:" as a
// policy rejection (reported via Blocked) rather than a failure.
type Database interface {
	// SaveEvent saves an event to the database. The exists result is not
	// inspected by the code in this file.
	SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error)
	// CheckForDeleted checks if an event has been deleted. Process passes the
	// configured admin pubkeys followed by the owner pubkeys as adminOwners.
	CheckForDeleted(ev *event.E, adminOwners [][]byte) error
}
  47  
// Publisher abstracts event delivery to subscribers.
type Publisher interface {
	// Deliver sends an event to all matching subscribers. The Service calls
	// it on a separate goroutine with a clone of the saved event, so it does
	// not run on the caller's goroutine.
	Deliver(ev *event.E)
}
  53  
// RateLimiter abstracts rate limiting for write operations.
type RateLimiter interface {
	// IsEnabled returns whether rate limiting is enabled. When it reports
	// false the Service does not call Wait.
	IsEnabled() bool
	// Wait blocks until the rate limit allows the operation. The Service
	// passes opType 1 for writes (annotated at the call site as
	// ratelimit.Write) together with the write-timeout context.
	Wait(ctx context.Context, opType int) error
}
  61  
// SyncManager abstracts sync manager for serial updates.
//
// Besides receiving UpdateSerial calls, the configured SyncManager value is
// also handed as-is to RelayGroupManager.HandleRelayGroupEvent.
type SyncManager interface {
	// UpdateSerial updates the serial number after saving an event. It is
	// called once per saved event from the post-save hooks.
	UpdateSerial()
}
  67  
// ACLRegistry abstracts ACL registry for reconfiguration.
type ACLRegistry interface {
	// Configure reconfigures the ACL system. The Service calls it with no
	// arguments, on its own goroutine, after an admin or owner saves a
	// follow-list or relay-list-metadata event; the returned error is not
	// observed there.
	Configure(cfg ...any) error
	// Active returns the active ACL mode. Process compares it against the
	// literal "none" to decide whether the deleted-event check runs.
	Active() string
}
  75  
// RelayGroupManager handles relay group configuration events.
type RelayGroupManager interface {
	// ValidateRelayGroupEvent validates a relay group config event. The
	// Service calls it for every saved event and only proceeds to
	// HandleRelayGroupEvent when it returns nil.
	ValidateRelayGroupEvent(ev *event.E) error
	// HandleRelayGroupEvent processes a relay group event. syncMgr receives
	// the Service's configured SyncManager; the call is skipped when no
	// SyncManager is set.
	HandleRelayGroupEvent(ev *event.E, syncMgr any)
}
  83  
// ClusterManager handles cluster membership events.
type ClusterManager interface {
	// HandleMembershipEvent processes a cluster membership event. The Service
	// invokes it only for events of kind 39108 and does not inspect the
	// returned error.
	HandleMembershipEvent(ev *event.E) error
}
  89  
// DomainEventDispatcher abstracts domain event publishing.
type DomainEventDispatcher interface {
	// PublishAsync queues an event for asynchronous processing. The meaning
	// of the bool result is not visible in this file (presumably whether the
	// event was accepted — confirm against the implementation); the Service
	// ignores it.
	PublishAsync(event events.DomainEvent) bool
}
  95  
// Config holds configuration for the processing service.
//
// Admins and Owners are compared byte-for-byte against an event's Pubkey to
// classify its author, and are passed together to Database.CheckForDeleted.
type Config struct {
	Admins       [][]byte      // admin pubkeys
	Owners       [][]byte      // owner pubkeys
	WriteTimeout time.Duration // deadline applied to the rate-limit wait and database save
}
 102  
 103  // DefaultConfig returns the default processing configuration.
 104  func DefaultConfig() *Config {
 105  	return &Config{
 106  		WriteTimeout: 30 * time.Second,
 107  	}
 108  }
 109  
// Service implements event processing.
//
// cfg, db and publisher are supplied through New. The remaining collaborators
// are optional and installed through the Set* methods; each is nil-checked
// before use, so the corresponding step is skipped while it is unset:
//   - rateLimiter: throttles writes before the database save.
//   - syncManager: serial updates after a save; also passed to relayGroupMgr.
//   - aclRegistry: gates the deleted-event check and is reconfigured after
//     admin/owner follow-list or relay-list events.
//   - relayGroupMgr: validates and handles relay group config events.
//   - clusterManager: handles kind 39108 membership events.
//   - eventDispatcher: receives a domain event for every saved event.
type Service struct {
	cfg             *Config
	db              Database
	publisher       Publisher
	rateLimiter     RateLimiter
	syncManager     SyncManager
	aclRegistry     ACLRegistry
	relayGroupMgr   RelayGroupManager
	clusterManager  ClusterManager
	eventDispatcher DomainEventDispatcher
}
 122  
 123  // New creates a new processing service.
 124  func New(cfg *Config, db Database, publisher Publisher) *Service {
 125  	if cfg == nil {
 126  		cfg = DefaultConfig()
 127  	}
 128  	return &Service{
 129  		cfg:       cfg,
 130  		db:        db,
 131  		publisher: publisher,
 132  	}
 133  }
 134  
// SetRateLimiter sets the rate limiter consulted before each database write.
// While it is nil no rate limiting is applied. The field is assigned directly
// with no locking.
func (s *Service) SetRateLimiter(rl RateLimiter) {
	s.rateLimiter = rl
}
 139  
// SetSyncManager sets the sync manager whose serial is updated after each
// save and which is handed to the relay group manager. While it is nil both
// of those post-save steps are skipped. The field is assigned directly with
// no locking.
func (s *Service) SetSyncManager(sm SyncManager) {
	s.syncManager = sm
}
 144  
// SetACLRegistry sets the ACL registry. While it is nil, Process skips the
// deleted-event check and the post-save hooks skip ACL reconfiguration. The
// field is assigned directly with no locking.
func (s *Service) SetACLRegistry(acl ACLRegistry) {
	s.aclRegistry = acl
}
 149  
// SetRelayGroupManager sets the relay group manager used by the post-save
// hooks. While it is nil, relay group validation and handling are skipped.
// The field is assigned directly with no locking.
func (s *Service) SetRelayGroupManager(rgm RelayGroupManager) {
	s.relayGroupMgr = rgm
}
 154  
// SetClusterManager sets the cluster manager that receives kind 39108
// membership events after they are saved. While it is nil those events get no
// special handling. The field is assigned directly with no locking.
func (s *Service) SetClusterManager(cm ClusterManager) {
	s.clusterManager = cm
}
 159  
// SetEventDispatcher sets the domain event dispatcher that is notified of
// every saved event. While it is nil no domain event is published. The field
// is assigned directly with no locking.
func (s *Service) SetEventDispatcher(d DomainEventDispatcher) {
	s.eventDispatcher = d
}
 164  
 165  // Process saves an event and triggers delivery.
 166  func (s *Service) Process(ctx context.Context, ev *event.E) Result {
 167  	// Check if event was previously deleted (skip for "none" ACL mode and delete events)
 168  	// Delete events (kind 5) shouldn't be blocked by existing deletes
 169  	if ev.Kind != kind.EventDeletion.K && s.aclRegistry != nil && s.aclRegistry.Active() != "none" {
 170  		adminOwners := append(s.cfg.Admins, s.cfg.Owners...)
 171  		if err := s.db.CheckForDeleted(ev, adminOwners); err != nil {
 172  			if strings.HasPrefix(err.Error(), "blocked:") {
 173  				errStr := err.Error()[len("blocked: "):]
 174  				return Blocked(errStr)
 175  			}
 176  		}
 177  	}
 178  
 179  	// Save the event
 180  	result := s.saveEvent(ctx, ev)
 181  	if !result.Saved {
 182  		return result
 183  	}
 184  
 185  	// Run post-save hooks
 186  	s.runPostSaveHooks(ev)
 187  
 188  	// Deliver the event to subscribers
 189  	s.deliver(ev)
 190  
 191  	return OK()
 192  }
 193  
 194  // saveEvent handles rate limiting and database persistence.
 195  func (s *Service) saveEvent(ctx context.Context, ev *event.E) Result {
 196  	// Create timeout context
 197  	saveCtx, cancel := context.WithTimeout(ctx, s.cfg.WriteTimeout)
 198  	defer cancel()
 199  
 200  	// Apply rate limiting (skip for NIP-46 bunker events which need realtime priority)
 201  	const kindNIP46 = 24133
 202  	if s.rateLimiter != nil && s.rateLimiter.IsEnabled() && ev.Kind != uint16(kindNIP46) {
 203  		const writeOpType = 1 // ratelimit.Write
 204  		s.rateLimiter.Wait(saveCtx, writeOpType)
 205  	}
 206  
 207  	// Save to database
 208  	_, err := s.db.SaveEvent(saveCtx, ev)
 209  	if err != nil {
 210  		if strings.HasPrefix(err.Error(), "blocked:") {
 211  			errStr := err.Error()[len("blocked: "):]
 212  			return Blocked(errStr)
 213  		}
 214  		return Failed(err)
 215  	}
 216  
 217  	return OK()
 218  }
 219  
 220  // deliver sends event to subscribers.
 221  func (s *Service) deliver(ev *event.E) {
 222  	cloned := ev.Clone()
 223  	go s.publisher.Deliver(cloned)
 224  }
 225  
 226  // runPostSaveHooks handles side effects after event persistence.
 227  func (s *Service) runPostSaveHooks(ev *event.E) {
 228  	isAdmin := s.isAdminPubkey(ev.Pubkey)
 229  	isOwner := s.isOwnerPubkey(ev.Pubkey)
 230  
 231  	// Dispatch domain event for saved event
 232  	if s.eventDispatcher != nil {
 233  		s.eventDispatcher.PublishAsync(events.NewEventSaved(ev, 0, isAdmin, isOwner))
 234  	}
 235  
 236  	// Handle relay group configuration events
 237  	if s.relayGroupMgr != nil {
 238  		if err := s.relayGroupMgr.ValidateRelayGroupEvent(ev); err == nil {
 239  			if s.syncManager != nil {
 240  				s.relayGroupMgr.HandleRelayGroupEvent(ev, s.syncManager)
 241  			}
 242  		}
 243  	}
 244  
 245  	// Handle cluster membership events (Kind 39108)
 246  	if ev.Kind == 39108 && s.clusterManager != nil {
 247  		s.clusterManager.HandleMembershipEvent(ev)
 248  	}
 249  
 250  	// Update serial for distributed synchronization
 251  	if s.syncManager != nil {
 252  		s.syncManager.UpdateSerial()
 253  	}
 254  
 255  	// ACL reconfiguration for admin events
 256  	if isAdmin || isOwner {
 257  		if ev.Kind == kind.FollowList.K || ev.Kind == kind.RelayListMetadata.K {
 258  			if s.aclRegistry != nil {
 259  				go s.aclRegistry.Configure()
 260  			}
 261  		}
 262  	}
 263  }
 264  
 265  // isAdminPubkey checks if pubkey is an admin.
 266  func (s *Service) isAdminPubkey(pubkey []byte) bool {
 267  	for _, admin := range s.cfg.Admins {
 268  		if fastEqual(admin, pubkey) {
 269  			return true
 270  		}
 271  	}
 272  	return false
 273  }
 274  
 275  // isOwnerPubkey checks if pubkey is an owner.
 276  func (s *Service) isOwnerPubkey(pubkey []byte) bool {
 277  	for _, owner := range s.cfg.Owners {
 278  		if fastEqual(owner, pubkey) {
 279  			return true
 280  		}
 281  	}
 282  	return false
 283  }
 284  
 285  // fastEqual compares two byte slices for equality.
 286  func fastEqual(a, b []byte) bool {
 287  	if len(a) != len(b) {
 288  		return false
 289  	}
 290  	for i := range a {
 291  		if a[i] != b[i] {
 292  			return false
 293  		}
 294  	}
 295  	return true
 296  }
 297