perf(rust): Better GC and push_view for binviews (#17627)
ruihe774 authored Jul 17, 2024
1 parent 38321a5 commit 28d8196
Showing 5 changed files with 273 additions and 235 deletions.
36 changes: 0 additions & 36 deletions crates/polars-arrow/src/array/binview/buffers.rs

This file was deleted.

30 changes: 28 additions & 2 deletions crates/polars-arrow/src/array/binview/mod.rs
@@ -26,6 +26,7 @@ mod private {
}
pub use iterator::BinaryViewValueIter;
pub use mutable::MutableBinaryViewArray;
use polars_utils::aliases::{InitHashMaps, PlHashMap};
use polars_utils::slice::GetSaferUnchecked;
use private::Sealed;

@@ -353,6 +354,22 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
self.total_buffer_len
}

fn total_unshared_buffer_len(&self) -> usize {
// XXX: this is O(n), not O(1).
// Given this function is only called in `maybe_gc()`,
// it may not be worth adding an extra field to track it.
self.buffers
.iter()
.map(|buf| {
if buf.shared_count_strong() == 1 {
buf.len()
} else {
0
}
})
.sum()
}
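
As an aside, the new helper only counts buffers whose storage this array holds exclusively (strong count 1); shared buffers are excluded because a GC copy would not free them. A minimal standalone sketch (not part of the diff), assuming polars-arrow's `Buffer<u8>` can be built from a `Vec<u8>` and that `shared_count_strong()` reports the storage's strong count as the diff implies:

use polars_arrow::buffer::Buffer;

fn main() {
    let a: Buffer<u8> = vec![0u8; 64].into();
    // Only this handle holds the storage: such bytes are counted by total_unshared_buffer_len().
    assert_eq!(a.shared_count_strong(), 1);
    let b = a.clone();
    // The storage is now shared elsewhere, so freeing this array would not release it.
    assert_eq!(a.shared_count_strong(), 2);
    drop(b);
}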

#[inline(always)]
pub fn len(&self) -> usize {
self.views.len()
@@ -367,7 +384,7 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
let buffers = self.buffers.as_ref();

for view in self.views.as_ref() {
unsafe { mutable.push_view(*view, buffers) }
unsafe { mutable.push_view_copied_unchecked(*view, buffers) }
}
mutable.freeze().with_validity(self.validity)
}
@@ -383,13 +400,21 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
return self;
}

if Arc::strong_count(&self.buffers) != 1 {
// There are multiple holders of these `buffers`.
// If we allowed GC in this case,
// we might end up copying the same content multiple times.
return self;
}

// Subtract the maximum amount of inlined strings to get a lower bound
// on the number of buffer bytes needed (assuming no dedup).
let total_bytes_len = self.total_bytes_len();
let buffer_req_lower_bound = total_bytes_len.saturating_sub(self.len() * 12);

let lower_bound_mem_usage_post_gc = self.len() * 16 + buffer_req_lower_bound;
let cur_mem_usage = self.len() * 16 + self.total_buffer_len();
// Use unshared buffer len. Shared buffer won't be freed; no savings.
let cur_mem_usage = self.len() * 16 + self.total_unshared_buffer_len();
let savings_upper_bound = cur_mem_usage.saturating_sub(lower_bound_mem_usage_post_gc);

if savings_upper_bound >= GC_MINIMUM_SAVINGS
@@ -413,6 +438,7 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
phantom: Default::default(),
total_bytes_len: self.total_bytes_len.load(Ordering::Relaxed) as usize,
total_buffer_len: self.total_buffer_len,
stolen_buffers: PlHashMap::new(),
}
}
}
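
To make the `maybe_gc()` heuristic above concrete, here is a small self-contained sketch (not part of the diff) that reproduces the savings-bound arithmetic with made-up numbers; 16 is the size of a view in bytes and 12 the maximum inlinable payload:

fn gc_savings_upper_bound(
    len: usize,                       // number of views
    total_bytes_len: usize,           // sum of all view lengths
    total_unshared_buffer_len: usize, // buffer bytes held only by this array
) -> usize {
    // Payloads of <= 12 bytes are inlined in the view, so at least this many
    // buffer bytes would remain after a GC (assuming no dedup).
    let buffer_req_lower_bound = total_bytes_len.saturating_sub(len * 12);
    let lower_bound_mem_usage_post_gc = len * 16 + buffer_req_lower_bound;
    // Shared buffers would not be freed, so only unshared bytes count as current cost.
    let cur_mem_usage = len * 16 + total_unshared_buffer_len;
    cur_mem_usage.saturating_sub(lower_bound_mem_usage_post_gc)
}

fn main() {
    // 1_000 views totalling 100_000 payload bytes, of which only 20_000 buffer
    // bytes are unshared: post-GC lower bound is 16_000 + 88_000 = 104_000,
    // current cost is 36_000, so the savings upper bound is 0 and GC is skipped.
    assert_eq!(gc_savings_upper_bound(1_000, 100_000, 20_000), 0);
}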
161 changes: 152 additions & 9 deletions crates/polars-arrow/src/array/binview/mutable.rs
@@ -1,8 +1,11 @@
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::ops::Deref;
use std::sync::Arc;

use hashbrown::hash_map::Entry;
use polars_error::PolarsResult;
use polars_utils::aliases::{InitHashMaps, PlHashMap};
use polars_utils::slice::GetSaferUnchecked;

use crate::array::binview::iterator::MutableBinaryViewValueIter;
@@ -19,15 +22,18 @@ const DEFAULT_BLOCK_SIZE: usize = 8 * 1024;
const MAX_EXP_BLOCK_SIZE: usize = 16 * 1024 * 1024;

pub struct MutableBinaryViewArray<T: ViewType + ?Sized> {
pub(super) views: Vec<View>,
pub(super) completed_buffers: Vec<Buffer<u8>>,
pub(super) in_progress_buffer: Vec<u8>,
pub(super) validity: Option<MutableBitmap>,
pub(super) phantom: std::marker::PhantomData<T>,
pub(crate) views: Vec<View>,
pub(crate) completed_buffers: Vec<Buffer<u8>>,
pub(crate) in_progress_buffer: Vec<u8>,
pub(crate) validity: Option<MutableBitmap>,
pub(crate) phantom: std::marker::PhantomData<T>,
/// Total bytes length if we would concatenate them all.
pub(super) total_bytes_len: usize,
pub(crate) total_bytes_len: usize,
/// Total bytes in the buffer (excluding remaining capacity)
pub(super) total_buffer_len: usize,
pub(crate) total_buffer_len: usize,
/// Mapping from `Buffer::deref()` to index in `completed_buffers`.
/// Used in `push_view()`.
pub(crate) stolen_buffers: PlHashMap<usize, u32>,
}

impl<T: ViewType + ?Sized> Clone for MutableBinaryViewArray<T> {
@@ -40,6 +46,7 @@ impl<T: ViewType + ?Sized> Clone for MutableBinaryViewArray<T> {
phantom: Default::default(),
total_bytes_len: self.total_bytes_len,
total_buffer_len: self.total_buffer_len,
stolen_buffers: PlHashMap::new(),
}
}
}
@@ -86,6 +93,7 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
phantom: Default::default(),
total_buffer_len: 0,
total_bytes_len: 0,
stolen_buffers: PlHashMap::new(),
}
}

@@ -135,8 +143,8 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
/// # Safety
/// - caller must allocate enough capacity
/// - caller must ensure the view and buffers match.
#[inline]
pub unsafe fn push_view(&mut self, v: View, buffers: &[Buffer<u8>]) {
/// - The array must not have validity.
pub(crate) unsafe fn push_view_copied_unchecked(&mut self, v: View, buffers: &[Buffer<u8>]) {
let len = v.length;
self.total_bytes_len += len as usize;
if len <= 12 {
@@ -152,6 +160,67 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
}
}

/// # Safety
/// - caller must allocate enough capacity
/// - caller must ensure the view and buffers match.
/// - The array must not have validity.
/// - caller must not mix calls to this function with other push functions.
pub unsafe fn push_view_unchecked(&mut self, mut v: View, buffers: &[Buffer<u8>]) {
let len = v.length;
self.total_bytes_len += len as usize;
if len <= 12 {
self.views.push_unchecked(v);
} else {
let buffer = buffers.get_unchecked_release(v.buffer_idx as usize);
let idx = match self.stolen_buffers.entry(buffer.deref().as_ptr() as usize) {
Entry::Occupied(entry) => *entry.get(),
Entry::Vacant(entry) => {
let idx = self.completed_buffers.len() as u32;
entry.insert(idx);
self.completed_buffers.push(buffer.clone());
self.total_buffer_len += buffer.len();
idx
},
};
v.buffer_idx = idx;
self.views.push_unchecked(v);
}
}

pub fn push_view(&mut self, mut v: View, buffers: &[Buffer<u8>]) {
let len = v.length;
self.total_bytes_len += len as usize;
if len <= 12 {
self.views.push(v);
} else {
// Do not mix use of `push_view` and `push_value_ignore_validity`;
// it causes fragmentation.
self.finish_in_progress();

let buffer = &buffers[v.buffer_idx as usize];
let idx = match self.stolen_buffers.entry(buffer.deref().as_ptr() as usize) {
Entry::Occupied(entry) => {
let idx = *entry.get();
let target_buffer = &self.completed_buffers[idx as usize];
debug_assert_eq!(buffer, target_buffer);
idx
},
Entry::Vacant(entry) => {
let idx = self.completed_buffers.len() as u32;
entry.insert(idx);
self.completed_buffers.push(buffer.clone());
self.total_buffer_len += buffer.len();
idx
},
};
v.buffer_idx = idx;
self.views.push(v);
}
if let Some(validity) = &mut self.validity {
validity.push(true)
}
}
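
The `stolen_buffers` map keys each source buffer by the address of its data so that many views into the same buffer reuse a single entry in `completed_buffers`. A standalone sketch of that keying technique (not the crate's code), using plain `Arc<[u8]>` as a hypothetical stand-in for the reference-counted `Buffer<u8>`:

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

struct Builder {
    completed_buffers: Vec<Arc<[u8]>>,
    // Data address of a source buffer -> its index in completed_buffers.
    stolen_buffers: HashMap<usize, u32>,
}

impl Builder {
    fn steal(&mut self, buffer: &Arc<[u8]>) -> u32 {
        match self.stolen_buffers.entry(buffer.as_ptr() as usize) {
            Entry::Occupied(entry) => *entry.get(),
            Entry::Vacant(entry) => {
                let idx = self.completed_buffers.len() as u32;
                entry.insert(idx);
                // Cloning only bumps the reference count; no bytes are copied.
                self.completed_buffers.push(buffer.clone());
                idx
            },
        }
    }
}

fn main() {
    let src: Arc<[u8]> = Arc::from(&b"a payload too long to be inlined in a view"[..]);
    let mut b = Builder { completed_buffers: Vec::new(), stolen_buffers: HashMap::new() };
    // Two views into the same source buffer resolve to the same stored index,
    // so the buffer is kept exactly once.
    assert_eq!(b.steal(&src), 0);
    assert_eq!(b.steal(&src), 0);
    assert_eq!(b.completed_buffers.len(), 1);
}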

#[inline]
pub fn push_value_ignore_validity<V: AsRef<T>>(&mut self, value: V) {
let bytes = value.as_ref().to_bytes();
@@ -312,6 +381,73 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
self.extend(iterator)
}

#[inline]
pub fn extend_views<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
I: Iterator<Item = Option<View>>,
{
self.reserve(iterator.size_hint().0);
for p in iterator {
match p {
Some(v) => self.push_view(v, buffers),
None => self.push_null(),
}
}
}

#[inline]
pub fn extend_views_trusted_len<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
I: TrustedLen<Item = Option<View>>,
{
self.extend_views(iterator, buffers);
}

#[inline]
pub fn extend_non_null_views<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
I: Iterator<Item = View>,
{
self.reserve(iterator.size_hint().0);
for v in iterator {
self.push_view(v, buffers);
}
}

#[inline]
pub fn extend_non_null_views_trusted_len<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
I: TrustedLen<Item = View>,
{
self.extend_non_null_views(iterator, buffers);
}

/// # Safety
/// Same as `push_view_unchecked()`.
#[inline]
pub unsafe fn extend_non_null_views_unchecked<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
I: Iterator<Item = View>,
{
self.reserve(iterator.size_hint().0);
for v in iterator {
self.push_view_unchecked(v, buffers);
}
}

/// # Safety
/// Same as `push_view_unchecked()`.
#[inline]
pub unsafe fn extend_non_null_views_trusted_len_unchecked<I>(
&mut self,
iterator: I,
buffers: &[Buffer<u8>],
) where
I: TrustedLen<Item = View>,
{
self.extend_non_null_views_unchecked(iterator, buffers);
}

#[inline]
pub fn from_iterator<I, P>(iterator: I) -> Self
where
@@ -352,6 +488,13 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
self.into()
}

#[inline]
pub fn freeze_with_dtype(self, dtype: ArrowDataType) -> BinaryViewArrayGeneric<T> {
let mut arr: BinaryViewArrayGeneric<T> = self.into();
arr.data_type = dtype;
arr
}
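
A hedged usage sketch for the new `freeze_with_dtype`; names outside the diff, such as `MutableBinaryViewArray::new()` and the import paths, are assumptions. It builds a mutable array and freezes it while stamping an explicit Arrow dtype on the result instead of the default:

use polars_arrow::array::MutableBinaryViewArray;
use polars_arrow::datatypes::ArrowDataType;

fn main() {
    let mut builder = MutableBinaryViewArray::<str>::new();
    builder.push_value_ignore_validity("polars");
    // Same as freeze(), but the returned array carries the given dtype.
    let arr = builder.freeze_with_dtype(ArrowDataType::Utf8View);
    assert_eq!(arr.len(), 1);
}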

#[inline]
pub fn value(&self, i: usize) -> &T {
assert!(i < self.len());