diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index b4e15821f..fd266f2fa 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -48,7 +48,7 @@ use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; -use crate::spec::{Datum, PrimitiveType, Schema}; +use crate::spec::{DataContentType, Datum, PrimitiveType, Schema}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; @@ -189,12 +189,6 @@ impl ArrowReader { )?; record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask); - // RecordBatchTransformer performs any required transformations on the RecordBatches - // that come back from the file, such as type promotion, default column insertion - // and column re-ordering - let mut record_batch_transformer = - RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids()); - if let Some(batch_size) = batch_size { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); } @@ -246,13 +240,27 @@ impl ArrowReader { // Build the batch stream and send all the RecordBatches that it generates // to the requester. + let record_batch_stream = record_batch_stream_builder.build()?; + let record_batch_stream = - record_batch_stream_builder - .build()? - .map(move |batch| match batch { + if matches!(task.data_file_content, DataContentType::PositionDeletes) { + // The schema of the xxx file doesn't change, so we don't need to convert the schema. + record_batch_stream.map(move |batch| match batch { + Ok(batch) => record_batch_transformer.process_record_batch(batch), + Err(err) => Err(err.into()), + }) + } else { + // RecordBatchTransformer performs any required transformations on the RecordBatches + // that come back from the file, such as type promotion, default column insertion + // and column re-ordering. + let mut record_batch_transformer = + RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids()); + + record_batch_stream.map(move |batch| match batch { Ok(batch) => record_batch_transformer.process_record_batch(batch), Err(err) => Err(err.into()), - }); + }) + }; Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) }