Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): Custom sink implementations #11315

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/polars-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ impl Display for ErrString {

#[derive(Debug, thiserror::Error)]
pub enum PolarsError {
#[error("error sending data: {0}")]
SendError(ErrString),
#[error(transparent)]
ArrowError(Box<ArrowError>),
#[error("not found: {0}")]
Expand Down Expand Up @@ -113,6 +115,7 @@ impl PolarsError {
ShapeMismatch(msg) => ShapeMismatch(func(msg).into()),
StringCacheMismatch(msg) => StringCacheMismatch(func(msg).into()),
StructFieldNotFound(msg) => StructFieldNotFound(func(msg).into()),
SendError(msg) => SendError(func(msg).into()),
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,29 @@ impl LazyFrame {
Ok((out, timer_df))
}

/// Stream a query result into any sender that implements the [`SinkSender`] trait.
///
/// This is useful if the final result doesn't fit into memory. This
/// method will return an error if the query cannot be completely done in a
/// streaming fashion.
pub fn sink_sender(mut self, tx: impl SinkSender, file_type: FileType) -> PolarsResult<()> {
    // A sender sink only exists in the streaming engine, so force streaming on
    // before planning; `prepare_collect` reports whether that actually succeeded.
    self.opt_state.streaming = true;
    self.logical_plan = LogicalPlan::Sink {
        input: Box::new(self.logical_plan),
        payload: SinkType::Sender {
            // Type-erase the user's sender so the plan stays object-safe and clonable.
            tx: SinkSenderDyn::new(tx),
            file_type,
        },
    };
    let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
    polars_ensure!(
        is_streaming,
        ComputeError: "cannot run the whole query in a streaming fashion; \
        use `collect()` and write the result manually instead"
    );
    // The sink consumes all morsels; the returned DataFrame is empty and discarded.
    let _ = physical_plan.execute(&mut state)?;
    Ok(())
}

/// Stream a query result into a parquet file. This is useful if the final result doesn't fit
into memory. This method will return an error if the query cannot be completely done in a
/// streaming fashion.
Expand Down
5 changes: 4 additions & 1 deletion crates/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,11 @@ pub fn create_physical_plan(
SinkType::File{file_type, ..} => panic!(
"sink_{file_type:?} not yet supported in standard engine. Use 'collect().write_parquet()'"
),
SinkType::Sender { .. } => {
panic!("Sender Sink not supported in standard engine.")
}
#[cfg(feature = "cloud")]
SinkType::Cloud{..} => panic!("Cloud Sink not supported in standard engine.")
SinkType::Cloud{..} => panic!("Cloud Sink not supported in standard engine."),
}
}
Union { inputs, options } => {
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-lazy/src/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
pub(crate) use polars_ops::prelude::*;
pub use polars_ops::prelude::{JoinArgs, JoinType, JoinValidation};
pub use polars_plan::logical_plan::{
AnonymousScan, AnonymousScanOptions, Literal, LiteralValue, LogicalPlan, Null, NULL,
AnonymousScan, AnonymousScanOptions, FileType, Literal, LiteralValue, LogicalPlan, Null,
SinkSender, NULL,
};
#[cfg(feature = "csv")]
pub use polars_plan::prelude::CsvWriterOptions;
Expand Down
28 changes: 16 additions & 12 deletions crates/polars-pipe/src/executors/sinks/file_sink.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::any::Any;
use std::path::Path;
use std::thread::JoinHandle;

use crossbeam_channel::{bounded, Receiver, Sender};
Expand Down Expand Up @@ -47,7 +46,7 @@ impl<W: std::io::Write> SinkWriter for polars_io::ipc::BatchedWriter<W> {
}

#[cfg(feature = "csv")]
impl SinkWriter for polars_io::csv::BatchedWriter<std::fs::File> {
impl<W: std::io::Write> SinkWriter for polars_io::csv::BatchedWriter<W> {
fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
self.write_batch(df)
}
Expand All @@ -63,12 +62,11 @@ pub struct ParquetSink {}
impl ParquetSink {
#[allow(clippy::new_ret_no_self)]
pub fn new(
path: &Path,
writer: impl std::io::Write + Send + Sync + 'static,
options: ParquetWriteOptions,
schema: &Schema,
) -> PolarsResult<FilesSink> {
let file = std::fs::File::create(path)?;
let writer = ParquetWriter::new(file)
let writer = ParquetWriter::new(writer)
.with_compression(options.compression)
.with_data_pagesize_limit(options.data_pagesize_limit)
.with_statistics(options.statistics)
Expand Down Expand Up @@ -145,9 +143,12 @@ pub struct IpcSink {}
#[cfg(feature = "ipc")]
impl IpcSink {
#[allow(clippy::new_ret_no_self)]
pub fn new(path: &Path, options: IpcWriterOptions, schema: &Schema) -> PolarsResult<FilesSink> {
let file = std::fs::File::create(path)?;
let writer = IpcWriter::new(file)
pub fn new(
writer: impl std::io::Write + Send + Sync + 'static,
options: IpcWriterOptions,
schema: &Schema,
) -> PolarsResult<FilesSink> {
let writer = IpcWriter::new(writer)
.with_compression(options.compression)
.batched(schema)?;

Expand Down Expand Up @@ -176,9 +177,12 @@ pub struct CsvSink {}
#[cfg(feature = "csv")]
impl CsvSink {
#[allow(clippy::new_ret_no_self)]
pub fn new(path: &Path, options: CsvWriterOptions, schema: &Schema) -> PolarsResult<FilesSink> {
let file = std::fs::File::create(path)?;
let writer = CsvWriter::new(file)
pub fn new(
writer: impl std::io::Write + Send + Sync + 'static,
options: CsvWriterOptions,
schema: &Schema,
) -> PolarsResult<FilesSink> {
let writer = CsvWriter::new(writer)
.has_header(options.has_header)
.with_delimiter(options.serialize_options.delimiter)
.with_line_terminator(options.serialize_options.line_terminator)
Expand All @@ -192,7 +196,7 @@ impl CsvSink {
.with_quote_style(options.serialize_options.quote_style)
.batched(schema)?;

let writer = Box::new(writer) as Box<dyn SinkWriter + Send + Sync>;
let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;

let morsels_per_sink = morsels_per_sink();
let backpressure = morsels_per_sink * 2;
Expand Down
28 changes: 25 additions & 3 deletions crates/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,26 +139,48 @@ where
path, file_type, ..
} => {
let path = path.as_ref().as_path();
let file = std::fs::File::create(path)?;
match &file_type {
#[cfg(feature = "parquet")]
FileType::Parquet(options) => {
Box::new(ParquetSink::new(path, *options, input_schema.as_ref())?)
Box::new(ParquetSink::new(file, *options, input_schema.as_ref())?)
as Box<dyn SinkTrait>
},
#[cfg(feature = "ipc")]
FileType::Ipc(options) => {
Box::new(IpcSink::new(path, *options, input_schema.as_ref())?)
Box::new(IpcSink::new(file, *options, input_schema.as_ref())?)
as Box<dyn SinkTrait>
},
#[cfg(feature = "csv")]
FileType::Csv(options) => {
Box::new(CsvSink::new(path, options.clone(), input_schema.as_ref())?)
Box::new(CsvSink::new(file, options.clone(), input_schema.as_ref())?)
as Box<dyn SinkTrait>
},
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
},
SinkType::Sender { tx, file_type } => match &file_type {
#[cfg(feature = "parquet")]
FileType::Parquet(options) => Box::new(ParquetSink::new(
tx.clone(),
*options,
input_schema.as_ref(),
)?) as Box<dyn SinkTrait>,
#[cfg(feature = "ipc")]
FileType::Ipc(options) => {
Box::new(IpcSink::new(tx.clone(), *options, input_schema.as_ref())?)
as Box<dyn SinkTrait>
},
#[cfg(feature = "csv")]
FileType::Csv(options) => Box::new(CsvSink::new(
tx.clone(),
options.clone(),
input_schema.as_ref(),
)?) as Box<dyn SinkTrait>,
#[allow(unreachable_patterns)]
_ => unreachable!(),
},
#[cfg(feature = "cloud")]
SinkType::Cloud {
uri,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ impl LogicalPlan {
SinkType::File { .. } => "SINK (FILE)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "SINK (CLOUD)",
SinkType::Sender { .. } => "SINK (SENDER)",
},
};
self.write_dot(acc_str, prev_node, current_node, id_map)?;
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ impl ALogicalPlan {
SinkType::File { .. } => "sink (file)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "sink (cloud)",
SinkType::Sender { .. } => "sink (sender)",
},
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/logical_plan/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ impl LogicalPlan {
SinkType::File { .. } => "SINK (file)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "SINK (cloud)",
SinkType::Sender { .. } => "SINK (sender)",
};
write!(f, "{:indent$}{}", "", name)?;
input._format(f, sub_indent)
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub use functions::*;
pub use iterator::*;
pub use lit::*;
pub use optimizer::*;
pub use options::{FileType, SinkSender};
pub use schema::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
Expand Down
60 changes: 60 additions & 0 deletions crates/polars-plan/src/logical_plan/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,70 @@ pub struct AnonymousScanOptions {
pub fmt_str: &'static str,
}

/// Allows for custom sink implementations. This can be used
/// to stream data to a remote location or to a different thread.
///
/// Implementations must be thread-safe (`Send + Sync`) because the streaming
/// engine may write from worker threads.
pub trait SinkSender: 'static + std::fmt::Debug + Send + Sync {
    /// Send a buffer of bytes to whatever data structure is used to write to.
    /// This can be a file, a network connection or a channel.
    ///
    /// NOTE(review): the returned `usize` is presumably the number of bytes
    /// accepted by the sink, but the `io::Write` adapter currently ignores it —
    /// confirm the intended contract.
    fn sink_send(&self, buf: &[u8]) -> PolarsResult<usize>;
    /// In cases where there's a buffer that needs to be flushed.
    /// This is not always the case.
    fn sink_flush(&self) -> PolarsResult<()>;
}

/// Type-erased, cheaply clonable wrapper around a user-provided [`SinkSender`].
///
/// Cloning only bumps the `Arc` reference count, so the same sender can be
/// shared across the logical plan and multiple pipeline sinks.
#[derive(Debug, Clone)]
pub struct SinkSenderDyn(Arc<dyn SinkSender>);

impl SinkSenderDyn {
    /// Wrap a concrete [`SinkSender`] implementation.
    ///
    /// No explicit `'static` bound is needed here: it is already implied by
    /// the supertrait bound on `SinkSender`.
    pub fn new(tx: impl SinkSender) -> Self {
        Self(Arc::from(tx))
    }
}

impl std::ops::Deref for SinkSenderDyn {
    type Target = dyn SinkSender;

    /// Expose the wrapped trait object so a `SinkSenderDyn` can be used
    /// wherever a `&dyn SinkSender` is expected.
    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}

impl std::io::Write for SinkSenderDyn {
    /// Forward the buffer to the underlying [`SinkSender`].
    ///
    /// Per the `io::Write` contract we report the number of bytes the sink
    /// actually accepted rather than unconditionally claiming the whole
    /// buffer was written — the original behavior could silently drop data
    /// on a partial send, since callers like `write_all` would never retry
    /// the remainder.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.sink_send(buf)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
    }

    /// Flush any buffering performed by the underlying sink.
    fn flush(&mut self) -> std::io::Result<()> {
        self.sink_flush()
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
    }
}

#[cfg(feature = "serde")]
impl Serialize for SinkSenderDyn {
    // A sender wraps arbitrary runtime state (e.g. a channel or socket) that
    // has no meaningful serialized form, so serialization always errors.
    fn serialize<S: serde::Serializer>(&self, _serializer: S) -> Result<S::Ok, S::Error> {
        Err(serde::ser::Error::custom("cannot serialize SinkSenderDyn"))
    }
}

#[cfg(feature = "serde")]
impl<'de> Deserialize<'de> for SinkSenderDyn {
    // Mirrors the `Serialize` impl: a trait object holding live runtime state
    // cannot be reconstructed from serialized data, so this always errors.
    fn deserialize<D: serde::Deserializer<'de>>(_deserializer: D) -> Result<Self, D::Error> {
        Err(serde::de::Error::custom("cannot deserialize SinkSenderDyn"))
    }
}

#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, Debug)]
pub enum SinkType {
Memory,
Sender {
tx: SinkSenderDyn,
file_type: FileType,
},
File {
path: Arc<PathBuf>,
file_type: FileType,
Expand Down
2 changes: 2 additions & 0 deletions py-polars/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl std::convert::From<PyPolarsErr> for PyErr {
use PyPolarsErr::*;
match &err {
Polars(err) => match err {
PolarsError::SendError(err) => SendError::new_err(err.to_string()),
PolarsError::ArrowError(err) => ArrowErrorException::new_err(format!("{err:?}")),
PolarsError::ColumnNotFound(name) => ColumnNotFoundError::new_err(name.to_string()),
PolarsError::ComputeError(err) => ComputeError::new_err(err.to_string()),
Expand Down Expand Up @@ -70,6 +71,7 @@ impl Debug for PyPolarsErr {
}
}

create_exception!(exceptions, SendError, PyException);
create_exception!(exceptions, ArrowErrorException, PyException);
create_exception!(exceptions, ColumnNotFoundError, PyException);
create_exception!(exceptions, ComputeError, PyException);
Expand Down
Loading