compact-storage_test.go raw

   1  package database
   2  
   3  import (
   4  	"bytes"
   5  	"context"
   6  	"os"
   7  	"testing"
   8  	"time"
   9  
  10  	"github.com/dgraph-io/badger/v4"
  11  	"next.orly.dev/pkg/lol/chk"
  12  	"next.orly.dev/pkg/database/indexes"
  13  	"next.orly.dev/pkg/database/indexes/types"
  14  	"next.orly.dev/pkg/nostr/encoders/event"
  15  	"next.orly.dev/pkg/nostr/encoders/hex"
  16  	"next.orly.dev/pkg/nostr/encoders/kind"
  17  	"next.orly.dev/pkg/nostr/encoders/tag"
  18  	"next.orly.dev/pkg/nostr/encoders/timestamp"
  19  	"next.orly.dev/pkg/nostr/interfaces/signer/p8k"
  20  )
  21  
  22  // TestCompactEventStorage tests the compact storage format (cmp prefix) which
  23  // replaced the old inline storage optimization (sev/evt prefixes).
  24  // All events are now stored in compact format regardless of size.
  25  func TestCompactEventStorage(t *testing.T) {
  26  	// Create a temporary directory for the database
  27  	tempDir, err := os.MkdirTemp("", "test-compact-db-*")
  28  	if err != nil {
  29  		t.Fatalf("Failed to create temporary directory: %v", err)
  30  	}
  31  	defer os.RemoveAll(tempDir)
  32  
  33  	// Create a context and cancel function for the database
  34  	ctx, cancel := context.WithCancel(context.Background())
  35  	defer cancel()
  36  
  37  	// Initialize the database
  38  	db, err := New(ctx, cancel, tempDir, "info")
  39  	if err != nil {
  40  		t.Fatalf("Failed to create database: %v", err)
  41  	}
  42  	defer db.Close()
  43  
  44  	// Create a signer
  45  	sign := p8k.MustNew()
  46  	if err := sign.Generate(); chk.E(err) {
  47  		t.Fatal(err)
  48  	}
  49  
  50  	// Test Case 1: Small event (should use compact storage)
  51  	t.Run("SmallEventCompactStorage", func(t *testing.T) {
  52  		smallEvent := event.New()
  53  		smallEvent.Kind = kind.TextNote.K
  54  		smallEvent.CreatedAt = timestamp.Now().V
  55  		smallEvent.Content = []byte("Hello Nostr!") // Small content
  56  		smallEvent.Pubkey = sign.Pub()
  57  		smallEvent.Tags = tag.NewS()
  58  
  59  		// Sign the event
  60  		if err := smallEvent.Sign(sign); err != nil {
  61  			t.Fatalf("Failed to sign small event: %v", err)
  62  		}
  63  
  64  		// Save the event
  65  		if _, err := db.SaveEvent(ctx, smallEvent); err != nil {
  66  			t.Fatalf("Failed to save small event: %v", err)
  67  		}
  68  
  69  		// Verify it was stored with cmp prefix
  70  		serial, err := db.GetSerialById(smallEvent.ID)
  71  		if err != nil {
  72  			t.Fatalf("Failed to get serial for small event: %v", err)
  73  		}
  74  
  75  		// Check that cmp key exists (compact format)
  76  		cmpKeyExists := false
  77  		db.View(func(txn *badger.Txn) error {
  78  			cmpBuf := new(bytes.Buffer)
  79  			indexes.CompactEventEnc(serial).MarshalWrite(cmpBuf)
  80  
  81  			_, err := txn.Get(cmpBuf.Bytes())
  82  			if err == nil {
  83  				cmpKeyExists = true
  84  			}
  85  			return nil
  86  		})
  87  
  88  		if !cmpKeyExists {
  89  			t.Errorf("Small event was not stored with cmp prefix (compact format)")
  90  		}
  91  
  92  		// Fetch and verify the event
  93  		fetchedEvent, err := db.FetchEventBySerial(serial)
  94  		if err != nil {
  95  			t.Fatalf("Failed to fetch small event: %v", err)
  96  		}
  97  
  98  		if !bytes.Equal(fetchedEvent.ID, smallEvent.ID) {
  99  			t.Errorf("Fetched event ID mismatch: got %x, want %x", fetchedEvent.ID, smallEvent.ID)
 100  		}
 101  		if !bytes.Equal(fetchedEvent.Content, smallEvent.Content) {
 102  			t.Errorf("Fetched event content mismatch: got %q, want %q", fetchedEvent.Content, smallEvent.Content)
 103  		}
 104  	})
 105  
 106  	// Test Case 2: Large event (should also use compact storage)
 107  	t.Run("LargeEventCompactStorage", func(t *testing.T) {
 108  		largeEvent := event.New()
 109  		largeEvent.Kind = kind.TextNote.K
 110  		largeEvent.CreatedAt = timestamp.Now().V
 111  		// Create larger content
 112  		largeContent := make([]byte, 1500)
 113  		for i := range largeContent {
 114  			largeContent[i] = 'x'
 115  		}
 116  		largeEvent.Content = largeContent
 117  		largeEvent.Pubkey = sign.Pub()
 118  		largeEvent.Tags = tag.NewS()
 119  
 120  		// Sign the event
 121  		if err := largeEvent.Sign(sign); err != nil {
 122  			t.Fatalf("Failed to sign large event: %v", err)
 123  		}
 124  
 125  		// Save the event
 126  		if _, err := db.SaveEvent(ctx, largeEvent); err != nil {
 127  			t.Fatalf("Failed to save large event: %v", err)
 128  		}
 129  
 130  		// Verify it was stored with cmp prefix (compact format)
 131  		serial, err := db.GetSerialById(largeEvent.ID)
 132  		if err != nil {
 133  			t.Fatalf("Failed to get serial for large event: %v", err)
 134  		}
 135  
 136  		// Check that cmp key exists
 137  		cmpKeyExists := false
 138  		db.View(func(txn *badger.Txn) error {
 139  			cmpBuf := new(bytes.Buffer)
 140  			indexes.CompactEventEnc(serial).MarshalWrite(cmpBuf)
 141  
 142  			_, err := txn.Get(cmpBuf.Bytes())
 143  			if err == nil {
 144  				cmpKeyExists = true
 145  			}
 146  			return nil
 147  		})
 148  
 149  		if !cmpKeyExists {
 150  			t.Errorf("Large event was not stored with cmp prefix (compact format)")
 151  		}
 152  
 153  		// Fetch and verify the event
 154  		fetchedEvent, err := db.FetchEventBySerial(serial)
 155  		if err != nil {
 156  			t.Fatalf("Failed to fetch large event: %v", err)
 157  		}
 158  
 159  		if !bytes.Equal(fetchedEvent.ID, largeEvent.ID) {
 160  			t.Errorf("Fetched event ID mismatch: got %x, want %x", fetchedEvent.ID, largeEvent.ID)
 161  		}
 162  	})
 163  
 164  	// Test Case 3: Batch fetch with mixed small and large events
 165  	t.Run("BatchFetchMixedEvents", func(t *testing.T) {
 166  		var serials []*types.Uint40
 167  		expectedIDs := make(map[uint64][]byte)
 168  
 169  		// Create 10 small events and 10 large events
 170  		for i := 0; i < 20; i++ {
 171  			ev := event.New()
 172  			ev.Kind = kind.TextNote.K
 173  			ev.CreatedAt = timestamp.Now().V + int64(i)
 174  			ev.Pubkey = sign.Pub()
 175  			ev.Tags = tag.NewS()
 176  
 177  			// Alternate between small and large
 178  			if i%2 == 0 {
 179  				ev.Content = []byte("Small event")
 180  			} else {
 181  				largeContent := make([]byte, 500)
 182  				for j := range largeContent {
 183  					largeContent[j] = 'x'
 184  				}
 185  				ev.Content = largeContent
 186  			}
 187  
 188  			if err := ev.Sign(sign); err != nil {
 189  				t.Fatalf("Failed to sign event %d: %v", i, err)
 190  			}
 191  
 192  			if _, err := db.SaveEvent(ctx, ev); err != nil {
 193  				t.Fatalf("Failed to save event %d: %v", i, err)
 194  			}
 195  
 196  			serial, err := db.GetSerialById(ev.ID)
 197  			if err != nil {
 198  				t.Fatalf("Failed to get serial for event %d: %v", i, err)
 199  			}
 200  
 201  			serials = append(serials, serial)
 202  			expectedIDs[serial.Get()] = ev.ID
 203  		}
 204  
 205  		// Batch fetch all events
 206  		events, err := db.FetchEventsBySerials(serials)
 207  		if err != nil {
 208  			t.Fatalf("Failed to batch fetch events: %v", err)
 209  		}
 210  
 211  		if len(events) != 20 {
 212  			t.Errorf("Expected 20 events, got %d", len(events))
 213  		}
 214  
 215  		// Verify all events were fetched correctly
 216  		for serialValue, ev := range events {
 217  			expectedID := expectedIDs[serialValue]
 218  			if !bytes.Equal(ev.ID, expectedID) {
 219  				t.Errorf("Event ID mismatch for serial %d: got %x, want %x",
 220  					serialValue, ev.ID, expectedID)
 221  			}
 222  		}
 223  	})
 224  
 225  	// Test Case 4: Edge case - event near 384 byte threshold
 226  	t.Run("ThresholdEvent", func(t *testing.T) {
 227  		ev := event.New()
 228  		ev.Kind = kind.TextNote.K
 229  		ev.CreatedAt = timestamp.Now().V
 230  		ev.Pubkey = sign.Pub()
 231  		ev.Tags = tag.NewS()
 232  
 233  		// Create content near the threshold
 234  		testContent := make([]byte, 250)
 235  		for i := range testContent {
 236  			testContent[i] = 'x'
 237  		}
 238  		ev.Content = testContent
 239  
 240  		if err := ev.Sign(sign); err != nil {
 241  			t.Fatalf("Failed to sign threshold event: %v", err)
 242  		}
 243  
 244  		if _, err := db.SaveEvent(ctx, ev); err != nil {
 245  			t.Fatalf("Failed to save threshold event: %v", err)
 246  		}
 247  
 248  		serial, err := db.GetSerialById(ev.ID)
 249  		if err != nil {
 250  			t.Fatalf("Failed to get serial: %v", err)
 251  		}
 252  
 253  		// Fetch and verify
 254  		fetchedEvent, err := db.FetchEventBySerial(serial)
 255  		if err != nil {
 256  			t.Fatalf("Failed to fetch threshold event: %v", err)
 257  		}
 258  
 259  		if !bytes.Equal(fetchedEvent.ID, ev.ID) {
 260  			t.Errorf("Fetched event ID mismatch")
 261  		}
 262  	})
 263  }
 264  
 265  // TestInlineStorageMigration tests the migration from traditional to inline storage
 266  func TestInlineStorageMigration(t *testing.T) {
 267  	// Create a temporary directory for the database
 268  	tempDir, err := os.MkdirTemp("", "test-migration-db-*")
 269  	if err != nil {
 270  		t.Fatalf("Failed to create temporary directory: %v", err)
 271  	}
 272  	defer os.RemoveAll(tempDir)
 273  
 274  	// Create a context and cancel function for the database
 275  	ctx, cancel := context.WithCancel(context.Background())
 276  	defer cancel()
 277  
 278  	// Initialize the database
 279  	db, err := New(ctx, cancel, tempDir, "info")
 280  	if err != nil {
 281  		t.Fatalf("Failed to create database: %v", err)
 282  	}
 283  
 284  	// Create a signer
 285  	sign := p8k.MustNew()
 286  	if err := sign.Generate(); chk.E(err) {
 287  		t.Fatal(err)
 288  	}
 289  
 290  	// Manually set database version to 3 (before inline storage migration)
 291  	db.writeVersionTag(3)
 292  
 293  	// Create and save some small events the old way (manually)
 294  	var testEvents []*event.E
 295  	for i := 0; i < 5; i++ {
 296  		ev := event.New()
 297  		ev.Kind = kind.TextNote.K
 298  		ev.CreatedAt = timestamp.Now().V + int64(i)
 299  		ev.Content = []byte("Test event")
 300  		ev.Pubkey = sign.Pub()
 301  		ev.Tags = tag.NewS()
 302  
 303  		if err := ev.Sign(sign); err != nil {
 304  			t.Fatalf("Failed to sign event: %v", err)
 305  		}
 306  
 307  		// Get next serial
 308  		serial, err := db.seq.Next()
 309  		if err != nil {
 310  			t.Fatalf("Failed to get serial: %v", err)
 311  		}
 312  
 313  		// Generate indexes
 314  		idxs, err := GetIndexesForEvent(ev, serial)
 315  		if err != nil {
 316  			t.Fatalf("Failed to generate indexes: %v", err)
 317  		}
 318  
 319  		// Serialize event
 320  		eventDataBuf := new(bytes.Buffer)
 321  		ev.MarshalBinary(eventDataBuf)
 322  		eventData := eventDataBuf.Bytes()
 323  
 324  		// Save the old way (evt prefix with value)
 325  		db.Update(func(txn *badger.Txn) error {
 326  			ser := new(types.Uint40)
 327  			ser.Set(serial)
 328  
 329  			// Save indexes
 330  			for _, key := range idxs {
 331  				txn.Set(key, nil)
 332  			}
 333  
 334  			// Save event the old way
 335  			keyBuf := new(bytes.Buffer)
 336  			indexes.EventEnc(ser).MarshalWrite(keyBuf)
 337  			txn.Set(keyBuf.Bytes(), eventData)
 338  
 339  			return nil
 340  		})
 341  
 342  		testEvents = append(testEvents, ev)
 343  	}
 344  
 345  	t.Logf("Created %d test events with old storage format", len(testEvents))
 346  
 347  	// Close and reopen database to trigger migration
 348  	db.Close()
 349  
 350  	db, err = New(ctx, cancel, tempDir, "info")
 351  	if err != nil {
 352  		t.Fatalf("Failed to reopen database: %v", err)
 353  	}
 354  	defer db.Close()
 355  
 356  	// Give migration time to complete
 357  	time.Sleep(100 * time.Millisecond)
 358  
 359  	// Verify all events can still be fetched
 360  	for i, ev := range testEvents {
 361  		serial, err := db.GetSerialById(ev.ID)
 362  		if err != nil {
 363  			t.Fatalf("Failed to get serial for event %d after migration: %v", i, err)
 364  		}
 365  
 366  		fetchedEvent, err := db.FetchEventBySerial(serial)
 367  		if err != nil {
 368  			t.Fatalf("Failed to fetch event %d after migration: %v", i, err)
 369  		}
 370  
 371  		if !bytes.Equal(fetchedEvent.ID, ev.ID) {
 372  			t.Errorf("Event %d ID mismatch after migration: got %x, want %x",
 373  				i, fetchedEvent.ID, ev.ID)
 374  		}
 375  
 376  		if !bytes.Equal(fetchedEvent.Content, ev.Content) {
 377  			t.Errorf("Event %d content mismatch after migration: got %q, want %q",
 378  				i, fetchedEvent.Content, ev.Content)
 379  		}
 380  
 381  		// Verify it's now using optimized storage (sev inline OR cmp compact format)
 382  		// The migration may convert to sev (version 4) or cmp (version 6) depending on migration order
 383  		optimizedStorageExists := false
 384  		db.View(func(txn *badger.Txn) error {
 385  			// Check for sev (small event inline) format
 386  			smallBuf := new(bytes.Buffer)
 387  			indexes.SmallEventEnc(serial).MarshalWrite(smallBuf)
 388  
 389  			opts := badger.DefaultIteratorOptions
 390  			opts.Prefix = smallBuf.Bytes()
 391  			it := txn.NewIterator(opts)
 392  			defer it.Close()
 393  
 394  			it.Rewind()
 395  			if it.Valid() {
 396  				optimizedStorageExists = true
 397  				t.Logf("Event %d (%s) successfully migrated to inline (sev) storage",
 398  					i, hex.Enc(ev.ID[:8]))
 399  				return nil
 400  			}
 401  
 402  			// Check for cmp (compact format) storage
 403  			cmpBuf := new(bytes.Buffer)
 404  			indexes.CompactEventEnc(serial).MarshalWrite(cmpBuf)
 405  			if _, err := txn.Get(cmpBuf.Bytes()); err == nil {
 406  				optimizedStorageExists = true
 407  				t.Logf("Event %d (%s) successfully migrated to compact (cmp) storage",
 408  					i, hex.Enc(ev.ID[:8]))
 409  			}
 410  			return nil
 411  		})
 412  
 413  		if !optimizedStorageExists {
 414  			t.Errorf("Event %d was not migrated to optimized storage (sev or cmp)", i)
 415  		}
 416  	}
 417  }
 418  
 419  // BenchmarkCompactStorage benchmarks the compact storage format performance
 420  func BenchmarkCompactStorage(b *testing.B) {
 421  	// Create a temporary directory for the database
 422  	tempDir, err := os.MkdirTemp("", "bench-inline-db-*")
 423  	if err != nil {
 424  		b.Fatalf("Failed to create temporary directory: %v", err)
 425  	}
 426  	defer os.RemoveAll(tempDir)
 427  
 428  	// Create a context and cancel function for the database
 429  	ctx, cancel := context.WithCancel(context.Background())
 430  	defer cancel()
 431  
 432  	// Initialize the database
 433  	db, err := New(ctx, cancel, tempDir, "info")
 434  	if err != nil {
 435  		b.Fatalf("Failed to create database: %v", err)
 436  	}
 437  	defer db.Close()
 438  
 439  	// Create a signer
 440  	sign := p8k.MustNew()
 441  	if err := sign.Generate(); chk.E(err) {
 442  		b.Fatal(err)
 443  	}
 444  
 445  	// Pre-populate database with mix of small and large events
 446  	var smallSerials []*types.Uint40
 447  	var largeSerials []*types.Uint40
 448  
 449  	for i := 0; i < 100; i++ {
 450  		// Small event
 451  		smallEv := event.New()
 452  		smallEv.Kind = kind.TextNote.K
 453  		smallEv.CreatedAt = timestamp.Now().V + int64(i)*2
 454  		smallEv.Content = []byte("Small test event")
 455  		smallEv.Pubkey = sign.Pub()
 456  		smallEv.Tags = tag.NewS()
 457  		smallEv.Sign(sign)
 458  
 459  		db.SaveEvent(ctx, smallEv)
 460  		if serial, err := db.GetSerialById(smallEv.ID); err == nil {
 461  			smallSerials = append(smallSerials, serial)
 462  		}
 463  
 464  		// Large event
 465  		largeEv := event.New()
 466  		largeEv.Kind = kind.TextNote.K
 467  		largeEv.CreatedAt = timestamp.Now().V + int64(i)*2 + 1
 468  		largeContent := make([]byte, 500)
 469  		for j := range largeContent {
 470  			largeContent[j] = 'x'
 471  		}
 472  		largeEv.Content = largeContent
 473  		largeEv.Pubkey = sign.Pub()
 474  		largeEv.Tags = tag.NewS()
 475  		largeEv.Sign(sign)
 476  
 477  		db.SaveEvent(ctx, largeEv)
 478  		if serial, err := db.GetSerialById(largeEv.ID); err == nil {
 479  			largeSerials = append(largeSerials, serial)
 480  		}
 481  	}
 482  
 483  	b.Run("FetchSmallEventsCompact", func(b *testing.B) {
 484  		b.ResetTimer()
 485  		for i := 0; i < b.N; i++ {
 486  			idx := i % len(smallSerials)
 487  			db.FetchEventBySerial(smallSerials[idx])
 488  		}
 489  	})
 490  
 491  	b.Run("FetchLargeEventsCompact", func(b *testing.B) {
 492  		b.ResetTimer()
 493  		for i := 0; i < b.N; i++ {
 494  			idx := i % len(largeSerials)
 495  			db.FetchEventBySerial(largeSerials[idx])
 496  		}
 497  	})
 498  
 499  	b.Run("BatchFetchSmallEvents", func(b *testing.B) {
 500  		b.ResetTimer()
 501  		for i := 0; i < b.N; i++ {
 502  			db.FetchEventsBySerials(smallSerials[:10])
 503  		}
 504  	})
 505  
 506  	b.Run("BatchFetchLargeEvents", func(b *testing.B) {
 507  		b.ResetTimer()
 508  		for i := 0; i < b.N; i++ {
 509  			db.FetchEventsBySerials(largeSerials[:10])
 510  		}
 511  	})
 512  }
 513