Skip to content

Commit

Permalink
Merge pull request #13 from alibaba/develop
Browse files Browse the repository at this point in the history
Update README.md: update demo of mapjob
  • Loading branch information
yaohuitc authored Mar 13, 2024
2 parents a4ecae5 + 15f2c0f commit f763d21
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type MapJobProcessor interface {
package main

import (
"encoding/json"
"errors"
"fmt"
"github.com/alibaba/schedulerx-worker-go/processor"
"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
Expand All @@ -185,7 +187,7 @@ func (mr *TestMapJob) Kill(jobCtx *jobcontext.JobContext) error {
// Process the MapReduce model is used to distributed scan orders for timeout confirmation
func (mr *TestMapJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
var (
num = 1000
num = 100
err error
)
taskName := jobCtx.TaskName()
Expand All @@ -201,24 +203,33 @@ func (mr *TestMapJob) Process(jobCtx *jobcontext.JobContext) (*processor.Process
fmt.Println("start root task")
var messageList []interface{}
for i := 1; i <= num; i++ {
messageList = append(messageList, fmt.Sprintf("id_%d", i))
// orderInfos = append(orderInfos, NewOrderInfo(fmt.Sprintf("id_%d", i), i))
var str = fmt.Sprintf("id_%d", i)
messageList = append(messageList, str)
}
fmt.Println(messageList)
return mr.Map(jobCtx, messageList, "Level1Dispatch")
} else if taskName == "Level1Dispatch" {
var str []byte = jobCtx.Task()
var s = string(str)
fmt.Printf("str=%s\n", s)
var task []byte = jobCtx.Task()
var str string
err = json.Unmarshal(task, &str)
fmt.Printf("str=%s\n", str)
time.Sleep(100 * time.Millisecond)
fmt.Println("Finish Process...")
if str == "id_5" {
return processor.NewProcessResult(
processor.WithFailed(),
processor.WithResult(str),
), errors.New("test")
}
return processor.NewProcessResult(
processor.WithSucceed(),
processor.WithResult(s),
processor.WithResult(str),
), nil
}
return processor.NewProcessResult(processor.WithFailed()), nil
}


```

#### 3.4 MapReduce任务
Expand Down

0 comments on commit f763d21

Please sign in to comment.