Skip to content

Commit

Permalink
Merge pull request #23 from alibaba/develop
Browse files Browse the repository at this point in the history
merge develop to main
  • Loading branch information
yaohuitc authored Aug 12, 2024
2 parents 4e62b1d + 985db9a commit 97672a4
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 9 deletions.
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

"github.com/alibaba/schedulerx-worker-go/config"
sxactor "github.com/alibaba/schedulerx-worker-go/internal/actor"
"github.com/alibaba/schedulerx-worker-go/internal/actor/common"
actorcomm "github.com/alibaba/schedulerx-worker-go/internal/actor/common"
"github.com/alibaba/schedulerx-worker-go/internal/discovery"
"github.com/alibaba/schedulerx-worker-go/internal/masterpool"
"github.com/alibaba/schedulerx-worker-go/internal/openapi"
Expand Down
11 changes: 11 additions & 0 deletions config/worker_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ func WithQueueSize(queueSize int32) Option {
}
}

func WithLabel(label string) Option {
return func(config *WorkerConfig) {
config.label = label
}
}

func NewWorkerConfig(opts ...Option) *WorkerConfig {
once.Do(func() {
workerConfig = defaultWorkerConfig()
Expand Down Expand Up @@ -173,6 +179,7 @@ type WorkerConfig struct {
grpcPort int32
iface string
queueSize int32
label string
}

func (w *WorkerConfig) IsShareContainerPool() bool {
Expand Down Expand Up @@ -243,6 +250,10 @@ func (w *WorkerConfig) QueueSize() int32 {
return w.queueSize
}

func (w *WorkerConfig) Label() string {
return w.label
}

func defaultWorkerConfig() *WorkerConfig {
return &WorkerConfig{
isSecondDelayIntervalMS: false,
Expand Down
5 changes: 5 additions & 0 deletions example/broadcast/test_broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,8 @@ func (t TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.Proce
ret.SetResult(strconv.Itoa(num))
return ret, nil
}

func (t TestBroadcast) Kill(ctx *jobcontext.JobContext) error {
fmt.Println("[Kill] Start kill my task: TestBroadcast")
return nil
}
16 changes: 15 additions & 1 deletion example/standalone/helloworld.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,26 @@ var _ processor.Processor = &HelloWorld{}

type HelloWorld struct{}

var Stop = false

func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
fmt.Println("[Process] Start process my task: Hello world!")
// mock execute task
time.Sleep(3 * time.Second)
for i := 0; i < 10; i++ {
fmt.Printf("Hello%d\n", i)
time.Sleep(2 * time.Second)
if Stop {
break
}
}
ret := new(processor.ProcessResult)
ret.SetSucceed()
fmt.Println("[Process] End process my task: Hello world!")
return ret, nil
}

func (h *HelloWorld) Kill(ctx *jobcontext.JobContext) error {
fmt.Println("[Kill] Start kill my task: Hello world!")
Stop = true
return nil
}
8 changes: 6 additions & 2 deletions example/standalone/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@

package main

import "github.com/alibaba/schedulerx-worker-go"
import (
"github.com/alibaba/schedulerx-worker-go"
)

func main() {
// This is just an example, the real configuration needs to be obtained from the platform
cfg := &schedulerx.Config{
Endpoint: "acm.aliyun.com",
Namespace: "a0e3ffd7-xxx-xxx-xxx-86ca9dc68932",
GroupId: "dts-demo",
GroupId: "xueren_test_sub",
AppKey: "xxxxx",
}
client, err := schedulerx.GetClient(cfg)
// client, err := schedulerx.GetClient(cfg, schedulerx.WithWorkerConfig(config.NewWorkerConfig(
// config.WithLabel("test"))))
if err != nil {
panic(err)
}
Expand Down
1 change: 0 additions & 1 deletion internal/batch/base_req_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (rcvr *BaseReqHandler) Stop() {
}
if rcvr.reqsQueue != nil {
rcvr.reqsQueue.Clear()
rcvr.reqsQueue = nil
}
if rcvr.activeRunnableNum != nil {
rcvr.activeRunnableNum = nil
Expand Down
19 changes: 16 additions & 3 deletions internal/container/thread_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/alibaba/schedulerx-worker-go/config"
"github.com/alibaba/schedulerx-worker-go/internal/actor/common"
actorcomm "github.com/alibaba/schedulerx-worker-go/internal/actor/common"
"github.com/alibaba/schedulerx-worker-go/internal/batch"
"github.com/alibaba/schedulerx-worker-go/internal/constants"
"github.com/alibaba/schedulerx-worker-go/internal/masterpool"
Expand Down Expand Up @@ -112,7 +112,7 @@ func (c *ThreadContainer) Start() {
}
result = processor.NewProcessResult(processor.WithFailed(), processor.WithResult(fixedErrMsg))
c.reportTaskStatus(result, workerAddr)
logger.Errorf("Process task=%s failed, uniqueId=%v, serialNum=%v, err=%s ", c.jobCtx.TaskName(), uniqueId, err.Error(), c.jobCtx.SerialNum())
logger.Errorf("Process task=%s failed, uniqueId=%v, serialNum=%v, err=%s ", c.jobCtx.TaskName(), uniqueId, c.jobCtx.SerialNum(), err.Error())
return
}

Expand Down Expand Up @@ -140,7 +140,20 @@ func (c *ThreadContainer) Start() {
}

func (c *ThreadContainer) Kill() {
logger.Infof("kill container, jobInstanceId=%v", c.jobCtx.JobInstanceId())
logger.Infof("kill container, jobInstanceId=%v, content=%s", c.jobCtx.JobInstanceId(), c.jobCtx.Content())
jobName := gjson.Get(c.jobCtx.Content(), "jobName").String()
if jobName != "" {
taskMasterPool := masterpool.GetTaskMasterPool()
task, ok := taskMasterPool.Tasks().Find(jobName)
if !ok {
logger.Warnf("Kill task=%s failed, because it's not found. ", jobName)
} else {
kt, ok := task.(processor.KillProcessor)
if ok {
kt.Kill(c.jobCtx)
}
}
}

workerAddr := c.actorCtx.ActorSystem().Address()
req := &schedulerx.ContainerReportTaskStatusRequest{
Expand Down
2 changes: 2 additions & 0 deletions internal/remoting/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/shirou/gopsutil/load"
"google.golang.org/protobuf/proto"

"github.com/alibaba/schedulerx-worker-go/config"
"github.com/alibaba/schedulerx-worker-go/internal/discovery"
"github.com/alibaba/schedulerx-worker-go/internal/masterpool"
"github.com/alibaba/schedulerx-worker-go/internal/proto/akka"
Expand Down Expand Up @@ -157,5 +158,6 @@ func genHeartBeatRequest(groupId string, appGroupId int64, jobInstanceIds []int6
AppGroupId: proto.Int64(appGroupId),
Source: proto.String("unknown"),
RpcPort: proto.Int32(int32(actorSystemPort)),
Label: proto.String(config.GetWorkerConfig().Label()),
}
}
6 changes: 5 additions & 1 deletion processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ type Processor interface {
Process(ctx *jobcontext.JobContext) (*ProcessResult, error)
}

type KillProcessor interface {
Kill(ctx *jobcontext.JobContext) error
}

type BroadcastProcessor interface {
Processor
PreProcess(ctx *jobcontext.JobContext) error
Expand All @@ -33,7 +37,7 @@ type BroadcastProcessor interface {
type MapJobProcessor interface {
Processor
Map(jobCtx *jobcontext.JobContext, taskList []interface{}, taskName string) (*ProcessResult, error)
Kill(jobCtx *jobcontext.JobContext) error
Kill(ctx *jobcontext.JobContext) error
}

type MapReduceJobProcessor interface {
Expand Down

0 comments on commit 97672a4

Please sign in to comment.