arroyo-connectors: Test code-gen for JSON Schema and Avro.
jacksonrnewhouse authored and Jackson Newhouse committed Nov 14, 2023
1 parent 0eb6114 commit fb4692d
Showing 8 changed files with 246 additions and 4 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions arroyo-sql-macro/Cargo.toml
@@ -7,13 +7,18 @@ edition = "2021"
proc-macro = true

[dependencies]
arroyo-api = { path = "../arroyo-api" }
arroyo-sql = { path = "../arroyo-sql" }
arroyo-datastream = { path = "../arroyo-datastream" }
arroyo-connectors = { path = "../arroyo-connectors" }
arroyo-rpc = { path = "../arroyo-rpc" }

anyhow = "1"

syn = {version = "2", features = ["full"]}
quote = "1.0"
proc-macro2 = "1"
serde_json = "1"
apache-avro = "0.16.0"

runtime-macros-derive = "0.6.0"
218 changes: 218 additions & 0 deletions arroyo-sql-macro/src/connectors.rs
@@ -0,0 +1,218 @@
use arroyo_connectors::{
kafka::{KafkaConfig, KafkaConnector, KafkaTable, ReadMode},
Connection, Connector,
};
use arroyo_rpc::{
api_types::connections::{ConnectionSchema, SchemaDefinition},
formats::{AvroFormat, Format, JsonFormat, TimestampFormat},
};

use anyhow::Result;
use arroyo_sql::{avro::convert_avro_schema, json_schema::convert_json_schema};

pub fn get_json_schema_source() -> Result<Connection> {
let json_schema = r##"
{
"type": "object",
"title": "ksql.orders",
"properties": {
"itemid": {
"type": "string",
"connect.index": 2
},
"address": {
"type": "object",
"title": "ksql.address",
"connect.index": 4,
"properties": {
"zipcode": {
"type": "integer",
"connect.index": 2,
"connect.type": "int64"
},
"city": {
"type": "string",
"connect.index": 0
},
"state": {
"type": "string",
"connect.index": 1
},
"nested": {
"type": "object",
"properties": {
"a": {
"type": "integer"
}
}
}
}
},
"orderid": {
"type": "integer",
"connect.index": 1,
"connect.type": "int32"
},
"orderunits": {
"type": "number",
"connect.index": 3,
"connect.type": "float64"
},
"ordertime": {
"type": "string",
"format": "date-time",
"connect.index": 0,
"connect.type": "timestamp"
}
}
} "##;

let definition = SchemaDefinition::JsonSchema(json_schema.to_string());
let struct_fields = convert_json_schema("kafka_json_schema", json_schema).unwrap();
let connection_schema = ConnectionSchema::try_new(
Some(Format::Json(JsonFormat {
confluent_schema_registry: false,
include_schema: false,
debezium: false,
unstructured: false,
timestamp_format: TimestampFormat::RFC3339,
})),
None,
None,
struct_fields
.into_iter()
.map(|field| field.try_into().unwrap())
.collect(),
Some(definition),
)?;
let config = KafkaConfig {
authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {},
bootstrap_servers: "localhost:9092".try_into().unwrap(),
schema_registry: None,
};
let table = KafkaTable {
topic: "test_topic".to_string(),
type_: arroyo_connectors::kafka::TableType::Source {
group_id: None,
offset: arroyo_connectors::kafka::SourceOffset::Earliest,
read_mode: Some(ReadMode::ReadUncommitted),
},
};
KafkaConnector {}.from_config(
Some(2),
"kafka_json_schema",
config,
table,
Some(&connection_schema),
)
}

pub fn get_avro_source() -> Result<Connection> {
let avro_schema = r#"
{
"connect.name": "pizza_orders.pizza_orders",
"fields": [
{
"name": "store_id",
"type": "int"
},
{
"name": "store_order_id",
"type": "int"
},
{
"name": "coupon_code",
"type": "int"
},
{
"name": "date",
"type": {
"connect.name": "org.apache.kafka.connect.data.Date",
"connect.version": 1,
"logicalType": "date",
"type": "int"
}
},
{
"name": "status",
"type": "string"
},
{
"name": "order_lines",
"type": {
"items": {
"connect.name": "pizza_orders.order_line",
"fields": [
{
"name": "product_id",
"type": "int"
},
{
"name": "category",
"type": "string"
},
{
"name": "quantity",
"type": "int"
},
{
"name": "unit_price",
"type": "double"
},
{
"name": "net_price",
"type": "double"
}
],
"name": "order_line",
"type": "record"
},
"type": "array"
}
}
],
"name": "pizza_orders",
"namespace": "pizza_orders",
"type": "record"
}"#;
let definition = SchemaDefinition::AvroSchema(avro_schema.to_string());
let struct_fields = convert_avro_schema("kafka_avro_schema", avro_schema).unwrap();
let mut format = AvroFormat::new(true, false, false);
format.add_reader_schema(apache_avro::Schema::parse_str(avro_schema).unwrap());
let connection_schema = ConnectionSchema::try_new(
Some(Format::Avro(format)),
None,
None,
struct_fields
.into_iter()
.map(|field| field.try_into().unwrap())
.collect(),
Some(definition),
)?;
let config = KafkaConfig {
authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {},
bootstrap_servers: "localhost:9092".try_into().unwrap(),
schema_registry: None,
};
let table = KafkaTable {
topic: "test_topic".to_string(),
type_: arroyo_connectors::kafka::TableType::Source {
group_id: None,
offset: arroyo_connectors::kafka::SourceOffset::Earliest,
read_mode: Some(ReadMode::ReadUncommitted),
},
};
KafkaConnector {}.from_config(
Some(3),
"kafka_avro_schema",
config,
table,
Some(&connection_schema),
)
}

#[test]
pub fn get_avro() -> Result<()> {
get_avro_source().unwrap();
Ok(())
}
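
Note that only the Avro path gets a unit test in this file; the JSON Schema helper could be covered the same way. A minimal sketch of such a companion test, hypothetical and not part of this commit:

#[test]
pub fn get_json() -> Result<()> {
    // Hypothetical mirror of get_avro: assert the JSON Schema connection builds.
    get_json_schema_source().unwrap();
    Ok(())
}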
4 changes: 4 additions & 0 deletions arroyo-sql-macro/src/lib.rs
@@ -8,6 +8,8 @@ use quote::{quote, ToTokens};
use syn::parse::{Parse, ParseStream};
use syn::{parse_str, Expr, LitInt, LitStr, Token};

mod connectors;

/// This macro is used to generate a test function for a single test case.
/// Used in the `arroyo-sql-testing` crate.
///
@@ -153,6 +155,8 @@ fn get_pipeline_module(
.unwrap();

schema_provider.add_connector_table(nexmark);
schema_provider.add_connector_table(connectors::get_json_schema_source().unwrap());
schema_provider.add_connector_table(connectors::get_avro_source().unwrap());

let file = syn::parse_file(&udfs.unwrap_or_default()).unwrap();
for item in file.items.into_iter() {
1 change: 1 addition & 0 deletions arroyo-sql-testing/Cargo.toml
@@ -4,6 +4,7 @@ version = "0.7.0"
edition = "2021"

[features]
default = ["integration-tests"]
integration-tests = []


10 changes: 10 additions & 0 deletions arroyo-sql-testing/src/full_query_tests.rs
@@ -471,3 +471,13 @@ INSERT INTO non_updating_sink
select distinct(bid.url)
from nexmark;
"}

full_pipeline_codegen! {
"kafka_json_schemas",
"SELECT * FROM kafka_json_schema"
}

full_pipeline_codegen! {
"kafka_avro_source",
"SELECT * FROM kafka_avro_schema"
}
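
Both invocations above select every column. A further case could exercise the nested struct generated from the JSON schema's address object; a hypothetical sketch, not in this commit, and the nested-field syntax depends on what arroyo-sql accepts:

full_pipeline_codegen! {
"kafka_json_schema_nested",
"SELECT address.city, address.state FROM kafka_json_schema"
}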
2 changes: 1 addition & 1 deletion arroyo-sql/src/avro.rs
@@ -39,7 +39,7 @@ pub fn get_defs(name: &str, schema: &str) -> anyhow::Result<String> {
let mod_ident: Ident = syn::parse_str(name).unwrap();
Ok(quote! {
mod #mod_ident {
-use crate::*;
+use super::*;
#(#defs)
*
}
6 changes: 3 additions & 3 deletions arroyo-sql/src/json_schema.rs
@@ -85,11 +85,11 @@ pub fn get_defs(source_name: &str, schema: &str) -> Result<String, String> {
if *nullable {
Some(quote!{
#[serde(default)]
-#[serde(deserialize_with = "arroyo_worker::deserialize_rfc3339_datetime_opt")]
+#[serde(with = "arroyo_worker::formats::json::opt_timestamp_as_rfc3339")]
})
} else {
Some(quote! {
-#[serde(deserialize_with = "arroyo_worker::deserialize_rfc3339_datetime")]
+#[serde(with = "arroyo_worker::formats::json::timestamp_as_rfc3339")]
})
}

@@ -150,7 +150,7 @@ pub fn get_defs(source_name: &str, schema: &str) -> Result<String, String> {
add_defs(source_name, ROOT_NAME, &fields, &mut defs);

Ok(format!(
"mod {} {{\nuse crate::*;\n{}\n}}",
"mod {} {{\nuse super::*;\n{}\n}}",
source_name,
defs.join("\n")
))
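
The serde attribute change above (from deserialize_with to with = "...") means each referenced path must now be a module exposing both serialize and deserialize, so the generated structs can round-trip timestamps as RFC 3339 strings. The actual arroyo_worker::formats::json::timestamp_as_rfc3339 module is not part of this diff; a minimal sketch of the shape serde expects, assuming chrono handles the RFC 3339 conversion:

use chrono::{DateTime, Utc};
use serde::{Deserialize, Deserializer, Serializer};
use std::time::SystemTime;

pub mod timestamp_as_rfc3339 {
    use super::*;

    // Serialize a SystemTime as an RFC 3339 string, e.g. "2023-11-14T00:00:00+00:00".
    pub fn serialize<S: Serializer>(t: &SystemTime, s: S) -> Result<S::Ok, S::Error> {
        let dt: DateTime<Utc> = (*t).into();
        s.serialize_str(&dt.to_rfc3339())
    }

    // Parse an RFC 3339 string back into a SystemTime.
    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<SystemTime, D::Error> {
        let raw = String::deserialize(d)?;
        DateTime::parse_from_rfc3339(&raw)
            .map(|dt| dt.with_timezone(&Utc).into())
            .map_err(serde::de::Error::custom)
    }
}

The opt_timestamp_as_rfc3339 variant would wrap the same logic around Option<SystemTime>.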