// Package pipeline provides the event ingestion pipeline for the relay. // Validates, verifies, checks ACL, rate-limits, handles special kinds, // and stores events. Returns a Result suitable for OK envelope responses. package pipeline import ( "bytes" "strconv" "time" "smesh.lol/pkg/acl" "smesh.lol/pkg/nostr/event" "smesh.lol/pkg/nostr/filter" "smesh.lol/pkg/nostr/kind" "smesh.lol/pkg/nostr/tag" "smesh.lol/pkg/relay/ratelimit" "smesh.lol/pkg/store" ) // Result is the outcome of event ingestion, maps to the OK envelope. type Result struct { OK bool Reason []byte } func accepted() *Result { return &Result{OK: true} } func rejected(reason string) *Result { return &Result{Reason: []byte(reason)} } func rejectedB(reason []byte) *Result { return &Result{Reason: reason} } // Config holds pipeline limits. type Config struct { MaxFuture int64 // max seconds ahead of now (default 900) MaxPast int64 // max seconds behind now (0 = unlimited) MaxContent int // max content bytes (default 70000) MaxTags int // max tags per event (default 2000) MaxTagElem int // max bytes per tag element (default 1024) } // DefaultConfig returns sensible defaults. func DefaultConfig() Config { return Config{ MaxFuture: 900, MaxContent: 70000, MaxTags: 2000, MaxTagElem: 1024, } } // Pipeline processes incoming events through the full ingestion path. type Pipeline struct { store *store.Engine acl acl.Checker limiter *ratelimit.Limiter cfg Config } // New creates a pipeline. Limiter may be nil to disable rate limiting. func New(s *store.Engine, a acl.Checker, l *ratelimit.Limiter, cfg Config) *Pipeline { return &Pipeline{store: s, acl: a, limiter: l, cfg: cfg} } // Ingest validates, verifies, and stores an event. func (p *Pipeline) Ingest(ev *event.E) *Result { // 1. Schema validation. if msg := p.validate(ev); msg != nil { return rejectedB(msg) } // 2. Signature verification. valid, _ := ev.Verify() if !valid { return rejected("invalid: bad signature") } // 3. ACL. if !p.acl.AllowWrite(ev.Pubkey, ev.Kind) { return rejected("blocked: pubkey not allowed") } // 4. Rate limit. if p.limiter != nil && !p.limiter.Allow(ev.Pubkey) { return rejected("rate-limited: slow down") } // 5. Expiration check (NIP-40). if isExpired(ev) { return rejected("invalid: event expired") } // 6. Ephemeral — accept but don't store. if kind.IsEphemeral(ev.Kind) { return accepted() } // 7. Replaceable kinds. if kind.IsReplaceable(ev.Kind) { return p.handleReplaceable(ev) } if kind.IsParameterizedReplaceable(ev.Kind) { return p.handleParamReplaceable(ev) } // 8. Deletion (NIP-09). if ev.Kind == kind.EventDeletion.K { return p.handleDeletion(ev) } // 9. Store regular event. return p.saveOrDup(ev) } // --- validation --- func (p *Pipeline) validate(ev *event.E) []byte { if len(ev.ID) != 32 { return []byte("invalid: id must be 32 bytes") } if len(ev.Pubkey) != 32 { return []byte("invalid: pubkey must be 32 bytes") } if len(ev.Sig) != 64 { return []byte("invalid: sig must be 64 bytes") } if ev.CreatedAt == 0 { return []byte("invalid: created_at missing") } now := time.Now().Unix() if p.cfg.MaxFuture > 0 && ev.CreatedAt > now+p.cfg.MaxFuture { return []byte("invalid: created_at too far in future") } if p.cfg.MaxPast > 0 && ev.CreatedAt < now-p.cfg.MaxPast { return []byte("invalid: created_at too far in past") } if p.cfg.MaxContent > 0 && len(ev.Content) > p.cfg.MaxContent { return []byte("invalid: content too large") } if ev.Tags != nil { if p.cfg.MaxTags > 0 && ev.Tags.Len() > p.cfg.MaxTags { return []byte("invalid: too many tags") } if p.cfg.MaxTagElem > 0 { for _, tg := range *ev.Tags { for _, elem := range tg.T { if len(elem) > p.cfg.MaxTagElem { return []byte("invalid: tag element too large") } } } } } // Verify ID matches canonical hash. if !bytes.Equal(ev.ID, ev.GetIDBytes()) { return []byte("invalid: id mismatch") } return nil } // --- expiration (NIP-40) --- func isExpired(ev *event.E) bool { if ev.Tags == nil { return false } t := ev.Tags.GetFirst([]byte("expiration")) if t == nil || t.Len() < 2 { return false } exp, err := strconv.ParseInt(string(t.Value()), 10, 64) if err != nil || exp <= 0 { return false } return time.Now().Unix() > exp } // --- replaceable events --- // Tiebreaker when timestamps are equal: keep the event with the // lexicographically lower ID (deterministic on SHA-256 hashes). // NIP-01 does not specify tiebreaker behavior. func (p *Pipeline) handleReplaceable(ev *event.E) *Result { f := &filter.F{ Kinds: kind.NewS(kind.New(ev.Kind)), Authors: tag.NewFromBytesSlice(ev.Pubkey), } limit := uint(1) f.Limit = &limit existing, err := p.store.QueryEvents(f) if err == nil && len(existing) > 0 { old := existing[0] if old.CreatedAt > ev.CreatedAt { return rejected("duplicate: newer version exists") } if old.CreatedAt == ev.CreatedAt && bytes.Compare(old.ID, ev.ID) < 0 { return rejected("duplicate: event with lower id exists at same timestamp") } p.store.DeleteEvent(old.ID) } return p.saveOrDup(ev) } func (p *Pipeline) handleParamReplaceable(ev *event.E) *Result { dVal := dTagValue(ev) f := &filter.F{ Kinds: kind.NewS(kind.New(ev.Kind)), Authors: tag.NewFromBytesSlice(ev.Pubkey), } existing, err := p.store.QueryEvents(f) if err == nil { for _, old := range existing { if !bytes.Equal(dTagValue(old), dVal) { continue } if old.CreatedAt > ev.CreatedAt { return rejected("duplicate: newer version exists") } if old.CreatedAt == ev.CreatedAt && bytes.Compare(old.ID, ev.ID) < 0 { return rejected("duplicate: event with lower id exists at same timestamp") } p.store.DeleteEvent(old.ID) break } } return p.saveOrDup(ev) } func dTagValue(ev *event.E) []byte { if ev.Tags == nil { return nil } t := ev.Tags.GetFirst([]byte("d")) if t == nil || t.Len() < 2 { return nil } return t.Value() } // --- deletion (NIP-09) --- func (p *Pipeline) handleDeletion(ev *event.E) *Result { if ev.Tags == nil { return p.saveOrDup(ev) } eTags := ev.Tags.GetAll([]byte("e")) for _, et := range eTags { targetID := et.ValueBinary() if targetID == nil || len(targetID) != 32 { continue } // Only delete events owned by the same pubkey. tf := &filter.F{Ids: tag.NewFromBytesSlice(targetID)} targets, err := p.store.QueryEvents(tf) if err != nil || len(targets) == 0 { continue } if !bytes.Equal(targets[0].Pubkey, ev.Pubkey) { continue } p.store.DeleteEvent(targetID) } return p.saveOrDup(ev) } // --- storage helper --- func (p *Pipeline) saveOrDup(ev *event.E) *Result { err := p.store.SaveEvent(ev) if err == nil { return accepted() } msg := err.Error() if len(msg) >= 9 && msg[:9] == "duplicate" { return rejected("duplicate: already have this event") } return rejectedB(append([]byte("error: "), msg...)) }