diff --git a/executor.go b/executor.go index dbe39e7..67cc686 100644 --- a/executor.go +++ b/executor.go @@ -6,11 +6,11 @@ import ( "reflect" "runtime" "sync" - "unsafe" "go.uber.org/ratelimit" ) +// Executor is a simple thread pool base on goroutine. type Executor struct { RateLimit ratelimit.Limiter WaitGroup *sync.WaitGroup @@ -18,17 +18,23 @@ type Executor struct { NumWorkers int } +// Job is a task will be executor execute. type Job struct { Handler interface{} Args []reflect.Value } +// ExecutorConfig is a config of executor. +// ReqPerSeconds is request per seconds. If it is 0, no limit for requests. +// QueueSize is size of buffer. Executor use synchronize channel, publisher will waiting if channel is full. +// NumWorkers is a number of goroutine. type ExecutorConfig struct { ReqPerSeconds int QueueSize int NumWorkers int } +// DefaultExecutorConfig is a default config func DefaultExecutorConfig() ExecutorConfig { return ExecutorConfig{ ReqPerSeconds: 0, @@ -37,6 +43,7 @@ func DefaultExecutorConfig() ExecutorConfig { } } +// NewExecutor returns a Executors that will manage workers. func NewExecutor(config ExecutorConfig) (*Executor, error) { err := config.validate() @@ -60,18 +67,7 @@ func NewExecutor(config ExecutorConfig) (*Executor, error) { return pipeline, nil } -type rtype struct { -} -type funcType struct { - inCount uint16 - outCount uint16 // top bit is set if last input parameter is ... -} - -func (t *rtype) NumIn() int { - tt := (*funcType)(unsafe.Pointer(t)) - return int(tt.inCount) -} - +// NewJob returns a Job that will be executed in workers. func NewJob(handler interface{}, inputArgs ...interface{}) (*Job, error) { var ( err error @@ -79,7 +75,7 @@ func NewJob(handler interface{}, inputArgs ...interface{}) (*Job, error) { ) nArgs := len(inputArgs) - parsedHandler, err := getFunc(handler, nArgs) + parsedHandler, err := validateFunc(handler, nArgs) if err != nil { return nil, err @@ -95,6 +91,8 @@ func NewJob(handler interface{}, inputArgs ...interface{}) (*Job, error) { }, nil } +// Publish to publish a handler and arguments +// Workers will run handler with provided arguments. func (pipeline *Executor) Publish(handler interface{}, inputArgs ...interface{}) error { job, err := NewJob(handler, inputArgs...) @@ -136,16 +134,19 @@ func (pipeline *Executor) runWorker() { } } +// Wait for all worker done. func (pipeline *Executor) Wait() { pipeline.WaitGroup.Wait() } +// Close channel and wait all worker done. func (pipeline *Executor) Close() { pipeline.WaitGroup.Wait() close(pipeline.Channel) } -func getFunc(handler interface{}, nArgs int) (interface{}, error) { +// validateFunc validate type of handler and number of arguments. +func validateFunc(handler interface{}, nArgs int) (interface{}, error) { f := reflect.Indirect(reflect.ValueOf(handler)) if f.Kind() != reflect.Func {