From baf8617f4fbcfa0d85cfe4bbcdde35b8832ba18c Mon Sep 17 00:00:00 2001 From: Andrew Crump Date: Wed, 10 Apr 2024 23:11:09 +0000 Subject: [PATCH] Make whether timers are forwarded as traces configurable - Adds temporary_emit_otel_traces property to forwarder agent to control whether to convert timers to traces and emit to OTel Collector. - Default to false until we have more experience with this feature being activated. - An alternative implementation would be to allow each destination to configure the types of envelopes it would like to receive, which seemed unnecessarily complex. --- jobs/loggr-forwarder-agent-windows/monit | 1 + jobs/loggr-forwarder-agent-windows/spec | 4 ++ jobs/loggr-forwarder-agent/spec | 4 ++ .../templates/bpm.yml.erb | 1 + src/cmd/forwarder-agent/app/app_suite_test.go | 12 +++++ src/cmd/forwarder-agent/app/config.go | 1 + .../forwarder-agent/app/forwarder_agent.go | 12 +++-- .../app/forwarder_agent_test.go | 46 +++++++++++++++---- src/pkg/otelcolclient/otelcolclient.go | 11 ++++- src/pkg/otelcolclient/otelcolclient_test.go | 17 +++++-- 10 files changed, 88 insertions(+), 21 deletions(-) diff --git a/jobs/loggr-forwarder-agent-windows/monit b/jobs/loggr-forwarder-agent-windows/monit index 9c9e23946..7410241a4 100644 --- a/jobs/loggr-forwarder-agent-windows/monit +++ b/jobs/loggr-forwarder-agent-windows/monit @@ -24,6 +24,7 @@ "AGENT_CIPHER_SUITES" => p("tls.cipher_suites").split(":").join(","), "AGENT_TAGS" => tags.map { |k, v| "#{k}:#{v}" }.join(","), "DOWNSTREAM_INGRESS_PORT_GLOB" => p("downstream_ingress_port_glob"), + "EMIT_OTEL_TRACES" => "#{p("temporary_emit_otel_traces")}", "METRICS_PORT" => "#{p("metrics.port")}", "METRICS_CA_FILE_PATH" => "#{certs_dir}/metrics_ca.crt", "METRICS_CERT_FILE_PATH" => "#{certs_dir}/metrics.crt", diff --git a/jobs/loggr-forwarder-agent-windows/spec b/jobs/loggr-forwarder-agent-windows/spec index 27117f54d..41816d805 100644 --- a/jobs/loggr-forwarder-agent-windows/spec +++ b/jobs/loggr-forwarder-agent-windows/spec @@ -34,6 +34,10 @@ properties: default: {} example: {"deployment": "cf"} + temporary_emit_otel_traces: + description: "Whether to emit traces to OpenTelemetry Collector downstream consumers" + default: false + tls.ca_cert: description: | TLS loggregator root CA certificate. It is required for key/cert diff --git a/jobs/loggr-forwarder-agent/spec b/jobs/loggr-forwarder-agent/spec index 21b350e76..e72576559 100644 --- a/jobs/loggr-forwarder-agent/spec +++ b/jobs/loggr-forwarder-agent/spec @@ -34,6 +34,10 @@ properties: default: {} example: {"deployment": "cf"} + temporary_emit_otel_traces: + description: "Whether to emit traces to OpenTelemetry Collector downstream consumers" + default: false + tls.ca_cert: description: | TLS loggregator root CA certificate. It is required for key/cert diff --git a/jobs/loggr-forwarder-agent/templates/bpm.yml.erb b/jobs/loggr-forwarder-agent/templates/bpm.yml.erb index 2dbde5035..ddb5309b8 100644 --- a/jobs/loggr-forwarder-agent/templates/bpm.yml.erb +++ b/jobs/loggr-forwarder-agent/templates/bpm.yml.erb @@ -29,6 +29,7 @@ "AGENT_TAGS" => tags.map { |k, v| "#{k}:#{v}" }.join(","), "DOWNSTREAM_INGRESS_PORT_GLOB" => p("downstream_ingress_port_glob"), + "EMIT_OTEL_TRACES" => p("temporary_emit_otel_traces"), "METRICS_PORT" => "#{p("metrics.port")}", "METRICS_CA_FILE_PATH" => "#{certs_dir}/metrics_ca.crt", diff --git a/src/cmd/forwarder-agent/app/app_suite_test.go b/src/cmd/forwarder-agent/app/app_suite_test.go index 9d3b26ddd..46db976cf 100644 --- a/src/cmd/forwarder-agent/app/app_suite_test.go +++ b/src/cmd/forwarder-agent/app/app_suite_test.go @@ -197,6 +197,18 @@ func MakeSampleBigEnvelope() *loggregator_v2.Envelope { } } +func WithSampleTraceIdAndSpanId() loggregator.EmitTimerOption { + return func(m proto.Message) { + switch e := m.(type) { + case *loggregator_v2.Envelope: + e.Tags["trace_id"] = "beefdeadbeefdeadbeefdeadbeefdead" + e.Tags["span_id"] = "deadbeefdeadbeef" + default: + panic(fmt.Sprintf("unsupported Message type: %T", m)) + } + } +} + // A fake OTel Collector metrics gRPC server that captures requests made to it. type spyOtelColMetricServer struct { colmetricspb.UnimplementedMetricsServiceServer diff --git a/src/cmd/forwarder-agent/app/config.go b/src/cmd/forwarder-agent/app/config.go index 9ac926211..54daa6120 100644 --- a/src/cmd/forwarder-agent/app/config.go +++ b/src/cmd/forwarder-agent/app/config.go @@ -29,6 +29,7 @@ type Config struct { MetricsServer config.MetricsServer Tags map[string]string `env:"AGENT_TAGS"` DebugMetrics bool `env:"DEBUG_METRICS, report"` + EmitOTelTraces bool `env:"EMIT_OTEL_TRACES, report"` } // LoadConfig will load the configuration for the forwarder agent from the diff --git a/src/cmd/forwarder-agent/app/forwarder_agent.go b/src/cmd/forwarder-agent/app/forwarder_agent.go index b628b3c65..f2e8da0e9 100644 --- a/src/cmd/forwarder-agent/app/forwarder_agent.go +++ b/src/cmd/forwarder-agent/app/forwarder_agent.go @@ -41,6 +41,7 @@ type ForwarderAgent struct { log *log.Logger tags map[string]string debugMetrics bool + emitOTelTraces bool } type Metrics interface { @@ -71,6 +72,7 @@ func NewForwarderAgent( log: log, tags: cfg.Tags, debugMetrics: cfg.MetricsServer.DebugMetrics, + emitOTelTraces: cfg.EmitOTelTraces, } } @@ -94,7 +96,7 @@ func (s *ForwarderAgent) Run() { })) dests := downstreamDestinations(s.downstreamFilePattern, s.log) - writers := downstreamWriters(dests, s.grpc, s.m, s.log) + writers := downstreamWriters(dests, s.grpc, s.m, s.emitOTelTraces, s.log) tagger := egress_v2.NewTagger(s.tags) ew := egress_v2.NewEnvelopeWriter( multiWriter{writers: writers}, @@ -207,13 +209,13 @@ func downstreamDestinations(pattern string, l *log.Logger) []destination { return dests } -func downstreamWriters(dests []destination, grpc GRPC, m Metrics, l *log.Logger) []Writer { +func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces bool, l *log.Logger) []Writer { var writers []Writer for _, d := range dests { var w Writer switch d.Protocol { case "otelcol": - w = otelCollectorClient(d, grpc, m, l) + w = otelCollectorClient(d, grpc, m, emitOTelTraces, l) default: w = loggregatorClient(d, grpc, m, l) } @@ -222,7 +224,7 @@ func downstreamWriters(dests []destination, grpc GRPC, m Metrics, l *log.Logger) return writers } -func otelCollectorClient(dest destination, grpc GRPC, m Metrics, l *log.Logger) Writer { +func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces bool, l *log.Logger) Writer { clientCreds, err := tlsconfig.Build( tlsconfig.WithInternalServiceDefaults(), tlsconfig.WithIdentityFromFile(grpc.CertFile, grpc.KeyFile), @@ -250,7 +252,7 @@ func otelCollectorClient(dest destination, grpc GRPC, m Metrics, l *log.Logger) }), ) - dw := egress.NewDiodeWriter(context.Background(), otelcolclient.New(w), gendiodes.AlertFunc(func(missed int) { + dw := egress.NewDiodeWriter(context.Background(), otelcolclient.New(w, emitTraces), gendiodes.AlertFunc(func(missed int) { expired.Add(float64(missed)) }), timeoutwaitgroup.New(time.Minute)) diff --git a/src/cmd/forwarder-agent/app/forwarder_agent_test.go b/src/cmd/forwarder-agent/app/forwarder_agent_test.go index 1b4aa2d94..4ee521813 100644 --- a/src/cmd/forwarder-agent/app/forwarder_agent_test.go +++ b/src/cmd/forwarder-agent/app/forwarder_agent_test.go @@ -13,7 +13,6 @@ import ( "code.cloudfoundry.org/loggregator-agent-release/src/pkg/config" colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" - "google.golang.org/protobuf/proto" "code.cloudfoundry.org/go-loggregator/v9" "code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2" @@ -359,6 +358,7 @@ var _ = Describe("App", func() { BeforeEach(func() { otelMetricsServer = startSpyOtelColMetricServer(ingressCfgPath, agentCerts, "otel-collector") otelTraceServer = startSpyOtelColTraceServer(ingressCfgPath, agentCerts, "otel-collector") + agentCfg.EmitOTelTraces = true }) AfterEach(func() { @@ -399,15 +399,7 @@ var _ = Describe("App", func() { It("forwards timers", func() { name := "test-timer-name" - ingressClient.EmitTimer(name, time.Now(), time.Now().Add(time.Second), func(m proto.Message) { - switch e := m.(type) { - case *loggregator_v2.Envelope: - e.Tags["trace_id"] = "beefdeadbeefdeadbeefdeadbeefdead" - e.Tags["span_id"] = "deadbeefdeadbeef" - default: - panic(fmt.Sprintf("unsupported Message type: %T", m)) - } - }) + ingressClient.EmitTimer(name, time.Now(), time.Now().Add(time.Second), WithSampleTraceIdAndSpanId()) var req *coltracepb.ExportTraceServiceRequest Eventually(otelTraceServer.requests).Should(Receive(&req)) @@ -416,6 +408,40 @@ var _ = Describe("App", func() { Expect(trace.GetName()).To(Equal(name)) }) + Context("when support for forwarding timers as traces is not active", func() { + BeforeEach(func() { + agentCfg.EmitOTelTraces = false + }) + + It("only emits timers to other destinations", func() { + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + defer wg.Wait() + defer cancel() + + wg.Add(1) + go func() { + defer wg.Done() + + ticker := time.NewTicker(10 * time.Millisecond) + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + ingressClient.EmitTimer("some-timer", time.Now(), time.Now().Add(time.Second), WithSampleTraceIdAndSpanId()) + } + } + }() + + var e *loggregator_v2.Envelope + Eventually(ingressServer1.envelopes, 5).Should(Receive(&e)) + Expect(e.GetTimer().GetName()).To(Equal("some-timer")) + Consistently(otelTraceServer.requests, 5).ShouldNot(Receive()) + }) + }) + It("emits an expired metric", func() { et := map[string]string{ "protocol": "otelcol", diff --git a/src/pkg/otelcolclient/otelcolclient.go b/src/pkg/otelcolclient/otelcolclient.go index f1660bbea..320b4e723 100644 --- a/src/pkg/otelcolclient/otelcolclient.go +++ b/src/pkg/otelcolclient/otelcolclient.go @@ -98,12 +98,15 @@ func (w GRPCWriter) Close() error { type Client struct { // Batch metrics sent to OTel Collector b *SignalBatcher + // Forward timers as traces + emitTraces bool } // New creates a new Client that will batch metrics. -func New(w Writer) *Client { +func New(w Writer, emitTraces bool) *Client { return &Client{ - b: NewSignalBatcher(100, 100*time.Millisecond, w), + b: NewSignalBatcher(100, 100*time.Millisecond, w), + emitTraces: emitTraces, } } @@ -178,6 +181,10 @@ func (c *Client) writeGauge(e *loggregator_v2.Envelope) { // writeTimer translates a loggregator v2 Timer to OTLP and adds the spans to the pending batch. func (c *Client) writeTimer(e *loggregator_v2.Envelope) { + if !c.emitTraces { + return + } + if ok := validateTimerTags(e.GetTags()); !ok { return } diff --git a/src/pkg/otelcolclient/otelcolclient_test.go b/src/pkg/otelcolclient/otelcolclient_test.go index c648b4f82..b10c8983c 100644 --- a/src/pkg/otelcolclient/otelcolclient_test.go +++ b/src/pkg/otelcolclient/otelcolclient_test.go @@ -23,6 +23,7 @@ import ( var _ = Describe("Client", func() { var ( c Client + b *SignalBatcher spyMSC *spyMetricsServiceClient spyTSC *spyTraceServiceClient buf *gbytes.Buffer @@ -50,12 +51,12 @@ var _ = Describe("Client", func() { cancel: cancel, l: log.New(GinkgoWriter, "", 0), } - b := NewSignalBatcher( + b = NewSignalBatcher( 1, 100*time.Millisecond, w, ) - c = Client{b: b} + c = Client{b: b, emitTraces: true} }) AfterEach(func() { @@ -110,7 +111,7 @@ var _ = Describe("Client", func() { 100*time.Millisecond, w, ) - c = Client{b: b} + c = Client{b: b, emitTraces: true} }) It("returns nil", func() { @@ -551,7 +552,6 @@ var _ = Describe("Client", func() { Kind: tracepb.Span_SPAN_KIND_SERVER, StartTimeUnixNano: 1710799972405641252, EndTimeUnixNano: 1710799972408946683, - // Attributes: []*commonpb.KeyValue{}, Status: &tracepb.Status{ Code: tracepb.Status_STATUS_CODE_UNSET, }, @@ -648,6 +648,15 @@ var _ = Describe("Client", func() { }) } + Context("when support for forwarding traces is not active", func() { + BeforeEach(func() { + c = Client{b: b, emitTraces: false} + }) + It("does not forward a trace", func() { + Expect(spyTSC.requests).NotTo(Receive()) + }) + }) + Context("when the timer has no peer_type tag", func() { BeforeEach(func() { delete(envelope.Tags, "peer_type")