From 826882d3d5dd3cde0503d57ad6e987eb07515a09 Mon Sep 17 00:00:00 2001
From: nameexhaustion
Date: Fri, 26 Jul 2024 18:01:24 +1000
Subject: [PATCH] fix: Respect `glob=False` for cloud reads (#17860)

---
 crates/polars-io/src/cloud/adaptors.rs        |  2 +-
 crates/polars-io/src/cloud/glob.rs            | 49 +++++++++++++------
 .../polars-io/src/cloud/object_store_setup.rs | 11 +++--
 crates/polars-io/src/cloud/options.rs         |  2 +-
 crates/polars-io/src/file_cache/utils.rs      | 15 ++----
 crates/polars-io/src/ipc/ipc_reader_async.rs  | 19 ++-----
 .../polars-io/src/parquet/read/async_impl.rs  | 15 ++----
 crates/polars-io/src/path_utils/mod.rs        | 29 +++++------
 crates/polars-utils/src/io.rs                 |  6 ++-
 py-polars/tests/unit/io/test_csv.py           | 18 -------
 py-polars/tests/unit/io/test_other.py         | 31 ++++++++++++
 py-polars/tests/unit/io/test_parquet.py       | 35 -------------
 12 files changed, 107 insertions(+), 125 deletions(-)

diff --git a/crates/polars-io/src/cloud/adaptors.rs b/crates/polars-io/src/cloud/adaptors.rs
index 40a926dd1ff6..e5ff951c9657 100644
--- a/crates/polars-io/src/cloud/adaptors.rs
+++ b/crates/polars-io/src/cloud/adaptors.rs
@@ -39,7 +39,7 @@ impl CloudWriter {
     /// TODO: Naming?
     pub async fn new(uri: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Self> {
         let (cloud_location, object_store) =
-            crate::cloud::build_object_store(uri, cloud_options).await?;
+            crate::cloud::build_object_store(uri, cloud_options, false).await?;
         Self::new_with_object_store(object_store, cloud_location.prefix.into()).await
     }
 
diff --git a/crates/polars-io/src/cloud/glob.rs b/crates/polars-io/src/cloud/glob.rs
index 06dc0f63d06a..b4d74d093a8d 100644
--- a/crates/polars-io/src/cloud/glob.rs
+++ b/crates/polars-io/src/cloud/glob.rs
@@ -84,7 +84,7 @@ pub struct CloudLocation {
 }
 
 impl CloudLocation {
-    pub fn from_url(parsed: &Url) -> PolarsResult<CloudLocation> {
+    pub fn from_url(parsed: &Url, glob: bool) -> PolarsResult<CloudLocation> {
         let is_local = parsed.scheme() == "file";
         let (bucket, key) = if is_local {
             ("".into(), parsed.path())
@@ -109,10 +109,16 @@ impl CloudLocation {
         let key = percent_encoding::percent_decode_str(key)
             .decode_utf8()
             .map_err(to_compute_err)?;
-        let (mut prefix, expansion) = extract_prefix_expansion(&key)?;
-        if is_local && key.starts_with(DELIMITER) {
-            prefix.insert(0, DELIMITER);
-        }
+        let (prefix, expansion) = if glob {
+            let (mut prefix, expansion) = extract_prefix_expansion(&key)?;
+            if is_local && key.starts_with(DELIMITER) {
+                prefix.insert(0, DELIMITER);
+            }
+            (prefix, expansion)
+        } else {
+            (key.to_string(), None)
+        };
+
         Ok(CloudLocation {
             scheme: parsed.scheme().into(),
             bucket,
@@ -122,9 +128,9 @@ impl CloudLocation {
     }
 
     /// Parse a CloudLocation from an url.
-    pub fn new(url: &str) -> PolarsResult<CloudLocation> {
+    pub fn new(url: &str, glob: bool) -> PolarsResult<CloudLocation> {
         let parsed = Url::parse(url).map_err(to_compute_err)?;
-        Self::from_url(&parsed)
+        Self::from_url(&parsed, glob)
     }
 }
 
@@ -173,7 +179,7 @@ pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Vec<String>> {
             expansion,
         },
         store,
-    ) = super::build_object_store(url, cloud_options).await?;
+    ) = super::build_object_store(url, cloud_options, true).await?;
     let matcher = &Matcher::new(
         if scheme == "file" {
             // For local paths the returned location has the leading slash stripped.
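
The behavioral contract of the new `glob` parameter, distilled from the unit tests above into a standalone sketch. This is not part of the patch; it assumes `CloudLocation` is reachable through polars-io's public `cloud` module, and error handling is elided:

    use polars_io::cloud::CloudLocation;

    // glob = true: the key is split into a literal prefix plus a wildcard expansion.
    let loc = CloudLocation::new("s3://a/b/*.c", true).unwrap();
    assert_eq!(loc.prefix, "b/");
    assert_eq!(loc.expansion.as_deref(), Some("*.c"));

    // glob = false: the key is kept verbatim and `expansion` is always `None`,
    // so `[` and `*` are treated as literal characters in the object key.
    let loc = CloudLocation::new("s3://bucket/[*", false).unwrap();
    assert_eq!(loc.prefix, "/[*");
    assert!(loc.expansion.is_none());
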
@@ -209,7 +215,7 @@ mod test {
     #[test]
     fn test_cloud_location() {
         assert_eq!(
-            CloudLocation::new("s3://a/b").unwrap(),
+            CloudLocation::new("s3://a/b", true).unwrap(),
             CloudLocation {
                 scheme: "s3".into(),
                 bucket: "a".into(),
@@ -218,7 +224,7 @@ mod test {
             }
         );
         assert_eq!(
-            CloudLocation::new("s3://a/b/*.c").unwrap(),
+            CloudLocation::new("s3://a/b/*.c", true).unwrap(),
             CloudLocation {
                 scheme: "s3".into(),
                 bucket: "a".into(),
@@ -227,7 +233,7 @@ mod test {
             }
         );
         assert_eq!(
-            CloudLocation::new("file:///a/b").unwrap(),
+            CloudLocation::new("file:///a/b", true).unwrap(),
             CloudLocation {
                 scheme: "file".into(),
                 bucket: "".into(),
@@ -268,7 +274,7 @@ mod test {
 
     #[test]
     fn test_matcher_file_name() {
-        let cloud_location = CloudLocation::new("s3://bucket/folder/*.parquet").unwrap();
+        let cloud_location = CloudLocation::new("s3://bucket/folder/*.parquet", true).unwrap();
         let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
         // Regular match.
         assert!(a.is_matching(Path::from("folder/1.parquet").as_ref()));
@@ -280,13 +286,14 @@ mod test {
 
     #[test]
     fn test_matcher_folders() {
-        let cloud_location = CloudLocation::new("s3://bucket/folder/**/*.parquet").unwrap();
+        let cloud_location = CloudLocation::new("s3://bucket/folder/**/*.parquet", true).unwrap();
         let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
         // Intermediary folders are optional.
         assert!(a.is_matching(Path::from("folder/1.parquet").as_ref()));
         // Intermediary folders are allowed.
         assert!(a.is_matching(Path::from("folder/other/1.parquet").as_ref()));
-        let cloud_location = CloudLocation::new("s3://bucket/folder/**/data/*.parquet").unwrap();
+        let cloud_location =
+            CloudLocation::new("s3://bucket/folder/**/data/*.parquet", true).unwrap();
         let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
         // Required folder `data` is missing.
         assert!(!a.is_matching(Path::from("folder/1.parquet").as_ref()));
@@ -295,4 +302,18 @@ mod test {
         // Required folder is present and additional folders are allowed.
         assert!(a.is_matching(Path::from("folder/other/data/1.parquet").as_ref()));
     }
+
+    #[test]
+    fn test_cloud_location_no_glob() {
+        let cloud_location = CloudLocation::new("s3://bucket/[*", false).unwrap();
+        assert_eq!(
+            cloud_location,
+            CloudLocation {
+                scheme: "s3".into(),
+                bucket: "bucket".into(),
+                prefix: "/[*".into(),
+                expansion: None,
+            },
+        )
+    }
 }
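
Renaming `object_path_from_string` to `object_path_from_str` (taking `&str`) also spares an allocation at call sites that only hold a borrowed path; behavior is unchanged, as the retained test shows. A one-line illustration, again assuming the public `cloud` re-export:

    use polars_io::cloud::object_path_from_str;

    // The string is parsed as-is: "%25" is not percent-decoded to "%".
    let path = object_path_from_str("%25").unwrap();
    assert_eq!(path.as_ref(), "%25");
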
diff --git a/crates/polars-io/src/cloud/object_store_setup.rs b/crates/polars-io/src/cloud/object_store_setup.rs
index 705e48a77f14..b6464b109535 100644
--- a/crates/polars-io/src/cloud/object_store_setup.rs
+++ b/crates/polars-io/src/cloud/object_store_setup.rs
@@ -41,7 +41,7 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
 }
 
 /// Construct an object_store `Path` from a string without any encoding/decoding.
-pub fn object_path_from_string(path: String) -> PolarsResult<object_store::path::Path> {
+pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
     object_store::path::Path::parse(path).map_err(to_compute_err)
 }
 
@@ -53,9 +53,10 @@ pub async fn build_object_store(
         allow(unused_variables)
     )]
     options: Option<&CloudOptions>,
+    glob: bool,
 ) -> BuildResult {
     let parsed = parse_url(url).map_err(to_compute_err)?;
-    let cloud_location = CloudLocation::from_url(&parsed)?;
+    let cloud_location = CloudLocation::from_url(&parsed, glob)?;
     let key = url_and_creds_to_key(&parsed, options);
     let mut allow_cache = true;
 
@@ -132,11 +133,11 @@ pub async fn build_object_store(
 mod test {
 
     #[test]
-    fn test_object_path_from_string() {
-        use super::object_path_from_string;
+    fn test_object_path_from_str() {
+        use super::object_path_from_str;
 
         let path = "%25";
-        let out = object_path_from_string(path.to_string()).unwrap();
+        let out = object_path_from_str(path).unwrap();
 
         assert_eq!(out.as_ref(), path);
     }
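
Only the bucket component matters for the S3 region lookup below, so the URL is now parsed with `glob: false` and the key is taken verbatim, which is all that is needed here. A hedged sketch of the intent (hypothetical URL, fragment assumes a `PolarsResult` context):

    use polars_io::cloud::CloudLocation;

    // Only the bucket is read; the key is ignored by the region lookup.
    let bucket = CloudLocation::new("s3://my-bucket/data/*.parquet", false)?.bucket;
    assert_eq!(bucket, "my-bucket");
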
diff --git a/crates/polars-io/src/cloud/options.rs b/crates/polars-io/src/cloud/options.rs
index f544765ddad9..0f48688051f7 100644
--- a/crates/polars-io/src/cloud/options.rs
+++ b/crates/polars-io/src/cloud/options.rs
@@ -297,7 +297,7 @@ impl CloudOptions {
             .get_config_value(&AmazonS3ConfigKey::Region)
             .is_none()
         {
-            let bucket = crate::cloud::CloudLocation::new(url)?.bucket;
+            let bucket = crate::cloud::CloudLocation::new(url, false)?.bucket;
             let region = {
                 let bucket_region = BUCKET_REGION.lock().unwrap();
                 bucket_region.get(bucket.as_str()).cloned()
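
The file-cache, IPC, and Parquet call sites above all receive a single, already-resolved URI, so they converge on the same pattern: parse with `glob: false`, which guarantees `expansion` is `None` by construction and makes the old wildcard assertions redundant. The shared shape, as a sketch:

    // `uri` is one concrete object path, never a glob pattern, so no
    // wildcard expansion can occur and no debug_assert is required.
    let (CloudLocation { prefix, .. }, store) =
        build_object_store(uri, cloud_options, false).await?;
    let path = object_path_from_str(&prefix)?;
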
diff --git a/crates/polars-io/src/file_cache/utils.rs b/crates/polars-io/src/file_cache/utils.rs
index ca564074a3b6..034dd83307cc 100644
--- a/crates/polars-io/src/file_cache/utils.rs
+++ b/crates/polars-io/src/file_cache/utils.rs
@@ -9,7 +9,7 @@ use super::cache::{get_env_file_cache_ttl, FILE_CACHE};
 use super::entry::FileCacheEntry;
 use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};
 use crate::cloud::{
-    build_object_store, object_path_from_string, CloudLocation, CloudOptions, PolarsObjectStore,
+    build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
 };
 use crate::path_utils::{ensure_directory_init, is_cloud_url, POLARS_TEMP_DIR_BASE_PATH};
 use crate::pl_async;
@@ -76,7 +76,7 @@ pub fn init_entries_from_uri_list(
                 })
                 .map(|i| async move {
                     let (_, object_store) =
-                        build_object_store(&uri_list[i], cloud_options).await?;
+                        build_object_store(&uri_list[i], cloud_options, false).await?;
                     PolarsResult::Ok(PolarsObjectStore::new(object_store))
                 }),
         )
@@ -90,14 +90,9 @@ pub fn init_entries_from_uri_list(
             FILE_CACHE.init_entry(
                 uri.clone(),
                 || {
-                    let CloudLocation {
-                        prefix, expansion, ..
-                    } = CloudLocation::new(uri.as_ref()).unwrap();
-
-                    let cloud_path = {
-                        assert!(expansion.is_none(), "path should not contain wildcards");
-                        object_path_from_string(prefix)?
-                    };
+                    let CloudLocation { prefix, .. } =
+                        CloudLocation::new(uri.as_ref(), false).unwrap();
+                    let cloud_path = object_path_from_str(&prefix)?;
 
                     let object_store =
                         object_stores[std::cmp::min(i, object_stores.len())].clone();
diff --git a/crates/polars-io/src/ipc/ipc_reader_async.rs b/crates/polars-io/src/ipc/ipc_reader_async.rs
index 4501898b50ad..9d392575e956 100644
--- a/crates/polars-io/src/ipc/ipc_reader_async.rs
+++ b/crates/polars-io/src/ipc/ipc_reader_async.rs
@@ -9,7 +9,7 @@ use polars_core::schema::Schema;
 use polars_error::{polars_bail, polars_err, to_compute_err, PolarsResult};
 
 use crate::cloud::{
-    build_object_store, object_path_from_string, CloudLocation, CloudOptions, PolarsObjectStore,
+    build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
 };
 use crate::file_cache::{init_entries_from_uri_list, FileCacheEntry};
 use crate::predicates::PhysicalIoExpr;
@@ -67,19 +67,10 @@ impl IpcReaderAsync {
         cloud_options: Option<&CloudOptions>,
     ) -> PolarsResult<IpcReaderAsync> {
         let cache_entry = init_entries_from_uri_list(&[Arc::from(uri)], cloud_options)?[0].clone();
-        let (
-            CloudLocation {
-                prefix, expansion, ..
-            },
-            store,
-        ) = build_object_store(uri, cloud_options).await?;
-
-        let path = {
-            // Any wildcards should already have been resolved here. Without this assertion they would
-            // be ignored.
-            debug_assert!(expansion.is_none(), "path should not contain wildcards");
-            object_path_from_string(prefix)?
-        };
+        let (CloudLocation { prefix, .. }, store) =
+            build_object_store(uri, cloud_options, false).await?;
+
+        let path = object_path_from_str(&prefix)?;
 
         Ok(Self {
             store: PolarsObjectStore::new(store),
diff --git a/crates/polars-io/src/parquet/read/async_impl.rs b/crates/polars-io/src/parquet/read/async_impl.rs
index 5ec9632871d5..fd96b68697ec 100644
--- a/crates/polars-io/src/parquet/read/async_impl.rs
+++ b/crates/polars-io/src/parquet/read/async_impl.rs
@@ -16,7 +16,7 @@ use super::mmap::ColumnStore;
 use super::predicates::read_this_row_group;
 use super::read_impl::compute_row_group_range;
 use crate::cloud::{
-    build_object_store, object_path_from_string, CloudLocation, CloudOptions, PolarsObjectStore,
+    build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
 };
 use crate::parquet::metadata::FileMetaDataRef;
 use crate::pl_async::get_runtime;
@@ -39,17 +39,8 @@ impl ParquetObjectStore {
         options: Option<&CloudOptions>,
         metadata: Option<FileMetaDataRef>,
     ) -> PolarsResult<Self> {
-        let (
-            CloudLocation {
-                prefix, expansion, ..
-            },
-            store,
-        ) = build_object_store(uri, options).await?;
-
-        // Any wildcards should already have been resolved here. Without this assertion they would
-        // be ignored.
-        debug_assert!(expansion.is_none(), "path should not contain wildcards");
-        let path = object_path_from_string(prefix)?;
+        let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options, false).await?;
+        let path = object_path_from_str(&prefix)?;
 
         Ok(ParquetObjectStore {
             store: PolarsObjectStore::new(store),
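
Condensed, the rewritten branch condition in the `expand_paths_hive` hunk below reads as follows; `single_file` is a named stand-in for the anonymous `if` condition, everything else mirrors the patch:

    // Treat the input as a single file unless it ends in `/`, or globbing is
    // enabled and the path actually expanded into a wildcard pattern.
    let single_file = !path.ends_with('/')
        && (!glob || cloud_location.expansion.is_none())
        && (is_cloud || PathBuf::from(path).is_file());
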
diff --git a/crates/polars-io/src/path_utils/mod.rs b/crates/polars-io/src/path_utils/mod.rs
index 62f77c7c2655..749e248881c6 100644
--- a/crates/polars-io/src/path_utils/mod.rs
+++ b/crates/polars-io/src/path_utils/mod.rs
@@ -148,7 +148,7 @@ pub fn expand_paths_hive(
     {
         use polars_utils::_limit_path_len_io_err;
 
-        use crate::cloud::object_path_from_string;
+        use crate::cloud::object_path_from_str;
 
         if first_path.starts_with("hf://") {
             let (expand_start_idx, paths) =
@@ -172,23 +172,24 @@ pub fn expand_paths_hive(
             -> PolarsResult<(usize, Vec<PathBuf>)> {
                 crate::pl_async::get_runtime().block_on_potential_spawn(async {
                     let (cloud_location, store) =
-                        crate::cloud::build_object_store(path, cloud_options).await?;
-
-                    let prefix = object_path_from_string(cloud_location.prefix.clone())?;
-
-                    let out = if !path.ends_with("/") && cloud_location.expansion.is_none() && {
-                        // We need to check if it is a directory for local paths (we can be here due
-                        // to FORCE_ASYNC). For cloud paths the convention is that the user must add
-                        // a trailing slash `/` to scan directories. We don't infer it as that would
-                        // mean sending one network request per path serially (very slow).
-                        is_cloud || PathBuf::from(path).is_file()
-                    } {
+                        crate::cloud::build_object_store(path, cloud_options, glob).await?;
+                    let prefix = object_path_from_str(&cloud_location.prefix)?;
+
+                    let out = if !path.ends_with("/")
+                        && (!glob || cloud_location.expansion.is_none())
+                        && {
+                            // We need to check if it is a directory for local paths (we can be here due
+                            // to FORCE_ASYNC). For cloud paths the convention is that the user must add
+                            // a trailing slash `/` to scan directories. We don't infer it as that would
+                            // mean sending one network request per path serially (very slow).
+                            is_cloud || PathBuf::from(path).is_file()
+                        } {
                         (
                             0,
                             vec![PathBuf::from(format_path(
                                 &cloud_location.scheme,
                                 &cloud_location.bucket,
-                                &cloud_location.prefix,
+                                prefix.as_ref(),
                             ))],
                         )
                     } else {
@@ -251,7 +252,7 @@ pub fn expand_paths_hive(
 
             let glob_start_idx = get_glob_start_idx(path.to_str().unwrap().as_bytes());
 
-            let path = if glob_start_idx.is_some() {
+            let path = if glob && glob_start_idx.is_some() {
                 path.clone()
             } else {
                 let (expand_start_idx, paths) =
diff --git a/crates/polars-utils/src/io.rs b/crates/polars-utils/src/io.rs
index 5fb273e25d16..7b52ebb0ab67 100644
--- a/crates/polars-utils/src/io.rs
+++ b/crates/polars-utils/src/io.rs
@@ -4,9 +4,13 @@ use std::path::Path;
 
 use polars_error::*;
 
+fn verbose() -> bool {
+    std::env::var("POLARS_VERBOSE").as_deref().unwrap_or("") == "1"
+}
+
 pub fn _limit_path_len_io_err(path: &Path, err: io::Error) -> PolarsError {
     let path = path.to_string_lossy();
 
-    let msg = if path.len() > 88 {
+    let msg = if path.len() > 88 && !verbose() {
         let truncated_path: String = path.chars().skip(path.len() - 88).collect();
         format!("{err}: ...{truncated_path}")
     } else {
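
The new `verbose()` check means `POLARS_VERBOSE=1` now disables the 88-character path truncation in I/O error messages, which helps when debugging long cloud paths. A usage sketch (`_limit_path_len_io_err` is re-exported at the crate root, as the `path_utils` import earlier in this patch shows):

    use std::io;
    use std::path::Path;

    use polars_utils::_limit_path_len_io_err;

    let err = io::Error::new(io::ErrorKind::NotFound, "No such file or directory");
    let long_path = "a/".repeat(100) + "file.parquet";
    // Without POLARS_VERBOSE=1 the message keeps only the last 88 characters
    // of the path, prefixed with "..."; with it, the full path is reported.
    let _polars_err = _limit_path_len_io_err(Path::new(&long_path), err);
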
diff --git a/py-polars/tests/unit/io/test_csv.py b/py-polars/tests/unit/io/test_csv.py
index 294ed6073824..33bbcf67f1ae 100644
--- a/py-polars/tests/unit/io/test_csv.py
+++ b/py-polars/tests/unit/io/test_csv.py
@@ -2,7 +2,6 @@
 
 import gzip
 import io
-import os
 import sys
 import textwrap
 import zlib
@@ -2196,23 +2195,6 @@ def test_fsspec_not_available() -> None:
     )
 
 
-@pytest.mark.write_disk()
-@pytest.mark.skipif(
-    os.environ.get("POLARS_FORCE_ASYNC") == "1" or sys.platform == "win32",
-    reason="only local",
-)
-def test_read_csv_no_glob(tmpdir: Path) -> None:
-    df = pl.DataFrame({"foo": 1})
-
-    p = tmpdir / "*.csv"
-    df.write_csv(str(p))
-    p = tmpdir / "*1.csv"
-    df.write_csv(str(p))
-
-    p = tmpdir / "*.csv"
-    assert_frame_equal(pl.read_csv(str(p), glob=False), df)
-
-
 def test_read_csv_dtypes_deprecated() -> None:
     csv = textwrap.dedent(
         """\
diff --git a/py-polars/tests/unit/io/test_other.py b/py-polars/tests/unit/io/test_other.py
index 2bc36d66e5ae..d6562911adbb 100644
--- a/py-polars/tests/unit/io/test_other.py
+++ b/py-polars/tests/unit/io/test_other.py
@@ -122,3 +122,34 @@ def test_unit_io_subdir_has_no_init() -> None:
     assert not (
         io_dir / "__init__.py"
     ).exists(), "Found undesirable '__init__.py' in the 'unit.io' tests subdirectory"
+
+
+@pytest.mark.write_disk()
+@pytest.mark.parametrize(
+    ("scan_funcs", "write_func"),
+    [
+        ([pl.scan_parquet, pl.read_parquet], pl.DataFrame.write_parquet),
+        ([pl.scan_csv, pl.read_csv], pl.DataFrame.write_csv),
+    ],
+)
+@pytest.mark.parametrize("char", ["[", "*"])
+def test_no_glob(
+    scan_funcs: list[Callable[[Any], pl.LazyFrame | pl.DataFrame]],
+    write_func: Callable[[pl.DataFrame, Path], None],
+    char: str,
+    tmp_path: Path,
+) -> None:
+    if sys.platform == "win32" and char == "*":
+        pytest.skip("unsupported glob char for windows")
+
+    tmp_path.mkdir(exist_ok=True)
+
+    df = pl.DataFrame({"x": 1})
+
+    paths = [tmp_path / f"{char}", tmp_path / f"{char}1"]
+
+    write_func(df, paths[0])
+    write_func(df, paths[1])
+
+    for func in scan_funcs:
+        assert_frame_equal(func(paths[0], glob=False).lazy().collect(), df)  # type: ignore[call-arg]
diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py
index 680142c4a8fe..9f93018669fc 100644
--- a/py-polars/tests/unit/io/test_parquet.py
+++ b/py-polars/tests/unit/io/test_parquet.py
@@ -1,8 +1,6 @@
 from __future__ import annotations
 
 import io
-import os
-import sys
 from datetime import datetime, time, timezone
 from decimal import Decimal
 from typing import TYPE_CHECKING, Any, cast
@@ -985,39 +983,6 @@ def test_max_statistic_parquet_writer(tmp_path: Path) -> None:
     assert_frame_equal(result, expected)
 
 
-@pytest.mark.write_disk()
-@pytest.mark.skipif(os.environ.get("POLARS_FORCE_ASYNC") == "1", reason="only local")
-@pytest.mark.skipif(
-    sys.platform == "win32", reason="Windows filenames cannot contain an asterisk"
-)
-def test_no_glob(tmp_path: Path) -> None:
-    tmp_path.mkdir(exist_ok=True)
-
-    df = pl.DataFrame({"foo": 1})
-
-    p1 = tmp_path / "*.parquet"
-    df.write_parquet(str(p1))
-    p2 = tmp_path / "*1.parquet"
-    df.write_parquet(str(p2))
-
-    assert_frame_equal(pl.scan_parquet(str(p1), glob=False).collect(), df)
-
-
-@pytest.mark.write_disk()
-@pytest.mark.skipif(os.environ.get("POLARS_FORCE_ASYNC") == "1", reason="only local")
-def test_no_glob_windows(tmp_path: Path) -> None:
-    tmp_path.mkdir(exist_ok=True)
-
-    df = pl.DataFrame({"foo": 1})
-
-    p1 = tmp_path / "hello[.parquet"
-    df.write_parquet(str(p1))
-    p2 = tmp_path / "hello[2.parquet"
-    df.write_parquet(str(p2))
-
-    assert_frame_equal(pl.scan_parquet(str(p1), glob=False).collect(), df)
-
-
 @pytest.mark.slow()
 def test_hybrid_rle() -> None:
     # 10_007 elements to test if not a nice multiple of 8