From 1d61ff3a589f3d4bf4bc104633af0352f2459d19 Mon Sep 17 00:00:00 2001 From: Ralf Grubenmann Date: Fri, 9 Feb 2024 16:31:56 +0100 Subject: [PATCH] add support for logical types in rust avro --- lang/rust/avro/src/schema_compatibility.rs | 273 ++++++++++++++++++++- 1 file changed, 265 insertions(+), 8 deletions(-) diff --git a/lang/rust/avro/src/schema_compatibility.rs b/lang/rust/avro/src/schema_compatibility.rs index 09c302036e2..5dd45d17ab3 100644 --- a/lang/rust/avro/src/schema_compatibility.rs +++ b/lang/rust/avro/src/schema_compatibility.rs @@ -62,7 +62,19 @@ impl Checker { let w_type = SchemaKind::from(writers_schema); let r_type = SchemaKind::from(readers_schema); - if w_type != SchemaKind::Union && (r_type.is_primitive() || r_type == SchemaKind::Fixed) { + if w_type != SchemaKind::Union + && (r_type.is_primitive() + || r_type == SchemaKind::Fixed + || r_type == SchemaKind::TimeMicros + || r_type == SchemaKind::TimestampMillis + || r_type == SchemaKind::TimestampMicros + || r_type == SchemaKind::TimestampNanos + || r_type == SchemaKind::LocalTimestampMillis + || r_type == SchemaKind::LocalTimestampMicros + || r_type == SchemaKind::LocalTimestampNanos + || r_type == SchemaKind::Date + || r_type == SchemaKind::TimeMillis) + { return Ok(()); } @@ -401,6 +413,106 @@ impl SchemaCompatibility { }); } } + SchemaKind::TimeMillis => { + if let Schema::TimeMillis = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::TimeMillis, SchemaKind::Int], + }); + } + } + SchemaKind::TimeMicros => { + if let Schema::TimeMicros = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::TimeMicros, SchemaKind::Long], + }); + } + } + SchemaKind::TimestampNanos => { + if let Schema::TimestampNanos = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::TimestampNanos, SchemaKind::Long], + }); + } + } + SchemaKind::TimestampMillis => { + if let Schema::TimestampMillis = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::TimestampMillis, SchemaKind::Long], + }); + } + } + SchemaKind::TimestampMicros => { + if let Schema::TimestampMicros = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::TimestampMicros, SchemaKind::Long], + }); + } + } + SchemaKind::Date => { + if let Schema::Date = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::Date, SchemaKind::Int], + }); + } + } + SchemaKind::LocalTimestampMillis => { + if let Schema::LocalTimestampMillis = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::LocalTimestampMillis, SchemaKind::Long], + }); + } + } + SchemaKind::LocalTimestampMicros => { + if let Schema::LocalTimestampMicros = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::LocalTimestampMicros, SchemaKind::Long], + }); + } + } + SchemaKind::LocalTimestampNanos => { + if let Schema::LocalTimestampNanos = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::LocalTimestampNanos, SchemaKind::Long], + }); + } + } + SchemaKind::Duration => { + if let Schema::Duration = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::Duration, SchemaKind::Fixed], + }); + } + } _ => { return Err(CompatibilityError::Inconclusive(String::from( "readers_schema", @@ -412,9 +524,15 @@ impl SchemaCompatibility { // Here are the checks for primitive types match w_type { SchemaKind::Int => { - if [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double] - .iter() - .any(|&t| t == r_type) + if [ + SchemaKind::Long, + SchemaKind::Float, + SchemaKind::Double, + SchemaKind::TimeMillis, + SchemaKind::Date, + ] + .iter() + .any(|&t| t == r_type) { Ok(()) } else { @@ -425,15 +543,35 @@ impl SchemaCompatibility { } } SchemaKind::Long => { - if [SchemaKind::Float, SchemaKind::Double] - .iter() - .any(|&t| t == r_type) + if [ + SchemaKind::Float, + SchemaKind::Double, + SchemaKind::TimeMicros, + SchemaKind::TimestampMillis, + SchemaKind::TimestampMicros, + SchemaKind::TimestampNanos, + SchemaKind::LocalTimestampMillis, + SchemaKind::LocalTimestampMicros, + SchemaKind::LocalTimestampNanos, + ] + .iter() + .any(|&t| t == r_type) { Ok(()) } else { Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Float, SchemaKind::Double], + expected_type: &[ + SchemaKind::Float, + SchemaKind::Double, + SchemaKind::TimeMicros, + SchemaKind::TimestampMillis, + SchemaKind::TimestampMicros, + SchemaKind::TimestampNanos, + SchemaKind::LocalTimestampMillis, + SchemaKind::LocalTimestampMicros, + SchemaKind::LocalTimestampNanos, + ], }) } } @@ -470,6 +608,96 @@ impl SchemaCompatibility { }) } } + SchemaKind::TimeMillis => { + if r_type == SchemaKind::Int { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Int, SchemaKind::TimeMillis], + }) + } + } + SchemaKind::Date => { + if r_type == SchemaKind::Int { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Int, SchemaKind::Date], + }) + } + } + SchemaKind::TimeMicros => { + if r_type == SchemaKind::Long { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::TimeMicros], + }) + } + } + SchemaKind::TimestampMicros => { + if r_type == SchemaKind::Long { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::TimestampMicros], + }) + } + } + SchemaKind::TimestampMillis => { + if r_type == SchemaKind::Long { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::TimestampMillis], + }) + } + } + SchemaKind::TimestampNanos => { + if r_type == SchemaKind::Long { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::TimestampNanos], + }) + } + } + SchemaKind::LocalTimestampMillis => { + if r_type == SchemaKind::Long { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::LocalTimestampMillis], + }) + } + } + SchemaKind::LocalTimestampMicros => { + if r_type == SchemaKind::Long { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::LocalTimestampMicros], + }) + } + } + SchemaKind::LocalTimestampNanos => { + if r_type == SchemaKind::Long { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::LocalTimestampNanos], + }) + } + } _ => Err(CompatibilityError::Inconclusive(String::from( "writers_schema", ))), @@ -659,6 +887,16 @@ mod tests { // bytes (Schema::Bytes, Schema::Null), (Schema::Bytes, Schema::Int), + // logical types + (Schema::TimeMicros, Schema::Int), + (Schema::TimestampMillis, Schema::Int), + (Schema::TimestampMicros, Schema::Int), + (Schema::TimestampNanos, Schema::Int), + (Schema::LocalTimestampMillis, Schema::Int), + (Schema::LocalTimestampMicros, Schema::Int), + (Schema::LocalTimestampNanos, Schema::Int), + (Schema::Date, Schema::Long), + (Schema::TimeMillis, Schema::Long), // array and maps (int_array_schema(), long_array_schema()), (int_map_schema(), int_array_schema()), @@ -698,6 +936,25 @@ mod tests { (Schema::Double, Schema::Float), (Schema::String, Schema::Bytes), (Schema::Bytes, Schema::String), + // logical types + (Schema::TimeMicros, Schema::Long), + (Schema::TimestampMillis, Schema::Long), + (Schema::TimestampMicros, Schema::Long), + (Schema::TimestampNanos, Schema::Long), + (Schema::LocalTimestampMillis, Schema::Long), + (Schema::LocalTimestampMicros, Schema::Long), + (Schema::LocalTimestampNanos, Schema::Long), + (Schema::Date, Schema::Int), + (Schema::TimeMillis, Schema::Int), + (Schema::Long, Schema::TimeMicros), + (Schema::Long, Schema::TimestampMillis), + (Schema::Long, Schema::TimestampMicros), + (Schema::Long, Schema::TimestampNanos), + (Schema::Long, Schema::LocalTimestampMillis), + (Schema::Long, Schema::LocalTimestampMicros), + (Schema::Long, Schema::LocalTimestampNanos), + (Schema::Int, Schema::Date), + (Schema::Int, Schema::TimeMillis), (int_array_schema(), int_array_schema()), (long_array_schema(), int_array_schema()), (int_map_schema(), int_map_schema()),