pipeline.mx raw

// Package pipeline provides the event ingestion pipeline for the relay.
// It validates events, verifies signatures, checks the ACL, applies rate
// limits, handles special kinds, and stores events. Ingest returns a Result
// suitable for OK envelope responses.
   4  package pipeline
   5  
   6  import (
   7  	"bytes"
   8  	"strconv"
   9  	"time"
  10  
  11  	"smesh.lol/pkg/acl"
  12  	"smesh.lol/pkg/nostr/event"
  13  	"smesh.lol/pkg/nostr/filter"
  14  	"smesh.lol/pkg/nostr/kind"
  15  	"smesh.lol/pkg/nostr/tag"
  16  	"smesh.lol/pkg/relay/ratelimit"
  17  	"smesh.lol/pkg/store"
  18  )
  19  
  20  // Result is the outcome of event ingestion, maps to the OK envelope.
  21  type Result struct {
  22  	OK     bool
  23  	Reason []byte
  24  }
  25  
  26  func accepted() *Result                { return &Result{OK: true} }
  27  func rejected(reason string) *Result   { return &Result{Reason: []byte(reason)} }
  28  func rejectedB(reason []byte) *Result  { return &Result{Reason: reason} }
  29  
  30  // Config holds pipeline limits.
  31  type Config struct {
  32  	MaxFuture  int64 // max seconds ahead of now (default 900)
  33  	MaxPast    int64 // max seconds behind now (0 = unlimited)
  34  	MaxContent int   // max content bytes (default 70000)
  35  	MaxTags    int   // max tags per event (default 2000)
  36  	MaxTagElem int   // max bytes per tag element (default 1024)
  37  }
  38  
  39  // DefaultConfig returns sensible defaults.
  40  func DefaultConfig() Config {
  41  	return Config{
  42  		MaxFuture:  900,
  43  		MaxContent: 70000,
  44  		MaxTags:    2000,
  45  		MaxTagElem: 1024,
  46  	}
  47  }
  48  
  49  // Pipeline processes incoming events through the full ingestion path.
  50  type Pipeline struct {
  51  	store   *store.Engine
  52  	acl     acl.Checker
  53  	limiter *ratelimit.Limiter
  54  	cfg     Config
  55  }
  56  
  57  // New creates a pipeline. Limiter may be nil to disable rate limiting.
  58  func New(s *store.Engine, a acl.Checker, l *ratelimit.Limiter, cfg Config) *Pipeline {
  59  	return &Pipeline{store: s, acl: a, limiter: l, cfg: cfg}
  60  }
  61  
  62  // Ingest validates, verifies, and stores an event.
  63  func (p *Pipeline) Ingest(ev *event.E) *Result {
  64  	// 1. Schema validation.
  65  	if msg := p.validate(ev); msg != nil {
  66  		return rejectedB(msg)
  67  	}
  68  
  69  	// 2. Signature verification.
  70  	valid, _ := ev.Verify()
  71  	if !valid {
  72  		return rejected("invalid: bad signature")
  73  	}
  74  
  75  	// 3. ACL.
  76  	if !p.acl.AllowWrite(ev.Pubkey, ev.Kind) {
  77  		return rejected("blocked: pubkey not allowed")
  78  	}
  79  
  80  	// 4. Rate limit.
  81  	if p.limiter != nil && !p.limiter.Allow(ev.Pubkey) {
  82  		return rejected("rate-limited: slow down")
  83  	}
  84  
  85  	// 5. Expiration check (NIP-40).
  86  	if isExpired(ev) {
  87  		return rejected("invalid: event expired")
  88  	}
  89  
  90  	// 6. Ephemeral — accept but don't store.
  91  	if kind.IsEphemeral(ev.Kind) {
  92  		return accepted()
  93  	}
  94  
  95  	// 7. Replaceable kinds.
  96  	if kind.IsReplaceable(ev.Kind) {
  97  		return p.handleReplaceable(ev)
  98  	}
  99  	if kind.IsParameterizedReplaceable(ev.Kind) {
 100  		return p.handleParamReplaceable(ev)
 101  	}
 102  
 103  	// 8. Deletion (NIP-09).
 104  	if ev.Kind == kind.EventDeletion.K {
 105  		return p.handleDeletion(ev)
 106  	}
 107  
 108  	// 9. Store regular event.
 109  	return p.saveOrDup(ev)
 110  }
 111  
 112  // --- validation ---
 113  
 114  func (p *Pipeline) validate(ev *event.E) []byte {
 115  	if len(ev.ID) != 32 {
 116  		return []byte("invalid: id must be 32 bytes")
 117  	}
 118  	if len(ev.Pubkey) != 32 {
 119  		return []byte("invalid: pubkey must be 32 bytes")
 120  	}
 121  	if len(ev.Sig) != 64 {
 122  		return []byte("invalid: sig must be 64 bytes")
 123  	}
 124  	if ev.CreatedAt == 0 {
 125  		return []byte("invalid: created_at missing")
 126  	}
 127  
 128  	now := time.Now().Unix()
 129  	if p.cfg.MaxFuture > 0 && ev.CreatedAt > now+p.cfg.MaxFuture {
 130  		return []byte("invalid: created_at too far in future")
 131  	}
 132  	if p.cfg.MaxPast > 0 && ev.CreatedAt < now-p.cfg.MaxPast {
 133  		return []byte("invalid: created_at too far in past")
 134  	}
 135  
 136  	if p.cfg.MaxContent > 0 && len(ev.Content) > p.cfg.MaxContent {
 137  		return []byte("invalid: content too large")
 138  	}
 139  
 140  	if ev.Tags != nil {
 141  		if p.cfg.MaxTags > 0 && ev.Tags.Len() > p.cfg.MaxTags {
 142  			return []byte("invalid: too many tags")
 143  		}
 144  		if p.cfg.MaxTagElem > 0 {
 145  			for _, tg := range *ev.Tags {
 146  				for _, elem := range tg.T {
 147  					if len(elem) > p.cfg.MaxTagElem {
 148  						return []byte("invalid: tag element too large")
 149  					}
 150  				}
 151  			}
 152  		}
 153  	}
 154  
 155  	// Verify ID matches canonical hash.
 156  	if !bytes.Equal(ev.ID, ev.GetIDBytes()) {
 157  		return []byte("invalid: id mismatch")
 158  	}
 159  
 160  	return nil
 161  }
 162  
 163  // --- expiration (NIP-40) ---
 164  
 165  func isExpired(ev *event.E) bool {
 166  	if ev.Tags == nil {
 167  		return false
 168  	}
 169  	t := ev.Tags.GetFirst([]byte("expiration"))
 170  	if t == nil || t.Len() < 2 {
 171  		return false
 172  	}
 173  	exp, err := strconv.ParseInt(string(t.Value()), 10, 64)
 174  	if err != nil || exp <= 0 {
 175  		return false
 176  	}
 177  	return time.Now().Unix() > exp
 178  }
 179  
 180  // --- replaceable events ---
 181  // Tiebreaker when timestamps are equal: keep the event with the
 182  // lexicographically lower ID (deterministic on SHA-256 hashes).
 183  // NIP-01 does not specify tiebreaker behavior.
 184  
 185  func (p *Pipeline) handleReplaceable(ev *event.E) *Result {
 186  	f := &filter.F{
 187  		Kinds:   kind.NewS(kind.New(ev.Kind)),
 188  		Authors: tag.NewFromBytesSlice(ev.Pubkey),
 189  	}
 190  	limit := uint(1)
 191  	f.Limit = &limit
 192  
 193  	existing, err := p.store.QueryEvents(f)
 194  	if err == nil && len(existing) > 0 {
 195  		old := existing[0]
 196  		if old.CreatedAt > ev.CreatedAt {
 197  			return rejected("duplicate: newer version exists")
 198  		}
 199  		if old.CreatedAt == ev.CreatedAt && bytes.Compare(old.ID, ev.ID) < 0 {
 200  			return rejected("duplicate: event with lower id exists at same timestamp")
 201  		}
 202  		p.store.DeleteEvent(old.ID)
 203  	}
 204  	return p.saveOrDup(ev)
 205  }
 206  
 207  func (p *Pipeline) handleParamReplaceable(ev *event.E) *Result {
 208  	dVal := dTagValue(ev)
 209  
 210  	f := &filter.F{
 211  		Kinds:   kind.NewS(kind.New(ev.Kind)),
 212  		Authors: tag.NewFromBytesSlice(ev.Pubkey),
 213  	}
 214  	existing, err := p.store.QueryEvents(f)
 215  	if err == nil {
 216  		for _, old := range existing {
 217  			if !bytes.Equal(dTagValue(old), dVal) {
 218  				continue
 219  			}
 220  			if old.CreatedAt > ev.CreatedAt {
 221  				return rejected("duplicate: newer version exists")
 222  			}
 223  			if old.CreatedAt == ev.CreatedAt && bytes.Compare(old.ID, ev.ID) < 0 {
 224  				return rejected("duplicate: event with lower id exists at same timestamp")
 225  			}
 226  			p.store.DeleteEvent(old.ID)
 227  			break
 228  		}
 229  	}
 230  	return p.saveOrDup(ev)
 231  }
 232  
 233  func dTagValue(ev *event.E) []byte {
 234  	if ev.Tags == nil {
 235  		return nil
 236  	}
 237  	t := ev.Tags.GetFirst([]byte("d"))
 238  	if t == nil || t.Len() < 2 {
 239  		return nil
 240  	}
 241  	return t.Value()
 242  }
 243  
 244  // --- deletion (NIP-09) ---
 245  
 246  func (p *Pipeline) handleDeletion(ev *event.E) *Result {
 247  	if ev.Tags == nil {
 248  		return p.saveOrDup(ev)
 249  	}
 250  	eTags := ev.Tags.GetAll([]byte("e"))
 251  	for _, et := range eTags {
 252  		targetID := et.ValueBinary()
 253  		if targetID == nil || len(targetID) != 32 {
 254  			continue
 255  		}
 256  		// Only delete events owned by the same pubkey.
 257  		tf := &filter.F{Ids: tag.NewFromBytesSlice(targetID)}
 258  		targets, err := p.store.QueryEvents(tf)
 259  		if err != nil || len(targets) == 0 {
 260  			continue
 261  		}
 262  		if !bytes.Equal(targets[0].Pubkey, ev.Pubkey) {
 263  			continue
 264  		}
 265  		p.store.DeleteEvent(targetID)
 266  	}
 267  	return p.saveOrDup(ev)
 268  }
 269  
 270  // --- storage helper ---
 271  
 272  func (p *Pipeline) saveOrDup(ev *event.E) *Result {
 273  	err := p.store.SaveEvent(ev)
 274  	if err == nil {
 275  		return accepted()
 276  	}
 277  	msg := err.Error()
 278  	if len(msg) >= 9 && msg[:9] == "duplicate" {
 279  		return rejected("duplicate: already have this event")
 280  	}
 281  	return rejectedB(append([]byte("error: "), msg...))
 282  }
 283