1 // Copyright The OpenTelemetry Authors
2 // SPDX-License-Identifier: Apache-2.0
3 4 package otelhttp // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
5 6 import (
7 "context"
8 "io"
9 "net/http"
10 "net/http/httptrace"
11 "sync/atomic"
12 "time"
13 14 "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request"
15 "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconv"
16 "go.opentelemetry.io/otel"
17 "go.opentelemetry.io/otel/attribute"
18 "go.opentelemetry.io/otel/codes"
19 "go.opentelemetry.io/otel/propagation"
20 21 "go.opentelemetry.io/otel/trace"
22 )
23 24 // Transport implements the http.RoundTripper interface and wraps
25 // outbound HTTP(S) requests with a span and enriches it with metrics.
26 type Transport struct {
27 rt http.RoundTripper
28 29 tracer trace.Tracer
30 propagators propagation.TextMapPropagator
31 spanStartOptions []trace.SpanStartOption
32 filters []Filter
33 spanNameFormatter func(string, *http.Request) string
34 clientTrace func(context.Context) *httptrace.ClientTrace
35 metricAttributesFn func(*http.Request) []attribute.KeyValue
36 37 semconv semconv.HTTPClient
38 }
39 40 var _ http.RoundTripper = &Transport{}
41 42 // NewTransport wraps the provided http.RoundTripper with one that
43 // starts a span, injects the span context into the outbound request headers,
44 // and enriches it with metrics.
45 //
46 // If the provided http.RoundTripper is nil, http.DefaultTransport will be used
47 // as the base http.RoundTripper.
48 func NewTransport(base http.RoundTripper, opts ...Option) *Transport {
49 if base == nil {
50 base = http.DefaultTransport
51 }
52 53 t := Transport{
54 rt: base,
55 }
56 57 defaultOpts := []Option{
58 WithSpanOptions(trace.WithSpanKind(trace.SpanKindClient)),
59 WithSpanNameFormatter(defaultTransportFormatter),
60 }
61 62 c := newConfig(append(defaultOpts, opts...)...)
63 t.applyConfig(c)
64 65 return &t
66 }
67 68 func (t *Transport) applyConfig(c *config) {
69 t.tracer = c.Tracer
70 t.propagators = c.Propagators
71 t.spanStartOptions = c.SpanStartOptions
72 t.filters = c.Filters
73 t.spanNameFormatter = c.SpanNameFormatter
74 t.clientTrace = c.ClientTrace
75 t.semconv = semconv.NewHTTPClient(c.Meter)
76 t.metricAttributesFn = c.MetricAttributesFn
77 }
// defaultTransportFormatter is the span name formatter that NewTransport
// passes as a default option. It ignores its first argument and names the
// span "HTTP <method>".
//
// net/http treats an empty Request.Method on a client request as GET, so an
// empty method is reported as GET instead of producing the name "HTTP " with
// a trailing space and no method.
func defaultTransportFormatter(_ string, r *http.Request) string {
	method := r.Method
	if method == "" {
		method = http.MethodGet
	}
	return "HTTP " + method
}
82 83 // RoundTrip creates a Span and propagates its context via the provided request's headers
84 // before handing the request to the configured base RoundTripper. The created span will
85 // end when the response body is closed or when a read from the body returns io.EOF.
86 func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) {
87 requestStartTime := time.Now()
88 for _, f := range t.filters {
89 if !f(r) {
90 // Simply pass through to the base RoundTripper if a filter rejects the request
91 return t.rt.RoundTrip(r)
92 }
93 }
94 95 tracer := t.tracer
96 97 if tracer == nil {
98 if span := trace.SpanFromContext(r.Context()); span.SpanContext().IsValid() {
99 tracer = newTracer(span.TracerProvider())
100 } else {
101 tracer = newTracer(otel.GetTracerProvider())
102 }
103 }
104 105 opts := append([]trace.SpanStartOption{}, t.spanStartOptions...) // start with the configured options
106 107 ctx, span := tracer.Start(r.Context(), t.spanNameFormatter("", r), opts...)
108 109 if t.clientTrace != nil {
110 ctx = httptrace.WithClientTrace(ctx, t.clientTrace(ctx))
111 }
112 113 labeler, found := LabelerFromContext(ctx)
114 if !found {
115 ctx = ContextWithLabeler(ctx, labeler)
116 }
117 118 r = r.Clone(ctx) // According to RoundTripper spec, we shouldn't modify the origin request.
119 120 // if request body is nil or NoBody, we don't want to mutate the body as it
121 // will affect the identity of it in an unforeseeable way because we assert
122 // ReadCloser fulfills a certain interface and it is indeed nil or NoBody.
123 bw := request.NewBodyWrapper(r.Body, func(int64) {})
124 if r.Body != nil && r.Body != http.NoBody {
125 r.Body = bw
126 }
127 128 span.SetAttributes(t.semconv.RequestTraceAttrs(r)...)
129 t.propagators.Inject(ctx, propagation.HeaderCarrier(r.Header))
130 131 res, err := t.rt.RoundTrip(r)
132 if err != nil {
133 // set error type attribute if the error is part of the predefined
134 // error types.
135 // otherwise, record it as an exception
136 if errType := t.semconv.ErrorType(err); errType.Valid() {
137 span.SetAttributes(errType)
138 } else {
139 span.RecordError(err)
140 }
141 142 span.SetStatus(codes.Error, err.Error())
143 span.End()
144 return res, err
145 }
146 147 // metrics
148 metricOpts := t.semconv.MetricOptions(semconv.MetricAttributes{
149 Req: r,
150 StatusCode: res.StatusCode,
151 AdditionalAttributes: append(labeler.Get(), t.metricAttributesFromRequest(r)...),
152 })
153 154 // For handling response bytes we leverage a callback when the client reads the http response
155 readRecordFunc := func(n int64) {
156 t.semconv.RecordResponseSize(ctx, n, metricOpts)
157 }
158 159 // traces
160 span.SetAttributes(t.semconv.ResponseTraceAttrs(res)...)
161 span.SetStatus(t.semconv.Status(res.StatusCode))
162 163 res.Body = newWrappedBody(span, readRecordFunc, res.Body)
164 165 // Use floating point division here for higher precision (instead of Millisecond method).
166 elapsedTime := float64(time.Since(requestStartTime)) / float64(time.Millisecond)
167 168 t.semconv.RecordMetrics(ctx, semconv.MetricData{
169 RequestSize: bw.BytesRead(),
170 ElapsedTime: elapsedTime,
171 }, metricOpts)
172 173 return res, nil
174 }
175 176 func (t *Transport) metricAttributesFromRequest(r *http.Request) []attribute.KeyValue {
177 var attributeForRequest []attribute.KeyValue
178 if t.metricAttributesFn != nil {
179 attributeForRequest = t.metricAttributesFn(r)
180 }
181 return attributeForRequest
182 }
183 184 // newWrappedBody returns a new and appropriately scoped *wrappedBody as an
185 // io.ReadCloser. If the passed body implements io.Writer, the returned value
186 // will implement io.ReadWriteCloser.
187 func newWrappedBody(span trace.Span, record func(n int64), body io.ReadCloser) io.ReadCloser {
188 // The successful protocol switch responses will have a body that
189 // implement an io.ReadWriteCloser. Ensure this interface type continues
190 // to be satisfied if that is the case.
191 if _, ok := body.(io.ReadWriteCloser); ok {
192 return &wrappedBody{span: span, record: record, body: body}
193 }
194 195 // Remove the implementation of the io.ReadWriteCloser and only implement
196 // the io.ReadCloser.
197 return struct{ io.ReadCloser }{&wrappedBody{span: span, record: record, body: body}}
198 }
199 200 // wrappedBody is the response body type returned by the transport
201 // instrumentation to complete a span. Errors encountered when using the
202 // response body are recorded in span tracking the response.
203 //
204 // The span tracking the response is ended when this body is closed.
205 //
206 // If the response body implements the io.Writer interface (i.e. for
207 // successful protocol switches), the wrapped body also will.
208 type wrappedBody struct {
209 span trace.Span
210 recorded atomic.Bool
211 record func(n int64)
212 body io.ReadCloser
213 read atomic.Int64
214 }
215 216 var _ io.ReadWriteCloser = &wrappedBody{}
217 218 func (wb *wrappedBody) Write(p []byte) (int, error) {
219 // This will not panic given the guard in newWrappedBody.
220 n, err := wb.body.(io.Writer).Write(p)
221 if err != nil {
222 wb.span.RecordError(err)
223 wb.span.SetStatus(codes.Error, err.Error())
224 }
225 return n, err
226 }
227 228 func (wb *wrappedBody) Read(b []byte) (int, error) {
229 n, err := wb.body.Read(b)
230 // Record the number of bytes read
231 wb.read.Add(int64(n))
232 233 switch err {
234 case nil:
235 // nothing to do here but fall through to the return
236 case io.EOF:
237 wb.recordBytesRead()
238 wb.span.End()
239 default:
240 wb.span.RecordError(err)
241 wb.span.SetStatus(codes.Error, err.Error())
242 }
243 return n, err
244 }
245 246 // recordBytesRead is a function that ensures the number of bytes read is recorded once and only once.
247 func (wb *wrappedBody) recordBytesRead() {
248 // note: it is more performant (and equally correct) to use atomic.Bool over sync.Once here. In the event that
249 // two goroutines are racing to call this method, the number of bytes read will no longer increase. Using
250 // CompareAndSwap allows later goroutines to return quickly and not block waiting for the race winner to finish
251 // calling wb.record(wb.read.Load()).
252 if wb.recorded.CompareAndSwap(false, true) {
253 // Record the total number of bytes read
254 wb.record(wb.read.Load())
255 }
256 }
257 258 func (wb *wrappedBody) Close() error {
259 wb.recordBytesRead()
260 wb.span.End()
261 if wb.body != nil {
262 return wb.body.Close()
263 }
264 return nil
265 }
266