Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Ballista python #1069

Closed
wants to merge 2 commits into from

Conversation

milenkovicm
Copy link
Contributor

@milenkovicm milenkovicm commented Oct 11, 2024

As part of effort outlined in #1066 and #1067 this PR removes python crate.

Relates to: #1066 & #1067

Which issue does this PR close?

Closes #.

Rationale for this change

We should focus effort on providing support for DataFusion python rather than maintaining this crate

What changes are included in this PR?

Are there any user-facing changes?

Python API has been removed

@milenkovicm milenkovicm changed the title Remove Ballista Python Remove Ballista python Oct 11, 2024
@andygrove
Copy link
Member

docs also need updating e.g. https://datafusion.apache.org/ballista/user-guide/python.html

@andygrove
Copy link
Member

If we remove the Python bindings, how will users submit queries? I currently rely on them to run benchmarks, for example.

Yes, users could start writing and compiling Rust, but I think most data scientists/engineers are much more comfortable with Python.

@milenkovicm
Copy link
Contributor Author

I guess we should push making datafusion python running on ballista. We could keep current bindings until we make DF python supporting ballista

@milenkovicm
Copy link
Contributor Author

As follows up work I'd try to use SessionContext instead of BallistaContext, if that works we could deprecate python and ballista context at the same time. Would that make sense @andygrove ?

@andygrove
Copy link
Member

As follows up work I'd try to use SessionContext instead of BallistaContext, if that works we could deprecate python and ballista context at the same time. Would that make sense @andygrove ?

BallistaContext is a thin wrapper around SessionContext:

pub struct BallistaContext {
    state: Arc<Mutex<BallistaContextState>>,
    context: Arc<SessionContext>,
}

To make SessionContext execute queries against Ballista we call with_query_planner and provide a BallistaQueryPlanner.

For Python, I think the trick is to somehow use the DataFusion Python bindings but pass the Ballista SessionContext to it?

@andygrove
Copy link
Member

maybe @timsaucer or @Michael-J-Ward can help with this conversation?

@timsaucer
Copy link

I'm not quite sure - I haven't looked at ballista since you recommended I look at ray instead. Taking a very quick look my guess is that we'd need to do something similar to what I'm working on with the FFI table provider. I'm sorry I can't be of more help right now.

@milenkovicm
Copy link
Contributor Author

milenkovicm commented Oct 12, 2024

I've done quick POC in https://github.com/milenkovicm/arrow-ballista/tree/poc_client_interface where I replaced BallistaContext with SessionContext using ballista query panner, and it works. I haven't put much effort in verification, example with sql works, no problem

Also,

use ballista::ext::BallistaExt;
use datafusion::{execution::options::ParquetReadOptions, prelude::SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    //
    // should we remove BallistaContext ?!
    //
    let ctx : SessionContext = SessionContext::ballista_standalone().await?;

    ctx.register_parquet(
        "test",
        &format!("{testdata}/alltypes_plain.parquet"),
        ParquetReadOptions::default(),
    )
    .await?;

    let df = ctx.sql("select * from test").await?;
    df.write_csv(
        "/directory/to_write/csv",
        Default::default(),
        Default::default(),
    )
    .await?;

    Ok(())
}

Resolves and executes plan, writes the file, but unfortunately file does not make sense (some kind of binary, not sure whats the issue, will have a look once I get through current PRs):

[DEBUG datafusion::physical_planner] Optimized physical plan:
    DataSinkExec: sink=CsvSink(file_groups=[])
      ParquetExec: file_groups={1 group: [[/arrow-ballista/examples/testdata/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col]
    
    
[INFO  ballista_scheduler::planner] planning query stages for job m1UAyt4
[INFO  ballista_scheduler::state::task_manager] Submitting execution graph: ExecutionGraph[job_id=m1UAyt4, session_id=e294befd-26ce-4927-b447-e0779a1fcd6f, available_tasks=0, is_successful=false]
    =========ResolvedStage[stage_id=1.0, partitions=1]=========
    ShuffleWriterExec: None
      DataSinkExec: sink=CsvSink(file_groups=[])
        ParquetExec: file_groups={1 group: [[/arrow-ballista/examples/testdata/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col]

[INFO  ballista_scheduler::display] === [m1UAyt4/1] Stage finished, physical plan with metrics ===
    ShuffleWriterExec: None, metrics=[output_rows=1, input_rows=1, repart_time=1ns, write_time=19.28307ms]
      DataSinkExec: sink=CsvSink(file_groups=[]), metrics=[output_rows=8, elapsed_compute=1ns, bytes_scanned=671, num_predicate_creation_errors=0, page_index_rows_filtered=0, row_groups_matched_statistics=0, predicate_evaluation_errors=0, row_groups_matched_bloom_filter=0, row_groups_pruned_statistics=0, pushdown_rows_filtered=0, row_groups_pruned_bloom_filter=0, file_open_errors=0, file_scan_errors=0, time_elapsed_opening=13.991107ms, time_elapsed_processing=12.901754ms, time_elapsed_scanning_total=1.801347ms, time_elapsed_scanning_until_data=1.628914ms, page_index_eval_time=2ns, pushdown_eval_time=2ns]
        ParquetExec: file_groups={1 group: [[/arrow-ballista/examples/testdata/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], metrics=[]

I guess if we have SessionContext it could be used with DataFusion Python, haven't done much with DF Python

@milenkovicm
Copy link
Contributor Author

I have created #1081 in which i'll try to replace BallistaContext with SessionContext

@milenkovicm
Copy link
Contributor Author

If you agree @andygrove I'll close this task. But will bother two of you once I get #1081 in shape, to implement those change to python

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants