Skip to content

Commit

Permalink
Behavioral change to Avro codecs and schema handling (#3238)
Browse files Browse the repository at this point in the history
Change the behavior of Avro-based codecs. When a schema is defined, rely on the schema rather than the incoming event. If the schema is auto-generated, then the incoming event data must continue to match. Fix Avro arrays which were only supporting arrays of strings previously.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Aug 24, 2023
1 parent 2cca4b1 commit f17e833
Show file tree
Hide file tree
Showing 11 changed files with 1,200 additions and 172 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.opensearch.dataprepper.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

import java.util.List;
import java.util.Map;

abstract class AbstractAvroEventConverterTemplate implements AvroEventConverter {
private final SchemaChooser schemaChooser;

protected AbstractAvroEventConverterTemplate(final SchemaChooser schemaChooser) {
this.schemaChooser = schemaChooser;
}

@Override
public GenericRecord convertEventDataToAvro(final Schema schema,
final Map<String, Object> eventData,
final OutputCodecContext codecContext) {
return convertEventDataToAvro(schema, eventData, codecContext, true);
}

private GenericRecord convertEventDataToAvro(final Schema schema,
final Map<String, Object> eventData,
final OutputCodecContext codecContext,
boolean rootOfData) {
final GenericRecord avroRecord = new GenericData.Record(schema);

for (String key : getKeyNames(schema, eventData, codecContext, rootOfData)) {
final Schema.Field field = schema.getField(key);
if (field == null) {
throw new RuntimeException("The event has a key ('" + key + "') which is not included in the schema.");
}
final Object value = schemaMapper(field, eventData.get(key), codecContext);
avroRecord.put(key, value);
}

return avroRecord;
}


private Object schemaMapper(final Schema.Field field, final Object rawValue, OutputCodecContext codecContext) {
Schema providedSchema = schemaChooser.chooseSchema(field.schema());

if (providedSchema.getType() == Schema.Type.RECORD && rawValue instanceof Map) {
return convertEventDataToAvro(providedSchema, (Map<String, Object>) rawValue, codecContext, false);
} else if (providedSchema.getType() == Schema.Type.ARRAY && rawValue instanceof List) {
GenericData.Array<Object> avroArray =
new GenericData.Array<>(((List<?>) rawValue).size(), providedSchema);
for (Object element : ((List<?>) rawValue)) {
avroArray.add(element);
}
return avroArray;
}
return rawValue;
}

/**
* Template method to get key names for a given object.
*
* @param schema The Avro schema
* @param eventData Current event data
* @param codecContext The {@link OutputCodecContext}
* @param rootOfData True, if this is the root of the data. False when this is nested.
* @return An {@Iterable} of key names.
*/
abstract Iterable<String> getKeyNames(Schema schema, Map<String, Object> eventData, OutputCodecContext codecContext, boolean rootOfData);
}
Original file line number Diff line number Diff line change
@@ -1,60 +1,26 @@
package org.opensearch.dataprepper.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

import java.util.List;
import java.util.Map;

/**
* Converts an Event into an Avro record.
* <p>
* It might be a good idea to consolidate similar logic for input.
*/
public class AvroEventConverter {
private final SchemaChooser schemaChooser;

public AvroEventConverter() {
this(new SchemaChooser());
}

AvroEventConverter(final SchemaChooser schemaChooser) {
this.schemaChooser = schemaChooser;
}

public GenericRecord convertEventDataToAvro(final Schema schema,
final Map<String, Object> eventData,
OutputCodecContext codecContext) {
final GenericRecord avroRecord = new GenericData.Record(schema);
for (final String key : eventData.keySet()) {
if (codecContext != null && codecContext.shouldNotIncludeKey(key)) {
continue;
}
final Schema.Field field = schema.getField(key);
if (field == null) {
throw new RuntimeException("The event has a key ('" + key + "') which is not included in the schema.");
}
final Object value = schemaMapper(field, eventData.get(key), codecContext);
avroRecord.put(key, value);
}
return avroRecord;
}

private Object schemaMapper(final Schema.Field field, final Object rawValue, OutputCodecContext codecContext) {
Schema providedSchema = schemaChooser.chooseSchema(field.schema());

if (providedSchema.getType() == Schema.Type.RECORD && rawValue instanceof Map) {
return convertEventDataToAvro(providedSchema, (Map<String, Object>) rawValue, codecContext);
} else if (providedSchema.getType() == Schema.Type.ARRAY && rawValue instanceof List) {
GenericData.Array<String> avroArray =
new GenericData.Array<>(((List<String>) rawValue).size(), providedSchema);
for (String element : ((List<String>) rawValue)) {
avroArray.add(element);
}
return avroArray;
}
return rawValue;
}
public interface AvroEventConverter {
/**
* Converts event data into an Avro record.
*
* @param schema The defined Avro schema
* @param eventData The event data; may include tags
* @param codecContext The output codec context which may define values included/excluded.
* @return The generated Avro {@link GenericRecord}.
*/
GenericRecord convertEventDataToAvro(final Schema schema,
final Map<String, Object> eventData,
final OutputCodecContext codecContext);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.opensearch.dataprepper.avro;

import org.apache.avro.Schema;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* Converts an Event into an Avro record.
* <p>
* This implementation utilizes the Event data first to populate the Avro record. Thus,
* it will fail if the Event has any fields not in the schema.
*/
public class EventDefinedAvroEventConverter extends AbstractAvroEventConverterTemplate {
public EventDefinedAvroEventConverter() {
this(new SchemaChooser());
}

EventDefinedAvroEventConverter(final SchemaChooser schemaChooser) {
super(schemaChooser);
}

@Override
Iterable<String> getKeyNames(Schema schema, Map<String, Object> eventData, OutputCodecContext codecContext, boolean rootOfData) {
Set<String> keySet = eventData.keySet();

if(codecContext == null || !rootOfData) {
return keySet;
}
else {
return keySet.stream()
.filter(Predicate.not(codecContext::shouldNotIncludeKey))
.collect(Collectors.toList());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.opensearch.dataprepper.avro;

import org.apache.avro.Schema;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

import java.util.Map;
import java.util.stream.Collectors;

/**
* Converts an Event into an Avro record.
* <p>
* This implementation relies on the defined schema. Thus, any fields in the Event which
* are not in the schema will be ignored.
*/
public class SchemaDefinedAvroEventConverter extends AbstractAvroEventConverterTemplate {
public SchemaDefinedAvroEventConverter() {
this(new SchemaChooser());
}

SchemaDefinedAvroEventConverter(final SchemaChooser schemaChooser) {
super(schemaChooser);
}

@Override
Iterable<String> getKeyNames(Schema schema, Map<String, Object> eventData, OutputCodecContext codecContext, boolean rootOfData) {
return schema.getFields()
.stream()
.map(Schema.Field::name)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.apache.avro.io.DatumWriter;
import org.opensearch.dataprepper.avro.AvroAutoSchemaGenerator;
import org.opensearch.dataprepper.avro.AvroEventConverter;
import org.opensearch.dataprepper.avro.EventDefinedAvroEventConverter;
import org.opensearch.dataprepper.avro.SchemaDefinedAvroEventConverter;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
Expand Down Expand Up @@ -47,11 +49,13 @@ public AvroOutputCodec(final AvroOutputCodecConfig config) {
Objects.requireNonNull(config);
this.config = config;

avroEventConverter = new AvroEventConverter();
avroAutoSchemaGenerator = new AvroAutoSchemaGenerator();

if (config.getSchema() != null) {
schema = parseSchema(config.getSchema());
avroEventConverter = new SchemaDefinedAvroEventConverter();
} else {
avroEventConverter = new EventDefinedAvroEventConverter();
}
}

Expand Down

This file was deleted.

Loading

0 comments on commit f17e833

Please sign in to comment.