Skip to content

Commit

Permalink
Merge pull request #37 from alibaba/develop
Browse files Browse the repository at this point in the history
v1.0.7
  • Loading branch information
HuangXiaomeng authored Nov 7, 2024
2 parents 6619423 + 8e36fb1 commit 6dac403
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 205 deletions.
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func newClient(cfg *Config, opts ...Option) (*Client, error) {
}

stopChan := make(chan os.Signal, 1)
signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1, syscall.SIGUSR2)
signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM)

// Keep heartbeat, and receive message
// KeepHeartbeat must after init actors, so that can get actorSystemPort from actorSystem
Expand Down
2 changes: 1 addition & 1 deletion example/broadcast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func main() {

// wait for the stop signal
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1, syscall.SIGUSR2)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
<-c
time.Sleep(time.Second * 5)
}
2 changes: 1 addition & 1 deletion example/mapreduce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {

// wait for the stop signal
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1, syscall.SIGUSR2)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
<-c
time.Sleep(time.Second * 5)
}
2 changes: 1 addition & 1 deletion example/standalone/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func main() {

// wait for the stop signal
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1, syscall.SIGUSR2)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
<-c
time.Sleep(time.Second * 5)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/alibaba/schedulerx-worker-go

go 1.18
go 1.20

require (
github.com/asynkron/protoactor-go v0.0.0-20230916135836-b14bb1f51af6
Expand Down
1 change: 0 additions & 1 deletion internal/actor/common/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ var (
actorSystem *actor.ActorSystem
)

// GetActorSystem must be executed before InitActorSystem, otherwise it returns nil
func GetActorSystem() *actor.ActorSystem {
once.Do(func() {
actorSystem = actor.NewActorSystem()
Expand Down
3 changes: 1 addition & 2 deletions internal/batch/base_req_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,12 @@ func (rcvr *BaseReqHandler) Stop() {
}
if rcvr.batchProcessSvc != nil {
rcvr.batchProcessSvc.Release()
rcvr.batchProcessSvc = nil
}
if rcvr.reqsQueue != nil {
rcvr.reqsQueue.Clear()
}
if rcvr.activeRunnableNum != nil {
rcvr.activeRunnableNum = nil
rcvr.activeRunnableNum.Store(0)
}
}

Expand Down
8 changes: 6 additions & 2 deletions internal/master/map_task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,9 @@ func (m *MapTaskMaster) BatchUpdateTaskStatues(requests []*schedulerx.ContainerR
taskName = request.GetTaskName()
)

logger.Debugf("report task status:%s from worker:%s, uniqueId:%d", taskStatus.Descriptor(), workerAddr,
logger.Debugf("report task status:%s from worker:%s, uniqueId:%s", taskStatus.Descriptor(), workerAddr,
utils.GetUniqueId(request.GetJobId(), request.GetJobInstanceId(), request.GetTaskId()))

m.taskProgressMap.LoadOrStore(taskName, common.NewTaskProgressCounter(taskName))
if _, ok := m.workerProgressMap.Load(workerAddr); workerAddr != "" && !ok {
m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
Expand Down Expand Up @@ -488,7 +489,10 @@ func (m *MapTaskMaster) BatchUpdateTaskStatues(requests []*schedulerx.ContainerR
break
}
if !updateSuccess {
m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, "persistent batch update TaskStatus error up to 3 times")
err := m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, "persistent batch update TaskStatus error up to 3 times")
if err != nil {
logger.Errorf("jobInstanceId=%d, UpdateNewInstanceStatus failed, err=%s", m.GetJobInstanceInfo().GetJobInstanceId(), err.Error())
}
}
logger.Debugf("jobInstanceId=%d batch update status db cost %dms", m.GetJobInstanceInfo().GetJobInstanceId(), time.Since(startTime).Milliseconds())
}
Expand Down
33 changes: 8 additions & 25 deletions internal/master/persistence/h2_connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (
"path/filepath"

_ "github.com/mattn/go-sqlite3"

"github.com/alibaba/schedulerx-worker-go/logger"
)

const memoryModeDataSourceName = ":memory:"
const memoryModeDataSourceName = "file::memory:?cache=shared"

type H2ConnectionPool struct {
*sql.DB
Expand All @@ -50,39 +48,24 @@ func NewH2ConnectionPool(opts ...Option) (*H2ConnectionPool, error) {
opt(options)
}

dataSourceName := "sqlite3.db"
if options.dataSourceName != "" {
dataSourceName = options.dataSourceName
}

files, err := filepath.Glob("schedulerx2_*_sqlite3.db")
if err != nil {
return nil, err
}
for _, file := range files {
if err := os.Remove(file); err != nil {
if err = os.Remove(file); err != nil {
return nil, err
}
}

// memory mode no need creating local file
if dataSourceName != memoryModeDataSourceName {
logger.Infof("Creating %s sqlite3...", dataSourceName)
file, err := os.Create(dataSourceName)
if err != nil {
return nil, err
}
defer file.Close()

logger.Infof("sqlite3 DB=%s created", dataSourceName)
dataSourceName := "sqlite3.db"
if options.dataSourceName != "" {
dataSourceName = options.dataSourceName
}

sqliteDatabase, err := sql.Open("sqlite3", dataSourceName)
db, err := sql.Open("sqlite3", dataSourceName)
if err != nil {
return nil, err
}

return &H2ConnectionPool{
sqliteDatabase,
}, nil
db.SetMaxIdleConns(5) // default 2
return &H2ConnectionPool{DB: db}, nil
}
4 changes: 2 additions & 2 deletions internal/master/persistence/server_task_persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (rcvr *ServerTaskPersistence) UpdateTaskStatues(taskStatusInfos []*schedule
}
info := taskStatusInfos[0]
status2WorkIdAddr2TaskIds := getTaskStatusMap(taskStatusInfos)
var batchTaskStatuesReq *schedulerx.WorkerBatchReportTaskStatuesRequest
var batchTaskStatuesReq schedulerx.WorkerBatchReportTaskStatuesRequest
for status, workerAddr2TaskIds := range status2WorkIdAddr2TaskIds {
for workerIdAddr, taskIds := range workerAddr2TaskIds {
workerIdAddrParts := strings.Split(workerIdAddr, "@")
Expand All @@ -273,7 +273,7 @@ func (rcvr *ServerTaskPersistence) UpdateTaskStatues(taskStatusInfos []*schedule
batchTaskStatuesReq.JobInstanceId = proto.Int64(info.GetJobInstanceId())

actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
Msg: batchTaskStatuesReq,
Msg: &batchTaskStatuesReq,
}

return nil
Expand Down
Loading

0 comments on commit 6dac403

Please sign in to comment.