Skip to content

Commit

Permalink
AVRO-3917: [Rust] take field aliases into account when calculating sc…
Browse files Browse the repository at this point in the history
…hema compatibilities (#2633)

* AVRO-3917: [Rust] take field aliases into account when calculating schema compatibilities

* AVRO-3917: Minor improvements

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

---------

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
  • Loading branch information
marcosschroh and martin-g authored Dec 12, 2023
1 parent ff32453 commit e248e6b
Showing 1 changed file with 83 additions and 29 deletions.
112 changes: 83 additions & 29 deletions lang/rust/avro/src/schema_compatibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,17 +158,45 @@ impl Checker {
}) = readers_schema
{
for field in r_fields.iter() {
if let Some(pos) = w_lookup.get(&field.name) {
if let Err(err) =
self.full_match_schemas(&w_fields[*pos].schema, &field.schema)
{
return Err(CompatibilityError::FieldTypeMismatch(
field.name.clone(),
Box::new(err),
));
// get all field names in a vector (field.name + aliases)
let mut fields_names = vec![&field.name];
if let Some(ref aliases) = field.aliases {
for alias in aliases {
fields_names.push(alias);
}
}

// Check whether any of the possible fields names are in the writer schema.
// If the field was found, then it must have the exact same name with the writer field,
// otherwise we would have a false positive with the writers aliases
let mut position = None;
for field_name in fields_names {
if let Some(pos) = w_lookup.get(field_name) {
if &w_fields[*pos].name == field_name {
position = Some(pos);
break;
}
}
}

match position {
Some(pos) => {
if let Err(err) =
self.full_match_schemas(&w_fields[*pos].schema, &field.schema)
{
return Err(CompatibilityError::FieldTypeMismatch(
field.name.clone(),
Box::new(err),
));
}
}
_ => {
if field.default.is_none() {
return Err(CompatibilityError::MissingDefaultValue(
field.name.clone(),
));
}
}
} else if field.default.is_none() {
return Err(CompatibilityError::MissingDefaultValue(field.name.clone()));
}
}
}
Expand Down Expand Up @@ -1162,9 +1190,8 @@ mod tests {
#[test]
fn avro_3894_take_aliases_into_account_when_serializing_for_schema_compatibility() -> TestResult
{
use serde::{Deserialize, Serialize};

const RAW_SCHEMA_V1: &str = r#"
let schema_v1 = Schema::parse_str(
r#"
{
"type": "record",
"name": "Conference",
Expand All @@ -1173,8 +1200,11 @@ mod tests {
{"type": "string", "name": "name"},
{"type": "long", "name": "date"}
]
}"#;
const RAW_SCHEMA_V2: &str = r#"
}"#,
)?;

let schema_v2 = Schema::parse_str(
r#"
{
"type": "record",
"name": "Conference",
Expand All @@ -1183,23 +1213,47 @@ mod tests {
{"type": "string", "name": "name"},
{"type": "long", "name": "date", "aliases" : [ "time" ]}
]
}"#;
}"#,
)?;

#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct Conference {
pub name: String,
pub date: i64,
}
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct ConferenceV2 {
pub name: String,
pub time: i64,
}
assert!(SchemaCompatibility::mutual_read(&schema_v1, &schema_v2).is_ok());

let schema_v1 = Schema::parse_str(RAW_SCHEMA_V1)?;
let schema_v2 = Schema::parse_str(RAW_SCHEMA_V2)?;
Ok(())
}

assert!(SchemaCompatibility::can_read(&schema_v1, &schema_v2).is_ok());
#[test]
fn avro_3917_take_aliases_into_account_for_schema_compatibility() -> TestResult {
let schema_v1 = Schema::parse_str(
r#"
{
"type": "record",
"name": "Conference",
"namespace": "advdaba",
"fields": [
{"type": "string", "name": "name"},
{"type": "long", "name": "date", "aliases" : [ "time" ]}
]
}"#,
)?;

let schema_v2 = Schema::parse_str(
r#"
{
"type": "record",
"name": "Conference",
"namespace": "advdaba",
"fields": [
{"type": "string", "name": "name"},
{"type": "long", "name": "time"}
]
}"#,
)?;

assert!(SchemaCompatibility::can_read(&schema_v2, &schema_v1).is_ok());
assert_eq!(
CompatibilityError::MissingDefaultValue(String::from("time")),
SchemaCompatibility::can_read(&schema_v1, &schema_v2).unwrap_err()
);

Ok(())
}
Expand Down

0 comments on commit e248e6b

Please sign in to comment.