ratelimit.go raw
1 package blossom
2
3 import (
4 "sync"
5 "time"
6 )
7
// BandwidthState is the token bucket record kept for a single identity's
// upload bandwidth.
type BandwidthState struct {
	// BucketBytes is the current bucket level: the number of bytes the
	// identity may still upload right now.
	BucketBytes int64
	// LastUpdate is the moment the bucket level was last brought up to date.
	LastUpdate time.Time
}
13
14 // BandwidthLimiter implements token bucket rate limiting for uploads.
15 // Each identity gets a bucket that replenishes at dailyLimit/day rate.
16 // Uploads consume tokens from the bucket.
17 type BandwidthLimiter struct {
18 mu sync.Mutex
19 states map[string]*BandwidthState // keyed by pubkey hex or IP
20 dailyLimit int64 // bytes per day
21 burstLimit int64 // max bucket size (burst capacity)
22 refillRate float64 // bytes per second refill rate
23 }
24
25 // NewBandwidthLimiter creates a new bandwidth limiter.
26 // dailyLimitMB is the average daily limit in megabytes.
27 // burstLimitMB is the maximum burst capacity in megabytes.
28 func NewBandwidthLimiter(dailyLimitMB, burstLimitMB int64) *BandwidthLimiter {
29 dailyBytes := dailyLimitMB * 1024 * 1024
30 burstBytes := burstLimitMB * 1024 * 1024
31
32 return &BandwidthLimiter{
33 states: make(map[string]*BandwidthState),
34 dailyLimit: dailyBytes,
35 burstLimit: burstBytes,
36 refillRate: float64(dailyBytes) / 86400.0, // bytes per second
37 }
38 }
39
40 // CheckAndConsume checks if an upload of the given size is allowed for the identity,
41 // and if so, consumes the tokens. Returns true if allowed, false if rate limited.
42 // The identity should be pubkey hex for authenticated users, or IP for anonymous.
43 func (bl *BandwidthLimiter) CheckAndConsume(identity string, sizeBytes int64) bool {
44 bl.mu.Lock()
45 defer bl.mu.Unlock()
46
47 now := time.Now()
48 state, exists := bl.states[identity]
49
50 if !exists {
51 // New identity starts with full burst capacity
52 state = &BandwidthState{
53 BucketBytes: bl.burstLimit,
54 LastUpdate: now,
55 }
56 bl.states[identity] = state
57 } else {
58 // Refill bucket based on elapsed time
59 elapsed := now.Sub(state.LastUpdate).Seconds()
60 refill := int64(elapsed * bl.refillRate)
61 state.BucketBytes += refill
62 if state.BucketBytes > bl.burstLimit {
63 state.BucketBytes = bl.burstLimit
64 }
65 state.LastUpdate = now
66 }
67
68 // Check if upload fits in bucket
69 if state.BucketBytes >= sizeBytes {
70 state.BucketBytes -= sizeBytes
71 return true
72 }
73
74 return false
75 }
76
77 // GetAvailable returns the currently available bytes for an identity.
78 func (bl *BandwidthLimiter) GetAvailable(identity string) int64 {
79 bl.mu.Lock()
80 defer bl.mu.Unlock()
81
82 state, exists := bl.states[identity]
83 if !exists {
84 return bl.burstLimit // New users have full capacity
85 }
86
87 // Calculate current level with refill
88 now := time.Now()
89 elapsed := now.Sub(state.LastUpdate).Seconds()
90 refill := int64(elapsed * bl.refillRate)
91 available := state.BucketBytes + refill
92 if available > bl.burstLimit {
93 available = bl.burstLimit
94 }
95
96 return available
97 }
98
99 // GetTimeUntilAvailable returns how long until the given bytes will be available.
100 func (bl *BandwidthLimiter) GetTimeUntilAvailable(identity string, sizeBytes int64) time.Duration {
101 available := bl.GetAvailable(identity)
102 if available >= sizeBytes {
103 return 0
104 }
105
106 needed := sizeBytes - available
107 seconds := float64(needed) / bl.refillRate
108 return time.Duration(seconds * float64(time.Second))
109 }
110
111 // Cleanup removes entries that have fully replenished (at burst limit).
112 func (bl *BandwidthLimiter) Cleanup() {
113 bl.mu.Lock()
114 defer bl.mu.Unlock()
115
116 now := time.Now()
117 for key, state := range bl.states {
118 elapsed := now.Sub(state.LastUpdate).Seconds()
119 refill := int64(elapsed * bl.refillRate)
120 if state.BucketBytes+refill >= bl.burstLimit {
121 delete(bl.states, key)
122 }
123 }
124 }
125
126 // Stats returns the number of tracked identities.
127 func (bl *BandwidthLimiter) Stats() int {
128 bl.mu.Lock()
129 defer bl.mu.Unlock()
130 return len(bl.states)
131 }
132