diff --git a/crates/polars-error/src/lib.rs b/crates/polars-error/src/lib.rs index db190d63370b..536e856a2a33 100644 --- a/crates/polars-error/src/lib.rs +++ b/crates/polars-error/src/lib.rs @@ -41,6 +41,8 @@ impl Display for ErrString { #[derive(Debug, thiserror::Error)] pub enum PolarsError { + #[error("error sending data: {0}")] + SendError(ErrString), #[error(transparent)] ArrowError(Box), #[error("not found: {0}")] @@ -113,6 +115,7 @@ impl PolarsError { ShapeMismatch(msg) => ShapeMismatch(func(msg).into()), StringCacheMismatch(msg) => StringCacheMismatch(func(msg).into()), StructFieldNotFound(msg) => StructFieldNotFound(func(msg).into()), + SendError(msg) => SendError(func(msg).into()), } } } diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index e9a00849e343..1f1e694462e4 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -647,6 +647,29 @@ impl LazyFrame { Ok((out, timer_df)) } + /// Stream a query result into any sender that implements the `SinkSender` trait. + /// This is useful if the final result doesn't fit into memory. This + /// method will return an error if the query cannot be completely done in a streaming + /// fashion. + pub fn sink_sender(mut self, tx: impl SinkSender, file_type: FileType) -> PolarsResult<()> { + self.opt_state.streaming = true; + self.logical_plan = LogicalPlan::Sink { + input: Box::new(self.logical_plan), + payload: SinkType::Sender { + tx: SinkSenderDyn::new(tx), + file_type, + }, + }; + let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; + polars_ensure!( + is_streaming, + ComputeError: "cannot run the whole query in a streaming order; \ + use `collect().write_parquet()` instead" + ); + let _ = physical_plan.execute(&mut state)?; + Ok(()) + } + /// Stream a query result into a parquet file. This is useful if the final result doesn't fit /// into memory. 
This methods will return an error if the query cannot be completely done in a /// streaming fashion. diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index b576ed117feb..4bbb18361b77 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -156,8 +156,11 @@ pub fn create_physical_plan( SinkType::File{file_type, ..} => panic!( "sink_{file_type:?} not yet supported in standard engine. Use 'collect().write_parquet()'" ), + SinkType::Sender { .. } => { + panic!("Sender Sink not supported in standard engine.") + } #[cfg(feature = "cloud")] - SinkType::Cloud{..} => panic!("Cloud Sink not supported in standard engine.") + SinkType::Cloud{..} => panic!("Cloud Sink not supported in standard engine."), } } Union { inputs, options } => { diff --git a/crates/polars-lazy/src/prelude.rs b/crates/polars-lazy/src/prelude.rs index abd8c95a4f38..f51d63e395f6 100644 --- a/crates/polars-lazy/src/prelude.rs +++ b/crates/polars-lazy/src/prelude.rs @@ -1,7 +1,8 @@ pub(crate) use polars_ops::prelude::*; pub use polars_ops::prelude::{JoinArgs, JoinType, JoinValidation}; pub use polars_plan::logical_plan::{ - AnonymousScan, AnonymousScanOptions, FileType, Literal, LiteralValue, LogicalPlan, Null, NULL, + AnonymousScan, AnonymousScanOptions, FileType, Literal, LiteralValue, LogicalPlan, Null, + SinkSender, NULL, }; #[cfg(feature = "csv")] pub use polars_plan::prelude::CsvWriterOptions; diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs index db0ab895a6d3..cedce0c04816 100644 --- a/crates/polars-pipe/src/executors/sinks/file_sink.rs +++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs @@ -47,7 +47,7 @@ impl SinkWriter for polars_io::ipc::BatchedWriter { } #[cfg(feature = "csv")] -impl SinkWriter for polars_io::csv::BatchedWriter { +impl SinkWriter for polars_io::csv::BatchedWriter { fn 
_write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { self.write_batch(df) } @@ -63,12 +63,11 @@ pub struct ParquetSink {} impl ParquetSink { #[allow(clippy::new_ret_no_self)] pub fn new( - path: &Path, + writer: impl std::io::Write + Send + Sync + 'static, options: ParquetWriteOptions, schema: &Schema, ) -> PolarsResult { - let file = std::fs::File::create(path)?; - let writer = ParquetWriter::new(file) + let writer = ParquetWriter::new(writer) .with_compression(options.compression) .with_data_pagesize_limit(options.data_pagesize_limit) .with_statistics(options.statistics) @@ -145,9 +144,12 @@ pub struct IpcSink {} #[cfg(feature = "ipc")] impl IpcSink { #[allow(clippy::new_ret_no_self)] - pub fn new(path: &Path, options: IpcWriterOptions, schema: &Schema) -> PolarsResult { - let file = std::fs::File::create(path)?; - let writer = IpcWriter::new(file) + pub fn new( + writer: impl std::io::Write + Send + Sync + 'static, + options: IpcWriterOptions, + schema: &Schema, + ) -> PolarsResult { + let writer = IpcWriter::new(writer) .with_compression(options.compression) .batched(schema)?; @@ -176,9 +178,12 @@ pub struct CsvSink {} #[cfg(feature = "csv")] impl CsvSink { #[allow(clippy::new_ret_no_self)] - pub fn new(path: &Path, options: CsvWriterOptions, schema: &Schema) -> PolarsResult { - let file = std::fs::File::create(path)?; - let writer = CsvWriter::new(file) + pub fn new( + writer: impl std::io::Write + Send + Sync + 'static, + options: CsvWriterOptions, + schema: &Schema, + ) -> PolarsResult { + let writer = CsvWriter::new(writer) .has_header(options.has_header) .with_delimiter(options.serialize_options.delimiter) .with_line_terminator(options.serialize_options.line_terminator) @@ -192,7 +197,7 @@ impl CsvSink { .with_quote_style(options.serialize_options.quote_style) .batched(schema)?; - let writer = Box::new(writer) as Box; + let writer = Box::new(writer) as Box; let morsels_per_sink = morsels_per_sink(); let backpressure = morsels_per_sink * 2; diff 
--git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index cf4bbdb5d435..4b75c3b884fe 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -139,26 +139,48 @@ where path, file_type, .. } => { let path = path.as_ref().as_path(); + let file = std::fs::File::create(path)?; match &file_type { #[cfg(feature = "parquet")] FileType::Parquet(options) => { - Box::new(ParquetSink::new(path, *options, input_schema.as_ref())?) + Box::new(ParquetSink::new(file, *options, input_schema.as_ref())?) as Box }, #[cfg(feature = "ipc")] FileType::Ipc(options) => { - Box::new(IpcSink::new(path, *options, input_schema.as_ref())?) + Box::new(IpcSink::new(file, *options, input_schema.as_ref())?) as Box }, #[cfg(feature = "csv")] FileType::Csv(options) => { - Box::new(CsvSink::new(path, options.clone(), input_schema.as_ref())?) + Box::new(CsvSink::new(file, options.clone(), input_schema.as_ref())?) as Box }, #[allow(unreachable_patterns)] _ => unreachable!(), } }, + SinkType::Sender { tx, file_type } => match &file_type { + #[cfg(feature = "parquet")] + FileType::Parquet(options) => Box::new(ParquetSink::new( + tx.clone(), + *options, + input_schema.as_ref(), + )?) as Box, + #[cfg(feature = "ipc")] + FileType::Ipc(options) => { + Box::new(IpcSink::new(tx.clone(), *options, input_schema.as_ref())?) + as Box + }, + #[cfg(feature = "csv")] + FileType::Csv(options) => Box::new(CsvSink::new( + tx.clone(), + options.clone(), + input_schema.as_ref(), + )?) as Box, + #[allow(unreachable_patterns)] + _ => unreachable!(), + }, #[cfg(feature = "cloud")] SinkType::Cloud { uri, diff --git a/crates/polars-plan/src/dot.rs b/crates/polars-plan/src/dot.rs index f7151bed2ff7..201bb1f92270 100644 --- a/crates/polars-plan/src/dot.rs +++ b/crates/polars-plan/src/dot.rs @@ -400,6 +400,7 @@ impl LogicalPlan { SinkType::File { .. } => "SINK (FILE)", #[cfg(feature = "cloud")] SinkType::Cloud { .. 
} => "SINK (CLOUD)", + SinkType::Sender { .. } => "SINK (SENDER)", }, }; self.write_dot(acc_str, prev_node, current_node, id_map)?; diff --git a/crates/polars-plan/src/logical_plan/alp.rs b/crates/polars-plan/src/logical_plan/alp.rs index 13328e263a90..620ea87dc77e 100644 --- a/crates/polars-plan/src/logical_plan/alp.rs +++ b/crates/polars-plan/src/logical_plan/alp.rs @@ -166,6 +166,7 @@ impl ALogicalPlan { SinkType::File { .. } => "sink (file)", #[cfg(feature = "cloud")] SinkType::Cloud { .. } => "sink (cloud)", + SinkType::Sender { .. } => "sink (sender)", }, } } diff --git a/crates/polars-plan/src/logical_plan/format.rs b/crates/polars-plan/src/logical_plan/format.rs index b8291418743f..e310946dcc40 100644 --- a/crates/polars-plan/src/logical_plan/format.rs +++ b/crates/polars-plan/src/logical_plan/format.rs @@ -230,6 +230,7 @@ impl LogicalPlan { SinkType::File { .. } => "SINK (file)", #[cfg(feature = "cloud")] SinkType::Cloud { .. } => "SINK (cloud)", + SinkType::Sender { .. } => "SINK (sender)", }; write!(f, "{:indent$}{}", "", name)?; input._format(f, sub_indent) diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs index d1d97709b247..96db8c070f8b 100644 --- a/crates/polars-plan/src/logical_plan/mod.rs +++ b/crates/polars-plan/src/logical_plan/mod.rs @@ -49,7 +49,7 @@ pub use functions::*; pub use iterator::*; pub use lit::*; pub use optimizer::*; -pub use options::FileType; +pub use options::{FileType, SinkSender}; pub use schema::*; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs index 83be36cdba39..8953eb522e2d 100644 --- a/crates/polars-plan/src/logical_plan/options.rs +++ b/crates/polars-plan/src/logical_plan/options.rs @@ -301,10 +301,70 @@ pub struct AnonymousScanOptions { pub fmt_str: &'static str, } +/// Allows for custom sink implementations. 
This can be used +/// to stream data to a remote location or to a different thread. +pub trait SinkSender: 'static + std::fmt::Debug + Send + Sync { + /// Send a buffer of bytes to whatever data structure is used to write to. + /// This can be a file, a network connection or a channel. + fn sink_send(&self, buf: &[u8]) -> PolarsResult; + /// In cases where there's a buffer that needs to be flushed. + /// This is not always the case. + fn sink_flush(&self) -> PolarsResult<()>; +} + +#[derive(Debug, Clone)] +pub struct SinkSenderDyn(Arc); + +impl SinkSenderDyn { + pub fn new(tx: impl SinkSender + 'static) -> Self { + Self(Arc::from(tx)) + } +} + +impl std::ops::Deref for SinkSenderDyn { + type Target = dyn SinkSender; + fn deref(&self) -> &Self::Target { + self.0.as_ref() + } +} + +impl std::io::Write for SinkSenderDyn { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.sink_send(buf).map_or_else( + |e| Err(std::io::Error::new(std::io::ErrorKind::Other, e)), + |_| Ok(buf.len()), + ) + } + fn flush(&mut self) -> std::io::Result<()> { + self.sink_flush().map_or_else( + |e| Err(std::io::Error::new(std::io::ErrorKind::Other, e)), + |_| Ok(()), + ) + } +} + +#[cfg(feature = "serde")] +impl Serialize for SinkSenderDyn { + fn serialize(&self, _serializer: S) -> Result { + Err(serde::ser::Error::custom("cannot serialize SinkSenderDyn")) + } +} + +#[cfg(feature = "serde")] +impl<'de> Deserialize<'de> for SinkSenderDyn { + fn deserialize>(_deserializer: D) -> Result { + Err(serde::de::Error::custom("cannot deserialize SinkSenderDyn")) + } +} + #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Clone, Debug)] pub enum SinkType { Memory, + Sender { + tx: SinkSenderDyn, + file_type: FileType, + }, File { path: Arc, file_type: FileType,