diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/AvroRecordsGenerator.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/AvroRecordsGenerator.java
new file mode 100644
index 0000000000..8b52f548fb
--- /dev/null
+++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/AvroRecordsGenerator.java
@@ -0,0 +1,132 @@
+package org.opensearch.dataprepper.plugins.source.s3;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.opensearch.dataprepper.model.codec.InputCodec;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.plugins.codec.avro.AvroInputCodec;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
+public class AvroRecordsGenerator implements RecordsGenerator {
+
+    private static final String QUERY_STATEMENT = "select * from s3Object limit %d";
+
+    private int numberOfRecords = 0;
+    @Override
+    public void write(File file, int numberOfRecords) throws IOException {
+        this.numberOfRecords = numberOfRecords;
+
+        Schema schema = parseSchema();
+
+        DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(schema);
+        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
+
+        List<GenericRecord> recordList = generateRecords(schema);
+
+        final OutputStream outputStream = new FileOutputStream(file);
+
+        dataFileWriter.create(schema, outputStream);
+
+        for (GenericRecord genericRecord : recordList) {
+            dataFileWriter.append(genericRecord);
+        }
+        dataFileWriter.close();
+    }
+
+    @Override
+    public InputCodec getCodec() {
+        return new AvroInputCodec();
+    }
+
+    @Override
+    public String getFileExtension() {
+        return "avro";
+    }
+
+    @Override
+    public void assertEventIsCorrect(final Event event) {
+        final String name = event.get("name", String.class);
+        final Integer age = event.get("age", Integer.class);
+        final Map<String, Object> innerRecord = (Map<String, Object>) event.get("nestedRecord", Object.class);
+        final String firstFieldInNestedRecord = (String) innerRecord.get("firstFieldInNestedRecord");
+        final Integer secondFieldInNestedRecord = (Integer) innerRecord.get("secondFieldInNestedRecord");
+
+        assertThat(name, startsWith("Person"));
+        assertThat(age, greaterThanOrEqualTo(0));
+        assertThat(innerRecord, notNullValue());
+        assertThat(firstFieldInNestedRecord, startsWith("testString"));
+        assertThat(secondFieldInNestedRecord, greaterThanOrEqualTo(0));
+    }
+
+    @Override
+    public String getS3SelectExpression() {
+        return String.format(QUERY_STATEMENT, this.numberOfRecords);
+    }
+
+    @Override
+    public boolean canCompress() {
+        return false;
+    }
+
+    private List<GenericRecord> generateRecords(Schema schema) {
+
+        List<GenericRecord> recordList = new ArrayList<>();
+
+        for (int rows = 0; rows < numberOfRecords; rows++) {
+
+            GenericRecord record = new GenericData.Record(schema);
+            GenericRecord innerRecord = new GenericData.Record(parseInnerSchemaForNestedRecord());
+            innerRecord.put("firstFieldInNestedRecord", "testString" + rows);
+            innerRecord.put("secondFieldInNestedRecord", rows);
+
+            record.put("name", "Person" + rows);
+            record.put("age", rows);
+            record.put("nestedRecord", innerRecord);
+            recordList.add(record);
+
+        }
+
+        return recordList;
+
+    }
+
+    private static Schema parseSchema() {
+        final Schema innerSchema = parseInnerSchemaForNestedRecord();
+        return SchemaBuilder.record("Person")
+                .fields()
+                .name("name").type().stringType().noDefault()
+                .name("age").type().intType().noDefault()
+                .name("nestedRecord").type(innerSchema).noDefault()
+                .endRecord();
+    }
+
+    private static Schema parseInnerSchemaForNestedRecord() {
+        return SchemaBuilder
+                .record("InnerRecord")
+                .fields()
+                .name("firstFieldInNestedRecord")
+                .type(Schema.create(Schema.Type.STRING))
+                .noDefault()
+                .name("secondFieldInNestedRecord")
+                .type(Schema.create(Schema.Type.INT))
+                .noDefault()
+                .endRecord();
+    }
+}
diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerIT.java
index f0246eab48..3ce75f2d14 100644
--- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerIT.java
+++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerIT.java
@@ -158,7 +158,8 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
                 new NewlineDelimitedRecordsGenerator(),
                 new JsonRecordsGenerator(),
                 new CsvRecordsGenerator(),
-                new ParquetRecordsGenerator());
+                new ParquetRecordsGenerator(),
+                new AvroRecordsGenerator());
         final List<Integer> numberOfRecordsList = List.of(0, 1, 25, 500, 5000);
         final List<Integer> recordsToAccumulateList = List.of(1, 100, 1000);
         final List<Boolean> booleanList = List.of(Boolean.TRUE, Boolean.FALSE);
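
For context (not part of the diff): a minimal sketch of how a file produced by AvroRecordsGenerator.write can be read back with Avro's standard DataFileReader, which recovers the writer schema from the container-file header. The class name AvroRoundTripCheck, the temp-file location, and the record count of 5 are illustrative assumptions, not code from this change.

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.IOException;

// Hypothetical harness (same package as the generator): writes a small Avro
// file with AvroRecordsGenerator and iterates over the records it contains.
public class AvroRoundTripCheck {
    public static void main(String[] args) throws IOException {
        final File file = File.createTempFile("avro-records", ".avro");
        final AvroRecordsGenerator generator = new AvroRecordsGenerator();
        generator.write(file, 5); // 5 "Person" records with the nested record shown above

        // DataFileReader reads the schema embedded in the Avro container file,
        // so no schema needs to be supplied for generic records.
        try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(file, new GenericDatumReader<>())) {
            for (final GenericRecord record : reader) {
                System.out.println(record); // e.g. {"name": "Person0", "age": 0, "nestedRecord": {...}}
            }
        }
    }
}

This mirrors what the integration test exercises end to end: the S3 source's AvroInputCodec parses the same container format and should surface each record as an Event satisfying assertEventIsCorrect.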