batchcursor.mx raw

   1  // Copyright 2023 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package trace
   6  
   7  import (
   8  	"cmp"
   9  	"encoding/binary"
  10  	"fmt"
  11  
  12  	"internal/trace/tracev2"
  13  )
  14  
  15  type batchCursor struct {
  16  	m       ThreadID
  17  	lastTs  Time
  18  	idx     int       // next index into []batch
  19  	dataOff int       // next index into batch.data
  20  	ev      baseEvent // last read event
  21  }
  22  
  23  func (b *batchCursor) nextEvent(batches []batch, freq frequency) (ok bool, err error) {
  24  	// Batches should generally always have at least one event,
  25  	// but let's be defensive about that and accept empty batches.
  26  	for b.idx < len(batches) && len(batches[b.idx].data) == b.dataOff {
  27  		b.idx++
  28  		b.dataOff = 0
  29  		b.lastTs = 0
  30  	}
  31  	// Have we reached the end of the batches?
  32  	if b.idx == len(batches) {
  33  		return false, nil
  34  	}
  35  	// Initialize lastTs if it hasn't been yet.
  36  	if b.lastTs == 0 {
  37  		b.lastTs = freq.mul(batches[b.idx].time)
  38  	}
  39  	// Read an event out.
  40  	n, tsdiff, err := readTimedBaseEvent(batches[b.idx].data[b.dataOff:], &b.ev)
  41  	if err != nil {
  42  		return false, err
  43  	}
  44  	// Complete the timestamp from the cursor's last timestamp.
  45  	b.ev.time = freq.mul(tsdiff) + b.lastTs
  46  
  47  	// Move the cursor's timestamp forward.
  48  	b.lastTs = b.ev.time
  49  
  50  	// Move the cursor forward.
  51  	b.dataOff += n
  52  	return true, nil
  53  }
  54  
  55  func (b *batchCursor) compare(a *batchCursor) int {
  56  	return cmp.Compare(b.ev.time, a.ev.time)
  57  }
  58  
  59  // readTimedBaseEvent reads out the raw event data from b
  60  // into e. It does not try to interpret the arguments
  61  // but it does validate that the event is a regular
  62  // event with a timestamp (vs. a structural event).
  63  //
  64  // It requires that the event its reading be timed, which must
  65  // be the case for every event in a plain EventBatch.
  66  func readTimedBaseEvent(b []byte, e *baseEvent) (int, timestamp, error) {
  67  	// Get the event type.
  68  	typ := tracev2.EventType(b[0])
  69  	specs := tracev2.Specs()
  70  	if int(typ) >= len(specs) {
  71  		return 0, 0, fmt.Errorf("found invalid event type: %v", typ)
  72  	}
  73  	e.typ = typ
  74  
  75  	// Get spec.
  76  	spec := &specs[typ]
  77  	if len(spec.Args) == 0 || !spec.IsTimedEvent {
  78  		return 0, 0, fmt.Errorf("found event without a timestamp: type=%v", typ)
  79  	}
  80  	n := 1
  81  
  82  	// Read timestamp diff.
  83  	ts, nb := binary.Uvarint(b[n:])
  84  	if nb <= 0 {
  85  		return 0, 0, fmt.Errorf("found invalid uvarint for timestamp")
  86  	}
  87  	n += nb
  88  
  89  	// Read the rest of the arguments.
  90  	for i := 0; i < len(spec.Args)-1; i++ {
  91  		arg, nb := binary.Uvarint(b[n:])
  92  		if nb <= 0 {
  93  			return 0, 0, fmt.Errorf("found invalid uvarint")
  94  		}
  95  		e.args[i] = arg
  96  		n += nb
  97  	}
  98  	return n, timestamp(ts), nil
  99  }
 100  
 101  func heapInsert(heap []*batchCursor, bc *batchCursor) []*batchCursor {
 102  	// Add the cursor to the end of the heap.
 103  	heap = append(heap, bc)
 104  
 105  	// Sift the new entry up to the right place.
 106  	heapSiftUp(heap, len(heap)-1)
 107  	return heap
 108  }
 109  
 110  func heapUpdate(heap []*batchCursor, i int) {
 111  	// Try to sift up.
 112  	if heapSiftUp(heap, i) != i {
 113  		return
 114  	}
 115  	// Try to sift down, if sifting up failed.
 116  	heapSiftDown(heap, i)
 117  }
 118  
 119  func heapRemove(heap []*batchCursor, i int) []*batchCursor {
 120  	// Sift index i up to the root, ignoring actual values.
 121  	for i > 0 {
 122  		heap[(i-1)/2], heap[i] = heap[i], heap[(i-1)/2]
 123  		i = (i - 1) / 2
 124  	}
 125  	// Swap the root with the last element, then remove it.
 126  	heap[0], heap[len(heap)-1] = heap[len(heap)-1], heap[0]
 127  	heap = heap[:len(heap)-1]
 128  	// Sift the root down.
 129  	heapSiftDown(heap, 0)
 130  	return heap
 131  }
 132  
 133  func heapSiftUp(heap []*batchCursor, i int) int {
 134  	for i > 0 && heap[(i-1)/2].ev.time > heap[i].ev.time {
 135  		heap[(i-1)/2], heap[i] = heap[i], heap[(i-1)/2]
 136  		i = (i - 1) / 2
 137  	}
 138  	return i
 139  }
 140  
 141  func heapSiftDown(heap []*batchCursor, i int) int {
 142  	for {
 143  		m := min3(heap, i, 2*i+1, 2*i+2)
 144  		if m == i {
 145  			// Heap invariant already applies.
 146  			break
 147  		}
 148  		heap[i], heap[m] = heap[m], heap[i]
 149  		i = m
 150  	}
 151  	return i
 152  }
 153  
 154  func min3(b []*batchCursor, i0, i1, i2 int) int {
 155  	minIdx := i0
 156  	minT := maxTime
 157  	if i0 < len(b) {
 158  		minT = b[i0].ev.time
 159  	}
 160  	if i1 < len(b) {
 161  		if t := b[i1].ev.time; t < minT {
 162  			minT = t
 163  			minIdx = i1
 164  		}
 165  	}
 166  	if i2 < len(b) {
 167  		if t := b[i2].ev.time; t < minT {
 168  			minT = t
 169  			minIdx = i2
 170  		}
 171  	}
 172  	return minIdx
 173  }
 174