diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
index 65a305f34fc..6c2c45cf184 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
@@ -34,6 +34,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
@@ -73,6 +74,8 @@ public class DataFileWriter<D> implements Closeable, Flushable {
   private byte[] sync; // 16 random bytes
   private int syncInterval = DataFileConstants.DEFAULT_SYNC_INTERVAL;
 
+  private Function<OutputStream, BinaryEncoder> initEncoder = out -> new EncoderFactory().directBinaryEncoder(out,
+      null);
   private boolean isOpen;
   private Codec codec;
 
@@ -130,6 +133,17 @@ public DataFileWriter<D> setSyncInterval(int syncInterval) {
     return this;
   }
 
+  /**
+   * Allows setting a different encoder than the default DirectBinaryEncoder.
+   *
+   * @param initEncoderFunc Function to create a binary encoder
+   * @return this DataFileWriter
+   */
+  public DataFileWriter<D> setEncoder(Function<OutputStream, BinaryEncoder> initEncoderFunc) {
+    this.initEncoder = initEncoderFunc;
+    return this;
+  }
+
   /** Open a new file for data matching a schema with a random sync. */
   public DataFileWriter<D> create(Schema schema, File file) throws IOException {
     SyncableFileOutputStream sfos = new SyncableFileOutputStream(file);
@@ -242,7 +256,7 @@ private void init(OutputStream outs) throws IOException {
     this.vout = efactory.directBinaryEncoder(out, null);
     dout.setSchema(schema);
     buffer = new NonCopyingByteArrayOutputStream(Math.min((int) (syncInterval * 1.25), Integer.MAX_VALUE / 2 - 1));
-    this.bufOut = efactory.directBinaryEncoder(buffer, null);
+    this.bufOut = this.initEncoder.apply(buffer);
     if (this.codec == null) {
       this.codec = CodecFactory.nullCodec().createInstance();
     }
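The hunks above add the hook itself; the test changes below exercise it. As a usage illustration only, here is a minimal, hypothetical caller-side sketch (not part of this patch): the schema file user.avsc, the output file users.avro, and the class name are invented for the example, while setEncoder and blockingDirectBinaryEncoder come from this change. Note that setEncoder has to be called before create(), because init() builds the block-buffer encoder from initEncoder at creation time.

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.EncoderFactory;

public class SetEncoderExample {
  public static void main(String[] args) throws IOException {
    // Hypothetical paths, for illustration only.
    Schema schema = new Schema.Parser().parse(new File("user.avsc"));
    try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
      // The function receives the writer's internal block buffer and returns
      // the BinaryEncoder used to serialize datums into that buffer.
      // It must be set before create(), which is where init() applies it.
      writer.setEncoder(out -> new EncoderFactory().blockingDirectBinaryEncoder(out, null));
      writer.create(schema, new File("users.avro"));
      // writer.append(record); ... as usual.
    }
  }
}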
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
index c82b9aae391..01611c698bf 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
@@ -25,9 +25,11 @@
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.function.Function;
 import java.util.stream.Stream;
 
 import org.apache.avro.file.CodecFactory;
@@ -40,7 +42,9 @@
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.util.RandomData;
 
 import org.junit.jupiter.api.Test;
@@ -93,22 +97,32 @@ private File makeFile(CodecFactory codec) {
   @ParameterizedTest
   @MethodSource("codecs")
   public void runTestsInOrder(CodecFactory codec) throws Exception {
-    LOG.info("Running with codec: " + codec);
-    testGenericWrite(codec);
-    testGenericRead(codec);
-    testSplits(codec);
-    testSyncDiscovery(codec);
-    testGenericAppend(codec);
-    testReadWithHeader(codec);
-    testFSync(codec, false);
-    testFSync(codec, true);
+    // Run for both encoders, but the MethodSource didn't really like it,
+    // so it is just a loop within the test
+    List<Function<OutputStream, BinaryEncoder>> encoders = new ArrayList<>();
+    encoders.add(b -> new EncoderFactory().directBinaryEncoder(b, null));
+    encoders.add(b -> new EncoderFactory().blockingDirectBinaryEncoder(b, null));
+
+    for (Function<OutputStream, BinaryEncoder> encoder : encoders) {
+      LOG.info("Running with codec: {}", codec);
+      testGenericWrite(codec, encoder);
+      testGenericRead(codec);
+      testSplits(codec);
+      testSyncDiscovery(codec);
+      testGenericAppend(codec, encoder);
+      testReadWithHeader(codec);
+      testFSync(codec, encoder, false);
+      testFSync(codec, encoder, true);
+    }
   }
 
-  private void testGenericWrite(CodecFactory codec) throws IOException {
+  private void testGenericWrite(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc)
+      throws IOException {
     DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>()).setSyncInterval(100);
     if (codec != null) {
       writer.setCodec(codec);
     }
+    writer.setEncoder(encoderFunc);
     writer.create(SCHEMA, makeFile(codec));
     try {
       int count = 0;
@@ -210,10 +224,12 @@ private void testSyncDiscovery(CodecFactory codec) throws IOException {
     }
   }
 
-  private void testGenericAppend(CodecFactory codec) throws IOException {
+  private void testGenericAppend(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc)
+      throws IOException {
     File file = makeFile(codec);
     long start = file.length();
     try (DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>()).appendTo(file)) {
+      writer.setEncoder(encoderFunc);
       for (Object datum : new RandomData(SCHEMA, COUNT, SEED + 1)) {
         writer.append(datum);
       }
@@ -254,11 +270,8 @@ private void testReadWithHeader(CodecFactory codec) throws IOException {
 
         assertEquals(validPos, sin.tell(), "Should not move from sync point on reopen");
         assertNotNull(readerFalse.next(), "Should be able to reopen at sync point");
       }
-
     }
-
   }
-
 
   @Test
@@ -306,8 +319,10 @@ public void flushCount() throws IOException {
     assertTrue(out.flushCount < currentCount && out.flushCount >= flushCounter);
   }
 
-  private void testFSync(CodecFactory codec, boolean useFile) throws IOException {
+  private void testFSync(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc, boolean useFile)
+      throws IOException {
     try (DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
+      writer.setEncoder(encoderFunc);
       writer.setFlushOnEveryBlock(false);
       TestingByteArrayOutputStream out = new TestingByteArrayOutputStream();
       if (useFile) {
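On the comment about @MethodSource: the usual alternative to the inner loop is a cartesian-product provider, so each codec/encoder pair becomes its own test invocation. The following is a rough, hypothetical sketch, not part of this patch; it assumes the existing codecs() provider returns Stream<CodecFactory> and additionally imports java.util.Arrays and org.junit.jupiter.params.provider.Arguments. A known drawback is that the encoder lambdas render as unreadable names in test reports, which may be what the comment alludes to.

// Hypothetical provider (not in this patch): pair every codec with every encoder.
static Stream<Arguments> codecsAndEncoders() {
  List<Function<OutputStream, BinaryEncoder>> encoders = Arrays.asList(
      b -> new EncoderFactory().directBinaryEncoder(b, null),
      b -> new EncoderFactory().blockingDirectBinaryEncoder(b, null));
  return codecs().flatMap(codec -> encoders.stream().map(encoder -> Arguments.of(codec, encoder)));
}

@ParameterizedTest
@MethodSource("codecsAndEncoders")
void runTestsInOrder(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoder) throws Exception {
  // ... same body as in the patch, minus the encoders list and the loop
}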