Make internal execution info consistent with public API (#6860)
## What changed?
<!-- Describe what has changed in this PR -->
Update the internal mutable state (MS) versioning info to be consistent with what is defined
in [the public API](https://github.com/temporalio/api/pull/487/files).

## Why?
<!-- Tell your future self why have you made these changes -->

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
ShahabT authored Nov 21, 2024
1 parent 8466948 commit 00745c6
Showing 14 changed files with 1,525 additions and 1,828 deletions.
2,898 changes: 1,336 additions & 1,562 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions go.mod
@@ -7,7 +7,7 @@ retract (
v1.26.0 // Published accidentally.
)

replace go.temporal.io/api => github.com/temporalio/api-go v1.39.1-0.20241120204558-7018129677c5
//replace go.temporal.io/api => github.com/temporalio/api-go v1.39.1-0.20241120204558-7018129677c5

require (
cloud.google.com/go/storage v1.41.0
@@ -57,7 +57,7 @@ require (
go.opentelemetry.io/otel/sdk v1.27.0
go.opentelemetry.io/otel/sdk/metric v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
go.temporal.io/api v1.39.1-0.20241110233709-f3490ec44b20
go.temporal.io/api v1.39.1-0.20241121195644-de125cd2868b
go.temporal.io/sdk v1.29.1
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.11.0
4 changes: 2 additions & 2 deletions go.sum
@@ -267,8 +267,6 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/temporalio/api-go v1.39.1-0.20241120204558-7018129677c5 h1:9X+k3sIMPPagOqZGnEnu6t1pkKvI2mGp/oYVmo8Yj4A=
github.com/temporalio/api-go v1.39.1-0.20241120204558-7018129677c5/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
github.com/temporalio/ringpop-go v0.0.0-20240718232345-e2a435d149b6 h1:LnuIZntMSwZJS0O9b9bA+3iUL+Cwwhk+838PB04XLz4=
github.com/temporalio/ringpop-go v0.0.0-20240718232345-e2a435d149b6/go.mod h1:RE+CHmY+kOZQk47AQaVzwrGmxpflnLgTd6EOK0853j4=
github.com/temporalio/sqlparser v0.0.0-20231115171017-f4060bcfa6cb h1:YzHH/U/dN7vMP+glybzcXRTczTrgfdRisNTzAj7La04=
@@ -323,6 +321,8 @@ go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4=
go.opentelemetry.io/proto/otlp v1.2.0 h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IOkz94=
go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A=
go.temporal.io/api v1.39.1-0.20241121195644-de125cd2868b h1:yyC9IlAA9dvluyaECfsKNJbUluB9r1W6q4GO4pj6K/g=
go.temporal.io/api v1.39.1-0.20241121195644-de125cd2868b/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.29.1 h1:y+sUMbUhTU9rj50mwIZAPmcXCtgUdOWS9xHDYRYSgZ0=
go.temporal.io/sdk v1.29.1/go.mod h1:kp//DRvn3CqQVBCtjL51Oicp9wrZYB2s6row1UgzcKQ=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
37 changes: 3 additions & 34 deletions proto/internal/temporal/server/api/persistence/v1/executions.proto
@@ -256,40 +256,9 @@ message WorkflowExecutionInfo {

repeated StateMachineTombstoneBatch sub_state_machine_tombstone_batches = 95;

// Holds all the information about versioning for this particular execution.
VersioningInfo versioning_info = 96;

message VersioningInfo {
// Determines how server should treat this execution when workers are upgraded.
// When present it means that this workflow is versioned. This is set after an execution
// completes its first workflow task on a versioned worker.
temporal.api.enums.v1.VersioningBehavior behavior = 1;
// The worker deployment to which this workflow is assigned currently.
// Must be present if and only if `behavior` is set.
temporal.api.deployment.v1.Deployment deployment = 2;
// Manual override for execution's versioning behavior. Takes precedence over `behavior`.
temporal.api.enums.v1.VersioningBehavior behavior_override = 3;
// Used to manually pin the execution to a deployment. Must be set if and only if
// `behavior_override` is PINNED. Takes precedence over `deployment`.
temporal.api.deployment.v1.Deployment deployment_override = 4;
// When present, indicates the workflow is being redirected to a different deployment.
// A redirect can only exist during the lifetime of a pending workflow task.
// If the pending workflow task completes (at the next WorkflowTaskCompleted event), the
// redirect is considered complete and the workflow's deployment is updated. If the pending
// workflow task fails or times out, then the redirect is canceled and workflow remains on
// the previous deployment.
RedirectInfo redirect_info = 5;

message RedirectInfo {
// The target deployment of the redirect. Null means a so-far-versioned workflow is
// being redirected to unversioned workers.
temporal.api.deployment.v1.Deployment deployment = 1;
// If present, it means the redirect is initiated by a (safe) manual versioning
// override. Such override is only applied to the workflow when and if the redirect
// successfully completes.
temporal.api.enums.v1.VersioningBehavior behavior_override = 2;
}
}
// When present, it means the workflow execution is versioned, or is transitioning from
// unversioned workers to versioned ones.
temporal.api.workflow.v1.WorkflowExecutionVersioningInfo versioning_info = 96;
}

message ExecutionStats {
2 changes: 1 addition & 1 deletion service/frontend/workflow_handler.go
@@ -3165,7 +3165,7 @@ func (wh *WorkflowHandler) GetCurrentDeployment(ctx context.Context, request *wo

describeDeploymentResponse, err := wh.deploymentStoreClient.GetCurrentDeployment(ctx, namespaceEntry, request.SeriesName)
if err != nil {
wh.logger.Error("Error during GetCurrentDeployment", tag.Error(err))
wh.logger.Error("Error during GetEffectiveDeployment", tag.Error(err))
return nil, err
}

22 changes: 1 addition & 21 deletions service/history/api/describeworkflow/api.go
@@ -142,30 +142,10 @@ func Invoke(
AssignedBuildId: executionInfo.AssignedBuildId,
InheritedBuildId: executionInfo.InheritedBuildId,
FirstRunId: executionInfo.FirstExecutionRunId,
VersioningInfo: executionInfo.VersioningInfo,
},
}

if versioningInfo := executionInfo.GetVersioningInfo(); versioningInfo != nil {
result.WorkflowExecutionInfo.VersioningInfo = &workflowpb.WorkflowExecutionInfo_VersioningInfo{
Behavior: versioningInfo.Behavior,
Deployment: versioningInfo.Deployment,
VersioningOverride: &workflowpb.VersioningOverride{
Behavior: versioningInfo.BehaviorOverride,
Deployment: versioningInfo.DeploymentOverride,
},
}
if redirectInfo := versioningInfo.GetRedirectInfo(); redirectInfo != nil {
result.WorkflowExecutionInfo.VersioningInfo.DeploymentTransition =
&workflowpb.WorkflowExecutionInfo_VersioningInfo_DeploymentTransition{
Deployment: redirectInfo.Deployment,
// todo (carly): if the redirect has a versioning override or is unsetting an override, set this field
// currently redirectInfo only has "deployment" and "behavior override", not "deployment override"
// so I'm not 100% sure what to put here. Probably persistence.versioningInfo proto needs to change.
ApplyOverride: nil,
}
}
}

if executionInfo.ParentRunId != "" {
result.WorkflowExecutionInfo.ParentExecution = &commonpb.WorkflowExecution{
WorkflowId: executionInfo.ParentWorkflowId,
4 changes: 2 additions & 2 deletions service/history/api/recordactivitytaskstarted/api.go
@@ -115,9 +115,9 @@ func Invoke(

deployment := worker_versioning.DeploymentFromCapabilities(request.PollRequest.WorkerVersionCapabilities)
// TODO (shahab): support independent deployments
activityInitiatedRedirect := mutableState.StartDeploymentRedirect(deployment, enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED)
activityInitiatedRedirect := mutableState.StartDeploymentTransition(deployment)

if mutableState.GetRedirectInfo() != nil {
if mutableState.GetDeploymentTransition() != nil {
// Can't start activity during a redirect. We reject this request so Matching drops
// the task. The activity will be rescheduled when the redirect completes/fails.

6 changes: 3 additions & 3 deletions service/history/api/recordworkflowtaskstarted/api.go
@@ -142,7 +142,7 @@ func Invoke(
// For versioned workflows we additionally check for the poller queue to not be a sticky queue itself.
// Although it's ideal to check this for unversioned workflows as well, we can't rely on older clients
// setting the poller TQ kind.
if (mutableState.GetAssignedBuildId() != "" || mutableState.GetCurrentDeployment() != nil) &&
if (mutableState.GetAssignedBuildId() != "" || mutableState.GetEffectiveDeployment() != nil) &&
req.PollRequest.TaskQueue.Kind == enumspb.TASK_QUEUE_KIND_STICKY {
return nil, serviceerrors.NewObsoleteDispatchBuildId("wrong sticky queue")
}
@@ -152,10 +152,10 @@ func Invoke(
}

deployment := worker_versioning.DeploymentFromCapabilities(req.PollRequest.WorkerVersionCapabilities)
wfDeployment := mutableState.GetCurrentDeployment()
wfDeployment := mutableState.GetEffectiveDeployment()
if !deployment.Equal(wfDeployment) {
// Try starting a redirect. If it doesn't happen, it means this task is obsolete.
if !mutableState.StartDeploymentRedirect(deployment, enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED) {
if !mutableState.StartDeploymentTransition(deployment) {
return nil, serviceerrors.NewObsoleteDispatchBuildId("poller's deployment does not match workflow's current deployment")
}
}
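
The check in this hunk can be read as: accept the poller if its deployment matches the workflow's effective deployment, otherwise try to start a transition, and treat the task as obsolete if the transition cannot start. The following is a minimal, self-contained sketch of that flow. The `state` type, its fields, and `errObsoleteDispatch` are hypothetical stand-ins rather than the server's real types, and the sketch does not model transitions to unversioned workers.

```go
package main

import (
	"errors"
	"fmt"
)

// Deployment is a simplified stand-in for deploymentpb.Deployment.
type Deployment struct {
	SeriesName string
	BuildID    string
}

// Equal treats two nil deployments as equal, so an unversioned poller
// matches an unversioned workflow.
func (d *Deployment) Equal(other *Deployment) bool {
	if d == nil || other == nil {
		return d == other
	}
	return *d == *other
}

// errObsoleteDispatch is a hypothetical stand-in for the obsolete-dispatch error.
var errObsoleteDispatch = errors.New("poller's deployment does not match workflow's current deployment")

// state is a toy model of the mutable state; every field is an assumption.
type state struct {
	deployment *Deployment // deployment the workflow currently runs on
	transition *Deployment // target of an ongoing transition, nil if none
	pinned     bool        // pinned workflows refuse new transitions
}

// effectiveDeployment prefers the transition target over the current deployment.
func (s *state) effectiveDeployment() *Deployment {
	if s.transition != nil {
		return s.transition
	}
	return s.deployment
}

// startDeploymentTransition replaces any ongoing transition and reports
// whether the requested transition started.
func (s *state) startDeploymentTransition(target *Deployment) bool {
	if s.pinned {
		return false
	}
	s.transition = target
	return true
}

// checkPoller mirrors the shape of the check in this hunk.
func checkPoller(s *state, poller *Deployment) error {
	if poller.Equal(s.effectiveDeployment()) {
		return nil
	}
	if !s.startDeploymentTransition(poller) {
		return errObsoleteDispatch
	}
	return nil
}

func main() {
	v1 := &Deployment{SeriesName: "orders", BuildID: "v1"}
	v2 := &Deployment{SeriesName: "orders", BuildID: "v2"}

	upgradable := &state{deployment: v1}
	fmt.Println(checkPoller(upgradable, v2), upgradable.effectiveDeployment().BuildID)

	pinned := &state{deployment: v1, pinned: true}
	fmt.Println(checkPoller(pinned, v2))
}
```

In this model a pinned workflow rejects a poller from another deployment, while an unpinned workflow starts a transition and immediately reports the new deployment as its effective one.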
2 changes: 1 addition & 1 deletion service/history/api/respondworkflowtaskcompleted/api.go
@@ -1055,7 +1055,7 @@ func (handler *WorkflowTaskCompletedHandler) validateVersioningInfo(
ms workflow.MutableState,
) error {
taskDeployment := request.GetDeployment()
wfDeployment := ms.GetCurrentDeployment()
wfDeployment := ms.GetEffectiveDeployment()
if !taskDeployment.Equal(wfDeployment) {
return serviceerror.NewNotFound(fmt.Sprintf(
"execution is not assigned to deployment %q, current deployment is %q",
37 changes: 25 additions & 12 deletions service/history/workflow/mutable_state.go
@@ -403,17 +403,30 @@ type (
// NextTransitionCount returns the next state transition count from the state transition history.
// If state transition history is empty (e.g. when disabled or fresh mutable state), returns 0.
NextTransitionCount() int64
// GetCurrentDeployment returns the current effective deployment in the following order:
// RedirectInfo.Deployment takes precedence over DeploymentOverride, over Deployment.
GetCurrentDeployment() *deploymentpb.Deployment
// GetVersioningBehavior returns the effective versioning behavior for the workflow.
GetVersioningBehavior() enumspb.VersioningBehavior
GetRedirectInfo() *persistencespb.WorkflowExecutionInfo_VersioningInfo_RedirectInfo
StartDeploymentRedirect(
deployment *deploymentpb.Deployment,
behaviorOverride enumspb.VersioningBehavior,
) bool
CompleteDeploymentRedirect(behavior enumspb.VersioningBehavior) error
FailDeploymentRedirect() error
// GetEffectiveDeployment returns the effective deployment in the following order:
// 1. DeploymentTransition.Deployment: this is returned when the wf is transitioning to a new
// deployment
// 2. VersioningOverride.Deployment: this is returned when user has set a PINNED override at wf
// start time, or later via UpdateWorkflowExecutionOptions.
// 3. Deployment: this is returned when there is no transition and no override (most common case).
// Deployment is set based on the worker-sent deployment in the latest WFT completion.
GetEffectiveDeployment() *deploymentpb.Deployment
// GetEffectiveVersioningBehavior returns the effective versioning behavior in the following
// order:
// 1. VersioningOverride.Behavior: this is returned when user has set a behavior override
// at wf start time, or later via UpdateWorkflowExecutionOptions.
// 2. Behavior: this is returned when there is no override (most common case). Behavior is
// set based on the worker-sent deployment in the latest WFT completion.
GetEffectiveVersioningBehavior() enumspb.VersioningBehavior
GetDeploymentTransition() *workflowpb.DeploymentTransition
// StartDeploymentTransition starts a transition to the given deployment. Returns true
// if the requested transition is started. Starting a new transition replaces possible
// existing ongoing transition without rescheduling activities. If the workflow is
// pinned, the transition won't start.
StartDeploymentTransition(deployment *deploymentpb.Deployment) bool
// CompleteDeploymentTransition completes the ongoing transition for this workflow if it exists.
// Completing a transition updates the workflow's deployment and possibly versioning behavior.
// All activities that are not started yet will be rescheduled to be dispatched to the new deployment.
CompleteDeploymentTransition(workerSentBehavior enumspb.VersioningBehavior) error
}
)
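
The precedence rules documented on `GetEffectiveDeployment` and `GetEffectiveVersioningBehavior` can be illustrated with a small standalone sketch. The types below are trimmed-down, hypothetical stand-ins for the protobuf messages (the field names `Override` and `Transition` and the behavior constants are assumptions made for illustration), so this shows the documented ordering only, not the server's actual implementation.

```go
package main

import "fmt"

// Deployment is a simplified stand-in for deploymentpb.Deployment.
type Deployment struct {
	SeriesName string
	BuildID    string
}

// Behavior is a simplified stand-in for enumspb.VersioningBehavior.
type Behavior int

const (
	Unspecified Behavior = iota
	Pinned
	AutoUpgrade
)

// Override models a user-set versioning override (hypothetical shape).
type Override struct {
	Behavior   Behavior
	Deployment *Deployment
}

// VersioningInfo is a hypothetical, trimmed-down model of the versioning info.
type VersioningInfo struct {
	Behavior   Behavior    // set from the latest workflow task completion
	Deployment *Deployment // set from the latest workflow task completion
	Override   *Override   // user-set override, nil if none
	Transition *Deployment // target of an ongoing transition, nil if none
}

// effectiveDeployment applies the documented order:
// transition target, then PINNED override deployment, then base deployment.
func effectiveDeployment(v *VersioningInfo) *Deployment {
	if v == nil {
		return nil
	}
	if v.Transition != nil {
		return v.Transition
	}
	if v.Override != nil && v.Override.Behavior == Pinned {
		return v.Override.Deployment
	}
	return v.Deployment
}

// effectiveBehavior applies the documented order:
// override behavior, then base behavior.
func effectiveBehavior(v *VersioningInfo) Behavior {
	if v == nil {
		return Unspecified
	}
	if v.Override != nil && v.Override.Behavior != Unspecified {
		return v.Override.Behavior
	}
	return v.Behavior
}

func main() {
	v1 := &Deployment{SeriesName: "orders", BuildID: "v1"}
	v2 := &Deployment{SeriesName: "orders", BuildID: "v2"}

	info := &VersioningInfo{
		Behavior:   AutoUpgrade,
		Deployment: v1,
		Override:   &Override{Behavior: Pinned, Deployment: v2},
	}
	fmt.Println(effectiveDeployment(info).BuildID, effectiveBehavior(info) == Pinned)
}
```

Keeping both lookups as pure functions over one struct makes the ordering easy to test in isolation, which is the main reason for structuring the sketch this way.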