1 // Copyright 2023 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 5 package trace
6 7 import (
8 "cmp"
9 "encoding/binary"
10 "fmt"
11 12 "internal/trace/tracev2"
13 )
14 15 type batchCursor struct {
16 m ThreadID
17 lastTs Time
18 idx int // next index into []batch
19 dataOff int // next index into batch.data
20 ev baseEvent // last read event
21 }
22 23 func (b *batchCursor) nextEvent(batches []batch, freq frequency) (ok bool, err error) {
24 // Batches should generally always have at least one event,
25 // but let's be defensive about that and accept empty batches.
26 for b.idx < len(batches) && len(batches[b.idx].data) == b.dataOff {
27 b.idx++
28 b.dataOff = 0
29 b.lastTs = 0
30 }
31 // Have we reached the end of the batches?
32 if b.idx == len(batches) {
33 return false, nil
34 }
35 // Initialize lastTs if it hasn't been yet.
36 if b.lastTs == 0 {
37 b.lastTs = freq.mul(batches[b.idx].time)
38 }
39 // Read an event out.
40 n, tsdiff, err := readTimedBaseEvent(batches[b.idx].data[b.dataOff:], &b.ev)
41 if err != nil {
42 return false, err
43 }
44 // Complete the timestamp from the cursor's last timestamp.
45 b.ev.time = freq.mul(tsdiff) + b.lastTs
46 47 // Move the cursor's timestamp forward.
48 b.lastTs = b.ev.time
49 50 // Move the cursor forward.
51 b.dataOff += n
52 return true, nil
53 }
54 55 func (b *batchCursor) compare(a *batchCursor) int {
56 return cmp.Compare(b.ev.time, a.ev.time)
57 }
58 59 // readTimedBaseEvent reads out the raw event data from b
60 // into e. It does not try to interpret the arguments
61 // but it does validate that the event is a regular
62 // event with a timestamp (vs. a structural event).
63 //
64 // It requires that the event its reading be timed, which must
65 // be the case for every event in a plain EventBatch.
66 func readTimedBaseEvent(b []byte, e *baseEvent) (int, timestamp, error) {
67 // Get the event type.
68 typ := tracev2.EventType(b[0])
69 specs := tracev2.Specs()
70 if int(typ) >= len(specs) {
71 return 0, 0, fmt.Errorf("found invalid event type: %v", typ)
72 }
73 e.typ = typ
74 75 // Get spec.
76 spec := &specs[typ]
77 if len(spec.Args) == 0 || !spec.IsTimedEvent {
78 return 0, 0, fmt.Errorf("found event without a timestamp: type=%v", typ)
79 }
80 n := 1
81 82 // Read timestamp diff.
83 ts, nb := binary.Uvarint(b[n:])
84 if nb <= 0 {
85 return 0, 0, fmt.Errorf("found invalid uvarint for timestamp")
86 }
87 n += nb
88 89 // Read the rest of the arguments.
90 for i := 0; i < len(spec.Args)-1; i++ {
91 arg, nb := binary.Uvarint(b[n:])
92 if nb <= 0 {
93 return 0, 0, fmt.Errorf("found invalid uvarint")
94 }
95 e.args[i] = arg
96 n += nb
97 }
98 return n, timestamp(ts), nil
99 }
100 101 func heapInsert(heap []*batchCursor, bc *batchCursor) []*batchCursor {
102 // Add the cursor to the end of the heap.
103 heap = append(heap, bc)
104 105 // Sift the new entry up to the right place.
106 heapSiftUp(heap, len(heap)-1)
107 return heap
108 }
109 110 func heapUpdate(heap []*batchCursor, i int) {
111 // Try to sift up.
112 if heapSiftUp(heap, i) != i {
113 return
114 }
115 // Try to sift down, if sifting up failed.
116 heapSiftDown(heap, i)
117 }
118 119 func heapRemove(heap []*batchCursor, i int) []*batchCursor {
120 // Sift index i up to the root, ignoring actual values.
121 for i > 0 {
122 heap[(i-1)/2], heap[i] = heap[i], heap[(i-1)/2]
123 i = (i - 1) / 2
124 }
125 // Swap the root with the last element, then remove it.
126 heap[0], heap[len(heap)-1] = heap[len(heap)-1], heap[0]
127 heap = heap[:len(heap)-1]
128 // Sift the root down.
129 heapSiftDown(heap, 0)
130 return heap
131 }
132 133 func heapSiftUp(heap []*batchCursor, i int) int {
134 for i > 0 && heap[(i-1)/2].ev.time > heap[i].ev.time {
135 heap[(i-1)/2], heap[i] = heap[i], heap[(i-1)/2]
136 i = (i - 1) / 2
137 }
138 return i
139 }
140 141 func heapSiftDown(heap []*batchCursor, i int) int {
142 for {
143 m := min3(heap, i, 2*i+1, 2*i+2)
144 if m == i {
145 // Heap invariant already applies.
146 break
147 }
148 heap[i], heap[m] = heap[m], heap[i]
149 i = m
150 }
151 return i
152 }
153 154 func min3(b []*batchCursor, i0, i1, i2 int) int {
155 minIdx := i0
156 minT := maxTime
157 if i0 < len(b) {
158 minT = b[i0].ev.time
159 }
160 if i1 < len(b) {
161 if t := b[i1].ev.time; t < minT {
162 minT = t
163 minIdx = i1
164 }
165 }
166 if i2 < len(b) {
167 if t := b[i2].ev.time; t < minT {
168 minT = t
169 minIdx = i2
170 }
171 }
172 return minIdx
173 }
174