Skip to content

Commit

Permalink
Merge pull request #25 from ultravioletrs/NOISSUE-updateserver
Browse files Browse the repository at this point in the history
NOISSUE - Move HTTP and GRPC Server Implementations To Internal
  • Loading branch information
darkodraskovic authored Jul 25, 2023
2 parents bb88a71 + 9484a44 commit 5bce3a8
Show file tree
Hide file tree
Showing 120 changed files with 23,807 additions and 56 deletions.
101 changes: 50 additions & 51 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package main

import (
"context"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"os/signal"
"syscall"

mflog "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/uuid"
Expand All @@ -18,26 +15,36 @@ import (
agent "github.com/ultravioletrs/agent/agent"
"github.com/ultravioletrs/agent/agent/api"
agentgrpc "github.com/ultravioletrs/agent/agent/api/grpc"
agenthttpapi "github.com/ultravioletrs/agent/agent/api/http"
httpapi "github.com/ultravioletrs/agent/agent/api/http"
"github.com/ultravioletrs/agent/internal"
"github.com/ultravioletrs/agent/internal/env"
"github.com/ultravioletrs/agent/internal/server"
grpcserver "github.com/ultravioletrs/agent/internal/server/grpc"
httpserver "github.com/ultravioletrs/agent/internal/server/http"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

const svcName = "agent"
const (
svcName = "agent"
envPrefixHTTP = "AGENT_HTTP_"
envPrefixGRPC = "AGENT_GRPC_"
defSvcHTTPPort = "9031"
defSvcGRPCPort = "7002"
)

type config struct {
LogLevel string `env:"AGENT_LOG_LEVEL" envDefault:"info"`
HTTPPort string `env:"AGENT_HTTP_PORT" envDefault:"9031"`
ServerCert string `env:"AGENT_SERVER_CERT" envDefault:""`
ServerKey string `env:"AGENT_SERVER_KEY" envDefault:""`
Secret string `env:"AGENT_SECRET" envDefault:"secret"`
AgentGRPCURL string `env:"AGENT_GRPC_URL" envDefault:"localhost:7002"`
JaegerURL string `env:"AGENT_JAEGER_URL" envDefault:""`
InstanceID string `env:"AGENT_INSTANCE_ID" envDefault:""`
LogLevel string `env:"AGENT_LOG_LEVEL" envDefault:"info"`
Secret string `env:"AGENT_SECRET" envDefault:"secret"`
JaegerURL string `env:"AGENT_JAEGER_URL" envDefault:"localhost:14268/api/traces"`
InstanceID string `env:"AGENT_INSTANCE_ID" envDefault:""`
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

var cfg config
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
Expand All @@ -59,19 +66,38 @@ func main() {
defer agentCloser.Close()

svc := newService(cfg.Secret, logger)
errs := make(chan error, 2)

go startgRPCServer(cfg, &svc, logger, errs)
go startHTTPServer(agenthttpapi.MakeHandler(agentTracer, svc, cfg.InstanceID), cfg.HTTPPort, cfg, logger, errs)
var httpServerConfig = server.Config{Port: defSvcHTTPPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err))
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(agentTracer, svc, cfg.InstanceID), logger)

go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT)
errs <- fmt.Errorf("%s", <-c)
}()
var grpcServerConfig = server.Config{Port: defSvcGRPCPort}
if err := env.Parse(&grpcServerConfig, env.Options{Prefix: envPrefixGRPC}); err != nil {
log.Fatalf("failed to load %s gRPC server configuration : %s", svcName, err.Error())
}
registerAgentServiceServer := func(srv *grpc.Server) {
reflection.Register(srv)
agent.RegisterAgentServiceServer(srv, agentgrpc.NewServer(agentTracer, svc))
}
gs := grpcserver.New(ctx, cancel, svcName, grpcServerConfig, registerAgentServiceServer, logger)

err = <-errs
logger.Error(fmt.Sprintf("Agent service terminated: %s", err))
g.Go(func() error {
return hs.Start()
})

g.Go(func() error {
return gs.Start()
})

g.Go(func() error {
return server.StopHandler(ctx, cancel, logger, svcName, hs, gs)
})

if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("%s service terminated: %s", svcName, err))
}
}

func initJaeger(svcName, url string, logger mflog.Logger) (opentracing.Tracer, io.Closer) {
Expand Down Expand Up @@ -107,30 +133,3 @@ func newService(secret string, logger mflog.Logger) agent.Service {

return svc
}

func startHTTPServer(handler http.Handler, port string, cfg config, logger mflog.Logger, errs chan error) {
p := fmt.Sprintf(":%s", port)
if cfg.ServerCert != "" || cfg.ServerKey != "" {
logger.Info(fmt.Sprintf("Agent service started using https on port %s with cert %s key %s",
port, cfg.ServerCert, cfg.ServerKey))
errs <- http.ListenAndServeTLS(p, cfg.ServerCert, cfg.ServerKey, handler)
return
}
logger.Info(fmt.Sprintf("Agent service started using http on port %s", cfg.HTTPPort))
errs <- http.ListenAndServe(p, handler)
}

func startgRPCServer(cfg config, svc *agent.Service, logger mflog.Logger, errs chan error) {
// Create a gRPC server object
tracer := opentracing.GlobalTracer()
server := grpc.NewServer()
// Register the implementation of the service with the server
agent.RegisterAgentServiceServer(server, agentgrpc.NewServer(tracer, *svc))
// Listen to a port and serve incoming requests
listener, err := net.Listen("tcp", cfg.AgentGRPCURL)
if err != nil {
log.Fatalf(err.Error())
}
logger.Info(fmt.Sprintf("Agent service started using gRPC on address %s", cfg.AgentGRPCURL))
errs <- server.Serve(listener)
}
13 changes: 8 additions & 5 deletions docker/.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

## Agent
AGENT_LOG_LEVEL=info
AGENT_HTTP_PORT=9031
AGENT_SERVER_CERT=
AGENT_SERVER_KEY=
AGENT_SECRET=secret
AGENT_JAEGER_URL=
AGENT_GRPC_URL=localhost:7002
AGENT_GRPC_TIMEOUT=1s
MF_AGENT_HTTP_HOST=
MF_AGENT_HTTP_PORT=9031
MF_AGENT_HTTP_SERVER_CERT=
MF_AGENT_HTTP_SERVER_KEY=
MF_AGENT_GRPC_HOST=
MF_AGENT_GRPC_PORT=7002
MF_AGENT_GRPC_SERVER_CERT=
MF_AGENT_GRPC_SERVER_KEY=
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ require (
github.com/prometheus/client_golang v1.16.0
github.com/spf13/cobra v1.7.0
github.com/uber/jaeger-client-go v2.30.0+incompatible
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0
golang.org/x/sync v0.3.0
google.golang.org/grpc v1.56.1
google.golang.org/protobuf v1.31.0
)
Expand All @@ -20,6 +22,8 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand All @@ -31,6 +35,9 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect
Expand Down
22 changes: 22 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
cloud.google.com/go v0.110.0 h1:Zc8gqp3+a9/Eyph2KDmcGaPtbKRIoqq4YTlL4NMD0Ys=
cloud.google.com/go/compute v1.19.1 h1:am86mquDUgjGNWxiGn+5PGLbmgiWXlE/yNWpIpNvuXY=
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM=
github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand All @@ -6,15 +9,22 @@ github.com/caarlos0/env/v7 v7.1.0 h1:9lzTF5amyQeWHZzuZeKlCb5FWSUxpG1js43mhbY8ozg
github.com/caarlos0/env/v7 v7.1.0/go.mod h1:LPPWniDUq4JaO6Q41vtlyikhMknqymCLBw0eX4dcH1E=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8=
github.com/go-kit/kit v0.12.0 h1:e4o3o3IsBfAKQh5Qbbiqyfu97Ku7jrO/JbohvztANh4=
github.com/go-kit/kit v0.12.0/go.mod h1:lHd+EkCZPIwYItmGDDRdhinkzX2A1sj+M9biaEaizzs=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-zoo/bone v1.3.0 h1:PY6sHq37FnQhj+4ZyqFIzJQHvrrGx0GEc3vTZZC/OsI=
github.com/go-zoo/bone v1.3.0/go.mod h1:HI3Lhb7G3UQcAwEhOJ2WyNcsFtQX1WYHa0Hl4OBbhW8=
github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
Expand Down Expand Up @@ -60,16 +70,28 @@ github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaO
github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg=
github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0 h1:ZOLJc06r4CB42laIXg/7udr0pbZyuAihN10A/XuiQRY=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0/go.mod h1:5z+/ZWJQKXa9YT34fQNx5K8Hd1EoIhvtUygUQPqEOgQ=
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/oauth2 v0.8.0 h1:6dkIjl3j3LtZ/O3sTgZTMsLKSftL/B8Zgq4huOIIUu8=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
google.golang.org/grpc v1.56.1 h1:z0dNfjIl0VpaZ9iSVjA6daGatAYwPGstTjt5vkRMFkQ=
Expand Down
2 changes: 2 additions & 0 deletions internal/server/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package server contains the HTTP, gRPC and CoAP server implementation.
package server
2 changes: 2 additions & 0 deletions internal/server/grpc/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package grpc contains the gRPC server implementation.
package grpc
102 changes: 102 additions & 0 deletions internal/server/grpc/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package grpc

import (
"context"
"fmt"
"net"
"time"

"github.com/mainflux/mainflux/logger"
"github.com/ultravioletrs/agent/internal/server"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

const (
stopWaitTime = 5 * time.Second
)

type Server struct {
server.BaseServer
server *grpc.Server
registerService serviceRegister
}

type serviceRegister func(srv *grpc.Server)

var _ server.Server = (*Server)(nil)

func New(ctx context.Context, cancel context.CancelFunc, name string, config server.Config, registerService serviceRegister, logger logger.Logger) server.Server {
var listenFullAddress = fmt.Sprintf("%s:%s", config.Host, config.Port)

return &Server{
BaseServer: server.BaseServer{
Ctx: ctx,
Cancel: cancel,
Name: name,
Address: listenFullAddress,
Config: config,
Logger: logger,
},
registerService: registerService,
}
}

func (s *Server) Start() error {
var errCh = make(chan error)

listener, err := net.Listen("tcp", s.Address)
if err != nil {
return fmt.Errorf("failed to listen on port %s: %w", s.Address, err)
}

switch {
case s.Config.CertFile != "" || s.Config.KeyFile != "":
creds, err := credentials.NewServerTLSFromFile(s.Config.CertFile, s.Config.KeyFile)
if err != nil {
return fmt.Errorf("failed to load auth certificates: %w", err)
}
s.Logger.Info(fmt.Sprintf("%s service gRPC server listening at %s with TLS cert %s and key %s", s.Name, s.Address, s.Config.CertFile, s.Config.KeyFile))
s.server = grpc.NewServer(
grpc.Creds(creds),
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
)
default:
s.Logger.Info(fmt.Sprintf("%s service gRPC server listening at %s without TLS", s.Name, s.Address))
s.server = grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
)
}

s.registerService(s.server)

go func() {
errCh <- s.server.Serve(listener)
}()

select {
case <-s.Ctx.Done():
return s.Stop()
case err := <-errCh:
s.Cancel()

return err
}
}

func (s *Server) Stop() error {
defer s.Cancel()
var c = make(chan bool)
go func() {
defer close(c)
s.server.GracefulStop()
}()
select {
case <-c:
case <-time.After(stopWaitTime):
}
s.Logger.Info(fmt.Sprintf("%s gRPC service shutdown at %s", s.Name, s.Address))

return nil
}
2 changes: 2 additions & 0 deletions internal/server/http/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package http contains the HTTP server implementation.
package http
Loading

0 comments on commit 5bce3a8

Please sign in to comment.