Skip to content

Commit

Permalink
AVRO-3925: Fix decimal serialization format (#2673)
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 authored Jan 4, 2024
1 parent e5ca151 commit 4295bc2
Showing 1 changed file with 113 additions and 29 deletions.
142 changes: 113 additions & 29 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn schema_name_r() -> &'static Regex {
Regex::new(
r"^((?P<namespace>([A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*)?)\.)?(?P<name>[A-Za-z_][A-Za-z0-9_]*)$",
)
.unwrap()
.unwrap()
})
}

Expand Down Expand Up @@ -836,6 +836,33 @@ pub struct FixedSchema {
pub attributes: BTreeMap<String, Value>,
}

impl FixedSchema {
fn serialize_to_map<S>(&self, mut map: S::SerializeMap) -> Result<S::SerializeMap, S::Error>
where
S: Serializer,
{
map.serialize_entry("type", "fixed")?;
if let Some(ref n) = self.name.namespace {
map.serialize_entry("namespace", n)?;
}
map.serialize_entry("name", &self.name.name)?;
if let Some(ref docstr) = self.doc {
map.serialize_entry("doc", docstr)?;
}
map.serialize_entry("size", &self.size)?;

if let Some(ref aliases) = self.aliases {
map.serialize_entry("aliases", aliases)?;
}

for attr in &self.attributes {
map.serialize_entry(attr.0, attr.1)?;
}

Ok(map)
}
}

/// A description of a Union schema.
///
/// `scale` defaults to 0 and is an integer greater than or equal to 0 and `precision` is an
Expand Down Expand Up @@ -1883,31 +1910,9 @@ impl Serialize for Schema {
}
map.end()
}
Schema::Fixed(FixedSchema {
ref name,
ref doc,
ref size,
ref aliases,
ref attributes,
}) => {
Schema::Fixed(ref fixed_schema) => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "fixed")?;
if let Some(ref n) = name.namespace {
map.serialize_entry("namespace", n)?;
}
map.serialize_entry("name", &name.name)?;
if let Some(ref docstr) = doc {
map.serialize_entry("doc", docstr)?;
}
map.serialize_entry("size", size)?;

if let Some(ref aliases) = aliases {
map.serialize_entry("aliases", aliases)?;
}

for attr in attributes {
map.serialize_entry(attr.0, attr.1)?;
}
map = fixed_schema.serialize_to_map::<S>(map)?;
map.end()
}
Schema::Decimal(DecimalSchema {
Expand All @@ -1916,12 +1921,26 @@ impl Serialize for Schema {
ref inner,
}) => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", &*inner.clone())?;
match inner.as_ref() {
Schema::Fixed(fixed_schema) => {
map = fixed_schema.serialize_to_map::<S>(map)?;
}
Schema::Bytes => {
map.serialize_entry("type", "bytes")?;
}
others => {
return Err(serde::ser::Error::custom(format!(
"DecimalSchema inner type must be Fixed or Bytes, got {:?}",
SchemaKind::from(others)
)));
}
}
map.serialize_entry("logicalType", "decimal")?;
map.serialize_entry("scale", scale)?;
map.serialize_entry("precision", precision)?;
map.end()
}

Schema::BigDecimal => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "bytes")?;
Expand Down Expand Up @@ -2210,7 +2229,7 @@ pub mod derive {
}
}

macro_rules! impl_schema(
macro_rules! impl_schema (
($type:ty, $variant_constructor:expr) => (
impl AvroSchemaComponent for $type {
fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
Expand Down Expand Up @@ -3341,7 +3360,7 @@ mod tests {
schema,
Schema::Union(UnionSchema::new(vec![
Schema::Null,
Schema::TimestampMicros
Schema::TimestampMicros,
])?)
);

Expand Down Expand Up @@ -5271,7 +5290,7 @@ mod tests {
match Schema::parse_str(schema_str) {
Err(Error::FieldNameDuplicate(_)) => (),
other => {
return Err(format!("Expected Error::FieldNameDuplicate, got {other:?}").into())
return Err(format!("Expected Error::FieldNameDuplicate, got {other:?}").into());
}
};

Expand Down Expand Up @@ -6396,4 +6415,69 @@ mod tests {

Ok(())
}

#[test]
fn test_avro_3925_serialize_decimal_inner_fixed() -> TestResult {
let schema = Schema::Decimal(DecimalSchema {
precision: 36,
scale: 10,
inner: Box::new(Schema::Fixed(FixedSchema {
name: Name::new("decimal_36_10").unwrap(),
aliases: None,
doc: None,
size: 16,
attributes: Default::default(),
})),
});

let serialized_json = serde_json::to_string_pretty(&schema)?;

let expected_json = r#"{
"type": "fixed",
"name": "decimal_36_10",
"size": 16,
"logicalType": "decimal",
"scale": 10,
"precision": 36
}"#;

assert_eq!(serialized_json, expected_json);
Ok(())
}

#[test]
fn test_avro_3925_serialize_decimal_inner_bytes() -> TestResult {
let schema = Schema::Decimal(DecimalSchema {
precision: 36,
scale: 10,
inner: Box::new(Schema::Bytes),
});

let serialized_json = serde_json::to_string_pretty(&schema)?;

let expected_json = r#"{
"type": "bytes",
"logicalType": "decimal",
"scale": 10,
"precision": 36
}"#;

assert_eq!(serialized_json, expected_json);
Ok(())
}

#[test]
fn test_avro_3925_serialize_decimal_inner_invalid() -> TestResult {
let schema = Schema::Decimal(DecimalSchema {
precision: 36,
scale: 10,
inner: Box::new(Schema::String),
});

let serialized_json = serde_json::to_string_pretty(&schema);

assert!(serialized_json.is_err());

Ok(())
}
}

0 comments on commit 4295bc2

Please sign in to comment.