Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracing logs for Nexus HTTP request retries #7186

Merged
merged 9 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,18 @@ used when the first cache layer has a miss. Requires server restart for change t
30*time.Second,
`The TTL of the Nexus endpoint registry's readthrough LRU cache - the cache is a secondary cache and is only
used when the first cache layer has a miss. Requires server restart for change to be applied.`,
)
NexusHTTPTraceMinAttempt = NewNamespaceIntSetting(
"system.nexusHTTPTraceMinAttempt",
2,
`The minimum attempt of a Nexus request which will log additional HTTP request tracing information.
WARNING: setting to 0 or 1 will log additional tracing information for ALL Nexus HTTP requests and may be expensive.`,
)
NexusHTTPTraceMaxAttempt = NewNamespaceIntSetting(
"system.nexusHTTPTraceMaxAttempt",
5,
`The maximum attempt of a Nexus request which will log additional HTTP request tracing information.
Set to 0 to disable tracing.`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be disabled by default. Generally, I don't think the attempt number matters here, it's more about what to trace IMHO. E.g. whether TLSHandshakeStart should be traced or not. Ideally the config would be for a set of methods, but It'd be hard to express in dynamic config as a set without adding a lot of overhead. We could probably provide a config per method we support intercepting though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think attempt count is useful for limiting the overhead and the overall log spam. Without a notion of attempt count do you imagine we would log trace info for every retry?

Instead of adding a dynamic config per-hook, we could just use fx to provide the httptrace.ClientTrace. Then anyone who wants to change the behavior could do so with fx.replace

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline, agree that adding the counts is nice, also requested that we add the dynamic config and make it global so it can be efficiently cached (a limitation of the current DC implementation).

)
FrontendNexusRequestHeadersBlacklist = NewGlobalTypedSetting(
"frontend.nexusRequestHeadersBlacklist",
Expand Down
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func Timestamp(timestamp time.Time) ZapTag {
return NewTimeTag("timestamp", timestamp)
}

// RequestID returns tag for RequestID
func RequestID(requestID string) ZapTag {
return NewStringTag("request-id", requestID)
}

// ========== Workflow tags defined here: ( wf is short for workflow) ==========

// WorkflowAction returns tag for WorkflowAction
Expand Down
101 changes: 101 additions & 0 deletions common/nexus/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// The MIT License
//
// Copyright (c) 2025 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package nexus

import (
"crypto/tls"
"net/http/httptrace"
"time"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
)

// NewHTTPClientTrace returns a *httptrace.ClientTrace which adds additional logging information at each point in the
// HTTP request lifecycle. This trace must be added to the HTTP request context using httptrace.WithClientTrace for
// the logging hooks to be invoked. The provided logger should already be tagged with relevant request information
// e.g. using log.With(logger, tag.RequestID(id), tag.Operation(op), ...).
func NewHTTPClientTrace(logger log.Logger) *httptrace.ClientTrace {
logger.Info("starting trace for Nexus HTTP request")
return &httptrace.ClientTrace{
GetConn: func(hostPort string) {
logger.Info("attempting to get HTTP connection for Nexus request",
tag.Timestamp(time.Now().UTC()),
tag.Address(hostPort))
},
GotConn: func(info httptrace.GotConnInfo) {
logger.Info("got HTTP connection for Nexus request",
tag.Timestamp(time.Now().UTC()),
tag.NewBoolTag("reused", info.Reused),
tag.NewBoolTag("was-idle", info.WasIdle),
tag.NewDurationTag("idle-time", info.IdleTime))
},
ConnectStart: func(network, addr string) {
logger.Info("starting dial for new connection for Nexus request",
tag.Timestamp(time.Now().UTC()),
tag.Address(addr),
tag.NewStringTag("network", network))
},
ConnectDone: func(network, addr string, err error) {
logger.Info("finished dial for new connection for Nexus request",
tag.Timestamp(time.Now().UTC()),
tag.Address(addr),
tag.NewStringTag("network", network),
tag.Error(err))
},
DNSStart: func(info httptrace.DNSStartInfo) {
logger.Info("starting DNS lookup for Nexus request",
tag.Timestamp(time.Now().UTC()),
tag.Host(info.Host))
},
DNSDone: func(info httptrace.DNSDoneInfo) {
addresses := make([]string, len(info.Addrs))
for i, a := range info.Addrs {
addresses[i] = a.String()
}
logger.Info("finished DNS lookup for Nexus request",
tag.Timestamp(time.Now().UTC()),
tag.Addresses(addresses),
tag.Error(info.Err),
tag.NewBoolTag("coalesced", info.Coalesced))
},
TLSHandshakeStart: func() {
logger.Info("starting TLS handshake for Nexus request", tag.Timestamp(time.Now().UTC()))
},
TLSHandshakeDone: func(state tls.ConnectionState, err error) {
logger.Info("finished TLS handshake for Nexus request",
tag.Timestamp(time.Now().UTC()),
// TODO: consider other state info
tag.NewBoolTag("handshake-complete", state.HandshakeComplete),
tag.Error(err))
},
WroteRequest: func(info httptrace.WroteRequestInfo) {
logger.Info("finished writing Nexus HTTP request",
tag.Timestamp(time.Now().UTC()),
tag.Error(info.Err))
},
GotFirstResponseByte: func() {
logger.Info("got response to Nexus HTTP request", tag.AttemptEnd(time.Now().UTC()))
},
}
}
10 changes: 7 additions & 3 deletions components/callbacks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
)

type Config struct {
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
RetryPolicy func() backoff.RetryPolicy
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
HTTPTraceMinAttempt dynamicconfig.IntPropertyFnWithNamespaceFilter
HTTPTraceMaxAttempt dynamicconfig.IntPropertyFnWithNamespaceFilter
RetryPolicy func() backoff.RetryPolicy
}

func ConfigProvider(dc *dynamicconfig.Collection) *Config {
return &Config{
RequestTimeout: RequestTimeout.Get(dc),
RequestTimeout: RequestTimeout.Get(dc),
HTTPTraceMinAttempt: dynamicconfig.NexusHTTPTraceMinAttempt.Get(dc),
HTTPTraceMaxAttempt: dynamicconfig.NexusHTTPTraceMaxAttempt.Get(dc),
RetryPolicy: func() backoff.RetryPolicy {
return backoff.NewExponentialRetryPolicy(
RetryPolicyInitialInterval.Get(dc)(),
Expand Down
3 changes: 3 additions & 0 deletions components/callbacks/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ func (e taskExecutor) loadInvocationArgs(
nexusInvokable := nexusInvocation{}
nexusInvokable.nexus = variant.Nexus
nexusInvokable.completion, err = target.GetNexusCompletion(ctx)
nexusInvokable.workflowID = ref.WorkflowKey.WorkflowID
nexusInvokable.runID = ref.WorkflowKey.RunID
nexusInvokable.attempt = callback.Attempt
invokable = nexusInvokable
if err != nil {
return err
Expand Down
23 changes: 21 additions & 2 deletions components/callbacks/nexus_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ import (
"fmt"
"io"
"net/http"
"net/http/httptrace"
"slices"
"time"

"github.com/nexus-rpc/sdk-go/nexus"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
commonnexus "go.temporal.io/server/common/nexus"
"go.temporal.io/server/service/history/queues"
)

Expand All @@ -48,8 +51,10 @@ type CanGetNexusCompletion interface {
}

type nexusInvocation struct {
nexus *persistencespb.Callback_Nexus
completion nexus.OperationCompletion
nexus *persistencespb.Callback_Nexus
completion nexus.OperationCompletion
workflowID, runID string
attempt int32
}

func isRetryableHTTPResponse(response *http.Response) bool {
Expand All @@ -74,6 +79,20 @@ func (n nexusInvocation) WrapError(result invocationResult, err error) error {
}

func (n nexusInvocation) Invoke(ctx context.Context, ns *namespace.Namespace, e taskExecutor, task InvocationTask) invocationResult {
if n.attempt >= int32(e.Config.HTTPTraceMinAttempt(ns.Name().String())) &&
n.attempt <= int32(e.Config.HTTPTraceMaxAttempt(ns.Name().String())) {
traceLogger := log.With(e.Logger,
tag.WorkflowNamespace(ns.Name().String()),
tag.Operation("CompleteNexusOperation"),
tag.NewStringTag("destination", task.destination),
tag.WorkflowID(n.workflowID),
tag.WorkflowRunID(n.runID),
tag.AttemptStart(time.Now().UTC()),
tag.Attempt(n.attempt),
)
ctx = httptrace.WithClientTrace(ctx, commonnexus.NewHTTPClientTrace(traceLogger))
}

request, err := nexus.NewCompletionHTTPRequest(ctx, n.nexus.Url, n.completion)
if err != nil {
return invocationResultFail{queues.NewUnprocessableTaskError(
Expand Down
4 changes: 4 additions & 0 deletions components/nexusoperations/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ type Config struct {
PayloadSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
CallbackURLTemplate dynamicconfig.StringPropertyFn
EndpointNotFoundAlwaysNonRetryable dynamicconfig.BoolPropertyFnWithNamespaceFilter
HTTPTraceMinAttempt dynamicconfig.IntPropertyFnWithNamespaceFilter
HTTPTraceMaxAttempt dynamicconfig.IntPropertyFnWithNamespaceFilter
RetryPolicy func() backoff.RetryPolicy
}

Expand All @@ -170,6 +172,8 @@ func ConfigProvider(dc *dynamicconfig.Collection) *Config {
PayloadSizeLimit: dynamicconfig.BlobSizeLimitError.Get(dc),
CallbackURLTemplate: CallbackURLTemplate.Get(dc),
EndpointNotFoundAlwaysNonRetryable: EndpointNotFoundAlwaysNonRetryable.Get(dc),
HTTPTraceMinAttempt: dynamicconfig.NexusHTTPTraceMinAttempt.Get(dc),
HTTPTraceMaxAttempt: dynamicconfig.NexusHTTPTraceMaxAttempt.Get(dc),
RetryPolicy: func() backoff.RetryPolicy {
return backoff.NewExponentialRetryPolicy(
RetryPolicyInitialInterval.Get(dc)(),
Expand Down
38 changes: 35 additions & 3 deletions components/nexusoperations/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"context"
"errors"
"fmt"
"net/http/httptrace"
"strings"
"text/template"
"time"
Expand Down Expand Up @@ -197,6 +198,21 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ
callCtx, cancel := context.WithTimeout(ctx, callTimeout)
defer cancel()

if task.Attempt >= int32(e.Config.HTTPTraceMinAttempt(ns.Name().String())) &&
task.Attempt <= int32(e.Config.HTTPTraceMaxAttempt(ns.Name().String())) {
traceLogger := log.With(e.Logger,
tag.WorkflowNamespace(ns.Name().String()),
tag.RequestID(args.requestID),
tag.Operation(args.operation),
tag.Endpoint(args.endpointName),
tag.WorkflowID(ref.WorkflowKey.WorkflowID),
tag.WorkflowRunID(ref.WorkflowKey.RunID),
tag.AttemptStart(time.Now().UTC()),
tag.Attempt(task.Attempt),
)
callCtx = httptrace.WithClientTrace(callCtx, commonnexus.NewHTTPClientTrace(traceLogger))
}

startTime := time.Now()
var rawResult *nexus.ClientStartOperationResult[*nexus.LazyValue]
var callErr error
Expand Down Expand Up @@ -536,6 +552,21 @@ func (e taskExecutor) executeCancelationTask(ctx context.Context, env hsm.Enviro
callCtx, cancel := context.WithTimeout(ctx, callTimeout)
defer cancel()

if task.Attempt >= int32(e.Config.HTTPTraceMinAttempt(ns.Name().String())) &&
task.Attempt <= int32(e.Config.HTTPTraceMaxAttempt(ns.Name().String())) {
traceLogger := log.With(e.Logger,
tag.WorkflowNamespace(ns.Name().String()),
tag.RequestID(args.requestID),
tag.Operation(args.operation),
tag.Endpoint(args.endpointName),
tag.WorkflowID(ref.WorkflowKey.WorkflowID),
tag.WorkflowRunID(ref.WorkflowKey.RunID),
tag.AttemptStart(time.Now().UTC()),
tag.Attempt(task.Attempt),
)
callCtx = httptrace.WithClientTrace(callCtx, commonnexus.NewHTTPClientTrace(traceLogger))
}

var callErr error
startTime := time.Now()
if callTimeout < e.Config.MinOperationTimeout(ns.Name().String()) {
Expand Down Expand Up @@ -565,9 +596,9 @@ func (e taskExecutor) executeCancelationTask(ctx context.Context, env hsm.Enviro
}

type cancelArgs struct {
service, operation, operationID, endpointID, endpointName string
scheduledTime time.Time
scheduleToCloseTimeout time.Duration
service, operation, operationID, endpointID, endpointName, requestID string
scheduledTime time.Time
scheduleToCloseTimeout time.Duration
}

// loadArgsForCancelation loads state from the operation state machine that's the parent of the cancelation machine the
Expand All @@ -588,6 +619,7 @@ func (e taskExecutor) loadArgsForCancelation(ctx context.Context, env hsm.Enviro
args.operationID = op.OperationId
args.endpointID = op.EndpointId
args.endpointName = op.Endpoint
args.requestID = op.RequestId
args.scheduledTime = op.ScheduledTime.AsTime()
args.scheduleToCloseTimeout = op.ScheduleToCloseTimeout.AsDuration()
return nil
Expand Down
Loading