
feat: Implement LazyFrame.sink_ndjson #10786

Merged

26 commits merged on Nov 20, 2023

Conversation

fernandocast
Contributor

@fernandocast fernandocast commented Aug 29, 2023

Closes #10762

Implements a new sink_json method to support the JSON format in streaming mode.

crates/polars-lazy/src/frame/mod.rs

// if we don't allow threads and we have udfs trying to acquire the gil from different
// threads we deadlock.
py.allow_threads(|| {
Contributor

Also, this here is duplicated for every sink.

Could we instead have a single generic sink method that takes an instance of a BatchedWriter?
That way we could add more sink types much more easily.

WDYT?
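A rough sketch of that suggestion, assuming a hypothetical BatchedWriter trait and a generic sink loop; none of these names are the actual Polars API, and the sketch moves strings instead of DataFrame chunks so it stays self-contained:

```rust
use std::io::Write;

/// Hypothetical batched-writer abstraction; each output format implements it.
trait BatchedWriter {
    fn write_batch(&mut self, rows: &[String]) -> std::io::Result<()>;
    fn finish(&mut self) -> std::io::Result<()>;
}

/// An NDJSON-style implementation: one JSON document per line.
struct NdjsonWriter<W: Write> {
    inner: W,
}

impl<W: Write> BatchedWriter for NdjsonWriter<W> {
    fn write_batch(&mut self, rows: &[String]) -> std::io::Result<()> {
        for row in rows {
            writeln!(self.inner, "{row}")?;
        }
        Ok(())
    }

    fn finish(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}

/// The single generic sink loop: formats differ only in the writer passed in.
fn sink(writer: &mut dyn BatchedWriter, batches: &[Vec<String>]) -> std::io::Result<()> {
    for batch in batches {
        writer.write_batch(batch)?;
    }
    writer.finish()
}

fn main() -> std::io::Result<()> {
    let mut buf: Vec<u8> = Vec::new();
    {
        let mut writer = NdjsonWriter { inner: &mut buf };
        sink(
            &mut writer,
            &[vec![r#"{"a":1}"#.to_string()], vec![r#"{"a":2}"#.to_string()]],
        )?;
    }
    print!("{}", String::from_utf8(buf).unwrap());
    Ok(())
}
```

Under this design, adding a new sink type would only require a new BatchedWriter implementation; the generic sink loop itself stays untouched.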

Contributor

Also agree on this one; I will try to refactor this code as well.

Contributor

After looking at the code, I don't think we can do much on this part 😞. There are only 2-3 lines of duplicated code because of the py.allow_threads call; it would be possible to remove the duplication if pyo3 implemented an allow_threads attribute so that the whole function executes with the GIL released.
It would be nice to be able to pass a BatchedWriter instance, but then we would be mixing API code and task-execution code. I think it is a better design to keep a separation of concerns that lets the API and the task code evolve independently.

Contributor

@svaningelgem svaningelgem Sep 4, 2023

I'll think about it a bit more tomorrow and come back if I think of a good way to handle it.
A link for my own benefit: Parallelism

Contributor

Thanks for looking into it. Nothing came to mind when I started pondering deadlocks, the GIL and such :-)

assert_frame_equal(df, expected)


def test_sink_json_should_support_with_options(io_files_path: Path, tmp_path: Path) -> None:
Contributor

Nothing is being tested in this method?

Contributor

Sorry, we are still finishing the testing 😄

@fernandocast fernandocast marked this pull request as ready for review September 5, 2023 00:41
@stinodego stinodego changed the title from "Feature sink json" to "feat: Implement LazyFrame.sink_json" Sep 5, 2023
@github-actions github-actions bot added the enhancement (New feature or an improvement of an existing feature), python (Related to Python Polars) and rust (Related to Rust Polars) labels Sep 5, 2023
@fernandocast
Contributor Author

fernandocast commented Sep 6, 2023

Hi everyone,

This pull request is ready. We have two failing checks; however, they are not in the files we modified for this feature.

Any feedback or comments are welcome.

@abealcantara
Contributor

abealcantara commented Sep 7, 2023

Some tests started failing after merging with main; we are going to fix them.

/// # Panics
/// Panics if the chunks in the given [`DataFrame`] are not aligned.
fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
let fields = df.iter().map(|s| s.field().to_arrow()).collect::<Vec<_>>();
Contributor

Personally, I would make a class in between that handles all the generic parts shared between JsonWriter and JsonLinesBatchedWriter. In the final classes, I would just implement an on_block_write and an on_finish.

My 2ct :)
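In Rust, that "class in between" idea could be sketched as a trait whose default method carries the shared batching loop, with each concrete writer only supplying the two hooks. Everything below (JsonSink, JsonLinesWriter, the hook names) is an illustrative assumption, not the actual Polars code:

```rust
use std::io::Write;

/// Shared logic lives in the default method; concrete writers implement
/// only the two hooks. Illustrative sketch, not the Polars implementation.
trait JsonSink {
    fn on_batch_write(&mut self, rows: &[String]) -> std::io::Result<()>;
    fn on_finish(&mut self) -> std::io::Result<()>;

    /// The generic part: iterate over all batches, then finalize.
    fn sink_all(&mut self, batches: &[Vec<String>]) -> std::io::Result<()> {
        for batch in batches {
            self.on_batch_write(batch)?;
        }
        self.on_finish()
    }
}

/// JSON-lines writer: one document per line, only a flush to finalize.
struct JsonLinesWriter<W: Write>(W);

impl<W: Write> JsonSink for JsonLinesWriter<W> {
    fn on_batch_write(&mut self, rows: &[String]) -> std::io::Result<()> {
        for row in rows {
            writeln!(self.0, "{row}")?;
        }
        Ok(())
    }

    fn on_finish(&mut self) -> std::io::Result<()> {
        self.0.flush()
    }
}

fn main() -> std::io::Result<()> {
    let mut buf: Vec<u8> = Vec::new();
    JsonLinesWriter(&mut buf).sink_all(&[vec![r#"{"x":1}"#.to_string()]])?;
    print!("{}", String::from_utf8(buf).unwrap());
    Ok(())
}
```

A second writer (e.g. one producing a single JSON array) would reuse sink_all unchanged and only override the hooks, which is the template-method shape the comment describes.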

Contributor

@abealcantara abealcantara Sep 11, 2023

Yeah, we were thinking of something similar; that's why we implemented the BatchedWriter trait in the json module. But it might be better to expose the SinkWriter trait (which is part of the pipeline execution), or maybe a struct with some common code as you mentioned, to each of the IO modules, so that each module implements this interface and we can remove all the implementations currently in file_sink.rs. Of course, this creates an extra dependency, as the IO modules would then depend on a trait from the execution core, but I think it makes sense since we would only be exposing an interface, or a collection of interfaces, that each module needs to implement. Please let me know your thoughts on this 😄

Contributor

Maybe you can chime in on #11056? That is very much in line with what you are saying here, just on a broader scale.

I just asked Ritchie for his opinion there, but with your knowledge of the framework, you might have more concrete ideas on how to handle it?


@stinodego
Member

I'll review this more in-depth later, but I already noticed a doc entry on the Python side is missing. So writing it down now before I forget :)

@fernandocast
Contributor Author

> I'll review this more in-depth later, but I already noticed a doc entry on the Python side is missing. So writing it down now before I forget :)

Hi @stinodego, I hope you are doing well.
We were wondering if we could help add this feature to the Polars documentation. We would appreciate it if you could explain what that process looks like, or share a link on how to contribute to the documentation.

Regarding this PR, is there anything else missing before it can be merged?

Collaborator

Since we have the JsonFormat enum, I think it may be easier to use if the batched writers were combined & accepted a JsonFormat as an input.
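Such a combined writer might look roughly like the sketch below. This is a hedged illustration over strings rather than DataFrame chunks, and BatchedJsonWriter with its methods is a hypothetical name, not the actual Polars type; only the JsonFormat enum idea comes from the discussion:

```rust
use std::io::Write;

/// Output flavor selector, mirroring the idea of the JsonFormat enum.
enum JsonFormat {
    /// One JSON array containing all rows.
    Json,
    /// One JSON document per line (NDJSON).
    JsonLines,
}

/// A single batched writer that dispatches on the format.
struct BatchedJsonWriter<W: Write> {
    inner: W,
    format: JsonFormat,
    rows_written: usize,
}

impl<W: Write> BatchedJsonWriter<W> {
    fn new(inner: W, format: JsonFormat) -> Self {
        Self { inner, format, rows_written: 0 }
    }

    fn write_batch(&mut self, rows: &[String]) -> std::io::Result<()> {
        for row in rows {
            match self.format {
                JsonFormat::Json => {
                    // Open the array on the first row, separate with commas after.
                    let prefix = if self.rows_written == 0 { "[" } else { "," };
                    write!(self.inner, "{prefix}{row}")?;
                }
                JsonFormat::JsonLines => writeln!(self.inner, "{row}")?,
            }
            self.rows_written += 1;
        }
        Ok(())
    }

    fn finish(&mut self) -> std::io::Result<()> {
        if matches!(self.format, JsonFormat::Json) {
            // Close the array (emit an empty one if nothing was written).
            if self.rows_written == 0 {
                write!(self.inner, "[")?;
            }
            write!(self.inner, "]")?;
        }
        self.inner.flush()
    }
}

fn main() -> std::io::Result<()> {
    let mut buf: Vec<u8> = Vec::new();
    {
        let mut w = BatchedJsonWriter::new(&mut buf, JsonFormat::Json);
        w.write_batch(&[r#"{"a":1}"#.to_string(), r#"{"a":2}"#.to_string()])?;
        w.finish()?;
    }
    assert_eq!(String::from_utf8(buf).unwrap(), r#"[{"a":1},{"a":2}]"#);
    Ok(())
}
```

One writer type then serves both sinks; the caller picks the behavior by passing the format value instead of choosing between two writer structs.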

Contributor Author

Fixed it, could you check again?

@@ -155,6 +161,101 @@ where
}
}

pub trait BatchedWriter<W: Write> {
Collaborator

Can we reuse the SinkWriter trait here instead of adding a new trait?

Contributor Author

Since we changed the implementation to accept a JsonFormat as an input, the BatchedWriter trait is no longer necessary.

Contributor

Just as a comment: it would be great if we could expose SinkWriter for use by other crates. That way we could move all the SinkWriter implementations to the corresponding modules. 😄

Comment on lines 195 to 198
let fields = df.iter().map(|s| s.field().to_arrow()).collect::<Vec<_>>();
let batches = df
.iter_chunks()
.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
Collaborator

couldn't we use into_struct here instead?

Contributor Author

We tried, but the resulting implementation did not seem very transparent. Do you have an example?

@fernandocast fernandocast force-pushed the feature_sink_json branch 2 times, most recently from a863625 to 2e4e179 Compare September 27, 2023 22:19
Member

@ritchie46 ritchie46 left a comment


Thank you for your patience @fernandocast. It is going in now. :)

@ritchie46 ritchie46 merged commit 1bcdf00 into pola-rs:main Nov 20, 2023
26 checks passed
@fernandocast
Contributor Author

> Thank you for your patience @fernandocast. It is going in now. :)

My pleasure, @ritchie46. My friend and I are convinced of the potential that Polars has.

Successfully merging this pull request may close these issues.

Implement sink_json to reduce memory consumption
6 participants