Skip to content

Commit

Permalink
FLUME-2810. Add static Schema URL to AvroEventSerializer configuration
Browse files Browse the repository at this point in the history
Currently the only way to pass a schema to the avro event serializer is
via header. This introduces an option to specify the location directly
in the Flume configuration.

(Jeff Holoman via Mike Percy)
  • Loading branch information
jholoman authored and mpercy committed Jul 19, 2016
1 parent c7de4ba commit dbf2e98
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,9 @@ public class AvroEventSerializerConfigurationConstants {
public static final String COMPRESSION_CODEC = "compressionCodec";
public static final String DEFAULT_COMPRESSION_CODEC = "null"; // no codec

/**
* Avro static Schema URL
*/
public static final String STATIC_SCHEMA_URL = "schemaURL";
public static final String DEFAULT_STATIC_SCHEMA_URL = null;
}
51 changes: 46 additions & 5 deletions flume-ng-doc/sphinx/FlumeUserGuide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3235,19 +3235,59 @@ Example for agent named a1:
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false
"Flume Event" Avro Event Serializer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Alias: ``avro_event``.

This interceptor serializes Flume events into an Avro container file. The schema used is the same schema used for
Flume events in the Avro RPC mechanism.

This serializer inherits from the ``AbstractAvroEventSerializer`` class.

Configuration options are as follows:

========================== ================ ===========================================================================
Property Name Default Description
========================== ================ ===========================================================================
syncIntervalBytes 2048000 Avro sync interval, in approximate bytes.
compressionCodec null Avro compression codec. For supported codecs, see Avro's CodecFactory docs.
========================== ================ ===========================================================================

Example for agent named a1:

.. code-block:: properties
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy
Avro Event Serializer
~~~~~~~~~~~~~~~~~~~~~

Alias: ``avro_event``. This interceptor serializes Flume events into an Avro
container file. The schema used is the same schema used for Flume events
in the Avro RPC mechanism. This serializer inherits from the
``AbstractAvroEventSerializer`` class. Configuration options are as follows:
Alias: This serializer does not have an alias, and must be specified using the fully-qualified class name.

This serializes Flume events into an Avro container file like the "Flume Event" Avro Event Serializer, however the
record schema is configurable. The record schema may be specified either as a Flume configuration property or passed in an event header.

To pass the record schema as part of the Flume configuration, use the property ``schemaURL`` as listed below.

To pass the record schema in an event header, specify either the event header ``flume.avro.schema.literal``
containing a JSON-format representation of the schema or ``flume.avro.schema.url`` with a URL where
the schema may be found (``hdfs:/...`` URIs are supported).

This serializer inherits from the ``AbstractAvroEventSerializer`` class.

Configuration options are as follows:

========================== ================ ===========================================================================
Property Name Default Description
========================== ================ ===========================================================================
syncIntervalBytes 2048000 Avro sync interval, in approximate bytes.
compressionCodec null Avro compression codec. For supported codecs, see Avro's CodecFactory docs.
schemaURL                  null             Avro schema URL. Schemas specified in the header override this option.
========================== ================ ===========================================================================

Example for agent named a1:
Expand All @@ -3257,8 +3297,9 @@ Example for agent named a1:
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc
Flume Interceptors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,6 @@
*/
package org.apache.flume.sink.hdfs;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
Expand All @@ -44,7 +36,21 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.COMPRESSION_CODEC;
import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_COMPRESSION_CODEC;
import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_STATIC_SCHEMA_URL;
import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_SYNC_INTERVAL_BYTES;
import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.STATIC_SCHEMA_URL;
import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.SYNC_INTERVAL_BYTES;

/**
* <p>
Expand Down Expand Up @@ -76,6 +82,7 @@ public class AvroEventSerializer implements EventSerializer, Configurable {
private int syncIntervalBytes;
private String compressionCodec;
private Map<String, Schema> schemaCache = new HashMap<String, Schema>();
private String staticSchemaURL;

private AvroEventSerializer(OutputStream out) {
this.out = out;
Expand All @@ -87,6 +94,7 @@ public void configure(Context context) {
context.getInteger(SYNC_INTERVAL_BYTES, DEFAULT_SYNC_INTERVAL_BYTES);
compressionCodec =
context.getString(COMPRESSION_CODEC, DEFAULT_COMPRESSION_CODEC);
staticSchemaURL = context.getString(STATIC_SCHEMA_URL, DEFAULT_STATIC_SCHEMA_URL);
}

@Override
Expand All @@ -111,19 +119,24 @@ public void write(Event event) throws IOException {
private void initialize(Event event) throws IOException {
Schema schema = null;
String schemaUrl = event.getHeaders().get(AVRO_SCHEMA_URL_HEADER);
if (schemaUrl != null) {
String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);

if (schemaUrl != null) { // if URL_HEADER is there then use it
schema = schemaCache.get(schemaUrl);
if (schema == null) {
schema = loadFromUrl(schemaUrl);
schemaCache.put(schemaUrl, schema);
}
}
if (schema == null) {
String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);
if (schemaString == null) {
throw new FlumeException("Could not find schema for event " + event);
}
} else if (schemaString != null) { // fallback to LITERAL_HEADER if it was there
schema = new Schema.Parser().parse(schemaString);
} else if (staticSchemaURL != null) { // fallback to static url if it was there
schema = schemaCache.get(staticSchemaURL);
if (schema == null) {
schema = loadFromUrl(staticSchemaURL);
schemaCache.put(staticSchemaURL, schema);
}
} else { // no other options so giving up
throw new FlumeException("Could not find schema for event " + event);
}

writer = new GenericDatumWriter<Object>(schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.serialization.AvroEventSerializerConfigurationConstants;
import org.apache.flume.serialization.EventSerializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.After;

public class TestAvroEventSerializer {

Expand All @@ -53,39 +55,55 @@ public void setUp() throws Exception {
file = File.createTempFile(getClass().getSimpleName(), "");
}

@After
public void tearDown() throws Exception {
file.delete();
}

@Test
public void testNoCompression() throws IOException {
createAvroFile(file, null, false);
createAvroFile(file, null, false, false);
validateAvroFile(file);
}

@Test
public void testNullCompression() throws IOException {
createAvroFile(file, "null", false);
createAvroFile(file, "null", false, false);
validateAvroFile(file);
}

@Test
public void testDeflateCompression() throws IOException {
createAvroFile(file, "deflate", false);
createAvroFile(file, "deflate", false, false);
validateAvroFile(file);
}

@Test
public void testSnappyCompression() throws IOException {
createAvroFile(file, "snappy", false);
createAvroFile(file, "snappy", false, false);
validateAvroFile(file);
}

@Test
public void testSchemaUrl() throws IOException {
createAvroFile(file, null, true);
createAvroFile(file, null, true, false);
validateAvroFile(file);
}

public void createAvroFile(File file, String codec, boolean useSchemaUrl) throws
IOException {
/**
 * Schema delivered solely through the static {@code schemaURL} configuration
 * property — no schema headers are set on the events.
 */
@Test
public void testStaticSchemaUrl() throws IOException {
  // Style fix: spaces after commas, matching the sibling test methods.
  createAvroFile(file, null, false, true);
  validateAvroFile(file);
}

/**
 * Both the flume.avro.schema.url event header and the static {@code schemaURL}
 * property are supplied. Per AvroEventSerializer.initialize(), the header URL
 * is consulted first, so it takes precedence over the static configuration.
 */
@Test
public void testBothUrls() throws IOException {
  // Style fix: spaces after commas, matching the sibling test methods.
  createAvroFile(file, null, true, true);
  validateAvroFile(file);
}

public void createAvroFile(File file, String codec, boolean useSchemaUrl,
boolean useStaticSchemaUrl) throws IOException {
// serialize a few events using the reflection-based avro serializer
OutputStream out = new FileOutputStream(file);

Expand All @@ -100,22 +118,27 @@ public void createAvroFile(File file, String codec, boolean useSchemaUrl) throws
}));
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
File schemaFile = null;
if (useSchemaUrl) {
if (useSchemaUrl || useStaticSchemaUrl) {
schemaFile = File.createTempFile(getClass().getSimpleName(), ".avsc");
Files.write(schema.toString(), schemaFile, Charsets.UTF_8);
}

if (useStaticSchemaUrl) {
ctx.put(AvroEventSerializerConfigurationConstants.STATIC_SCHEMA_URL,
schemaFile.toURI().toURL().toExternalForm());
}

EventSerializer.Builder builder = new AvroEventSerializer.Builder();
EventSerializer serializer = builder.build(ctx, out);

serializer.afterCreate();
for (int i = 0; i < 3; i++) {
GenericRecord record = recordBuilder.set("message", "Hello " + i).build();
Event event = EventBuilder.withBody(serializeAvro(record, schema));
if (schemaFile == null) {
if (schemaFile == null && !useSchemaUrl) {
event.getHeaders().put(AvroEventSerializer.AVRO_SCHEMA_LITERAL_HEADER,
schema.toString());
} else {
} else if (useSchemaUrl) {
event.getHeaders().put(AvroEventSerializer.AVRO_SCHEMA_URL_HEADER,
schemaFile.toURI().toURL().toExternalForm());
}
Expand All @@ -125,6 +148,10 @@ public void createAvroFile(File file, String codec, boolean useSchemaUrl) throws
serializer.beforeClose();
out.flush();
out.close();
if (schemaFile != null ) {
schemaFile.delete();
}

}

private byte[] serializeAvro(Object datum, Schema schema) throws IOException {
Expand Down

0 comments on commit dbf2e98

Please sign in to comment.