diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/InputCodec.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/InputCodec.java index ba64354d92..adb8ee7d3b 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/InputCodec.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/InputCodec.java @@ -11,6 +11,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.function.Consumer; +import java.util.Objects; public interface InputCodec { /** @@ -31,8 +32,14 @@ public interface InputCodec { * @param eventConsumer The consumer which handles each event from the stream * @throws IOException throws IOException when invalid input is received or incorrect codec name is provided */ - void parse( + default void parse( InputFile inputFile, DecompressionEngine decompressionEngine, - Consumer> eventConsumer) throws IOException; + Consumer> eventConsumer) throws IOException { + Objects.requireNonNull(inputFile); + Objects.requireNonNull(eventConsumer); + try (InputStream inputStream = inputFile.newStream()) { + parse(decompressionEngine.createInputStream(inputStream), eventConsumer); + } + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/InputCodecTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/InputCodecTest.java new file mode 100644 index 0000000000..820c25c2d9 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/InputCodecTest.java @@ -0,0 +1,71 @@ +package org.opensearch.dataprepper.model.codec; + +import org.apache.parquet.io.SeekableInputStream; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.io.InputFile; + +import java.util.function.Consumer; +import java.io.IOException; +import java.io.InputStream; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +public class InputCodecTest { + private SeekableInputStream inputStream; + private InputFile inputFile; + private DecompressionEngine decompressionEngine; + private boolean closeCalled; + + @Test + void testParse_success() throws Exception { + InputCodec objectUnderTest = new InputCodec() { + @Override + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + + } + }; + + inputFile = mock(InputFile.class); + inputStream = mock(SeekableInputStream.class); + decompressionEngine = mock(DecompressionEngine.class); + when(inputFile.newStream()).thenReturn(inputStream); + closeCalled = false; + doAnswer(a -> { + closeCalled = true; + return null; + }).when(inputStream).close(); + when(decompressionEngine.createInputStream(any(InputStream.class))).thenReturn(inputStream); + objectUnderTest.parse(inputFile, decompressionEngine, rec -> {}); + assertTrue(closeCalled); + } + + @Test + void testParse_exception() throws Exception { + InputCodec objectUnderTest = new InputCodec() { + @Override + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + throw new RuntimeException("error"); + } + }; + + inputFile = mock(InputFile.class); + inputStream = mock(SeekableInputStream.class); + decompressionEngine = mock(DecompressionEngine.class); + when(inputFile.newStream()).thenReturn(inputStream); + closeCalled = false; + doAnswer(a -> { + closeCalled = true; + return null; + }).when(inputStream).close(); + when(decompressionEngine.createInputStream(any(InputStream.class))).thenReturn(inputStream); + assertThrows(RuntimeException.class, () -> objectUnderTest.parse(inputFile, decompressionEngine, rec -> {})); + assertTrue(closeCalled); + } +} diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodec.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodec.java index 636bfdc360..e37377407f 100644 --- a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodec.java +++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroInputCodec.java @@ -13,9 +13,7 @@ import org.apache.avro.file.DataFileStream; import org.apache.avro.util.Utf8; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; -import org.opensearch.dataprepper.model.codec.DecompressionEngine; import org.opensearch.dataprepper.model.codec.InputCodec; -import org.opensearch.dataprepper.model.io.InputFile; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; @@ -47,14 +45,6 @@ public void parse(InputStream inputStream, Consumer> eventConsumer parseAvroStream(inputStream, eventConsumer); } - @Override - public void parse(final InputFile inputFile, final DecompressionEngine decompressionEngine, final Consumer> eventConsumer) throws IOException { - Objects.requireNonNull(inputFile); - Objects.requireNonNull(eventConsumer); - - parse(decompressionEngine.createInputStream(inputFile.newStream()), eventConsumer); - } - private void parseAvroStream(final InputStream inputStream, final Consumer> eventConsumer) { try { @@ -107,4 +97,4 @@ else if(value instanceof Utf8){ return eventData; } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvInputCodec.java b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvInputCodec.java index 080e2b84d7..02176c3325 100644 --- a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvInputCodec.java +++ b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvInputCodec.java @@ -12,9 +12,7 @@ import com.fasterxml.jackson.dataformat.csv.CsvSchema; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; -import org.opensearch.dataprepper.model.codec.DecompressionEngine; import org.opensearch.dataprepper.model.codec.InputCodec; -import org.opensearch.dataprepper.model.io.InputFile; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; @@ -52,11 +50,6 @@ public void parse(final InputStream inputStream, final Consumer> e } } - @Override - public void parse(final InputFile inputFile, DecompressionEngine decompressionEngine, Consumer> eventConsumer) throws IOException { - parse(decompressionEngine.createInputStream(inputFile.newStream()), eventConsumer); - } - private void parseBufferedReader(final BufferedReader reader, final Consumer> eventConsumer) throws IOException { final CsvMapper mapper = createCsvMapper(); final CsvSchema schema; diff --git a/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedInputCodec.java b/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedInputCodec.java index 973d0bae7f..12d4f3962e 100644 --- a/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedInputCodec.java +++ b/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedInputCodec.java @@ -7,9 +7,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; -import org.opensearch.dataprepper.model.codec.DecompressionEngine; import org.opensearch.dataprepper.model.codec.InputCodec; -import org.opensearch.dataprepper.model.io.InputFile; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; @@ -48,11 +46,6 @@ public void parse(final InputStream inputStream, final Consumer> e } } - @Override - public void parse(final InputFile inputFile, final DecompressionEngine decompressionEngine, final Consumer> eventConsumer) throws IOException { - parse(decompressionEngine.createInputStream(inputFile.newStream()), eventConsumer); - } - private void parseBufferedReader(final BufferedReader reader, final Consumer> eventConsumer) throws IOException { final boolean doAddHeaderToOutgoingEvents = Objects.nonNull(headerDestination); boolean hasReadHeader = false; diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java index 51aacb7b1e..724787879f 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java @@ -6,32 +6,13 @@ package org.opensearch.dataprepper.plugins.codec.json; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; -import org.opensearch.dataprepper.model.codec.DecompressionEngine; import org.opensearch.dataprepper.model.codec.InputCodec; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.io.InputFile; -import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.codec.JsonDecoder; -import java.io.IOException; -import java.util.Objects; -import java.util.function.Consumer; - /** * An implementation of {@link InputCodec} which parses JSON Objects for arrays. */ @DataPrepperPlugin(name = "json", pluginType = InputCodec.class) public class JsonInputCodec extends JsonDecoder implements InputCodec { - @Override - public void parse( - final InputFile inputFile, - final DecompressionEngine decompressionEngine, - final Consumer> eventConsumer) throws IOException { - Objects.requireNonNull(inputFile); - Objects.requireNonNull(eventConsumer); - - parse(decompressionEngine.createInputStream(inputFile.newStream()), eventConsumer); - } - }