fix: Respect glob=False for cloud reads (#17860)
nameexhaustion authored Jul 26, 2024
1 parent a07d2fc commit 826882d
Showing 12 changed files with 107 additions and 125 deletions.
2 changes: 1 addition & 1 deletion crates/polars-io/src/cloud/adaptors.rs
@@ -39,7 +39,7 @@ impl CloudWriter {
/// TODO: Naming?
pub async fn new(uri: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Self> {
let (cloud_location, object_store) =
crate::cloud::build_object_store(uri, cloud_options).await?;
crate::cloud::build_object_store(uri, cloud_options, false).await?;
Self::new_with_object_store(object_store, cloud_location.prefix.into()).await
}

49 changes: 35 additions & 14 deletions crates/polars-io/src/cloud/glob.rs
@@ -84,7 +84,7 @@ pub struct CloudLocation {
}

impl CloudLocation {
pub fn from_url(parsed: &Url) -> PolarsResult<CloudLocation> {
pub fn from_url(parsed: &Url, glob: bool) -> PolarsResult<CloudLocation> {
let is_local = parsed.scheme() == "file";
let (bucket, key) = if is_local {
("".into(), parsed.path())
@@ -109,10 +109,16 @@ impl CloudLocation {
let key = percent_encoding::percent_decode_str(key)
.decode_utf8()
.map_err(to_compute_err)?;
let (mut prefix, expansion) = extract_prefix_expansion(&key)?;
if is_local && key.starts_with(DELIMITER) {
prefix.insert(0, DELIMITER);
}
let (prefix, expansion) = if glob {
let (mut prefix, expansion) = extract_prefix_expansion(&key)?;
if is_local && key.starts_with(DELIMITER) {
prefix.insert(0, DELIMITER);
}
(prefix, expansion)
} else {
(key.to_string(), None)
};

Ok(CloudLocation {
scheme: parsed.scheme().into(),
bucket,
@@ -122,9 +128,9 @@
}

/// Parse a CloudLocation from an url.
pub fn new(url: &str) -> PolarsResult<CloudLocation> {
pub fn new(url: &str, glob: bool) -> PolarsResult<CloudLocation> {
let parsed = Url::parse(url).map_err(to_compute_err)?;
Self::from_url(&parsed)
Self::from_url(&parsed, glob)
}
}

@@ -173,7 +179,7 @@ pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResu
expansion,
},
store,
) = super::build_object_store(url, cloud_options).await?;
) = super::build_object_store(url, cloud_options, true).await?;
let matcher = &Matcher::new(
if scheme == "file" {
// For local paths the returned location has the leading slash stripped.
@@ -209,7 +215,7 @@ mod test {
#[test]
fn test_cloud_location() {
assert_eq!(
CloudLocation::new("s3://a/b").unwrap(),
CloudLocation::new("s3://a/b", true).unwrap(),
CloudLocation {
scheme: "s3".into(),
bucket: "a".into(),
@@ -218,7 +224,7 @@
}
);
assert_eq!(
CloudLocation::new("s3://a/b/*.c").unwrap(),
CloudLocation::new("s3://a/b/*.c", true).unwrap(),
CloudLocation {
scheme: "s3".into(),
bucket: "a".into(),
@@ -227,7 +233,7 @@
}
);
assert_eq!(
CloudLocation::new("file:///a/b").unwrap(),
CloudLocation::new("file:///a/b", true).unwrap(),
CloudLocation {
scheme: "file".into(),
bucket: "".into(),
@@ -268,7 +274,7 @@ mod test {

#[test]
fn test_matcher_file_name() {
let cloud_location = CloudLocation::new("s3://bucket/folder/*.parquet").unwrap();
let cloud_location = CloudLocation::new("s3://bucket/folder/*.parquet", true).unwrap();
let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
// Regular match.
assert!(a.is_matching(Path::from("folder/1.parquet").as_ref()));
@@ -280,13 +286,14 @@

#[test]
fn test_matcher_folders() {
let cloud_location = CloudLocation::new("s3://bucket/folder/**/*.parquet").unwrap();
let cloud_location = CloudLocation::new("s3://bucket/folder/**/*.parquet", true).unwrap();
let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
// Intermediary folders are optional.
assert!(a.is_matching(Path::from("folder/1.parquet").as_ref()));
// Intermediary folders are allowed.
assert!(a.is_matching(Path::from("folder/other/1.parquet").as_ref()));
let cloud_location = CloudLocation::new("s3://bucket/folder/**/data/*.parquet").unwrap();
let cloud_location =
CloudLocation::new("s3://bucket/folder/**/data/*.parquet", true).unwrap();
let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
// Required folder `data` is missing.
assert!(!a.is_matching(Path::from("folder/1.parquet").as_ref()));
@@ -295,4 +302,18 @@
// Required folder is present and additional folders are allowed.
assert!(a.is_matching(Path::from("folder/other/data/1.parquet").as_ref()));
}

#[test]
fn test_cloud_location_no_glob() {
let cloud_location = CloudLocation::new("s3://bucket/[*", false).unwrap();
assert_eq!(
cloud_location,
CloudLocation {
scheme: "s3".into(),
bucket: "bucket".into(),
prefix: "/[*".into(),
expansion: None,
},
)
}
}
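
The core of the fix is the branch above in `CloudLocation::from_url`: with `glob: true` the key is split into a listable prefix plus a wildcard expansion, while with `glob: false` the key is taken verbatim and `expansion` stays `None`. Below is a minimal, self-contained sketch of that split; `split_key` is a simplified stand-in for the crate's `extract_prefix_expansion`, not the actual implementation.

```rust
// Simplified illustration of the glob/no-glob split in `from_url` above.
fn split_key(key: &str, glob: bool) -> (String, Option<String>) {
    if !glob {
        // glob == false: take the key verbatim; `*`, `?` and `[` are not special.
        return (key.to_string(), None);
    }
    // Find the first glob metacharacter, then cut back to the directory
    // boundary before it so the prefix can be listed on the object store.
    match key.find(|c: char| matches!(c, '*' | '?' | '[')) {
        Some(pos) => {
            let prefix_end = key[..pos].rfind('/').map(|i| i + 1).unwrap_or(0);
            (
                key[..prefix_end].to_string(),
                Some(key[prefix_end..].to_string()),
            )
        },
        None => (key.to_string(), None),
    }
}

fn main() {
    // With globbing enabled, "b/*.c" splits into a prefix and an expansion pattern.
    assert_eq!(
        split_key("b/*.c", true),
        ("b/".to_string(), Some("*.c".to_string()))
    );
    // With glob=false, a key containing `[` or `*` stays a literal object key,
    // in the same spirit as the new `test_cloud_location_no_glob` test above.
    assert_eq!(split_key("[*", false), ("[*".to_string(), None));
}
```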
11 changes: 6 additions & 5 deletions crates/polars-io/src/cloud/object_store_setup.rs
@@ -41,7 +41,7 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
}

/// Construct an object_store `Path` from a string without any encoding/decoding.
pub fn object_path_from_string(path: String) -> PolarsResult<object_store::path::Path> {
pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
object_store::path::Path::parse(path).map_err(to_compute_err)
}

@@ -53,9 +53,10 @@ pub async fn build_object_store(
allow(unused_variables)
)]
options: Option<&CloudOptions>,
glob: bool,
) -> BuildResult {
let parsed = parse_url(url).map_err(to_compute_err)?;
let cloud_location = CloudLocation::from_url(&parsed)?;
let cloud_location = CloudLocation::from_url(&parsed, glob)?;

let key = url_and_creds_to_key(&parsed, options);
let mut allow_cache = true;
@@ -132,11 +133,11 @@ pub async fn build_object_store(

mod test {
#[test]
fn test_object_path_from_string() {
use super::object_path_from_string;
fn test_object_path_from_str() {
use super::object_path_from_str;

let path = "%25";
let out = object_path_from_string(path.to_string()).unwrap();
let out = object_path_from_str(path).unwrap();

assert_eq!(out.as_ref(), path);
}
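
The rename from `object_path_from_string` to `object_path_from_str` lets callers pass `&str` instead of cloning the prefix into a `String`; the behaviour it relies on is that `object_store::path::Path::parse` performs no percent-decoding, which the test above checks. A small stand-alone sketch of the same check against the `object_store` crate (assumed to be available as a direct dependency):

```rust
// Stand-alone check of the behaviour `object_path_from_str` depends on.
use object_store::path::Path;

fn main() {
    // `Path::parse` does not percent-decode its input, so a literal "%25"
    // stays "%25" rather than being turned into "%".
    let p = Path::parse("%25").unwrap();
    assert_eq!(p.as_ref(), "%25");
}
```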
2 changes: 1 addition & 1 deletion crates/polars-io/src/cloud/options.rs
@@ -297,7 +297,7 @@ impl CloudOptions {
.get_config_value(&AmazonS3ConfigKey::Region)
.is_none()
{
let bucket = crate::cloud::CloudLocation::new(url)?.bucket;
let bucket = crate::cloud::CloudLocation::new(url, false)?.bucket;
let region = {
let bucket_region = BUCKET_REGION.lock().unwrap();
bucket_region.get(bucket.as_str()).cloned()
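
The bucket-region lookup above only needs the bucket name, which comes from the URL's host component rather than from the key, so passing `glob: false` avoids interpreting any metacharacters that may appear later in the path. A tiny sketch of the host extraction using the `url` crate (already used by `from_url`); the exact field mapping to `CloudLocation::bucket` is an assumption for illustration:

```rust
// Sketch only: the bucket is the URL host, independent of any glob characters
// appearing in the key portion of the path.
use url::Url;

fn main() {
    let parsed = Url::parse("s3://my-bucket/key-with-[brackets].parquet").unwrap();
    assert_eq!(parsed.host_str(), Some("my-bucket"));
}
```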
15 changes: 5 additions & 10 deletions crates/polars-io/src/file_cache/utils.rs
@@ -9,7 +9,7 @@ use super::cache::{get_env_file_cache_ttl, FILE_CACHE};
use super::entry::FileCacheEntry;
use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};
use crate::cloud::{
build_object_store, object_path_from_string, CloudLocation, CloudOptions, PolarsObjectStore,
build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
};
use crate::path_utils::{ensure_directory_init, is_cloud_url, POLARS_TEMP_DIR_BASE_PATH};
use crate::pl_async;
@@ -76,7 +76,7 @@ pub fn init_entries_from_uri_list(
})
.map(|i| async move {
let (_, object_store) =
build_object_store(&uri_list[i], cloud_options).await?;
build_object_store(&uri_list[i], cloud_options, false).await?;
PolarsResult::Ok(PolarsObjectStore::new(object_store))
}),
)
@@ -90,14 +90,9 @@
FILE_CACHE.init_entry(
uri.clone(),
|| {
let CloudLocation {
prefix, expansion, ..
} = CloudLocation::new(uri.as_ref()).unwrap();

let cloud_path = {
assert!(expansion.is_none(), "path should not contain wildcards");
object_path_from_string(prefix)?
};
let CloudLocation { prefix, .. } =
CloudLocation::new(uri.as_ref(), false).unwrap();
let cloud_path = object_path_from_str(&prefix)?;

let object_store =
object_stores[std::cmp::min(i, object_stores.len())].clone();
19 changes: 5 additions & 14 deletions crates/polars-io/src/ipc/ipc_reader_async.rs
@@ -9,7 +9,7 @@ use polars_core::schema::Schema;
use polars_error::{polars_bail, polars_err, to_compute_err, PolarsResult};

use crate::cloud::{
build_object_store, object_path_from_string, CloudLocation, CloudOptions, PolarsObjectStore,
build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
};
use crate::file_cache::{init_entries_from_uri_list, FileCacheEntry};
use crate::predicates::PhysicalIoExpr;
@@ -67,19 +67,10 @@ impl IpcReaderAsync {
cloud_options: Option<&CloudOptions>,
) -> PolarsResult<IpcReaderAsync> {
let cache_entry = init_entries_from_uri_list(&[Arc::from(uri)], cloud_options)?[0].clone();
let (
CloudLocation {
prefix, expansion, ..
},
store,
) = build_object_store(uri, cloud_options).await?;

let path = {
// Any wildcards should already have been resolved here. Without this assertion they would
// be ignored.
debug_assert!(expansion.is_none(), "path should not contain wildcards");
object_path_from_string(prefix)?
};
let (CloudLocation { prefix, .. }, store) =
build_object_store(uri, cloud_options, false).await?;

let path = object_path_from_str(&prefix)?;

Ok(Self {
store: PolarsObjectStore::new(store),
15 changes: 3 additions & 12 deletions crates/polars-io/src/parquet/read/async_impl.rs
@@ -16,7 +16,7 @@ use super::mmap::ColumnStore;
use super::predicates::read_this_row_group;
use super::read_impl::compute_row_group_range;
use crate::cloud::{
build_object_store, object_path_from_string, CloudLocation, CloudOptions, PolarsObjectStore,
build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
};
use crate::parquet::metadata::FileMetaDataRef;
use crate::pl_async::get_runtime;
@@ -39,17 +39,8 @@ impl ParquetObjectStore {
options: Option<&CloudOptions>,
metadata: Option<FileMetaDataRef>,
) -> PolarsResult<Self> {
let (
CloudLocation {
prefix, expansion, ..
},
store,
) = build_object_store(uri, options).await?;

// Any wildcards should already have been resolved here. Without this assertion they would
// be ignored.
debug_assert!(expansion.is_none(), "path should not contain wildcards");
let path = object_path_from_string(prefix)?;
let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options, false).await?;
let path = object_path_from_str(&prefix)?;

Ok(ParquetObjectStore {
store: PolarsObjectStore::new(store),
29 changes: 15 additions & 14 deletions crates/polars-io/src/path_utils/mod.rs
@@ -148,7 +148,7 @@ pub fn expand_paths_hive(
{
use polars_utils::_limit_path_len_io_err;

use crate::cloud::object_path_from_string;
use crate::cloud::object_path_from_str;

if first_path.starts_with("hf://") {
let (expand_start_idx, paths) =
@@ -172,23 +172,24 @@
-> PolarsResult<(usize, Vec<PathBuf>)> {
crate::pl_async::get_runtime().block_on_potential_spawn(async {
let (cloud_location, store) =
crate::cloud::build_object_store(path, cloud_options).await?;

let prefix = object_path_from_string(cloud_location.prefix.clone())?;

let out = if !path.ends_with("/") && cloud_location.expansion.is_none() && {
// We need to check if it is a directory for local paths (we can be here due
// to FORCE_ASYNC). For cloud paths the convention is that the user must add
// a trailing slash `/` to scan directories. We don't infer it as that would
// mean sending one network request per path serially (very slow).
is_cloud || PathBuf::from(path).is_file()
} {
crate::cloud::build_object_store(path, cloud_options, glob).await?;
let prefix = object_path_from_str(&cloud_location.prefix)?;

let out = if !path.ends_with("/")
&& (!glob || cloud_location.expansion.is_none())
&& {
// We need to check if it is a directory for local paths (we can be here due
// to FORCE_ASYNC). For cloud paths the convention is that the user must add
// a trailing slash `/` to scan directories. We don't infer it as that would
// mean sending one network request per path serially (very slow).
is_cloud || PathBuf::from(path).is_file()
} {
(
0,
vec![PathBuf::from(format_path(
&cloud_location.scheme,
&cloud_location.bucket,
&cloud_location.prefix,
prefix.as_ref(),
))],
)
} else {
@@ -251,7 +252,7 @@

let glob_start_idx = get_glob_start_idx(path.to_str().unwrap().as_bytes());

let path = if glob_start_idx.is_some() {
let path = if glob && glob_start_idx.is_some() {
path.clone()
} else {
let (expand_start_idx, paths) =
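
In `expand_paths_hive` the glob flag now guards both the expansion check (`!glob || cloud_location.expansion.is_none()`) and the wildcard detection (`glob && glob_start_idx.is_some()`), so a path containing `*`, `?` or `[` is treated literally when globbing is disabled. A hedged, self-contained sketch of that guard follows; `glob_start_idx` below is a stand-in illustrating the assumed behaviour of the crate's `get_glob_start_idx`, not its actual code:

```rust
// Stand-in for `get_glob_start_idx`: position of the first byte that could
// start a glob pattern (assumption: `*`, `?` and `[` are the metacharacters).
fn glob_start_idx(path: &[u8]) -> Option<usize> {
    path.iter().position(|&b| matches!(b, b'*' | b'?' | b'['))
}

fn main() {
    let path = "s3://bucket/*.parquet";

    // glob = true: the wildcard is detected and the path would be expanded.
    assert!(glob_start_idx(path.as_bytes()).is_some());

    // glob = false: mirrors `if glob && glob_start_idx.is_some()` above;
    // the guard short-circuits and the path is kept as a literal key.
    let glob = false;
    assert!(!(glob && glob_start_idx(path.as_bytes()).is_some()));
}
```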
6 changes: 5 additions & 1 deletion crates/polars-utils/src/io.rs
@@ -4,9 +4,13 @@ use std::path::Path;

use polars_error::*;

fn verbose() -> bool {
std::env::var("POLARS_VERBOSE").as_deref().unwrap_or("") == "1"
}

pub fn _limit_path_len_io_err(path: &Path, err: io::Error) -> PolarsError {
let path = path.to_string_lossy();
let msg = if path.len() > 88 {
let msg = if path.len() > 88 && !verbose() {
let truncated_path: String = path.chars().skip(path.len() - 88).collect();
format!("{err}: ...{truncated_path}")
} else {
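
The new `verbose()` check means long paths in IO error messages are truncated to their last 88 characters only when `POLARS_VERBOSE` is not set to `"1"`. A self-contained sketch of that rule; `shorten_path_for_error` is illustrative only, since the real `_limit_path_len_io_err` wraps an `io::Error` into a `PolarsError` rather than returning a `String`:

```rust
use std::env;

// Mirrors the truncation rule added above.
fn verbose() -> bool {
    env::var("POLARS_VERBOSE").as_deref().unwrap_or("") == "1"
}

fn shorten_path_for_error(path: &str) -> String {
    if path.len() > 88 && !verbose() {
        // Keep only the last 88 characters so the error message stays readable.
        let truncated: String = path.chars().skip(path.len() - 88).collect();
        format!("...{truncated}")
    } else {
        path.to_string()
    }
}

fn main() {
    let long_path = "a/".repeat(100);
    // With POLARS_VERBOSE unset, only the tail of the long path is shown.
    println!("{}", shorten_path_for_error(&long_path));
    // Short paths are always printed in full.
    println!("{}", shorten_path_for_error("data.csv"));
}
```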
18 changes: 0 additions & 18 deletions py-polars/tests/unit/io/test_csv.py
@@ -2,7 +2,6 @@

import gzip
import io
import os
import sys
import textwrap
import zlib
@@ -2196,23 +2195,6 @@ def test_fsspec_not_available() -> None:
)


@pytest.mark.write_disk()
@pytest.mark.skipif(
os.environ.get("POLARS_FORCE_ASYNC") == "1" or sys.platform == "win32",
reason="only local",
)
def test_read_csv_no_glob(tmpdir: Path) -> None:
df = pl.DataFrame({"foo": 1})

p = tmpdir / "*.csv"
df.write_csv(str(p))
p = tmpdir / "*1.csv"
df.write_csv(str(p))

p = tmpdir / "*.csv"
assert_frame_equal(pl.read_csv(str(p), glob=False), df)


def test_read_csv_dtypes_deprecated() -> None:
csv = textwrap.dedent(
"""\