Skip to content

Commit

Permalink
add comment to code
Browse files Browse the repository at this point in the history
  • Loading branch information
thinhdanggroup committed Apr 1, 2020
1 parent d319310 commit cf4ddd1
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,35 @@ import (
"reflect"
"runtime"
"sync"
"unsafe"

"go.uber.org/ratelimit"
)

// Executor is a simple thread pool base on goroutine.
type Executor struct {
RateLimit ratelimit.Limiter
WaitGroup *sync.WaitGroup
Channel chan *Job
NumWorkers int
}

// Job is a task will be executor execute.
type Job struct {
Handler interface{}
Args []reflect.Value
}

// ExecutorConfig is a config of executor.
// ReqPerSeconds is request per seconds. If it is 0, no limit for requests.
// QueueSize is size of buffer. Executor use synchronize channel, publisher will waiting if channel is full.
// NumWorkers is a number of goroutine.
type ExecutorConfig struct {
ReqPerSeconds int
QueueSize int
NumWorkers int
}

// DefaultExecutorConfig is a default config
func DefaultExecutorConfig() ExecutorConfig {
return ExecutorConfig{
ReqPerSeconds: 0,
Expand All @@ -37,6 +43,7 @@ func DefaultExecutorConfig() ExecutorConfig {
}
}

// NewExecutor returns a Executors that will manage workers.
func NewExecutor(config ExecutorConfig) (*Executor, error) {
err := config.validate()

Expand All @@ -60,26 +67,15 @@ func NewExecutor(config ExecutorConfig) (*Executor, error) {
return pipeline, nil
}

type rtype struct {
}
type funcType struct {
inCount uint16
outCount uint16 // top bit is set if last input parameter is ...
}

func (t *rtype) NumIn() int {
tt := (*funcType)(unsafe.Pointer(t))
return int(tt.inCount)
}

// NewJob returns a Job that will be executed in workers.
func NewJob(handler interface{}, inputArgs ...interface{}) (*Job, error) {
var (
err error
args []reflect.Value
)

nArgs := len(inputArgs)
parsedHandler, err := getFunc(handler, nArgs)
parsedHandler, err := validateFunc(handler, nArgs)

if err != nil {
return nil, err
Expand All @@ -95,6 +91,8 @@ func NewJob(handler interface{}, inputArgs ...interface{}) (*Job, error) {
}, nil
}

// Publish to publish a handler and arguments
// Workers will run handler with provided arguments.
func (pipeline *Executor) Publish(handler interface{}, inputArgs ...interface{}) error {
job, err := NewJob(handler, inputArgs...)

Expand Down Expand Up @@ -136,16 +134,19 @@ func (pipeline *Executor) runWorker() {
}
}

// Wait for all worker done.
func (pipeline *Executor) Wait() {
pipeline.WaitGroup.Wait()
}

// Close channel and wait all worker done.
func (pipeline *Executor) Close() {
pipeline.WaitGroup.Wait()
close(pipeline.Channel)
}

func getFunc(handler interface{}, nArgs int) (interface{}, error) {
// validateFunc validate type of handler and number of arguments.
func validateFunc(handler interface{}, nArgs int) (interface{}, error) {
f := reflect.Indirect(reflect.ValueOf(handler))

if f.Kind() != reflect.Func {
Expand Down

0 comments on commit cf4ddd1

Please sign in to comment.