transport.go raw

   1  // Copyright The OpenTelemetry Authors
   2  // SPDX-License-Identifier: Apache-2.0
   3  
   4  package otelhttp // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
   5  
   6  import (
   7  	"context"
   8  	"io"
   9  	"net/http"
  10  	"net/http/httptrace"
  11  	"sync/atomic"
  12  	"time"
  13  
  14  	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request"
  15  	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconv"
  16  	"go.opentelemetry.io/otel"
  17  	"go.opentelemetry.io/otel/attribute"
  18  	"go.opentelemetry.io/otel/codes"
  19  	"go.opentelemetry.io/otel/propagation"
  20  
  21  	"go.opentelemetry.io/otel/trace"
  22  )
  23  
  24  // Transport implements the http.RoundTripper interface and wraps
  25  // outbound HTTP(S) requests with a span and enriches it with metrics.
  26  type Transport struct {
  27  	rt http.RoundTripper
  28  
  29  	tracer             trace.Tracer
  30  	propagators        propagation.TextMapPropagator
  31  	spanStartOptions   []trace.SpanStartOption
  32  	filters            []Filter
  33  	spanNameFormatter  func(string, *http.Request) string
  34  	clientTrace        func(context.Context) *httptrace.ClientTrace
  35  	metricAttributesFn func(*http.Request) []attribute.KeyValue
  36  
  37  	semconv semconv.HTTPClient
  38  }
  39  
  40  var _ http.RoundTripper = &Transport{}
  41  
  42  // NewTransport wraps the provided http.RoundTripper with one that
  43  // starts a span, injects the span context into the outbound request headers,
  44  // and enriches it with metrics.
  45  //
  46  // If the provided http.RoundTripper is nil, http.DefaultTransport will be used
  47  // as the base http.RoundTripper.
  48  func NewTransport(base http.RoundTripper, opts ...Option) *Transport {
  49  	if base == nil {
  50  		base = http.DefaultTransport
  51  	}
  52  
  53  	t := Transport{
  54  		rt: base,
  55  	}
  56  
  57  	defaultOpts := []Option{
  58  		WithSpanOptions(trace.WithSpanKind(trace.SpanKindClient)),
  59  		WithSpanNameFormatter(defaultTransportFormatter),
  60  	}
  61  
  62  	c := newConfig(append(defaultOpts, opts...)...)
  63  	t.applyConfig(c)
  64  
  65  	return &t
  66  }
  67  
  68  func (t *Transport) applyConfig(c *config) {
  69  	t.tracer = c.Tracer
  70  	t.propagators = c.Propagators
  71  	t.spanStartOptions = c.SpanStartOptions
  72  	t.filters = c.Filters
  73  	t.spanNameFormatter = c.SpanNameFormatter
  74  	t.clientTrace = c.ClientTrace
  75  	t.semconv = semconv.NewHTTPClient(c.Meter)
  76  	t.metricAttributesFn = c.MetricAttributesFn
  77  }
  78  
  79  func defaultTransportFormatter(_ string, r *http.Request) string {
  80  	return "HTTP " + r.Method
  81  }
  82  
  83  // RoundTrip creates a Span and propagates its context via the provided request's headers
  84  // before handing the request to the configured base RoundTripper. The created span will
  85  // end when the response body is closed or when a read from the body returns io.EOF.
  86  func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) {
  87  	requestStartTime := time.Now()
  88  	for _, f := range t.filters {
  89  		if !f(r) {
  90  			// Simply pass through to the base RoundTripper if a filter rejects the request
  91  			return t.rt.RoundTrip(r)
  92  		}
  93  	}
  94  
  95  	tracer := t.tracer
  96  
  97  	if tracer == nil {
  98  		if span := trace.SpanFromContext(r.Context()); span.SpanContext().IsValid() {
  99  			tracer = newTracer(span.TracerProvider())
 100  		} else {
 101  			tracer = newTracer(otel.GetTracerProvider())
 102  		}
 103  	}
 104  
 105  	opts := append([]trace.SpanStartOption{}, t.spanStartOptions...) // start with the configured options
 106  
 107  	ctx, span := tracer.Start(r.Context(), t.spanNameFormatter("", r), opts...)
 108  
 109  	if t.clientTrace != nil {
 110  		ctx = httptrace.WithClientTrace(ctx, t.clientTrace(ctx))
 111  	}
 112  
 113  	labeler, found := LabelerFromContext(ctx)
 114  	if !found {
 115  		ctx = ContextWithLabeler(ctx, labeler)
 116  	}
 117  
 118  	r = r.Clone(ctx) // According to RoundTripper spec, we shouldn't modify the origin request.
 119  
 120  	// if request body is nil or NoBody, we don't want to mutate the body as it
 121  	// will affect the identity of it in an unforeseeable way because we assert
 122  	// ReadCloser fulfills a certain interface and it is indeed nil or NoBody.
 123  	bw := request.NewBodyWrapper(r.Body, func(int64) {})
 124  	if r.Body != nil && r.Body != http.NoBody {
 125  		r.Body = bw
 126  	}
 127  
 128  	span.SetAttributes(t.semconv.RequestTraceAttrs(r)...)
 129  	t.propagators.Inject(ctx, propagation.HeaderCarrier(r.Header))
 130  
 131  	res, err := t.rt.RoundTrip(r)
 132  	if err != nil {
 133  		// set error type attribute if the error is part of the predefined
 134  		// error types.
 135  		// otherwise, record it as an exception
 136  		if errType := t.semconv.ErrorType(err); errType.Valid() {
 137  			span.SetAttributes(errType)
 138  		} else {
 139  			span.RecordError(err)
 140  		}
 141  
 142  		span.SetStatus(codes.Error, err.Error())
 143  		span.End()
 144  		return res, err
 145  	}
 146  
 147  	// metrics
 148  	metricOpts := t.semconv.MetricOptions(semconv.MetricAttributes{
 149  		Req:                  r,
 150  		StatusCode:           res.StatusCode,
 151  		AdditionalAttributes: append(labeler.Get(), t.metricAttributesFromRequest(r)...),
 152  	})
 153  
 154  	// For handling response bytes we leverage a callback when the client reads the http response
 155  	readRecordFunc := func(n int64) {
 156  		t.semconv.RecordResponseSize(ctx, n, metricOpts)
 157  	}
 158  
 159  	// traces
 160  	span.SetAttributes(t.semconv.ResponseTraceAttrs(res)...)
 161  	span.SetStatus(t.semconv.Status(res.StatusCode))
 162  
 163  	res.Body = newWrappedBody(span, readRecordFunc, res.Body)
 164  
 165  	// Use floating point division here for higher precision (instead of Millisecond method).
 166  	elapsedTime := float64(time.Since(requestStartTime)) / float64(time.Millisecond)
 167  
 168  	t.semconv.RecordMetrics(ctx, semconv.MetricData{
 169  		RequestSize: bw.BytesRead(),
 170  		ElapsedTime: elapsedTime,
 171  	}, metricOpts)
 172  
 173  	return res, nil
 174  }
 175  
 176  func (t *Transport) metricAttributesFromRequest(r *http.Request) []attribute.KeyValue {
 177  	var attributeForRequest []attribute.KeyValue
 178  	if t.metricAttributesFn != nil {
 179  		attributeForRequest = t.metricAttributesFn(r)
 180  	}
 181  	return attributeForRequest
 182  }
 183  
 184  // newWrappedBody returns a new and appropriately scoped *wrappedBody as an
 185  // io.ReadCloser. If the passed body implements io.Writer, the returned value
 186  // will implement io.ReadWriteCloser.
 187  func newWrappedBody(span trace.Span, record func(n int64), body io.ReadCloser) io.ReadCloser {
 188  	// The successful protocol switch responses will have a body that
 189  	// implement an io.ReadWriteCloser. Ensure this interface type continues
 190  	// to be satisfied if that is the case.
 191  	if _, ok := body.(io.ReadWriteCloser); ok {
 192  		return &wrappedBody{span: span, record: record, body: body}
 193  	}
 194  
 195  	// Remove the implementation of the io.ReadWriteCloser and only implement
 196  	// the io.ReadCloser.
 197  	return struct{ io.ReadCloser }{&wrappedBody{span: span, record: record, body: body}}
 198  }
 199  
 200  // wrappedBody is the response body type returned by the transport
 201  // instrumentation to complete a span. Errors encountered when using the
 202  // response body are recorded in span tracking the response.
 203  //
 204  // The span tracking the response is ended when this body is closed.
 205  //
 206  // If the response body implements the io.Writer interface (i.e. for
 207  // successful protocol switches), the wrapped body also will.
 208  type wrappedBody struct {
 209  	span     trace.Span
 210  	recorded atomic.Bool
 211  	record   func(n int64)
 212  	body     io.ReadCloser
 213  	read     atomic.Int64
 214  }
 215  
 216  var _ io.ReadWriteCloser = &wrappedBody{}
 217  
 218  func (wb *wrappedBody) Write(p []byte) (int, error) {
 219  	// This will not panic given the guard in newWrappedBody.
 220  	n, err := wb.body.(io.Writer).Write(p)
 221  	if err != nil {
 222  		wb.span.RecordError(err)
 223  		wb.span.SetStatus(codes.Error, err.Error())
 224  	}
 225  	return n, err
 226  }
 227  
 228  func (wb *wrappedBody) Read(b []byte) (int, error) {
 229  	n, err := wb.body.Read(b)
 230  	// Record the number of bytes read
 231  	wb.read.Add(int64(n))
 232  
 233  	switch err {
 234  	case nil:
 235  		// nothing to do here but fall through to the return
 236  	case io.EOF:
 237  		wb.recordBytesRead()
 238  		wb.span.End()
 239  	default:
 240  		wb.span.RecordError(err)
 241  		wb.span.SetStatus(codes.Error, err.Error())
 242  	}
 243  	return n, err
 244  }
 245  
 246  // recordBytesRead is a function that ensures the number of bytes read is recorded once and only once.
 247  func (wb *wrappedBody) recordBytesRead() {
 248  	// note: it is more performant (and equally correct) to use atomic.Bool over sync.Once here. In the event that
 249  	// two goroutines are racing to call this method, the number of bytes read will no longer increase. Using
 250  	// CompareAndSwap allows later goroutines to return quickly and not block waiting for the race winner to finish
 251  	// calling wb.record(wb.read.Load()).
 252  	if wb.recorded.CompareAndSwap(false, true) {
 253  		// Record the total number of bytes read
 254  		wb.record(wb.read.Load())
 255  	}
 256  }
 257  
 258  func (wb *wrappedBody) Close() error {
 259  	wb.recordBytesRead()
 260  	wb.span.End()
 261  	if wb.body != nil {
 262  		return wb.body.Close()
 263  	}
 264  	return nil
 265  }
 266