From 8790d22ddba53ca5bf9781af12082cfa19cfed03 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Fri, 31 Jan 2025 01:39:20 -0800 Subject: [PATCH] add deployment version scavenger --- service/worker/workerdeployment/client.go | 8 +- service/worker/workerdeployment/fx.go | 2 +- service/worker/workerdeployment/util.go | 5 +- .../workerdeployment/version_workflow.go | 97 +++++++++++++------ service/worker/workerdeployment/workflow.go | 52 +++++++--- 5 files changed, 115 insertions(+), 49 deletions(-) diff --git a/service/worker/workerdeployment/client.go b/service/worker/workerdeployment/client.go index 96e3219bfcf..9172b0cf1d3 100644 --- a/service/worker/workerdeployment/client.go +++ b/service/worker/workerdeployment/client.go @@ -283,7 +283,7 @@ func (d *ClientImpl) DescribeWorkerDeployment( return nil, err } - deploymentWorkflowID := GenerateWorkflowID(deploymentName) + deploymentWorkflowID := GenerateDeploymentWorkflowID(deploymentName) req := &historyservice.QueryWorkflowRequest{ NamespaceId: namespaceEntry.ID().String(), @@ -561,7 +561,7 @@ func (d *ClientImpl) StartWorkerDeployment( //revive:disable-next-line:defer defer d.record("StartWorkerDeployment", &retErr, namespaceEntry.Name(), deploymentName, identity)() - workflowID := GenerateWorkflowID(deploymentName) + workflowID := GenerateDeploymentWorkflowID(deploymentName) input, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.WorkerDeploymentWorkflowArgs{ NamespaceName: namespaceEntry.Name().String(), @@ -766,7 +766,7 @@ func (d *ClientImpl) updateWithStartWorkerDeployment( return nil, err } - workflowID := GenerateWorkflowID(deploymentName) + workflowID := GenerateDeploymentWorkflowID(deploymentName) input, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.WorkerDeploymentWorkflowArgs{ NamespaceName: namespaceEntry.Name().String(), NamespaceId: namespaceEntry.ID().String(), @@ -812,7 +812,7 @@ func (d *ClientImpl) AddVersionToWorkerDeployment( Meta: &updatepb.Meta{UpdateId: 
requestID, Identity: identity}, } - workflowID := GenerateWorkflowID(deploymentName) + workflowID := GenerateDeploymentWorkflowID(deploymentName) outcome, err := d.updateWithStart( ctx, diff --git a/service/worker/workerdeployment/fx.go b/service/worker/workerdeployment/fx.go index c5169f7b190..3de1f6b9146 100644 --- a/service/worker/workerdeployment/fx.go +++ b/service/worker/workerdeployment/fx.go @@ -106,7 +106,7 @@ func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Na registry.RegisterWorkflowWithOptions( DrainageWorkflowWithDurations(s.drainageStatusVisibilityGracePeriod(ns.Name().String()), s.drainageStatusRefreshInterval(ns.Name().String())), workflow.RegisterOptions{Name: WorkerDeploymentDrainageWorkflowType}, - ) + ) // TODO (Carly): If I attach the drainage durations to the workflow function at registration time, how can it be updated? This likely needs to go in Args versionActivities := &VersionActivities{ namespace: ns, diff --git a/service/worker/workerdeployment/util.go b/service/worker/workerdeployment/util.go index e78da851508..72d67458076 100644 --- a/service/worker/workerdeployment/util.go +++ b/service/worker/workerdeployment/util.go @@ -59,6 +59,7 @@ const ( ForceCANSignalName = "force-continue-as-new" // for Worker Deployment Version _and_ Worker Deployment wfs SyncDrainageSignalName = "sync-drainage-status" TerminateDrainageSignal = "terminate-drainage" + DeleteVersionSignal = "delete-version-signal" // sent from Version wf to Deployment wf // Queries QueryDescribeVersion = "describe-version" // for Worker Deployment Version wf @@ -121,9 +122,9 @@ func escapeChar(s string) string { return s } -// GenerateWorkflowID is a helper that generates a system accepted +// GenerateDeploymentWorkflowID is a helper that generates a system accepted // workflowID which are used in our Worker Deployment workflows -func GenerateWorkflowID(WorkerDeploymentName string) string { +func GenerateDeploymentWorkflowID(WorkerDeploymentName 
string) string { // escaping the reserved workflow delimiter (|) from the inputs, if present escapedWorkerDeploymentName := escapeChar(WorkerDeploymentName) return WorkerDeploymentWorkflowIDPrefix + WorkerDeploymentVersionWorkflowIDDelimeter + escapedWorkerDeploymentName diff --git a/service/worker/workerdeployment/version_workflow.go b/service/worker/workerdeployment/version_workflow.go index eee140b3313..161fb5d305f 100644 --- a/service/worker/workerdeployment/version_workflow.go +++ b/service/worker/workerdeployment/version_workflow.go @@ -46,14 +46,15 @@ type ( // VersionWorkflowRunner holds the local state for a deployment workflow VersionWorkflowRunner struct { *deploymentspb.WorkerDeploymentVersionWorkflowArgs - a *VersionActivities - logger sdklog.Logger - metrics sdkclient.MetricsHandler - lock workflow.Mutex - pendingUpdates int - signalsCompleted bool - drainageWorkflowFuture *workflow.ChildWorkflowFuture - done bool + a *VersionActivities + logger sdklog.Logger + metrics sdkclient.MetricsHandler + lock workflow.Mutex + pendingUpdates int + signalsCompleted bool + drainageWorkflowFuture *workflow.ChildWorkflowFuture + done bool + describeTaskQueueInterval time.Duration } ) @@ -71,10 +72,11 @@ func VersionWorkflow(ctx workflow.Context, versionWorkflowArgs *deploymentspb.Wo versionWorkflowRunner := &VersionWorkflowRunner{ WorkerDeploymentVersionWorkflowArgs: versionWorkflowArgs, - a: nil, - logger: sdklog.With(workflow.GetLogger(ctx), "wf-namespace", versionWorkflowArgs.NamespaceName), - metrics: workflow.GetMetricsHandler(ctx).WithTags(map[string]string{"namespace": versionWorkflowArgs.NamespaceName}), - lock: workflow.NewMutex(ctx), + a: nil, + logger: sdklog.With(workflow.GetLogger(ctx), "wf-namespace", versionWorkflowArgs.NamespaceName), + metrics: workflow.GetMetricsHandler(ctx).WithTags(map[string]string{"namespace": versionWorkflowArgs.NamespaceName}), + lock: workflow.NewMutex(ctx), + describeTaskQueueInterval: 5 * time.Minute, // TODO (Carly): 
Populate this from dynamic config
 	}
 	return versionWorkflowRunner.run(ctx)
 }
@@ -98,6 +100,9 @@ func (d *VersionWorkflowRunner) listenToSignals(ctx workflow.Context) {
 			d.VersionState.DrainageInfo.Status = newInfo.Status
 			d.VersionState.DrainageInfo.LastChangedTime = newInfo.LastCheckedTime
 		}
+		if d.VersionState.DrainageInfo.Status == enumspb.VERSION_DRAINAGE_STATUS_DRAINED {
+			workflow.Go(ctx, d.scavengeWhenReady)
+		}
 	})
 
 	for (!workflow.GetInfo(ctx).GetContinueAsNewSuggested() && !forceCAN) || selector.HasPending() {
@@ -108,6 +113,43 @@ func (d *VersionWorkflowRunner) listenToSignals(ctx workflow.Context) {
 	d.signalsCompleted = true
 }
 
+// scavengeWhenReady runs as a workflow coroutine once this version is drained.
+// Every describeTaskQueueInterval it checks whether the version's task queues
+// still have pollers and, once none remain, signals the Worker Deployment
+// workflow with DeleteVersionSignal so that it can delete this version.
+// It returns after the signal is delivered or when sleeping on ctx fails.
+//
+// NOTE(review): the drainage signal handler starts one coroutine per DRAINED
+// signal it observes; confirm that at most one scavenger can run at a time.
+func (d *VersionWorkflowRunner) scavengeWhenReady(ctx workflow.Context) {
+	v := d.VersionState.Version
+	versionStr := fmt.Sprintf("%s/%s", v.DeploymentName, v.BuildId)
+	deploymentWFID := GenerateDeploymentWorkflowID(v.DeploymentName)
+	for {
+		// Sleep returns an error when ctx is canceled; stop instead of spinning in a hot loop.
+		if err := workflow.Sleep(ctx, d.describeTaskQueueInterval); err != nil {
+			return
+		}
+		ready, err := d.readyToDelete(ctx)
+		if err != nil {
+			d.logger.Error("error describing task queues while scavenging version:", "Error", err, "Version", versionStr)
+			continue
+		}
+		if !ready {
+			continue
+		}
+		// signal Deployment Workflow that this version is ready to be deleted and return
+		d.logger.Info("signalling worker-deployment to delete version", "Version", versionStr)
+		err = workflow.SignalExternalWorkflow(ctx, deploymentWFID, "", DeleteVersionSignal, v.BuildId).Get(ctx, nil)
+		if err == nil {
+			return
+		}
+		// The next loop iteration retries after describeTaskQueueInterval.
+		// TODO: Better backoff / error management?
+		d.logger.Error("error signalling worker-deployment to delete version:", "Error", err, "Version", versionStr)
+	}
+}
+
 func (d *VersionWorkflowRunner) run(ctx workflow.Context) error {
 	if d.VersionState == nil {
 		d.VersionState = &deploymentspb.VersionLocalState{}
@@ -217,6 +259,35 @@ func (d *VersionWorkflowRunner) stopDrainage(ctx workflow.Context) error {
 	return nil
 }
 
+// readyToDelete reports whether this version can be deleted. It runs the
+// CheckIfTaskQueuesHavePollers activity over every task queue recorded in the
+// version state; the version is ready only when the activity reports no pollers.
+// A non-nil error means the check itself failed, in which case the result is false.
+func (d *VersionWorkflowRunner) readyToDelete(ctx workflow.Context) (bool, error) {
+	state := d.GetVersionState()
+	// Go map iteration order is random; walk the keys in sorted order so the
+	// workflow code builds the same request on every execution.
+	tqNames := workflow.DeterministicKeys(state.TaskQueueFamilies)
+	tqs := make([]*taskqueuepb.TaskQueue, 0, len(tqNames))
+	for _, tqName := range tqNames {
+		tqs = append(tqs, &taskqueuepb.TaskQueue{
+			Name: tqName,
+			Kind: enumspb.TASK_QUEUE_KIND_NORMAL, // TODO (Carly): could this be sticky?
+		})
+	}
+	activityCtx := workflow.WithActivityOptions(ctx, defaultActivityOptions)
+	checkPollersReq := &deploymentspb.CheckTaskQueuesHaveNoPollersActivityArgs{
+		TaskQueues: tqs,
+		BuildId:    d.VersionState.Version.BuildId,
+	}
+	var hasPollers bool
+	err := workflow.ExecuteActivity(activityCtx, d.a.CheckIfTaskQueuesHavePollers, checkPollersReq).Get(ctx, &hasPollers)
+	if err != nil {
+		return false, err
+	}
+	return !hasPollers, nil
+}
+
 func (d *VersionWorkflowRunner) validateDeleteVersion() error {
 	// We can't call DescribeTaskQueue here because that would be an Activity call / non-deterministic.
 	// Once we have PollersStatus on the version, we can check it here.
@@ -243,27 +314,11 @@ func (d *VersionWorkflowRunner) handleDeleteVersion(ctx workflow.Context) error
 		return serviceerror.NewDeadlineExceeded("Update canceled before worker deployment workflow started")
 	}
 
-	state := d.GetVersionState()
-
-	// describe all task queues in the deployment, if any have pollers, then cannot delete
-	var tqs []*taskqueuepb.TaskQueue
-	for tqName, _ := range state.TaskQueueFamilies {
-		tqs = append(tqs, &taskqueuepb.TaskQueue{
-			Name: tqName,
-			Kind: enumspb.TASK_QUEUE_KIND_NORMAL, // TODO (Carly): could this be sticky?
- }) - } - activityCtx := workflow.WithActivityOptions(ctx, defaultActivityOptions) - checkPollersReq := &deploymentspb.CheckTaskQueuesHaveNoPollersActivityArgs{ - TaskQueues: tqs, - BuildId: d.VersionState.Version.BuildId, - } - var hasPollers bool - err = workflow.ExecuteActivity(activityCtx, d.a.CheckIfTaskQueuesHavePollers, checkPollersReq).Get(ctx, &hasPollers) + ready, err := d.readyToDelete(ctx) if err != nil { return err } - if hasPollers { + if !ready { return serviceerror.NewFailedPrecondition("cannot delete, task queues in this version still have pollers") } d.done = true diff --git a/service/worker/workerdeployment/workflow.go b/service/worker/workerdeployment/workflow.go index 2740b2cb20d..99914535e0c 100644 --- a/service/worker/workerdeployment/workflow.go +++ b/service/worker/workerdeployment/workflow.go @@ -43,12 +43,13 @@ type ( // WorkflowRunner holds the local state while running a deployment-series workflow WorkflowRunner struct { *deploymentspb.WorkerDeploymentWorkflowArgs - a *Activities - logger sdklog.Logger - metrics sdkclient.MetricsHandler - lock workflow.Mutex - pendingUpdates int - done bool + a *Activities + logger sdklog.Logger + metrics sdkclient.MetricsHandler + lock workflow.Mutex + pendingUpdates int + done bool + signalsCompleted bool } ) @@ -71,8 +72,6 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error { d.State.RoutingInfo = &deploymentpb.RoutingInfo{} } - var pendingUpdates int - err := workflow.SetQueryHandler(ctx, QueryDescribeDeployment, func() (*deploymentspb.QueryDescribeWorkerDeploymentResponse, error) { return &deploymentspb.QueryDescribeWorkerDeploymentResponse{ State: d.State, @@ -138,10 +137,8 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error { return err } - // Wait until we can continue as new or are cancelled. - err = workflow.Await(ctx, func() bool { - return (workflow.GetInfo(ctx).GetContinueAsNewSuggested() || d.done) && pendingUpdates == 0 - }) + // Wait on any pending signals and updates. 
+	err = workflow.Await(ctx, func() bool { return (d.signalsCompleted || d.done) && d.pendingUpdates == 0 })
 	if err != nil {
 		return err
 	}
@@ -156,6 +153,47 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error {
 	return workflow.NewContinueAsNewError(ctx, Workflow, d.WorkerDeploymentWorkflowArgs)
 }
 
+// listenToSignals receives the signals of the Worker Deployment workflow until
+// continue-as-new is suggested (or forced via ForceCANSignalName) and no signal
+// is left pending, then sets signalsCompleted, which run waits on.
+//
+// NOTE(review): run waits on signalsCompleted, so this must be started from run
+// (e.g. via workflow.Go); that call is not visible in this patch, please confirm.
+func (d *WorkflowRunner) listenToSignals(ctx workflow.Context) {
+	// Fetch signal channels
+	forceCANSignalChannel := workflow.GetSignalChannel(ctx, ForceCANSignalName)
+	forceCAN := false
+	deleteVersionSignalChannel := workflow.GetSignalChannel(ctx, DeleteVersionSignal)
+
+	selector := workflow.NewSelector(ctx)
+	selector.AddReceive(forceCANSignalChannel, func(c workflow.ReceiveChannel, more bool) {
+		// Consume the signal; an unreceived message keeps the channel pending,
+		// so the HasPending() loop below would never terminate.
+		c.Receive(ctx, nil)
+		forceCAN = true
+	})
+	selector.AddReceive(deleteVersionSignalChannel, func(c workflow.ReceiveChannel, more bool) {
+		var buildId string
+		c.Receive(ctx, &buildId)
+		err := d.handleDeleteVersion(ctx, &deploymentspb.DeleteVersionArgs{
+			Identity: "temporal-system-wf" + GenerateVersionWorkflowID(buildId),
+			Version:  buildId,
+		})
+		if err != nil {
+			// TODO (Carly): retry? the signal will be re-sent in another 5 minutes which is a form of retry
+			// NOTE(review): scavengeWhenReady returns after one successful send; confirm a re-send really happens.
+			d.logger.Error("error deleting version after delete-version signal", "Error", err, "BuildId", buildId)
+		}
+	})
+
+	for (!workflow.GetInfo(ctx).GetContinueAsNewSuggested() && !forceCAN) || selector.HasPending() {
+		selector.Select(ctx)
+	}
+
+	// Done processing signals before CAN
+	d.signalsCompleted = true
+}
+
 func (d *WorkflowRunner) validateSetWorkerDeploymentRampingVersion(args *deploymentspb.SetWorkerDeploymentRampingVersionArgs) error {
 	if args.Version == d.State.RoutingInfo.RampingVersion && args.Percentage == d.State.RoutingInfo.RampingVersionPercentage {
 		d.logger.Info("version already ramping, no change")