batch.mx raw
1 // Copyright 2023 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package trace
6
7 import (
8 "bytes"
9 "encoding/binary"
10 "fmt"
11 "io"
12
13 "internal/trace/tracev2"
14 "internal/trace/version"
15 )
16
17 // timestamp is an unprocessed timestamp.
18 type timestamp uint64
19
20 // batch represents a batch of trace events.
21 // It is unparsed except for its header.
22 type batch struct {
23 m ThreadID
24 time timestamp
25 data []byte
26 exp tracev2.Experiment
27 }
28
29 func (b *batch) isStringsBatch() bool {
30 return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvStrings
31 }
32
33 func (b *batch) isStacksBatch() bool {
34 return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvStacks
35 }
36
37 func (b *batch) isCPUSamplesBatch() bool {
38 return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvCPUSamples
39 }
40
41 func (b *batch) isSyncBatch(ver version.Version) bool {
42 return (b.exp == tracev2.NoExperiment && len(b.data) > 0) &&
43 ((tracev2.EventType(b.data[0]) == tracev2.EvFrequency && ver < version.Go125) ||
44 (tracev2.EventType(b.data[0]) == tracev2.EvSync && ver >= version.Go125))
45 }
46
47 // readBatch reads the next full batch from r.
48 func readBatch(r interface {
49 io.Reader
50 io.ByteReader
51 }) (batch, uint64, error) {
52 // Read batch header byte.
53 b, err := r.ReadByte()
54 if err != nil {
55 return batch{}, 0, err
56 }
57 if typ := tracev2.EventType(b); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
58 return batch{}, 0, fmt.Errorf("expected batch event, got event %d", typ)
59 }
60
61 // Read the experiment of we have one.
62 exp := tracev2.NoExperiment
63 if tracev2.EventType(b) == tracev2.EvExperimentalBatch {
64 e, err := r.ReadByte()
65 if err != nil {
66 return batch{}, 0, err
67 }
68 exp = tracev2.Experiment(e)
69 }
70
71 // Read the batch header: gen (generation), thread (M) ID, base timestamp
72 // for the batch.
73 gen, err := binary.ReadUvarint(r)
74 if err != nil {
75 return batch{}, gen, fmt.Errorf("error reading batch gen: %w", err)
76 }
77 m, err := binary.ReadUvarint(r)
78 if err != nil {
79 return batch{}, gen, fmt.Errorf("error reading batch M ID: %w", err)
80 }
81 ts, err := binary.ReadUvarint(r)
82 if err != nil {
83 return batch{}, gen, fmt.Errorf("error reading batch timestamp: %w", err)
84 }
85
86 // Read in the size of the batch to follow.
87 size, err := binary.ReadUvarint(r)
88 if err != nil {
89 return batch{}, gen, fmt.Errorf("error reading batch size: %w", err)
90 }
91 if size > tracev2.MaxBatchSize {
92 return batch{}, gen, fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
93 }
94
95 // Copy out the batch for later processing.
96 var data bytes.Buffer
97 data.Grow(int(size))
98 n, err := io.CopyN(&data, r, int64(size))
99 if n != int64(size) {
100 return batch{}, gen, fmt.Errorf("failed to read full batch: read %d but wanted %d", n, size)
101 }
102 if err != nil {
103 return batch{}, gen, fmt.Errorf("copying batch data: %w", err)
104 }
105
106 // Return the batch.
107 return batch{
108 m: ThreadID(m),
109 time: timestamp(ts),
110 data: data.Bytes(),
111 exp: exp,
112 }, gen, nil
113 }
114