Skip to content

Commit

Permalink
fix nullable field of equality delete writer
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Dec 23, 2024
1 parent a0f607c commit 690b2cf
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ apache-avro = "0.17"
array-init = "2"
arrow-arith = { version = "53" }
arrow-array = { version = "53" }
arrow-buffer = { version = "53" }
arrow-cast = { version = "53" }
arrow-ord = { version = "53" }
arrow-schema = { version = "53" }
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ apache-avro = { workspace = true }
array-init = { workspace = true }
arrow-arith = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
arrow-ord = { workspace = true }
arrow-row = { workspace = true }
Expand Down
9 changes: 7 additions & 2 deletions crates/iceberg/src/arrow/record_batch_projector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StructArray};
use arrow_array::{make_array, ArrayRef, RecordBatch, StructArray};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};

use crate::error::Result;
Expand Down Expand Up @@ -138,6 +139,7 @@ impl RecordBatchProjector {
fn get_column_by_field_index(batch: &[ArrayRef], field_index: &[usize]) -> Result<ArrayRef> {
let mut rev_iterator = field_index.iter().rev();
let mut array = batch[*rev_iterator.next().unwrap()].clone();
let mut null_buffer = array.logical_nulls();
for idx in rev_iterator {
array = array
.as_any()
Expand All @@ -148,8 +150,11 @@ impl RecordBatchProjector {
))?
.column(*idx)
.clone();
null_buffer = NullBuffer::union(null_buffer.as_ref(), array.logical_nulls().as_ref());
}
Ok(array)
Ok(make_array(
array.to_data().into_builder().nulls(null_buffer).build()?,
))
}
}

Expand Down
19 changes: 4 additions & 15 deletions crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,12 @@ impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
&original_arrow_schema,
&equality_ids,
// The following rule comes from https://iceberg.apache.org/spec/#identifier-field-ids
// and https://iceberg.apache.org/spec/#equality-delete-files
// - Identifier fields must be of primitive types.
// - Identifier fields must not be floating point types or nullable fields.
// - Identifier fields may be nested fields of a struct, but not nested fields of a nullable struct.
// Note: unlike identifier fields, equality delete columns may be nullable, so nullability is not checked below.
|field| {
// Only primitive types may be used as identifier field ids
if field.is_nullable()
|| field.data_type().is_nested()
if field.data_type().is_nested()
|| matches!(
field.data_type(),
DataType::Float16 | DataType::Float32 | DataType::Float64
Expand All @@ -80,7 +79,7 @@ impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
.map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?,
))
},
|field: &Field| !field.is_nullable(),
|_field: &Field| true,
)?;
Ok(Self {
inner,
Expand Down Expand Up @@ -157,7 +156,7 @@ mod test {
use std::sync::Arc;

use arrow_array::types::Int32Type;
use arrow_array::{ArrayRef, Int32Array, RecordBatch, StructArray};
use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StructArray};
use arrow_schema::DataType;
use arrow_select::concat::concat_batches;
use itertools::Itertools;
Expand Down Expand Up @@ -481,11 +480,6 @@ mod test {
EqualityDeleteFileWriterBuilder::new(pb.clone(), vec![1], schema.clone(), None)
.is_err()
);
// Int is nullable, not allowed to be used for equality delete
assert!(
EqualityDeleteFileWriterBuilder::new(pb.clone(), vec![2], schema.clone(), None)
.is_err()
);
// Struct is not allowed to be used for equality delete
assert!(
EqualityDeleteFileWriterBuilder::new(pb.clone(), vec![3], schema.clone(), None)
Expand All @@ -495,11 +489,6 @@ mod test {
assert!(
EqualityDeleteFileWriterBuilder::new(pb.clone(), vec![4], schema.clone(), None).is_ok()
);
// Nested field of optional struct is not allowed to be used for equality delete
assert!(
EqualityDeleteFileWriterBuilder::new(pb.clone(), vec![6], schema.clone(), None)
.is_err()
);
// Nested field of map is not allowed to be used for equality delete
assert!(
EqualityDeleteFileWriterBuilder::new(pb.clone(), vec![7], schema.clone(), None)
Expand Down

0 comments on commit 690b2cf

Please sign in to comment.