Skip to content

Commit

Permalink
Make whether timers are forwarded as traces configurable
Browse files Browse the repository at this point in the history
- Adds temporary_emit_otel_traces property to forwarder agent to control
  whether to convert timers to traces and emit to OTel Collector.
- Default to false until we have more experience with this feature being
  activated.
- An alternative implementation would be to allow each destination to
  configure the types of envelopes it would like to receive, which
  seemed unnecessarily complex.
  • Loading branch information
acrmp committed Apr 15, 2024
1 parent c638ce7 commit baf8617
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 21 deletions.
1 change: 1 addition & 0 deletions jobs/loggr-forwarder-agent-windows/monit
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"AGENT_CIPHER_SUITES" => p("tls.cipher_suites").split(":").join(","),
"AGENT_TAGS" => tags.map { |k, v| "#{k}:#{v}" }.join(","),
"DOWNSTREAM_INGRESS_PORT_GLOB" => p("downstream_ingress_port_glob"),
"EMIT_OTEL_TRACES" => "#{p("temporary_emit_otel_traces")}",
"METRICS_PORT" => "#{p("metrics.port")}",
"METRICS_CA_FILE_PATH" => "#{certs_dir}/metrics_ca.crt",
"METRICS_CERT_FILE_PATH" => "#{certs_dir}/metrics.crt",
Expand Down
4 changes: 4 additions & 0 deletions jobs/loggr-forwarder-agent-windows/spec
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ properties:
default: {}
example: {"deployment": "cf"}

temporary_emit_otel_traces:
description: "Whether to emit traces to OpenTelemetry Collector downstream consumers"
default: false

tls.ca_cert:
description: |
TLS loggregator root CA certificate. It is required for key/cert
Expand Down
4 changes: 4 additions & 0 deletions jobs/loggr-forwarder-agent/spec
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ properties:
default: {}
example: {"deployment": "cf"}

temporary_emit_otel_traces:
description: "Whether to emit traces to OpenTelemetry Collector downstream consumers"
default: false

tls.ca_cert:
description: |
TLS loggregator root CA certificate. It is required for key/cert
Expand Down
1 change: 1 addition & 0 deletions jobs/loggr-forwarder-agent/templates/bpm.yml.erb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"AGENT_TAGS" => tags.map { |k, v| "#{k}:#{v}" }.join(","),

"DOWNSTREAM_INGRESS_PORT_GLOB" => p("downstream_ingress_port_glob"),
"EMIT_OTEL_TRACES" => p("temporary_emit_otel_traces"),

"METRICS_PORT" => "#{p("metrics.port")}",
"METRICS_CA_FILE_PATH" => "#{certs_dir}/metrics_ca.crt",
Expand Down
12 changes: 12 additions & 0 deletions src/cmd/forwarder-agent/app/app_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,18 @@ func MakeSampleBigEnvelope() *loggregator_v2.Envelope {
}
}

func WithSampleTraceIdAndSpanId() loggregator.EmitTimerOption {
return func(m proto.Message) {
switch e := m.(type) {
case *loggregator_v2.Envelope:
e.Tags["trace_id"] = "beefdeadbeefdeadbeefdeadbeefdead"
e.Tags["span_id"] = "deadbeefdeadbeef"
default:
panic(fmt.Sprintf("unsupported Message type: %T", m))
}
}
}

// A fake OTel Collector metrics gRPC server that captures requests made to it.
type spyOtelColMetricServer struct {
colmetricspb.UnimplementedMetricsServiceServer
Expand Down
1 change: 1 addition & 0 deletions src/cmd/forwarder-agent/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Config struct {
MetricsServer config.MetricsServer
Tags map[string]string `env:"AGENT_TAGS"`
DebugMetrics bool `env:"DEBUG_METRICS, report"`
EmitOTelTraces bool `env:"EMIT_OTEL_TRACES, report"`
}

// LoadConfig will load the configuration for the forwarder agent from the
Expand Down
12 changes: 7 additions & 5 deletions src/cmd/forwarder-agent/app/forwarder_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type ForwarderAgent struct {
log *log.Logger
tags map[string]string
debugMetrics bool
emitOTelTraces bool
}

type Metrics interface {
Expand Down Expand Up @@ -71,6 +72,7 @@ func NewForwarderAgent(
log: log,
tags: cfg.Tags,
debugMetrics: cfg.MetricsServer.DebugMetrics,
emitOTelTraces: cfg.EmitOTelTraces,
}
}

Expand All @@ -94,7 +96,7 @@ func (s *ForwarderAgent) Run() {
}))

dests := downstreamDestinations(s.downstreamFilePattern, s.log)
writers := downstreamWriters(dests, s.grpc, s.m, s.log)
writers := downstreamWriters(dests, s.grpc, s.m, s.emitOTelTraces, s.log)
tagger := egress_v2.NewTagger(s.tags)
ew := egress_v2.NewEnvelopeWriter(
multiWriter{writers: writers},
Expand Down Expand Up @@ -207,13 +209,13 @@ func downstreamDestinations(pattern string, l *log.Logger) []destination {
return dests
}

func downstreamWriters(dests []destination, grpc GRPC, m Metrics, l *log.Logger) []Writer {
func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces bool, l *log.Logger) []Writer {
var writers []Writer
for _, d := range dests {
var w Writer
switch d.Protocol {
case "otelcol":
w = otelCollectorClient(d, grpc, m, l)
w = otelCollectorClient(d, grpc, m, emitOTelTraces, l)
default:
w = loggregatorClient(d, grpc, m, l)
}
Expand All @@ -222,7 +224,7 @@ func downstreamWriters(dests []destination, grpc GRPC, m Metrics, l *log.Logger)
return writers
}

func otelCollectorClient(dest destination, grpc GRPC, m Metrics, l *log.Logger) Writer {
func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces bool, l *log.Logger) Writer {
clientCreds, err := tlsconfig.Build(
tlsconfig.WithInternalServiceDefaults(),
tlsconfig.WithIdentityFromFile(grpc.CertFile, grpc.KeyFile),
Expand Down Expand Up @@ -250,7 +252,7 @@ func otelCollectorClient(dest destination, grpc GRPC, m Metrics, l *log.Logger)
}),
)

dw := egress.NewDiodeWriter(context.Background(), otelcolclient.New(w), gendiodes.AlertFunc(func(missed int) {
dw := egress.NewDiodeWriter(context.Background(), otelcolclient.New(w, emitTraces), gendiodes.AlertFunc(func(missed int) {
expired.Add(float64(missed))
}), timeoutwaitgroup.New(time.Minute))

Expand Down
46 changes: 36 additions & 10 deletions src/cmd/forwarder-agent/app/forwarder_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/config"
colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/protobuf/proto"

"code.cloudfoundry.org/go-loggregator/v9"
"code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2"
Expand Down Expand Up @@ -359,6 +358,7 @@ var _ = Describe("App", func() {
BeforeEach(func() {
otelMetricsServer = startSpyOtelColMetricServer(ingressCfgPath, agentCerts, "otel-collector")
otelTraceServer = startSpyOtelColTraceServer(ingressCfgPath, agentCerts, "otel-collector")
agentCfg.EmitOTelTraces = true
})

AfterEach(func() {
Expand Down Expand Up @@ -399,15 +399,7 @@ var _ = Describe("App", func() {

It("forwards timers", func() {
name := "test-timer-name"
ingressClient.EmitTimer(name, time.Now(), time.Now().Add(time.Second), func(m proto.Message) {
switch e := m.(type) {
case *loggregator_v2.Envelope:
e.Tags["trace_id"] = "beefdeadbeefdeadbeefdeadbeefdead"
e.Tags["span_id"] = "deadbeefdeadbeef"
default:
panic(fmt.Sprintf("unsupported Message type: %T", m))
}
})
ingressClient.EmitTimer(name, time.Now(), time.Now().Add(time.Second), WithSampleTraceIdAndSpanId())

var req *coltracepb.ExportTraceServiceRequest
Eventually(otelTraceServer.requests).Should(Receive(&req))
Expand All @@ -416,6 +408,40 @@ var _ = Describe("App", func() {
Expect(trace.GetName()).To(Equal(name))
})

Context("when support for forwarding timers as traces is not active", func() {
BeforeEach(func() {
agentCfg.EmitOTelTraces = false
})

It("only emits timers to other destinations", func() {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer wg.Wait()
defer cancel()

wg.Add(1)
go func() {
defer wg.Done()

ticker := time.NewTicker(10 * time.Millisecond)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
ingressClient.EmitTimer("some-timer", time.Now(), time.Now().Add(time.Second), WithSampleTraceIdAndSpanId())
}
}
}()

var e *loggregator_v2.Envelope
Eventually(ingressServer1.envelopes, 5).Should(Receive(&e))
Expect(e.GetTimer().GetName()).To(Equal("some-timer"))
Consistently(otelTraceServer.requests, 5).ShouldNot(Receive())
})
})

It("emits an expired metric", func() {
et := map[string]string{
"protocol": "otelcol",
Expand Down
11 changes: 9 additions & 2 deletions src/pkg/otelcolclient/otelcolclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,15 @@ func (w GRPCWriter) Close() error {
type Client struct {
// Batch metrics sent to OTel Collector
b *SignalBatcher
// Forward timers as traces
emitTraces bool
}

// New creates a new Client that will batch metrics.
func New(w Writer) *Client {
func New(w Writer, emitTraces bool) *Client {
return &Client{
b: NewSignalBatcher(100, 100*time.Millisecond, w),
b: NewSignalBatcher(100, 100*time.Millisecond, w),
emitTraces: emitTraces,
}
}

Expand Down Expand Up @@ -178,6 +181,10 @@ func (c *Client) writeGauge(e *loggregator_v2.Envelope) {

// writeTimer translates a loggregator v2 Timer to OTLP and adds the spans to the pending batch.
func (c *Client) writeTimer(e *loggregator_v2.Envelope) {
if !c.emitTraces {
return
}

if ok := validateTimerTags(e.GetTags()); !ok {
return
}
Expand Down
17 changes: 13 additions & 4 deletions src/pkg/otelcolclient/otelcolclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
var _ = Describe("Client", func() {
var (
c Client
b *SignalBatcher
spyMSC *spyMetricsServiceClient
spyTSC *spyTraceServiceClient
buf *gbytes.Buffer
Expand Down Expand Up @@ -50,12 +51,12 @@ var _ = Describe("Client", func() {
cancel: cancel,
l: log.New(GinkgoWriter, "", 0),
}
b := NewSignalBatcher(
b = NewSignalBatcher(
1,
100*time.Millisecond,
w,
)
c = Client{b: b}
c = Client{b: b, emitTraces: true}
})

AfterEach(func() {
Expand Down Expand Up @@ -110,7 +111,7 @@ var _ = Describe("Client", func() {
100*time.Millisecond,
w,
)
c = Client{b: b}
c = Client{b: b, emitTraces: true}
})

It("returns nil", func() {
Expand Down Expand Up @@ -551,7 +552,6 @@ var _ = Describe("Client", func() {
Kind: tracepb.Span_SPAN_KIND_SERVER,
StartTimeUnixNano: 1710799972405641252,
EndTimeUnixNano: 1710799972408946683,
// Attributes: []*commonpb.KeyValue{},
Status: &tracepb.Status{
Code: tracepb.Status_STATUS_CODE_UNSET,
},
Expand Down Expand Up @@ -648,6 +648,15 @@ var _ = Describe("Client", func() {
})
}

Context("when support for forwarding traces is not active", func() {
BeforeEach(func() {
c = Client{b: b, emitTraces: false}
})
It("does not forward a trace", func() {
Expect(spyTSC.requests).NotTo(Receive())
})
})

Context("when the timer has no peer_type tag", func() {
BeforeEach(func() {
delete(envelope.Tags, "peer_type")
Expand Down

0 comments on commit baf8617

Please sign in to comment.