diff --git a/crates/arroyo-connectors/src/kafka/mod.rs b/crates/arroyo-connectors/src/kafka/mod.rs index 515999627..14bb5455e 100644 --- a/crates/arroyo-connectors/src/kafka/mod.rs +++ b/crates/arroyo-connectors/src/kafka/mod.rs @@ -664,6 +664,7 @@ impl KafkaTester { let mut deserializer = ArrowDeserializer::new( format.clone(), Arc::new(aschema), + &schema.metadata_fields(), None, BadData::Fail {}, ); @@ -701,6 +702,7 @@ impl KafkaTester { let mut deserializer = ArrowDeserializer::new( format.clone(), Arc::new(aschema), + &schema.metadata_fields(), None, BadData::Fail {}, ); diff --git a/crates/arroyo-formats/src/de.rs b/crates/arroyo-formats/src/de.rs index c937a36e2..7d089b9c6 100644 --- a/crates/arroyo-formats/src/de.rs +++ b/crates/arroyo-formats/src/de.rs @@ -293,7 +293,7 @@ impl ArrowDeserializer { Self::with_schema_resolver_and_raw_schema( format, framing, - Arc::new(schema.schema_without_timestamp()), + schema.schema.clone(), Some(schema.timestamp_index), metadata_fields, bad_data, @@ -329,7 +329,7 @@ impl ArrowDeserializer { fn with_schema_resolver_and_raw_schema( format: Format, framing: Option, - schema_without_timestamp: Arc, + schema: Arc, timestamp_idx: Option, metadata_fields: &[MetadataField], bad_data: BadData, @@ -348,20 +348,20 @@ impl ArrowDeserializer { let metadata_names: HashSet<_> = metadata_fields.iter().map(|f| &f.field_name).collect(); let schema_without_additional = { - let fields = schema_without_timestamp + let fields = schema .fields() .iter() - .filter(|f| !metadata_names.contains(f.name())) + .filter(|f| !metadata_names.contains(f.name()) && f.name() != TIMESTAMP_FIELD) .cloned() .collect::>(); - Arc::new(Schema::new_with_metadata( - fields, - schema_without_timestamp.metadata.clone(), - )) + Arc::new(Schema::new_with_metadata(fields, schema.metadata.clone())) }; let buffer_decoder = match format { - Format::Json(..) + Format::Json(JsonFormat { + unstructured: false, + .. + }) | Format::Avro(AvroFormat { into_unstructured_json: false, .. @@ -388,7 +388,7 @@ impl ArrowDeserializer { buffer_decoder, timestamp_builder: timestamp_idx .map(|i| (i, TimestampNanosecondBuilder::with_capacity(128))), - final_schema: schema_without_timestamp, + final_schema: schema, decoder_schema: schema_without_additional, schema_registry: Arc::new(Mutex::new(HashMap::new())), bad_data, @@ -1000,6 +1000,7 @@ mod tests { assert!(result.is_empty()); let batch = deserializer.flush_buffer().unwrap().unwrap(); + println!("batch ={:?}", batch); assert_eq!(batch.num_rows(), 1); assert_eq!(batch.columns()[0].as_primitive::().value(0), 5); assert_eq!(batch.columns()[1].as_primitive::().value(0), 5);