diff --git a/client.go b/client.go index b48d933..2caa1ec 100644 --- a/client.go +++ b/client.go @@ -28,7 +28,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/config" sxactor "github.com/alibaba/schedulerx-worker-go/internal/actor" - "github.com/alibaba/schedulerx-worker-go/internal/actor/common" + actorcomm "github.com/alibaba/schedulerx-worker-go/internal/actor/common" "github.com/alibaba/schedulerx-worker-go/internal/discovery" "github.com/alibaba/schedulerx-worker-go/internal/masterpool" "github.com/alibaba/schedulerx-worker-go/internal/openapi" diff --git a/config/worker_config.go b/config/worker_config.go index eb3743d..b9c8823 100644 --- a/config/worker_config.go +++ b/config/worker_config.go @@ -145,6 +145,12 @@ func WithQueueSize(queueSize int32) Option { } } +func WithLabel(label string) Option { + return func(config *WorkerConfig) { + config.label = label + } +} + func NewWorkerConfig(opts ...Option) *WorkerConfig { once.Do(func() { workerConfig = defaultWorkerConfig() @@ -173,6 +179,7 @@ type WorkerConfig struct { grpcPort int32 iface string queueSize int32 + label string } func (w *WorkerConfig) IsShareContainerPool() bool { @@ -243,6 +250,10 @@ func (w *WorkerConfig) QueueSize() int32 { return w.queueSize } +func (w *WorkerConfig) Label() string { + return w.label +} + func defaultWorkerConfig() *WorkerConfig { return &WorkerConfig{ isSecondDelayIntervalMS: false, diff --git a/example/broadcast/test_broadcast.go b/example/broadcast/test_broadcast.go index 38a6af3..d360df8 100644 --- a/example/broadcast/test_broadcast.go +++ b/example/broadcast/test_broadcast.go @@ -67,3 +67,8 @@ func (t TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.Proce ret.SetResult(strconv.Itoa(num)) return ret, nil } + +func (t TestBroadcast) Kill(ctx *jobcontext.JobContext) error { + fmt.Println("[Kill] Start kill my task: TestBroadcast") + return nil +} diff --git a/example/standalone/helloworld.go b/example/standalone/helloworld.go index f4a0541..dba2309 100644 --- a/example/standalone/helloworld.go +++ b/example/standalone/helloworld.go @@ -28,12 +28,26 @@ var _ processor.Processor = &HelloWorld{} type HelloWorld struct{} +var Stop = false + func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) { fmt.Println("[Process] Start process my task: Hello world!") // mock execute task - time.Sleep(3 * time.Second) + for i := 0; i < 10; i++ { + fmt.Printf("Hello%d\n", i) + time.Sleep(2 * time.Second) + if Stop { + break + } + } ret := new(processor.ProcessResult) ret.SetSucceed() fmt.Println("[Process] End process my task: Hello world!") return ret, nil } + +func (h *HelloWorld) Kill(ctx *jobcontext.JobContext) error { + fmt.Println("[Kill] Start kill my task: Hello world!") + Stop = true + return nil +} diff --git a/example/standalone/main.go b/example/standalone/main.go index ee39e7a..8293e11 100644 --- a/example/standalone/main.go +++ b/example/standalone/main.go @@ -16,17 +16,21 @@ package main -import "github.com/alibaba/schedulerx-worker-go" +import ( + "github.com/alibaba/schedulerx-worker-go" +) func main() { // This is just an example, the real configuration needs to be obtained from the platform cfg := &schedulerx.Config{ Endpoint: "acm.aliyun.com", Namespace: "a0e3ffd7-xxx-xxx-xxx-86ca9dc68932", - GroupId: "dts-demo", + GroupId: "xueren_test_sub", AppKey: "xxxxx", } client, err := schedulerx.GetClient(cfg) + // client, err := schedulerx.GetClient(cfg, schedulerx.WithWorkerConfig(config.NewWorkerConfig( + // config.WithLabel("test")))) if err != nil { panic(err) } diff --git a/internal/batch/base_req_handler.go b/internal/batch/base_req_handler.go index 599c7db..a7361de 100644 --- a/internal/batch/base_req_handler.go +++ b/internal/batch/base_req_handler.go @@ -176,7 +176,6 @@ func (rcvr *BaseReqHandler) Stop() { } if rcvr.reqsQueue != nil { rcvr.reqsQueue.Clear() - rcvr.reqsQueue = nil } if rcvr.activeRunnableNum != nil { rcvr.activeRunnableNum = nil diff --git a/internal/container/thread_container.go b/internal/container/thread_container.go index e30d0ab..543eb3a 100644 --- a/internal/container/thread_container.go +++ b/internal/container/thread_container.go @@ -26,7 +26,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/alibaba/schedulerx-worker-go/config" - "github.com/alibaba/schedulerx-worker-go/internal/actor/common" + actorcomm "github.com/alibaba/schedulerx-worker-go/internal/actor/common" "github.com/alibaba/schedulerx-worker-go/internal/batch" "github.com/alibaba/schedulerx-worker-go/internal/constants" "github.com/alibaba/schedulerx-worker-go/internal/masterpool" @@ -112,7 +112,7 @@ func (c *ThreadContainer) Start() { } result = processor.NewProcessResult(processor.WithFailed(), processor.WithResult(fixedErrMsg)) c.reportTaskStatus(result, workerAddr) - logger.Errorf("Process task=%s failed, uniqueId=%v, serialNum=%v, err=%s ", c.jobCtx.TaskName(), uniqueId, err.Error(), c.jobCtx.SerialNum()) + logger.Errorf("Process task=%s failed, uniqueId=%v, serialNum=%v, err=%s ", c.jobCtx.TaskName(), uniqueId, c.jobCtx.SerialNum(), err.Error()) return } @@ -140,7 +140,20 @@ func (c *ThreadContainer) Start() { } func (c *ThreadContainer) Kill() { - logger.Infof("kill container, jobInstanceId=%v", c.jobCtx.JobInstanceId()) + logger.Infof("kill container, jobInstanceId=%v, content=%s", c.jobCtx.JobInstanceId(), c.jobCtx.Content()) + jobName := gjson.Get(c.jobCtx.Content(), "jobName").String() + if jobName != "" { + taskMasterPool := masterpool.GetTaskMasterPool() + task, ok := taskMasterPool.Tasks().Find(jobName) + if !ok { + logger.Warnf("Kill task=%s failed, because it's not found. ", jobName) + } else { + kt, ok := task.(processor.KillProcessor) + if ok { + kt.Kill(c.jobCtx) + } + } + } workerAddr := c.actorCtx.ActorSystem().Address() req := &schedulerx.ContainerReportTaskStatusRequest{ diff --git a/internal/remoting/heartbeat.go b/internal/remoting/heartbeat.go index b563ec4..386b119 100644 --- a/internal/remoting/heartbeat.go +++ b/internal/remoting/heartbeat.go @@ -31,6 +31,7 @@ import ( "github.com/shirou/gopsutil/load" "google.golang.org/protobuf/proto" + "github.com/alibaba/schedulerx-worker-go/config" "github.com/alibaba/schedulerx-worker-go/internal/discovery" "github.com/alibaba/schedulerx-worker-go/internal/masterpool" "github.com/alibaba/schedulerx-worker-go/internal/proto/akka" @@ -157,5 +158,6 @@ func genHeartBeatRequest(groupId string, appGroupId int64, jobInstanceIds []int6 AppGroupId: proto.Int64(appGroupId), Source: proto.String("unknown"), RpcPort: proto.Int32(int32(actorSystemPort)), + Label: proto.String(config.GetWorkerConfig().Label()), } } diff --git a/processor/processor.go b/processor/processor.go index b861cf9..3938c4e 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -24,6 +24,10 @@ type Processor interface { Process(ctx *jobcontext.JobContext) (*ProcessResult, error) } +type KillProcessor interface { + Kill(ctx *jobcontext.JobContext) error +} + type BroadcastProcessor interface { Processor PreProcess(ctx *jobcontext.JobContext) error @@ -33,7 +37,7 @@ type BroadcastProcessor interface { type MapJobProcessor interface { Processor Map(jobCtx *jobcontext.JobContext, taskList []interface{}, taskName string) (*ProcessResult, error) - Kill(jobCtx *jobcontext.JobContext) error + Kill(ctx *jobcontext.JobContext) error } type MapReduceJobProcessor interface {