latency_recorder.go raw
1 package main
2
import (
	"bufio"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"time"
)
13
// LatencyRecorder appends latency measurements to a file on disk instead of
// accumulating them in memory. Writes go through a buffered writer, so data
// only reaches the file when the buffer fills or is flushed.
type LatencyRecorder struct {
	file   *os.File      // backing file the measurements are stored in
	writer *bufio.Writer // buffered writer layered on top of file

	mu    sync.Mutex // locked by Record, Close, and CalculateStats
	count int64      // number of measurements successfully handed to writer
}
21
// LatencyStats is the summary produced by LatencyRecorder.CalculateStats.
// A zero-valued LatencyStats means no measurements were available.
type LatencyStats struct {
	Avg           time.Duration // arithmetic mean of all samples
	P90, P95, P99 time.Duration // samples at the 90th, 95th and 99th percentile positions of the sorted data
	Bottom10      time.Duration // mean of the fastest 10% of samples (0 when that slice is empty)
	Count         int64         // number of samples the statistics were computed from
}
31
32 // NewLatencyRecorder creates a new latency recorder that writes to disk
33 func NewLatencyRecorder(baseDir string, testName string) (*LatencyRecorder, error) {
34 latencyFile := filepath.Join(baseDir, fmt.Sprintf("latency_%s.bin", testName))
35 f, err := os.Create(latencyFile)
36 if err != nil {
37 return nil, fmt.Errorf("failed to create latency file: %w", err)
38 }
39
40 return &LatencyRecorder{
41 file: f,
42 writer: bufio.NewWriter(f),
43 count: 0,
44 }, nil
45 }
46
47 // Record writes a latency measurement to disk (8 bytes per measurement)
48 func (lr *LatencyRecorder) Record(latency time.Duration) error {
49 lr.mu.Lock()
50 defer lr.mu.Unlock()
51
52 // Write latency as 8-byte value (int64 nanoseconds)
53 buf := make([]byte, 8)
54 binary.LittleEndian.PutUint64(buf, uint64(latency.Nanoseconds()))
55
56 if _, err := lr.writer.Write(buf); err != nil {
57 return fmt.Errorf("failed to write latency: %w", err)
58 }
59
60 lr.count++
61 return nil
62 }
63
64 // Close flushes and closes the latency file
65 func (lr *LatencyRecorder) Close() error {
66 lr.mu.Lock()
67 defer lr.mu.Unlock()
68
69 if err := lr.writer.Flush(); err != nil {
70 return fmt.Errorf("failed to flush latency file: %w", err)
71 }
72
73 if err := lr.file.Close(); err != nil {
74 return fmt.Errorf("failed to close latency file: %w", err)
75 }
76
77 return nil
78 }
79
80 // CalculateStats reads all latencies from disk, sorts them, and calculates statistics
81 // This is done on-demand to avoid keeping all latencies in memory during the test
82 func (lr *LatencyRecorder) CalculateStats() (*LatencyStats, error) {
83 lr.mu.Lock()
84 filePath := lr.file.Name()
85 count := lr.count
86 lr.mu.Unlock()
87
88 // If no measurements, return zeros
89 if count == 0 {
90 return &LatencyStats{
91 Avg: 0,
92 P90: 0,
93 P95: 0,
94 P99: 0,
95 Bottom10: 0,
96 Count: 0,
97 }, nil
98 }
99
100 // Open file for reading
101 f, err := os.Open(filePath)
102 if err != nil {
103 return nil, fmt.Errorf("failed to open latency file for reading: %w", err)
104 }
105 defer f.Close()
106
107 // Read all latencies into memory temporarily for sorting
108 latencies := make([]time.Duration, 0, count)
109 buf := make([]byte, 8)
110 reader := bufio.NewReader(f)
111
112 for {
113 n, err := reader.Read(buf)
114 if err != nil {
115 if err.Error() == "EOF" {
116 break
117 }
118 return nil, fmt.Errorf("failed to read latency data: %w", err)
119 }
120 if n != 8 {
121 break
122 }
123
124 nanos := binary.LittleEndian.Uint64(buf)
125 latencies = append(latencies, time.Duration(nanos))
126 }
127
128 // Check if we actually got any latencies
129 if len(latencies) == 0 {
130 return &LatencyStats{
131 Avg: 0,
132 P90: 0,
133 P95: 0,
134 P99: 0,
135 Bottom10: 0,
136 Count: 0,
137 }, nil
138 }
139
140 // Sort for percentile calculation
141 sort.Slice(latencies, func(i, j int) bool {
142 return latencies[i] < latencies[j]
143 })
144
145 // Calculate statistics
146 stats := &LatencyStats{
147 Count: int64(len(latencies)),
148 }
149
150 // Average
151 var sum time.Duration
152 for _, lat := range latencies {
153 sum += lat
154 }
155 stats.Avg = sum / time.Duration(len(latencies))
156
157 // Percentiles
158 stats.P90 = latencies[int(float64(len(latencies))*0.90)]
159 stats.P95 = latencies[int(float64(len(latencies))*0.95)]
160 stats.P99 = latencies[int(float64(len(latencies))*0.99)]
161
162 // Bottom 10% average
163 bottom10Count := int(float64(len(latencies)) * 0.10)
164 if bottom10Count > 0 {
165 var bottom10Sum time.Duration
166 for i := 0; i < bottom10Count; i++ {
167 bottom10Sum += latencies[i]
168 }
169 stats.Bottom10 = bottom10Sum / time.Duration(bottom10Count)
170 }
171
172 return stats, nil
173 }
174