From cc9b6687029f3a97095d7e3edff943f89fedd5da Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sat, 21 Oct 2023 22:12:24 +0200 Subject: [PATCH] AVRO-3871: Add blocking direct binary encoder (#2521) * Java: Add blocking direct binary encoder * Optimize * Comments and more tests * Comments and more tests * Fix rat check --- lang/java/avro/pom.xml | 5 + .../avro/io/BlockingDirectBinaryEncoder.java | 135 +++++ .../apache/avro/io/DirectBinaryEncoder.java | 12 +- .../org/apache/avro/io/EncoderFactory.java | 43 ++ .../avro/io/TestBinaryEncoderFidelity.java | 42 ++ .../io/TestBlockingDirectBinaryEncoder.java | 99 ++++ .../java/org/apache/avro/io/TestEncoders.java | 54 +- .../specific/TestRecordWithLogicalTypes.java | 18 +- .../specific/TestRecordWithMapsAndArrays.java | 535 ++++++++++++++++++ .../TestRecordWithMapsAndArrays.avsc | 23 + 10 files changed, 937 insertions(+), 29 deletions(-) create mode 100644 lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java create mode 100644 lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingDirectBinaryEncoder.java create mode 100644 lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithMapsAndArrays.java create mode 100644 lang/java/avro/src/test/resources/TestRecordWithMapsAndArrays.avsc diff --git a/lang/java/avro/pom.xml b/lang/java/avro/pom.xml index b96673d1851..672bbbd105d 100644 --- a/lang/java/avro/pom.xml +++ b/lang/java/avro/pom.xml @@ -250,5 +250,10 @@ hamcrest-library test + + org.mockito + mockito-core + test + diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java new file mode 100644 index 00000000000..b029034d0fb --- /dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * An {@link Encoder} for Avro's binary encoding that does not buffer output. + *

+ * This encoder does not buffer writes in contrast to + * {@link BufferedBinaryEncoder}. However, it is lighter-weight and useful when: + * The buffering in BufferedBinaryEncoder is not desired because you buffer a + * different level or the Encoder is very short-lived. + *

+ * The BlockingDirectBinaryEncoder will encode the number of bytes of the Map + * and Array blocks. This will allow to postpone the decoding, or skip over it + * at all. + *

+ * To construct, use + * {@link EncoderFactory#blockingDirectBinaryEncoder(OutputStream, BinaryEncoder)} + *

+ * {@link BlockingDirectBinaryEncoder} instances returned by this method are not + * thread-safe + * + * @see BinaryEncoder + * @see EncoderFactory + * @see Encoder + * @see Decoder + */ +public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder { + private final BufferOutputStream buffer; + + private OutputStream originalStream; + + private boolean inBlock = false; + + private long blockItemCount; + + /** + * Create a writer that sends its output to the underlying stream + * out. + * + * @param out The Outputstream to write to + */ + public BlockingDirectBinaryEncoder(OutputStream out) { + super(out); + this.buffer = new BufferOutputStream(); + } + + private void startBlock() { + if (inBlock) { + throw new RuntimeException("Nested Maps/Arrays are not supported by the BlockingDirectBinaryEncoder"); + } + originalStream = out; + buffer.reset(); + out = buffer; + inBlock = true; + } + + private void endBlock() { + if (!inBlock) { + throw new RuntimeException("Called endBlock, while not buffering a block"); + } + out = originalStream; + if (blockItemCount > 0) { + try { + // Make it negative, so the reader knows that the number of bytes is coming + writeLong(-blockItemCount); + writeLong(buffer.size()); + writeFixed(buffer.toBufferWithoutCopy()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + inBlock = false; + buffer.reset(); + } + + @Override + public void setItemCount(long itemCount) throws IOException { + blockItemCount = itemCount; + } + + @Override + public void writeArrayStart() throws IOException { + startBlock(); + } + + @Override + public void writeArrayEnd() throws IOException { + endBlock(); + // Writes another zero to indicate that this is the last block + super.writeArrayEnd(); + } + + @Override + public void writeMapStart() throws IOException { + startBlock(); + } + + @Override + public void writeMapEnd() throws IOException { + endBlock(); + // Writes another zero to indicate that this is the last block + super.writeMapEnd(); + } + + private static class BufferOutputStream extends ByteArrayOutputStream { + BufferOutputStream() { + } + + ByteBuffer toBufferWithoutCopy() { + return ByteBuffer.wrap(buf, 0, count); + } + + } +} diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java index 62b2a482627..8d8172bc2f5 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java @@ -27,20 +27,20 @@ * This encoder does not buffer writes, and as a result is slower than * {@link BufferedBinaryEncoder}. However, it is lighter-weight and useful when * the buffering in BufferedBinaryEncoder is not desired and/or the Encoder is - * very short lived. + * very short-lived. *

* To construct, use * {@link EncoderFactory#directBinaryEncoder(OutputStream, BinaryEncoder)} *

* DirectBinaryEncoder is not thread-safe - * + * * @see BinaryEncoder * @see EncoderFactory * @see Encoder * @see Decoder */ public class DirectBinaryEncoder extends BinaryEncoder { - private OutputStream out; + protected OutputStream out; // the buffer is used for writing floats, doubles, and large longs. private final byte[] buf = new byte[12]; @@ -48,7 +48,7 @@ public class DirectBinaryEncoder extends BinaryEncoder { * Create a writer that sends its output to the underlying stream * out. **/ - DirectBinaryEncoder(OutputStream out) { + public DirectBinaryEncoder(OutputStream out) { configure(out); } @@ -69,8 +69,8 @@ public void writeBoolean(boolean b) throws IOException { } /* - * buffering is slower for ints that encode to just 1 or two bytes, and and - * faster for large ones. (Sun JRE 1.6u22, x64 -server) + * buffering is slower for ints that encode to just 1 or two bytes, and faster + * for large ones. (Sun JRE 1.6u22, x64 -server) */ @Override public void writeInt(int n) throws IOException { diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java b/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java index 055ef9541d9..2039f30097a 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java @@ -217,6 +217,49 @@ public BinaryEncoder directBinaryEncoder(OutputStream out, BinaryEncoder reuse) } } + /** + * Creates or reinitializes a {@link BlockingDirectBinaryEncoder} with the + * OutputStream provided as the destination for written data. If reuse is + * provided, an attempt will be made to reconfigure reuse rather than + * construct a new instance, but this is not guaranteed, a new instance may be + * returned. + *

+ * The {@link BinaryEncoder} implementation returned does not buffer its output, + * calling {@link Encoder#flush()} will simply cause the wrapped OutputStream to + * be flushed. + *

+ * The {@link BlockingDirectBinaryEncoder} will write the block sizes for the + * arrays and maps so efficient skipping can be done. + *

+ * Performance of unbuffered writes can be significantly slower than buffered + * writes. {@link #binaryEncoder(OutputStream, BinaryEncoder)} returns + * BinaryEncoder instances that are tuned for performance but may buffer output. + * The unbuffered, 'direct' encoder may be desired when buffering semantics are + * problematic, or if the lifetime of the encoder is so short that the buffer + * would not be useful. + *

+ * {@link BinaryEncoder} instances returned by this method are not thread-safe. + * + * @param out The OutputStream to initialize to. Cannot be null. + * @param reuse The BinaryEncoder to attempt to reuse given the factory + * configuration. A BinaryEncoder implementation may not be + * compatible with reuse, causing a new instance to be returned. If + * null, a new instance is returned. + * @return A BinaryEncoder that uses out as its data output. If + * reuse is null, this will be a new instance. If reuse is + * not null, then the returned instance may be a new instance or + * reuse reconfigured to use out. + * @see DirectBinaryEncoder + * @see Encoder + */ + public BinaryEncoder blockingDirectBinaryEncoder(OutputStream out, BinaryEncoder reuse) { + if (null == reuse || !reuse.getClass().equals(BlockingDirectBinaryEncoder.class)) { + return new BlockingDirectBinaryEncoder(out); + } else { + return ((DirectBinaryEncoder) reuse).configure(out); + } + } + /** * Creates or reinitializes a {@link BinaryEncoder} with the OutputStream * provided as the destination for written data. If reuse is provided, an diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java index 1d9009aacb1..1f699ea8266 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java +++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java @@ -181,6 +181,48 @@ void directBinaryEncoder() throws IOException { assertArrayEquals(complexdata, result2); } + @Test + void blockingDirectBinaryEncoder() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryEncoder e = factory.blockingDirectBinaryEncoder(baos, null); + generateData(e, true); + + byte[] result = baos.toByteArray(); + assertEquals(legacydata.length, result.length); + assertArrayEquals(legacydata, result); + baos.reset(); + + generateComplexData(e); + byte[] result2 = baos.toByteArray(); + // blocking will cause different length, should be two bytes larger + assertEquals(complexdata.length + 2, result2.length); + // the first byte is the array start, with the count of items negative + assertEquals(complexdata[0] >>> 1, result2[0]); + baos.reset(); + + e.writeArrayStart(); + e.setItemCount(1); + e.startItem(); + e.writeInt(1); + e.writeArrayEnd(); + + // 1: 1 element in the array + // 2: 1 byte for the int + // 3: zigzag encoded int + // 4: 0 elements in the next block + assertArrayEquals(baos.toByteArray(), new byte[] { 1, 2, 2, 0 }); + baos.reset(); + + e.writeArrayStart(); + e.setItemCount(0); + e.writeArrayEnd(); + + // This is correct + // 0: 0 elements in the block + assertArrayEquals(baos.toByteArray(), new byte[] { 0 }); + baos.reset(); + } + @Test void blockingBinaryEncoder() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingDirectBinaryEncoder.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingDirectBinaryEncoder.java new file mode 100644 index 00000000000..27d23916968 --- /dev/null +++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingDirectBinaryEncoder.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaNormalization; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.specific.TestRecordWithMapsAndArrays; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.*; + +public class TestBlockingDirectBinaryEncoder { + + @Test + void blockingDirectBinaryEncoder() throws IOException, NoSuchAlgorithmException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryEncoder encoder = EncoderFactory.get().blockingDirectBinaryEncoder(baos, null); + + // This is needed because there is no BlockingDirectBinaryEncoder + // BinaryMessageWriter + // available out of the box + encoder.writeFixed(new byte[] { (byte) 0xC3, (byte) 0x01 }); + encoder.writeFixed(SchemaNormalization.parsingFingerprint("CRC-64-AVRO", TestRecordWithMapsAndArrays.SCHEMA$)); + + int len = 5; + + encoder.writeArrayStart(); + encoder.setItemCount(len); + for (int i = 0; i < len; i++) { + encoder.startItem(); + encoder.writeString(Integer.toString(i)); + } + encoder.writeArrayEnd(); + + encoder.writeMapStart(); + encoder.setItemCount(len); + for (long i = 0; i < len; i++) { + encoder.startItem(); + encoder.writeString(Long.toString(i)); + encoder.writeLong(i); + } + encoder.writeMapEnd(); + encoder.flush(); + + BinaryMessageDecoder decoder = TestRecordWithMapsAndArrays.getDecoder(); + TestRecordWithMapsAndArrays r = decoder.decode(baos.toByteArray()); + + assertThat(r.getArr(), is(Arrays.asList("0", "1", "2", "3", "4"))); + Map map = r.getMap(); + assertThat(map.size(), is(5)); + for (long i = 0; i < len; i++) { + assertThat(map.get(Long.toString(i)), is(i)); + } + } + + @Test + void testSkippingUsingBlocks() throws IOException, NoSuchAlgorithmException { + // Create an empty schema for read, so we skip over all the fields + Schema emptySchema = new Schema.Parser().parse( + "{\"type\":\"record\",\"name\":\"TestRecordWithMapsAndArrays\",\"namespace\":\"org.apache.avro.specific\",\"fields\":[]}"); + + GenericDatumReader in = new GenericDatumReader<>(TestRecordWithMapsAndArrays.SCHEMA$, emptySchema); + Decoder mockDecoder = mock(BinaryDecoder.class); + + for (long i = 0; i < 1; i++) { + in.read(null, mockDecoder); + } + + verify(mockDecoder, times(1)).skipMap(); + verify(mockDecoder, times(1)).skipArray(); + verify(mockDecoder, times(0)).readString(); + verify(mockDecoder, times(0)).readLong(); + } +} diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java index 2995bf56709..51ef375e307 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java +++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java @@ -54,7 +54,7 @@ public class TestEncoders { private static final int ENCODER_BUFFER_SIZE = 32; private static final int EXAMPLE_DATA_SIZE = 17; - private static EncoderFactory factory = EncoderFactory.get(); + private static final EncoderFactory FACTORY = EncoderFactory.get(); @TempDir public Path dataDir; @@ -62,14 +62,14 @@ public class TestEncoders { @Test void binaryEncoderInit() throws IOException { OutputStream out = new ByteArrayOutputStream(); - BinaryEncoder enc = factory.binaryEncoder(out, null); - assertSame(enc, factory.binaryEncoder(out, enc)); + BinaryEncoder enc = FACTORY.binaryEncoder(out, null); + assertSame(enc, FACTORY.binaryEncoder(out, enc)); } @Test void badBinaryEncoderInit() { assertThrows(NullPointerException.class, () -> { - factory.binaryEncoder(null, null); + FACTORY.binaryEncoder(null, null); }); } @@ -77,29 +77,43 @@ void badBinaryEncoderInit() { void blockingBinaryEncoderInit() throws IOException { OutputStream out = new ByteArrayOutputStream(); BinaryEncoder reuse = null; - reuse = factory.blockingBinaryEncoder(out, reuse); - assertSame(reuse, factory.blockingBinaryEncoder(out, reuse)); + reuse = FACTORY.blockingBinaryEncoder(out, reuse); + assertSame(reuse, FACTORY.blockingBinaryEncoder(out, reuse)); // comparison } @Test void badBlockintBinaryEncoderInit() { assertThrows(NullPointerException.class, () -> { - factory.binaryEncoder(null, null); + FACTORY.binaryEncoder(null, null); }); } @Test void directBinaryEncoderInit() throws IOException { OutputStream out = new ByteArrayOutputStream(); - BinaryEncoder enc = factory.directBinaryEncoder(out, null); - assertSame(enc, factory.directBinaryEncoder(out, enc)); + BinaryEncoder enc = FACTORY.directBinaryEncoder(out, null); + assertSame(enc, FACTORY.directBinaryEncoder(out, enc)); } @Test void badDirectBinaryEncoderInit() { assertThrows(NullPointerException.class, () -> { - factory.directBinaryEncoder(null, null); + FACTORY.directBinaryEncoder(null, null); + }); + } + + @Test + void blockingDirectBinaryEncoderInit() throws IOException { + OutputStream out = new ByteArrayOutputStream(); + BinaryEncoder enc = FACTORY.blockingDirectBinaryEncoder(out, null); + assertSame(enc, FACTORY.blockingDirectBinaryEncoder(out, enc)); + } + + @Test + void badBlockingDirectBinaryEncoderInit() { + assertThrows(NullPointerException.class, () -> { + FACTORY.blockingDirectBinaryEncoder(null, null); }); } @@ -107,22 +121,22 @@ void badDirectBinaryEncoderInit() { void jsonEncoderInit() throws IOException { Schema s = new Schema.Parser().parse("\"int\""); OutputStream out = new ByteArrayOutputStream(); - factory.jsonEncoder(s, out); - JsonEncoder enc = factory.jsonEncoder(s, new JsonFactory().createGenerator(out, JsonEncoding.UTF8)); + FACTORY.jsonEncoder(s, out); + JsonEncoder enc = FACTORY.jsonEncoder(s, new JsonFactory().createGenerator(out, JsonEncoding.UTF8)); enc.configure(out); } @Test void badJsonEncoderInitOS() throws IOException { assertThrows(NullPointerException.class, () -> { - factory.jsonEncoder(Schema.create(Type.INT), (OutputStream) null); + FACTORY.jsonEncoder(Schema.create(Type.INT), (OutputStream) null); }); } @Test void badJsonEncoderInit() throws IOException { assertThrows(NullPointerException.class, () -> { - factory.jsonEncoder(Schema.create(Type.INT), (JsonGenerator) null); + FACTORY.jsonEncoder(Schema.create(Type.INT), (JsonGenerator) null); }); } @@ -130,7 +144,7 @@ void badJsonEncoderInit() throws IOException { void jsonEncoderNewlineDelimited() throws IOException { OutputStream out = new ByteArrayOutputStream(); Schema ints = Schema.create(Type.INT); - Encoder e = factory.jsonEncoder(ints, out); + Encoder e = FACTORY.jsonEncoder(ints, out); String separator = System.getProperty("line.separator"); GenericDatumWriter writer = new GenericDatumWriter<>(ints); writer.write(1, e); @@ -169,8 +183,8 @@ void jsonEncoderWhenIncludeNamespaceOptionIsTrue() throws IOException { void validatingEncoderInit() throws IOException { Schema s = new Schema.Parser().parse("\"int\""); OutputStream out = new ByteArrayOutputStream(); - Encoder e = factory.directBinaryEncoder(out, null); - factory.validatingEncoder(s, e).configure(e); + Encoder e = FACTORY.directBinaryEncoder(out, null); + FACTORY.validatingEncoder(s, e).configure(e); } @Test @@ -324,7 +338,7 @@ private String fromAvroToJson(byte[] avroBytes, Schema schema, boolean includeNa DatumWriter writer = new GenericDatumWriter<>(schema); ByteArrayOutputStream output = new ByteArrayOutputStream(); - JsonEncoder encoder = factory.jsonEncoder(schema, output); + JsonEncoder encoder = FACTORY.jsonEncoder(schema, output); encoder.setIncludeNamespace(includeNamespace); Decoder decoder = DecoderFactory.get().binaryDecoder(avroBytes, null); Object datum = reader.read(null, decoder); @@ -340,7 +354,7 @@ public void testJsonEncoderInitAutoFlush() throws IOException { Schema s = new Schema.Parser().parse("\"int\""); OutputStream baos = new ByteArrayOutputStream(); OutputStream out = new BufferedOutputStream(baos); - JsonEncoder enc = factory.jsonEncoder(s, out, false); + JsonEncoder enc = FACTORY.jsonEncoder(s, out, false); enc.configure(out, false); enc.writeInt(24); enc.flush(); @@ -354,7 +368,7 @@ public void testJsonEncoderInitAutoFlushDisabled() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStream out = new BufferedOutputStream(baos); Schema ints = Schema.create(Type.INT); - Encoder e = factory.jsonEncoder(ints, out, false, false); + Encoder e = FACTORY.jsonEncoder(ints, out, false, false); String separator = System.getProperty("line.separator"); GenericDatumWriter writer = new GenericDatumWriter(ints); writer.write(1, e); diff --git a/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithLogicalTypes.java b/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithLogicalTypes.java index c2e1ebd384c..1763a73144c 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithLogicalTypes.java +++ b/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithLogicalTypes.java @@ -1,7 +1,19 @@ -/** - * Autogenerated by Avro +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * DO NOT EDIT DIRECTLY + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.avro.specific; diff --git a/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithMapsAndArrays.java b/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithMapsAndArrays.java new file mode 100644 index 00000000000..7150f500143 --- /dev/null +++ b/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithMapsAndArrays.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.specific; + +import org.apache.avro.generic.GenericArray; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.SchemaStore; + +@org.apache.avro.specific.AvroGenerated +public class TestRecordWithMapsAndArrays extends org.apache.avro.specific.SpecificRecordBase + implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = 3113266652594662627L; + + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse( + "{\"type\":\"record\",\"name\":\"TestRecordWithMapsAndArrays\",\"namespace\":\"org.apache.avro.specific\",\"fields\":[{\"name\":\"arr\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":[]}},{\"name\":\"map\",\"type\":{\"type\":\"map\",\"values\":\"long\",\"avro.java.string\":\"String\",\"default\":{}}}]}"); + + public static org.apache.avro.Schema getClassSchema() { + return SCHEMA$; + } + + private static final SpecificData MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder ENCODER = new BinaryMessageEncoder<>(MODEL$, + SCHEMA$); + + private static final BinaryMessageDecoder DECODER = new BinaryMessageDecoder<>(MODEL$, + SCHEMA$); + + /** + * Return the BinaryMessageEncoder instance used by this class. + * + * @return the message encoder used by this class + */ + public static BinaryMessageEncoder getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the + * specified {@link SchemaStore}. + * + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + * @return a BinaryMessageDecoder instance for this class backed by the given + * SchemaStore + */ + public static BinaryMessageDecoder createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder<>(MODEL$, SCHEMA$, resolver); + } + + /** + * Serializes this TestRecordWithMapsAndArrays to a ByteBuffer. + * + * @return a buffer holding the serialized data for this instance + * @throws java.io.IOException if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** + * Deserializes a TestRecordWithMapsAndArrays from a ByteBuffer. + * + * @param b a byte buffer holding serialized data for an instance of this class + * @return a TestRecordWithMapsAndArrays instance decoded from the given buffer + * @throws java.io.IOException if the given bytes could not be deserialized into + * an instance of this class + */ + public static TestRecordWithMapsAndArrays fromByteBuffer(java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + private java.util.List arr; + private java.util.Map map; + + /** + * Default constructor. Note that this does not initialize fields to their + * default values from the schema. If that is desired then one should use + * newBuilder(). + */ + public TestRecordWithMapsAndArrays() { + } + + /** + * All-args constructor. + * + * @param arr The new value for arr + * @param map The new value for map + */ + public TestRecordWithMapsAndArrays(java.util.List arr, + java.util.Map map) { + this.arr = arr; + this.map = map; + } + + @Override + public org.apache.avro.specific.SpecificData getSpecificData() { + return MODEL$; + } + + @Override + public org.apache.avro.Schema getSchema() { + return SCHEMA$; + } + + // Used by DatumWriter. Applications should not call. + @Override + public java.lang.Object get(int field$) { + switch (field$) { + case 0: + return arr; + case 1: + return map; + default: + throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + // Used by DatumReader. Applications should not call. + @Override + @SuppressWarnings(value = "unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: + arr = (java.util.List) value$; + break; + case 1: + map = (java.util.Map) value$; + break; + default: + throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + /** + * Gets the value of the 'arr' field. + * + * @return The value of the 'arr' field. + */ + public java.util.List getArr() { + return arr; + } + + /** + * Sets the value of the 'arr' field. + * + * @param value the value to set. + */ + public void setArr(java.util.List value) { + this.arr = value; + } + + /** + * Gets the value of the 'map' field. + * + * @return The value of the 'map' field. + */ + public java.util.Map getMap() { + return map; + } + + /** + * Sets the value of the 'map' field. + * + * @param value the value to set. + */ + public void setMap(java.util.Map value) { + this.map = value; + } + + /** + * Creates a new TestRecordWithMapsAndArrays RecordBuilder. + * + * @return A new TestRecordWithMapsAndArrays RecordBuilder + */ + public static org.apache.avro.specific.TestRecordWithMapsAndArrays.Builder newBuilder() { + return new org.apache.avro.specific.TestRecordWithMapsAndArrays.Builder(); + } + + /** + * Creates a new TestRecordWithMapsAndArrays RecordBuilder by copying an + * existing Builder. + * + * @param other The existing builder to copy. + * @return A new TestRecordWithMapsAndArrays RecordBuilder + */ + public static org.apache.avro.specific.TestRecordWithMapsAndArrays.Builder newBuilder( + org.apache.avro.specific.TestRecordWithMapsAndArrays.Builder other) { + if (other == null) { + return new org.apache.avro.specific.TestRecordWithMapsAndArrays.Builder(); + } else { + return new org.apache.avro.specific.TestRecordWithMapsAndArrays.Builder(other); + } + } + + /** + * Creates a new TestRecordWithMapsAndArrays RecordBuilder by copying an + * existing TestRecordWithMapsAndArrays instance. + * + * @param other The existing instance to copy. + * @return A new TestRecordWithMapsAndArrays RecordBuilder + */ + public static org.apache.avro.specific.TestRecordWithMapsAndArrays.Builder newBuilder( + org.apache.avro.specific.TestRecordWithMapsAndArrays other) { + if (other == null) { + return new org.apache.avro.specific.TestRecordWithMapsAndArrays.Builder(); + } else { + return new org.apache.avro.specific.TestRecordWithMapsAndArrays.Builder(other); + } + } + + /** + * RecordBuilder for TestRecordWithMapsAndArrays instances. + */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private java.util.List arr; + private java.util.Map map; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$, MODEL$); + } + + /** + * Creates a Builder by copying an existing Builder. + * + * @param other The existing Builder to copy. + */ + private Builder(org.apache.avro.specific.TestRecordWithMapsAndArrays.Builder other) { + super(other); + if (isValidValue(fields()[0], other.arr)) { + this.arr = data().deepCopy(fields()[0].schema(), other.arr); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.map)) { + this.map = data().deepCopy(fields()[1].schema(), other.map); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + } + + /** + * Creates a Builder by copying an existing TestRecordWithMapsAndArrays instance + * + * @param other The existing instance to copy. + */ + private Builder(org.apache.avro.specific.TestRecordWithMapsAndArrays other) { + super(SCHEMA$, MODEL$); + if (isValidValue(fields()[0], other.arr)) { + this.arr = data().deepCopy(fields()[0].schema(), other.arr); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.map)) { + this.map = data().deepCopy(fields()[1].schema(), other.map); + fieldSetFlags()[1] = true; + } + } + + /** + * Gets the value of the 'arr' field. + * + * @return The value. + */ + public java.util.List getArr() { + return arr; + } + + /** + * Sets the value of the 'arr' field. + * + * @param value The value of 'arr'. + * @return This builder. + */ + public org.apache.avro.specific.TestRecordWithMapsAndArrays.Builder setArr(java.util.List value) { + validate(fields()[0], value); + this.arr = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'arr' field has been set. + * + * @return True if the 'arr' field has been set, false otherwise. + */ + public boolean hasArr() { + return fieldSetFlags()[0]; + } + + /** + * Clears the value of the 'arr' field. + * + * @return This builder. + */ + public org.apache.avro.specific.TestRecordWithMapsAndArrays.Builder clearArr() { + arr = null; + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'map' field. + * + * @return The value. + */ + public java.util.Map getMap() { + return map; + } + + /** + * Sets the value of the 'map' field. + * + * @param value The value of 'map'. + * @return This builder. + */ + public org.apache.avro.specific.TestRecordWithMapsAndArrays.Builder setMap( + java.util.Map value) { + validate(fields()[1], value); + this.map = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'map' field has been set. + * + * @return True if the 'map' field has been set, false otherwise. + */ + public boolean hasMap() { + return fieldSetFlags()[1]; + } + + /** + * Clears the value of the 'map' field. + * + * @return This builder. + */ + public org.apache.avro.specific.TestRecordWithMapsAndArrays.Builder clearMap() { + map = null; + fieldSetFlags()[1] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public TestRecordWithMapsAndArrays build() { + try { + TestRecordWithMapsAndArrays record = new TestRecordWithMapsAndArrays(); + record.arr = fieldSetFlags()[0] ? this.arr : (java.util.List) defaultValue(fields()[0]); + record.map = fieldSetFlags()[1] ? this.map + : (java.util.Map) defaultValue(fields()[1]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { + throw e; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter WRITER$ = (org.apache.avro.io.DatumWriter) MODEL$ + .createDatumWriter(SCHEMA$); + + @Override + public void writeExternal(java.io.ObjectOutput out) throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader READER$ = (org.apache.avro.io.DatumReader) MODEL$ + .createDatumReader(SCHEMA$); + + @Override + public void readExternal(java.io.ObjectInput in) throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + + @Override + protected boolean hasCustomCoders() { + return true; + } + + @Override + public void customEncode(org.apache.avro.io.Encoder out) throws java.io.IOException { + long size0 = this.arr.size(); + out.writeArrayStart(); + out.setItemCount(size0); + long actualSize0 = 0; + for (java.lang.String e0 : this.arr) { + actualSize0++; + out.startItem(); + out.writeString(e0); + } + out.writeArrayEnd(); + if (actualSize0 != size0) + throw new java.util.ConcurrentModificationException( + "Array-size written was " + size0 + ", but element count was " + actualSize0 + "."); + + long size1 = this.map.size(); + out.writeMapStart(); + out.setItemCount(size1); + long actualSize1 = 0; + for (java.util.Map.Entry e1 : this.map.entrySet()) { + actualSize1++; + out.startItem(); + out.writeString(e1.getKey()); + java.lang.Long v1 = e1.getValue(); + out.writeLong(v1); + } + out.writeMapEnd(); + if (actualSize1 != size1) + throw new java.util.ConcurrentModificationException( + "Map-size written was " + size1 + ", but element count was " + actualSize1 + "."); + + } + + @Override + public void customDecode(org.apache.avro.io.ResolvingDecoder in) throws java.io.IOException { + org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); + if (fieldOrder == null) { + long size0 = in.readArrayStart(); + java.util.List a0 = this.arr; + if (a0 == null) { + a0 = new SpecificData.Array((int) size0, SCHEMA$.getField("arr").schema()); + this.arr = a0; + } else + a0.clear(); + SpecificData.Array ga0 = (a0 instanceof SpecificData.Array + ? (SpecificData.Array) a0 + : null); + for (; 0 < size0; size0 = in.arrayNext()) { + for (; size0 != 0; size0--) { + java.lang.String e0 = (ga0 != null ? ga0.peek() : null); + e0 = in.readString(); + a0.add(e0); + } + } + + long size1 = in.readMapStart(); + java.util.Map m1 = this.map; // Need fresh name due to limitation of macro + // system + if (m1 == null) { + m1 = new java.util.HashMap((int) size1); + this.map = m1; + } else + m1.clear(); + for (; 0 < size1; size1 = in.mapNext()) { + for (; size1 != 0; size1--) { + java.lang.String k1 = null; + k1 = in.readString(); + java.lang.Long v1 = null; + v1 = in.readLong(); + m1.put(k1, v1); + } + } + + } else { + for (int i = 0; i < 2; i++) { + switch (fieldOrder[i].pos()) { + case 0: + long size0 = in.readArrayStart(); + java.util.List a0 = this.arr; + if (a0 == null) { + a0 = new SpecificData.Array((int) size0, SCHEMA$.getField("arr").schema()); + this.arr = a0; + } else + a0.clear(); + SpecificData.Array ga0 = (a0 instanceof SpecificData.Array + ? (SpecificData.Array) a0 + : null); + for (; 0 < size0; size0 = in.arrayNext()) { + for (; size0 != 0; size0--) { + java.lang.String e0 = (ga0 != null ? ga0.peek() : null); + e0 = in.readString(); + a0.add(e0); + } + } + break; + + case 1: + long size1 = in.readMapStart(); + java.util.Map m1 = this.map; // Need fresh name due to limitation of macro + // system + if (m1 == null) { + m1 = new java.util.HashMap((int) size1); + this.map = m1; + } else + m1.clear(); + for (; 0 < size1; size1 = in.mapNext()) { + for (; size1 != 0; size1--) { + java.lang.String k1 = null; + k1 = in.readString(); + java.lang.Long v1 = null; + v1 = in.readLong(); + m1.put(k1, v1); + } + } + break; + + default: + throw new java.io.IOException("Corrupt ResolvingDecoder."); + } + } + } + } +} diff --git a/lang/java/avro/src/test/resources/TestRecordWithMapsAndArrays.avsc b/lang/java/avro/src/test/resources/TestRecordWithMapsAndArrays.avsc new file mode 100644 index 00000000000..e2bc0382b04 --- /dev/null +++ b/lang/java/avro/src/test/resources/TestRecordWithMapsAndArrays.avsc @@ -0,0 +1,23 @@ +{ + "type": "record", + "name": "TestRecordWithMapsAndArrays", + "namespace": "org.apache.avro.specific", + "fields": [ + { + "name": "arr", + "type": { + "type": "array", + "items": "string", + "default": [] + } + }, + { + "name": "map", + "type": { + "type": "map", + "values": "long", + "default": {} + } + } + ] +}