timsaucer/datafusion-ray

DataFusion on Ray

This project began as a research project, donated from ray-sql, to evaluate running distributed SQL queries from Python using Ray and Apache DataFusion.

DataFusion Ray is a distributed Python DataFrame and SQL query engine powered by the Rust implementation of Apache Arrow, Apache DataFusion, and Ray.

Comparison to other DataFusion projects

Comparison to DataFusion Ballista

  • Unlike DataFusion Ballista, DataFusion Ray does not provide its own distributed scheduler and instead relies on Ray for this functionality. As a result of this design choice, DataFusion Ray is a much smaller and simpler project.
  • DataFusion Ray is Python-first, whereas DataFusion Ballista is Rust-first.

Comparison to DataFusion Python

  • DataFusion Python provides a Python DataFrame and SQL API for in-process execution. DataFusion Ray extends DataFusion Python to provide scalability across multiple nodes.

Example

Run the following example live in your browser using a Google Colab notebook.

import os
import ray

from datafusion_ray import DatafusionRayContext

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

# Start a local cluster
ray.init(resources={"worker": 1})

# Create a context and register a table
ctx = DatafusionRayContext(2)
# Register either a CSV or Parquet file
# ctx.register_csv("tips", f"{SCRIPT_DIR}/tips.csv", True)
ctx.register_parquet("tips", f"{SCRIPT_DIR}/tips.parquet")

result_set = ctx.sql(
  "select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker"
)
for record_batch in result_set:
  print(record_batch.to_pandas())
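
To make clear what the query above computes, here is a minimal plain-Python sketch of the same aggregation (`avg(tip/total_bill)` grouped by `sex` and `smoker`). It does not use DataFusion Ray at all; the rows and the helper name `tip_pct_by_group` are hypothetical and only illustrate the semantics of the SQL.

```python
from collections import defaultdict

# Hypothetical sample rows (sex, smoker, total_bill, tip).
# These values are made up for illustration; they are not from tips.parquet.
rows = [
    ("Female", "No", 20.0, 3.0),
    ("Female", "No", 10.0, 2.0),
    ("Male", "Yes", 40.0, 4.0),
    ("Male", "No", 25.0, 5.0),
]


def tip_pct_by_group(rows):
    """Return {(sex, smoker): average of tip / total_bill} for the given rows."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for sex, smoker, total_bill, tip in rows:
        key = (sex, smoker)
        sums[key] += tip / total_bill  # per-row ratio, as in avg(tip/total_bill)
        counts[key] += 1
    return {key: sums[key] / counts[key] for key in sums}


result = tip_pct_by_group(rows)
for (sex, smoker), tip_pct in sorted(result.items()):
    print(sex, smoker, round(tip_pct, 4))
```

Note that the average is taken over the per-row ratios, not as the ratio of the summed columns, which is what the SQL expression specifies.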

Status

  • DataFusion Ray can run all queries in the TPC-H benchmark.

Features

  • Mature SQL support (CTEs, joins, subqueries, etc.) thanks to DataFusion
  • Support for CSV and Parquet files

Building

# prepare development environment (used to build wheel / install in development)
python3 -m venv venv
# activate the venv
source venv/bin/activate
# update pip itself if necessary
python -m pip install -U pip
# install dependencies (for Python 3.8+)
python -m pip install -r requirements-in.txt

Whenever the Rust code changes (through your own edits or a git pull), rebuild and rerun the tests:

# make sure you activate the venv using "source venv/bin/activate" first
maturin develop && python -m pytest

Testing

Running the Rust tests locally requires generating the TPC-H data first. This can be done by running the following commands:

export TPCH_TEST_PARTITIONS=1
export TPCH_SCALING_FACTOR=1
./scripts/gen-test-data.sh

This will generate data into a top-level data directory.

Tests can be run with:

export TPCH_DATA_PATH=`pwd`/data
cargo test
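
If the tests fail because the data cannot be found, a small pre-flight check along these lines may help. This is only a sketch: the helper name `resolve_tpch_data_path` is hypothetical, and it assumes nothing about the layout of the generated files beyond the directory existing and being non-empty.

```python
import os
from pathlib import Path


def resolve_tpch_data_path(env=os.environ):
    """Return the TPCH_DATA_PATH directory, or raise if it is unset, missing, or empty."""
    value = env.get("TPCH_DATA_PATH")
    if not value:
        raise RuntimeError("TPCH_DATA_PATH is not set")
    path = Path(value)
    if not path.is_dir():
        raise RuntimeError(f"{path} is not a directory")
    # An empty directory suggests the data generation script has not been run yet.
    if not any(path.iterdir()):
        raise RuntimeError(f"{path} is empty; run ./scripts/gen-test-data.sh first")
    return path
```

Calling `resolve_tpch_data_path()` before `cargo test` gives a clearer error message than a failure deep inside a test.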

Benchmarking

Create a release build when running benchmarks, then use pip to install the wheel.

maturin develop --release

How to update dependencies

To change the test dependencies, edit requirements.in and run:

# install pip-tools (this can be done only once), also consider running in venv
python -m pip install pip-tools
python -m piptools compile --generate-hashes -o requirements-310.txt

To update dependencies to their latest allowed versions, run the same command with -U:

python -m piptools compile -U --generate-hashes -o requirements-310.txt

More details here
