From 6eb9a3d6f51f467f6451af8545ecdc1fa2482c24 Mon Sep 17 00:00:00 2001 From: Achi Chen Date: Tue, 21 Mar 2023 09:35:47 +0800 Subject: [PATCH] Move out independent utils packages (#206) * Move out utils/executor package * Move out utils/pubsub package * Move out utils/kafka package * Move out utils/worker package * Move out utils/migrator package * Move out utils/httptripper package * Remove packages from doc generation * Remove worker examples * Move out utils/spanutils package --- builder/ServiceName/service/service.go | 23 -- example/echo/service/service.go | 30 +-- utils/documentation.go | 7 - utils/executor/README.md | 67 ----- utils/executor/executor.go | 127 ---------- utils/executor/executor_test.go | 153 ------------ utils/executor/types.go | 34 --- utils/httptripper/README.md | 141 ----------- utils/httptripper/httptripper.go | 228 ------------------ utils/httptripper/retry/README.md | 88 ------- utils/httptripper/retry/retry.go | 115 --------- utils/httptripper/retry/retry_test.go | 162 ------------- utils/httptripper/retry/types.go | 27 --- utils/httptripper/strategy/README.md | 45 ---- utils/httptripper/strategy/strategy.go | 42 ---- utils/httptripper/strategy/strategy_test.go | 45 ---- utils/httptripper/strategy/types.go | 12 - utils/httptripper/types.go | 24 -- utils/kafka/config.go | 84 ------- utils/kafka/producer.go | 92 ------- utils/kafka/producer_test.go | 90 ------- utils/migrator/README.md | 25 -- utils/migrator/example/migratorexample.go | 79 ------ utils/migrator/migratorcmdbase.go | 183 -------------- utils/pubsub/README.md | 59 ----- utils/pubsub/message_queue/message_queue.go | 126 ---------- .../message_queue/mocks/MessageQueue.go | 91 ------- utils/pubsub/pubsub.go | 116 --------- utils/pubsub/pubsub_test.go | 180 -------------- utils/spanutils/README.md | 80 ------ utils/spanutils/spanutils.go | 206 ---------------- utils/worker/README.md | 119 --------- utils/worker/config.go | 70 ------ utils/worker/types.go | 106 -------- utils/worker/worker.go | 196 --------------- utils/worker/workerinfo.go | 74 ------ 36 files changed, 4 insertions(+), 3342 deletions(-) delete mode 100644 utils/executor/README.md delete mode 100644 utils/executor/executor.go delete mode 100644 utils/executor/executor_test.go delete mode 100644 utils/executor/types.go delete mode 100644 utils/httptripper/README.md delete mode 100644 utils/httptripper/httptripper.go delete mode 100644 utils/httptripper/retry/README.md delete mode 100644 utils/httptripper/retry/retry.go delete mode 100644 utils/httptripper/retry/retry_test.go delete mode 100644 utils/httptripper/retry/types.go delete mode 100644 utils/httptripper/strategy/README.md delete mode 100644 utils/httptripper/strategy/strategy.go delete mode 100644 utils/httptripper/strategy/strategy_test.go delete mode 100644 utils/httptripper/strategy/types.go delete mode 100644 utils/httptripper/types.go delete mode 100644 utils/kafka/config.go delete mode 100644 utils/kafka/producer.go delete mode 100644 utils/kafka/producer_test.go delete mode 100644 utils/migrator/README.md delete mode 100644 utils/migrator/example/migratorexample.go delete mode 100644 utils/migrator/migratorcmdbase.go delete mode 100644 utils/pubsub/README.md delete mode 100644 utils/pubsub/message_queue/message_queue.go delete mode 100644 utils/pubsub/message_queue/mocks/MessageQueue.go delete mode 100644 utils/pubsub/pubsub.go delete mode 100644 utils/pubsub/pubsub_test.go delete mode 100644 utils/spanutils/README.md delete mode 100644 utils/spanutils/spanutils.go delete mode 100644 utils/worker/README.md delete mode 100644 utils/worker/config.go delete mode 100644 utils/worker/types.go delete mode 100644 utils/worker/worker.go delete mode 100644 utils/worker/workerinfo.go diff --git a/builder/ServiceName/service/service.go b/builder/ServiceName/service/service.go index 4ae6c441..46c0d32f 100644 --- a/builder/ServiceName/service/service.go +++ b/builder/ServiceName/service/service.go @@ -7,12 +7,10 @@ import ( "log" "net/http" "strings" - "time" proto "github.com/carousell/Orion/builder/ServiceName/ServiceName_proto" "github.com/carousell/Orion/interceptors" "github.com/carousell/Orion/utils/headers" - "github.com/carousell/Orion/utils/worker" "github.com/gorilla/mux" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "google.golang.org/grpc" @@ -27,7 +25,6 @@ type svc struct { appendText string debug bool client proto.ServiceNameClient - worker worker.Worker } func (s *svc) GetRequestHeaders() []string { @@ -47,29 +44,10 @@ func GetService(config Config) proto.ServiceNameServer { log.Fatalln("did not connect: ", err) } s.client = proto.NewServiceNameClient(conn) - wConfig := worker.Config{} - wConfig.LocalMode = true - /* - wConfig.RabbitConfig = new(worker.RabbitMQConfig) - wConfig.RabbitConfig.Host = "192.168.99.100" - wConfig.RabbitConfig.QueueName = "test" - wConfig.RabbitConfig.UserName = "guest" - wConfig.RabbitConfig.Password = "guest" - */ - s.worker = worker.NewWorker(wConfig) - s.worker.RegisterTask("TestWorker", func(ctx context.Context, payload string) error { - time.Sleep(time.Millisecond * 200) - log.Println("worker", payload) - return nil - }) - s.worker.RunWorker("Worker", 1) return s } func DestroyService(obj interface{}) { - if s, ok := obj.(*svc); ok { - s.worker.CloseWorker() - } } func (s *svc) Echo(ctx context.Context, req *proto.EchoRequest) (*proto.EchoResponse, error) { @@ -106,7 +84,6 @@ func (s *svc) Upper(ctx context.Context, req *proto.UpperRequest) (*proto.UpperR defer sp.End() time.Sleep(100 * time.Millisecond) */ - go s.worker.Schedule(ctx, "TestWorker", req.GetMsg()) return resp, nil } diff --git a/example/echo/service/service.go b/example/echo/service/service.go index 6cfc493a..91558854 100644 --- a/example/echo/service/service.go +++ b/example/echo/service/service.go @@ -7,15 +7,14 @@ import ( "log" "net/http" "strings" - "time" - proto "github.com/carousell/Orion/example/echo/echo_proto" - "github.com/carousell/Orion/interceptors" - "github.com/carousell/Orion/utils/headers" - "github.com/carousell/Orion/utils/worker" "github.com/gorilla/mux" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "google.golang.org/grpc" + + proto "github.com/carousell/Orion/example/echo/echo_proto" + "github.com/carousell/Orion/interceptors" + "github.com/carousell/Orion/utils/headers" ) const ( @@ -27,7 +26,6 @@ type svc struct { appendText string debug bool client proto.EchoServiceClient - worker worker.Worker } func (s *svc) GetRequestHeaders() []string { @@ -47,29 +45,10 @@ func GetService(config Config) proto.EchoServiceServer { log.Fatalln("did not connect: %v", err) } s.client = proto.NewEchoServiceClient(conn) - wConfig := worker.Config{} - wConfig.LocalMode = true - /* - wConfig.RabbitConfig = new(worker.RabbitMQConfig) - wConfig.RabbitConfig.Host = "192.168.99.100" - wConfig.RabbitConfig.QueueName = "test" - wConfig.RabbitConfig.UserName = "guest" - wConfig.RabbitConfig.Password = "guest" - */ - s.worker = worker.NewWorker(wConfig) - s.worker.RegisterTask("TestWorker", func(ctx context.Context, payload string) error { - time.Sleep(time.Millisecond * 200) - log.Println("worker", payload) - return nil - }) - s.worker.RunWorker("Worker", 1) return s } func DestroyService(obj interface{}) { - if s, ok := obj.(*svc); ok { - s.worker.CloseWorker() - } } func (s *svc) Echo(ctx context.Context, req *proto.EchoRequest) (*proto.EchoResponse, error) { @@ -106,7 +85,6 @@ func (s *svc) Upper(ctx context.Context, req *proto.UpperRequest) (*proto.UpperR defer sp.End() time.Sleep(100 * time.Millisecond) */ - go s.worker.Schedule(ctx, "TestWorker", req.GetMsg()) return resp, nil } diff --git a/utils/documentation.go b/utils/documentation.go index 3773429f..1811d36e 100644 --- a/utils/documentation.go +++ b/utils/documentation.go @@ -2,14 +2,7 @@ package utils // This comment block (re)generates the documentation. //go:generate godoc2ghmd -ex -file=README.md github.com/carousell/Orion/utils -//go:generate godoc2ghmd -ex -file=executor/README.md github.com/carousell/Orion/utils/executor //go:generate godoc2ghmd -ex -file=headers/README.md github.com/carousell/Orion/utils/headers -//go:generate godoc2ghmd -ex -file=httptripper/README.md github.com/carousell/Orion/utils/httptripper -//go:generate godoc2ghmd -ex -file=httptripper/retry/README.md github.com/carousell/Orion/utils/httptripper/retry -//go:generate godoc2ghmd -ex -file=httptripper/strategy/README.md github.com/carousell/Orion/utils/httptripper/strategy //go:generate godoc2ghmd -ex -file=listenerutils/README.md github.com/carousell/Orion/utils/listenerutils -//go:generate godoc2ghmd -ex -file=spanutils/README.md github.com/carousell/Orion/utils/spanutils -//go:generate godoc2ghmd -ex -file=worker/README.md github.com/carousell/Orion/utils/worker //go:generate godoc2ghmd -ex -file=options/README.md github.com/carousell/Orion/utils/options -//go:generate godoc2ghmd -ex -file=pubsub/README.md github.com/carousell/Orion/utils/pubsub //go:generate godoc2ghmd -ex -file=log/README.md github.com/carousell/Orion/utils/log diff --git a/utils/executor/README.md b/utils/executor/README.md deleted file mode 100644 index 1ac9481a..00000000 --- a/utils/executor/README.md +++ /dev/null @@ -1,67 +0,0 @@ -# executor -`import "github.com/carousell/Orion/utils/executor"` - -* [Overview](#pkg-overview) -* [Imported Packages](#pkg-imports) -* [Index](#pkg-index) - -## Overview - -## Imported Packages - -No packages beyond the Go standard library are imported. - -## Index -* [type Executor](#Executor) - * [func NewExecutor(options ...Option) Executor](#NewExecutor) -* [type Option](#Option) - * [func WithConcurrency(n int) Option](#WithConcurrency) - * [func WithFailOnError(fail bool) Option](#WithFailOnError) -* [type Task](#Task) - -#### Package files -[executor.go](./executor.go) [types.go](./types.go) - -## type [Executor](./types.go#L7-L12) -``` go -type Executor interface { - //Add adds a task to executor queue - Add(task Task) - //Wait waits for all executors to finish or one of them to error based on option selected - Wait() error -} -``` -Executor is the interface for a basic executor pipeline - -### func [NewExecutor](./executor.go#L110) -``` go -func NewExecutor(options ...Option) Executor -``` -NewExecutor builds and returns an executor - -## type [Option](./types.go#L15) -``` go -type Option func(*config) -``` -Option represents different options available for Executor - -### func [WithConcurrency](./executor.go#L23) -``` go -func WithConcurrency(n int) Option -``` -WithConcurrency sets the number of concurrent works - -### func [WithFailOnError](./executor.go#L30) -``` go -func WithFailOnError(fail bool) Option -``` -WithFailOnError fails all task if even a single task returns a error - -## type [Task](./types.go#L4) -``` go -type Task func() error -``` -Task is the basic task that gets executed in executor - -- - - -Generated by [godoc2ghmd](https://github.com/GandalfUK/godoc2ghmd) \ No newline at end of file diff --git a/utils/executor/executor.go b/utils/executor/executor.go deleted file mode 100644 index bce27ebb..00000000 --- a/utils/executor/executor.go +++ /dev/null @@ -1,127 +0,0 @@ -package executor - -import ( - "fmt" - "sync" -) - -type exe struct { - wg sync.WaitGroup - work chan Task - config config - errc chan error - done chan bool - errored safeBool -} - -type config struct { - concurrency int - failOnError bool -} - -//WithConcurrency sets the number of concurrent works -func WithConcurrency(n int) Option { - return func(c *config) { - c.concurrency = n - } -} - -//WithFailOnError fails all task if even a single task returns a error -func WithFailOnError(fail bool) Option { - return func(c *config) { - c.failOnError = fail - } -} - -//Add adds a task to executor queue -func (e *exe) Add(task Task) { - e.wg.Add(1) - go e.add(task) -} - -func (e *exe) add(task Task) { - e.work <- task -} - -func (e *exe) worker() { - for t := range e.work { - // if e has errored stop processing and drain the queue - if !e.errored.Get() { - e.processTask(t) - } else { - // no task, just handle all wait groups - e.processTask(nil) - } - } -} - -func (e *exe) processTask(task Task) { - defer func(e *exe) { - if r := recover(); r != nil { - e.errc <- fmt.Errorf("PANIC: %s", r) - } - e.wg.Done() - }(e) - if task != nil { - err := task() - if err != nil { - if e.config.failOnError { - e.errc <- err - } - } - } -} - -//Wait waits for all executors to finish or one of them to error based on option selected -func (e *exe) Wait() error { - go func(e *exe) { - e.wg.Wait() - close(e.done) - close(e.errc) - close(e.work) - }(e) - for { - select { - case err := <-e.errc: - if err != nil { - e.errored.Set(true) - go func(errc chan error) { - for range errc { - } // drain rest of the errors - }(e.errc) - } - if e.config.failOnError { - return err - } - case <-e.done: - // rare corner case of errc trigring after done - // when only 1 task was added - select { - case err := <-e.errc: - return err - default: - return nil - } - } - } -} - -//NewExecutor builds and returns an executor -func NewExecutor(options ...Option) Executor { - c := config{ - concurrency: 5, - failOnError: true, - } - for _, opt := range options { - opt(&c) - } - e := new(exe) - e.work = make(chan Task, 0) - e.errc = make(chan error, 0) - e.done = make(chan bool, 0) - e.config = c - for i := 0; i < c.concurrency; i++ { - go e.worker() - } - return e -} diff --git a/utils/executor/executor_test.go b/utils/executor/executor_test.go deleted file mode 100644 index 29babb67..00000000 --- a/utils/executor/executor_test.go +++ /dev/null @@ -1,153 +0,0 @@ -package executor - -import ( - "errors" - "math/rand" - "testing" - "time" - - "github.com/fortytw2/leaktest" - "github.com/stretchr/testify/assert" -) - -/* -no errors -all errors -random errors -test for panic -*/ - -const ( - runCount = 100 -) - -func TestNoErrors(t *testing.T) { - defer leaktest.Check(t)() - e := NewExecutor(WithFailOnError(true)) - - for i := 0; i < rand.Intn(runCount)+10; i++ { - e.Add(func() error { - time.Sleep(time.Millisecond * time.Duration(rand.Intn(200))) - return nil - }) - } - // wait - assert.NoError(t, e.Wait(), "There should be no error") -} - -func TestNoErrorsNoFail(t *testing.T) { - defer leaktest.Check(t)() - e := NewExecutor(WithFailOnError(false)) - - for i := 0; i < rand.Intn(runCount)+10; i++ { - e.Add(func() error { - time.Sleep(time.Millisecond * time.Duration(rand.Intn(200))) - return nil - }) - } - // wait - assert.NoError(t, e.Wait(), "There should be no error") -} - -func TestAllErrors(t *testing.T) { - defer leaktest.Check(t)() - e := NewExecutor(WithFailOnError(true)) - - for i := 0; i < rand.Intn(runCount)+10; i++ { - e.Add(func() error { - time.Sleep(time.Millisecond * time.Duration(rand.Intn(200))) - return errors.New("error") - }) - } - // wait - assert.Error(t, e.Wait(), "There should be error") -} - -func TestAllErrorsNoFail(t *testing.T) { - defer leaktest.Check(t)() - e := NewExecutor(WithFailOnError(false)) - - for i := 0; i < rand.Intn(runCount)+10; i++ { - e.Add(func() error { - time.Sleep(time.Millisecond * time.Duration(rand.Intn(200))) - return errors.New("error") - }) - } - // wait - assert.NoError(t, e.Wait(), "There should be no error") -} - -func TestRandomErrors(t *testing.T) { - defer leaktest.Check(t)() - e := NewExecutor() - errored := false - - for i := 0; i < rand.Intn(runCount)+10; i++ { - e.Add(func() error { - time.Sleep(time.Millisecond * time.Duration(rand.Intn(200))) - if rand.Intn(200) > 400 { - errored = true - return errors.New("error") - } - return nil - }) - } - - if !errored { - e.Add(func() error { - return errors.New("error") - }) - } - // wait - assert.Error(t, e.Wait(), "There should be error") -} -func TestRandomErrorsNoFail(t *testing.T) { - defer leaktest.Check(t)() - e := NewExecutor(WithFailOnError(false)) - errored := false - - for i := 0; i < rand.Intn(runCount)+10; i++ { - e.Add(func() error { - time.Sleep(time.Millisecond * time.Duration(rand.Intn(200))) - if rand.Intn(200) > 400 { - errored = true - return errors.New("error") - } - return nil - }) - } - - if !errored { - e.Add(func() error { - return errors.New("error") - }) - } - // wait - assert.NoError(t, e.Wait(), "There should be error") -} - -func TestRandomPanic(t *testing.T) { - defer leaktest.Check(t)() - e := NewExecutor() - paniced := safeBool{} - - for i := 0; i < rand.Intn(runCount)+10; i++ { - e.Add(func() error { - time.Sleep(time.Millisecond * time.Duration(rand.Intn(200))) - if rand.Intn(500) > 300 { - paniced.Set(true) - panic("error") - } else { - return nil - } - }) - } - - if !paniced.Get() { - e.Add(func() error { - panic("error") - }) - } - // wait - assert.Error(t, e.Wait(), "There should be error") -} diff --git a/utils/executor/types.go b/utils/executor/types.go deleted file mode 100644 index d97edd92..00000000 --- a/utils/executor/types.go +++ /dev/null @@ -1,34 +0,0 @@ -package executor - -import "sync" - -//Task is the basic task that gets executed in executor -type Task func() error - -//Executor is the interface for a basic executor pipeline -type Executor interface { - //Add adds a task to executor queue - Add(task Task) - //Wait waits for all executors to finish or one of them to error based on option selected - Wait() error -} - -//Option represents different options available for Executor -type Option func(*config) - -type safeBool struct { - val bool - m sync.Mutex -} - -func (i *safeBool) Get() bool { - i.m.Lock() - defer i.m.Unlock() - return i.val -} - -func (i *safeBool) Set(val bool) { - i.m.Lock() - defer i.m.Unlock() - i.val = val -} diff --git a/utils/httptripper/README.md b/utils/httptripper/README.md deleted file mode 100644 index ba1227ff..00000000 --- a/utils/httptripper/README.md +++ /dev/null @@ -1,141 +0,0 @@ -# httptripper -`import "github.com/carousell/Orion/utils/httptripper"` - -* [Overview](#pkg-overview) -* [Imported Packages](#pkg-imports) -* [Index](#pkg-index) - -## Overview -Package httptripper provides an implementation of http.RoundTripper that provides retries, popluates opentracing span info and hystrix circuit breaker. - -### Setup -for most cases using the http.Client provided by the package is sufficient - - client := httptripper.NewHTTPClient(time.Millisecond * 500) - -Note: If you are using a custom http.Client, then just wrap your custom http.Client using httptripper.WrapTripper - - tripper := httptripper.WrapTripper(client.Transport) - client.Transport = tripper - -### How To Use -Make sure you use httptripper.NewRequest to build http.Request, since http.NewRequest does not take context as parameter - - httpReq, err := httptripper.NewRequest(ctx, "TracingName", "GET", url, nil) - -## Imported Packages - -- [github.com/afex/hystrix-go/hystrix](https://godoc.org/github.com/afex/hystrix-go/hystrix) -- [github.com/carousell/Orion/utils/httptripper/retry](./retry) -- [github.com/carousell/Orion/utils/spanutils](./../spanutils) - -## Index -* [func GetRequestRetrier(req \*http.Request) retry.Retriable](#GetRequestRetrier) -* [func GetRequestTraceName(req \*http.Request) string](#GetRequestTraceName) -* [func NewHTTPClient(timeout time.Duration, options ...Option) \*http.Client](#NewHTTPClient) -* [func NewRequest(ctx context.Context, traceName, method, url string, body io.Reader) (\*http.Request, error)](#NewRequest) -* [func NewRequestWithRetrier(ctx context.Context, traceName string, retrier retry.Retriable, method, url string, body io.Reader) (\*http.Request, error)](#NewRequestWithRetrier) -* [func NewTripper(options ...Option) http.RoundTripper](#NewTripper) -* [func SetRequestRetrier(req \*http.Request, retrier retry.Retriable) \*http.Request](#SetRequestRetrier) -* [func SetRequestTraceName(req \*http.Request, traceName string) \*http.Request](#SetRequestTraceName) -* [func WrapTripper(base http.RoundTripper) http.RoundTripper](#WrapTripper) -* [type Option](#Option) - * [func WithBaseTripper(base http.RoundTripper) Option](#WithBaseTripper) - * [func WithHystrix(enabled bool) Option](#WithHystrix) - * [func WithRetrier(retrier retry.Retriable) Option](#WithRetrier) -* [type OptionsData](#OptionsData) - -#### Package files -[httptripper.go](./httptripper.go) [types.go](./types.go) - -## func [GetRequestRetrier](./httptripper.go#L199) -``` go -func GetRequestRetrier(req *http.Request) retry.Retriable -``` -GetRequestRetrier fetches retrier to be used with this request - -## func [GetRequestTraceName](./httptripper.go#L181) -``` go -func GetRequestTraceName(req *http.Request) string -``` -GetRequestTraceName fetches a trace name from HTTP request - -## func [NewHTTPClient](./httptripper.go#L144) -``` go -func NewHTTPClient(timeout time.Duration, options ...Option) *http.Client -``` -NewHTTPClient creates a new http.Client with default retry options and timeout - -## func [NewRequest](./httptripper.go#L156) -``` go -func NewRequest(ctx context.Context, traceName, method, url string, body io.Reader) (*http.Request, error) -``` -NewRequest extends http.NewRequest with context and trace name - -## func [NewRequestWithRetrier](./httptripper.go#L165) -``` go -func NewRequestWithRetrier(ctx context.Context, traceName string, retrier retry.Retriable, method, url string, body io.Reader) (*http.Request, error) -``` -NewRequestWithRetrier extends http.NewRequest with context, trace name and retrier - -## func [NewTripper](./httptripper.go#L130) -``` go -func NewTripper(options ...Option) http.RoundTripper -``` -NewTripper returns a default tripper wrapped around http.DefaultTransport - -## func [SetRequestRetrier](./httptripper.go#L192) -``` go -func SetRequestRetrier(req *http.Request, retrier retry.Retriable) *http.Request -``` -SetRequestRetrier sets the retrier to be used with this request - -## func [SetRequestTraceName](./httptripper.go#L174) -``` go -func SetRequestTraceName(req *http.Request, traceName string) *http.Request -``` -SetRequestTraceName stores a trace name in a HTTP request - -## func [WrapTripper](./httptripper.go#L125) -``` go -func WrapTripper(base http.RoundTripper) http.RoundTripper -``` -WrapTripper wraps the base tripper with zipkin info - -## type [Option](./types.go#L24) -``` go -type Option func(*OptionsData) -``` -Option defines an options for Tripper - -### func [WithBaseTripper](./httptripper.go#L210) -``` go -func WithBaseTripper(base http.RoundTripper) Option -``` -WithBaseTripper updates the tripper to use the provided http.RoundTripper - -### func [WithHystrix](./httptripper.go#L224) -``` go -func WithHystrix(enabled bool) Option -``` -WithHystrix enables/disables use of hystrix - -### func [WithRetrier](./httptripper.go#L217) -``` go -func WithRetrier(retrier retry.Retriable) Option -``` -WithRetrier updates the tripper to use the provided retry.Retriable - -## type [OptionsData](./types.go#L17-L21) -``` go -type OptionsData struct { - BaseTripper http.RoundTripper - HystrixEnabled bool - Retrier retry.Retriable -} - -``` -OptionsData is the data polulated by the options - -- - - -Generated by [godoc2ghmd](https://github.com/GandalfUK/godoc2ghmd) \ No newline at end of file diff --git a/utils/httptripper/httptripper.go b/utils/httptripper/httptripper.go deleted file mode 100644 index 1f512c79..00000000 --- a/utils/httptripper/httptripper.go +++ /dev/null @@ -1,228 +0,0 @@ -/*Package httptripper provides an implementation of http.RoundTripper that provides retries, popluates opentracing span info and hystrix circuit breaker. - -Setup - -for most cases using the http.Client provided by the package is sufficient - client := httptripper.NewHTTPClient(time.Millisecond * 500) - -Note: If you are using a custom http.Client, then just wrap your custom http.Client using httptripper.WrapTripper - tripper := httptripper.WrapTripper(client.Transport) - client.Transport = tripper - -How To Use - -Make sure you use httptripper.NewRequest to build http.Request, since http.NewRequest does not take context as parameter - httpReq, err := httptripper.NewRequest(ctx, "TracingName", "GET", url, nil) - -*/ -package httptripper - -import ( - "context" - "errors" - "io" - "net" - "net/http" - "time" - - "github.com/afex/hystrix-go/hystrix" - "github.com/carousell/Orion/utils/httptripper/retry" - "github.com/carousell/Orion/utils/spanutils" -) - -var ( - defaultOptions = []Option{ - WithBaseTripper(http.DefaultTransport), - WithHystrix(true), - WithRetrier(retry.NewRetry()), - } -) - -type tripper struct { - option *OptionsData -} - -func (t *tripper) RoundTrip(req *http.Request) (*http.Response, error) { - if req == nil { - return nil, errors.New("Nil request received") - } - attempt := 0 - var resp *http.Response - var err error - for attempt == 0 || t.getRetrier(req).ShouldRetry(attempt, req, resp, err) { - // close body of previous response on retry - if resp != nil { - go resp.Body.Close() - } - if attempt != 0 { - time.Sleep(t.getRetrier(req).WaitDuration(attempt, req, resp, err)) - } - resp, err = t.doRoundTrip(req, attempt) - attempt++ - } - return resp, err -} - -func (t *tripper) doRoundTrip(req *http.Request, retryConut int) (*http.Response, error) { - traceName := GetRequestTraceName(req) - if traceName == "" { - traceName = req.Host - } - if !t.option.HystrixEnabled { - // hystrix not enabled go ahead without it - return t.makeRoundTrip(traceName, req, retryConut) - } - var resp *http.Response - var err error - err = hystrix.Do( - traceName, - func() error { - resp, err = t.makeRoundTrip(traceName, req, retryConut) - return err - }, - nil, - ) - return resp, err -} - -func (t *tripper) makeRoundTrip(traceName string, req *http.Request, retryConut int) (*http.Response, error) { - span, _ := spanutils.NewHTTPExternalSpan(req.Context(), - traceName, req.URL.String(), req.Header) - defer span.Finish() - span.SetTag("attempt", retryConut) - resp, err := t.getTripper().RoundTrip(req) - if err != nil { - span.SetTag("error", err.Error()) - if e, ok := err.(net.Error); ok && e.Timeout() { - span.SetTag("timeout", true) - } - } - if resp != nil { - span.SetTag("statuscode", resp.StatusCode) - } - return resp, err -} - -func (t *tripper) getTripper() http.RoundTripper { - if t.option.BaseTripper != nil { - return t.option.BaseTripper - } - return http.DefaultTransport -} - -func (t *tripper) getRetrier(req *http.Request) retry.Retriable { - r := GetRequestRetrier(req) - if r != nil { - return r - } - if t.option.Retrier != nil { - return t.option.Retrier - } - return retry.NewRetry() -} - -//WrapTripper wraps the base tripper with zipkin info -func WrapTripper(base http.RoundTripper) http.RoundTripper { - return NewTripper(WithBaseTripper(base)) -} - -//NewTripper returns a default tripper wrapped around http.DefaultTransport -func NewTripper(options ...Option) http.RoundTripper { - t := &tripper{ - option: &OptionsData{}, - } - for _, opt := range defaultOptions { - opt(t.option) - } - for _, opt := range options { - opt(t.option) - } - return t -} - -//NewHTTPClient creates a new http.Client with default retry options and timeout -func NewHTTPClient(timeout time.Duration, options ...Option) *http.Client { - if timeout == 0 { - // never use a 0 timeout - timeout = time.Second - } - return &http.Client{ - Transport: NewTripper(options...), - Timeout: timeout, - } -} - -//NewRequest extends http.NewRequest with context and trace name -func NewRequest(ctx context.Context, traceName, method, url string, body io.Reader) (*http.Request, error) { - req, err := http.NewRequest(method, url, body) - if err != nil { - return req, err - } - return SetRequestTraceName(req.WithContext(ctx), traceName), nil -} - -//NewRequestWithRetrier extends http.NewRequest with context, trace name and retrier -func NewRequestWithRetrier(ctx context.Context, traceName string, retrier retry.Retriable, method, url string, body io.Reader) (*http.Request, error) { - req, err := NewRequest(ctx, traceName, method, url, body) - if err != nil { - return req, err - } - return SetRequestRetrier(req, retrier), nil -} - -//SetRequestTraceName stores a trace name in a HTTP request -func SetRequestTraceName(req *http.Request, traceName string) *http.Request { - ctx := req.Context() - ctx = context.WithValue(ctx, traceID, traceName) - return req.WithContext(ctx) -} - -//GetRequestTraceName fetches a trace name from HTTP request -func GetRequestTraceName(req *http.Request) string { - ctx := req.Context() - if value := ctx.Value(traceID); value != nil { - if traceName, ok := value.(string); ok { - return traceName - } - } - return "" -} - -// SetRequestRetrier sets the retrier to be used with this request -func SetRequestRetrier(req *http.Request, retrier retry.Retriable) *http.Request { - ctx := req.Context() - ctx = context.WithValue(ctx, retrierKey, retrier) - return req.WithContext(ctx) -} - -//GetRequestRetrier fetches retrier to be used with this request -func GetRequestRetrier(req *http.Request) retry.Retriable { - ctx := req.Context() - if value := ctx.Value(retrierKey); value != nil { - if retrier, ok := value.(retry.Retriable); ok { - return retrier - } - } - return nil -} - -//WithBaseTripper updates the tripper to use the provided http.RoundTripper -func WithBaseTripper(base http.RoundTripper) Option { - return func(o *OptionsData) { - o.BaseTripper = base - } -} - -//WithRetrier updates the tripper to use the provided retry.Retriable -func WithRetrier(retrier retry.Retriable) Option { - return func(o *OptionsData) { - o.Retrier = retrier - } -} - -//WithHystrix enables/disables use of hystrix -func WithHystrix(enabled bool) Option { - return func(o *OptionsData) { - o.HystrixEnabled = enabled - } -} diff --git a/utils/httptripper/retry/README.md b/utils/httptripper/retry/README.md deleted file mode 100644 index 011816ca..00000000 --- a/utils/httptripper/retry/README.md +++ /dev/null @@ -1,88 +0,0 @@ -# retry -`import "github.com/carousell/Orion/utils/httptripper/retry"` - -* [Overview](#pkg-overview) -* [Imported Packages](#pkg-imports) -* [Index](#pkg-index) - -## Overview -Package retry provides an implementation for retrying http requests with multiple wait strategies - -## Imported Packages - -- [github.com/carousell/Orion/utils/httptripper/strategy](./../strategy) - -## Index -* [type Option](#Option) - * [func WithMaxRetry(max int) Option](#WithMaxRetry) - * [func WithRetryAllMethods(retryAllMethods bool) Option](#WithRetryAllMethods) - * [func WithRetryMethods(methods ...string) Option](#WithRetryMethods) - * [func WithStrategy(s strategy.Strategy) Option](#WithStrategy) -* [type OptionsData](#OptionsData) -* [type Retriable](#Retriable) - * [func NewRetry(options ...Option) Retriable](#NewRetry) - -#### Package files -[retry.go](./retry.go) [types.go](./types.go) - -## type [Option](./types.go#L27) -``` go -type Option func(*OptionsData) -``` -Option is the interface for defining Options - -### func [WithMaxRetry](./retry.go#L87) -``` go -func WithMaxRetry(max int) Option -``` -WithMaxRetry set the max number of times a request is tried - -### func [WithRetryAllMethods](./retry.go#L104) -``` go -func WithRetryAllMethods(retryAllMethods bool) Option -``` -WithRetryAllMethods sets retry on all HTTP methods, overrides WithRetryMethods - -### func [WithRetryMethods](./retry.go#L94) -``` go -func WithRetryMethods(methods ...string) Option -``` -WithRetryMethods specifies the methods that can be retried - -### func [WithStrategy](./retry.go#L111) -``` go -func WithStrategy(s strategy.Strategy) Option -``` -WithStrategy defines the backoff strategy to be used - -## type [OptionsData](./types.go#L19-L24) -``` go -type OptionsData struct { - MaxRetry int - Methods map[string]bool - RetryAllMethods bool - Strategy strategy.Strategy -} - -``` -OptionsData stores all options used across retry - -## type [Retriable](./types.go#L11-L16) -``` go -type Retriable interface { - //ShouldRetry should return when the failure needs to be retried - ShouldRetry(attempt int, req *http.Request, resp *http.Response, err error) bool - //WaitDuration should return the duration to wait before making next call - WaitDuration(attempt int, req *http.Request, resp *http.Response, err error) time.Duration -} -``` -Retriable is the interface implemented by a retrier - -### func [NewRetry](./retry.go#L71) -``` go -func NewRetry(options ...Option) Retriable -``` -NewRetry creates a new retry strategy - -- - - -Generated by [godoc2ghmd](https://github.com/GandalfUK/godoc2ghmd) \ No newline at end of file diff --git a/utils/httptripper/retry/retry.go b/utils/httptripper/retry/retry.go deleted file mode 100644 index 41b6730b..00000000 --- a/utils/httptripper/retry/retry.go +++ /dev/null @@ -1,115 +0,0 @@ -/* -Package retry provides an implementation for retrying http requests with multiple wait strategies -*/ -package retry - -import ( - "net" - "net/http" - "time" - - "github.com/carousell/Orion/utils/httptripper/strategy" -) - -var ( - defaultDelay = time.Millisecond * 30 - defaultOptions = []Option{ - WithMaxRetry(3), - WithRetryMethods(http.MethodGet, http.MethodOptions, http.MethodHead), - WithRetryAllMethods(false), - WithStrategy(strategy.DefaultStrategy(defaultDelay)), - } -) - -type defaultRetry struct { - option *OptionsData -} - -func (d *defaultRetry) ShouldRetry(attempt int, req *http.Request, resp *http.Response, err error) bool { - if resp != nil { - // dont retry for anything less than 5XX - if resp.StatusCode < 500 { - return false - } - } - /* - if err != nil && !d.isTimeout(err) { - // do not retry non timeout errors - return false - } - */ - if attempt < d.option.MaxRetry && req != nil { - if d.option.RetryAllMethods { - return true - } - if allowed, ok := d.option.Methods[req.Method]; ok { - return allowed - } - } - return false -} - -func (d *defaultRetry) isTimeout(err error) bool { - if err == nil { - return false - } - if e, ok := err.(net.Error); ok { - //retry a temporary error or timeout - return e.Temporary() || e.Timeout() - } - return false -} - -func (d *defaultRetry) WaitDuration(attempt int, req *http.Request, resp *http.Response, err error) time.Duration { - if d.option.Strategy == nil { - return defaultDelay - } - return d.option.Strategy.WaitDuration(attempt, d.option.MaxRetry, req, resp, err) -} - -//NewRetry creates a new retry strategy -func NewRetry(options ...Option) Retriable { - r := &defaultRetry{ - option: &OptionsData{}, - } - // apply default - for _, opt := range defaultOptions { - opt(r.option) - } - // apply user provided - for _, opt := range options { - opt(r.option) - } - return r -} - -//WithMaxRetry set the max number of times a request is tried -func WithMaxRetry(max int) Option { - return func(ro *OptionsData) { - ro.MaxRetry = max - } -} - -//WithRetryMethods specifies the methods that can be retried -func WithRetryMethods(methods ...string) Option { - return func(ro *OptionsData) { - ro.Methods = make(map[string]bool) - for _, method := range methods { - ro.Methods[method] = true - } - } -} - -//WithRetryAllMethods sets retry on all HTTP methods, overrides WithRetryMethods -func WithRetryAllMethods(retryAllMethods bool) Option { - return func(ro *OptionsData) { - ro.RetryAllMethods = retryAllMethods - } -} - -//WithStrategy defines the backoff strategy to be used -func WithStrategy(s strategy.Strategy) Option { - return func(ro *OptionsData) { - ro.Strategy = s - } -} diff --git a/utils/httptripper/retry/retry_test.go b/utils/httptripper/retry/retry_test.go deleted file mode 100644 index fbb2fb81..00000000 --- a/utils/httptripper/retry/retry_test.go +++ /dev/null @@ -1,162 +0,0 @@ -package retry - -import ( - "net/http" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestMaxRetry(t *testing.T) { - r := NewRetry(WithMaxRetry(5)) - tests := []struct { - value int - result bool - }{ - {1, true}, - {2, true}, - {3, true}, - {4, true}, - {5, false}, - {9, false}, - } - req, err := http.NewRequest("GET", "/some-path", nil) - if err != nil { - t.Fatal(err) - } - for _, test := range tests { - assert.Equal( - t, - test.result, - r.ShouldRetry(test.value, req, nil, nil), - "ShouldRetry vaules should match", - ) - } -} - -func TestRetryMethods(t *testing.T) { - r := NewRetry(WithRetryMethods(http.MethodGet, http.MethodHead)) - tests := []struct { - method string - result bool - }{ - {http.MethodGet, true}, - {http.MethodGet, true}, - {http.MethodPost, false}, - {http.MethodPut, false}, - } - // build a fake request - req, err := http.NewRequest("GET", "/some-path", nil) - if err != nil { - t.Fatal(err) - } - for _, test := range tests { - req.Method = test.method - assert.Equal( - t, - test.result, - r.ShouldRetry(1, req, nil, nil), - "WithRetryMethods should work", - ) - } -} - -func TestRetryAllMethods(t *testing.T) { - r := NewRetry( - WithRetryMethods(http.MethodGet, http.MethodHead), - WithRetryAllMethods(true), - ) - tests := []struct { - method string - result bool - }{ - {http.MethodGet, true}, - {http.MethodGet, true}, - {http.MethodPost, true}, - {http.MethodPut, true}, - } - // build a fake request - req, err := http.NewRequest("GET", "/some-path", nil) - if err != nil { - t.Fatal(err) - } - for _, test := range tests { - req.Method = test.method - assert.Equal( - t, - test.result, - r.ShouldRetry(1, req, nil, nil), - "RetryAllMethods should be set", - ) - } -} - -type stra struct { - called bool - duration time.Duration -} - -func (s *stra) WaitDuration(retryCount int, maxRetry int, req *http.Request, resp *http.Response, err error) time.Duration { - s.called = true - return s.duration -} - -func TestWithStrategy(t *testing.T) { - s := &stra{} - s.called = false - r := NewRetry(WithStrategy(s)) - tests := []struct { - result time.Duration - }{ - {time.Millisecond}, - {time.Millisecond * 100}, - {time.Second}, - {time.Second * 30}, - } - for _, test := range tests { - s.called = false - s.duration = test.result - assert.Equal( - t, - test.result, - r.WaitDuration(1, nil, nil, nil), - "WaitDuration vaules should match", - ) - assert.True( - t, - s.called, - "WaitDuration should be called", - ) - } -} - -func TestServerError(t *testing.T) { - tests := []struct { - code int - result bool - }{ - {http.StatusOK, false}, - {http.StatusInternalServerError, true}, - {http.StatusTooManyRequests, false}, - {http.StatusBadGateway, true}, - {http.StatusServiceUnavailable, true}, - } - r := NewRetry() - // build a fake request - req, err := http.NewRequest("GET", "/some-path", nil) - if err != nil { - t.Fatal(err) - } - resp := &http.Response{} - for _, test := range tests { - resp.StatusCode = test.code - assert.Equal( - t, - test.result, - r.ShouldRetry(1, req, resp, nil), - "Should not retry on server error for code %d", - test.code, - ) - } -} diff --git a/utils/httptripper/retry/types.go b/utils/httptripper/retry/types.go deleted file mode 100644 index d6700497..00000000 --- a/utils/httptripper/retry/types.go +++ /dev/null @@ -1,27 +0,0 @@ -package retry - -import ( - "net/http" - "time" - - "github.com/carousell/Orion/utils/httptripper/strategy" -) - -//Retriable is the interface implemented by a retrier -type Retriable interface { - //ShouldRetry should return when the failure needs to be retried - ShouldRetry(attempt int, req *http.Request, resp *http.Response, err error) bool - //WaitDuration should return the duration to wait before making next call - WaitDuration(attempt int, req *http.Request, resp *http.Response, err error) time.Duration -} - -//OptionsData stores all options used across retry -type OptionsData struct { - MaxRetry int - Methods map[string]bool - RetryAllMethods bool - Strategy strategy.Strategy -} - -//Option is the interface for defining Options -type Option func(*OptionsData) diff --git a/utils/httptripper/strategy/README.md b/utils/httptripper/strategy/README.md deleted file mode 100644 index 3c1dad5f..00000000 --- a/utils/httptripper/strategy/README.md +++ /dev/null @@ -1,45 +0,0 @@ -# strategy -`import "github.com/carousell/Orion/utils/httptripper/strategy"` - -* [Overview](#pkg-overview) -* [Imported Packages](#pkg-imports) -* [Index](#pkg-index) - -## Overview -Package strategy provides strategies for use with retry - -## Imported Packages - -No packages beyond the Go standard library are imported. - -## Index -* [type Strategy](#Strategy) - * [func DefaultStrategy(duration time.Duration) Strategy](#DefaultStrategy) - * [func ExponentialStrategy(duration time.Duration) Strategy](#ExponentialStrategy) - -#### Package files -[strategy.go](./strategy.go) [types.go](./types.go) - -## type [Strategy](./types.go#L9-L12) -``` go -type Strategy interface { - //WaitDuration takes attempt, maxRetry and request/response paramaetrs as input and gives out a duration as response - WaitDuration(attempt, maxRetry int, req *http.Request, resp *http.Response, err error) time.Duration -} -``` -Strategy is the interface requirement for any strategy implementation - -### func [DefaultStrategy](./strategy.go#L29) -``` go -func DefaultStrategy(duration time.Duration) Strategy -``` -DefaultStrategy provides implementation for Fixed duration wait - -### func [ExponentialStrategy](./strategy.go#L37) -``` go -func ExponentialStrategy(duration time.Duration) Strategy -``` -ExponentialStrategy provided implementation for exponentially (in powers of 2) growing wait duration - -- - - -Generated by [godoc2ghmd](https://github.com/GandalfUK/godoc2ghmd) \ No newline at end of file diff --git a/utils/httptripper/strategy/strategy.go b/utils/httptripper/strategy/strategy.go deleted file mode 100644 index 0f6d2bf4..00000000 --- a/utils/httptripper/strategy/strategy.go +++ /dev/null @@ -1,42 +0,0 @@ -/* -Package strategy provides strategies for use with retry -*/ -package strategy - -import ( - "math" - "net/http" - "time" -) - -type defaultStrategy struct { - duration time.Duration - exponential bool -} - -func (d *defaultStrategy) WaitDuration(attempt int, maxRetry int, req *http.Request, resp *http.Response, err error) time.Duration { - if !d.exponential { - return d.duration - } - if attempt <= 0 { - attempt = 1 - } - factor := int(math.Pow(2, float64(attempt))) - 1 - return time.Duration(factor) * d.duration -} - -//DefaultStrategy provides implementation for Fixed duration wait -func DefaultStrategy(duration time.Duration) Strategy { - return &defaultStrategy{ - duration: duration, - exponential: false, - } -} - -//ExponentialStrategy provided implementation for exponentially (in powers of 2) growing wait duration -func ExponentialStrategy(duration time.Duration) Strategy { - return &defaultStrategy{ - duration: duration, - exponential: true, - } -} diff --git a/utils/httptripper/strategy/strategy_test.go b/utils/httptripper/strategy/strategy_test.go deleted file mode 100644 index 2c8b5f4e..00000000 --- a/utils/httptripper/strategy/strategy_test.go +++ /dev/null @@ -1,45 +0,0 @@ -package strategy - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestDefaultStrategy(t *testing.T) { - duration := time.Millisecond * 50 - s := DefaultStrategy(duration) - for _, val := range []int{1, 2, 3, 4, 8, 10, 14} { - assert.Equal( - t, - duration, - s.WaitDuration(val, 20, nil, nil, nil), - "Duration returned should always be fixed", - ) - } -} - -func TestExonentialStrategy(t *testing.T) { - duration := time.Millisecond * 50 - s := ExponentialStrategy(duration) - values := []struct { - attempt int - expectedDuration time.Duration - }{ - {0, duration}, - {1, duration}, - {2, duration * 3}, - {3, duration * 7}, - {4, duration * 15}, - {5, duration * 31}, - } - for _, val := range values { - assert.Equal( - t, - val.expectedDuration, - s.WaitDuration(val.attempt, 20, nil, nil, nil), - "Duration returned should grow exponentially", - ) - } -} diff --git a/utils/httptripper/strategy/types.go b/utils/httptripper/strategy/types.go deleted file mode 100644 index 78e268d3..00000000 --- a/utils/httptripper/strategy/types.go +++ /dev/null @@ -1,12 +0,0 @@ -package strategy - -import ( - "net/http" - "time" -) - -//Strategy is the interface requirement for any strategy implementation -type Strategy interface { - //WaitDuration takes attempt, maxRetry and request/response paramaetrs as input and gives out a duration as response - WaitDuration(attempt, maxRetry int, req *http.Request, resp *http.Response, err error) time.Duration -} diff --git a/utils/httptripper/types.go b/utils/httptripper/types.go deleted file mode 100644 index 0344a904..00000000 --- a/utils/httptripper/types.go +++ /dev/null @@ -1,24 +0,0 @@ -package httptripper - -import ( - "net/http" - - "github.com/carousell/Orion/utils/httptripper/retry" -) - -type key string - -const ( - traceID key = "HTTPRequestTracingName" - retrierKey key = "HTTPRequestRetrier" -) - -//OptionsData is the data polulated by the options -type OptionsData struct { - BaseTripper http.RoundTripper - HystrixEnabled bool - Retrier retry.Retriable -} - -// Option defines an options for Tripper -type Option func(*OptionsData) diff --git a/utils/kafka/config.go b/utils/kafka/config.go deleted file mode 100644 index 691b3376..00000000 --- a/utils/kafka/config.go +++ /dev/null @@ -1,84 +0,0 @@ -package kafka - -import ( - "time" - - "github.com/Shopify/sarama" -) - -func newConfig() config { - return config{ - saramaConfig: sarama.NewConfig(), - errorHandler: defaultErrorHandler, - } -} - -type config struct { - saramaConfig *sarama.Config - errorHandler func(error) -} - -// Option is used to configure the Kafka producer -type Option interface { - apply(cfg *config) -} - -type flushIntervalOption struct { - interval time.Duration -} - -func (opt *flushIntervalOption) apply(cfg *config) { - cfg.saramaConfig.Producer.Flush.Frequency = opt.interval -} - -// WithFlushInterval specifies a flush interval for the Kafka producer -func WithFlushInterval(interval time.Duration) Option { - return &flushIntervalOption{ - interval: interval, - } -} - -type maxRetriesOption struct { - maxRetries int -} - -func (opt *maxRetriesOption) apply(cfg *config) { - cfg.saramaConfig.Producer.Retry.Max = opt.maxRetries -} - -// WithMaxRetries specifies the total number of times to retry sending a message -func WithMaxRetries(maxRetries int) Option { - return &maxRetriesOption{ - maxRetries: maxRetries, - } -} - -type errorHandlerOption struct { - errorHandler func(error) -} - -func (opt *errorHandlerOption) apply(cfg *config) { - cfg.errorHandler = opt.errorHandler -} - -// WithErrorHandler specifies a custom error handler for the Kafka producer -func WithErrorHandler(errorHandler func(error)) Option { - return &errorHandlerOption{ - errorHandler: errorHandler, - } -} - -type versionOption struct { - version sarama.KafkaVersion -} - -func (opt *versionOption) apply(cfg *config) { - cfg.saramaConfig.Version = opt.version -} - -// WithVersion specified the kafka cluster version to connect to -func WithVersion(version sarama.KafkaVersion) Option { - return &versionOption{ - version: version, - } -} diff --git a/utils/kafka/producer.go b/utils/kafka/producer.go deleted file mode 100644 index 844ab673..00000000 --- a/utils/kafka/producer.go +++ /dev/null @@ -1,92 +0,0 @@ -package kafka - -import ( - "context" - "fmt" - - "github.com/Shopify/sarama" - "github.com/carousell/Orion/utils/errors" - "github.com/carousell/Orion/utils/errors/notifier" -) - -// Producer is a Kafka producer based on Sarama -type Producer struct { - asyncProducer sarama.AsyncProducer - open bool - errorHandler func(error) -} - -// NewProducer creates a Kafka producer -func NewProducer(brokers []string, opts ...Option) (*Producer, error) { - cfg := newConfig() - for _, opt := range opts { - opt.apply(&cfg) - } - - if len(brokers) == 0 { - return nil, errors.New("must provide at least one Kafka broker") - } - - asyncProducer, err := sarama.NewAsyncProducer(brokers, cfg.saramaConfig) - if err != nil { - return nil, errors.Wrap(err, "error creating async producer") - } - - return &Producer{ - asyncProducer: asyncProducer, - errorHandler: cfg.errorHandler, - }, nil -} - -// Run tells the producer to start accepting messages to publish to Kafka -func (p *Producer) Run() { - p.open = true - go func() { - defer func() { - if r := recover(); r != nil { - notifier.NotifyWithLevel(errors.Wrap(fmt.Errorf("%v", r), "panic in Kafka producer"), "critical") - } - }() - for { - select { - case err, ok := <-p.asyncProducer.Errors(): - if !ok { - return - } - p.errorHandler(err) - } - } - }() -} - -// Produce sends a message to a particular Kafka topic -func (p *Producer) Produce(ctx context.Context, topic string, key []byte, msg []byte) error { - if !p.open || p.asyncProducer == nil { - return errors.New("producer is closed") - } - var keyEncoder sarama.Encoder - if key != nil { - keyEncoder = sarama.ByteEncoder(key) - } - saramaMsg := &sarama.ProducerMessage{ - Topic: topic, - Key: keyEncoder, - Value: sarama.ByteEncoder(msg), - } - p.asyncProducer.Input() <- saramaMsg - return nil -} - -// Close stops the producer from accepting and sending any new messages -func (p *Producer) Close() error { - p.open = false - err := p.asyncProducer.Close() - if err != nil { - return errors.Wrap(err, "failed to close Kafka async producer") - } - return nil -} - -func defaultErrorHandler(err error) { - notifier.Notify(errors.Wrap(err, "failed to produce Kafka message")) -} diff --git a/utils/kafka/producer_test.go b/utils/kafka/producer_test.go deleted file mode 100644 index 208c2b0b..00000000 --- a/utils/kafka/producer_test.go +++ /dev/null @@ -1,90 +0,0 @@ -package kafka - -import ( - "strings" - "testing" - - "context" - - "github.com/Shopify/sarama" - "github.com/Shopify/sarama/mocks" - "github.com/carousell/Orion/utils/errors" -) - -func TestProducer(t *testing.T) { - - cfg := sarama.Config{} - cfg.Producer.Return.Successes = true - cfg.Producer.Return.Errors = true - ctx := context.Background() - - t.Run("open producer accepts messages", func(t *testing.T) { - asyncProducer := mocks.NewAsyncProducer(t, &cfg) - asyncProducer.ExpectInputAndSucceed() - producer := Producer{ - asyncProducer: asyncProducer, - } - producer.Run() - defer producer.Close() - - err := producer.Produce(ctx, "test-topic", []byte("test-key"), []byte("test-payload")) - if err != nil { - t.Fatalf("expected no error, got %v", err) - } - resp := <-asyncProducer.Successes() - val, _ := resp.Value.Encode() - if string(val) != "test-payload" { - t.Fatalf("expected value sent to Kafka to be %v, got %v", "test-payload", string(val)) - } - }) - - t.Run("producer errors are handled by custom handler", func(t *testing.T) { - asyncProducer := mocks.NewAsyncProducer(t, &cfg) - testErr := errors.New("test error") - asyncProducer.ExpectInputAndFail(testErr) - - errCh := make(chan error) - errorHandler := func(err error) { - errCh <- err - } - - producer := Producer{ - asyncProducer: asyncProducer, - errorHandler: errorHandler, - } - producer.Run() - defer producer.Close() - - err := producer.Produce(ctx, "test-topic", []byte("test-key"), []byte("test-payload")) - if err != nil { - t.Fatalf("expected no error, got %v", err) - } - - producerErr := <-errCh - if !strings.Contains(producerErr.Error(), testErr.Error()) { - t.Fatalf("expected error handler to receive %v, got %v", testErr, producerErr) - } - }) - - t.Run("uninitialized producer does not accept messages", func(t *testing.T) { - producer := Producer{} - err := producer.Produce(ctx, "test-topic", []byte("test-key"), []byte("test-payload")) - if err == nil { - t.Fatal("expected error, got nil") - } - }) - - t.Run("closed producer does not accept messages", func(t *testing.T) { - asyncProducer := mocks.NewAsyncProducer(t, &cfg) - producer := Producer{ - asyncProducer: asyncProducer, - } - producer.Run() - producer.Close() - - err := producer.Produce(ctx, "test-topic", []byte("test-key"), []byte("test-payload")) - if err == nil { - t.Fatal("expected error, got nil") - } - }) -} diff --git a/utils/migrator/README.md b/utils/migrator/README.md deleted file mode 100644 index b5a4abf6..00000000 --- a/utils/migrator/README.md +++ /dev/null @@ -1,25 +0,0 @@ -# Migrator - -Migrator is an utility for schema management in microservices, which based on [github.com/golang-migrate/migrate](golang-migrate) and extended as an cli command using github.com/spf13/cobra - -Given a cluster on which migration has to be managed - base engine has to be extended to enable execute of the supported commands. - -## Usage -migration-client [flags] [command] -``` -migration-client -c mysvcPG up -v 201903290057 -``` -###### Flag --c or --cluster : cluster identifier on which migration has to be performed --v or --version : version of the migration -###### Commands -- up : Applies all migration from current version of migration. If input version is specificed via version flag - applies migrations from current version until given version where given version should be greater than current. -- down : Applies migration from current version to input version specified via version flag. Here input version should lesser than current version -- force : Marks the version as applied without running any migration -- version : displays the current version of migration and its status - which signifies if the current version is applied successfully or not - -## Extending - -Please refer sample implementaion that extended `migratorcmdbase.go` in `example/migratorexample.go` - -Note: vendor drivers from golang-migrate/migrate based on your requirement of dbType i.e postgres, cassandra, mysql etc,. \ No newline at end of file diff --git a/utils/migrator/example/migratorexample.go b/utils/migrator/example/migratorexample.go deleted file mode 100644 index ddd59049..00000000 --- a/utils/migrator/example/migratorexample.go +++ /dev/null @@ -1,79 +0,0 @@ -package main - -import ( - "github.com/golang-migrate/migrate" - "github.com/golang-migrate/migrate/database" - "github.com/pkg/errors" - "github.com/carousell/Orion/utils/migrator" - //_ "github.com/golang-migrate/migrate/source/file" -) - -// sample only - can be customized based on use-case -type connectConfig struct { - dbUsername string - dbPassword string - dbHostPort string - dbName string -} - -type clusterData struct { - name string - dbType string - sourcePath string - connConfig connectConfig -} - -var clusterMap map[string]clusterData - -func getMigrationClient(cluster string) (*migrate.Migrate, error) { - var dbType string - var dbDriver database.Driver - var cData clusterData - switch cluster { - case "mycluster": - //cData = clusterMap[cluster] - //pgConnectUrl = "postgres://%s:%s@%s/%s?sslmode=disable" - //dbType = cData.dbType - //dbUrl := fmt.Sprintf(pgConnectUrl, cData.connectConfig.dbUsername, cData.connectConfig.dbPassword, - // cData.connectConfig.dbHostPort, cData.connectConfig.dbName) - //dbConn, err := sql.Open(dbType, dbUrl) - //if err != nil { - // fmt.Println("Error connecting", err) - // return nil, err - //} - //dbDriver, err = postgres.WithInstance(dbConn, &postgres.Config{}) - //if err != nil { - // fmt.Println("Error initializing postgres driver ", err) - // return nil, err - //} - default: - return nil, errors.New("unknown cluster") - } - - return migrate.NewWithDatabaseInstance(cData.sourcePath, dbType, dbDriver) -} - -func init() { - - clusterDataArray := []clusterData{ - { - name: "mycluster", - dbType: "postgres", - sourcePath: "file://migrator/files/", - connConfig: connectConfig{ // Fetch config from config management, instead of hard-coded here - dbUsername: "testUser", - dbPassword: "testPass", - dbHostPort: "192.186.100.10:5432", - dbName: "testDb", - }, - }, - } - clusterMap = make(map[string]clusterData, len(clusterDataArray)) - for _, cData := range clusterDataArray { - clusterMap[cData.name] = cData - } -} - -func main() { - migrator.Execute(getMigrationClient) -} diff --git a/utils/migrator/migratorcmdbase.go b/utils/migrator/migratorcmdbase.go deleted file mode 100644 index 861552de..00000000 --- a/utils/migrator/migratorcmdbase.go +++ /dev/null @@ -1,183 +0,0 @@ -package migrator - -import ( - "errors" - "fmt" - "github.com/golang-migrate/migrate" - "github.com/spf13/cobra" - "os" - "strings" -) - -const clusterFlag = "cluster" -const versionFlag = "version" - -// Initialize the Migrate with driver based on cluster -type MigrationClientFunc func(cluster string) (*migrate.Migrate, error) - -var mcf MigrationClientFunc - -var migrationClient *migrate.Migrate -var currentVersion uint - -var rootCmd = &cobra.Command{ - Use: "migration", - Short: "migration client", - Long: `A custom migration client built on top of - https://github.com/golang-migrate/migrate`, - PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - var dirty bool - // fetch cluster of migration being executed - cluster, err := cmd.Flags().GetString(clusterFlag) - if err != nil { - return err - } else if strings.TrimSpace(cluster) == "" { - return errors.New("cluster is missing") - } - - // get the initialized migrationClient based on cluster - migrationClient, err = mcf(strings.TrimSpace(cluster)) - if err != nil { - return err - } - - // print current version before execution - currentVersion, dirty, err = migrationClient.Version() - if err != nil && err != migrate.ErrNilVersion { - return err - } - fmt.Printf("\nBefore execution: Version %d dirty %t \n", currentVersion, dirty) - return nil - }, -} - -var upCommand = &cobra.Command{ - Use: "up", - Short: "Apply all change or by version", - Long: `Applies all migration upwards`, - RunE: func(cmd *cobra.Command, args []string) error { - version, err := cmd.Flags().GetUint(versionFlag) - if err != nil { - fmt.Println("Invalid version value: ", err) - } - if version > 0 { - // apply migration until given version - where input version is greater than current version - if version <= currentVersion { - return errors.New("version should be greater than current version") - } - err = migrationClient.Migrate(version) - } else { - // apply all remaining migrations - err = migrationClient.Up() - } - - if err != nil && err != migrate.ErrNilVersion && err != migrate.ErrNoChange { - return err - } - return nil - }, -} - -var forceCommand = &cobra.Command{ - Use: "force", - Short: "forcefully mark fake migration for given version", - Long: `Marks version as applied irrespective of it being dirty`, - RunE: func(cmd *cobra.Command, args []string) error { - version, err := cmd.Flags().GetUint(versionFlag) - if err != nil { - return err - } - - if version > 0 { - // force apply any version - err = migrationClient.Force(int(version)) - } - - if err != nil && err != migrate.ErrNilVersion && err != migrate.ErrNoChange { - return err - } - return nil - }, -} - -var downCommand = &cobra.Command{ - Use: "down", - Short: "Undo migration to given version", - Long: `Applies down migration of all migration until given version`, - RunE: func(cmd *cobra.Command, args []string) error { - version, err := cmd.Flags().GetUint(versionFlag) - if err != nil { - return err - } - if version < 1 || version >= currentVersion { - // apply down migration until given version - where input version is lesser than current version - return errors.New("version should be lesser than current version") - } - err = migrationClient.Migrate(version) - - if err != nil && err != migrate.ErrNilVersion && err != migrate.ErrNoChange { - return err - } - return nil - }, -} - -var versionCommand = &cobra.Command{ - Use: "version", - Short: "Fetch current version", - Long: `Display current version and status`, - RunE: func(cmd *cobra.Command, args []string) error { - version, dirty, err := migrationClient.Version() - if err != nil && err != migrate.ErrNilVersion { - return err - } - fmt.Printf("\nversion: Current Version %d dirty %t \n", version, dirty) - return nil - }, -} - -func init() { - rootCmd.PersistentFlags().StringP(clusterFlag, "c", "", "cluster on which migration will be performed") - rootCmd.MarkPersistentFlagRequired(clusterFlag) - rootCmd.PersistentFlags().SortFlags = false - - //up command - version is optional - upCommand.Flags().UintP(versionFlag, "v", 0, "optional: version of migration to be applied") - - //force command - forceCommand.Flags().UintP(versionFlag, "v", 0, "version of migration to be applied") - forceCommand.MarkFlagRequired(versionFlag) - - //down command - downCommand.Flags().UintP(versionFlag, "v", 0, "version of migration to be applied") - downCommand.MarkFlagRequired(versionFlag) - - rootCmd.AddCommand(upCommand) - rootCmd.AddCommand(forceCommand) - rootCmd.AddCommand(downCommand) - rootCmd.AddCommand(versionCommand) -} - -func cleanUp() { - if migrationClient != nil { - //fmt.Println("Exiting after cleanup") - migrationClient.Close() - } -} - -func Execute(migrationClientFunc MigrationClientFunc) { - mcf = migrationClientFunc - defer func() { - if r := recover(); r != nil { - fmt.Println("Error executing command - ", r) - cleanUp() - os.Exit(1) - } - }() - err := rootCmd.Execute() - cleanUp() - if err != nil { - fmt.Println("Error executing command - ", err) - os.Exit(1) - } -} diff --git a/utils/pubsub/README.md b/utils/pubsub/README.md deleted file mode 100644 index 2c82c994..00000000 --- a/utils/pubsub/README.md +++ /dev/null @@ -1,59 +0,0 @@ -# pubsub -`import "github.com/carousell/Orion/utils/pubsub"` - -* [Overview](#pkg-overview) -* [Imported Packages](#pkg-imports) -* [Index](#pkg-index) - -## Overview - -## Imported Packages - -- [cloud.google.com/go/pubsub](https://godoc.org/cloud.google.com/go/pubsub) -- [github.com/afex/hystrix-go/hystrix](https://godoc.org/github.com/afex/hystrix-go/hystrix) -- [github.com/carousell/Orion/utils/executor](./../executor) -- [github.com/carousell/Orion/utils/log](./../log) -- [github.com/carousell/Orion/utils/pubsub/message_queue](./message_queue) -- [github.com/carousell/Orion/utils/spanutils](./../spanutils) - -## Index -* [type Config](#Config) -* [type Service](#Service) - * [func NewPubSubService(config Config) Service](#NewPubSubService) - -#### Package files -[pubsub.go](./pubsub.go) - -## type [Config](./pubsub.go#L16-L23) -``` go -type Config struct { - Key string - Project string - Enabled bool - Timeout int - BulkPublishConcurrency int - Retries int -} - -``` -Config is the config for pubsub - -## type [Service](./pubsub.go#L26-L31) -``` go -type Service interface { - PublishMessage(ctx context.Context, topic string, data []byte, waitSync bool) (*goPubSub.PublishResult, error) - BulkPublishMessages(ctx context.Context, topic string, data [][]byte, waitSync bool) - SubscribeMessages(ctx context.Context, subscribe string, subscribeFunction messageQueue.SubscribeFunction) error - Close() -} -``` -Service is the interface implemented by a pubsub service - -### func [NewPubSubService](./pubsub.go#L42) -``` go -func NewPubSubService(config Config) Service -``` -NewPubSubService build and returns an pubsub service handler - -- - - -Generated by [godoc2ghmd](https://github.com/GandalfUK/godoc2ghmd) \ No newline at end of file diff --git a/utils/pubsub/message_queue/message_queue.go b/utils/pubsub/message_queue/message_queue.go deleted file mode 100644 index bc1b2d65..00000000 --- a/utils/pubsub/message_queue/message_queue.go +++ /dev/null @@ -1,126 +0,0 @@ -package message_queue - -import ( - "log" - "strconv" - - goPubSub "cloud.google.com/go/pubsub" - cache "github.com/patrickmn/go-cache" - "golang.org/x/net/context" - "golang.org/x/oauth2/google" - "google.golang.org/api/option" -) - -//MessageQueue Intergace to the wrappers around Google pubsub lib calls -type MessageQueue interface { - Init(pubSubKey string, gProject string) error - Close() error - Publish(string, *PubSubData) *goPubSub.PublishResult - GetResult(ctx context.Context, result *goPubSub.PublishResult) (string, error) - SubscribeMessages(ctx context.Context, subscriptionId string, subscribeFunction SubscribeFunction) error -} - -//PubSubData represents msg format to be used for writing messages to pubsub -type PubSubData struct { - Id string - Timestamp int64 - Data []byte -} - -//PubSubQueue Required configs for interacting with pubsub -type PubSubQueue struct { - pubSubKey string - gProject string - PubsubClient *goPubSub.Client - ctx context.Context - topics *cache.Cache -} - -//NewMessageQueue create a new object to MessageQueue interface -func NewMessageQueue(enabled bool, serviceAccountKey string, project string) MessageQueue { - MessageQueue := new(PubSubQueue) - if enabled { - MessageQueue.Init(serviceAccountKey, project) - } - return MessageQueue -} - -//Init initiates connection to Google Pubsub -func (pubsubqueue *PubSubQueue) Init(pubSubKey string, gProject string) error { - var err error - pubsubqueue.pubSubKey = pubSubKey - pubsubqueue.gProject = gProject - pubsubqueue.ctx, pubsubqueue.PubsubClient, err = pubsubqueue.configurePubsub() - if err != nil { - log.Fatalln("Error in client connections to PubSub", err) - return err - } - pubsubqueue.topics = cache.New(cache.NoExpiration, cache.NoExpiration) - return nil -} - -//Close Closes all topic connections to pubsub -func (pubsubqueue *PubSubQueue) Close() error { - for _, item := range pubsubqueue.topics.Items() { - if topic, ok := item.Object.(*goPubSub.Topic); ok { - topic.Stop() - } - } - if pubsubqueue.PubsubClient != nil { - pubsubqueue.PubsubClient.Close() - } - - return nil -} - -func (pubsubqueue *PubSubQueue) configurePubsub() (context.Context, *goPubSub.Client, error) { - var err error - key := []byte(pubsubqueue.pubSubKey) - conf, err := google.JWTConfigFromJSON(key, "https://www.googleapis.com/auth/pubsub") - if err != nil { - log.Fatal(err) - } - ctx := context.Background() - ts := conf.TokenSource(ctx) - ps, err := goPubSub.NewClient(ctx, pubsubqueue.gProject, option.WithTokenSource(ts)) - if err != nil { - log.Fatal("Error in client connections to PubSub", err) - return nil, nil, err - } - return ctx, ps, nil -} - -//Publish publishes the given message to the topic -func (pubsubqueue *PubSubQueue) Publish(topicName string, pubSubData *PubSubData) *goPubSub.PublishResult { - var topic *goPubSub.Topic - if t, ok := pubsubqueue.topics.Get(topicName); ok { - if to, ok := t.(*goPubSub.Topic); ok { - topic = to - } - } - if topic == nil { - topic = pubsubqueue.PubsubClient.Topic(topicName) - pubsubqueue.topics.SetDefault(topicName, topic) - } - attributes := map[string]string{ - "id": pubSubData.Id, - "timestamp": strconv.FormatInt(pubSubData.Timestamp, 10), - } - publishResult := topic.Publish(pubsubqueue.ctx, &goPubSub.Message{Data: pubSubData.Data, Attributes: attributes}) - return publishResult -} - -//GetResult gets results of the publish call, can be used to make publish a sync call -func (pubsubqueue *PubSubQueue) GetResult(ctx context.Context, result *goPubSub.PublishResult) (string, error) { - return result.Get(ctx) -} - -//SubscribeFunction recieves messages from a subscription -type SubscribeFunction func(ctx context.Context, msg *goPubSub.Message) - -//SubscribeMessages Initales a subscriber call and assigns to given subscriber function -func (pubsubqueue *PubSubQueue) SubscribeMessages(ctx context.Context, subscriptionId string, subscribeFunction SubscribeFunction) error { - subscription := pubsubqueue.PubsubClient.Subscription(subscriptionId) - err := subscription.Receive(ctx, subscribeFunction) - return err -} diff --git a/utils/pubsub/message_queue/mocks/MessageQueue.go b/utils/pubsub/message_queue/mocks/MessageQueue.go deleted file mode 100644 index 0fd0c91d..00000000 --- a/utils/pubsub/message_queue/mocks/MessageQueue.go +++ /dev/null @@ -1,91 +0,0 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. -package mocks - -import context "context" -import message_queue "github.com/carousell/Orion/utils/pubsub/message_queue" -import mock "github.com/stretchr/testify/mock" -import pubsub "cloud.google.com/go/pubsub" - -// MessageQueue is an autogenerated mock type for the MessageQueue type -type MessageQueue struct { - mock.Mock -} - -// Close provides a mock function with given fields: -func (_m *MessageQueue) Close() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// GetResult provides a mock function with given fields: ctx, result -func (_m *MessageQueue) GetResult(ctx context.Context, result *pubsub.PublishResult) (string, error) { - ret := _m.Called(ctx, result) - - var r0 string - if rf, ok := ret.Get(0).(func(context.Context, *pubsub.PublishResult) string); ok { - r0 = rf(ctx, result) - } else { - r0 = ret.Get(0).(string) - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, *pubsub.PublishResult) error); ok { - r1 = rf(ctx, result) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Init provides a mock function with given fields: pubSubKey, gProject -func (_m *MessageQueue) Init(pubSubKey string, gProject string) error { - ret := _m.Called(pubSubKey, gProject) - - var r0 error - if rf, ok := ret.Get(0).(func(string, string) error); ok { - r0 = rf(pubSubKey, gProject) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Publish provides a mock function with given fields: _a0, _a1 -func (_m *MessageQueue) Publish(_a0 string, _a1 *message_queue.PubSubData) *pubsub.PublishResult { - ret := _m.Called(_a0, _a1) - - var r0 *pubsub.PublishResult - if rf, ok := ret.Get(0).(func(string, *message_queue.PubSubData) *pubsub.PublishResult); ok { - r0 = rf(_a0, _a1) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*pubsub.PublishResult) - } - } - - return r0 -} - -// SubscribeMessages provides a mock function with given fields: ctx, subscriptionId, subscribeFunction -func (_m *MessageQueue) SubscribeMessages(ctx context.Context, subscriptionId string, subscribeFunction message_queue.SubscribeFunction) error { - ret := _m.Called(ctx, subscriptionId, subscribeFunction) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, message_queue.SubscribeFunction) error); ok { - r0 = rf(ctx, subscriptionId, subscribeFunction) - } else { - r0 = ret.Error(0) - } - - return r0 -} diff --git a/utils/pubsub/pubsub.go b/utils/pubsub/pubsub.go deleted file mode 100644 index 84fe89c6..00000000 --- a/utils/pubsub/pubsub.go +++ /dev/null @@ -1,116 +0,0 @@ -package pubsub - -import ( - "context" - "time" - - goPubSub "cloud.google.com/go/pubsub" - "github.com/afex/hystrix-go/hystrix" - "github.com/carousell/Orion/utils/executor" - "github.com/carousell/Orion/utils/log" - messageQueue "github.com/carousell/Orion/utils/pubsub/message_queue" - "github.com/carousell/Orion/utils/spanutils" -) - -//Config is the config for pubsub -type Config struct { - Key string - Project string - Enabled bool - Timeout int - BulkPublishConcurrency int - Retries int -} - -//Service is the interface implemented by a pubsub service -type Service interface { - PublishMessage(ctx context.Context, topic string, data []byte, waitSync bool) (*goPubSub.PublishResult, error) - BulkPublishMessages(ctx context.Context, topic string, data [][]byte, waitSync bool) - SubscribeMessages(ctx context.Context, subscribe string, subscribeFunction messageQueue.SubscribeFunction) error - Close() -} - -type pubSubService struct { - MessageQueue messageQueue.MessageQueue - Config Config -} - -var newMessageQueueFn = messageQueue.NewMessageQueue -var newExecutorFn = executor.NewExecutor - -//NewPubSubService build and returns an pubsub service handler -func NewPubSubService(config Config) Service { - hysConfig := hystrix.CommandConfig{Timeout: config.Timeout} - hystrix.ConfigureCommand("PubSubPublish", hysConfig) - return &pubSubService{ - MessageQueue: newMessageQueueFn(config.Enabled, config.Key, config.Project), - Config: config, - } -} - -//Close closes any active connection to Pubsub endpoint -func (g *pubSubService) Close() { - if g.Config.Enabled { - g.MessageQueue.Close() - } -} - -//Defaults to 1 retry -func (g *pubSubService) GetRetries() int { - if g.Config.Retries < 1 { - return 1 - } - return g.Config.Retries -} - -//PublishMessage publishes a single message to give topic, set waitSync param to true to wait for publish ack -func (g *pubSubService) PublishMessage(ctx context.Context, topic string, data []byte, waitSync bool) (*goPubSub.PublishResult, error) { - var result *goPubSub.PublishResult - retries := g.GetRetries() - for retries >= 0 { - retries-- - er := hystrix.Do("PubSubPublish", func() error { - span, _ := spanutils.NewExternalSpan(ctx, "PubSubPublish", "/"+topic) - // zipkin span - defer span.Finish() - pubsubData := new(messageQueue.PubSubData) - pubsubData.Data = data - pubsubData.Timestamp = time.Now().UnixNano() / 1000000 - result = g.MessageQueue.Publish(topic, pubsubData) - if waitSync { - _, err := g.MessageQueue.GetResult(ctx, result) - if err != nil { - return err - } - } - return nil - }, nil) - if er != nil { - log.Error(ctx, "err", er, "component", "pubsub msg publish") - } else { - break - } - } - if !waitSync { - return result, nil - } - return nil, nil -} - -//BulkPublishMessages publishes a multiple message to give topic, with "BulkPublishConcurrency" no of routines -func (g *pubSubService) BulkPublishMessages(ctx context.Context, topic string, data [][]byte, waitSync bool) { - e := newExecutorFn(executor.WithFailOnError(false), executor.WithConcurrency(g.Config.BulkPublishConcurrency)) - for _, v := range data { - singleMsg := v - e.Add(func() error { - _, err := g.PublishMessage(ctx, topic, singleMsg, waitSync) - return err - }) - } - e.Wait() -} - -//SubscribeMessages Subscirbes to pubsub and returns error if any -func (g *pubSubService) SubscribeMessages(ctx context.Context, subscribe string, subscribeFunction messageQueue.SubscribeFunction) error { - return g.MessageQueue.SubscribeMessages(ctx, subscribe, subscribeFunction) -} diff --git a/utils/pubsub/pubsub_test.go b/utils/pubsub/pubsub_test.go deleted file mode 100644 index 7f822fc0..00000000 --- a/utils/pubsub/pubsub_test.go +++ /dev/null @@ -1,180 +0,0 @@ -package pubsub - -import ( - "context" - "errors" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - - goPubSub "cloud.google.com/go/pubsub" - "github.com/carousell/Orion/utils/executor" - "github.com/carousell/Orion/utils/pubsub/message_queue" - mockMessageQueue "github.com/carousell/Orion/utils/pubsub/message_queue/mocks" -) - -const ( - pubsubTopic = "test_topic" - pubsubMsg = "test data" - serverID = "some serverId" -) - -func setupMessageQueueMockCall(publishMethodCallCount int) (*mockMessageQueue.MessageQueue, *goPubSub.PublishResult) { - mockMessageQueue := &mockMessageQueue.MessageQueue{} - newMessageQueueFn = func(enabled bool, serviceAccountKey string, project string) message_queue.MessageQueue { - return mockMessageQueue - } - result := &goPubSub.PublishResult{} - mockMessageQueue.On("Publish", pubsubTopic, mock.MatchedBy(func(pubsubData *message_queue.PubSubData) bool { - return pubsubMsg == string(pubsubData.Data) - })).Return(result).Times(publishMethodCallCount) - return mockMessageQueue, result -} - -func TestPublishMessageSync(t *testing.T) { - ctx := context.Background() - mockMessageQueue, result := setupMessageQueueMockCall(1) - mockMessageQueue.On("GetResult", ctx, result).Return(serverID, nil) - - p := NewPubSubService(Config{}) - data := []byte(pubsubMsg) - response, err := p.PublishMessage(ctx, pubsubTopic, data, true) - assert.Nil(t, response) - assert.Nil(t, err) - p.Close() - - call := mockMessageQueue.Mock.ExpectedCalls[0] - assert.Equal(t, "Publish", call.Method) -} - -func TestPublishMessageAsync(t *testing.T) { - ctx := context.Background() - mockMessageQueue, result := setupMessageQueueMockCall(1) - - p := NewPubSubService(Config{}) - data := []byte(pubsubMsg) - response, err := p.PublishMessage(ctx, pubsubTopic, data, false) - assert.Equal(t, result, response) - assert.Nil(t, err) - p.Close() - - call := mockMessageQueue.Mock.ExpectedCalls[0] - assert.Equal(t, "Publish", call.Method) -} - -type mockExe struct { - tasks []executor.Task -} - -func (e *mockExe) Add(task executor.Task) { - e.tasks = append(e.tasks, task) -} -func (e *mockExe) Wait() error { - return nil -} - -func TestBulkPublishMessageAsync(t *testing.T) { - noOfDataToPublish := 2 - ctx := context.Background() - mockMessageQueue, _ := setupMessageQueueMockCall(noOfDataToPublish) - - mockExecutor := new(mockExe) - newExecutorFn = func(options ...executor.Option) executor.Executor { - return mockExecutor - } - p := NewPubSubService(Config{}) - data := [][]byte{[]byte(pubsubMsg), []byte(pubsubMsg)} - p.BulkPublishMessages(ctx, pubsubTopic, data, false) - p.Close() - - assert.Equal(t, noOfDataToPublish, len(mockExecutor.tasks)) - for i := 0; i < noOfDataToPublish; i++ { - mockExecutor.tasks[i]() - } - - call := mockMessageQueue.Mock.ExpectedCalls[0] - assert.Equal(t, "Publish", call.Method) -} - -func TestBulkPublishMessageSync(t *testing.T) { - noOfDataToPublish := 2 - ctx := context.Background() - mockMessageQueue, result := setupMessageQueueMockCall(noOfDataToPublish) - mockMessageQueue.On("GetResult", ctx, result).Return(serverID, nil).Times(noOfDataToPublish) - - mockExecutor := new(mockExe) - newExecutorFn = func(options ...executor.Option) executor.Executor { - return mockExecutor - } - p := NewPubSubService(Config{}) - data := [][]byte{[]byte(pubsubMsg), []byte(pubsubMsg)} - p.BulkPublishMessages(ctx, pubsubTopic, data, true) - p.Close() - - assert.Equal(t, noOfDataToPublish, len(mockExecutor.tasks)) - for i := 0; i < noOfDataToPublish; i++ { - mockExecutor.tasks[i]() - } - - call := mockMessageQueue.Mock.ExpectedCalls[0] - assert.Equal(t, "Publish", call.Method) -} - -type mockMessageQueueForRetry struct { - tries int -} - -func (_m *mockMessageQueueForRetry) Close() error { - return nil -} -func (_m *mockMessageQueueForRetry) GetResult(ctx context.Context, result *goPubSub.PublishResult) (string, error) { - _m.tries++ - return "", errors.New("Timeout") -} -func (_m *mockMessageQueueForRetry) Init(pubSubKey string, gProject string) error { - _m.tries = 0 - return nil -} -func (_m *mockMessageQueueForRetry) Publish(_a0 string, _a1 *message_queue.PubSubData) *goPubSub.PublishResult { - return nil -} -func (_m *mockMessageQueueForRetry) SubscribeMessages(ctx context.Context, subscriptionID string, subscribeFunction message_queue.SubscribeFunction) error { - return nil -} - -func TestPublishMessageSyncWithRetries(t *testing.T) { - ctx := context.Background() - mockMessageQueue := &mockMessageQueueForRetry{} - newMessageQueueFn = func(enabled bool, serviceAccountKey string, project string) message_queue.MessageQueue { - return mockMessageQueue - } - - p := NewPubSubService(Config{Retries: 2}) - data := []byte(pubsubMsg) - response, err := p.PublishMessage(ctx, pubsubTopic, data, true) - assert.Nil(t, response) - assert.Nil(t, err) - p.Close() - - // 1 + 2 reties - assert.Equal(t, 3, mockMessageQueue.tries) -} - -func testSubscriberFn(ctx context.Context, msg *goPubSub.Message) { -} -func TestSubscribeMessages(t *testing.T) { - ctx := context.Background() - mockMessageQueue := &mockMessageQueue.MessageQueue{} - newMessageQueueFn = func(enabled bool, serviceAccountKey string, project string) message_queue.MessageQueue { - return mockMessageQueue - } - mockMessageQueue.On("SubscribeMessages", ctx, "subscriptionId", mock.MatchedBy(func(subscriberFn message_queue.SubscribeFunction) bool { - return true - })).Return(nil) - p := NewPubSubService(Config{}) - p.SubscribeMessages(ctx, "subscriptionId", testSubscriberFn) - - call := mockMessageQueue.Mock.ExpectedCalls[0] - assert.Equal(t, "SubscribeMessages", call.Method) -} diff --git a/utils/spanutils/README.md b/utils/spanutils/README.md deleted file mode 100644 index 12fbbe33..00000000 --- a/utils/spanutils/README.md +++ /dev/null @@ -1,80 +0,0 @@ -# spanutils -`import "github.com/carousell/Orion/utils/spanutils"` - -* [Overview](#pkg-overview) -* [Imported Packages](#pkg-imports) -* [Index](#pkg-index) - -## Overview - -## Imported Packages - -- [github.com/carousell/Orion/utils](./..) -- [github.com/carousell/Orion/utils/log](./../log) -- [github.com/newrelic/go-agent](https://godoc.org/github.com/newrelic/go-agent) -- [github.com/opentracing/opentracing-go](https://godoc.org/github.com/opentracing/opentracing-go) -- [github.com/opentracing/opentracing-go/ext](https://godoc.org/github.com/opentracing/opentracing-go/ext) -- [go.elastic.co/apm](https://godoc.org/go.elastic.co/apm) -- [google.golang.org/grpc/metadata](https://godoc.org/google.golang.org/grpc/metadata) - -## Index -* [func ClientSpan(operationName string, ctx context.Context) (context.Context, opentracing.Span)](#ClientSpan) -* [func GRPCTracingSpan(operationName string, ctx context.Context) context.Context](#GRPCTracingSpan) -* [type TracingSpan](#TracingSpan) - * [func NewDatastoreSpan(ctx context.Context, name string, datastore string) (TracingSpan, context.Context)](#NewDatastoreSpan) - * [func NewExternalSpan(ctx context.Context, name string, url string) (TracingSpan, context.Context)](#NewExternalSpan) - * [func NewHTTPExternalSpan(ctx context.Context, name string, url string, hdr http.Header) (TracingSpan, context.Context)](#NewHTTPExternalSpan) - * [func NewInternalSpan(ctx context.Context, name string) (TracingSpan, context.Context)](#NewInternalSpan) - -#### Package files -[spanutils.go](./spanutils.go) - -## func [ClientSpan](./spanutils.go#L192) -``` go -func ClientSpan(operationName string, ctx context.Context) (context.Context, opentracing.Span) -``` -ClientSpan starts a new client span linked to the existing spans if any are found - -## func [GRPCTracingSpan](./spanutils.go#L208) -``` go -func GRPCTracingSpan(operationName string, ctx context.Context) context.Context -``` - -## type [TracingSpan](./spanutils.go#L19-L25) -``` go -type TracingSpan interface { - End() - Finish() - SetTag(key string, value interface{}) - SetQuery(query string) - SetError(msg string) -} -``` -TracingSpan defines an interface for implementing a tracing span - -### func [NewDatastoreSpan](./spanutils.go#L107) -``` go -func NewDatastoreSpan(ctx context.Context, name string, datastore string) (TracingSpan, context.Context) -``` -NewDatastoreSpan starts a span for tracing data store actions - -### func [NewExternalSpan](./spanutils.go#L146) -``` go -func NewExternalSpan(ctx context.Context, name string, url string) (TracingSpan, context.Context) -``` -NewExternalSpan starts a span for tracing external actions - -### func [NewHTTPExternalSpan](./spanutils.go#L151) -``` go -func NewHTTPExternalSpan(ctx context.Context, name string, url string, hdr http.Header) (TracingSpan, context.Context) -``` -NewHTTPExternalSpan starts a span for tracing external HTTP actions - -### func [NewInternalSpan](./spanutils.go#L91) -``` go -func NewInternalSpan(ctx context.Context, name string) (TracingSpan, context.Context) -``` -NewInternalSpan starts a span for tracing internal actions - -- - - -Generated by [godoc2ghmd](https://github.com/GandalfUK/godoc2ghmd) \ No newline at end of file diff --git a/utils/spanutils/spanutils.go b/utils/spanutils/spanutils.go deleted file mode 100644 index ee539130..00000000 --- a/utils/spanutils/spanutils.go +++ /dev/null @@ -1,206 +0,0 @@ -package spanutils - -import ( - "context" - "encoding/base64" - "net/http" - "strings" - - "github.com/carousell/Orion/utils" - newrelic "github.com/newrelic/go-agent" - opentracing "github.com/opentracing/opentracing-go" - otext "github.com/opentracing/opentracing-go/ext" - "go.elastic.co/apm" - "google.golang.org/grpc/metadata" -) - -// TracingSpan defines an interface for implementing a tracing span -type TracingSpan interface { - End() - Finish() - SetTag(key string, value interface{}) - SetQuery(query string) - SetError(msg string) -} - -type tracingSpan struct { - openSpan opentracing.Span - datastore bool - external bool - dataSegment newrelic.DatastoreSegment - externalSegment newrelic.ExternalSegment - segment newrelic.Segment - elasticSpan *apm.Span -} - -func (span *tracingSpan) End() { - if span == nil { - // dont panic when called against a nil span - return - } - span.openSpan.Finish() - - if span.datastore { - span.dataSegment.End() - } else if span.external { - span.externalSegment.End() - } else { - span.segment.End() - } - - if span.elasticSpan != nil { - span.elasticSpan.End() - } -} - -func (span *tracingSpan) Finish() { - span.End() -} - -func (span *tracingSpan) SetTag(key string, value interface{}) { - if span == nil { - // dont panic when called against a nil span - return - } - span.openSpan.SetTag(key, value) -} - -func (span *tracingSpan) SetQuery(query string) { - if span == nil { - // dont panic when called against a nil span - return - } - span.openSpan.SetTag("query", query) - if span.datastore { - span.dataSegment.ParameterizedQuery = query - } -} - -func (span *tracingSpan) SetError(msg string) { - if span == nil { - // dont panic when called against a nil span - return - } - if msg != "" { - span.openSpan.SetTag("error", true) - span.openSpan.SetTag("error.msg", msg) - } -} - -//NewInternalSpan starts a span for tracing internal actions -func NewInternalSpan(ctx context.Context, name string) (TracingSpan, context.Context) { - zip, ctx := opentracing.StartSpanFromContext(ctx, name) - txn := utils.GetNewRelicTransactionFromContext(ctx) - seg := newrelic.Segment{ - StartTime: newrelic.StartSegmentNow(txn), - Name: name, - } - eSpan, ctx := apm.StartSpan(ctx, name, "internal") - return &tracingSpan{ - openSpan: zip, - segment: seg, - elasticSpan: eSpan, - }, ctx -} - -//NewDatastoreSpan starts a span for tracing data store actions -func NewDatastoreSpan(ctx context.Context, name string, datastore string) (TracingSpan, context.Context) { - if !strings.HasPrefix(name, datastore) { - name = datastore + name - } - zip, ctx := opentracing.StartSpanFromContext(ctx, name) - zip.SetTag("store", datastore) - txn := utils.GetNewRelicTransactionFromContext(ctx) - seg := newrelic.DatastoreSegment{ - StartTime: newrelic.StartSegmentNow(txn), - Product: newrelic.DatastoreProduct(datastore), - Operation: name, - } - eSpan, ctx := apm.StartSpan(ctx, name, "datastore") - return &tracingSpan{ - openSpan: zip, - dataSegment: seg, - datastore: true, - elasticSpan: eSpan, - }, ctx -} - -func buildExternalSpan(ctx context.Context, name string, url string) (*tracingSpan, context.Context) { - ctx, zip := ClientSpan(name, ctx) - zip.SetTag("url", url) - txn := utils.GetNewRelicTransactionFromContext(ctx) - seg := newrelic.ExternalSegment{ - StartTime: newrelic.StartSegmentNow(txn), - URL: url, - } - eSpan, ctx := apm.StartSpan(ctx, name, "external") - return &tracingSpan{ - openSpan: zip, - externalSegment: seg, - external: true, - elasticSpan: eSpan, - }, ctx -} - -//NewExternalSpan starts a span for tracing external actions -func NewExternalSpan(ctx context.Context, name string, url string) (TracingSpan, context.Context) { - return buildExternalSpan(ctx, name, url) -} - -//NewHTTPExternalSpan starts a span for tracing external HTTP actions -func NewHTTPExternalSpan(ctx context.Context, name string, url string, hdr http.Header) (TracingSpan, context.Context) { - s, ctx := buildExternalSpan(ctx, name, url) - traceHTTPHeaders(ctx, s.openSpan, hdr) - return s, ctx -} - -func traceHTTPHeaders(ctx context.Context, sp opentracing.Span, hdr http.Header) { - // Transmit the span's TraceContext as HTTP headers on our - // outbound request. - opentracing.GlobalTracer().Inject( - sp.Context(), - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(hdr)) -} - -// A type that conforms to opentracing.TextMapReader and -// opentracing.TextMapWriter. -type metadataReaderWriter struct { - *metadata.MD -} - -func (w metadataReaderWriter) Set(key, val string) { - key = strings.ToLower(key) - if strings.HasSuffix(key, "-bin") { - val = string(base64.StdEncoding.EncodeToString([]byte(val))) - } - (*w.MD)[key] = append((*w.MD)[key], val) -} - -func (w metadataReaderWriter) ForeachKey(handler func(key, val string) error) error { - for k, vals := range *w.MD { - for _, v := range vals { - if err := handler(k, v); err != nil { - return err - } - } - } - return nil -} - -//ClientSpan starts a new client span linked to the existing spans if any are found -func ClientSpan(operationName string, ctx context.Context) (context.Context, opentracing.Span) { - tracer := opentracing.GlobalTracer() - var clientSpan opentracing.Span - if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil { - clientSpan = tracer.StartSpan( - operationName, - opentracing.ChildOf(parentSpan.Context()), - ) - } else { - clientSpan = tracer.StartSpan(operationName) - } - otext.SpanKindRPCClient.Set(clientSpan) - ctx = opentracing.ContextWithSpan(ctx, clientSpan) - return ctx, clientSpan -} diff --git a/utils/worker/README.md b/utils/worker/README.md deleted file mode 100644 index 62ae975c..00000000 --- a/utils/worker/README.md +++ /dev/null @@ -1,119 +0,0 @@ -# worker -`import "github.com/carousell/Orion/utils/worker"` - -* [Overview](#pkg-overview) -* [Imported Packages](#pkg-imports) -* [Index](#pkg-index) - -## Overview - -## Imported Packages - -- [github.com/RichardKnop/machinery/v1](https://godoc.org/github.com/RichardKnop/machinery/v1) -- [github.com/RichardKnop/machinery/v1/config](https://godoc.org/github.com/RichardKnop/machinery/v1/config) -- [github.com/RichardKnop/machinery/v1/tasks](https://godoc.org/github.com/RichardKnop/machinery/v1/tasks) -- [github.com/carousell/Orion/utils/errors](./../errors) -- [github.com/carousell/Orion/utils/log](./../log) -- [github.com/carousell/Orion/utils/spanutils](./../spanutils) -- [github.com/grpc-ecosystem/go-grpc-middleware/util/metautils](https://godoc.org/github.com/grpc-ecosystem/go-grpc-middleware/util/metautils) -- [github.com/opentracing/opentracing-go](https://godoc.org/github.com/opentracing/opentracing-go) -- [github.com/opentracing/opentracing-go/ext](https://godoc.org/github.com/opentracing/opentracing-go/ext) -- [github.com/satori/go.uuid](https://godoc.org/github.com/satori/go.uuid) - -## Index -* [type Config](#Config) -* [type RabbitMQConfig](#RabbitMQConfig) -* [type ScheduleConfig](#ScheduleConfig) -* [type ScheduleOption](#ScheduleOption) - * [func WithETA(eta \*time.Time) ScheduleOption](#WithETA) - * [func WithQueueName(queueName string) ScheduleOption](#WithQueueName) - * [func WithRetry(n int) ScheduleOption](#WithRetry) -* [type Work](#Work) -* [type Worker](#Worker) - * [func NewWorker(config Config) Worker](#NewWorker) - -#### Package files -[config.go](./config.go) [types.go](./types.go) [worker.go](./worker.go) [workerinfo.go](./workerinfo.go) - -## type [Config](./types.go#L20-L23) -``` go -type Config struct { - LocalMode bool - RabbitConfig *RabbitMQConfig -} - -``` -Config is the config used to intialize workers - -## type [RabbitMQConfig](./types.go#L26-L33) -``` go -type RabbitMQConfig struct { - UserName string - Password string - BrokerVHost string - Host string - Port string - QueueName string -} - -``` -RabbitMQConfig is the config used for scheduling tasks through rabbitmq - -## type [ScheduleConfig](./types.go#L40-L44) -``` go -type ScheduleConfig struct { - // contains filtered or unexported fields -} - -``` -ScheduleConfig is the config used when scheduling a task - -## type [ScheduleOption](./types.go#L47) -``` go -type ScheduleOption func(*ScheduleConfig) -``` -ScheduleOption represents different options available for Schedule - -### func [WithETA](./worker.go#L47) -``` go -func WithETA(eta *time.Time) ScheduleOption -``` -WithETA sets the delay for this task - -### func [WithQueueName](./worker.go#L40) -``` go -func WithQueueName(queueName string) ScheduleOption -``` -WithQueueName sets the destination queue for this task - -### func [WithRetry](./worker.go#L33) -``` go -func WithRetry(n int) ScheduleOption -``` -WithRetry sets the number of Retries for this task - -## type [Work](./types.go#L36) -``` go -type Work func(ctx context.Context, payload string) error -``` -Work is the type of task that can be exeucted by Worker - -## type [Worker](./types.go#L12-L17) -``` go -type Worker interface { - Schedule(ctx context.Context, name, payload string, options ...ScheduleOption) error - RegisterTask(name string, taskFunc Work) error - RunWorker(name string, concurrency int) - CloseWorker() -} -``` -Worker is the interface for worker - -### func [NewWorker](./worker.go#L18) -``` go -func NewWorker(config Config) Worker -``` -NewWorker creates a new worker from given config - -- - - -Generated by [godoc2ghmd](https://github.com/GandalfUK/godoc2ghmd) \ No newline at end of file diff --git a/utils/worker/config.go b/utils/worker/config.go deleted file mode 100644 index 799b7b46..00000000 --- a/utils/worker/config.go +++ /dev/null @@ -1,70 +0,0 @@ -package worker - -import ( - "fmt" - "strings" -) - -func getServerName(config Config) string { - return fmt.Sprintf("amqp://%s:%s@%s:%s/%s", config.RabbitConfig.UserName, config.RabbitConfig.Password, - config.RabbitConfig.Host, config.RabbitConfig.Port, config.RabbitConfig.BrokerVHost) -} - -func buildRabbitConfig(cfg Config) *RabbitMQConfig { - if cfg.RabbitConfig != nil { - rabbitConfig := new(RabbitMQConfig) - if strings.TrimSpace(cfg.RabbitConfig.BrokerVHost) != "" { - rabbitConfig.BrokerVHost = cfg.RabbitConfig.BrokerVHost - } else { - rabbitConfig.BrokerVHost = "workers" - } - - if strings.TrimSpace(cfg.RabbitConfig.Host) != "" { - rabbitConfig.Host = cfg.RabbitConfig.Host - } else { - rabbitConfig.Host = "localhost" - } - - if strings.TrimSpace(cfg.RabbitConfig.Password) != "" { - rabbitConfig.Password = cfg.RabbitConfig.Password - } else { - rabbitConfig.Host = "guest" - } - - if strings.TrimSpace(cfg.RabbitConfig.Port) != "" { - rabbitConfig.Port = cfg.RabbitConfig.Port - } else { - rabbitConfig.Host = "5672" - } - - if strings.TrimSpace(cfg.RabbitConfig.QueueName) != "" { - rabbitConfig.QueueName = cfg.RabbitConfig.QueueName - } else { - rabbitConfig.QueueName = "WorkerQueue" - } - - /* - if cfg.RabbitConfig.ResultsExpireIn > 0 { - rabbitConfig.ResultsExpireIn = cfg.RabbitConfig.ResultsExpireIn - } else { - rabbitConfig.ResultsExpireIn = 1 - } - */ - - if strings.TrimSpace(cfg.RabbitConfig.UserName) != "" { - rabbitConfig.UserName = cfg.RabbitConfig.UserName - } else { - rabbitConfig.UserName = "guest" - } - return rabbitConfig - } - return nil -} - -func buildConfig(cfg Config) Config { - config := Config{} - config.RabbitConfig = buildRabbitConfig(cfg) - config.LocalMode = cfg.LocalMode - - return config -} diff --git a/utils/worker/types.go b/utils/worker/types.go deleted file mode 100644 index 73c87c05..00000000 --- a/utils/worker/types.go +++ /dev/null @@ -1,106 +0,0 @@ -package worker - -import ( - "context" - - "github.com/RichardKnop/machinery/v1/tasks" - - "time" -) - -//Worker is the interface for worker -type Worker interface { - Schedule(ctx context.Context, name, payload string, options ...ScheduleOption) error - RegisterTask(name string, taskFunc Work) error - RunWorker(name string, concurrency int) - CloseWorker() -} - -//Config is the config used to intialize workers -type Config struct { - LocalMode bool - RabbitConfig *RabbitMQConfig -} - -//RabbitMQConfig is the config used for scheduling tasks through rabbitmq -type RabbitMQConfig struct { - UserName string - Password string - BrokerVHost string - Host string - Port string - QueueName string -} - -//Work is the type of task that can be exeucted by Worker -type Work func(ctx context.Context, payload string) error -type wrappedWork func(payload string) error - -//ScheduleConfig is the config used when scheduling a task -type ScheduleConfig struct { - retries int - queueName string - eta *time.Time -} - -//ScheduleOption represents different options available for Schedule -type ScheduleOption func(*ScheduleConfig) - -type fakeBackend struct { -} - -func (f *fakeBackend) IsAMQP() bool { - return true -} - -func (f *fakeBackend) InitGroup(groupUUID string, taskUUIDs []string) error { - return nil -} - -func (f *fakeBackend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, error) { - return true, nil -} - -func (f *fakeBackend) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) { - return []*tasks.TaskState{}, nil -} - -func (f *fakeBackend) TriggerChord(groupUUID string) (bool, error) { - return true, nil -} - -func (f *fakeBackend) SetStatePending(signature *tasks.Signature) error { - return nil -} - -func (f *fakeBackend) SetStateReceived(signature *tasks.Signature) error { - return nil -} - -func (f *fakeBackend) SetStateStarted(signature *tasks.Signature) error { - return nil -} - -func (f *fakeBackend) SetStateRetry(signature *tasks.Signature) error { - return nil -} - -func (f *fakeBackend) SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error { - return nil -} - -func (f *fakeBackend) SetStateFailure(signature *tasks.Signature, err string) error { - return nil -} - -func (f *fakeBackend) GetState(taskUUID string) (*tasks.TaskState, error) { - return new(tasks.TaskState), nil -} - -func (f *fakeBackend) PurgeState(taskUUID string) error { - return nil -} - -func (f *fakeBackend) PurgeGroupMeta(groupUUID string) error { - return nil -} diff --git a/utils/worker/worker.go b/utils/worker/worker.go deleted file mode 100644 index 40c297fd..00000000 --- a/utils/worker/worker.go +++ /dev/null @@ -1,196 +0,0 @@ -package worker - -import ( - "context" - "time" - - "github.com/RichardKnop/machinery/v1" - machineryConfig "github.com/RichardKnop/machinery/v1/config" - "github.com/RichardKnop/machinery/v1/tasks" - "github.com/carousell/Orion/utils/errors" - "github.com/carousell/Orion/utils/log" - "github.com/carousell/Orion/utils/spanutils" - opentracing "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" -) - -//NewWorker creates a new worker from given config -func NewWorker(config Config) Worker { - w := new(worker) - w.init(config) - return w -} - -type worker struct { - server *machinery.Server - worker *machinery.Worker - config Config - LocalMap map[string]wrappedWork - localWork chan *workerInfo -} - -//WithRetry sets the number of Retries for this task -func WithRetry(n int) ScheduleOption { - return func(c *ScheduleConfig) { - c.retries = n - } -} - -//WithQueueName sets the destination queue for this task -func WithQueueName(queueName string) ScheduleOption { - return func(c *ScheduleConfig) { - c.queueName = queueName - } -} - -//WithETA sets the delay for this task -func WithETA(eta *time.Time) ScheduleOption { - return func(c *ScheduleConfig) { - c.eta = eta - } -} - -func (w *worker) Schedule(ctx context.Context, name string, payload string, options ...ScheduleOption) error { - span, ctx := spanutils.NewInternalSpan(ctx, name+"Scheduled") - defer span.End() - if w.config.LocalMode { - return w.scheduleLocal(ctx, name, payload) - } - return w.scheduleRemote(ctx, name, payload, options...) -} - -func (w *worker) scheduleRemote(ctx context.Context, name string, payload string, options ...ScheduleOption) error { - c := ScheduleConfig{ - retries: 3, - } - for _, opt := range options { - opt(&c) - } - wi := newWorkerInfo(ctx, payload) - signature := &tasks.Signature{ - Name: name, - Args: []tasks.Arg{ - { - Type: "string", - Value: wi.String(), - }, - }, - } - signature.RetryCount = c.retries - signature.RoutingKey = c.queueName - signature.ETA = c.eta - if w.server == nil { - return errors.New("Server not initialized") - } - _, err := w.server.SendTask(signature) - if err != nil { - return err - } - return nil - -} - -func (w *worker) scheduleLocal(ctx context.Context, name string, payload string) error { - wi := newWorkerInfo(ctx, payload) - wi.Name = name - w.localWork <- wi - return nil -} - -func (w *worker) RegisterTask(name string, taskFunc Work) error { - if w.config.LocalMode { - w.LocalMap[name] = wrapperFunc(taskFunc) - } else { - if w.server == nil { - return errors.New("Server not initialized") - } - return w.server.RegisterTask(name, wrapperFunc(taskFunc)) - } - return nil -} - -func wrapperFunc(task Work) wrappedWork { - return func(payload string) error { - wi := unmarshalWorkerInfo(payload) - - // rebuild span context - wireContext, _ := opentracing.GlobalTracer().Extract( - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(wi.Trace)) - serverSpan := opentracing.StartSpan( - wi.Name+"Worker", - ext.RPCServerOption(wireContext)) - defer serverSpan.Finish() - ctx := opentracing.ContextWithSpan(context.Background(), serverSpan) - - sp, ctx := spanutils.NewInternalSpan(ctx, wi.Name+"Process") - defer sp.Finish() - // execute task - err := task(ctx, wi.Payload) - if err != nil { - sp.SetTag("error", err.Error()) - } - return err - } -} - -func (w *worker) init(config Config) error { - w.config = buildConfig(config) - if w.config.LocalMode { - w.LocalMap = make(map[string]wrappedWork) - w.localWork = make(chan *workerInfo, 100) - } else if config.RabbitConfig != nil { - //do machinery init - rabbitServer := getServerName(config) - var cfg = &machineryConfig.Config{ - Broker: rabbitServer, - DefaultQueue: config.RabbitConfig.QueueName, - AMQP: &machineryConfig.AMQPConfig{ - Exchange: "machinery_exchange", - ExchangeType: "direct", - BindingKey: config.RabbitConfig.QueueName, - }, - NoUnixSignals: true, - } - var err error - w.server, err = machinery.NewServer(cfg) - if err != nil { - log.Error(context.Background(), "err", err, "") - return err - } - w.server.SetBackend(&fakeBackend{}) - } - return nil -} - -func (w *worker) RunWorker(name string, concurrency int) { - if w.config.LocalMode { - for i := 0; i < concurrency; i++ { - go func() { - for wi := range w.localWork { - if f, ok := w.LocalMap[wi.Name]; ok { - f(wi.String()) - } else { - log.Error(context.Background(), "err", "could not find "+wi.Name) - } - } - }() - } - } else { - if w.server == nil { - log.Error(context.Background(), "err", "worker not started, server not initialized") - } else { - w.worker = w.server.NewWorker(name, concurrency) - errc := make(chan error, 1) - w.worker.LaunchAsync(errc) - } - } -} - -func (w *worker) CloseWorker() { - if w.config.LocalMode { - close(w.localWork) - } else { - w.worker.Quit() - } -} diff --git a/utils/worker/workerinfo.go b/utils/worker/workerinfo.go deleted file mode 100644 index 1eebfd78..00000000 --- a/utils/worker/workerinfo.go +++ /dev/null @@ -1,74 +0,0 @@ -package worker - -import ( - "context" - "encoding/json" - "net/http" - - "github.com/carousell/Orion/utils/log" - "github.com/grpc-ecosystem/go-grpc-middleware/util/metautils" - opentracing "github.com/opentracing/opentracing-go" - "github.com/satori/go.uuid" -) - -type workerInfo struct { - Name string `json:"name"` - ID string `json:"id"` - Trace map[string][]string `json:"trace"` - Payload string `json:"payload"` -} - -func (w *workerInfo) MarshalTraceInfo(ctx context.Context) { - if w.Trace == nil { - w.Trace = http.Header{} - } - if sp := opentracing.SpanFromContext(ctx); sp != nil { - opentracing.GlobalTracer().Inject( - sp.Context(), - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(w.Trace)) - } else { - log.Info(ctx, "trace", "not found") - } -} - -func (w *workerInfo) UnmarshalTraceInfo(ctx context.Context) context.Context { - wireContext, err := opentracing.GlobalTracer().Extract( - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(w.Trace)) - if err == nil { - md := metautils.ExtractIncoming(ctx) - opentracing.GlobalTracer().Inject(wireContext, opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(md)) - ctx = md.ToIncoming(ctx) - } - return ctx -} - -func (w *workerInfo) String() string { - data, err := json.Marshal(w) - if err != nil { - log.Info(context.Background(), "schedule", "scheduling error", "error", err) - return "" - } - return string(data) -} - -func newWorkerInfo(ctx context.Context, payload string) *workerInfo { - uuidValue, _ := uuid.NewV4() - wi := &workerInfo{ - ID: uuidValue.String(), - Payload: payload, - } - wi.MarshalTraceInfo(ctx) - return wi -} - -func unmarshalWorkerInfo(payload string) *workerInfo { - wi := new(workerInfo) - err := json.Unmarshal([]byte(payload), wi) - if err != nil { - log.Error(context.Background(), "worker", "can not deserialize work", "error", err) - return nil - } - return wi -}