Skip to content

Commit

Permalink
workflow: use defer func to update state
Browse files Browse the repository at this point in the history
This commit ensures that the status of the collection is only updated at the
very end of the execution of the workflow.

Fixes #16.
  • Loading branch information
sevein committed Jan 9, 2020
1 parent 927da93 commit 8e3ddb4
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 34 deletions.
9 changes: 6 additions & 3 deletions internal/workflow/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
wferrors "github.com/artefactual-labs/enduro/internal/workflow/errors"
)

// forever stands in for an unbounded duration, a concept that Cadence does not
// seem to offer. It spans a long period of time: ten years of 365 days each.
const forever = 10 * 365 * 24 * time.Hour

// withActivityOptsForLongLivedRequest returns a workflow context with activity
// options suited for long-running activities without heartbeats.
func withActivityOptsForLongLivedRequest(ctx workflow.Context) workflow.Context {
Expand All @@ -35,7 +39,6 @@ func withActivityOptsForLongLivedRequest(ctx workflow.Context) workflow.Context
// The activity is responsible for returning an NRE (non-retryable error).
// Otherwise it will be retried "forever".
func withActivityOptsForHeartbeatedRequest(ctx workflow.Context, heartbeatTimeout time.Duration) workflow.Context {
const forever = time.Hour * 24 * 365 * 10
return workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ScheduleToStartTimeout: forever,
StartToCloseTimeout: forever, // Real cap is workflow.ExecutionStartToCloseTimeout.
Expand All @@ -59,8 +62,8 @@ func withActivityOptsForRequest(ctx workflow.Context) workflow.Context {
RetryPolicy: &cadence.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2,
MaximumInterval: time.Minute * 10,
ExpirationInterval: time.Minute * 10,
MaximumInterval: time.Minute * 5,
ExpirationInterval: time.Minute * 5,
MaximumAttempts: 20,
NonRetriableErrorReasons: []string{wferrors.NRE},
},
Expand Down
59 changes: 28 additions & 31 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type TransferInfo struct {
// not have a retry policy in place. The user could trigger a new instance via
// the API.
func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.ProcessingWorkflowRequest) error {
logger := workflow.GetLogger(ctx)
tinfo := &TransferInfo{
CollectionID: req.CollectionID,
Event: req.Event,
Expand All @@ -130,6 +131,17 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce
return wferrors.NonRetryableError(fmt.Errorf("error persisting collection: %v", err))
}

defer func() {
// Update package status, using a workflow-disconnected context to
// ensure that it runs even after cancellation.
var status = tinfo.Status
if status == collection.StatusInProgress {
status = collection.StatusError
}
var dctx, _ = workflow.NewDisconnectedContext(ctx)
_ = workflow.ExecuteLocalActivity(withLocalActivityOpts(dctx), updatePackageStatusLocalActivity, w.manager.Collection, tinfo, status).Get(activityOpts, nil)
}()

// Load pipeline configuration and hooks.
activityOpts = withLocalActivityOpts(ctx)
err = workflow.ExecuteLocalActivity(activityOpts, loadConfigLocalActivity, w.manager, req.Event.PipelineName, tinfo).Get(activityOpts, &tinfo)
Expand All @@ -138,7 +150,7 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce
}

// A session guarantees that activities within it are scheduled on the same
// workflow.
// worker.
var sessCtx workflow.Context
var sessErr error
{
Expand All @@ -163,11 +175,6 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce
tinfo.Status = collection.StatusDone
}

// Update package status.
var disconnectedCtx, _ = workflow.NewDisconnectedContext(ctx)
activityOpts = withLocalActivityOpts(disconnectedCtx)
_ = workflow.ExecuteLocalActivity(activityOpts, updatePackageStatusLocalActivity, w.manager.Collection, tinfo, tinfo.Status).Get(activityOpts, nil)

// One of the activities within the session has failed. There's not much we
// can do if the worker died at this point, since what we aim to do next
// depends on resources only available within that worker.
Expand All @@ -176,12 +183,6 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce
return sessErr
}

// Schedule deletion of the original in the watched data source.
var deletionTimer workflow.Future
if tinfo.Status == collection.StatusDone && tinfo.Event.RetentionPeriod != nil {
deletionTimer = workflow.NewTimer(ctx, *tinfo.Event.RetentionPeriod)
}

// Activities that we want to run within the session regardless of the
// result. E.g. receipts, clean-ups, etc...
// Passing the activity lets the activity determine if the process failed.
Expand Down Expand Up @@ -216,13 +217,21 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce
// we'd prefer to let the user decide what to do next, e.g. start over,
// retry hooks individually, etc...
if receiptsFailed {
var disconnectedCtx, _ = workflow.NewDisconnectedContext(ctx)
activityOpts = withLocalActivityOpts(disconnectedCtx)
_ = workflow.ExecuteLocalActivity(activityOpts, updatePackageStatusLocalActivity, w.manager.Collection, tinfo, collection.StatusError).Get(activityOpts, nil)

tinfo.Status = collection.StatusError
return errors.New("at least one hook/receipt activity failed")
}

// Clean-up is the last activity that depends on the session.
// We'll close it as soon as the activity completes.
if tinfo.Bundle.FullPathBeforeStrip != "" {
activityOpts = withActivityOptsForRequest(sessCtx)
_ = workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{
FullPath: tinfo.Bundle.FullPathBeforeStrip,
}).Get(activityOpts, nil)
}

workflow.CompleteSession(sessCtx)

// Hide packages from Archivematica Dashboard.
if tinfo.Status == collection.StatusDone {
futures = []workflow.Future{}
Expand All @@ -234,21 +243,9 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce
}
}

// This is the last activity that depends on the session.
if tinfo.Bundle.FullPathBeforeStrip != "" {
activityOpts = withActivityOptsForRequest(sessCtx)
_ = workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{
FullPath: tinfo.Bundle.FullPathBeforeStrip,
}).Get(activityOpts, nil)
}

workflow.CompleteSession(sessCtx)

var logger = workflow.GetLogger(ctx)

// Delete original once the timer returns.
if deletionTimer != nil {
err := deletionTimer.Get(ctx, nil)
// Schedule deletion of the original in the watched data source.
if tinfo.Status == collection.StatusDone && tinfo.Event.RetentionPeriod != nil {
err := workflow.NewTimer(ctx, *tinfo.Event.RetentionPeriod).Get(ctx, nil)
if err != nil {
logger.Warn("Retention policy timer failed", zap.Error(err))
} else {
Expand Down

0 comments on commit 8e3ddb4

Please sign in to comment.