From 193ba4d68fd2ee41fe05acde11ee6fdc155fdaee Mon Sep 17 00:00:00 2001 From: ramey Date: Tue, 23 Nov 2021 15:44:40 +0530 Subject: [PATCH] feat: add collector based design (#17) * feat: raccoon http+grpc * test: completing coding and unit tests * fix: resolved merge conflicts and changed integration tests * refactor: addressed review comments * ci: changing protoc version * ci: fixing protoc compilation in docker build * test: fixing tests * ci: fixing github build and integration tests * refactor: Removed pkg directory combined EventsBatch and CollectRequest --- .env.sample | 2 + .env.test | 2 + .github/workflows/integration-test.yaml | 2 +- .gitignore | 3 +- Dockerfile | 4 +- Makefile | 6 +- README.md | 2 +- app/server.go | 16 +- collection/collector.go | 20 ++ collection/mock.go | 16 + collection/service.go | 20 ++ collection/service_test.go | 36 +++ config/load.go | 1 + config/load_test.go | 6 + config/server.go | 13 + deserialization/deserializer.go | 11 + deserialization/json.go | 9 + deserialization/json_test.go | 27 ++ deserialization/proto.go | 20 ++ deserialization/proto_test.go | 27 ++ docker-compose.yml | 1 + docs/example/main.go | 2 +- go.mod | 7 +- go.sum | 73 ++++- http/grpc/handler.go | 66 ++++ http/grpc/handler_test.go | 114 +++++++ http/handler.go | 13 + http/handler_test.go | 33 ++ http/rest/handler.go | 141 +++++++++ http/rest/handler_test.go | 39 +++ http/rest/response.go | 44 +++ http/rest/response_test.go | 284 ++++++++++++++++++ http/server.go | 124 ++++++++ .../websocket}/connection/conn.go | 5 +- .../websocket}/connection/table.go | 22 +- http/websocket/connection/table_test.go | 50 +++ .../websocket}/connection/upgrader.go | 15 +- .../websocket}/connection/upgrader_test.go | 0 http/websocket/handler.go | 143 +++++++++ {websocket => http/websocket}/handler_test.go | 119 ++++++-- {websocket => http/websocket}/pinger.go | 5 +- .../identifier.go | 2 +- integration/integration_test.go | 257 +++++++++++++++- publisher/kafka.go | 5 +- publisher/kafka_test.go | 2 +- serialization/json.go | 9 + serialization/json_test.go | 27 ++ serialization/mock.go | 13 + serialization/proto.go | 21 ++ serialization/proto_test.go | 27 ++ serialization/serializer.go | 11 + websocket/connection/table_test.go | 49 --- websocket/handler.go | 85 ------ websocket/responsefactory.go | 34 --- websocket/server.go | 113 ------- worker/mocks.go | 3 +- worker/worker.go | 20 +- worker/worker_test.go | 17 +- 58 files changed, 1852 insertions(+), 386 deletions(-) create mode 100644 collection/collector.go create mode 100644 collection/mock.go create mode 100644 collection/service.go create mode 100644 collection/service_test.go create mode 100644 deserialization/deserializer.go create mode 100644 deserialization/json.go create mode 100644 deserialization/json_test.go create mode 100644 deserialization/proto.go create mode 100644 deserialization/proto_test.go create mode 100644 http/grpc/handler.go create mode 100644 http/grpc/handler_test.go create mode 100644 http/handler.go create mode 100644 http/handler_test.go create mode 100644 http/rest/handler.go create mode 100644 http/rest/handler_test.go create mode 100644 http/rest/response.go create mode 100644 http/rest/response_test.go create mode 100644 http/server.go rename {websocket => http/websocket}/connection/conn.go (90%) rename {websocket => http/websocket}/connection/table.go (64%) create mode 100644 http/websocket/connection/table_test.go rename {websocket => http/websocket}/connection/upgrader.go (93%) rename {websocket => http/websocket}/connection/upgrader_test.go (100%) create mode 100644 http/websocket/handler.go rename {websocket => http/websocket}/handler_test.go (51%) rename {websocket => http/websocket}/pinger.go (87%) rename {websocket/connection => identification}/identifier.go (88%) create mode 100644 serialization/json.go create mode 100644 serialization/json_test.go create mode 100644 serialization/mock.go create mode 100644 serialization/proto.go create mode 100644 serialization/proto_test.go create mode 100644 serialization/serializer.go delete mode 100644 websocket/connection/table_test.go delete mode 100644 websocket/handler.go delete mode 100644 websocket/responsefactory.go delete mode 100644 websocket/server.go diff --git a/.env.sample b/.env.sample index 1f405f1b..004016e0 100644 --- a/.env.sample +++ b/.env.sample @@ -9,6 +9,8 @@ SERVER_WEBSOCKET_PONG_WAIT_INTERVAL_MS=60000 SERVER_WEBSOCKET_WRITE_WAIT_INTERVAL_MS=5000 SERVER_WEBSOCKET_PINGER_SIZE=1 +SERVER_GRPC_PORT=8081 + WORKER_BUFFER_CHANNEL_SIZE=5 WORKER_BUFFER_FLUSH_TIMEOUT_MS=5000 WORKER_POOL_SIZE=5 diff --git a/.env.test b/.env.test index 12596ea2..4d6aad85 100644 --- a/.env.test +++ b/.env.test @@ -10,6 +10,8 @@ SERVER_WEBSOCKET_PONG_WAIT_INTERVAL_MS=10000 SERVER_WEBSOCKET_WRITE_WAIT_INTERVAL_MS=1000 SERVER_WEBSOCKET_PINGER_SIZE=1 +SERVER_GRPC_PORT=8081 + WORKER_BUFFER_CHANNEL_SIZE=5 WORKER_BUFFER_FLUSH_TIMEOUT_MS=5000 WORKER_POOL_SIZE=5 diff --git a/.github/workflows/integration-test.yaml b/.github/workflows/integration-test.yaml index 20acce51..016c05d8 100644 --- a/.github/workflows/integration-test.yaml +++ b/.github/workflows/integration-test.yaml @@ -20,4 +20,4 @@ jobs: run: make docker-run - run: make install-protoc && make generate-proto - name: Invoking go test - run: INTEGTEST_BOOTSTRAP_SERVER=localhost:9094 INTEGTEST_HOST=ws://localhost:8080 INTEGTEST_TOPIC_FORMAT="clickstream-%s-log" go test ./integration -v + run: INTEGTEST_BOOTSTRAP_SERVER=localhost:9094 INTEGTEST_HOST=localhost:8080 INTEGTEST_TOPIC_FORMAT="clickstream-%s-log" GRPC_SERVER_ADDR="localhost:8081" go test ./integration -v diff --git a/.gitignore b/.gitignore index 3357fc6c..07c0c67c 100644 --- a/.gitignore +++ b/.gitignore @@ -15,5 +15,6 @@ coverage *.idea/ clickstream-service raccoon -websocket/proto/*.pb.go +pkg/proto/*.pb.go +proto/*.pb.go .temp diff --git a/Dockerfile b/Dockerfile index b4f636d7..45364ad4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,8 +2,8 @@ FROM golang:1.14 WORKDIR /app RUN apt-get update && apt-get install unzip --no-install-recommends --assume-yes -RUN PROTOC_ZIP=protoc-3.14.0-linux-x86_64.zip && \ -curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v3.14.0/$PROTOC_ZIP && \ +RUN PROTOC_ZIP=protoc-3.17.3-linux-x86_64.zip && \ +curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v3.17.3/$PROTOC_ZIP && \ unzip -o $PROTOC_ZIP -d /usr/local bin/protoc && \ unzip -o $PROTOC_ZIP -d /usr/local 'include/*' && \ rm -f $PROTOC_ZIP diff --git a/Makefile b/Makefile index c38bf4af..73af0c07 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,7 @@ install-protoc: @echo "> installing dependencies" go get -u github.com/golang/protobuf/proto@v1.4.3 go get -u github.com/golang/protobuf/protoc-gen-go@v1.4.3 + go get -u google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1 update-deps: go mod tidy -v @@ -22,12 +23,13 @@ update-deps: copy-config: cp .env.sample .env -PROTO_PACKAGE=/websocket/proto +PROTO_PACKAGE=/proto generate-proto: rm -rf .temp mkdir -p .temp curl -o .temp/proton.tar.gz -L http://api.github.com/repos/odpf/proton/tarball/main; tar xvf .temp/proton.tar.gz -C .temp/ --strip-components 1 - protoc --proto_path=.temp/ .temp/odpf/raccoon/*.proto --go_out=./ --go_opt=paths=import --go_opt=Modpf/raccoon/Event.proto=$(PROTO_PACKAGE) --go_opt=Modpf/raccoon/EventRequest.proto=$(PROTO_PACKAGE) --go_opt=Modpf/raccoon/EventResponse.proto=$(PROTO_PACKAGE) + protoc --proto_path=.temp/ .temp/odpf/raccoon/Event.proto .temp/odpf/raccoon/EventRequest.proto .temp/odpf/raccoon/EventResponse.proto --go_out=./ --go_opt=paths=import --go_opt=Modpf/raccoon/Event.proto=$(PROTO_PACKAGE) --go_opt=Modpf/raccoon/EventRequest.proto=$(PROTO_PACKAGE) --go_opt=Modpf/raccoon/EventResponse.proto=$(PROTO_PACKAGE) + protoc --proto_path=.temp/ .temp/odpf/raccoon/*.proto --go-grpc_opt=paths=import --go-grpc_opt=Modpf/raccoon/Event.proto=$(PROTO_PACKAGE) --go-grpc_opt=Modpf/raccoon/EventRequest.proto=$(PROTO_PACKAGE) --go-grpc_opt=Modpf/raccoon/EventResponse.proto=$(PROTO_PACKAGE) --go-grpc_opt=Modpf/raccoon/EventService.proto=$(PROTO_PACKAGE) --go-grpc_out=./ # Build Lifecycle compile: diff --git a/README.md b/README.md index 69e31f9b..9082195e 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ $ make test # Running integration tests $ cp .env.test .env $ make docker-run -$ INTEGTEST_BOOTSTRAP_SERVER=localhost:9094 INTEGTEST_HOST=ws://localhost:8080 INTEGTEST_TOPIC_FORMAT="clickstream-%s-log" go test ./integration -v +$ INTEGTEST_BOOTSTRAP_SERVER=localhost:9094 INTEGTEST_HOST=ws://localhost:8080 INTEGTEST_TOPIC_FORMAT="clickstream-%s-log" GRPC_SERVER_ADDR="localhost:8081" go test ./integration -v ``` ## Contribute diff --git a/app/server.go b/app/server.go index 1a8f5778..eca4576c 100644 --- a/app/server.go +++ b/app/server.go @@ -3,23 +3,24 @@ package app import ( "context" "fmt" - "net/http" "os" "os/signal" + "raccoon/collection" "raccoon/config" + raccoonhttp "raccoon/http" "raccoon/logger" "raccoon/metrics" "raccoon/publisher" - ws "raccoon/websocket" "raccoon/worker" "syscall" ) // StartServer starts the server func StartServer(ctx context.Context, cancel context.CancelFunc) { - wssServer, bufferChannel := ws.CreateServer() + bufferChannel := make(chan *collection.CollectRequest) + httpserver := raccoonhttp.CreateServer(bufferChannel) logger.Info("Start Server -->") - wssServer.StartHTTPServer(ctx, cancel) + httpserver.StartServers(ctx, cancel) logger.Info("Start publisher -->") kPublisher, err := publisher.NewKafka() if err != nil { @@ -32,10 +33,10 @@ func StartServer(ctx context.Context, cancel context.CancelFunc) { workerPool := worker.CreateWorkerPool(config.Worker.WorkersPoolSize, bufferChannel, config.Worker.DeliveryChannelSize, kPublisher) workerPool.StartWorkers() go kPublisher.ReportStats() - go shutDownServer(ctx, cancel, wssServer.HTTPServer, bufferChannel, workerPool, kPublisher) + go shutDownServer(ctx, cancel, httpserver, bufferChannel, workerPool, kPublisher) } -func shutDownServer(ctx context.Context, cancel context.CancelFunc, wssServer *http.Server, bufferChannel chan ws.EventsBatch, workerPool *worker.Pool, kp *publisher.Kafka) { +func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServer *raccoonhttp.Servers, bufferChannel chan *collection.CollectRequest, workerPool *worker.Pool, kp *publisher.Kafka) { signalChan := make(chan os.Signal) signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) for { @@ -43,7 +44,8 @@ func shutDownServer(ctx context.Context, cancel context.CancelFunc, wssServer *h switch sig { case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT: logger.Info(fmt.Sprintf("[App.Server] Received a signal %s", sig)) - wssServer.Shutdown(ctx) + httpServer.HTTPServer.Shutdown(ctx) + httpServer.GRPCServer.GracefulStop() logger.Info("Server shutdown all the listeners") timedOut := workerPool.FlushWithTimeOut(config.Worker.WorkerFlushTimeout) if timedOut { diff --git a/collection/collector.go b/collection/collector.go new file mode 100644 index 00000000..82bee0f1 --- /dev/null +++ b/collection/collector.go @@ -0,0 +1,20 @@ +package collection + +import ( + "context" + "time" + + "raccoon/identification" + pb "raccoon/proto" +) + +type CollectRequest struct { + ConnectionIdentifier *identification.Identifier + TimeConsumed time.Time + TimePushed time.Time + *pb.EventRequest +} + +type Collector interface { + Collect(ctx context.Context, req *CollectRequest) error +} diff --git a/collection/mock.go b/collection/mock.go new file mode 100644 index 00000000..443a7ec9 --- /dev/null +++ b/collection/mock.go @@ -0,0 +1,16 @@ +package collection + +import ( + "context" + + "github.com/stretchr/testify/mock" +) + +type MockCollector struct { + mock.Mock +} + +func (m *MockCollector) Collect(ctx context.Context, req *CollectRequest) error { + args := m.Called(ctx, req) + return args.Error(0) +} diff --git a/collection/service.go b/collection/service.go new file mode 100644 index 00000000..8a23746c --- /dev/null +++ b/collection/service.go @@ -0,0 +1,20 @@ +package collection + +import ( + "context" + "time" +) + +type CollectFunction func(ctx context.Context, req *CollectRequest) error + +func (c CollectFunction) Collect(ctx context.Context, req *CollectRequest) error { + return c(ctx, req) +} + +func NewChannelCollector(c chan *CollectRequest) Collector { + return CollectFunction(func(ctx context.Context, req *CollectRequest) error { + req.TimePushed = time.Now() + c <- req + return nil + }) +} diff --git a/collection/service_test.go b/collection/service_test.go new file mode 100644 index 00000000..7b49805e --- /dev/null +++ b/collection/service_test.go @@ -0,0 +1,36 @@ +package collection + +import ( + "context" + "reflect" + "testing" +) + +func TestNewChannelCollector(t *testing.T) { + type args struct { + c chan *CollectRequest + } + c := make(chan *CollectRequest) + tests := []struct { + name string + args args + want Collector + }{ + { + name: "Creating collector", + args: args{ + c: c, + }, + want: CollectFunction(func(ctx context.Context, req *CollectRequest) error { + return nil + }), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := NewChannelCollector(tt.args.c); reflect.TypeOf(got) != reflect.TypeOf(tt.want) { + t.Errorf("NewChannelCollector() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/config/load.go b/config/load.go index edb178d2..99c6a79d 100644 --- a/config/load.go +++ b/config/load.go @@ -23,6 +23,7 @@ func Load() { logConfigLoader() publisherKafkaConfigLoader() serverWsConfigLoader() + serverGRPCConfigLoader() workerConfigLoader() metricStatsdConfigLoader() eventDistributionConfigLoader() diff --git a/config/load_test.go b/config/load_test.go index 53de284b..d3167fd1 100644 --- a/config/load_test.go +++ b/config/load_test.go @@ -34,6 +34,12 @@ func TestServerConfig(t *testing.T) { assert.Equal(t, time.Duration(1)*time.Millisecond, ServerWs.PongWaitInterval) } +func TestGRPCServerConfig(t *testing.T) { + os.Setenv("SERVER_GRPC_PORT", "8081") + serverGRPCConfigLoader() + assert.Equal(t, "8081", ServerGRPC.Port) +} + func TestDynamicConfigLoad(t *testing.T) { os.Setenv("PUBLISHER_KAFKA_CLIENT_RANDOM", "anything") os.Setenv("PUBLISHER_KAFKA_CLIENT_BOOTSTRAP_SERVERS", "localhost:9092") diff --git a/config/server.go b/config/server.go index a3c79e22..5b3ffcd5 100644 --- a/config/server.go +++ b/config/server.go @@ -8,6 +8,7 @@ import ( ) var ServerWs serverWs +var ServerGRPC serverGRPC type serverWs struct { AppPort string @@ -24,6 +25,10 @@ type serverWs struct { ConnGroupDefault string } +type serverGRPC struct { + Port string +} + func serverWsConfigLoader() { viper.SetDefault("SERVER_WEBSOCKET_PORT", "8080") viper.SetDefault("SERVER_WEBSOCKET_MAX_CONN", 30000) @@ -52,3 +57,11 @@ func serverWsConfigLoader() { ConnGroupDefault: util.MustGetString("SERVER_WEBSOCKET_CONN_GROUP_DEFAULT"), } } + +func serverGRPCConfigLoader() { + + viper.SetDefault("SERVER_GRPC_PORT", "8081") + ServerGRPC = serverGRPC{ + Port: util.MustGetString("SERVER_GRPC_PORT"), + } +} diff --git a/deserialization/deserializer.go b/deserialization/deserializer.go new file mode 100644 index 00000000..56945bb0 --- /dev/null +++ b/deserialization/deserializer.go @@ -0,0 +1,11 @@ +package deserialization + +type Deserializer interface { + Deserialize(b []byte, i interface{}) error +} + +type DeserializeFunc func(b []byte, i interface{}) error + +func (f DeserializeFunc) Deserialize(b []byte, i interface{}) error { + return f(b, i) +} diff --git a/deserialization/json.go b/deserialization/json.go new file mode 100644 index 00000000..a7ec0bf6 --- /dev/null +++ b/deserialization/json.go @@ -0,0 +1,9 @@ +package deserialization + +import "encoding/json" + +func JSONDeserializer() Deserializer { + return DeserializeFunc(func(b []byte, i interface{}) error { + return json.Unmarshal(b, i) + }) +} diff --git a/deserialization/json_test.go b/deserialization/json_test.go new file mode 100644 index 00000000..5782a9d6 --- /dev/null +++ b/deserialization/json_test.go @@ -0,0 +1,27 @@ +package deserialization + +import ( + "reflect" + "testing" +) + +func TestJSONDeserializer(t *testing.T) { + tests := []struct { + name string + want Deserializer + }{ + { + name: "Creating new JSON Deserializer", + want: DeserializeFunc(func(b []byte, i interface{}) error { + return nil + }), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := JSONDeserializer(); reflect.TypeOf(got) != reflect.TypeOf(tt.want) { + t.Errorf("JSONDeserializer() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/deserialization/proto.go b/deserialization/proto.go new file mode 100644 index 00000000..e25b97f8 --- /dev/null +++ b/deserialization/proto.go @@ -0,0 +1,20 @@ +package deserialization + +import ( + "errors" + + "google.golang.org/protobuf/proto" +) + +var ErrInvalidProtoMessage = errors.New("invalld proto message") + +func ProtoDeserilizer() Deserializer { + return DeserializeFunc(func(b []byte, i interface{}) error { + msg, ok := i.(proto.Message) + if !ok { + return ErrInvalidProtoMessage + } + return proto.Unmarshal(b, msg) + }) + +} diff --git a/deserialization/proto_test.go b/deserialization/proto_test.go new file mode 100644 index 00000000..f50c7930 --- /dev/null +++ b/deserialization/proto_test.go @@ -0,0 +1,27 @@ +package deserialization + +import ( + "reflect" + "testing" +) + +func TestProtoDeserilizer(t *testing.T) { + tests := []struct { + name string + want Deserializer + }{ + { + name: "Create new proto Deserializer", + want: DeserializeFunc(func(b []byte, i interface{}) error { + return nil + }), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ProtoDeserilizer(); reflect.TypeOf(got) != reflect.TypeOf(tt.want) { + t.Errorf("ProtoDeserilizer() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/docker-compose.yml b/docker-compose.yml index ca1e1174..1b2952d5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -50,5 +50,6 @@ services: - kafka ports: - "8080:8080" + - "8081:8081" networks: - cs-network diff --git a/docs/example/main.go b/docs/example/main.go index 6bbe4cdf..a0f4de33 100644 --- a/docs/example/main.go +++ b/docs/example/main.go @@ -3,7 +3,7 @@ package main import ( "fmt" "net/http" - pb "raccoon/websocket/proto" + pb "raccoon/proto" "time" "github.com/gorilla/websocket" diff --git a/go.mod b/go.mod index b3b34608..9caca936 100644 --- a/go.mod +++ b/go.mod @@ -9,9 +9,10 @@ require ( github.com/gorilla/websocket v1.4.2 github.com/sirupsen/logrus v1.6.0 github.com/spf13/viper v1.7.0 - github.com/stretchr/testify v1.6.0 - golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb // indirect - google.golang.org/protobuf v1.26.0 + github.com/stretchr/testify v1.7.0 + golang.org/x/sys v0.0.0-20211109184856-51b60fd695b3 // indirect + google.golang.org/grpc v1.41.0 + google.golang.org/protobuf v1.27.1 gopkg.in/alexcesaro/statsd.v2 v2.0.0 gopkg.in/confluentinc/confluent-kafka-go.v1 v1.4.2 ) diff --git a/go.sum b/go.sum index 8c7d7632..7f3cf987 100644 --- a/go.sum +++ b/go.sum @@ -17,6 +17,7 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= @@ -24,8 +25,13 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/confluentinc/confluent-kafka-go v1.4.2 h1:13EK9RTujF7lVkvHQ5Hbu6bM+Yfrq8L0MkJNnjHSd4Q= github.com/confluentinc/confluent-kafka-go v1.4.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= @@ -38,6 +44,12 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -57,18 +69,31 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= @@ -80,6 +105,7 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -148,12 +174,14 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= +github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= @@ -181,8 +209,9 @@ github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.6.0 h1:jlIyCplCJFULU/01vCkhKuTyc3OorI3bJFuw6obfgho= -github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -190,6 +219,7 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= @@ -198,6 +228,7 @@ golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -230,9 +261,12 @@ golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA= +golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -252,8 +286,9 @@ golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k= -golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20211109184856-51b60fd695b3 h1:T6tyxxvHMj2L1R2kZg0uNMpS8ZhB9lRa9XRGTCSA65w= +golang.org/x/sys v0.0.0-20211109184856-51b60fd695b3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= @@ -270,6 +305,7 @@ golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= @@ -278,8 +314,9 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= @@ -298,12 +335,31 @@ google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= +google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= +google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.41.0 h1:f+PlOh7QV4iIJkPrx5NQ7qaNGFQ3OTse67yaDHfju4E= +google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc= gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU= @@ -318,6 +374,8 @@ gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= @@ -325,5 +383,6 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= diff --git a/http/grpc/handler.go b/http/grpc/handler.go new file mode 100644 index 00000000..06e2086d --- /dev/null +++ b/http/grpc/handler.go @@ -0,0 +1,66 @@ +package grpc + +import ( + "context" + "errors" + "fmt" + "raccoon/collection" + "raccoon/config" + "raccoon/identification" + "raccoon/metrics" + pb "raccoon/proto" + "time" + + "google.golang.org/grpc/metadata" +) + +type Handler struct { + C collection.Collector + pb.UnimplementedEventServiceServer +} + +func (h *Handler) SendEvent(ctx context.Context, req *pb.EventRequest) (*pb.EventResponse, error) { + metadata, _ := metadata.FromIncomingContext(ctx) + groups := metadata.Get(config.ServerWs.ConnGroupHeader) + var group string + if len(groups) > 0 { + group = groups[0] + } else { + group = config.ServerWs.ConnGroupDefault + } + + var id string + ids := metadata.Get(config.ServerWs.ConnIDHeader) + + if len(ids) > 0 { + id = ids[0] + } else { + return nil, errors.New("connection id header missing") + } + + identifier := identification.Identifier{ + ID: id, + Group: group, + } + + timeConsumed := time.Now() + + metrics.Increment("batches_read_total", fmt.Sprintf("status=success,conn_group=%s", identifier.Group)) + metrics.Count("events_rx_total", len(req.Events), fmt.Sprintf("conn_group=%s", identifier.Group)) + + h.C.Collect(ctx, &collection.CollectRequest{ + ConnectionIdentifier: &identifier, + TimeConsumed: timeConsumed, + EventRequest: req, + }) + + return &pb.EventResponse{ + Status: pb.Status_SUCCESS, + Code: pb.Code_OK, + SentTime: time.Now().Unix(), + Data: map[string]string{ + "req_guid": req.GetReqGuid(), + }, + }, nil + +} diff --git a/http/grpc/handler_test.go b/http/grpc/handler_test.go new file mode 100644 index 00000000..52956b4e --- /dev/null +++ b/http/grpc/handler_test.go @@ -0,0 +1,114 @@ +package grpc + +import ( + "context" + "raccoon/collection" + "raccoon/config" + "raccoon/logger" + "raccoon/metrics" + pb "raccoon/proto" + "reflect" + "testing" + + "github.com/stretchr/testify/mock" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/timestamppb" +) + +type void struct{} + +func (v void) Write(_ []byte) (int, error) { + return 0, nil +} + +func TestHandler_SendEvent(t *testing.T) { + type fields struct { + C collection.Collector + UnimplementedEventServiceServer pb.UnimplementedEventServiceServer + } + type args struct { + ctx context.Context + req *pb.EventRequest + } + + logger.SetOutput(void{}) + metrics.SetVoid() + collector := new(collection.MockCollector) + ctx := context.Background() + meta := metadata.MD{} + meta.Set(config.ServerWs.ConnGroupHeader, "group") + meta.Set(config.ServerWs.ConnIDHeader, "1235") + sentTime := timestamppb.Now() + req := &pb.EventRequest{ + ReqGuid: "abcd", + SentTime: sentTime, + Events: []*pb.Event{}, + } + contextWithIDGroup := metadata.NewIncomingContext(ctx, meta) + collector.On("Collect", contextWithIDGroup, mock.Anything).Return(nil) + + metaWithoutGroup := metadata.MD{} + metaWithoutGroup.Set(config.ServerWs.ConnIDHeader, "1235") + contextWithoutGroup := metadata.NewIncomingContext(ctx, metaWithoutGroup) + collector.On("Collect", contextWithoutGroup, mock.Anything).Return(nil) + + tests := []struct { + name string + fields fields + args args + want *pb.EventResponse + wantErr bool + }{ + { + name: "Sending normal event", + fields: fields{ + C: collector, + }, + args: args{ + ctx: contextWithIDGroup, + req: req, + }, + want: &pb.EventResponse{ + Status: pb.Status_SUCCESS, + Code: pb.Code_OK, + SentTime: sentTime.Seconds, + Data: map[string]string{ + "req_guid": req.ReqGuid, + }, + }, + }, + { + name: "Sending without group", + fields: fields{ + C: collector, + }, + args: args{ + ctx: contextWithoutGroup, + req: req, + }, + want: &pb.EventResponse{ + Status: pb.Status_SUCCESS, + Code: pb.Code_OK, + SentTime: sentTime.Seconds, + Data: map[string]string{ + "req_guid": req.ReqGuid, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + h := &Handler{ + C: tt.fields.C, + } + got, err := h.SendEvent(tt.args.ctx, tt.args.req) + if (err != nil) != tt.wantErr { + t.Errorf("Handler.SendEvent() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("Handler.SendEvent() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/http/handler.go b/http/handler.go new file mode 100644 index 00000000..d92c4ecc --- /dev/null +++ b/http/handler.go @@ -0,0 +1,13 @@ +package http + +import ( + "raccoon/http/grpc" + "raccoon/http/rest" + "raccoon/http/websocket" +) + +type Handler struct { + wh *websocket.Handler + rh *rest.Handler + gh *grpc.Handler +} diff --git a/http/handler_test.go b/http/handler_test.go new file mode 100644 index 00000000..678248f0 --- /dev/null +++ b/http/handler_test.go @@ -0,0 +1,33 @@ +package http + +import ( + "fmt" + "net/http" + "net/http/httptest" + "raccoon/collection" + "raccoon/http/grpc" + "raccoon/http/rest" + "raccoon/http/websocket" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestPingHandler(t *testing.T) { + hlr := &Handler{ + wh: &websocket.Handler{}, + rh: rest.NewHandler(), + gh: &grpc.Handler{}, + } + collector := new(collection.MockCollector) + collector.On("Collect", mock.Anything, mock.Anything).Return(nil) + ts := httptest.NewServer(Router(hlr, collector)) + defer ts.Close() + req, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/ping", ts.URL), nil) + + httpClient := http.Client{} + res, _ := httpClient.Do(req) + + assert.Equal(t, http.StatusOK, res.StatusCode) +} diff --git a/http/rest/handler.go b/http/rest/handler.go new file mode 100644 index 00000000..de239e75 --- /dev/null +++ b/http/rest/handler.go @@ -0,0 +1,141 @@ +package rest + +import ( + "fmt" + "io" + "io/ioutil" + "net/http" + "raccoon/collection" + "raccoon/config" + "raccoon/deserialization" + "raccoon/identification" + "raccoon/logger" + "raccoon/metrics" + pb "raccoon/proto" + "raccoon/serialization" + "time" +) + +const ( + ContentJSON = "application/json" + ContentProto = "application/proto" +) + +type serDe struct { + serializer serialization.Serializer + deserializer deserialization.Deserializer +} +type Handler struct { + serDeMap map[string]*serDe +} + +func NewHandler() *Handler { + serDeMap := make(map[string]*serDe) + serDeMap[ContentJSON] = &serDe{ + serializer: serialization.JSONSerializer(), + deserializer: deserialization.JSONDeserializer(), + } + + serDeMap[ContentProto] = &serDe{ + serializer: serialization.ProtoSerilizer(), + deserializer: deserialization.ProtoDeserilizer(), + } + return &Handler{ + serDeMap: serDeMap, + } +} + +func (h *Handler) GetRESTAPIHandler(c collection.Collector) http.HandlerFunc { + return func(rw http.ResponseWriter, r *http.Request) { + contentType := r.Header.Get("Content-Type") + rw.Header().Set("Content-Type", contentType) + + res := &Response{ + EventResponse: &pb.EventResponse{}, + } + + serde, ok := h.serDeMap[contentType] + + if !ok { + metrics.Increment("batches_read_total", "status=failed,reason=unknowncontentype") + logger.Errorf("[rest.GetRESTAPIHandler] invalid content type %s", contentType) + rw.WriteHeader(http.StatusBadRequest) + _, err := res.SetCode(pb.Code_BAD_REQUEST).SetStatus(pb.Status_ERROR).SetReason("invalid content type"). + SetSentTime(time.Now().Unix()).Write(rw, serialization.JSONSerializer()) + if err != nil { + logger.Errorf("[rest.GetRESTAPIHandler] error sending response: %v", err) + } + return + } + d, s := serde.deserializer, serde.serializer + + var group string + group = r.Header.Get(config.ServerWs.ConnGroupHeader) + if group == "" { + group = config.ServerWs.ConnGroupDefault + } + identifier := identification.Identifier{ + ID: r.Header.Get(config.ServerWs.ConnIDHeader), + Group: group, + } + + if r.Body == nil { + metrics.Increment("batches_read_total", fmt.Sprintf("status=failed,reason=emptybody,conn_group=%s", identifier.Group)) + logger.Errorf("[rest.GetRESTAPIHandler] %s no body", identifier) + rw.WriteHeader(http.StatusBadRequest) + _, err := res.SetCode(pb.Code_BAD_REQUEST).SetStatus(pb.Status_ERROR).SetReason("no body present"). + SetSentTime(time.Now().Unix()).Write(rw, s) + if err != nil { + logger.Errorf("[rest.GetRESTAPIHandler] %s error sending response: %v", identifier, err) + } + return + } + + defer io.Copy(ioutil.Discard, r.Body) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + logger.Errorf(fmt.Sprintf("[rest.GetRESTAPIHandler] %s error reading request body, error: %v", identifier, err)) + metrics.Increment("batches_read_total", fmt.Sprintf("status=failed,reason=readerr,conn_group=%s", identifier.Group)) + rw.WriteHeader(http.StatusInternalServerError) + _, err := res.SetCode(pb.Code_INTERNAL_ERROR).SetStatus(pb.Status_ERROR).SetReason("deserialization failure"). + SetSentTime(time.Now().Unix()).Write(rw, s) + if err != nil { + logger.Errorf("[restGetRESTAPIHandler] %s error sending error response: %v", identifier, err) + } + return + } + + timeConsumed := time.Now() + metrics.Count("events_rx_bytes_total", len(b), fmt.Sprintf("conn_group=%s", identifier.Group)) + req := &pb.EventRequest{} + + if err := d.Deserialize(b, req); err != nil { + logger.Errorf("[rest.GetRESTAPIHandler] error while calling d.Deserialize() for %s, error: %s", identifier, err) + metrics.Increment("batches_read_total", fmt.Sprintf("status=failed,reason=serde,conn_group=%s", identifier.Group)) + rw.WriteHeader(http.StatusBadRequest) + _, err := res.SetCode(pb.Code_BAD_REQUEST).SetStatus(pb.Status_ERROR).SetReason("deserialization failure"). + SetSentTime(time.Now().Unix()).Write(rw, s) + if err != nil { + logger.Errorf("[restGetRESTAPIHandler] %s error sending error response: %v", identifier, err) + } + return + } + + metrics.Increment("batches_read_total", fmt.Sprintf("status=success,conn_group=%s", identifier.Group)) + metrics.Count("events_rx_total", len(req.Events), fmt.Sprintf("conn_group=%s", identifier.Group)) + + c.Collect(r.Context(), &collection.CollectRequest{ + ConnectionIdentifier: &identifier, + TimeConsumed: timeConsumed, + EventRequest: req, + }) + + _, err = res.SetCode(pb.Code_OK).SetStatus(pb.Status_SUCCESS).SetSentTime(time.Now().Unix()). + SetDataMap(map[string]string{"req_guid": req.ReqGuid}).Write(rw, s) + if err != nil { + logger.Errorf("[restGetRESTAPIHandler] %s error sending error response: %v", identifier, err) + } + } +} diff --git a/http/rest/handler_test.go b/http/rest/handler_test.go new file mode 100644 index 00000000..2f81b821 --- /dev/null +++ b/http/rest/handler_test.go @@ -0,0 +1,39 @@ +package rest + +import ( + "net/http" + "raccoon/collection" + "reflect" + "testing" +) + +func TestHandler_GetRESTAPIHandler(t *testing.T) { + + collector := &collection.MockCollector{} + type args struct { + c collection.Collector + } + tests := []struct { + name string + h *Handler + args args + want http.HandlerFunc + }{ + { + name: "Return a REST API Handler", + h: &Handler{}, + args: args{ + c: collector, + }, + want: http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {}), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + h := &Handler{} + if got := h.GetRESTAPIHandler(tt.args.c); reflect.TypeOf(got) != reflect.TypeOf(tt.want) { + t.Errorf("Handler.GetRESTAPIHandler() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/http/rest/response.go b/http/rest/response.go new file mode 100644 index 00000000..275f4048 --- /dev/null +++ b/http/rest/response.go @@ -0,0 +1,44 @@ +package rest + +import ( + "io" + pb "raccoon/proto" + "raccoon/serialization" +) + +type Response struct { + *pb.EventResponse +} + +func (r *Response) SetCode(code pb.Code) *Response { + r.Code = code + return r +} + +func (r *Response) SetStatus(status pb.Status) *Response { + r.Status = status + return r +} + +func (r *Response) SetSentTime(sentTime int64) *Response { + r.SentTime = sentTime + return r +} + +func (r *Response) SetReason(reason string) *Response { + r.Reason = reason + return r +} + +func (r *Response) SetDataMap(data map[string]string) *Response { + r.Data = data + return r +} + +func (r *Response) Write(w io.Writer, s serialization.Serializer) (int, error) { + b, err := s.Serialize(r) + if err != nil { + return 0, err + } + return w.Write(b) +} diff --git a/http/rest/response_test.go b/http/rest/response_test.go new file mode 100644 index 00000000..018d0ce4 --- /dev/null +++ b/http/rest/response_test.go @@ -0,0 +1,284 @@ +package rest + +import ( + "bytes" + "errors" + pb "raccoon/proto" + "raccoon/serialization" + "reflect" + "testing" + "time" +) + +func TestResponse_SetCode(t *testing.T) { + type fields struct { + EventResponse *pb.EventResponse + } + type args struct { + code pb.Code + } + tests := []struct { + name string + fields fields + args args + want *Response + }{ + { + name: "sets response code", + fields: fields{ + EventResponse: &pb.EventResponse{}, + }, + args: args{ + code: pb.Code_UNKNOWN_CODE, + }, + want: &Response{ + EventResponse: &pb.EventResponse{ + Code: pb.Code_UNKNOWN_CODE, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &Response{ + EventResponse: tt.fields.EventResponse, + } + if got := r.SetCode(tt.args.code); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Response.SetCode() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestResponse_SetStatus(t *testing.T) { + type fields struct { + EventResponse *pb.EventResponse + } + type args struct { + status pb.Status + } + tests := []struct { + name string + fields fields + args args + want *Response + }{ + { + name: "set status", + fields: fields{ + EventResponse: &pb.EventResponse{}, + }, + args: args{ + status: pb.Status_SUCCESS, + }, + want: &Response{ + EventResponse: &pb.EventResponse{ + Status: pb.Status_SUCCESS, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &Response{ + EventResponse: tt.fields.EventResponse, + } + if got := r.SetStatus(tt.args.status); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Response.SetStatus() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestResponse_SetSentTime(t *testing.T) { + timeNow := time.Now().Unix() + type fields struct { + EventResponse *pb.EventResponse + } + type args struct { + sentTime int64 + } + tests := []struct { + name string + fields fields + args args + want *Response + }{ + { + name: "set sent time", + fields: fields{ + EventResponse: &pb.EventResponse{}, + }, + args: args{ + sentTime: timeNow, + }, + want: &Response{ + EventResponse: &pb.EventResponse{ + SentTime: timeNow, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &Response{ + EventResponse: tt.fields.EventResponse, + } + if got := r.SetSentTime(tt.args.sentTime); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Response.SetSentTime() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestResponse_SetReason(t *testing.T) { + type fields struct { + EventResponse *pb.EventResponse + } + type args struct { + reason string + } + tests := []struct { + name string + fields fields + args args + want *Response + }{ + { + name: "set reason", + fields: fields{ + EventResponse: &pb.EventResponse{}, + }, + args: args{ + reason: "test reason", + }, + want: &Response{ + EventResponse: &pb.EventResponse{ + Reason: "test reason", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &Response{ + EventResponse: tt.fields.EventResponse, + } + if got := r.SetReason(tt.args.reason); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Response.SetReason() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestResponse_SetDataMap(t *testing.T) { + type fields struct { + EventResponse *pb.EventResponse + } + type args struct { + data map[string]string + } + tests := []struct { + name string + fields fields + args args + want *Response + }{ + { + name: "set data map", + fields: fields{ + EventResponse: &pb.EventResponse{}, + }, + args: args{ + data: map[string]string{"test_key": "test_value"}, + }, + want: &Response{ + EventResponse: &pb.EventResponse{ + Data: map[string]string{"test_key": "test_value"}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &Response{ + EventResponse: tt.fields.EventResponse, + } + if got := r.SetDataMap(tt.args.data); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Response.SetDataMap() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestResponse_Write(t *testing.T) { + s := &serialization.MockSerializer{} + res := &pb.EventResponse{ + Status: pb.Status_SUCCESS, + Code: pb.Code_OK, + SentTime: time.Now().Unix(), + Data: map[string]string{}, + } + s.On("Serialize", &Response{res}).Return("1", nil) + + errorRes := &pb.EventResponse{} + s.On("Serialize", &Response{errorRes}).Return("", errors.New("serialization failure")) + type fields struct { + EventResponse *pb.EventResponse + } + type args struct { + s serialization.Serializer + } + tests := []struct { + name string + fields fields + args args + want int + wantW string + wantErr bool + }{ + { + name: "test normal write", + fields: fields{ + EventResponse: res, + }, + args: args{ + s: s, + }, + want: 1, + wantW: "1", + wantErr: false, + }, + { + name: "seralization error", + fields: fields{ + EventResponse: errorRes, + }, + args: args{ + s: s, + }, + want: 0, + wantW: "", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &Response{ + EventResponse: tt.fields.EventResponse, + } + w := &bytes.Buffer{} + got, err := r.Write(w, tt.args.s) + if (err != nil) != tt.wantErr { + t.Errorf("Response.Write() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("Response.Write() = %v, want %v", got, tt.want) + } + if gotW := w.String(); gotW != tt.wantW { + t.Errorf("Response.Write() = %v, want %v", gotW, tt.wantW) + } + }) + } +} diff --git a/http/server.go b/http/server.go new file mode 100644 index 00000000..1ad86aaa --- /dev/null +++ b/http/server.go @@ -0,0 +1,124 @@ +package http + +import ( + "context" + "fmt" + "net" + "net/http" + _ "net/http/pprof" + "raccoon/collection" + "raccoon/config" + raccoongrpc "raccoon/http/grpc" + "raccoon/http/rest" + "raccoon/http/websocket" + "raccoon/http/websocket/connection" + "raccoon/logger" + "raccoon/metrics" + pb "raccoon/proto" + "runtime" + "time" + + "github.com/gorilla/mux" + "google.golang.org/grpc" +) + +type Servers struct { + HTTPServer *http.Server + table *connection.Table + pingChannel chan connection.Conn + GRPCServer *grpc.Server +} + +func (s *Servers) StartServers(ctx context.Context, cancel context.CancelFunc) { + go func() { + logger.Info("HTTP Server --> startServers") + err := s.HTTPServer.ListenAndServe() + if err != http.ErrServerClosed { + logger.Errorf("HTTP Server --> HTTP Server could not be started = %s", err.Error()) + cancel() + } + }() + go func() { + lis, err := net.Listen("tcp", fmt.Sprintf(":%s", config.ServerGRPC.Port)) + if err != nil { + logger.Errorf("GRPC Server --> GRPC Server could not be started = %s", err.Error()) + cancel() + } + logger.Info("GRPC Server -> startServers") + if err := s.GRPCServer.Serve(lis); err != nil { + logger.Errorf("GRPC Server --> GRPC Server could not be started = %s", err.Error()) + cancel() + } + }() + go s.ReportServerMetrics() + go websocket.Pinger(s.pingChannel, config.ServerWs.PingerSize, config.ServerWs.PingInterval, config.ServerWs.WriteWaitInterval) + go func() { + if err := http.ListenAndServe("localhost:6060", nil); err != nil { + logger.Errorf("WebSocket Server --> pprof could not be enabled: %s", err.Error()) + cancel() + } else { + logger.Info("WebSocket Server --> pprof :: Enabled") + } + }() +} + +func (s *Servers) ReportServerMetrics() { + t := time.Tick(config.MetricStatsd.FlushPeriodMs) + m := &runtime.MemStats{} + for { + <-t + for k, v := range s.table.TotalConnectionPerGroup() { + metrics.Gauge("connections_count_current", v, fmt.Sprintf("conn_group=%s", k)) + } + metrics.Gauge("server_go_routines_count_current", runtime.NumGoroutine(), "") + + runtime.ReadMemStats(m) + metrics.Gauge("server_mem_heap_alloc_bytes_current", m.HeapAlloc, "") + metrics.Gauge("server_mem_heap_inuse_bytes_current", m.HeapInuse, "") + metrics.Gauge("server_mem_heap_objects_total_current", m.HeapObjects, "") + metrics.Gauge("server_mem_stack_inuse_bytes_current", m.StackInuse, "") + metrics.Gauge("server_mem_gc_triggered_current", m.LastGC/1000, "") + metrics.Gauge("server_mem_gc_pauseNs_current", m.PauseNs[(m.NumGC+255)%256]/1000, "") + metrics.Gauge("server_mem_gc_count_current", m.NumGC, "") + metrics.Gauge("server_mem_gc_pauseTotalNs_current", m.PauseTotalNs, "") + } +} + +//CreateServer - instantiates the http server +func CreateServer(bufferChannel chan *collection.CollectRequest) *Servers { + //create the websocket handler that upgrades the http request + collector := collection.NewChannelCollector(bufferChannel) + pingChannel := make(chan connection.Conn, config.ServerWs.ServerMaxConn) + wsHandler := websocket.NewHandler(pingChannel) + restHandler := rest.NewHandler() + grpcHandler := &raccoongrpc.Handler{C: collector} + handler := &Handler{wsHandler, restHandler, grpcHandler} + grpcServer := grpc.NewServer() + servers := &Servers{ + HTTPServer: &http.Server{ + Handler: Router(handler, collector), + Addr: ":" + config.ServerWs.AppPort, + }, + table: wsHandler.Table(), + pingChannel: pingChannel, + GRPCServer: grpcServer, + } + pb.RegisterEventServiceServer(grpcServer, handler.gh) + //Wrap the handler with a Server instance and return it + return servers +} + +func PingHandler(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("pong")) +} + +// Router sets up the routes +func Router(h *Handler, collector collection.Collector) http.Handler { + router := mux.NewRouter() + router.Path("/ping").HandlerFunc(PingHandler).Methods(http.MethodGet) + subRouter := router.PathPrefix("/api/v1").Subrouter() + subRouter.HandleFunc("/events", h.wh.GetHandlerWSEvents(collector)).Methods(http.MethodGet).Name("events") + subRouter.HandleFunc("/events", h.rh.GetRESTAPIHandler(collector)).Methods(http.MethodPost).Name("events") + return router +} diff --git a/websocket/connection/conn.go b/http/websocket/connection/conn.go similarity index 90% rename from websocket/connection/conn.go rename to http/websocket/connection/conn.go index a3b0ac3f..9c603572 100644 --- a/websocket/connection/conn.go +++ b/http/websocket/connection/conn.go @@ -2,6 +2,7 @@ package connection import ( "fmt" + "raccoon/identification" "raccoon/logger" "raccoon/metrics" "time" @@ -10,7 +11,7 @@ import ( ) type Conn struct { - Identifier Identifier + Identifier identification.Identifier conn *websocket.Conn connectedAt time.Time closeHook func(c Conn) @@ -35,7 +36,7 @@ func (c *Conn) Close() { } func (c *Conn) calculateSessionTime() { - connectionTime := time.Now().Sub(c.connectedAt) + connectionTime := time.Since(c.connectedAt) logger.Debugf("[websocket.calculateSessionTime] %s, total time connected in minutes: %v", c.Identifier, connectionTime.Minutes()) metrics.Timing("user_session_duration_milliseconds", connectionTime.Milliseconds(), fmt.Sprintf("conn_group=%s", c.Identifier.Group)) } diff --git a/websocket/connection/table.go b/http/websocket/connection/table.go similarity index 64% rename from websocket/connection/table.go rename to http/websocket/connection/table.go index 60644e89..0b46fb45 100644 --- a/websocket/connection/table.go +++ b/http/websocket/connection/table.go @@ -2,12 +2,18 @@ package connection import ( "errors" + "raccoon/identification" "sync" ) +var ( + errMaxConnectionReached = errors.New("max connection reached") + errConnDuplicated = errors.New("duplicated connection") +) + type Table struct { m *sync.RWMutex - connMap map[Identifier]struct{} + connMap map[identification.Identifier]struct{} counter map[string]int maxUser int } @@ -15,26 +21,26 @@ type Table struct { func NewTable(maxUser int) *Table { return &Table{ m: &sync.RWMutex{}, - connMap: make(map[Identifier]struct{}), + connMap: make(map[identification.Identifier]struct{}), maxUser: maxUser, counter: make(map[string]int), } } -func (t *Table) Exists(c Identifier) bool { +func (t *Table) Exists(c identification.Identifier) bool { t.m.Lock() defer t.m.Unlock() _, ok := t.connMap[c] return ok } -func (t *Table) Store(c Identifier) error { +func (t *Table) Store(c identification.Identifier) error { t.m.Lock() defer t.m.Unlock() if len(t.connMap) >= t.maxUser { return errMaxConnectionReached } - if _, ok := t.connMap[c]; ok == true { + if _, ok := t.connMap[c]; ok { return errConnDuplicated } t.connMap[c] = struct{}{} @@ -42,7 +48,7 @@ func (t *Table) Store(c Identifier) error { return nil } -func (t *Table) Remove(c Identifier) { +func (t *Table) Remove(c identification.Identifier) { t.m.Lock() defer t.m.Unlock() delete(t.connMap, c) @@ -58,7 +64,3 @@ func (t *Table) TotalConnection() int { func (t *Table) TotalConnectionPerGroup() map[string]int { return t.counter } - -var errMaxConnectionReached = errors.New("max connection reached") - -var errConnDuplicated = errors.New("duplicated connection") diff --git a/http/websocket/connection/table_test.go b/http/websocket/connection/table_test.go new file mode 100644 index 00000000..af5178bc --- /dev/null +++ b/http/websocket/connection/table_test.go @@ -0,0 +1,50 @@ +package connection + +import ( + "raccoon/identification" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConnectionPerGroup(t *testing.T) { + t.Run("Should return all the group on the table with the count", func(t *testing.T) { + table := NewTable(10) + table.Store(identification.Identifier{ID: "user1", Group: "group1"}) + table.Store(identification.Identifier{ID: "user2", Group: "group1"}) + table.Store(identification.Identifier{ID: "user3", Group: "group1"}) + table.Store(identification.Identifier{ID: "user1", Group: "group2"}) + table.Store(identification.Identifier{ID: "user2", Group: "group2"}) + assert.Equal(t, map[string]int{"group1": 3, "group2": 2}, table.TotalConnectionPerGroup()) + }) +} + +func TestStore(t *testing.T) { + t.Run("Should store new connection", func(t *testing.T) { + table := NewTable(10) + err := table.Store(identification.Identifier{ID: "user1", Group: ""}) + assert.NoError(t, err) + assert.True(t, table.Exists(identification.Identifier{ID: "user1"})) + }) + + t.Run("Should return max connection reached error when connection is maxed", func(t *testing.T) { + table := NewTable(0) + err := table.Store(identification.Identifier{ID: "user1", Group: ""}) + assert.Error(t, err, errMaxConnectionReached) + }) + + t.Run("Should return duplicated error when connection already exists", func(t *testing.T) { + table := NewTable(2) + err := table.Store(identification.Identifier{ID: "user1", Group: ""}) + assert.NoError(t, err) + err = table.Store(identification.Identifier{ID: "user1", Group: ""}) + assert.Error(t, err, errConnDuplicated) + }) + + t.Run("Should remove connection when identifier match", func(t *testing.T) { + table := NewTable(10) + table.Store(identification.Identifier{ID: "user1", Group: ""}) + table.Remove(identification.Identifier{ID: "user1", Group: ""}) + assert.False(t, table.Exists(identification.Identifier{ID: "user1", Group: ""})) + }) +} diff --git a/websocket/connection/upgrader.go b/http/websocket/connection/upgrader.go similarity index 93% rename from websocket/connection/upgrader.go rename to http/websocket/connection/upgrader.go index e2db4f26..0efe5487 100644 --- a/websocket/connection/upgrader.go +++ b/http/websocket/connection/upgrader.go @@ -4,13 +4,14 @@ import ( "errors" "fmt" "net/http" + "raccoon/identification" "raccoon/logger" "raccoon/metrics" - pb "raccoon/websocket/proto" + pb "raccoon/proto" "time" - "github.com/golang/protobuf/proto" "github.com/gorilla/websocket" + "google.golang.org/protobuf/proto" ) type Upgrader struct { @@ -37,7 +38,7 @@ type UpgraderConfig struct { func NewUpgrader(conf UpgraderConfig) *Upgrader { var checkOriginFunc func(r *http.Request) bool - if conf.CheckOrigin == false { + if !conf.CheckOrigin { checkOriginFunc = func(r *http.Request) bool { return true } @@ -98,7 +99,7 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request) (Conn, error) }}, nil } -func (u *Upgrader) setUpControlHandlers(conn *websocket.Conn, identifier Identifier) { +func (u *Upgrader) setUpControlHandlers(conn *websocket.Conn, identifier identification.Identifier) { //expects the client to send a ping, mark this channel as idle timed out post the deadline conn.SetReadDeadline(time.Now().Add(u.pongWaitInterval)) conn.SetPongHandler(func(string) error { @@ -117,14 +118,14 @@ func (u *Upgrader) setUpControlHandlers(conn *websocket.Conn, identifier Identif }) } -func (u *Upgrader) newIdentifier(h http.Header) Identifier { +func (u *Upgrader) newIdentifier(h http.Header) identification.Identifier { // If connGroupHeader is empty string. By default, it will always return an empty string as Group. This means the group is fallback to default value. var group = h.Get(u.connGroupHeader) if group == "" { group = u.connGroupDefault } - return Identifier{ - ID: h.Get(u.connIDHeader), + return identification.Identifier{ + ID: h.Get(u.connIDHeader), Group: group, } } diff --git a/websocket/connection/upgrader_test.go b/http/websocket/connection/upgrader_test.go similarity index 100% rename from websocket/connection/upgrader_test.go rename to http/websocket/connection/upgrader_test.go diff --git a/http/websocket/handler.go b/http/websocket/handler.go new file mode 100644 index 00000000..686ac974 --- /dev/null +++ b/http/websocket/handler.go @@ -0,0 +1,143 @@ +package websocket + +import ( + "fmt" + "net/http" + "raccoon/collection" + "raccoon/config" + "raccoon/deserialization" + "raccoon/http/websocket/connection" + "raccoon/logger" + "raccoon/metrics" + "raccoon/serialization" + "time" + + pb "raccoon/proto" + + "github.com/gorilla/websocket" +) + +type serDe struct { + serializer serialization.Serializer + deserializer deserialization.Deserializer +} +type Handler struct { + upgrader *connection.Upgrader + serdeMap map[int]*serDe + PingChannel chan connection.Conn +} + +func getSerDeMap() map[int]*serDe { + serDeMap := make(map[int]*serDe) + serDeMap[websocket.BinaryMessage] = &serDe{ + serializer: serialization.ProtoSerilizer(), + deserializer: deserialization.ProtoDeserilizer(), + } + + serDeMap[websocket.TextMessage] = &serDe{ + serializer: serialization.JSONSerializer(), + deserializer: deserialization.JSONDeserializer(), + } + return serDeMap +} + +func NewHandler(pingC chan connection.Conn) *Handler { + ugConfig := connection.UpgraderConfig{ + ReadBufferSize: config.ServerWs.ReadBufferSize, + WriteBufferSize: config.ServerWs.WriteBufferSize, + CheckOrigin: config.ServerWs.CheckOrigin, + MaxUser: config.ServerWs.ServerMaxConn, + PongWaitInterval: config.ServerWs.PongWaitInterval, + WriteWaitInterval: config.ServerWs.WriteWaitInterval, + ConnIDHeader: config.ServerWs.ConnIDHeader, + ConnGroupHeader: config.ServerWs.ConnGroupHeader, + } + + upgrader := connection.NewUpgrader(ugConfig) + return &Handler{ + upgrader: upgrader, + serdeMap: getSerDeMap(), + PingChannel: pingC, + } +} + +func (h *Handler) Table() *connection.Table { + return h.upgrader.Table +} + +//HandlerWSEvents handles the upgrade and the events sent by the peers +func (h *Handler) GetHandlerWSEvents(collector collection.Collector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + conn, err := h.upgrader.Upgrade(w, r) + if err != nil { + logger.Debugf("[websocket.Handler] %v", err) + return + } + defer conn.Close() + h.PingChannel <- conn + for { + messageType, message, err := conn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseGoingAway, + websocket.CloseNormalClosure, + websocket.CloseNoStatusReceived, + websocket.CloseAbnormalClosure) { + logger.Error(fmt.Sprintf("[websocket.Handler] %s closed abruptly: %v", conn.Identifier, err)) + metrics.Increment("batches_read_total", fmt.Sprintf("status=failed,reason=closeerror,conn_group=%s", conn.Identifier.Group)) + break + } + metrics.Increment("batches_read_total", fmt.Sprintf("status=failed,reason=unknown,conn_group=%s", conn.Identifier.Group)) + logger.Error(fmt.Sprintf("[websocket.Handler] reading message failed. Unknown failure for %s: %v", conn.Identifier, err)) //no connection issue here + break + } + + timeConsumed := time.Now() + metrics.Count("events_rx_bytes_total", len(message), fmt.Sprintf("conn_group=%s", conn.Identifier.Group)) + payload := &pb.EventRequest{} + serde := h.serdeMap[messageType] + d, s := serde.deserializer, serde.serializer + if err := d.Deserialize(message, payload); err != nil { + logger.Error(fmt.Sprintf("[websocket.Handler] reading message failed for %s: %v", conn.Identifier, err)) + metrics.Increment("batches_read_total", fmt.Sprintf("status=failed,reason=serde,conn_group=%s", conn.Identifier.Group)) + writeBadRequestResponse(conn, s, messageType, err) + continue + } + metrics.Increment("batches_read_total", fmt.Sprintf("status=success,conn_group=%s", conn.Identifier.Group)) + metrics.Count("events_rx_total", len(payload.Events), fmt.Sprintf("conn_group=%s", conn.Identifier.Group)) + collector.Collect(r.Context(), &collection.CollectRequest{ + ConnectionIdentifier: &conn.Identifier, + TimeConsumed: timeConsumed, + EventRequest: payload, + }) + writeSuccessResponse(conn, s, messageType, payload.ReqGuid) + } + } + +} + +func writeSuccessResponse(conn connection.Conn, serializer serialization.Serializer, messageType int, requestGUID string) { + response := &pb.EventResponse{ + Status: pb.Status_SUCCESS, + Code: pb.Code_OK, + SentTime: time.Now().Unix(), + Reason: "", + Data: map[string]string{ + "req_guid": requestGUID, + }, + } + success, _ := serializer.Serialize(response) + conn.WriteMessage(messageType, success) +} + +func writeBadRequestResponse(conn connection.Conn, serializer serialization.Serializer, messageType int, err error) { + response := &pb.EventResponse{ + Status: pb.Status_ERROR, + Code: pb.Code_BAD_REQUEST, + SentTime: time.Now().Unix(), + Reason: fmt.Sprintf("cannot deserialize request: %s", err), + Data: nil, + } + + failure, _ := serializer.Serialize(response) + conn.WriteMessage(messageType, failure) +} diff --git a/websocket/handler_test.go b/http/websocket/handler_test.go similarity index 51% rename from websocket/handler_test.go rename to http/websocket/handler_test.go index 289b3a5f..1737d1dd 100644 --- a/websocket/handler_test.go +++ b/http/websocket/handler_test.go @@ -1,24 +1,25 @@ package websocket import ( - "fmt" "net/http" "net/http/httptest" - "os" + "raccoon/collection" + "raccoon/http/websocket/connection" "raccoon/logger" "raccoon/metrics" + pb "raccoon/proto" + "reflect" "strings" "testing" "time" - "raccoon/websocket/connection" - pb "raccoon/websocket/proto" - - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" + "github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" ) type void struct{} @@ -29,21 +30,84 @@ func (v void) Write(_ []byte) (int, error) { func TestMain(t *testing.M) { logger.SetOutput(void{}) metrics.SetVoid() - os.Exit(t.Run()) } -func TestPingHandler(t *testing.T) { - ts := httptest.NewServer(Router(nil)) - defer ts.Close() - req, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/ping", ts.URL), nil) +func TestNewHandler(t *testing.T) { + type args struct { + pingC chan connection.Conn + } - httpClient := http.Client{} - res, _ := httpClient.Do(req) + ugConfig := connection.UpgraderConfig{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: false, + MaxUser: 100, + PongWaitInterval: 60, + WriteWaitInterval: 60, + ConnIDHeader: "x-conn-id", + ConnGroupHeader: "x-group", + } + pingC := make(chan connection.Conn) + tests := []struct { + name string + args args + want *Handler + }{ + { + name: "creating a new handler", + args: args{ + pingC: pingC, + }, + want: &Handler{ + upgrader: connection.NewUpgrader(ugConfig), + PingChannel: pingC, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := NewHandler(tt.args.pingC); !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewHandler() = %v, want %v", got, tt.want) + } + }) + } +} - assert.Equal(t, http.StatusOK, res.StatusCode) +func TestHandler_Table(t *testing.T) { + table := &connection.Table{} + type fields struct { + upgrader *connection.Upgrader + PingChannel chan connection.Conn + } + tests := []struct { + name string + fields fields + want *connection.Table + }{ + { + name: "return table", + fields: fields{ + upgrader: &connection.Upgrader{ + Table: table, + }, + }, + want: table, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + h := &Handler{ + upgrader: tt.fields.upgrader, + PingChannel: tt.fields.PingChannel, + } + if got := h.Table(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Handler.Table() = %v, want %v", got, tt.want) + } + }) + } } -func TestHandler_HandlerWSEvents(t *testing.T) { +func TestHandler_GETHandlerWSEvents(t *testing.T) { // ---- Setup ---- upgrader := connection.NewUpgrader(connection.UpgraderConfig{ ReadBufferSize: 10240, @@ -56,11 +120,11 @@ func TestHandler_HandlerWSEvents(t *testing.T) { ConnGroupHeader: "string", }) hlr := &Handler{ - upgrader: upgrader, - bufferChannel: make(chan EventsBatch, 10), - PingChannel: make(chan connection.Conn, 100), + upgrader: upgrader, + + PingChannel: make(chan connection.Conn, 100), } - ts := httptest.NewServer(Router(hlr)) + ts := httptest.NewServer(getRouter(hlr)) defer ts.Close() url := "ws" + strings.TrimPrefix(ts.URL+"/api/v1/events", "http") @@ -69,7 +133,7 @@ func TestHandler_HandlerWSEvents(t *testing.T) { } t.Run("Should return success response after successfully push to channel", func(t *testing.T) { - ts = httptest.NewServer(Router(hlr)) + ts = httptest.NewServer(getRouter(hlr)) defer ts.Close() wss, _, err := websocket.DefaultDialer.Dial(url, header) @@ -77,7 +141,7 @@ func TestHandler_HandlerWSEvents(t *testing.T) { request := &pb.EventRequest{ ReqGuid: "1234", - SentTime: ptypes.TimestampNow(), + SentTime: timestamppb.Now(), Events: nil, } serializedRequest, _ := proto.Marshal(request) @@ -99,7 +163,7 @@ func TestHandler_HandlerWSEvents(t *testing.T) { }) t.Run("Should return unknown request when request fail to deserialize", func(t *testing.T) { - ts = httptest.NewServer(Router(hlr)) + ts = httptest.NewServer(getRouter(hlr)) defer ts.Close() wss, _, err := websocket.DefaultDialer.Dial(url, http.Header{ @@ -122,3 +186,12 @@ func TestHandler_HandlerWSEvents(t *testing.T) { assert.Empty(t, resp.GetData()) }) } + +func getRouter(hlr *Handler) http.Handler { + collector := new(collection.MockCollector) + collector.On("Collect", mock.Anything, mock.Anything).Return(nil) + router := mux.NewRouter() + subRouter := router.PathPrefix("/api/v1").Subrouter() + subRouter.HandleFunc("/events", hlr.GetHandlerWSEvents(collector)).Methods(http.MethodGet).Name("events") + return router +} diff --git a/websocket/pinger.go b/http/websocket/pinger.go similarity index 87% rename from websocket/pinger.go rename to http/websocket/pinger.go index fb259f17..7b20a701 100644 --- a/websocket/pinger.go +++ b/http/websocket/pinger.go @@ -2,9 +2,10 @@ package websocket import ( "fmt" + "raccoon/http/websocket/connection" + "raccoon/identification" "raccoon/logger" "raccoon/metrics" - "raccoon/websocket/connection" "time" ) @@ -12,7 +13,7 @@ import ( func Pinger(c chan connection.Conn, size int, PingInterval time.Duration, WriteWaitInterval time.Duration) { for i := 0; i < size; i++ { go func() { - cSet := make(map[connection.Identifier]connection.Conn) + cSet := make(map[identification.Identifier]connection.Conn) ticker := time.NewTicker(PingInterval) for { select { diff --git a/websocket/connection/identifier.go b/identification/identifier.go similarity index 88% rename from websocket/connection/identifier.go rename to identification/identifier.go index cff9a64f..23d81ed7 100644 --- a/websocket/connection/identifier.go +++ b/identification/identifier.go @@ -1,4 +1,4 @@ -package connection +package identification import ( "fmt" diff --git a/integration/integration_test.go b/integration/integration_test.go index 29389754..deeccb4e 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -1,33 +1,43 @@ package integration import ( + "bytes" + "context" + "encoding/json" "fmt" + "io" + "io/ioutil" "math/rand" "net/http" "os" "testing" "time" - pb "raccoon/websocket/proto" + pb "raccoon/proto" - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" ) var uuid string var timeout time.Duration var topicFormat string -var url string +var url, wsurl string var bootstrapServers string +var grpcServerAddr string func TestMain(m *testing.M) { uuid = fmt.Sprintf("%d-test", rand.Int()) timeout = 20 * time.Second topicFormat = os.Getenv("INTEGTEST_TOPIC_FORMAT") - url = fmt.Sprintf("%v/api/v1/events", os.Getenv("INTEGTEST_HOST")) + wsurl = fmt.Sprintf("ws://%v/api/v1/events", os.Getenv("INTEGTEST_HOST")) + url = fmt.Sprintf("http://%v/api/v1/events", os.Getenv("INTEGTEST_HOST")) + grpcServerAddr = os.Getenv("GRPC_SERVER_ADDR") bootstrapServers = os.Getenv("INTEGTEST_BOOTSTRAP_SERVER") os.Exit(m.Run()) } @@ -39,14 +49,15 @@ func TestIntegration(t *testing.T) { "X-User-ID": []string{"1234"}, } t.Run("Should response with BadRequest when sending invalid request", func(t *testing.T) { - wss, _, err := websocket.DefaultDialer.Dial(url, header) + + wss, _, err := websocket.DefaultDialer.Dial(wsurl, header) if err != nil { assert.Fail(t, fmt.Sprintf("fail to connect. %v", err)) } + defer wss.Close() wss.WriteMessage(websocket.BinaryMessage, []byte{1}) - mType, resp, err := wss.ReadMessage() r := &pb.EventResponse{} _ = proto.Unmarshal(resp, r) @@ -60,12 +71,228 @@ func TestIntegration(t *testing.T) { wss.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(100*time.Millisecond)) }) + t.Run("Should response with BadRequest when sending invalid GRPC request", func(t *testing.T) { + opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()} + + conn, err := grpc.Dial(grpcServerAddr, opts...) + if err != nil { + assert.Fail(t, fmt.Sprintf("fail to connect grpc server. %v", err)) + } + defer conn.Close() + + client := pb.NewEventServiceClient(conn) + r, err := client.SendEvent(context.Background(), &pb.EventRequest{}) + + assert.NotEmpty(t, err) + assert.Empty(t, r) + + }) + + t.Run("Should response with BadRequest when sending invalid json request", func(t *testing.T) { + wss, _, err := websocket.DefaultDialer.Dial(wsurl, header) + + if err != nil { + assert.Fail(t, fmt.Sprintf("fail to connect. %v", err)) + } + defer wss.Close() + + wss.WriteMessage(websocket.TextMessage, []byte("")) + + mType, resp, err := wss.ReadMessage() + r := &pb.EventResponse{} + _ = json.Unmarshal(resp, r) + assert.Equal(t, websocket.TextMessage, mType) + assert.Empty(t, err) + assert.Equal(t, pb.Status_ERROR, r.Status) + assert.Equal(t, pb.Code_BAD_REQUEST, r.Code) + assert.NotEmpty(t, r.Reason) + assert.Empty(t, r.Data) + + wss.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(100*time.Millisecond)) + }) + + t.Run("Should response with BadRequest when sending HTTP/1.1 request with empty JSON body", func(t *testing.T) { + client := &http.Client{} + + req, err := http.NewRequest(http.MethodPost, url, nil) + if err != nil { + assert.Fail(t, fmt.Sprintf("failed to create http request. %v", err)) + os.Exit(1) + } + res, err := client.Do(req) + if err != nil { + assert.Fail(t, fmt.Sprintf("failed to connect to http server. %v", err)) + os.Exit(1) + } + defer io.Copy(ioutil.Discard, res.Body) + defer res.Body.Close() + r := &pb.EventResponse{} + err = json.NewDecoder(res.Body).Decode(r) + assert.Empty(t, err) + assert.Equal(t, pb.Status_ERROR, r.Status) + assert.Equal(t, pb.Code_BAD_REQUEST, r.Code) + assert.NotEmpty(t, r.Reason) + assert.Empty(t, r.Data) + }) + + t.Run("Should response with BadRequest when sending HTTP/1.1 request with invalid JSON body", func(t *testing.T) { + client := &http.Client{} + + bodyBuf := bytes.NewBuffer([]byte{1}) + req, err := http.NewRequest(http.MethodPost, url, bodyBuf) + if err != nil { + assert.Fail(t, fmt.Sprintf("failed to create http request. %v", err)) + os.Exit(1) + } + res, err := client.Do(req) + if err != nil { + assert.Fail(t, fmt.Sprintf("failed to connect to http server. %v", err)) + os.Exit(1) + } + defer io.Copy(ioutil.Discard, res.Body) + defer res.Body.Close() + r := &pb.EventResponse{} + err = json.NewDecoder(res.Body).Decode(r) + assert.Empty(t, err) + assert.Equal(t, pb.Status_ERROR, r.Status) + assert.Equal(t, pb.Code_BAD_REQUEST, r.Code) + assert.NotEmpty(t, r.Reason) + assert.Empty(t, r.Data) + }) + + t.Run("Should response with success when HTTP/1.1 REST JSON request is processed", func(t *testing.T) { + client := &http.Client{} + var events []*pb.Event + + eEvent1 := &pb.Event{ + EventBytes: []byte("event_1"), + Type: "type_a", + } + eEvent2 := &pb.Event{ + EventBytes: []byte("event_2"), + Type: "type_b", + } + events = append(events, eEvent1) + events = append(events, eEvent2) + req := &pb.EventRequest{ + ReqGuid: "1234", + SentTime: timestamppb.Now(), + Events: events, + } + buf := &bytes.Buffer{} + json.NewEncoder(buf).Encode(req) + + request, err := http.NewRequest(http.MethodPost, url, buf) + if err != nil { + assert.Fail(t, fmt.Sprintf("failed to create http request. %v", err)) + os.Exit(1) + } + request.Header.Add("Content-Type", "application/json") + res, err := client.Do(request) + if err != nil { + assert.Fail(t, fmt.Sprintf("failed to connect to http server. %v", err)) + os.Exit(1) + } + defer io.Copy(ioutil.Discard, res.Body) + defer res.Body.Close() + r := &pb.EventResponse{} + err = json.NewDecoder(res.Body).Decode(r) + assert.Empty(t, err) + assert.Equal(t, pb.Code_OK, r.Code) + assert.Equal(t, pb.Status_SUCCESS, r.Status) + assert.Equal(t, r.Reason, "") + assert.Equal(t, r.Data, map[string]string{"req_guid": "1234"}) + + }) + + t.Run("Should response with success when JSON request is processed", func(t *testing.T) { + wss, _, err := websocket.DefaultDialer.Dial(wsurl, header) + + if err != nil { + panic(err) + } + defer wss.Close() + var events []*pb.Event + + eEvent1 := &pb.Event{ + EventBytes: []byte("event_1"), + Type: "type_a", + } + eEvent2 := &pb.Event{ + EventBytes: []byte("event_2"), + Type: "type_b", + } + events = append(events, eEvent1) + events = append(events, eEvent2) + req := &pb.EventRequest{ + ReqGuid: "1234", + SentTime: timestamppb.Now(), + Events: events, + } + bReq, _ := json.Marshal(req) + wss.WriteMessage(websocket.TextMessage, bReq) + + mType, resp, err := wss.ReadMessage() + r := &pb.EventResponse{} + _ = json.Unmarshal(resp, r) + assert.Equal(t, mType, websocket.TextMessage) + assert.Empty(t, err) + assert.Equal(t, r.Code.String(), pb.Code_OK.String()) + assert.Equal(t, r.Status.String(), pb.Status_SUCCESS.String()) + assert.Equal(t, r.Reason, "") + assert.Equal(t, r.Data, map[string]string{"req_guid": "1234"}) + + wss.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(100*time.Millisecond)) + }) + + t.Run("Should response with success when correct GRPC request is processed", func(t *testing.T) { + opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()} + + conn, err := grpc.Dial(grpcServerAddr, opts...) + if err != nil { + assert.Fail(t, fmt.Sprintf("fail to connect grpc server. %v", err)) + os.Exit(1) + } + defer conn.Close() + + client := pb.NewEventServiceClient(conn) + var events []*pb.Event + + eEvent1 := &pb.Event{ + EventBytes: []byte("event_1"), + Type: "type_a", + } + eEvent2 := &pb.Event{ + EventBytes: []byte("event_2"), + Type: "type_b", + } + events = append(events, eEvent1) + events = append(events, eEvent2) + req := &pb.EventRequest{ + ReqGuid: "1234", + SentTime: timestamppb.Now(), + Events: events, + } + + md := metadata.New(map[string]string{"X-User-ID": "1234"}) + ctx := metadata.NewOutgoingContext(context.Background(), md) + r, err := client.SendEvent(ctx, req) + assert.Empty(t, err) + assert.Equal(t, r.Code.String(), pb.Code_OK.String()) + assert.Equal(t, r.Status.String(), pb.Status_SUCCESS.String()) + assert.Equal(t, r.Reason, "") + assert.Equal(t, r.Data, map[string]string{"req_guid": "1234"}) + + }) + t.Run("Should response with success when request is processed", func(t *testing.T) { - wss, _, err := websocket.DefaultDialer.Dial(url, header) + wss, _, err := websocket.DefaultDialer.Dial(wsurl, header) if err != nil { panic(err) } + + defer wss.Close() var events []*pb.Event eEvent1 := &pb.Event{ @@ -80,7 +307,7 @@ func TestIntegration(t *testing.T) { events = append(events, eEvent2) req := &pb.EventRequest{ ReqGuid: "1234", - SentTime: ptypes.TimestampNow(), + SentTime: timestamppb.Now(), Events: events, } bReq, _ := proto.Marshal(req) @@ -171,12 +398,13 @@ func TestIntegration(t *testing.T) { }) t.Run("Should close connection when client is unresponsive", func(t *testing.T) { - wss, _, err := websocket.DefaultDialer.Dial(url, header) + wss, _, err := websocket.DefaultDialer.Dial(wsurl, header) if err != nil { assert.Fail(t, err.Error()) } + defer wss.Close() wss.SetPingHandler(func(appData string) error { return nil }) @@ -200,12 +428,11 @@ func TestIntegration(t *testing.T) { t.Run("Should disconnect subsequence connection from same user when already connected", func(t *testing.T) { done := make(chan int) - _, _, err := websocket.DefaultDialer.Dial(url, header) + _, _, err := websocket.DefaultDialer.Dial(wsurl, header) assert.NoError(t, err) - secondWss, _, err := websocket.DefaultDialer.Dial(url, header) - + secondWss, _, err := websocket.DefaultDialer.Dial(wsurl, header) assert.NoError(t, err) secondWss.SetCloseHandler(func(code int, text string) error { @@ -233,14 +460,14 @@ func TestIntegration(t *testing.T) { t.Run("Should accept connections with same user id with different connection group", func(t *testing.T) { done := make(chan int) - _, _, err := websocket.DefaultDialer.Dial(url, http.Header{ + _, _, err := websocket.DefaultDialer.Dial(wsurl, http.Header{ "X-User-ID": []string{"1234"}, "X-User-Group": []string{"viewer"}, }) assert.NoError(t, err) - secondWss, _, err := websocket.DefaultDialer.Dial(url, http.Header{ + secondWss, _, err := websocket.DefaultDialer.Dial(wsurl, http.Header{ "X-User-ID": []string{"1234"}, "X-User-Group": []string{"editor"}, }) diff --git a/publisher/kafka.go b/publisher/kafka.go index d545aeca..02a43452 100644 --- a/publisher/kafka.go +++ b/publisher/kafka.go @@ -3,15 +3,16 @@ package publisher import ( "encoding/json" "fmt" - pb "raccoon/websocket/proto" + pb "raccoon/proto" "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" // Importing librd to make it work on vendor mode - _ "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka/librdkafka" "raccoon/config" "raccoon/logger" "raccoon/metrics" "strings" + + _ "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka/librdkafka" ) // KafkaProducer Produce data to kafka synchronously diff --git a/publisher/kafka_test.go b/publisher/kafka_test.go index 347666a7..6c800def 100644 --- a/publisher/kafka_test.go +++ b/publisher/kafka_test.go @@ -4,7 +4,7 @@ import ( "fmt" "os" "raccoon/logger" - pb "raccoon/websocket/proto" + pb "raccoon/proto" "testing" "github.com/stretchr/testify/assert" diff --git a/serialization/json.go b/serialization/json.go new file mode 100644 index 00000000..7f4eeb33 --- /dev/null +++ b/serialization/json.go @@ -0,0 +1,9 @@ +package serialization + +import "encoding/json" + +func JSONSerializer() Serializer { + return SerializeFunc(func(m interface{}) ([]byte, error) { + return json.Marshal(m) + }) +} diff --git a/serialization/json_test.go b/serialization/json_test.go new file mode 100644 index 00000000..cfcb492d --- /dev/null +++ b/serialization/json_test.go @@ -0,0 +1,27 @@ +package serialization + +import ( + "reflect" + "testing" +) + +func TestJSONSerializer(t *testing.T) { + tests := []struct { + name string + want Serializer + }{ + { + name: "Initilizing JSON serializer", + want: SerializeFunc(func(m interface{}) ([]byte, error) { + return nil, nil + }), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := JSONSerializer(); reflect.TypeOf(got) != reflect.TypeOf(tt.want) { + t.Errorf("JSONSerializer() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/serialization/mock.go b/serialization/mock.go new file mode 100644 index 00000000..056ece1b --- /dev/null +++ b/serialization/mock.go @@ -0,0 +1,13 @@ +package serialization + +import "github.com/stretchr/testify/mock" + +type MockSerializer struct { + mock.Mock +} + +func (ms *MockSerializer) Serialize(m interface{}) ([]byte, error) { + args := ms.Called(m) + + return []byte(args.String(0)), args.Error(1) +} diff --git a/serialization/proto.go b/serialization/proto.go new file mode 100644 index 00000000..3b777a35 --- /dev/null +++ b/serialization/proto.go @@ -0,0 +1,21 @@ +package serialization + +import ( + "errors" + + "google.golang.org/protobuf/proto" +) + +var ( + ErrInvalidProtoMessage = errors.New("invalld proto message") +) + +func ProtoSerilizer() Serializer { + return SerializeFunc(func(m interface{}) ([]byte, error) { + msg, ok := m.(proto.Message) + if !ok { + return nil, ErrInvalidProtoMessage + } + return proto.Marshal(msg) + }) +} diff --git a/serialization/proto_test.go b/serialization/proto_test.go new file mode 100644 index 00000000..6e5b0686 --- /dev/null +++ b/serialization/proto_test.go @@ -0,0 +1,27 @@ +package serialization + +import ( + "reflect" + "testing" +) + +func TestProtoDeserilizer(t *testing.T) { + tests := []struct { + name string + want Serializer + }{ + { + name: "initializing Proto Desrializer", + want: SerializeFunc(func(m interface{}) ([]byte, error) { + return nil, nil + }), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ProtoSerilizer(); reflect.TypeOf(got) != reflect.TypeOf(tt.want) { + t.Errorf("ProtoDeserilizer() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/serialization/serializer.go b/serialization/serializer.go new file mode 100644 index 00000000..66246e41 --- /dev/null +++ b/serialization/serializer.go @@ -0,0 +1,11 @@ +package serialization + +type Serializer interface { + Serialize(m interface{}) ([]byte, error) +} + +type SerializeFunc func(m interface{}) ([]byte, error) + +func (f SerializeFunc) Serialize(m interface{}) ([]byte, error) { + return f(m) +} diff --git a/websocket/connection/table_test.go b/websocket/connection/table_test.go deleted file mode 100644 index 2964894b..00000000 --- a/websocket/connection/table_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package connection - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestConnectionPerGroup(t *testing.T) { - t.Run("Should return all the group on the table with the count", func(t *testing.T) { - table := NewTable(10) - table.Store(Identifier{ID: "user1", Group: "group1"}) - table.Store(Identifier{ID: "user2", Group: "group1"}) - table.Store(Identifier{ID: "user3", Group: "group1"}) - table.Store(Identifier{ID: "user1", Group: "group2"}) - table.Store(Identifier{ID: "user2", Group: "group2"}) - assert.Equal(t, map[string]int{"group1": 3, "group2": 2}, table.TotalConnectionPerGroup()) - }) -} - -func TestStore(t *testing.T) { - t.Run("Should store new connection", func(t *testing.T) { - table := NewTable(10) - err := table.Store(Identifier{ID: "user1", Group: ""}) - assert.NoError(t, err) - assert.True(t, table.Exists(Identifier{ID: "user1"})) - }) - - t.Run("Should return max connection reached error when connection is maxed", func(t *testing.T) { - table := NewTable(0) - err := table.Store(Identifier{ID: "user1", Group: ""}) - assert.Error(t, err, errMaxConnectionReached) - }) - - t.Run("Should return duplicated error when connection already exists", func(t *testing.T) { - table := NewTable(2) - err := table.Store(Identifier{ID: "user1", Group: ""}) - assert.NoError(t, err) - err = table.Store(Identifier{ID: "user1", Group: ""}) - assert.Error(t, err, errConnDuplicated) - }) - - t.Run("Should remove connection when identifier match", func(t *testing.T) { - table := NewTable(10) - table.Store(Identifier{ID: "user1", Group: ""}) - table.Remove(Identifier{ID: "user1", Group: ""}) - assert.False(t, table.Exists(Identifier{ID: "user1", Group: ""})) - }) -} diff --git a/websocket/handler.go b/websocket/handler.go deleted file mode 100644 index 75f3550b..00000000 --- a/websocket/handler.go +++ /dev/null @@ -1,85 +0,0 @@ -package websocket - -import ( - "fmt" - "net/http" - "raccoon/logger" - "raccoon/metrics" - "raccoon/websocket/connection" - "time" - - pb "raccoon/websocket/proto" - - "github.com/golang/protobuf/proto" - "github.com/gorilla/websocket" -) - -type Handler struct { - upgrader *connection.Upgrader - bufferChannel chan EventsBatch - PingChannel chan connection.Conn -} -type EventsBatch struct { - ConnIdentifier connection.Identifier - EventReq *pb.EventRequest - TimeConsumed time.Time - TimePushed time.Time -} - -func PingHandler(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write([]byte("pong")) -} - -//HandlerWSEvents handles the upgrade and the events sent by the peers -func (h *Handler) HandlerWSEvents(w http.ResponseWriter, r *http.Request) { - conn, err := h.upgrader.Upgrade(w, r) - if err != nil { - logger.Debugf("[websocket.Handler] %v", err) - return - } - defer conn.Close() - h.PingChannel <- conn - - for { - _, message, err := conn.ReadMessage() - if err != nil { - if websocket.IsCloseError(err, websocket.CloseGoingAway, - websocket.CloseNormalClosure, - websocket.CloseNoStatusReceived, - websocket.CloseAbnormalClosure) { - logger.Error(fmt.Sprintf("[websocket.Handler] %s closed abruptly: %v", conn.Identifier, err)) - metrics.Increment("batches_read_total", fmt.Sprintf("status=failed,reason=closeerror,conn_group=%s", conn.Identifier.Group)) - break - } - - metrics.Increment("batches_read_total", fmt.Sprintf("status=failed,reason=unknown,conn_group=%s", conn.Identifier.Group)) - logger.Error(fmt.Sprintf("[websocket.Handler] reading message failed. Unknown failure for %s: %v", conn.Identifier, err)) //no connection issue here - break - } - timeConsumed := time.Now() - metrics.Count("events_rx_bytes_total", len(message), fmt.Sprintf("conn_group=%s", conn.Identifier.Group)) - payload := &pb.EventRequest{} - err = proto.Unmarshal(message, payload) - if err != nil { - logger.Error(fmt.Sprintf("[websocket.Handler] reading message failed for %s: %v", conn.Identifier, err)) - metrics.Increment("batches_read_total", fmt.Sprintf("status=failed,reason=serde,conn_group=%s", conn.Identifier.Group)) - badrequest := createBadrequestResponse(err) - conn.WriteMessage(websocket.BinaryMessage, badrequest) - continue - } - metrics.Increment("batches_read_total", fmt.Sprintf("status=success,conn_group=%s", conn.Identifier.Group)) - metrics.Count("events_rx_total", len(payload.Events), fmt.Sprintf("conn_group=%s", conn.Identifier.Group)) - - h.bufferChannel <- EventsBatch{ - ConnIdentifier: conn.Identifier, - EventReq: payload, - TimeConsumed: timeConsumed, - TimePushed: (time.Now()), - } - - resp := createSuccessResponse(payload.ReqGuid) - success, _ := proto.Marshal(resp) - conn.WriteMessage(websocket.BinaryMessage, success) - } -} diff --git a/websocket/responsefactory.go b/websocket/responsefactory.go deleted file mode 100644 index 0bcce572..00000000 --- a/websocket/responsefactory.go +++ /dev/null @@ -1,34 +0,0 @@ -package websocket - -import ( - "fmt" - "github.com/golang/protobuf/proto" - "time" - - pb "raccoon/websocket/proto" -) - -func createSuccessResponse(requestGUID string) *pb.EventResponse { - response := &pb.EventResponse{ - Status: pb.Status_SUCCESS, - Code: pb.Code_OK, - SentTime: time.Now().Unix(), - Reason: "", - Data: map[string]string{ - "req_guid": requestGUID, - }, - } - return response -} - -func createBadrequestResponse(err error) []byte { - response := pb.EventResponse{ - Status: pb.Status_ERROR, - Code: pb.Code_BAD_REQUEST, - SentTime: time.Now().Unix(), - Reason: fmt.Sprintf("cannot deserialize request: %s", err), - Data: nil, - } - badrequestResp, _ := proto.Marshal(&response) - return badrequestResp -} diff --git a/websocket/server.go b/websocket/server.go deleted file mode 100644 index 76f3ecaf..00000000 --- a/websocket/server.go +++ /dev/null @@ -1,113 +0,0 @@ -package websocket - -import ( - "context" - "fmt" - "net/http" - "raccoon/config" - "raccoon/logger" - "raccoon/websocket/connection" - "runtime" - "time" - - "raccoon/metrics" - - "github.com/gorilla/mux" - - // https://golang.org/pkg/net/http/pprof/ - _ "net/http/pprof" -) - -type Server struct { - HTTPServer *http.Server - bufferChannel chan EventsBatch - table *connection.Table - pingChannel chan connection.Conn -} - -func (s *Server) StartHTTPServer(ctx context.Context, cancel context.CancelFunc) { - go func() { - logger.Info("WebSocket Server --> startHttpServer") - err := s.HTTPServer.ListenAndServe() - if err != http.ErrServerClosed { - logger.Errorf("WebSocket Server --> HTTP Server could not be started = %s", err.Error()) - cancel() - } - }() - go s.ReportServerMetrics() - go Pinger(s.pingChannel, config.ServerWs.PingerSize, config.ServerWs.PingInterval, config.ServerWs.WriteWaitInterval) - go func() { - if err := http.ListenAndServe("localhost:6060", nil); err != nil { - logger.Errorf("WebSocket Server --> pprof could not be enabled: %s", err.Error()) - cancel() - } else { - logger.Info("WebSocket Server --> pprof :: Enabled") - } - }() -} - -func (s *Server) ReportServerMetrics() { - t := time.Tick(config.MetricStatsd.FlushPeriodMs) - m := &runtime.MemStats{} - for { - <-t - for k, v := range s.table.TotalConnectionPerGroup() { - metrics.Gauge("connections_count_current", v, fmt.Sprintf("conn_group=%s", k)) - } - metrics.Gauge("server_go_routines_count_current", runtime.NumGoroutine(), "") - - runtime.ReadMemStats(m) - metrics.Gauge("server_mem_heap_alloc_bytes_current", m.HeapAlloc, "") - metrics.Gauge("server_mem_heap_inuse_bytes_current", m.HeapInuse, "") - metrics.Gauge("server_mem_heap_objects_total_current", m.HeapObjects, "") - metrics.Gauge("server_mem_stack_inuse_bytes_current", m.StackInuse, "") - metrics.Gauge("server_mem_gc_triggered_current", m.LastGC/1000, "") - metrics.Gauge("server_mem_gc_pauseNs_current", m.PauseNs[(m.NumGC+255)%256]/1000, "") - metrics.Gauge("server_mem_gc_count_current", m.NumGC, "") - metrics.Gauge("server_mem_gc_pauseTotalNs_current", m.PauseTotalNs, "") - } -} - -//CreateServer - instantiates the http server -func CreateServer() (*Server, chan EventsBatch) { - //create the websocket handler that upgrades the http request - bufferChannel := make(chan EventsBatch, config.Worker.ChannelSize) - pingChannel := make(chan connection.Conn, config.ServerWs.ServerMaxConn) - ugConfig := connection.UpgraderConfig{ - ReadBufferSize: config.ServerWs.ReadBufferSize, - WriteBufferSize: config.ServerWs.WriteBufferSize, - CheckOrigin: config.ServerWs.CheckOrigin, - MaxUser: config.ServerWs.ServerMaxConn, - PongWaitInterval: config.ServerWs.PongWaitInterval, - WriteWaitInterval: config.ServerWs.WriteWaitInterval, - ConnIDHeader: config.ServerWs.ConnIDHeader, - ConnGroupHeader: config.ServerWs.ConnGroupHeader, - ConnGroupDefault: config.ServerWs.ConnGroupDefault, - } - upgrader := connection.NewUpgrader(ugConfig) - wsHandler := &Handler{ - upgrader: upgrader, - bufferChannel: bufferChannel, - PingChannel: pingChannel, - } - server := &Server{ - HTTPServer: &http.Server{ - Handler: Router(wsHandler), - Addr: ":" + config.ServerWs.AppPort, - }, - table: upgrader.Table, - bufferChannel: bufferChannel, - pingChannel: pingChannel, - } - //Wrap the handler with a Server instance and return it - return server, bufferChannel -} - -// Router sets up the routes -func Router(h *Handler) http.Handler { - router := mux.NewRouter() - router.Path("/ping").HandlerFunc(PingHandler).Methods(http.MethodGet) - subRouter := router.PathPrefix("/api/v1").Subrouter() - subRouter.HandleFunc("/events", h.HandlerWSEvents).Methods(http.MethodGet).Name("events") - return router -} diff --git a/worker/mocks.go b/worker/mocks.go index ecc0bc55..1473e2b8 100644 --- a/worker/mocks.go +++ b/worker/mocks.go @@ -1,9 +1,10 @@ package worker import ( + pb "raccoon/proto" + "github.com/stretchr/testify/mock" "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" - pb "raccoon/websocket/proto" ) type mockKafkaPublisher struct { diff --git a/worker/worker.go b/worker/worker.go index 2f0c9d17..e79b824d 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -2,9 +2,9 @@ package worker import ( "fmt" + "raccoon/collection" "raccoon/logger" "raccoon/metrics" - ws "raccoon/websocket" "sync" "time" @@ -17,13 +17,13 @@ import ( type Pool struct { Size int deliveryChannelSize int - EventsChannel <-chan ws.EventsBatch + EventsChannel <-chan *collection.CollectRequest kafkaProducer publisher.KafkaProducer wg sync.WaitGroup } // CreateWorkerPool create new Pool struct given size and EventsChannel worker. -func CreateWorkerPool(size int, eventsChannel <-chan ws.EventsBatch, deliveryChannelSize int, kafkaProducer publisher.KafkaProducer) *Pool { +func CreateWorkerPool(size int, eventsChannel <-chan *collection.CollectRequest, deliveryChannelSize int, kafkaProducer publisher.KafkaProducer) *Pool { return &Pool{ Size: size, deliveryChannelSize: deliveryChannelSize, @@ -45,7 +45,7 @@ func (w *Pool) StartWorkers() { batchReadTime := time.Now() //@TODO - Should add integration tests to prove that the worker receives the same message that it produced, on the delivery channel it created - err := w.kafkaProducer.ProduceBulk(request.EventReq.GetEvents(), deliveryChan) + err := w.kafkaProducer.ProduceBulk(request.GetEvents(), deliveryChan) totalErr := 0 if err != nil { for _, err := range err.(publisher.BulkError).Errors { @@ -55,17 +55,17 @@ func (w *Pool) StartWorkers() { } } } - lenBatch := int64(len(request.EventReq.GetEvents())) + lenBatch := int64(len(request.GetEvents())) logger.Debug(fmt.Sprintf("Success sending messages, %v", lenBatch-int64(totalErr))) if lenBatch > 0 { - eventTimingMs := time.Since(time.Unix(request.EventReq.SentTime.Seconds, 0)).Milliseconds() / lenBatch - metrics.Timing("event_processing_duration_milliseconds", eventTimingMs, fmt.Sprintf("conn_group=%s", request.ConnIdentifier.Group)) + eventTimingMs := time.Since(time.Unix(request.SentTime.Seconds, 0)).Milliseconds() / lenBatch + metrics.Timing("event_processing_duration_milliseconds", eventTimingMs, fmt.Sprintf("conn_group=%s", request.ConnectionIdentifier.Group)) now := time.Now() metrics.Timing("worker_processing_duration_milliseconds", (now.Sub(batchReadTime).Milliseconds())/lenBatch, "worker="+workerName) - metrics.Timing("server_processing_latency_milliseconds", (now.Sub(request.TimeConsumed)).Milliseconds()/lenBatch, fmt.Sprintf("conn_group=%s", request.ConnIdentifier.Group)) + metrics.Timing("server_processing_latency_milliseconds", (now.Sub(request.TimeConsumed)).Milliseconds()/lenBatch, fmt.Sprintf("conn_group=%s", request.ConnectionIdentifier.Group)) } - metrics.Count("kafka_messages_delivered_total", totalErr, fmt.Sprintf("success=false,conn_group=%s", request.ConnIdentifier.Group)) - metrics.Count("kafka_messages_delivered_total", len(request.EventReq.GetEvents())-totalErr, fmt.Sprintf("success=true,conn_group=%s", request.ConnIdentifier.Group)) + metrics.Count("kafka_messages_delivered_total", totalErr, fmt.Sprintf("success=false,conn_group=%s", request.ConnectionIdentifier.Group)) + metrics.Count("kafka_messages_delivered_total", len(request.GetEvents())-totalErr, fmt.Sprintf("success=true,conn_group=%s", request.ConnectionIdentifier.Group)) } w.wg.Done() }(fmt.Sprintf("worker-%d", i)) diff --git a/worker/worker_test.go b/worker/worker_test.go index d8beaa88..461fcbff 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -1,22 +1,27 @@ package worker import ( - ws "raccoon/websocket" "sync" "testing" "time" "github.com/golang/protobuf/ptypes/timestamp" - pb "raccoon/websocket/proto" + "raccoon/collection" + "raccoon/identification" + pb "raccoon/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) func TestWorker(t *testing.T) { - request := ws.EventsBatch{ - EventReq: &pb.EventRequest{ + request := &collection.CollectRequest{ + ConnectionIdentifier: &identification.Identifier{ + ID: "12345", + Group: "viewer", + }, + EventRequest: &pb.EventRequest{ SentTime: ×tamp.Timestamp{Seconds: 1593574343}, }, } @@ -28,7 +33,7 @@ func TestWorker(t *testing.T) { m.On("Timing", "processing.latency", mock.Anything, "") m.On("Count", "kafka_messages_delivered_total", 0, "success=true") m.On("Count", "kafka_messages_delivered_total", 0, "success=false") - bc := make(chan ws.EventsBatch, 2) + bc := make(chan *collection.CollectRequest, 2) worker := Pool{ Size: 1, deliveryChannelSize: 0, @@ -50,7 +55,7 @@ func TestWorker(t *testing.T) { t.Run("Flush", func(t *testing.T) { t.Run("Should block until all messages is processed", func(t *testing.T) { kp := mockKafkaPublisher{} - bc := make(chan ws.EventsBatch, 2) + bc := make(chan *collection.CollectRequest, 2) m := &mockMetric{} m.On("Timing", "processing.latency", mock.Anything, "") m.On("Count", "kafka_messages_delivered_total", 0, "success=false")