pipeline.mx raw
1 // Package pipeline provides the event ingestion pipeline for the relay.
2 // Validates, verifies, checks ACL, rate-limits, handles special kinds,
3 // and stores events. Returns a Result suitable for OK envelope responses.
4 package pipeline
5
6 import (
7 "bytes"
8 "strconv"
9 "time"
10
11 "smesh.lol/pkg/acl"
12 "smesh.lol/pkg/nostr/event"
13 "smesh.lol/pkg/nostr/filter"
14 "smesh.lol/pkg/nostr/kind"
15 "smesh.lol/pkg/nostr/tag"
16 "smesh.lol/pkg/relay/ratelimit"
17 "smesh.lol/pkg/store"
18 )
19
20 // Result is the outcome of event ingestion, maps to the OK envelope.
21 type Result struct {
22 OK bool
23 Reason []byte
24 }
25
26 func accepted() *Result { return &Result{OK: true} }
27 func rejected(reason string) *Result { return &Result{Reason: []byte(reason)} }
28 func rejectedB(reason []byte) *Result { return &Result{Reason: reason} }
29
30 // Config holds pipeline limits.
31 type Config struct {
32 MaxFuture int64 // max seconds ahead of now (default 900)
33 MaxPast int64 // max seconds behind now (0 = unlimited)
34 MaxContent int // max content bytes (default 70000)
35 MaxTags int // max tags per event (default 2000)
36 MaxTagElem int // max bytes per tag element (default 1024)
37 }
38
39 // DefaultConfig returns sensible defaults.
40 func DefaultConfig() Config {
41 return Config{
42 MaxFuture: 900,
43 MaxContent: 70000,
44 MaxTags: 2000,
45 MaxTagElem: 1024,
46 }
47 }
48
49 // Pipeline processes incoming events through the full ingestion path.
50 type Pipeline struct {
51 store *store.Engine
52 acl acl.Checker
53 limiter *ratelimit.Limiter
54 cfg Config
55 }
56
57 // New creates a pipeline. Limiter may be nil to disable rate limiting.
58 func New(s *store.Engine, a acl.Checker, l *ratelimit.Limiter, cfg Config) *Pipeline {
59 return &Pipeline{store: s, acl: a, limiter: l, cfg: cfg}
60 }
61
62 // Ingest validates, verifies, and stores an event.
63 func (p *Pipeline) Ingest(ev *event.E) *Result {
64 // 1. Schema validation.
65 if msg := p.validate(ev); msg != nil {
66 return rejectedB(msg)
67 }
68
69 // 2. Signature verification.
70 valid, _ := ev.Verify()
71 if !valid {
72 return rejected("invalid: bad signature")
73 }
74
75 // 3. ACL.
76 if !p.acl.AllowWrite(ev.Pubkey, ev.Kind) {
77 return rejected("blocked: pubkey not allowed")
78 }
79
80 // 4. Rate limit.
81 if p.limiter != nil && !p.limiter.Allow(ev.Pubkey) {
82 return rejected("rate-limited: slow down")
83 }
84
85 // 5. Expiration check (NIP-40).
86 if isExpired(ev) {
87 return rejected("invalid: event expired")
88 }
89
90 // 6. Ephemeral — accept but don't store.
91 if kind.IsEphemeral(ev.Kind) {
92 return accepted()
93 }
94
95 // 7. Replaceable kinds.
96 if kind.IsReplaceable(ev.Kind) {
97 return p.handleReplaceable(ev)
98 }
99 if kind.IsParameterizedReplaceable(ev.Kind) {
100 return p.handleParamReplaceable(ev)
101 }
102
103 // 8. Deletion (NIP-09).
104 if ev.Kind == kind.EventDeletion.K {
105 return p.handleDeletion(ev)
106 }
107
108 // 9. Store regular event.
109 return p.saveOrDup(ev)
110 }
111
112 // --- validation ---
113
114 func (p *Pipeline) validate(ev *event.E) []byte {
115 if len(ev.ID) != 32 {
116 return []byte("invalid: id must be 32 bytes")
117 }
118 if len(ev.Pubkey) != 32 {
119 return []byte("invalid: pubkey must be 32 bytes")
120 }
121 if len(ev.Sig) != 64 {
122 return []byte("invalid: sig must be 64 bytes")
123 }
124 if ev.CreatedAt == 0 {
125 return []byte("invalid: created_at missing")
126 }
127
128 now := time.Now().Unix()
129 if p.cfg.MaxFuture > 0 && ev.CreatedAt > now+p.cfg.MaxFuture {
130 return []byte("invalid: created_at too far in future")
131 }
132 if p.cfg.MaxPast > 0 && ev.CreatedAt < now-p.cfg.MaxPast {
133 return []byte("invalid: created_at too far in past")
134 }
135
136 if p.cfg.MaxContent > 0 && len(ev.Content) > p.cfg.MaxContent {
137 return []byte("invalid: content too large")
138 }
139
140 if ev.Tags != nil {
141 if p.cfg.MaxTags > 0 && ev.Tags.Len() > p.cfg.MaxTags {
142 return []byte("invalid: too many tags")
143 }
144 if p.cfg.MaxTagElem > 0 {
145 for _, tg := range *ev.Tags {
146 for _, elem := range tg.T {
147 if len(elem) > p.cfg.MaxTagElem {
148 return []byte("invalid: tag element too large")
149 }
150 }
151 }
152 }
153 }
154
155 // Verify ID matches canonical hash.
156 if !bytes.Equal(ev.ID, ev.GetIDBytes()) {
157 return []byte("invalid: id mismatch")
158 }
159
160 return nil
161 }
162
163 // --- expiration (NIP-40) ---
164
165 func isExpired(ev *event.E) bool {
166 if ev.Tags == nil {
167 return false
168 }
169 t := ev.Tags.GetFirst([]byte("expiration"))
170 if t == nil || t.Len() < 2 {
171 return false
172 }
173 exp, err := strconv.ParseInt(string(t.Value()), 10, 64)
174 if err != nil || exp <= 0 {
175 return false
176 }
177 return time.Now().Unix() > exp
178 }
179
180 // --- replaceable events ---
181 // Tiebreaker when timestamps are equal: keep the event with the
182 // lexicographically lower ID (deterministic on SHA-256 hashes).
183 // NIP-01 does not specify tiebreaker behavior.
184
185 func (p *Pipeline) handleReplaceable(ev *event.E) *Result {
186 f := &filter.F{
187 Kinds: kind.NewS(kind.New(ev.Kind)),
188 Authors: tag.NewFromBytesSlice(ev.Pubkey),
189 }
190 limit := uint(1)
191 f.Limit = &limit
192
193 existing, err := p.store.QueryEvents(f)
194 if err == nil && len(existing) > 0 {
195 old := existing[0]
196 if old.CreatedAt > ev.CreatedAt {
197 return rejected("duplicate: newer version exists")
198 }
199 if old.CreatedAt == ev.CreatedAt && bytes.Compare(old.ID, ev.ID) < 0 {
200 return rejected("duplicate: event with lower id exists at same timestamp")
201 }
202 p.store.DeleteEvent(old.ID)
203 }
204 return p.saveOrDup(ev)
205 }
206
207 func (p *Pipeline) handleParamReplaceable(ev *event.E) *Result {
208 dVal := dTagValue(ev)
209
210 f := &filter.F{
211 Kinds: kind.NewS(kind.New(ev.Kind)),
212 Authors: tag.NewFromBytesSlice(ev.Pubkey),
213 }
214 existing, err := p.store.QueryEvents(f)
215 if err == nil {
216 for _, old := range existing {
217 if !bytes.Equal(dTagValue(old), dVal) {
218 continue
219 }
220 if old.CreatedAt > ev.CreatedAt {
221 return rejected("duplicate: newer version exists")
222 }
223 if old.CreatedAt == ev.CreatedAt && bytes.Compare(old.ID, ev.ID) < 0 {
224 return rejected("duplicate: event with lower id exists at same timestamp")
225 }
226 p.store.DeleteEvent(old.ID)
227 break
228 }
229 }
230 return p.saveOrDup(ev)
231 }
232
233 func dTagValue(ev *event.E) []byte {
234 if ev.Tags == nil {
235 return nil
236 }
237 t := ev.Tags.GetFirst([]byte("d"))
238 if t == nil || t.Len() < 2 {
239 return nil
240 }
241 return t.Value()
242 }
243
244 // --- deletion (NIP-09) ---
245
246 func (p *Pipeline) handleDeletion(ev *event.E) *Result {
247 if ev.Tags == nil {
248 return p.saveOrDup(ev)
249 }
250 eTags := ev.Tags.GetAll([]byte("e"))
251 for _, et := range eTags {
252 targetID := et.ValueBinary()
253 if targetID == nil || len(targetID) != 32 {
254 continue
255 }
256 // Only delete events owned by the same pubkey.
257 tf := &filter.F{Ids: tag.NewFromBytesSlice(targetID)}
258 targets, err := p.store.QueryEvents(tf)
259 if err != nil || len(targets) == 0 {
260 continue
261 }
262 if !bytes.Equal(targets[0].Pubkey, ev.Pubkey) {
263 continue
264 }
265 p.store.DeleteEvent(targetID)
266 }
267 return p.saveOrDup(ev)
268 }
269
270 // --- storage helper ---
271
272 func (p *Pipeline) saveOrDup(ev *event.E) *Result {
273 err := p.store.SaveEvent(ev)
274 if err == nil {
275 return accepted()
276 }
277 msg := err.Error()
278 if len(msg) >= 9 && msg[:9] == "duplicate" {
279 return rejected("duplicate: already have this event")
280 }
281 return rejectedB(append([]byte("error: "), msg...))
282 }
283