diff --git a/Cargo.toml b/Cargo.toml index c1f8a604dd58..0952c1788746 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "datafusion/common", "datafusion/common-runtime", "datafusion/catalog", + "datafusion/catalog-listing", "datafusion/core", "datafusion/expr", "datafusion/expr-common", @@ -100,6 +101,7 @@ ctor = "0.2.9" dashmap = "6.0.1" datafusion = { path = "datafusion/core", version = "45.0.0", default-features = false } datafusion-catalog = { path = "datafusion/catalog", version = "45.0.0" } +datafusion-catalog-listing = { path = "datafusion/catalog-listing", version = "45.0.0" } datafusion-common = { path = "datafusion/common", version = "45.0.0", default-features = false } datafusion-common-runtime = { path = "datafusion/common-runtime", version = "45.0.0" } datafusion-doc = { path = "datafusion/doc", version = "45.0.0" } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index bcbee29d6b5b..d1107d2a7168 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1219,6 +1219,7 @@ dependencies = [ "bzip2 0.5.0", "chrono", "datafusion-catalog", + "datafusion-catalog-listing", "datafusion-common", "datafusion-common-runtime", "datafusion-execution", @@ -1274,6 +1275,28 @@ dependencies = [ "sqlparser", ] +[[package]] +name = "datafusion-catalog-listing" +version = "45.0.0" +dependencies = [ + "arrow", + "arrow-schema", + "chrono", + "datafusion-catalog", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr", + "datafusion-physical-expr-common", + "datafusion-physical-plan", + "futures", + "glob", + "itertools 0.14.0", + "log", + "object_store", + "url", +] + [[package]] name = "datafusion-cli" version = "45.0.0" diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml new file mode 100644 index 000000000000..03132e7b7bb5 --- /dev/null +++ b/datafusion/catalog-listing/Cargo.toml @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "datafusion-catalog-listing" +description = "datafusion-catalog-listing" +authors.workspace = true +edition.workspace = true +homepage.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +arrow = { workspace = true } +arrow-schema = { workspace = true } +async-compression = { version = "0.4.0", features = [ + "bzip2", + "gzip", + "xz", + "zstd", + "tokio", +], optional = true } +chrono = { workspace = true } +datafusion-catalog = { workspace = true } +datafusion-common = { workspace = true, features = ["object_store"] } +datafusion-execution = { workspace = true } +datafusion-expr = { workspace = true } +datafusion-physical-expr = { workspace = true } +datafusion-physical-expr-common = { workspace = true } +datafusion-physical-plan = { workspace = true } +futures = { workspace = true } +glob = "0.3.0" +itertools = { workspace = true } +log = { workspace = true } +object_store = { workspace = true } +url = { workspace = true } + +[dev-dependencies] +async-trait = { workspace = true } +tempfile = { workspace = true } +tokio = { workspace = true } + +[lints] +workspace = true + +[lib] +name = "datafusion_catalog_listing" +path = "src/mod.rs" diff --git a/datafusion/catalog-listing/LICENSE.txt b/datafusion/catalog-listing/LICENSE.txt new file mode 120000 index 000000000000..1ef648f64b34 --- /dev/null +++ b/datafusion/catalog-listing/LICENSE.txt @@ -0,0 +1 @@ +../../LICENSE.txt \ No newline at end of file diff --git a/datafusion/catalog-listing/NOTICE.txt b/datafusion/catalog-listing/NOTICE.txt new file mode 120000 index 000000000000..fb051c92b10b --- /dev/null +++ b/datafusion/catalog-listing/NOTICE.txt @@ -0,0 +1 @@ +../../NOTICE.txt \ No newline at end of file diff --git a/datafusion/catalog-listing/README.md b/datafusion/catalog-listing/README.md new file mode 100644 index 000000000000..b4760c413d60 --- /dev/null +++ b/datafusion/catalog-listing/README.md @@ -0,0 +1,30 @@ + + +# DataFusion catalog-listing + +[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format. + +This crate is a submodule of DataFusion with [ListingTable], an implementation +of [TableProvider] based on files in a directory (either locally or on remote +object storage such as S3). 
+ +[df]: https://crates.io/crates/datafusion +[listingtable]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html +[tableprovider]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/catalog-listing/src/helpers.rs similarity index 91% rename from datafusion/core/src/datasource/listing/helpers.rs rename to datafusion/catalog-listing/src/helpers.rs index 228b9a4e9f6b..6cb3f661e652 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use super::ListingTableUrl; use super::PartitionedFile; -use crate::execution::context::SessionState; +use datafusion_catalog::Session; use datafusion_common::internal_err; use datafusion_common::{HashMap, Result, ScalarValue}; use datafusion_expr::{BinaryExpr, Operator}; @@ -154,7 +154,7 @@ pub fn split_files( chunks } -struct Partition { +pub struct Partition { /// The path to the partition, including the table prefix path: Path, /// How many path segments below the table prefix `path` contains @@ -183,7 +183,7 @@ impl Partition { } /// Returns a recursive list of the partitions in `table_path` up to `max_depth` -async fn list_partitions( +pub async fn list_partitions( store: &dyn ObjectStore, table_path: &ListingTableUrl, max_depth: usize, @@ -364,7 +364,7 @@ fn populate_partition_values<'a>( } } -fn evaluate_partition_prefix<'a>( +pub fn evaluate_partition_prefix<'a>( partition_cols: &'a [(String, DataType)], filters: &'a [Expr], ) -> Option { @@ -405,7 +405,7 @@ fn evaluate_partition_prefix<'a>( /// `filters` should only contain expressions that can be evaluated /// using only the partition columns. 
pub async fn pruned_partition_list<'a>( - ctx: &'a SessionState, + ctx: &'a dyn Session, store: &'a dyn ObjectStore, table_path: &'a ListingTableUrl, filters: &'a [Expr], @@ -489,7 +489,7 @@ pub async fn pruned_partition_list<'a>( /// Extract the partition values for the given `file_path` (in the given `table_path`) /// associated to the partitions defined by `table_partition_cols` -fn parse_partitions_for_path<'a, I>( +pub fn parse_partitions_for_path<'a, I>( table_path: &ListingTableUrl, file_path: &'a Path, table_partition_cols: I, @@ -517,17 +517,36 @@ where } Some(part_values) } +/// Describe a partition as a (path, depth, files) tuple for easier assertions +pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) { + ( + partition.path.as_ref(), + partition.depth, + partition + .files + .as_ref() + .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect()) + .unwrap_or_default(), + ) +} #[cfg(test)] mod tests { + use async_trait::async_trait; + use datafusion_execution::config::SessionConfig; + use datafusion_execution::runtime_env::RuntimeEnv; + use futures::FutureExt; + use object_store::memory::InMemory; + use std::any::Any; use std::ops::Not; - - use futures::StreamExt; - - use crate::test::object_store::make_test_store_and_state; - use datafusion_expr::{case, col, lit, Expr}; + // use futures::StreamExt; use super::*; + use datafusion_expr::{ + case, col, lit, AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF, + }; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + use datafusion_physical_plan::ExecutionPlan; #[test] fn test_split_files() { @@ -578,7 +597,7 @@ mod tests { ]); let filter = Expr::eq(col("mypartition"), lit("val1")); let pruned = pruned_partition_list( - &state, + state.as_ref(), store.as_ref(), &ListingTableUrl::parse("file:///tablepath/").unwrap(), &[filter], @@ -603,7 +622,7 @@ mod tests { ]); let filter = Expr::eq(col("mypartition"), lit("val1")); let pruned = pruned_partition_list( - &state, + state.as_ref(), store.as_ref(), &ListingTableUrl::parse("file:///tablepath/").unwrap(), &[filter], @@ -643,7 +662,7 @@ mod tests { let filter1 = Expr::eq(col("part1"), lit("p1v2")); let filter2 = Expr::eq(col("part2"), lit("p2v1")); let pruned = pruned_partition_list( - &state, + state.as_ref(), store.as_ref(), &ListingTableUrl::parse("file:///tablepath/").unwrap(), &[filter1, filter2], @@ -680,19 +699,6 @@ mod tests { ); } - /// Describe a partition as a (path, depth, files) tuple for easier assertions - fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) { - ( - partition.path.as_ref(), - partition.depth, - partition - .files - .as_ref() - .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect()) - .unwrap_or_default(), - ) - } - #[tokio::test] async fn test_list_partition() { let (store, _) = make_test_store_and_state(&[ @@ -994,4 +1000,74 @@ mod tests { Some(Path::from("a=1970-01-05")), ); } + + pub fn make_test_store_and_state( + files: &[(&str, u64)], + ) -> (Arc, Arc) { + let memory = InMemory::new(); + + for (name, size) in files { + memory + .put(&Path::from(*name), vec![0; *size as usize].into()) + .now_or_never() + .unwrap() + .unwrap(); + } + + (Arc::new(memory), Arc::new(MockSession {})) + } + + struct MockSession {} + + #[async_trait] + impl Session for MockSession { + fn session_id(&self) -> &str { + unimplemented!() + } + + fn config(&self) -> &SessionConfig { + unimplemented!() + } + + async fn create_physical_plan( + &self, + _logical_plan: &LogicalPlan, + ) -> Result> 
{ + unimplemented!() + } + + fn create_physical_expr( + &self, + _expr: Expr, + _df_schema: &DFSchema, + ) -> Result> { + unimplemented!() + } + + fn scalar_functions(&self) -> &std::collections::HashMap> { + unimplemented!() + } + + fn aggregate_functions( + &self, + ) -> &std::collections::HashMap> { + unimplemented!() + } + + fn window_functions(&self) -> &std::collections::HashMap> { + unimplemented!() + } + + fn runtime_env(&self) -> &Arc { + unimplemented!() + } + + fn execution_props(&self) -> &ExecutionProps { + unimplemented!() + } + + fn as_any(&self) -> &dyn Any { + unimplemented!() + } + } } diff --git a/datafusion/catalog-listing/src/mod.rs b/datafusion/catalog-listing/src/mod.rs new file mode 100644 index 000000000000..e952e39fd479 --- /dev/null +++ b/datafusion/catalog-listing/src/mod.rs @@ -0,0 +1,278 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A table that uses the `ObjectStore` listing capability +//! to get the list of files to process. + +pub mod helpers; +pub mod url; + +use chrono::TimeZone; +use datafusion_common::Result; +use datafusion_common::{ScalarValue, Statistics}; +use futures::Stream; +use object_store::{path::Path, ObjectMeta}; +use std::pin::Pin; +use std::sync::Arc; + +pub use self::url::ListingTableUrl; + +/// Stream of files get listed from object store +pub type PartitionedFileStream = + Pin> + Send + Sync + 'static>>; + +/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint" +/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping +/// sections of a Parquet file in parallel. +#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)] +pub struct FileRange { + /// Range start + pub start: i64, + /// Range end + pub end: i64, +} + +impl FileRange { + /// returns true if this file range contains the specified offset + pub fn contains(&self, offset: i64) -> bool { + offset >= self.start && offset < self.end + } +} + +#[derive(Debug, Clone)] +/// A single file or part of a file that should be read, along with its schema, statistics +/// and partition column values that need to be appended to each row. +pub struct PartitionedFile { + /// Path for the file (e.g. URL, filesystem path, etc) + pub object_meta: ObjectMeta, + /// Values of partition columns to be appended to each row. + /// + /// These MUST have the same count, order, and type than the [`table_partition_cols`]. + /// + /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type. 
+ /// + /// + /// [`wrap_partition_type_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L55 + /// [`wrap_partition_value_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L62 + /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L190 + pub partition_values: Vec, + /// An optional file range for a more fine-grained parallel execution + pub range: Option, + /// Optional statistics that describe the data in this file if known. + /// + /// DataFusion relies on these statistics for planning (in particular to sort file groups), + /// so if they are incorrect, incorrect answers may result. + pub statistics: Option, + /// An optional field for user defined per object metadata + pub extensions: Option>, + /// The estimated size of the parquet metadata, in bytes + pub metadata_size_hint: Option, +} + +impl PartitionedFile { + /// Create a simple file without metadata or partition + pub fn new(path: impl Into, size: u64) -> Self { + Self { + object_meta: ObjectMeta { + location: Path::from(path.into()), + last_modified: chrono::Utc.timestamp_nanos(0), + size: size as usize, + e_tag: None, + version: None, + }, + partition_values: vec![], + range: None, + statistics: None, + extensions: None, + metadata_size_hint: None, + } + } + + /// Create a file range without metadata or partition + pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self { + Self { + object_meta: ObjectMeta { + location: Path::from(path), + last_modified: chrono::Utc.timestamp_nanos(0), + size: size as usize, + e_tag: None, + version: None, + }, + partition_values: vec![], + range: Some(FileRange { start, end }), + statistics: None, + extensions: None, + metadata_size_hint: None, + } + .with_range(start, end) + } + + /// Provide a hint to the size of the file metadata. If a hint is provided + /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. + /// Without an appropriate hint, two read may be required to fetch the metadata. + pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self { + self.metadata_size_hint = Some(metadata_size_hint); + self + } + + /// Return a file reference from the given path + pub fn from_path(path: String) -> Result { + let size = std::fs::metadata(path.clone())?.len(); + Ok(Self::new(path, size)) + } + + /// Return the path of this partitioned file + pub fn path(&self) -> &Path { + &self.object_meta.location + } + + /// Update the file to only scan the specified range (in bytes) + pub fn with_range(mut self, start: i64, end: i64) -> Self { + self.range = Some(FileRange { start, end }); + self + } + + /// Update the user defined extensions for this file. + /// + /// This can be used to pass reader specific information. 
+ pub fn with_extensions( + mut self, + extensions: Arc, + ) -> Self { + self.extensions = Some(extensions); + self + } +} + +impl From for PartitionedFile { + fn from(object_meta: ObjectMeta) -> Self { + PartitionedFile { + object_meta, + partition_values: vec![], + range: None, + statistics: None, + extensions: None, + metadata_size_hint: None, + } + } +} + +#[cfg(test)] +mod tests { + use super::ListingTableUrl; + use datafusion_execution::object_store::{ + DefaultObjectStoreRegistry, ObjectStoreRegistry, + }; + use object_store::{local::LocalFileSystem, path::Path}; + use std::{ops::Not, sync::Arc}; + use url::Url; + + #[test] + fn test_object_store_listing_url() { + let listing = ListingTableUrl::parse("file:///").unwrap(); + let store = listing.object_store(); + assert_eq!(store.as_str(), "file:///"); + + let listing = ListingTableUrl::parse("s3://bucket/").unwrap(); + let store = listing.object_store(); + assert_eq!(store.as_str(), "s3://bucket/"); + } + + #[test] + fn test_get_store_hdfs() { + let sut = DefaultObjectStoreRegistry::default(); + let url = Url::parse("hdfs://localhost:8020").unwrap(); + sut.register_store(&url, Arc::new(LocalFileSystem::new())); + let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap(); + sut.get_store(url.as_ref()).unwrap(); + } + + #[test] + fn test_get_store_s3() { + let sut = DefaultObjectStoreRegistry::default(); + let url = Url::parse("s3://bucket/key").unwrap(); + sut.register_store(&url, Arc::new(LocalFileSystem::new())); + let url = ListingTableUrl::parse("s3://bucket/key").unwrap(); + sut.get_store(url.as_ref()).unwrap(); + } + + #[test] + fn test_get_store_file() { + let sut = DefaultObjectStoreRegistry::default(); + let url = ListingTableUrl::parse("file:///bucket/key").unwrap(); + sut.get_store(url.as_ref()).unwrap(); + } + + #[test] + fn test_get_store_local() { + let sut = DefaultObjectStoreRegistry::default(); + let url = ListingTableUrl::parse("../").unwrap(); + sut.get_store(url.as_ref()).unwrap(); + } + + #[test] + fn test_url_contains() { + let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap(); + + // standard case with default config + assert!(url.contains( + &Path::parse("/var/data/mytable/data.parquet").unwrap(), + true + )); + + // standard case with `ignore_subdirectory` set to false + assert!(url.contains( + &Path::parse("/var/data/mytable/data.parquet").unwrap(), + false + )); + + // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't + // a direct child of the `url` + assert!(url + .contains( + &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(), + true + ) + .not()); + + // when we set `ignore_subdirectory` to false, we should not ignore the file + assert!(url.contains( + &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(), + false + )); + + // as above, `ignore_subdirectory` is false, so we include the file + assert!(url.contains( + &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(), + false + )); + + // in this case, we include the file even when `ignore_subdirectory` is true because the + // path segment is a hive partition which doesn't count as a subdirectory for the purposes + // of `Url::contains` + assert!(url.contains( + &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(), + true + )); + + // testing an empty path with default config + assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true)); + + // testing an empty path with `ignore_subdirectory` set to false + 
assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false)); + } +} diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/catalog-listing/src/url.rs similarity index 98% rename from datafusion/core/src/datasource/listing/url.rs rename to datafusion/catalog-listing/src/url.rs index 6fb536ca2f05..2e6415ba3b2b 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/catalog-listing/src/url.rs @@ -15,10 +15,9 @@ // specific language governing permissions and limitations // under the License. -use crate::execution::context::SessionState; +use datafusion_catalog::Session; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::object_store::ObjectStoreUrl; -use datafusion_optimizer::OptimizerConfig; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use glob::Pattern; @@ -194,7 +193,7 @@ impl ListingTableUrl { /// /// Examples: /// ```rust - /// use datafusion::datasource::listing::ListingTableUrl; + /// use datafusion_catalog_listing::ListingTableUrl; /// let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap(); /// assert_eq!(url.file_extension(), Some("csv")); /// let url = ListingTableUrl::parse("file:///foo/bar").unwrap(); @@ -216,7 +215,7 @@ impl ListingTableUrl { /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning /// an iterator of the remaining path segments - pub(crate) fn strip_prefix<'a, 'b: 'a>( + pub fn strip_prefix<'a, 'b: 'a>( &'a self, path: &'b Path, ) -> Option + 'a> { @@ -230,11 +229,11 @@ impl ListingTableUrl { /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension` pub async fn list_all_files<'a>( &'a self, - ctx: &'a SessionState, + ctx: &'a dyn Session, store: &'a dyn ObjectStore, file_extension: &'a str, ) -> Result>> { - let exec_options = &ctx.options().execution; + let exec_options = &ctx.config_options().execution; let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory; // If the prefix is a file, use a head request, otherwise list let list = match self.is_collection() { @@ -325,6 +324,7 @@ impl std::fmt::Display for ListingTableUrl { } } +#[cfg(not(target_arch = "wasm32"))] const GLOB_START_CHARS: [char; 3] = ['?', '*', '[']; /// Splits `path` at the first path segment containing a glob expression, returning @@ -333,6 +333,7 @@ const GLOB_START_CHARS: [char; 3] = ['?', '*', '[']; /// Path delimiters are determined using [`std::path::is_separator`] which /// permits `/` as a path delimiter even on Windows platforms. 
/// +#[cfg(not(target_arch = "wasm32"))] fn split_glob_expression(path: &str) -> Option<(&str, &str)> { let mut last_separator = 0; diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index b708c18f5b75..815191fd3c3f 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -100,6 +100,7 @@ bytes = { workspace = true } bzip2 = { version = "0.5.0", optional = true } chrono = { workspace = true } datafusion-catalog = { workspace = true } +datafusion-catalog-listing = { workspace = true } datafusion-common = { workspace = true, features = ["object_store"] } datafusion-common-runtime = { workspace = true } datafusion-execution = { workspace = true } diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index f47e2107ade6..2e2e6dba1c0e 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -425,6 +425,7 @@ pub fn transform_schema_to_view(schema: &Schema) -> Schema { } /// Coerces the file schema if the table schema uses a view type. +#[cfg(not(target_arch = "wasm32"))] pub(crate) fn coerce_file_schema_to_view_type( table_schema: &Schema, file_schema: &Schema, @@ -489,6 +490,7 @@ pub fn transform_binary_to_string(schema: &Schema) -> Schema { /// If the table schema uses a string type, coerce the file schema to use a string type. /// /// See [parquet::ParquetFormat::binary_as_string] for details +#[cfg(not(target_arch = "wasm32"))] pub(crate) fn coerce_file_schema_to_string_type( table_schema: &Schema, file_schema: &Schema, diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index f11653ce1e52..39323b993d45 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -18,263 +18,6 @@ //! A table that uses the `ObjectStore` listing capability //! to get the list of files to process. -mod helpers; mod table; -mod url; - -use chrono::TimeZone; -use datafusion_common::Result; -use datafusion_common::{ScalarValue, Statistics}; -use futures::Stream; -use object_store::{path::Path, ObjectMeta}; -use std::pin::Pin; -use std::sync::Arc; - -pub use self::url::ListingTableUrl; +pub use datafusion_catalog_listing::*; pub use table::{ListingOptions, ListingTable, ListingTableConfig}; - -/// Stream of files get listed from object store -pub type PartitionedFileStream = - Pin> + Send + Sync + 'static>>; - -/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint" -/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping -/// sections of a Parquet file in parallel. -#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)] -pub struct FileRange { - /// Range start - pub start: i64, - /// Range end - pub end: i64, -} - -impl FileRange { - /// returns true if this file range contains the specified offset - pub fn contains(&self, offset: i64) -> bool { - offset >= self.start && offset < self.end - } -} - -#[derive(Debug, Clone)] -/// A single file or part of a file that should be read, along with its schema, statistics -/// and partition column values that need to be appended to each row. -pub struct PartitionedFile { - /// Path for the file (e.g. URL, filesystem path, etc) - pub object_meta: ObjectMeta, - /// Values of partition columns to be appended to each row. - /// - /// These MUST have the same count, order, and type than the [`table_partition_cols`]. 
- /// - /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type. - /// - /// - /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict - /// [`wrap_partition_value_in_dict`]: crate::datasource::physical_plan::wrap_partition_value_in_dict - /// [`table_partition_cols`]: table::ListingOptions::table_partition_cols - pub partition_values: Vec, - /// An optional file range for a more fine-grained parallel execution - pub range: Option, - /// Optional statistics that describe the data in this file if known. - /// - /// DataFusion relies on these statistics for planning (in particular to sort file groups), - /// so if they are incorrect, incorrect answers may result. - pub statistics: Option, - /// An optional field for user defined per object metadata - pub extensions: Option>, - /// The estimated size of the parquet metadata, in bytes - pub metadata_size_hint: Option, -} - -impl PartitionedFile { - /// Create a simple file without metadata or partition - pub fn new(path: impl Into, size: u64) -> Self { - Self { - object_meta: ObjectMeta { - location: Path::from(path.into()), - last_modified: chrono::Utc.timestamp_nanos(0), - size: size as usize, - e_tag: None, - version: None, - }, - partition_values: vec![], - range: None, - statistics: None, - extensions: None, - metadata_size_hint: None, - } - } - - /// Create a file range without metadata or partition - pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self { - Self { - object_meta: ObjectMeta { - location: Path::from(path), - last_modified: chrono::Utc.timestamp_nanos(0), - size: size as usize, - e_tag: None, - version: None, - }, - partition_values: vec![], - range: Some(FileRange { start, end }), - statistics: None, - extensions: None, - metadata_size_hint: None, - } - .with_range(start, end) - } - - /// Provide a hint to the size of the file metadata. If a hint is provided - /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. - /// Without an appropriate hint, two read may be required to fetch the metadata. - pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self { - self.metadata_size_hint = Some(metadata_size_hint); - self - } - - /// Return a file reference from the given path - pub fn from_path(path: String) -> Result { - let size = std::fs::metadata(path.clone())?.len(); - Ok(Self::new(path, size)) - } - - /// Return the path of this partitioned file - pub fn path(&self) -> &Path { - &self.object_meta.location - } - - /// Update the file to only scan the specified range (in bytes) - pub fn with_range(mut self, start: i64, end: i64) -> Self { - self.range = Some(FileRange { start, end }); - self - } - - /// Update the user defined extensions for this file. - /// - /// This can be used to pass reader specific information. 
- pub fn with_extensions( - mut self, - extensions: Arc, - ) -> Self { - self.extensions = Some(extensions); - self - } -} - -impl From for PartitionedFile { - fn from(object_meta: ObjectMeta) -> Self { - PartitionedFile { - object_meta, - partition_values: vec![], - range: None, - statistics: None, - extensions: None, - metadata_size_hint: None, - } - } -} - -#[cfg(test)] -mod tests { - use super::ListingTableUrl; - use datafusion_execution::object_store::{ - DefaultObjectStoreRegistry, ObjectStoreRegistry, - }; - use object_store::{local::LocalFileSystem, path::Path}; - use std::{ops::Not, sync::Arc}; - use url::Url; - - #[test] - fn test_object_store_listing_url() { - let listing = ListingTableUrl::parse("file:///").unwrap(); - let store = listing.object_store(); - assert_eq!(store.as_str(), "file:///"); - - let listing = ListingTableUrl::parse("s3://bucket/").unwrap(); - let store = listing.object_store(); - assert_eq!(store.as_str(), "s3://bucket/"); - } - - #[test] - fn test_get_store_hdfs() { - let sut = DefaultObjectStoreRegistry::default(); - let url = Url::parse("hdfs://localhost:8020").unwrap(); - sut.register_store(&url, Arc::new(LocalFileSystem::new())); - let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap(); - sut.get_store(url.as_ref()).unwrap(); - } - - #[test] - fn test_get_store_s3() { - let sut = DefaultObjectStoreRegistry::default(); - let url = Url::parse("s3://bucket/key").unwrap(); - sut.register_store(&url, Arc::new(LocalFileSystem::new())); - let url = ListingTableUrl::parse("s3://bucket/key").unwrap(); - sut.get_store(url.as_ref()).unwrap(); - } - - #[test] - fn test_get_store_file() { - let sut = DefaultObjectStoreRegistry::default(); - let url = ListingTableUrl::parse("file:///bucket/key").unwrap(); - sut.get_store(url.as_ref()).unwrap(); - } - - #[test] - fn test_get_store_local() { - let sut = DefaultObjectStoreRegistry::default(); - let url = ListingTableUrl::parse("../").unwrap(); - sut.get_store(url.as_ref()).unwrap(); - } - - #[test] - fn test_url_contains() { - let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap(); - - // standard case with default config - assert!(url.contains( - &Path::parse("/var/data/mytable/data.parquet").unwrap(), - true - )); - - // standard case with `ignore_subdirectory` set to false - assert!(url.contains( - &Path::parse("/var/data/mytable/data.parquet").unwrap(), - false - )); - - // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't - // a direct child of the `url` - assert!(url - .contains( - &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(), - true - ) - .not()); - - // when we set `ignore_subdirectory` to false, we should not ignore the file - assert!(url.contains( - &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(), - false - )); - - // as above, `ignore_subdirectory` is false, so we include the file - assert!(url.contains( - &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(), - false - )); - - // in this case, we include the file even when `ignore_subdirectory` is true because the - // path segment is a hive partition which doesn't count as a subdirectory for the purposes - // of `Url::contains` - assert!(url.contains( - &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(), - true - )); - - // testing an empty path with default config - assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true)); - - // testing an empty path with `ignore_subdirectory` set to false - 
assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false)); - } -} diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 780b22983393..ca0aa92ff1ed 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -809,8 +809,9 @@ pub mod variable { pub use datafusion_expr::var_provider::{VarProvider, VarType}; } -#[cfg(test)] +#[cfg(not(target_arch = "wasm32"))] pub mod test; + pub mod test_util; #[cfg(doctest)] diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 0d659582aca3..05e63a3c4fd4 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -61,7 +61,7 @@ pub fn create_table_dual() -> Arc { Field::new("name", DataType::Utf8, false), ])); let batch = RecordBatch::try_new( - dual_schema.clone(), + Arc::::clone(&dual_schema), vec![ Arc::new(Int32Array::from(vec![1])), Arc::new(array::StringArray::from(vec!["a"])), @@ -244,7 +244,7 @@ pub fn table_with_sequence( let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)])); let arr = Arc::new(Int32Array::from((seq_start..=seq_end).collect::>())); let partitions = vec![vec![RecordBatch::try_new( - schema.clone(), + Arc::::clone(&schema), vec![arr as ArrayRef], )?]]; Ok(Arc::new(MemTable::try_new(schema, partitions)?))
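
Reviewer note (not part of the patch): for anyone who wants to try the extracted crate in isolation, the sketch below strings together the public APIs this diff relocates — `ListingTableUrl`, `PartitionedFile`, and `FileRange` — using only calls that appear in the diff itself (the `url.rs` doctest, `test_url_contains`, and the definitions in `src/mod.rs`). Crate name and import paths assume the layout introduced here; the example is illustrative, not a definitive usage guide.

```rust
// Minimal sketch of the APIs moved into `datafusion-catalog-listing`.
// All referenced items appear in the diff above; this file is illustrative only.
use datafusion_catalog_listing::{FileRange, ListingTableUrl, PartitionedFile};
use object_store::path::Path;

fn main() {
    // Parse a table location and inspect its file extension
    // (mirrors the doctest retargeted to `datafusion_catalog_listing::ListingTableUrl`).
    let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap();
    assert_eq!(url.file_extension(), Some("csv"));

    // Hive-style partition directories do not count as subdirectories, so this
    // object is still part of the table even when subdirectories are ignored
    // (see `test_url_contains`).
    let table = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
    let object = Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap();
    assert!(table.contains(&object, true));

    // Describe a byte range of a single file to scan, e.g. for reading
    // non-overlapping sections of a Parquet file in parallel.
    let file = PartitionedFile::new("year=2024/data.parquet", 1024).with_range(0, 512);
    assert_eq!(file.range, Some(FileRange { start: 0, end: 512 }));
    assert!(FileRange { start: 0, end: 512 }.contains(100));
}
```

With only `datafusion-catalog-listing` and `object_store` as dependencies, a snippet like this should build without pulling in `datafusion/core`, which is the point of the split.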