Work on avro serializers
mwylde committed Nov 22, 2023
1 parent f8a88dd commit 7cc876b
Showing 6 changed files with 220 additions and 5 deletions.
143 changes: 141 additions & 2 deletions arroyo-sql/src/avro.rs
@@ -1,8 +1,10 @@
use std::ptr::null;
use crate::types::{StructDef, StructField, TypeDef};
use anyhow::{anyhow, bail};
use apache_avro::Schema;
use arrow_schema::DataType;
use proc_macro2::Ident;
use apache_avro::schema::{FixedSchema, Name, RecordField, RecordFieldOrder, RecordSchema, UnionSchema};
use arrow_schema::{DataType, Field, Fields, TimeUnit};
use proc_macro2::{Ident, TokenStream};
use quote::quote;

pub const ROOT_NAME: &str = "ArroyoAvroRoot";
@@ -111,3 +113,140 @@ fn to_typedef(source_name: &str, schema: &Schema) -> (TypeDef, Option<String>) {
),
}
}

/// Generates code that serializes an Arroyo data struct into Avro
///
/// Note that this must stay aligned with the schemas produced by `arrow_to_avro` below
pub fn generate_serializer_item(record: &Ident, field: &Ident, td: &TypeDef) -> TokenStream {
use DataType::*;
let value = quote!(#record.#field);
match td {
TypeDef::StructDef(_, _) => {
quote!(todo!("structs"))
}
TypeDef::DataType(dt, nullable) => {
let inner = match dt {
Null => unreachable!("null fields are not supported"),
Boolean => quote!{ Boolean(v) },
Int8 | Int16 | Int32 | UInt8 | UInt16 => quote! { Int(v) },
Int64 | UInt32 => quote! { Long(v) },
UInt64 => quote! { Fixed(8, v.to_be_bytes().to_vec()) },
Float16 | Float32 => quote! { Float(v) },
Float64 => quote! { Double(v) },
Timestamp(t, tz) => {
match (t, tz) {
(TimeUnit::Microsecond | TimeUnit::Nanosecond, None) => quote!{ TimestampMicros(arroyo_types::to_micros(v) as i64) },
(TimeUnit::Microsecond | TimeUnit::Nanosecond, Some(_)) => quote! { LocalTimestampMicros(arroyo_types::to_micros(v) as i64) },
(TimeUnit::Millisecond | TimeUnit::Second, None) => quote!{ TimestampMillis(arroyo_types::to_millis(v) as i64) },
(TimeUnit::Millisecond | TimeUnit::Second, Some(_)) => quote!{ LocalTimestampMillis(arroyo_types::to_millis(v) as i64) },
}
}
Date32 | Date64 => quote! { Date(arroyo_types::days_since_epoch(v)) },
Time32(_) => todo!("time32 is not supported"),
Time64(_) => todo!("time64 is not supported"),
Duration(_) => todo!("duration is not supported"),
Interval(_) => todo!("interval is not supported"),
Binary | FixedSizeBinary(_) | LargeBinary => quote!{ Bytes(v.clone()) },
Utf8 | LargeUtf8 => quote!{ String(v.clone()) },
List(t) | FixedSizeList(t, _) | LargeList(t) => {
todo!("lists are not supported")
}
Struct(fields) => unreachable!("typedefs should not contain structs"),
Union(_, _) => unimplemented!("unions are not supported"),
Dictionary(_, _) => unimplemented!("dictionaries are not supported"),
Decimal128(_, _) => unimplemented!("decimal128 is not supported"),
Decimal256(_, _) => unimplemented!("decimal256 is not supported"),
Map(_, _) => unimplemented!("maps are not supported"),
RunEndEncoded(_, _) => unimplemented!("run end encoded is not supported"),
};

if *nullable {
quote!{
Union(#value.is_some() as u32, Box::new(#value.map(|v| #inner).unwrap_or_else(Null)))
}
} else {
quote!{let v = #value; #inner}
}
}
}
}

fn arrow_to_avro(name: &str, dt: &DataType) -> Schema {
match dt {
DataType::Null => unreachable!("null fields are not supported"),
DataType::Boolean => Schema::Boolean,
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::UInt8 | DataType::UInt16 => Schema::Int,
DataType::Int64 | DataType::UInt32 => Schema::Long,
DataType::UInt64 => Schema::Fixed(FixedSchema {
name: Name::from("dev.arroyo.types.uint64"),
aliases: None,
doc: None,
size: 8,
attributes: Default::default(),
}),
DataType::Float16 | DataType::Float32 => Schema::Float,
DataType::Float64 => Schema::Double,
DataType::Timestamp(t, tz) => {
match (t, tz) {
(TimeUnit::Microsecond | TimeUnit::Nanosecond, None) => Schema::TimestampMicros,
(TimeUnit::Microsecond | TimeUnit::Nanosecond, Some(_)) => Schema::LocalTimestampMicros,
(TimeUnit::Millisecond | TimeUnit::Second, None) => Schema::TimestampMillis,
(TimeUnit::Millisecond | TimeUnit::Second, Some(_)) => Schema::LocalTimestampMillis,
}
}
DataType::Date32 | DataType::Date64 => Schema::Date,
DataType::Time64(_) | DataType::Time32(_) => { todo!("time is not supported") },
DataType::Duration(_) => todo!("duration is not supported"),
DataType::Interval(_) => todo!("interval is not supported"),
DataType::Binary | DataType::FixedSizeBinary(_) | DataType::LargeBinary => Schema::Bytes,
DataType::Utf8 | DataType::LargeUtf8 => Schema::String,
DataType::List(t) | DataType::FixedSizeList(t, _) | DataType::LargeList(t) => {
Schema::Array(Box::new(arrow_to_avro(name, t.data_type())))
}
DataType::Struct(fields) => {
arrow_to_avro_schema(name, fields)
}
DataType::Union(_, _) => unimplemented!("unions are not supported"),
DataType::Dictionary(_, _) => unimplemented!("dictionaries are not supported"),
DataType::Decimal128(_, _) => unimplemented!("decimal128 is not supported"),
DataType::Decimal256(_, _) => unimplemented!("decimal256 is not supported"),
DataType::Map(_, _) => unimplemented!("maps are not supported"),
DataType::RunEndEncoded(_, _) => unimplemented!("run end encoded is not supported"),
}
}

fn field_to_avro(index: usize, name: &str, field: &Field) -> RecordField {
let mut schema = arrow_to_avro(&format!("{}_{}", name, &field.name()), field.data_type());

if field.is_nullable() {
schema = Schema::Union(UnionSchema::new(vec![Schema::Null, schema]).unwrap());
}

RecordField {
name: field.name().clone(),
doc: None,
aliases: None,
default: None,
schema,
order: RecordFieldOrder::Ascending,
position: index,
custom_attributes: Default::default(),
}
}

pub fn arrow_to_avro_schema(name: &str, fields: &Fields) -> Schema {
let fields = fields
.iter()
.enumerate()
.map(|(i, f)| field_to_avro(i, name, &**f))
.collect();

Schema::Record(RecordSchema {
name: Name::from(name),
aliases: None,
doc: None,
fields,
lookup: Default::default(),
attributes: Default::default(),
})
}
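
For illustration (not part of this commit), here is a minimal sketch of what arrow_to_avro_schema would produce for a hypothetical two-field Arrow schema; the struct and field names are invented for the example, and the JSON shown is only approximate:

    use arrow_schema::{DataType, Field, Fields};

    fn example_schema() -> apache_avro::Schema {
        // Hypothetical input: a non-nullable Int64 "id" and a nullable Utf8 "name".
        let fields = Fields::from(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        // Per the mapping above, this yields roughly:
        //   {"type":"record","name":"ArroyoAvroRoot","fields":[
        //       {"name":"id","type":"long"},
        //       {"name":"name","type":["null","string"]}]}
        arrow_to_avro_schema("ArroyoAvroRoot", &fields)
    }
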
33 changes: 32 additions & 1 deletion arroyo-sql/src/types.rs
@@ -7,6 +7,7 @@ use std::{

use anyhow::Result;
use anyhow::{anyhow, bail};
use apache_avro::types::Value;
use arrow::datatypes::{DataType, IntervalMonthDayNanoType};
use arrow::{
array::Decimal128Array,
@@ -24,10 +25,11 @@ use arroyo_rpc::api_types::connections::{
};
use datafusion_common::ScalarValue;
use proc_macro2::{Ident, TokenStream};
use quote::quote;
use quote::{format_ident, quote};
use regex::Regex;
use syn::PathArguments::AngleBracketed;
use syn::{parse_quote, parse_str, GenericArgument, Type};
use crate::avro;

#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd)]
pub struct StructDef {
@@ -312,6 +314,31 @@ impl StructDef {
}
}

fn generate_avro_writer(&self) -> TokenStream {
let record_ident = format_ident!("record");
let fields: Vec<_> = self.fields.iter()
.map(|f| {
let name = f.name();
let field_ident = f.field_ident();
let serializer = avro::generate_serializer_item(
&record_ident, &field_ident, &f.data_type);
quote!{
record.put(#name, #serializer);
}
})
.collect();

quote! {
fn write_avro<W: std::io::Write>(&self, writer: &mut apache_avro::Writer<W>, schema: &apache_avro::Schema) {
let mut record = apache_avro::types::Record::new(schema).unwrap();

#(#fields )*;

writer.append(record).unwrap();
}
}
}

// generate a SchemaData impl but only for generated types
pub fn generate_schema_data(&self) -> Option<TokenStream> {
if !self.generated {
@@ -352,6 +379,8 @@ impl StructDef {

let reader_type = self.parquet_reader_type();

let avro_writer = self.generate_avro_writer();

Some(quote! {
impl arroyo_worker::SchemaData for #struct_type {
fn name() -> &'static str {
@@ -367,6 +396,8 @@
#to_raw_string
}

#avro_writer

fn iterator_from_record_batch(
record_batch: arrow_array::RecordBatch,
) -> anyhow::Result<Box<dyn Iterator<Item = Self> + Send>> {
9 changes: 9 additions & 0 deletions arroyo-types/src/lib.rs
@@ -251,6 +251,15 @@ pub fn from_nanos(ts: u128) -> SystemTime {
+ Duration::from_nanos((ts % 1_000_000_000) as u64)
}


// Used for Avro serialization -- returns the number of whole days since the Unix epoch
pub fn days_since_epoch(time: SystemTime) -> i32 {
time.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
.div_euclid(86400) as i32
}

pub fn string_to_map(s: &str) -> Option<HashMap<String, String>> {
if s.trim().is_empty() {
return Some(HashMap::new());
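
As a quick sanity check on days_since_epoch above (a sketch, not part of the diff): 90,000 seconds is one whole day plus one hour, so the helper returns 1.

    use std::time::{Duration, UNIX_EPOCH};

    // 90,000 s after the epoch = 1 day + 1 hour; div_euclid(86_400) truncates to 1.
    assert_eq!(days_since_epoch(UNIX_EPOCH + Duration::from_secs(90_000)), 1);
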
18 changes: 17 additions & 1 deletion arroyo-worker/src/formats/avro.rs
@@ -1,14 +1,17 @@
use apache_avro::types::{Value as AvroValue, Value};
use apache_avro::{from_avro_datum, Reader, Schema};
use apache_avro::{from_avro_datum, Reader, Schema, Writer};
use arroyo_rpc::formats::AvroFormat;
use arroyo_rpc::schema_resolver::SchemaResolver;
use arroyo_types::UserError;
use serde::de::DeserializeOwned;
use serde_json::{json, Value as JsonValue};
use std::collections::HashMap;
use std::sync::Arc;
use apache_avro::schema::{FixedSchema, Name, RecordField, RecordFieldOrder, RecordSchema, UnionSchema};
use arrow::datatypes::{DataType, Field, Fields, TimeUnit};
use tokio::sync::Mutex;
use tracing::info;
use crate::SchemaData;

pub async fn deserialize_slice_avro<'a, T: DeserializeOwned>(
format: &AvroFormat,
@@ -91,6 +94,19 @@ pub async fn deserialize_slice_avro<'a, T: DeserializeOwned>(
}))
}

pub fn to_vec<T: SchemaData>(record: &T, format: &AvroFormat, schema: &Schema) -> Vec<u8> {
if format.embedded_schema {
let mut writer = Writer::new(schema, Vec::with_capacity(128));

let record = apache_avro::types::Record::new(schema).unwrap();
}

todo!()

}



fn convert_float(f: f64) -> JsonValue {
match serde_json::Number::from_f64(f) {
Some(n) => JsonValue::Number(n),
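
The to_vec stub above is still a todo in this commit. One plausible completion, assuming the object-container path drives the new SchemaData::write_avro method (a sketch, not the author's implementation; into_inner's return type varies across apache_avro versions):

    pub fn to_vec<T: SchemaData>(record: &T, format: &AvroFormat, schema: &Schema) -> Vec<u8> {
        if format.embedded_schema {
            // Object-container framing: the schema travels with the payload.
            let mut writer = Writer::new(schema, Vec::with_capacity(128));
            record.write_avro(&mut writer, schema);
            // In recent apache_avro versions, into_inner() flushes and returns a Result.
            writer.into_inner().expect("failed to flush avro writer")
        } else {
            // Schema-registry style framing would instead encode a bare datum
            // (e.g. with apache_avro::to_avro_datum) once the record can be
            // turned into an apache_avro::types::Value.
            todo!("single-datum encoding")
        }
    }
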
6 changes: 5 additions & 1 deletion arroyo-worker/src/formats/mod.rs
@@ -167,6 +167,7 @@ pub struct DataSerializer<T: SchemaData> {
kafka_schema: Value,
#[allow(unused)]
json_schema: Value,
avro_schema: apache_avro::schema::Schema,
format: Format,
_t: PhantomData<T>,
}
@@ -176,6 +177,7 @@ impl<T: SchemaData> DataSerializer<T> {
Self {
kafka_schema: json::arrow_to_kafka_json(T::name(), T::schema().fields()),
json_schema: json::arrow_to_json_schema(T::schema().fields()),
avro_schema: avro::arrow_to_avro_schema(T::name(), T::schema().fields()),
format,
_t: PhantomData,
}
@@ -204,7 +206,9 @@ impl<T: SchemaData> DataSerializer<T> {
};
Some(writer)
}
Format::Avro(_) => todo!(),
Format::Avro(f) => {
Some(avro::to_vec(record, f, &self.avro_schema))
},
Format::Parquet(_) => todo!(),
Format::RawString(_) => record.to_raw_string(),
}
16 changes: 16 additions & 0 deletions arroyo-worker/src/lib.rs
@@ -30,10 +30,12 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Deserializer, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::io::Write;
use std::process::exit;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use apache_avro::Writer;
use tokio::net::TcpListener;
use tokio::select;
use tokio::sync::broadcast;
@@ -78,6 +80,8 @@ pub trait SchemaData: Data + Serialize + DeserializeOwned {
/// a None value, and should panic if they do not support raw strings (which
/// indicates a miscompilation).
fn to_raw_string(&self) -> Option<Vec<u8>>;

fn write_avro<W: Write>(&self, writer: &mut apache_avro::Writer<W>, schema: &apache_avro::Schema);
}

impl<T: SchemaData> SchemaData for Debezium<T> {
@@ -108,6 +112,10 @@ impl<T: SchemaData> SchemaData for Debezium<T> {
fn to_raw_string(&self) -> Option<Vec<u8>> {
unimplemented!("debezium data cannot be written as a raw string");
}

fn write_avro<W: Write>(&self, writer: &mut Writer<W>, schema: &apache_avro::Schema) {
todo!()
}
}

impl SchemaData for RawJson {
@@ -136,6 +144,10 @@ impl SchemaData for RawJson {
.clone(),
}))
}

fn write_avro<W: Write>(&self, writer: &mut Writer<W>, schema: &apache_avro::Schema) {
todo!()
}
}

struct RawJsonIterator {
@@ -171,6 +183,10 @@ impl SchemaData for () {
fn to_raw_string(&self) -> Option<Vec<u8>> {
None
}

fn write_avro<W: Write>(&self, _: &mut Writer<W>, _: &apache_avro::Schema) {
// no-op
}
}

// A custom deserializer for json, that takes a json::Value and reserializes it as a string
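
For reference, a minimal sketch of the record-building pattern a write_avro implementation is expected to follow (illustrative only; it assumes a record schema with a single long field named "id"):

    use apache_avro::types::{Record, Value};

    fn append_one<W: std::io::Write>(
        writer: &mut apache_avro::Writer<W>,
        schema: &apache_avro::Schema,
        id: i64,
    ) {
        // Build a Record against the writer's schema, fill each field, then append.
        let mut record = Record::new(schema).expect("schema must be a record schema");
        record.put("id", Value::Long(id));
        writer.append(record).expect("failed to append avro record");
    }
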
