Skip to content

Commit

Permalink
Merge pull request #14 from alibaba/develop
Browse files Browse the repository at this point in the history
v1.0.0 release
  • Loading branch information
yaohuitc authored Mar 25, 2024
2 parents f763d21 + 36ed55d commit 345d5a1
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 30 deletions.
21 changes: 16 additions & 5 deletions config/worker_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,15 @@ func WithTaskBodySizeMax(taskBodySizeMax int32) Option {
}
}

func WithActorSystemPort(port int32) Option {
func WithGrpcPort(port int32) Option {
return func(config *WorkerConfig) {
config.actorSystemPort = port
config.grpcPort = port
}
}

func WithIface(iface string) Option {
return func(config *WorkerConfig) {
config.iface = iface
}
}

Expand Down Expand Up @@ -158,7 +164,8 @@ type WorkerConfig struct {
workerParallelTaskMaxSize int32
workerMapPageSize int32
taskBodySizeMax int32
actorSystemPort int32
grpcPort int32
iface string
}

func (w *WorkerConfig) IsShareContainerPool() bool {
Expand Down Expand Up @@ -217,8 +224,12 @@ func (w *WorkerConfig) TaskBodySizeMax() int32 {
return w.taskBodySizeMax
}

func (w *WorkerConfig) ActorSystemPort() int32 {
return w.actorSystemPort
func (w *WorkerConfig) GrpcPort() int32 {
return w.grpcPort
}

func (w *WorkerConfig) Iface() string {
return w.iface
}

func defaultWorkerConfig() *WorkerConfig {
Expand Down
6 changes: 3 additions & 3 deletions example/broadcast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ func main() {
// This is just an example, the real configuration needs to be obtained from the platform
cfg := &schedulerx.Config{
Endpoint: "acm.aliyun.com",
Namespace: "a0e3ffd7-xxx-xxx-xxx-86ca9dc68932",
GroupId: "dts-demo",
AppKey: "xxxxx",
Namespace: "fa6ed99e-1469-4477-855c-a2bf1659d039",
GroupId: "xueren_test_sub",
AppKey: "myV5K5Xaf1knuzKdPBaj3A==",
}
client, err := schedulerx.GetClient(cfg)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions example/mapreduce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ func main() {
// This is just an example, the real configuration needs to be obtained from the platform
cfg := &schedulerx.Config{
Endpoint: "acm.aliyun.com",
Namespace: "a0e3ffd7-xxx-xxx-xxx-86ca9dc68932",
GroupId: "dts-demo",
AppKey: "xxxxx",
Namespace: "fa6ed99e-1469-4477-855c-a2bf1659d039",
GroupId: "xueren_test_sub",
AppKey: "myV5K5Xaf1knuzKdPBaj3A==",
}
client, err := schedulerx.GetClient(cfg)
if err != nil {
Expand All @@ -38,6 +38,6 @@ func main() {
task := &TestMapReduceJob{
mapjob.NewMapReduceJobProcessor(), // FIXME how define user behavior
}
client.RegisterTask("TestMapReduce", task)
client.RegisterTask("TestMapReduceJob", task)
select {}
}
6 changes: 3 additions & 3 deletions example/mapreduce/order_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (mr *TestMapReduceJob) Kill(jobCtx *jobcontext.JobContext) error {
// Process the MapReduce model is used to distributed scan orders for timeout confirmation
func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
var (
num = 100 * 10000
num = 1000
err error
)
taskName := jobCtx.TaskName()
Expand All @@ -76,8 +76,8 @@ func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.P
fmt.Printf("task is not OrderInfo, task=%+v\n", jobCtx.Task())
}
fmt.Printf("orderInfo=%+v\n", orderInfo)
time.Sleep(1 * time.Second)
fmt.Println("Finish Process...")
time.Sleep(10 * time.Millisecond)
// fmt.Println("Finish Process...")
return processor.NewProcessResult(
processor.WithSucceed(),
processor.WithResult(strconv.Itoa(orderInfo.Value)),
Expand Down
13 changes: 1 addition & 12 deletions internal/actor/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/asynkron/protoactor-go/actor"

"github.com/alibaba/schedulerx-worker-go/internal/remoting/pool"
"github.com/alibaba/schedulerx-worker-go/internal/utils"
)

const (
Expand Down Expand Up @@ -72,11 +71,6 @@ func SchedulerxServerPid(ctx context.Context) *actor.PID {
// The workerAddr issued by the server is the address reported by the heartbeat.
// It is the connection address obtained from the connection pool, not the ActorSystem address, so it needs to be converted.
func GetRealWorkerAddr(workerIdAddr string) string {
localHostAddr, err := utils.GetIpv4AddrHost()
if err != nil {
panic(err)
}

parts := strings.Split(workerIdAddr, "@")
workerAddr := parts[1]
addrParts := strings.Split(workerAddr, ":")
Expand All @@ -92,12 +86,7 @@ func GetRealWorkerAddr(workerIdAddr string) string {
panic(fmt.Sprintf("invalid worker addr: %s", workerAddr))
}

if addrParts[0] == localHostAddr {
// Debugging on local machine, starting multiple processes
host = "127.0.0.1"
} else {
host = addrParts[0]
}
host = addrParts[0]

if len(addrParts) == 2 {
port = addrParts[1]
Expand Down
2 changes: 2 additions & 0 deletions internal/actor/container_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package actor

import (
"context"
"encoding/json"
"fmt"
"runtime/debug"
Expand Down Expand Up @@ -346,5 +347,6 @@ func convertMasterStartContainerRequest2JobContext(req *schedulerx.MasterStartCo
jobCtx.SetShardingNum(req.GetShardingNum())
jobCtx.SetTimeType(req.GetTimeType())
jobCtx.SetTimeExpression(req.GetTimeExpression())
jobCtx.Context = context.Background()
return jobCtx, nil
}
21 changes: 18 additions & 3 deletions internal/actor/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package actor

import (
"github.com/alibaba/schedulerx-worker-go/internal/utils"
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/remote"
"google.golang.org/grpc"
Expand Down Expand Up @@ -85,11 +86,25 @@ func InitActors(actorSystem *actor.ActorSystem) error {
}()

var (
host = "127.0.0.1"
host = "0.0.0.0"
port = 0 // random port
)
if actorSystemPort := config.GetWorkerConfig().ActorSystemPort(); actorSystemPort != 0 {
port = int(actorSystemPort)
if grpcPort := config.GetWorkerConfig().GrpcPort(); grpcPort != 0 {
port = int(grpcPort)
}

if config.GetWorkerConfig().Iface() != "" {
localHost, err := utils.GetIpv4AddrByIface(config.GetWorkerConfig().Iface())
if err != nil {
panic(err)
}
host = localHost
} else {
localHost, err := utils.GetIpv4AddrHost()
if err != nil {
panic(err)
}
host = localHost
}

// The maximum limit for a subtask is 64kb, and a maximum of 1000 batches can be sent together, which is 64MB,
Expand Down
25 changes: 25 additions & 0 deletions internal/utils/ip_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,31 @@ func GetIpv4AddrHost() (string, error) {
return "", errors.New("cannot find valid ipv4 addr")
}

func GetIpv4AddrByIface(_iface string) (string, error) {
ifaces, err := net.Interfaces()
if err != nil {
return "", err
}

// 遍历所有网卡
for _, iface := range ifaces {
if iface.Name == _iface { // 指定网卡名称
addrs, err := iface.Addrs()
if err != nil {
return "", err
}
// 遍历网卡的地址信息
for _, addr := range addrs {
ip, _, _ := net.ParseCIDR(addr.String())
if ip.To4() != nil {
return ip.String(), nil
}
}
}
}
return "", errors.New("cannot find valid ipv4 addr")
}

func ParseIPAddr(addr string) (string, int, error) {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
Expand Down

0 comments on commit 345d5a1

Please sign in to comment.