Skip to content

Commit

Permalink
chore(rust): remove parquet logic from polars-arrow and consolidate…
Browse files Browse the repository at this point in the history
… logic in `polars-parquet` crate. (#12022)
  • Loading branch information
ritchie46 authored Oct 25, 2023
1 parent b97cf5b commit 667bcd4
Show file tree
Hide file tree
Showing 103 changed files with 533 additions and 313 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ polars-core = { version = "0.34.2", path = "crates/polars-core", default-feature
polars-plan = { version = "0.34.2", path = "crates/polars-plan", default-features = false }
polars-lazy = { version = "0.34.2", path = "crates/polars-lazy", default-features = false }
polars-pipe = { version = "0.34.2", path = "crates/polars-pipe", default-features = false }
polars-parquet = { version = "0.34.2", path = "crates/polars-parquet", default-features = false }
polars-row = { version = "0.34.2", path = "crates/polars-row", default-features = false }
polars-ffi = { version = "0.34.2", path = "crates/polars-ffi", default-features = false }
polars-ops = { version = "0.34.2", path = "crates/polars-ops", default-features = false }
Expand Down
38 changes: 0 additions & 38 deletions crates/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ ethnum = { workspace = true }
# To efficiently cast numbers to strings
lexical-core = { workspace = true, optional = true }

fallible-streaming-iterator = { workspace = true, optional = true }
regex = { workspace = true, optional = true }
regex-syntax = { version = "0.8", optional = true }
streaming-iterator = { workspace = true }
Expand All @@ -49,8 +48,6 @@ hex = { workspace = true, optional = true }
lz4 = { version = "1.24", optional = true }
zstd = { version = "0.13", optional = true }

base64 = { workspace = true, optional = true }

# to write to parquet as a stream
futures = { workspace = true, optional = true }

Expand All @@ -74,7 +71,6 @@ arrow-array = { workspace = true, optional = true }
arrow-buffer = { workspace = true, optional = true }
arrow-data = { workspace = true, optional = true }
arrow-schema = { workspace = true, optional = true }
parquet2 = { workspace = true, optional = true, default-features = true, features = ["async"] }

[dev-dependencies]
avro-rs = { version = "0.13", features = ["snappy"] }
Expand Down Expand Up @@ -109,8 +105,6 @@ full = [
"io_ipc_write_async",
"io_ipc_read_async",
"io_ipc_compression",
"io_parquet",
"io_parquet_compression",
"io_avro",
"io_avro_compression",
"io_avro_async",
Expand All @@ -126,38 +120,6 @@ io_ipc_read_async = ["io_ipc", "futures", "async-stream"]
io_ipc_compression = ["lz4", "zstd"]
io_flight = ["io_ipc", "arrow-format/flight-data"]

# base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
io_parquet = [
"parquet2",
"parquet2/async",
"polars-error/parquet2",
"io_ipc",
"base64",
"futures",
"fallible-streaming-iterator",
]

io_parquet_compression = [
"io_parquet_zstd",
"io_parquet_gzip",
"io_parquet_snappy",
"io_parquet_lz4",
"io_parquet_brotli",
]

# sample testing of generated arrow data
io_parquet_sample_test = ["io_parquet"]

# compression backends
io_parquet_zstd = ["parquet2/zstd"]
io_parquet_snappy = ["parquet2/snappy"]
io_parquet_gzip = ["parquet2/gzip"]
io_parquet_lz4 = ["parquet2/lz4"]
io_parquet_brotli = ["parquet2/brotli"]

# parquet bloom filter functions
io_parquet_bloom_filter = ["parquet2/bloom_filter"]

io_avro = ["avro-schema", "polars-error/avro-schema"]
io_avro_compression = [
"avro-schema/compression",
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/fixed_size_binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl FixedSizeBinaryArray {
}
}

pub(crate) fn get_size(data_type: &DataType) -> usize {
pub fn get_size(data_type: &DataType) -> usize {
Self::maybe_get_size(data_type).unwrap()
}
}
Expand Down
4 changes: 0 additions & 4 deletions crates/polars-arrow/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ pub mod ipc;
#[cfg_attr(docsrs, doc(cfg(feature = "io_flight")))]
pub mod flight;

#[cfg(feature = "io_parquet")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_parquet")))]
pub mod parquet;

#[cfg(feature = "io_avro")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_avro")))]
pub mod avro;
Expand Down
2 changes: 0 additions & 2 deletions crates/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ dtype-u16 = []
dtype-categorical = []
dtype-struct = []

parquet = ["arrow/io_parquet"]

# scale to terabytes?
bigidx = ["arrow/bigidx", "polars-utils/bigidx"]
python = []
Expand Down
13 changes: 11 additions & 2 deletions crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ description = "IO related logic for the Polars DataFrame library"
polars-core = { workspace = true }
polars-error = { workspace = true }
polars-json = { workspace = true, optional = true }
polars-parquet = { workspace = true, optional = true }
polars-time = { workspace = true, features = [], optional = true }
polars-utils = { workspace = true }

Expand Down Expand Up @@ -92,8 +93,16 @@ dtype-struct = ["polars-core/dtype-struct"]
dtype-decimal = ["polars-core/dtype-decimal"]
fmt = ["polars-core/fmt"]
lazy = []
parquet = ["polars-core/parquet", "arrow/io_parquet", "arrow/io_parquet_compression"]
async = ["async-trait", "futures", "tokio", "tokio-util", "arrow/io_ipc_write_async", "polars-error/regex"]
parquet = ["polars-parquet", "polars-parquet/compression"]
async = [
"async-trait",
"futures",
"tokio",
"tokio-util",
"arrow/io_ipc_write_async",
"polars-error/regex",
"polars-parquet?/async",
]
cloud = ["object_store", "async", "polars-error/object_store", "url"]
aws = ["object_store/aws", "cloud", "reqwest"]
azure = ["object_store/azure", "cloud"]
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/export.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[cfg(feature = "parquet")]
pub use arrow::io::parquet::read::statistics::Statistics as ParquetStatistics;
pub use polars_parquet::read::statistics::Statistics as ParquetStatistics;
#[cfg(feature = "parquet")]
pub use arrow::io::parquet::read::statistics::*;
pub use polars_parquet::read::statistics::*;
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use std::borrow::Cow;
use std::ops::Range;
use std::sync::Arc;

use arrow::io::parquet::read::{self as parquet2_read, RowGroupMetaData};
use arrow::io::parquet::write::FileMetaData;
use bytes::Bytes;
use futures::future::try_join_all;
use object_store::path::Path as ObjectPath;
Expand All @@ -14,6 +12,8 @@ use polars_core::datatypes::PlHashMap;
use polars_core::error::{to_compute_err, PolarsResult};
use polars_core::prelude::*;
use polars_core::schema::Schema;
use polars_parquet::read::{self as parquet2_read, RowGroupMetaData};
use polars_parquet::write::FileMetaData;
use smartstring::alias::String as SmartString;

use super::cloud::{build_object_store, CloudLocation, CloudReader};
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-io/src/parquet/mmap.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use arrow::datatypes::Field;
use arrow::io::parquet::read::{
column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData,
PageReader,
};
use bytes::Bytes;
#[cfg(feature = "async")]
use polars_core::datatypes::PlHashMap;
use polars_parquet::read::{
column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData,
PageReader,
};

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod read;
mod read_impl;
mod write;

use arrow::io::parquet::write::FileMetaData;
pub use polars_parquet::write::FileMetaData;
pub use read::*;
pub use write::{BrotliLevel, GzipLevel, ZstdLevel, *};

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/predicates.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use arrow::io::parquet::read::statistics::{deserialize, Statistics};
use arrow::io::parquet::read::RowGroupMetaData;
use polars_core::prelude::*;
use polars_parquet::read::statistics::{deserialize, Statistics};
use polars_parquet::read::RowGroupMetaData;

use crate::predicates::{BatchStats, ColumnStats, PhysicalIoExpr};

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/read.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::io::{Read, Seek};
use std::sync::Arc;

use arrow::io::parquet::read;
use arrow::io::parquet::write::FileMetaData;
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
use polars_parquet::read;
use polars_parquet::write::FileMetaData;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::ops::{Deref, Range};
use std::sync::Arc;

use arrow::array::new_empty_array;
use arrow::io::parquet::read;
use arrow::io::parquet::read::{ArrayIter, FileMetaData, RowGroupMetaData};
use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::POOL;
use polars_parquet::read;
use polars_parquet::read::{ArrayIter, FileMetaData, RowGroupMetaData};
use rayon::prelude::*;

use super::mmap::ColumnStore;
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use std::io::Write;
use arrow::array::Array;
use arrow::chunk::Chunk;
use arrow::datatypes::{DataType as ArrowDataType, PhysicalType};
use arrow::io::parquet::read::ParquetError;
use arrow::io::parquet::write::{self, DynIter, DynStreamingIterator, Encoding, FileWriter, *};
use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df};
use polars_core::POOL;
use polars_parquet::read::ParquetError;
use polars_parquet::write::{self, DynIter, DynStreamingIterator, Encoding, FileWriter, *};
use rayon::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ version_check = { workspace = true }
[features]
nightly = ["polars-core/nightly", "polars-pipe?/nightly", "polars-plan/nightly"]
streaming = ["chunked_ids", "polars-pipe", "polars-plan/streaming", "polars-ops/chunked_ids"]
parquet = ["polars-core/parquet", "polars-io/parquet", "polars-plan/parquet", "polars-pipe?/parquet"]
parquet = ["polars-io/parquet", "polars-plan/parquet", "polars-pipe?/parquet"]
async = [
"polars-plan/async",
"polars-io/cloud",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::path::{Path, PathBuf};

use polars_core::config::{concurrent_download_limit, verbose};
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::utils::arrow::io::parquet::read::FileMetaData;
use polars_io::cloud::CloudOptions;
use polars_io::parquet::FileMetaData;
use polars_io::{is_cloud_url, RowCount};

use super::*;
Expand Down
44 changes: 44 additions & 0 deletions crates/polars-parquet/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
[package]
name = "polars-parquet"
version = { workspace = true }
authors = [
  "Jorge C. Leitao <jorgecarleitao@gmail.com>",
  "Apache Arrow <dev@arrow.apache.org>",
  "Ritchie Vink <ritchie46@gmail.com>",
]
edition = { workspace = true }
homepage = { workspace = true }
license-file = "./LICENSE"
repository = { workspace = true }
description = "polars parquet IO"

[dependencies]
ahash = { workspace = true }
arrow = { workspace = true, features = ["io_ipc"] }
base64 = { workspace = true }
ethnum = { workspace = true }
fallible-streaming-iterator = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
num-traits = { workspace = true }
parquet2 = { workspace = true, optional = true, default-features = true, features = ["async"] }
polars-error = { workspace = true, features = ["parquet2"] }
simdutf8 = { workspace = true }

[features]
bloom_filter = ["parquet2/bloom_filter"]
async = ["futures"]

compression = [
"zstd",
"gzip",
"snappy",
"lz4",
"brotli",
]

# compression backends
zstd = ["parquet2/zstd"]
snappy = ["parquet2/snappy"]
gzip = ["parquet2/gzip"]
lz4 = ["parquet2/lz4"]
brotli = ["parquet2/brotli"]
Loading

0 comments on commit 667bcd4

Please sign in to comment.