Skip to content

Commit

Permalink
feat(Forwarder Agent): Backport OTel client (#676)
Browse files Browse the repository at this point in the history
Backports the OTel client from `main` with support for logs and traces.
  • Loading branch information
ctlong authored Jan 17, 2025
1 parent 7b4d4fc commit 9de5b8e
Show file tree
Hide file tree
Showing 18 changed files with 4,577 additions and 170 deletions.
134 changes: 134 additions & 0 deletions src/cmd/forwarder-agent/app/app_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
Expand Down Expand Up @@ -196,6 +198,18 @@ func MakeSampleBigEnvelope() *loggregator_v2.Envelope {
}
}

func WithSampleTraceIdAndSpanId() loggregator.EmitTimerOption {
return func(m proto.Message) {
switch e := m.(type) {
case *loggregator_v2.Envelope:
e.Tags["trace_id"] = "beefdeadbeefdeadbeefdeadbeefdead"
e.Tags["span_id"] = "deadbeefdeadbeef"
default:
panic(fmt.Sprintf("unsupported Message type: %T", m))
}
}
}

// A fake OTel Collector metrics gRPC server that captures requests made to it.
type spyOtelColMetricServer struct {
colmetricspb.UnimplementedMetricsServiceServer
Expand Down Expand Up @@ -255,3 +269,123 @@ func (s *spyOtelColMetricServer) Export(_ context.Context, req *colmetricspb.Exp
func (s *spyOtelColMetricServer) close() {
s.srv.Stop()
}

// A fake OTel Collector trace gRPC server that captures requests made to it.
type spyOtelColTraceServer struct {
coltracepb.UnimplementedTraceServiceServer

srv *grpc.Server
addr string

requests chan *coltracepb.ExportTraceServiceRequest
}

// Creates a spyOtelColTraceServer, starts it listening on a random port,
// registers it as a gRPC service, and writes out a temp file for the forwarder
// agent to recognize it as a destination. The cfgPath it accepts is the folder
// under which to write the temp file.
func startSpyOtelColTraceServer(cfgPath string, tc *testhelper.TestCerts, commonName string) *spyOtelColTraceServer {
serverCreds, err := plumbing.NewServerCredentials(
tc.Cert(commonName),
tc.Key(commonName),
tc.CA(),
)
ExpectWithOffset(1, err).NotTo(HaveOccurred())

lis, err := net.Listen("tcp", "127.0.0.1:")
ExpectWithOffset(1, err).NotTo(HaveOccurred())

s := &spyOtelColTraceServer{
srv: grpc.NewServer(grpc.Creds(serverCreds)),
requests: make(chan *coltracepb.ExportTraceServiceRequest, 10000),
addr: lis.Addr().String(),
}

coltracepb.RegisterTraceServiceServer(s.srv, s)
go s.srv.Serve(lis) //nolint:errcheck

port, err := strconv.Atoi(strings.Split(s.addr, ":")[1])
ExpectWithOffset(1, err).NotTo(HaveOccurred())

dir, err := os.MkdirTemp(cfgPath, "")
ExpectWithOffset(1, err).ToNot(HaveOccurred())
tmpfn := filepath.Join(dir, "ingress_port.yml")

contents := fmt.Sprintf(`---
ingress: %d
protocol: otelcol
`, port)
err = os.WriteFile(tmpfn, []byte(contents), 0600)
ExpectWithOffset(1, err).ToNot(HaveOccurred())

return s
}

func (s *spyOtelColTraceServer) Export(_ context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
s.requests <- req
return &coltracepb.ExportTraceServiceResponse{}, nil
}

func (s *spyOtelColTraceServer) close() {
s.srv.Stop()
}

// A fake OTel Collector logs gRPC server that captures requests made to it.
type spyOtelColLogServer struct {
collogspb.UnimplementedLogsServiceServer

srv *grpc.Server
addr string

requests chan *collogspb.ExportLogsServiceRequest
}

// Creates a spyOtelColLogServer, starts it listening on a random port,
// registers it as a gRPC service, and writes out a temp file for the forwarder
// agent to recognize it as a destination. The cfgPath it accepts is the folder
// under which to write the temp file.
func startSpyOtelColLogServer(cfgPath string, tc *testhelper.TestCerts, commonName string) *spyOtelColLogServer {
serverCreds, err := plumbing.NewServerCredentials(
tc.Cert(commonName),
tc.Key(commonName),
tc.CA(),
)
ExpectWithOffset(1, err).NotTo(HaveOccurred())

lis, err := net.Listen("tcp", "127.0.0.1:")
ExpectWithOffset(1, err).NotTo(HaveOccurred())

s := &spyOtelColLogServer{
srv: grpc.NewServer(grpc.Creds(serverCreds)),
requests: make(chan *collogspb.ExportLogsServiceRequest, 10000),
addr: lis.Addr().String(),
}

collogspb.RegisterLogsServiceServer(s.srv, s)
go s.srv.Serve(lis) //nolint:errcheck

port, err := strconv.Atoi(strings.Split(s.addr, ":")[1])
ExpectWithOffset(1, err).NotTo(HaveOccurred())

dir, err := os.MkdirTemp(cfgPath, "")
ExpectWithOffset(1, err).ToNot(HaveOccurred())
tmpfn := filepath.Join(dir, "ingress_port.yml")

contents := fmt.Sprintf(`---
ingress: %d
protocol: otelcol
`, port)
err = os.WriteFile(tmpfn, []byte(contents), 0600)
ExpectWithOffset(1, err).ToNot(HaveOccurred())

return s
}

func (s *spyOtelColLogServer) Export(_ context.Context, req *collogspb.ExportLogsServiceRequest) (*collogspb.ExportLogsServiceResponse, error) {
s.requests <- req
return &collogspb.ExportLogsServiceResponse{}, nil
}

func (s *spyOtelColLogServer) close() {
s.srv.Stop()
}
1 change: 0 additions & 1 deletion src/cmd/forwarder-agent/app/forwarder_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ func otelCollectorClient(dest destination, grpc GRPC, m Metrics, l *log.Logger)
"destination": dest.Ingress,
}),
)

dw := egress.NewDiodeWriter(context.Background(), otelcolclient.New(w), gendiodes.AlertFunc(func(missed int) {
expired.Add(float64(missed))
}), timeoutwaitgroup.New(time.Minute))
Expand Down
94 changes: 86 additions & 8 deletions src/cmd/forwarder-agent/app/forwarder_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"time"

"code.cloudfoundry.org/loggregator-agent-release/src/pkg/config"
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"

"code.cloudfoundry.org/go-loggregator/v9"
"code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2"
Expand Down Expand Up @@ -349,23 +351,54 @@ var _ = Describe("App", func() {
})

Context("when an OTel Collector is registered to forward to", func() {
var otelServer *spyOtelColMetricServer
var (
otelMetricsServer *spyOtelColMetricServer
otelTraceServer *spyOtelColTraceServer
otelLogsServer *spyOtelColLogServer
)

BeforeEach(func() {
otelServer = startSpyOtelColMetricServer(ingressCfgPath, agentCerts, "otel-collector")
otelMetricsServer = startSpyOtelColMetricServer(ingressCfgPath, agentCerts, "otel-collector")
otelTraceServer = startSpyOtelColTraceServer(ingressCfgPath, agentCerts, "otel-collector")
otelLogsServer = startSpyOtelColLogServer(ingressCfgPath, agentCerts, "otel-collector")
})

JustBeforeEach(func() {
// Because the event being sent JustBeforeEach in the main test, channels need to be emptied
for len(otelMetricsServer.requests) > 0 {
<-otelMetricsServer.requests
}
for len(otelTraceServer.requests) > 0 {
<-otelTraceServer.requests
}
// test-title events
for len(otelLogsServer.requests) > 0 {
<-otelLogsServer.requests
}
})

AfterEach(func() {
otelServer.close()
otelMetricsServer.close()
otelTraceServer.close()
otelLogsServer.close()
})

DescribeTable("some envelopes are not forwarded",
DescribeTable("do not forward log nor event envelopes to otel metrics",
func(e *loggregator_v2.Envelope) {
ingressClient.Emit(e)
Consistently(otelServer.requests, 3).ShouldNot(Receive())
Consistently(otelMetricsServer.requests, 0.5).ShouldNot(Receive())
},
Entry("drops logs", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Log{}}),
Entry("drops events", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Event{}}),
)

DescribeTable("do not forward counters, gagues nor timers envelopes to otel logs",
func(e *loggregator_v2.Envelope) {
ingressClient.Emit(e)
Consistently(otelLogsServer.requests, 0.5).ShouldNot(Receive())
},
Entry("drops counters", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Counter{}}),
Entry("drops gauges", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Gauge{}}),
Entry("drops timers", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Timer{}}),
)

Expand All @@ -374,7 +407,7 @@ var _ = Describe("App", func() {
ingressClient.EmitCounter(name)

var req *colmetricspb.ExportMetricsServiceRequest
Eventually(otelServer.requests).Should(Receive(&req))
Eventually(otelMetricsServer.requests).Should(Receive(&req))

metric := req.ResourceMetrics[0].ScopeMetrics[0].Metrics[0]
Expect(metric.GetName()).To(Equal(name))
Expand All @@ -385,16 +418,61 @@ var _ = Describe("App", func() {
ingressClient.EmitGauge(loggregator.WithGaugeValue(name, 20.2, "test-unit"))

var req *colmetricspb.ExportMetricsServiceRequest
Eventually(otelServer.requests).Should(Receive(&req))
Eventually(otelMetricsServer.requests).Should(Receive(&req))

metric := req.ResourceMetrics[0].ScopeMetrics[0].Metrics[0]
Expect(metric.GetName()).To(Equal(name))
})

It("forwards timers", func() {
name := "test-timer-name"
ingressClient.EmitTimer(name, time.Now(), time.Now().Add(time.Second), WithSampleTraceIdAndSpanId())

var req *coltracepb.ExportTraceServiceRequest
Eventually(otelTraceServer.requests).Should(Receive(&req))

trace := req.ResourceSpans[0].ScopeSpans[0].Spans[0]
Expect(trace.GetName()).To(Equal(name))
})

It("forwards logs", func() {
body := "test log body"
ingressClient.EmitLog(body, loggregator.WithStdout())

var req *collogspb.ExportLogsServiceRequest
Eventually(otelLogsServer.requests).Should(Receive(&req))

log := req.ResourceLogs[0].ScopeLogs[0].LogRecords[0]
Expect(log.GetBody().GetStringValue()).To(Equal(body))
})

It("forwards events", func() {
title := "event title"
body := "event body"
err := ingressClient.EmitEvent(context.TODO(), title, body)
ExpectWithOffset(1, err).NotTo(HaveOccurred())

var req *collogspb.ExportLogsServiceRequest
Eventually(otelLogsServer.requests).Should(Receive(&req))

log := req.ResourceLogs[0].ScopeLogs[0].LogRecords[0]
Expect(len(log.GetBody().GetKvlistValue().GetValues())).To(Equal(2))
for _, v := range log.GetBody().GetKvlistValue().GetValues() {
switch v.GetKey() {
case "title":
Expect(v.GetValue().GetStringValue()).To(Equal(title))
case "body":
Expect(v.GetValue().GetStringValue()).To(Equal(body))
default:
Expect(v.GetKey()).ToNot(HaveOccurred())
}
}
})

It("emits an expired metric", func() {
et := map[string]string{
"protocol": "otelcol",
"destination": otelServer.addr,
"destination": otelMetricsServer.addr,
}

Eventually(agentMetrics.HasMetric).WithArguments("egress_expired_total", et).Should(BeTrue())
Expand Down
73 changes: 0 additions & 73 deletions src/pkg/otelcolclient/metric_batcher.go

This file was deleted.

Loading

0 comments on commit 9de5b8e

Please sign in to comment.