Skip to content

Commit

Permalink
Merge pull request #11 from alibaba/develop
Browse files Browse the repository at this point in the history
fix bug of mapreduce
  • Loading branch information
yaohuitc authored Mar 8, 2024
2 parents f0c9954 + e6b5cbc commit a4ecae5
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 23 deletions.
1 change: 1 addition & 0 deletions internal/batch/base_req_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func (rcvr *BaseReqHandler) Start(h ReqHandler) error {
}
rcvr.batchProcessSvc = gopool

rcvr.stopBatchRetrieveCh = make(chan struct{})
rcvr.batchRetrieveFunc = func() {
for {
select {
Expand Down
5 changes: 2 additions & 3 deletions internal/master/common_update_instance_status_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
package master

import (
"github.com/asynkron/protoactor-go/actor"
"google.golang.org/protobuf/proto"

"github.com/alibaba/schedulerx-worker-go/config"
"github.com/alibaba/schedulerx-worker-go/internal/actor/common"
"github.com/alibaba/schedulerx-worker-go/internal/common"
Expand All @@ -28,6 +25,8 @@ import (
"github.com/alibaba/schedulerx-worker-go/internal/utils"
"github.com/alibaba/schedulerx-worker-go/logger"
"github.com/alibaba/schedulerx-worker-go/processor"
"github.com/asynkron/protoactor-go/actor"
"google.golang.org/protobuf/proto"
)

var _ UpdateInstanceStatusHandler = &commonUpdateInstanceStatusHandler{}
Expand Down
12 changes: 6 additions & 6 deletions internal/master/map_task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ import (
)

var (
_ taskmaster.MapTaskMaster = &MapTaskMaster{}
once sync.Once
_ taskmaster.MapTaskMaster = &MapTaskMaster{}
)

type MapTaskMaster struct {
Expand Down Expand Up @@ -84,6 +83,7 @@ type MapTaskMaster struct {
xAttrs *common.MapTaskXAttrs
taskCounter *atomic.Int64
localTaskRouterPath string
once sync.Once
}

func NewMapTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Context) taskmaster.TaskMaster {
Expand Down Expand Up @@ -139,7 +139,7 @@ func NewMapTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Co
}

func (m *MapTaskMaster) init() {
once.Do(func() {
m.once.Do(func() {
m.TaskMaster.Init()
jobIdAndInstanceId := strconv.FormatInt(m.GetJobInstanceInfo().GetJobId(), 10) + "_" + strconv.FormatInt(m.GetJobInstanceInfo().GetJobInstanceId(), 10)
logger.Infof("jobInstanceId=%d, map master config, pageSize:%d, queueSize:%d, dispatcherSize:%d, workerSize:%d",
Expand All @@ -160,9 +160,9 @@ func (m *MapTaskMaster) init() {
go m.checkWorkerAlive()

// PULL_MODEL specially
if m.xAttrs != nil && m.xAttrs.GetTaskDispatchMode() == string(common.TaskDispatchModePull) {
go m.notifyWorkerPull()
}
// if m.xAttrs != nil && m.xAttrs.GetTaskDispatchMode() == string(common.TaskDispatchModePull) {
// go m.notifyWorkerPull()
// }
})
}

Expand Down
29 changes: 16 additions & 13 deletions internal/master/task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func NewTaskMaster(actorCtx actor.Context, jobInstanceInfo *common.JobInstanceIn
}

func (m *TaskMaster) Init() {
m.lock.Lock()
defer m.lock.Unlock()
// m.lock.Lock()
// defer m.lock.Unlock()
if !m.inited {
m.inited = true
}
Expand Down Expand Up @@ -159,7 +159,10 @@ func (m *TaskMaster) UpdateTaskStatus(req *schedulerx.ContainerReportTaskStatusR
return nil
}

func (m *TaskMaster) updateNewInstanceStatus(serialNum, jobInstanceId int64, newStatus processor.InstanceStatus, result string) error {
func (m *TaskMaster) updateNewInstanceStatus(serialNum int64, jobInstanceId int64, newStatus processor.InstanceStatus, result string) error {
m.lock.Lock()
defer m.lock.Unlock()
// fmt.Printf("serialNum=%d, jobInstanceId=%d, status=%s\n", serialNum, jobInstanceId, newStatus)
if err := m.statusHandler.Handle(serialNum, newStatus, result); err != nil {
return fmt.Errorf("update status failed, err=%s", err.Error())
}
Expand Down Expand Up @@ -229,9 +232,9 @@ func (m *TaskMaster) BatchUpdateTaskStatus(taskMaster taskmaster.TaskMaster, req
}

func (m *TaskMaster) KillInstance(reason string) error {
m.lock.Lock()
// m.lock.Lock()
m.killed = true
m.lock.Unlock()
// m.lock.Unlock()

GetTimeScheduler().remove(m.jobInstanceInfo.GetJobInstanceId())
return nil
Expand Down Expand Up @@ -289,20 +292,20 @@ func (m *TaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult {
}

func (m *TaskMaster) GetInstanceStatus() processor.InstanceStatus {
m.lock.RLock()
defer m.lock.RUnlock()
// m.lock.RLock()
// defer m.lock.RUnlock()
return m.instanceStatus
}

func (m *TaskMaster) SetInstanceStatus(instanceStatus processor.InstanceStatus) {
m.lock.Lock()
// m.lock.Lock()
m.instanceStatus = instanceStatus
m.lock.Unlock()
// m.lock.Unlock()
}

func (m *TaskMaster) IsKilled() bool {
m.lock.RLock()
defer m.lock.RUnlock()
// m.lock.RLock()
// defer m.lock.RUnlock()
return m.killed
}

Expand All @@ -316,8 +319,8 @@ func (m *TaskMaster) GetAliveCheckWorkerSet() *utils.ConcurrentSet {
}

func (m *TaskMaster) IsInited() bool {
m.lock.RLock()
defer m.lock.RUnlock()
// m.lock.RLock()
// defer m.lock.RUnlock()
return m.inited
}

Expand Down
2 changes: 1 addition & 1 deletion internal/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ package version

// Version used for statistics and debug
func Version() string {
return "v0.0.2"
return "v0.0.4"
}

0 comments on commit a4ecae5

Please sign in to comment.