ttl.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package ristretto
   7  
   8  import (
   9  	"sync"
  10  	"time"
  11  )
  12  
  13  var (
  14  	// TODO: find the optimal value or make it configurable.
  15  	bucketDurationSecs = int64(5)
  16  )
  17  
  18  func storageBucket(t time.Time) int64 {
  19  	return (t.Unix() / bucketDurationSecs) + 1
  20  }
  21  
  22  func cleanupBucket(t time.Time) int64 {
  23  	// The bucket to cleanup is always behind the storage bucket by one so that
  24  	// no elements in that bucket (which might not have expired yet) are deleted.
  25  	return storageBucket(t) - 1
  26  }
  27  
  28  // bucket type is a map of key to conflict.
  29  type bucket map[uint64]uint64
  30  
  31  // expirationMap is a map of bucket number to the corresponding bucket.
  32  type expirationMap[V any] struct {
  33  	sync.RWMutex
  34  	buckets              map[int64]bucket
  35  	lastCleanedBucketNum int64
  36  }
  37  
  38  func newExpirationMap[V any]() *expirationMap[V] {
  39  	return &expirationMap[V]{
  40  		buckets:              make(map[int64]bucket),
  41  		lastCleanedBucketNum: cleanupBucket(time.Now()),
  42  	}
  43  }
  44  
  45  func (m *expirationMap[_]) add(key, conflict uint64, expiration time.Time) {
  46  	if m == nil {
  47  		return
  48  	}
  49  
  50  	// Items that don't expire don't need to be in the expiration map.
  51  	if expiration.IsZero() {
  52  		return
  53  	}
  54  
  55  	bucketNum := storageBucket(expiration)
  56  	m.Lock()
  57  	defer m.Unlock()
  58  
  59  	b, ok := m.buckets[bucketNum]
  60  	if !ok {
  61  		b = make(bucket)
  62  		m.buckets[bucketNum] = b
  63  	}
  64  	b[key] = conflict
  65  }
  66  
  67  func (m *expirationMap[_]) update(key, conflict uint64, oldExpTime, newExpTime time.Time) {
  68  	if m == nil {
  69  		return
  70  	}
  71  
  72  	m.Lock()
  73  	defer m.Unlock()
  74  
  75  	oldBucketNum := storageBucket(oldExpTime)
  76  	oldBucket, ok := m.buckets[oldBucketNum]
  77  	if ok {
  78  		delete(oldBucket, key)
  79  	}
  80  
  81  	// Items that don't expire don't need to be in the expiration map.
  82  	if newExpTime.IsZero() {
  83  		return
  84  	}
  85  
  86  	newBucketNum := storageBucket(newExpTime)
  87  	newBucket, ok := m.buckets[newBucketNum]
  88  	if !ok {
  89  		newBucket = make(bucket)
  90  		m.buckets[newBucketNum] = newBucket
  91  	}
  92  	newBucket[key] = conflict
  93  }
  94  
  95  func (m *expirationMap[_]) del(key uint64, expiration time.Time) {
  96  	if m == nil {
  97  		return
  98  	}
  99  
 100  	bucketNum := storageBucket(expiration)
 101  	m.Lock()
 102  	defer m.Unlock()
 103  	_, ok := m.buckets[bucketNum]
 104  	if !ok {
 105  		return
 106  	}
 107  	delete(m.buckets[bucketNum], key)
 108  }
 109  
 110  // cleanup removes all the items in the bucket that was just completed. It deletes
 111  // those items from the store, and calls the onEvict function on those items.
 112  // This function is meant to be called periodically.
 113  func (m *expirationMap[V]) cleanup(store store[V], policy *defaultPolicy[V], onEvict func(item *Item[V])) int {
 114  	if m == nil {
 115  		return 0
 116  	}
 117  
 118  	m.Lock()
 119  	now := time.Now()
 120  	currentBucketNum := cleanupBucket(now)
 121  	// Clean up all buckets up to and including currentBucketNum, starting from
 122  	// (but not including) the last one that was cleaned up
 123  	var buckets []bucket
 124  	for bucketNum := m.lastCleanedBucketNum + 1; bucketNum <= currentBucketNum; bucketNum++ {
 125  		// With an empty bucket, we don't need to add it to the Clean list
 126  		if b := m.buckets[bucketNum]; b != nil {
 127  			buckets = append(buckets, b)
 128  		}
 129  		delete(m.buckets, bucketNum)
 130  	}
 131  	m.lastCleanedBucketNum = currentBucketNum
 132  	m.Unlock()
 133  
 134  	for _, keys := range buckets {
 135  		for key, conflict := range keys {
 136  			expr := store.Expiration(key)
 137  			// Sanity check. Verify that the store agrees that this key is expired.
 138  			if expr.After(now) {
 139  				continue
 140  			}
 141  
 142  			cost := policy.Cost(key)
 143  			policy.Del(key)
 144  			_, value := store.Del(key, conflict)
 145  
 146  			if onEvict != nil {
 147  				onEvict(&Item[V]{Key: key,
 148  					Conflict:   conflict,
 149  					Value:      value,
 150  					Cost:       cost,
 151  					Expiration: expr,
 152  				})
 153  			}
 154  		}
 155  	}
 156  
 157  	cleanedBucketsCount := len(buckets)
 158  
 159  	return cleanedBucketsCount
 160  }
 161  
 162  // clear clears the expirationMap, the caller is responsible for properly
 163  // evicting the referenced items
 164  func (m *expirationMap[V]) clear() {
 165  	if m == nil {
 166  		return
 167  	}
 168  
 169  	m.Lock()
 170  	m.buckets = make(map[int64]bucket)
 171  	m.lastCleanedBucketNum = cleanupBucket(time.Now())
 172  	m.Unlock()
 173  }
 174