Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Update Cloud plan preparation #17802

Merged
merged 10 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub use polars_ops::prelude::{JoinArgs, JoinType, JoinValidation};
#[cfg(feature = "rank")]
pub use polars_ops::prelude::{RankMethod, RankOptions};
#[cfg(feature = "polars_cloud")]
pub use polars_plan::client::assert_cloud_eligible;
pub use polars_plan::client::prepare_cloud_plan;
pub use polars_plan::plans::{
AnonymousScan, AnonymousScanArgs, AnonymousScanOptions, DslPlan, Literal, LiteralValue, Null,
NULL,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ month_end = ["polars-time/month_end"]
offset_by = ["polars-time/offset_by"]

bigidx = ["polars-core/bigidx"]
polars_cloud = []
polars_cloud = ["serde", "ciborium"]

panic_on_schema = []

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,73 @@
use polars_core::error::{polars_err, PolarsResult};
use polars_io::path_utils::is_cloud_url;

use crate::dsl::Expr;
use crate::prelude::DslPlan;
use crate::plans::options::SinkType;
use crate::plans::{DslFunction, DslPlan, FileScan, FunctionNode};

/// Assert that the given [`DslPlan`] is eligible to be executed on Polars Cloud.
pub(super) fn assert_cloud_eligible(dsl: &DslPlan) -> PolarsResult<()> {
let mut expr_stack = vec![];
for plan_node in dsl.into_iter() {
match plan_node {
DslPlan::MapFunction {
function: DslFunction::FunctionNode(function),
..
} => match function {
FunctionNode::Opaque { .. } => return ineligible_error("contains opaque function"),
#[cfg(feature = "python")]
FunctionNode::OpaquePython { .. } => {
return ineligible_error("contains Python function")
},
_ => (),
},
#[cfg(feature = "python")]
DslPlan::PythonScan { .. } => return ineligible_error("contains Python scan"),
DslPlan::GroupBy { apply: Some(_), .. } => {
return ineligible_error("contains Python function in group by operation")
},
DslPlan::Scan { paths, .. }
stinodego marked this conversation as resolved.
Show resolved Hide resolved
if paths.lock().unwrap().0.iter().any(|p| !is_cloud_url(p)) =>
{
return ineligible_error("contains scan of local file system")
},
DslPlan::Scan {
scan_type: FileScan::Anonymous { .. },
..
} => return ineligible_error("contains anonymous scan"),
DslPlan::Sink { payload, .. } => {
if !matches!(payload, SinkType::Cloud { .. }) {
return ineligible_error("contains sink to non-cloud location");
}
},
plan => {
plan.get_expr(&mut expr_stack);

for expr in expr_stack.drain(..) {
for expr_node in expr.into_iter() {
match expr_node {
Expr::AnonymousFunction { .. } => {
return ineligible_error("contains anonymous function")
},
Expr::RenameAlias { .. } => {
return ineligible_error("contains custom name remapping")
},
_ => (),
}
}
}
},
}
}
Ok(())
}

fn ineligible_error(message: &str) -> PolarsResult<()> {
Err(polars_err!(
InvalidOperation:
"logical plan ineligible for execution on Polars Cloud: {message}"
))
}

impl DslPlan {
fn inputs<'a>(&'a self, scratch: &mut Vec<&'a DslPlan>) {
Expand Down Expand Up @@ -35,7 +103,7 @@ impl DslPlan {
}
}

pub(super) fn get_expr<'a>(&'a self, scratch: &mut Vec<&'a Expr>) {
fn get_expr<'a>(&'a self, scratch: &mut Vec<&'a Expr>) {
use DslPlan::*;
match self {
Filter { predicate, .. } => scratch.push(predicate),
Expand Down
92 changes: 31 additions & 61 deletions crates/polars-plan/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,68 +1,38 @@
mod dsl;
mod check;

use polars_core::error::{polars_err, PolarsResult};
use std::sync::Arc;

use polars_core::error::{polars_ensure, polars_err, PolarsResult};
use polars_io::parquet::write::ParquetWriteOptions;
use polars_io::path_utils::is_cloud_url;

use crate::dsl::Expr;
use crate::plans::options::SinkType;
use crate::plans::{DslFunction, DslPlan, FunctionNode};
use crate::plans::options::{FileType, SinkType};
use crate::plans::DslPlan;

/// Assert that the given [`DslPlan`] is eligible to be executed on Polars Cloud.
pub fn assert_cloud_eligible(dsl: &DslPlan) -> PolarsResult<()> {
let mut expr_stack = vec![];
for plan_node in dsl.into_iter() {
match plan_node {
DslPlan::MapFunction {
function: DslFunction::FunctionNode(function),
..
} => match function {
FunctionNode::Opaque { .. } => return ineligible_error("contains opaque function"),
#[cfg(feature = "python")]
FunctionNode::OpaquePython { .. } => {
return ineligible_error("contains Python function")
},
_ => (),
},
#[cfg(feature = "python")]
DslPlan::PythonScan { .. } => return ineligible_error("contains Python scan"),
DslPlan::GroupBy { apply: Some(_), .. } => {
return ineligible_error("contains Python function in group by operation")
},
DslPlan::Scan { paths, .. }
if paths.lock().unwrap().0.iter().any(|p| !is_cloud_url(p)) =>
{
return ineligible_error("contains scan of local file system")
},
DslPlan::Sink { payload, .. } => {
if !matches!(payload, SinkType::Cloud { .. }) {
return ineligible_error("contains sink to non-cloud location");
}
},
plan => {
plan.get_expr(&mut expr_stack);
/// Prepare the given [`DslPlan`] for execution on Polars Cloud.
pub fn prepare_cloud_plan(dsl: DslPlan, uri: String) -> PolarsResult<Vec<u8>> {
// Check the plan for cloud eligibility.
check::assert_cloud_eligible(&dsl)?;

for expr in expr_stack.drain(..) {
for expr_node in expr.into_iter() {
match expr_node {
Expr::AnonymousFunction { .. } => {
return ineligible_error("contains anonymous function")
},
Expr::RenameAlias { .. } => {
return ineligible_error("contains custom name remapping")
},
_ => (),
}
}
}
},
}
}
Ok(())
}
// Add Sink node.
polars_ensure!(
is_cloud_url(&uri),
InvalidOperation: "non-cloud paths not supported: {uri}"
);
let sink_type = SinkType::Cloud {
uri: Arc::new(uri),
file_type: FileType::Parquet(ParquetWriteOptions::default()),
cloud_options: None,
};
let dsl = DslPlan::Sink {
input: Arc::new(dsl),
payload: sink_type,
};

// Serialize the plan.
let mut writer = Vec::new();
ciborium::into_writer(&dsl, &mut writer)
.map_err(|err| polars_err!(ComputeError: err.to_string()))?;

fn ineligible_error(message: &str) -> PolarsResult<()> {
Err(polars_err!(
InvalidOperation:
"logical plan ineligible for execution on Polars Cloud: {message}"
))
Ok(writer)
}
34 changes: 24 additions & 10 deletions py-polars/polars/_utils/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,42 @@
from typing import TYPE_CHECKING

import polars.polars as plr
from polars._utils.various import normalize_filepath

if TYPE_CHECKING:
from pathlib import Path

from polars import LazyFrame


def assert_cloud_eligible(lf: LazyFrame) -> None:
def prepare_cloud_plan(
lf: LazyFrame,
uri: Path | str,
**optimizations: bool,
) -> bytes:
"""
Assert that the given LazyFrame is eligible to be executed on Polars Cloud.

The following conditions will disqualify a LazyFrame from being eligible:

- Contains a user-defined function
- Scans a local filesystem
Prepare the given LazyFrame for execution on Polars Cloud.

Parameters
----------
lf
The LazyFrame to check.
The LazyFrame to prepare.
uri
Path to which the file should be written.
Must be a URI to an accessible object store location.
**optimizations
Optimizations to enable or disable in the query optimizer, e.g.
`projection_pushdown=False`.

Raises
------
AssertionError
ValueError
If the given LazyFrame is not eligible to be run on Polars Cloud.
The following conditions will disqualify a LazyFrame from being eligible:

- Contains a user-defined function
- Scans or sinks to a local filesystem
"""
plr.assert_cloud_eligible(lf._ldf)
uri = normalize_filepath(uri)
pylf = lf._set_sink_optimizations(**optimizations)
return plr.prepare_cloud_plan(pylf, uri)
10 changes: 5 additions & 5 deletions py-polars/polars/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -2570,11 +2570,11 @@ def _set_sink_optimizations(
slice_pushdown = False

return self._ldf.optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
simplify_expression,
slice_pushdown,
type_coercion=type_coercion,
predicate_pushdown=predicate_pushdown,
projection_pushdown=projection_pushdown,
simplify_expression=simplify_expression,
slice_pushdown=slice_pushdown,
comm_subplan_elim=False,
comm_subexpr_elim=False,
cluster_with_columns=False,
Expand Down
13 changes: 7 additions & 6 deletions py-polars/src/cloud.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use pyo3::exceptions::PyAssertionError;
use pyo3::prelude::*;
use pyo3::types::PyBytes;

use crate::error::PyPolarsErr;
use crate::PyLazyFrame;

#[pyfunction]
pub fn assert_cloud_eligible(lf: PyLazyFrame) -> PyResult<()> {
let plan = &lf.ldf.logical_plan;
polars::prelude::assert_cloud_eligible(plan)
.map_err(|e| PyAssertionError::new_err(e.to_string()))?;
Ok(())
pub fn prepare_cloud_plan(lf: PyLazyFrame, uri: String, py: Python) -> PyResult<PyObject> {
let plan = lf.ldf.logical_plan;
let bytes = polars::prelude::prepare_cloud_plan(plan, uri).map_err(PyPolarsErr::from)?;

Ok(PyBytes::new_bound(py, &bytes).to_object(py))
}
4 changes: 2 additions & 2 deletions py-polars/src/lazyframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ impl PyLazyFrame {
type_coercion: bool,
predicate_pushdown: bool,
projection_pushdown: bool,
simplify_expr: bool,
simplify_expression: bool,
slice_pushdown: bool,
comm_subplan_elim: bool,
comm_subexpr_elim: bool,
Expand All @@ -498,7 +498,7 @@ impl PyLazyFrame {
let mut ldf = ldf
.with_type_coercion(type_coercion)
.with_predicate_pushdown(predicate_pushdown)
.with_simplify_expr(simplify_expr)
.with_simplify_expr(simplify_expression)
.with_slice_pushdown(slice_pushdown)
.with_cluster_with_columns(cluster_with_columns)
.with_streaming(streaming)
Expand Down
2 changes: 1 addition & 1 deletion py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ fn polars(py: Python, m: &Bound<PyModule>) -> PyResult<()> {

// Cloud
#[cfg(feature = "polars_cloud")]
m.add_wrapped(wrap_pyfunction!(cloud::assert_cloud_eligible))
m.add_wrapped(wrap_pyfunction!(cloud::prepare_cloud_plan))
.unwrap();

// Build info
Expand Down
Loading