From 27abc31aaec6439823a06ec863bdfbbdb5a758ba Mon Sep 17 00:00:00 2001 From: Jackson Newhouse Date: Mon, 13 Nov 2023 15:34:46 -0800 Subject: [PATCH] arroyo-connectors: Test code-gen for JSON Schema and Avro. --- Cargo.lock | 4 + arroyo-sql-macro/Cargo.toml | 5 + arroyo-sql-macro/src/connectors.rs | 218 +++++++++++++++++++++ arroyo-sql-macro/src/lib.rs | 4 + arroyo-sql-testing/Cargo.toml | 1 + arroyo-sql-testing/src/full_query_tests.rs | 10 + arroyo-sql/src/avro.rs | 2 +- arroyo-sql/src/json_schema.rs | 6 +- 8 files changed, 246 insertions(+), 4 deletions(-) create mode 100644 arroyo-sql-macro/src/connectors.rs diff --git a/Cargo.lock b/Cargo.lock index 7f7f30513..031648bee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -754,8 +754,12 @@ dependencies = [ name = "arroyo-sql-macro" version = "0.7.0" dependencies = [ + "anyhow", + "apache-avro", + "arroyo-api", "arroyo-connectors", "arroyo-datastream", + "arroyo-rpc", "arroyo-sql", "proc-macro2", "quote", diff --git a/arroyo-sql-macro/Cargo.toml b/arroyo-sql-macro/Cargo.toml index 1ba3e999e..4a0615e5e 100644 --- a/arroyo-sql-macro/Cargo.toml +++ b/arroyo-sql-macro/Cargo.toml @@ -7,13 +7,18 @@ edition = "2021" proc-macro = true [dependencies] +arroyo-api = { path = "../arroyo-api" } arroyo-sql = { path = "../arroyo-sql" } arroyo-datastream = { path = "../arroyo-datastream" } arroyo-connectors = { path = "../arroyo-connectors" } +arroyo-rpc = { path = "../arroyo-rpc" } + +anyhow = "1" syn = {version = "2", features = ["full"]} quote = "1.0" proc-macro2 = "1" serde_json = "1" +apache-avro = "0.16.0" runtime-macros-derive = "0.6.0" diff --git a/arroyo-sql-macro/src/connectors.rs b/arroyo-sql-macro/src/connectors.rs new file mode 100644 index 000000000..c5cdd3ea4 --- /dev/null +++ b/arroyo-sql-macro/src/connectors.rs @@ -0,0 +1,218 @@ +use arroyo_connectors::{ + kafka::{KafkaConfig, KafkaConnector, KafkaTable, ReadMode}, + Connection, Connector, +}; +use arroyo_rpc::{ + api_types::connections::{ConnectionSchema, SchemaDefinition}, + formats::{AvroFormat, Format, JsonFormat, TimestampFormat}, +}; + +use anyhow::Result; +use arroyo_sql::{avro::convert_avro_schema, json_schema::convert_json_schema}; + +pub fn get_json_schema_source() -> Result { + let json_schema = r##" + { + "type": "object", + "title": "ksql.orders", + "properties": { + "itemid": { + "type": "string", + "connect.index": 2 + }, + "address": { + "type": "object", + "title": "ksql.address", + "connect.index": 4, + "properties": { + "zipcode": { + "type": "integer", + "connect.index": 2, + "connect.type": "int64" + }, + "city": { + "type": "string", + "connect.index": 0 + }, + "state": { + "type": "string", + "connect.index": 1 + }, + "nested": { + "type": "object", + "properties": { + "a": { + "type": "integer" + } + } + } + } + }, + "orderid": { + "type": "integer", + "connect.index": 1, + "connect.type": "int32" + }, + "orderunits": { + "type": "number", + "connect.index": 3, + "connect.type": "float64" + }, + "ordertime": { + "type": "string", + "format": "date-time", + "connect.index": 0, + "connect.type": "timestamp" + } + } + } "##; + + let definition = SchemaDefinition::JsonSchema(json_schema.to_string()); + let struct_fields = convert_json_schema("kafka_json_schema", json_schema).unwrap(); + let connection_schema = ConnectionSchema::try_new( + Some(Format::Json(JsonFormat { + confluent_schema_registry: false, + include_schema: false, + debezium: false, + unstructured: false, + timestamp_format: TimestampFormat::RFC3339, + })), + None, + None, + struct_fields + .into_iter() + .map(|field| field.try_into().unwrap()) + .collect(), + Some(definition), + )?; + let config = KafkaConfig { + authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {}, + bootstrap_servers: "localhost:9092".try_into().unwrap(), + schema_registry: None, + }; + let table = KafkaTable { + topic: "test_topic".to_string(), + type_: arroyo_connectors::kafka::TableType::Source { + group_id: None, + offset: arroyo_connectors::kafka::SourceOffset::Earliest, + read_mode: Some(ReadMode::ReadUncommitted), + }, + }; + KafkaConnector {}.from_config( + Some(2), + "kafka_json_schema", + config, + table, + Some(&connection_schema), + ) +} + +pub fn get_avro_source() -> Result { + let avro_schema = r#" + { +"connect.name": "pizza_orders.pizza_orders", +"fields": [ +{ + "name": "store_id", + "type": "int" +}, +{ + "name": "store_order_id", + "type": "int" +}, +{ + "name": "coupon_code", + "type": "int" +}, +{ + "name": "date", + "type": { + "connect.name": "org.apache.kafka.connect.data.Date", + "connect.version": 1, + "logicalType": "date", + "type": "int" + } +}, +{ + "name": "status", + "type": "string" +}, +{ + "name": "order_lines", + "type": { + "items": { + "connect.name": "pizza_orders.order_line", + "fields": [ + { + "name": "product_id", + "type": "int" + }, + { + "name": "category", + "type": "string" + }, + { + "name": "quantity", + "type": "int" + }, + { + "name": "unit_price", + "type": "double" + }, + { + "name": "net_price", + "type": "double" + } + ], + "name": "order_line", + "type": "record" + }, + "type": "array" + } +} +], +"name": "pizza_orders", +"namespace": "pizza_orders", +"type": "record" +}"#; + let definition = SchemaDefinition::AvroSchema(avro_schema.to_string()); + let struct_fields = convert_avro_schema("kafka_avro_schema", avro_schema).unwrap(); + let mut format = AvroFormat::new(true, false, false); + format.add_reader_schema(apache_avro::Schema::parse_str(avro_schema).unwrap()); + let connection_schema = ConnectionSchema::try_new( + Some(Format::Avro(format)), + None, + None, + struct_fields + .into_iter() + .map(|field| field.try_into().unwrap()) + .collect(), + Some(definition), + )?; + let config = KafkaConfig { + authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {}, + bootstrap_servers: "localhost:9092".try_into().unwrap(), + schema_registry: None, + }; + let table = KafkaTable { + topic: "test_topic".to_string(), + type_: arroyo_connectors::kafka::TableType::Source { + group_id: None, + offset: arroyo_connectors::kafka::SourceOffset::Earliest, + read_mode: Some(ReadMode::ReadUncommitted), + }, + }; + KafkaConnector {}.from_config( + Some(3), + "kafka_avro_schema", + config, + table, + Some(&connection_schema), + ) +} + +#[test] +pub fn get_avro() -> Result<()> { + get_avro_source().unwrap(); + Ok(()) +} diff --git a/arroyo-sql-macro/src/lib.rs b/arroyo-sql-macro/src/lib.rs index e42c9ecf7..80742c92e 100644 --- a/arroyo-sql-macro/src/lib.rs +++ b/arroyo-sql-macro/src/lib.rs @@ -8,6 +8,8 @@ use quote::{quote, ToTokens}; use syn::parse::{Parse, ParseStream}; use syn::{parse_str, Expr, LitInt, LitStr, Token}; +mod connectors; + /// This macro is used to generate a test function for a single test case. /// Used in the `arroyo-sql-testing` crate. /// @@ -153,6 +155,8 @@ fn get_pipeline_module( .unwrap(); schema_provider.add_connector_table(nexmark); + schema_provider.add_connector_table(connectors::get_json_schema_source().unwrap()); + schema_provider.add_connector_table(connectors::get_avro_source().unwrap()); let file = syn::parse_file(&udfs.unwrap_or_default()).unwrap(); for item in file.items.into_iter() { diff --git a/arroyo-sql-testing/Cargo.toml b/arroyo-sql-testing/Cargo.toml index 288182abe..ba7977b1d 100644 --- a/arroyo-sql-testing/Cargo.toml +++ b/arroyo-sql-testing/Cargo.toml @@ -4,6 +4,7 @@ version = "0.7.0" edition = "2021" [features] +default = ["integration-tests"] integration-tests = [] diff --git a/arroyo-sql-testing/src/full_query_tests.rs b/arroyo-sql-testing/src/full_query_tests.rs index 2f45fd902..1b53c847d 100644 --- a/arroyo-sql-testing/src/full_query_tests.rs +++ b/arroyo-sql-testing/src/full_query_tests.rs @@ -471,3 +471,13 @@ INSERT INTO non_updating_sink select distinct(bid.url) from nexmark; "} + +full_pipeline_codegen! { + "kafka_json_schemas", + "SELECT * FROM kafka_json_schema" +} + +full_pipeline_codegen! { + "kafka_avro_source", + "SELECT * FROM kafka_avro_schema" +} diff --git a/arroyo-sql/src/avro.rs b/arroyo-sql/src/avro.rs index 7e04fb3a4..badbbf280 100644 --- a/arroyo-sql/src/avro.rs +++ b/arroyo-sql/src/avro.rs @@ -39,7 +39,7 @@ pub fn get_defs(name: &str, schema: &str) -> anyhow::Result { let mod_ident: Ident = syn::parse_str(name).unwrap(); Ok(quote! { mod #mod_ident { - use crate::*; + use super::*; #(#defs) * } diff --git a/arroyo-sql/src/json_schema.rs b/arroyo-sql/src/json_schema.rs index 55b735ad7..21e6c8492 100644 --- a/arroyo-sql/src/json_schema.rs +++ b/arroyo-sql/src/json_schema.rs @@ -85,11 +85,11 @@ pub fn get_defs(source_name: &str, schema: &str) -> Result { if *nullable { Some(quote!{ #[serde(default)] - #[serde(deserialize_with = "arroyo_worker::deserialize_rfc3339_datetime_opt")] + #[serde(with = "arroyo_worker::formats::json::opt_timestamp_as_rfc3339")] }) } else { Some(quote! { - #[serde(deserialize_with = "arroyo_worker::deserialize_rfc3339_datetime")] + #[serde(with = "arroyo_worker::formats::json::timestamp_as_rfc3339")] }) } @@ -150,7 +150,7 @@ pub fn get_defs(source_name: &str, schema: &str) -> Result { add_defs(source_name, ROOT_NAME, &fields, &mut defs); Ok(format!( - "mod {} {{\nuse crate::*;\n{}\n}}", + "mod {} {{\nuse super::*;\n{}\n}}", source_name, defs.join("\n") ))