Skip to content

Commit

Permalink
fix workflow status did not updated for failed activity
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanHH86 committed Jan 24, 2025
1 parent 35099d7 commit a3159a7
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 16 deletions.
2 changes: 1 addition & 1 deletion dataviewer/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,6 @@ type UpdateCardReq struct {

type UpdateWorkflowStatusReq struct {
Req types.UpdateViewerReq
WorkflowErr error
WorkflowErrMsg string
ShouldUpdateViewer bool
}
6 changes: 4 additions & 2 deletions dataviewer/component/dataset_viewer.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,13 @@ func (c *datasetViewerComponentImpl) getViewerCardData(ctx context.Context, repo
return nil, nil, fmt.Errorf("failed to get viewer by repo_id %d, error: %w", repoID, err)
}

if viewer == nil || viewer.DataviewerJob == nil || len(viewer.DataviewerJob.CardData) < 1 {
if viewer == nil || viewer.DataviewerJob == nil {
return nil, nil, fmt.Errorf("viewer card data is empty")
}

if viewer.DataviewerJob.Status == types.WorkflowPending || viewer.DataviewerJob.Status == types.WorkflowFailed {
if len(viewer.DataviewerJob.CardData) < 1 &&
(viewer.DataviewerJob.Status == types.WorkflowPending ||
viewer.DataviewerJob.Status == types.WorkflowFailed) {
return &dvCom.CataLogRespone{
Status: viewer.DataviewerJob.Status,
Logs: viewer.DataviewerJob.Logs,
Expand Down
23 changes: 13 additions & 10 deletions dataviewer/workflows/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (dva *dataViewerActivityImpl) BeginViewerJob(ctx context.Context) error {
}

func (dva *dataViewerActivityImpl) GetCardFromReadme(ctx context.Context, req types.UpdateViewerReq) (*dvCom.CardData, error) {
var card dvCom.CardData
fileReq := gitserver.GetRepoInfoByPathReq{
Namespace: req.Namespace,
Name: req.Name,
Expand All @@ -98,25 +99,27 @@ func (dva *dataViewerActivityImpl) GetCardFromReadme(ctx context.Context, req ty
}
f, err := dva.gitServer.GetRepoFileContents(context.Background(), fileReq)
if err != nil {
return nil, fmt.Errorf("get %s repo %s/%s branch %s file %s content error: %w", fileReq.RepoType, fileReq.Namespace, fileReq.Name, fileReq.Ref, fileReq.Path, err)
slog.Warn("get repo branch readme.md content error", slog.Any("fileReq", fileReq), slog.Any("err", err))
return &card, nil
}
slog.Debug("getRepoCardData", slog.Any("f.Content", f.Content))
decodedContent, err := base64.StdEncoding.DecodeString(f.Content)
if err != nil {
return nil, fmt.Errorf("decode %s repo %s/%s branch %s file %s content, error: %w", fileReq.RepoType, fileReq.Namespace, fileReq.Name, fileReq.Ref, fileReq.Path, err)
slog.Warn("decode repo branch readme.md content, error", slog.Any("fileReq", fileReq), slog.Any("err", err))
return &card, nil
}
matches := dvCom.REG.FindStringSubmatch(string(decodedContent))
yamlString := ""
if len(matches) > 1 {
yamlString = matches[1]
} else {
return nil, fmt.Errorf("%s repo %s/%s branch %s card yaml config is empty due to invalid content", fileReq.RepoType, fileReq.Namespace, fileReq.Name, fileReq.Ref)
slog.Warn("repo branch card yaml config is empty due to invalid content", slog.Any("fileReq", fileReq), slog.Any("err", err), slog.Any("decodedContent", string(decodedContent)))
return &card, nil
}

var card dvCom.CardData
err = yaml.Unmarshal([]byte(yamlString), &card)
if err != nil {
return nil, fmt.Errorf("unmarshal %s repo %s/%s branch %s yaml error: %w, decoded content: %s", fileReq.RepoType, fileReq.Namespace, fileReq.Name, fileReq.Ref, err, yamlString)
slog.Warn("unmarshal repo branch yaml error", slog.Any("fileReq", fileReq), slog.Any("err", err), slog.Any("yamlString", yamlString))
}
return &card, nil
}
Expand Down Expand Up @@ -805,9 +808,9 @@ func (dva *dataViewerActivityImpl) UpdateWorkflowStatus(ctx context.Context, sta
workflowID := wfCtx.WorkflowExecution.ID
runID := wfCtx.WorkflowExecution.RunID

if status.WorkflowErr != nil {
if len(status.WorkflowErrMsg) > 0 {
slog.Error("run data viewer workflow error", slog.Any("workflowID", workflowID), slog.Any("runID", runID),
slog.Any("status", status), slog.Any("workflowErr", status.WorkflowErr))
slog.Any("status", status), slog.Any("workflowErr", status.WorkflowErrMsg))
}

job, err := dva.viewerStore.GetJob(ctx, workflowID)
Expand All @@ -816,9 +819,9 @@ func (dva *dataViewerActivityImpl) UpdateWorkflowStatus(ctx context.Context, sta
return nil
}

if status.WorkflowErr != nil {
if len(status.WorkflowErrMsg) > 0 {
job.Status = types.WorkflowFailed
job.Logs = status.WorkflowErr.Error()
job.Logs = status.WorkflowErrMsg
} else {
job.Status = types.WorkflowDone
job.Logs = types.WorkflowMsgDone
Expand All @@ -831,7 +834,7 @@ func (dva *dataViewerActivityImpl) UpdateWorkflowStatus(ctx context.Context, sta
slog.Error("update workflow result for ending", slog.Any("workflowID", workflowID), slog.Any("job", job), slog.Any("error", err))
}

if status.WorkflowErr != nil || !status.ShouldUpdateViewer {
if len(status.WorkflowErrMsg) > 0 || !status.ShouldUpdateViewer {
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion dataviewer/workflows/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,14 @@ func updateWorkflowStatus(sessionCtx workflow.Context,
wfErr error,
shouldUpdateViewer bool,
) {
errMsg := ""
if wfErr != nil {
errMsg = wfErr.Error()
}
err := workflow.ExecuteActivity(sessionCtx, DataViewerActivity.UpdateWorkflowStatus,
dvCom.UpdateWorkflowStatusReq{
Req: updateWorkflow.Req,
WorkflowErr: wfErr,
WorkflowErrMsg: errMsg,
ShouldUpdateViewer: shouldUpdateViewer,
},
).Get(sessionCtx, nil)
Expand Down
4 changes: 2 additions & 2 deletions dataviewer/workflows/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestWorkflow_DataviewerWorkflow(t *testing.T) {

tester.mocks.mockact.EXPECT().UpdateWorkflowStatus(mock.Anything, dvCom.UpdateWorkflowStatusReq{
Req: req,
WorkflowErr: nil,
WorkflowErrMsg: "",
ShouldUpdateViewer: true,
}).Return(nil)

Expand Down Expand Up @@ -206,7 +206,7 @@ func TestWorkflow_DataviewerWorkflow(t *testing.T) {

tester.mocks.mockact.EXPECT().UpdateWorkflowStatus(mock.Anything, dvCom.UpdateWorkflowStatusReq{
Req: req,
WorkflowErr: nil,
WorkflowErrMsg: "",
ShouldUpdateViewer: true,
}).Return(nil)

Expand Down

0 comments on commit a3159a7

Please sign in to comment.