feat(rust): adds sink_sender for custom streaming methods
andyquinterom committed Sep 25, 2023
1 parent a11206d commit d82d665
Showing 11 changed files with 137 additions and 17 deletions.
3 changes: 3 additions & 0 deletions crates/polars-error/src/lib.rs
@@ -41,6 +41,8 @@ impl Display for ErrString {

#[derive(Debug, thiserror::Error)]
pub enum PolarsError {
#[error("error sending data: {0}")]
SendError(ErrString),
#[error(transparent)]
ArrowError(Box<ArrowError>),
#[error("not found: {0}")]
@@ -113,6 +115,7 @@ impl PolarsError {
ShapeMismatch(msg) => ShapeMismatch(func(msg).into()),
StringCacheMismatch(msg) => StringCacheMismatch(func(msg).into()),
StructFieldNotFound(msg) => StructFieldNotFound(func(msg).into()),
SendError(msg) => SendError(func(msg).into()),
}
}
}
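A hedged sketch of how a custom sink might produce the new variant; the `forward` helper and the channel type are illustrative, not part of this commit:

```rust
use polars_error::{PolarsError, PolarsResult};

// Hypothetical helper: report a disconnected receiver as `SendError`.
fn forward(buf: &[u8], tx: &std::sync::mpsc::Sender<Vec<u8>>) -> PolarsResult<usize> {
    tx.send(buf.to_vec())
        .map_err(|e| PolarsError::SendError(e.to_string().into()))?;
    Ok(buf.len())
}
```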
23 changes: 23 additions & 0 deletions crates/polars-lazy/src/frame/mod.rs
@@ -647,6 +647,29 @@ impl LazyFrame {
Ok((out, timer_df))
}

/// Stream a query result into any sender that implements the `SinkSender` trait.
/// This is useful if the final result doesn't fit into memory. This
/// method will return an error if the query cannot be completely done in a streaming
/// fashion.
pub fn sink_sender(mut self, tx: impl SinkSender, file_type: FileType) -> PolarsResult<()> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
payload: SinkType::Sender {
tx: SinkSenderDyn::new(tx),
file_type,
},
};
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
polars_ensure!(
is_streaming,
ComputeError: "cannot run the whole query in a streaming order; \
use `collect().write_parquet()` instead"
);
let _ = physical_plan.execute(&mut state)?;
Ok(())
}

/// Stream a query result into a parquet file. This is useful if the final result doesn't fit
/// into memory. This method will return an error if the query cannot be completely done in a
/// streaming fashion.
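End to end, the new `sink_sender` entry point can be driven by any channel-like sender. A minimal sketch, assuming `CsvWriterOptions` implements `Default` and that `SinkSender` and `FileType` are reachable from `polars::prelude`; the `ChannelSink` wrapper is illustrative, not part of this commit:

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::Mutex;

use polars::prelude::*;

// Hypothetical sender: forwards every serialized chunk over a channel.
// The Mutex keeps the wrapper `Sync` even where `Sender` itself is not.
#[derive(Debug)]
struct ChannelSink(Mutex<Sender<Vec<u8>>>);

impl SinkSender for ChannelSink {
    fn sink_send(&self, buf: &[u8]) -> PolarsResult<usize> {
        self.0
            .lock()
            .unwrap()
            .send(buf.to_vec())
            .map_err(|e| PolarsError::SendError(e.to_string().into()))?;
        Ok(buf.len())
    }

    fn sink_flush(&self) -> PolarsResult<()> {
        Ok(()) // the channel itself holds no buffer we could flush
    }
}

fn main() -> PolarsResult<()> {
    let (tx, rx) = channel::<Vec<u8>>();

    // Drain the CSV bytes on another thread while the query streams.
    let consumer = std::thread::spawn(move || {
        for chunk in rx {
            println!("received {} bytes", chunk.len());
        }
    });

    df!("a" => [1i64, 2, 3])?
        .lazy()
        .sink_sender(ChannelSink(Mutex::new(tx)), FileType::Csv(Default::default()))?;

    // The sender is dropped once the query finishes, ending the loop.
    consumer.join().unwrap();
    Ok(())
}
```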
5 changes: 4 additions & 1 deletion crates/polars-lazy/src/physical_plan/planner/lp.rs
@@ -156,8 +156,11 @@ pub fn create_physical_plan(
SinkType::File{file_type, ..} => panic!(
"sink_{file_type:?} not yet supported in standard engine. Use 'collect().write_parquet()'"
),
SinkType::Sender { .. } => {
panic!("Sender Sink not supported in standard engine.")
}
#[cfg(feature = "cloud")]
SinkType::Cloud{..} => panic!("Cloud Sink not supported in standard engine.")
SinkType::Cloud{..} => panic!("Cloud Sink not supported in standard engine."),
}
}
Union { inputs, options } => {
3 changes: 2 additions & 1 deletion crates/polars-lazy/src/prelude.rs
@@ -1,7 +1,8 @@
pub(crate) use polars_ops::prelude::*;
pub use polars_ops::prelude::{JoinArgs, JoinType, JoinValidation};
pub use polars_plan::logical_plan::{
- AnonymousScan, AnonymousScanOptions, FileType, Literal, LiteralValue, LogicalPlan, Null, NULL,
+ AnonymousScan, AnonymousScanOptions, FileType, Literal, LiteralValue, LogicalPlan, Null,
+ SinkSender, NULL,
};
#[cfg(feature = "csv")]
pub use polars_plan::prelude::CsvWriterOptions;
27 changes: 16 additions & 11 deletions crates/polars-pipe/src/executors/sinks/file_sink.rs
@@ -47,7 +47,7 @@ impl<W: std::io::Write> SinkWriter for polars_io::ipc::BatchedWriter<W> {
}

#[cfg(feature = "csv")]
- impl SinkWriter for polars_io::csv::BatchedWriter<std::fs::File> {
+ impl<W: std::io::Write> SinkWriter for polars_io::csv::BatchedWriter<W> {
fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
self.write_batch(df)
}
@@ -63,12 +63,11 @@ pub struct ParquetSink {}
impl ParquetSink {
#[allow(clippy::new_ret_no_self)]
pub fn new(
path: &Path,
writer: impl std::io::Write + Send + Sync + 'static,
options: ParquetWriteOptions,
schema: &Schema,
) -> PolarsResult<FilesSink> {
- let file = std::fs::File::create(path)?;
- let writer = ParquetWriter::new(file)
+ let writer = ParquetWriter::new(writer)
.with_compression(options.compression)
.with_data_pagesize_limit(options.data_pagesize_limit)
.with_statistics(options.statistics)
@@ -145,9 +144,12 @@ pub struct IpcSink {}
#[cfg(feature = "ipc")]
impl IpcSink {
#[allow(clippy::new_ret_no_self)]
pub fn new(path: &Path, options: IpcWriterOptions, schema: &Schema) -> PolarsResult<FilesSink> {
let file = std::fs::File::create(path)?;
let writer = IpcWriter::new(file)
pub fn new(
writer: impl std::io::Write + Send + Sync + 'static,
options: IpcWriterOptions,
schema: &Schema,
) -> PolarsResult<FilesSink> {
let writer = IpcWriter::new(writer)
.with_compression(options.compression)
.batched(schema)?;

@@ -176,9 +178,12 @@ pub struct CsvSink {}
#[cfg(feature = "csv")]
impl CsvSink {
#[allow(clippy::new_ret_no_self)]
pub fn new(path: &Path, options: CsvWriterOptions, schema: &Schema) -> PolarsResult<FilesSink> {
let file = std::fs::File::create(path)?;
let writer = CsvWriter::new(file)
pub fn new(
writer: impl std::io::Write + Send + Sync + 'static,
options: CsvWriterOptions,
schema: &Schema,
) -> PolarsResult<FilesSink> {
let writer = CsvWriter::new(writer)
.has_header(options.has_header)
.with_delimiter(options.serialize_options.delimiter)
.with_line_terminator(options.serialize_options.line_terminator)
@@ -192,7 +197,7 @@ impl CsvSink {
.with_quote_style(options.serialize_options.quote_style)
.batched(schema)?;

- let writer = Box::new(writer) as Box<dyn SinkWriter + Send + Sync>;
+ let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;

let morsels_per_sink = morsels_per_sink();
let backpressure = morsels_per_sink * 2;
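The common thread in this file is that every sink constructor now accepts `impl std::io::Write + Send + Sync + 'static` instead of opening a `std::fs::File` from a `&Path` itself. A minimal sketch of a non-file writer satisfying the new bound (`SharedBuf` is an illustrative name, not part of the commit):

```rust
use std::io::Write;
use std::sync::{Arc, Mutex};

// In-memory destination: `Arc<Mutex<_>>` makes it Send + Sync + 'static.
#[derive(Clone, Debug)]
struct SharedBuf(Arc<Mutex<Vec<u8>>>);

impl Write for SharedBuf {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.0.lock().unwrap().extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
```

Anything of this shape can now back `ParquetSink::new`, `IpcSink::new`, or `CsvSink::new`, which is exactly what the `SinkSenderDyn` adapter in `options.rs` below relies on.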
28 changes: 25 additions & 3 deletions crates/polars-pipe/src/pipeline/convert.rs
@@ -139,26 +139,48 @@ where
path, file_type, ..
} => {
let path = path.as_ref().as_path();
let file = std::fs::File::create(path)?;
match &file_type {
#[cfg(feature = "parquet")]
FileType::Parquet(options) => {
- Box::new(ParquetSink::new(path, *options, input_schema.as_ref())?)
+ Box::new(ParquetSink::new(file, *options, input_schema.as_ref())?)
as Box<dyn SinkTrait>
},
#[cfg(feature = "ipc")]
FileType::Ipc(options) => {
- Box::new(IpcSink::new(path, *options, input_schema.as_ref())?)
+ Box::new(IpcSink::new(file, *options, input_schema.as_ref())?)
as Box<dyn SinkTrait>
},
#[cfg(feature = "csv")]
FileType::Csv(options) => {
- Box::new(CsvSink::new(path, options.clone(), input_schema.as_ref())?)
+ Box::new(CsvSink::new(file, options.clone(), input_schema.as_ref())?)
as Box<dyn SinkTrait>
},
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
},
SinkType::Sender { tx, file_type } => match &file_type {
#[cfg(feature = "parquet")]
FileType::Parquet(options) => Box::new(ParquetSink::new(
tx.clone(),
*options,
input_schema.as_ref(),
)?) as Box<dyn SinkTrait>,
#[cfg(feature = "ipc")]
FileType::Ipc(options) => {
Box::new(IpcSink::new(tx.clone(), *options, input_schema.as_ref())?)
as Box<dyn SinkTrait>
},
#[cfg(feature = "csv")]
FileType::Csv(options) => Box::new(CsvSink::new(
tx.clone(),
options.clone(),
input_schema.as_ref(),
)?) as Box<dyn SinkTrait>,
#[allow(unreachable_patterns)]
_ => unreachable!(),
},
#[cfg(feature = "cloud")]
SinkType::Cloud {
uri,
1 change: 1 addition & 0 deletions crates/polars-plan/src/dot.rs
@@ -400,6 +400,7 @@ impl LogicalPlan {
SinkType::File { .. } => "SINK (FILE)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "SINK (CLOUD)",
SinkType::Sender { .. } => "SINK (SENDER)",
},
};
self.write_dot(acc_str, prev_node, current_node, id_map)?;
1 change: 1 addition & 0 deletions crates/polars-plan/src/logical_plan/alp.rs
@@ -166,6 +166,7 @@ impl ALogicalPlan {
SinkType::File { .. } => "sink (file)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "sink (cloud)",
SinkType::Sender { .. } => "sink (sender)",
},
}
}
1 change: 1 addition & 0 deletions crates/polars-plan/src/logical_plan/format.rs
@@ -230,6 +230,7 @@ impl LogicalPlan {
SinkType::File { .. } => "SINK (file)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "SINK (cloud)",
SinkType::Sender { .. } => "SINK (sender)",
};
write!(f, "{:indent$}{}", "", name)?;
input._format(f, sub_indent)
2 changes: 1 addition & 1 deletion crates/polars-plan/src/logical_plan/mod.rs
@@ -49,7 +49,7 @@ pub use functions::*;
pub use iterator::*;
pub use lit::*;
pub use optimizer::*;
- pub use options::FileType;
+ pub use options::{FileType, SinkSender};
pub use schema::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
60 changes: 60 additions & 0 deletions crates/polars-plan/src/logical_plan/options.rs
@@ -301,10 +301,70 @@ pub struct AnonymousScanOptions {
pub fmt_str: &'static str,
}

/// Allows for custom sink implementations. This can be used
/// to stream data to a remote location or to a different thread.
pub trait SinkSender: 'static + std::fmt::Debug + Send + Sync {
/// Send a buffer of bytes to whatever destination the sender writes to.
/// This can be a file, a network connection, or a channel.
fn sink_send(&self, buf: &[u8]) -> PolarsResult<usize>;
/// Flush any internal buffer the sender may hold; for unbuffered
/// senders this is a no-op.
fn sink_flush(&self) -> PolarsResult<()>;
}

#[derive(Debug, Clone)]
pub struct SinkSenderDyn(Arc<dyn SinkSender>);

impl SinkSenderDyn {
pub fn new(tx: impl SinkSender + 'static) -> Self {
Self(Arc::from(tx))
}
}

impl std::ops::Deref for SinkSenderDyn {
type Target = dyn SinkSender;
fn deref(&self) -> &Self::Target {
self.0.as_ref()
}
}

impl std::io::Write for SinkSenderDyn {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.sink_send(buf).map_or_else(
|e| Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
|_| Ok(buf.len()),
)
}
fn flush(&mut self) -> std::io::Result<()> {
self.sink_flush().map_or_else(
|e| Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
|_| Ok(()),
)
}
}

#[cfg(feature = "serde")]
impl Serialize for SinkSenderDyn {
fn serialize<S: serde::Serializer>(&self, _serializer: S) -> Result<S::Ok, S::Error> {
Err(serde::ser::Error::custom("cannot serialize SinkSenderDyn"))
}
}

#[cfg(feature = "serde")]
impl<'de> Deserialize<'de> for SinkSenderDyn {
fn deserialize<D: serde::Deserializer<'de>>(_deserializer: D) -> Result<Self, D::Error> {
Err(serde::de::Error::custom("cannot deserialize SinkSenderDyn"))
}
}

#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, Debug)]
pub enum SinkType {
Memory,
Sender {
tx: SinkSenderDyn,
file_type: FileType,
},
File {
path: Arc<PathBuf>,
file_type: FileType,
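Because `SinkSenderDyn` implements `std::io::Write`, any `SinkSender` can be handed to the batched writers above. A sketch of the adapter in use, assuming `SinkSenderDyn` and the error types are reachable from `polars_plan::prelude`; `StdoutSink` is illustrative:

```rust
use std::io::Write;

use polars_plan::prelude::*; // assumes `SinkSenderDyn` is re-exported here

// Illustrative sender that forwards every buffer straight to stdout.
#[derive(Debug)]
struct StdoutSink;

impl SinkSender for StdoutSink {
    fn sink_send(&self, buf: &[u8]) -> PolarsResult<usize> {
        std::io::stdout()
            .write(buf)
            .map_err(|e| PolarsError::SendError(e.to_string().into()))
    }

    fn sink_flush(&self) -> PolarsResult<()> {
        std::io::stdout()
            .flush()
            .map_err(|e| PolarsError::SendError(e.to_string().into()))
    }
}

fn demo() -> std::io::Result<()> {
    // The batched writers consume exactly this `Write` view of the sender.
    let mut sender = SinkSenderDyn::new(StdoutSink);
    sender.write_all(b"a,b\n1,2\n")?;
    sender.flush()
}
```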
