Skip to content

Commit

Permalink
updates workflow to handle should poll
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei committed Sep 27, 2024
1 parent 2cce96c commit 4d04ce3
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ type CheckAccountStatusRequest struct {
}

type CheckAccountStatusResponse struct {
IsValid bool
Reason *string
IsValid bool
Reason *string
ShouldPoll bool
}

func (a *Activity) CheckAccountStatus(
Expand Down Expand Up @@ -73,7 +74,7 @@ func (a *Activity) CheckAccountStatus(
"reason", withReasonOrDefault(resp.Msg.GetReason()),
)

return &CheckAccountStatusResponse{IsValid: resp.Msg.GetIsValid(), Reason: resp.Msg.Reason}, nil
return &CheckAccountStatusResponse{IsValid: resp.Msg.GetIsValid(), Reason: resp.Msg.Reason, ShouldPoll: resp.Msg.GetShouldPoll()}, nil
}

const defaultReason = "no reason provided"
Expand Down
104 changes: 53 additions & 51 deletions worker/pkg/workflows/datasync/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,24 @@ func Workflow(wfctx workflow.Context, req *WorkflowRequest) (*WorkflowResponse,
if actOptResp.RequestedRecordCount != nil && *actOptResp.RequestedRecordCount > 0 {
logger.Info("requested record count of %d", *actOptResp.RequestedRecordCount)
}
var result *accountstatus_activity.CheckAccountStatusResponse
var initialCheckAccountStatusResponse *accountstatus_activity.CheckAccountStatusResponse
var a *accountstatus_activity.Activity
err = workflow.ExecuteActivity(
withCheckAccountStatusActivityOptions(ctx),
a.CheckAccountStatus,
&accountstatus_activity.CheckAccountStatusRequest{AccountId: actOptResp.AccountId, RequestedRecordCount: actOptResp.RequestedRecordCount}).
Get(ctx, &result)
Get(ctx, &initialCheckAccountStatusResponse)
if err != nil {
logger.Error("encountered error while checking account status", "error", err)
cancelHandler()
return nil, fmt.Errorf("unable to continue workflow due to error when checking account status: %w", err)
}
if !result.IsValid {
if !initialCheckAccountStatusResponse.IsValid {
logger.Warn("account is no longer is valid state")
cancelHandler()
reason := "no reason provided"
if result.Reason != nil {
reason = *result.Reason
if initialCheckAccountStatusResponse.Reason != nil {
reason = *initialCheckAccountStatusResponse.Reason
}
return nil, fmt.Errorf("halting job run due to account in invalid state. Reason: %q: %w", reason, invalidAccountStatusError)
}
Expand Down Expand Up @@ -139,57 +139,59 @@ func Workflow(wfctx workflow.Context, req *WorkflowRequest) (*WorkflowResponse,

// spawn account status checker in loop
stopChan := workflow.NewNamedChannel(ctx, "account-status")
accountStatusTimerDuration := getAccountStatusTimerDuration()
workflow.GoNamed(
ctx,
"account-status-check",
func(ctx workflow.Context) {
shouldStop := false
for {
selector := workflow.NewNamedSelector(ctx, "account-status-select")
timer := workflow.NewTimer(ctx, accountStatusTimerDuration)
selector.AddFuture(timer, func(f workflow.Future) {
err := f.Get(ctx, nil)
if err != nil {
logger.Error("time receive failed", "error", err)
if initialCheckAccountStatusResponse.ShouldPoll {
accountStatusTimerDuration := getAccountStatusTimerDuration()
workflow.GoNamed(
ctx,
"account-status-check",
func(ctx workflow.Context) {
shouldStop := false
for {
selector := workflow.NewNamedSelector(ctx, "account-status-select")
timer := workflow.NewTimer(ctx, accountStatusTimerDuration)
selector.AddFuture(timer, func(f workflow.Future) {
err := f.Get(ctx, nil)
if err != nil {
logger.Error("time receive failed", "error", err)
return
}

var result *accountstatus_activity.CheckAccountStatusResponse
var a *accountstatus_activity.Activity
err = workflow.ExecuteActivity(
withCheckAccountStatusActivityOptions(ctx),
a.CheckAccountStatus,
&accountstatus_activity.CheckAccountStatusRequest{AccountId: actOptResp.AccountId}).
Get(ctx, &result)
if err != nil {
logger.Error("encountered error while checking account status", "error", err)
stopChan.Send(ctx, true)
shouldStop = true
cancelHandler()
return
}
if !result.IsValid {
logger.Warn("account is no longer is valid state")
stopChan.Send(ctx, true)
shouldStop = true
cancelHandler()
return
}
})

selector.Select(ctx)

if shouldStop {
logger.Warn("exiting account status check")
return
}

var result *accountstatus_activity.CheckAccountStatusResponse
var a *accountstatus_activity.Activity
err = workflow.ExecuteActivity(
withCheckAccountStatusActivityOptions(ctx),
a.CheckAccountStatus,
&accountstatus_activity.CheckAccountStatusRequest{AccountId: actOptResp.AccountId}).
Get(ctx, &result)
if err != nil {
logger.Error("encountered error while checking account status", "error", err)
stopChan.Send(ctx, true)
shouldStop = true
cancelHandler()
return
}
if !result.IsValid {
logger.Warn("account is no longer is valid state")
stopChan.Send(ctx, true)
shouldStop = true
cancelHandler()
if ctx.Err() != nil {
logger.Warn("workflow canceled due to error or stop signal", "error", ctx.Err())
return
}
})

selector.Select(ctx)

if shouldStop {
logger.Warn("exiting account status check")
return
}
if ctx.Err() != nil {
logger.Warn("workflow canceled due to error or stop signal", "error", ctx.Err())
return
}
}
})
})
}

workselector := workflow.NewSelector(ctx)
var activityErr error
Expand Down

0 comments on commit 4d04ce3

Please sign in to comment.