serial.go raw

   1  package neo4j
   2  
   3  import (
   4  	"context"
   5  	"fmt"
   6  	"sync"
   7  )
   8  
   9  // Serial number management
  10  // We use a special Marker node in Neo4j to track the next available serial number
  11  
  12  const serialCounterKey = "serial_counter"
  13  
  14  var (
  15  	serialMutex sync.Mutex
  16  )
  17  
  18  // getNextSerial atomically increments and returns the next serial number
  19  func (n *N) getNextSerial() (uint64, error) {
  20  	serialMutex.Lock()
  21  	defer serialMutex.Unlock()
  22  
  23  	ctx := context.Background()
  24  
  25  	// Query current serial value
  26  	cypher := "MATCH (m:Marker {key: $key}) RETURN m.value AS value"
  27  	params := map[string]any{"key": serialCounterKey}
  28  
  29  	result, err := n.ExecuteRead(ctx, cypher, params)
  30  	if err != nil {
  31  		return 0, fmt.Errorf("failed to query serial counter: %w", err)
  32  	}
  33  
  34  	var currentSerial uint64 = 1
  35  	if result.Next(ctx) {
  36  		record := result.Record()
  37  		if record != nil {
  38  			valueRaw, found := record.Get("value")
  39  			if found {
  40  				if value, ok := valueRaw.(int64); ok {
  41  					currentSerial = uint64(value)
  42  				}
  43  			}
  44  		}
  45  	}
  46  
  47  	// Increment serial
  48  	nextSerial := currentSerial + 1
  49  
  50  	// Update counter
  51  	updateCypher := `
  52  MERGE (m:Marker {key: $key})
  53  SET m.value = $value`
  54  	updateParams := map[string]any{
  55  		"key":   serialCounterKey,
  56  		"value": int64(nextSerial),
  57  	}
  58  
  59  	_, err = n.ExecuteWrite(ctx, updateCypher, updateParams)
  60  	if err != nil {
  61  		return 0, fmt.Errorf("failed to update serial counter: %w", err)
  62  	}
  63  
  64  	return currentSerial, nil
  65  }
  66  
  67  // initSerialCounter initializes the serial counter if it doesn't exist
  68  // Uses MERGE to be idempotent - safe to call multiple times
  69  func (n *N) initSerialCounter() error {
  70  	ctx := context.Background()
  71  
  72  	// Use MERGE with ON CREATE to initialize only if it doesn't exist
  73  	// This is idempotent and avoids race conditions
  74  	initCypher := `
  75  MERGE (m:Marker {key: $key})
  76  ON CREATE SET m.value = $value`
  77  	initParams := map[string]any{
  78  		"key":   serialCounterKey,
  79  		"value": int64(1),
  80  	}
  81  
  82  	_, err := n.ExecuteWrite(ctx, initCypher, initParams)
  83  	if err != nil {
  84  		return fmt.Errorf("failed to initialize serial counter: %w", err)
  85  	}
  86  
  87  	n.Logger.Debugf("serial counter initialized/verified")
  88  	return nil
  89  }
  90