concurrent_test.go raw
1 package neo4j
2
3 import (
4 "context"
5 "fmt"
6 "sync"
7 "testing"
8 "time"
9
10 "github.com/neo4j/neo4j-go-driver/v5/neo4j"
11 )
12
13 // TestConcurrentDriverConnections verifies that three independent Neo4j driver
14 // instances can execute Cypher queries concurrently without conflict.
15 // This confirms that multiple processes in the same environment can share
16 // the bolt://localhost:7687 endpoint.
17 func TestConcurrentDriverConnections(t *testing.T) {
18 if testDB == nil {
19 t.Skip("Neo4j not available")
20 }
21
22 const numDrivers = 3
23
24 // Create independent drivers using the same connection URI as the test DB
25 drivers := make([]neo4j.DriverWithContext, numDrivers)
26 for i := range drivers {
27 driver, err := neo4j.NewDriverWithContext(
28 testDB.neo4jURI,
29 neo4j.BasicAuth(testDB.neo4jUser, testDB.neo4jPassword, ""),
30 func(config *neo4j.Config) {
31 config.MaxConnectionPoolSize = 5
32 },
33 )
34 if err != nil {
35 t.Fatalf("failed to create driver %d: %v", i, err)
36 }
37 defer driver.Close(context.Background())
38
39 if err := driver.VerifyConnectivity(context.Background()); err != nil {
40 t.Fatalf("driver %d failed connectivity check: %v", i, err)
41 }
42 drivers[i] = driver
43 }
44
45 // Run concurrent read and write queries across all three drivers
46 var wg sync.WaitGroup
47 errs := make(chan error, numDrivers)
48
49 for i, driver := range drivers {
50 wg.Add(1)
51 go func(id int, d neo4j.DriverWithContext) {
52 defer wg.Done()
53
54 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
55 defer cancel()
56
57 // Write: create a uniquely-labeled node
58 label := fmt.Sprintf("ConcurrentTest_%d", id)
59 writeSession := d.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
60 _, err := writeSession.Run(ctx,
61 fmt.Sprintf("CREATE (n:%s {driver: $id, ts: timestamp()}) RETURN n", label),
62 map[string]any{"id": id},
63 )
64 writeSession.Close(ctx)
65 if err != nil {
66 errs <- fmt.Errorf("driver %d write failed: %w", id, err)
67 return
68 }
69
70 // Read: count nodes with our label
71 readSession := d.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
72 result, err := readSession.Run(ctx,
73 fmt.Sprintf("MATCH (n:%s) RETURN count(n) AS cnt", label),
74 nil,
75 )
76 if err != nil {
77 readSession.Close(ctx)
78 errs <- fmt.Errorf("driver %d read failed: %w", id, err)
79 return
80 }
81
82 if !result.Next(ctx) {
83 readSession.Close(ctx)
84 errs <- fmt.Errorf("driver %d read returned no results", id)
85 return
86 }
87 cnt, ok := result.Record().Values[0].(int64)
88 readSession.Close(ctx)
89 if !ok || cnt < 1 {
90 errs <- fmt.Errorf("driver %d expected count >= 1, got %v", id, cnt)
91 return
92 }
93
94 // Cleanup
95 cleanupSession := d.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
96 _, _ = cleanupSession.Run(ctx,
97 fmt.Sprintf("MATCH (n:%s) DETACH DELETE n", label),
98 nil,
99 )
100 cleanupSession.Close(ctx)
101 }(i, driver)
102 }
103
104 wg.Wait()
105 close(errs)
106
107 for err := range errs {
108 t.Error(err)
109 }
110 }
111