From 6224d8443773eafec6ca9691f49272ef453a8764 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Thu, 25 Jul 2024 15:29:36 +0200 Subject: [PATCH] Add shortcircuit for all set and none set --- .../polars-parquet/src/arrow/write/pages.rs | 135 ++++++++++++++---- 1 file changed, 104 insertions(+), 31 deletions(-) diff --git a/crates/polars-parquet/src/arrow/write/pages.rs b/crates/polars-parquet/src/arrow/write/pages.rs index e2bbb2f3ee9b..c39093115a15 100644 --- a/crates/polars-parquet/src/arrow/write/pages.rs +++ b/crates/polars-parquet/src/arrow/write/pages.rs @@ -271,11 +271,18 @@ fn to_nested_recursive( fn expand_list_validity<'a, O: Offset>( array: &'a ListArray<O>, - validity: Option<Bitmap>, - arrays: &mut VecDeque<(&'a dyn Array, Option<Bitmap>)>, + validity: BitmapState, + array_stack: &mut VecDeque<(&'a dyn Array, BitmapState)>, ) { - let Some(list_validity) = validity else { - arrays.push_back((array.values().as_ref(), validity)); + let BitmapState::SomeSet(list_validity) = validity else { + array_stack.push_back(( + array.values().as_ref(), + match validity { + BitmapState::AllSet => BitmapState::AllSet, + BitmapState::SomeSet(_) => unreachable!(), + BitmapState::AllUnset(_) => BitmapState::AllUnset(array.values().len()), + }, + )); return; }; @@ -302,7 +309,67 @@ fn expand_list_validity<'a, O: Offset>( let validity = validity.freeze(); - arrays.push_back((array.values().as_ref(), Some(validity))); + array_stack.push_back((array.values().as_ref(), BitmapState::SomeSet(validity))); +} + +#[derive(Clone)] +enum BitmapState { + AllSet, + SomeSet(Bitmap), + AllUnset(usize), +} + +impl From<Option<&Bitmap>> for BitmapState { + fn from(bm: Option<&Bitmap>) -> Self { + let Some(bm) = bm else { + return Self::AllSet; + }; + + let null_count = bm.unset_bits(); + + if null_count == 0 { + Self::AllSet + } else if null_count == bm.len() { + Self::AllUnset(bm.len()) + } else { + Self::SomeSet(bm.clone()) + } + } +} + +impl From<BitmapState> for Option<Bitmap> { + fn from(bms: BitmapState) -> Self { + match bms { + 
BitmapState::AllSet => None, + BitmapState::SomeSet(bm) => Some(bm), + BitmapState::AllUnset(len) => Some(Bitmap::new_zeroed(len)), + } + } +} + +impl std::ops::BitAnd for &BitmapState { + type Output = BitmapState; + + fn bitand(self, rhs: Self) -> Self::Output { + use BitmapState as B; + match (self, rhs) { + (B::AllSet, B::AllSet) => B::AllSet, + (B::AllSet, B::SomeSet(v)) | (B::SomeSet(v), B::AllSet) => B::SomeSet(v.clone()), + (B::SomeSet(lhs), B::SomeSet(rhs)) => { + let result = lhs & rhs; + let null_count = result.unset_bits(); + + if null_count == 0 { + B::AllSet + } else if null_count == result.len() { + B::AllUnset(result.len()) + } else { + B::SomeSet(result) + } + }, + (B::AllUnset(len), _) | (_, B::AllUnset(len)) => B::AllUnset(*len), + } + } } /// Convert [`Array`] to a `Vec<Box<dyn Array>>` leaves in DFS order. @@ -312,17 +379,13 @@ pub fn to_leaves(array: &dyn Array, leaves: &mut Vec<Box<dyn Array>>) { use PhysicalType as P; leaves.clear(); - let mut array_stack: VecDeque<(&dyn Array, Option<Bitmap>)> = VecDeque::new(); + let mut array_stack: VecDeque<(&dyn Array, BitmapState)> = VecDeque::new(); - array_stack.push_back((array, None)); + array_stack.push_back((array, BitmapState::AllSet)); while let Some((array, parent_validity)) = array_stack.pop_front() { - let child_validity = array.validity().cloned(); - let validity = match (parent_validity, child_validity) { - (Some(parent), Some(child)) => Some((&parent) & (&child)), - (Some(v), None) | (None, Some(v)) => Some(v), - (None, None) => None, - }; + let child_validity = BitmapState::from(array.validity()); + let validity = (&child_validity) & (&parent_validity); match array.data_type().to_physical_type() { P::Struct => { @@ -347,32 +410,42 @@ pub fn to_leaves(array: &dyn Array, leaves: &mut Vec<Box<dyn Array>>) { P::FixedSizeList => { let array = array.as_any().downcast_ref::<FixedSizeListArray>().unwrap(); + let BitmapState::SomeSet(fsl_validity) = validity else { + array_stack.push_back(( + array.values().as_ref(), + match validity { + BitmapState::AllSet => 
BitmapState::AllSet, + BitmapState::SomeSet(_) => unreachable!(), + BitmapState::AllUnset(_) => BitmapState::AllUnset(array.values().len()), + }, + )); + continue; + }; + let num_values = array.values().len(); let size = array.size(); - let validity = validity.map(|fsl_validity| { - let mut validity = MutableBitmap::with_capacity(num_values); - let mut fsl_validity_iter = fsl_validity.iter(); + let mut validity = MutableBitmap::with_capacity(num_values); + let mut fsl_validity_iter = fsl_validity.iter(); - let mut idx = 0; - while fsl_validity_iter.num_remaining() > 0 { - let num_ones = fsl_validity_iter.take_leading_ones(); - let num_elements = num_ones * size; - validity.extend_constant(num_elements, true); + let mut idx = 0; + while fsl_validity_iter.num_remaining() > 0 { + let num_ones = fsl_validity_iter.take_leading_ones(); + let num_elements = num_ones * size; + validity.extend_constant(num_elements, true); - idx += num_ones; + idx += num_ones; - let num_zeros = fsl_validity_iter.take_leading_zeros(); - let num_elements = num_zeros * size; - validity.extend_constant(num_elements, false); + let num_zeros = fsl_validity_iter.take_leading_zeros(); + let num_elements = num_zeros * size; + validity.extend_constant(num_elements, false); - idx += num_zeros; - } + idx += num_zeros; + } - debug_assert_eq!(idx, array.len()); + debug_assert_eq!(idx, array.len()); - validity.freeze() - }); + let validity = BitmapState::SomeSet(validity.freeze()); array_stack.push_back((array.values().as_ref(), validity)); }, @@ -391,7 +464,7 @@ pub fn to_leaves(array: &dyn Array, leaves: &mut Vec<Box<dyn Array>>) { | P::Dictionary(_) | P::BinaryView | P::Utf8View => { - leaves.push(array.with_validity(validity)); + leaves.push(array.with_validity(validity.into())); }, other => todo!("Writing {:?} to parquet not yet implemented", other),