neo4j.go raw

   1  // Package neo4j provides a Neo4j-based implementation of the database interface.
   2  // Neo4j is a native graph database optimized for relationship-heavy queries,
   3  // making it ideal for Nostr's social graph and event reference patterns.
   4  package neo4j
   5  
   6  import (
   7  	"context"
   8  	"fmt"
   9  	"os"
  10  	"path/filepath"
  11  	"strings"
  12  	"time"
  13  
  14  	"github.com/neo4j/neo4j-go-driver/v5/neo4j"
  15  	"next.orly.dev/pkg/lol"
  16  	"next.orly.dev/pkg/lol/chk"
  17  	"next.orly.dev/pkg/database"
  18  	"next.orly.dev/pkg/nostr/encoders/event"
  19  	"next.orly.dev/pkg/nostr/encoders/filter"
  20  	"next.orly.dev/pkg/utils/apputil"
  21  )
  22  
  23  // Default configuration values (used when config values are 0 or not set)
  24  const (
  25  	defaultMaxConcurrentQueries = 10
  26  	defaultMaxConnPoolSize      = 25
  27  	defaultFetchSize            = 1000
  28  	defaultMaxTxRetrySeconds    = 30
  29  	defaultQueryResultLimit     = 10000
  30  )
  31  
  32  // maxRetryAttempts is the maximum number of times to retry a query on rate limit
  33  const maxRetryAttempts = 3
  34  
  35  // retryBaseDelay is the base delay for exponential backoff
  36  const retryBaseDelay = 500 * time.Millisecond
  37  
  38  // N implements the database.Database interface using Neo4j as the storage backend
  39  type N struct {
  40  	ctx     context.Context
  41  	cancel  context.CancelFunc
  42  	dataDir string
  43  	Logger  *logger
  44  
  45  	// Neo4j client connection
  46  	driver neo4j.DriverWithContext
  47  
  48  	// Configuration
  49  	neo4jURI      string
  50  	neo4jUser     string
  51  	neo4jPassword string
  52  
  53  	// Driver tuning options
  54  	maxConnPoolSize   int // max connections in pool
  55  	fetchSize         int // records per fetch batch
  56  	maxTxRetryTime    time.Duration
  57  	queryResultLimit  int // max results per query (0=unlimited)
  58  
  59  	ready chan struct{} // Closed when database is ready to serve requests
  60  
  61  	// querySem limits concurrent queries to prevent rate limiting
  62  	querySem chan struct{}
  63  }
  64  
  65  // Ensure N implements database.Database interface at compile time
  66  var _ database.Database = (*N)(nil)
  67  
  68  // CollectedResult wraps pre-fetched Neo4j records for iteration after session close
  69  // This is necessary because Neo4j results are lazy and need an open session for iteration
  70  type CollectedResult struct {
  71  	records []*neo4j.Record
  72  	index   int
  73  }
  74  
  75  // Next advances to the next record, returning true if there is one
  76  func (r *CollectedResult) Next(ctx context.Context) bool {
  77  	r.index++
  78  	return r.index < len(r.records)
  79  }
  80  
  81  // Record returns the current record
  82  func (r *CollectedResult) Record() *neo4j.Record {
  83  	if r.index < 0 || r.index >= len(r.records) {
  84  		return nil
  85  	}
  86  	return r.records[r.index]
  87  }
  88  
  89  // Len returns the number of records
  90  func (r *CollectedResult) Len() int {
  91  	return len(r.records)
  92  }
  93  
  94  // Err returns any error from iteration (always nil for pre-collected results)
  95  // This method satisfies the resultiter.Neo4jResultIterator interface
  96  func (r *CollectedResult) Err() error {
  97  	return nil
  98  }
  99  
 100  // init registers the neo4j database factory
 101  func init() {
 102  	database.RegisterNeo4jFactory(func(
 103  		ctx context.Context,
 104  		cancel context.CancelFunc,
 105  		cfg *database.DatabaseConfig,
 106  	) (database.Database, error) {
 107  		return NewWithConfig(ctx, cancel, cfg)
 108  	})
 109  }
 110  
 111  // NewWithConfig creates a new Neo4j-based database instance with full configuration.
 112  // Configuration is passed from the centralized app config via DatabaseConfig.
 113  func NewWithConfig(
 114  	ctx context.Context, cancel context.CancelFunc, cfg *database.DatabaseConfig,
 115  ) (
 116  	n *N, err error,
 117  ) {
 118  	// Apply defaults for empty values
 119  	neo4jURI := cfg.Neo4jURI
 120  	if neo4jURI == "" {
 121  		neo4jURI = "bolt://localhost:7687"
 122  	}
 123  	neo4jUser := cfg.Neo4jUser
 124  	if neo4jUser == "" {
 125  		neo4jUser = "neo4j"
 126  	}
 127  	neo4jPassword := cfg.Neo4jPassword
 128  	if neo4jPassword == "" {
 129  		neo4jPassword = "password"
 130  	}
 131  
 132  	// Apply defaults for driver tuning options
 133  	maxConnPoolSize := cfg.Neo4jMaxConnPoolSize
 134  	if maxConnPoolSize <= 0 {
 135  		maxConnPoolSize = defaultMaxConnPoolSize
 136  	}
 137  	fetchSize := cfg.Neo4jFetchSize
 138  	if fetchSize == 0 {
 139  		fetchSize = defaultFetchSize
 140  	}
 141  	maxTxRetrySeconds := cfg.Neo4jMaxTxRetrySeconds
 142  	if maxTxRetrySeconds <= 0 {
 143  		maxTxRetrySeconds = defaultMaxTxRetrySeconds
 144  	}
 145  	queryResultLimit := cfg.Neo4jQueryResultLimit
 146  	if queryResultLimit == 0 {
 147  		queryResultLimit = defaultQueryResultLimit
 148  	}
 149  
 150  	n = &N{
 151  		ctx:              ctx,
 152  		cancel:           cancel,
 153  		dataDir:          cfg.DataDir,
 154  		Logger:           NewLogger(lol.GetLogLevel(cfg.LogLevel), cfg.DataDir),
 155  		neo4jURI:         neo4jURI,
 156  		neo4jUser:        neo4jUser,
 157  		neo4jPassword:    neo4jPassword,
 158  		maxConnPoolSize:  maxConnPoolSize,
 159  		fetchSize:        fetchSize,
 160  		maxTxRetryTime:   time.Duration(maxTxRetrySeconds) * time.Second,
 161  		queryResultLimit: queryResultLimit,
 162  		ready:            make(chan struct{}),
 163  		querySem:         make(chan struct{}, defaultMaxConcurrentQueries),
 164  	}
 165  
 166  	// Ensure the data directory exists
 167  	if err = os.MkdirAll(cfg.DataDir, 0755); chk.E(err) {
 168  		return
 169  	}
 170  
 171  	// Ensure directory structure
 172  	dummyFile := filepath.Join(cfg.DataDir, "dummy.sst")
 173  	if err = apputil.EnsureDir(dummyFile); chk.E(err) {
 174  		return
 175  	}
 176  
 177  	// Initialize neo4j client connection
 178  	if err = n.initNeo4jClient(); chk.E(err) {
 179  		return
 180  	}
 181  
 182  	// Apply Nostr schema to neo4j (create constraints and indexes)
 183  	if err = n.applySchema(ctx); chk.E(err) {
 184  		return
 185  	}
 186  
 187  	// Run database migrations (e.g., Author -> NostrUser consolidation)
 188  	n.RunMigrations()
 189  
 190  	// Initialize serial counter
 191  	if err = n.initSerialCounter(); chk.E(err) {
 192  		return
 193  	}
 194  
 195  	// Start warmup goroutine to signal when database is ready
 196  	go n.warmup()
 197  
 198  	// Setup shutdown handler
 199  	go func() {
 200  		<-n.ctx.Done()
 201  		n.cancel()
 202  		if n.driver != nil {
 203  			n.driver.Close(context.Background())
 204  		}
 205  	}()
 206  
 207  	return
 208  }
 209  
 210  // New creates a new Neo4j-based database instance with default configuration.
 211  // This is provided for backward compatibility with existing callers (tests, etc.).
 212  // For full configuration control, use NewWithConfig instead.
 213  func New(
 214  	ctx context.Context, cancel context.CancelFunc, dataDir, logLevel string,
 215  ) (
 216  	n *N, err error,
 217  ) {
 218  	cfg := &database.DatabaseConfig{
 219  		DataDir:  dataDir,
 220  		LogLevel: logLevel,
 221  	}
 222  	return NewWithConfig(ctx, cancel, cfg)
 223  }
 224  
 225  // initNeo4jClient establishes connection to Neo4j server
 226  func (n *N) initNeo4jClient() error {
 227  	n.Logger.Infof("connecting to neo4j at %s (pool=%d, fetch=%d, txRetry=%v)",
 228  		n.neo4jURI, n.maxConnPoolSize, n.fetchSize, n.maxTxRetryTime)
 229  
 230  	// Create Neo4j driver with tuned configuration
 231  	driver, err := neo4j.NewDriverWithContext(
 232  		n.neo4jURI,
 233  		neo4j.BasicAuth(n.neo4jUser, n.neo4jPassword, ""),
 234  		func(config *neo4j.Config) {
 235  			// Limit connection pool size to reduce memory usage
 236  			config.MaxConnectionPoolSize = n.maxConnPoolSize
 237  
 238  			// Set fetch size to batch records and prevent memory overflow
 239  			// -1 means fetch all (driver default), positive value limits batch size
 240  			config.FetchSize = n.fetchSize
 241  
 242  			// Set max transaction retry time
 243  			config.MaxTransactionRetryTime = n.maxTxRetryTime
 244  		},
 245  	)
 246  	if err != nil {
 247  		return fmt.Errorf("failed to create neo4j driver: %w", err)
 248  	}
 249  
 250  	n.driver = driver
 251  
 252  	// Verify connectivity
 253  	ctx := context.Background()
 254  	if err := driver.VerifyConnectivity(ctx); err != nil {
 255  		return fmt.Errorf("failed to verify neo4j connectivity: %w", err)
 256  	}
 257  
 258  	n.Logger.Infof("successfully connected to neo4j")
 259  	return nil
 260  }
 261  
 262  
 263  // isRateLimitError checks if an error is due to authentication rate limiting
 264  func isRateLimitError(err error) bool {
 265  	if err == nil {
 266  		return false
 267  	}
 268  	errStr := err.Error()
 269  	return strings.Contains(errStr, "AuthenticationRateLimit") ||
 270  		strings.Contains(errStr, "Too many failed authentication attempts")
 271  }
 272  
 273  // acquireQuerySlot acquires a slot from the query semaphore
 274  func (n *N) acquireQuerySlot(ctx context.Context) error {
 275  	select {
 276  	case n.querySem <- struct{}{}:
 277  		return nil
 278  	case <-ctx.Done():
 279  		return ctx.Err()
 280  	}
 281  }
 282  
 283  // releaseQuerySlot releases a slot back to the query semaphore
 284  func (n *N) releaseQuerySlot() {
 285  	<-n.querySem
 286  }
 287  
 288  // ExecuteRead executes a read query against Neo4j with rate limiting and retry
 289  // Returns a collected result that can be iterated after the session closes
 290  func (n *N) ExecuteRead(ctx context.Context, cypher string, params map[string]any) (*CollectedResult, error) {
 291  	// Acquire semaphore slot to limit concurrent queries
 292  	if err := n.acquireQuerySlot(ctx); err != nil {
 293  		return nil, fmt.Errorf("failed to acquire query slot: %w", err)
 294  	}
 295  	defer n.releaseQuerySlot()
 296  
 297  	var lastErr error
 298  	for attempt := 0; attempt < maxRetryAttempts; attempt++ {
 299  		if attempt > 0 {
 300  			// Exponential backoff
 301  			delay := retryBaseDelay * time.Duration(1<<uint(attempt-1))
 302  			n.Logger.Warningf("retrying read query after %v (attempt %d/%d)", delay, attempt+1, maxRetryAttempts)
 303  			select {
 304  			case <-time.After(delay):
 305  			case <-ctx.Done():
 306  				return nil, ctx.Err()
 307  			}
 308  		}
 309  
 310  		session := n.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
 311  		result, err := session.Run(ctx, cypher, params)
 312  		if err != nil {
 313  			session.Close(ctx)
 314  			lastErr = err
 315  			if isRateLimitError(err) {
 316  				continue // Retry on rate limit
 317  			}
 318  			return nil, fmt.Errorf("neo4j read query failed: %w", err)
 319  		}
 320  
 321  		// Collect all records before the session closes
 322  		// (Neo4j results are lazy and need an open session for iteration)
 323  		records, err := result.Collect(ctx)
 324  		session.Close(ctx)
 325  		if err != nil {
 326  			lastErr = err
 327  			if isRateLimitError(err) {
 328  				continue // Retry on rate limit
 329  			}
 330  			return nil, fmt.Errorf("neo4j result collect failed: %w", err)
 331  		}
 332  
 333  		return &CollectedResult{records: records, index: -1}, nil
 334  	}
 335  
 336  	return nil, fmt.Errorf("neo4j read query failed after %d attempts: %w", maxRetryAttempts, lastErr)
 337  }
 338  
 339  // ExecuteWrite executes a write query against Neo4j with rate limiting and retry
 340  func (n *N) ExecuteWrite(ctx context.Context, cypher string, params map[string]any) (neo4j.ResultWithContext, error) {
 341  	// Acquire semaphore slot to limit concurrent queries
 342  	if err := n.acquireQuerySlot(ctx); err != nil {
 343  		return nil, fmt.Errorf("failed to acquire query slot: %w", err)
 344  	}
 345  	defer n.releaseQuerySlot()
 346  
 347  	var lastErr error
 348  	for attempt := 0; attempt < maxRetryAttempts; attempt++ {
 349  		if attempt > 0 {
 350  			// Exponential backoff
 351  			delay := retryBaseDelay * time.Duration(1<<uint(attempt-1))
 352  			n.Logger.Warningf("retrying write query after %v (attempt %d/%d)", delay, attempt+1, maxRetryAttempts)
 353  			select {
 354  			case <-time.After(delay):
 355  			case <-ctx.Done():
 356  				return nil, ctx.Err()
 357  			}
 358  		}
 359  
 360  		session := n.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
 361  		result, err := session.Run(ctx, cypher, params)
 362  		if err != nil {
 363  			session.Close(ctx)
 364  			lastErr = err
 365  			if isRateLimitError(err) {
 366  				continue // Retry on rate limit
 367  			}
 368  			return nil, fmt.Errorf("neo4j write query failed: %w", err)
 369  		}
 370  
 371  		// Consume the result to ensure the query completes before closing session
 372  		_, err = result.Consume(ctx)
 373  		session.Close(ctx)
 374  		if err != nil {
 375  			lastErr = err
 376  			if isRateLimitError(err) {
 377  				continue // Retry on rate limit
 378  			}
 379  			return nil, fmt.Errorf("neo4j write consume failed: %w", err)
 380  		}
 381  
 382  		return result, nil
 383  	}
 384  
 385  	return nil, fmt.Errorf("neo4j write query failed after %d attempts: %w", maxRetryAttempts, lastErr)
 386  }
 387  
 388  // ExecuteWriteTransaction executes a transactional write operation with rate limiting
 389  func (n *N) ExecuteWriteTransaction(ctx context.Context, work func(tx neo4j.ManagedTransaction) (any, error)) (any, error) {
 390  	// Acquire semaphore slot to limit concurrent queries
 391  	if err := n.acquireQuerySlot(ctx); err != nil {
 392  		return nil, fmt.Errorf("failed to acquire query slot: %w", err)
 393  	}
 394  	defer n.releaseQuerySlot()
 395  
 396  	session := n.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
 397  	defer session.Close(ctx)
 398  
 399  	return session.ExecuteWrite(ctx, work)
 400  }
 401  
 402  // Path returns the data directory path
 403  func (n *N) Path() string { return n.dataDir }
 404  
 405  // Init initializes the database with a given path (no-op, path set in New)
 406  func (n *N) Init(path string) (err error) {
 407  	// Path already set in New()
 408  	return nil
 409  }
 410  
 411  // Sync flushes pending writes (Neo4j handles persistence automatically)
 412  func (n *N) Sync() (err error) {
 413  	return nil
 414  }
 415  
 416  // Close closes the database
 417  func (n *N) Close() (err error) {
 418  	n.cancel()
 419  	if n.driver != nil {
 420  		if e := n.driver.Close(context.Background()); e != nil {
 421  			err = e
 422  		}
 423  	}
 424  	return
 425  }
 426  
 427  // Wipe removes all data and re-applies schema
 428  func (n *N) Wipe() (err error) {
 429  	// Delete all nodes and relationships in Neo4j
 430  	ctx := context.Background()
 431  	_, err = n.ExecuteWrite(ctx, "MATCH (n) DETACH DELETE n", nil)
 432  	if err != nil {
 433  		return fmt.Errorf("failed to wipe neo4j database: %w", err)
 434  	}
 435  
 436  	// Re-apply schema (indexes and constraints were deleted with the data)
 437  	if err = n.applySchema(ctx); err != nil {
 438  		return fmt.Errorf("failed to re-apply schema after wipe: %w", err)
 439  	}
 440  
 441  	// Re-initialize serial counter (it was deleted with the Marker node)
 442  	if err = n.initSerialCounter(); err != nil {
 443  		return fmt.Errorf("failed to re-init serial counter after wipe: %w", err)
 444  	}
 445  
 446  	return nil
 447  }
 448  
 449  // SetLogLevel sets the logging level
 450  func (n *N) SetLogLevel(level string) {
 451  	// n.Logger.SetLevel(lol.GetLogLevel(level))
 452  }
 453  
 454  // EventIdsBySerial retrieves event IDs by serial range (stub)
 455  func (n *N) EventIdsBySerial(start uint64, count int) (
 456  	evs []uint64, err error,
 457  ) {
 458  	err = fmt.Errorf("not implemented")
 459  	return
 460  }
 461  
 462  // RunMigrations is implemented in migrations.go
 463  // It handles schema migrations like the Author -> NostrUser consolidation
 464  
 465  // Ready returns a channel that closes when the database is ready to serve requests.
 466  // This allows callers to wait for database warmup to complete.
 467  func (n *N) Ready() <-chan struct{} {
 468  	return n.ready
 469  }
 470  
 471  // warmup performs database warmup operations and closes the ready channel when complete.
 472  // For Neo4j, warmup ensures the connection is healthy and constraints are applied.
 473  func (n *N) warmup() {
 474  	defer close(n.ready)
 475  
 476  	// Neo4j connection and schema are already verified during initialization
 477  	// Just give a brief moment for any background processes to settle
 478  	n.Logger.Infof("neo4j database warmup complete, ready to serve requests")
 479  }
 480  
 481  // GetCachedJSON returns cached query results (not implemented for Neo4j)
 482  func (n *N) GetCachedJSON(f *filter.F) ([][]byte, bool) { return nil, false }
 483  
 484  // CacheMarshaledJSON caches marshaled JSON results (not implemented for Neo4j)
 485  func (n *N) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {}
 486  
 487  // GetCachedEvents retrieves cached events (not implemented for Neo4j)
 488  func (n *N) GetCachedEvents(f *filter.F) (event.S, bool) { return nil, false }
 489  
 490  // CacheEvents caches events (not implemented for Neo4j)
 491  func (n *N) CacheEvents(f *filter.F, events event.S) {}
 492  
 493  // InvalidateQueryCache invalidates the query cache (not implemented for Neo4j)
 494  func (n *N) InvalidateQueryCache() {}
 495  
 496  // Driver returns the Neo4j driver for use in rate limiting.
 497  func (n *N) Driver() neo4j.DriverWithContext {
 498  	return n.driver
 499  }
 500  
 501  // QuerySem returns the query semaphore for use in rate limiting.
 502  func (n *N) QuerySem() chan struct{} {
 503  	return n.querySem
 504  }
 505  
 506  // MaxConcurrentQueries returns the maximum concurrent query limit.
 507  func (n *N) MaxConcurrentQueries() int {
 508  	return cap(n.querySem)
 509  }
 510  
 511  // QueryResultLimit returns the configured maximum results per query.
 512  // Returns 0 if unlimited (no limit applied).
 513  func (n *N) QueryResultLimit() int {
 514  	return n.queryResultLimit
 515  }
 516  
 517  // FetchSize returns the configured fetch batch size.
 518  func (n *N) FetchSize() int {
 519  	return n.fetchSize
 520  }
 521  
 522  // MaxConnPoolSize returns the configured connection pool size.
 523  func (n *N) MaxConnPoolSize() int {
 524  	return n.maxConnPoolSize
 525  }
 526  
 527  // NRC (Nostr Relay Connect) stubs - not supported in Neo4j backend
 528  // NRC requires Badger for key-value storage of connection secrets
 529  
 530  var errNRCNotSupported = fmt.Errorf("NRC not supported in Neo4j backend")
 531  
 532  func (n *N) CreateNRCConnection(label string, createdBy []byte) (*database.NRCConnection, error) {
 533  	return nil, errNRCNotSupported
 534  }
 535  
 536  func (n *N) GetNRCConnection(id string) (*database.NRCConnection, error) {
 537  	return nil, errNRCNotSupported
 538  }
 539  
 540  func (n *N) GetNRCConnectionByDerivedPubkey(derivedPubkey []byte) (*database.NRCConnection, error) {
 541  	return nil, errNRCNotSupported
 542  }
 543  
 544  func (n *N) SaveNRCConnection(conn *database.NRCConnection) error {
 545  	return errNRCNotSupported
 546  }
 547  
 548  func (n *N) DeleteNRCConnection(id string) error {
 549  	return errNRCNotSupported
 550  }
 551  
 552  func (n *N) GetAllNRCConnections() ([]*database.NRCConnection, error) {
 553  	return nil, errNRCNotSupported
 554  }
 555  
 556  func (n *N) GetNRCAuthorizedSecrets() (map[string]string, error) {
 557  	return nil, errNRCNotSupported
 558  }
 559  
 560  func (n *N) UpdateNRCConnectionLastUsed(id string) error {
 561  	return errNRCNotSupported
 562  }
 563  
 564  func (n *N) GetNRCConnectionURI(conn *database.NRCConnection, relayPubkey []byte, rendezvousURL string) (string, error) {
 565  	return "", errNRCNotSupported
 566  }
 567