diff --git a/client.go b/client.go
index eaa5532..b48d933 100644
--- a/client.go
+++ b/client.go
@@ -144,8 +144,8 @@ func newClient(cfg *Config, opts ...Option) (*Client, error) {
         logger.Infof("SchedulerX server connected, remoteAddr=%s, localAddr=%s", conn.RemoteAddr(), conn.LocalAddr().String())
     }
 
-    tasks := tasks.GetTaskMap()
-    masterpool.InitTaskMasterPool(masterpool.NewTaskMasterPool(tasks))
+    taskMap := tasks.GetTaskMap()
+    masterpool.InitTaskMasterPool(masterpool.NewTaskMasterPool(taskMap))
 
     // Init actors
     actorSystem := actor.NewActorSystem()
@@ -162,7 +162,7 @@ func newClient(cfg *Config, opts ...Option) (*Client, error) {
     return &Client{
         cfg:         cfg,
         opts:        options,
-        tasks:       tasks,
+        tasks:       taskMap,
         actorSystem: actorSystem,
     }, nil
 }
diff --git a/config/worker_config.go b/config/worker_config.go
index 42271db..eb3743d 100644
--- a/config/worker_config.go
+++ b/config/worker_config.go
@@ -139,6 +139,12 @@ func WithIface(iface string) Option {
     }
 }
 
+func WithQueueSize(queueSize int32) Option {
+    return func(config *WorkerConfig) {
+        config.queueSize = queueSize
+    }
+}
+
 func NewWorkerConfig(opts ...Option) *WorkerConfig {
     once.Do(func() {
         workerConfig = defaultWorkerConfig()
@@ -166,6 +172,7 @@ type WorkerConfig struct {
     taskBodySizeMax           int32
     grpcPort                  int32
     iface                     string
+    queueSize                 int32
 }
 
 func (w *WorkerConfig) IsShareContainerPool() bool {
@@ -232,6 +239,10 @@ func (w *WorkerConfig) Iface() string {
     return w.iface
 }
 
+func (w *WorkerConfig) QueueSize() int32 {
+    return w.queueSize
+}
+
 func defaultWorkerConfig() *WorkerConfig {
     return &WorkerConfig{
         isSecondDelayIntervalMS:   false,
@@ -248,5 +259,6 @@ func defaultWorkerConfig() *WorkerConfig {
         workerParallelTaskMaxSize: constants.ParallelTaskListSizeMaxDefault,
         workerMapPageSize:         constants.WorkerMapPageSizeDefault,
         taskBodySizeMax:           constants.TaskBodySizeMaxDefault,
+        queueSize:                 constants.MapMasterQueueSizeDefault,
     }
 }
diff --git a/internal/actor/common/channel.go b/internal/actor/common/channel.go
index 0768861..6d8469c 100644
--- a/internal/actor/common/channel.go
+++ b/internal/actor/common/channel.go
@@ -16,22 +16,25 @@
 
 package actorcomm
 
-import "github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
+import (
+    "github.com/alibaba/schedulerx-worker-go/config"
+    "github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
+)
 
 var (
-    sxMsgCh                  = make(chan interface{}, 10*10000)
-    heartbeatMsgCh           = make(chan interface{}, 10*10000)
-    taskMasterMsgCh          = make(chan interface{}, 10*10000)
-    containerRouterMsgCh     = make(chan interface{}, 10*10000)
-    atLeastOnceDeliveryMsgCh = make(chan interface{}, 10*10000)
-
-    workerMapTaskRespMsgCh                = make(chan *schedulerx.WorkerMapTaskResponse, 10*10000)
-    workerBatchUpdateTaskStatusRespMsgCh  = make(chan *schedulerx.WorkerBatchUpdateTaskStatusResponse, 10*10000)
-    workerQueryJobInstanceStatusRespMsgCh = make(chan *schedulerx.WorkerQueryJobInstanceStatusResponse, 10*10000)
-    workerClearTasksRespMsgCh             = make(chan *schedulerx.WorkerClearTasksResponse, 10*10000)
-    workerBatchCreateTasksRespMsgCh       = make(chan *schedulerx.WorkerBatchCreateTasksResponse, 10*10000)
-    workerPullTasksRespMsgCh              = make(chan *schedulerx.WorkerPullTasksResponse, 10*10000)
-    workerReportTaskListStatusRespMsgCh   = make(chan *schedulerx.WorkerReportTaskListStatusResponse, 10*10000)
+    sxMsgCh                  = make(chan interface{}, config.GetWorkerConfig().QueueSize())
+    heartbeatMsgCh           = make(chan interface{}, config.GetWorkerConfig().QueueSize())
+    taskMasterMsgCh          = make(chan interface{}, config.GetWorkerConfig().QueueSize())
+    containerRouterMsgCh     = make(chan interface{}, config.GetWorkerConfig().QueueSize())
+    atLeastOnceDeliveryMsgCh = make(chan interface{}, config.GetWorkerConfig().QueueSize())
+
+    workerMapTaskRespMsgCh                = make(chan *schedulerx.WorkerMapTaskResponse, config.GetWorkerConfig().QueueSize())
+    workerBatchUpdateTaskStatusRespMsgCh  = make(chan *schedulerx.WorkerBatchUpdateTaskStatusResponse, config.GetWorkerConfig().QueueSize())
+    workerQueryJobInstanceStatusRespMsgCh = make(chan *schedulerx.WorkerQueryJobInstanceStatusResponse, config.GetWorkerConfig().QueueSize())
+    workerClearTasksRespMsgCh             = make(chan *schedulerx.WorkerClearTasksResponse, config.GetWorkerConfig().QueueSize())
+    workerBatchCreateTasksRespMsgCh       = make(chan *schedulerx.WorkerBatchCreateTasksResponse, config.GetWorkerConfig().QueueSize())
+    workerPullTasksRespMsgCh              = make(chan *schedulerx.WorkerPullTasksResponse, config.GetWorkerConfig().QueueSize())
+    workerReportTaskListStatusRespMsgCh   = make(chan *schedulerx.WorkerReportTaskListStatusResponse, config.GetWorkerConfig().QueueSize())
 )
 
 func SxMsgReceiver() chan interface{} {
diff --git a/internal/actor/container_actor.go b/internal/actor/container_actor.go
index dd19f52..ef7ce75 100644
--- a/internal/actor/container_actor.go
+++ b/internal/actor/container_actor.go
@@ -42,6 +42,13 @@ import (
 )
 
 var _ actor.Actor = &containerActor{}
+var defaultActorPool, _ = ants.NewPool(
+    ants.DefaultAntsPoolSize,
+    ants.WithPanicHandler(func(i interface{}) {
+        if r := recover(); r != nil {
+            logger.Errorf("Panic happened in containerStarter, %v\n%s", r, debug.Stack())
+        }
+    }))
 
 type containerActor struct {
     enableShareContainerPool bool
@@ -53,19 +60,12 @@
 }
 
 func newContainerActor() *containerActor {
-    gopool, _ := ants.NewPool(
-        ants.DefaultAntsPoolSize,
-        ants.WithPanicHandler(func(i interface{}) {
-            if r := recover(); r != nil {
-                logger.Errorf("Panic happened in containerStarter, %v\n%s", r, debug.Stack())
-            }
-        }))
     return &containerActor{
         enableShareContainerPool:  config.GetWorkerConfig().IsShareContainerPool(),
         batchSize:                 config.GetWorkerConfig().WorkerMapPageSize(),
         statusReqBatchHandlerPool: batch.GetContainerStatusReqHandlerPool(),
         containerPool:             container.GetThreadContainerPool(),
-        containerStarter:          gopool,
+        containerStarter:          defaultActorPool,
         lock:                      sync.Mutex{},
     }
 }
@@ -205,26 +205,31 @@ func (a *containerActor) handleKillContainer(actorCtx actor.Context, req *schedu
 
 func (a *containerActor) handleDestroyContainerPool(actorCtx actor.Context, req *schedulerx.MasterDestroyContainerPoolRequest) {
     if !a.enableShareContainerPool {
-        handler, ok := a.statusReqBatchHandlerPool.GetHandlers().Load(req.GetJobInstanceId())
-        if ok {
-            a.lock.Lock()
-            defer a.lock.Unlock()
+        // handler, ok := a.statusReqBatchHandlerPool.GetHandlers().Load(req.GetJobInstanceId())
+        // if ok {
+        a.lock.Lock()
+        defer a.lock.Unlock()
+        logger.Infof("handleDestroyContainerPool from jobInstanceId=%v.", req.GetJobInstanceId())
+        a.statusReqBatchHandlerPool.Stop(req.GetJobInstanceId())
+        a.containerPool.DestroyByInstance(req.GetJobInstanceId())
+        /*
             if h, ok := handler.(*batch.ContainerStatusReqHandler); ok {
-                if latestRequest := h.GetLatestRequest(); latestRequest != nil {
-                    reportTaskStatusRequest, ok := latestRequest.(*schedulerx.ContainerReportTaskStatusRequest)
-                    if ok {
-                        if reportTaskStatusRequest.GetSerialNum() != req.GetSerialNum() {
-                            logger.Infof("skip handleDestroyContainerPool cycleId=%v_%v, handler serialNum=%v.", req.GetJobInstanceId(), req.GetSerialNum(), reportTaskStatusRequest.GetSerialNum())
-                            return
+
+                if latestRequest := h.GetLatestRequest(); latestRequest != nil {
+                    reportTaskStatusRequest, ok := latestRequest.(*schedulerx.ContainerReportTaskStatusRequest)
+                    if ok {
+                        if reportTaskStatusRequest.GetSerialNum() != req.GetSerialNum() {
+                            logger.Infof("skip handleDestroyContainerPool cycleId=%v_%v, handler serialNum=%v.", req.GetJobInstanceId(), req.GetSerialNum(), reportTaskStatusRequest.GetSerialNum())
+                            return
+                        }
+                        logger.Infof("handleDestroyContainerPool from cycleId=%v_%v, handler serialNum=%v.", req.GetJobInstanceId(), req.GetSerialNum(), reportTaskStatusRequest.GetSerialNum())
+                        a.statusReqBatchHandlerPool.Stop(req.GetJobInstanceId())
+                        a.containerPool.DestroyByInstance(req.GetJobInstanceId())
                     }
-                } else {
-                    logger.Infof("handleDestroyContainerPool from cycleId=%v_%v, handler serialNum=%v.", req.GetJobInstanceId(), req.GetSerialNum(), reportTaskStatusRequest.GetSerialNum())
-                    a.containerPool.DestroyByInstance(req.GetJobInstanceId())
-                    a.statusReqBatchHandlerPool.Stop(req.GetJobInstanceId())
-                }
-            }
             }
-        }
+        */
+        // }
     }
     response := &schedulerx.MasterDestroyContainerPoolResponse{
         Success: proto.Bool(true),
@@ -236,7 +241,7 @@ func (a *containerActor) handleDestroyContainerPool(actorCtx actor.Context, req
         logger.Warnf("Cannot send MasterKillContainerResponse due to sender is unknown in handleDestroyContainerPool of containerActor, request=%+v", req)
     }
 
-    a.containerPool.ReleaseInstanceLock(req.GetJobInstanceId())
+    // a.containerPool.ReleaseInstanceLock(req.GetJobInstanceId())
 }
 
 func (a *containerActor) killInstance(jobId, jobInstanceId int64) {
@@ -250,6 +255,7 @@ func (a *containerActor) killInstance(jobId, jobInstanceId int64) {
         if strings.HasPrefix(uniqueId, prefixKey) {
             container.Kill()
             containerMap.Delete(uniqueId)
+            a.statusReqBatchHandlerPool.Stop(jobInstanceId)
         }
         return true
     })
@@ -282,10 +288,12 @@ func (a *containerActor) startContainer(actorCtx actor.Context, req *schedulerx.
     }
     if !a.statusReqBatchHandlerPool.Contains(statusReqBatchHandlerKey) {
         // support 1.5 million requests
-        reqQueue := batch.NewReqQueue(150 * 10000)
+        reqQueue := batch.NewReqQueue(config.GetWorkerConfig().QueueSize())
         a.statusReqBatchHandlerPool.Start(
             statusReqBatchHandlerKey,
-            batch.NewContainerStatusReqHandler(statusReqBatchHandlerKey, 1, 1, a.batchSize, reqQueue, req.GetInstanceMasterAkkaPath()))
+            batch.NewContainerStatusReqHandler(statusReqBatchHandlerKey, 1, 1,
+                a.batchSize, reqQueue, req.GetInstanceMasterAkkaPath()),
+        )
     }
     consumerNum := int32(constants.ConsumerNumDefault)
     if req.GetConsumerNum() > 0 {
diff --git a/internal/actor/job_instance_actor.go b/internal/actor/job_instance_actor.go
index 20ee8dd..a0b2540 100644
--- a/internal/actor/job_instance_actor.go
+++ b/internal/actor/job_instance_actor.go
@@ -19,10 +19,12 @@
 import (
     "context"
     "fmt"
-    "github.com/alibaba/schedulerx-worker-go/processor"
-    "github.com/tidwall/gjson"
     "time"
 
+    "github.com/tidwall/gjson"
+
+    "github.com/alibaba/schedulerx-worker-go/processor"
+
     "github.com/asynkron/protoactor-go/actor"
     "google.golang.org/protobuf/proto"
 
@@ -67,16 +69,16 @@ func (a *jobInstanceActor) Receive(ctx actor.Context) {
     switch msg := ctx.Message().(type) {
     case *schedulerx.WorkerBatchReportTaskStatuesResponse:
         // send to atLeastOnceDeliveryRoutingActor
-        //actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
+        // actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
         //    Msg: msg,
-        //}
+        // }
         // FIXME atLeastOnceDelivery not yet implement, retry 3 times, interval 30s
         a.handleReportWorkerStatus(ctx, ctx.Message())
     case *schedulerx.WorkerReportJobInstanceStatusResponse:
         // send to atLeastOnceDeliveryRoutingActor
-        //actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
+        // actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
         //    Msg: msg,
-        //}
+        // }
         // FIXME atLeastOnceDelivery not yet implement, retry 3 times, interval 30s
         a.handleReportWorkerStatus(ctx, ctx.Message())
     case *actorcomm.SchedulerWrappedMsg:
@@ -141,7 +143,7 @@ func (a *jobInstanceActor) handleSubmitJobInstance(actorCtx actor.Context, msg *
     }
     task, ok := masterpool.GetTaskMasterPool().Tasks().Find(jobName)
     if !ok || task == nil {
-        fmt.Errorf("handleSubmitJobInstance error, jobName=%s is unregistered. ", jobName)
+        logger.Errorf("handleSubmitJobInstance error, jobName=%s is unregistered. ", jobName)
 
         // report job instance status with at-least-once-delivery
         req := &schedulerx.WorkerReportJobInstanceStatusRequest{
@@ -244,7 +246,7 @@ func (a *jobInstanceActor) handleKillJobInstance(actorCtx actor.Context, msg *ac
     } else {
         if taskMaster := masterpool.GetTaskMasterPool().Get(req.GetJobInstanceId()); taskMaster != nil {
             if err := taskMaster.KillInstance("killed from server"); err != nil {
-                logger.Infof(fmt.Sprintf("%d killed from server failed, err=%s", req.GetJobInstanceId()), err.Error())
+                logger.Infof("%d killed from server failed, err=%s", req.GetJobInstanceId(), err.Error())
             }
         }
         errMsg := fmt.Sprintf("%d killed from server", req.GetJobInstanceId())
diff --git a/internal/batch/base_req_handler.go b/internal/batch/base_req_handler.go
index 129496a..599c7db 100644
--- a/internal/batch/base_req_handler.go
+++ b/internal/batch/base_req_handler.go
@@ -128,7 +128,8 @@ func (rcvr *BaseReqHandler) SetWorkThreadNum(workThreadNum int) {
 }
 
 func (rcvr *BaseReqHandler) Start(h ReqHandler) error {
-    gopool, err := ants.NewPool(rcvr.maxBatchThreadNum,
+    gopool, err := ants.NewPool(
+        rcvr.maxBatchThreadNum,
         ants.WithExpiryDuration(30*time.Second),
         ants.WithPanicHandler(func(i interface{}) {
             if r := recover(); r != nil {
@@ -145,10 +146,10 @@ func (rcvr *BaseReqHandler) Start(h ReqHandler) error {
         for {
             select {
             case <-rcvr.stopBatchRetrieveCh:
-                break
+                return
             default:
                 reqs := rcvr.AsyncHandleReqs(h)
-                logger.Debugf("jobInstanceId=%s, batch retrieve reqs, size:%d, remain size:%d, batchSize:%d",
+                logger.Debugf("jobInstanceId=%d, batch retrieve reqs, size:%d, remain size:%d, batchSize:%d",
                     rcvr.jobInstanceId, len(reqs), len(rcvr.reqsQueue.requests), rcvr.batchSize)
                 if int32(len(reqs)) < rcvr.batchSize*4/5 {
                     // no element in reqs, sleep a while for aggregation
@@ -171,9 +172,14 @@ func (rcvr *BaseReqHandler) Stop() {
     }
     if rcvr.batchProcessSvc != nil {
         rcvr.batchProcessSvc.Release()
+        rcvr.batchProcessSvc = nil
     }
     if rcvr.reqsQueue != nil {
         rcvr.reqsQueue.Clear()
+        rcvr.reqsQueue = nil
+    }
+    if rcvr.activeRunnableNum != nil {
+        rcvr.activeRunnableNum = nil
     }
 }
 
diff --git a/internal/batch/container_status_req_handler.go b/internal/batch/container_status_req_handler.go
index 2fef0cc..6df1207 100644
--- a/internal/batch/container_status_req_handler.go
+++ b/internal/batch/container_status_req_handler.go
@@ -63,9 +63,9 @@ func (h *ContainerStatusReqHandler) Process(jobInstanceId int64, requests []inte
     err := h.batchProcessSvc.Submit(func() {
         if h.enableShareContainerPool {
             // FIXME fix import cycle
-            //// If the shared thread pool is enabled, the statuses may contain multiple jobInstanceIds, so split them into separate lists first
-            //taskStatusRequestMap := make(map[*Pair][]*schedulerx.ContainerReportTaskStatusRequest)
-            //for _, req := range reqs {
+            // // If the shared thread pool is enabled, the statuses may contain multiple jobInstanceIds, so split them into separate lists first
+            // taskStatusRequestMap := make(map[*Pair][]*schedulerx.ContainerReportTaskStatusRequest)
+            // for _, req := range reqs {
             //    pair := &Pair{
             //        jobInstanceId: req.GetJobInstanceId(),
             //        serialNum:     req.GetSerialNum(),
@@ -75,10 +75,10 @@ func (h *ContainerStatusReqHandler) Process(jobInstanceId int64, requests []inte
             //    } else {
             //        taskStatusRequestMap[pair] = []*schedulerx.ContainerReportTaskStatusRequest{req}
             //    }
-            //}
+            // }
             //
-            //// Build batchStatusRequests for each distinct jobInstanceId
-            //for pair, reqs := range taskStatusRequestMap {
+            // // Build batchStatusRequests for each distinct jobInstanceId
+            // for pair, reqs := range taskStatusRequestMap {
             //    var (
             //        instanceMasterActorPath string
             //        finishCount             = 0
@@ -137,7 +137,7 @@ func (h *ContainerStatusReqHandler) Process(jobInstanceId int64, requests []inte
             //    } else {
             //        logger.Errorf("instanceMasterActorPath is null, jobInstanceId=%d", jobInstanceId)
             //    }
-            //}
+            // }
         } else {
             taskStatuses := make([]*schedulerx.TaskStatusInfo, 0, len(reqs))
             // some attrs are duplicated in all reqs, for example: workAddr, workerId, jobId, jobInstanceId, taskMasterPath
@@ -157,6 +157,9 @@ func (h *ContainerStatusReqHandler) Process(jobInstanceId int64, requests []inte
                 if req.GetProgress() != "" {
                     taskStatusInfo.Progress = proto.String(req.GetProgress())
                 }
+                if req.GetTraceId() != "" {
+                    taskStatusInfo.TraceId = proto.String(req.GetTraceId())
+                }
                 taskStatuses = append(taskStatuses, taskStatusInfo)
             }
             req := &schedulerx.ContainerBatchReportTaskStatuesRequest{
diff --git a/internal/batch/container_status_req_handler_pool.go b/internal/batch/container_status_req_handler_pool.go
index 2eae4af..9fa6e21 100644
--- a/internal/batch/container_status_req_handler_pool.go
+++ b/internal/batch/container_status_req_handler_pool.go
@@ -60,6 +60,7 @@ func (p *ContainerStatusReqHandlerPool) Stop(jobInstanceId int64) {
     handler, ok := p.handlers.LoadAndDelete(jobInstanceId)
     if ok {
         handler.(*ContainerStatusReqHandler).Stop()
+        handler = nil
     }
 }
 
diff --git a/internal/batch/req_queue.go b/internal/batch/req_queue.go
index 6f337f3..2dd3dd3 100644
--- a/internal/batch/req_queue.go
+++ b/internal/batch/req_queue.go
@@ -18,17 +18,17 @@
 package batch
 
 import (
     "sync"
-
+
     "github.com/alibaba/schedulerx-worker-go/logger"
 )
 
 type ReqQueue struct {
-    capacity int64
+    capacity int32
     requests []interface{}
     lock     sync.RWMutex
 }
 
-func NewReqQueue(capacity int64) (q *ReqQueue) {
+func NewReqQueue(capacity int32) (q *ReqQueue) {
     return &ReqQueue{
         capacity: capacity,
         requests: make([]interface{}, 0, capacity),
@@ -40,11 +40,11 @@ func (q *ReqQueue) SubmitRequest(request interface{}) {
 }
 
 func (q *ReqQueue) RetrieveRequests(batchSize int32) []interface{} {
-    res := make([]interface{}, 0, q.Size())
-    for i := 0; int32(i) < batchSize; i++ {
+    res := make([]interface{}, 0, batchSize)
+    for i := int32(0); i < batchSize; i++ {
         request := q.pop()
         if request == nil {
-            //empty, just break
+            // empty, just break
             break
         }
         res = append(res, request)
@@ -54,7 +54,7 @@
 
 func (q *ReqQueue) push(req interface{}) {
     if req != nil {
-        if q.capacity > 0 && int64(q.Size()) == q.capacity {
+        if q.capacity > 0 && int32(q.Size()) == q.capacity {
             logger.Warnf("req queue is full, capacity: %d", q.capacity)
             return
         }
@@ -75,11 +75,11 @@ func (q *ReqQueue) pop() interface{} {
     return req
 }
 
-func (q *ReqQueue) SetCapacity(capacity int64) {
+func (q *ReqQueue) SetCapacity(capacity int32) {
     q.lock.Lock()
     defer q.lock.Unlock()
     q.capacity = capacity
-    if int64(q.Size()) > q.capacity {
+    if int32(q.Size()) > q.capacity {
         q.requests = q.requests[:q.capacity]
     }
 }
diff --git a/internal/container/thread_container_pool.go b/internal/container/thread_container_pool.go
index 38f48ca..272b9f0 100644
--- a/internal/container/thread_container_pool.go
+++ b/internal/container/thread_container_pool.go
@@ -52,7 +52,8 @@ type ThreadContainerPool struct {
 }
 
 func newTreadContainerPool() *ThreadContainerPool {
-    gopool, _ := ants.NewPool(int(config.GetWorkerConfig().SharePoolSize()),
+    gopool, _ := ants.NewPool(
+        int(config.GetWorkerConfig().SharePoolSize()),
         ants.WithExpiryDuration(30*time.Second),
         ants.WithPanicHandler(func(i interface{}) {
             if r := recover(); r != nil {
@@ -68,32 +69,27 @@ func newTreadContainerPool() *ThreadContainerPool {
     }
 }
 
-func (p *ThreadContainerPool) Submit(jobId, jobInstanceId, taskId int64, container Container, consumerSize int32) error {
-    if !p.enableShareContainerPool {
-        var (
-            pool interface{}
-            ok   bool
-            err  error
-        )
-        pool, ok = p.threadPoolMap.Load(jobInstanceId)
-        if !ok {
-            pool, err = ants.NewPool(int(consumerSize),
-                ants.WithExpiryDuration(30*time.Second),
-                ants.WithPanicHandler(func(i interface{}) {
-                    if r := recover(); r != nil {
-                        logger.Errorf("Catch panic with PanicHandler in ThreadContainerPool, %v\n%s", r, debug.Stack())
-                    }
-                }))
-            if err != nil {
-                return err
-            }
-            p.threadPoolMap.Store(jobInstanceId, pool)
+func (p *ThreadContainerPool) Submit(jobId, jobInstanceId, taskId int64, container Container, consumerSize int32) (err error) {
+    if p.enableShareContainerPool {
+        return p.sharedThreadPool.Submit(container.Start)
+    }
+
+    pool, ok := p.threadPoolMap.Load(jobInstanceId)
+    if !ok {
+        pool, err = ants.NewPool(
+            int(consumerSize),
+            ants.WithExpiryDuration(30*time.Second),
+            ants.WithPanicHandler(func(i interface{}) {
+                if r := recover(); r != nil {
+                    logger.Errorf("Catch panic with PanicHandler in ThreadContainerPool, %v\n%s", r, debug.Stack())
+                }
+            }))
+        if err != nil {
+            return err
         }
-        pool.(*ants.Pool).Submit(container.Start)
-    } else {
-        p.sharedThreadPool.Submit(container.Start)
+        p.threadPoolMap.Store(jobInstanceId, pool)
     }
-    return nil
+    return pool.(*ants.Pool).Submit(container.Start)
 }
 
 func (p *ThreadContainerPool) DestroyByInstance(jobInstanceId int64) bool {
diff --git a/internal/discovery/discover.go b/internal/discovery/discover.go
index ff973c3..f3b9d29 100644
--- a/internal/discovery/discover.go
+++ b/internal/discovery/discover.go
@@ -76,6 +76,7 @@ func (s *ServiceDiscover) Start(groupId, appKey string) {
             s.refreshActiveServer(groupId, appKey)
         case <-s.stopCh:
             logger.Infof("receive stop signal")
+            return
         }
     }
 }
diff --git a/internal/master/batch_task_master.go b/internal/master/batch_task_master.go
index 4e0bc97..155b2a7 100644
--- a/internal/master/batch_task_master.go
+++ b/internal/master/batch_task_master.go
@@ -51,7 +51,7 @@ func (m *BatchTaskMaster) doMetricsCheck() error {
         return err
     }
     if diskUsedPercent > constants.UserSpacePercentMax {
-        return fmt.Errorf("used space beyond d% % ", constants.UserSpacePercentMax*100)
+        return fmt.Errorf("used space beyond %f%% ", constants.UserSpacePercentMax*100)
     }
     return nil
 }
diff --git a/internal/master/common_update_instance_status_handler.go b/internal/master/common_update_instance_status_handler.go
index 092f032..1820876 100644
--- a/internal/master/common_update_instance_status_handler.go
+++ b/internal/master/common_update_instance_status_handler.go
@@ -17,6 +17,9 @@
 package master
 
 import (
+    "github.com/asynkron/protoactor-go/actor"
+    "google.golang.org/protobuf/proto"
+
     "github.com/alibaba/schedulerx-worker-go/config"
     "github.com/alibaba/schedulerx-worker-go/internal/actor/common"
     "github.com/alibaba/schedulerx-worker-go/internal/common"
@@ -25,8 +28,6 @@ import (
     "github.com/alibaba/schedulerx-worker-go/internal/utils"
     "github.com/alibaba/schedulerx-worker-go/logger"
     "github.com/alibaba/schedulerx-worker-go/processor"
-    "github.com/asynkron/protoactor-go/actor"
-    "google.golang.org/protobuf/proto"
 )
 
 var _ UpdateInstanceStatusHandler = &commonUpdateInstanceStatusHandler{}
@@ -92,7 +93,7 @@ func (rcvr *commonUpdateInstanceStatusHandler) Handle(serialNum int64, instanceS
             rcvr.masterPool.Remove(jobInstanceId)
         }
-        logger.Infof("uniqueId: %d is finished, remove from MasterPool.", uniqueId)
+        logger.Infof("uniqueId: %s is finished, remove from MasterPool.", uniqueId)
     }
 
     progress, err := rcvr.taskMaster.GetJobInstanceProgress()
diff --git a/internal/master/grid_task_master.go b/internal/master/grid_task_master.go
index 1be545d..8ec89f4 100644
--- a/internal/master/grid_task_master.go
+++ b/internal/master/grid_task_master.go
@@ -19,6 +19,7 @@ package master
 import (
     "encoding/json"
     "fmt"
+    "github.com/alibaba/schedulerx-worker-go/config"
 
     "github.com/asynkron/protoactor-go/actor"
 
@@ -46,9 +47,9 @@ func NewGridTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.C
     }
     gridTaskMaster.taskPersistence = persistence.GetH2MemoryPersistence()
     gridTaskMaster.taskPersistence.InitTable()
-    gridTaskMaster.taskStatusReqQueue = batch.NewReqQueue(150 * 10000)
+    gridTaskMaster.taskStatusReqQueue = batch.NewReqQueue(config.GetWorkerConfig().QueueSize())
     gridTaskMaster.taskStatusReqBatchHandler = batch.NewTMStatusReqHandler(jobInstanceId, 1, 1, 3000, gridTaskMaster.taskStatusReqQueue)
-    gridTaskMaster.taskBlockingQueue = batch.NewReqQueue(150 * 10000)
+    gridTaskMaster.taskBlockingQueue = batch.NewReqQueue(config.GetWorkerConfig().QueueSize())
     if jobInstanceInfo.GetXattrs() != "" {
         gridTaskMaster.xAttrs = new(common.MapTaskXAttrs)
         if err := json.Unmarshal([]byte(jobInstanceInfo.GetXattrs()), gridTaskMaster.xAttrs); err != nil {
diff --git a/internal/master/map_task_master.go b/internal/master/map_task_master.go
index 725c8f0..8b4e953 100644
--- a/internal/master/map_task_master.go
+++ b/internal/master/map_task_master.go
@@ -109,8 +109,8 @@ func NewMapTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Co
         taskStatusMap:       make(map[int64]taskstatus.TaskStatus),
         taskCounter:         atomic.NewInt64(0),
         localTaskRouterPath: actorCtx.ActorSystem().Address(),
-        //taskStatusReqQueue:  batch.NewReqQueue(100000),
-        //taskBlockingQueue:   batch.NewReqQueue(100000),
+        // taskStatusReqQueue:  batch.NewReqQueue(100000),
+        // taskBlockingQueue:   batch.NewReqQueue(100000),
     }
 
     statusHandler := NewCommonUpdateInstanceStatusHandler(actorCtx, mapTaskMaster, jobInstanceInfo)
@@ -119,21 +119,21 @@ func NewMapTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Co
     }
     mapTaskMaster.TaskMaster = NewTaskMaster(actorCtx, jobInstanceInfo, statusHandler)
 
-    //mapTaskMaster.taskStatusReqBatchHandler = batch.NewTMStatusReqHandler(jobInstanceInfo.GetJobInstanceId(), 1, 1, 3000, mapTaskMaster.taskStatusReqQueue)
-    //if jobInstanceInfo.GetXattrs() != "" {
+    // mapTaskMaster.taskStatusReqBatchHandler = batch.NewTMStatusReqHandler(jobInstanceInfo.GetJobInstanceId(), 1, 1, 3000, mapTaskMaster.taskStatusReqQueue)
+    // if jobInstanceInfo.GetXattrs() != "" {
     //    if err := json.Unmarshal([]byte(jobInstanceInfo.GetXattrs()), mapTaskMaster.xAttrs); err != nil {
     //        logger.Errorf("Unmarshal xAttrs failed, err=%s", err.Error())
     //    }
-    //}
-    //if mapTaskMaster.xAttrs != nil && mapTaskMaster.xAttrs.GetTaskDispatchMode() == string(common.TaskDispatchModePull) {
+    // }
+    // if mapTaskMaster.xAttrs != nil && mapTaskMaster.xAttrs.GetTaskDispatchMode() == string(common.TaskDispatchModePull) {
     //    mapTaskMaster.taskDispatchReqHandler = batch.NewTaskPullReqHandler(
     //        jobInstanceInfo.GetJobInstanceId(), 1, 1, int32(mapTaskMaster.pageSize*int64(len(jobInstanceInfo.GetAllWorkers()))),
     //        mapTaskMaster.taskBlockingQueue)
-    //} else {
+    // } else {
     //    mapTaskMaster.taskDispatchReqHandler = batch.NewTaskPushReqHandler(
     //        jobInstanceInfo.GetJobInstanceId(), 1, 1, int32(mapTaskMaster.pageSize*int64(len(jobInstanceInfo.GetAllWorkers()))),
     //        mapTaskMaster.taskBlockingQueue, 3000)
-    //}
+    // }
     return mapTaskMaster
 }
 
@@ -176,7 +176,7 @@ func (m *MapTaskMaster) pullTask(jobIdAndInstanceId string) {
             time.Sleep(10 * time.Second)
             continue
         }
-        logger.Debugf("jobInstanceId=%d, pull cost=%dms", jobInstanceId, time.Now().Sub(startTime).Milliseconds())
+        logger.Debugf("jobInstanceId=%d, pull cost=%dms", jobInstanceId, time.Since(startTime).Milliseconds())
         if len(taskInfos) == 0 {
             logger.Debugf("pull task empty of jobInstanceId=%d, sleep 10s ...", jobInstanceId)
             time.Sleep(10 * time.Second)
@@ -513,11 +513,11 @@ func (m *MapTaskMaster) machineOverload() bool {
         taskQueueOverload = false
     )
     // FIXME golang get heap and cpu metric
-    //vmDetail := MetricsCollector.getMetrics()
-    //if vmDetail != nil {
+    // vmDetail := MetricsCollector.getMetrics()
+    // if vmDetail != nil {
     //    memOverload = vmDetail.getHeap1Usage() >= WorkerConstants.USER_MEMORY_PERCENT_MAX
     //    loadOverload = vmDetail.getCpuLoad1() >= vmDetail.getCpuProcessors()
-    //}
+    // }
 
     return memOverload || loadOverload || taskQueueOverload
 }
@@ -551,7 +551,7 @@ func (m *MapTaskMaster) batchHandleContainers(workerIdAddr string, reqs []*sched
     if dispatchMode == common.TaskDispatchModePush {
         startTime := time.Now()
         // FIXME
-        //workerAddr = actorcomm.GetRemoteWorkerAddr(workerAddr)
+        // workerAddr = actorcomm.GetRemoteWorkerAddr(workerAddr)
         containerRouterActorPid := actorcomm.GetContainerRouterPid(workerAddr)
 
         req := &schedulerx.MasterBatchStartContainersRequest{
@@ -863,12 +863,12 @@ func (m *MapTaskMaster) GetJobInstanceProgress() (string, error) {
 
 func (m *MapTaskMaster) CheckProcessor() {
     // FIXME
-    //if "java".equalsIgnoreCase(jobInstanceInfo.getJobType()) {
+    // if "java".equalsIgnoreCase(jobInstanceInfo.getJobType()) {
     //    processor := JavaProcessorProfileUtil.getJavaProcessor(jobInstanceInfo.getContent())
     //    if !ok {
     //        throw(NewIOException(processor.getClass().getName() + " must extends MapJobProcessor or MapReduceJobProcessor"))
     //    }
-    //}
+    // }
 }
 
 func (m *MapTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult {
@@ -954,15 +954,15 @@ func (m *MapTaskMaster) Stop() {
 
 func (m *MapTaskMaster) startBatchHandler() error {
     // FIXME
-    //if m.IsInited() {
+    // if m.IsInited() {
     //    return nil
-    //}
+    // }
     // start batch handlers
     if err := m.taskStatusReqBatchHandler.Start(m.taskStatusReqBatchHandler); err != nil {
         return err
     }
 
-    //m.taskBlockingQueue = batch.NewReqQueue(m.queueSize)
+    // m.taskBlockingQueue = batch.NewReqQueue(m.queueSize)
 
     if m.xAttrs != nil && m.xAttrs.GetTaskDispatchMode() == string(common.TaskDispatchModePush) {
         m.taskDispatchReqHandler.SetWorkThreadNum(int(m.dispatcherSize))
diff --git a/internal/master/second_job_update_instance_status_handler.go b/internal/master/second_job_update_instance_status_handler.go
index 0a6d21f..55de35f 100644
--- a/internal/master/second_job_update_instance_status_handler.go
+++ b/internal/master/second_job_update_instance_status_handler.go
@@ -20,7 +20,7 @@ import (
     "context"
     "encoding/json"
     "fmt"
-    "io/ioutil"
+    "io"
     "net/http"
     "reflect"
     "strconv"
@@ -155,10 +155,10 @@ func (h *secondJobUpdateInstanceStatusHandler) getAllWorkers(appGroupId, jobId i
     url := fmt.Sprintf("http://%s/app/getAllUsefulWorkerList.json?appGroupId=%d&jobId=%d", openapi.GetOpenAPIClient().Domain(), appGroupId, jobId)
     resp, err := openapi.GetOpenAPIClient().HttpClient().Get(url)
     if err != nil {
-        return nil, fmt.Errorf("HTTP request getAllWorkers failed, appGroupId:%s, err:%s", appGroupId, err.Error())
+        return nil, fmt.Errorf("HTTP request getAllWorkers failed, appGroupId:%d, err:%s", appGroupId, err.Error())
     }
     defer resp.Body.Close()
-    respData, err := ioutil.ReadAll(resp.Body)
+    respData, err := io.ReadAll(resp.Body)
     if err != nil {
         return nil, fmt.Errorf("Read http http response failed, err=%s ", err.Error())
     }
@@ -255,16 +255,16 @@ func (h *secondJobUpdateInstanceStatusHandler) triggerNextCycle(cycleId string,
     h.setHistory(h.taskMaster.GetSerialNum(), h.cycleStartTime, instanceStatus)
 
     if !h.taskMaster.IsKilled() {
-        //TODO: clean up this iteration's resources first; this could later be optimized so it does not have to run every cycle
+        // TODO: clean up this iteration's resources first; this could later be optimized so it does not have to run every cycle
         h.taskMaster.Clear(h.taskMaster)
 
         // The current node is offline
         // FIXME implement it
-        //if (!SchedulerxWorker.INITED) {
+        // if (!SchedulerxWorker.INITED) {
         //    LOGGER.info("Current worker is not running. To shutdown this master JobInstanceId={}", jobInstanceInfo.getJobInstanceId());
         //    taskMaster.killInstance(true,"Worker master shutdown.");
         //    return;
-        //}
+        // }
 
         // Calculate the next scheduling time and add it to the time scheduler
         delayTime, err := strconv.Atoi(h.jobInstanceInfo.GetTimeExpression())
diff --git a/internal/remoting/codec/akka_codec.go b/internal/remoting/codec/akka_codec.go
index c4d7050..0ddcc49 100644
--- a/internal/remoting/codec/akka_codec.go
+++ b/internal/remoting/codec/akka_codec.go
@@ -233,8 +233,7 @@ func DecodeAkkaMessage(msg *akka.AkkaProtocolMessage) (interface{}, string, erro
         default:
             return nil, "", fmt.Errorf("Unknown message type=%s, decode failed ", msgType)
         }
-    } else {
-        return nil, "", fmt.Errorf("Unknown message=%+v, decode failed ", msg)
     }
-    return nil, "", fmt.Errorf("Invalid msg=%+v, decode failed ", msg)
+
+    return nil, "", fmt.Errorf("Unknown message=%+v, decode failed ", msg)
 }
diff --git a/internal/remoting/handshake.go b/internal/remoting/handshake.go
index 5b1abd0..294f063 100644
--- a/internal/remoting/handshake.go
+++ b/internal/remoting/handshake.go
@@ -68,6 +68,9 @@ func Handshake(ctx context.Context, conn net.Conn) error {
     }
 
     msg, err := trans.ReadAkkaMsg(dataBuf)
+    if err != nil {
+        return fmt.Errorf("handshake read akka msg err=%+v ", err)
+    }
     if controlMsg := msg.Instruction; controlMsg != nil && controlMsg.CommandType != nil {
         if int32(*controlMsg.CommandType) == int32(akka.CommandType_ASSOCIATE) {
             logger.Infof("Receive handshake msg, msg=%+v ", controlMsg)
diff --git a/internal/remoting/pool/single_pool.go b/internal/remoting/pool/single_pool.go
index fe2e1be..9565422 100644
--- a/internal/remoting/pool/single_pool.go
+++ b/internal/remoting/pool/single_pool.go
@@ -144,23 +144,17 @@ func (p *singleConnPool) isConnExisted() bool {
 }
 
 func (p *singleConnPool) onReconnectTrigger(ctx context.Context) {
-    for {
-        select {
-        case <-p.reconnectSignalCh:
-            if _, err := p.newConn(ctx); err != nil {
-                logger.Errorf("Reconnect server failed after connection isn't available, err=%s", err.Error())
-            }
+    for range p.reconnectSignalCh {
+        if _, err := p.newConn(ctx); err != nil {
+            logger.Errorf("Reconnect server failed after connection isn't available, err=%s", err.Error())
         }
     }
 }
 
 func (p *singleConnPool) onAddrChanged(ctx context.Context) {
-    for {
-        select {
-        case <-p.options.addrChangedSignalCh:
-            if _, err := p.newConn(ctx); err != nil {
-                logger.Errorf("Reconnect server failed after addr if changed, err=%s", err.Error())
-            }
+    for range p.options.addrChangedSignalCh {
+        if _, err := p.newConn(ctx); err != nil {
+            logger.Errorf("Reconnect server failed after addr if changed, err=%s", err.Error())
         }
     }
 }
diff --git a/internal/tasks/tasks.go b/internal/tasks/tasks.go
index 227a92d..8b2a28a 100644
--- a/internal/tasks/tasks.go
+++ b/internal/tasks/tasks.go
@@ -22,19 +22,13 @@ import (
     "github.com/alibaba/schedulerx-worker-go/processor"
 )
 
-var (
-    taskMap *TaskMap
-    once    sync.Once
-)
+var taskMap = &TaskMap{tasks: sync.Map{}}
 
 type TaskMap struct {
     tasks sync.Map // map[string]processor.Processor
 }
 
 func GetTaskMap() *TaskMap {
-    once.Do(func() {
-        taskMap = &TaskMap{tasks: sync.Map{}}
-    })
     return taskMap
 }
 
diff --git a/internal/version/version.go b/internal/version/version.go
index 511571b..b53d925 100644
--- a/internal/version/version.go
+++ b/internal/version/version.go
@@ -18,5 +18,5 @@ package version
 
 // Version used for statistics and debug
 func Version() string {
-    return "v0.0.4"
+    return "v1.0.1"
 }
diff --git a/processor/mapjob/map_job_processor.go b/processor/mapjob/map_job_processor.go
index f915be4..556cf44 100644
--- a/processor/mapjob/map_job_processor.go
+++ b/processor/mapjob/map_job_processor.go
@@ -58,8 +58,8 @@ func NewMapJobProcessor() *MapJobProcessor {
 func (rcvr *MapJobProcessor) checkTaskObject(jobCtx *jobcontext.JobContext, taskObject interface{}) error {
     // FIXME
     isAdvancedVersion := false
-    //context := ContainerFactory.getContainerPool().getContext()
-    //isAdvancedVersion := GroupManager.INSTANCE.isAdvancedVersion(context.getGroupId())
+    // context := ContainerFactory.getContainerPool().getContext()
+    // isAdvancedVersion := GroupManager.INSTANCE.isAdvancedVersion(context.getGroupId())
 
     if bizSubTask, ok := taskObject.(bizsubtask.BizSubTask); isAdvancedVersion && ok {
         labelMap := bizSubTask.LabelMap()
@@ -216,11 +216,12 @@ func (rcvr *MapJobProcessor) Map(jobCtx *jobcontext.JobContext, taskList []inter
         ret, err = rcvr.actorSystem.Root.RequestFuture(mapMasterPid, req, 30*time.Second).Result()
         if errors.Is(err, actor.ErrTimeout) {
             logger.Warnf("JobInstanceId=%d WorkerMapTaskRequest dispatch failed, due to send request=%+v to taskMaster timeout.", req.GetJobInstanceId(), req)
-            if retryCount < maxRetryCount {
+            for retryCount < maxRetryCount {
                 time.Sleep(10 * time.Millisecond)
                 ret, err = rcvr.actorSystem.Root.RequestFuture(mapMasterPid, req, 30*time.Second).Result()
                 retryCount++
-            } else {
+            }
+            if err != nil {
                 return nil, fmt.Errorf("JobInstanceId=%d WorkerMapTaskRequest dispatch failed, due to send request=%+v to taskMaster timeout after retry exceed %d times, err=%s ", req.GetJobInstanceId(), req, retryCount, err.Error())
             }
         }
@@ -255,7 +256,7 @@ func (rcvr *MapJobProcessor) Map(jobCtx *jobcontext.JobContext, taskList []inter
 }
 
 func (rcvr *MapJobProcessor) Kill(jobCtx *jobcontext.JobContext) error {
-    //TODO implement me
+    // TODO implement me
     panic("implement me")
 }
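
Usage note for the new queue-size option (illustrative, not part of the patch): `WorkerConfig` now exposes `WithQueueSize`/`QueueSize()`, and the former hard-coded `10*10000` and `150*10000` buffer literals read that value instead. A minimal sketch is below; it assumes options passed to `config.NewWorkerConfig` are applied on first construction (the constructor is guarded by `sync.Once`), and the `200000` value is purely an example.

```go
package main

import (
	"fmt"

	"github.com/alibaba/schedulerx-worker-go/config"
)

func main() {
	// Illustrative value: bound the internal request queues and channels at
	// 200k entries. When the option is omitted, the default from
	// constants.MapMasterQueueSizeDefault is used (see defaultWorkerConfig).
	cfg := config.NewWorkerConfig(
		config.WithQueueSize(200000),
	)

	// QueueSize() is the accessor added in this change; batch.NewReqQueue and
	// the actorcomm channel buffers read the same setting.
	fmt.Println("queue size:", cfg.QueueSize())
}
```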
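Several call sites touched by this patch (the shared `defaultActorPool` in container_actor.go, `BaseReqHandler.Start`, `newTreadContainerPool`) construct ants pools with the same option set. The standalone sketch below shows that construction in isolation, assuming the ants v2 module; the pool size and submitted tasks are made up for illustration, and the panic handler simply logs the recovered value that ants passes to it.

```go
package main

import (
	"fmt"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// Same shape as the pools created in this patch: bounded size, 30s idle
	// expiry, and a panic handler so a panicking task does not kill the worker.
	pool, err := ants.NewPool(
		8,
		ants.WithExpiryDuration(30*time.Second),
		ants.WithPanicHandler(func(p interface{}) {
			// ants hands the recovered panic value directly to the handler.
			fmt.Println("task panicked:", p)
		}),
	)
	if err != nil {
		panic(err)
	}
	defer pool.Release()

	_ = pool.Submit(func() { panic("boom") }) // absorbed by the panic handler
	_ = pool.Submit(func() { fmt.Println("still serving tasks") })
	time.Sleep(100 * time.Millisecond) // give the asynchronous tasks time to run
}
```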
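The `Map` dispatch path in map_job_processor.go now retries the `RequestFuture` call up to `maxRetryCount` times after a timeout. As a hedged, standalone illustration of that bounded-retry shape (sleep between attempts, stop once one succeeds) — the helper name, interval, and limits are illustrative and not the processor's actual API:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// requestWithRetry calls fn once, then retries up to maxRetries additional
// times, sleeping between attempts and stopping as soon as a call succeeds.
func requestWithRetry(fn func() error, maxRetries int, interval time.Duration) error {
	err := fn()
	for retry := 0; err != nil && retry < maxRetries; retry++ {
		time.Sleep(interval)
		err = fn()
	}
	return err
}

func main() {
	attempts := 0
	err := requestWithRetry(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("timeout") // simulate two transient failures
		}
		return nil
	}, 5, 10*time.Millisecond)
	fmt.Println("attempts:", attempts, "err:", err) // attempts: 3 err: <nil>
}
```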