save-event.go raw

   1  //go:build !(js && wasm)
   2  
   3  package database
   4  
   5  import (
   6  	"context"
   7  	"errors"
   8  	"fmt"
   9  	"strings"
  10  
  11  	"github.com/dgraph-io/badger/v4"
  12  	"next.orly.dev/pkg/lol/chk"
  13  	"next.orly.dev/pkg/lol/log"
  14  	"next.orly.dev/pkg/database/bufpool"
  15  	"next.orly.dev/pkg/database/indexes"
  16  	"next.orly.dev/pkg/database/indexes/types"
  17  	"next.orly.dev/pkg/mode"
  18  	"next.orly.dev/pkg/nostr/encoders/event"
  19  	"next.orly.dev/pkg/nostr/encoders/filter"
  20  	"next.orly.dev/pkg/nostr/encoders/hex"
  21  	"next.orly.dev/pkg/nostr/encoders/kind"
  22  	"next.orly.dev/pkg/nostr/encoders/tag"
  23  )
  24  
  25  var (
  26  	// ErrOlderThanExisting is returned when a candidate event is older than an existing replaceable/addressable event.
  27  	ErrOlderThanExisting = errors.New("older than existing event")
  28  	// ErrMissingDTag is returned when a parameterized replaceable event lacks the required 'd' tag.
  29  	ErrMissingDTag = errors.New("event is missing a d tag identifier")
  30  )
  31  
  32  func (d *D) GetSerialsFromFilter(f *filter.F) (
  33  	sers types.Uint40s, err error,
  34  ) {
  35  	// Try p-tag graph optimization first
  36  	if CanUsePTagGraph(f) {
  37  		log.D.F("GetSerialsFromFilter: trying p-tag graph optimization")
  38  		if sers, err = d.QueryPTagGraph(f); err == nil && len(sers) >= 0 {
  39  			log.D.F("GetSerialsFromFilter: p-tag graph optimization returned %d serials", len(sers))
  40  			return
  41  		}
  42  		// Fall through to traditional indexes on error
  43  		log.D.F("GetSerialsFromFilter: p-tag graph optimization failed, falling back to traditional indexes: %v", err)
  44  		err = nil
  45  	}
  46  
  47  	var idxs []Range
  48  	if idxs, err = GetIndexesFromFilter(f); chk.E(err) {
  49  		return
  50  	}
  51  	// Pre-allocate slice with estimated capacity to reduce reallocations
  52  	sers = make(
  53  		types.Uint40s, 0, len(idxs)*100,
  54  	) // Estimate 100 serials per index
  55  	for _, idx := range idxs {
  56  		var s types.Uint40s
  57  		if s, err = d.GetSerialsByRange(idx); chk.E(err) {
  58  			continue
  59  		}
  60  		sers = append(sers, s...)
  61  	}
  62  	return
  63  }
  64  
  65  // WouldReplaceEvent checks if the provided event would replace existing events
  66  // based on Nostr's replaceable or parameterized replaceable semantics. It
  67  // returns true if the candidate is newer-or-equal than existing events.
  68  // If an existing event is newer, it returns (false, nil, ErrOlderThanExisting).
  69  // If no conflicts exist, it returns (false, nil, nil).
  70  func (d *D) WouldReplaceEvent(ev *event.E) (bool, types.Uint40s, error) {
  71  	// Only relevant for replaceable or parameterized replaceable kinds
  72  	if !(kind.IsReplaceable(ev.Kind) || kind.IsParameterizedReplaceable(ev.Kind)) {
  73  		return false, nil, nil
  74  	}
  75  
  76  	// Fast path for parameterized replaceable events using AddressableEvent index
  77  	if kind.IsParameterizedReplaceable(ev.Kind) {
  78  		dTag := ev.Tags.GetFirst([]byte("d"))
  79  		if dTag == nil {
  80  			return false, nil, ErrMissingDTag
  81  		}
  82  
  83  		// Build filter for direct lookup
  84  		f := &filter.F{
  85  			Authors: tag.NewFromBytesSlice(ev.Pubkey),
  86  			Kinds:   kind.NewS(kind.New(ev.Kind)),
  87  			Tags: tag.NewS(
  88  				tag.NewFromAny("#d", dTag.Value()),
  89  			),
  90  		}
  91  
  92  		// Try direct O(1) lookup via AddressableEvent index
  93  		if serial, err := d.QueryForAddressableEvent(f); err == nil && serial != nil {
  94  			oldEv, ferr := d.FetchEventBySerial(serial)
  95  			if ferr == nil && oldEv != nil {
  96  				if ev.CreatedAt < oldEv.CreatedAt {
  97  					return false, nil, ErrOlderThanExisting
  98  				}
  99  				// Candidate is newer or same age - it should replace
 100  				return true, nil, nil
 101  			}
 102  			// Fetch failed - fall through to slow path
 103  		}
 104  		// Index miss (migration pending) - fall through to slow path
 105  	}
 106  
 107  	// Standard path for replaceable events and fallback for parameterized replaceable
 108  	var f *filter.F
 109  	if kind.IsReplaceable(ev.Kind) {
 110  		f = &filter.F{
 111  			Authors: tag.NewFromBytesSlice(ev.Pubkey),
 112  			Kinds:   kind.NewS(kind.New(ev.Kind)),
 113  		}
 114  	} else {
 115  		// parameterized replaceable - build filter for slow path
 116  		dTag := ev.Tags.GetFirst([]byte("d"))
 117  		if dTag == nil {
 118  			return false, nil, ErrMissingDTag
 119  		}
 120  		f = &filter.F{
 121  			Authors: tag.NewFromBytesSlice(ev.Pubkey),
 122  			Kinds:   kind.NewS(kind.New(ev.Kind)),
 123  			Tags: tag.NewS(
 124  				tag.NewFromAny("d", dTag.Value()),
 125  			),
 126  		}
 127  	}
 128  
 129  	sers, err := d.GetSerialsFromFilter(f)
 130  	if chk.E(err) {
 131  		return false, nil, err
 132  	}
 133  	if len(sers) == 0 {
 134  		return false, nil, nil
 135  	}
 136  
 137  	// Determine if any existing event is newer than the candidate
 138  	shouldReplace := true
 139  	for _, s := range sers {
 140  		oldEv, ferr := d.FetchEventBySerial(s)
 141  		if chk.E(ferr) {
 142  			continue
 143  		}
 144  		if ev.CreatedAt < oldEv.CreatedAt {
 145  			shouldReplace = false
 146  			break
 147  		}
 148  	}
 149  	if shouldReplace {
 150  		return true, nil, nil
 151  	}
 152  	return false, nil, ErrOlderThanExisting
 153  }
 154  
 155  // SaveEvent saves an event to the database, generating all the necessary indexes.
 156  func (d *D) SaveEvent(c context.Context, ev *event.E) (
 157  	replaced bool, err error,
 158  ) {
 159  	if ev == nil {
 160  		err = errors.New("nil event")
 161  		return
 162  	}
 163  
 164  	// Reject ephemeral events (kinds 20000-29999) - they should never be stored
 165  	if ev.Kind >= 20000 && ev.Kind <= 29999 {
 166  		err = errors.New("blocked: ephemeral events should not be stored")
 167  		return
 168  	}
 169  
 170  	// Validate kind 3 (follow list) events have at least one p tag
 171  	// This prevents storing malformed follow lists that may come from buggy relays
 172  	if ev.Kind == 3 {
 173  		hasPTag := false
 174  		tagCount := 0
 175  		if ev.Tags != nil {
 176  			tagCount = ev.Tags.Len()
 177  			for _, tag := range *ev.Tags {
 178  				if tag != nil && tag.Len() >= 2 {
 179  					key := tag.Key()
 180  					if len(key) == 1 && key[0] == 'p' {
 181  						hasPTag = true
 182  						break
 183  					}
 184  				}
 185  			}
 186  		}
 187  		if !hasPTag {
 188  			log.W.F("SaveEvent: rejecting kind 3 event without p tags from pubkey %x (total tags: %d, event ID: %x)",
 189  				ev.Pubkey, tagCount, ev.ID)
 190  			err = errors.New("blocked: kind 3 follow list events must have at least one p tag")
 191  			return
 192  		}
 193  	}
 194  
 195  	// check if the event already exists
 196  	var ser *types.Uint40
 197  	if ser, err = d.GetSerialById(ev.ID); err == nil && ser != nil {
 198  		err = errors.New("blocked: event already exists: " + hex.Enc(ev.ID[:]))
 199  		return
 200  	}
 201  
 202  	// If the error is "id not found", we can proceed with saving the event
 203  	if err != nil && strings.Contains(err.Error(), "id not found in database") {
 204  		// Reset error since this is expected for new events
 205  		err = nil
 206  	} else if err != nil {
 207  		// For any other error, return it
 208  		// log.E.F("error checking if event exists: %s", err)
 209  		return
 210  	}
 211  
 212  	// Check if the event has been deleted before allowing resubmission
 213  	// Skip deletion check when ACL is "none" (open relay mode)
 214  	if !mode.IsOpen() {
 215  		if err = d.CheckForDeleted(ev, nil); err != nil {
 216  			// log.I.F(
 217  			// 	"SaveEvent: rejecting resubmission of deleted event ID=%s: %v",
 218  			// 	hex.Enc(ev.ID), err,
 219  			// )
 220  			err = fmt.Errorf("blocked: %s", err.Error())
 221  			return
 222  		}
 223  	}
 224  	// check for replacement - only validate, don't delete old events
 225  	if kind.IsReplaceable(ev.Kind) || kind.IsParameterizedReplaceable(ev.Kind) {
 226  		var werr error
 227  		if replaced, _, werr = d.WouldReplaceEvent(ev); werr != nil {
 228  			if errors.Is(werr, ErrOlderThanExisting) {
 229  				if kind.IsReplaceable(ev.Kind) {
 230  					err = errors.New("blocked: event is older than existing replaceable event")
 231  				} else {
 232  					err = errors.New("blocked: event is older than existing addressable event")
 233  				}
 234  				return
 235  			}
 236  			if errors.Is(werr, ErrMissingDTag) {
 237  				// keep behavior consistent with previous implementation
 238  				err = ErrMissingDTag
 239  				return
 240  			}
 241  			// any other error
 242  			return
 243  		}
 244  		// Note: replaced flag is kept for compatibility but old events are no longer deleted
 245  	}
 246  	// Get the next sequence number for the event
 247  	var serial uint64
 248  	if serial, err = d.seq.Next(); chk.E(err) {
 249  		return
 250  	}
 251  	// Generate all indexes for the event
 252  	var idxs [][]byte
 253  	if idxs, err = GetIndexesForEvent(ev, serial); chk.E(err) {
 254  		return
 255  	}
 256  
 257  	// Collect all pubkeys for graph: author + p-tags
 258  	// Store with direction indicator: author (0) vs p-tag (1)
 259  	type pubkeyWithDirection struct {
 260  		serial    *types.Uint40
 261  		isAuthor  bool
 262  	}
 263  	pubkeysForGraph := make(map[string]pubkeyWithDirection)
 264  
 265  	// Add author pubkey
 266  	var authorSerial *types.Uint40
 267  	if authorSerial, err = d.GetOrCreatePubkeySerial(ev.Pubkey); chk.E(err) {
 268  		return
 269  	}
 270  	pubkeysForGraph[hex.Enc(ev.Pubkey)] = pubkeyWithDirection{
 271  		serial:   authorSerial,
 272  		isAuthor: true,
 273  	}
 274  
 275  	// Extract p-tag pubkeys using GetAll
 276  	pTags := ev.Tags.GetAll([]byte("p"))
 277  	for _, pTag := range pTags {
 278  		if pTag.Len() >= 2 {
 279  			// Get pubkey from p-tag, handling both binary and hex storage formats
 280  			// ValueHex() returns hex regardless of internal storage format
 281  			var ptagPubkey []byte
 282  			if ptagPubkey, err = hex.Dec(string(pTag.ValueHex())); err == nil && len(ptagPubkey) == 32 {
 283  				pkHex := hex.Enc(ptagPubkey)
 284  				// Skip if already added as author
 285  				if _, exists := pubkeysForGraph[pkHex]; !exists {
 286  					var ptagSerial *types.Uint40
 287  					if ptagSerial, err = d.GetOrCreatePubkeySerial(ptagPubkey); chk.E(err) {
 288  						return
 289  					}
 290  					pubkeysForGraph[pkHex] = pubkeyWithDirection{
 291  						serial:   ptagSerial,
 292  						isAuthor: false,
 293  					}
 294  				}
 295  			}
 296  		}
 297  	}
 298  	// log.T.F(
 299  	// 	"SaveEvent: generated %d indexes for event %x (kind %d)", len(idxs),
 300  	// 	ev.ID, ev.Kind,
 301  	// )
 302  
 303  	// Create serial resolver for compact encoding
 304  	resolver := NewDatabaseSerialResolver(d, d.serialCache)
 305  
 306  	// Serialize event in compact format using serial references
 307  	// This dramatically reduces storage by replacing 32-byte IDs/pubkeys with 5-byte serials
 308  	compactData, compactErr := MarshalCompactEvent(ev, resolver)
 309  
 310  	// Calculate legacy size for comparison (for metrics tracking)
 311  	// We marshal to get accurate size comparison
 312  	legacyBuf := bufpool.GetMedium()
 313  	defer bufpool.PutMedium(legacyBuf)
 314  	ev.MarshalBinary(legacyBuf)
 315  	legacySize := legacyBuf.Len()
 316  
 317  	if compactErr != nil {
 318  		// Fall back to legacy format if compact encoding fails
 319  		log.W.F("SaveEvent: compact encoding failed, using legacy format: %v", compactErr)
 320  		compactData = bufpool.CopyBytes(legacyBuf)
 321  	} else {
 322  		// Track storage savings
 323  		TrackCompactSaving(legacySize, len(compactData))
 324  		log.T.F("SaveEvent: compact %d bytes vs legacy %d bytes (saved %d bytes, %.1f%%)",
 325  			len(compactData), legacySize, legacySize-len(compactData),
 326  			float64(legacySize-len(compactData))/float64(legacySize)*100.0)
 327  	}
 328  
 329  	// Start a transaction to save the event and all its indexes
 330  	err = d.Update(
 331  		func(txn *badger.Txn) (err error) {
 332  			// Pre-allocate key buffer to avoid allocations in loop
 333  			ser := new(types.Uint40)
 334  			if err = ser.Set(serial); chk.E(err) {
 335  				return
 336  			}
 337  
 338  			// Save each index
 339  			for _, key := range idxs {
 340  				if err = txn.Set(key, nil); chk.E(err) {
 341  					return
 342  				}
 343  			}
 344  
 345  			// Store the SerialEventId mapping (serial -> full 32-byte event ID)
 346  			// This is required for reconstructing compact events
 347  			if err = d.StoreEventIdSerial(txn, serial, ev.ID); chk.E(err) {
 348  				return
 349  			}
 350  
 351  			// Cache the event ID mapping
 352  			d.serialCache.CacheEventId(serial, ev.ID)
 353  
 354  			// Write AddressableEvent index for parameterized replaceable events (kinds 30000-39999)
 355  			// This enables O(1) direct lookup by pubkey + kind + d-tag for NIP-33 queries
 356  			// Key: aev|pubkey_hash|kind|dtag_hash, Value: serial (5 bytes)
 357  			if kind.IsParameterizedReplaceable(ev.Kind) {
 358  				dTag := ev.Tags.GetFirst([]byte("d"))
 359  				if dTag != nil {
 360  					aevKey, keyErr := BuildAddressableEventKey(ev.Pubkey, ev.Kind, dTag.Value())
 361  					if keyErr == nil {
 362  						// Serialize the serial as the value
 363  						serBuf := bufpool.GetSmall()
 364  						if err = ser.MarshalWrite(serBuf); chk.E(err) {
 365  							bufpool.PutSmall(serBuf)
 366  							return
 367  						}
 368  						if err = txn.Set(aevKey, bufpool.CopyBytes(serBuf)); chk.E(err) {
 369  							bufpool.PutSmall(serBuf)
 370  							return
 371  						}
 372  						bufpool.PutSmall(serBuf)
 373  						log.T.F("SaveEvent: wrote AddressableEvent index for kind=%d d=%s", ev.Kind, string(dTag.Value()))
 374  					}
 375  				}
 376  			}
 377  
 378  			// Store compact event with cmp prefix
 379  			// Format: cmp|serial|compact_event_data
 380  			// This is the only storage format - legacy evt/sev/aev/rev prefixes
 381  			// are handled by migration and no longer written for new events
 382  			cmpKeyBuf := bufpool.GetSmall()
 383  			if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKeyBuf); chk.E(err) {
 384  				bufpool.PutSmall(cmpKeyBuf)
 385  				return
 386  			}
 387  			if err = txn.Set(bufpool.CopyBytes(cmpKeyBuf), compactData); chk.E(err) {
 388  				bufpool.PutSmall(cmpKeyBuf)
 389  				return
 390  			}
 391  			bufpool.PutSmall(cmpKeyBuf)
 392  
 393  			// Create graph edges between event and all related pubkeys
 394  			// This creates bidirectional edges: event->pubkey and pubkey->event
 395  			// Include the event kind and direction for efficient graph queries
 396  			eventKind := new(types.Uint16)
 397  			eventKind.Set(ev.Kind)
 398  
 399  			// Reuse a single buffer for graph edge keys (reset between uses)
 400  			graphKeyBuf := bufpool.GetSmall()
 401  			defer bufpool.PutSmall(graphKeyBuf)
 402  
 403  			for _, pkInfo := range pubkeysForGraph {
 404  				// Determine direction for forward edge (event -> pubkey perspective)
 405  				directionForward := new(types.Letter)
 406  				// Determine direction for reverse edge (pubkey -> event perspective)
 407  				directionReverse := new(types.Letter)
 408  
 409  				if pkInfo.isAuthor {
 410  					// Event author relationship
 411  					directionForward.Set(types.EdgeDirectionAuthor)  // 0: author
 412  					directionReverse.Set(types.EdgeDirectionAuthor)  // 0: is author of event
 413  				} else {
 414  					// P-tag relationship
 415  					directionForward.Set(types.EdgeDirectionPTagOut) // 1: event references pubkey (outbound)
 416  					directionReverse.Set(types.EdgeDirectionPTagIn)  // 2: pubkey is referenced (inbound)
 417  				}
 418  
 419  				// Create event -> pubkey edge (with kind and direction)
 420  				graphKeyBuf.Reset()
 421  				if err = indexes.EventPubkeyGraphEnc(ser, pkInfo.serial, eventKind, directionForward).MarshalWrite(graphKeyBuf); chk.E(err) {
 422  					return
 423  				}
 424  				if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
 425  					return
 426  				}
 427  
 428  				// Create pubkey -> event edge (reverse, with kind and direction for filtering)
 429  				graphKeyBuf.Reset()
 430  				if err = indexes.PubkeyEventGraphEnc(pkInfo.serial, eventKind, directionReverse, ser).MarshalWrite(graphKeyBuf); chk.E(err) {
 431  					return
 432  				}
 433  				if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
 434  					return
 435  				}
 436  			}
 437  
 438  			// Create pubkey-to-pubkey (noun-noun) graph edges
 439  			// For every p-tag pubkey in this event, create a direct edge from
 440  			// the author pubkey to the p-tagged pubkey. This materializes the
 441  			// two-hop pubkey→event→pubkey traversal into a single-hop lookup.
 442  			// The event serial is preserved in the edge for back-traversal.
 443  			for _, pkInfo := range pubkeysForGraph {
 444  				if pkInfo.isAuthor {
 445  					continue // skip author→author self-edge
 446  				}
 447  				// Forward edge: author → p-tagged pubkey
 448  				dirOut := new(types.Letter)
 449  				dirOut.Set(types.EdgeDirectionPubkeyOut)
 450  				graphKeyBuf.Reset()
 451  				if err = indexes.PubkeyPubkeyGraphEnc(authorSerial, pkInfo.serial, eventKind, dirOut, ser).MarshalWrite(graphKeyBuf); chk.E(err) {
 452  					return
 453  				}
 454  				if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
 455  					return
 456  				}
 457  
 458  				// Reverse edge: p-tagged pubkey ← author
 459  				dirIn := new(types.Letter)
 460  				dirIn.Set(types.EdgeDirectionPubkeyIn)
 461  				graphKeyBuf.Reset()
 462  				if err = indexes.GraphPubkeyPubkeyEnc(pkInfo.serial, eventKind, dirIn, authorSerial, ser).MarshalWrite(graphKeyBuf); chk.E(err) {
 463  					return
 464  				}
 465  				if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
 466  					return
 467  				}
 468  			}
 469  
 470  			// Create event-to-event graph edges for e-tags
 471  			// This enables thread traversal and finding replies/reactions to events
 472  			eTags := ev.Tags.GetAll([]byte("e"))
 473  			for _, eTag := range eTags {
 474  				if eTag.Len() >= 2 {
 475  					// Get event ID from e-tag, handling both binary and hex storage formats
 476  					var targetEventID []byte
 477  					if targetEventID, err = hex.Dec(string(eTag.ValueHex())); err != nil || len(targetEventID) != 32 {
 478  						continue
 479  					}
 480  
 481  					// Look up the target event's serial (if it exists in our database)
 482  					var targetSerial *types.Uint40
 483  					if targetSerial, err = d.GetSerialById(targetEventID); err != nil {
 484  						// Target event not in our database - skip edge creation
 485  						// This is normal for replies to events we don't have
 486  						err = nil
 487  						continue
 488  					}
 489  
 490  					// Create forward edge: source event -> target event (outbound e-tag)
 491  					directionOut := new(types.Letter)
 492  					directionOut.Set(types.EdgeDirectionETagOut)
 493  					graphKeyBuf.Reset()
 494  					if err = indexes.EventEventGraphEnc(ser, targetSerial, eventKind, directionOut).MarshalWrite(graphKeyBuf); chk.E(err) {
 495  						return
 496  					}
 497  					if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
 498  						return
 499  					}
 500  
 501  					// Create reverse edge: target event -> source event (inbound e-tag)
 502  					directionIn := new(types.Letter)
 503  					directionIn.Set(types.EdgeDirectionETagIn)
 504  					graphKeyBuf.Reset()
 505  					if err = indexes.GraphEventEventEnc(targetSerial, eventKind, directionIn, ser).MarshalWrite(graphKeyBuf); chk.E(err) {
 506  						return
 507  					}
 508  					if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
 509  						return
 510  					}
 511  				}
 512  			}
 513  
 514  			return
 515  		},
 516  	)
 517  	if err != nil {
 518  		return
 519  	}
 520  
 521  	// Process deletion events to actually delete the referenced events
 522  	if ev.Kind == kind.Deletion.K {
 523  		if err = d.ProcessDelete(ev, nil); chk.E(err) {
 524  			log.W.F("failed to process deletion for event %x: %v", ev.ID, err)
 525  			// Don't return error - the deletion event was saved successfully
 526  			err = nil
 527  		}
 528  	}
 529  
 530  	// Invalidate query cache since a new event was stored
 531  	// This ensures subsequent queries will see the new event
 532  	if d.queryCache != nil {
 533  		d.queryCache.Invalidate()
 534  		// log.T.F("SaveEvent: invalidated query cache")
 535  	}
 536  
 537  	return
 538  }
 539