latency_recorder.go raw

   1  package main
   2  
import (
	"bufio"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"time"
)
  13  
  14  // LatencyRecorder writes latency measurements to disk to avoid memory bloat
  15  type LatencyRecorder struct {
  16  	file   *os.File
  17  	writer *bufio.Writer
  18  	mu     sync.Mutex
  19  	count  int64
  20  }
  21  
  22  // LatencyStats contains calculated latency statistics
  23  type LatencyStats struct {
  24  	Avg        time.Duration
  25  	P90        time.Duration
  26  	P95        time.Duration
  27  	P99        time.Duration
  28  	Bottom10   time.Duration
  29  	Count      int64
  30  }
  31  
  32  // NewLatencyRecorder creates a new latency recorder that writes to disk
  33  func NewLatencyRecorder(baseDir string, testName string) (*LatencyRecorder, error) {
  34  	latencyFile := filepath.Join(baseDir, fmt.Sprintf("latency_%s.bin", testName))
  35  	f, err := os.Create(latencyFile)
  36  	if err != nil {
  37  		return nil, fmt.Errorf("failed to create latency file: %w", err)
  38  	}
  39  
  40  	return &LatencyRecorder{
  41  		file:   f,
  42  		writer: bufio.NewWriter(f),
  43  		count:  0,
  44  	}, nil
  45  }
  46  
  47  // Record writes a latency measurement to disk (8 bytes per measurement)
  48  func (lr *LatencyRecorder) Record(latency time.Duration) error {
  49  	lr.mu.Lock()
  50  	defer lr.mu.Unlock()
  51  
  52  	// Write latency as 8-byte value (int64 nanoseconds)
  53  	buf := make([]byte, 8)
  54  	binary.LittleEndian.PutUint64(buf, uint64(latency.Nanoseconds()))
  55  
  56  	if _, err := lr.writer.Write(buf); err != nil {
  57  		return fmt.Errorf("failed to write latency: %w", err)
  58  	}
  59  
  60  	lr.count++
  61  	return nil
  62  }
  63  
  64  // Close flushes and closes the latency file
  65  func (lr *LatencyRecorder) Close() error {
  66  	lr.mu.Lock()
  67  	defer lr.mu.Unlock()
  68  
  69  	if err := lr.writer.Flush(); err != nil {
  70  		return fmt.Errorf("failed to flush latency file: %w", err)
  71  	}
  72  
  73  	if err := lr.file.Close(); err != nil {
  74  		return fmt.Errorf("failed to close latency file: %w", err)
  75  	}
  76  
  77  	return nil
  78  }
  79  
  80  // CalculateStats reads all latencies from disk, sorts them, and calculates statistics
  81  // This is done on-demand to avoid keeping all latencies in memory during the test
  82  func (lr *LatencyRecorder) CalculateStats() (*LatencyStats, error) {
  83  	lr.mu.Lock()
  84  	filePath := lr.file.Name()
  85  	count := lr.count
  86  	lr.mu.Unlock()
  87  
  88  	// If no measurements, return zeros
  89  	if count == 0 {
  90  		return &LatencyStats{
  91  			Avg:        0,
  92  			P90:        0,
  93  			P95:        0,
  94  			P99:        0,
  95  			Bottom10:   0,
  96  			Count:      0,
  97  		}, nil
  98  	}
  99  
 100  	// Open file for reading
 101  	f, err := os.Open(filePath)
 102  	if err != nil {
 103  		return nil, fmt.Errorf("failed to open latency file for reading: %w", err)
 104  	}
 105  	defer f.Close()
 106  
 107  	// Read all latencies into memory temporarily for sorting
 108  	latencies := make([]time.Duration, 0, count)
 109  	buf := make([]byte, 8)
 110  	reader := bufio.NewReader(f)
 111  
 112  	for {
 113  		n, err := reader.Read(buf)
 114  		if err != nil {
 115  			if err.Error() == "EOF" {
 116  				break
 117  			}
 118  			return nil, fmt.Errorf("failed to read latency data: %w", err)
 119  		}
 120  		if n != 8 {
 121  			break
 122  		}
 123  
 124  		nanos := binary.LittleEndian.Uint64(buf)
 125  		latencies = append(latencies, time.Duration(nanos))
 126  	}
 127  
 128  	// Check if we actually got any latencies
 129  	if len(latencies) == 0 {
 130  		return &LatencyStats{
 131  			Avg:        0,
 132  			P90:        0,
 133  			P95:        0,
 134  			P99:        0,
 135  			Bottom10:   0,
 136  			Count:      0,
 137  		}, nil
 138  	}
 139  
 140  	// Sort for percentile calculation
 141  	sort.Slice(latencies, func(i, j int) bool {
 142  		return latencies[i] < latencies[j]
 143  	})
 144  
 145  	// Calculate statistics
 146  	stats := &LatencyStats{
 147  		Count: int64(len(latencies)),
 148  	}
 149  
 150  	// Average
 151  	var sum time.Duration
 152  	for _, lat := range latencies {
 153  		sum += lat
 154  	}
 155  	stats.Avg = sum / time.Duration(len(latencies))
 156  
 157  	// Percentiles
 158  	stats.P90 = latencies[int(float64(len(latencies))*0.90)]
 159  	stats.P95 = latencies[int(float64(len(latencies))*0.95)]
 160  	stats.P99 = latencies[int(float64(len(latencies))*0.99)]
 161  
 162  	// Bottom 10% average
 163  	bottom10Count := int(float64(len(latencies)) * 0.10)
 164  	if bottom10Count > 0 {
 165  		var bottom10Sum time.Duration
 166  		for i := 0; i < bottom10Count; i++ {
 167  			bottom10Sum += latencies[i]
 168  		}
 169  		stats.Bottom10 = bottom10Sum / time.Duration(bottom10Count)
 170  	}
 171  
 172  	return stats, nil
 173  }
 174