neo4j.go raw
1 // Package neo4j provides a Neo4j-based implementation of the database interface.
2 // Neo4j is a native graph database optimized for relationship-heavy queries,
3 // making it ideal for Nostr's social graph and event reference patterns.
4 package neo4j
5
6 import (
7 "context"
8 "fmt"
9 "os"
10 "path/filepath"
11 "strings"
12 "time"
13
14 "github.com/neo4j/neo4j-go-driver/v5/neo4j"
15 "next.orly.dev/pkg/lol"
16 "next.orly.dev/pkg/lol/chk"
17 "next.orly.dev/pkg/database"
18 "next.orly.dev/pkg/nostr/encoders/event"
19 "next.orly.dev/pkg/nostr/encoders/filter"
20 "next.orly.dev/pkg/utils/apputil"
21 )
22
// Fallback values used by NewWithConfig for driver tuning options that the
// DatabaseConfig leaves unset.
const (
	// defaultMaxConcurrentQueries is the capacity of the query semaphore
	// (not configurable through DatabaseConfig).
	defaultMaxConcurrentQueries = 10

	// defaultMaxConnPoolSize applies when Neo4jMaxConnPoolSize <= 0.
	defaultMaxConnPoolSize = 25

	// defaultFetchSize applies when Neo4jFetchSize == 0.
	defaultFetchSize = 1000

	// defaultMaxTxRetrySeconds applies when Neo4jMaxTxRetrySeconds <= 0.
	defaultMaxTxRetrySeconds = 30

	// defaultQueryResultLimit applies when Neo4jQueryResultLimit == 0.
	defaultQueryResultLimit = 10000
)
31
// Retry policy used by ExecuteRead and ExecuteWrite when Neo4j reports
// authentication rate limiting.
const (
	// maxRetryAttempts is the total number of attempts, including the first.
	maxRetryAttempts = 3

	// retryBaseDelay is the wait before the first retry; every further retry
	// doubles the previous wait (exponential backoff).
	retryBaseDelay = 500 * time.Millisecond
)
37
38 // N implements the database.Database interface using Neo4j as the storage backend
39 type N struct {
40 ctx context.Context
41 cancel context.CancelFunc
42 dataDir string
43 Logger *logger
44
45 // Neo4j client connection
46 driver neo4j.DriverWithContext
47
48 // Configuration
49 neo4jURI string
50 neo4jUser string
51 neo4jPassword string
52
53 // Driver tuning options
54 maxConnPoolSize int // max connections in pool
55 fetchSize int // records per fetch batch
56 maxTxRetryTime time.Duration
57 queryResultLimit int // max results per query (0=unlimited)
58
59 ready chan struct{} // Closed when database is ready to serve requests
60
61 // querySem limits concurrent queries to prevent rate limiting
62 querySem chan struct{}
63 }
64
65 // Ensure N implements database.Database interface at compile time
66 var _ database.Database = (*N)(nil)
67
68 // CollectedResult wraps pre-fetched Neo4j records for iteration after session close
69 // This is necessary because Neo4j results are lazy and need an open session for iteration
70 type CollectedResult struct {
71 records []*neo4j.Record
72 index int
73 }
74
75 // Next advances to the next record, returning true if there is one
76 func (r *CollectedResult) Next(ctx context.Context) bool {
77 r.index++
78 return r.index < len(r.records)
79 }
80
81 // Record returns the current record
82 func (r *CollectedResult) Record() *neo4j.Record {
83 if r.index < 0 || r.index >= len(r.records) {
84 return nil
85 }
86 return r.records[r.index]
87 }
88
89 // Len returns the number of records
90 func (r *CollectedResult) Len() int {
91 return len(r.records)
92 }
93
94 // Err returns any error from iteration (always nil for pre-collected results)
95 // This method satisfies the resultiter.Neo4jResultIterator interface
96 func (r *CollectedResult) Err() error {
97 return nil
98 }
99
100 // init registers the neo4j database factory
101 func init() {
102 database.RegisterNeo4jFactory(func(
103 ctx context.Context,
104 cancel context.CancelFunc,
105 cfg *database.DatabaseConfig,
106 ) (database.Database, error) {
107 return NewWithConfig(ctx, cancel, cfg)
108 })
109 }
110
111 // NewWithConfig creates a new Neo4j-based database instance with full configuration.
112 // Configuration is passed from the centralized app config via DatabaseConfig.
113 func NewWithConfig(
114 ctx context.Context, cancel context.CancelFunc, cfg *database.DatabaseConfig,
115 ) (
116 n *N, err error,
117 ) {
118 // Apply defaults for empty values
119 neo4jURI := cfg.Neo4jURI
120 if neo4jURI == "" {
121 neo4jURI = "bolt://localhost:7687"
122 }
123 neo4jUser := cfg.Neo4jUser
124 if neo4jUser == "" {
125 neo4jUser = "neo4j"
126 }
127 neo4jPassword := cfg.Neo4jPassword
128 if neo4jPassword == "" {
129 neo4jPassword = "password"
130 }
131
132 // Apply defaults for driver tuning options
133 maxConnPoolSize := cfg.Neo4jMaxConnPoolSize
134 if maxConnPoolSize <= 0 {
135 maxConnPoolSize = defaultMaxConnPoolSize
136 }
137 fetchSize := cfg.Neo4jFetchSize
138 if fetchSize == 0 {
139 fetchSize = defaultFetchSize
140 }
141 maxTxRetrySeconds := cfg.Neo4jMaxTxRetrySeconds
142 if maxTxRetrySeconds <= 0 {
143 maxTxRetrySeconds = defaultMaxTxRetrySeconds
144 }
145 queryResultLimit := cfg.Neo4jQueryResultLimit
146 if queryResultLimit == 0 {
147 queryResultLimit = defaultQueryResultLimit
148 }
149
150 n = &N{
151 ctx: ctx,
152 cancel: cancel,
153 dataDir: cfg.DataDir,
154 Logger: NewLogger(lol.GetLogLevel(cfg.LogLevel), cfg.DataDir),
155 neo4jURI: neo4jURI,
156 neo4jUser: neo4jUser,
157 neo4jPassword: neo4jPassword,
158 maxConnPoolSize: maxConnPoolSize,
159 fetchSize: fetchSize,
160 maxTxRetryTime: time.Duration(maxTxRetrySeconds) * time.Second,
161 queryResultLimit: queryResultLimit,
162 ready: make(chan struct{}),
163 querySem: make(chan struct{}, defaultMaxConcurrentQueries),
164 }
165
166 // Ensure the data directory exists
167 if err = os.MkdirAll(cfg.DataDir, 0755); chk.E(err) {
168 return
169 }
170
171 // Ensure directory structure
172 dummyFile := filepath.Join(cfg.DataDir, "dummy.sst")
173 if err = apputil.EnsureDir(dummyFile); chk.E(err) {
174 return
175 }
176
177 // Initialize neo4j client connection
178 if err = n.initNeo4jClient(); chk.E(err) {
179 return
180 }
181
182 // Apply Nostr schema to neo4j (create constraints and indexes)
183 if err = n.applySchema(ctx); chk.E(err) {
184 return
185 }
186
187 // Run database migrations (e.g., Author -> NostrUser consolidation)
188 n.RunMigrations()
189
190 // Initialize serial counter
191 if err = n.initSerialCounter(); chk.E(err) {
192 return
193 }
194
195 // Start warmup goroutine to signal when database is ready
196 go n.warmup()
197
198 // Setup shutdown handler
199 go func() {
200 <-n.ctx.Done()
201 n.cancel()
202 if n.driver != nil {
203 n.driver.Close(context.Background())
204 }
205 }()
206
207 return
208 }
209
210 // New creates a new Neo4j-based database instance with default configuration.
211 // This is provided for backward compatibility with existing callers (tests, etc.).
212 // For full configuration control, use NewWithConfig instead.
213 func New(
214 ctx context.Context, cancel context.CancelFunc, dataDir, logLevel string,
215 ) (
216 n *N, err error,
217 ) {
218 cfg := &database.DatabaseConfig{
219 DataDir: dataDir,
220 LogLevel: logLevel,
221 }
222 return NewWithConfig(ctx, cancel, cfg)
223 }
224
225 // initNeo4jClient establishes connection to Neo4j server
226 func (n *N) initNeo4jClient() error {
227 n.Logger.Infof("connecting to neo4j at %s (pool=%d, fetch=%d, txRetry=%v)",
228 n.neo4jURI, n.maxConnPoolSize, n.fetchSize, n.maxTxRetryTime)
229
230 // Create Neo4j driver with tuned configuration
231 driver, err := neo4j.NewDriverWithContext(
232 n.neo4jURI,
233 neo4j.BasicAuth(n.neo4jUser, n.neo4jPassword, ""),
234 func(config *neo4j.Config) {
235 // Limit connection pool size to reduce memory usage
236 config.MaxConnectionPoolSize = n.maxConnPoolSize
237
238 // Set fetch size to batch records and prevent memory overflow
239 // -1 means fetch all (driver default), positive value limits batch size
240 config.FetchSize = n.fetchSize
241
242 // Set max transaction retry time
243 config.MaxTransactionRetryTime = n.maxTxRetryTime
244 },
245 )
246 if err != nil {
247 return fmt.Errorf("failed to create neo4j driver: %w", err)
248 }
249
250 n.driver = driver
251
252 // Verify connectivity
253 ctx := context.Background()
254 if err := driver.VerifyConnectivity(ctx); err != nil {
255 return fmt.Errorf("failed to verify neo4j connectivity: %w", err)
256 }
257
258 n.Logger.Infof("successfully connected to neo4j")
259 return nil
260 }
261
262
// isRateLimitError reports whether err looks like Neo4j refusing the request
// because of authentication rate limiting. It matches on the error text, so it
// works for wrapped errors as long as the message is preserved. A nil error is
// never a rate-limit error.
func isRateLimitError(err error) bool {
	if err == nil {
		return false
	}
	switch msg := err.Error(); {
	case strings.Contains(msg, "AuthenticationRateLimit"),
		strings.Contains(msg, "Too many failed authentication attempts"):
		return true
	default:
		return false
	}
}
272
273 // acquireQuerySlot acquires a slot from the query semaphore
274 func (n *N) acquireQuerySlot(ctx context.Context) error {
275 select {
276 case n.querySem <- struct{}{}:
277 return nil
278 case <-ctx.Done():
279 return ctx.Err()
280 }
281 }
282
283 // releaseQuerySlot releases a slot back to the query semaphore
284 func (n *N) releaseQuerySlot() {
285 <-n.querySem
286 }
287
288 // ExecuteRead executes a read query against Neo4j with rate limiting and retry
289 // Returns a collected result that can be iterated after the session closes
290 func (n *N) ExecuteRead(ctx context.Context, cypher string, params map[string]any) (*CollectedResult, error) {
291 // Acquire semaphore slot to limit concurrent queries
292 if err := n.acquireQuerySlot(ctx); err != nil {
293 return nil, fmt.Errorf("failed to acquire query slot: %w", err)
294 }
295 defer n.releaseQuerySlot()
296
297 var lastErr error
298 for attempt := 0; attempt < maxRetryAttempts; attempt++ {
299 if attempt > 0 {
300 // Exponential backoff
301 delay := retryBaseDelay * time.Duration(1<<uint(attempt-1))
302 n.Logger.Warningf("retrying read query after %v (attempt %d/%d)", delay, attempt+1, maxRetryAttempts)
303 select {
304 case <-time.After(delay):
305 case <-ctx.Done():
306 return nil, ctx.Err()
307 }
308 }
309
310 session := n.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
311 result, err := session.Run(ctx, cypher, params)
312 if err != nil {
313 session.Close(ctx)
314 lastErr = err
315 if isRateLimitError(err) {
316 continue // Retry on rate limit
317 }
318 return nil, fmt.Errorf("neo4j read query failed: %w", err)
319 }
320
321 // Collect all records before the session closes
322 // (Neo4j results are lazy and need an open session for iteration)
323 records, err := result.Collect(ctx)
324 session.Close(ctx)
325 if err != nil {
326 lastErr = err
327 if isRateLimitError(err) {
328 continue // Retry on rate limit
329 }
330 return nil, fmt.Errorf("neo4j result collect failed: %w", err)
331 }
332
333 return &CollectedResult{records: records, index: -1}, nil
334 }
335
336 return nil, fmt.Errorf("neo4j read query failed after %d attempts: %w", maxRetryAttempts, lastErr)
337 }
338
339 // ExecuteWrite executes a write query against Neo4j with rate limiting and retry
340 func (n *N) ExecuteWrite(ctx context.Context, cypher string, params map[string]any) (neo4j.ResultWithContext, error) {
341 // Acquire semaphore slot to limit concurrent queries
342 if err := n.acquireQuerySlot(ctx); err != nil {
343 return nil, fmt.Errorf("failed to acquire query slot: %w", err)
344 }
345 defer n.releaseQuerySlot()
346
347 var lastErr error
348 for attempt := 0; attempt < maxRetryAttempts; attempt++ {
349 if attempt > 0 {
350 // Exponential backoff
351 delay := retryBaseDelay * time.Duration(1<<uint(attempt-1))
352 n.Logger.Warningf("retrying write query after %v (attempt %d/%d)", delay, attempt+1, maxRetryAttempts)
353 select {
354 case <-time.After(delay):
355 case <-ctx.Done():
356 return nil, ctx.Err()
357 }
358 }
359
360 session := n.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
361 result, err := session.Run(ctx, cypher, params)
362 if err != nil {
363 session.Close(ctx)
364 lastErr = err
365 if isRateLimitError(err) {
366 continue // Retry on rate limit
367 }
368 return nil, fmt.Errorf("neo4j write query failed: %w", err)
369 }
370
371 // Consume the result to ensure the query completes before closing session
372 _, err = result.Consume(ctx)
373 session.Close(ctx)
374 if err != nil {
375 lastErr = err
376 if isRateLimitError(err) {
377 continue // Retry on rate limit
378 }
379 return nil, fmt.Errorf("neo4j write consume failed: %w", err)
380 }
381
382 return result, nil
383 }
384
385 return nil, fmt.Errorf("neo4j write query failed after %d attempts: %w", maxRetryAttempts, lastErr)
386 }
387
388 // ExecuteWriteTransaction executes a transactional write operation with rate limiting
389 func (n *N) ExecuteWriteTransaction(ctx context.Context, work func(tx neo4j.ManagedTransaction) (any, error)) (any, error) {
390 // Acquire semaphore slot to limit concurrent queries
391 if err := n.acquireQuerySlot(ctx); err != nil {
392 return nil, fmt.Errorf("failed to acquire query slot: %w", err)
393 }
394 defer n.releaseQuerySlot()
395
396 session := n.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
397 defer session.Close(ctx)
398
399 return session.ExecuteWrite(ctx, work)
400 }
401
402 // Path returns the data directory path
403 func (n *N) Path() string { return n.dataDir }
404
405 // Init initializes the database with a given path (no-op, path set in New)
406 func (n *N) Init(path string) (err error) {
407 // Path already set in New()
408 return nil
409 }
410
411 // Sync flushes pending writes (Neo4j handles persistence automatically)
412 func (n *N) Sync() (err error) {
413 return nil
414 }
415
416 // Close closes the database
417 func (n *N) Close() (err error) {
418 n.cancel()
419 if n.driver != nil {
420 if e := n.driver.Close(context.Background()); e != nil {
421 err = e
422 }
423 }
424 return
425 }
426
427 // Wipe removes all data and re-applies schema
428 func (n *N) Wipe() (err error) {
429 // Delete all nodes and relationships in Neo4j
430 ctx := context.Background()
431 _, err = n.ExecuteWrite(ctx, "MATCH (n) DETACH DELETE n", nil)
432 if err != nil {
433 return fmt.Errorf("failed to wipe neo4j database: %w", err)
434 }
435
436 // Re-apply schema (indexes and constraints were deleted with the data)
437 if err = n.applySchema(ctx); err != nil {
438 return fmt.Errorf("failed to re-apply schema after wipe: %w", err)
439 }
440
441 // Re-initialize serial counter (it was deleted with the Marker node)
442 if err = n.initSerialCounter(); err != nil {
443 return fmt.Errorf("failed to re-init serial counter after wipe: %w", err)
444 }
445
446 return nil
447 }
448
449 // SetLogLevel sets the logging level
450 func (n *N) SetLogLevel(level string) {
451 // n.Logger.SetLevel(lol.GetLogLevel(level))
452 }
453
454 // EventIdsBySerial retrieves event IDs by serial range (stub)
455 func (n *N) EventIdsBySerial(start uint64, count int) (
456 evs []uint64, err error,
457 ) {
458 err = fmt.Errorf("not implemented")
459 return
460 }
461
462 // RunMigrations is implemented in migrations.go
463 // It handles schema migrations like the Author -> NostrUser consolidation
464
465 // Ready returns a channel that closes when the database is ready to serve requests.
466 // This allows callers to wait for database warmup to complete.
467 func (n *N) Ready() <-chan struct{} {
468 return n.ready
469 }
470
471 // warmup performs database warmup operations and closes the ready channel when complete.
472 // For Neo4j, warmup ensures the connection is healthy and constraints are applied.
473 func (n *N) warmup() {
474 defer close(n.ready)
475
476 // Neo4j connection and schema are already verified during initialization
477 // Just give a brief moment for any background processes to settle
478 n.Logger.Infof("neo4j database warmup complete, ready to serve requests")
479 }
480
481 // GetCachedJSON returns cached query results (not implemented for Neo4j)
482 func (n *N) GetCachedJSON(f *filter.F) ([][]byte, bool) { return nil, false }
483
484 // CacheMarshaledJSON caches marshaled JSON results (not implemented for Neo4j)
485 func (n *N) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {}
486
487 // GetCachedEvents retrieves cached events (not implemented for Neo4j)
488 func (n *N) GetCachedEvents(f *filter.F) (event.S, bool) { return nil, false }
489
490 // CacheEvents caches events (not implemented for Neo4j)
491 func (n *N) CacheEvents(f *filter.F, events event.S) {}
492
493 // InvalidateQueryCache invalidates the query cache (not implemented for Neo4j)
494 func (n *N) InvalidateQueryCache() {}
495
496 // Driver returns the Neo4j driver for use in rate limiting.
497 func (n *N) Driver() neo4j.DriverWithContext {
498 return n.driver
499 }
500
501 // QuerySem returns the query semaphore for use in rate limiting.
502 func (n *N) QuerySem() chan struct{} {
503 return n.querySem
504 }
505
506 // MaxConcurrentQueries returns the maximum concurrent query limit.
507 func (n *N) MaxConcurrentQueries() int {
508 return cap(n.querySem)
509 }
510
511 // QueryResultLimit returns the configured maximum results per query.
512 // Returns 0 if unlimited (no limit applied).
513 func (n *N) QueryResultLimit() int {
514 return n.queryResultLimit
515 }
516
517 // FetchSize returns the configured fetch batch size.
518 func (n *N) FetchSize() int {
519 return n.fetchSize
520 }
521
522 // MaxConnPoolSize returns the configured connection pool size.
523 func (n *N) MaxConnPoolSize() int {
524 return n.maxConnPoolSize
525 }
526
// NRC (Nostr Relay Connect) is not available on the Neo4j backend, because it
// relies on Badger's key-value store to keep connection secrets. Every NRC
// method on N therefore fails with errNRCNotSupported.
var (
	// errNRCNotSupported is the sentinel returned by all NRC stubs.
	errNRCNotSupported = fmt.Errorf("NRC not supported in Neo4j backend")
)
531
532 func (n *N) CreateNRCConnection(label string, createdBy []byte) (*database.NRCConnection, error) {
533 return nil, errNRCNotSupported
534 }
535
536 func (n *N) GetNRCConnection(id string) (*database.NRCConnection, error) {
537 return nil, errNRCNotSupported
538 }
539
540 func (n *N) GetNRCConnectionByDerivedPubkey(derivedPubkey []byte) (*database.NRCConnection, error) {
541 return nil, errNRCNotSupported
542 }
543
544 func (n *N) SaveNRCConnection(conn *database.NRCConnection) error {
545 return errNRCNotSupported
546 }
547
548 func (n *N) DeleteNRCConnection(id string) error {
549 return errNRCNotSupported
550 }
551
552 func (n *N) GetAllNRCConnections() ([]*database.NRCConnection, error) {
553 return nil, errNRCNotSupported
554 }
555
556 func (n *N) GetNRCAuthorizedSecrets() (map[string]string, error) {
557 return nil, errNRCNotSupported
558 }
559
560 func (n *N) UpdateNRCConnectionLastUsed(id string) error {
561 return errNRCNotSupported
562 }
563
564 func (n *N) GetNRCConnectionURI(conn *database.NRCConnection, relayPubkey []byte, rendezvousURL string) (string, error) {
565 return "", errNRCNotSupported
566 }
567