
Commit

Add Export EMF Logs To EMF Exporter
sethAmazon committed Feb 10, 2023
1 parent 08cd036 commit 84f11ae
Showing 6 changed files with 191 additions and 90 deletions.
12 changes: 4 additions & 8 deletions cmd/otelcontribcol/Dockerfile
@@ -1,14 +1,10 @@
FROM alpine:latest as prep
RUN apk --update add ca-certificates
FROM ubuntu:latest as prep
RUN apt-get update
RUN apt-get install ca-certificates -y
RUN update-ca-certificates

RUN mkdir -p /tmp

FROM scratch

ARG USER_UID=10001
USER ${USER_UID}

COPY --from=prep /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY otelcontribcol /
EXPOSE 4317 55680 55679
ENTRYPOINT ["/otelcontribcol"]
59 changes: 4 additions & 55 deletions examples/demo/docker-compose.yaml
@@ -1,68 +1,17 @@
version: "2"
services:

# Jaeger
jaeger-all-in-one:
image: jaegertracing/all-in-one:latest
restart: always
ports:
- "16686:16686"
- "14268"
- "14250"

# Zipkin
zipkin-all-in-one:
image: openzipkin/zipkin:latest
restart: always
ports:
- "9411:9411"

# Collector
otel-collector:
image: ${OTELCOL_IMG}
image: otelcontribcol:latest
restart: always
command: ["--config=/etc/otel-collector-config.yaml", "${OTELCOL_ARGS}"]
volumes:
- ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
- ~/.aws/:/root/.aws:ro
ports:
- "1888:1888" # pprof extension
- "8888:8888" # Prometheus metrics exposed by the collector
- "8889:8889" # Prometheus exporter metrics
- "13133:13133" # health_check extension
- "4317:4317" # OTLP gRPC receiver
- "55679:55679" # zpages extension
depends_on:
- jaeger-all-in-one
- zipkin-all-in-one

demo-client:
build:
dockerfile: Dockerfile
context: ./client
restart: always
environment:
- OTEL_EXPORTER_OTLP_ENDPOINT=otel-collector:4317
- DEMO_SERVER_ENDPOINT=http://demo-server:7080/hello
depends_on:
- demo-server

demo-server:
build:
dockerfile: Dockerfile
context: ./server
restart: always
environment:
- OTEL_EXPORTER_OTLP_ENDPOINT=otel-collector:4317
ports:
- "7080"
depends_on:
- otel-collector

prometheus:
container_name: prometheus
image: prom/prometheus:latest
restart: always
volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"
- "54525:54525" # tcplog
- "25888:25888" # udplog
31 changes: 8 additions & 23 deletions examples/demo/otel-collector-config.yaml
@@ -1,25 +1,14 @@
receivers:
otlp:
protocols:
grpc:
tcplog:
listen_address: "0.0.0.0:54525"
udplog:
listen_address: "0.0.0.0:25888"

exporters:
prometheus:
endpoint: "0.0.0.0:8889"
const_labels:
label1: value1
awsemf:

logging:

zipkin:
endpoint: "http://zipkin-all-in-one:9411/api/v2/spans"
format: proto

jaeger:
endpoint: jaeger-all-in-one:14250
tls:
insecure: true

processors:
batch:

@@ -33,11 +22,7 @@ extensions:
service:
extensions: [pprof, zpages, health_check]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [logging, zipkin, jaeger]
metrics:
receivers: [otlp]
logs:
receivers: [tcplog, udplog]
processors: [batch]
exporters: [logging, prometheus]
exporters: [awsemf, logging]
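For context only (not part of this commit), here is a minimal Go sketch of a client that emits an EMF-formatted log line to the demo collector's udplog receiver configured above; the 127.0.0.1:25888 endpoint, the "Foo" log group name, and the payload shape are assumptions based on this example configuration.

package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	// Assumes the demo collector is reachable locally on the udplog receiver port (25888).
	conn, err := net.Dial("udp", "127.0.0.1:25888")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Minimal EMF payload: the _aws section carries the millisecond timestamp and the target log group.
	emf := fmt.Sprintf(
		`{"_aws":{"Timestamp":%d,"LogGroupName":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds"}]}]},"Operation":"Aggregator","ProcessingLatency":100}`,
		time.Now().UnixMilli(),
	)
	if _, err := conn.Write([]byte(emf + "\n")); err != nil {
		panic(err)
	}
}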
107 changes: 104 additions & 3 deletions exporter/awsemfexporter/emf_exporter.go
@@ -16,11 +9,9 @@ package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collec

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/google/uuid"
@@ -30,8 +28,11 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
"strings"
"sync"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
@@ -58,6 +59,15 @@ type emfExporter struct {
collectorID string
}

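// EmfWrapper models the top-level envelope of an EMF-formatted log body; only the _aws metadata section is decoded.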
type EmfWrapper struct {
Wrapper EmfFormatLog `json:"_aws"`
}

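// EmfFormatLog carries the CloudWatch log group name and event timestamp embedded in the _aws section of an EMF log.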
type EmfFormatLog struct {
LogGroupName string `json:"LogGroupName"`
Timestamp int64 `json:"Timestamp"`
}

// newEmfPusher func creates an EMF Exporter instance with data push callback func
func newEmfPusher(
config component.Config,
@@ -117,6 +127,25 @@ func newEmfExporter(
return resourcetotelemetry.WrapMetricsExporter(config.(*Config).ResourceToTelemetrySettings, exporter), nil
}

// newEmfLogsExporter creates a new exporter using exporterhelper
func newEmfLogsExporter(
config component.Config,
set exporter.CreateSettings,
) (exporter.Logs, error) {
exp, err := newEmfPusher(config, set)
if err != nil {
return nil, err
}

return exporterhelper.NewLogsExporter(
context.TODO(),
set,
config,
exp.(*emfExporter).pushLogsData,
exporterhelper.WithShutdown(exp.(*emfExporter).Shutdown),
)
}

func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) error {
rms := md.ResourceMetrics()
labels := map[string]string{}
@@ -222,6 +251,10 @@ func (emf *emfExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics)
return emf.pushMetricsData(ctx, md)
}

func (emf *emfExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return emf.pushLogsData(ctx, ld)
}

// Shutdown stops the exporter and is invoked during shutdown.
func (emf *emfExporter) Shutdown(ctx context.Context) error {
for _, emfPusher := range emf.listPushers() {
@@ -246,6 +279,74 @@ func (emf *emfExporter) Start(ctx context.Context, host component.Host) error {
return nil
}

func (emf *emfExporter) pushLogsData(_ context.Context, ld plog.Logs) error {
expConfig := emf.config.(*Config)
logStream := expConfig.LogStreamName
if logStream == "" {
logStream = fmt.Sprintf("otel-stream-%s", emf.collectorID)
}
outputDestination := expConfig.OutputDestination

for i := 0; i < ld.ResourceLogs().Len(); i++ {
for j := 0; j < ld.ResourceLogs().At(i).ScopeLogs().Len(); j++ {
for k := 0; k < ld.ResourceLogs().At(i).ScopeLogs().At(j).LogRecords().Len(); k++ {
logEvent, wrapper, err := convertLogRecordToLogEvent(ld.ResourceLogs().At(i).ScopeLogs().At(j).LogRecords().At(k))
if err != nil {
return err
}
if strings.EqualFold(outputDestination, outputDestinationStdout) {
fmt.Println(*logEvent.InputLogEvent.Message)
} else if strings.EqualFold(outputDestination, outputDestinationCloudWatch) {
logGroup := wrapper.Wrapper.LogGroupName

emfPusher := emf.getPusher(logGroup, logStream)
if emfPusher != nil {
returnError := emfPusher.AddLogEntry(logEvent)
if returnError != nil {
return wrapErrorIfBadRequest(returnError)
}
}
}
}
}
}

if strings.EqualFold(outputDestination, outputDestinationCloudWatch) {
for _, emfPusher := range emf.listPushers() {
returnError := emfPusher.ForceFlush()
if returnError != nil {
// TODO: there is currently only one logPusher, so it is fine to return after the first error occurs
err := wrapErrorIfBadRequest(returnError)
if err != nil {
emf.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(err))
}
return err
}
}
}

return nil
}

func convertLogRecordToLogEvent(logRecord plog.LogRecord) (*cwlogs.Event, EmfWrapper, error) {
logJson := logRecord.Body().AsString()

wrapper := EmfWrapper{}
err := json.Unmarshal([]byte(logJson), &wrapper)
if err != nil {
return nil, EmfWrapper{}, fmt.Errorf("could not unmarshal emf log: %w", err)
}

logEvent := cwlogs.NewEvent(
wrapper.Wrapper.Timestamp,
logJson,
)
// The AWS-side timestamp comes from the EMF body (the CloudWatch Logs event), not from the log record timestamp.
// GeneratedTime tracks the log record's own timestamp.
logEvent.GeneratedTime = logRecord.Timestamp().AsTime()
return logEvent, wrapper, nil
}

func wrapErrorIfBadRequest(err error) error {
var rfErr awserr.RequestFailure
if errors.As(err, &rfErr) && rfErr.StatusCode() < 500 {
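To illustrate the envelope parsing performed by convertLogRecordToLogEvent above, here is a standalone sketch (using local copies of the wrapper structs purely for illustration) that extracts the log group and timestamp from a sample EMF body:

package main

import (
	"encoding/json"
	"fmt"
)

// Local, illustrative copies of the exporter's EmfWrapper / EmfFormatLog types.
type emfWrapper struct {
	Wrapper emfFormatLog `json:"_aws"`
}

type emfFormatLog struct {
	LogGroupName string `json:"LogGroupName"`
	Timestamp    int64  `json:"Timestamp"`
}

func main() {
	body := `{"_aws":{"Timestamp":1574109732004,"LogGroupName":"Foo"},"Operation":"Aggregator","ProcessingLatency":100}`

	var w emfWrapper
	if err := json.Unmarshal([]byte(body), &w); err != nil {
		panic(err)
	}
	// The exporter routes the raw body to this log group and stamps the CloudWatch event with this timestamp.
	fmt.Println(w.Wrapper.LogGroupName, w.Wrapper.Timestamp) // Foo 1574109732004
}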
59 changes: 59 additions & 0 deletions exporter/awsemfexporter/emf_exporter_test.go
@@ -17,8 +17,11 @@ package awsemfexporter
import (
"context"
"errors"
"fmt"
"go.opentelemetry.io/collector/pdata/plog"
"os"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
@@ -129,6 +132,62 @@ func TestConsumeMetrics(t *testing.T) {
require.NoError(t, exp.Shutdown(ctx))
}

func TestConsumeLogs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

testCases := map[string]struct {
input plog.Logs
wantErr error
outputDest string
}{
"OldTimestamp": {
input: createPLog(`{"_aws":{"Timestamp":1574109732004,"LogGroupName":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100}`),
wantErr: errors.New("the log entry's timestamp is older than 14 days or more than 2 hours in the future"),
outputDest: "cloudwatch",
},
"CurrentTimestamp": {
input: createPLog(fmt.Sprintf(`{"_aws":{"Timestamp":%d,"LogGroupName":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100}`, time.Now().UnixNano()/int64(time.Millisecond))),
wantErr: nil,
outputDest: "stdout",
},
}
for name, testCase := range testCases {
factory := NewFactory()
expCfg := factory.CreateDefaultConfig().(*Config)
expCfg.Region = "us-west-2"
expCfg.MaxRetries = 0
exp, err := newEmfLogsExporter(expCfg, exportertest.NewNopCreateSettings())
assert.Nil(t, err)
assert.NotNil(t, exp)
t.Run(name, func(t *testing.T) {
expCfg.OutputDestination = testCase.outputDest
err = exp.ConsumeLogs(ctx, testCase.input)
if testCase.wantErr == nil {
require.NoError(t, err)
} else {
require.Error(t, err)
require.Equal(t, testCase.wantErr, err)
}
require.NoError(t, exp.Shutdown(ctx))
})
}
}

func createPLog(log string) plog.Logs {
pLog := plog.NewLogs()
pLog.
ResourceLogs().
AppendEmpty().
ScopeLogs().
AppendEmpty().
LogRecords().
AppendEmpty().
Body().
SetStr(log)
return pLog
}

func TestConsumeMetricsWithOutputDestination(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
13 changes: 12 additions & 1 deletion exporter/awsemfexporter/factory.go
@@ -36,7 +36,8 @@ func NewFactory() exporter.Factory {
return exporter.NewFactory(
typeStr,
createDefaultConfig,
exporter.WithMetrics(createMetricsExporter, stability))
exporter.WithMetrics(createMetricsExporter, stability),
exporter.WithLogs(createLogsExporter, stability))
}

// CreateDefaultConfig creates the default configuration for exporter.
@@ -62,3 +63,13 @@ func createMetricsExporter(_ context.Context,

return newEmfExporter(expCfg, params)
}

// createLogsExporter creates a logs exporter based on this config.
func createLogsExporter(_ context.Context,
params exporter.CreateSettings,
config component.Config) (exporter.Logs, error) {

expCfg := config.(*Config)

return newEmfLogsExporter(expCfg, params)
}
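Finally, a hedged sketch of how the new logs signal could be constructed outside a collector pipeline, for example in a test harness; this assumes the exporter.Factory interface of this collector version exposes CreateLogsExporter (mirroring CreateMetricsExporter) and relies on the package's exported Config type as used elsewhere in this diff.

package main

import (
	"context"
	"log"

	"go.opentelemetry.io/collector/exporter/exportertest"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"
)

func main() {
	factory := awsemfexporter.NewFactory()
	cfg := factory.CreateDefaultConfig().(*awsemfexporter.Config)
	cfg.Region = "us-west-2"         // region for the CloudWatch Logs client
	cfg.OutputDestination = "stdout" // print EMF bodies instead of calling CloudWatch Logs

	// CreateLogsExporter is the assumed factory method name for this collector version.
	exp, err := factory.CreateLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
	if err != nil {
		log.Fatal(err)
	}
	_ = exp // in a real harness: Start, push plog.Logs via ConsumeLogs, then Shutdown
}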
