Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add deployment version scavenger #7202

Draft
wants to merge 1 commit into
base: cdf/delete
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions service/worker/workerdeployment/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (d *ClientImpl) DescribeWorkerDeployment(
return nil, err
}

deploymentWorkflowID := GenerateWorkflowID(deploymentName)
deploymentWorkflowID := GenerateDeploymentWorkflowID(deploymentName)

req := &historyservice.QueryWorkflowRequest{
NamespaceId: namespaceEntry.ID().String(),
Expand Down Expand Up @@ -561,7 +561,7 @@ func (d *ClientImpl) StartWorkerDeployment(
//revive:disable-next-line:defer
defer d.record("StartWorkerDeployment", &retErr, namespaceEntry.Name(), deploymentName, identity)()

workflowID := GenerateWorkflowID(deploymentName)
workflowID := GenerateDeploymentWorkflowID(deploymentName)

input, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.WorkerDeploymentWorkflowArgs{
NamespaceName: namespaceEntry.Name().String(),
Expand Down Expand Up @@ -766,7 +766,7 @@ func (d *ClientImpl) updateWithStartWorkerDeployment(
return nil, err
}

workflowID := GenerateWorkflowID(deploymentName)
workflowID := GenerateDeploymentWorkflowID(deploymentName)
input, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.WorkerDeploymentWorkflowArgs{
NamespaceName: namespaceEntry.Name().String(),
NamespaceId: namespaceEntry.ID().String(),
Expand Down Expand Up @@ -812,7 +812,7 @@ func (d *ClientImpl) AddVersionToWorkerDeployment(
Meta: &updatepb.Meta{UpdateId: requestID, Identity: identity},
}

workflowID := GenerateWorkflowID(deploymentName)
workflowID := GenerateDeploymentWorkflowID(deploymentName)

outcome, err := d.updateWithStart(
ctx,
Expand Down
2 changes: 1 addition & 1 deletion service/worker/workerdeployment/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Na
registry.RegisterWorkflowWithOptions(
DrainageWorkflowWithDurations(s.drainageStatusVisibilityGracePeriod(ns.Name().String()), s.drainageStatusRefreshInterval(ns.Name().String())),
workflow.RegisterOptions{Name: WorkerDeploymentDrainageWorkflowType},
)
) // TODO (Carly): If I attach the drainage durations to the workflow function at registration time, how can it be updated? This likely needs to go in Args

versionActivities := &VersionActivities{
namespace: ns,
Expand Down
5 changes: 3 additions & 2 deletions service/worker/workerdeployment/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const (
ForceCANSignalName = "force-continue-as-new" // for Worker Deployment Version _and_ Worker Deployment wfs
SyncDrainageSignalName = "sync-drainage-status"
TerminateDrainageSignal = "terminate-drainage"
DeleteVersionSignal = "delete-version-signal" // sent from Version wf to Deployment wf

// Queries
QueryDescribeVersion = "describe-version" // for Worker Deployment Version wf
Expand Down Expand Up @@ -121,9 +122,9 @@ func escapeChar(s string) string {
return s
}

// GenerateWorkflowID is a helper that generates a system accepted
// GenerateDeploymentWorkflowID is a helper that generates a system-accepted
// workflow ID, which is used in our Worker Deployment workflows
func GenerateWorkflowID(WorkerDeploymentName string) string {
func GenerateDeploymentWorkflowID(WorkerDeploymentName string) string {
// escaping the reserved workflow delimiter (|) from the inputs, if present
escapedWorkerDeploymentName := escapeChar(WorkerDeploymentName)
return WorkerDeploymentWorkflowIDPrefix + WorkerDeploymentVersionWorkflowIDDelimeter + escapedWorkerDeploymentName
Expand Down
97 changes: 67 additions & 30 deletions service/worker/workerdeployment/version_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ type (
// VersionWorkflowRunner holds the local state for a deployment workflow
VersionWorkflowRunner struct {
*deploymentspb.WorkerDeploymentVersionWorkflowArgs
a *VersionActivities
logger sdklog.Logger
metrics sdkclient.MetricsHandler
lock workflow.Mutex
pendingUpdates int
signalsCompleted bool
drainageWorkflowFuture *workflow.ChildWorkflowFuture
done bool
a *VersionActivities
logger sdklog.Logger
metrics sdkclient.MetricsHandler
lock workflow.Mutex
pendingUpdates int
signalsCompleted bool
drainageWorkflowFuture *workflow.ChildWorkflowFuture
done bool
describeTaskQueueInterval time.Duration
}
)

Expand All @@ -71,10 +72,11 @@ func VersionWorkflow(ctx workflow.Context, versionWorkflowArgs *deploymentspb.Wo
versionWorkflowRunner := &VersionWorkflowRunner{
WorkerDeploymentVersionWorkflowArgs: versionWorkflowArgs,

a: nil,
logger: sdklog.With(workflow.GetLogger(ctx), "wf-namespace", versionWorkflowArgs.NamespaceName),
metrics: workflow.GetMetricsHandler(ctx).WithTags(map[string]string{"namespace": versionWorkflowArgs.NamespaceName}),
lock: workflow.NewMutex(ctx),
a: nil,
logger: sdklog.With(workflow.GetLogger(ctx), "wf-namespace", versionWorkflowArgs.NamespaceName),
metrics: workflow.GetMetricsHandler(ctx).WithTags(map[string]string{"namespace": versionWorkflowArgs.NamespaceName}),
lock: workflow.NewMutex(ctx),
describeTaskQueueInterval: 5 * time.Minute, // TODO (Carly): Populate this from dynamic config
}
return versionWorkflowRunner.run(ctx)
}
Expand All @@ -98,6 +100,9 @@ func (d *VersionWorkflowRunner) listenToSignals(ctx workflow.Context) {
d.VersionState.DrainageInfo.Status = newInfo.Status
d.VersionState.DrainageInfo.LastChangedTime = newInfo.LastCheckedTime
}
if d.VersionState.DrainageInfo.Status == enumspb.VERSION_DRAINAGE_STATUS_DRAINED {
workflow.Go(ctx, d.scavengeWhenReady)
}
})

for (!workflow.GetInfo(ctx).GetContinueAsNewSuggested() && !forceCAN) || selector.HasPending() {
Expand All @@ -108,6 +113,28 @@ func (d *VersionWorkflowRunner) listenToSignals(ctx workflow.Context) {
d.signalsCompleted = true
}

// scavengeWhenReady periodically checks whether this version can be deleted and,
// once it can, signals the Worker Deployment workflow to delete it.
//
// Each iteration sleeps for d.describeTaskQueueInterval, calls readyToDelete, and
// when that reports true sends DeleteVersionSignal (carrying the build ID) to the
// workflow identified by GenerateDeploymentWorkflowID. The loop ends after the
// signal is sent successfully, or when the sleep fails.
func (d *VersionWorkflowRunner) scavengeWhenReady(ctx workflow.Context) {
	v := d.VersionState.Version
	versionStr := fmt.Sprintf("%s/%s", v.DeploymentName, v.BuildId)
	for {
		// A failed Sleep means the timer could not be waited on (the SDK documents
		// a CanceledError when ctx is canceled). Continuing would turn this into a
		// non-blocking busy loop, so stop scavenging instead.
		if err := workflow.Sleep(ctx, d.describeTaskQueueInterval); err != nil {
			d.logger.Info("stopping version scavenger", "Error", err, "Version", versionStr)
			return
		}

		ready, err := d.readyToDelete(ctx)
		if err != nil {
			// Only log when the check actually failed; try again after the next sleep.
			d.logger.Error("error describing task queues while scavenging version:", "Error", err, "Version", versionStr)
			continue
		}
		if !ready {
			continue
		}

		// signal Deployment Workflow that this version is ready to be deleted and return
		d.logger.Info("signalling worker-deployment to delete version", "Version", versionStr)
		deploymentWFID := GenerateDeploymentWorkflowID(d.VersionState.Version.DeploymentName)
		err = workflow.SignalExternalWorkflow(ctx, deploymentWFID, "", DeleteVersionSignal, d.VersionState.Version.BuildId).Get(ctx, nil)
		if err == nil {
			return
		}
		// TODO: Better backoff / error management? For now the next iteration retries.
		d.logger.Error("error signalling worker-deployment to delete version:", "Error", err, "Version", versionStr)
	}
}

func (d *VersionWorkflowRunner) run(ctx workflow.Context) error {
if d.VersionState == nil {
d.VersionState = &deploymentspb.VersionLocalState{}
Expand Down Expand Up @@ -217,6 +244,32 @@ func (d *VersionWorkflowRunner) stopDrainage(ctx workflow.Context) error {
return nil
}

// readyToDelete reports whether this version may be deleted. It collects every
// task queue name found in the version state's TaskQueueFamilies, runs the
// CheckIfTaskQueuesHavePollers activity for them with this version's build ID,
// and returns true only when the activity reports no pollers. An activity
// failure is returned as (false, err).
func (d *VersionWorkflowRunner) readyToDelete(ctx workflow.Context) (bool, error) {
	versionState := d.GetVersionState()

	var taskQueues []*taskqueuepb.TaskQueue
	for name := range versionState.TaskQueueFamilies {
		tq := &taskqueuepb.TaskQueue{
			Name: name,
			Kind: enumspb.TASK_QUEUE_KIND_NORMAL, // TODO (Carly): could this be sticky?
		}
		taskQueues = append(taskQueues, tq)
	}

	activityCtx := workflow.WithActivityOptions(ctx, defaultActivityOptions)
	req := &deploymentspb.CheckTaskQueuesHaveNoPollersActivityArgs{
		TaskQueues: taskQueues,
		BuildId:    d.VersionState.Version.BuildId,
	}

	var hasPollers bool
	future := workflow.ExecuteActivity(activityCtx, d.a.CheckIfTaskQueuesHavePollers, req)
	if err := future.Get(ctx, &hasPollers); err != nil {
		return false, err
	}
	// Ready exactly when none of the task queues still has pollers.
	return !hasPollers, nil
}

func (d *VersionWorkflowRunner) validateDeleteVersion() error {
// We can't call DescribeTaskQueue here because that would be an Activity call / non-deterministic.
// Once we have PollersStatus on the version, we can check it here.
Expand All @@ -243,27 +296,11 @@ func (d *VersionWorkflowRunner) handleDeleteVersion(ctx workflow.Context) error
return serviceerror.NewDeadlineExceeded("Update canceled before worker deployment workflow started")
}

state := d.GetVersionState()

// describe all task queues in the deployment, if any have pollers, then cannot delete
var tqs []*taskqueuepb.TaskQueue
for tqName, _ := range state.TaskQueueFamilies {
tqs = append(tqs, &taskqueuepb.TaskQueue{
Name: tqName,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL, // TODO (Carly): could this be sticky?
})
}
activityCtx := workflow.WithActivityOptions(ctx, defaultActivityOptions)
checkPollersReq := &deploymentspb.CheckTaskQueuesHaveNoPollersActivityArgs{
TaskQueues: tqs,
BuildId: d.VersionState.Version.BuildId,
}
var hasPollers bool
err = workflow.ExecuteActivity(activityCtx, d.a.CheckIfTaskQueuesHavePollers, checkPollersReq).Get(ctx, &hasPollers)
ready, err := d.readyToDelete(ctx)
if err != nil {
return err
}
if hasPollers {
if !ready {
return serviceerror.NewFailedPrecondition("cannot delete, task queues in this version still have pollers")
}
d.done = true
Expand Down
52 changes: 40 additions & 12 deletions service/worker/workerdeployment/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ type (
// WorkflowRunner holds the local state while running a deployment-series workflow
WorkflowRunner struct {
*deploymentspb.WorkerDeploymentWorkflowArgs
a *Activities
logger sdklog.Logger
metrics sdkclient.MetricsHandler
lock workflow.Mutex
pendingUpdates int
done bool
a *Activities
logger sdklog.Logger
metrics sdkclient.MetricsHandler
lock workflow.Mutex
pendingUpdates int
done bool
signalsCompleted bool
}
)

Expand All @@ -71,8 +72,6 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error {
d.State.RoutingInfo = &deploymentpb.RoutingInfo{}
}

var pendingUpdates int

err := workflow.SetQueryHandler(ctx, QueryDescribeDeployment, func() (*deploymentspb.QueryDescribeWorkerDeploymentResponse, error) {
return &deploymentspb.QueryDescribeWorkerDeploymentResponse{
State: d.State,
Expand Down Expand Up @@ -138,10 +137,8 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error {
return err
}

// Wait until we can continue as new or are cancelled.
err = workflow.Await(ctx, func() bool {
return (workflow.GetInfo(ctx).GetContinueAsNewSuggested() || d.done) && pendingUpdates == 0
})
// Wait on any pending signals and updates.
err = workflow.Await(ctx, func() bool { return (d.signalsCompleted || d.done) && d.pendingUpdates == 0 })
if err != nil {
return err
}
Expand All @@ -156,6 +153,37 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error {
return workflow.NewContinueAsNewError(ctx, Workflow, d.WorkerDeploymentWorkflowArgs)
}

// listenToSignals drains the workflow's signal channels until it is time to
// continue-as-new, then sets d.signalsCompleted.
//
// Two signals are handled:
//   - ForceCANSignalName: requests that the loop stop once no signals are pending.
//   - DeleteVersionSignal: carries a build ID; the version is deleted through
//     handleDeleteVersion.
//
// The loop keeps selecting while neither continue-as-new is suggested nor a
// force-CAN signal has been seen, and afterwards for as long as the selector
// still has pending signals.
func (d *WorkflowRunner) listenToSignals(ctx workflow.Context) {
	// Fetch signal channels
	forceCANSignalChannel := workflow.GetSignalChannel(ctx, ForceCANSignalName)
	forceCAN := false
	deleteVersionSignalChannel := workflow.GetSignalChannel(ctx, DeleteVersionSignal)

	selector := workflow.NewSelector(ctx)
	selector.AddReceive(forceCANSignalChannel, func(c workflow.ReceiveChannel, more bool) {
		// The selector expects the callback to consume the message; if it is left
		// on the channel, HasPending() below keeps reporting true.
		c.Receive(ctx, nil)
		forceCAN = true
	})
	selector.AddReceive(deleteVersionSignalChannel, func(c workflow.ReceiveChannel, more bool) {
		var buildId string
		c.Receive(ctx, &buildId)
		err := d.handleDeleteVersion(ctx, &deploymentspb.DeleteVersionArgs{
			Identity: "temporal-system-wf" + GenerateVersionWorkflowID(buildId),
			Version:  buildId,
		})
		if err != nil {
			// Not retried here; logged so the failure is not silently dropped.
			// TODO (Carly): retry?
			d.logger.Error("error deleting version requested by signal", "Error", err, "Version", buildId)
		}
	})

	for (!workflow.GetInfo(ctx).GetContinueAsNewSuggested() && !forceCAN) || selector.HasPending() {
		selector.Select(ctx)
	}

	// Done processing signals before CAN
	d.signalsCompleted = true
}

func (d *WorkflowRunner) validateSetWorkerDeploymentRampingVersion(args *deploymentspb.SetWorkerDeploymentRampingVersionArgs) error {
if args.Version == d.State.RoutingInfo.RampingVersion && args.Percentage == d.State.RoutingInfo.RampingVersionPercentage {
d.logger.Info("version already ramping, no change")
Expand Down
Loading