batch.mx raw

   1  // Copyright 2023 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package trace
   6  
   7  import (
   8  	"bytes"
   9  	"encoding/binary"
  10  	"fmt"
  11  	"io"
  12  
  13  	"internal/trace/tracev2"
  14  	"internal/trace/version"
  15  )
  16  
// timestamp is an unprocessed timestamp.
//
// It holds the raw unsigned value exactly as it was decoded from the trace
// (for example, the base timestamp in a batch header read by readBatch).
type timestamp uint64
  19  
// batch represents a batch of trace events.
// It is unparsed except for its header.
type batch struct {
	// m is the thread (M) ID read from the batch header.
	m ThreadID
	// time is the base timestamp for the batch, read from the batch header.
	time timestamp
	// data is the raw, still-encoded payload that followed the header.
	data []byte
	// exp is the experiment named in the header of an experimental batch,
	// or tracev2.NoExperiment for an ordinary event batch.
	exp tracev2.Experiment
}
  28  
  29  func (b *batch) isStringsBatch() bool {
  30  	return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvStrings
  31  }
  32  
  33  func (b *batch) isStacksBatch() bool {
  34  	return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvStacks
  35  }
  36  
  37  func (b *batch) isCPUSamplesBatch() bool {
  38  	return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvCPUSamples
  39  }
  40  
  41  func (b *batch) isSyncBatch(ver version.Version) bool {
  42  	return (b.exp == tracev2.NoExperiment && len(b.data) > 0) &&
  43  		((tracev2.EventType(b.data[0]) == tracev2.EvFrequency && ver < version.Go125) ||
  44  			(tracev2.EventType(b.data[0]) == tracev2.EvSync && ver >= version.Go125))
  45  }
  46  
  47  // readBatch reads the next full batch from r.
  48  func readBatch(r interface {
  49  	io.Reader
  50  	io.ByteReader
  51  }) (batch, uint64, error) {
  52  	// Read batch header byte.
  53  	b, err := r.ReadByte()
  54  	if err != nil {
  55  		return batch{}, 0, err
  56  	}
  57  	if typ := tracev2.EventType(b); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
  58  		return batch{}, 0, fmt.Errorf("expected batch event, got event %d", typ)
  59  	}
  60  
  61  	// Read the experiment of we have one.
  62  	exp := tracev2.NoExperiment
  63  	if tracev2.EventType(b) == tracev2.EvExperimentalBatch {
  64  		e, err := r.ReadByte()
  65  		if err != nil {
  66  			return batch{}, 0, err
  67  		}
  68  		exp = tracev2.Experiment(e)
  69  	}
  70  
  71  	// Read the batch header: gen (generation), thread (M) ID, base timestamp
  72  	// for the batch.
  73  	gen, err := binary.ReadUvarint(r)
  74  	if err != nil {
  75  		return batch{}, gen, fmt.Errorf("error reading batch gen: %w", err)
  76  	}
  77  	m, err := binary.ReadUvarint(r)
  78  	if err != nil {
  79  		return batch{}, gen, fmt.Errorf("error reading batch M ID: %w", err)
  80  	}
  81  	ts, err := binary.ReadUvarint(r)
  82  	if err != nil {
  83  		return batch{}, gen, fmt.Errorf("error reading batch timestamp: %w", err)
  84  	}
  85  
  86  	// Read in the size of the batch to follow.
  87  	size, err := binary.ReadUvarint(r)
  88  	if err != nil {
  89  		return batch{}, gen, fmt.Errorf("error reading batch size: %w", err)
  90  	}
  91  	if size > tracev2.MaxBatchSize {
  92  		return batch{}, gen, fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
  93  	}
  94  
  95  	// Copy out the batch for later processing.
  96  	var data bytes.Buffer
  97  	data.Grow(int(size))
  98  	n, err := io.CopyN(&data, r, int64(size))
  99  	if n != int64(size) {
 100  		return batch{}, gen, fmt.Errorf("failed to read full batch: read %d but wanted %d", n, size)
 101  	}
 102  	if err != nil {
 103  		return batch{}, gen, fmt.Errorf("copying batch data: %w", err)
 104  	}
 105  
 106  	// Return the batch.
 107  	return batch{
 108  		m:    ThreadID(m),
 109  		time: timestamp(ts),
 110  		data: data.Bytes(),
 111  		exp:  exp,
 112  	}, gen, nil
 113  }
 114