Skip to content

Commit

Permalink
Merge branch 'main' into SparkSink
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyxu authored Nov 13, 2024
2 parents fe64c68 + ec76db4 commit 5c80a11
Show file tree
Hide file tree
Showing 23 changed files with 1,533 additions and 196 deletions.
14 changes: 14 additions & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,19 @@ message MiniBlockLayout {
ArrayEncoding value_compression = 3;
}

/// A layout used for pages where the data is large
///
/// When values are large, the cost of transposing the data is relatively small (compared to the
/// cost of writing the data) and so we just zip the repetition, definition, and value buffers
/// together
message FullZipLayout {
// The number of bits of repetition info (0 if there is no repetition)
uint32 bits_rep = 1;
// The number of bits of definition info (0 if there is no definition)
uint32 bits_def = 2;
// Description of the compression applied to the values
ArrayEncoding value_compression = 3;
}

/// A layout used for pages where all values are null
///
/// In addition, there can be no repetition levels and only a single definition level
Expand All @@ -327,5 +340,6 @@ message PageLayout {
oneof layout {
MiniBlockLayout mini_block_layout = 1;
AllNullLayout all_null_layout = 2;
FullZipLayout full_zip_layout = 3;
}
}
66 changes: 66 additions & 0 deletions rust/lance-core/src/utils/bit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,69 @@ pub fn pad_bytes_u64<const ALIGN: u64>(n: u64) -> u64 {
debug_assert!(is_pwr_two(ALIGN));
(ALIGN - (n & (ALIGN - 1))) & (ALIGN - 1)
}

/// Returns the number of bits needed to represent the given number
///
/// This is the bit length of `val`, i.e. `floor(log2(val)) + 1`. Note that, despite the
/// name, this is not the mathematical ceiling of `log2(val)`: an exact power of two needs
/// one more bit than its exponent (for example `log_2_ceil(4) == 3`).
///
/// # Panics
///
/// Panics if `val` is 0.
pub fn log_2_ceil(val: u32) -> u32 {
    assert!(val > 0);
    // The bit length is the width of the type minus the number of leading zero bits.
    // `leading_zeros` typically compiles down to a single instruction, so no lookup
    // table is required.
    32 - val.leading_zeros()
}

#[cfg(test)]
pub mod tests {
    use crate::utils::bit::log_2_ceil;

    #[test]
    fn test_log_2_ceil() {
        /// Reference implementation: count how many right shifts it takes to reach zero
        fn classic_approach(mut val: u32) -> u32 {
            let mut counter = 0;
            while val > 0 {
                val >>= 1;
                counter += 1;
            }
            counter
        }

        // Exhaustively check the small values
        for i in 1..(16 * 1024) {
            assert_eq!(log_2_ceil(i), classic_approach(i));
        }
        assert_eq!(log_2_ceil(50 * 1024), classic_approach(50 * 1024));
        assert_eq!(
            log_2_ceil(1024 * 1024 * 1024),
            classic_approach(1024 * 1024 * 1024)
        );

        // Check both sides of every power-of-two boundary so that every byte of the
        // input (including the 2^16..2^24 range) is exercised
        for shift in 0..32u32 {
            let pow = 1u32 << shift;
            assert_eq!(log_2_ceil(pow), shift + 1);
            assert_eq!(log_2_ceil(pow), classic_approach(pow));
            if pow > 1 {
                assert_eq!(log_2_ceil(pow - 1), shift);
                assert_eq!(log_2_ceil(pow - 1), classic_approach(pow - 1));
            }
        }
        assert_eq!(log_2_ceil(u32::MAX), 32);
    }
}
5 changes: 5 additions & 0 deletions rust/lance-encoding/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ impl LanceBuffer {
/// Reinterprets a LanceBuffer into a Vec<T>
///
/// If the underlying buffer is not properly aligned, this will involve a copy of the data
///
/// Note: doing this sort of re-interpretation generally makes assumptions about the endianness
/// of the data. Lance does not support big-endian machines so this is safe. However, if we end
/// up supporting big-endian machines in the future, then any use of this method will need to be
/// carefully reviewed.
pub fn borrow_to_typed_slice<T: ArrowNativeType>(&mut self) -> impl AsRef<[T]> {
let align = std::mem::align_of::<T>();
let is_aligned = self.as_ptr().align_offset(align) == 0;
Expand Down
36 changes: 35 additions & 1 deletion rust/lance-encoding/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl DataBlockBuilderImpl for VariableWidthDataBlockBuilder {
}
}

pub struct FixedWidthDataBlockBuilder {
struct FixedWidthDataBlockBuilder {
bits_per_value: u64,
bytes_per_value: u64,
values: Vec<u8>,
Expand Down Expand Up @@ -493,6 +493,33 @@ impl FixedSizeListBlock {
}
}

/// Builds a `DataBlock::FixedSizeList` by forwarding the selected items to a builder for
/// the child block
struct FixedSizeListBlockBuilder {
// Builder that accumulates the child (item) data
inner: Box<dyn DataBlockBuilderImpl>,
// Number of child items per list value; selections are scaled by this before being
// forwarded to `inner`
dimension: u64,
}

impl FixedSizeListBlockBuilder {
fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
Self { inner, dimension }
}
}

impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
    /// Appends the list values in `selection` from `data_block` to this builder
    ///
    /// `selection` is expressed in list values. Each list value covers `dimension`
    /// consecutive child items, so the range is scaled by `dimension` before being
    /// forwarded to the child builder.
    ///
    /// # Panics
    ///
    /// Panics if `data_block` is not a fixed-size-list block.
    fn append(&mut self, data_block: &mut DataBlock, selection: Range<u64>) {
        let item_selection = (selection.start * self.dimension)..(selection.end * self.dimension);
        let fsl = data_block
            .as_fixed_size_list_mut_ref()
            .expect("FixedSizeListBlockBuilder can only append fixed-size-list data blocks");
        self.inner.append(fsl.child.as_mut(), item_selection);
    }

    /// Finishes the child builder and wraps its output in a `DataBlock::FixedSizeList`
    /// with the dimension this builder was created with
    fn finish(self: Box<Self>) -> DataBlock {
        let dimension = self.dimension;
        let child = Box::new(self.inner.finish());
        DataBlock::FixedSizeList(FixedSizeListBlock { child, dimension })
    }
}

/// A data block with no regular structure. There is no available spot to attach
/// validity / repdef information and it cannot be converted to Arrow without being
/// decoded
Expand Down Expand Up @@ -914,6 +941,13 @@ impl DataBlock {
todo!()
}
}
Self::FixedSizeList(inner) => {
let inner_builder = inner.child.make_builder(estimated_size_bytes);
Box::new(FixedSizeListBlockBuilder::new(
inner_builder,
inner.dimension,
))
}
_ => todo!(),
}
}
Expand Down
25 changes: 20 additions & 5 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ use crate::encodings::logical::r#struct::{
};
use crate::encodings::physical::binary::BinaryMiniBlockDecompressor;
use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
use crate::encodings::physical::fixed_size_list::FslPerValueDecompressor;
use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
use crate::encodings::physical::{ColumnBuffers, FileBuffers};
use crate::format::pb::{self, column_encoding};
Expand Down Expand Up @@ -454,8 +455,14 @@ pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}

pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
/// A decompressor that is applied to one or more individual values
///
/// Implementations must be `Debug`, `Send`, and `Sync`.
pub trait PerValueDecompressor: std::fmt::Debug + Send + Sync {
/// Decompress one or more values
///
/// Takes ownership of `data` and returns the decoded values as a `DataBlock`.
/// NOTE(review): `num_values` is presumably the number of values encoded in `data` - confirm
/// against the implementations.
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
/// The number of bits in each value
///
/// Returns 0 if the data type is variable-width
///
/// Currently (and probably long term) this must be a multiple of 8
fn bits_per_value(&self) -> u64;
}

Expand All @@ -469,10 +476,10 @@ pub trait DecompressorStrategy: std::fmt::Debug + Send + Sync {
description: &pb::ArrayEncoding,
) -> Result<Box<dyn MiniBlockDecompressor>>;

fn create_fixed_per_value_decompressor(
fn create_per_value_decompressor(
&self,
description: &pb::ArrayEncoding,
) -> Result<Box<dyn FixedPerValueDecompressor>>;
) -> Result<Box<dyn PerValueDecompressor>>;

fn create_block_decompressor(
&self,
Expand Down Expand Up @@ -502,14 +509,22 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
}
}

fn create_fixed_per_value_decompressor(
/// Creates the per-value decompressor described by `description`
///
/// A `Flat` encoding yields a `ValueDecompressor`. A `FixedSizeList` encoding recursively
/// creates the decompressor for its items encoding and wraps it, together with the list
/// dimension, in an `FslPerValueDecompressor`.
///
/// # Panics
///
/// Panics if `description.array_encoding` is unset, or if a fixed-size-list description has
/// no `items` encoding (both are unwrapped). Any other encoding currently hits `todo!()`.
fn create_per_value_decompressor(
&self,
description: &pb::ArrayEncoding,
) -> Result<Box<dyn FixedPerValueDecompressor>> {
) -> Result<Box<dyn PerValueDecompressor>> {
match description.array_encoding.as_ref().unwrap() {
pb::array_encoding::ArrayEncoding::Flat(flat) => {
Ok(Box::new(ValueDecompressor::new(flat)))
}
pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
// Build the decompressor for the list items first, then wrap it with the dimension
let items_decompressor =
self.create_per_value_decompressor(fsl.items.as_ref().unwrap())?;
Ok(Box::new(FslPerValueDecompressor::new(
items_decompressor,
fsl.dimension as u64,
)))
}
// Remaining encodings are not yet supported for per-value decompression
_ => todo!(),
}
}
Expand Down
Loading

0 comments on commit 5c80a11

Please sign in to comment.