Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): Custom sink implementations #11315

Closed
wants to merge 3 commits into from

Conversation

andyquinterom
Copy link

Purpose

I want to build an asynchornous HTTP server that uses polars under the hood to manipulate some data frames. From my understanding the sink_* methods only allow writing to files but I want to be able to stream to an HTTP Response Body for example.

Since libraries like hyper use a Sender (as in mpsc) to build a Body I though creating a way to write to any sender would be awesome.

My implemetation

I exposed the FileType enum in order to now have to create different sink_* implementations, however this could be changed.

I also created a SinkSender trait that implements all the methods necesarry to create custom sink implementations.

Additionally, I created a SendError variant in PolarsError.

Example

In this example I placed a single iris.csv file, just for testing purposes.

use bytes::Bytes;
use polars::prelude::*;
use std::{
    sync::mpsc::{sync_channel, Receiver, SyncSender},
    thread,
};

#[derive(Debug)]
struct MySender(SyncSender<Bytes>);

impl SinkSender for MySender {
    fn sink_send(&self, buf: &[u8]) -> PolarsResult<usize> {
        self.0.send(Bytes::copy_from_slice(buf)).map_or_else(
            |e| Err(PolarsError::SendError(e.to_string().into())),
            |_| Ok(buf.len()),
        )
    }
    fn sink_flush(&self) -> PolarsResult<()> {
        Ok(())
    }
}

fn spawn_reader_thread(rx: Receiver<Bytes>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        while let Ok(buf) = rx.recv() {
            println!("{:?}", buf);
        }
    })
}

fn main() {
    let (tx, rx) = sync_channel::<Bytes>(0);
    let iris = LazyCsvReader::new("iris.csv")
        .has_header(true)
        .finish()
        .unwrap();

    let reader_job = spawn_reader_thread(rx);

    iris.sink_sender(MySender(tx), FileType::Ipc(IpcWriterOptions::default()))
        .unwrap();

    reader_job.join().unwrap();
}

@orlp
Copy link
Collaborator

orlp commented Sep 26, 2023

Why the SinkSender trait at all? Why not just Write?

@andyquinterom
Copy link
Author

Why the SinkSender trait at all? Why not just Write?

Using 'static + Write would require some big refactoring but I could try.

@stinodego stinodego changed the title Custom sink implementations feat(rust): Custom sink implementations Feb 9, 2024
@github-actions github-actions bot added enhancement New feature or an improvement of an existing feature rust Related to Rust Polars labels Feb 9, 2024
@stinodego
Copy link
Member

Apologies for leaving this PR around so long without a review.

It is not completely clear to me what you're trying to achieve. Please make an issue detailing your request so we can discuss whether we want to include it in Polars.

Since this PR has gathered some conflicts by now and we're not sure yet if we want the functionality, I'll close it for now. Feel free to rebase and open a new PR if the related issue is accepted.

@stinodego stinodego closed this Feb 13, 2024
@fabiannagel
Copy link

I guess the idea is to provide an interface for people to implement their own sink implementations, e.g. for writing asynchronously to a HTTP socket rather than to disk (CSV, Parquet, ...). Does this not make sense in Polars? I'm new to the library but also looking for a way to connect my lazy evaluation pipeline to a consumer without having to materialize the entire dataframe. Streaming, or a simple Python generator, essentially. Do you have some thoughts on this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or an improvement of an existing feature rust Related to Rust Polars
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants