save-event.go raw
1 //go:build !(js && wasm)
2
3 package database
4
5 import (
6 "context"
7 "errors"
8 "fmt"
9 "strings"
10
11 "github.com/dgraph-io/badger/v4"
12 "next.orly.dev/pkg/lol/chk"
13 "next.orly.dev/pkg/lol/log"
14 "next.orly.dev/pkg/database/bufpool"
15 "next.orly.dev/pkg/database/indexes"
16 "next.orly.dev/pkg/database/indexes/types"
17 "next.orly.dev/pkg/mode"
18 "next.orly.dev/pkg/nostr/encoders/event"
19 "next.orly.dev/pkg/nostr/encoders/filter"
20 "next.orly.dev/pkg/nostr/encoders/hex"
21 "next.orly.dev/pkg/nostr/encoders/kind"
22 "next.orly.dev/pkg/nostr/encoders/tag"
23 )
24
// Sentinel errors reported by the replaceable-event checks in this file.
// Callers are expected to match them with errors.Is.
var (
	// ErrMissingDTag signals that a parameterized replaceable event does not
	// carry the 'd' tag that identifies it.
	ErrMissingDTag = errors.New("event is missing a d tag identifier")
	// ErrOlderThanExisting signals that the candidate event has an earlier
	// timestamp than a replaceable/addressable event that is already stored.
	ErrOlderThanExisting = errors.New("older than existing event")
)
31
32 func (d *D) GetSerialsFromFilter(f *filter.F) (
33 sers types.Uint40s, err error,
34 ) {
35 // Try p-tag graph optimization first
36 if CanUsePTagGraph(f) {
37 log.D.F("GetSerialsFromFilter: trying p-tag graph optimization")
38 if sers, err = d.QueryPTagGraph(f); err == nil && len(sers) >= 0 {
39 log.D.F("GetSerialsFromFilter: p-tag graph optimization returned %d serials", len(sers))
40 return
41 }
42 // Fall through to traditional indexes on error
43 log.D.F("GetSerialsFromFilter: p-tag graph optimization failed, falling back to traditional indexes: %v", err)
44 err = nil
45 }
46
47 var idxs []Range
48 if idxs, err = GetIndexesFromFilter(f); chk.E(err) {
49 return
50 }
51 // Pre-allocate slice with estimated capacity to reduce reallocations
52 sers = make(
53 types.Uint40s, 0, len(idxs)*100,
54 ) // Estimate 100 serials per index
55 for _, idx := range idxs {
56 var s types.Uint40s
57 if s, err = d.GetSerialsByRange(idx); chk.E(err) {
58 continue
59 }
60 sers = append(sers, s...)
61 }
62 return
63 }
64
65 // WouldReplaceEvent checks if the provided event would replace existing events
66 // based on Nostr's replaceable or parameterized replaceable semantics. It
67 // returns true if the candidate is newer-or-equal than existing events.
68 // If an existing event is newer, it returns (false, nil, ErrOlderThanExisting).
69 // If no conflicts exist, it returns (false, nil, nil).
70 func (d *D) WouldReplaceEvent(ev *event.E) (bool, types.Uint40s, error) {
71 // Only relevant for replaceable or parameterized replaceable kinds
72 if !(kind.IsReplaceable(ev.Kind) || kind.IsParameterizedReplaceable(ev.Kind)) {
73 return false, nil, nil
74 }
75
76 // Fast path for parameterized replaceable events using AddressableEvent index
77 if kind.IsParameterizedReplaceable(ev.Kind) {
78 dTag := ev.Tags.GetFirst([]byte("d"))
79 if dTag == nil {
80 return false, nil, ErrMissingDTag
81 }
82
83 // Build filter for direct lookup
84 f := &filter.F{
85 Authors: tag.NewFromBytesSlice(ev.Pubkey),
86 Kinds: kind.NewS(kind.New(ev.Kind)),
87 Tags: tag.NewS(
88 tag.NewFromAny("#d", dTag.Value()),
89 ),
90 }
91
92 // Try direct O(1) lookup via AddressableEvent index
93 if serial, err := d.QueryForAddressableEvent(f); err == nil && serial != nil {
94 oldEv, ferr := d.FetchEventBySerial(serial)
95 if ferr == nil && oldEv != nil {
96 if ev.CreatedAt < oldEv.CreatedAt {
97 return false, nil, ErrOlderThanExisting
98 }
99 // Candidate is newer or same age - it should replace
100 return true, nil, nil
101 }
102 // Fetch failed - fall through to slow path
103 }
104 // Index miss (migration pending) - fall through to slow path
105 }
106
107 // Standard path for replaceable events and fallback for parameterized replaceable
108 var f *filter.F
109 if kind.IsReplaceable(ev.Kind) {
110 f = &filter.F{
111 Authors: tag.NewFromBytesSlice(ev.Pubkey),
112 Kinds: kind.NewS(kind.New(ev.Kind)),
113 }
114 } else {
115 // parameterized replaceable - build filter for slow path
116 dTag := ev.Tags.GetFirst([]byte("d"))
117 if dTag == nil {
118 return false, nil, ErrMissingDTag
119 }
120 f = &filter.F{
121 Authors: tag.NewFromBytesSlice(ev.Pubkey),
122 Kinds: kind.NewS(kind.New(ev.Kind)),
123 Tags: tag.NewS(
124 tag.NewFromAny("d", dTag.Value()),
125 ),
126 }
127 }
128
129 sers, err := d.GetSerialsFromFilter(f)
130 if chk.E(err) {
131 return false, nil, err
132 }
133 if len(sers) == 0 {
134 return false, nil, nil
135 }
136
137 // Determine if any existing event is newer than the candidate
138 shouldReplace := true
139 for _, s := range sers {
140 oldEv, ferr := d.FetchEventBySerial(s)
141 if chk.E(ferr) {
142 continue
143 }
144 if ev.CreatedAt < oldEv.CreatedAt {
145 shouldReplace = false
146 break
147 }
148 }
149 if shouldReplace {
150 return true, nil, nil
151 }
152 return false, nil, ErrOlderThanExisting
153 }
154
155 // SaveEvent saves an event to the database, generating all the necessary indexes.
156 func (d *D) SaveEvent(c context.Context, ev *event.E) (
157 replaced bool, err error,
158 ) {
159 if ev == nil {
160 err = errors.New("nil event")
161 return
162 }
163
164 // Reject ephemeral events (kinds 20000-29999) - they should never be stored
165 if ev.Kind >= 20000 && ev.Kind <= 29999 {
166 err = errors.New("blocked: ephemeral events should not be stored")
167 return
168 }
169
170 // Validate kind 3 (follow list) events have at least one p tag
171 // This prevents storing malformed follow lists that may come from buggy relays
172 if ev.Kind == 3 {
173 hasPTag := false
174 tagCount := 0
175 if ev.Tags != nil {
176 tagCount = ev.Tags.Len()
177 for _, tag := range *ev.Tags {
178 if tag != nil && tag.Len() >= 2 {
179 key := tag.Key()
180 if len(key) == 1 && key[0] == 'p' {
181 hasPTag = true
182 break
183 }
184 }
185 }
186 }
187 if !hasPTag {
188 log.W.F("SaveEvent: rejecting kind 3 event without p tags from pubkey %x (total tags: %d, event ID: %x)",
189 ev.Pubkey, tagCount, ev.ID)
190 err = errors.New("blocked: kind 3 follow list events must have at least one p tag")
191 return
192 }
193 }
194
195 // check if the event already exists
196 var ser *types.Uint40
197 if ser, err = d.GetSerialById(ev.ID); err == nil && ser != nil {
198 err = errors.New("blocked: event already exists: " + hex.Enc(ev.ID[:]))
199 return
200 }
201
202 // If the error is "id not found", we can proceed with saving the event
203 if err != nil && strings.Contains(err.Error(), "id not found in database") {
204 // Reset error since this is expected for new events
205 err = nil
206 } else if err != nil {
207 // For any other error, return it
208 // log.E.F("error checking if event exists: %s", err)
209 return
210 }
211
212 // Check if the event has been deleted before allowing resubmission
213 // Skip deletion check when ACL is "none" (open relay mode)
214 if !mode.IsOpen() {
215 if err = d.CheckForDeleted(ev, nil); err != nil {
216 // log.I.F(
217 // "SaveEvent: rejecting resubmission of deleted event ID=%s: %v",
218 // hex.Enc(ev.ID), err,
219 // )
220 err = fmt.Errorf("blocked: %s", err.Error())
221 return
222 }
223 }
224 // check for replacement - only validate, don't delete old events
225 if kind.IsReplaceable(ev.Kind) || kind.IsParameterizedReplaceable(ev.Kind) {
226 var werr error
227 if replaced, _, werr = d.WouldReplaceEvent(ev); werr != nil {
228 if errors.Is(werr, ErrOlderThanExisting) {
229 if kind.IsReplaceable(ev.Kind) {
230 err = errors.New("blocked: event is older than existing replaceable event")
231 } else {
232 err = errors.New("blocked: event is older than existing addressable event")
233 }
234 return
235 }
236 if errors.Is(werr, ErrMissingDTag) {
237 // keep behavior consistent with previous implementation
238 err = ErrMissingDTag
239 return
240 }
241 // any other error
242 return
243 }
244 // Note: replaced flag is kept for compatibility but old events are no longer deleted
245 }
246 // Get the next sequence number for the event
247 var serial uint64
248 if serial, err = d.seq.Next(); chk.E(err) {
249 return
250 }
251 // Generate all indexes for the event
252 var idxs [][]byte
253 if idxs, err = GetIndexesForEvent(ev, serial); chk.E(err) {
254 return
255 }
256
257 // Collect all pubkeys for graph: author + p-tags
258 // Store with direction indicator: author (0) vs p-tag (1)
259 type pubkeyWithDirection struct {
260 serial *types.Uint40
261 isAuthor bool
262 }
263 pubkeysForGraph := make(map[string]pubkeyWithDirection)
264
265 // Add author pubkey
266 var authorSerial *types.Uint40
267 if authorSerial, err = d.GetOrCreatePubkeySerial(ev.Pubkey); chk.E(err) {
268 return
269 }
270 pubkeysForGraph[hex.Enc(ev.Pubkey)] = pubkeyWithDirection{
271 serial: authorSerial,
272 isAuthor: true,
273 }
274
275 // Extract p-tag pubkeys using GetAll
276 pTags := ev.Tags.GetAll([]byte("p"))
277 for _, pTag := range pTags {
278 if pTag.Len() >= 2 {
279 // Get pubkey from p-tag, handling both binary and hex storage formats
280 // ValueHex() returns hex regardless of internal storage format
281 var ptagPubkey []byte
282 if ptagPubkey, err = hex.Dec(string(pTag.ValueHex())); err == nil && len(ptagPubkey) == 32 {
283 pkHex := hex.Enc(ptagPubkey)
284 // Skip if already added as author
285 if _, exists := pubkeysForGraph[pkHex]; !exists {
286 var ptagSerial *types.Uint40
287 if ptagSerial, err = d.GetOrCreatePubkeySerial(ptagPubkey); chk.E(err) {
288 return
289 }
290 pubkeysForGraph[pkHex] = pubkeyWithDirection{
291 serial: ptagSerial,
292 isAuthor: false,
293 }
294 }
295 }
296 }
297 }
298 // log.T.F(
299 // "SaveEvent: generated %d indexes for event %x (kind %d)", len(idxs),
300 // ev.ID, ev.Kind,
301 // )
302
303 // Create serial resolver for compact encoding
304 resolver := NewDatabaseSerialResolver(d, d.serialCache)
305
306 // Serialize event in compact format using serial references
307 // This dramatically reduces storage by replacing 32-byte IDs/pubkeys with 5-byte serials
308 compactData, compactErr := MarshalCompactEvent(ev, resolver)
309
310 // Calculate legacy size for comparison (for metrics tracking)
311 // We marshal to get accurate size comparison
312 legacyBuf := bufpool.GetMedium()
313 defer bufpool.PutMedium(legacyBuf)
314 ev.MarshalBinary(legacyBuf)
315 legacySize := legacyBuf.Len()
316
317 if compactErr != nil {
318 // Fall back to legacy format if compact encoding fails
319 log.W.F("SaveEvent: compact encoding failed, using legacy format: %v", compactErr)
320 compactData = bufpool.CopyBytes(legacyBuf)
321 } else {
322 // Track storage savings
323 TrackCompactSaving(legacySize, len(compactData))
324 log.T.F("SaveEvent: compact %d bytes vs legacy %d bytes (saved %d bytes, %.1f%%)",
325 len(compactData), legacySize, legacySize-len(compactData),
326 float64(legacySize-len(compactData))/float64(legacySize)*100.0)
327 }
328
329 // Start a transaction to save the event and all its indexes
330 err = d.Update(
331 func(txn *badger.Txn) (err error) {
332 // Pre-allocate key buffer to avoid allocations in loop
333 ser := new(types.Uint40)
334 if err = ser.Set(serial); chk.E(err) {
335 return
336 }
337
338 // Save each index
339 for _, key := range idxs {
340 if err = txn.Set(key, nil); chk.E(err) {
341 return
342 }
343 }
344
345 // Store the SerialEventId mapping (serial -> full 32-byte event ID)
346 // This is required for reconstructing compact events
347 if err = d.StoreEventIdSerial(txn, serial, ev.ID); chk.E(err) {
348 return
349 }
350
351 // Cache the event ID mapping
352 d.serialCache.CacheEventId(serial, ev.ID)
353
354 // Write AddressableEvent index for parameterized replaceable events (kinds 30000-39999)
355 // This enables O(1) direct lookup by pubkey + kind + d-tag for NIP-33 queries
356 // Key: aev|pubkey_hash|kind|dtag_hash, Value: serial (5 bytes)
357 if kind.IsParameterizedReplaceable(ev.Kind) {
358 dTag := ev.Tags.GetFirst([]byte("d"))
359 if dTag != nil {
360 aevKey, keyErr := BuildAddressableEventKey(ev.Pubkey, ev.Kind, dTag.Value())
361 if keyErr == nil {
362 // Serialize the serial as the value
363 serBuf := bufpool.GetSmall()
364 if err = ser.MarshalWrite(serBuf); chk.E(err) {
365 bufpool.PutSmall(serBuf)
366 return
367 }
368 if err = txn.Set(aevKey, bufpool.CopyBytes(serBuf)); chk.E(err) {
369 bufpool.PutSmall(serBuf)
370 return
371 }
372 bufpool.PutSmall(serBuf)
373 log.T.F("SaveEvent: wrote AddressableEvent index for kind=%d d=%s", ev.Kind, string(dTag.Value()))
374 }
375 }
376 }
377
378 // Store compact event with cmp prefix
379 // Format: cmp|serial|compact_event_data
380 // This is the only storage format - legacy evt/sev/aev/rev prefixes
381 // are handled by migration and no longer written for new events
382 cmpKeyBuf := bufpool.GetSmall()
383 if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKeyBuf); chk.E(err) {
384 bufpool.PutSmall(cmpKeyBuf)
385 return
386 }
387 if err = txn.Set(bufpool.CopyBytes(cmpKeyBuf), compactData); chk.E(err) {
388 bufpool.PutSmall(cmpKeyBuf)
389 return
390 }
391 bufpool.PutSmall(cmpKeyBuf)
392
393 // Create graph edges between event and all related pubkeys
394 // This creates bidirectional edges: event->pubkey and pubkey->event
395 // Include the event kind and direction for efficient graph queries
396 eventKind := new(types.Uint16)
397 eventKind.Set(ev.Kind)
398
399 // Reuse a single buffer for graph edge keys (reset between uses)
400 graphKeyBuf := bufpool.GetSmall()
401 defer bufpool.PutSmall(graphKeyBuf)
402
403 for _, pkInfo := range pubkeysForGraph {
404 // Determine direction for forward edge (event -> pubkey perspective)
405 directionForward := new(types.Letter)
406 // Determine direction for reverse edge (pubkey -> event perspective)
407 directionReverse := new(types.Letter)
408
409 if pkInfo.isAuthor {
410 // Event author relationship
411 directionForward.Set(types.EdgeDirectionAuthor) // 0: author
412 directionReverse.Set(types.EdgeDirectionAuthor) // 0: is author of event
413 } else {
414 // P-tag relationship
415 directionForward.Set(types.EdgeDirectionPTagOut) // 1: event references pubkey (outbound)
416 directionReverse.Set(types.EdgeDirectionPTagIn) // 2: pubkey is referenced (inbound)
417 }
418
419 // Create event -> pubkey edge (with kind and direction)
420 graphKeyBuf.Reset()
421 if err = indexes.EventPubkeyGraphEnc(ser, pkInfo.serial, eventKind, directionForward).MarshalWrite(graphKeyBuf); chk.E(err) {
422 return
423 }
424 if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
425 return
426 }
427
428 // Create pubkey -> event edge (reverse, with kind and direction for filtering)
429 graphKeyBuf.Reset()
430 if err = indexes.PubkeyEventGraphEnc(pkInfo.serial, eventKind, directionReverse, ser).MarshalWrite(graphKeyBuf); chk.E(err) {
431 return
432 }
433 if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
434 return
435 }
436 }
437
438 // Create pubkey-to-pubkey (noun-noun) graph edges
439 // For every p-tag pubkey in this event, create a direct edge from
440 // the author pubkey to the p-tagged pubkey. This materializes the
441 // two-hop pubkey→event→pubkey traversal into a single-hop lookup.
442 // The event serial is preserved in the edge for back-traversal.
443 for _, pkInfo := range pubkeysForGraph {
444 if pkInfo.isAuthor {
445 continue // skip author→author self-edge
446 }
447 // Forward edge: author → p-tagged pubkey
448 dirOut := new(types.Letter)
449 dirOut.Set(types.EdgeDirectionPubkeyOut)
450 graphKeyBuf.Reset()
451 if err = indexes.PubkeyPubkeyGraphEnc(authorSerial, pkInfo.serial, eventKind, dirOut, ser).MarshalWrite(graphKeyBuf); chk.E(err) {
452 return
453 }
454 if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
455 return
456 }
457
458 // Reverse edge: p-tagged pubkey ← author
459 dirIn := new(types.Letter)
460 dirIn.Set(types.EdgeDirectionPubkeyIn)
461 graphKeyBuf.Reset()
462 if err = indexes.GraphPubkeyPubkeyEnc(pkInfo.serial, eventKind, dirIn, authorSerial, ser).MarshalWrite(graphKeyBuf); chk.E(err) {
463 return
464 }
465 if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
466 return
467 }
468 }
469
470 // Create event-to-event graph edges for e-tags
471 // This enables thread traversal and finding replies/reactions to events
472 eTags := ev.Tags.GetAll([]byte("e"))
473 for _, eTag := range eTags {
474 if eTag.Len() >= 2 {
475 // Get event ID from e-tag, handling both binary and hex storage formats
476 var targetEventID []byte
477 if targetEventID, err = hex.Dec(string(eTag.ValueHex())); err != nil || len(targetEventID) != 32 {
478 continue
479 }
480
481 // Look up the target event's serial (if it exists in our database)
482 var targetSerial *types.Uint40
483 if targetSerial, err = d.GetSerialById(targetEventID); err != nil {
484 // Target event not in our database - skip edge creation
485 // This is normal for replies to events we don't have
486 err = nil
487 continue
488 }
489
490 // Create forward edge: source event -> target event (outbound e-tag)
491 directionOut := new(types.Letter)
492 directionOut.Set(types.EdgeDirectionETagOut)
493 graphKeyBuf.Reset()
494 if err = indexes.EventEventGraphEnc(ser, targetSerial, eventKind, directionOut).MarshalWrite(graphKeyBuf); chk.E(err) {
495 return
496 }
497 if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
498 return
499 }
500
501 // Create reverse edge: target event -> source event (inbound e-tag)
502 directionIn := new(types.Letter)
503 directionIn.Set(types.EdgeDirectionETagIn)
504 graphKeyBuf.Reset()
505 if err = indexes.GraphEventEventEnc(targetSerial, eventKind, directionIn, ser).MarshalWrite(graphKeyBuf); chk.E(err) {
506 return
507 }
508 if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
509 return
510 }
511 }
512 }
513
514 return
515 },
516 )
517 if err != nil {
518 return
519 }
520
521 // Process deletion events to actually delete the referenced events
522 if ev.Kind == kind.Deletion.K {
523 if err = d.ProcessDelete(ev, nil); chk.E(err) {
524 log.W.F("failed to process deletion for event %x: %v", ev.ID, err)
525 // Don't return error - the deletion event was saved successfully
526 err = nil
527 }
528 }
529
530 // Invalidate query cache since a new event was stored
531 // This ensures subsequent queries will see the new event
532 if d.queryCache != nil {
533 d.queryCache.Invalidate()
534 // log.T.F("SaveEvent: invalidated query cache")
535 }
536
537 return
538 }
539