From f78f11c1619f0c34b8d26f80d88563dc9014ef9a Mon Sep 17 00:00:00 2001 From: Daniel Li Date: Mon, 7 Oct 2024 22:03:45 -0700 Subject: [PATCH 1/6] Support otlp_json_logs codec in S3 source (#5028) Signed-off-by: Daniel Li --- .../plugins/otel/codec/OTLPJsonLogsCodec.java | 22 ++++++++++ .../otel/codec/OTLPJsonLogsDecoder.java | 41 ++++++++++++++++++ .../otel/codec/OTLPJsonLogsCodecTest.java | 42 ++++++++++++++++++ .../otel/codec/OTLPJsonLogsDecoderTest.java | 43 +++++++++++++++++++ 4 files changed, 148 insertions(+) create mode 100644 data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodec.java create mode 100644 data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoder.java create mode 100644 data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodecTest.java create mode 100644 data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodec.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodec.java new file mode 100644 index 0000000000..f9d4bc0bfc --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodec.java @@ -0,0 +1,22 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Consumer; + +@DataPrepperPlugin(name = "otlp_json_logs", pluginType = InputCodec.class) +public class OTLPJsonLogsCodec extends OTLPJsonLogsDecoder implements InputCodec { + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + parse(inputStream, null, eventConsumer); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoder.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoder.java new file mode 100644 index 0000000000..954ad4927e --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoder.java @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.otel.codec; + +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.record.Record; + +import com.google.protobuf.util.JsonFormat; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.log.OpenTelemetryLog; + +import java.util.List; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.function.Consumer; +import java.time.Instant; + + +public class OTLPJsonLogsDecoder implements ByteDecoder { + private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder; + public OTLPJsonLogsDecoder() { + otelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder(); + } + public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer> eventConsumer) throws IOException { + Reader reader = new InputStreamReader(inputStream); + ExportLogsServiceRequest.Builder builder = ExportLogsServiceRequest.newBuilder(); + JsonFormat.parser().merge(reader, builder); + ExportLogsServiceRequest request = builder.build(); + List logs = otelProtoDecoder.parseExportLogsServiceRequest(request, timeReceivedMs); + for (OpenTelemetryLog log: logs) { + eventConsumer.accept(new Record<>(log)); + } + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodecTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodecTest.java new file mode 100644 index 0000000000..8b337c178d --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodecTest.java @@ -0,0 +1,42 @@ +package org.opensearch.dataprepper.plugins.otel.codec; + +import java.io.InputStream; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.log.OpenTelemetryLog; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class OTLPJsonLogsCodecTest { + private static final String TEST_REQUEST_LOGS_FILE = "test-request-multiple-logs.json"; + + public OTLPJsonLogsCodec createObjectUnderTest() { + return new OTLPJsonLogsCodec(); + } + + private void validateLog(OpenTelemetryLog logRecord) { + assertThat(logRecord.getServiceName(), is("service")); + assertThat(logRecord.getTime(), is("2020-05-24T14:00:00Z")); + assertThat(logRecord.getObservedTime(), is("2020-05-24T14:00:02Z")); + assertThat(logRecord.getBody(), is("Log value")); + assertThat(logRecord.getDroppedAttributesCount(), is(3)); + assertThat(logRecord.getSchemaUrl(), is("schemaurl")); + assertThat(logRecord.getSeverityNumber(), is(5)); + assertThat(logRecord.getSeverityText(), is("Severity value")); + assertThat(logRecord.getTraceId(), is("ba1a1c23b4093b63")); + assertThat(logRecord.getSpanId(), is("2cc83ac90ebc469c")); + Map mergedAttributes = logRecord.getAttributes(); + assertThat(mergedAttributes.keySet().size(), is(2)); + assertThat(mergedAttributes.get("log.attributes.statement@params"), is("us-east-1")); + assertThat(mergedAttributes.get("resource.attributes.service@name"), is("service")); + } + + @Test + public void testParse() throws Exception { + InputStream inputStream = OTLPJsonLogsCodecTest.class.getClassLoader().getResourceAsStream(TEST_REQUEST_LOGS_FILE); + createObjectUnderTest().parse(inputStream, (record) -> { + validateLog((OpenTelemetryLog)record.getData()); + }); + + } +} diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java new file mode 100644 index 0000000000..0d168a4006 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java @@ -0,0 +1,43 @@ +package org.opensearch.dataprepper.plugins.otel.codec; + +import java.io.InputStream; +import java.time.Instant; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.log.OpenTelemetryLog; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class OTLPJsonLogsDecoderTest { + private static final String TEST_REQUEST_LOGS_FILE = "test-request-multiple-logs.json"; + + public OTLPJsonLogsDecoder createObjectUnderTest() { + return new OTLPJsonLogsDecoder(); + } + + private void validateLog(OpenTelemetryLog logRecord) { + assertThat(logRecord.getServiceName(), is("service")); + assertThat(logRecord.getTime(), is("2020-05-24T14:00:00Z")); + assertThat(logRecord.getObservedTime(), is("2020-05-24T14:00:02Z")); + assertThat(logRecord.getBody(), is("Log value")); + assertThat(logRecord.getDroppedAttributesCount(), is(3)); + assertThat(logRecord.getSchemaUrl(), is("schemaurl")); + assertThat(logRecord.getSeverityNumber(), is(5)); + assertThat(logRecord.getSeverityText(), is("Severity value")); + assertThat(logRecord.getTraceId(), is("ba1a1c23b4093b63")); + assertThat(logRecord.getSpanId(), is("2cc83ac90ebc469c")); + Map mergedAttributes = logRecord.getAttributes(); + assertThat(mergedAttributes.keySet().size(), is(2)); + assertThat(mergedAttributes.get("log.attributes.statement@params"), is("us-east-1")); + assertThat(mergedAttributes.get("resource.attributes.service@name"), is("service")); + } + + @Test + public void testParse() throws Exception { + InputStream inputStream = OTLPJsonLogsCodecTest.class.getClassLoader().getResourceAsStream(TEST_REQUEST_LOGS_FILE); + createObjectUnderTest().parse(inputStream, Instant.now(), (record) -> { + validateLog((OpenTelemetryLog)record.getData()); + }); + + } +} \ No newline at end of file From a67c31ac0a12620c5c272e797a090baa04e702c7 Mon Sep 17 00:00:00 2001 From: Daniel Li Date: Mon, 7 Oct 2024 22:32:25 -0700 Subject: [PATCH 2/6] Add copyright info (#5028) Signed-off-by: Daniel Li --- .../plugins/otel/codec/OTLPJsonLogsCodecTest.java | 5 +++++ .../plugins/otel/codec/OTLPJsonLogsDecoderTest.java | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodecTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodecTest.java index 8b337c178d..2b0092bc37 100644 --- a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodecTest.java +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodecTest.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.otel.codec; import java.io.InputStream; diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java index 0d168a4006..2446ea9d90 100644 --- a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.otel.codec; import java.io.InputStream; From e3c5b915be17563805f25a7259541ddea716da2a Mon Sep 17 00:00:00 2001 From: Daniel Li Date: Tue, 8 Oct 2024 07:40:12 -0700 Subject: [PATCH 3/6] Update class loader (#5028) Signed-off-by: Daniel Li --- .../dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java index 2446ea9d90..ba87509a5d 100644 --- a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java @@ -39,7 +39,7 @@ private void validateLog(OpenTelemetryLog logRecord) { @Test public void testParse() throws Exception { - InputStream inputStream = OTLPJsonLogsCodecTest.class.getClassLoader().getResourceAsStream(TEST_REQUEST_LOGS_FILE); + InputStream inputStream = OTLPJsonLogsDecoderTest.class.getClassLoader().getResourceAsStream(TEST_REQUEST_LOGS_FILE); createObjectUnderTest().parse(inputStream, Instant.now(), (record) -> { validateLog((OpenTelemetryLog)record.getData()); }); From ac291d6e290852c4a181bb662dc7928ea7312b47 Mon Sep 17 00:00:00 2001 From: Daniel Li Date: Tue, 8 Oct 2024 07:59:25 -0700 Subject: [PATCH 4/6] Change the codec's name to opentelemetry_logs (#5028) Signed-off-by: Daniel Li --- .../dataprepper/plugins/otel/codec/OTLPJsonLogsCodec.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodec.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodec.java index f9d4bc0bfc..6d2227afa3 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodec.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodec.java @@ -14,7 +14,7 @@ import java.io.InputStream; import java.util.function.Consumer; -@DataPrepperPlugin(name = "otlp_json_logs", pluginType = InputCodec.class) +@DataPrepperPlugin(name = "opentelemetry_logs", pluginType = InputCodec.class) public class OTLPJsonLogsCodec extends OTLPJsonLogsDecoder implements InputCodec { public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { parse(inputStream, null, eventConsumer); From ae110fd4f07e9625b9fe6fc66d5fdc11578ad0f8 Mon Sep 17 00:00:00 2001 From: Daniel Li Date: Tue, 8 Oct 2024 18:31:35 -0700 Subject: [PATCH 5/6] Change the codec's name to otel_logs (#5028) Signed-off-by: Daniel Li --- .../plugins/otel/codec/OTLPJsonLogsCodec.java | 22 ------------ .../otel/codec/OTelLogsInputCodec.java | 34 +++++++++++++++++++ .../otel/codec/OTelLogsInputCodecConfig.java | 34 +++++++++++++++++++ ...sDecoder.java => OTelLogsJsonDecoder.java} | 8 ++--- ...cTest.java => OTelLogsInputCodecTest.java} | 26 ++++++++++---- ...Test.java => OTelLogsJsonDecoderTest.java} | 8 ++--- 6 files changed, 96 insertions(+), 36 deletions(-) delete mode 100644 data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodec.java create mode 100644 data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodec.java create mode 100644 data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodecConfig.java rename data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/{OTLPJsonLogsDecoder.java => OTelLogsJsonDecoder.java} (93%) rename data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/{OTLPJsonLogsCodecTest.java => OTelLogsInputCodecTest.java} (72%) rename data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/{OTLPJsonLogsDecoderTest.java => OTelLogsJsonDecoderTest.java} (90%) diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodec.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodec.java deleted file mode 100644 index 6d2227afa3..0000000000 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodec.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.otel.codec; - -import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; -import org.opensearch.dataprepper.model.codec.InputCodec; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.record.Record; - -import java.io.IOException; -import java.io.InputStream; -import java.util.function.Consumer; - -@DataPrepperPlugin(name = "opentelemetry_logs", pluginType = InputCodec.class) -public class OTLPJsonLogsCodec extends OTLPJsonLogsDecoder implements InputCodec { - public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { - parse(inputStream, null, eventConsumer); - } -} \ No newline at end of file diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodec.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodec.java new file mode 100644 index 0000000000..e74752f184 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodec.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import java.util.function.Consumer; + +@DataPrepperPlugin(name = "otel_logs", pluginType = InputCodec.class, pluginConfigurationType = OTelLogsInputCodecConfig.class) +public class OTelLogsInputCodec implements InputCodec { + private final OTelLogsInputCodecConfig config; + + @DataPrepperPluginConstructor + public OTelLogsInputCodec(final OTelLogsInputCodecConfig config) { + Objects.requireNonNull(config); + this.config = config; + } + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + if (OTelLogsInputCodecConfig.JSON_FORMAT.equals(config.getFormat())) { + OTelLogsJsonDecoder decoder = new OTelLogsJsonDecoder(); + decoder.parse(inputStream, null, eventConsumer); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodecConfig.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodecConfig.java new file mode 100644 index 0000000000..8d7f944f8e --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodecConfig.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.AssertTrue; + +/** + * Configuration class for {@link OTelLogsInputCodec}. + */ +public class OTelLogsInputCodecConfig { + static final String JSON_FORMAT = "json"; + + @JsonProperty("format") + private String format = JSON_FORMAT; + + /** + * The format of the OTel logs. + * "json" by default. + * @return The OTel logs format. + */ + public String getFormat() { + return format; + } + + @AssertTrue(message = "format must be json.") + boolean isValidFormat() { + return JSON_FORMAT.equals(format); + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoder.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsJsonDecoder.java similarity index 93% rename from data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoder.java rename to data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsJsonDecoder.java index 954ad4927e..219ba3376a 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoder.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsJsonDecoder.java @@ -21,12 +21,13 @@ import java.util.function.Consumer; import java.time.Instant; - -public class OTLPJsonLogsDecoder implements ByteDecoder { +public class OTelLogsJsonDecoder implements ByteDecoder { private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder; - public OTLPJsonLogsDecoder() { + + public OTelLogsJsonDecoder() { otelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder(); } + public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer> eventConsumer) throws IOException { Reader reader = new InputStreamReader(inputStream); ExportLogsServiceRequest.Builder builder = ExportLogsServiceRequest.newBuilder(); @@ -37,5 +38,4 @@ public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer(log)); } } - } \ No newline at end of file diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodecTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodecTest.java similarity index 72% rename from data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodecTest.java rename to data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodecTest.java index 2b0092bc37..353756d781 100644 --- a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsCodecTest.java +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodecTest.java @@ -7,16 +7,30 @@ import java.io.InputStream; import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mock; import org.opensearch.dataprepper.model.log.OpenTelemetryLog; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -public class OTLPJsonLogsCodecTest { +public class OTelLogsInputCodecTest { private static final String TEST_REQUEST_LOGS_FILE = "test-request-multiple-logs.json"; - - public OTLPJsonLogsCodec createObjectUnderTest() { - return new OTLPJsonLogsCodec(); + + @Mock + private OTelLogsInputCodecConfig config; + @Mock + private OTelLogsInputCodec otelLogsCodec; + + @BeforeEach + void setup() { + config = new OTelLogsInputCodecConfig(); + otelLogsCodec = createObjectUnderTest(); + } + + public OTelLogsInputCodec createObjectUnderTest() { + return new OTelLogsInputCodec(config); } private void validateLog(OpenTelemetryLog logRecord) { @@ -38,8 +52,8 @@ private void validateLog(OpenTelemetryLog logRecord) { @Test public void testParse() throws Exception { - InputStream inputStream = OTLPJsonLogsCodecTest.class.getClassLoader().getResourceAsStream(TEST_REQUEST_LOGS_FILE); - createObjectUnderTest().parse(inputStream, (record) -> { + InputStream inputStream = OTelLogsInputCodecTest.class.getClassLoader().getResourceAsStream(TEST_REQUEST_LOGS_FILE); + otelLogsCodec.parse(inputStream, (record) -> { validateLog((OpenTelemetryLog)record.getData()); }); diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsJsonDecoderTest.java similarity index 90% rename from data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java rename to data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsJsonDecoderTest.java index ba87509a5d..0773bd0f48 100644 --- a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTLPJsonLogsDecoderTest.java +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsJsonDecoderTest.java @@ -13,11 +13,11 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -public class OTLPJsonLogsDecoderTest { +public class OTelLogsJsonDecoderTest { private static final String TEST_REQUEST_LOGS_FILE = "test-request-multiple-logs.json"; - public OTLPJsonLogsDecoder createObjectUnderTest() { - return new OTLPJsonLogsDecoder(); + public OTelLogsJsonDecoder createObjectUnderTest() { + return new OTelLogsJsonDecoder(); } private void validateLog(OpenTelemetryLog logRecord) { @@ -39,7 +39,7 @@ private void validateLog(OpenTelemetryLog logRecord) { @Test public void testParse() throws Exception { - InputStream inputStream = OTLPJsonLogsDecoderTest.class.getClassLoader().getResourceAsStream(TEST_REQUEST_LOGS_FILE); + InputStream inputStream = OTelLogsJsonDecoderTest.class.getClassLoader().getResourceAsStream(TEST_REQUEST_LOGS_FILE); createObjectUnderTest().parse(inputStream, Instant.now(), (record) -> { validateLog((OpenTelemetryLog)record.getData()); }); From dfeeb97c2c83152483d675a9b4a9242d336e32b6 Mon Sep 17 00:00:00 2001 From: Daniel Li Date: Fri, 11 Oct 2024 09:47:16 -0700 Subject: [PATCH 6/6] Add documentation and otel_logs format option enum (#5028) Signed-off-by: Daniel Li --- .../otel/codec/OTelLogsFormatOption.java | 40 ++++++++++++++ .../otel/codec/OTelLogsInputCodec.java | 2 +- .../otel/codec/OTelLogsInputCodecConfig.java | 36 +++++++------ .../otel/codec/OTelLogsFormatOptionTest.java | 52 +++++++++++++++++++ 4 files changed, 112 insertions(+), 18 deletions(-) create mode 100644 data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsFormatOption.java create mode 100644 data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsFormatOptionTest.java diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsFormatOption.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsFormatOption.java new file mode 100644 index 0000000000..aa75e2f44d --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsFormatOption.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public enum OTelLogsFormatOption { + JSON("json"); + + private static final Map NAMES_MAP = Arrays.stream(OTelLogsFormatOption.values()) + .collect(Collectors.toMap( + value -> value.optionName, + value -> value + )); + + private final String optionName; + + OTelLogsFormatOption(final String optionName) { + this.optionName = optionName; + } + + @JsonValue + public String getFormatName() { + return optionName; + } + + @JsonCreator + public static OTelLogsFormatOption fromFormatName(final String optionName) { + return NAMES_MAP.get(optionName); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodec.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodec.java index e74752f184..060d911e85 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodec.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodec.java @@ -26,7 +26,7 @@ public OTelLogsInputCodec(final OTelLogsInputCodecConfig config) { this.config = config; } public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { - if (OTelLogsInputCodecConfig.JSON_FORMAT.equals(config.getFormat())) { + if (config.getFormat() == OTelLogsFormatOption.JSON) { OTelLogsJsonDecoder decoder = new OTelLogsJsonDecoder(); decoder.parse(inputStream, null, eventConsumer); } diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodecConfig.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodecConfig.java index 8d7f944f8e..d3267d3e76 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodecConfig.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsInputCodecConfig.java @@ -6,29 +6,31 @@ package org.opensearch.dataprepper.plugins.otel.codec; import com.fasterxml.jackson.annotation.JsonProperty; + import jakarta.validation.constraints.AssertTrue; - -/** - * Configuration class for {@link OTelLogsInputCodec}. - */ -public class OTelLogsInputCodecConfig { - static final String JSON_FORMAT = "json"; +import jakarta.validation.constraints.NotNull; + +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.annotation.JsonClassDescription; +@JsonPropertyOrder +@JsonClassDescription("The otel_logs codec parses log files in S3 that follow the OpenTelemetry Protocol Specification. " + + "It creates a Data Prepper log event for each log record along with the resource attributes in the file.") +public class OTelLogsInputCodecConfig { + static final OTelLogsFormatOption DEFAULT_FORMAT = OTelLogsFormatOption.JSON; + @JsonProperty("format") - private String format = JSON_FORMAT; + @JsonPropertyDescription("Specifies the format of the OTel logs. Default is json.") + @NotNull + private OTelLogsFormatOption format = DEFAULT_FORMAT; - /** - * The format of the OTel logs. - * "json" by default. - * @return The OTel logs format. - */ - public String getFormat() { + public OTelLogsFormatOption getFormat() { return format; } - + @AssertTrue(message = "format must be json.") boolean isValidFormat() { - return JSON_FORMAT.equals(format); - } - + return format != null; + } } \ No newline at end of file diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsFormatOptionTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsFormatOptionTest.java new file mode 100644 index 0000000000..29e63c37eb --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsFormatOptionTest.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.emptyString; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +class OTelLogsFormatOptionTest { + @ParameterizedTest + @EnumSource(OTelLogsFormatOption.class) + void fromFormatName_returns_expected_value(final OTelLogsFormatOption formatOption) { + assertThat(OTelLogsFormatOption.fromFormatName(formatOption.getFormatName()), equalTo(formatOption)); + } + + @ParameterizedTest + @EnumSource(OTelLogsFormatOption.class) + void getFormatName_returns_non_empty_string_for_all_types(final OTelLogsFormatOption formatOption) { + assertThat(formatOption.getFormatName(), notNullValue()); + assertThat(formatOption.getFormatName(), not(emptyString())); + } + + @ParameterizedTest + @ArgumentsSource(OTelLogsFormatOptionToKnownName.class) + void getFormatName_returns_expected_name(final OTelLogsFormatOption formatOption, final String expectedString) { + assertThat(formatOption.getFormatName(), equalTo(expectedString)); + } + + static class OTelLogsFormatOptionToKnownName implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext extensionContext) { + return Stream.of( + arguments(OTelLogsFormatOption.JSON, "json") + ); + } + } +}