Skip to content

Commit

Permalink
Merge branch 'main' into shahab/user-data
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahabT authored Feb 12, 2025
2 parents 75e7bac + a5a0cd6 commit 9112037
Show file tree
Hide file tree
Showing 25 changed files with 940 additions and 794 deletions.
1,461 changes: 736 additions & 725 deletions api/deployment/v1/message.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
httpStatusTagName = "http_status"
nexusMethodTagName = "method"
nexusEndpointTagName = "nexus_endpoint"
nexusServiceTagName = "nexus_service"
nexusOperationTagName = "nexus_operation"
outcomeTagName = "outcome"
versionedTagName = "versioned"
resourceExhaustedTag = "resource_exhausted_cause"
Expand Down
8 changes: 8 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,14 @@ func NexusEndpointTag(value string) Tag {
return &tagImpl{key: nexusEndpointTagName, value: value}
}

func NexusServiceTag(value string) Tag {
return &tagImpl{key: nexusServiceTagName, value: value}
}

func NexusOperationTag(value string) Tag {
return &tagImpl{key: nexusOperationTagName, value: value}
}

// HttpStatusTag returns a new httpStatusTag.
func HttpStatusTag(value int) Tag {
return &tagImpl{key: httpStatusTagName, value: strconv.Itoa(value)}
Expand Down
76 changes: 46 additions & 30 deletions components/nexusoperations/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
package nexusoperations

import (
"fmt"
"strings"
"time"

Expand Down Expand Up @@ -96,26 +95,20 @@ ScheduleNexusOperation commands with a "nexus_header" field that exceeds this li
Uses Go's len() function on header keys and values to determine the total size.`,
)

var DisallowedOperationHeaders = dynamicconfig.NewNamespaceTypedSettingWithConverter(
// defaultDisallowedOperationHeaders - set in the convert function below due to a limitation in the dynamic config framework.
// TODO: restore after an upgrade to Go 1.24 and merging #7052.
var defaultDisallowedOperationHeaders = []string{
"request-timeout",
interceptor.DCRedirectionApiHeaderName,
interceptor.DCRedirectionContextHeaderName,
headers.CallerNameHeaderName,
headers.CallerTypeHeaderName,
headers.CallOriginHeaderName,
}

var DisallowedOperationHeaders = dynamicconfig.NewGlobalTypedSetting(
"component.nexusoperations.disallowedHeaders",
func(a any) ([]string, error) {
keys, ok := a.([]string)
if !ok {
return nil, fmt.Errorf("expected a string slice, got: %v", a)
}
for i, k := range keys {
keys[i] = strings.ToLower(k)
}
return keys, nil
},
[]string{
"request-timeout",
interceptor.DCRedirectionApiHeaderName,
interceptor.DCRedirectionContextHeaderName,
headers.CallerNameHeaderName,
headers.CallerTypeHeaderName,
headers.CallOriginHeaderName,
},
[]string(nil),
`Case insensitive list of disallowed header keys for Nexus Operations.
ScheduleNexusOperation commands with a "nexus_header" field that contains any of these disallowed keys will be
rejected.`,
Expand Down Expand Up @@ -149,6 +142,19 @@ var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
`The maximum backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`,
)

var MetricTagConfiguration = dynamicconfig.NewGlobalTypedSetting(
"component.nexusoperations.metrics.tags",
NexusMetricTagConfig{},
`Controls which metric tags are included with Nexus operation metrics. This configuration supports:
1. Service name tag - adds the Nexus service name as a metric dimension (IncludeServiceTag)
2. Operation name tag - adds the Nexus operation name as a metric dimension (IncludeOperationTag)
3. Header-based tags - maps values from request headers to metric tags (HeaderTagMappings)
Note: default metric tags (like namespace, endpoint) are always included and not affected by this configuration.
Adding high-cardinality tags (like unique operation names) can significantly increase metric storage
requirements and query complexity. Consider the cardinality impact when enabling these tags.`,
)

type Config struct {
Enabled dynamicconfig.BoolPropertyFn
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
Expand All @@ -158,7 +164,7 @@ type Config struct {
MaxOperationNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxOperationTokenLength dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxOperationHeaderSize dynamicconfig.IntPropertyFnWithNamespaceFilter
DisallowedOperationHeaders dynamicconfig.TypedPropertyFnWithNamespaceFilter[[]string]
DisallowedOperationHeaders dynamicconfig.TypedPropertyFn[[]string]
MaxOperationScheduleToCloseTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
PayloadSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
CallbackURLTemplate dynamicconfig.StringPropertyFn
Expand All @@ -168,15 +174,25 @@ type Config struct {

func ConfigProvider(dc *dynamicconfig.Collection) *Config {
return &Config{
Enabled: dynamicconfig.EnableNexus.Get(dc),
RequestTimeout: RequestTimeout.Get(dc),
MinOperationTimeout: MinOperationTimeout.Get(dc),
MaxConcurrentOperations: MaxConcurrentOperations.Get(dc),
MaxServiceNameLength: MaxServiceNameLength.Get(dc),
MaxOperationNameLength: MaxOperationNameLength.Get(dc),
MaxOperationTokenLength: MaxOperationTokenLength.Get(dc),
MaxOperationHeaderSize: MaxOperationHeaderSize.Get(dc),
DisallowedOperationHeaders: DisallowedOperationHeaders.Get(dc),
Enabled: dynamicconfig.EnableNexus.Get(dc),
RequestTimeout: RequestTimeout.Get(dc),
MinOperationTimeout: MinOperationTimeout.Get(dc),
MaxConcurrentOperations: MaxConcurrentOperations.Get(dc),
MaxServiceNameLength: MaxServiceNameLength.Get(dc),
MaxOperationNameLength: MaxOperationNameLength.Get(dc),
MaxOperationTokenLength: MaxOperationTokenLength.Get(dc),
MaxOperationHeaderSize: MaxOperationHeaderSize.Get(dc),
DisallowedOperationHeaders: dynamicconfig.NewGlobalCachedTypedValue(dc, DisallowedOperationHeaders, func(keys []string) ([]string, error) {
// Override with defaults unless explicitly set.
// Note that this prevents the ability to unset the config but that's an acceptable limitation.
if len(keys) == 0 {
keys = defaultDisallowedOperationHeaders
}
for i, k := range keys {
keys[i] = strings.ToLower(k)
}
return keys, nil
}).Get,
MaxOperationScheduleToCloseTimeout: MaxOperationScheduleToCloseTimeout.Get(dc),
PayloadSizeLimit: dynamicconfig.BlobSizeLimitError.Get(dc),
CallbackURLTemplate: CallbackURLTemplate.Get(dc),
Expand Down
16 changes: 16 additions & 0 deletions components/nexusoperations/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,19 @@ var OutboundRequestLatency = metrics.NewTimerDef(
"nexus_outbound_latency",
metrics.WithDescription("Latency of outbound Nexus requests made by the history service."),
)

type NexusMetricTagConfig struct {
// Include service name as a metric tag
IncludeServiceTag bool
// Include operation name as a metric tag
IncludeOperationTag bool
// Configuration for mapping request headers to metric tags
HeaderTagMappings []NexusHeaderTagMapping
}

type NexusHeaderTagMapping struct {
// Name of the request header to extract value from
SourceHeader string
// Name of the metric tag to set with the header value
TargetTag string
}
2 changes: 1 addition & 1 deletion components/nexusoperations/workflow/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (ch *commandHandler) HandleScheduleCommand(
headerLength := 0
for k, v := range attrs.NexusHeader {
headerLength += len(k) + len(v)
if slices.Contains(ch.config.DisallowedOperationHeaders(nsName), strings.ToLower(k)) {
if slices.Contains(ch.config.DisallowedOperationHeaders(), strings.ToLower(k)) {
return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("ScheduleNexusOperationCommandAttributes.NexusHeader contains a disallowed header key: %q", k),
Expand Down
2 changes: 1 addition & 1 deletion components/nexusoperations/workflow/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var defaultConfig = &nexusoperations.Config{
MaxOperationNameLength: dynamicconfig.GetIntPropertyFnFilteredByNamespace(len("op")),
MaxConcurrentOperations: dynamicconfig.GetIntPropertyFnFilteredByNamespace(2),
MaxOperationHeaderSize: dynamicconfig.GetIntPropertyFnFilteredByNamespace(20),
DisallowedOperationHeaders: dynamicconfig.GetTypedPropertyFnFilteredByNamespace([]string{"request-timeout"}),
DisallowedOperationHeaders: dynamicconfig.GetTypedPropertyFn([]string{"request-timeout"}),
MaxOperationScheduleToCloseTimeout: dynamicconfig.GetDurationPropertyFnFilteredByNamespace(time.Hour * 24),
EndpointNotFoundAlwaysNonRetryable: dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ message WorkerDeploymentLocalState {
temporal.api.deployment.v1.RoutingConfig routing_config = 2;
map<string, WorkerDeploymentVersionSummary> versions = 3;
bytes conflict_token = 4;
string last_modifier_identity = 5;
}

message WorkerDeploymentVersionSummary {
Expand Down
33 changes: 33 additions & 0 deletions service/frontend/nexus_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"go.temporal.io/server/common/namespace"
commonnexus "go.temporal.io/server/common/nexus"
"go.temporal.io/server/common/rpc/interceptor"
"go.temporal.io/server/components/nexusoperations"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -94,6 +95,7 @@ type operationContext struct {
redirectionInterceptor *interceptor.Redirection
forwardingEnabledForNamespace dynamicconfig.BoolPropertyFnWithNamespaceFilter
headersBlacklist *dynamicconfig.GlobalCachedTypedValue[*regexp.Regexp]
metricTagConfig *dynamicconfig.GlobalCachedTypedValue[*nexusoperations.NexusMetricTagConfig]
cleanupFunctions []func(map[string]string, error)
}

Expand Down Expand Up @@ -273,6 +275,32 @@ func (c *operationContext) shouldForwardRequest(ctx context.Context, header nexu
c.forwardingEnabledForNamespace(c.namespaceName)
}

// enrichNexusOperationMetrics enhances metrics with additional Nexus operation context based on configuration.
func (c *operationContext) enrichNexusOperationMetrics(service, operation string, requestHeader nexus.Header) {
conf := c.metricTagConfig.Get()
if conf == nil {
return
}

var tags []metrics.Tag

if conf.IncludeServiceTag {
tags = append(tags, metrics.NexusServiceTag(service))
}

if conf.IncludeOperationTag {
tags = append(tags, metrics.NexusOperationTag(operation))
}

for _, mapping := range conf.HeaderTagMappings {
tags = append(tags, metrics.StringTag(mapping.TargetTag, requestHeader.Get(mapping.SourceHeader)))
}

if len(tags) > 0 {
c.metricsHandler = c.metricsHandler.WithTags(tags...)
}
}

// Key to extract a nexusContext object from a context.Context.
type nexusContextKey struct{}

Expand All @@ -292,6 +320,7 @@ type nexusHandler struct {
forwardingClients *cluster.FrontendHTTPClientCache
payloadSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
headersBlacklist *dynamicconfig.GlobalCachedTypedValue[*regexp.Regexp]
metricTagConfig *dynamicconfig.GlobalCachedTypedValue[*nexusoperations.NexusMetricTagConfig]
}

// Extracts a nexusContext from the given ctx and returns an operationContext with tagged metrics and logging.
Expand All @@ -311,6 +340,7 @@ func (h *nexusHandler) getOperationContext(ctx context.Context, method string) (
redirectionInterceptor: h.redirectionInterceptor,
forwardingEnabledForNamespace: h.forwardingEnabledForNamespace,
headersBlacklist: h.headersBlacklist,
metricTagConfig: h.metricTagConfig,
cleanupFunctions: make([]func(map[string]string, error), 0),
}
oc.metricsHandlerForInterceptors = h.metricsHandler.WithTags(
Expand Down Expand Up @@ -355,6 +385,7 @@ func (h *nexusHandler) StartOperation(
return nil, err
}
ctx = oc.augmentContext(ctx, options.Header)
oc.enrichNexusOperationMetrics(service, operation, options.Header)
defer oc.capturePanicAndRecordMetrics(&ctx, &retErr)

var links []*nexuspb.Link
Expand Down Expand Up @@ -517,6 +548,8 @@ func (h *nexusHandler) CancelOperation(ctx context.Context, service, operation,
if err != nil {
return err
}
ctx = oc.augmentContext(ctx, options.Header)
oc.enrichNexusOperationMetrics(service, operation, options.Header)
defer oc.capturePanicAndRecordMetrics(&ctx, &retErr)

request := oc.matchingRequest(&nexuspb.Request{
Expand Down
1 change: 1 addition & 0 deletions service/frontend/nexus_http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func NewNexusHTTPHandler(
forwardingClients: clientCache,
payloadSizeLimit: serviceConfig.BlobSizeLimitError,
headersBlacklist: serviceConfig.NexusRequestHeadersBlacklist,
metricTagConfig: serviceConfig.NexusOperationsMetricTagConfig,
},
GetResultTimeout: serviceConfig.KeepAliveMaxConnectionIdle(),
Logger: log.NewSlogLogger(logger),
Expand Down
12 changes: 10 additions & 2 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,9 @@ type Config struct {
MaxCallbacksPerWorkflow dynamicconfig.IntPropertyFnWithNamespaceFilter
CallbackEndpointConfigs dynamicconfig.TypedPropertyFnWithNamespaceFilter[[]callbacks.AddressMatchRule]

MaxNexusOperationTokenLength dynamicconfig.IntPropertyFnWithNamespaceFilter
NexusRequestHeadersBlacklist *dynamicconfig.GlobalCachedTypedValue[*regexp.Regexp]
MaxNexusOperationTokenLength dynamicconfig.IntPropertyFnWithNamespaceFilter
NexusRequestHeadersBlacklist *dynamicconfig.GlobalCachedTypedValue[*regexp.Regexp]
NexusOperationsMetricTagConfig *dynamicconfig.GlobalCachedTypedValue[*nexusoperations.NexusMetricTagConfig]

LinkMaxSize dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxLinksPerRequest dynamicconfig.IntPropertyFnWithNamespaceFilter
Expand Down Expand Up @@ -357,6 +358,13 @@ func NewConfig(
return util.WildCardStringsToRegexp(patterns)
},
),
NexusOperationsMetricTagConfig: dynamicconfig.NewGlobalCachedTypedValue(
dc,
nexusoperations.MetricTagConfiguration,
func(config nexusoperations.NexusMetricTagConfig) (*nexusoperations.NexusMetricTagConfig, error) {
return &config, nil
},
),

LinkMaxSize: dynamicconfig.FrontendLinkMaxSize.Get(dc),
MaxLinksPerRequest: dynamicconfig.FrontendMaxLinksPerRequest.Get(dc),
Expand Down
3 changes: 2 additions & 1 deletion service/history/statemachine_environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ func (e *stateMachineEnvironment) validateStateMachineRef(
if ref.StateMachineRef.MutableStateVersionedTransition == nil ||
ref.StateMachineRef.MachineInitialVersionedTransition.TransitionCount == 0 ||
(ref.StateMachineRef.MachineLastUpdateVersionedTransition != nil &&
ref.StateMachineRef.MachineLastUpdateVersionedTransition.TransitionCount == 0) {
ref.StateMachineRef.MachineLastUpdateVersionedTransition.TransitionCount == 0) ||
len(ms.GetExecutionInfo().TransitionHistory) == 0 {
// Transtion history was disabled when the ref is generated,
// fallback to the old validation logic.
return e.validateStateMachineRefWithoutTransitionHistory(ms, ref, potentialStaleState)
Expand Down
15 changes: 15 additions & 0 deletions service/history/statemachine_environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func TestValidateStateMachineRef(t *testing.T) {
mutateRef func(*hsm.Ref)
mutateNode func(*hsm.Node)
assertOutcome func(*testing.T, error)
clearTransitionHistory bool
}{
{
name: "TaskGenerationStale",
Expand Down Expand Up @@ -250,6 +251,17 @@ func TestValidateStateMachineRef(t *testing.T) {
require.NoError(t, err)
},
},
{
name: "WithTransitionHistory/TransitionHistoryCleared/Valid",
enableTransitionHistory: true,
mutateRef: func(ref *hsm.Ref) {
},
mutateNode: func(node *hsm.Node) {},
assertOutcome: func(t *testing.T, err error) {
require.NoError(t, err)
},
clearTransitionHistory: true,
},
{
name: "WithoutTransitionHistory/Valid",
enableTransitionHistory: false,
Expand Down Expand Up @@ -285,6 +297,9 @@ func TestValidateStateMachineRef(t *testing.T) {
tc.mutateRef(&ref)

workflowContext := workflow.NewContext(s.mockShard.GetConfig(), mutableState.GetWorkflowKey(), log.NewTestLogger(), log.NewTestLogger(), metrics.NoopMetricsHandler)
if tc.clearTransitionHistory {
mutableState.GetExecutionInfo().TransitionHistory = nil
}
err = exec.validateStateMachineRef(context.Background(), workflowContext, mutableState, ref, true)
tc.assertOutcome(t, err)
})
Expand Down
17 changes: 13 additions & 4 deletions service/history/transfer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,10 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution(
RunId: task.GetRunID(),
}
executionInfo := mutableState.GetExecutionInfo()
children := copyChildWorkflowInfos(mutableState.GetPendingChildExecutionInfos())
var completionEvent *historypb.HistoryEvent // needed to report close event to parent workflow
replyToParentWorkflow := mutableState.HasParentExecution() && executionInfo.NewExecutionRunId == ""
if replyToParentWorkflow {
if replyToParentWorkflow || len(children) > 0 {
// only load close event if needed.
completionEvent, err = mutableState.GetCompletionEvent(ctx)
if err != nil {
Expand All @@ -369,7 +370,6 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution(
}

namespaceName := mutableState.GetNamespaceEntry().Name()
children := copyChildWorkflowInfos(mutableState.GetPendingChildExecutionInfos())

// NOTE: do not access anything related mutable state after this lock release.
// Release lock immediately since mutable state is not needed
Expand Down Expand Up @@ -402,8 +402,17 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution(

// process parentClosePolicy except when the execution was reset. In case of reset, we need to keep the children around so that we can reconnect to them.
// We know an execution was reset when ResetRunId was populated in it.
// TODO (Chetan): update this condition as new reset policies are added. For now we keep all children since "Reconnect" is the only policy available.
if executionInfo.GetResetRunId() == "" {
// Note: Sometimes the reset operation might race with this task processing. i.e the WF was closed and before this task can be executed, a reset operation is recorded.
// So we need to additionally check the termination reason for this parent to determine if this task was indeed created due to reset or due to normal completion of the WF.
// Also, checking the dynamic config is not strictly safe since by definition it can change at any time. However this reduces the chance of us skipping the parent close policy when we shouldn't.
allowResetWithPendingChildren := t.config.AllowResetWithPendingChildren(namespaceName.String())
shouldSkipParentClosePolicy := false
isParentTerminatedDueToReset := (completionEvent != nil) && ndc.IsTerminatedByResetter(completionEvent)
if isParentTerminatedDueToReset && executionInfo.GetResetRunId() != "" && allowResetWithPendingChildren {
// TODO (Chetan): update this condition as new reset policies/cases are added.
shouldSkipParentClosePolicy = true // only skip if the parent is reset and we are using the new flow.
}
if !shouldSkipParentClosePolicy {
if err := t.processParentClosePolicy(
ctx,
namespaceName.String(),
Expand Down
Loading

0 comments on commit 9112037

Please sign in to comment.