From 95edea45415503f32f0428c520846c89ab920443 Mon Sep 17 00:00:00 2001
From: david dali susanibar arce
Date: Mon, 28 Aug 2023 18:11:58 -0500
Subject: [PATCH] fix: code review

---
 java/source/dataset.rst |  88 --------------------
 java/source/io.rst      | 172 +++++++++++++++++++++-------------------
 2 files changed, 92 insertions(+), 168 deletions(-)

diff --git a/java/source/dataset.rst b/java/source/dataset.rst
index 857956ce..2ac3fa77 100644
--- a/java/source/dataset.rst
+++ b/java/source/dataset.rst
@@ -534,91 +534,3 @@ Let's read a CSV file.
     Total batch size: 3
 
 .. _Arrow Java Dataset: https://arrow.apache.org/docs/dev/java/dataset.html
-
-
-Write Parquet Files
-===================
-
-Let's read an Arrow file and populate that data into a Parquet file.
-
-.. testcode::
-
-   import java.io.IOException;
-   import java.nio.file.DirectoryStream;
-   import java.nio.file.Files;
-   import java.nio.file.Path;
-   import java.nio.file.Paths;
-
-   import org.apache.arrow.dataset.file.DatasetFileWriter;
-   import org.apache.arrow.dataset.file.FileFormat;
-   import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
-   import org.apache.arrow.dataset.jni.NativeMemoryPool;
-   import org.apache.arrow.dataset.scanner.ScanOptions;
-   import org.apache.arrow.dataset.scanner.Scanner;
-   import org.apache.arrow.dataset.source.Dataset;
-   import org.apache.arrow.dataset.source.DatasetFactory;
-   import org.apache.arrow.memory.BufferAllocator;
-   import org.apache.arrow.memory.RootAllocator;
-   import org.apache.arrow.vector.ipc.ArrowFileReader;
-   import org.apache.arrow.vector.ipc.ArrowReader;
-   import org.apache.arrow.vector.ipc.SeekableReadChannel;
-   import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
-
-   // read arrow demo data
-   Path uriRead = Paths.get("./thirdpartydeps/arrowfiles/random_access.arrow");
-   try (
-       BufferAllocator allocator = new RootAllocator();
-       ArrowFileReader readerForDemoData = new ArrowFileReader(
-           new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(
-               Files.readAllBytes(uriRead))), allocator)
-   ) {
-       Path uriWrite = Files.createTempDirectory("parquet_");
-       // write data for new parquet file
-       DatasetFileWriter.write(allocator, readerForDemoData, FileFormat.PARQUET, uriWrite.toUri().toString());
-       // validate data of parquet file just created
-       ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
-       try (
-           DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator,
-               NativeMemoryPool.getDefault(), FileFormat.PARQUET, uriWrite.toUri().toString());
-           Dataset dataset = datasetFactory.finish();
-           Scanner scanner = dataset.newScan(options);
-           ArrowReader readerForFileCreated = scanner.scanBatches()
-       ) {
-           while (readerForFileCreated.loadNextBatch()) {
-               System.out.print(readerForFileCreated.getVectorSchemaRoot().contentToTSVString());
-               System.out.println("RowCount: " + readerForFileCreated.getVectorSchemaRoot().getRowCount());
-           }
-       } catch (Exception e) {
-           e.printStackTrace();
-           throw new RuntimeException(e);
-       }
-       // delete temporary parquet file created
-       try (DirectoryStream<Path> dir = Files.newDirectoryStream(uriWrite)) {
-           uriWrite.toFile().deleteOnExit();
-           for (Path path : dir) {
-               path.toFile().deleteOnExit();
-           }
-       }
-   } catch (IOException e) {
-       e.printStackTrace();
-       throw new RuntimeException(e);
-   }
-   }
-
-.. testoutput::
-
-   name    age
-   David    10
-   Gladis    20
-   Juan    30
-   RowCount: 3
-   name    age
-   Nidia    15
-   Alexa    20
-   Mara    15
-   RowCount: 3
-   name    age
-   Raul    34
-   Jhon    29
-   Thomy    33
-   RowCount: 3
\ No newline at end of file
diff --git a/java/source/io.rst b/java/source/io.rst
index 7b66fd67..35a9502c 100644
--- a/java/source/io.rst
+++ b/java/source/io.rst
@@ -263,6 +263,93 @@ Write - Out to Buffer
 
     Number of rows written: 3
 
+Write Parquet Files
+*******************
+
+Let's read an Arrow file and write its data to a Parquet file.
+
+.. testcode::
+
+   import java.io.IOException;
+   import java.nio.file.DirectoryStream;
+   import java.nio.file.Files;
+   import java.nio.file.Path;
+   import java.nio.file.Paths;
+
+   import org.apache.arrow.dataset.file.DatasetFileWriter;
+   import org.apache.arrow.dataset.file.FileFormat;
+   import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+   import org.apache.arrow.dataset.jni.NativeMemoryPool;
+   import org.apache.arrow.dataset.scanner.ScanOptions;
+   import org.apache.arrow.dataset.scanner.Scanner;
+   import org.apache.arrow.dataset.source.Dataset;
+   import org.apache.arrow.dataset.source.DatasetFactory;
+   import org.apache.arrow.memory.BufferAllocator;
+   import org.apache.arrow.memory.RootAllocator;
+   import org.apache.arrow.vector.ipc.ArrowFileReader;
+   import org.apache.arrow.vector.ipc.ArrowReader;
+   import org.apache.arrow.vector.ipc.SeekableReadChannel;
+   import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
+
+   // read the Arrow demo data
+   Path uriRead = Paths.get("./thirdpartydeps/arrowfiles/random_access.arrow");
+   try (
+       BufferAllocator allocator = new RootAllocator();
+       ArrowFileReader readerForDemoData = new ArrowFileReader(
+           new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(
+               Files.readAllBytes(uriRead))), allocator)
+   ) {
+       Path uriWrite = Files.createTempDirectory("parquet_");
+       // write the data to a new Parquet file
+       DatasetFileWriter.write(allocator, readerForDemoData, FileFormat.PARQUET, uriWrite.toUri().toString());
+       // validate the data of the Parquet file just created
+       ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+       try (
+           DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator,
+               NativeMemoryPool.getDefault(), FileFormat.PARQUET, uriWrite.toUri().toString());
+           Dataset dataset = datasetFactory.finish();
+           Scanner scanner = dataset.newScan(options);
+           ArrowReader readerForFileCreated = scanner.scanBatches()
+       ) {
+           while (readerForFileCreated.loadNextBatch()) {
+               System.out.print(readerForFileCreated.getVectorSchemaRoot().contentToTSVString());
+               System.out.println("RowCount: " + readerForFileCreated.getVectorSchemaRoot().getRowCount());
+           }
+       } catch (Exception e) {
+           e.printStackTrace();
+           throw new RuntimeException(e);
+       }
+       // schedule the temporary Parquet files for deletion on exit
+       try (DirectoryStream<Path> dir = Files.newDirectoryStream(uriWrite)) {
+           uriWrite.toFile().deleteOnExit();
+           for (Path path : dir) {
+               path.toFile().deleteOnExit();
+           }
+       }
+   } catch (IOException e) {
+       e.printStackTrace();
+       throw new RuntimeException(e);
+   }
+
+.. testoutput::
+
+   name    age
+   David    10
+   Gladis    20
+   Juan    30
+   RowCount: 3
+   name    age
+   Nidia    15
+   Alexa    20
+   Mara    15
+   RowCount: 3
+   name    age
+   Raul    34
+   Jhon    29
+   Thomy    33
+   RowCount: 3
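+
+``DatasetFileWriter.write`` can also partition the output by column values. A minimal
+sketch, reusing the names from the example above; the partitioning overload, the
+``age`` partition column, and the file name template are assumptions to check against
+your Arrow Java version:
+
+.. code-block:: java
+
+   // write one subdirectory per distinct "age" value, capped at 10 partitions,
+   // naming the emitted files via the "data_{i}" template
+   DatasetFileWriter.write(allocator, readerForDemoData, FileFormat.PARQUET,
+       uriWrite.toUri().toString(), new String[] {"age"}, 10, "data_{i}");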
+
 
 Reading
 =======
 
@@ -461,8 +548,8 @@ Reading Parquet File
 
 Please check :doc:`Dataset <./dataset>`
 
-Handling Data with Dictionaries
-*******************************
+Reading Data with Dictionaries
+******************************
 
 Reading and writing dictionary-encoded data requires separately tracking the dictionaries.
 
@@ -580,8 +667,8 @@ Reading and writing dictionary-encoded data requires separately tracking the dic
     Dictionary recovered: Dictionary DictionaryEncoding[id=666,ordered=false,indexType=Int(8, true)] [Andorra, Cuba, Grecia, Guinea, Islandia, Malta, Tailandia, Uganda, Yemen, Zambia]
     Decoded data: [Andorra, Guinea, Islandia, Malta, Uganda]
 
-Customize Logic to Read Dataset
-===============================
+Reading Custom Dataset
+**********************
 
-If you need to implement a custom dataset reader, consider extending `ArrowReader`_ class.
+If you need to implement a custom dataset reader, consider extending the `ArrowReader`_ class.
 
@@ -593,81 +680,6 @@ The ArrowReader class can be extended as follows:
-4. At the end don’t forget to define the logic to ``closeReadSource()``.
-5. Make sure you define the logic for closing the ``closeReadSource()`` at the end.
+4. Finally, implement ``closeReadSource()`` to release the underlying read source when the reader is closed.
 
-For example, let's create a custom JDBCReader reader.
-
-.. code-block:: java
-
-   import java.io.IOException;
-
-   import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
-   import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
-   import org.apache.arrow.memory.BufferAllocator;
-   import org.apache.arrow.vector.VectorSchemaRoot;
-   import org.apache.arrow.vector.ipc.ArrowReader;
-   import org.apache.arrow.vector.types.pojo.Schema;
-
-   class JDBCReader extends ArrowReader {
-       private final ArrowVectorIterator iter;
-       private final JdbcToArrowConfig config;
-       private VectorSchemaRoot root;
-       private boolean firstRoot = true;
-
-       public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, JdbcToArrowConfig config) {
-           super(allocator);
-           this.iter = iter;
-           this.config = config;
-       }
-
-       @Override
-       public boolean loadNextBatch() throws IOException {
-           if (firstRoot) {
-               firstRoot = false;
-               return true;
-           }
-           else {
-               if (iter.hasNext()) {
-                   if (root != null && !config.isReuseVectorSchemaRoot()) {
-                       root.close();
-                   }
-                   else {
-                       root.allocateNew();
-                   }
-                   root = iter.next();
-                   return root.getRowCount() != 0;
-               }
-               else {
-                   return false;
-               }
-           }
-       }
-
-       @Override
-       public long bytesRead() {
-           return 0;
-       }
-
-       @Override
-       protected void closeReadSource() throws IOException {
-           if (root != null && !config.isReuseVectorSchemaRoot()) {
-               root.close();
-           }
-       }
-
-       @Override
-       protected Schema readSchema() throws IOException {
-           return null;
-       }
-
-       @Override
-       public VectorSchemaRoot getVectorSchemaRoot() throws IOException {
-           if (root == null) {
-               root = iter.next();
-           }
-           return root;
-       }
-   }
-
-
-
+You can see an example of a custom JDBC reader in :doc:`Write ResultSet to Parquet File <./jdbc>`;
+a minimal skeleton of the pattern is also sketched below.
 
 .. _`ArrowReader`: https://arrow.apache.org/docs/java/reference/org/apache/arrow/vector/ipc/ArrowReader.html
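+
+A minimal skeleton of that pattern (illustrative only: ``SingleBatchReader`` is a
+hypothetical reader serving one preloaded ``VectorSchemaRoot``, not part of the
+Arrow API):
+
+.. code-block:: java
+
+   import java.io.IOException;
+
+   import org.apache.arrow.memory.BufferAllocator;
+   import org.apache.arrow.vector.VectorSchemaRoot;
+   import org.apache.arrow.vector.ipc.ArrowReader;
+   import org.apache.arrow.vector.types.pojo.Schema;
+
+   class SingleBatchReader extends ArrowReader {
+       private final VectorSchemaRoot root;
+       private boolean consumed = false;
+
+       public SingleBatchReader(BufferAllocator allocator, VectorSchemaRoot root) {
+           super(allocator);
+           this.root = root;
+       }
+
+       @Override
+       public boolean loadNextBatch() throws IOException {
+           // serve the preloaded batch exactly once, then report end of stream
+           if (consumed) {
+               return false;
+           }
+           consumed = true;
+           return true;
+       }
+
+       @Override
+       public VectorSchemaRoot getVectorSchemaRoot() throws IOException {
+           // expose the batch most recently loaded by loadNextBatch()
+           return root;
+       }
+
+       @Override
+       public long bytesRead() {
+           // no byte-level accounting for an in-memory source
+           return 0;
+       }
+
+       @Override
+       protected Schema readSchema() throws IOException {
+           // the schema of the data this reader produces
+           return root.getSchema();
+       }
+
+       @Override
+       protected void closeReadSource() throws IOException {
+           // release the underlying resources when the reader is closed
+           root.close();
+       }
+   }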