Skip to content

Commit

Permalink
Move From OpenTracing To OpenTelemetry
Browse files Browse the repository at this point in the history
Signed-off-by: rodneyosodo <[email protected]>
  • Loading branch information
rodneyosodo committed Jul 25, 2023
1 parent fb596ab commit e46a2f1
Show file tree
Hide file tree
Showing 318 changed files with 23,176 additions and 22,516 deletions.
20 changes: 9 additions & 11 deletions agent/api/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import (
"time"

"github.com/go-kit/kit/endpoint"
kitot "github.com/go-kit/kit/tracing/opentracing"
kitgrpc "github.com/go-kit/kit/transport/grpc"
"github.com/opentracing/opentracing-go"
"github.com/ultravioletrs/agent/agent"
"google.golang.org/grpc"
)
Expand All @@ -26,40 +24,40 @@ type grpcClient struct {
}

// NewClient returns new gRPC client instance.
func NewClient(tracer opentracing.Tracer, conn *grpc.ClientConn, timeout time.Duration) agent.AgentServiceClient {
func NewClient(conn *grpc.ClientConn, timeout time.Duration) agent.AgentServiceClient {
return &grpcClient{
run: kitot.TraceClient(tracer, "run")(kitgrpc.NewClient(
run: kitgrpc.NewClient(
conn,
svcName,
"Run",
encodeRunRequest,
decodeRunResponse,
agent.RunResponse{},
).Endpoint()),
algo: kitot.TraceClient(tracer, "algo")(kitgrpc.NewClient(
).Endpoint(),
algo: kitgrpc.NewClient(
conn,
svcName,
"Algo",
encodeAlgoRequest,
decodeAlgoResponse,
agent.AlgoResponse{},
).Endpoint()),
data: kitot.TraceClient(tracer, "data")(kitgrpc.NewClient(
).Endpoint(),
data: kitgrpc.NewClient(
conn,
svcName,
"Data",
encodeDataRequest,
decodeDataResponse,
agent.DataResponse{},
).Endpoint()),
result: kitot.TraceClient(tracer, "result")(kitgrpc.NewClient(
).Endpoint(),
result: kitgrpc.NewClient(
conn,
svcName,
"Result",
encodeResultRequest,
decodeResultResponse,
agent.ResultResponse{},
).Endpoint()),
).Endpoint(),
timeout: timeout,
}
}
Expand Down
12 changes: 5 additions & 7 deletions agent/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package grpc
import (
"context"

kitot "github.com/go-kit/kit/tracing/opentracing"
kitgrpc "github.com/go-kit/kit/transport/grpc"
"github.com/opentracing/opentracing-go"
"github.com/ultravioletrs/agent/agent"
)

Expand All @@ -18,25 +16,25 @@ type grpcServer struct {
}

// NewServer returns new AgentServiceServer instance.
func NewServer(tracer opentracing.Tracer, svc agent.Service) agent.AgentServiceServer {
func NewServer(svc agent.Service) agent.AgentServiceServer {
return &grpcServer{
run: kitgrpc.NewServer(
kitot.TraceServer(tracer, "run")(runEndpoint(svc)),
runEndpoint(svc),
decodeRunRequest,
encodeRunResponse,
),
algo: kitgrpc.NewServer(
kitot.TraceServer(tracer, "algo")(algoEndpoint(svc)),
algoEndpoint(svc),
decodeAlgoRequest,
encodeAlgoResponse,
),
data: kitgrpc.NewServer(
kitot.TraceServer(tracer, "data")(dataEndpoint(svc)),
dataEndpoint(svc),
decodeDataRequest,
encodeDataResponse,
),
result: kitgrpc.NewServer(
kitot.TraceServer(tracer, "result")(resultEndpoint(svc)),
resultEndpoint(svc),
decodeResultRequest,
encodeResultResponse,
),
Expand Down
19 changes: 9 additions & 10 deletions agent/api/http/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import (
"net/http"
"strings"

kitot "github.com/go-kit/kit/tracing/opentracing"
kithttp "github.com/go-kit/kit/transport/http"
"github.com/go-zoo/bone"
"github.com/mainflux/mainflux"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus/promhttp"
agent "github.com/ultravioletrs/agent/agent"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

const (
Expand All @@ -30,27 +29,27 @@ var (
)

// MakeHandler returns a HTTP handler for API endpoints.
func MakeHandler(tracer opentracing.Tracer, svc agent.Service, instanceID string) http.Handler {
func MakeHandler(svc agent.Service, instanceID string) http.Handler {
opts := []kithttp.ServerOption{
kithttp.ServerErrorEncoder(encodeError),
}

r := bone.New()

r.Post("/agent", kithttp.NewServer(
kitot.TraceServer(tracer, "ping")(pingEndpoint(svc)),
r.Post("/agent", otelhttp.NewHandler(kithttp.NewServer(
pingEndpoint(svc),
decodePing,
encodeResponse,
opts...,
))
r.Post("/run", kithttp.NewServer(
kitot.TraceServer(tracer, "run")(runEndpoint(svc)),
), "ping"))
r.Post("/run", otelhttp.NewHandler(kithttp.NewServer(
runEndpoint(svc),
decodeRun,
encodeResponse,
opts...,
))
), "run"))

r.GetFunc("/version", mainflux.Health("things", instanceID))
r.GetFunc("/version", mainflux.Health("agent", instanceID))
r.Handle("/metrics", promhttp.Handler())

return r
Expand Down
49 changes: 15 additions & 34 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,19 @@ package main
import (
"context"
"fmt"
"io"
"io/ioutil"
"log"
"os"

mflog "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/uuid"
opentracing "github.com/opentracing/opentracing-go"
jconfig "github.com/uber/jaeger-client-go/config"
agent "github.com/ultravioletrs/agent/agent"
"github.com/ultravioletrs/agent/agent/api"
agentgrpc "github.com/ultravioletrs/agent/agent/api/grpc"
httpapi "github.com/ultravioletrs/agent/agent/api/http"
"github.com/ultravioletrs/agent/agent/tracing"
"github.com/ultravioletrs/agent/internal"
"github.com/ultravioletrs/agent/internal/env"
jaegerclient "github.com/ultravioletrs/agent/internal/jaeger"
"github.com/ultravioletrs/agent/internal/server"
grpcserver "github.com/ultravioletrs/agent/internal/server/grpc"
httpserver "github.com/ultravioletrs/agent/internal/server/http"
Expand All @@ -39,7 +36,7 @@ const (
type config struct {
LogLevel string `env:"AGENT_LOG_LEVEL" envDefault:"info"`
Secret string `env:"AGENT_SECRET" envDefault:"secret"`
JaegerURL string `env:"AGENT_JAEGER_URL" envDefault:"localhost:14268/api/traces"`
JaegerURL string `env:"AGENT_JAEGER_URL" envDefault:"http://localhost:14268/api/traces"`
InstanceID string `env:"AGENT_INSTANCE_ID" envDefault:""`
}

Expand All @@ -64,24 +61,32 @@ func main() {
}
}

agentTracer, agentCloser := initJaeger("agent", cfg.JaegerURL, logger)
defer agentCloser.Close()
tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID)
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
}
}()
tracer := tp.Tracer(svcName)

svc := newService(cfg.Secret, logger, trace.NewNoopTracerProvider().Tracer(svcName))
svc := newService(cfg.Secret, logger, tracer)

var httpServerConfig = server.Config{Port: defSvcHTTPPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err))
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(agentTracer, svc, cfg.InstanceID), logger)
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(svc, cfg.InstanceID), logger)

var grpcServerConfig = server.Config{Port: defSvcGRPCPort}
if err := env.Parse(&grpcServerConfig, env.Options{Prefix: envPrefixGRPC}); err != nil {
log.Fatalf("failed to load %s gRPC server configuration : %s", svcName, err.Error())
}
registerAgentServiceServer := func(srv *grpc.Server) {
reflection.Register(srv)
agent.RegisterAgentServiceServer(srv, agentgrpc.NewServer(agentTracer, svc))
agent.RegisterAgentServiceServer(srv, agentgrpc.NewServer(svc))
}
gs := grpcserver.New(ctx, cancel, svcName, grpcServerConfig, registerAgentServiceServer, logger)

Expand All @@ -102,30 +107,6 @@ func main() {
}
}

func initJaeger(svcName, url string, logger mflog.Logger) (opentracing.Tracer, io.Closer) {
if url == "" {
return opentracing.NoopTracer{}, ioutil.NopCloser(nil)
}

tracer, closer, err := jconfig.Configuration{
ServiceName: svcName,
Sampler: &jconfig.SamplerConfig{
Type: "const",
Param: 1,
},
Reporter: &jconfig.ReporterConfig{
LocalAgentHostPort: url,
LogSpans: true,
},
}.NewTracer()
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger client: %s", err))
os.Exit(1)
}

return tracer, closer
}

func newService(secret string, logger mflog.Logger, tracer trace.Tracer) agent.Service {
svc := agent.New(secret)

Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (

type config struct {
LogLevel string `env:"AGENT_LOG_LEVEL" envDefault:"info"`
JaegerURL string `env:"AGENT_JAEGER_URL" envDefault:""`
JaegerURL string `env:"AGENT_JAEGER_URL" envDefault:"http://localhost:14268/api/traces"`
}

func main() {
Expand Down
23 changes: 15 additions & 8 deletions docker/.env
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
# Docker: Environment variables in Compose

## Jaeger
AGENT_JAEGER_PORT=6831
AGENT_JAEGER_FRONTEND=16686
AGENT_JAEGER_COLLECTOR=14268
AGENT_JAEGER_CONFIGS=5778
AGENT_JAEGER_URL=http://jaeger:14268/api/traces

## Agent
AGENT_LOG_LEVEL=info
AGENT_SECRET=secret
AGENT_JAEGER_URL=
MF_AGENT_HTTP_HOST=
MF_AGENT_HTTP_PORT=9031
MF_AGENT_HTTP_SERVER_CERT=
MF_AGENT_HTTP_SERVER_KEY=
MF_AGENT_GRPC_HOST=
MF_AGENT_GRPC_PORT=7002
MF_AGENT_GRPC_SERVER_CERT=
MF_AGENT_GRPC_SERVER_KEY=
AGENT_HTTP_HOST=
AGENT_HTTP_PORT=9031
AGENT_HTTP_SERVER_CERT=
AGENT_HTTP_SERVER_KEY=
AGENT_GRPC_HOST=
AGENT_GRPC_PORT=7002
AGENT_GRPC_SERVER_CERT=
AGENT_GRPC_SERVER_KEY=
26 changes: 11 additions & 15 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,18 @@
version: "3.7"

networks:
docker_mainflux-base-net:
external: true
agent-net:
driver: bridge

services:
mfxkit:
image: mainflux/mfxkit:latest
container_name: mainflux-mfxkit
restart: on-failure
environment:
MF_MFXKIT_LOG_LEVEL: ${MF_MFXKIT_LOG_LEVEL}
MF_MFXKIT_HTTP_PORT: ${MF_MFXKIT_HTTP_PORT}
MF_MFXKIT_SERVER_CERT: ${MF_MFXKIT_SERVER_CERT}
MF_MFXKIT_SERVER_KEY: ${MF_MFXKIT_SERVER_KEY}
MF_JAEGER_URL: ${MF_JAEGER_URL}
MF_MFXKIT_SECRET: ${MF_MFXKIT_SECRET}
jaeger:
image: jaegertracing/all-in-one:1.38.0
container_name: mainflux-jaeger
ports:
- ${MF_MFXKIT_HTTP_PORT}:${MF_MFXKIT_HTTP_PORT}
- ${AGENT_JAEGER_PORT}:${AGENT_JAEGER_PORT}/udp
- ${AGENT_JAEGER_FRONTEND}:${AGENT_JAEGER_FRONTEND}
- ${AGENT_JAEGER_COLLECTOR}:${AGENT_JAEGER_COLLECTOR}
- ${AGENT_JAEGER_CONFIGS}:${AGENT_JAEGER_CONFIGS}
networks:
- docker_mainflux-base-net
- agent-net

10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ require (
github.com/go-kit/kit v0.12.0
github.com/go-zoo/bone v1.3.0
github.com/mainflux/mainflux v0.0.0-20230722123816-70f53c2f979d
github.com/opentracing/opentracing-go v1.2.0
github.com/prometheus/client_golang v1.16.0
github.com/spf13/cobra v1.7.0
github.com/uber/jaeger-client-go v2.30.0+incompatible
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/exporters/jaeger v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
golang.org/x/sync v0.3.0
google.golang.org/grpc v1.56.1
Expand All @@ -22,6 +24,7 @@ require (
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
Expand All @@ -30,15 +33,12 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
Expand Down
Loading

0 comments on commit e46a2f1

Please sign in to comment.