diff --git a/internal/workflow/policies.go b/internal/workflow/policies.go index 3fed907f..8a6d110e 100644 --- a/internal/workflow/policies.go +++ b/internal/workflow/policies.go @@ -9,6 +9,10 @@ import ( wferrors "github.com/artefactual-labs/enduro/internal/workflow/errors" ) +// Cadence doesn't seem to have a concept of unlimited duration. We use this +// constant to represent a long period of time (10 years). +const forever = time.Hour * 24 * 365 * 10 + // withActivityOptsForLongLivedRequest returns a workflow context with activity // options suited for long-running activities without heartbeats func withActivityOptsForLongLivedRequest(ctx workflow.Context) workflow.Context { @@ -35,7 +39,6 @@ func withActivityOptsForLongLivedRequest(ctx workflow.Context) workflow.Context // The activity is responsible for returning a NRE error. Otherwise it will be // retried "forever". func withActivityOptsForHeartbeatedRequest(ctx workflow.Context, heartbeatTimeout time.Duration) workflow.Context { - const forever = time.Hour * 24 * 365 * 10 return workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ ScheduleToStartTimeout: forever, StartToCloseTimeout: forever, // Real cap is workflow.ExecutionStartToCloseTimeout. @@ -59,8 +62,8 @@ func withActivityOptsForRequest(ctx workflow.Context) workflow.Context { RetryPolicy: &cadence.RetryPolicy{ InitialInterval: time.Second, BackoffCoefficient: 2, - MaximumInterval: time.Minute * 10, - ExpirationInterval: time.Minute * 10, + MaximumInterval: time.Minute * 5, + ExpirationInterval: time.Minute * 5, MaximumAttempts: 20, NonRetriableErrorReasons: []string{wferrors.NRE}, }, diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index 3498518a..2ea19f7e 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -110,6 +110,7 @@ type TransferInfo struct { // not have a retry policy in place. The user could trigger a new instance via // the API. func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.ProcessingWorkflowRequest) error { + logger := workflow.GetLogger(ctx) tinfo := &TransferInfo{ CollectionID: req.CollectionID, Event: req.Event, @@ -130,6 +131,17 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce return wferrors.NonRetryableError(fmt.Errorf("error persisting collection: %v", err)) } + defer func() { + // Update package status, using a workflow-disconnected context to + // ensure that it runs even after cancellation. + var status = tinfo.Status + if status == collection.StatusInProgress { + status = collection.StatusError + } + var dctx, _ = workflow.NewDisconnectedContext(ctx) + _ = workflow.ExecuteLocalActivity(withLocalActivityOpts(dctx), updatePackageStatusLocalActivity, w.manager.Collection, tinfo, status).Get(activityOpts, nil) + }() + // Load pipeline configuration and hooks. activityOpts = withLocalActivityOpts(ctx) err = workflow.ExecuteLocalActivity(activityOpts, loadConfigLocalActivity, w.manager, req.Event.PipelineName, tinfo).Get(activityOpts, &tinfo) @@ -138,7 +150,7 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce } // A session guarantees that activities within it are scheduled on the same - // workflow. + // worker. var sessCtx workflow.Context var sessErr error { @@ -163,11 +175,6 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce tinfo.Status = collection.StatusDone } - // Update package status. - var disconnectedCtx, _ = workflow.NewDisconnectedContext(ctx) - activityOpts = withLocalActivityOpts(disconnectedCtx) - _ = workflow.ExecuteLocalActivity(activityOpts, updatePackageStatusLocalActivity, w.manager.Collection, tinfo, tinfo.Status).Get(activityOpts, nil) - // One of the activities within the session has failed. There's not much we // can do if the worker died at this point, since what we aim to do next // depends on resources only available within that worker. @@ -176,12 +183,6 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce return sessErr } - // Schedule deletion of the original in the watched data source. - var deletionTimer workflow.Future - if tinfo.Status == collection.StatusDone && tinfo.Event.RetentionPeriod != nil { - deletionTimer = workflow.NewTimer(ctx, *tinfo.Event.RetentionPeriod) - } - // Activities that we want to run within the session regardless the // result. E.g. receipts, clean-ups, etc... // Passing the activity lets the activity determine if the process failed. @@ -216,13 +217,21 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce // we'd prefer to give the user to decide what to do next, e.g. start over, // retry hooks individually, etc... if receiptsFailed { - var disconnectedCtx, _ = workflow.NewDisconnectedContext(ctx) - activityOpts = withLocalActivityOpts(disconnectedCtx) - _ = workflow.ExecuteLocalActivity(activityOpts, updatePackageStatusLocalActivity, w.manager.Collection, tinfo, collection.StatusError).Get(activityOpts, nil) - + tinfo.Status = collection.StatusError return errors.New("at least one hook/receipt activity failed") } + // Clean-up is the last activity that depends on the session. + // We'll close it as soon as the activity completes. + if tinfo.Bundle.FullPathBeforeStrip != "" { + activityOpts = withActivityOptsForRequest(sessCtx) + _ = workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{ + FullPath: tinfo.Bundle.FullPathBeforeStrip, + }).Get(activityOpts, nil) + } + + workflow.CompleteSession(sessCtx) + // Hide packages from Archivematica Dashboard. if tinfo.Status == collection.StatusDone { futures = []workflow.Future{} @@ -234,21 +243,9 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce } } - // This is the last activity that depends on the session. - if tinfo.Bundle.FullPathBeforeStrip != "" { - activityOpts = withActivityOptsForRequest(sessCtx) - _ = workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{ - FullPath: tinfo.Bundle.FullPathBeforeStrip, - }).Get(activityOpts, nil) - } - - workflow.CompleteSession(sessCtx) - - var logger = workflow.GetLogger(ctx) - - // Delete original once the timer returns. - if deletionTimer != nil { - err := deletionTimer.Get(ctx, nil) + // Schedule deletion of the original in the watched data source. + if tinfo.Status == collection.StatusDone && tinfo.Event.RetentionPeriod != nil { + err := workflow.NewTimer(ctx, *tinfo.Event.RetentionPeriod).Get(ctx, nil) if err != nil { logger.Warn("Retention policy timer failed", zap.Error(err)) } else {