-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransport.go
120 lines (103 loc) · 3.2 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package lumigotracer
import (
"bytes"
"context"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
)
type Transport struct {
rt http.RoundTripper
provider trace.TracerProvider
propagator propagation.TextMapPropagator
}
func NewTransport(transport http.RoundTripper) *Transport {
return &Transport{
rt: transport,
provider: otel.GetTracerProvider(),
propagator: otel.GetTextMapPropagator(),
}
}
func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
traceCtx, span := t.provider.Tracer("lumigo").Start(req.Context(), "HttpSpan")
req = req.WithContext(traceCtx)
span.SetAttributes(semconv.HTTPClientAttributesFromHTTPRequest(req)...)
span.SetAttributes(semconv.HTTPTargetKey.String(req.URL.Path))
span.SetAttributes(semconv.HTTPHostKey.String(req.URL.Host))
t.propagator.Inject(traceCtx, propagation.HeaderCarrier(req.Header))
if req.Body != nil {
bodyBytes, bodyErr := io.ReadAll(req.Body)
if bodyErr != nil {
logger.WithError(bodyErr).Error("failed to parse request body")
}
span.SetAttributes(attribute.String("http.request_body", string(bodyBytes)))
// restore body
req.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes))
}
reqHeaders := make(map[string]string)
for k, values := range req.Header {
for _, value := range values {
reqHeaders[k] = value
}
}
headersJson, err := json.Marshal(reqHeaders)
if err != nil {
logger.WithError(err).Error("failed to fetch request headers")
}
span.SetAttributes(attribute.String("http.request_headers", string(headersJson)))
resp, err = t.rt.RoundTrip(req)
// response
span.SetAttributes(semconv.HTTPAttributesFromHTTPStatusCode(resp.StatusCode)...)
span.SetStatus(semconv.SpanStatusFromHTTPStatusCode(resp.StatusCode))
responseHeaders := make(map[string]string)
for k, values := range resp.Header {
for _, value := range values {
responseHeaders[k] = value
}
}
headersJson, jsonErr := json.Marshal(responseHeaders)
if jsonErr != nil {
logger.WithError(err).Error("failed to fetch response headers")
}
span.SetAttributes(attribute.String("http.response_headers", string(headersJson)))
if resp.Body != nil {
bodyBytes, bodyErr := io.ReadAll(resp.Body)
if bodyErr != nil {
logger.WithError(bodyErr).Error("failed to parse response body")
}
span.SetAttributes(attribute.String("http.response_body", string(bodyBytes)))
resp.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes))
}
resp.Body = &wrappedBody{ctx: traceCtx, span: span, body: resp.Body}
return resp, err
}
type wrappedBody struct {
ctx context.Context
span trace.Span
body io.ReadCloser
}
var _ io.ReadCloser = &wrappedBody{}
func (wb *wrappedBody) Read(b []byte) (int, error) {
n, err := wb.body.Read(b)
switch err {
case nil:
// nothing to do here but fall through to the return
case io.EOF:
wb.span.End()
default:
wb.span.RecordError(err)
wb.span.SetStatus(codes.Error, err.Error())
}
return n, err
}
func (wb *wrappedBody) Close() error {
wb.span.End()
return wb.body.Close()
}