concurrent_test.go raw

   1  package neo4j
   2  
   3  import (
   4  	"context"
   5  	"fmt"
   6  	"sync"
   7  	"testing"
   8  	"time"
   9  
  10  	"github.com/neo4j/neo4j-go-driver/v5/neo4j"
  11  )
  12  
  13  // TestConcurrentDriverConnections verifies that three independent Neo4j driver
  14  // instances can execute Cypher queries concurrently without conflict.
  15  // This confirms that multiple processes in the same environment can share
  16  // the bolt://localhost:7687 endpoint.
  17  func TestConcurrentDriverConnections(t *testing.T) {
  18  	if testDB == nil {
  19  		t.Skip("Neo4j not available")
  20  	}
  21  
  22  	const numDrivers = 3
  23  
  24  	// Create independent drivers using the same connection URI as the test DB
  25  	drivers := make([]neo4j.DriverWithContext, numDrivers)
  26  	for i := range drivers {
  27  		driver, err := neo4j.NewDriverWithContext(
  28  			testDB.neo4jURI,
  29  			neo4j.BasicAuth(testDB.neo4jUser, testDB.neo4jPassword, ""),
  30  			func(config *neo4j.Config) {
  31  				config.MaxConnectionPoolSize = 5
  32  			},
  33  		)
  34  		if err != nil {
  35  			t.Fatalf("failed to create driver %d: %v", i, err)
  36  		}
  37  		defer driver.Close(context.Background())
  38  
  39  		if err := driver.VerifyConnectivity(context.Background()); err != nil {
  40  			t.Fatalf("driver %d failed connectivity check: %v", i, err)
  41  		}
  42  		drivers[i] = driver
  43  	}
  44  
  45  	// Run concurrent read and write queries across all three drivers
  46  	var wg sync.WaitGroup
  47  	errs := make(chan error, numDrivers)
  48  
  49  	for i, driver := range drivers {
  50  		wg.Add(1)
  51  		go func(id int, d neo4j.DriverWithContext) {
  52  			defer wg.Done()
  53  
  54  			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  55  			defer cancel()
  56  
  57  			// Write: create a uniquely-labeled node
  58  			label := fmt.Sprintf("ConcurrentTest_%d", id)
  59  			writeSession := d.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
  60  			_, err := writeSession.Run(ctx,
  61  				fmt.Sprintf("CREATE (n:%s {driver: $id, ts: timestamp()}) RETURN n", label),
  62  				map[string]any{"id": id},
  63  			)
  64  			writeSession.Close(ctx)
  65  			if err != nil {
  66  				errs <- fmt.Errorf("driver %d write failed: %w", id, err)
  67  				return
  68  			}
  69  
  70  			// Read: count nodes with our label
  71  			readSession := d.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
  72  			result, err := readSession.Run(ctx,
  73  				fmt.Sprintf("MATCH (n:%s) RETURN count(n) AS cnt", label),
  74  				nil,
  75  			)
  76  			if err != nil {
  77  				readSession.Close(ctx)
  78  				errs <- fmt.Errorf("driver %d read failed: %w", id, err)
  79  				return
  80  			}
  81  
  82  			if !result.Next(ctx) {
  83  				readSession.Close(ctx)
  84  				errs <- fmt.Errorf("driver %d read returned no results", id)
  85  				return
  86  			}
  87  			cnt, ok := result.Record().Values[0].(int64)
  88  			readSession.Close(ctx)
  89  			if !ok || cnt < 1 {
  90  				errs <- fmt.Errorf("driver %d expected count >= 1, got %v", id, cnt)
  91  				return
  92  			}
  93  
  94  			// Cleanup
  95  			cleanupSession := d.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
  96  			_, _ = cleanupSession.Run(ctx,
  97  				fmt.Sprintf("MATCH (n:%s) DETACH DELETE n", label),
  98  				nil,
  99  			)
 100  			cleanupSession.Close(ctx)
 101  		}(i, driver)
 102  	}
 103  
 104  	wg.Wait()
 105  	close(errs)
 106  
 107  	for err := range errs {
 108  		t.Error(err)
 109  	}
 110  }
 111