Skip to content

Commit

Permalink
feat: Add credential provider utility classes for AWS, GCP (#19297)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Oct 18, 2024
1 parent 997ebb4 commit a3401dc
Show file tree
Hide file tree
Showing 12 changed files with 375 additions and 28 deletions.
37 changes: 30 additions & 7 deletions crates/polars-io/src/cloud/credential_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,19 @@ impl<C: Clone> FetchedCredentialsCache<C> {
update_func: impl Future<Output = PolarsResult<(C, u64)>>,
) -> PolarsResult<C> {
let verbose = config::verbose();

fn expiry_msg(last_fetched_expiry: u64, now: u64) -> String {
if last_fetched_expiry == u64::MAX {
"expiry = (never expires)".into()
} else {
format!(
"expiry = {} (in {} seconds)",
last_fetched_expiry,
last_fetched_expiry.saturating_sub(now)
)
}
}

let mut inner = self.0.lock().await;
let (last_fetched_credentials, last_fetched_expiry) = &mut *inner;

Expand All @@ -379,8 +392,8 @@ impl<C: Clone> FetchedCredentialsCache<C> {
if last_fetched_expiry.saturating_sub(current_time) < REQUEST_TIME_BUFFER {
if verbose {
eprintln!(
"[FetchedCredentialsCache]: Call update_func: current_time = {},\
last_fetched_expiry = {}",
"[FetchedCredentialsCache]: Call update_func: current_time = {}\
, last_fetched_expiry = {}",
current_time, *last_fetched_expiry
)
}
Expand All @@ -402,17 +415,27 @@ impl<C: Clone> FetchedCredentialsCache<C> {

if verbose {
eprintln!(
"[FetchedCredentialsCache]: Finish update_func: \
new expiry = {} (in {} seconds)",
*last_fetched_expiry,
last_fetched_expiry.saturating_sub(
"[FetchedCredentialsCache]: Finish update_func: new {}",
expiry_msg(
*last_fetched_expiry,
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
),
)
)
}
} else if verbose {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
eprintln!(
"[FetchedCredentialsCache]: Using cached credentials: \
current_time = {}, {}",
now,
expiry_msg(*last_fetched_expiry, now)
)
}

Ok(last_fetched_credentials.clone())
Expand Down
9 changes: 9 additions & 0 deletions crates/polars-io/src/cloud/object_store_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use once_cell::sync::Lazy;
use polars_core::config;
use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult};
use polars_utils::aliases::PlHashMap;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -58,6 +59,8 @@ pub async fn build_object_store(
let parsed = parse_url(url).map_err(to_compute_err)?;
let cloud_location = CloudLocation::from_url(&parsed, glob)?;

// FIXME: `credential_provider` is currently serializing the entire Python function here
// into a string with pickle for this cache key because we are using `serde_json::to_string`
let key = url_and_creds_to_key(&parsed, options);
let mut allow_cache = true;

Expand Down Expand Up @@ -124,6 +127,12 @@ pub async fn build_object_store(
let mut cache = OBJECT_STORE_CACHE.write().await;
// Clear the cache if we surpass a certain amount of buckets.
if cache.len() > 8 {
if config::verbose() {
eprintln!(
"build_object_store: clearing store cache (cache.len(): {})",
cache.len()
);
}
cache.clear()
}
cache.insert(key, store.clone());
Expand Down
11 changes: 11 additions & 0 deletions py-polars/docs/source/reference/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,14 @@ Connect to pyarrow datasets.
:toctree: api/

scan_pyarrow_dataset

Cloud Credentials
~~~~~~~~~~~~~~~~~
Configuration for cloud credential provisioning.

.. autosummary::
:toctree: api/

CredentialProvider
CredentialProviderAWS
CredentialProviderGCP
13 changes: 13 additions & 0 deletions py-polars/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@
scan_parquet,
scan_pyarrow_dataset,
)
from polars.io.cloud import (
CredentialProvider,
CredentialProviderAWS,
CredentialProviderFunction,
CredentialProviderFunctionReturn,
CredentialProviderGCP,
)
from polars.lazyframe import GPUEngine, LazyFrame
from polars.meta import (
build_info,
Expand Down Expand Up @@ -266,6 +273,12 @@
"scan_ndjson",
"scan_parquet",
"scan_pyarrow_dataset",
# polars.io.cloud
"CredentialProvider",
"CredentialProviderAWS",
"CredentialProviderFunction",
"CredentialProviderFunctionReturn",
"CredentialProviderGCP",
# polars.stringcache
"StringCache",
"disable_string_cache",
Expand Down
8 changes: 4 additions & 4 deletions py-polars/polars/_typing.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from __future__ import annotations

from collections.abc import Collection, Iterable, Mapping, Sequence
from pathlib import Path
from typing import (
IO,
TYPE_CHECKING,
Any,
Callable,
Literal,
Optional,
Protocol,
TypedDict,
TypeVar,
Expand Down Expand Up @@ -297,6 +297,6 @@ def fetchmany(self, *args: Any, **kwargs: Any) -> Any:
# LazyFrame engine selection
EngineType: TypeAlias = Union[Literal["cpu", "gpu"], "GPUEngine"]

CredentialProviderFunction: TypeAlias = Callable[
[], tuple[dict[str, Optional[str]], Optional[int]]
ScanSource: TypeAlias = Union[
str, Path, IO[bytes], bytes, list[str], list[Path], list[IO[bytes]], list[bytes]
]
6 changes: 5 additions & 1 deletion py-polars/polars/io/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, overload

from polars._utils.various import is_int_sequence, is_str_sequence, normalize_filepath
from polars._utils.various import (
is_int_sequence,
is_str_sequence,
normalize_filepath,
)
from polars.dependencies import _FSSPEC_AVAILABLE, fsspec
from polars.exceptions import NoDataError

Expand Down
15 changes: 15 additions & 0 deletions py-polars/polars/io/cloud/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from polars.io.cloud.credential_provider import (
CredentialProvider,
CredentialProviderAWS,
CredentialProviderFunction,
CredentialProviderFunctionReturn,
CredentialProviderGCP,
)

__all__ = [
"CredentialProvider",
"CredentialProviderAWS",
"CredentialProviderFunction",
"CredentialProviderFunctionReturn",
"CredentialProviderGCP",
]
55 changes: 55 additions & 0 deletions py-polars/polars/io/cloud/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Literal

from polars._utils.various import is_path_or_str_sequence

if TYPE_CHECKING:
from polars._typing import ScanSource


def _first_scan_path(
source: ScanSource,
) -> str | Path | None:
if isinstance(source, (str, Path)):
return source
elif is_path_or_str_sequence(source) and source:
return source[0]

return None


def _infer_cloud_type(
source: ScanSource,
) -> Literal["aws", "azure", "gcp", "file", "http", "hf"] | None:
if (path := _first_scan_path(source)) is None:
return None

splitted = str(path).split("://", maxsplit=1)

# Fast path - local file
if not splitted:
return None

scheme = splitted[0]

if scheme == "file":
return "file"

if any(scheme == x for x in ["s3", "s3a"]):
return "aws"

if any(scheme == x for x in ["az", "azure", "adl", "abfs", "abfss"]):
return "azure"

if any(scheme == x for x in ["gs", "gcp", "gcs"]):
return "gcp"

if any(scheme == x for x in ["http", "https"]):
return "http"

if scheme == "hf":
return "hf"

return None
Loading

0 comments on commit a3401dc

Please sign in to comment.