diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/AbstractAvroEventConverterTemplate.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/AbstractAvroEventConverterTemplate.java new file mode 100644 index 0000000000..6e503bf092 --- /dev/null +++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/AbstractAvroEventConverterTemplate.java @@ -0,0 +1,70 @@ +package org.opensearch.dataprepper.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; + +import java.util.List; +import java.util.Map; + +abstract class AbstractAvroEventConverterTemplate implements AvroEventConverter { + private final SchemaChooser schemaChooser; + + protected AbstractAvroEventConverterTemplate(final SchemaChooser schemaChooser) { + this.schemaChooser = schemaChooser; + } + + @Override + public GenericRecord convertEventDataToAvro(final Schema schema, + final Map eventData, + final OutputCodecContext codecContext) { + return convertEventDataToAvro(schema, eventData, codecContext, true); + } + + private GenericRecord convertEventDataToAvro(final Schema schema, + final Map eventData, + final OutputCodecContext codecContext, + boolean rootOfData) { + final GenericRecord avroRecord = new GenericData.Record(schema); + + for (String key : getKeyNames(schema, eventData, codecContext, rootOfData)) { + final Schema.Field field = schema.getField(key); + if (field == null) { + throw new RuntimeException("The event has a key ('" + key + "') which is not included in the schema."); + } + final Object value = schemaMapper(field, eventData.get(key), codecContext); + avroRecord.put(key, value); + } + + return avroRecord; + } + + + private Object schemaMapper(final Schema.Field field, final Object rawValue, OutputCodecContext codecContext) { + Schema providedSchema = schemaChooser.chooseSchema(field.schema()); + + if (providedSchema.getType() == Schema.Type.RECORD && rawValue instanceof Map) { + return convertEventDataToAvro(providedSchema, (Map) rawValue, codecContext, false); + } else if (providedSchema.getType() == Schema.Type.ARRAY && rawValue instanceof List) { + GenericData.Array avroArray = + new GenericData.Array<>(((List) rawValue).size(), providedSchema); + for (Object element : ((List) rawValue)) { + avroArray.add(element); + } + return avroArray; + } + return rawValue; + } + + /** + * Template method to get key names for a given object. + * + * @param schema The Avro schema + * @param eventData Current event data + * @param codecContext The {@link OutputCodecContext} + * @param rootOfData True, if this is the root of the data. False when this is nested. + * @return An {@Iterable} of key names. 
+ */ + abstract Iterable getKeyNames(Schema schema, Map eventData, OutputCodecContext codecContext, boolean rootOfData); +} diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/AvroEventConverter.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/AvroEventConverter.java index 9bb6d5b009..dde8a96b61 100644 --- a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/AvroEventConverter.java +++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/AvroEventConverter.java @@ -1,11 +1,9 @@ package org.opensearch.dataprepper.avro; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.opensearch.dataprepper.model.sink.OutputCodecContext; -import java.util.List; import java.util.Map; /** @@ -13,48 +11,16 @@ *
<p>
* It might be a good idea to consolidate similar logic for input. */ -public class AvroEventConverter { - private final SchemaChooser schemaChooser; - - public AvroEventConverter() { - this(new SchemaChooser()); - } - - AvroEventConverter(final SchemaChooser schemaChooser) { - this.schemaChooser = schemaChooser; - } - - public GenericRecord convertEventDataToAvro(final Schema schema, - final Map eventData, - OutputCodecContext codecContext) { - final GenericRecord avroRecord = new GenericData.Record(schema); - for (final String key : eventData.keySet()) { - if (codecContext != null && codecContext.shouldNotIncludeKey(key)) { - continue; - } - final Schema.Field field = schema.getField(key); - if (field == null) { - throw new RuntimeException("The event has a key ('" + key + "') which is not included in the schema."); - } - final Object value = schemaMapper(field, eventData.get(key), codecContext); - avroRecord.put(key, value); - } - return avroRecord; - } - - private Object schemaMapper(final Schema.Field field, final Object rawValue, OutputCodecContext codecContext) { - Schema providedSchema = schemaChooser.chooseSchema(field.schema()); - - if (providedSchema.getType() == Schema.Type.RECORD && rawValue instanceof Map) { - return convertEventDataToAvro(providedSchema, (Map) rawValue, codecContext); - } else if (providedSchema.getType() == Schema.Type.ARRAY && rawValue instanceof List) { - GenericData.Array avroArray = - new GenericData.Array<>(((List) rawValue).size(), providedSchema); - for (String element : ((List) rawValue)) { - avroArray.add(element); - } - return avroArray; - } - return rawValue; - } +public interface AvroEventConverter { + /** + * Converts event data into an Avro record. + * + * @param schema The defined Avro schema + * @param eventData The event data; may include tags + * @param codecContext The output codec context which may define values included/excluded. + * @return The generated Avro {@link GenericRecord}. + */ + GenericRecord convertEventDataToAvro(final Schema schema, + final Map eventData, + final OutputCodecContext codecContext); } diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/EventDefinedAvroEventConverter.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/EventDefinedAvroEventConverter.java new file mode 100644 index 0000000000..a414814cd7 --- /dev/null +++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/EventDefinedAvroEventConverter.java @@ -0,0 +1,39 @@ +package org.opensearch.dataprepper.avro; + +import org.apache.avro.Schema; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; + +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** + * Converts an Event into an Avro record. + *
<p>
+ * This implementation utilizes the Event data first to populate the Avro record. Thus, + * it will fail if the Event has any fields not in the schema. + */ +public class EventDefinedAvroEventConverter extends AbstractAvroEventConverterTemplate { + public EventDefinedAvroEventConverter() { + this(new SchemaChooser()); + } + + EventDefinedAvroEventConverter(final SchemaChooser schemaChooser) { + super(schemaChooser); + } + + @Override + Iterable getKeyNames(Schema schema, Map eventData, OutputCodecContext codecContext, boolean rootOfData) { + Set keySet = eventData.keySet(); + + if(codecContext == null || !rootOfData) { + return keySet; + } + else { + return keySet.stream() + .filter(Predicate.not(codecContext::shouldNotIncludeKey)) + .collect(Collectors.toList()); + } + } +} diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/SchemaDefinedAvroEventConverter.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/SchemaDefinedAvroEventConverter.java new file mode 100644 index 0000000000..f1e0f7bece --- /dev/null +++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/SchemaDefinedAvroEventConverter.java @@ -0,0 +1,31 @@ +package org.opensearch.dataprepper.avro; + +import org.apache.avro.Schema; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; + +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Converts an Event into an Avro record. + *
<p>
+ * This implementation relies on the defined schema. Thus, any fields in the Event which + * are not in the schema will be ignored. + */ +public class SchemaDefinedAvroEventConverter extends AbstractAvroEventConverterTemplate { + public SchemaDefinedAvroEventConverter() { + this(new SchemaChooser()); + } + + SchemaDefinedAvroEventConverter(final SchemaChooser schemaChooser) { + super(schemaChooser); + } + + @Override + Iterable getKeyNames(Schema schema, Map eventData, OutputCodecContext codecContext, boolean rootOfData) { + return schema.getFields() + .stream() + .map(Schema.Field::name) + .collect(Collectors.toList()); + } +} diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java index 60ad4b570a..ef6d9bce7d 100644 --- a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java +++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java @@ -11,6 +11,8 @@ import org.apache.avro.io.DatumWriter; import org.opensearch.dataprepper.avro.AvroAutoSchemaGenerator; import org.opensearch.dataprepper.avro.AvroEventConverter; +import org.opensearch.dataprepper.avro.EventDefinedAvroEventConverter; +import org.opensearch.dataprepper.avro.SchemaDefinedAvroEventConverter; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; @@ -47,11 +49,13 @@ public AvroOutputCodec(final AvroOutputCodecConfig config) { Objects.requireNonNull(config); this.config = config; - avroEventConverter = new AvroEventConverter(); avroAutoSchemaGenerator = new AvroAutoSchemaGenerator(); if (config.getSchema() != null) { schema = parseSchema(config.getSchema()); + avroEventConverter = new SchemaDefinedAvroEventConverter(); + } else { + avroEventConverter = new EventDefinedAvroEventConverter(); } } diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroEventConverterTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroEventConverterTest.java deleted file mode 100644 index 7f134b4c21..0000000000 --- a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroEventConverterTest.java +++ /dev/null @@ -1,110 +0,0 @@ -package org.opensearch.dataprepper.avro; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ArgumentsSource; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.model.sink.OutputCodecContext; - -import java.util.Collections; -import java.util.Map; -import java.util.UUID; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.never; -import static 
org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -class AvroEventConverterTest { - @Mock - private SchemaChooser schemaChooser; - @Mock(lenient = true) - private Schema schema; - - @Mock - private OutputCodecContext codecContext; - - @BeforeEach - void setUp() { - when(schema.getType()).thenReturn(Schema.Type.RECORD); - } - - - private AvroEventConverter createObjectUnderTest() { - return new AvroEventConverter(schemaChooser); - } - - @ParameterizedTest - @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) - void convertEventDataToAvro_does_not_need_to_getField_on_empty_map() { - Map data = Collections.emptyMap(); - GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); - - assertThat(actualRecord, notNullValue()); - assertThat(actualRecord.getSchema(), equalTo(schema)); - - verify(schema, never()).getField(anyString()); - } - - @Nested - class WithField { - - private String fieldName; - @Mock(lenient = true) - private Schema.Field field; - @Mock - private Schema fieldSchema; - - @BeforeEach - void setUp() { - fieldName = UUID.randomUUID().toString(); - when(schema.getField(fieldName)).thenReturn(field); - when(schema.getFields()).thenReturn(Collections.singletonList(field)); - when(field.schema()).thenReturn(fieldSchema); - when(field.pos()).thenReturn(0); - } - - @ParameterizedTest - @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) - void convertEventDataToAvro_adds_fields(Object value, Schema.Type expectedType) { - Map data = Map.of(fieldName, value); - when(fieldSchema.getType()).thenReturn(expectedType); - - when(schemaChooser.chooseSchema(fieldSchema)).thenReturn(fieldSchema); - - GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); - - assertThat(actualRecord, notNullValue()); - assertThat(actualRecord.getSchema(), equalTo(schema)); - - assertThat(actualRecord.get(fieldName), notNullValue()); - assertThat(actualRecord.get(fieldName), instanceOf(value.getClass())); - assertThat(actualRecord.get(fieldName), equalTo(value)); - } - - @ParameterizedTest - @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) - void convertEventDataToAvro_skips_files_if_should_not_include(Object value, Schema.Type expectedType) { - Map data = Map.of(fieldName, value); - when(codecContext.shouldNotIncludeKey(fieldName)).thenReturn(true); - - GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); - - assertThat(actualRecord, notNullValue()); - assertThat(actualRecord.getSchema(), equalTo(schema)); - - assertThat(actualRecord.get(fieldName), nullValue()); - } - } -} \ No newline at end of file diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/EventDefinedAvroEventConverterTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/EventDefinedAvroEventConverterTest.java new file mode 100644 index 0000000000..e03b59d99a --- /dev/null +++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/EventDefinedAvroEventConverterTest.java @@ -0,0 +1,491 @@ +package org.opensearch.dataprepper.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.extension.ExtendWith; +import 
org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class EventDefinedAvroEventConverterTest { + @Mock + private SchemaChooser schemaChooser; + @Mock(lenient = true) + private Schema schema; + private OutputCodecContext codecContext = null; + + @BeforeEach + void setUp() { + when(schema.getType()).thenReturn(Schema.Type.RECORD); + } + + private EventDefinedAvroEventConverter createObjectUnderTest() { + return new EventDefinedAvroEventConverter(schemaChooser); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_does_not_need_to_getField_on_empty_map() { + Map data = Collections.emptyMap(); + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + verify(schema, never()).getField(anyString()); + } + + @Nested + class WithPrimitiveField { + + private String fieldName; + @Mock(lenient = true) + private Schema.Field field; + @Mock + private Schema fieldSchema; + + @BeforeEach + void setUp() { + fieldName = UUID.randomUUID().toString(); + when(schema.getField(fieldName)).thenReturn(field); + when(schema.getFields()).thenReturn(Collections.singletonList(field)); + when(field.name()).thenReturn(fieldName); + when(field.schema()).thenReturn(fieldSchema); + when(field.pos()).thenReturn(0); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_adds_fields(Object value, Schema.Type expectedType) { + Map data = Map.of(fieldName, value); + when(fieldSchema.getType()).thenReturn(expectedType); + + when(schemaChooser.chooseSchema(fieldSchema)).thenReturn(fieldSchema); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(fieldName), notNullValue()); + assertThat(actualRecord.get(fieldName), instanceOf(value.getClass())); + assertThat(actualRecord.get(fieldName), equalTo(value)); + } + + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_non_present_fields(Object value, Schema.Type expectedType) { + Map data = Collections.emptyMap(); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + 
assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(fieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_null_values(Object value, Schema.Type expectedType) { + Map data = Collections.singletonMap(fieldName, null); + when(fieldSchema.getType()).thenReturn(expectedType); + + when(schemaChooser.chooseSchema(fieldSchema)).thenReturn(fieldSchema); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(fieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_fields_if_should_not_include_when_not_in_schema(Object value, Schema.Type expectedType) { + when(schemaChooser.chooseSchema(fieldSchema)).thenReturn(fieldSchema); + + String nonFieldKey = randomAvroName(); + Map data = Map.of( + fieldName, value, + nonFieldKey, value + ); + + codecContext = mock(OutputCodecContext.class); + when(codecContext.shouldNotIncludeKey(fieldName)).thenReturn(false); + when(codecContext.shouldNotIncludeKey(nonFieldKey)).thenReturn(true); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(fieldName), notNullValue()); + assertThat(actualRecord.get(fieldName), instanceOf(value.getClass())); + assertThat(actualRecord.get(fieldName), equalTo(value)); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_fields_if_should_not_include_when_in_schema(Object value, Schema.Type expectedType) { + Map data = Map.of(fieldName, value); + + codecContext = mock(OutputCodecContext.class); + when(codecContext.shouldNotIncludeKey(fieldName)).thenReturn(true); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(fieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_throws_on_fields_which_are_not_defined_in_the_schema(Object value, Schema.Type expectedType) { + lenient().when(schemaChooser.chooseSchema(fieldSchema)).thenReturn(fieldSchema); + + String nonFieldKey = randomAvroName(); + Map data = Map.of( + fieldName, value, + nonFieldKey, value + ); + + codecContext = mock(OutputCodecContext.class); + + EventDefinedAvroEventConverter objectUnderTest = createObjectUnderTest(); + RuntimeException actualException = assertThrows(RuntimeException.class, () -> objectUnderTest.convertEventDataToAvro(schema, data, codecContext)); + + assertThat(actualException.getMessage(), notNullValue()); + assertThat(actualException.getMessage(), containsString(nonFieldKey)); + } + } + + @Nested + class WithRecordField { + private String recordFieldName; + private String nestedFieldName; + @Mock(lenient = true) + private Schema.Field nestedField; + @Mock + private Schema nestedFieldSchema; + @Mock + private Schema recordFieldSchema; + + @BeforeEach + void setUp() { + recordFieldName = 
randomAvroName(); + nestedFieldName = randomAvroName(); + + when(nestedField.name()).thenReturn(nestedFieldName); + when(nestedField.schema()).thenReturn(nestedFieldSchema); + when(nestedField.pos()).thenReturn(0); + + lenient().when(recordFieldSchema.getType()).thenReturn(Schema.Type.RECORD); + lenient().when(recordFieldSchema.getField(nestedFieldName)).thenReturn(nestedField); + lenient().when(recordFieldSchema.getFields()).thenReturn(Collections.singletonList(nestedField)); + + Schema.Field recordField = mock(Schema.Field.class); + when(schema.getField(recordFieldName)).thenReturn(recordField); + when(schema.getFields()).thenReturn(Collections.singletonList(recordField)); + + lenient().when(recordField.schema()).thenReturn(recordFieldSchema); + lenient().when(recordField.pos()).thenReturn(0); + + lenient().when(schemaChooser.chooseSchema(recordFieldSchema)).thenReturn(recordFieldSchema); + lenient().when(schemaChooser.chooseSchema(nestedFieldSchema)).thenReturn(nestedFieldSchema); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_adds_nested_fields(Object value, Schema.Type expectedType) { + Map data = Map.of(recordFieldName, Map.of(nestedFieldName, value)); + when(nestedFieldSchema.getType()).thenReturn(expectedType); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(recordFieldName), notNullValue()); + assertThat(actualRecord.get(recordFieldName), instanceOf(GenericData.Record.class)); + GenericData.Record actualSubRecord = (GenericData.Record) actualRecord.get(recordFieldName); + + assertThat(actualSubRecord.getSchema(), notNullValue()); + assertThat(actualSubRecord.getSchema(), equalTo(recordFieldSchema)); + assertThat(actualSubRecord.getSchema().getType(), equalTo(Schema.Type.RECORD)); + assertThat(actualSubRecord.get(nestedFieldName), notNullValue()); + assertThat(actualSubRecord.get(nestedFieldName), instanceOf(value.getClass())); + assertThat(actualSubRecord.get(nestedFieldName), equalTo(value)); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_non_present_records(Object value, Schema.Type expectedType) { + Map data = Collections.emptyMap(); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(recordFieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_null_record(Object value, Schema.Type expectedType) { + Map data = Collections.singletonMap(recordFieldName, null); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(recordFieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_non_present_nested_fields(Object value, Schema.Type expectedType) { + Map data = Map.of(recordFieldName, Collections.emptyMap()); + + GenericRecord actualRecord = 
createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(recordFieldName), notNullValue()); + assertThat(actualRecord.get(recordFieldName), instanceOf(GenericData.Record.class)); + GenericData.Record actualSubRecord = (GenericData.Record) actualRecord.get(recordFieldName); + + assertThat(actualSubRecord.getSchema(), notNullValue()); + assertThat(actualSubRecord.getSchema(), equalTo(recordFieldSchema)); + assertThat(actualSubRecord.getSchema().getType(), equalTo(Schema.Type.RECORD)); + assertThat(actualSubRecord.get(nestedFieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_null_nested_fields(Object value, Schema.Type expectedType) { + Map data = Map.of(recordFieldName, Collections.singletonMap(nestedFieldName, null)); + when(nestedFieldSchema.getType()).thenReturn(expectedType); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(recordFieldName), notNullValue()); + assertThat(actualRecord.get(recordFieldName), instanceOf(GenericData.Record.class)); + GenericData.Record actualSubRecord = (GenericData.Record) actualRecord.get(recordFieldName); + + assertThat(actualSubRecord.getSchema(), notNullValue()); + assertThat(actualSubRecord.getSchema(), equalTo(recordFieldSchema)); + assertThat(actualSubRecord.getSchema().getType(), equalTo(Schema.Type.RECORD)); + assertThat(actualSubRecord.get(nestedFieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_does_not_skip_nested_fields_because_include_keys_are_not_paths(Object value, Schema.Type expectedType) { + Map data = Map.of(recordFieldName, Map.of(nestedFieldName, value)); + when(nestedFieldSchema.getType()).thenReturn(expectedType); + + codecContext = mock(OutputCodecContext.class); + lenient().when(codecContext.shouldNotIncludeKey(nestedFieldName)).thenReturn(true); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(recordFieldName), notNullValue()); + assertThat(actualRecord.get(recordFieldName), instanceOf(GenericData.Record.class)); + GenericData.Record actualSubRecord = (GenericData.Record) actualRecord.get(recordFieldName); + + assertThat(actualSubRecord.getSchema(), notNullValue()); + assertThat(actualSubRecord.getSchema(), equalTo(recordFieldSchema)); + assertThat(actualSubRecord.getSchema().getType(), equalTo(Schema.Type.RECORD)); + assertThat(actualSubRecord.get(nestedFieldName), notNullValue()); + assertThat(actualSubRecord.get(nestedFieldName), instanceOf(value.getClass())); + assertThat(actualSubRecord.get(nestedFieldName), equalTo(value)); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_throws_on_fields_which_are_not_defined_in_the_schema(Object value, Schema.Type expectedType) { + String nonFieldKey = UUID.randomUUID().toString(); + Map data = Map.of( + recordFieldName, + Map.of( + nestedFieldName, value, + nonFieldKey, value + )); + 
+ codecContext = mock(OutputCodecContext.class); + when(codecContext.shouldNotIncludeKey(nonFieldKey)).thenReturn(true); + + EventDefinedAvroEventConverter objectUnderTest = createObjectUnderTest(); + RuntimeException actualException = assertThrows(RuntimeException.class, () -> objectUnderTest.convertEventDataToAvro(schema, data, codecContext)); + + assertThat(actualException.getMessage(), notNullValue()); + assertThat(actualException.getMessage(), containsString(nonFieldKey)); + } + } + + @Nested + class WithArrayField { + private String arrayFieldName; + private String nestedFieldName; + @Mock(lenient = true) + private Schema.Field nestedField; + @Mock + private Schema nestedFieldSchema; + @Mock + private Schema arrayFieldSchema; + + @BeforeEach + void setUp() { + arrayFieldName = randomAvroName(); + nestedFieldName = randomAvroName(); + + when(nestedField.name()).thenReturn(nestedFieldName); + when(nestedField.schema()).thenReturn(nestedFieldSchema); + when(nestedField.pos()).thenReturn(0); + + lenient().when(arrayFieldSchema.getType()).thenReturn(Schema.Type.ARRAY); + + Schema.Field recordField = mock(Schema.Field.class); + when(schema.getField(arrayFieldName)).thenReturn(recordField); + when(schema.getFields()).thenReturn(Collections.singletonList(recordField)); + + lenient().when(recordField.schema()).thenReturn(arrayFieldSchema); + when(recordField.pos()).thenReturn(0); + + lenient().when(schemaChooser.chooseSchema(arrayFieldSchema)).thenReturn(arrayFieldSchema); + lenient().when(schemaChooser.chooseSchema(nestedFieldSchema)).thenReturn(nestedFieldSchema); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_adds_array_values(Object value, Schema.Type expectedType) { + Map data = Map.of(arrayFieldName, List.of(value)); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(arrayFieldName), notNullValue()); + assertThat(actualRecord.get(arrayFieldName), instanceOf(GenericData.Array.class)); + GenericData.Array actualArray = (GenericData.Array) actualRecord.get(arrayFieldName); + + assertThat(actualArray.getSchema(), notNullValue()); + assertThat(actualArray.getSchema(), equalTo(arrayFieldSchema)); + assertThat(actualArray.getSchema().getType(), equalTo(Schema.Type.ARRAY)); + assertThat(actualArray.size(), equalTo(1)); + assertThat(actualArray.get(0), notNullValue()); + assertThat(actualArray.get(0), equalTo(value)); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_non_present_arrays(Object value, Schema.Type expectedType) { + Map data = Collections.emptyMap(); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(arrayFieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_null_array(Object value, Schema.Type expectedType) { + Map data = Collections.singletonMap(arrayFieldName, null); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + 
assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(arrayFieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_includes_empty_arrays(Object value, Schema.Type expectedType) { + Map data = Map.of(arrayFieldName, Collections.emptyList()); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(arrayFieldName), notNullValue()); + assertThat(actualRecord.get(arrayFieldName), instanceOf(GenericData.Array.class)); + GenericData.Array actualArray = (GenericData.Array) actualRecord.get(arrayFieldName); + + assertThat(actualArray.getSchema(), notNullValue()); + assertThat(actualArray.getSchema(), equalTo(arrayFieldSchema)); + assertThat(actualArray.getSchema().getType(), equalTo(Schema.Type.ARRAY)); + assertThat(actualArray.size(), equalTo(0)); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_includes_null_array_elements(Object value, Schema.Type expectedType) { + Map data = Map.of(arrayFieldName, Collections.singletonList(null)); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(arrayFieldName), notNullValue()); + assertThat(actualRecord.get(arrayFieldName), instanceOf(GenericData.Array.class)); + GenericData.Array actualArray = (GenericData.Array) actualRecord.get(arrayFieldName); + + assertThat(actualArray.getSchema(), notNullValue()); + assertThat(actualArray.getSchema(), equalTo(arrayFieldSchema)); + assertThat(actualArray.getSchema().getType(), equalTo(Schema.Type.ARRAY)); + assertThat(actualArray.size(), equalTo(1)); + assertThat(actualArray.get(0), nullValue()); + } + } + + private static String randomAvroName() { + return "a" + UUID.randomUUID().toString().replaceAll("-", ""); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/SchemaDefinedAvroEventConverterTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/SchemaDefinedAvroEventConverterTest.java new file mode 100644 index 0000000000..8e0b5fb532 --- /dev/null +++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/SchemaDefinedAvroEventConverterTest.java @@ -0,0 +1,451 @@ +package org.opensearch.dataprepper.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import 
static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class SchemaDefinedAvroEventConverterTest { + @Mock + private SchemaChooser schemaChooser; + @Mock(lenient = true) + private Schema schema; + @Mock + private OutputCodecContext codecContext; + + @BeforeEach + void setUp() { + when(schema.getType()).thenReturn(Schema.Type.RECORD); + } + + @AfterEach + void codecContextIsNotUsed() { + verifyNoInteractions(codecContext); + } + + + private SchemaDefinedAvroEventConverter createObjectUnderTest() { + return new SchemaDefinedAvroEventConverter(schemaChooser); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_does_not_need_to_getField_on_empty_map() { + Map data = Collections.emptyMap(); + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + verify(schema, never()).getField(anyString()); + } + + @Nested + class WithPrimitiveField { + + private String fieldName; + @Mock(lenient = true) + private Schema.Field field; + @Mock + private Schema fieldSchema; + + @BeforeEach + void setUp() { + fieldName = UUID.randomUUID().toString(); + when(schema.getField(fieldName)).thenReturn(field); + when(schema.getFields()).thenReturn(Collections.singletonList(field)); + when(field.name()).thenReturn(fieldName); + when(field.schema()).thenReturn(fieldSchema); + when(field.pos()).thenReturn(0); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_adds_fields(Object value, Schema.Type expectedType) { + Map data = Map.of(fieldName, value); + when(fieldSchema.getType()).thenReturn(expectedType); + + when(schemaChooser.chooseSchema(fieldSchema)).thenReturn(fieldSchema); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(fieldName), notNullValue()); + assertThat(actualRecord.get(fieldName), instanceOf(value.getClass())); + assertThat(actualRecord.get(fieldName), equalTo(value)); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_non_present_fields(Object value, Schema.Type expectedType) { + Map data = Collections.emptyMap(); + when(fieldSchema.getType()).thenReturn(expectedType); + + when(schemaChooser.chooseSchema(fieldSchema)).thenReturn(fieldSchema); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(fieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_null_values(Object value, Schema.Type expectedType) { + Map data = Collections.singletonMap(fieldName, null); + 
when(fieldSchema.getType()).thenReturn(expectedType); + + when(schemaChooser.chooseSchema(fieldSchema)).thenReturn(fieldSchema); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(fieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_only_includes_fields_which_are_defined_in_the_schema(Object value, Schema.Type expectedType) { + when(fieldSchema.getType()).thenReturn(expectedType); + when(schemaChooser.chooseSchema(fieldSchema)).thenReturn(fieldSchema); + + String nonFieldKey = randomAvroName(); + Map data = Map.of( + fieldName, value, + nonFieldKey, value + ); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(fieldName), notNullValue()); + assertThat(actualRecord.get(fieldName), instanceOf(value.getClass())); + assertThat(actualRecord.get(fieldName), equalTo(value)); + + assertThat(actualRecord.hasField(nonFieldKey), equalTo(false)); + } + } + + @Nested + class WithRecordField { + private String recordFieldName; + private String nestedFieldName; + @Mock(lenient = true) + private Schema.Field nestedField; + @Mock + private Schema nestedFieldSchema; + @Mock + private Schema recordFieldSchema; + + @BeforeEach + void setUp() { + recordFieldName = randomAvroName(); + nestedFieldName = randomAvroName(); + + when(nestedField.name()).thenReturn(nestedFieldName); + when(nestedField.schema()).thenReturn(nestedFieldSchema); + when(nestedField.pos()).thenReturn(0); + + when(recordFieldSchema.getType()).thenReturn(Schema.Type.RECORD); + lenient().when(recordFieldSchema.getField(nestedFieldName)).thenReturn(nestedField); + lenient().when(recordFieldSchema.getFields()).thenReturn(Collections.singletonList(nestedField)); + + Schema.Field recordField = mock(Schema.Field.class); + when(schema.getField(recordFieldName)).thenReturn(recordField); + when(schema.getFields()).thenReturn(Collections.singletonList(recordField)); + + when(recordField.name()).thenReturn(recordFieldName); + when(recordField.schema()).thenReturn(recordFieldSchema); + when(recordField.pos()).thenReturn(0); + + when(schemaChooser.chooseSchema(recordFieldSchema)).thenReturn(recordFieldSchema); + lenient().when(schemaChooser.chooseSchema(nestedFieldSchema)).thenReturn(nestedFieldSchema); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_adds_nested_fields(Object value, Schema.Type expectedType) { + Map data = Map.of(recordFieldName, Map.of(nestedFieldName, value)); + when(nestedFieldSchema.getType()).thenReturn(expectedType); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(recordFieldName), notNullValue()); + assertThat(actualRecord.get(recordFieldName), instanceOf(GenericData.Record.class)); + GenericData.Record actualSubRecord = (GenericData.Record) actualRecord.get(recordFieldName); + + assertThat(actualSubRecord.getSchema(), notNullValue()); + assertThat(actualSubRecord.getSchema(), 
equalTo(recordFieldSchema)); + assertThat(actualSubRecord.getSchema().getType(), equalTo(Schema.Type.RECORD)); + assertThat(actualSubRecord.get(nestedFieldName), notNullValue()); + assertThat(actualSubRecord.get(nestedFieldName), instanceOf(value.getClass())); + assertThat(actualSubRecord.get(nestedFieldName), equalTo(value)); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_non_present_records(Object value, Schema.Type expectedType) { + Map data = Collections.emptyMap(); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(recordFieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_null_record(Object value, Schema.Type expectedType) { + Map data = Collections.singletonMap(recordFieldName, null); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(recordFieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_non_present_nested_fields(Object value, Schema.Type expectedType) { + Map data = Map.of(recordFieldName, Collections.emptyMap()); + when(nestedFieldSchema.getType()).thenReturn(expectedType); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(recordFieldName), notNullValue()); + assertThat(actualRecord.get(recordFieldName), instanceOf(GenericData.Record.class)); + GenericData.Record actualSubRecord = (GenericData.Record) actualRecord.get(recordFieldName); + + assertThat(actualSubRecord.getSchema(), notNullValue()); + assertThat(actualSubRecord.getSchema(), equalTo(recordFieldSchema)); + assertThat(actualSubRecord.getSchema().getType(), equalTo(Schema.Type.RECORD)); + assertThat(actualSubRecord.get(nestedFieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_null_nested_fields(Object value, Schema.Type expectedType) { + Map data = Map.of(recordFieldName, Collections.singletonMap(nestedFieldName, null)); + when(nestedFieldSchema.getType()).thenReturn(expectedType); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(recordFieldName), notNullValue()); + assertThat(actualRecord.get(recordFieldName), instanceOf(GenericData.Record.class)); + GenericData.Record actualSubRecord = (GenericData.Record) actualRecord.get(recordFieldName); + + assertThat(actualSubRecord.getSchema(), notNullValue()); + assertThat(actualSubRecord.getSchema(), equalTo(recordFieldSchema)); + assertThat(actualSubRecord.getSchema().getType(), equalTo(Schema.Type.RECORD)); + assertThat(actualSubRecord.get(nestedFieldName), nullValue()); + } + + @ParameterizedTest + 
@ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_only_includes_fields_which_are_defined_in_the_schema(Object value, Schema.Type expectedType) { + when(nestedFieldSchema.getType()).thenReturn(expectedType); + + String nonFieldKey = UUID.randomUUID().toString(); + Map data = Map.of( + recordFieldName, + Map.of( + nestedFieldName, value, + nonFieldKey, value + )); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(recordFieldName), notNullValue()); + assertThat(actualRecord.get(recordFieldName), instanceOf(GenericData.Record.class)); + GenericData.Record actualSubRecord = (GenericData.Record) actualRecord.get(recordFieldName); + + assertThat(actualSubRecord.getSchema(), notNullValue()); + assertThat(actualSubRecord.getSchema(), equalTo(recordFieldSchema)); + assertThat(actualSubRecord.getSchema().getType(), equalTo(Schema.Type.RECORD)); + assertThat(actualSubRecord.get(nestedFieldName), notNullValue()); + assertThat(actualSubRecord.get(nestedFieldName), instanceOf(value.getClass())); + assertThat(actualSubRecord.get(nestedFieldName), equalTo(value)); + + assertThat(actualRecord.hasField(nonFieldKey), equalTo(false)); + } + } + + @Nested + class WithArrayField { + private String arrayFieldName; + private String nestedFieldName; + @Mock(lenient = true) + private Schema.Field nestedField; + @Mock + private Schema nestedFieldSchema; + @Mock + private Schema arrayFieldSchema; + + @BeforeEach + void setUp() { + arrayFieldName = randomAvroName(); + nestedFieldName = randomAvroName(); + + when(nestedField.name()).thenReturn(nestedFieldName); + when(nestedField.schema()).thenReturn(nestedFieldSchema); + when(nestedField.pos()).thenReturn(0); + + when(arrayFieldSchema.getType()).thenReturn(Schema.Type.ARRAY); + + Schema.Field recordField = mock(Schema.Field.class); + when(schema.getField(arrayFieldName)).thenReturn(recordField); + when(schema.getFields()).thenReturn(Collections.singletonList(recordField)); + + when(recordField.name()).thenReturn(arrayFieldName); + when(recordField.schema()).thenReturn(arrayFieldSchema); + when(recordField.pos()).thenReturn(0); + + when(schemaChooser.chooseSchema(arrayFieldSchema)).thenReturn(arrayFieldSchema); + lenient().when(schemaChooser.chooseSchema(nestedFieldSchema)).thenReturn(nestedFieldSchema); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_adds_array_values(Object value, Schema.Type expectedType) { + Map data = Map.of(arrayFieldName, List.of(value)); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(arrayFieldName), notNullValue()); + assertThat(actualRecord.get(arrayFieldName), instanceOf(GenericData.Array.class)); + GenericData.Array actualArray = (GenericData.Array) actualRecord.get(arrayFieldName); + + assertThat(actualArray.getSchema(), notNullValue()); + assertThat(actualArray.getSchema(), equalTo(arrayFieldSchema)); + assertThat(actualArray.getSchema().getType(), equalTo(Schema.Type.ARRAY)); + assertThat(actualArray.size(), equalTo(1)); + assertThat(actualArray.get(0), notNullValue()); + assertThat(actualArray.get(0), equalTo(value)); + } + + 
@ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_non_present_arrays(Object value, Schema.Type expectedType) { + Map data = Collections.emptyMap(); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(arrayFieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_skips_null_array(Object value, Schema.Type expectedType) { + Map data = Collections.singletonMap(arrayFieldName, null); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(arrayFieldName), nullValue()); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_includes_empty_arrays(Object value, Schema.Type expectedType) { + Map data = Map.of(arrayFieldName, Collections.emptyList()); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(arrayFieldName), notNullValue()); + assertThat(actualRecord.get(arrayFieldName), instanceOf(GenericData.Array.class)); + GenericData.Array actualArray = (GenericData.Array) actualRecord.get(arrayFieldName); + + assertThat(actualArray.getSchema(), notNullValue()); + assertThat(actualArray.getSchema(), equalTo(arrayFieldSchema)); + assertThat(actualArray.getSchema().getType(), equalTo(Schema.Type.ARRAY)); + assertThat(actualArray.size(), equalTo(0)); + } + + @ParameterizedTest + @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class) + void convertEventDataToAvro_includes_null_array_elements(Object value, Schema.Type expectedType) { + Map data = Map.of(arrayFieldName, Collections.singletonList(null)); + + GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext); + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), equalTo(schema)); + + assertThat(actualRecord.get(arrayFieldName), notNullValue()); + assertThat(actualRecord.get(arrayFieldName), instanceOf(GenericData.Array.class)); + GenericData.Array actualArray = (GenericData.Array) actualRecord.get(arrayFieldName); + + assertThat(actualArray.getSchema(), notNullValue()); + assertThat(actualArray.getSchema(), equalTo(arrayFieldSchema)); + assertThat(actualArray.getSchema().getType(), equalTo(Schema.Type.ARRAY)); + assertThat(actualArray.size(), equalTo(1)); + assertThat(actualArray.get(0), nullValue()); + } + } + + private static String randomAvroName() { + return "a" + UUID.randomUUID().toString().replaceAll("-", ""); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java index 6e297eba6c..2ec27b73ab 100644 --- a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java +++ 
b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java @@ -206,21 +206,65 @@ void test_happy_case_nullable_records_with_empty_maps(final int numberOfRecords) } @Test - void writeEvent_throws_exception_when_field_does_not_exist() throws IOException { - final Event eventWithInvalidField = mock(Event.class); + void writeEvent_accepts_event_when_field_does_not_exist_in_user_defined_schema() throws IOException { final String invalidFieldName = UUID.randomUUID().toString(); - when(eventWithInvalidField.toMap()).thenReturn(Collections.singletonMap(invalidFieldName, UUID.randomUUID().toString())); + + Map mapWithInvalid = generateRecords(1).get(0); + mapWithInvalid.put(invalidFieldName, UUID.randomUUID().toString()); + final Event eventWithInvalidField = mock(Event.class); + when(eventWithInvalidField.toMap()).thenReturn(mapWithInvalid); + final AvroOutputCodec objectUnderTest = createObjectUnderTest(); outputStream = new ByteArrayOutputStream(); objectUnderTest.start(outputStream, null, new OutputCodecContext()); + objectUnderTest.writeEvent(eventWithInvalidField, outputStream); + objectUnderTest.complete(outputStream); + + final List actualAvroRecords = createAvroRecordsList(outputStream); + assertThat(actualAvroRecords.size(), equalTo(1)); + + int count = 0; + for (final GenericRecord actualRecord : actualAvroRecords) { + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getSchema(), notNullValue()); + + List fields = actualRecord.getSchema().getFields(); + assertThat(fields.size(), equalTo(TOTAL_TOP_LEVEL_FIELDS)); + for (Schema.Field field : fields) { + Object actualValue = actualRecord.get(field.name()); + assertThat(actualValue, notNullValue()); + } + count++; + } + + assertThat(count, equalTo(1)); + } + + @Test + void writeEvent_throws_exception_when_field_does_not_exist_in_auto_schema() throws IOException { + config.setSchema(null); + final String invalidFieldName = UUID.randomUUID().toString(); + + Map mapWithInvalid = generateRecords(1).get(0); + mapWithInvalid.put(invalidFieldName, UUID.randomUUID().toString()); + final Event eventWithInvalidField = mock(Event.class); + when(eventWithInvalidField.toMap()).thenReturn(mapWithInvalid); + + final AvroOutputCodec objectUnderTest = createObjectUnderTest(); + + outputStream = new ByteArrayOutputStream(); + objectUnderTest.start(outputStream, createEventRecord(generateRecords(1).get(0)), new OutputCodecContext()); + final RuntimeException actualException = assertThrows(RuntimeException.class, () -> objectUnderTest.writeEvent(eventWithInvalidField, outputStream)); assertThat(actualException.getMessage(), notNullValue()); assertThat(actualException.getMessage(), containsString(invalidFieldName)); } + @Test public void testInlineSchemaBuilder() throws IOException { Schema expectedSchema = new Schema.Parser().parse(EXPECTED_SCHEMA_STRING); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java index bd93c3e20f..6d88aa1cb7 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java @@ -13,6 +13,8 @@ import org.apache.parquet.io.PositionOutputStream; import 
org.opensearch.dataprepper.avro.AvroAutoSchemaGenerator; import org.opensearch.dataprepper.avro.AvroEventConverter; +import org.opensearch.dataprepper.avro.EventDefinedAvroEventConverter; +import org.opensearch.dataprepper.avro.SchemaDefinedAvroEventConverter; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; @@ -43,8 +45,14 @@ public ParquetOutputCodec(final ParquetOutputCodecConfig config) { Objects.requireNonNull(config); this.config = config; - avroEventConverter = new AvroEventConverter(); avroAutoSchemaGenerator = new AvroAutoSchemaGenerator(); + + if (config.getSchema() != null) { + schema = parseSchema(config.getSchema()); + avroEventConverter = new SchemaDefinedAvroEventConverter(); + } else { + avroEventConverter = new EventDefinedAvroEventConverter(); + } } @Override @@ -60,7 +68,9 @@ public synchronized void start(final OutputStream outputStream, final Event even PositionOutputStream s3OutputStream = (PositionOutputStream) outputStream; CompressionCodecName compressionCodecName = CompressionConverter.convertCodec(((S3OutputCodecContext) codecContext).getCompressionOption()); this.codecContext = codecContext; - buildSchemaAndKey(event); + if (schema == null) { + schema = buildInlineSchemaFromEvent(event); + } final S3OutputFile s3OutputFile = new S3OutputFile(s3OutputStream); buildWriter(s3OutputFile, compressionCodecName); } @@ -70,14 +80,6 @@ public boolean isCompressionInternal() { return true; } - void buildSchemaAndKey(final Event event) throws IOException { - if (config.getSchema() != null) { - schema = parseSchema(config.getSchema()); - } else { - schema = buildInlineSchemaFromEvent(event); - } - } - public Schema buildInlineSchemaFromEvent(final Event event) throws IOException { final Map data; if (codecContext != null && codecContext.getTagsTargetKey() != null) { diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java index bc42eec18c..81a8784068 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java @@ -79,6 +79,18 @@ private ParquetOutputCodec createObjectUnderTest() { return new ParquetOutputCodec(config); } + @Test + void constructor_throws_if_schema_is_invalid() { + String invalidSchema = createStandardSchema().toString().replaceAll(",", ";"); + config.setSchema(invalidSchema); + + RuntimeException actualException = assertThrows(RuntimeException.class, this::createObjectUnderTest); + + assertThat(actualException.getMessage(), notNullValue()); + assertThat(actualException.getMessage(), containsString(invalidSchema)); + assertThat(actualException.getMessage(), containsString("was expecting comma")); + } + @ParameterizedTest @ValueSource(ints = {1, 2, 10, 100}) void test_happy_case(final int numberOfRecords) throws Exception { @@ -166,18 +178,46 @@ public void test_getExtension() { } @Test - void writeEvent_throws_exception_when_field_does_not_exist() throws IOException { + void writeEvent_includes_record_when_field_does_not_exist_in_user_supplied_schema() throws IOException { 
config.setSchema(createStandardSchema().toString()); when(codecContext.getCompressionOption()).thenReturn(CompressionOption.NONE); final Event eventWithInvalidField = mock(Event.class); final String invalidFieldName = UUID.randomUUID().toString(); - when(eventWithInvalidField.toMap()).thenReturn(Collections.singletonMap(invalidFieldName, UUID.randomUUID().toString())); + Map mapWithInvalid = generateRecords(1).get(0); + mapWithInvalid.put(invalidFieldName, UUID.randomUUID().toString()); + when(eventWithInvalidField.toMap()).thenReturn(mapWithInvalid); final ParquetOutputCodec objectUnderTest = createObjectUnderTest(); final File tempFile = new File(tempDirectory, FILE_NAME); LocalFilePositionOutputStream outputStream = LocalFilePositionOutputStream.create(tempFile); objectUnderTest.start(outputStream, null, codecContext); + objectUnderTest.writeEvent(eventWithInvalidField, outputStream); + + objectUnderTest.closeWriter(outputStream, tempFile); + List> actualRecords = createParquetRecordsList(new ByteArrayInputStream(tempFile.toString().getBytes())); + int index = 0; + for (final Map actualMap : actualRecords) { + assertThat(actualMap, notNullValue()); + Map expectedMap = generateRecords(1).get(index); + assertThat(expectedMap, Matchers.equalTo(actualMap)); + index++; + } + } + + @Test + void writeEvent_throws_exception_when_field_does_not_exist_in_auto_schema() throws IOException { + config.setSchema(null); + when(codecContext.getCompressionOption()).thenReturn(CompressionOption.NONE); + final Event eventWithInvalidField = mock(Event.class); + final String invalidFieldName = UUID.randomUUID().toString(); + when(eventWithInvalidField.toMap()).thenReturn(Collections.singletonMap(invalidFieldName, UUID.randomUUID().toString())); + final ParquetOutputCodec objectUnderTest = createObjectUnderTest(); + + final File tempFile = new File(tempDirectory, FILE_NAME); + LocalFilePositionOutputStream outputStream = LocalFilePositionOutputStream.create(tempFile); + objectUnderTest.start(outputStream, createEventRecord(generateRecords(1).get(0)), codecContext); + final RuntimeException actualException = assertThrows(RuntimeException.class, () -> objectUnderTest.writeEvent(eventWithInvalidField, outputStream)); assertThat(actualException.getMessage(), notNullValue());