Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Conflict Tokens to Deployment APIs #7203

Merged
merged 29 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b0a859a
pull api-go versioning-3.1 branch in this branch
carlydf Jan 16, 2025
957bc26
RegisterTaskQueue + DescribeVersion: versioning-3.1 (#7107)
Shivs11 Jan 20, 2025
6fbdc7b
Use new versioning fields (#7119)
ShahabT Jan 21, 2025
f983da9
Describe worker deployment in versioning 3.1 (#7127)
Shivs11 Jan 22, 2025
36d48a6
Support ramp in Matching (#7126)
ShahabT Jan 23, 2025
56ee4d9
flake + unit tests fix (#7149)
Shivs11 Jan 23, 2025
7ff7876
SetWorkerDeploymentCurrentVersion: Versioning-3.1 (#7154)
Shivs11 Jan 24, 2025
a1fc7bd
ListWorkerDeployments : Versioning-3.1 (#7173)
Shivs11 Jan 28, 2025
3d40535
Worker Deployment Version Drainage Status (#7158)
carlydf Jan 29, 2025
e632e81
Address Drainage PR comments: Pass RegisterVersion info to task queue…
carlydf Jan 29, 2025
5be3cbb
ramp - Versioning:3.1 (#7183)
Shivs11 Jan 30, 2025
7495272
Pull in rebased API changes and fix build (#7201)
carlydf Jan 31, 2025
7b2b165
add conflict tokens to deployment APIs, no tests
carlydf Jan 31, 2025
93e2946
upgrade to latest api/versioning-3.1 with new build id -> version naming
carlydf Feb 2, 2025
a4354e4
pull latest versioning-3.1
carlydf Feb 2, 2025
0c8ec0d
Update Deployment Workflow ID's and accept string version values (#7198)
Shivs11 Feb 2, 2025
7de0178
DeleteVersion API implementation (#7187)
carlydf Feb 2, 2025
63c99b9
Use Deployment Version strings in all user-facing APIs (#7219)
ShahabT Feb 3, 2025
4741b54
Rename UpdateWorkerDeploymentVersionMetadata
ShahabT Feb 4, 2025
a48cccd
rebase to versioning-3.1
carlydf Feb 4, 2025
b740307
rebase to versioning-3.1-merge
carlydf Feb 4, 2025
bf73b90
remove unversioned change and fix rebase
carlydf Feb 4, 2025
1754818
remove unchanged files from diff
carlydf Feb 4, 2025
6636405
fix linences and imports
carlydf Feb 4, 2025
816cab6
fix import diff
carlydf Feb 4, 2025
e064abc
test conflict token
carlydf Feb 4, 2025
58efe5d
no replace in go.mod
carlydf Feb 4, 2025
a312969
Update service/history/workflow/mutable_state_impl.go
ShahabT Feb 4, 2025
94e89d8
Update tests/activity_api_pause_test.go
ShahabT Feb 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,228 changes: 635 additions & 593 deletions api/deployment/v1/message.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
BuildIdSearchAttributeEscape = "|"
// UnversionedSearchAttribute is the sentinel value used to mark all unversioned workflows
UnversionedSearchAttribute = buildIdSearchAttributePrefixUnversioned
UnversionedBuildId = "__unversioned__"

// Prefixes, Delimeters and Keys
WorkerDeploymentVersionIdDelimiter = "/"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ message WorkerDeploymentLocalState {
// - It has no active pollers (see WorkerDeploymentVersionInfo.pollers_status)
// - It is drained (see WorkerDeploymentVersionInfo.drainage_status)
// TODO (Shivam) - Removal of registered versions based on the above conditions.
repeated string versions = 3;
repeated string versions = 3;
bytes conflict_token = 4;
}

// used as Worker Deployment Version workflow update input:
Expand Down Expand Up @@ -233,11 +234,13 @@ message CheckWorkerDeploymentUserDataPropagationRequest {
message SetCurrentVersionArgs {
string identity = 1;
string version = 2;
bytes conflict_token = 3;
}

// used as Worker Deployment update response:
message SetCurrentVersionResponse {
string previous_version = 1;
bytes conflict_token = 2;
}

// used as Worker Deployment workflow update input:
Expand Down Expand Up @@ -277,6 +280,7 @@ message SetRampingVersionArgs {
string identity = 1;
string version = 2;
float percentage = 3;
bytes conflict_token = 4;
}

// used as Worker Deployment activity input:
Expand Down
10 changes: 6 additions & 4 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3387,7 +3387,7 @@

// TODO (Shivam): error out if build_ID is empty

resp, err := wh.workerDeploymentClient.SetCurrentVersion(ctx, namespaceEntry, request.DeploymentName, request.Version, request.Identity)
resp, err := wh.workerDeploymentClient.SetCurrentVersion(ctx, namespaceEntry, request.DeploymentName, request.Version, request.Identity, request.GetConflictToken())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3427,7 +3427,7 @@
return nil, serviceerror.NewInvalidArgument("Percentage must be between 0 and 100 (inclusive)")
}

resp, err := wh.workerDeploymentClient.SetRampingVersion(ctx, namespaceEntry, request.DeploymentName, request.Version, request.GetPercentage(), request.GetIdentity())
resp, err := wh.workerDeploymentClient.SetRampingVersion(ctx, namespaceEntry, request.DeploymentName, request.Version, request.GetPercentage(), request.GetIdentity(), request.GetConflictToken())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3499,19 +3499,21 @@
return nil, err
}

workerDeploymentInfo, err := wh.workerDeploymentClient.DescribeWorkerDeployment(ctx, namespaceEntry, request.DeploymentName)
workerDeploymentInfo, cT, err := wh.workerDeploymentClient.DescribeWorkerDeployment(ctx, namespaceEntry, request.DeploymentName)
if err != nil {
return nil, err
}

return &workflowservice.DescribeWorkerDeploymentResponse{
WorkerDeploymentInfo: workerDeploymentInfo,
ConflictToken: cT,
}, nil
}

func (wh *WorkflowHandler) DeleteWorkerDeployment(ctx context.Context, request *workflowservice.DeleteWorkerDeploymentRequest) (*workflowservice.DeleteWorkerDeploymentResponse, error) {
// TODO implement me
//TODO implement me

Check failure on line 3514 in service/frontend/workflow_handler.go

View workflow job for this annotation

GitHub Actions / golangci

comment-spacings: no space between comment delimiter and comment text (revive)
panic("implement me")
return nil, nil

Check failure on line 3516 in service/frontend/workflow_handler.go

View workflow job for this annotation

GitHub Actions / golangci

unreachable: unreachable code (govet)
}

func (wh *WorkflowHandler) DeleteWorkerDeploymentVersion(ctx context.Context, request *workflowservice.DeleteWorkerDeploymentVersionRequest) (_ *workflowservice.DeleteWorkerDeploymentVersionResponse, retError error) {
Expand Down
36 changes: 22 additions & 14 deletions service/worker/workerdeployment/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,15 @@ type Client interface {
ctx context.Context,
namespaceEntry *namespace.Namespace,
deploymentName string,
) (*deploymentpb.WorkerDeploymentInfo, error)
) (*deploymentpb.WorkerDeploymentInfo, []byte, error)

SetCurrentVersion(
ctx context.Context,
namespaceEntry *namespace.Namespace,
deploymentName string,
version string,
identity string,
conflictToken []byte,
) (*deploymentspb.SetCurrentVersionResponse, error)

ListWorkerDeployments(
Expand All @@ -107,6 +108,7 @@ type Client interface {
version string,
percentage float32,
identity string,
conflictToken []byte,
) (*deploymentspb.SetRampingVersionResponse, error)

// Used internally by the Worker Deployment workflow in its StartWorkerDeployment Activity
Expand Down Expand Up @@ -278,14 +280,14 @@ func (d *ClientImpl) DescribeWorkerDeployment(
ctx context.Context,
namespaceEntry *namespace.Namespace,
deploymentName string,
) (_ *deploymentpb.WorkerDeploymentInfo, retErr error) {
) (_ *deploymentpb.WorkerDeploymentInfo, conflictToken []byte, retErr error) {
//revive:disable-next-line:defer
defer d.record("DescribeWorkerDeployment", &retErr, deploymentName)()

// validating params
err := validateVersionWfParams(WorkerDeploymentFieldName, deploymentName, d.maxIDLengthLimit())
if err != nil {
return nil, err
return nil, nil, err
}

deploymentWorkflowID := worker_versioning.GenerateDeploymentWorkflowID(deploymentName)
Expand All @@ -303,16 +305,19 @@ func (d *ClientImpl) DescribeWorkerDeployment(

res, err := d.historyClient.QueryWorkflow(ctx, req)
if err != nil {
return nil, err
return nil, nil, err
}

var queryResponse deploymentspb.QueryDescribeWorkerDeploymentResponse
err = sdk.PreferProtoDataConverter.FromPayloads(res.GetResponse().GetQueryResult(), &queryResponse)
if err != nil {
return nil, err
return nil, nil, err
}

return d.deploymentStateToDeploymentInfo(ctx, namespaceEntry, deploymentName, queryResponse.State)
dInfo, err := d.deploymentStateToDeploymentInfo(ctx, namespaceEntry, deploymentName, queryResponse.State)
if err != nil {
return nil, nil, err
}
return dInfo, queryResponse.GetState().GetConflictToken(), nil
}

func (d *ClientImpl) ListWorkerDeployments(
Expand Down Expand Up @@ -363,6 +368,7 @@ func (d *ClientImpl) SetCurrentVersion(
deploymentName string,
version string,
identity string,
conflictToken []byte,
) (_ *deploymentspb.SetCurrentVersionResponse, retErr error) {
//revive:disable-next-line:defer
defer d.record("SetCurrentVersion", &retErr, namespaceEntry.Name(), version, identity)()
Expand All @@ -376,8 +382,9 @@ func (d *ClientImpl) SetCurrentVersion(
}

updatePayload, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.SetCurrentVersionArgs{
Identity: identity,
Version: version,
Identity: identity,
Version: version,
ConflictToken: conflictToken,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -425,9 +432,10 @@ func (d *ClientImpl) SetRampingVersion(
version string,
percentage float32,
identity string,
conflictToken []byte,
) (_ *deploymentspb.SetRampingVersionResponse, retErr error) {
//revive:disable-next-line:defer
defer d.record("SetWorkerDeploymentRampingVersion", &retErr, namespaceEntry.Name(), version, percentage, identity)()
defer d.record("SetRampingVersion", &retErr, namespaceEntry.Name(), version, percentage, identity)()
requestID := uuid.New()
var versionObj *deploymentspb.WorkerDeploymentVersion
var err error
Expand All @@ -447,9 +455,10 @@ func (d *ClientImpl) SetRampingVersion(
}

updatePayload, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.SetRampingVersionArgs{
Identity: identity,
Version: version,
Percentage: percentage,
Identity: identity,
Version: version,
Percentage: percentage,
ConflictToken: conflictToken,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -477,7 +486,6 @@ func (d *ClientImpl) SetRampingVersion(
} else if failure.GetApplicationFailureInfo().GetType() == errVersionAlreadyCurrentType {
return nil, serviceerror.NewFailedPrecondition(fmt.Sprintf("Ramping version %v is already current", version))
} else if failure != nil {
// TODO: is there an easy way to recover the original type here?
return nil, serviceerror.NewInternal(failure.Message)
}

Expand Down
1 change: 1 addition & 0 deletions service/worker/workerdeployment/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const (
errVersionAlreadyExistsType = "errVersionAlreadyExists"
errMaxTaskQueuesInVersionType = "errMaxTaskQueuesInVersion"
errVersionAlreadyCurrentType = "errVersionAlreadyCurrent"
errConflictTokenMismatchType = "errConflictTokenMismatch"
)

var (
Expand Down
14 changes: 13 additions & 1 deletion service/worker/workerdeployment/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package workerdeployment

import (
"bytes"
"slices"
"time"

Expand All @@ -48,6 +49,7 @@ type (
metrics sdkclient.MetricsHandler
lock workflow.Mutex
pendingUpdates int
conflictToken []byte
}
)

Expand All @@ -68,6 +70,7 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error {
d.State = &deploymentspb.WorkerDeploymentLocalState{}
d.State.CreateTime = timestamppb.New(time.Now())
d.State.RoutingConfig = &deploymentpb.RoutingConfig{}
d.State.ConflictToken, _ = workflow.Now(ctx).MarshalBinary()
}

var pendingUpdates int
Expand Down Expand Up @@ -140,6 +143,9 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error {
}

func (d *WorkflowRunner) validateSetRampingVersion(args *deploymentspb.SetRampingVersionArgs) error {
if args.ConflictToken != nil && !bytes.Equal(args.ConflictToken, d.State.ConflictToken) {
return temporal.NewApplicationError("conflict token mismatch", errConflictTokenMismatchType)
}
if args.Version == d.State.RoutingConfig.RampingVersion && args.Percentage == d.State.RoutingConfig.RampingVersionPercentage {
d.logger.Info("version already ramping, no change")
return temporal.NewApplicationError("version already ramping, no change", errNoChangeType)
Expand Down Expand Up @@ -226,6 +232,7 @@ func (d *WorkflowRunner) handleSetRampingVersion(ctx workflow.Context, args *dep
d.State.RoutingConfig.RampingVersion = newRampingVersion
d.State.RoutingConfig.RampingVersionPercentage = args.Percentage
d.State.RoutingConfig.RampingVersionChangedTime = rampingVersionUpdateTime
d.State.ConflictToken, _ = routingUpdateTime.AsTime().MarshalBinary()

// update memo
if err = d.updateMemo(ctx); err != nil {
Expand All @@ -235,6 +242,7 @@ func (d *WorkflowRunner) handleSetRampingVersion(ctx workflow.Context, args *dep
return &deploymentspb.SetRampingVersionResponse{
PreviousVersion: prevRampingVersion,
PreviousPercentage: prevRampingVersionPercentage,
ConflictToken: d.State.ConflictToken,
}, nil

}
Expand Down Expand Up @@ -283,10 +291,12 @@ func (d *WorkflowRunner) handleDeleteVersion(ctx workflow.Context, args *deploym
}

func (d *WorkflowRunner) validateSetCurrent(args *deploymentspb.SetCurrentVersionArgs) error {
if args.ConflictToken != nil && !bytes.Equal(args.ConflictToken, d.State.ConflictToken) {
return temporal.NewApplicationError("conflict token mismatch", errConflictTokenMismatchType)
}
if d.State.RoutingConfig.CurrentVersion == args.Version {
return temporal.NewApplicationError("no change", errNoChangeType)
}

return nil
}

Expand Down Expand Up @@ -334,6 +344,7 @@ func (d *WorkflowRunner) handleSetCurrent(ctx workflow.Context, args *deployment
// update local state
d.State.RoutingConfig.CurrentVersion = args.Version
d.State.RoutingConfig.CurrentVersionChangedTime = updateTime
d.State.ConflictToken, _ = updateTime.AsTime().MarshalBinary()

// unset ramping version if it was set to current version
if d.State.RoutingConfig.CurrentVersion == d.State.RoutingConfig.RampingVersion {
Expand All @@ -349,6 +360,7 @@ func (d *WorkflowRunner) handleSetCurrent(ctx workflow.Context, args *deployment

return &deploymentspb.SetCurrentVersionResponse{
PreviousVersion: prevCurrentVersion,
ConflictToken: d.State.ConflictToken,
}, nil

}
Expand Down
66 changes: 66 additions & 0 deletions tests/worker_deployment_test.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice test!

maybe we can also have a test that errors out because an invalid conflict token has been passed? Not sure if this is tedious but might confirm if things are fully working or not

not a blocker if you think things are working though

Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,72 @@ func (s *WorkerDeploymentSuite) TestDescribeWorkerDeployment_SetCurrentVersion()
}, time.Second*10, time.Millisecond*1000)
}

func (s *WorkerDeploymentSuite) TestConflictToken_Describe_SetCurrent_SetRamping() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()
tv := testvars.New(s)

firstVersion := tv.WithBuildIDNumber(1)
secondVersion := tv.WithBuildIDNumber(2)

// Start deployment version workflow + worker-deployment workflow. Only one version is stared manually
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: started

Also, struggling to understand the comment actually

// to prevent erroring out in the successive DescribeWorkerDeployment call.
go s.pollFromDeployment(ctx, firstVersion)

var cT []byte
// No current deployment version set.
s.EventuallyWithT(func(t *assert.CollectT) {
a := assert.New(t)

resp, err := s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv.DeploymentSeries(),
})
a.NoError(err)
a.Equal("", resp.GetWorkerDeploymentInfo().GetRoutingConfig().GetCurrentVersion())
cT = resp.GetConflictToken()
}, time.Second*10, time.Millisecond*1000)

// Set first version as current version
_, _ = s.FrontendClient().SetWorkerDeploymentCurrentVersion(ctx, &workflowservice.SetWorkerDeploymentCurrentVersionRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv.DeploymentSeries(),
Version: firstVersion.DeploymentVersionString(),
ConflictToken: cT,
})

s.EventuallyWithT(func(t *assert.CollectT) {
a := assert.New(t)

resp, err := s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv.DeploymentSeries(),
})
a.NoError(err)
a.Equal(firstVersion.DeploymentVersionString(), resp.GetWorkerDeploymentInfo().GetRoutingConfig().GetCurrentVersion())
cT = resp.GetConflictToken()
}, time.Second*10, time.Millisecond*1000)

// Set second version as ramping version
_, _ = s.FrontendClient().SetWorkerDeploymentRampingVersion(ctx, &workflowservice.SetWorkerDeploymentRampingVersionRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv.DeploymentSeries(),
Version: secondVersion.DeploymentVersionString(),
Percentage: 5,
ConflictToken: cT,
})

s.EventuallyWithT(func(t *assert.CollectT) {
a := assert.New(t)
resp, err := s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv.DeploymentSeries(),
})
a.NoError(err)
a.Equal(secondVersion.DeploymentVersionString(), resp.GetWorkerDeploymentInfo().GetRoutingConfig().GetRampingVersion())
}, time.Second*10, time.Millisecond*1000)
}

func (s *WorkerDeploymentSuite) TestSetCurrentVersion_Idempotent() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()
Expand Down
Loading