Adding cloud options to scan_parquet #173

Merged 7 commits on Mar 21, 2024
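In short, this PR lets `scanParquet` read from object storage by forwarding a `cloudOptions` map and a `retries` count to the Rust side. A minimal sketch of the intended usage, assuming the `nodejs-polars` default export and using a hypothetical S3 bucket, column names, and config key (keys follow the `object_store` config keys linked in the docs further down); this is a sketch of the new API, not a verified snippet:

```ts
import pl from "nodejs-polars";

// Hypothetical bucket/path; credentials can also be picked up from environment variables.
const df = pl
  .scanParquet("s3://my-bucket/data/*.parquet", {
    cloudOptions: new Map([["aws_region", "us-east-1"]]),
    retries: 2,
  })
  .select("id", "value") // hypothetical column names
  .collectSync();

console.log(df.shape);
```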
67 changes: 67 additions & 0 deletions Cargo.toml
@@ -93,6 +93,73 @@ features = [
"cov",
"group_by_list",
"sql",
"binary_encoding",
"rolling_window",
"json",
"dynamic_group_by",
"zip_with",
"simd",
"lazy",
"strings",
"temporal",
"random",
"object",
"fmt",
"performant",
"dtype-full",
"rows",
"round_series",
"is_unique",
"is_in",
"is_first_distinct",
"asof_join",
"cross_join",
"dot_product",
"concat_str",
"row_hash",
"reinterpret",
"mode",
"extract_jsonpath",
"cum_agg",
"rolling_window",
"repeat_by",
"interpolate",
"ewma",
"rank",
"propagate_nans",
"diff",
"pct_change",
"moment",
"diagonal_concat",
"abs",
"dot_diagram",
"dataframe_arithmetic",
"json",
"string_encoding",
"product",
"ndarray",
"unique_counts",
"log",
"serde-lazy",
"partition_by",
"pivot",
"semi_anti_join",
"parquet",
"to_dummies",
"ipc",
"avro",
"list_eval",
"arg_where",
"timezones",
"peaks",
"string_pad",
"cov",
"group_by_list",
"http",
"cloud",
"aws",
"gcp",
"azure"
]
git = "https://github.com/pola-rs/polars.git"
rev = "3cf4897e679b056d17a235d48867035265d43cdc"
2 changes: 1 addition & 1 deletion __tests__/io.test.ts
@@ -271,7 +271,7 @@ describe("parquet", () => {
});

test("scan:options", () => {
const df = pl.scanParquet(parquetpath, { numRows: 4 }).collectSync();
const df = pl.scanParquet(parquetpath, { nRows: 4 }).collectSync();
expect(df.shape).toEqual({ height: 4, width: 4 });
});
});
Binary file added bun.lockb
59 changes: 40 additions & 19 deletions polars/io.ts
@@ -476,33 +476,54 @@ interface RowCount {
}

interface ScanParquetOptions {
columns?: string[] | number[];
numRows?: number;
nRows?: number;
cache?: boolean;
parallel?: "auto" | "columns" | "row_groups" | "none";
rowCount?: RowCount;
cache?: boolean;
rechunk?: boolean;
hive_partitioning?: boolean;
lowMemory?: boolean;
useStatistics?: boolean;
hivePartitioning?: boolean;
cloudOptions?: Map<string, string>;
retries?: number;
}

/**
* __Lazily read from a parquet file or multiple files via glob patterns.__
* ___
* Lazily read from a local or cloud-hosted parquet file (or files).
*
* This function allows the query optimizer to push down predicates and projections to
* the scan level, typically increasing performance and reducing memory overhead.
*
* This allows the query optimizer to push down predicates and projections to the scan level,
* thereby potentially reducing memory overhead.
* @param path Path to a file or glob pattern
* @param options.numRows Stop reading from parquet file after reading ``numRows``.
* @param options.cache Cache the result after reading.
* @param options.parallel Read the parquet file in parallel. The single threaded reader consumes less memory.
* @param options.rechunk In case of reading multiple files via a glob pattern rechunk the final DataFrame into contiguous memory chunks.
* @param source - Path(s) to a file. If a single path is given, it can be a globbing pattern.
* @param options.nRows - Stop reading from parquet file after reading `nRows`.
* @param options.rowIndexName - If not null, this will insert a row index column with the given name into the DataFrame.
* @param options.rowIndexOffset - Offset to start the row index column (only used if the name is set).
* @param options.parallel - Direction of parallelism: 'auto', 'columns', 'row_groups' or 'none'; 'auto' will try to determine the optimal direction.
* @param options.useStatistics - Use statistics in the parquet file to determine if pages can be skipped from reading.
* @param options.hivePartitioning - Infer statistics and schema from a hive-partitioned URL and use them to prune reads.
* @param options.rechunk - In case of reading multiple files via a glob pattern, rechunk the final DataFrame into contiguous memory chunks.
* @param options.lowMemory - Reduce memory pressure at the expense of performance.
* @param options.cache - Cache the result after reading.
* @param options.cloudOptions - Options that indicate how to connect to a cloud provider.
*
*   The cloud providers currently supported are AWS, GCP, and Azure. See the supported keys here:
*   - aws: https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html
*   - gcp: https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html
*   - azure: https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html
*
*   If `cloudOptions` is not provided, Polars will try to infer the information from environment variables.
* @param options.retries - Number of retries if accessing a cloud instance fails.
*/
export function scanParquet(path: string, options: ScanParquetOptions = {}) {
const pliOptions: any = {};

pliOptions.nRows = options?.numRows;
pliOptions.rowCount = options?.rowCount;
pliOptions.parallel = options?.parallel ?? "auto";
return _LazyDataFrame(pli.scanParquet(path, pliOptions));
export function scanParquet(source: string, options: ScanParquetOptions = {}) {
const defaultOptions = { parallel: "auto" };
const pliOptions = { ...defaultOptions, ...options };
return _LazyDataFrame(pli.scanParquet(source, pliOptions));
}
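A quick sketch of what this pass-through change means for callers (file and column values are hypothetical): options are no longer remapped by hand, only `parallel: "auto"` is filled in as a default, so keys must match `ScanParquetOptions` exactly, e.g. `nRows` rather than the old `numRows`:

```ts
import pl from "nodejs-polars";

// Every key is forwarded as-is to the native scanParquet binding.
const lf = pl.scanParquet("data/*.parquet", {
  nRows: 1_000,        // previously spelled `numRows`
  parallel: "columns", // overrides the "auto" default
  hivePartitioning: true,
  lowMemory: true,
});

const df = lf.collectSync();
```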

export interface ReadIPCOptions {
24 changes: 21 additions & 3 deletions src/lazy/dataframe.rs
@@ -709,11 +709,11 @@ pub struct ScanParquetOptions {
pub parallel: Wrap<ParallelStrategy>,
pub row_count: Option<JsRowCount>,
pub rechunk: Option<bool>,
pub row_count_name: Option<String>,
pub row_count_offset: Option<u32>,
pub low_memory: Option<bool>,
pub use_statistics: Option<bool>,
pub hive_partitioning: Option<bool>,
pub cloud_options: Option<HashMap::<String, String>>,
pub retries: Option<i64>,
}

#[napi(catch_unwind)]
@@ -725,7 +725,25 @@ pub fn scan_parquet(path: String, options: ScanParquetOptions) -> napi::Result<J
let rechunk = options.rechunk.unwrap_or(false);
let low_memory = options.low_memory.unwrap_or(false);
let use_statistics = options.use_statistics.unwrap_or(false);
let cloud_options = Some(CloudOptions::default());

// Build CloudOptions from the untyped key/value map, if one was provided.
let mut cloud_options: Option<CloudOptions> = if let Some(o) = options.cloud_options {
let co: Vec<(String, String)> = o.into_iter().collect();
Some(CloudOptions::from_untyped_config(&path, co).map_err(JsPolarsErr::from)?)
} else {
None
};

// Default to 2 retries when the caller does not specify a value.
let retries = options.retries.unwrap_or(2) as usize;
if retries > 0 {
cloud_options =
cloud_options
.or_else(|| Some(CloudOptions::default()))
.map(|mut options| {
options.max_retries = retries;
options
});
}

let hive_partitioning: bool = options.hive_partitioning.unwrap_or(false);
let args = ScanArgsParquet {
n_rows,
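For reference, a hedged sketch of how the retry handling above is meant to surface in the TypeScript API (bucket name and config key are hypothetical, and the `Map` shape follows the `cloudOptions` type in the interface earlier in this diff): `retries` falls back to 2 when omitted, and because that default is greater than zero, `CloudOptions` are constructed even when no `cloudOptions` map is supplied, so credentials can still be inferred from environment variables:

```ts
import pl from "nodejs-polars";

// Explicit configuration: CloudOptions built from the supplied map, with 5 retries.
const explicit = pl.scanParquet("s3://my-bucket/data.parquet", {
  cloudOptions: new Map([["aws_region", "us-east-1"]]),
  retries: 5,
});

// No options: the Rust side still creates default CloudOptions (max_retries = 2)
// and relies on environment variables for credentials.
const fromEnv = pl.scanParquet("s3://my-bucket/data.parquet");
```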