Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Io plugin #94

Merged
merged 6 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ Cargo.lock
.idea/
venv/
target/
rust-toolchain.toml
rust-toolchain.toml
*.so
**/*.pyc
__pycache__/
7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ resolver = "2"
members = [
"example/derive_expression/expression_lib",
"example/extend_polars_python_dispatch/extend_polars",
"example/io_plugin/io_plugin",
"pyo3-polars",
"pyo3-polars-derive",
]
Expand All @@ -14,6 +15,12 @@ polars-ffi = { version = "0.41.0", default-features = false }
polars-plan = { version = "0.41.0", default-feautres = false }
polars-lazy = { version = "0.41.0", default-features = false }

[workspace.dependencies.arrow]
package = "polars-arrow"
version = "0.41.3"
path = "../polars/crates/polars-arrow"
default-features = false

[patch.crates-io]
polars = { git = "https://github.com/pola-rs/polars.git" }
polars-core = { git = "https://github.com/pola-rs/polars.git" }
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright (c) 2020 Ritchie Vink
Copyright (c) 2024 Ritchie Vink

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ struct PigLatinKwargs {
fn pig_latinnify(inputs: &[Series], kwargs: PigLatinKwargs) -> PolarsResult<Series> {
let ca = inputs[0].str()?;
let out: StringChunked =
ca.apply_to_buffer(|value, output| pig_latin_str(value, kwargs.capitalize, output));
ca.apply_into_string_amortized(|value, output| pig_latin_str(value, kwargs.capitalize, output));
Ok(out.into_series())
}
```
Expand Down
2 changes: 1 addition & 1 deletion example/derive_expression/expression_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ crate-type = ["cdylib"]

[dependencies]
polars = { workspace = true, features = ["fmt", "dtype-date", "timezones"], default-features = false }
pyo3 = { version = "0.21", features = ["abi3-py38"] }
pyo3 = { version = "0.22.2", features = ["abi3-py38"] }
pyo3-polars = { version = "*", path = "../../../pyo3-polars", features = ["derive"] }
rayon = "1.7.0"
serde = { version = "1", features = ["derive"] }
14 changes: 8 additions & 6 deletions example/derive_expression/expression_lib/src/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ fn pig_latin_str(value: &str, capitalize: bool, output: &mut String) {
#[polars_expr(output_type=String)]
fn pig_latinnify(inputs: &[Series], kwargs: PigLatinKwargs) -> PolarsResult<Series> {
let ca = inputs[0].str()?;
let out: StringChunked =
ca.apply_to_buffer(|value, output| pig_latin_str(value, kwargs.capitalize, output));
let out: StringChunked = ca.apply_into_string_amortized(|value, output| {
pig_latin_str(value, kwargs.capitalize, output)
});
Ok(out.into_series())
}

Expand Down Expand Up @@ -63,8 +64,9 @@ fn pig_latinnify_with_paralellism(
let ca = inputs[0].str()?;

if context.parallel() {
let out: StringChunked =
ca.apply_to_buffer(|value, output| pig_latin_str(value, kwargs.capitalize, output));
let out: StringChunked = ca.apply_into_string_amortized(|value, output| {
pig_latin_str(value, kwargs.capitalize, output)
});
Ok(out.into_series())
} else {
POOL.install(|| {
Expand All @@ -75,7 +77,7 @@ fn pig_latinnify_with_paralellism(
.into_par_iter()
.map(|(offset, len)| {
let sliced = ca.slice(offset as i64, len);
let out = sliced.apply_to_buffer(|value, output| {
let out = sliced.apply_into_string_amortized(|value, output| {
pig_latin_str(value, kwargs.capitalize, output)
});
out.downcast_iter().cloned().collect::<Vec<_>>()
Expand Down Expand Up @@ -155,7 +157,7 @@ fn append_kwargs(input: &[Series], kwargs: MyKwargs) -> PolarsResult<Series> {
let ca = input.str().unwrap();

Ok(ca
.apply_to_buffer(|val, buf| {
.apply_into_string_amortized(|val, buf| {
write!(
buf,
"{}-{}-{}-{}-{}",
Expand Down
2 changes: 1 addition & 1 deletion example/extend_polars_python_dispatch/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ run: install
source venv/bin/activate && python run.py

run-release: install-release
source venv/bin/activate && python run.py
source venv/bin/activate && python run.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ crate-type = ["cdylib"]
polars = { workspace = true, features = ["fmt"] }
polars-core = { workspace = true }
polars-lazy = { workspace = true }
pyo3 = { version = "0.21", features = ["extension-module"] }
pyo3 = { version = "0.22.2", features = ["extension-module"] }
pyo3-polars = { version = "*", path = "../../../pyo3-polars", features = ["lazy"] }
rayon = "1.10"
25 changes: 25 additions & 0 deletions example/io_plugin/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

SHELL=/bin/bash

venv: ## Set up virtual environment
python3 -m venv venv
venv/bin/pip install -r requirements.txt

install: venv
unset CONDA_PREFIX && \
source venv/bin/activate && maturin develop -m io_plugin/Cargo.toml

install-release: venv
unset CONDA_PREFIX && \
source venv/bin/activate && maturin develop --release -m io_plugin/Cargo.toml

clean:
-@rm -r venv
-@cd extend_polars && cargo clean


run: install
source venv/bin/activate && python run.py

run-release: install-release
source venv/bin/activate && python run.py
15 changes: 15 additions & 0 deletions example/io_plugin/io_plugin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "io_plugin"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "io_plugin"
crate-type = ["cdylib"]

[dependencies]
polars = { workspace = true, features = ["fmt", "dtype-date", "timezones", "lazy"], default-features = false }
pyo3 = { version = "0.22.2", features = ["abi3-py38"] }
pyo3-polars = { version = "*", path = "../../../pyo3-polars", features = ["derive", "lazy"] }
rand = { version = "0.8.5", features = [] }
45 changes: 45 additions & 0 deletions example/io_plugin/io_plugin/io_plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from .io_plugin import new_bernoulli, new_uniform, RandomSource
from typing import Any, Iterator
from polars.io.plugins import register_io_source
import polars as pl


def scan_random(samplers: list[Any], size: int = 1000) -> pl.LazyFrame:
def source_generator(
with_columns: list[str] | None,
predicate: pl.Expr | None,
n_rows: int | None,
batch_size: int | None,
) -> Iterator[pl.DataFrame]:
"""
Generator function that creates the source.
This function will be registered as IO source.
"""

new_size = size
if n_rows is not None and n_rows < size:
new_size = n_rows

src = RandomSource(samplers, batch_size, new_size)
if with_columns is not None:
src.set_with_columns(with_columns)

# Set the predicate.
predicate_set = True
if predicate is not None:
try:
src.try_set_predicate(predicate)
except pl.exceptions.ComputeError:
predicate_set = False

while (out := src.next()) is not None:
# If the source could not apply the predicate
# (because it wasn't able to deserialize it), we do it here.
if not predicate_set and predicate is not None:
out = out.filter(predicate)

yield out

# create src again to compute the schema
src = RandomSource(samplers, 0, 0)
return register_io_source(callable=source_generator, schema=src.schema())
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ requires = ["maturin>=1.0,<2.0"]
build-backend = "maturin"

[project]
name = "extend_polars"
requires-python = ">=3.7"
name = "io_plugin"
requires-python = ">=3.10"
classifiers = [
"Programming Language :: Rust",
"Programming Language :: Python :: Implementation :: CPython",
Expand Down
123 changes: 123 additions & 0 deletions example/io_plugin/io_plugin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
mod samplers;

use crate::samplers::PySampler;
use polars::prelude::*;
use pyo3::prelude::*;
use pyo3_polars::error::PyPolarsErr;
use pyo3_polars::{PyDataFrame, PyExpr, PySchema};

#[pyclass]
pub struct RandomSource {
columns: Vec<PySampler>,
size_hint: usize,
n_rows: usize,
predicate: Option<Expr>,
with_columns: Option<Vec<usize>>,
}

#[pymethods]
impl RandomSource {
#[new]
#[pyo3(signature = (columns, size_hint, n_rows))]
fn new_source(
columns: Vec<PySampler>,
size_hint: Option<usize>,
n_rows: Option<usize>,
) -> Self {
let n_rows = n_rows.unwrap_or(usize::MAX);
let size_hint = size_hint.unwrap_or(10_000);

Self {
columns,
size_hint,
n_rows,
predicate: None,
with_columns: None,
}
}

fn schema(&self) -> PySchema {
let schema = self
.columns
.iter()
.map(|s| {
let s = s.0.lock().unwrap();
Field::new(s.name(), s.dtype())
})
.collect::<Schema>();
PySchema(Arc::new(schema))
}

fn try_set_predicate(&mut self, predicate: PyExpr) {
self.predicate = Some(predicate.0);
}

fn set_with_columns(&mut self, columns: Vec<String>) {
let schema = self.schema().0;

let indexes = columns
.iter()
.map(|name| {
schema
.index_of(name.as_ref())
.expect("schema should be correct")
})
.collect();

self.with_columns = Some(indexes)
}

fn next(&mut self) -> PyResult<Option<PyDataFrame>> {
if self.n_rows > 0 {
// Apply projection pushdown.
// This prevents unneeded sampling.
let s_iter = if let Some(idx) = &self.with_columns {
Box::new(idx.iter().copied().map(|i| &self.columns[i]))
as Box<dyn Iterator<Item = _>>
} else {
Box::new(self.columns.iter())
};

let columns = s_iter
.map(|s| {
let mut s = s.0.lock().unwrap();

// Apply slice pushdown.
// This prevents unneeded sampling.
s.next_n(std::cmp::min(self.size_hint, self.n_rows))
})
.collect::<Vec<_>>();

let mut df = DataFrame::new(columns).map_err(PyPolarsErr::from)?;
self.n_rows = self.n_rows.saturating_sub(self.size_hint);

// Apply predicate pushdown.
// This is done after the fact, but there could be sources where this could be applied
// lower.
if let Some(predicate) = &self.predicate {
df = df
.lazy()
.filter(predicate.clone())
._with_eager(true)
.collect()
.map_err(PyPolarsErr::from)?;
}

Ok(Some(PyDataFrame(df)))
} else {
Ok(None)
}
}
}

#[pymodule]
fn io_plugin(m: &Bound<PyModule>) -> PyResult<()> {
m.add_class::<RandomSource>().unwrap();
m.add_class::<PySampler>().unwrap();
m.add_wrapped(wrap_pyfunction!(samplers::new_bernoulli))
.unwrap();
m.add_wrapped(wrap_pyfunction!(samplers::new_uniform))
.unwrap();

Ok(())
}
Loading
Loading