Skip to content

Commit

Permalink
tests
Browse files (browse the repository at this point in the history)
  • Loading branch information
mwylde committed Jan 13, 2025
1 parent bcf7d54 commit 539383e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
2 changes: 2 additions & 0 deletions crates/arroyo-connectors/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,7 @@ impl KafkaTester {
let mut deserializer = ArrowDeserializer::new(
format.clone(),
Arc::new(aschema),
&schema.metadata_fields(),
None,
BadData::Fail {},
);
Expand Down Expand Up @@ -701,6 +702,7 @@ impl KafkaTester {
let mut deserializer = ArrowDeserializer::new(
format.clone(),
Arc::new(aschema),
&schema.metadata_fields(),
None,
BadData::Fail {},
);
Expand Down
21 changes: 11 additions & 10 deletions crates/arroyo-formats/src/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl ArrowDeserializer {
Self::with_schema_resolver_and_raw_schema(
format,
framing,
Arc::new(schema.schema_without_timestamp()),
schema.schema.clone(),
Some(schema.timestamp_index),
metadata_fields,
bad_data,
Expand Down Expand Up @@ -329,7 +329,7 @@ impl ArrowDeserializer {
fn with_schema_resolver_and_raw_schema(
format: Format,
framing: Option<Framing>,
schema_without_timestamp: Arc<Schema>,
schema: Arc<Schema>,
timestamp_idx: Option<usize>,
metadata_fields: &[MetadataField],
bad_data: BadData,
Expand All @@ -348,20 +348,20 @@ impl ArrowDeserializer {
let metadata_names: HashSet<_> = metadata_fields.iter().map(|f| &f.field_name).collect();

let schema_without_additional = {
let fields = schema_without_timestamp
let fields = schema
.fields()
.iter()
.filter(|f| !metadata_names.contains(f.name()))
.filter(|f| !metadata_names.contains(f.name()) && f.name() != TIMESTAMP_FIELD)
.cloned()
.collect::<Vec<_>>();
Arc::new(Schema::new_with_metadata(
fields,
schema_without_timestamp.metadata.clone(),
))
Arc::new(Schema::new_with_metadata(fields, schema.metadata.clone()))
};

let buffer_decoder = match format {
Format::Json(..)
Format::Json(JsonFormat {
unstructured: false,
..
})
| Format::Avro(AvroFormat {
into_unstructured_json: false,
..
Expand All @@ -388,7 +388,7 @@ impl ArrowDeserializer {
buffer_decoder,
timestamp_builder: timestamp_idx
.map(|i| (i, TimestampNanosecondBuilder::with_capacity(128))),
final_schema: schema_without_timestamp,
final_schema: schema,
decoder_schema: schema_without_additional,
schema_registry: Arc::new(Mutex::new(HashMap::new())),
bad_data,
Expand Down Expand Up @@ -1000,6 +1000,7 @@ mod tests {
assert!(result.is_empty());

let batch = deserializer.flush_buffer().unwrap().unwrap();
println!("batch ={:?}", batch);
assert_eq!(batch.num_rows(), 1);
assert_eq!(batch.columns()[0].as_primitive::<Int64Type>().value(0), 5);
assert_eq!(batch.columns()[1].as_primitive::<Int32Type>().value(0), 5);
Expand Down

0 comments on commit 539383e

Please sign in to comment.