From 41f9aca3d51de0f2ec0c3dd2952bf88c67193283 Mon Sep 17 00:00:00 2001 From: HuangXiaomeng Date: Mon, 22 Jan 2024 16:42:02 +0800 Subject: [PATCH] fix bug: jobInstance status should be failed if reduce failed --- internal/master/map_task_master.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/master/map_task_master.go b/internal/master/map_task_master.go index 9f6ccb3..b1d14f0 100644 --- a/internal/master/map_task_master.go +++ b/internal/master/map_task_master.go @@ -915,14 +915,14 @@ func (m *MapTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult workerProgressCounter.(*common.WorkerProgressCounter).IncrementTotal() workerProgressCounter.(*common.WorkerProgressCounter).IncrementRunning() - reduceResult, err := mpProcessor.Reduce(jobCtx) + result, err := mpProcessor.Reduce(jobCtx) if err != nil { - reduceResult = processor.NewProcessResult() - reduceResult.SetFailed() - reduceResult.SetResult("reduce exception: " + err.Error()) + result = processor.NewProcessResult() + result.SetFailed() + result.SetResult("reduce exception: " + err.Error()) } - if reduceResult.Status() == processor.InstanceStatusSucceed { + if result.Status() == processor.InstanceStatusSucceed { if val, ok := m.taskProgressMap.Load(reduceTaskName); ok { val.(*common.TaskProgressCounter).IncrementOneSuccess() } @@ -937,6 +937,7 @@ func (m *MapTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult val.(*common.WorkerProgressCounter).IncrementOneFailed() } } + return result } return reduceResult }