
Reported and actual arrow schema of the table can be different #813

Open
gruuya opened this issue Dec 17, 2024 · 6 comments · May be fixed by #814

Comments

@gruuya
Contributor

gruuya commented Dec 17, 2024

This is related to #783.

Namely, what happens is:

  • I use pyiceberg to create an Iceberg table from a Parquet file.
  • The Parquet file carries a type hint for e.g. DataType::Int16 (required int32 c1 (INTEGER(16,true)) = 1;).
  • Thanks to Discussion: Support conversion of Arrow Int8 and Int16 to PrimitiveType::Int #783, we now upcast that to the native 32-bit Int type and can read it.
  • This is also the type returned in e.g. TableProvider::schema.
  • However, the actual type in the arrow record batches that are read (inferred from the Parquet hint) is DataType::Int16, leading to a mismatch between the reported and actual schemas.
  • This in turn makes a DataFusion query such as SELECT c1 FROM t where c1 <= 2 crash with Invalid comparison operation: Int16 <= Int32.
  • Ultimately, the schema mismatch tricks one of the logical optimizers into assuming that casting the right side (i.e. the 2 literal) to DataType::Int32 (per the reported schema) will make the comparison valid; the sketch below illustrates the resulting failure.
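To make that failure concrete, here is a minimal arrow-rs sketch (using the arrow_ord::cmp::lt_eq kernel that the comparison ultimately bottoms out in; the column values are made up):

use arrow_array::{Int16Array, Int32Array};
use arrow_ord::cmp::lt_eq;

fn main() {
    // The column as actually decoded from the Parquet file: Int16.
    let actual_column = Int16Array::from(vec![1i16, 2, 3]);
    // The literal as cast by the optimizer against the reported schema: Int32.
    let reported_literal = Int32Array::new_scalar(2);
    // The kernel rejects operands of different types, mirroring the
    // DataFusion error above.
    let err = lt_eq(&actual_column, &reported_literal).unwrap_err();
    println!("{err}"); // Invalid argument error: Invalid comparison operation: Int16 <= Int32
}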
@gruuya
Contributor Author

gruuya commented Dec 18, 2024

I see two ways of resolving this:

  1. Align the schema returned from TableProvider::schema to match the one from the Parquet files. I dislike this approach as it feels hacky. For one, the canonical arrow schema for a given table will still be different from what TableProvider::schema returns; moreover, this would necessitate reading the Parquet file metadata when instantiating the table (and it's also vulnerable to schema drift across Parquet files in different table versions).
  2. Coerce the streamed batches from the scan to match the canonical arrow schema (a rough sketch of the batch-level cast follows below):
    - one way to do this is by instructing arrow-rs itself to perform the casting automatically, as suggested in Supply a hint arrow schema for casting Parquet field types during scans #814
    - another might be to do it via the RecordBatchTransformer
    - or just wrap the stream in IcebergTableScan with a SchemaAdapter/SchemaMapper from DataFusion.
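For the second option, a rough per-batch sketch of the cast (coerce_batch is a hypothetical helper, not an existing API; #814 instead pushes the cast down into the Parquet reader itself):

use arrow_array::RecordBatch;
use arrow_cast::cast;
use arrow_schema::{ArrowError, SchemaRef};

/// Hypothetical helper: cast every column of a scanned batch to the
/// canonical (table-level) arrow schema before handing it to DataFusion.
fn coerce_batch(batch: RecordBatch, canonical: SchemaRef) -> Result<RecordBatch, ArrowError> {
    let columns = batch
        .columns()
        .iter()
        .zip(canonical.fields())
        .map(|(column, field)| cast(column, field.data_type()))
        .collect::<Result<Vec<_>, _>>()?;
    RecordBatch::try_new(canonical, columns)
}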

@Fokko
Contributor

Fokko commented Dec 18, 2024

So we have the table schema and we have the file schema, and we must assume that they are different. This can happen in the case you mentioned above, where there is a different arrow type, but it is more obvious in the case of schema evolution. Iceberg is lazy and will not rewrite historical data until it needs to update the Parquet file.

For the data itself, we should project it into the schema of the table. For the filtering situation, we want to cast the literal to the physical type. This also ties in with the second part of #777 (comment).

@Fokko
Contributor

Fokko commented Dec 18, 2024

I was also able to replicate this issue here:

// Does not yet work, somewhere in the reader
// let predicate =
// Reference::new("foo").not_equal_to(Datum::int(22));
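For reference, a sketch of how that predicate would be wired into a scan (the table handle is assumed to be already loaded; the builder calls follow the iceberg crate's TableScan API):

use futures::TryStreamExt;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::table::Table;

// `table` is assumed to be an already-loaded Iceberg table.
async fn scan_with_filter(table: &Table) -> iceberg::Result<()> {
    // The predicate that currently trips the reader on Int16-backed columns.
    let predicate = Reference::new("foo").not_equal_to(Datum::int(22));
    let stream = table
        .scan()
        .with_filter(predicate)
        .build()?
        .to_arrow()
        .await?;
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}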

@gruuya
Contributor Author

gruuya commented Dec 18, 2024

Ok, I've also taken the liberty of opening a PR with a pyiceberg -> iceberg-rust -> datafusion integration test #825.

Besides showing that the basics work, it also reproduces this issue.

@gruuya
Contributor Author

gruuya commented Dec 20, 2024

For the filtering situation, we want to cast the literal to the physical type

Is something like this close to what you had in mind?

 /// Convert Iceberg Datum to Arrow Datum.
-pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send>> {
+pub(crate) fn get_arrow_datum(
+    datum: &Datum,
+    arrow_type: &DataType,
+) -> Result<Box<dyn ArrowDatum + Send>> {
     match (datum.data_type(), datum.literal()) {
         (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(value)) => {
             Ok(Box::new(BooleanArray::new_scalar(*value)))
         }
-        (PrimitiveType::Int, PrimitiveLiteral::Int(value)) => {
-            Ok(Box::new(Int32Array::new_scalar(*value)))
-        }
+        (PrimitiveType::Int, PrimitiveLiteral::Int(value)) => match arrow_type {
+            DataType::Int8 => Ok(Box::new(Int8Array::new_scalar(*value as i8))),
+            DataType::Int16 => Ok(Box::new(Int16Array::new_scalar(*value as i16))),
+            DataType::Int32 => Ok(Box::new(Int32Array::new_scalar(*value))),
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Can't convert {datum} to type {arrow_type}"),
+            )),
+        },
         (PrimitiveType::Long, PrimitiveLiteral::Long(value)) => {
             Ok(Box::new(Int64Array::new_scalar(*value)))

and then, for PredicateConverter, call it only once the column is projected (and thus the target data type is known):

         if let Some(idx) = self.bound_reference(reference)? {
-            let literal = get_arrow_datum(literal)?;
-
             Ok(Box::new(move |batch| {
                 let left = project_column(&batch, idx)?;
+                let literal = get_arrow_datum(literal, left.data_type())?;
                 lt_eq(&left, literal.as_ref())
             }))

I found that this also resolves the reported problem. Though arguably less general than just casting the batches at the arrow/parquet level, it is a less invasive fix.

@gruuya
Contributor Author

gruuya commented Dec 20, 2024

Even more liberally, one could do without changing get_arrow_datum at all:

         if let Some(idx) = self.bound_reference(reference)? {
-            let literal = get_arrow_datum(literal)?;
+            let mut literal = get_arrow_datum(literal)?;
 
             Ok(Box::new(move |batch| {
                 let left = project_column(&batch, idx)?;
+                if left.data_type() != literal.get().0.data_type() {
+                    literal = Box::new(Scalar::new(cast(literal.get().0, left.data_type())?));
+                }
                 lt_eq(&left, literal.as_ref())
             }))
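One caveat with this last variant: assigning to the captured literal makes the closure FnMut rather than Fn, so it only compiles if the predicate closures are boxed as FnMut; otherwise the casted scalar has to be bound to a fresh local on every batch instead of being written back.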
