Skip to content

Commit

Permalink
WIP, resty
Browse files Browse the repository at this point in the history
  • Loading branch information
muir committed Oct 2, 2022
1 parent a49a3ef commit 8a3540b
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 37 deletions.
4 changes: 4 additions & 0 deletions xopconst/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,7 @@ var (
SpanTypeHTTPClientRequest = SpanType.Iota("REST")
SpanTypeCronJob = SpanType.Iota("cron_job")
)

var RemoteTrace = xopat.Make{Key: "http.remote_trace", Namespace: "xop", Indexed: true, Prominence: 40,
Description: "The traceID and spanID for for the remote side of a outgoing HTTP request, if known"}.
LinkAttribute()
210 changes: 173 additions & 37 deletions xopresty/resty.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,36 @@
package xopresty adds to the resty package to
propagate xop context to through an HTTP request.
The resty package does not provide a clean way to
pass in a logger or context. To get around this
we'll need a fresh resty client for each request.
The resty package does not provide a way to have a
logger that knows which request it is logging about.
The resty package does not provide a way to know when
requests are complete.
Another challenge with resty is that the resty Logger
is per-Client, not per-Requet.
There are open pull requests to solve both of these
issues. In the meantime, this package depends upon
https://github.com/muir/resty.
Thiw would all be simpler if there was a Copy method
for resty Clients, but there isn't.
The agumented resty Client requires that a context that
has the parent log span be provided:
client.R().SetContext(log.IntoContext(context.Background()))
If there is no logger in the context, the request will fail.
*/
package xopresty

import (
"context"
"fmt"
"strings"
"time"

"github.com/muir/xop-go"
"github.com/muir/xop-go/trace"
"github.com/muir/xop-go/xopconst"

"github.com/go-resty/resty/v2"
"github.com/muir/resty"
"github.com/pkg/errors"
)

var _ resty.Logger = restyLogger{}
Expand All @@ -30,25 +44,117 @@ func (rl restyLogger) Errorf(format string, v ...interface{}) { rl.log.Error().M
func (rl restyLogger) Warnf(format string, v ...interface{}) { rl.log.Warn().Msgf(format, v...) }
func (rl restyLogger) Debugf(format string, v ...interface{}) { rl.log.Debug().Msgf(format, v...) }

func wrap(log *xop.Log, client *resty.Client, description string) *resty.Client {
type contextKeyType struct{}

var contextKey = contextKeyType{}

type contextNameType struct{}

var contextNameKey = contextNameType{}

type contextValue struct {
b3Sent bool
b3Trace trace.Trace
response bool
log *xop.Log
retryCount int
originalStartTime time.Time
}

type config struct {
requestToName func(r *resty.Request) string
extraLogging ExtraLogging
}

type ClientOpt func(*config)

// WithNameGenerate provides a function to convert a request into
// a description for the span.
func WithNameGenerate(f func(*resty.Request) string) ClientOpt {
return func(config *config) {
config.requestToName = f
}
}

// ExtraLogging provides a hook for extra logging to be done.
// It is possible that the response parameter will be null.
// If error is not null, then the request has failed.
// ExtraLogging should only be called once but if another resty
// callback panic's, it is possible ExtraLogging will be called
// twice.
type ExtraLogging func(log *xop.Log, originalStartTime time.Time, retryCount int, request *resty.Request, response *resty.Response, err error)

func WithExtraLogging(f ExtraLogging) ClientOpt {
return func(config *config) {
config.extraLogging = f
}
}

// WithNameInDescription adds a span name to a context. If present,
// a name in context overrides WithNameGenerate.
func WithNameInContext(ctx context.Context, nameOrDescription string) context.Context {
return context.WithValue(ctx, contextNameKey, nameOrDescription)
}

func Client(client *resty.Client, opts ...ClientOpt) *resty.Client {
config := &config{
requestToName: func(r *resty.Request) string {
url := r.URL
i := strings.IndexByte(url, '?')
if i != -1 {
url = url[:i]
}
return r.Method + " " + url
},
extraLogging: func(log *xop.Log, originalStartTime time.Time, retryCount int, request *resty.Request, response *resty.Response, err error) {
},
}
for _, f := range opts {
f(config)
}
log = log.Sub().Step(description)
var b3Sent bool
var b3Trace trace.Trace
var response bool
defer func() {
if b3Sent && !response {
log.Info().Link("span.far_side_id", b3Trace).Static("span id set with B3")
log.Span().Link("span.far_side_id", b3Trace)
}
log.Done()
}
}()

// c := *client
// c.Header = client.Header.Clone()
// clinet = &c
return client.
SetLogger(restyLogger{log: log}).
OnBeforeRequest(func(_ *Client, r *Request) error {
OnBeforeRequest(func(_ *Client, r *resty.Request) error {
// OnBeforeRequest can execute multiple times for the same attempt if there
// are retries. It won't execute at all of the request is invalid.
ctx := r.Context()
cvRaw := ctx.Value(contextKey)
var cv *contextValue
if cvRaw != nil {
cv = cvRaw.(*contextValue)
cv.retryCount++
return nil
}
log, ok := xop.FromContext(r.Context())
if !ok {
return errors.Errorf("context is missing logger, use Request.SetContext(Log.IntoContext(request.Context()))")
}
nameRaw := ctx.Value(contextNameKey)
var name string
if nameRaw != nil {
name = nameRaw.(string)
} else {
name = config.RequestToName()
}
log = log.Sub().Step(name)
cv = &contextValue{
originalStartTime: time.Now(),
log: log,
}
r.SetContext(context.WithValue(ctx, contextKey, &cv))
r.SetLogger(restyLogger{log: log})

log.Span().EmbeddedEnum(xopconst.SpanTypeHTTPClientRequest)
log.Span().String(xopconst.URL, r.URL.String())
log.Span().String(xopconst.HTTPMethod, r.Method)
Expand All @@ -59,45 +165,75 @@ func wrap(log *xop.Log, client *resty.Client, description string) *resty.Client
if !log.Span().TraceState().IsZero() {
r.Header.Set("state", log.Span().TraceState().String())
}
if log.Config().UseB3 {
if log.Config().UseB3 {
b3Trace := log.Span().Bundle().Trace
b3Trace.SpanID().SetRandom()
r.Header.Set("b3",
b3Trace.GetTraceID().String()+"-"+
b3Trace.TraceID().String()+"-"+
b3Trace.GetFlags().String()[1:2]+"-"+
log.Span().Trace().GetSpanID().String())
b3Trace.TraceID().String()+"-"+
b3Trace.GetFlags().String()[1:2]+"-"+
log.Span().Trace().GetSpanID().String())
cv.b3Trace = b3Trace
cv.b3Sent = true
}
return nil
}).
OnAfterResponse(func(_ *Client, r *Response) error {
tr := r.Header().Get("traceresponse")
OnAfterResponse(func(_ *Client, resp *Response) error {
// OnAfterRequest is run for each individual request attempt
ctx := resp.Request.Context()
cvRaw := ctx.Value(contextKey)
var cv *contextValue
if cvRaw == nil {
return fmt.Errorf("xopresty: internal error, context missing in response")
}
cv = cvRaw.(*contextValue)
log := cv.log

tr := resp.Header().Get("traceresponse")
if tr != "" {
trace, ok := trace.TraceFromString(tr)
if ok {
if ok {
response = true
log.Info().Link("span.far_side_id", trace).Static("traceresponse")
log.Span().Link("span.far_side_id", trace)
log.Info().Link(xopconst.RemoteTrace.Key(), trace).Static("traceresponse")
log.Span().Link(xopconst.RemoteTrace, trace)
} else {
log.Warn().String("header", tr).Static("invalid traceresponse received")
}
}
if res != nil {
log.Info().Any("response", r.Result())
}
ti := r.Request.TraceInfo()
log.Info().
Duration("request_time.total", ti.TotalTime).
Duration("request_time.server", ti.ServerTime).
Duration("request_time.dns", ti.DNSLookup).
Static("timings")


ti := resp.Request.TraceInfo()
if ti.TotalTime != 0 {
log.Info().
Duration("request_time.total", ti.TotalTime).
Duration("request_time.server", ti.ServerTime).
Duration("request_time.dns", ti.DNSLookup).
Static("timings")
}
return nil
}).
EnableTrace().




OnError(func(r *resty.Request, err error) {
ctx := r.Context()
cv := ctx.Value(contextKey).(*contextValue)
log := cv.log
var re *resty.ResponseError
if errors.As(err, &re) {
config.extraLogging(log, cv.originalStartTime, cv.retryCount, r, re.Response, re.Err)
} else {
config.extraLogging(log, cv.originalStartTime, cv.retryCount, r, nil, err)
}
}).
OnPanic(func(r *resty.Request, err error) {
ctx := r.Context()
cv := ctx.Value(contextKey).(*contextValue)
log := cv.log
config.extraLogging(log, cv.originalStartTime, cv.retryCount, r, nil, err)
}).
OnSuccess(func(c *resty.Client, resp *resty.Response) {
ctx := r.Context()
cv := ctx.Value(contextKey).(*contextValue)
log := cv.log
config.extraLogging(log, cv.originalStartTime, cv.retryCount, resp.Request, resp, nil)
})
}
32 changes: 32 additions & 0 deletions xopresty/resty_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package xopresty_test

import (
"context"
"net/http"
"net/http/httptest"
"testing"

"github.com/muir/xop-go"
"github.com/muir/xop-go/xopmiddle"
"github.com/muir/xop-go/xopresty"
"github.com/muir/xop-go/xoptest"
)

func TestXopResty(t *testing.T) {
tLog := xoptest.New(t)
seed := xop.Seed(xop.WithBase(tLog))
log := seed.Request("client")

client := resty.NewClient()
client = xopresty.Client(client)
ctx := log.IntoContext(context.Background())

inbound := xopmiddle.New(seed, func(r *http.Request) string {
return r.Method
})

ts := httptest.NewServer(inbound.HandlerMiddlewareFunc()(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "generall broken", 500)
}))
defer ts.Close()
}

0 comments on commit 8a3540b

Please sign in to comment.