Skip to content

Commit

Permalink
add support for logical types in rust avro
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralf Grubenmann committed Feb 9, 2024
1 parent 9059dd5 commit 1d61ff3
Showing 1 changed file with 265 additions and 8 deletions.
273 changes: 265 additions & 8 deletions lang/rust/avro/src/schema_compatibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,19 @@ impl Checker {
let w_type = SchemaKind::from(writers_schema);
let r_type = SchemaKind::from(readers_schema);

if w_type != SchemaKind::Union && (r_type.is_primitive() || r_type == SchemaKind::Fixed) {
if w_type != SchemaKind::Union
&& (r_type.is_primitive()
|| r_type == SchemaKind::Fixed
|| r_type == SchemaKind::TimeMicros
|| r_type == SchemaKind::TimestampMillis
|| r_type == SchemaKind::TimestampMicros
|| r_type == SchemaKind::TimestampNanos
|| r_type == SchemaKind::LocalTimestampMillis
|| r_type == SchemaKind::LocalTimestampMicros
|| r_type == SchemaKind::LocalTimestampNanos
|| r_type == SchemaKind::Date
|| r_type == SchemaKind::TimeMillis)
{
return Ok(());
}

Expand Down Expand Up @@ -401,6 +413,106 @@ impl SchemaCompatibility {
});
}
}
SchemaKind::TimeMillis => {
if let Schema::TimeMillis = writers_schema {
return Ok(());
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: &[SchemaKind::TimeMillis, SchemaKind::Int],
});
}
}
SchemaKind::TimeMicros => {
if let Schema::TimeMicros = writers_schema {
return Ok(());
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: &[SchemaKind::TimeMicros, SchemaKind::Long],
});
}
}
SchemaKind::TimestampNanos => {
if let Schema::TimestampNanos = writers_schema {
return Ok(());
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: &[SchemaKind::TimestampNanos, SchemaKind::Long],
});
}
}
SchemaKind::TimestampMillis => {
if let Schema::TimestampMillis = writers_schema {
return Ok(());
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: &[SchemaKind::TimestampMillis, SchemaKind::Long],
});
}
}
SchemaKind::TimestampMicros => {
if let Schema::TimestampMicros = writers_schema {
return Ok(());
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: &[SchemaKind::TimestampMicros, SchemaKind::Long],
});
}
}
SchemaKind::Date => {
if let Schema::Date = writers_schema {
return Ok(());
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: &[SchemaKind::Date, SchemaKind::Int],
});
}
}
SchemaKind::LocalTimestampMillis => {
if let Schema::LocalTimestampMillis = writers_schema {
return Ok(());
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: &[SchemaKind::LocalTimestampMillis, SchemaKind::Long],
});
}
}
SchemaKind::LocalTimestampMicros => {
if let Schema::LocalTimestampMicros = writers_schema {
return Ok(());
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: &[SchemaKind::LocalTimestampMicros, SchemaKind::Long],
});
}
}
SchemaKind::LocalTimestampNanos => {
if let Schema::LocalTimestampNanos = writers_schema {
return Ok(());
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: &[SchemaKind::LocalTimestampNanos, SchemaKind::Long],
});
}
}
SchemaKind::Duration => {
if let Schema::Duration = writers_schema {
return Ok(());
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: &[SchemaKind::Duration, SchemaKind::Fixed],
});
}
}
_ => {
return Err(CompatibilityError::Inconclusive(String::from(
"readers_schema",
Expand All @@ -412,9 +524,15 @@ impl SchemaCompatibility {
// Here are the checks for primitive types
match w_type {
SchemaKind::Int => {
if [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double]
.iter()
.any(|&t| t == r_type)
if [
SchemaKind::Long,
SchemaKind::Float,
SchemaKind::Double,
SchemaKind::TimeMillis,
SchemaKind::Date,
]
.iter()
.any(|&t| t == r_type)
{
Ok(())
} else {
Expand All @@ -425,15 +543,35 @@ impl SchemaCompatibility {
}
}
SchemaKind::Long => {
if [SchemaKind::Float, SchemaKind::Double]
.iter()
.any(|&t| t == r_type)
if [
SchemaKind::Float,
SchemaKind::Double,
SchemaKind::TimeMicros,
SchemaKind::TimestampMillis,
SchemaKind::TimestampMicros,
SchemaKind::TimestampNanos,
SchemaKind::LocalTimestampMillis,
SchemaKind::LocalTimestampMicros,
SchemaKind::LocalTimestampNanos,
]
.iter()
.any(|&t| t == r_type)
{
Ok(())
} else {
Err(CompatibilityError::TypeExpected {
schema_type: String::from("readers_schema"),
expected_type: &[SchemaKind::Float, SchemaKind::Double],
expected_type: &[
SchemaKind::Float,
SchemaKind::Double,
SchemaKind::TimeMicros,
SchemaKind::TimestampMillis,
SchemaKind::TimestampMicros,
SchemaKind::TimestampNanos,
SchemaKind::LocalTimestampMillis,
SchemaKind::LocalTimestampMicros,
SchemaKind::LocalTimestampNanos,
],
})
}
}
Expand Down Expand Up @@ -470,6 +608,96 @@ impl SchemaCompatibility {
})
}
}
SchemaKind::TimeMillis => {
if r_type == SchemaKind::Int {
Ok(())
} else {
Err(CompatibilityError::TypeExpected {
schema_type: String::from("readers_schema"),
expected_type: &[SchemaKind::Int, SchemaKind::TimeMillis],
})
}
}
SchemaKind::Date => {
if r_type == SchemaKind::Int {
Ok(())
} else {
Err(CompatibilityError::TypeExpected {
schema_type: String::from("readers_schema"),
expected_type: &[SchemaKind::Int, SchemaKind::Date],
})
}
}
SchemaKind::TimeMicros => {
if r_type == SchemaKind::Long {
Ok(())
} else {
Err(CompatibilityError::TypeExpected {
schema_type: String::from("readers_schema"),
expected_type: &[SchemaKind::Long, SchemaKind::TimeMicros],
})
}
}
SchemaKind::TimestampMicros => {
if r_type == SchemaKind::Long {
Ok(())
} else {
Err(CompatibilityError::TypeExpected {
schema_type: String::from("readers_schema"),
expected_type: &[SchemaKind::Long, SchemaKind::TimestampMicros],
})
}
}
SchemaKind::TimestampMillis => {
if r_type == SchemaKind::Long {
Ok(())
} else {
Err(CompatibilityError::TypeExpected {
schema_type: String::from("readers_schema"),
expected_type: &[SchemaKind::Long, SchemaKind::TimestampMillis],
})
}
}
SchemaKind::TimestampNanos => {
if r_type == SchemaKind::Long {
Ok(())
} else {
Err(CompatibilityError::TypeExpected {
schema_type: String::from("readers_schema"),
expected_type: &[SchemaKind::Long, SchemaKind::TimestampNanos],
})
}
}
SchemaKind::LocalTimestampMillis => {
if r_type == SchemaKind::Long {
Ok(())
} else {
Err(CompatibilityError::TypeExpected {
schema_type: String::from("readers_schema"),
expected_type: &[SchemaKind::Long, SchemaKind::LocalTimestampMillis],
})
}
}
SchemaKind::LocalTimestampMicros => {
if r_type == SchemaKind::Long {
Ok(())
} else {
Err(CompatibilityError::TypeExpected {
schema_type: String::from("readers_schema"),
expected_type: &[SchemaKind::Long, SchemaKind::LocalTimestampMicros],
})
}
}
SchemaKind::LocalTimestampNanos => {
if r_type == SchemaKind::Long {
Ok(())
} else {
Err(CompatibilityError::TypeExpected {
schema_type: String::from("readers_schema"),
expected_type: &[SchemaKind::Long, SchemaKind::LocalTimestampNanos],
})
}
}
_ => Err(CompatibilityError::Inconclusive(String::from(
"writers_schema",
))),
Expand Down Expand Up @@ -659,6 +887,16 @@ mod tests {
// bytes
(Schema::Bytes, Schema::Null),
(Schema::Bytes, Schema::Int),
// logical types
(Schema::TimeMicros, Schema::Int),
(Schema::TimestampMillis, Schema::Int),
(Schema::TimestampMicros, Schema::Int),
(Schema::TimestampNanos, Schema::Int),
(Schema::LocalTimestampMillis, Schema::Int),
(Schema::LocalTimestampMicros, Schema::Int),
(Schema::LocalTimestampNanos, Schema::Int),
(Schema::Date, Schema::Long),
(Schema::TimeMillis, Schema::Long),
// array and maps
(int_array_schema(), long_array_schema()),
(int_map_schema(), int_array_schema()),
Expand Down Expand Up @@ -698,6 +936,25 @@ mod tests {
(Schema::Double, Schema::Float),
(Schema::String, Schema::Bytes),
(Schema::Bytes, Schema::String),
// logical types
(Schema::TimeMicros, Schema::Long),
(Schema::TimestampMillis, Schema::Long),
(Schema::TimestampMicros, Schema::Long),
(Schema::TimestampNanos, Schema::Long),
(Schema::LocalTimestampMillis, Schema::Long),
(Schema::LocalTimestampMicros, Schema::Long),
(Schema::LocalTimestampNanos, Schema::Long),
(Schema::Date, Schema::Int),
(Schema::TimeMillis, Schema::Int),
(Schema::Long, Schema::TimeMicros),
(Schema::Long, Schema::TimestampMillis),
(Schema::Long, Schema::TimestampMicros),
(Schema::Long, Schema::TimestampNanos),
(Schema::Long, Schema::LocalTimestampMillis),
(Schema::Long, Schema::LocalTimestampMicros),
(Schema::Long, Schema::LocalTimestampNanos),
(Schema::Int, Schema::Date),
(Schema::Int, Schema::TimeMillis),
(int_array_schema(), int_array_schema()),
(long_array_schema(), int_array_schema()),
(int_map_schema(), int_map_schema()),
Expand Down

0 comments on commit 1d61ff3

Please sign in to comment.