Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option for running polars-gpu engine #137

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ run-polars-no-env: ## Run Polars benchmarks
rm -rf data/tables/scale-$(SCALE_FACTOR)/*.tbl
python -m queries.polars

# Runs the CPU benchmarks first (via the run-polars-no-env prerequisite), then
# repeats the suite with RUN_POLARS_GPU=true so the GPU engine is exercised.
# CUDA_MODULE_LOADING=EAGER presumably front-loads CUDA module loading so it is
# not included in query timings — confirm against the Python-side preload logic.
.PHONY: run-polars-gpu-no-env
run-polars-gpu-no-env: run-polars-no-env ## Run Polars CPU and GPU benchmarks
RUN_POLARS_GPU=true CUDA_MODULE_LOADING=EAGER python -m queries.polars

.PHONY: run-duckdb data/tables/
run-duckdb: .venv ## Run DuckDB benchmarks
$(VENV_BIN)/python -m queries.duckdb
Expand Down
101 changes: 92 additions & 9 deletions queries/polars/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import pathlib
import tempfile
from functools import partial
from typing import Literal

import polars as pl

Expand Down Expand Up @@ -60,20 +63,100 @@ def get_part_supp_ds() -> pl.LazyFrame:
return _scan_ds("partsupp")


def _preload_engine(engine):
    """Warm up *engine* so its one-time lazy IO setup is not timed later.

    The GPU engine pays a one-off lazy-loading cost in IO on first use; running
    a throwaway scan here moves that cost outside the benchmark timings.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        path = pathlib.Path(tmpdir) / "test.pq"
        pl.DataFrame({"a": [1]}).write_parquet(path)
        pl.scan_parquet(path).collect(engine=engine)


def obtain_engine_config() -> pl.GPUEngine | Literal["cpu"]:
    """Build the ``engine`` argument for ``LazyFrame.collect``.

    Returns the string ``"cpu"`` when GPU runs are disabled in settings.
    Otherwise constructs a ``pl.GPUEngine`` bound to the configured device,
    backed by the RMM memory resource selected by ``settings.run.use_rmm_mr``.

    Raises
    ------
    RuntimeError
        If ``settings.run.use_rmm_mr`` names an unknown resource type.
    """
    if not settings.run.polars_gpu:
        return "cpu"
    # GPU-only dependencies are imported lazily so CPU runs need none of them.
    import cudf_polars
    import rmm
    from cudf_polars.callback import set_device
    from packaging import version

    # rapids 24.10 promoted pylibcudf to a top-level package; drop this
    # conditional once 24.10 is the minimum supported version.
    if version.parse(cudf_polars.__version__) < version.Version("24.10"):
        import cudf._lib.pylibcudf as plc
    else:
        import pylibcudf as plc

    device = settings.run.polars_gpu_device
    mr_type = settings.run.use_rmm_mr
    with set_device(device):
        # Must make sure to create memory resource on the requested device
        free_memory, _ = rmm.mr.available_device_memory()
        # Pick an initial pool of around 80% of the free device
        # memory, must be multiple of 256
        initial_pool_size = 256 * (int(free_memory * 0.8) // 256)
        if mr_type == "cuda":
            mr = rmm.mr.CudaMemoryResource()
        elif mr_type == "cuda-pool":
            mr = rmm.mr.PoolMemoryResource(
                rmm.mr.CudaMemoryResource(), initial_pool_size=initial_pool_size
            )
        elif mr_type == "cuda-async":
            mr = rmm.mr.CudaAsyncMemoryResource(initial_pool_size=initial_pool_size)
        elif mr_type == "managed":
            mr = rmm.mr.ManagedMemoryResource()
        elif mr_type == "managed-pool":
            mr = rmm.mr.PrefetchResourceAdaptor(
                rmm.mr.PoolMemoryResource(
                    rmm.mr.ManagedMemoryResource(), initial_pool_size=initial_pool_size
                )
            )
        else:
            msg = "Unknown memory resource type"
            raise RuntimeError(msg)
        if mr_type in ("managed", "managed-pool"):
            # Enable prefetching for hot libcudf code paths when using
            # managed (unified) memory.
            for typ in [
                "column_view::get_data",
                "mutable_column_view::get_data",
                "gather",
                "hash_join",
            ]:
                plc.experimental.enable_prefetching(typ)

    return pl.GPUEngine(device=device, memory_resource=mr, raise_on_fail=True)


def run_query(query_number: int, lf: pl.LazyFrame) -> None:
    """Execute one Polars benchmark query and record/check its result.

    Parameters
    ----------
    query_number:
        TPC-H query number, used for reporting and result checking.
    lf:
        The lazy query plan to collect.

    Raises
    ------
    ValueError
        If the GPU engine is requested together with eager or streaming mode,
        which it does not support.
    """
    streaming = settings.run.polars_streaming
    eager = settings.run.polars_eager
    gpu = settings.run.polars_gpu

    if (eager or streaming) and gpu:
        msg = "polars-gpu engine does not support eager or streaming"
        raise ValueError(msg)
    if settings.run.polars_show_plan:
        # NOTE(review): `optimized=eager` looks inverted (eager disables
        # optimization below via `no_optimization=eager`) — confirm intent.
        print(lf.explain(streaming=streaming, optimized=eager))

    engine = obtain_engine_config()
    # Eager load engine backend, so we don't time that.
    _preload_engine(engine)
    query = partial(
        lf.collect, streaming=streaming, no_optimization=eager, engine=engine
    )

    # Report which backend actually ran, including the RMM resource for GPU.
    if gpu:
        library_name = f"polars-gpu-{settings.run.use_rmm_mr}"
    elif eager:
        library_name = "polars-eager"
    else:
        library_name = "polars"

    try:
        run_query_generic(
            query,
            query_number,
            library_name,
            library_version=pl.__version__,
            query_checker=check_query_result_pl,
        )
    except Exception as e:
        # Best-effort: report the failing query and keep the benchmark run going.
        print(f"q{query_number} FAILED\n{e}")
6 changes: 6 additions & 0 deletions requirements-polars-gpu.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# 1.8.2 introduces plan optimisations that produce inequality joins
# that are not yet implemented in cudf-polars
polars[gpu]<1.8.2
packaging

-r requirements-polars-only.txt
12 changes: 12 additions & 0 deletions settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ class Run(BaseSettings):
polars_show_plan: bool = False
polars_eager: bool = False
polars_streaming: bool = False
# --- Polars GPU engine settings (consumed by queries/polars/utils.py) ---
polars_gpu: bool = False # Use GPU engine?
polars_gpu_device: int = 0 # The GPU device to run on for polars GPU
# Which style of GPU memory resource to use
# cuda -> cudaMalloc
# cuda-pool -> Pool suballocator wrapped around cudaMalloc
# managed -> cudaMallocManaged
# managed-pool -> Pool suballocator wrapped around cudaMallocManaged
# cuda-async -> cudaMallocAsync (comes with pool)
# See https://docs.rapids.ai/api/rmm/stable/ for details on RMM memory resources
use_rmm_mr: Literal[
    "cuda", "cuda-pool", "managed", "managed-pool", "cuda-async"
] = "cuda-async"

modin_memory: int = 8_000_000_000 # Tune as needed for optimal performance

Expand Down
Loading