Skip to content

Commit

Permalink
fix bug: jobInstance status should be failed if reduce failed
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangXiaomeng authored and xiaomeng.hxm committed Jan 23, 2024
1 parent 8776c9d commit 41f9aca
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions internal/master/map_task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,14 +915,14 @@ func (m *MapTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult
workerProgressCounter.(*common.WorkerProgressCounter).IncrementTotal()
workerProgressCounter.(*common.WorkerProgressCounter).IncrementRunning()

reduceResult, err := mpProcessor.Reduce(jobCtx)
result, err := mpProcessor.Reduce(jobCtx)
if err != nil {
reduceResult = processor.NewProcessResult()
reduceResult.SetFailed()
reduceResult.SetResult("reduce exception: " + err.Error())
result = processor.NewProcessResult()
result.SetFailed()
result.SetResult("reduce exception: " + err.Error())
}

if reduceResult.Status() == processor.InstanceStatusSucceed {
if result.Status() == processor.InstanceStatusSucceed {
if val, ok := m.taskProgressMap.Load(reduceTaskName); ok {
val.(*common.TaskProgressCounter).IncrementOneSuccess()
}
Expand All @@ -937,6 +937,7 @@ func (m *MapTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult
val.(*common.WorkerProgressCounter).IncrementOneFailed()
}
}
return result
}
return reduceResult
}
Expand Down

0 comments on commit 41f9aca

Please sign in to comment.