diff --git a/cmd/otelcontribcol/Dockerfile b/cmd/otelcontribcol/Dockerfile index 10b37eaf6e72..c9c26541f41d 100644 --- a/cmd/otelcontribcol/Dockerfile +++ b/cmd/otelcontribcol/Dockerfile @@ -1,14 +1,10 @@ -FROM alpine:latest as prep -RUN apk --update add ca-certificates +FROM ubuntu:latest as prep +RUN apt-get update +RUN apt-get install ca-certificates -y +RUN update-ca-certificates RUN mkdir -p /tmp -FROM scratch - -ARG USER_UID=10001 -USER ${USER_UID} - -COPY --from=prep /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt COPY otelcontribcol / EXPOSE 4317 55680 55679 ENTRYPOINT ["/otelcontribcol"] diff --git a/examples/demo/docker-compose.yaml b/examples/demo/docker-compose.yaml index ea94ffff28f0..7b4320cbedf7 100644 --- a/examples/demo/docker-compose.yaml +++ b/examples/demo/docker-compose.yaml @@ -1,68 +1,17 @@ version: "2" services: - - # Jaeger - jaeger-all-in-one: - image: jaegertracing/all-in-one:latest - restart: always - ports: - - "16686:16686" - - "14268" - - "14250" - - # Zipkin - zipkin-all-in-one: - image: openzipkin/zipkin:latest - restart: always - ports: - - "9411:9411" - # Collector otel-collector: - image: ${OTELCOL_IMG} + image: otelcontribcol:latest restart: always command: ["--config=/etc/otel-collector-config.yaml", "${OTELCOL_ARGS}"] volumes: - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml + - ~/.aws/:/root/.aws:ro ports: - "1888:1888" # pprof extension - "8888:8888" # Prometheus metrics exposed by the collector - - "8889:8889" # Prometheus exporter metrics - "13133:13133" # health_check extension - - "4317:4317" # OTLP gRPC receiver - "55679:55679" # zpages extension - depends_on: - - jaeger-all-in-one - - zipkin-all-in-one - - demo-client: - build: - dockerfile: Dockerfile - context: ./client - restart: always - environment: - - OTEL_EXPORTER_OTLP_ENDPOINT=otel-collector:4317 - - DEMO_SERVER_ENDPOINT=http://demo-server:7080/hello - depends_on: - - demo-server - - demo-server: - build: - dockerfile: 
Dockerfile - context: ./server - restart: always - environment: - - OTEL_EXPORTER_OTLP_ENDPOINT=otel-collector:4317 - ports: - - "7080" - depends_on: - - otel-collector - - prometheus: - container_name: prometheus - image: prom/prometheus:latest - restart: always - volumes: - - ./prometheus.yaml:/etc/prometheus/prometheus.yml - ports: - - "9090:9090" + - "54525:54525" # tcplog + - "25888:25888" # udplog diff --git a/examples/demo/otel-collector-config.yaml b/examples/demo/otel-collector-config.yaml index 38f9415203f6..303ee2e75bfd 100644 --- a/examples/demo/otel-collector-config.yaml +++ b/examples/demo/otel-collector-config.yaml @@ -1,25 +1,14 @@ receivers: - otlp: - protocols: - grpc: + tcplog: + listen_address: "0.0.0.0:54525" + udplog: + listen_address: "0.0.0.0:25888" exporters: - prometheus: - endpoint: "0.0.0.0:8889" - const_labels: - label1: value1 + awsemf: logging: - zipkin: - endpoint: "http://zipkin-all-in-one:9411/api/v2/spans" - format: proto - - jaeger: - endpoint: jaeger-all-in-one:14250 - tls: - insecure: true - processors: batch: @@ -33,11 +22,7 @@ extensions: service: extensions: [pprof, zpages, health_check] pipelines: - traces: - receivers: [otlp] - processors: [batch] - exporters: [logging, zipkin, jaeger] - metrics: - receivers: [otlp] + logs: + receivers: [tcplog, udplog] processors: [batch] - exporters: [logging, prometheus] + exporters: [awsemf, logging] diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index ae4605e2135e..e83a0b385afe 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -16,11 +16,9 @@ package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collec import ( "context" + "encoding/json" "errors" "fmt" - "strings" - "sync" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/google/uuid" @@ -30,8 +28,11 @@ import ( "go.opentelemetry.io/collector/exporter" 
"go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" + "strings" + "sync" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs" @@ -58,6 +59,15 @@ type emfExporter struct { collectorID string } +type EmfWrapper struct { + Wrapper EmfFormatLog `json:"_aws"` +} + +type EmfFormatLog struct { + LogGroupName string `json:"LogGroupName"` + Timestamp int64 `json:"Timestamp"` +} + // newEmfPusher func creates an EMF Exporter instance with data push callback func func newEmfPusher( config component.Config, @@ -117,6 +127,25 @@ func newEmfExporter( return resourcetotelemetry.WrapMetricsExporter(config.(*Config).ResourceToTelemetrySettings, exporter), nil } +// newEmfLogsExporter creates a new exporter using exporterhelper +func newEmfLogsExporter( + config component.Config, + set exporter.CreateSettings, +) (exporter.Logs, error) { + exp, err := newEmfPusher(config, set) + if err != nil { + return nil, err + } + + return exporterhelper.NewLogsExporter( + context.TODO(), + set, + config, + exp.(*emfExporter).pushLogsData, + exporterhelper.WithShutdown(exp.(*emfExporter).Shutdown), + ) +} + func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) error { rms := md.ResourceMetrics() labels := map[string]string{} @@ -222,6 +251,10 @@ func (emf *emfExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) return emf.pushMetricsData(ctx, md) } +func (emf *emfExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + return emf.pushLogsData(ctx, ld) +} + // Shutdown stops the exporter and is invoked during shutdown. 
func (emf *emfExporter) Shutdown(ctx context.Context) error { for _, emfPusher := range emf.listPushers() { @@ -246,6 +279,74 @@ func (emf *emfExporter) Start(ctx context.Context, host component.Host) error { return nil } +func (emf *emfExporter) pushLogsData(_ context.Context, ld plog.Logs) error { + expConfig := emf.config.(*Config) + logStream := expConfig.LogStreamName + if logStream == "" { + logStream = fmt.Sprintf("otel-stream-%s", emf.collectorID) + } + outputDestination := expConfig.OutputDestination + + for i := 0; i < ld.ResourceLogs().Len(); i++ { + for j := 0; j < ld.ResourceLogs().At(i).ScopeLogs().Len(); j++ { + for k := 0; k < ld.ResourceLogs().At(i).ScopeLogs().At(j).LogRecords().Len(); k++ { + logEvent, wrapper, err := convertLogRecordToLogEvent(ld.ResourceLogs().At(i).ScopeLogs().At(j).LogRecords().At(k)) + if err != nil { + return err + } + if strings.EqualFold(outputDestination, outputDestinationStdout) { + fmt.Println(logEvent.InputLogEvent.Message) + } else if strings.EqualFold(outputDestination, outputDestinationCloudWatch) { + logGroup := wrapper.Wrapper.LogGroupName + + emfPusher := emf.getPusher(logGroup, logStream) + if emfPusher != nil { + returnError := emfPusher.AddLogEntry(logEvent) + if returnError != nil { + return wrapErrorIfBadRequest(returnError) + } + } + } + } + } + } + + if strings.EqualFold(outputDestination, outputDestinationCloudWatch) { + for _, emfPusher := range emf.listPushers() { + returnError := emfPusher.ForceFlush() + if returnError != nil { + // TODO now we only have one logPusher, so it's ok to return after first error occurred + err := wrapErrorIfBadRequest(returnError) + if err != nil { + emf.logger.Error("Error force flushing logs. 
Skipping to next logPusher.", zap.Error(err)) + } + return err + } + } + } + + return nil +} + +func convertLogRecordToLogEvent(logRecord plog.LogRecord) (*cwlogs.Event, EmfWrapper, error) { + logJson := logRecord.Body().AsString() + + wrapper := EmfWrapper{} + err := json.Unmarshal([]byte(logJson), &wrapper) + if err != nil { + return nil, EmfWrapper{}, fmt.Errorf("could not unmarshal emf log: %w", err) + } + + logEvent := cwlogs.NewEvent( + wrapper.Wrapper.Timestamp, + logJson, + ) + // aws timestamp comes from cw logs event not the log record timestamp + // generated time tracks the time log record time + logEvent.GeneratedTime = logRecord.Timestamp().AsTime() + return logEvent, wrapper, nil +} + func wrapErrorIfBadRequest(err error) error { var rfErr awserr.RequestFailure if errors.As(err, &rfErr) && rfErr.StatusCode() < 500 { diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index c1ae4d0d30bd..941fb0779815 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -17,8 +17,11 @@ package awsemfexporter import ( "context" "errors" + "fmt" + "go.opentelemetry.io/collector/pdata/plog" "os" "testing" + "time" "github.com/aws/aws-sdk-go/aws/awserr" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" @@ -129,6 +132,62 @@ func TestConsumeMetrics(t *testing.T) { require.NoError(t, exp.Shutdown(ctx)) } +func TestConsumeLogs(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testCases := map[string]struct { + input plog.Logs + wantErr error + outputDest string + }{ + "OldTimestamp": { + input: createPLog(`{"_aws":{"Timestamp":1574109732004,"LogGroupName":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100}`), + 
wantErr: errors.New("the log entry's timestamp is older than 14 days or more than 2 hours in the future"), + outputDest: "cloudwatch", + }, + "CurrentTimestamp": { + input: createPLog(fmt.Sprintf(`{"_aws":{"Timestamp":%d,"LogGroupName":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100}`, time.Now().UnixNano()/int64(time.Millisecond))), + wantErr: nil, + outputDest: "stdout", + }, + } + for name, testCase := range testCases { + factory := NewFactory() + expCfg := factory.CreateDefaultConfig().(*Config) + expCfg.Region = "us-west-2" + expCfg.MaxRetries = 0 + exp, err := newEmfLogsExporter(expCfg, exportertest.NewNopCreateSettings()) + assert.Nil(t, err) + assert.NotNil(t, exp) + t.Run(name, func(t *testing.T) { + expCfg.OutputDestination = testCase.outputDest + err = exp.ConsumeLogs(ctx, testCase.input) + if testCase.wantErr == nil { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Equal(t, testCase.wantErr, err) + } + require.NoError(t, exp.Shutdown(ctx)) + }) + } +} + +func createPLog(log string) plog.Logs { + pLog := plog.NewLogs() + pLog. + ResourceLogs(). + AppendEmpty(). + ScopeLogs(). + AppendEmpty(). + LogRecords(). + AppendEmpty(). + Body(). 
+ SetStr(log) + return pLog +} + func TestConsumeMetricsWithOutputDestination(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/exporter/awsemfexporter/factory.go b/exporter/awsemfexporter/factory.go index 035a70cbf5d8..d14a996e9614 100644 --- a/exporter/awsemfexporter/factory.go +++ b/exporter/awsemfexporter/factory.go @@ -36,7 +36,8 @@ func NewFactory() exporter.Factory { return exporter.NewFactory( typeStr, createDefaultConfig, - exporter.WithMetrics(createMetricsExporter, stability)) + exporter.WithMetrics(createMetricsExporter, stability), + exporter.WithLogs(createLogsExporter, stability)) // CreateDefaultConfig creates the default configuration for exporter. @@ -62,3 +63,13 @@ func createMetricsExporter(_ context.Context, return newEmfExporter(expCfg, params) } + +// createLogsExporter creates a logs exporter based on this config. +func createLogsExporter(_ context.Context, + params exporter.CreateSettings, + config component.Config) (exporter.Logs, error) { + + expCfg := config.(*Config) + + return newEmfLogsExporter(expCfg, params) +}