fix: code review
davisusanibar committed Aug 28, 2023
1 parent b81312a commit 95edea4
Showing 2 changed files with 92 additions and 168 deletions.
88 changes: 0 additions & 88 deletions java/source/dataset.rst
@@ -534,91 +534,3 @@ Let's read a CSV file.
Total batch size: 3

.. _Arrow Java Dataset: https://arrow.apache.org/docs/dev/java/dataset.html


Write Parquet Files
===================

Let's read an Arrow file and write its data to a Parquet file.

.. testcode::

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    import org.apache.arrow.dataset.file.DatasetFileWriter;
    import org.apache.arrow.dataset.file.FileFormat;
    import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
    import org.apache.arrow.dataset.jni.NativeMemoryPool;
    import org.apache.arrow.dataset.scanner.ScanOptions;
    import org.apache.arrow.dataset.scanner.Scanner;
    import org.apache.arrow.dataset.source.Dataset;
    import org.apache.arrow.dataset.source.DatasetFactory;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.ipc.ArrowFileReader;
    import org.apache.arrow.vector.ipc.ArrowReader;
    import org.apache.arrow.vector.ipc.SeekableReadChannel;
    import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

    // Read the Arrow demo data
    Path uriRead = Paths.get("./thirdpartydeps/arrowfiles/random_access.arrow");
    try (
        BufferAllocator allocator = new RootAllocator();
        ArrowFileReader readerForDemoData = new ArrowFileReader(
            new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(
                Files.readAllBytes(uriRead))), allocator)
    ) {
        Path uriWrite = Files.createTempDirectory("parquet_");
        // Write the data to a new Parquet file
        DatasetFileWriter.write(allocator, readerForDemoData, FileFormat.PARQUET, uriWrite.toUri().toString());
        // Validate the contents of the Parquet file just created
        ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
        try (
            DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator,
                NativeMemoryPool.getDefault(), FileFormat.PARQUET, uriWrite.toUri().toString());
            Dataset dataset = datasetFactory.finish();
            Scanner scanner = dataset.newScan(options);
            ArrowReader readerForFileCreated = scanner.scanBatches()
        ) {
            while (readerForFileCreated.loadNextBatch()) {
                System.out.print(readerForFileCreated.getVectorSchemaRoot().contentToTSVString());
                System.out.println("RowCount: " + readerForFileCreated.getVectorSchemaRoot().getRowCount());
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        // Delete the temporary Parquet files on exit
        try (DirectoryStream<Path> dir = Files.newDirectoryStream(uriWrite)) {
            uriWrite.toFile().deleteOnExit();
            for (Path path : dir) {
                path.toFile().deleteOnExit();
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    }

.. testoutput::

name age
David 10
Gladis 20
Juan 30
RowCount: 3
name age
Nidia 15
Alexa 20
Mara 15
RowCount: 3
name age
Raul 34
Jhon 29
Thomy 33
RowCount: 3
172 changes: 92 additions & 80 deletions java/source/io.rst
Expand Up @@ -263,6 +263,93 @@ Write - Out to Buffer

Number of rows written: 3

Write Parquet Files
*******************

Let's read an Arrow file and write its data to a Parquet file.

.. testcode::

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    import org.apache.arrow.dataset.file.DatasetFileWriter;
    import org.apache.arrow.dataset.file.FileFormat;
    import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
    import org.apache.arrow.dataset.jni.NativeMemoryPool;
    import org.apache.arrow.dataset.scanner.ScanOptions;
    import org.apache.arrow.dataset.scanner.Scanner;
    import org.apache.arrow.dataset.source.Dataset;
    import org.apache.arrow.dataset.source.DatasetFactory;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.ipc.ArrowFileReader;
    import org.apache.arrow.vector.ipc.ArrowReader;
    import org.apache.arrow.vector.ipc.SeekableReadChannel;
    import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

    // Read the Arrow demo data
    Path uriRead = Paths.get("./thirdpartydeps/arrowfiles/random_access.arrow");
    try (
        BufferAllocator allocator = new RootAllocator();
        ArrowFileReader readerForDemoData = new ArrowFileReader(
            new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(
                Files.readAllBytes(uriRead))), allocator)
    ) {
        Path uriWrite = Files.createTempDirectory("parquet_");
        // Write the data to a new Parquet file
        DatasetFileWriter.write(allocator, readerForDemoData, FileFormat.PARQUET, uriWrite.toUri().toString());
        // Validate the contents of the Parquet file just created
        ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
        try (
            DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator,
                NativeMemoryPool.getDefault(), FileFormat.PARQUET, uriWrite.toUri().toString());
            Dataset dataset = datasetFactory.finish();
            Scanner scanner = dataset.newScan(options);
            ArrowReader readerForFileCreated = scanner.scanBatches()
        ) {
            while (readerForFileCreated.loadNextBatch()) {
                System.out.print(readerForFileCreated.getVectorSchemaRoot().contentToTSVString());
                System.out.println("RowCount: " + readerForFileCreated.getVectorSchemaRoot().getRowCount());
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        // Delete the temporary Parquet files on exit
        try (DirectoryStream<Path> dir = Files.newDirectoryStream(uriWrite)) {
            uriWrite.toFile().deleteOnExit();
            for (Path path : dir) {
                path.toFile().deleteOnExit();
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    }

.. testoutput::

name age
David 10
Gladis 20
Juan 30
RowCount: 3
name age
Nidia 15
Alexa 20
Mara 15
RowCount: 3
name age
Raul 34
Jhon 29
Thomy 33
RowCount: 3

Reading
=======

@@ -461,8 +548,8 @@ Reading Parquet File

Please check :doc:`Dataset <./dataset>`.
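
For quick reference, a minimal sketch of reading a Parquet file with the Dataset module is shown below; the URI ``file:///tmp/example.parquet`` is only a placeholder for illustration.

.. code-block:: java

    import org.apache.arrow.dataset.file.FileFormat;
    import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
    import org.apache.arrow.dataset.jni.NativeMemoryPool;
    import org.apache.arrow.dataset.scanner.ScanOptions;
    import org.apache.arrow.dataset.scanner.Scanner;
    import org.apache.arrow.dataset.source.Dataset;
    import org.apache.arrow.dataset.source.DatasetFactory;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.ipc.ArrowReader;

    // Placeholder URI; point this at an existing Parquet file
    String uri = "file:///tmp/example.parquet";
    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
    try (
        BufferAllocator allocator = new RootAllocator();
        DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator,
            NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
        Dataset dataset = datasetFactory.finish();
        Scanner scanner = dataset.newScan(options);
        ArrowReader reader = scanner.scanBatches()
    ) {
        while (reader.loadNextBatch()) {
            System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }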

Handling Data with Dictionaries
*******************************
Reading Data with Dictionaries
******************************

Reading and writing dictionary-encoded data requires separately tracking the dictionaries.
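
As a rough sketch of the reading side (the file name ``dictionary_encoded.arrow`` is an assumption for illustration, and the file is assumed to contain a single dictionary-encoded vector), the dictionaries are retrieved from the reader and used to decode the indices:

.. code-block:: java

    import java.nio.file.Files;
    import java.nio.file.Paths;

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.FieldVector;
    import org.apache.arrow.vector.ValueVector;
    import org.apache.arrow.vector.dictionary.Dictionary;
    import org.apache.arrow.vector.dictionary.DictionaryEncoder;
    import org.apache.arrow.vector.ipc.ArrowFileReader;
    import org.apache.arrow.vector.ipc.SeekableReadChannel;
    import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

    // Assumed input: an Arrow IPC file containing one dictionary-encoded vector
    try (
        BufferAllocator allocator = new RootAllocator();
        ArrowFileReader reader = new ArrowFileReader(
            new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(
                Files.readAllBytes(Paths.get("dictionary_encoded.arrow")))), allocator)
    ) {
        while (reader.loadNextBatch()) {
            // The data vector only holds the indices ...
            FieldVector encoded = reader.getVectorSchemaRoot().getVector(0);
            // ... while the dictionary itself is tracked separately, keyed by its id
            long dictionaryId = encoded.getField().getDictionary().getId();
            Dictionary dictionary = reader.getDictionaryVectors().get(dictionaryId);
            System.out.println("Dictionary recovered: " + dictionary);
            // Decode the indices back into the original values
            try (ValueVector decoded = DictionaryEncoder.decode(encoded, dictionary)) {
                System.out.println("Decoded data: " + decoded);
            }
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }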

@@ -580,8 +667,8 @@ Reading and writing dictionary-encoded data requires separately tracking the dictionaries.
Dictionary recovered: Dictionary DictionaryEncoding[id=666,ordered=false,indexType=Int(8, true)] [Andorra, Cuba, Grecia, Guinea, Islandia, Malta, Tailandia, Uganda, Yemen, Zambia]
Decoded data: [Andorra, Guinea, Islandia, Malta, Uganda]

Customize Logic to Read Dataset
===============================
Reading Custom Dataset
**********************

If you need to implement a custom dataset reader, consider extending the `ArrowReader`_ class.

@@ -593,81 +680,6 @@ The ArrowReader class can be extended as follows:
4. Finally, define the logic to release the underlying resources in ``closeReadSource()``.

For example, let's create a custom ``JDBCReader``.

.. code-block:: java

    import java.io.IOException;

    import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
    import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.ipc.ArrowReader;
    import org.apache.arrow.vector.types.pojo.Schema;

    class JDBCReader extends ArrowReader {
        private final ArrowVectorIterator iter;
        private final JdbcToArrowConfig config;
        private VectorSchemaRoot root;
        private boolean firstRoot = true;

        public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, JdbcToArrowConfig config) {
            super(allocator);
            this.iter = iter;
            this.config = config;
        }

        @Override
        public boolean loadNextBatch() throws IOException {
            if (firstRoot) {
                // The first batch is materialized lazily in getVectorSchemaRoot()
                firstRoot = false;
                return true;
            } else {
                if (iter.hasNext()) {
                    if (root != null) {
                        if (!config.isReuseVectorSchemaRoot()) {
                            root.close();
                        } else {
                            root.allocateNew();
                        }
                    }
                    root = iter.next();
                    return root.getRowCount() != 0;
                } else {
                    return false;
                }
            }
        }

        @Override
        public long bytesRead() {
            // Bytes read are not tracked for a JDBC source
            return 0;
        }

        @Override
        protected void closeReadSource() throws IOException {
            if (root != null && !config.isReuseVectorSchemaRoot()) {
                root.close();
            }
        }

        @Override
        protected Schema readSchema() throws IOException {
            // The schema is obtained from the VectorSchemaRoot, so nothing is read here
            return null;
        }

        @Override
        public VectorSchemaRoot getVectorSchemaRoot() throws IOException {
            if (root == null) {
                root = iter.next();
            }
            return root;
        }
    }
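
A minimal usage sketch of the ``JDBCReader`` above might look as follows; the H2 in-memory URL and the ``person`` table are assumptions for illustration, and any JDBC ``ResultSet`` would work the same way.

.. code-block:: java

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
    import org.apache.arrow.adapter.jdbc.JdbcToArrow;
    import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
    import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
    import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;

    try (
        BufferAllocator allocator = new RootAllocator();
        // Assumed H2 in-memory database and table; replace with your own JDBC source
        Connection connection = DriverManager.getConnection("jdbc:h2:mem:cookbook");
        ResultSet resultSet = connection.createStatement().executeQuery("SELECT * FROM person")
    ) {
        JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator,
            JdbcToArrowUtils.getUtcCalendar()).build();
        try (
            ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(resultSet, config);
            JDBCReader reader = new JDBCReader(allocator, iterator, config)
        ) {
            while (reader.loadNextBatch()) {
                System.out.println("RowCount: " + reader.getVectorSchemaRoot().getRowCount());
            }
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
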
You can see a complete example of a custom JDBC reader in :doc:`Write ResultSet to Parquet File <./jdbc>`.

.. _`ArrowReader`: https://arrow.apache.org/docs/java/reference/org/apache/arrow/vector/ipc/ArrowReader.html
