From a3159a79434d1126808fd127c47960c129f40bd6 Mon Sep 17 00:00:00 2001 From: Sean <38311363@qq.com> Date: Fri, 24 Jan 2025 12:05:58 +0800 Subject: [PATCH] fix workflow status did not updated for failed activity --- dataviewer/common/types.go | 2 +- dataviewer/component/dataset_viewer.go | 6 ++++-- dataviewer/workflows/activity.go | 23 +++++++++++++---------- dataviewer/workflows/workflow.go | 6 +++++- dataviewer/workflows/workflow_test.go | 4 ++-- 5 files changed, 25 insertions(+), 16 deletions(-) diff --git a/dataviewer/common/types.go b/dataviewer/common/types.go index 1d1bb552..4a91639d 100644 --- a/dataviewer/common/types.go +++ b/dataviewer/common/types.go @@ -169,6 +169,6 @@ type UpdateCardReq struct { type UpdateWorkflowStatusReq struct { Req types.UpdateViewerReq - WorkflowErr error + WorkflowErrMsg string ShouldUpdateViewer bool } diff --git a/dataviewer/component/dataset_viewer.go b/dataviewer/component/dataset_viewer.go index 176140d3..d62d542e 100644 --- a/dataviewer/component/dataset_viewer.go +++ b/dataviewer/component/dataset_viewer.go @@ -253,11 +253,13 @@ func (c *datasetViewerComponentImpl) getViewerCardData(ctx context.Context, repo return nil, nil, fmt.Errorf("failed to get viewer by repo_id %d, error: %w", repoID, err) } - if viewer == nil || viewer.DataviewerJob == nil || len(viewer.DataviewerJob.CardData) < 1 { + if viewer == nil || viewer.DataviewerJob == nil { return nil, nil, fmt.Errorf("viewer card data is empty") } - if viewer.DataviewerJob.Status == types.WorkflowPending || viewer.DataviewerJob.Status == types.WorkflowFailed { + if len(viewer.DataviewerJob.CardData) < 1 && + (viewer.DataviewerJob.Status == types.WorkflowPending || + viewer.DataviewerJob.Status == types.WorkflowFailed) { return &dvCom.CataLogRespone{ Status: viewer.DataviewerJob.Status, Logs: viewer.DataviewerJob.Logs, diff --git a/dataviewer/workflows/activity.go b/dataviewer/workflows/activity.go index 4dc174f5..8d216372 100644 --- a/dataviewer/workflows/activity.go +++ b/dataviewer/workflows/activity.go @@ -89,6 +89,7 @@ func (dva *dataViewerActivityImpl) BeginViewerJob(ctx context.Context) error { } func (dva *dataViewerActivityImpl) GetCardFromReadme(ctx context.Context, req types.UpdateViewerReq) (*dvCom.CardData, error) { + var card dvCom.CardData fileReq := gitserver.GetRepoInfoByPathReq{ Namespace: req.Namespace, Name: req.Name, @@ -98,25 +99,27 @@ func (dva *dataViewerActivityImpl) GetCardFromReadme(ctx context.Context, req ty } f, err := dva.gitServer.GetRepoFileContents(context.Background(), fileReq) if err != nil { - return nil, fmt.Errorf("get %s repo %s/%s branch %s file %s content error: %w", fileReq.RepoType, fileReq.Namespace, fileReq.Name, fileReq.Ref, fileReq.Path, err) + slog.Warn("get repo branch readme.md content error", slog.Any("fileReq", fileReq), slog.Any("err", err)) + return &card, nil } slog.Debug("getRepoCardData", slog.Any("f.Content", f.Content)) decodedContent, err := base64.StdEncoding.DecodeString(f.Content) if err != nil { - return nil, fmt.Errorf("decode %s repo %s/%s branch %s file %s content, error: %w", fileReq.RepoType, fileReq.Namespace, fileReq.Name, fileReq.Ref, fileReq.Path, err) + slog.Warn("decode repo branch readme.md content, error", slog.Any("fileReq", fileReq), slog.Any("err", err)) + return &card, nil } matches := dvCom.REG.FindStringSubmatch(string(decodedContent)) yamlString := "" if len(matches) > 1 { yamlString = matches[1] } else { - return nil, fmt.Errorf("%s repo %s/%s branch %s card yaml config is empty due to invalid content", fileReq.RepoType, fileReq.Namespace, fileReq.Name, fileReq.Ref) + slog.Warn("repo branch card yaml config is empty due to invalid content", slog.Any("fileReq", fileReq), slog.Any("err", err), slog.Any("decodedContent", string(decodedContent))) + return &card, nil } - var card dvCom.CardData err = yaml.Unmarshal([]byte(yamlString), &card) if err != nil { - return nil, fmt.Errorf("unmarshal %s repo %s/%s branch %s yaml error: %w, decoded content: %s", fileReq.RepoType, fileReq.Namespace, fileReq.Name, fileReq.Ref, err, yamlString) + slog.Warn("unmarshal repo branch yaml error", slog.Any("fileReq", fileReq), slog.Any("err", err), slog.Any("yamlString", yamlString)) } return &card, nil } @@ -805,9 +808,9 @@ func (dva *dataViewerActivityImpl) UpdateWorkflowStatus(ctx context.Context, sta workflowID := wfCtx.WorkflowExecution.ID runID := wfCtx.WorkflowExecution.RunID - if status.WorkflowErr != nil { + if len(status.WorkflowErrMsg) > 0 { slog.Error("run data viewer workflow error", slog.Any("workflowID", workflowID), slog.Any("runID", runID), - slog.Any("status", status), slog.Any("workflowErr", status.WorkflowErr)) + slog.Any("status", status), slog.Any("workflowErr", status.WorkflowErrMsg)) } job, err := dva.viewerStore.GetJob(ctx, workflowID) @@ -816,9 +819,9 @@ func (dva *dataViewerActivityImpl) UpdateWorkflowStatus(ctx context.Context, sta return nil } - if status.WorkflowErr != nil { + if len(status.WorkflowErrMsg) > 0 { job.Status = types.WorkflowFailed - job.Logs = status.WorkflowErr.Error() + job.Logs = status.WorkflowErrMsg } else { job.Status = types.WorkflowDone job.Logs = types.WorkflowMsgDone @@ -831,7 +834,7 @@ func (dva *dataViewerActivityImpl) UpdateWorkflowStatus(ctx context.Context, sta slog.Error("update workflow result for ending", slog.Any("workflowID", workflowID), slog.Any("job", job), slog.Any("error", err)) } - if status.WorkflowErr != nil || !status.ShouldUpdateViewer { + if len(status.WorkflowErrMsg) > 0 || !status.ShouldUpdateViewer { return nil } diff --git a/dataviewer/workflows/workflow.go b/dataviewer/workflows/workflow.go index 98d4eeac..50d69112 100644 --- a/dataviewer/workflows/workflow.go +++ b/dataviewer/workflows/workflow.go @@ -221,10 +221,14 @@ func updateWorkflowStatus(sessionCtx workflow.Context, wfErr error, shouldUpdateViewer bool, ) { + errMsg := "" + if wfErr != nil { + errMsg = wfErr.Error() + } err := workflow.ExecuteActivity(sessionCtx, DataViewerActivity.UpdateWorkflowStatus, dvCom.UpdateWorkflowStatusReq{ Req: updateWorkflow.Req, - WorkflowErr: wfErr, + WorkflowErrMsg: errMsg, ShouldUpdateViewer: shouldUpdateViewer, }, ).Get(sessionCtx, nil) diff --git a/dataviewer/workflows/workflow_test.go b/dataviewer/workflows/workflow_test.go index 19609e71..a4dfb3ef 100644 --- a/dataviewer/workflows/workflow_test.go +++ b/dataviewer/workflows/workflow_test.go @@ -128,7 +128,7 @@ func TestWorkflow_DataviewerWorkflow(t *testing.T) { tester.mocks.mockact.EXPECT().UpdateWorkflowStatus(mock.Anything, dvCom.UpdateWorkflowStatusReq{ Req: req, - WorkflowErr: nil, + WorkflowErrMsg: "", ShouldUpdateViewer: true, }).Return(nil) @@ -206,7 +206,7 @@ func TestWorkflow_DataviewerWorkflow(t *testing.T) { tester.mocks.mockact.EXPECT().UpdateWorkflowStatus(mock.Anything, dvCom.UpdateWorkflowStatusReq{ Req: req, - WorkflowErr: nil, + WorkflowErrMsg: "", ShouldUpdateViewer: true, }).Return(nil)