1 // Copyright 2023 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 5 package trace

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"slices"

	"internal/trace/internal/tracev1"
	"internal/trace/tracev2"
	"internal/trace/version"
)

// Reader reads a byte stream, validates it, and produces trace events.
//
// Provided the trace is non-empty the Reader always produces a Sync
// event as the first event, and a Sync event as the last event.
// (There may also be any number of Sync events in the middle, too.)
type Reader struct {
	version      version.Version   // trace format version, from the stream header
	r            *bufio.Reader     // buffered source the trace is read from
	lastTs       Time              // timestamp of the last event returned; used to keep timestamps monotonic
	gen          *generation       // generation currently being drained; nil until the first generation is read
	spill        *spilledBatch     // batch read ahead that belongs to the next generation, if any
	spillErr     error             // error from reading spill
	spillErrSync bool              // whether we emitted a Sync before reporting spillErr
	frontier     []*batchCursor    // min-heap of per-M cursors, ordered by each cursor's next-event timestamp
	cpuSamples   []cpuSample       // CPU samples not yet emitted for the current generation
	order        ordering          // event-ordering state machine; queues events ready to be returned
	syncs        int               // number of Sync events emitted so far (acts as the sync generation counter)
	done         bool              // set once the stream is exhausted; subsequent reads return io.EOF

	// v1Events is non-nil only for pre-Go 1.22 traces, which are parsed
	// up front and converted event-by-event instead of streamed.
	v1Events *traceV1Converter
}
40 41 // NewReader creates a new trace reader.
42 func NewReader(r io.Reader) (*Reader, error) {
43 br := bufio.NewReader(r)
44 v, err := version.ReadHeader(br)
45 if err != nil {
46 return nil, err
47 }
48 switch v {
49 case version.Go111, version.Go119, version.Go121:
50 tr, err := tracev1.Parse(br, v)
51 if err != nil {
52 return nil, err
53 }
54 return &Reader{
55 v1Events: convertV1Trace(tr),
56 }, nil
57 case version.Go122, version.Go123, version.Go125:
58 return &Reader{
59 version: v,
60 r: br,
61 order: ordering{
62 traceVer: v,
63 mStates: map[ThreadID]*mState{},
64 pStates: map[ProcID]*pState{},
65 gStates: map[GoID]*gState{},
66 activeTasks: map[TaskID]taskState{},
67 },
68 }, nil
69 default:
70 return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
71 }
72 }
73 74 // ReadEvent reads a single event from the stream.
75 //
76 // If the stream has been exhausted, it returns an invalid event and io.EOF.
77 func (r *Reader) ReadEvent() (e Event, err error) {
78 // Return only io.EOF if we're done.
79 if r.done {
80 return Event{}, io.EOF
81 }
82 83 // Handle v1 execution traces.
84 if r.v1Events != nil {
85 if r.syncs == 0 {
86 // Always emit a sync event first, if we have any events at all.
87 ev, ok := r.v1Events.events.Peek()
88 if ok {
89 r.syncs++
90 return syncEvent(r.v1Events.evt, Time(ev.Ts-1), r.syncs), nil
91 }
92 }
93 ev, err := r.v1Events.next()
94 if err == io.EOF {
95 // Always emit a sync event at the end.
96 r.done = true
97 r.syncs++
98 return syncEvent(nil, r.v1Events.lastTs+1, r.syncs), nil
99 } else if err != nil {
100 return Event{}, err
101 }
102 return ev, nil
103 }
104 105 // Trace v2 parsing algorithm.
106 //
107 // (1) Read in all the batches for the next generation from the stream.
108 // (a) Use the size field in the header to quickly find all batches.
109 // (2) Parse out the strings, stacks, CPU samples, and timestamp conversion data.
110 // (3) Group each event batch by M, sorted by timestamp. (batchCursor contains the groups.)
111 // (4) Organize batchCursors in a min-heap, ordered by the timestamp of the next event for each M.
112 // (5) Try to advance the next event for the M at the top of the min-heap.
113 // (a) On success, select that M.
114 // (b) On failure, sort the min-heap and try to advance other Ms. Select the first M that advances.
115 // (c) If there's nothing left to advance, goto (1).
116 // (6) Select the latest event for the selected M and get it ready to be returned.
117 // (7) Read the next event for the selected M and update the min-heap.
118 // (8) Return the selected event, goto (5) on the next call.
119 120 // Set us up to track the last timestamp and fix up
121 // the timestamp of any event that comes through.
122 defer func() {
123 if err != nil {
124 return
125 }
126 if err = e.validateTableIDs(); err != nil {
127 return
128 }
129 if e.base.time <= r.lastTs {
130 e.base.time = r.lastTs + 1
131 }
132 r.lastTs = e.base.time
133 }()
134 135 // Consume any events in the ordering first.
136 if ev, ok := r.order.Next(); ok {
137 return ev, nil
138 }
139 140 // Check if we need to refresh the generation.
141 if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
142 if r.spillErr != nil {
143 if r.spillErrSync {
144 return Event{}, r.spillErr
145 }
146 r.spillErrSync = true
147 r.syncs++
148 return syncEvent(nil, r.lastTs, r.syncs), nil
149 }
150 if r.gen != nil && r.spill == nil {
151 // If we have a generation from the last read,
152 // and there's nothing left in the frontier, and
153 // there's no spilled batch, indicating that there's
154 // no further generation, it means we're done.
155 // Emit the final sync event.
156 r.done = true
157 r.syncs++
158 return syncEvent(nil, r.lastTs, r.syncs), nil
159 }
160 // Read the next generation.
161 r.gen, r.spill, r.spillErr = readGeneration(r.r, r.spill, r.version)
162 if r.gen == nil {
163 r.spillErrSync = true
164 r.syncs++
165 return syncEvent(nil, r.lastTs, r.syncs), nil
166 }
167 168 // Reset CPU samples cursor.
169 r.cpuSamples = r.gen.cpuSamples
170 171 // Reset frontier.
172 for _, m := range r.gen.batchMs {
173 batches := r.gen.batches[m]
174 bc := &batchCursor{m: m}
175 ok, err := bc.nextEvent(batches, r.gen.freq)
176 if err != nil {
177 return Event{}, err
178 }
179 if !ok {
180 // Turns out there aren't actually any events in these batches.
181 continue
182 }
183 r.frontier = heapInsert(r.frontier, bc)
184 }
185 r.syncs++
186 // Always emit a sync event at the beginning of the generation.
187 return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil
188 }
189 tryAdvance := func(i int) (bool, error) {
190 bc := r.frontier[i]
191 192 if ok, err := r.order.Advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); !ok || err != nil {
193 return ok, err
194 }
195 196 // Refresh the cursor's event.
197 ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
198 if err != nil {
199 return false, err
200 }
201 if ok {
202 // If we successfully refreshed, update the heap.
203 heapUpdate(r.frontier, i)
204 } else {
205 // There's nothing else to read. Delete this cursor from the frontier.
206 r.frontier = heapRemove(r.frontier, i)
207 }
208 return true, nil
209 }
210 // Inject a CPU sample if it comes next.
211 if len(r.cpuSamples) != 0 {
212 if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
213 e := r.cpuSamples[0].asEvent(r.gen.evTable)
214 r.cpuSamples = r.cpuSamples[1:]
215 return e, nil
216 }
217 }
218 // Try to advance the head of the frontier, which should have the minimum timestamp.
219 // This should be by far the most common case
220 if len(r.frontier) == 0 {
221 return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
222 }
223 if ok, err := tryAdvance(0); err != nil {
224 return Event{}, err
225 } else if !ok {
226 // Try to advance the rest of the frontier, in timestamp order.
227 //
228 // To do this, sort the min-heap. A sorted min-heap is still a
229 // min-heap, but now we can iterate over the rest and try to
230 // advance in order. This path should be rare.
231 slices.SortFunc(r.frontier, (*batchCursor).compare)
232 success := false
233 for i := 1; i < len(r.frontier); i++ {
234 if ok, err = tryAdvance(i); err != nil {
235 return Event{}, err
236 } else if ok {
237 success = true
238 break
239 }
240 }
241 if !success {
242 return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
243 }
244 }
245 246 // Pick off the next event on the queue. At this point, one must exist.
247 ev, ok := r.order.Next()
248 if !ok {
249 panic("invariant violation: advance successful, but queue is empty")
250 }
251 return ev, nil
252 }
253 254 func dumpFrontier(frontier []*batchCursor) []byte {
255 var sb bytes.Buffer
256 for _, bc := range frontier {
257 spec := tracev2.Specs()[bc.ev.typ]
258 fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
259 for i, arg := range spec.Args[1:] {
260 fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
261 }
262 fmt.Fprintf(&sb, "]\n")
263 }
264 return sb.String()
265 }
266