Skip to content

Commit

Permalink
refactor(rust): add new iterator to (chunked)array collect methods (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Sep 15, 2023
1 parent 3c222f7 commit f420b61
Show file tree
Hide file tree
Showing 13 changed files with 1,218 additions and 335 deletions.
63 changes: 59 additions & 4 deletions crates/polars-arrow/src/trusted_len/push_unchecked.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use super::*;

pub trait TrustedLenPush<T> {
/// Will push an item and not check if there is enough capacity.
///
/// # Safety
/// Caller must ensure the array has enough capacity to hold `T`.
unsafe fn push_unchecked(&mut self, value: T);

/// Extend the array with an iterator whose length can be trusted.
fn extend_trusted_len<I: IntoIterator<Item = T, IntoIter = J>, J: TrustedLen>(
&mut self,
iter: I,
Expand All @@ -16,9 +16,16 @@ pub trait TrustedLenPush<T> {
}

/// Extends `self` with the items of `iter`, trusting the iterator's reported length.
///
/// # Safety
/// Caller must ensure the iterator's reported length is correct.
unsafe fn extend_trusted_len_unchecked<I: IntoIterator<Item = T>>(&mut self, iter: I);

/// Fallible counterpart of `extend_trusted_len_unchecked`: the iterator yields
/// `Result<T, E>` and an error of type `E` is handed back to the caller.
///
/// # Safety
/// Caller must ensure the iterator's reported length is correct.
unsafe fn try_extend_trusted_len_unchecked<E, I: IntoIterator<Item = Result<T, E>>>(
&mut self,
iter: I,
) -> Result<(), E>;

fn from_trusted_len_iter<I: IntoIterator<Item = T, IntoIter = J>, J: TrustedLen>(
iter: I,
) -> Self
Expand All @@ -28,8 +35,28 @@ pub trait TrustedLenPush<T> {
unsafe { Self::from_trusted_len_iter_unchecked(iter) }
}
/// Builds `Self` from the items of `iter`, trusting the iterator's reported length.
///
/// # Safety
/// Caller must ensure the iterator's reported length is correct.
unsafe fn from_trusted_len_iter_unchecked<I: IntoIterator<Item = T>>(iter: I) -> Self;

/// Safe, fallible constructor: builds `Self` from an iterator of `Result`s whose
/// iterator type implements `TrustedLen`, returning an `Err` of type `E` on failure.
fn try_from_trusted_len_iter<
    E,
    I: IntoIterator<Item = Result<T, E>, IntoIter = J>,
    J: TrustedLen,
>(
    iter: I,
) -> Result<Self, E>
where
    Self: Sized,
{
    let trusted = iter.into_iter();
    // SAFETY: `trusted` is a `J: TrustedLen`, so its reported length is the one the
    // unchecked constructor is allowed to rely on.
    unsafe { Self::try_from_trusted_len_iter_unchecked(trusted) }
}
/// Fallible counterpart of `from_trusted_len_iter_unchecked`: builds `Self` from an
/// iterator of `Result<T, E>`, handing an error of type `E` back to the caller.
///
/// # Safety
/// Caller must ensure the iterator's reported length is correct.
unsafe fn try_from_trusted_len_iter_unchecked<E, I: IntoIterator<Item = Result<T, E>>>(
iter: I,
) -> Result<Self, E>
where
Self: Sized;
}

impl<T> TrustedLenPush<T> for Vec<T> {
Expand All @@ -55,10 +82,38 @@ impl<T> TrustedLenPush<T> for Vec<T> {
self.set_len(self.len() + upper)
}

/// Appends the `Ok` values of `iter` to the vector, stopping at the first `Err`.
///
/// When an `Err` is encountered it is returned, the vector's length is left
/// unchanged, and the values written so far are dropped instead of being leaked.
///
/// # Panics
/// Panics if the iterator's `size_hint` reports no upper bound.
///
/// # Safety
/// Caller must ensure the iterator's reported length is correct; in particular the
/// iterator must not yield more items than the upper bound of its `size_hint`,
/// because only that many slots are reserved.
unsafe fn try_extend_trusted_len_unchecked<E, I: IntoIterator<Item = Result<T, E>>>(
    &mut self,
    iter: I,
) -> Result<(), E> {
    let iter = iter.into_iter();
    let upper = iter.size_hint().1.expect("must have an upper bound");
    self.reserve(upper);

    // First free slot. The range `base..base + written` holds exactly the values
    // this call has initialized so far.
    let base = self.as_mut_ptr().add(self.len());
    let mut written = 0;
    for value in iter {
        match value {
            Ok(v) => {
                std::ptr::write(base.add(written), v);
                written += 1;
            },
            Err(err) => {
                // The length has not been bumped yet, so the vector would never run
                // the destructors of these values; drop them here to avoid a leak.
                std::ptr::drop_in_place(std::ptr::slice_from_raw_parts_mut(base, written));
                return Err(err);
            },
        }
    }
    // Under the trusted-length contract `written == upper`; using the actual count
    // means only initialized slots are ever exposed.
    self.set_len(self.len() + written);
    Ok(())
}

/// Creates a fresh vector and fills it via `extend_trusted_len_unchecked`.
///
/// # Safety
/// Same contract as `extend_trusted_len_unchecked`: the iterator's reported
/// length must be correct.
#[inline]
unsafe fn from_trusted_len_iter_unchecked<I: IntoIterator<Item = T>>(iter: I) -> Self {
    let mut out = Vec::new();
    out.extend_trusted_len_unchecked(iter);
    out
}

/// Creates a fresh vector and fills it via `try_extend_trusted_len_unchecked`,
/// returning the error of that call if it fails.
///
/// # Safety
/// Same contract as `try_extend_trusted_len_unchecked`: the iterator's reported
/// length must be correct.
unsafe fn try_from_trusted_len_iter_unchecked<E, I: IntoIterator<Item = Result<T, E>>>(
    iter: I,
) -> Result<Self, E>
where
    Self: Sized,
{
    let mut out = Vec::new();
    match out.try_extend_trusted_len_unchecked(iter) {
        Ok(()) => Ok(out),
        Err(err) => Err(err),
    }
}
}
171 changes: 171 additions & 0 deletions crates/polars-core/src/chunked_array/collect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
//! Methods for collecting into a ChunkedArray.
//!
//! For types that don't have dtype parameters:
//! iter.(try_)collect_ca(_trusted) (name)
//!
//! For all types:
//! iter.(try_)collect_ca(_trusted)_like (other_ca) Copies name/dtype from other_ca
//! iter.(try_)collect_ca(_trusted)_with_dtype (name, dtype)
//!
//! The try variants work on iterators of Results, the trusted variants do not
//! check the length of the iterator.

use std::sync::Arc;

use polars_arrow::trusted_len::TrustedLen;

use crate::chunked_array::ChunkedArray;
use crate::datatypes::{
ArrayCollectIterExt, ArrayFromIter, ArrayFromIterDtype, DataType, Field, PolarsDataType,
};

/// Iterator extension for collecting into a single-chunk [`ChunkedArray`] whose
/// dtype is either passed explicitly (`*_with_dtype`) or copied, together with the
/// name, from an existing array (`*_like`).
///
/// The `try_*` methods consume iterators of `Result`s and return the error of the
/// underlying collection call; the `*_trusted*` methods additionally require
/// `Self: TrustedLen`.
pub trait ChunkedCollectIterExt<T: PolarsDataType>: Iterator + Sized {
    /// Collects into a `ChunkedArray` named `name` with the given `dtype`.
    #[inline]
    fn collect_ca_with_dtype(self, name: &str, dtype: DataType) -> ChunkedArray<T>
    where
        T::Array: ArrayFromIterDtype<Self::Item>,
    {
        let meta = Arc::new(Field::new(name, dtype.clone()));
        let chunk = self.collect_arr_with_dtype(dtype);
        ChunkedArray::from_chunk_iter_and_field(meta, [chunk])
    }

    /// Collects into a `ChunkedArray` that shares the field (name and dtype) of
    /// `name_dtype_src`.
    #[inline]
    fn collect_ca_like(self, name_dtype_src: &ChunkedArray<T>) -> ChunkedArray<T>
    where
        T::Array: ArrayFromIterDtype<Self::Item>,
    {
        let src_field = &name_dtype_src.field;
        let chunk = self.collect_arr_with_dtype(src_field.dtype.clone());
        ChunkedArray::from_chunk_iter_and_field(Arc::clone(src_field), [chunk])
    }

    /// Trusted-length variant of `collect_ca_with_dtype`.
    #[inline]
    fn collect_ca_trusted_with_dtype(self, name: &str, dtype: DataType) -> ChunkedArray<T>
    where
        T::Array: ArrayFromIterDtype<Self::Item>,
        Self: TrustedLen,
    {
        let meta = Arc::new(Field::new(name, dtype.clone()));
        let chunk = self.collect_arr_trusted_with_dtype(dtype);
        ChunkedArray::from_chunk_iter_and_field(meta, [chunk])
    }

    /// Trusted-length variant of `collect_ca_like`.
    #[inline]
    fn collect_ca_trusted_like(self, name_dtype_src: &ChunkedArray<T>) -> ChunkedArray<T>
    where
        T::Array: ArrayFromIterDtype<Self::Item>,
        Self: TrustedLen,
    {
        let src_field = &name_dtype_src.field;
        let chunk = self.collect_arr_trusted_with_dtype(src_field.dtype.clone());
        ChunkedArray::from_chunk_iter_and_field(Arc::clone(src_field), [chunk])
    }

    /// Fallible variant of `collect_ca_with_dtype` for iterators of `Result<U, E>`.
    #[inline]
    fn try_collect_ca_with_dtype<U, E>(
        self,
        name: &str,
        dtype: DataType,
    ) -> Result<ChunkedArray<T>, E>
    where
        T::Array: ArrayFromIterDtype<U>,
        Self: Iterator<Item = Result<U, E>>,
    {
        let meta = Arc::new(Field::new(name, dtype.clone()));
        match self.try_collect_arr_with_dtype(dtype) {
            Ok(chunk) => Ok(ChunkedArray::from_chunk_iter_and_field(meta, [chunk])),
            Err(err) => Err(err),
        }
    }

    /// Fallible variant of `collect_ca_like` for iterators of `Result<U, E>`.
    #[inline]
    fn try_collect_ca_like<U, E>(
        self,
        name_dtype_src: &ChunkedArray<T>,
    ) -> Result<ChunkedArray<T>, E>
    where
        T::Array: ArrayFromIterDtype<U>,
        Self: Iterator<Item = Result<U, E>>,
    {
        let meta = Arc::clone(&name_dtype_src.field);
        match self.try_collect_arr_with_dtype(meta.dtype.clone()) {
            Ok(chunk) => Ok(ChunkedArray::from_chunk_iter_and_field(meta, [chunk])),
            Err(err) => Err(err),
        }
    }

    /// Fallible, trusted-length variant of `collect_ca_with_dtype`.
    #[inline]
    fn try_collect_ca_trusted_with_dtype<U, E>(
        self,
        name: &str,
        dtype: DataType,
    ) -> Result<ChunkedArray<T>, E>
    where
        T::Array: ArrayFromIterDtype<U>,
        Self: Iterator<Item = Result<U, E>> + TrustedLen,
    {
        let meta = Arc::new(Field::new(name, dtype.clone()));
        match self.try_collect_arr_trusted_with_dtype(dtype) {
            Ok(chunk) => Ok(ChunkedArray::from_chunk_iter_and_field(meta, [chunk])),
            Err(err) => Err(err),
        }
    }

    /// Fallible, trusted-length variant of `collect_ca_like`.
    #[inline]
    fn try_collect_ca_trusted_like<U, E>(
        self,
        name_dtype_src: &ChunkedArray<T>,
    ) -> Result<ChunkedArray<T>, E>
    where
        T::Array: ArrayFromIterDtype<U>,
        Self: Iterator<Item = Result<U, E>> + TrustedLen,
    {
        let meta = Arc::clone(&name_dtype_src.field);
        match self.try_collect_arr_trusted_with_dtype(meta.dtype.clone()) {
            Ok(chunk) => Ok(ChunkedArray::from_chunk_iter_and_field(meta, [chunk])),
            Err(err) => Err(err),
        }
    }
}

// Blanket implementation: every `Iterator` gets the trait's default methods; the
// `where` clauses on the individual methods decide which of them are callable for
// a given item type.
impl<T: PolarsDataType, I: Iterator> ChunkedCollectIterExt<T> for I {}

/// Iterator extension for collecting into a single-chunk [`ChunkedArray`] whose
/// dtype is taken from `T::get_dtype()`, so only a name has to be supplied.
///
/// The `try_*` methods consume iterators of `Result`s and return the error of the
/// underlying collection call; the `*_trusted` methods additionally require
/// `Self: TrustedLen`.
pub trait ChunkedCollectInferIterExt<T: PolarsDataType>: Iterator + Sized {
    /// Collects into a `ChunkedArray` named `name`.
    #[inline]
    fn collect_ca(self, name: &str) -> ChunkedArray<T>
    where
        T::Array: ArrayFromIter<Self::Item>,
    {
        let meta = Arc::new(Field::new(name, T::get_dtype()));
        let chunk = self.collect_arr();
        ChunkedArray::from_chunk_iter_and_field(meta, [chunk])
    }

    /// Trusted-length variant of `collect_ca`.
    #[inline]
    fn collect_ca_trusted(self, name: &str) -> ChunkedArray<T>
    where
        T::Array: ArrayFromIter<Self::Item>,
        Self: TrustedLen,
    {
        let meta = Arc::new(Field::new(name, T::get_dtype()));
        let chunk = self.collect_arr_trusted();
        ChunkedArray::from_chunk_iter_and_field(meta, [chunk])
    }

    /// Fallible variant of `collect_ca` for iterators of `Result<U, E>`.
    #[inline]
    fn try_collect_ca<U, E>(self, name: &str) -> Result<ChunkedArray<T>, E>
    where
        T::Array: ArrayFromIter<U>,
        Self: Iterator<Item = Result<U, E>>,
    {
        let meta = Arc::new(Field::new(name, T::get_dtype()));
        match self.try_collect_arr() {
            Ok(chunk) => Ok(ChunkedArray::from_chunk_iter_and_field(meta, [chunk])),
            Err(err) => Err(err),
        }
    }

    /// Fallible, trusted-length variant of `collect_ca`.
    #[inline]
    fn try_collect_ca_trusted<U, E>(self, name: &str) -> Result<ChunkedArray<T>, E>
    where
        T::Array: ArrayFromIter<U>,
        Self: Iterator<Item = Result<U, E>> + TrustedLen,
    {
        let meta = Arc::new(Field::new(name, T::get_dtype()));
        match self.try_collect_arr_trusted() {
            Ok(chunk) => Ok(ChunkedArray::from_chunk_iter_and_field(meta, [chunk])),
            Err(err) => Err(err),
        }
    }
}

// Blanket implementation: every `Iterator` gets the trait's default methods; the
// `where` clauses on the individual methods decide which of them are callable for
// a given item type.
impl<T: PolarsDataType, I: Iterator> ChunkedCollectInferIterExt<T> for I {}
29 changes: 29 additions & 0 deletions crates/polars-core/src/chunked_array/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,35 @@ where
unsafe { Ok(Self::from_chunks(name, chunks?)) }
}

/// Builds a `ChunkedArray` from an already constructed `field` and an iterator of
/// concrete arrays, boxing each array as a `dyn Array` chunk.
///
/// # Panics
/// Panics if the dtype variant of `T` differs from the variant of `field.dtype`
/// (only the enum discriminants are compared), or if the summed chunk length does
/// not convert into the type of the `length` field.
pub(crate) fn from_chunk_iter_and_field<I>(field: Arc<Field>, chunks: I) -> Self
where
    I: IntoIterator,
    T: PolarsDataType<Array = <I as IntoIterator>::Item>,
    <I as IntoIterator>::Item: Array,
{
    let expected_dtype = T::get_dtype();
    assert_eq!(
        std::mem::discriminant(&expected_dtype),
        std::mem::discriminant(&field.dtype)
    );

    // Accumulate the total length while the chunks are being type-erased.
    let mut total_len = 0;
    let boxed_chunks = chunks
        .into_iter()
        .map(|chunk| {
            total_len += chunk.len();
            let erased: Box<dyn Array> = Box::new(chunk);
            erased
        })
        .collect();

    ChunkedArray {
        field,
        chunks: boxed_chunks,
        phantom: PhantomData,
        bit_settings: Default::default(),
        length: total_len.try_into().unwrap(),
    }
}

/// Create a new [`ChunkedArray`] from existing chunks.
///
/// # Safety
Expand Down
7 changes: 2 additions & 5 deletions crates/polars-core/src/chunked_array/list/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,11 @@ impl ListChunked {
where
V: PolarsDataType,
F: FnMut(Option<UnstableSeries<'a>>) -> Option<K> + Copy,
K: ArrayFromElementIter<ArrayType = V::Array>,
V::Array: ArrayFromIter<Option<K>>,
{
// TODO! make an amortized iter that does not flatten

// SAFETY: unstable series never lives longer than the iterator.
let element_iter = unsafe { self.amortized_iter().map(f) };
let array = K::array_from_iter(element_iter);
ChunkedArray::from_chunk_iter(self.name(), std::iter::once(array))
unsafe { self.amortized_iter().map(f).collect_ca(self.name()) }
}

/// Apply a closure `F` elementwise.
Expand Down
1 change: 1 addition & 0 deletions crates/polars-core/src/chunked_array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod ops;
pub mod arithmetic;
pub mod builder;
pub mod cast;
pub mod collect;
pub mod comparison;
pub mod float;
pub mod iterator;
Expand Down
Loading

0 comments on commit f420b61

Please sign in to comment.