Skip to content

Commit

Permalink
fix scan
Browse files: browse the repository at this point in the history
fix filtered_entries

fix ci ut
  • Loading branch information
xxhZs authored and xxchan committed Sep 23, 2024
1 parent 2c43b72 commit 075b084
Showing 1 changed file with 14 additions and 18 deletions.
32 changes: 14 additions & 18 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ use crate::io::object_cache::ObjectCache;
use crate::io::FileIO;
use crate::runtime::spawn;
use crate::spec::{
DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile,
ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
DataContentType, DataFileFormat, ManifestEntryRef, ManifestFile, ManifestList, Schema,
SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
use crate::utils::available_parallelism;
Expand Down Expand Up @@ -405,15 +405,6 @@ impl TableScan {
return Ok(());
}

// abort the plan if we encounter a manifest entry whose data file's
// content type is currently unsupported
if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Only Data files currently supported",
));
}

if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
let BoundPredicates {
ref snapshot_bound_predicate,
Expand Down Expand Up @@ -542,6 +533,8 @@ impl ManifestEntryContext {
predicate: self
.bound_predicates
.map(|x| x.as_ref().snapshot_bound_predicate.clone()),
sequence_number: self.manifest_entry.sequence_number().unwrap_or(0),
equality_ids: self.manifest_entry.data_file().equality_ids().to_vec(),
}
}
}
Expand Down Expand Up @@ -580,15 +573,10 @@ impl PlanContext {
manifest_list: Arc<ManifestList>,
sender: Sender<ManifestEntryContext>,
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
let filtered_entries = manifest_list
.entries()
.iter()
.filter(|manifest_file| manifest_file.content == ManifestContentType::Data);

// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
let mut filtered_mfcs = vec![];
if self.predicate.is_some() {
for manifest_file in filtered_entries {
for manifest_file in manifest_list.entries().iter() {
let partition_bound_predicate = self.get_partition_filter(manifest_file)?;

// evaluate the ManifestFile against the partition filter. Skip
Expand All @@ -610,7 +598,7 @@ impl PlanContext {
}
}
} else {
for manifest_file in filtered_entries {
for manifest_file in manifest_list.entries().iter() {
let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone());
filtered_mfcs.push(Ok(mfc));
}
Expand Down Expand Up @@ -883,6 +871,10 @@ pub struct FileScanTask {
/// The predicate to filter.
#[serde(skip_serializing_if = "Option::is_none")]
pub predicate: Option<BoundPredicate>,
/// Sequence number taken from the task's manifest entry (scan planning falls back to 0 when the entry has none).
pub sequence_number: i64,
/// The `equality_ids` copied from the data file of the task's manifest entry.
pub equality_ids: Vec<i32>,
}

#[cfg(test)]
Expand Down Expand Up @@ -1590,6 +1582,8 @@ mod tests {
schema: schema.clone(),
record_count: Some(100),
data_file_format: DataFileFormat::Parquet,
sequence_number: 0,
equality_ids: vec![],
};
test_fn(task);

Expand All @@ -1604,6 +1598,8 @@ mod tests {
schema,
record_count: None,
data_file_format: DataFileFormat::Avro,
sequence_number: 0,
equality_ids: vec![],
};
test_fn(task);
}
Expand Down

0 comments on commit 075b084

Please sign in to comment.