Skip to content

Commit

Permalink
AVRO-3916: [Rust] Add [Local]TimestampNanos types (#2611)
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
  • Loading branch information
martin-g authored Dec 8, 2023
1 parent 2f9673a commit e9f10b8
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 4 deletions.
2 changes: 2 additions & 0 deletions lang/rust/avro/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,10 @@ fn main() -> Result<(), Error> {
record.put("time_micros", Value::TimeMicros(3));
record.put("timestamp_millis", Value::TimestampMillis(4));
record.put("timestamp_micros", Value::TimestampMicros(5));
record.put("timestamp_nanos", Value::TimestampNanos(6));
record.put("local_timestamp_millis", Value::LocalTimestampMillis(4));
record.put("local_timestamp_micros", Value::LocalTimestampMicros(5));
record.put("local_timestamp_nanos", Value::LocalTimestampMicros(6));
record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8)));

writer.append(record)?;
Expand Down
54 changes: 50 additions & 4 deletions lang/rust/avro/src/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,10 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
| Value::TimeMicros(i)
| Value::TimestampMillis(i)
| Value::TimestampMicros(i)
| Value::TimestampNanos(i)
| Value::LocalTimestampMillis(i)
| Value::LocalTimestampMicros(i) => visitor.visit_i64(*i),
| Value::LocalTimestampMicros(i)
| Value::LocalTimestampNanos(i) => visitor.visit_i64(*i),
&Value::Float(f) => visitor.visit_f32(f),
&Value::Double(d) => visitor.visit_f64(d),
Value::Union(_i, u) => match **u {
Expand All @@ -257,8 +259,10 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
| Value::TimeMicros(i)
| Value::TimestampMillis(i)
| Value::TimestampMicros(i)
| Value::TimestampNanos(i)
| Value::LocalTimestampMillis(i)
| Value::LocalTimestampMicros(i) => visitor.visit_i64(i),
| Value::LocalTimestampMicros(i)
| Value::LocalTimestampNanos(i) => visitor.visit_i64(i),
Value::Float(f) => visitor.visit_f32(f),
Value::Double(d) => visitor.visit_f64(d),
Value::Record(ref fields) => visitor.visit_map(RecordDeserializer::new(fields)),
Expand Down Expand Up @@ -1080,7 +1084,16 @@ mod tests {
fn test_timestamp_micros() -> TestResult {
let raw_value = 1;
let value = Value::TimestampMicros(raw_value);
let result = crate::from_value::<i64>(&value)?;
let result = from_value::<i64>(&value)?;
assert_eq!(result, raw_value);
Ok(())
}

#[test]
fn test_avro_3916_timestamp_nanos() -> TestResult {
let raw_value = 1;
let value = Value::TimestampNanos(raw_value);
let result = from_value::<i64>(&value)?;
assert_eq!(result, raw_value);
Ok(())
}
Expand All @@ -1089,7 +1102,7 @@ mod tests {
fn test_avro_3853_local_timestamp_millis() -> TestResult {
let raw_value = 1;
let value = Value::LocalTimestampMillis(raw_value);
let result = crate::from_value::<i64>(&value)?;
let result = from_value::<i64>(&value)?;
assert_eq!(result, raw_value);
Ok(())
}
Expand All @@ -1103,6 +1116,15 @@ mod tests {
Ok(())
}

#[test]
fn test_avro_3916_local_timestamp_nanos() -> TestResult {
let raw_value = 1;
let value = Value::LocalTimestampNanos(raw_value);
let result = crate::from_value::<i64>(&value)?;
assert_eq!(result, raw_value);
Ok(())
}

#[test]
fn test_from_value_uuid_str() -> TestResult {
let raw_value = "9ec535ff-3e2a-45bd-91d3-0a01321b5a49";
Expand Down Expand Up @@ -1146,8 +1168,10 @@ mod tests {
("time_micros_a".to_string(), 123),
("timestamp_millis_b".to_string(), 234),
("timestamp_micros_c".to_string(), 345),
("timestamp_nanos_d".to_string(), 345_001),
("local_timestamp_millis_d".to_string(), 678),
("local_timestamp_micros_e".to_string(), 789),
("local_timestamp_nanos_f".to_string(), 345_002),
]
.iter()
.cloned()
Expand All @@ -1164,12 +1188,18 @@ mod tests {
key if key.starts_with("timestamp_micros_") => {
(k.clone(), Value::TimestampMicros(*v))
}
key if key.starts_with("timestamp_nanos_") => {
(k.clone(), Value::TimestampNanos(*v))
}
key if key.starts_with("local_timestamp_millis_") => {
(k.clone(), Value::LocalTimestampMillis(*v))
}
key if key.starts_with("local_timestamp_micros_") => {
(k.clone(), Value::LocalTimestampMicros(*v))
}
key if key.starts_with("local_timestamp_nanos_") => {
(k.clone(), Value::LocalTimestampNanos(*v))
}
_ => unreachable!("unexpected key: {:?}", k),
})
.collect();
Expand Down Expand Up @@ -1219,6 +1249,14 @@ mod tests {
"a_non_existing_timestamp_micros".to_string(),
Value::Union(0, Box::new(Value::TimestampMicros(-345))),
),
(
"a_timestamp_nanos".to_string(),
Value::Union(0, Box::new(Value::TimestampNanos(345))),
),
(
"a_non_existing_timestamp_nanos".to_string(),
Value::Union(0, Box::new(Value::TimestampNanos(-345))),
),
(
"a_local_timestamp_millis".to_string(),
Value::Union(0, Box::new(Value::LocalTimestampMillis(678))),
Expand All @@ -1235,6 +1273,14 @@ mod tests {
"a_non_existing_local_timestamp_micros".to_string(),
Value::Union(0, Box::new(Value::LocalTimestampMicros(-789))),
),
(
"a_local_timestamp_nanos".to_string(),
Value::Union(0, Box::new(Value::LocalTimestampNanos(789))),
),
(
"a_non_existing_local_timestamp_nanos".to_string(),
Value::Union(0, Box::new(Value::LocalTimestampNanos(-789))),
),
(
"a_record".to_string(),
Value::Union(
Expand Down
2 changes: 2 additions & 0 deletions lang/rust/avro/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis),
Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros),
Schema::TimestampNanos => zag_i64(reader).map(Value::TimestampNanos),
Schema::LocalTimestampMillis => zag_i64(reader).map(Value::LocalTimestampMillis),
Schema::LocalTimestampMicros => zag_i64(reader).map(Value::LocalTimestampMicros),
Schema::LocalTimestampNanos => zag_i64(reader).map(Value::LocalTimestampNanos),
Schema::Duration => {
let mut buf = [0u8; 12];
reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
Expand Down
2 changes: 2 additions & 0 deletions lang/rust/avro/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ pub(crate) fn encode_internal<S: Borrow<Schema>>(
Value::Long(i)
| Value::TimestampMillis(i)
| Value::TimestampMicros(i)
| Value::TimestampNanos(i)
| Value::LocalTimestampMillis(i)
| Value::LocalTimestampMicros(i)
| Value::LocalTimestampNanos(i)
| Value::TimeMicros(i) => encode_long(*i, buffer),
Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
Expand Down
6 changes: 6 additions & 0 deletions lang/rust/avro/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,18 @@ pub enum Error {
#[error("TimestampMicros expected, got {0:?}")]
GetTimestampMicros(ValueKind),

#[error("TimestampNanos expected, got {0:?}")]
GetTimestampNanos(ValueKind),

#[error("LocalTimestampMillis expected, got {0:?}")]
GetLocalTimestampMillis(ValueKind),

#[error("LocalTimestampMicros expected, got {0:?}")]
GetLocalTimestampMicros(ValueKind),

#[error("LocalTimestampNanos expected, got {0:?}")]
GetLocalTimestampNanos(ValueKind),

#[error("Null expected, got {0:?}")]
GetNull(ValueKind),

Expand Down
2 changes: 2 additions & 0 deletions lang/rust/avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,10 @@
//! record.put("time_micros", Value::TimeMicros(3));
//! record.put("timestamp_millis", Value::TimestampMillis(4));
//! record.put("timestamp_micros", Value::TimestampMicros(5));
//! record.put("timestamp_nanos", Value::TimestampNanos(6));
//! record.put("local_timestamp_millis", Value::LocalTimestampMillis(4));
//! record.put("local_timestamp_micros", Value::LocalTimestampMicros(5));
//! record.put("local_timestamp_nanos", Value::LocalTimestampMicros(6));
//! record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8)));
//!
//! writer.append(record)?;
Expand Down
18 changes: 18 additions & 0 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,14 @@ pub enum Schema {
TimestampMillis,
/// An instant in time represented as the number of microseconds after the UNIX epoch.
TimestampMicros,
/// An instant in time represented as the number of nanoseconds after the UNIX epoch.
TimestampNanos,
/// An instant in localtime represented as the number of milliseconds after the UNIX epoch.
LocalTimestampMillis,
/// An instant in local time represented as the number of microseconds after the UNIX epoch.
LocalTimestampMicros,
/// An instant in local time represented as the number of nanoseconds after the UNIX epoch.
LocalTimestampNanos,
/// An amount of time defined by a number of months, days and milliseconds.
Duration,
/// A reference to another schema.
Expand Down Expand Up @@ -199,8 +203,10 @@ impl From<&types::Value> for SchemaKind {
Value::TimeMicros(_) => Self::TimeMicros,
Value::TimestampMillis(_) => Self::TimestampMillis,
Value::TimestampMicros(_) => Self::TimestampMicros,
Value::TimestampNanos(_) => Self::TimestampNanos,
Value::LocalTimestampMillis(_) => Self::LocalTimestampMillis,
Value::LocalTimestampMicros(_) => Self::LocalTimestampMicros,
Value::LocalTimestampNanos(_) => Self::LocalTimestampNanos,
Value::Duration { .. } => Self::Duration,
}
}
Expand Down Expand Up @@ -1942,6 +1948,12 @@ impl Serialize for Schema {
map.serialize_entry("logicalType", "timestamp-micros")?;
map.end()
}
Schema::TimestampNanos => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "long")?;
map.serialize_entry("logicalType", "timestamp-nanos")?;
map.end()
}
Schema::LocalTimestampMillis => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "long")?;
Expand All @@ -1954,6 +1966,12 @@ impl Serialize for Schema {
map.serialize_entry("logicalType", "local-timestamp-micros")?;
map.end()
}
Schema::LocalTimestampNanos => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "long")?;
map.serialize_entry("logicalType", "local-timestamp-nanos")?;
map.end()
}
Schema::Duration => {
let mut map = serializer.serialize_map(None)?;

Expand Down
54 changes: 54 additions & 0 deletions lang/rust/avro/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,14 @@ pub enum Value {
TimestampMillis(i64),
/// Timestamp in microseconds.
TimestampMicros(i64),
/// Timestamp in nanoseconds.
TimestampNanos(i64),
/// Local timestamp in milliseconds.
LocalTimestampMillis(i64),
/// Local timestamp in microseconds.
LocalTimestampMicros(i64),
/// Local timestamp in nanoseconds.
LocalTimestampNanos(i64),
/// Avro Duration. An amount of time defined by months, days and milliseconds.
Duration(Duration),
/// Universally unique identifier.
Expand Down Expand Up @@ -340,8 +344,10 @@ impl TryFrom<Value> for JsonValue {
Value::TimeMicros(t) => Ok(Self::Number(t.into())),
Value::TimestampMillis(t) => Ok(Self::Number(t.into())),
Value::TimestampMicros(t) => Ok(Self::Number(t.into())),
Value::TimestampNanos(t) => Ok(Self::Number(t.into())),
Value::LocalTimestampMillis(t) => Ok(Self::Number(t.into())),
Value::LocalTimestampMicros(t) => Ok(Self::Number(t.into())),
Value::LocalTimestampNanos(t) => Ok(Self::Number(t.into())),
Value::Duration(d) => Ok(Self::Array(
<[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(),
)),
Expand Down Expand Up @@ -428,8 +434,10 @@ impl Value {
(&Value::Long(_), &Schema::LocalTimestampMicros) => None,
(&Value::TimestampMicros(_), &Schema::TimestampMicros) => None,
(&Value::TimestampMillis(_), &Schema::TimestampMillis) => None,
(&Value::TimestampNanos(_), &Schema::TimestampNanos) => None,
(&Value::LocalTimestampMicros(_), &Schema::LocalTimestampMicros) => None,
(&Value::LocalTimestampMillis(_), &Schema::LocalTimestampMillis) => None,
(&Value::LocalTimestampNanos(_), &Schema::LocalTimestampNanos) => None,
(&Value::TimeMicros(_), &Schema::TimeMicros) => None,
(&Value::TimeMillis(_), &Schema::TimeMillis) => None,
(&Value::Date(_), &Schema::Date) => None,
Expand Down Expand Up @@ -689,8 +697,10 @@ impl Value {
Schema::TimeMicros => self.resolve_time_micros(),
Schema::TimestampMillis => self.resolve_timestamp_millis(),
Schema::TimestampMicros => self.resolve_timestamp_micros(),
Schema::TimestampNanos => self.resolve_timestamp_nanos(),
Schema::LocalTimestampMillis => self.resolve_local_timestamp_millis(),
Schema::LocalTimestampMicros => self.resolve_local_timestamp_micros(),
Schema::LocalTimestampNanos => self.resolve_local_timestamp_nanos(),
Schema::Duration => self.resolve_duration(),
Schema::Uuid => self.resolve_uuid(),
}
Expand Down Expand Up @@ -814,6 +824,14 @@ impl Value {
}
}

fn resolve_timestamp_nanos(self) -> Result<Self, Error> {
match self {
Value::TimestampNanos(ts) | Value::Long(ts) => Ok(Value::TimestampNanos(ts)),
Value::Int(ts) => Ok(Value::TimestampNanos(i64::from(ts))),
other => Err(Error::GetTimestampNanos(other.into())),
}
}

fn resolve_local_timestamp_millis(self) -> Result<Self, Error> {
match self {
Value::LocalTimestampMillis(ts) | Value::Long(ts) => {
Expand All @@ -834,6 +852,14 @@ impl Value {
}
}

fn resolve_local_timestamp_nanos(self) -> Result<Self, Error> {
match self {
Value::LocalTimestampNanos(ts) | Value::Long(ts) => Ok(Value::LocalTimestampNanos(ts)),
Value::Int(ts) => Ok(Value::LocalTimestampNanos(i64::from(ts))),
other => Err(Error::GetLocalTimestampNanos(other.into())),
}
}

fn resolve_null(self) -> Result<Self, Error> {
match self {
Value::Null => Ok(Value::Null),
Expand Down Expand Up @@ -1738,6 +1764,16 @@ Field with name '"b"' is not a member of the map items"#,
assert!(value.resolve(&Schema::TimestampMicros).is_err());
}

#[test]
fn test_avro_3914_resolve_timestamp_nanos() {
let value = Value::TimestampNanos(10);
assert!(value.clone().resolve(&Schema::TimestampNanos).is_ok());
assert!(value.resolve(&Schema::Int).is_err());

let value = Value::Double(10.0);
assert!(value.resolve(&Schema::TimestampNanos).is_err());
}

#[test]
fn test_avro_3853_resolve_timestamp_millis() {
let value = Value::LocalTimestampMillis(10);
Expand All @@ -1758,6 +1794,16 @@ Field with name '"b"' is not a member of the map items"#,
assert!(value.resolve(&Schema::LocalTimestampMicros).is_err());
}

#[test]
fn test_avro_3916_resolve_timestamp_nanos() {
let value = Value::LocalTimestampNanos(10);
assert!(value.clone().resolve(&Schema::LocalTimestampNanos).is_ok());
assert!(value.resolve(&Schema::Int).is_err());

let value = Value::Double(10.0);
assert!(value.resolve(&Schema::LocalTimestampNanos).is_err());
}

#[test]
fn resolve_duration() {
let value = Value::Duration(Duration::new(
Expand Down Expand Up @@ -1963,6 +2009,10 @@ Field with name '"b"' is not a member of the map items"#,
JsonValue::try_from(Value::TimestampMicros(1))?,
JsonValue::Number(1.into())
);
assert_eq!(
JsonValue::try_from(Value::TimestampNanos(1))?,
JsonValue::Number(1.into())
);
assert_eq!(
JsonValue::try_from(Value::LocalTimestampMillis(1))?,
JsonValue::Number(1.into())
Expand All @@ -1971,6 +2021,10 @@ Field with name '"b"' is not a member of the map items"#,
JsonValue::try_from(Value::LocalTimestampMicros(1))?,
JsonValue::Number(1.into())
);
assert_eq!(
JsonValue::try_from(Value::LocalTimestampNanos(1))?,
JsonValue::Number(1.into())
);
assert_eq!(
JsonValue::try_from(Value::Duration(
[1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8].into()
Expand Down

0 comments on commit e9f10b8

Please sign in to comment.