
Commit

arroyo-connectors: Test code-gen for JSON Schema and Avro.
jacksonrnewhouse committed Nov 13, 2023

1 parent bd9e895 commit 27abc31
Showing 8 changed files with 246 additions and 4 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock


5 changes: 5 additions & 0 deletions arroyo-sql-macro/Cargo.toml
@@ -7,13 +7,18 @@ edition = "2021"
proc-macro = true

[dependencies]
arroyo-api = { path = "../arroyo-api" }
arroyo-sql = { path = "../arroyo-sql" }
arroyo-datastream = { path = "../arroyo-datastream" }
arroyo-connectors = { path = "../arroyo-connectors" }
arroyo-rpc = { path = "../arroyo-rpc" }

anyhow = "1"

syn = {version = "2", features = ["full"]}
quote = "1.0"
proc-macro2 = "1"
serde_json = "1"
apache-avro = "0.16.0"

runtime-macros-derive = "0.6.0"
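
The new dependencies, notably serde_json and apache-avro, give the macro crate what it needs to handle the schema documents used below. As a point of reference (an illustrative sketch, not part of this commit; the record name and field are made up), apache-avro 0.16 parses a schema string like this:

// Sketch: validating an Avro schema string with apache_avro::Schema::parse_str.
use apache_avro::Schema;

fn parse_example_schema() -> anyhow::Result<Schema> {
    let raw = r#"{"type": "record", "name": "example", "fields": [{"name": "id", "type": "long"}]}"#;
    // parse_str returns an error if the document is not a valid Avro schema.
    let schema = Schema::parse_str(raw)?;
    Ok(schema)
}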
218 changes: 218 additions & 0 deletions arroyo-sql-macro/src/connectors.rs
@@ -0,0 +1,218 @@
use arroyo_connectors::{
    kafka::{KafkaConfig, KafkaConnector, KafkaTable, ReadMode},
    Connection, Connector,
};
use arroyo_rpc::{
    api_types::connections::{ConnectionSchema, SchemaDefinition},
    formats::{AvroFormat, Format, JsonFormat, TimestampFormat},
};

use anyhow::Result;
use arroyo_sql::{avro::convert_avro_schema, json_schema::convert_json_schema};

/// Builds a Kafka source whose schema is defined by a JSON Schema document,
/// so the SQL macro can exercise JSON Schema code generation.
pub fn get_json_schema_source() -> Result<Connection> {
    let json_schema = r##"
    {
      "type": "object",
      "title": "ksql.orders",
      "properties": {
        "itemid": {
          "type": "string",
          "connect.index": 2
        },
        "address": {
          "type": "object",
          "title": "ksql.address",
          "connect.index": 4,
          "properties": {
            "zipcode": {
              "type": "integer",
              "connect.index": 2,
              "connect.type": "int64"
            },
            "city": {
              "type": "string",
              "connect.index": 0
            },
            "state": {
              "type": "string",
              "connect.index": 1
            },
            "nested": {
              "type": "object",
              "properties": {
                "a": {
                  "type": "integer"
                }
              }
            }
          }
        },
        "orderid": {
          "type": "integer",
          "connect.index": 1,
          "connect.type": "int32"
        },
        "orderunits": {
          "type": "number",
          "connect.index": 3,
          "connect.type": "float64"
        },
        "ordertime": {
          "type": "string",
          "format": "date-time",
          "connect.index": 0,
          "connect.type": "timestamp"
        }
      }
    } "##;

    let definition = SchemaDefinition::JsonSchema(json_schema.to_string());
    let struct_fields = convert_json_schema("kafka_json_schema", json_schema).unwrap();
    let connection_schema = ConnectionSchema::try_new(
        Some(Format::Json(JsonFormat {
            confluent_schema_registry: false,
            include_schema: false,
            debezium: false,
            unstructured: false,
            timestamp_format: TimestampFormat::RFC3339,
        })),
        None,
        None,
        struct_fields
            .into_iter()
            .map(|field| field.try_into().unwrap())
            .collect(),
        Some(definition),
    )?;
    let config = KafkaConfig {
        authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {},
        bootstrap_servers: "localhost:9092".try_into().unwrap(),
        schema_registry: None,
    };
    let table = KafkaTable {
        topic: "test_topic".to_string(),
        type_: arroyo_connectors::kafka::TableType::Source {
            group_id: None,
            offset: arroyo_connectors::kafka::SourceOffset::Earliest,
            read_mode: Some(ReadMode::ReadUncommitted),
        },
    };
    KafkaConnector {}.from_config(
        Some(2),
        "kafka_json_schema",
        config,
        table,
        Some(&connection_schema),
    )
}

/// Builds a Kafka source whose schema is defined by an Avro schema, with the
/// same schema registered as the reader schema, to exercise Avro code generation.
pub fn get_avro_source() -> Result<Connection> {
    let avro_schema = r#"
    {
      "connect.name": "pizza_orders.pizza_orders",
      "fields": [
        {
          "name": "store_id",
          "type": "int"
        },
        {
          "name": "store_order_id",
          "type": "int"
        },
        {
          "name": "coupon_code",
          "type": "int"
        },
        {
          "name": "date",
          "type": {
            "connect.name": "org.apache.kafka.connect.data.Date",
            "connect.version": 1,
            "logicalType": "date",
            "type": "int"
          }
        },
        {
          "name": "status",
          "type": "string"
        },
        {
          "name": "order_lines",
          "type": {
            "items": {
              "connect.name": "pizza_orders.order_line",
              "fields": [
                {
                  "name": "product_id",
                  "type": "int"
                },
                {
                  "name": "category",
                  "type": "string"
                },
                {
                  "name": "quantity",
                  "type": "int"
                },
                {
                  "name": "unit_price",
                  "type": "double"
                },
                {
                  "name": "net_price",
                  "type": "double"
                }
              ],
              "name": "order_line",
              "type": "record"
            },
            "type": "array"
          }
        }
      ],
      "name": "pizza_orders",
      "namespace": "pizza_orders",
      "type": "record"
    }"#;
    let definition = SchemaDefinition::AvroSchema(avro_schema.to_string());
    let struct_fields = convert_avro_schema("kafka_avro_schema", avro_schema).unwrap();
    let mut format = AvroFormat::new(true, false, false);
    format.add_reader_schema(apache_avro::Schema::parse_str(avro_schema).unwrap());
    let connection_schema = ConnectionSchema::try_new(
        Some(Format::Avro(format)),
        None,
        None,
        struct_fields
            .into_iter()
            .map(|field| field.try_into().unwrap())
            .collect(),
        Some(definition),
    )?;
    let config = KafkaConfig {
        authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {},
        bootstrap_servers: "localhost:9092".try_into().unwrap(),
        schema_registry: None,
    };
    let table = KafkaTable {
        topic: "test_topic".to_string(),
        type_: arroyo_connectors::kafka::TableType::Source {
            group_id: None,
            offset: arroyo_connectors::kafka::SourceOffset::Earliest,
            read_mode: Some(ReadMode::ReadUncommitted),
        },
    };
    KafkaConnector {}.from_config(
        Some(3),
        "kafka_avro_schema",
        config,
        table,
        Some(&connection_schema),
    )
}

#[test]
pub fn get_avro() -> Result<()> {
    get_avro_source().unwrap();
    Ok(())
}
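
The unit test above covers only the Avro path; a matching smoke test for the JSON Schema source (a sketch, not part of this commit) would mirror it:

#[test]
pub fn get_json() -> Result<()> {
    // Building the connection exercises convert_json_schema and ConnectionSchema::try_new.
    get_json_schema_source().unwrap();
    Ok(())
}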
4 changes: 4 additions & 0 deletions arroyo-sql-macro/src/lib.rs
@@ -8,6 +8,8 @@ use quote::{quote, ToTokens};
use syn::parse::{Parse, ParseStream};
use syn::{parse_str, Expr, LitInt, LitStr, Token};

mod connectors;

/// This macro is used to generate a test function for a single test case.
/// Used in the `arroyo-sql-testing` crate.
///
@@ -153,6 +155,8 @@ fn get_pipeline_module(
        .unwrap();

    schema_provider.add_connector_table(nexmark);
    schema_provider.add_connector_table(connectors::get_json_schema_source().unwrap());
    schema_provider.add_connector_table(connectors::get_avro_source().unwrap());

    let file = syn::parse_file(&udfs.unwrap_or_default()).unwrap();
    for item in file.items.into_iter() {
1 change: 1 addition & 0 deletions arroyo-sql-testing/Cargo.toml
@@ -4,6 +4,7 @@ version = "0.7.0"
edition = "2021"

[features]
default = ["integration-tests"]
integration-tests = []


10 changes: 10 additions & 0 deletions arroyo-sql-testing/src/full_query_tests.rs
@@ -471,3 +471,13 @@ INSERT INTO non_updating_sink
select distinct(bid.url)
from nexmark;
"}

full_pipeline_codegen! {
    "kafka_json_schemas",
    "SELECT * FROM kafka_json_schema"
}

full_pipeline_codegen! {
    "kafka_avro_source",
    "SELECT * FROM kafka_avro_schema"
}
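
Judging from these invocations, full_pipeline_codegen! takes a test name and a SQL query and generates a pipeline code-generation test for it. A further query against the fields declared in the Avro schema (hypothetical, not part of this commit) would follow the same pattern:

full_pipeline_codegen! {
    "kafka_avro_filtered",
    "SELECT store_id, status FROM kafka_avro_schema WHERE store_id > 1"
}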
2 changes: 1 addition & 1 deletion arroyo-sql/src/avro.rs
@@ -39,7 +39,7 @@ pub fn get_defs(name: &str, schema: &str) -> anyhow::Result<String> {
    let mod_ident: Ident = syn::parse_str(name).unwrap();
    Ok(quote! {
        mod #mod_ident {
-           use crate::*;
+           use super::*;
            #(#defs)
            *
        }
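
Switching the generated module from use crate::*; to use super::*; makes its imports resolve relative to the enclosing module rather than the crate root, which is what is needed when the generated definitions end up nested inside another module (presumably the case for the pipelines the SQL macro emits). A minimal illustration with hypothetical names:

// Hypothetical nesting: the generated module lives inside `pipeline`, not at the crate root.
#[allow(dead_code)]
mod pipeline {
    pub struct OrderId(pub i64);

    mod generated {
        use super::*; // brings `OrderId` from the parent module into scope
        // `use crate::*;` would only see items re-exported at the crate root.
        pub struct Order {
            pub id: OrderId,
        }
    }
}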
6 changes: 3 additions & 3 deletions arroyo-sql/src/json_schema.rs
@@ -85,11 +85,11 @@ pub fn get_defs(source_name: &str, schema: &str) -> Result<String, String> {
    if *nullable {
        Some(quote!{
            #[serde(default)]
-           #[serde(deserialize_with = "arroyo_worker::deserialize_rfc3339_datetime_opt")]
+           #[serde(with = "arroyo_worker::formats::json::opt_timestamp_as_rfc3339")]
        })
    } else {
        Some(quote! {
-           #[serde(deserialize_with = "arroyo_worker::deserialize_rfc3339_datetime")]
+           #[serde(with = "arroyo_worker::formats::json::timestamp_as_rfc3339")]
        })
    }

@@ -150,7 +150,7 @@ pub fn get_defs(source_name: &str, schema: &str) -> Result<String, String> {
    add_defs(source_name, ROOT_NAME, &fields, &mut defs);

    Ok(format!(
-       "mod {} {{\nuse crate::*;\n{}\n}}",
+       "mod {} {{\nuse super::*;\n{}\n}}",
        source_name,
        defs.join("\n")
    ))
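
The change from #[serde(deserialize_with = "...")] to #[serde(with = "...")] points serde at a module that provides both serialize and deserialize, so the generated structs can round-trip timestamps as RFC 3339 in both directions instead of only reading them. The arroyo_worker module itself is not shown in this diff; a generic module of the shape serde expects (assuming the field type is SystemTime) looks roughly like:

// Sketch of a serde "with" module for RFC 3339 timestamps; the real
// arroyo_worker::formats::json::timestamp_as_rfc3339 may differ.
pub mod timestamp_as_rfc3339 {
    use serde::{Deserialize, Deserializer, Serialize, Serializer};
    use std::time::SystemTime;

    pub fn serialize<S: Serializer>(t: &SystemTime, s: S) -> Result<S::Ok, S::Error> {
        // Convert to a chrono DateTime and emit the RFC 3339 string form.
        let dt: chrono::DateTime<chrono::Utc> = (*t).into();
        dt.to_rfc3339().serialize(s)
    }

    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<SystemTime, D::Error> {
        // Parse the RFC 3339 string back into a SystemTime.
        let raw = String::deserialize(d)?;
        let dt = chrono::DateTime::parse_from_rfc3339(&raw).map_err(serde::de::Error::custom)?;
        Ok(dt.with_timezone(&chrono::Utc).into())
    }
}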
