Skip to content

Commit

Permalink
feat: add elementwise select and with_columns to new streaming engine (
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Jun 26, 2024
1 parent df989de commit 49f5ef5
Show file tree
Hide file tree
Showing 20 changed files with 230 additions and 50 deletions.
21 changes: 0 additions & 21 deletions crates/polars-core/src/frame/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,3 @@ impl TryFrom<StructArray> for DataFrame {
DataFrame::new(columns)
}
}

impl From<&Schema> for DataFrame {
fn from(schema: &Schema) -> Self {
let cols = schema
.iter()
.map(|(name, dtype)| Series::new_empty(name, dtype))
.collect();
unsafe { DataFrame::new_no_checks(cols) }
}
}

impl From<&ArrowSchema> for DataFrame {
fn from(schema: &ArrowSchema) -> Self {
let cols = schema
.fields
.iter()
.map(|fld| Series::new_empty(fld.name.as_str(), &(fld.data_type().into())))
.collect();
unsafe { DataFrame::new_no_checks(cols) }
}
}
19 changes: 19 additions & 0 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,25 @@ impl DataFrame {
unsafe { DataFrame::new_no_checks(Vec::new()) }
}

/// Create an empty `DataFrame` with empty columns as per the `schema`.
pub fn empty_with_schema(schema: &Schema) -> Self {
let cols = schema
.iter()
.map(|(name, dtype)| Series::new_empty(name, dtype))
.collect();
unsafe { DataFrame::new_no_checks(cols) }
}

/// Create an empty `DataFrame` with empty columns as per the `schema`.
pub fn empty_with_arrow_schema(schema: &ArrowSchema) -> Self {
let cols = schema
.fields
.iter()
.map(|fld| Series::new_empty(fld.name.as_str(), &(fld.data_type().into())))
.collect();
unsafe { DataFrame::new_no_checks(cols) }
}

/// Removes the last `Series` from the `DataFrame` and returns it, or [`None`] if it is empty.
///
/// # Example
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/csv/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,9 @@ impl<'a> CoreReader<'a> {
// An empty file with a schema should return an empty DataFrame with that schema
if bytes.is_empty() {
let mut df = if projection.len() == self.schema.len() {
DataFrame::from(self.schema.as_ref())
DataFrame::empty_with_schema(self.schema.as_ref())
} else {
DataFrame::from(
DataFrame::empty_with_schema(
&projection
.iter()
.map(|&i| self.schema.get_at_index(i).unwrap())
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub fn materialize_empty_df(
} else {
Cow::Borrowed(reader_schema)
};
let mut df = DataFrame::from(schema.as_ref());
let mut df = DataFrame::empty_with_arrow_schema(&schema);

if let Some(row_index) = row_index {
df.insert_column(0, Series::new_empty(&row_index.name, &IDX_DTYPE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl<const FIXED: bool> AggHashTable<FIXED> {
let (skip_len, take_len) = if let Some((offset, slice_len)) = slice {
if *offset as usize >= local_len {
*offset -= local_len as i64;
return DataFrame::from(self.output_schema.as_ref());
return DataFrame::empty_with_schema(&self.output_schema);
} else {
let out = (*offset as usize, *slice_len);
*offset = 0;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-pipe/src/executors/sinks/group_by/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub(super) fn finalize_group_by(
ooc_payload: Option<(IOThread, Box<dyn Sink>)>,
) -> PolarsResult<FinalizedSink> {
let df = if dfs.is_empty() {
DataFrame::from(output_schema)
DataFrame::empty_with_schema(output_schema)
} else {
let mut df = accumulate_dataframes_vertical_unchecked(dfs);
// re init to check duplicates
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sinks/ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ impl Sink for OrderedSink {
}
fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
if self.chunks.is_empty() {
return Ok(FinalizedSink::Finished(DataFrame::from(
self.schema.as_ref(),
return Ok(FinalizedSink::Finished(DataFrame::empty_with_schema(
&self.schema,
)));
}
self.sort();
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sinks/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ impl Sink for SliceSink {
let mut chunks = chunks.lock().unwrap();
let chunks: Vec<DataChunk> = std::mem::take(chunks.as_mut());
if chunks.is_empty() {
return Ok(FinalizedSink::Finished(DataFrame::from(
self.schema.as_ref(),
return Ok(FinalizedSink::Finished(DataFrame::empty_with_schema(
&self.schema,
)));
}

Expand Down
3 changes: 3 additions & 0 deletions crates/polars-plan/src/plans/ir/schema.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use recursive::recursive;

use super::*;

impl IR {
Expand Down Expand Up @@ -61,6 +63,7 @@ impl IR {
}

/// Get the schema of the logical plan node.
#[recursive]
pub fn schema<'a>(&'a self, arena: &'a Arena<IR>) -> Cow<'a, SchemaRef> {
use IR::*;
let schema = match self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl<'a> PredicatePushDown<'a> {
}
if new_paths.is_empty() {
let schema = output_schema.as_ref().unwrap_or(&file_info.schema);
let df = DataFrame::from(schema.as_ref());
let df = DataFrame::empty_with_schema(schema);

return Ok(DataFrameScan {
df: Arc::new(df),
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-sql/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ impl SQLContext {
None => {
let tbl = table_name.to_string();
if let Some(lf) = self.table_map.get_mut(&tbl) {
*lf = DataFrame::from(
*lf = DataFrame::empty_with_schema(
lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
.unwrap()
.as_ref(),
Expand Down
1 change: 1 addition & 0 deletions crates/polars-stream/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct GraphNode {
}

/// A pipe sends data between nodes.
#[allow(unused)] // TODO: remove.
pub struct LogicalPipe {
// Node that we send data to.
sender: GraphNodeKey,
Expand Down
1 change: 0 additions & 1 deletion crates/polars-stream/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![allow(unused)] // TODO: remove.

#[allow(unused)] // TODO: remove.
mod async_executor;
#[allow(unused)] // TODO: remove.
mod async_primitives;
Expand Down
28 changes: 19 additions & 9 deletions crates/polars-stream/src/nodes/in_memory_sink.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};
use std::sync::Arc;

use parking_lot::Mutex;
use polars_core::frame::DataFrame;
use polars_core::schema::Schema;
use polars_core::series::Series;
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
use polars_core::utils::rayon::iter::{IntoParallelIterator, ParallelIterator};
use polars_core::POOL;
use polars_error::PolarsResult;
use polars_expr::state::ExecutionState;
use polars_utils::priority::Priority;
use polars_utils::sync::SyncPtr;

use super::ComputeNode;
use crate::async_executor::{JoinHandle, TaskScope};
use crate::async_primitives::pipe::{Receiver, Sender};
use crate::morsel::Morsel;
use crate::utils::in_memory_linearize::linearize;

#[derive(Default)]
pub struct InMemorySink {
morsels_per_pipe: Mutex<Vec<Vec<Morsel>>>,
schema: Arc<Schema>,
}

impl InMemorySink {
pub fn new(schema: Arc<Schema>) -> Self {
Self {
morsels_per_pipe: Mutex::default(),
schema,
}
}
}

impl ComputeNode for InMemorySink {
Expand Down Expand Up @@ -51,8 +57,12 @@ impl ComputeNode for InMemorySink {
}

fn finalize(&mut self) -> PolarsResult<Option<DataFrame>> {
let mut morsels_per_pipe = core::mem::take(&mut *self.morsels_per_pipe.get_mut());
let morsels_per_pipe = core::mem::take(&mut *self.morsels_per_pipe.get_mut());
let dataframes = linearize(morsels_per_pipe);
Ok(Some(accumulate_dataframes_vertical_unchecked(dataframes)))
if dataframes.is_empty() {
Ok(Some(DataFrame::empty_with_schema(&self.schema)))
} else {
Ok(Some(accumulate_dataframes_vertical_unchecked(dataframes)))
}
}
}
1 change: 1 addition & 0 deletions crates/polars-stream/src/nodes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::morsel::Morsel;
pub mod filter;
pub mod in_memory_sink;
pub mod in_memory_source;
pub mod select;
pub mod simple_projection;

pub trait ComputeNode {
Expand Down
90 changes: 90 additions & 0 deletions crates/polars-stream/src/nodes/select.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::sync::Arc;

use polars_core::frame::DataFrame;
use polars_core::schema::Schema;
use polars_core::series::Series;
use polars_error::PolarsResult;
use polars_expr::prelude::PhysicalExpr;
use polars_expr::state::ExecutionState;

use super::ComputeNode;
use crate::async_executor::{JoinHandle, TaskScope};
use crate::async_primitives::pipe::{Receiver, Sender};
use crate::morsel::Morsel;

pub struct SelectNode {
selectors: Vec<Arc<dyn PhysicalExpr>>,
schema: Arc<Schema>,
extend_original: bool,
}

impl SelectNode {
pub fn new(
selectors: Vec<Arc<dyn PhysicalExpr>>,
schema: Arc<Schema>,
extend_original: bool,
) -> Self {
Self {
selectors,
schema,
extend_original,
}
}
}

impl ComputeNode for SelectNode {
fn spawn<'env, 's>(
&'env self,
scope: &'s TaskScope<'s, 'env>,
_pipeline: usize,
recv: Vec<Receiver<Morsel>>,
send: Vec<Sender<Morsel>>,
state: &'s ExecutionState,
) -> JoinHandle<PolarsResult<()>> {
let [mut recv] = <[_; 1]>::try_from(recv).ok().unwrap();
let [mut send] = <[_; 1]>::try_from(send).ok().unwrap();

scope.spawn_task(true, async move {
while let Ok(morsel) = recv.recv().await {
let morsel = morsel.try_map(|df| {
// Select columns.
let mut selected: Vec<Series> = self
.selectors
.iter()
.map(|s| s.evaluate(&df, state))
.collect::<PolarsResult<_>>()?;

// Extend or create new dataframe.
let ret = if self.extend_original {
let mut out = df.clone();
out._add_columns(selected, &self.schema)?;
out
} else {
// Broadcast scalars.
let max_non_unit_length = selected
.iter()
.map(|s| s.len())
.filter(|l| *l != 1)
.max()
.unwrap_or(1);
for s in &mut selected {
if s.len() != max_non_unit_length {
assert!(s.len() == 1, "got series of incompatible lengths");
*s = s.new_from_index(0, max_non_unit_length);
}
}
unsafe { DataFrame::new_no_checks(selected) }
};

PolarsResult::Ok(ret)
})?;

if send.send(morsel).await.is_err() {
break;
}
}

Ok(())
})
}
}
48 changes: 46 additions & 2 deletions crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,50 @@ pub fn lower_ir(
expr_arena: &mut Arena<AExpr>,
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
) -> PolarsResult<PhysNodeKey> {
match ir_arena.get(node) {
let ir_node = ir_arena.get(node);
match ir_node {
IR::SimpleProjection { input, columns } => {
let schema = columns.clone();
let input = lower_ir(*input, ir_arena, expr_arena, phys_sm)?;
Ok(phys_sm.insert(PhysNode::SimpleProjection { input, schema }))
},

// TODO: split partially streamable selections to avoid fallback as much as possible.
IR::Select {
input,
expr,
schema,
..
} if expr.iter().all(|e| is_streamable(e.node(), expr_arena)) => {
let selectors = expr.clone();
let schema = schema.clone();
let input = lower_ir(*input, ir_arena, expr_arena, phys_sm)?;
Ok(phys_sm.insert(PhysNode::Select {
input,
selectors,
schema,
extend_original: false,
}))
},

// TODO: split partially streamable selections to avoid fallback as much as possible.
IR::HStack {
input,
exprs,
schema,
..
} if exprs.iter().all(|e| is_streamable(e.node(), expr_arena)) => {
let selectors = exprs.clone();
let schema = schema.clone();
let input = lower_ir(*input, ir_arena, expr_arena, phys_sm)?;
Ok(phys_sm.insert(PhysNode::Select {
input,
selectors,
schema,
extend_original: true,
}))
},

IR::Filter { input, predicate } if is_streamable(predicate.node(), expr_arena) => {
let predicate = predicate.clone();
let input = lower_ir(*input, ir_arena, expr_arena, phys_sm)?;
Expand Down Expand Up @@ -57,8 +100,9 @@ pub fn lower_ir(

IR::Sink { input, payload } => {
if *payload == SinkType::Memory {
let schema = ir_node.schema(ir_arena).into_owned();
let input = lower_ir(*input, ir_arena, expr_arena, phys_sm)?;
return Ok(phys_sm.insert(PhysNode::InMemorySink { input }));
return Ok(phys_sm.insert(PhysNode::InMemorySink { input, schema }));
}

todo!()
Expand Down
Loading

0 comments on commit 49f5ef5

Please sign in to comment.