Adding cloud features #266

Merged (2 commits) on Sep 16, 2024
Cargo.toml: 6 changes (5 additions, 1 deletion)

@@ -155,7 +155,11 @@ features = [
   "string_pad",
   "replace",
   "cov",
-  "http"
+  "http",
+  "cloud",
+  "aws",
+  "gcp",
+  "azure"
 ]
 git = "https://github.com/pola-rs/polars.git"
 rev = "7686025ac7738607f2d4f6887e9a1313b7c8b1e2"
polars/io.ts: 25 changes (7 additions, 18 deletions)

@@ -5,6 +5,7 @@ import { isPath } from "./utils";
 import { type LazyDataFrame, _LazyDataFrame } from "./lazy/dataframe";
 import { type Readable, Stream } from "stream";
 import { concat } from "./functions";
+import type { ScanParquetOptions, RowCount } from "./types";
 
 export interface ReadCsvOptions {
   inferSchemaLength: number | null;
@@ -31,7 +32,7 @@ export interface ReadCsvOptions {
   skipRows: number;
   tryParseDates: boolean;
   skipRowsAfterHeader: number;
-  rowCount: any;
+  rowCount: RowCount;
   raiseIfEmpty: boolean;
   truncateRaggedLines: boolean;
   missingIsNull: boolean;
@@ -470,23 +471,6 @@ export function readAvro(pathOrBody, options = {}) {
   throw new Error("must supply either a path or body");
 }
 
-interface RowCount {
-  name: string;
-  offset: string;
-}
-
-interface ScanParquetOptions {
-  nRows?: number;
-  cache?: boolean;
-  parallel?: "auto" | "columns" | "row_groups" | "none";
-  rowCount?: RowCount;
-  rechunk?: boolean;
-  lowMemory?: boolean;
-  useStatistics?: boolean;
-  cloudOptions?: Map<string, string>;
-  retries?: number;
-}
-
 /**
  * Lazily read from a local or cloud-hosted parquet file (or files).
@@ -503,6 +487,10 @@ interface ScanParquetOptions {
 This determines the direction of parallelism. 'auto' will try to determine the optimal direction.
 @param options.useStatistics - Use statistics in the parquet to determine if pages can be skipped from reading.
 @param options.hivePartitioning - Infer statistics and schema from hive partitioned URL and use them to prune reads.
+@param options.glob - Expand path given via globbing rules.
+@param options.hiveSchema - The column names and data types of the columns by which the data is partitioned.
+If set to `None` (default), the schema of the Hive partitions is inferred.
+@param options.tryParseHiveDates - Whether to try parsing hive values as date/datetime types.
 @param options.rechunk - In case of reading multiple files via a glob pattern rechunk the final DataFrame into contiguous memory chunks.
 @param options.lowMemory - Reduce memory pressure at the expense of performance.
 @param options.cache - Cache the result after reading.
@@ -518,6 +506,7 @@ interface ScanParquetOptions {
 
 If `storage_options` is not provided, Polars will try to infer the information from environment variables.
 @param retries - Number of retries if accessing a cloud instance fails.
+@param includeFilePaths - Include the path of the source file(s) as a column with this name.
 */
 export function scanParquet(source: string, options: ScanParquetOptions = {}) {
   const defaultOptions = { parallel: "auto" };
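For anyone trying the new cloud path end to end, here is a minimal usage sketch. The bucket, object path, column names, and credential keys are hypothetical, and it assumes `cloudOptions` accepts a plain string-to-string object (it crosses into Rust as a `HashMap<String, String>`):

```ts
import pl from "nodejs-polars";

// Hypothetical S3 location; the credential keys are assumed to follow the
// object_store-style names that Polars' cloud readers understand.
const lf = pl.scanParquet("s3://my-bucket/events/*.parquet", {
  cloudOptions: {
    aws_region: "us-east-1",
    aws_access_key_id: process.env.AWS_ACCESS_KEY_ID ?? "",
    aws_secret_access_key: process.env.AWS_SECRET_ACCESS_KEY ?? "",
  },
  retries: 2, // retry transient cloud failures a couple of times
});

// The scan is lazy: nothing is fetched until the query is collected.
const df = lf.select("event_id", "ts").collectSync();
console.log(df.shape);
```

Per the doc comment above, omitting the credentials makes Polars fall back to inferring them from environment variables.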
polars/types.ts: 19 changes (14 additions, 5 deletions)

@@ -141,12 +141,21 @@ export interface ReadParquetOptions {
 /**
  * Options for {@link scanParquet}
  */
 export interface ScanParquetOptions {
-  columns?: string[] | number[];
-  numRows?: number;
-  parallel?: "auto" | "columns" | "row_groups" | "none";
-  rowCount?: RowCount;
+  nRows?: number;
+  rowIndexName?: string;
+  rowIndexOffset?: number;
+  cache?: boolean;
+  parallel?: "auto" | "columns" | "row_groups" | "none";
+  glob?: boolean;
+  hivePartitioning?: boolean;
+  hiveSchema?: unknown;
+  tryParseHiveDates?: boolean;
   rechunk?: boolean;
   lowMemory?: boolean;
+  useStatistics?: boolean;
+  cloudOptions?: unknown;
+  retries?: number;
+  includeFilePaths?: string;
 }
 
 /**
@@ -156,7 +165,7 @@ export interface RowCount {
   /** name of column */
   name: string;
   /** offset */
-  offset: string;
+  offset: number;
 }
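Since `rowCount` disappears from `ScanParquetOptions` in favor of flat fields, a before/after sketch may help downstream users (the file name is hypothetical; the field names come straight from the diff above):

```ts
import pl from "nodejs-polars";

// Before this PR, the row index was a nested object, and RowCount.offset
// was (incorrectly) typed as a string:
// pl.scanParquet("data.parquet", { rowCount: { name: "row_nr", offset: "0" } });

// After this PR, the row index is configured with flat fields and numbers:
const lf = pl.scanParquet("data.parquet", {
  nRows: 1_000,           // stop after reading 1,000 rows
  rowIndexName: "row_nr", // add a row-index column with this name
  rowIndexOffset: 10,     // start the index at 10 instead of 0
});
```

The `offset: string` to `offset: number` fix in `RowCount` also corrects the type now referenced by `ReadCsvOptions.rowCount`.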
src/lazy/dataframe.rs: 40 changes (31 additions, 9 deletions)

@@ -713,22 +713,38 @@ pub fn scan_csv(path: String, options: ScanCsvOptions) -> napi::Result<JsLazyFrame> {
 #[napi(object)]
 pub struct ScanParquetOptions {
     pub n_rows: Option<i64>,
+    pub row_index_name: Option<String>,
+    pub row_index_offset: Option<u32>,
     pub cache: Option<bool>,
     pub parallel: Wrap<ParallelStrategy>,
-    pub row_count: Option<JsRowCount>,
+    pub glob: Option<bool>,
+    pub hive_partitioning: Option<bool>,
+    pub hive_schema: Option<Wrap<Schema>>,
+    pub try_parse_hive_dates: Option<bool>,
     pub rechunk: Option<bool>,
     pub low_memory: Option<bool>,
     pub use_statistics: Option<bool>,
     pub cloud_options: Option<HashMap<String, String>>,
     pub retries: Option<i64>,
+    pub include_file_paths: Option<String>,
 }
 
 #[napi(catch_unwind)]
 pub fn scan_parquet(path: String, options: ScanParquetOptions) -> napi::Result<JsLazyFrame> {
     let n_rows = options.n_rows.map(|i| i as usize);
     let cache = options.cache.unwrap_or(true);
+    let glob = options.glob.unwrap_or(true);
     let parallel = options.parallel;
-    let row_index: Option<RowIndex> = options.row_count.map(|rc| rc.into());
+
+    let row_index: Option<RowIndex> = if let Some(idn) = options.row_index_name {
+        Some(RowIndex {
+            name: idn.into(),
+            offset: options.row_index_offset.unwrap_or(0)
+        })
+    } else {
+        None
+    };
+
     let rechunk = options.rechunk.unwrap_or(false);
     let low_memory = options.low_memory.unwrap_or(false);
     let use_statistics = options.use_statistics.unwrap_or(false);
@@ -751,6 +767,16 @@ pub fn scan_parquet(path: String, options: ScanParquetOptions) -> napi::Result<JsLazyFrame> {
         });
     }
 
+    let hive_schema = options.hive_schema.map(|s| Arc::new(s.0));
+    let hive_options = HiveOptions {
+        enabled: options.hive_partitioning,
+        hive_start_idx: 0,
+        schema: hive_schema,
+        try_parse_dates: options.try_parse_hive_dates.unwrap_or(true),
+    };
+
+    let include_file_paths = options.include_file_paths;
+
     let args = ScanArgsParquet {
         n_rows,
         cache,
@@ -760,13 +786,9 @@ pub fn scan_parquet(path: String, options: ScanParquetOptions) -> napi::Result<JsLazyFrame> {
         low_memory,
         cloud_options,
         use_statistics,
-        // TODO: Support Hive partitioning.
-        hive_options: HiveOptions {
-            enabled: Some(false),
-            ..Default::default()
-        },
-        glob: true,
-        include_file_paths: None
+        hive_options,
+        glob,
+        include_file_paths: include_file_paths.map(Arc::from),
     };
     let lf = LazyFrame::scan_parquet(path, args).map_err(JsPolarsErr::from)?;
     Ok(lf.into())
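The Rust changes above wire `hivePartitioning`, `hiveSchema`, `tryParseHiveDates`, `glob`, and `includeFilePaths` through to `ScanArgsParquet`. A sketch of how that surfaces on the JavaScript side (the directory layout and column names are hypothetical):

```ts
import pl from "nodejs-polars";

// Hypothetical hive-partitioned dataset:
//   sales/region=eu/date=2024-09-16/part-0.parquet
//   sales/region=us/date=2024-09-16/part-0.parquet
const lf = pl.scanParquet("sales/**/*.parquet", {
  glob: true,              // expand the pattern (mirrors the Rust-side default)
  hivePartitioning: true,  // turn region=/date= directories into columns
  tryParseHiveDates: true, // parse `date=...` values as dates (Rust default)
  includeFilePaths: "source_file", // add each row's file path as a column
});

const df = lf.groupBy("region").agg(pl.col("amount").sum()).collectSync();
```

Leaving `hiveSchema` unset lets the partition columns' schema be inferred, matching the `None` default described in the new doc comment.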