
   1  // Copyright 2023 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package trace
   6  
import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"slices"

	"internal/trace/internal/tracev1"
	"internal/trace/tracev2"
	"internal/trace/version"
)
  18  
// Reader reads a byte stream, validates it, and produces trace events.
//
// Provided the trace is non-empty the Reader always produces a Sync
// event as the first event, and a Sync event as the last event.
// (There may also be any number of Sync events in the middle, too.)
type Reader struct {
	version      version.Version // trace wire-format version parsed from the stream header
	r            *bufio.Reader   // underlying buffered byte stream
	lastTs       Time            // timestamp of the last event returned; used to keep timestamps monotonic
	gen          *generation     // generation currently being emitted
	spill        *spilledBatch   // batch read ahead of the current generation (presumably the next generation's first batch — see readGeneration)
	spillErr     error           // error from reading spill
	spillErrSync bool            // whether we emitted a Sync before reporting spillErr
	frontier     []*batchCursor  // min-heap of per-M cursors ordered by next-event timestamp
	cpuSamples   []cpuSample     // remaining CPU samples for the current generation
	order        ordering        // event ordering state machine
	syncs        int             // number of Sync events emitted so far
	done         bool            // set once the final Sync is emitted; subsequent reads return io.EOF

	v1Events *traceV1Converter // non-nil iff this is a v1 (pre-Go 1.22) trace
}
  40  
  41  // NewReader creates a new trace reader.
  42  func NewReader(r io.Reader) (*Reader, error) {
  43  	br := bufio.NewReader(r)
  44  	v, err := version.ReadHeader(br)
  45  	if err != nil {
  46  		return nil, err
  47  	}
  48  	switch v {
  49  	case version.Go111, version.Go119, version.Go121:
  50  		tr, err := tracev1.Parse(br, v)
  51  		if err != nil {
  52  			return nil, err
  53  		}
  54  		return &Reader{
  55  			v1Events: convertV1Trace(tr),
  56  		}, nil
  57  	case version.Go122, version.Go123, version.Go125:
  58  		return &Reader{
  59  			version: v,
  60  			r:       br,
  61  			order: ordering{
  62  				traceVer:    v,
  63  				mStates:     map[ThreadID]*mState{},
  64  				pStates:     map[ProcID]*pState{},
  65  				gStates:     map[GoID]*gState{},
  66  				activeTasks: map[TaskID]taskState{},
  67  			},
  68  		}, nil
  69  	default:
  70  		return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
  71  	}
  72  }
  73  
// ReadEvent reads a single event from the stream.
//
// If the stream has been exhausted, it returns an invalid event and io.EOF.
func (r *Reader) ReadEvent() (e Event, err error) {
	// Return only io.EOF if we're done.
	if r.done {
		return Event{}, io.EOF
	}

	// Handle v1 execution traces.
	if r.v1Events != nil {
		if r.syncs == 0 {
			// Always emit a sync event first, if we have any events at all.
			ev, ok := r.v1Events.events.Peek()
			if ok {
				r.syncs++
				return syncEvent(r.v1Events.evt, Time(ev.Ts-1), r.syncs), nil
			}
		}
		ev, err := r.v1Events.next()
		if err == io.EOF {
			// Always emit a sync event at the end.
			r.done = true
			r.syncs++
			return syncEvent(nil, r.v1Events.lastTs+1, r.syncs), nil
		} else if err != nil {
			return Event{}, err
		}
		return ev, nil
	}

	// Trace v2 parsing algorithm.
	//
	// (1) Read in all the batches for the next generation from the stream.
	//   (a) Use the size field in the header to quickly find all batches.
	// (2) Parse out the strings, stacks, CPU samples, and timestamp conversion data.
	// (3) Group each event batch by M, sorted by timestamp. (batchCursor contains the groups.)
	// (4) Organize batchCursors in a min-heap, ordered by the timestamp of the next event for each M.
	// (5) Try to advance the next event for the M at the top of the min-heap.
	//   (a) On success, select that M.
	//   (b) On failure, sort the min-heap and try to advance other Ms. Select the first M that advances.
	//   (c) If there's nothing left to advance, goto (1).
	// (6) Select the latest event for the selected M and get it ready to be returned.
	// (7) Read the next event for the selected M and update the min-heap.
	// (8) Return the selected event, goto (5) on the next call.

	// Set us up to track the last timestamp and fix up
	// the timestamp of any event that comes through.
	defer func() {
		if err != nil {
			return
		}
		if err = e.validateTableIDs(); err != nil {
			return
		}
		// Timestamps must advance strictly; nudge forward any event
		// whose timestamp does not exceed the previous one.
		if e.base.time <= r.lastTs {
			e.base.time = r.lastTs + 1
		}
		r.lastTs = e.base.time
	}()

	// Consume any events in the ordering first.
	if ev, ok := r.order.Next(); ok {
		return ev, nil
	}

	// Check if we need to refresh the generation.
	if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
		if r.spillErr != nil {
			// Emit exactly one Sync before surfacing the spill error
			// on the following call.
			if r.spillErrSync {
				return Event{}, r.spillErr
			}
			r.spillErrSync = true
			r.syncs++
			return syncEvent(nil, r.lastTs, r.syncs), nil
		}
		if r.gen != nil && r.spill == nil {
			// If we have a generation from the last read,
			// and there's nothing left in the frontier, and
			// there's no spilled batch, indicating that there's
			// no further generation, it means we're done.
			// Emit the final sync event.
			r.done = true
			r.syncs++
			return syncEvent(nil, r.lastTs, r.syncs), nil
		}
		// Read the next generation.
		r.gen, r.spill, r.spillErr = readGeneration(r.r, r.spill, r.version)
		if r.gen == nil {
			// Nothing was read; emit a Sync now and report r.spillErr
			// (possibly just end-of-trace) on the next call.
			r.spillErrSync = true
			r.syncs++
			return syncEvent(nil, r.lastTs, r.syncs), nil
		}

		// Reset CPU samples cursor.
		r.cpuSamples = r.gen.cpuSamples

		// Reset frontier.
		for _, m := range r.gen.batchMs {
			batches := r.gen.batches[m]
			bc := &batchCursor{m: m}
			ok, err := bc.nextEvent(batches, r.gen.freq)
			if err != nil {
				return Event{}, err
			}
			if !ok {
				// Turns out there aren't actually any events in these batches.
				continue
			}
			r.frontier = heapInsert(r.frontier, bc)
		}
		r.syncs++
		// Always emit a sync event at the beginning of the generation.
		return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil
	}
	// tryAdvance attempts to advance the ordering using the cursor at
	// index i of the frontier, refreshing the cursor and fixing up the
	// heap on success. It reports whether the ordering advanced.
	tryAdvance := func(i int) (bool, error) {
		bc := r.frontier[i]

		if ok, err := r.order.Advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); !ok || err != nil {
			return ok, err
		}

		// Refresh the cursor's event.
		ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
		if err != nil {
			return false, err
		}
		if ok {
			// If we successfully refreshed, update the heap.
			heapUpdate(r.frontier, i)
		} else {
			// There's nothing else to read. Delete this cursor from the frontier.
			r.frontier = heapRemove(r.frontier, i)
		}
		return true, nil
	}
	// Inject a CPU sample if it comes next.
	if len(r.cpuSamples) != 0 {
		if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
			e := r.cpuSamples[0].asEvent(r.gen.evTable)
			r.cpuSamples = r.cpuSamples[1:]
			return e, nil
		}
	}
	// Try to advance the head of the frontier, which should have the minimum timestamp.
	// This should be by far the most common case.
	if len(r.frontier) == 0 {
		return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
	}
	if ok, err := tryAdvance(0); err != nil {
		return Event{}, err
	} else if !ok {
		// Try to advance the rest of the frontier, in timestamp order.
		//
		// To do this, sort the min-heap. A sorted min-heap is still a
		// min-heap, but now we can iterate over the rest and try to
		// advance in order. This path should be rare.
		slices.SortFunc(r.frontier, (*batchCursor).compare)
		success := false
		for i := 1; i < len(r.frontier); i++ {
			if ok, err = tryAdvance(i); err != nil {
				return Event{}, err
			} else if ok {
				success = true
				break
			}
		}
		if !success {
			return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
		}
	}

	// Pick off the next event on the queue. At this point, one must exist.
	ev, ok := r.order.Next()
	if !ok {
		panic("invariant violation: advance successful, but queue is empty")
	}
	return ev, nil
}
 253  
 254  func dumpFrontier(frontier []*batchCursor) []byte {
 255  	var sb bytes.Buffer
 256  	for _, bc := range frontier {
 257  		spec := tracev2.Specs()[bc.ev.typ]
 258  		fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
 259  		for i, arg := range spec.Args[1:] {
 260  			fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
 261  		}
 262  		fmt.Fprintf(&sb, "]\n")
 263  	}
 264  	return sb.String()
 265  }
 266