Skip to content

Commit

Permalink
Add Conflict Tokens to Deployment APIs (#7203)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->

## Why?
<!-- Tell your future self why have you made these changes -->

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->

---------

Co-authored-by: Shivam <[email protected]>
Co-authored-by: Shahab Tajik <[email protected]>
Co-authored-by: Shivam Saraf <[email protected]>
Co-authored-by: ShahabT <[email protected]>
  • Loading branch information
5 people authored Feb 4, 2025
1 parent 214cd5e commit 6133947
Show file tree
Hide file tree
Showing 8 changed files with 749 additions and 613 deletions.
1,228 changes: 635 additions & 593 deletions api/deployment/v1/message.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
BuildIdSearchAttributeEscape = "|"
// UnversionedSearchAttribute is the sentinel value used to mark all unversioned workflows
UnversionedSearchAttribute = buildIdSearchAttributePrefixUnversioned
UnversionedBuildId = "__unversioned__"

// Prefixes, Delimeters and Keys
WorkerDeploymentVersionIdDelimiter = "/"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ message WorkerDeploymentLocalState {
// - It has no active pollers (see WorkerDeploymentVersionInfo.pollers_status)
// - It is drained (see WorkerDeploymentVersionInfo.drainage_status)
// TODO (Shivam) - Removal of registered versions based on the above conditions.
repeated string versions = 3;
repeated string versions = 3;
bytes conflict_token = 4;
}

// used as Worker Deployment Version workflow update input:
Expand Down Expand Up @@ -233,11 +234,13 @@ message CheckWorkerDeploymentUserDataPropagationRequest {
message SetCurrentVersionArgs {
string identity = 1;
string version = 2;
bytes conflict_token = 3;
}

// used as Worker Deployment update response:
message SetCurrentVersionResponse {
string previous_version = 1;
bytes conflict_token = 2;
}

// used as Worker Deployment workflow update input:
Expand Down Expand Up @@ -277,6 +280,7 @@ message SetRampingVersionArgs {
string identity = 1;
string version = 2;
float percentage = 3;
bytes conflict_token = 4;
}

// used as Worker Deployment activity input:
Expand Down
10 changes: 6 additions & 4 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3387,7 +3387,7 @@ func (wh *WorkflowHandler) SetWorkerDeploymentCurrentVersion(ctx context.Context

// TODO (Shivam): error out if build_ID is empty

resp, err := wh.workerDeploymentClient.SetCurrentVersion(ctx, namespaceEntry, request.DeploymentName, request.Version, request.Identity)
resp, err := wh.workerDeploymentClient.SetCurrentVersion(ctx, namespaceEntry, request.DeploymentName, request.Version, request.Identity, request.GetConflictToken())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3427,7 +3427,7 @@ func (wh *WorkflowHandler) SetWorkerDeploymentRampingVersion(ctx context.Context
return nil, serviceerror.NewInvalidArgument("Percentage must be between 0 and 100 (inclusive)")
}

resp, err := wh.workerDeploymentClient.SetRampingVersion(ctx, namespaceEntry, request.DeploymentName, request.Version, request.GetPercentage(), request.GetIdentity())
resp, err := wh.workerDeploymentClient.SetRampingVersion(ctx, namespaceEntry, request.DeploymentName, request.Version, request.GetPercentage(), request.GetIdentity(), request.GetConflictToken())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3499,19 +3499,21 @@ func (wh *WorkflowHandler) DescribeWorkerDeployment(ctx context.Context, request
return nil, err
}

workerDeploymentInfo, err := wh.workerDeploymentClient.DescribeWorkerDeployment(ctx, namespaceEntry, request.DeploymentName)
workerDeploymentInfo, cT, err := wh.workerDeploymentClient.DescribeWorkerDeployment(ctx, namespaceEntry, request.DeploymentName)
if err != nil {
return nil, err
}

return &workflowservice.DescribeWorkerDeploymentResponse{
WorkerDeploymentInfo: workerDeploymentInfo,
ConflictToken: cT,
}, nil
}

func (wh *WorkflowHandler) DeleteWorkerDeployment(ctx context.Context, request *workflowservice.DeleteWorkerDeploymentRequest) (*workflowservice.DeleteWorkerDeploymentResponse, error) {
// TODO implement me
//TODO implement me
panic("implement me")
return nil, nil
}

func (wh *WorkflowHandler) DeleteWorkerDeploymentVersion(ctx context.Context, request *workflowservice.DeleteWorkerDeploymentVersionRequest) (_ *workflowservice.DeleteWorkerDeploymentVersionResponse, retError error) {
Expand Down
36 changes: 22 additions & 14 deletions service/worker/workerdeployment/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,15 @@ type Client interface {
ctx context.Context,
namespaceEntry *namespace.Namespace,
deploymentName string,
) (*deploymentpb.WorkerDeploymentInfo, error)
) (*deploymentpb.WorkerDeploymentInfo, []byte, error)

SetCurrentVersion(
ctx context.Context,
namespaceEntry *namespace.Namespace,
deploymentName string,
version string,
identity string,
conflictToken []byte,
) (*deploymentspb.SetCurrentVersionResponse, error)

ListWorkerDeployments(
Expand All @@ -107,6 +108,7 @@ type Client interface {
version string,
percentage float32,
identity string,
conflictToken []byte,
) (*deploymentspb.SetRampingVersionResponse, error)

// Used internally by the Worker Deployment workflow in its StartWorkerDeployment Activity
Expand Down Expand Up @@ -278,14 +280,14 @@ func (d *ClientImpl) DescribeWorkerDeployment(
ctx context.Context,
namespaceEntry *namespace.Namespace,
deploymentName string,
) (_ *deploymentpb.WorkerDeploymentInfo, retErr error) {
) (_ *deploymentpb.WorkerDeploymentInfo, conflictToken []byte, retErr error) {
//revive:disable-next-line:defer
defer d.record("DescribeWorkerDeployment", &retErr, deploymentName)()

// validating params
err := validateVersionWfParams(WorkerDeploymentFieldName, deploymentName, d.maxIDLengthLimit())
if err != nil {
return nil, err
return nil, nil, err
}

deploymentWorkflowID := worker_versioning.GenerateDeploymentWorkflowID(deploymentName)
Expand All @@ -303,16 +305,19 @@ func (d *ClientImpl) DescribeWorkerDeployment(

res, err := d.historyClient.QueryWorkflow(ctx, req)
if err != nil {
return nil, err
return nil, nil, err
}

var queryResponse deploymentspb.QueryDescribeWorkerDeploymentResponse
err = sdk.PreferProtoDataConverter.FromPayloads(res.GetResponse().GetQueryResult(), &queryResponse)
if err != nil {
return nil, err
return nil, nil, err
}

return d.deploymentStateToDeploymentInfo(ctx, namespaceEntry, deploymentName, queryResponse.State)
dInfo, err := d.deploymentStateToDeploymentInfo(ctx, namespaceEntry, deploymentName, queryResponse.State)
if err != nil {
return nil, nil, err
}
return dInfo, queryResponse.GetState().GetConflictToken(), nil
}

func (d *ClientImpl) ListWorkerDeployments(
Expand Down Expand Up @@ -363,6 +368,7 @@ func (d *ClientImpl) SetCurrentVersion(
deploymentName string,
version string,
identity string,
conflictToken []byte,
) (_ *deploymentspb.SetCurrentVersionResponse, retErr error) {
//revive:disable-next-line:defer
defer d.record("SetCurrentVersion", &retErr, namespaceEntry.Name(), version, identity)()
Expand All @@ -376,8 +382,9 @@ func (d *ClientImpl) SetCurrentVersion(
}

updatePayload, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.SetCurrentVersionArgs{
Identity: identity,
Version: version,
Identity: identity,
Version: version,
ConflictToken: conflictToken,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -425,9 +432,10 @@ func (d *ClientImpl) SetRampingVersion(
version string,
percentage float32,
identity string,
conflictToken []byte,
) (_ *deploymentspb.SetRampingVersionResponse, retErr error) {
//revive:disable-next-line:defer
defer d.record("SetWorkerDeploymentRampingVersion", &retErr, namespaceEntry.Name(), version, percentage, identity)()
defer d.record("SetRampingVersion", &retErr, namespaceEntry.Name(), version, percentage, identity)()
requestID := uuid.New()
var versionObj *deploymentspb.WorkerDeploymentVersion
var err error
Expand All @@ -447,9 +455,10 @@ func (d *ClientImpl) SetRampingVersion(
}

updatePayload, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.SetRampingVersionArgs{
Identity: identity,
Version: version,
Percentage: percentage,
Identity: identity,
Version: version,
Percentage: percentage,
ConflictToken: conflictToken,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -477,7 +486,6 @@ func (d *ClientImpl) SetRampingVersion(
} else if failure.GetApplicationFailureInfo().GetType() == errVersionAlreadyCurrentType {
return nil, serviceerror.NewFailedPrecondition(fmt.Sprintf("Ramping version %v is already current", version))
} else if failure != nil {
// TODO: is there an easy way to recover the original type here?
return nil, serviceerror.NewInternal(failure.Message)
}

Expand Down
1 change: 1 addition & 0 deletions service/worker/workerdeployment/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const (
errVersionAlreadyExistsType = "errVersionAlreadyExists"
errMaxTaskQueuesInVersionType = "errMaxTaskQueuesInVersion"
errVersionAlreadyCurrentType = "errVersionAlreadyCurrent"
errConflictTokenMismatchType = "errConflictTokenMismatch"
)

var (
Expand Down
14 changes: 13 additions & 1 deletion service/worker/workerdeployment/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package workerdeployment

import (
"bytes"
"slices"
"time"

Expand All @@ -48,6 +49,7 @@ type (
metrics sdkclient.MetricsHandler
lock workflow.Mutex
pendingUpdates int
conflictToken []byte
}
)

Expand All @@ -68,6 +70,7 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error {
d.State = &deploymentspb.WorkerDeploymentLocalState{}
d.State.CreateTime = timestamppb.New(time.Now())
d.State.RoutingConfig = &deploymentpb.RoutingConfig{}
d.State.ConflictToken, _ = workflow.Now(ctx).MarshalBinary()
}

var pendingUpdates int
Expand Down Expand Up @@ -140,6 +143,9 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error {
}

func (d *WorkflowRunner) validateSetRampingVersion(args *deploymentspb.SetRampingVersionArgs) error {
if args.ConflictToken != nil && !bytes.Equal(args.ConflictToken, d.State.ConflictToken) {
return temporal.NewApplicationError("conflict token mismatch", errConflictTokenMismatchType)
}
if args.Version == d.State.RoutingConfig.RampingVersion && args.Percentage == d.State.RoutingConfig.RampingVersionPercentage {
d.logger.Info("version already ramping, no change")
return temporal.NewApplicationError("version already ramping, no change", errNoChangeType)
Expand Down Expand Up @@ -226,6 +232,7 @@ func (d *WorkflowRunner) handleSetRampingVersion(ctx workflow.Context, args *dep
d.State.RoutingConfig.RampingVersion = newRampingVersion
d.State.RoutingConfig.RampingVersionPercentage = args.Percentage
d.State.RoutingConfig.RampingVersionChangedTime = rampingVersionUpdateTime
d.State.ConflictToken, _ = routingUpdateTime.AsTime().MarshalBinary()

// update memo
if err = d.updateMemo(ctx); err != nil {
Expand All @@ -235,6 +242,7 @@ func (d *WorkflowRunner) handleSetRampingVersion(ctx workflow.Context, args *dep
return &deploymentspb.SetRampingVersionResponse{
PreviousVersion: prevRampingVersion,
PreviousPercentage: prevRampingVersionPercentage,
ConflictToken: d.State.ConflictToken,
}, nil

}
Expand Down Expand Up @@ -283,10 +291,12 @@ func (d *WorkflowRunner) handleDeleteVersion(ctx workflow.Context, args *deploym
}

func (d *WorkflowRunner) validateSetCurrent(args *deploymentspb.SetCurrentVersionArgs) error {
if args.ConflictToken != nil && !bytes.Equal(args.ConflictToken, d.State.ConflictToken) {
return temporal.NewApplicationError("conflict token mismatch", errConflictTokenMismatchType)
}
if d.State.RoutingConfig.CurrentVersion == args.Version {
return temporal.NewApplicationError("no change", errNoChangeType)
}

return nil
}

Expand Down Expand Up @@ -334,6 +344,7 @@ func (d *WorkflowRunner) handleSetCurrent(ctx workflow.Context, args *deployment
// update local state
d.State.RoutingConfig.CurrentVersion = args.Version
d.State.RoutingConfig.CurrentVersionChangedTime = updateTime
d.State.ConflictToken, _ = updateTime.AsTime().MarshalBinary()

// unset ramping version if it was set to current version
if d.State.RoutingConfig.CurrentVersion == d.State.RoutingConfig.RampingVersion {
Expand All @@ -349,6 +360,7 @@ func (d *WorkflowRunner) handleSetCurrent(ctx workflow.Context, args *deployment

return &deploymentspb.SetCurrentVersionResponse{
PreviousVersion: prevCurrentVersion,
ConflictToken: d.State.ConflictToken,
}, nil

}
Expand Down
66 changes: 66 additions & 0 deletions tests/worker_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,72 @@ func (s *WorkerDeploymentSuite) TestDescribeWorkerDeployment_SetCurrentVersion()
}, time.Second*10, time.Millisecond*1000)
}

func (s *WorkerDeploymentSuite) TestConflictToken_Describe_SetCurrent_SetRamping() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()
tv := testvars.New(s)

firstVersion := tv.WithBuildIDNumber(1)
secondVersion := tv.WithBuildIDNumber(2)

// Start deployment version workflow + worker-deployment workflow. Only one version is stared manually
// to prevent erroring out in the successive DescribeWorkerDeployment call.
go s.pollFromDeployment(ctx, firstVersion)

var cT []byte
// No current deployment version set.
s.EventuallyWithT(func(t *assert.CollectT) {
a := assert.New(t)

resp, err := s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv.DeploymentSeries(),
})
a.NoError(err)
a.Equal("", resp.GetWorkerDeploymentInfo().GetRoutingConfig().GetCurrentVersion())
cT = resp.GetConflictToken()
}, time.Second*10, time.Millisecond*1000)

// Set first version as current version
_, _ = s.FrontendClient().SetWorkerDeploymentCurrentVersion(ctx, &workflowservice.SetWorkerDeploymentCurrentVersionRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv.DeploymentSeries(),
Version: firstVersion.DeploymentVersionString(),
ConflictToken: cT,
})

s.EventuallyWithT(func(t *assert.CollectT) {
a := assert.New(t)

resp, err := s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv.DeploymentSeries(),
})
a.NoError(err)
a.Equal(firstVersion.DeploymentVersionString(), resp.GetWorkerDeploymentInfo().GetRoutingConfig().GetCurrentVersion())
cT = resp.GetConflictToken()
}, time.Second*10, time.Millisecond*1000)

// Set second version as ramping version
_, _ = s.FrontendClient().SetWorkerDeploymentRampingVersion(ctx, &workflowservice.SetWorkerDeploymentRampingVersionRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv.DeploymentSeries(),
Version: secondVersion.DeploymentVersionString(),
Percentage: 5,
ConflictToken: cT,
})

s.EventuallyWithT(func(t *assert.CollectT) {
a := assert.New(t)
resp, err := s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv.DeploymentSeries(),
})
a.NoError(err)
a.Equal(secondVersion.DeploymentVersionString(), resp.GetWorkerDeploymentInfo().GetRoutingConfig().GetRampingVersion())
}, time.Second*10, time.Millisecond*1000)
}

func (s *WorkerDeploymentSuite) TestSetCurrentVersion_Idempotent() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()
Expand Down

0 comments on commit 6133947

Please sign in to comment.