1 // Copyright 2023 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 5 package trace
6 7 import (
8 "bufio"
9 "bytes"
10 "cmp"
11 "encoding/binary"
12 "fmt"
13 "io"
14 "slices"
15 "time"
16 17 "internal/trace/tracev2"
18 "internal/trace/version"
19 )
// generation contains all the trace data for a single
// trace generation. It is purely data: it does not
// track any parse state nor does it contain a cursor
// into the generation.
type generation struct {
	gen        uint64               // generation number as read from the batch stream
	batches    map[ThreadID][]batch // non-special batches, grouped by the M that emitted them
	batchMs    []ThreadID           // Ms in the order their first batch was encountered
	cpuSamples []cpuSample          // CPU profile samples collected during this generation
	minTs      timestamp            // smallest batch timestamp observed in this generation
	*evTable                        // lookup tables (strings, stacks, pcs, ...) for this generation
}
// spilledBatch represents a batch that was read out for the next generation,
// while reading the previous one. It's passed on when parsing the next
// generation.
type spilledBatch struct {
	gen uint64 // generation number the spilled batch belongs to
	*batch     // the already-parsed batch itself
}
41 42 // readGeneration buffers and decodes the structural elements of a trace generation
43 // out of r. spill is the first batch of the new generation (already buffered and
44 // parsed from reading the last generation). Returns the generation and the first
45 // batch read of the next generation, if any.
46 //
47 // If gen is non-nil, it is valid and must be processed before handling the returned
48 // error.
49 func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
50 g := &generation{
51 evTable: &evTable{
52 pcs: map[uint64]frame{},
53 },
54 batches: map[ThreadID][]batch{},
55 }
56 // Process the spilled batch.
57 if spill != nil {
58 g.gen = spill.gen
59 g.minTs = spill.batch.time
60 if err := processBatch(g, *spill.batch, ver); err != nil {
61 return nil, nil, err
62 }
63 spill = nil
64 }
65 // Read batches one at a time until we either hit EOF or
66 // the next generation.
67 var spillErr error
68 for {
69 b, gen, err := readBatch(r)
70 if err == io.EOF {
71 break
72 }
73 if err != nil {
74 if g.gen != 0 {
75 // This is an error reading the first batch of the next generation.
76 // This is fine. Let's forge ahead assuming that what we've got so
77 // far is fine.
78 spillErr = err
79 break
80 }
81 return nil, nil, err
82 }
83 if gen == 0 {
84 // 0 is a sentinel used by the runtime, so we'll never see it.
85 return nil, nil, fmt.Errorf("invalid generation number %d", gen)
86 }
87 if g.gen == 0 {
88 // Initialize gen.
89 g.gen = gen
90 }
91 if gen == g.gen+1 { // TODO: advance this the same way the runtime does.
92 spill = &spilledBatch{gen: gen, batch: &b}
93 break
94 }
95 if gen != g.gen {
96 // N.B. Fail as fast as possible if we see this. At first it
97 // may seem prudent to be fault-tolerant and assume we have a
98 // complete generation, parsing and returning that first. However,
99 // if the batches are mixed across generations then it's likely
100 // we won't be able to parse this generation correctly at all.
101 // Rather than return a cryptic error in that case, indicate the
102 // problem as soon as we see it.
103 return nil, nil, fmt.Errorf("generations out of order")
104 }
105 if g.minTs == 0 || b.time < g.minTs {
106 g.minTs = b.time
107 }
108 if err := processBatch(g, b, ver); err != nil {
109 return nil, nil, err
110 }
111 }
112 113 // Check some invariants.
114 if g.freq == 0 {
115 return nil, nil, fmt.Errorf("no frequency event found")
116 }
117 if ver >= version.Go125 && !g.hasClockSnapshot {
118 return nil, nil, fmt.Errorf("no clock snapshot event found")
119 }
120 121 // N.B. Trust that the batch order is correct. We can't validate the batch order
122 // by timestamp because the timestamps could just be plain wrong. The source of
123 // truth is the order things appear in the trace and the partial order sequence
124 // numbers on certain events. If it turns out the batch order is actually incorrect
125 // we'll very likely fail to advance a partial order from the frontier.
126 127 // Compactify stacks and strings for better lookup performance later.
128 g.stacks.compactify()
129 g.bytes.compactify()
130 131 // Validate stacks.
132 if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
133 return nil, nil, err
134 }
135 136 // Fix up the CPU sample timestamps, now that we have freq.
137 for i := range g.cpuSamples {
138 s := &g.cpuSamples[i]
139 s.time = g.freq.mul(timestamp(s.time))
140 }
141 // Sort the CPU samples.
142 slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
143 return cmp.Compare(a.time, b.time)
144 })
145 return g, spill, spillErr
146 }
147 148 // processBatch adds the batch to the generation.
149 func processBatch(g *generation, b batch, ver version.Version) error {
150 switch {
151 case b.isStringsBatch():
152 if err := addStrings(&g.strings, b); err != nil {
153 return err
154 }
155 case b.isStacksBatch():
156 if err := addStacks(&g.stacks, g.pcs, b); err != nil {
157 return err
158 }
159 case b.isCPUSamplesBatch():
160 samples, err := addCPUSamples(g.cpuSamples, b)
161 if err != nil {
162 return err
163 }
164 g.cpuSamples = samples
165 case b.isSyncBatch(ver):
166 if err := setSyncBatch(&g.sync, b, ver); err != nil {
167 return err
168 }
169 case b.exp != tracev2.NoExperiment:
170 if g.expBatches == nil {
171 g.expBatches = map[tracev2.Experiment][]ExperimentalBatch{}
172 }
173 if err := addExperimentalBatch(g.expBatches, b); err != nil {
174 return err
175 }
176 default:
177 if _, ok := g.batches[b.m]; !ok {
178 g.batchMs = append(g.batchMs, b.m)
179 }
180 g.batches[b.m] = append(g.batches[b.m], b)
181 }
182 return nil
183 }
184 185 // validateStackStrings makes sure all the string references in
186 // the stack table are present in the string table.
187 func validateStackStrings(
188 stacks *dataTable[stackID, stack],
189 strings *dataTable[stringID, string],
190 frames map[uint64]frame,
191 ) error {
192 var err error
193 stacks.forEach(func(id stackID, stk stack) bool {
194 for _, pc := range stk.pcs {
195 frame, ok := frames[pc]
196 if !ok {
197 err = fmt.Errorf("found unknown pc %x for stack %d", pc, id)
198 return false
199 }
200 _, ok = bytes.get(frame.funcID)
201 if !ok {
202 err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id)
203 return false
204 }
205 _, ok = bytes.get(frame.fileID)
206 if !ok {
207 err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id)
208 return false
209 }
210 }
211 return true
212 })
213 return err
214 }
215 216 // addStrings takes a batch whose first byte is an EvStrings event
217 // (indicating that the batch contains only strings) and adds each
218 // string contained therein to the provided strings map.
219 func addStrings(stringTable *dataTable[stringID, string], b batch) error {
220 if !b.isStringsBatch() {
221 return fmt.Errorf("internal error: addStrings called on non-string batch")
222 }
223 r := bytes.NewReader(b.data)
224 hdr, err := r.ReadByte() // Consume the EvStrings byte.
225 if err != nil || tracev2.EventType(hdr) != tracev2.EvStrings {
226 return fmt.Errorf("missing strings batch header")
227 }
228 229 var sb bytes.Buffer
230 for r.Len() != 0 {
231 // Read the header.
232 ev, err := r.ReadByte()
233 if err != nil {
234 return err
235 }
236 if tracev2.EventType(ev) != tracev2.EvString {
237 return fmt.Errorf("expected string event, got %d", ev)
238 }
239 240 // Read the string's ID.
241 id, err := binary.ReadUvarint(r)
242 if err != nil {
243 return err
244 }
245 246 // Read the string's length.
247 len, err := binary.ReadUvarint(r)
248 if err != nil {
249 return err
250 }
251 if len > tracev2.MaxEventTrailerDataSize {
252 return fmt.Errorf("invalid string size %d, maximum is %d", len, tracev2.MaxEventTrailerDataSize)
253 }
254 255 // Copy out the string.
256 n, err := io.CopyN(&sb, r, int64(len))
257 if n != int64(len) {
258 return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len)
259 }
260 if err != nil {
261 return fmt.Errorf("copying string data: %w", err)
262 }
263 264 // Add the string to the map.
265 s := sb.String()
266 sb.Reset()
267 if err := stringTable.insert(stringID(id), s); err != nil {
268 return err
269 }
270 }
271 return nil
272 }
273 274 // addStacks takes a batch whose first byte is an EvStacks event
275 // (indicating that the batch contains only stacks) and adds each
276 // string contained therein to the provided stacks map.
277 func addStacks(stackTable *dataTable[stackID, stack], pcs map[uint64]frame, b batch) error {
278 if !b.isStacksBatch() {
279 return fmt.Errorf("internal error: addStacks called on non-stacks batch")
280 }
281 r := bytes.NewReader(b.data)
282 hdr, err := r.ReadByte() // Consume the EvStacks byte.
283 if err != nil || tracev2.EventType(hdr) != tracev2.EvStacks {
284 return fmt.Errorf("missing stacks batch header")
285 }
286 287 for r.Len() != 0 {
288 // Read the header.
289 ev, err := r.ReadByte()
290 if err != nil {
291 return err
292 }
293 if tracev2.EventType(ev) != tracev2.EvStack {
294 return fmt.Errorf("expected stack event, got %d", ev)
295 }
296 297 // Read the stack's ID.
298 id, err := binary.ReadUvarint(r)
299 if err != nil {
300 return err
301 }
302 303 // Read how many frames are in each stack.
304 nFrames, err := binary.ReadUvarint(r)
305 if err != nil {
306 return err
307 }
308 if nFrames > tracev2.MaxFramesPerStack {
309 return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, tracev2.MaxFramesPerStack)
310 }
311 312 // Each frame consists of 4 fields: pc, funcID (string), fileID (string), line.
313 frames := []uint64{:0:nFrames}
314 for i := uint64(0); i < nFrames; i++ {
315 // Read the frame data.
316 pc, err := binary.ReadUvarint(r)
317 if err != nil {
318 return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err)
319 }
320 funcID, err := binary.ReadUvarint(r)
321 if err != nil {
322 return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err)
323 }
324 fileID, err := binary.ReadUvarint(r)
325 if err != nil {
326 return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err)
327 }
328 line, err := binary.ReadUvarint(r)
329 if err != nil {
330 return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err)
331 }
332 frames = append(frames, pc)
333 334 if _, ok := pcs[pc]; !ok {
335 pcs[pc] = frame{
336 pc: pc,
337 funcID: stringID(funcID),
338 fileID: stringID(fileID),
339 line: line,
340 }
341 }
342 }
343 344 // Add the stack to the map.
345 if err := stackTable.insert(stackID(id), stack{pcs: frames}); err != nil {
346 return err
347 }
348 }
349 return nil
350 }
351 352 // addCPUSamples takes a batch whose first byte is an EvCPUSamples event
353 // (indicating that the batch contains only CPU samples) and adds each
354 // sample contained therein to the provided samples list.
355 func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) {
356 if !b.isCPUSamplesBatch() {
357 return nil, fmt.Errorf("internal error: addCPUSamples called on non-CPU-sample batch")
358 }
359 r := bytes.NewReader(b.data)
360 hdr, err := r.ReadByte() // Consume the EvCPUSamples byte.
361 if err != nil || tracev2.EventType(hdr) != tracev2.EvCPUSamples {
362 return nil, fmt.Errorf("missing CPU samples batch header")
363 }
364 365 for r.Len() != 0 {
366 // Read the header.
367 ev, err := r.ReadByte()
368 if err != nil {
369 return nil, err
370 }
371 if tracev2.EventType(ev) != tracev2.EvCPUSample {
372 return nil, fmt.Errorf("expected CPU sample event, got %d", ev)
373 }
374 375 // Read the sample's timestamp.
376 ts, err := binary.ReadUvarint(r)
377 if err != nil {
378 return nil, err
379 }
380 381 // Read the sample's M.
382 m, err := binary.ReadUvarint(r)
383 if err != nil {
384 return nil, err
385 }
386 mid := ThreadID(m)
387 388 // Read the sample's P.
389 p, err := binary.ReadUvarint(r)
390 if err != nil {
391 return nil, err
392 }
393 pid := ProcID(p)
394 395 // Read the sample's G.
396 g, err := binary.ReadUvarint(r)
397 if err != nil {
398 return nil, err
399 }
400 goid := GoID(g)
401 if g == 0 {
402 goid = NoGoroutine
403 }
404 405 // Read the sample's stack.
406 s, err := binary.ReadUvarint(r)
407 if err != nil {
408 return nil, err
409 }
410 411 // Add the sample to the slice.
412 samples = append(samples, cpuSample{
413 schedCtx: schedCtx{
414 M: mid,
415 P: pid,
416 G: goid,
417 },
418 time: Time(ts), // N.B. this is really a "timestamp," not a Time.
419 stack: stackID(s),
420 })
421 }
422 return samples, nil
423 }
// sync holds the per-generation sync data.
type sync struct {
	freq             frequency // nanoseconds per timestamp unit, derived from the EvFrequency event
	hasClockSnapshot bool      // whether an EvClockSnapshot event was seen (Go 1.25+ traces)
	snapTime         timestamp // trace-clock time at which the clock snapshot was taken
	snapMono         uint64    // monotonic clock reading from the snapshot
	snapWall         time.Time // wall-clock reading from the snapshot
}
433 434 func setSyncBatch(s *sync, b batch, ver version.Version) error {
435 if !b.isSyncBatch(ver) {
436 return fmt.Errorf("internal error: setSyncBatch called on non-sync batch")
437 }
438 r := bytes.NewReader(b.data)
439 if ver >= version.Go125 {
440 hdr, err := r.ReadByte() // Consume the EvSync byte.
441 if err != nil || tracev2.EventType(hdr) != tracev2.EvSync {
442 return fmt.Errorf("missing sync batch header")
443 }
444 }
445 446 lastTs := b.time
447 for r.Len() != 0 {
448 // Read the header
449 ev, err := r.ReadByte()
450 if err != nil {
451 return err
452 }
453 et := tracev2.EventType(ev)
454 switch {
455 case et == tracev2.EvFrequency:
456 if s.freq != 0 {
457 return fmt.Errorf("found multiple frequency events")
458 }
459 // Read the frequency. It'll come out as timestamp units per second.
460 f, err := binary.ReadUvarint(r)
461 if err != nil {
462 return err
463 }
464 // Convert to nanoseconds per timestamp unit.
465 s.freq = frequency(1.0 / (float64(f) / 1e9))
466 case et == tracev2.EvClockSnapshot && ver >= version.Go125:
467 if s.hasClockSnapshot {
468 return fmt.Errorf("found multiple clock snapshot events")
469 }
470 s.hasClockSnapshot = true
471 // Read the EvClockSnapshot arguments.
472 tdiff, err := binary.ReadUvarint(r)
473 if err != nil {
474 return err
475 }
476 lastTs += timestamp(tdiff)
477 s.snapTime = lastTs
478 mono, err := binary.ReadUvarint(r)
479 if err != nil {
480 return err
481 }
482 s.snapMono = mono
483 sec, err := binary.ReadUvarint(r)
484 if err != nil {
485 return err
486 }
487 nsec, err := binary.ReadUvarint(r)
488 if err != nil {
489 return err
490 }
491 // TODO(felixge): In theory we could inject s.snapMono into the time
492 // value below to make it comparable. But there is no API for this
493 // in the time package right now.
494 s.snapWall = time.Unix(int64(sec), int64(nsec))
495 default:
496 return fmt.Errorf("expected frequency or clock snapshot event, got %d", ev)
497 }
498 }
499 return nil
500 }
501 502 // addExperimentalBatch takes an experimental batch and adds it to the list of experimental
503 // batches for the experiment its a part of.
504 func addExperimentalBatch(expBatches map[tracev2.Experiment][]ExperimentalBatch, b batch) error {
505 if b.exp == tracev2.NoExperiment {
506 return fmt.Errorf("internal error: addExperimentalBatch called on non-experimental batch")
507 }
508 expBatches[b.exp] = append(expBatches[b.exp], ExperimentalBatch{
509 Thread: b.m,
510 Data: b.data,
511 })
512 return nil
513 }
514