From c068e76496b57e3dd5da110c202c37cb25fdf915 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 12 Apr 2024 14:34:24 +0200 Subject: [PATCH] fix: Include cloud creds in cache key (#15609) --- crates/polars-io/Cargo.toml | 2 +- crates/polars-io/src/cloud/object_store_setup.rs | 11 +++++++---- crates/polars-plan/src/logical_plan/pyarrow.rs | 8 ++++---- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/crates/polars-io/Cargo.toml b/crates/polars-io/Cargo.toml index 43c3d677b8db..d8124dada31b 100644 --- a/crates/polars-io/Cargo.toml +++ b/crates/polars-io/Cargo.toml @@ -107,7 +107,7 @@ async = [ "polars-error/regex", "polars-parquet?/async", ] -cloud = ["object_store", "async", "polars-error/object_store", "url"] +cloud = ["object_store", "async", "polars-error/object_store", "url", "serde_json", "serde"] aws = ["object_store/aws", "cloud", "reqwest"] azure = ["object_store/azure", "cloud"] gcp = ["object_store/gcp", "cloud"] diff --git a/crates/polars-io/src/cloud/object_store_setup.rs b/crates/polars-io/src/cloud/object_store_setup.rs index bd518589ea97..ef8618f7ea0d 100644 --- a/crates/polars-io/src/cloud/object_store_setup.rs +++ b/crates/polars-io/src/cloud/object_store_setup.rs @@ -29,11 +29,14 @@ fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult { /// Get the key of a url for object store registration. /// The credential info will be removed -fn url_to_key(url: &Url) -> String { +fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String { + // We include credentials as they can expire, so users will send new credentials for the same url. + let creds = serde_json::to_string(&options).unwrap_or_else(|_| "".into()); format!( - "{}://{}", + "{}://{}<\\creds\\>{}", url.scheme(), &url[url::Position::BeforeHost..url::Position::AfterPort], + creds ) } @@ -49,7 +52,7 @@ pub async fn build_object_store( let parsed = parse_url(url).map_err(to_compute_err)?; let cloud_location = CloudLocation::from_url(&parsed)?; - let key = url_to_key(&parsed); + let key = url_and_creds_to_key(&parsed, options); let mut allow_cache = true; { @@ -117,7 +120,7 @@ pub async fn build_object_store( if allow_cache { let mut cache = OBJECT_STORE_CACHE.write().await; // Clear the cache if we surpass a certain amount of buckets. - if cache.len() > 32 { + if cache.len() > 8 { cache.clear() } cache.insert(key, store.clone()); diff --git a/crates/polars-plan/src/logical_plan/pyarrow.rs b/crates/polars-plan/src/logical_plan/pyarrow.rs index dba0a632a064..a762debb6484 100644 --- a/crates/polars-plan/src/logical_plan/pyarrow.rs +++ b/crates/polars-plan/src/logical_plan/pyarrow.rs @@ -161,10 +161,10 @@ pub(super) fn predicate_to_pa( input, .. } => { - if !matches!(expr_arena.get(input[0]), AExpr::Column(_)) { + if !matches!(expr_arena.get(input.first()?.node()), AExpr::Column(_)) { None } else { - let col = predicate_to_pa(*input.first()?, expr_arena, args)?; + let col = predicate_to_pa(input.first()?.node(), expr_arena, args)?; let left_cmp_op = match closed { ClosedInterval::Both | ClosedInterval::Left => Operator::Lt, ClosedInterval::None | ClosedInterval::Right => Operator::LtEq, @@ -174,8 +174,8 @@ pub(super) fn predicate_to_pa( ClosedInterval::None | ClosedInterval::Left => Operator::GtEq, }; - let lower = predicate_to_pa(*input.get(1)?, expr_arena, args)?; - let upper = predicate_to_pa(*input.get(2)?, expr_arena, args)?; + let lower = predicate_to_pa(input.get(1)?.node(), expr_arena, args)?; + let upper = predicate_to_pa(input.get(2)?.node(), expr_arena, args)?; Some(format!( "(({col} {left_cmp_op} {lower}) & ({col} {right_cmp_op} {upper}))"