Skip to content

Commit

Permalink
feat: add testing of string/binary to 2.1 full-zip encoding and fix b…
Browse files Browse the repository at this point in the history
…ugs (#3418)

The bugs mostly originated from having `def` information but no `rep`
information, since the variable full-zip path was originally built to
support lists.
  • Loading branch information
westonpace authored Jan 28, 2025
1 parent bfacd7c commit 66b99fb
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 73 deletions.
109 changes: 66 additions & 43 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2267,6 +2267,9 @@ impl VariableFullZipDecoder {
while !databuf.is_empty() {
let data_start = unzipped_data.len();
let offset_start = offsets_data.len();
// We might have only-rep or only-def, neither, or both. When both are present they
// advance at the same speed, so one index (the longer of the two lengths) serves for both.
let repdef_start = rep.len().max(def.len());
// TODO: Kind of inefficient we parse the control word twice here
let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
databuf,
Expand All @@ -2279,31 +2282,40 @@ impl VariableFullZipDecoder {
databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];

if ctrl_desc.is_new_row {
self.repdef_starts.push(rep.len() - 1);
self.repdef_starts.push(repdef_start);
self.data_starts.push(data_start);
self.offset_starts.push(offset_start);
self.visible_item_counts.push(visible_item_count);
}
if ctrl_desc.is_visible {
visible_item_count += 1;
// Safety: Data should have at least bytes_per_length bytes remaining
debug_assert!(databuf.len() >= bytes_per_length);
let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
match out_bits_per_offset {
32 => {
offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes())
if ctrl_desc.is_valid_item {
// Safety: Data should have at least bytes_per_length bytes remaining
debug_assert!(databuf.len() >= bytes_per_length);
let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
match out_bits_per_offset {
32 => offsets_data
.extend_from_slice(&(current_offset as u32).to_le_bytes()),
64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
_ => unreachable!(),
};
databuf = &databuf[bytes_per_offset..];
unzipped_data.extend_from_slice(&databuf[..length as usize]);
databuf = &databuf[length as usize..];
current_offset += length;
} else {
// Null items still get an offset
match out_bits_per_offset {
32 => offsets_data
.extend_from_slice(&(current_offset as u32).to_le_bytes()),
64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
_ => unreachable!(),
}
64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
_ => unreachable!(),
};
databuf = &databuf[bytes_per_offset..];
unzipped_data.extend_from_slice(&databuf[..length as usize]);
databuf = &databuf[length as usize..];
current_offset += length;
}
}
}
}
self.repdef_starts.push(rep.len());
self.repdef_starts.push(rep.len().max(def.len()));
self.data_starts.push(unzipped_data.len());
self.offset_starts.push(offsets_data.len());
self.visible_item_counts.push(visible_item_count);
Expand All @@ -2324,11 +2336,14 @@ impl StructuralPageDecoder for VariableFullZipDecoder {
let start = self.current_idx;
let end = start + num_rows as usize;

let data_start = self.data_starts[start];
let data_end = self.data_starts[end];
let data = self
.data
.slice_with_length(data_start, data_end - data_start);
// This might seem a little peculiar. We are returning the entire data for every single
// batch. This is because the offsets are relative to the start of the whole data buffer.
// In other words, imagine we have a data buffer that is 100 bytes long, the offsets are
// [0, 10, 20, 30, 40], and we return in batches of two. The second set of offsets will be
// [20, 30, 40], which only makes sense against the full buffer (it does not start at 0).
//
// So either we pay for a copy to normalize the offsets, or we just return the entire data
// buffer, which is slightly cheaper.
let data = self.data.borrow_and_clone();

let offset_start = self.offset_starts[start];
let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
Expand All @@ -2338,8 +2353,16 @@ impl StructuralPageDecoder for VariableFullZipDecoder {

let repdef_start = self.repdef_starts[start];
let repdef_end = self.repdef_starts[end];
let rep = self.rep.slice(repdef_start, repdef_end - repdef_start);
let def = self.def.slice(repdef_start, repdef_end - repdef_start);
let rep = if self.rep.is_empty() {
self.rep.clone()
} else {
self.rep.slice(repdef_start, repdef_end - repdef_start)
};
let def = if self.def.is_empty() {
self.def.clone()
} else {
self.def.slice(repdef_start, repdef_end - repdef_start)
};

let visible_item_counts_start = self.visible_item_counts[start];
let visible_item_counts_end = self.visible_item_counts[end];
Expand Down Expand Up @@ -3930,6 +3953,8 @@ impl PrimitiveStructuralEncoder {
}

// For variable-size data we encode < control word | length | data > for each value
//
// In addition, we create a second buffer, the repetition index
fn serialize_full_zip_variable(
mut variable: VariableWidthBlock,
mut repdef: ControlWordIterator,
Expand All @@ -3946,16 +3971,11 @@ impl PrimitiveStructuralEncoder {
+ bytes_per_offset * variable.num_values as usize;
let mut buf = Vec::with_capacity(len);

let max_rep_index_val = if repdef.has_repetition() {
len as u64
} else {
// Setting this to 0 means we won't write a repetition index
0
};
let max_rep_index_val = len as u64;
let mut rep_index_builder =
BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);

// TODO: byte pack the item lengths
// TODO: byte pack the item lengths with varint encoding
match bytes_per_offset {
4 => {
let offs = variable.offsets.borrow_to_typed_slice::<u32>();
Expand All @@ -3970,10 +3990,12 @@ impl PrimitiveStructuralEncoder {
}
if control.is_visible {
let window = windows_iter.next().unwrap();
buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
buf.extend_from_slice(
&variable.data[window[0] as usize..window[1] as usize],
);
if control.is_valid_item {
buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
buf.extend_from_slice(
&variable.data[window[0] as usize..window[1] as usize],
);
}
}
rep_offset = buf.len();
}
Expand All @@ -3991,18 +4013,22 @@ impl PrimitiveStructuralEncoder {
}
if control.is_visible {
let window = windows_iter.next().unwrap();
buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
buf.extend_from_slice(
&variable.data[window[0] as usize..window[1] as usize],
);
if control.is_valid_item {
buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
buf.extend_from_slice(
&variable.data[window[0] as usize..window[1] as usize],
);
}
}
rep_offset = buf.len();
}
}
_ => panic!("Unsupported offset size"),
}

debug_assert_eq!(buf.len(), len);
// We might have written fewer than `len` bytes because items that are visible but not valid
// (nulls) write neither a length nor any data. However, if we are over `len` then we have a bug.
debug_assert!(buf.len() <= len);
// Put the final value in the rep index
// SAFETY: `buf.len() <= len`, and `len` is the max value the rep index builder was created with
unsafe {
Expand All @@ -4011,11 +4037,8 @@ impl PrimitiveStructuralEncoder {

let zipped_data = LanceBuffer::Owned(buf);
let rep_index = rep_index_builder.into_data();
let rep_index = if rep_index.is_empty() {
None
} else {
Some(LanceBuffer::Owned(rep_index))
};
debug_assert!(!rep_index.is_empty());
let rep_index = Some(LanceBuffer::Owned(rep_index));
SerializedFullZip {
values: zipped_data,
repetition_index: rep_index,
Expand Down
39 changes: 27 additions & 12 deletions rust/lance-encoding/src/encodings/physical/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@ pub mod tests {
};
use arrow_schema::{DataType, Field};

use lance_core::datatypes::{STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_MINIBLOCK};
use rstest::rstest;
use std::{collections::HashMap, sync::Arc, vec};

Expand Down Expand Up @@ -902,8 +903,19 @@ pub mod tests {
#[test_log::test(tokio::test)]
async fn test_binary(
#[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
#[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
structural_encoding: &str,
#[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
) {
let field = Field::new("", DataType::Binary, false);
use lance_core::datatypes::STRUCTURAL_ENCODING_META_KEY;

let mut field_metadata = HashMap::new();
field_metadata.insert(
STRUCTURAL_ENCODING_META_KEY.to_string(),
structural_encoding.into(),
);

let field = Field::new("", data_type, false).with_metadata(field_metadata);
check_round_trip_encoding_random(field, version).await;
}

Expand All @@ -921,10 +933,22 @@ pub mod tests {

#[rstest]
#[test_log::test(tokio::test)]
async fn test_simple_utf8_binary(
async fn test_simple_binary(
#[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
#[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
structural_encoding: &str,
#[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
) {
use lance_core::datatypes::STRUCTURAL_ENCODING_META_KEY;

let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();

let mut field_metadata = HashMap::new();
field_metadata.insert(
STRUCTURAL_ENCODING_META_KEY.to_string(),
structural_encoding.into(),
);

let test_cases = TestCases::default()
.with_range(0..2)
Expand All @@ -935,7 +959,7 @@ pub mod tests {
check_round_trip_encoding_of_data(
vec![Arc::new(string_array)],
&test_cases,
HashMap::new(),
field_metadata,
)
.await;
}
Expand Down Expand Up @@ -1062,15 +1086,6 @@ pub mod tests {
check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
}

#[rstest]
#[test_log::test(tokio::test)]
async fn test_binary_miniblock(
#[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
) {
let field = Field::new("", DataType::Utf8, false);
check_round_trip_encoding_random(field, version).await;
}

#[test_log::test(tokio::test)]
async fn test_binary_dictionary_encoding() {
let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
Expand Down
Loading

0 comments on commit 66b99fb

Please sign in to comment.