service.go raw

   1  // Package ingestion provides a service for orchestrating the event processing pipeline.
   2  // It coordinates validation, special kind handling, authorization, routing, and processing.
   3  package ingestion
   4  
   5  import (
   6  	"context"
   7  	"fmt"
   8  
   9  	"next.orly.dev/pkg/nostr/encoders/event"
  10  	"next.orly.dev/pkg/event/authorization"
  11  	"next.orly.dev/pkg/event/processing"
  12  	"next.orly.dev/pkg/event/routing"
  13  	"next.orly.dev/pkg/event/specialkinds"
  14  	"next.orly.dev/pkg/event/validation"
  15  )
  16  
// SprocketChecker checks events against the sprocket (external filter).
//
// Ingest consults the checker only when it is non-nil and IsEnabled reports
// true. In that case events are rejected while IsDisabled reports true or
// IsRunning reports false; otherwise the event is handed to ProcessEvent.
type SprocketChecker interface {
	// IsEnabled reports whether the sprocket stage should run at all.
	IsEnabled() bool
	// IsDisabled reports whether the sprocket is disabled; while it is,
	// Ingest rejects every event.
	IsDisabled() bool
	// IsRunning reports whether the sprocket is running; while it is not,
	// Ingest rejects every event.
	IsRunning() bool
	// ProcessEvent submits the event to the sprocket and returns its verdict.
	// A non-nil error makes Ingest return an errored Result.
	ProcessEvent(*event.E) (*SprocketResponse, error)
}
  24  
// SprocketResponse is the response from sprocket processing.
//
// Ingest acts on Action as follows: "accept" continues the pipeline,
// "reject" rejects the event with Msg, and "shadowReject" reports the event
// as accepted without saving it. Any other value continues the pipeline.
type SprocketResponse struct {
	Action string // "accept", "reject", "shadowReject"
	Msg    string // rejection message used when Action is "reject"
}
  30  
// ConnectionContext provides connection-specific data for event processing.
//
// Ingest copies all three fields into the special kind handler context,
// passes AuthedPubkey and Remote to the authorizer, and passes AuthedPubkey
// to protected tag validation and to the router.
type ConnectionContext struct {
	// AuthedPubkey is the authenticated pubkey for this connection (may be nil).
	AuthedPubkey []byte

	// Remote is the remote address of the connection.
	Remote string

	// ConnectionID uniquely identifies this connection.
	ConnectionID string
}
  42  
// Result represents the outcome of event ingestion.
//
// Values are normally built with the Accepted, AcceptedNotSaved, Rejected,
// AuthRequired and Errored constructors rather than by hand.
type Result struct {
	// Accepted indicates the event was accepted for processing.
	Accepted bool

	// Saved indicates the event was saved to the database.
	// It is false for results built with AcceptedNotSaved.
	Saved bool

	// Message is an optional message for the OK response.
	Message string

	// RequireAuth indicates authentication is required.
	// It is set only by AuthRequired, together with Accepted == false.
	RequireAuth bool

	// Error contains any error that occurred.
	// It is set only by Errored, together with Accepted == false.
	Error error
}
  60  
  61  // Accepted returns a successful Result.
  62  func Accepted(message string) Result {
  63  	return Result{Accepted: true, Saved: true, Message: message}
  64  }
  65  
  66  // AcceptedNotSaved returns a Result where the event was accepted but not saved.
  67  func AcceptedNotSaved(message string) Result {
  68  	return Result{Accepted: true, Saved: false, Message: message}
  69  }
  70  
  71  // Rejected returns a rejection Result.
  72  func Rejected(message string) Result {
  73  	return Result{Accepted: false, Message: message}
  74  }
  75  
  76  // AuthRequired returns a Result indicating authentication is required.
  77  func AuthRequired(message string) Result {
  78  	return Result{Accepted: false, RequireAuth: true, Message: message}
  79  }
  80  
  81  // Errored returns a Result with an error.
  82  func Errored(err error) Result {
  83  	return Result{Accepted: false, Error: err}
  84  }
  85  
// Config configures the ingestion service.
// Every field is optional; a nil value disables the corresponding stage of
// Ingest.
type Config struct {
	// SprocketChecker is the optional sprocket checker.
	// When nil, the sprocket stage is skipped.
	SprocketChecker SprocketChecker

	// SpecialKinds is the registry for special kind handlers.
	// When nil, special kind handling is skipped.
	SpecialKinds *specialkinds.Registry

	// ACLMode is the current ACL mode (used for NIP-70 validation).
	// Protected tag validation runs only when ACLMode is non-nil and returns
	// a value other than "none".
	ACLMode func() string

	// DeleteHandler handles deletion events.
	// When nil, delete post-processing is skipped.
	DeleteHandler DeleteHandler
}
 100  
// DeleteHandler processes delete events (kind 5).
type DeleteHandler interface {
	// HandleDelete processes a delete event after it's been saved.
	// Ingest calls it only once processing of the event has succeeded, and
	// an error returned here does not change the Result of Ingest.
	HandleDelete(ctx context.Context, ev *event.E) error
}
 106  
// Service orchestrates the event ingestion pipeline.
// Construct it with NewService; the last four fields are optional and come
// from Config.
type Service struct {
	validator     *validation.Service    // event, raw JSON and protected tag validation
	authorizer    *authorization.Service // authorization decision for each event
	router        *routing.DefaultRouter // routing stage run before processing
	processor     *processing.Service    // final processing stage
	sprocket      SprocketChecker        // optional; nil skips the sprocket stage
	specialKinds  *specialkinds.Registry // optional; nil skips special kind handling
	aclMode       func() string          // optional; nil skips protected tag validation
	deleteHandler DeleteHandler          // optional; nil skips delete post-processing
}
 118  
 119  // NewService creates a new ingestion service.
 120  func NewService(
 121  	validator *validation.Service,
 122  	authorizer *authorization.Service,
 123  	router *routing.DefaultRouter,
 124  	processor *processing.Service,
 125  	cfg Config,
 126  ) *Service {
 127  	return &Service{
 128  		validator:     validator,
 129  		authorizer:    authorizer,
 130  		router:        router,
 131  		processor:     processor,
 132  		sprocket:      cfg.SprocketChecker,
 133  		specialKinds:  cfg.SpecialKinds,
 134  		aclMode:       cfg.ACLMode,
 135  		deleteHandler: cfg.DeleteHandler,
 136  	}
 137  }
 138  
 139  // Ingest processes an event through the full ingestion pipeline.
 140  // Returns a Result indicating the outcome.
 141  func (s *Service) Ingest(ctx context.Context, ev *event.E, connCtx *ConnectionContext) Result {
 142  	// Stage 1: Event validation (ID, timestamp, signature)
 143  	if result := s.validator.ValidateEvent(ev); !result.Valid {
 144  		return Rejected(result.Msg)
 145  	}
 146  
 147  	// Stage 2: Sprocket check (if enabled)
 148  	if s.sprocket != nil && s.sprocket.IsEnabled() {
 149  		if s.sprocket.IsDisabled() {
 150  			return Rejected("sprocket disabled - events rejected until sprocket is restored")
 151  		}
 152  		if !s.sprocket.IsRunning() {
 153  			return Rejected("sprocket not running - events rejected until sprocket starts")
 154  		}
 155  
 156  		response, err := s.sprocket.ProcessEvent(ev)
 157  		if err != nil {
 158  			return Errored(fmt.Errorf("sprocket processing failed: %w", err))
 159  		}
 160  
 161  		switch response.Action {
 162  		case "accept":
 163  			// Continue processing
 164  		case "reject":
 165  			return Rejected(response.Msg)
 166  		case "shadowReject":
 167  			// Accept but don't save
 168  			return AcceptedNotSaved("")
 169  		}
 170  	}
 171  
 172  	// Stage 3: Special kind handling
 173  	if s.specialKinds != nil {
 174  		hctx := &specialkinds.HandlerContext{
 175  			AuthedPubkey: connCtx.AuthedPubkey,
 176  			Remote:       connCtx.Remote,
 177  			ConnectionID: connCtx.ConnectionID,
 178  		}
 179  		if result, handled := s.specialKinds.TryHandle(ctx, ev, hctx); handled {
 180  			if result.Error != nil {
 181  				return Errored(result.Error)
 182  			}
 183  			if result.Handled {
 184  				if result.SaveEvent {
 185  					// Handler wants the event saved
 186  					procResult := s.processor.Process(ctx, ev)
 187  					if procResult.Error != nil {
 188  						return Errored(procResult.Error)
 189  					}
 190  					return Accepted(result.Message)
 191  				}
 192  				return AcceptedNotSaved(result.Message)
 193  			}
 194  			// result.Continue - fall through to normal processing
 195  		}
 196  	}
 197  
 198  	// Stage 4: Authorization check
 199  	decision := s.authorizer.Authorize(ev, connCtx.AuthedPubkey, connCtx.Remote, ev.Kind)
 200  	if !decision.Allowed {
 201  		if decision.RequireAuth {
 202  			return AuthRequired(decision.DenyReason)
 203  		}
 204  		return Rejected(decision.DenyReason)
 205  	}
 206  
 207  	// Stage 5: NIP-70 protected tag validation (only when ACL is active)
 208  	if s.aclMode != nil && s.aclMode() != "none" {
 209  		if result := s.validator.ValidateProtectedTag(ev, connCtx.AuthedPubkey); !result.Valid {
 210  			return Rejected(result.Msg)
 211  		}
 212  	}
 213  
 214  	// Stage 6: Routing (ephemeral events, etc.)
 215  	if routeResult := s.router.Route(ev, connCtx.AuthedPubkey); routeResult.Action != routing.Continue {
 216  		if routeResult.Action == routing.Handled {
 217  			return AcceptedNotSaved(routeResult.Message)
 218  		}
 219  		if routeResult.Action == routing.Error {
 220  			return Errored(fmt.Errorf("routing error: %s", routeResult.Message))
 221  		}
 222  	}
 223  
 224  	// Stage 7: Processing (save, hooks, delivery)
 225  	procResult := s.processor.Process(ctx, ev)
 226  	if procResult.Blocked {
 227  		return Rejected(procResult.BlockMsg)
 228  	}
 229  	if procResult.Error != nil {
 230  		return Errored(procResult.Error)
 231  	}
 232  
 233  	// Stage 8: Delete event post-processing
 234  	const kindEventDeletion = 5
 235  	if ev.Kind == kindEventDeletion && s.deleteHandler != nil {
 236  		if err := s.deleteHandler.HandleDelete(ctx, ev); err != nil {
 237  			// Log but don't fail - the delete event is already saved
 238  			// Return success since the event was stored
 239  		}
 240  	}
 241  
 242  	return Accepted("")
 243  }
 244  
 245  // IngestWithRawValidation includes raw JSON validation before unmarshaling.
 246  // Use this when the event hasn't been validated yet.
 247  func (s *Service) IngestWithRawValidation(ctx context.Context, rawJSON []byte, ev *event.E, connCtx *ConnectionContext) Result {
 248  	// Stage 0: Raw JSON validation
 249  	if result := s.validator.ValidateRawJSON(rawJSON); !result.Valid {
 250  		return Rejected(result.Msg)
 251  	}
 252  
 253  	return s.Ingest(ctx, ev, connCtx)
 254  }
 255