1 // Copyright 2023 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 5 package trace
6 7 import (
8 "bufio"
9 "bytes"
10 "cmp"
11 "encoding/binary"
12 "fmt"
13 "io"
14 "slices"
15 "time"
16 17 "internal/trace/tracev2"
18 "internal/trace/version"
19 )
// generation contains all the trace data for a single
// trace generation. It is purely data: it does not
// track any parse state nor does it contain a cursor
// into the generation.
type generation struct {
	gen        uint64               // generation number as read from the batch stream
	batches    map[ThreadID][]batch // non-special batches, grouped by the M that emitted them
	batchMs    []ThreadID           // Ms in the order their first batch was encountered
	cpuSamples []cpuSample          // CPU profile samples collected during this generation
	minTs      timestamp            // smallest batch timestamp observed in this generation
	*evTable                        // lookup tables (strings, stacks, pcs, ...) for this generation
}
// spilledBatch represents a batch that was read out for the next generation,
// while reading the previous one. It's passed on when parsing the next
// generation.
type spilledBatch struct {
	gen uint64 // generation number the spilled batch belongs to
	*batch     // the already-parsed batch itself
}
41 42 // readGeneration buffers and decodes the structural elements of a trace generation
43 // out of r. spill is the first batch of the new generation (already buffered and
44 // parsed from reading the last generation). Returns the generation and the first
45 // batch read of the next generation, if any.
46 //
47 // If gen is non-nil, it is valid and must be processed before handling the returned
48 // error.
49 func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
50 g := &generation{
51 evTable: &evTable{
52 pcs: map[uint64]frame{},
53 },
54 batches: map[ThreadID][]batch{},
55 }
56 // Process the spilled batch.
57 if spill != nil {
58 g.gen = spill.gen
59 g.minTs = spill.batch.time
60 if err := processBatch(g, *spill.batch, ver); err != nil {
61 return nil, nil, err
62 }
63 spill = nil
64 }
65 // Read batches one at a time until we either hit EOF or
66 // the next generation.
67 var spillErr error
68 for {
69 b, gen, err := readBatch(r)
70 if err == io.EOF {
71 break
72 }
73 if err != nil {
74 if g.gen != 0 {
75 // This is an error reading the first batch of the next generation.
76 // This is fine. Let's forge ahead assuming that what we've got so
77 // far is fine.
78 spillErr = err
79 break
80 }
81 return nil, nil, err
82 }
83 if gen == 0 {
84 // 0 is a sentinel used by the runtime, so we'll never see it.
85 return nil, nil, fmt.Errorf("invalid generation number %d", gen)
86 }
87 if g.gen == 0 {
88 // Initialize gen.
89 g.gen = gen
90 }
91 if gen == g.gen+1 { // TODO: advance this the same way the runtime does.
92 spill = &spilledBatch{gen: gen, batch: &b}
93 break
94 }
95 if gen != g.gen {
96 // N.B. Fail as fast as possible if we see this. At first it
97 // may seem prudent to be fault-tolerant and assume we have a
98 // complete generation, parsing and returning that first. However,
99 // if the batches are mixed across generations then it's likely
100 // we won't be able to parse this generation correctly at all.
101 // Rather than return a cryptic error in that case, indicate the
102 // problem as soon as we see it.
103 return nil, nil, fmt.Errorf("generations out of order")
104 }
105 if g.minTs == 0 || b.time < g.minTs {
106 g.minTs = b.time
107 }
108 if err := processBatch(g, b, ver); err != nil {
109 return nil, nil, err
110 }
111 }
112 113 // Check some invariants.
114 if g.freq == 0 {
115 return nil, nil, fmt.Errorf("no frequency event found")
116 }
117 if ver >= version.Go125 && !g.hasClockSnapshot {
118 return nil, nil, fmt.Errorf("no clock snapshot event found")
119 }
120 121 // N.B. Trust that the batch order is correct. We can't validate the batch order
122 // by timestamp because the timestamps could just be plain wrong. The source of
123 // truth is the order things appear in the trace and the partial order sequence
124 // numbers on certain events. If it turns out the batch order is actually incorrect
125 // we'll very likely fail to advance a partial order from the frontier.
126 127 // Compactify stacks and strings for better lookup performance later.
128 g.stacks.compactify()
129 g.bytes.compactify()
130 131 // Validate stacks.
132 if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
133 return nil, nil, err
134 }
135 136 // Fix up the CPU sample timestamps, now that we have freq.
137 for i := range g.cpuSamples {
138 s := &g.cpuSamples[i]
139 s.time = g.freq.mul(timestamp(s.time))
140 }
141 // Sort the CPU samples.
142 slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
143 return cmp.Compare(a.time, b.time)
144 })
145 return g, spill, spillErr
146 }
147 148 // processBatch adds the batch to the generation.
149 func processBatch(g *generation, b batch, ver version.Version) error {
150 switch {
151 case b.isStringsBatch():
152 if err := addStrings(&g.strings, b); err != nil {
153 return err
154 }
155 case b.isStacksBatch():
156 if err := addStacks(&g.stacks, g.pcs, b); err != nil {
157 return err
158 }
159 case b.isCPUSamplesBatch():
160 samples, err := addCPUSamples(g.cpuSamples, b)
161 if err != nil {
162 return err
163 }
164 g.cpuSamples = samples
165 case b.isSyncBatch(ver):
166 if err := setSyncBatch(&g.sync, b, ver); err != nil {
167 return err
168 }
169 case b.exp != tracev2.NoExperiment:
170 if g.expBatches == nil {
171 g.expBatches = map[tracev2.Experiment][]ExperimentalBatch{}
172 }
173 if err := addExperimentalBatch(g.expBatches, b); err != nil {
174 return err
175 }
176 default:
177 if _, ok := g.batches[b.m]; !ok {
178 g.batchMs = append(g.batchMs, b.m)
179 }
180 g.batches[b.m] = append(g.batches[b.m], b)
181 }
182 return nil
183 }
184 185 // validateStackStrings makes sure all the string references in
186 // the stack table are present in the string table.
187 func validateStackStrings(
188 stacks *dataTable[stackID, stack],
189 strings *dataTable[stringID, string],
190 frames map[uint64]frame,
191 ) error {
192 var err error
193 stacks.forEach(func(id stackID, stk stack) bool {
194 for _, pc := range stk.pcs {
195 frame, ok := frames[pc]
196 if !ok {
197 err = fmt.Errorf("found unknown pc %x for stack %d", pc, id)
198 return false
199 }
200 _, ok = bytes.get(frame.funcID)
201 if !ok {
202 err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id)
203 return false
204 }
205 _, ok = bytes.get(frame.fileID)
206 if !ok {
207 err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id)
208 return false
209 }
210 }
211 return true
212 })
213 return err
214 }
215 216 // addStrings takes a batch whose first byte is an EvStrings event
217 // (indicating that the batch contains only strings) and adds each
218 // string contained therein to the provided strings map.
219 func addStrings(stringTable *dataTable[stringID, string], b batch) error {
220 if !b.isStringsBatch() {
221 return fmt.Errorf("internal error: addStrings called on non-string batch")
222 }
223 r := bytes.NewReader(b.data)
224 hdr, err := r.ReadByte() // Consume the EvStrings byte.
225 if err != nil || tracev2.EventType(hdr) != tracev2.EvStrings {
226 return fmt.Errorf("missing strings batch header")
227 }
228 229 var sb bytes.Buffer
230 for r.Len() != 0 {
231 // Read the header.
232 ev, err := r.ReadByte()
233 if err != nil {
234 return err
235 }
236 if tracev2.EventType(ev) != tracev2.EvString {
237 return fmt.Errorf("expected string event, got %d", ev)
238 }
239 240 // Read the string's ID.
241 id, err := binary.ReadUvarint(r)
242 if err != nil {
243 return err
244 }
245 246 // Read the string's length.
247 len, err := binary.ReadUvarint(r)
248 if err != nil {
249 return err
250 }
251 if len > tracev2.MaxEventTrailerDataSize {
252 return fmt.Errorf("invalid string size %d, maximum is %d", len, tracev2.MaxEventTrailerDataSize)
253 }
254 255 // Copy out the string.
256 n, err := io.CopyN(&sb, r, int64(len))
257 if n != int64(len) {
258 return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len)
259 }
260 if err != nil {
261 return fmt.Errorf("copying string data: %w", err)
262 }
263 264 // Add the string to the map.
265 s := sb.String()
266 sb.Reset()
267 if err := stringTable.insert(stringID(id), s); err != nil {
268 return err
269 }
270 }
271 return nil
272 }
273 274 // addStacks takes a batch whose first byte is an EvStacks event
275 // (indicating that the batch contains only stacks) and adds each
276 // string contained therein to the provided stacks map.
277 func addStacks(stackTable *dataTable[stackID, stack], pcs map[uint64]frame, b batch) error {
278 if !b.isStacksBatch() {
279 return fmt.Errorf("internal error: addStacks called on non-stacks batch")
280 }
281 r := bytes.NewReader(b.data)
282 hdr, err := r.ReadByte() // Consume the EvStacks byte.
283 if err != nil || tracev2.EventType(hdr) != tracev2.EvStacks {
284 return fmt.Errorf("missing stacks batch header")
285 }
286 287 for r.Len() != 0 {
288 // Read the header.
289 ev, err := r.ReadByte()
290 if err != nil {
291 return err
292 }
293 if tracev2.EventType(ev) != tracev2.EvStack {
294 return fmt.Errorf("expected stack event, got %d", ev)
295 }
296 297 // Read the stack's ID.
298 id, err := binary.ReadUvarint(r)
299 if err != nil {
300 return err
301 }
302 303 // Read how many frames are in each stack.
304 nFrames, err := binary.ReadUvarint(r)
305 if err != nil {
306 return err
307 }
308 if nFrames > tracev2.MaxFramesPerStack {
309 return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, tracev2.MaxFramesPerStack)
310 }
311 312 // Each frame consists of 4 fields: pc, funcID (string), fileID (string), line.
313 frames := []uint64{:0:nFrames}
314 for i := uint64(0); i < nFrames; i++ {
315 // Read the frame data.
316 pc, err := binary.ReadUvarint(r)
317 if err != nil {
318 return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err)
319 }
320 funcID, err := binary.ReadUvarint(r)
321 if err != nil {
322 return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err)
323 }
324 fileID, err := binary.ReadUvarint(r)
325 if err != nil {
326 return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err)
327 }
328 line, err := binary.ReadUvarint(r)
329 if err != nil {
330 return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err)
331 }
332 frames = append(frames, pc)
333 334 if _, ok := pcs[pc]; !ok {
335 pcs[pc] = frame{
336 pc: pc,
337 funcID: stringID(funcID),
338 fileID: stringID(fileID),
339 line: line,
340 }
341 }
342 }
343 344 // Add the stack to the map.
345 if err := stackTable.insert(stackID(id), stack{pcs: frames}); err != nil {
346 return err
347 }
348 }
349 return nil
350 }
351 352 // addCPUSamples takes a batch whose first byte is an EvCPUSamples event
353 // (indicating that the batch contains only CPU samples) and adds each
354 // sample contained therein to the provided samples list.
355 func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) {
356 if !b.isCPUSamplesBatch() {
357 return nil, fmt.Errorf("internal error: addCPUSamples called on non-CPU-sample batch")
358 }
359 r := bytes.NewReader(b.data)
360 hdr, err := r.ReadByte() // Consume the EvCPUSamples byte.
361 if err != nil || tracev2.EventType(hdr) != tracev2.EvCPUSamples {
362 return nil, fmt.Errorf("missing CPU samples batch header")
363 }
364 365 for r.Len() != 0 {
366 // Read the header.
367 ev, err := r.ReadByte()
368 if err != nil {
369 return nil, err
370 }
371 if tracev2.EventType(ev) != tracev2.EvCPUSample {
372 return nil, fmt.Errorf("expected CPU sample event, got %d", ev)
373 }
374 375 // Read the sample's timestamp.
376 ts, err := binary.ReadUvarint(r)
377 if err != nil {
378 return nil, err
379 }
380 381 // Read the sample's M.
382 m, err := binary.ReadUvarint(r)
383 if err != nil {
384 return nil, err
385 }
386 mid := ThreadID(m)
387 388 // Read the sample's P.
389 p, err := binary.ReadUvarint(r)
390 if err != nil {
391 return nil, err
392 }
393 pid := ProcID(p)
394 395 // Read the sample's G.
396 g, err := binary.ReadUvarint(r)
397 if err != nil {
398 return nil, err
399 }
400 goid := GoID(g)
401 if g == 0 {
402 goid = NoGoroutine
403 }
404 405 // Read the sample's stack.
406 s, err := binary.ReadUvarint(r)
407 if err != nil {
408 return nil, err
409 }
410 411 // Add the sample to the slice.
412 samples = append(samples, cpuSample{
413 schedCtx: schedCtx{
414 M: mid,
415 P: pid,
416 G: goid,
417 },
418 time: Time(ts), // N.B. this is really a "timestamp," not a Time.
419 stack: stackID(s),
420 })
421 }
422 return samples, nil
423 }
// sync holds the per-generation sync data.
type sync struct {
	freq             frequency // nanoseconds per timestamp unit, derived from the EvFrequency event
	hasClockSnapshot bool      // whether an EvClockSnapshot event was seen (Go 1.25+ traces)
	snapTime         timestamp // trace-clock time at which the clock snapshot was taken
	snapMono         uint64    // monotonic clock reading from the snapshot
	snapWall         time.Time // wall-clock reading from the snapshot
}
433 434 func setSyncBatch(s *sync, b batch, ver version.Version) error {
435 if !b.isSyncBatch(ver) {
436 return fmt.Errorf("internal error: setSyncBatch called on non-sync batch")
437 }
438 r := bytes.NewReader(b.data)
439 if ver >= version.Go125 {
440 hdr, err := r.ReadByte() // Consume the EvSync byte.
441 if err != nil || tracev2.EventType(hdr) != tracev2.EvSync {
442 return fmt.Errorf("missing sync batch header")
443 }
444 }
445 446 lastTs := b.time
447 for r.Len() != 0 {
448 // Read the header
449 ev, err := r.ReadByte()
450 if err != nil {
451 return err
452 }
453 et := tracev2.EventType(ev)
454 switch {
455 case et == tracev2.EvFrequency:
456 if s.freq != 0 {
457 return fmt.Errorf("found multiple frequency events")
458 }
459 // Read the frequency. It'll come out as timestamp units per second.
460 f, err := binary.ReadUvarint(r)
461 if err != nil {
462 return err
463 }
464 // Convert to nanoseconds per timestamp unit.
465 s.freq = frequency(1.0 / (float64(f) / 1e9))
466 case et == tracev2.EvClockSnapshot && ver >= version.Go125:
467 if s.hasClockSnapshot {
468 return fmt.Errorf("found multiple clock snapshot events")
469 }
470 s.hasClockSnapshot = true
471 // Read the EvClockSnapshot arguments.
472 tdiff, err := binary.ReadUvarint(r)
473 if err != nil {
474 return err
475 }
476 lastTs += timestamp(tdiff)
477 s.snapTime = lastTs
478 mono, err := binary.ReadUvarint(r)
479 if err != nil {
480 return err
481 }
482 s.snapMono = mono
483 sec, err := binary.ReadUvarint(r)
484 if err != nil {
485 return err
486 }
487 nsec, err := binary.ReadUvarint(r)
488 if err != nil {
489 return err
490 }
491 // TODO(felixge): In theory we could inject s.snapMono into the time
492 // value below to make it comparable. But there is no API for this
493 // in the time package right now.
494 s.snapWall = time.Unix(int64(sec), int64(nsec))
495 default:
496 return fmt.Errorf("expected frequency or clock snapshot event, got %d", ev)
497 }
498 }
499 return nil
500 }
501 502 // addExperimentalBatch takes an experimental batch and adds it to the list of experimental
503 // batches for the experiment its a part of.
504 func addExperimentalBatch(expBatches map[tracev2.Experiment][]ExperimentalBatch, b batch) error {
505 if b.exp == tracev2.NoExperiment {
506 return fmt.Errorf("internal error: addExperimentalBatch called on non-experimental batch")
507 }
508 expBatches[b.exp] = append(expBatches[b.exp], ExperimentalBatch{
509 Thread: b.m,
510 Data: b.data,
511 })
512 return nil
513 }
514