ratelimit.go raw

   1  package blossom
   2  
   3  import (
   4  	"sync"
   5  	"time"
   6  )
   7  
   8  // BandwidthState tracks upload bandwidth for an identity
   9  type BandwidthState struct {
  10  	BucketBytes   int64     // Current token bucket level (bytes available)
  11  	LastUpdate    time.Time // Last time bucket was updated
  12  }
  13  
  14  // BandwidthLimiter implements token bucket rate limiting for uploads.
  15  // Each identity gets a bucket that replenishes at dailyLimit/day rate.
  16  // Uploads consume tokens from the bucket.
  17  type BandwidthLimiter struct {
  18  	mu           sync.Mutex
  19  	states       map[string]*BandwidthState // keyed by pubkey hex or IP
  20  	dailyLimit   int64                      // bytes per day
  21  	burstLimit   int64                      // max bucket size (burst capacity)
  22  	refillRate   float64                    // bytes per second refill rate
  23  }
  24  
  25  // NewBandwidthLimiter creates a new bandwidth limiter.
  26  // dailyLimitMB is the average daily limit in megabytes.
  27  // burstLimitMB is the maximum burst capacity in megabytes.
  28  func NewBandwidthLimiter(dailyLimitMB, burstLimitMB int64) *BandwidthLimiter {
  29  	dailyBytes := dailyLimitMB * 1024 * 1024
  30  	burstBytes := burstLimitMB * 1024 * 1024
  31  
  32  	return &BandwidthLimiter{
  33  		states:     make(map[string]*BandwidthState),
  34  		dailyLimit: dailyBytes,
  35  		burstLimit: burstBytes,
  36  		refillRate: float64(dailyBytes) / 86400.0, // bytes per second
  37  	}
  38  }
  39  
  40  // CheckAndConsume checks if an upload of the given size is allowed for the identity,
  41  // and if so, consumes the tokens. Returns true if allowed, false if rate limited.
  42  // The identity should be pubkey hex for authenticated users, or IP for anonymous.
  43  func (bl *BandwidthLimiter) CheckAndConsume(identity string, sizeBytes int64) bool {
  44  	bl.mu.Lock()
  45  	defer bl.mu.Unlock()
  46  
  47  	now := time.Now()
  48  	state, exists := bl.states[identity]
  49  
  50  	if !exists {
  51  		// New identity starts with full burst capacity
  52  		state = &BandwidthState{
  53  			BucketBytes: bl.burstLimit,
  54  			LastUpdate:  now,
  55  		}
  56  		bl.states[identity] = state
  57  	} else {
  58  		// Refill bucket based on elapsed time
  59  		elapsed := now.Sub(state.LastUpdate).Seconds()
  60  		refill := int64(elapsed * bl.refillRate)
  61  		state.BucketBytes += refill
  62  		if state.BucketBytes > bl.burstLimit {
  63  			state.BucketBytes = bl.burstLimit
  64  		}
  65  		state.LastUpdate = now
  66  	}
  67  
  68  	// Check if upload fits in bucket
  69  	if state.BucketBytes >= sizeBytes {
  70  		state.BucketBytes -= sizeBytes
  71  		return true
  72  	}
  73  
  74  	return false
  75  }
  76  
  77  // GetAvailable returns the currently available bytes for an identity.
  78  func (bl *BandwidthLimiter) GetAvailable(identity string) int64 {
  79  	bl.mu.Lock()
  80  	defer bl.mu.Unlock()
  81  
  82  	state, exists := bl.states[identity]
  83  	if !exists {
  84  		return bl.burstLimit // New users have full capacity
  85  	}
  86  
  87  	// Calculate current level with refill
  88  	now := time.Now()
  89  	elapsed := now.Sub(state.LastUpdate).Seconds()
  90  	refill := int64(elapsed * bl.refillRate)
  91  	available := state.BucketBytes + refill
  92  	if available > bl.burstLimit {
  93  		available = bl.burstLimit
  94  	}
  95  
  96  	return available
  97  }
  98  
  99  // GetTimeUntilAvailable returns how long until the given bytes will be available.
 100  func (bl *BandwidthLimiter) GetTimeUntilAvailable(identity string, sizeBytes int64) time.Duration {
 101  	available := bl.GetAvailable(identity)
 102  	if available >= sizeBytes {
 103  		return 0
 104  	}
 105  
 106  	needed := sizeBytes - available
 107  	seconds := float64(needed) / bl.refillRate
 108  	return time.Duration(seconds * float64(time.Second))
 109  }
 110  
 111  // Cleanup removes entries that have fully replenished (at burst limit).
 112  func (bl *BandwidthLimiter) Cleanup() {
 113  	bl.mu.Lock()
 114  	defer bl.mu.Unlock()
 115  
 116  	now := time.Now()
 117  	for key, state := range bl.states {
 118  		elapsed := now.Sub(state.LastUpdate).Seconds()
 119  		refill := int64(elapsed * bl.refillRate)
 120  		if state.BucketBytes+refill >= bl.burstLimit {
 121  			delete(bl.states, key)
 122  		}
 123  	}
 124  }
 125  
 126  // Stats returns the number of tracked identities.
 127  func (bl *BandwidthLimiter) Stats() int {
 128  	bl.mu.Lock()
 129  	defer bl.mu.Unlock()
 130  	return len(bl.states)
 131  }
 132