// ratelimit.go

   1  package graph
   2  
   3  import (
   4  	"context"
   5  	"sync"
   6  	"time"
   7  )
   8  
// RateLimiter implements a token bucket rate limiter with adaptive throttling
// based on graph query complexity. It allows cooperative scheduling by inserting
// pauses between operations to allow other work to proceed.
//
// mu guards the mutable bucket state (tokens, lastRefill). The configuration
// fields (maxTokens, refillRate, baseDelay, maxDelay, depthFactor, limitFactor)
// are set once in NewRateLimiter and never reassigned, so methods such as
// Pause read them without taking the lock.
type RateLimiter struct {
	mu sync.Mutex // guards tokens and lastRefill

	// Token bucket parameters
	tokens        float64   // Current available tokens (fractional; refilled lazily on access)
	maxTokens     float64   // Maximum token capacity (bucket size)
	refillRate    float64   // Tokens per second to add
	lastRefill    time.Time // Last time tokens were refilled

	// Throttling parameters
	baseDelay     time.Duration // Minimum delay between operations
	maxDelay      time.Duration // Maximum delay for complex queries
	depthFactor   float64       // Multiplier per depth level (cost grows as depthFactor^depth)
	// NOTE(review): limitFactor is assigned in NewRateLimiter but never read
	// anywhere in this file — confirm whether QueryCost should apply it.
	limitFactor   float64       // Multiplier based on result limit
}
  27  
// RateLimiterConfig configures the rate limiter behavior.
// Zero or negative fields are replaced with the documented defaults by
// NewRateLimiter, so the zero value is usable.
type RateLimiterConfig struct {
	// MaxTokens is the maximum number of tokens in the bucket (default: 100)
	MaxTokens float64

	// RefillRate is tokens added per second (default: 10)
	RefillRate float64

	// BaseDelay is the minimum delay between operations (default: 1ms)
	BaseDelay time.Duration

	// MaxDelay is the maximum delay for complex queries (default: 100ms)
	MaxDelay time.Duration

	// DepthFactor is the cost multiplier per depth level (default: 2.0)
	// A depth-3 query costs 2^3 = 8x more tokens than depth-1
	DepthFactor float64

	// LimitFactor is additional cost per 100 results requested (default: 0.1)
	// NOTE(review): this value is stored on the limiter but not applied by
	// any method visible in this file — confirm intended use.
	LimitFactor float64
}
  49  
  50  // DefaultRateLimiterConfig returns sensible defaults for the rate limiter.
  51  func DefaultRateLimiterConfig() RateLimiterConfig {
  52  	return RateLimiterConfig{
  53  		MaxTokens:   100.0,
  54  		RefillRate:  10.0, // Refills fully in 10 seconds
  55  		BaseDelay:   1 * time.Millisecond,
  56  		MaxDelay:    100 * time.Millisecond,
  57  		DepthFactor: 2.0,
  58  		LimitFactor: 0.1,
  59  	}
  60  }
  61  
  62  // NewRateLimiter creates a new rate limiter with the given configuration.
  63  func NewRateLimiter(cfg RateLimiterConfig) *RateLimiter {
  64  	if cfg.MaxTokens <= 0 {
  65  		cfg.MaxTokens = DefaultRateLimiterConfig().MaxTokens
  66  	}
  67  	if cfg.RefillRate <= 0 {
  68  		cfg.RefillRate = DefaultRateLimiterConfig().RefillRate
  69  	}
  70  	if cfg.BaseDelay <= 0 {
  71  		cfg.BaseDelay = DefaultRateLimiterConfig().BaseDelay
  72  	}
  73  	if cfg.MaxDelay <= 0 {
  74  		cfg.MaxDelay = DefaultRateLimiterConfig().MaxDelay
  75  	}
  76  	if cfg.DepthFactor <= 0 {
  77  		cfg.DepthFactor = DefaultRateLimiterConfig().DepthFactor
  78  	}
  79  	if cfg.LimitFactor <= 0 {
  80  		cfg.LimitFactor = DefaultRateLimiterConfig().LimitFactor
  81  	}
  82  
  83  	return &RateLimiter{
  84  		tokens:      cfg.MaxTokens,
  85  		maxTokens:   cfg.MaxTokens,
  86  		refillRate:  cfg.RefillRate,
  87  		lastRefill:  time.Now(),
  88  		baseDelay:   cfg.BaseDelay,
  89  		maxDelay:    cfg.MaxDelay,
  90  		depthFactor: cfg.DepthFactor,
  91  		limitFactor: cfg.LimitFactor,
  92  	}
  93  }
  94  
  95  // QueryCost calculates the token cost for a graph query based on its complexity.
  96  // Higher depths and larger limits cost exponentially more tokens.
  97  func (rl *RateLimiter) QueryCost(q *Query) float64 {
  98  	if q == nil {
  99  		return 1.0
 100  	}
 101  
 102  	// Base cost is exponential in depth: depthFactor^depth
 103  	// This models the exponential growth of traversal work
 104  	cost := 1.0
 105  	for i := 0; i < q.Depth; i++ {
 106  		cost *= rl.depthFactor
 107  	}
 108  
 109  	// Add cost for bidirectional queries (traversing both directions)
 110  	if q.IsBidirectional() {
 111  		cost *= 1.5
 112  	}
 113  
 114  	return cost
 115  }
 116  
 117  // OperationCost calculates the token cost for a single traversal operation.
 118  // This is used during query execution for per-operation throttling.
 119  func (rl *RateLimiter) OperationCost(depth int, nodesAtDepth int) float64 {
 120  	// Cost increases with depth and number of nodes to process
 121  	depthMultiplier := 1.0
 122  	for i := 0; i < depth; i++ {
 123  		depthMultiplier *= rl.depthFactor
 124  	}
 125  
 126  	// More nodes at this depth = more work
 127  	nodeFactor := 1.0 + float64(nodesAtDepth)*0.01
 128  
 129  	return depthMultiplier * nodeFactor
 130  }
 131  
 132  // refillTokens adds tokens based on elapsed time since last refill.
 133  func (rl *RateLimiter) refillTokens() {
 134  	now := time.Now()
 135  	elapsed := now.Sub(rl.lastRefill).Seconds()
 136  	rl.lastRefill = now
 137  
 138  	rl.tokens += elapsed * rl.refillRate
 139  	if rl.tokens > rl.maxTokens {
 140  		rl.tokens = rl.maxTokens
 141  	}
 142  }
 143  
 144  // Acquire tries to acquire tokens for a query. If not enough tokens are available,
 145  // it waits until they become available or the context is cancelled.
 146  // Returns the delay that was applied, or an error if context was cancelled.
 147  func (rl *RateLimiter) Acquire(ctx context.Context, cost float64) (time.Duration, error) {
 148  	rl.mu.Lock()
 149  	defer rl.mu.Unlock()
 150  
 151  	rl.refillTokens()
 152  
 153  	var totalDelay time.Duration
 154  
 155  	// Wait until we have enough tokens
 156  	for rl.tokens < cost {
 157  		// Calculate how long we need to wait for tokens to refill
 158  		tokensNeeded := cost - rl.tokens
 159  		waitTime := time.Duration(tokensNeeded/rl.refillRate*1000) * time.Millisecond
 160  
 161  		// Clamp to max delay
 162  		if waitTime > rl.maxDelay {
 163  			waitTime = rl.maxDelay
 164  		}
 165  		if waitTime < rl.baseDelay {
 166  			waitTime = rl.baseDelay
 167  		}
 168  
 169  		// Release lock while waiting
 170  		rl.mu.Unlock()
 171  
 172  		select {
 173  		case <-ctx.Done():
 174  			rl.mu.Lock()
 175  			return totalDelay, ctx.Err()
 176  		case <-time.After(waitTime):
 177  		}
 178  
 179  		totalDelay += waitTime
 180  		rl.mu.Lock()
 181  		rl.refillTokens()
 182  	}
 183  
 184  	// Consume tokens
 185  	rl.tokens -= cost
 186  	return totalDelay, nil
 187  }
 188  
 189  // TryAcquire attempts to acquire tokens without waiting.
 190  // Returns true if successful, false if insufficient tokens.
 191  func (rl *RateLimiter) TryAcquire(cost float64) bool {
 192  	rl.mu.Lock()
 193  	defer rl.mu.Unlock()
 194  
 195  	rl.refillTokens()
 196  
 197  	if rl.tokens >= cost {
 198  		rl.tokens -= cost
 199  		return true
 200  	}
 201  	return false
 202  }
 203  
 204  // Pause inserts a cooperative delay to allow other work to proceed.
 205  // The delay is proportional to the current depth and load.
 206  // This should be called periodically during long-running traversals.
 207  func (rl *RateLimiter) Pause(ctx context.Context, depth int, itemsProcessed int) error {
 208  	// Calculate adaptive delay based on depth and progress
 209  	// Deeper traversals and more processed items = longer pauses
 210  	delay := rl.baseDelay
 211  
 212  	// Increase delay with depth
 213  	for i := 0; i < depth; i++ {
 214  		delay += rl.baseDelay
 215  	}
 216  
 217  	// Add extra delay every N items to allow other work
 218  	if itemsProcessed > 0 && itemsProcessed%100 == 0 {
 219  		delay += rl.baseDelay * 5
 220  	}
 221  
 222  	// Cap at max delay
 223  	if delay > rl.maxDelay {
 224  		delay = rl.maxDelay
 225  	}
 226  
 227  	select {
 228  	case <-ctx.Done():
 229  		return ctx.Err()
 230  	case <-time.After(delay):
 231  		return nil
 232  	}
 233  }
 234  
 235  // AvailableTokens returns the current number of available tokens.
 236  func (rl *RateLimiter) AvailableTokens() float64 {
 237  	rl.mu.Lock()
 238  	defer rl.mu.Unlock()
 239  	rl.refillTokens()
 240  	return rl.tokens
 241  }
 242  
// Throttler provides a simple interface for cooperative scheduling during traversal.
// It wraps the rate limiter and provides depth-aware throttling.
//
// A Throttler has no internal synchronization (itemsProcessed is a plain
// int), so each instance is intended for a single traversal goroutine —
// NOTE(review): confirm callers never share one across goroutines.
type Throttler struct {
	rl            *RateLimiter // shared limiter supplying the pause policy
	depth         int          // traversal depth this throttler was created for
	itemsProcessed int         // count of Tick calls so far
}
 250  
 251  // NewThrottler creates a throttler for a specific traversal operation.
 252  func NewThrottler(rl *RateLimiter, depth int) *Throttler {
 253  	return &Throttler{
 254  		rl:    rl,
 255  		depth: depth,
 256  	}
 257  }
 258  
 259  // Tick should be called after processing each item.
 260  // It tracks progress and inserts pauses as needed.
 261  func (t *Throttler) Tick(ctx context.Context) error {
 262  	t.itemsProcessed++
 263  
 264  	// Insert cooperative pause periodically
 265  	// More frequent pauses at higher depths
 266  	interval := 50
 267  	if t.depth >= 2 {
 268  		interval = 25
 269  	}
 270  	if t.depth >= 4 {
 271  		interval = 10
 272  	}
 273  
 274  	if t.itemsProcessed%interval == 0 {
 275  		return t.rl.Pause(ctx, t.depth, t.itemsProcessed)
 276  	}
 277  	return nil
 278  }
 279  
 280  // Complete marks the throttler as complete and returns stats.
 281  func (t *Throttler) Complete() (itemsProcessed int) {
 282  	return t.itemsProcessed
 283  }
 284