diff --git a/Cargo.lock b/Cargo.lock index 89bde32b2..1dc45307b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -156,6 +156,29 @@ dependencies = [ "backtrace", ] +[[package]] +name = "apache-avro" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ceb7c683b2f8f40970b70e39ff8be514c95b96fcb9c4af87e1ed2cb2e10801a0" +dependencies = [ + "digest 0.10.7", + "lazy_static", + "libflate 2.0.0", + "log", + "num-bigint", + "quad-rand", + "rand", + "regex-lite", + "serde", + "serde_json", + "strum 0.25.0", + "strum_macros 0.25.2", + "thiserror", + "typed-builder", + "uuid", +] + [[package]] name = "arbitrary" version = "1.3.0" @@ -406,6 +429,7 @@ name = "arroyo-api" version = "0.7.0" dependencies = [ "anyhow", + "apache-avro", "argon2", "arrow", "arrow-schema", @@ -419,7 +443,7 @@ dependencies = [ "async-trait", "axum", "axum-extra", - "base64 0.21.4", + "base64 0.21.5", "bincode 2.0.0-rc.3", "chrono", "cornucopia", @@ -636,15 +660,20 @@ name = "arroyo-rpc" version = "0.7.0" dependencies = [ "anyhow", + "apache-avro", "arroyo-types", + "async-trait", "bincode 2.0.0-rc.3", + "log", "nanoid", "prost", + "reqwest", "serde", "serde_json", "tokio", "tonic", "tonic-build", + "tracing", "utoipa", ] @@ -679,6 +708,7 @@ name = "arroyo-sql" version = "0.7.0" dependencies = [ "anyhow", + "apache-avro", "arrow", "arrow-schema", "arroyo-connectors", @@ -806,6 +836,7 @@ name = "arroyo-worker" version = "0.7.0" dependencies = [ "anyhow", + "apache-avro", "arrow", "arrow-array", "arroyo-macro", @@ -1057,9 +1088,9 @@ checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" [[package]] name = "async-trait" -version = "0.1.73" +version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", @@ -1392,6 +1423,7 @@ checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core", + "axum-macros", "bitflags 1.3.2", "bytes", "futures-util", @@ -1455,6 +1487,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-macros" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.33", +] + [[package]] name = "backoff" version = "0.4.0" @@ -1510,9 +1554,9 @@ checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5" [[package]] name = "base64" -version = "0.21.4" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" +checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" [[package]] name = "base64ct" @@ -1628,7 +1672,7 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f03db470b3c0213c47e978da93200259a1eb4dae2e5512cba9955e2b540a6fc6" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "bollard-stubs", "bytes", "futures-core", @@ -2014,6 +2058,15 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cornucopia" version = "0.9.0" @@ -2406,6 +2459,12 @@ dependencies = [ "syn 2.0.33", ] +[[package]] +name = "dary_heap" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7762d17f1241643615821a8455a0b2c3e803784b058693d990b11f2dce25a0ca" + [[package]] name = "dashmap" version = "5.5.3" @@ -2559,7 +2618,7 @@ dependencies = [ "arrow-array", "arrow-buffer", "arrow-schema", - "base64 0.21.4", + "base64 0.21.5", "blake2", "blake3", "chrono", @@ -3093,7 +3152,7 @@ dependencies = [ "async-lock", "async-rwlock", "async-trait", - "base64 0.21.4", + "base64 0.21.5", "bytes", "cfg-if", "chrono", @@ -3144,7 +3203,7 @@ checksum = "cd0337e771c38978625941b31ebce79b708c9252744f23039b9ff0434e251e02" dependencies = [ "anyhow", "async-trait", - "base64 0.21.4", + "base64 0.21.5", "bytes", "bytesize", "derive_builder", @@ -3677,7 +3736,7 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "bytes", "headers-core", "http", @@ -4265,7 +4324,7 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd990069640f9db34b3b0f7a1afc62a05ffaa3be9b66aa3c313f58346df7f788" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "bytes", "chrono", "http", @@ -4511,7 +4570,20 @@ checksum = "5ff4ae71b685bbad2f2f391fe74f6b7659a34871c08b210fdc039e43bee07d18" dependencies = [ "adler32", "crc32fast", - "libflate_lz77", + "libflate_lz77 1.2.0", +] + +[[package]] +name = "libflate" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7d5654ae1795afc7ff76f4365c2c8791b0feb18e8996a96adad8ffd7c3b2bf" +dependencies = [ + "adler32", + "core2", + "crc32fast", + "dary_heap", + "libflate_lz77 2.0.0", ] [[package]] @@ -4523,6 +4595,17 @@ dependencies = [ "rle-decode-fast", ] +[[package]] +name = "libflate_lz77" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be5f52fb8c451576ec6b79d3f4deb327398bc05bbdbd99021a6e77a4c855d524" +dependencies = [ + "core2", + "hashbrown 0.13.2", + "rle-decode-fast", +] + [[package]] name = "libm" version = "0.2.7" @@ -5097,7 +5180,7 @@ version = "0.7.1" source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=object_store/put_part_api#612c9484547d3274571ce932260a75dc6413d6d1" dependencies = [ "async-trait", - "base64 0.21.4", + "base64 0.21.5", "bytes", "chrono", "futures", @@ -5340,7 +5423,7 @@ dependencies = [ "arrow-ipc", "arrow-schema", "arrow-select", - "base64 0.21.4", + "base64 0.21.5", "brotli", "bytes", "chrono", @@ -5603,7 +5686,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "byteorder", "bytes", "fallible-iterator", @@ -5867,7 +5950,7 @@ checksum = "ac8a53ce01af1087eaeee6ce7c4fbf50ea4040ab1825c0115c4bafa039644ba9" dependencies = [ "json", "libc", - "libflate", + "libflate 1.4.0", "log", "names", "prost", @@ -5889,6 +5972,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "quad-rand" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" + [[package]] name = "quanta" version = "0.11.1" @@ -6137,6 +6226,12 @@ dependencies = [ "regex-syntax 0.7.5", ] +[[package]] +name = "regex-lite" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b661b2f27137bdbc16f00eda72866a92bb28af1753ffbd56744fb6e2e9cd8e" + [[package]] name = "regex-syntax" version = "0.6.29" @@ -6161,11 +6256,11 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.20" +version = "0.11.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" +checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "bytes", "encoding_rs", "futures-core", @@ -6189,6 +6284,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "system-configuration", "tokio", "tokio-native-tls", "tokio-rustls 0.24.1", @@ -6508,7 +6604,7 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", ] [[package]] @@ -6724,9 +6820,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.106" +version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cc66a619ed80bf7a0f6b17dd063a84b88f6dea1813737cf469aef1d081142c2" +checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" dependencies = [ "indexmap 2.0.0", "itoa", @@ -6847,7 +6943,7 @@ version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ca3b16a3d82c4088f343b7480a93550b3eabe1a358569c2dfe38bbcead07237" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "chrono", "hex", "indexmap 1.9.3", @@ -7305,6 +7401,27 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "target-lexicon" version = "0.12.11" @@ -7728,7 +7845,7 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ "async-trait", "axum", - "base64 0.21.4", + "base64 0.21.5", "bytes", "futures-core", "futures-util", @@ -7780,7 +7897,7 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21b00ec4842256d1fe0a46176e2ef5bc357664c66e7d91aff5a7d43d83a65f47" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "bytes", "futures-core", "http", @@ -7820,7 +7937,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "bitflags 2.4.0", "bytes", "futures-core", @@ -7854,11 +7971,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ - "cfg-if", "log", "pin-project-lite", "tracing-attributes", @@ -7878,9 +7994,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", @@ -7889,9 +8005,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", "valuable", @@ -7983,6 +8099,26 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "typed-builder" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34085c17941e36627a879208083e25d357243812c30e7d7387c3b954f30ade16" +dependencies = [ + "typed-builder-macro", +] + +[[package]] +name = "typed-builder-macro" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.33", +] + [[package]] name = "typenum" version = "1.16.0" @@ -8185,6 +8321,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" dependencies = [ "getrandom", + "serde", ] [[package]] @@ -8408,7 +8545,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f58ddfe801df3886feaf466d883ea37e941bcc6d841b9f644a08c7acabfe7f8" dependencies = [ "anyhow", - "base64 0.21.4", + "base64 0.21.5", "bincode 1.3.3", "directories-next", "file-per-thread-logger", diff --git a/Cargo.toml b/Cargo.toml index 76076f44e..be113f657 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,6 @@ tonic-build = { version = "0.9" } tonic-web = { version = "0.9" } tonic-reflection = { version = "0.9" } arrow = { version = "46.0.0" } -arrow-buffer = { version = "46.0.0" } arrow-array = { version = "46.0.0" } arrow-schema = { version = "46.0.0" } object_store = { version = "0.7.1" } @@ -44,11 +43,6 @@ parquet = { version = "46.0.0" } [profile.release] debug = 1 -[profile.release.package.copy-artifacts] -# optimize for small binary size -strip = true -opt-level = "z" - [patch.crates-io] typify = { git = 'https://github.com/ArroyoSystems/typify.git', branch = 'arroyo' } parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/parquet_bytes'} @@ -56,4 +50,4 @@ arrow = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/par arrow-buffer = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/parquet_bytes'} arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/parquet_bytes'} arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/parquet_bytes'} -object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = 'object_store/put_part_api'} \ No newline at end of file +object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = 'object_store/put_part_api'} diff --git a/arroyo-api/Cargo.toml b/arroyo-api/Cargo.toml index bca408689..3eaab4871 100644 --- a/arroyo-api/Cargo.toml +++ b/arroyo-api/Cargo.toml @@ -37,7 +37,7 @@ petgraph = {version = "0.6", features = ["serde-1"]} http = "0.2" tower-http = {version = "0.4", features = ["trace", "fs", "cors", "validate-request", "auth"]} -axum = {version = "0.6.12", features = ["headers", "tokio"]} +axum = {version = "0.6.12", features = ["headers", "tokio", "macros"]} axum-extra = "0.7.4" thiserror = "1.0.40" utoipa = "3" @@ -77,6 +77,7 @@ cornucopia_async = { version = "0.4", features = ["with-serde_json-1"] } jwt-simple = "0.11.4" uuid = "1.3.3" regress = "0.6.0" +apache-avro = "0.16.0" [build-dependencies] cornucopia = { version = "0.9" } diff --git a/arroyo-api/src/connection_tables.rs b/arroyo-api/src/connection_tables.rs index 2950290a6..b13788f31 100644 --- a/arroyo-api/src/connection_tables.rs +++ b/arroyo-api/src/connection_tables.rs @@ -9,20 +9,24 @@ use axum_extra::extract::WithRejection; use cornucopia_async::GenericClient; use cornucopia_async::Params; use futures_util::stream::Stream; -use http::StatusCode; -use serde_json::json; +use serde_json::{json, Value}; use std::convert::Infallible; use tokio::sync::mpsc::channel; use tokio_stream::wrappers::ReceiverStream; use tracing::warn; +use arroyo_connectors::kafka::{KafkaConfig, KafkaTable}; use arroyo_connectors::{connector_for_type, ErasedConnector}; use arroyo_rpc::api_types::connections::{ - ConfluentSchema, ConfluentSchemaQueryParams, ConnectionProfile, ConnectionSchema, - ConnectionTable, ConnectionTablePost, SchemaDefinition, + ConnectionProfile, ConnectionSchema, ConnectionTable, ConnectionTablePost, SchemaDefinition, }; use arroyo_rpc::api_types::{ConnectionTableCollection, PaginationQueryParams}; +use arroyo_rpc::formats::{AvroFormat, Format, JsonFormat}; use arroyo_rpc::public_ids::{generate_id, IdTypes}; +use arroyo_rpc::schema_resolver::{ + ConfluentSchemaResolver, ConfluentSchemaResponse, ConfluentSchemaType, +}; +use arroyo_sql::avro; use arroyo_sql::json_schema::convert_json_schema; use arroyo_sql::types::{StructField, TypeDef}; @@ -95,8 +99,9 @@ async fn get_and_validate_connector( .transpose() .map_err(|e| bad_request(format!("Invalid schema: {}", e)))?; - let schema = if let Some(schema) = &schema { - Some(expand_schema(&req.name, schema)?) + let schema = if let Some(schema) = schema { + let name = connector.name(); + Some(expand_schema(&req.name, name, schema, &req.config, &profile_config).await?) } else { None }; @@ -130,13 +135,14 @@ pub(crate) async fn delete_connection_table( .map_err(|e| handle_delete("connection_table", "pipelines", e))?; if deleted == 0 { - return Err(not_found("Connection table".to_string())); + return Err(not_found("Connection table")); } Ok(()) } /// Test a Connection Table +#[axum::debug_handler] #[utoipa::path( post, path = "/v1/connection_tables/test", @@ -254,7 +260,7 @@ pub async fn create_connection_table( .unwrap() .to_string(); - if let Some(schema) = &req.schema { + if let Some(schema) = &schema { if schema.definition.is_none() { return Err(required_field("schema.definition")); } @@ -413,31 +419,109 @@ pub(crate) async fn get_connection_tables( // attempts to fill in the SQL schema from a schema object that may just have a json-schema or // other source schema. schemas stored in the database should always be expanded first. -pub(crate) fn expand_schema( +pub(crate) async fn expand_schema( + name: &str, + connector: &str, + schema: ConnectionSchema, + table_config: &Value, + profile_config: &Value, +) -> Result { + let Some(format) = schema.format.as_ref() else { + return Ok(schema); + }; + + match format { + Format::Json(_) => { + expand_json_schema(name, connector, schema, table_config, profile_config).await + } + Format::Avro(_) => { + expand_avro_schema(name, connector, schema, table_config, profile_config).await + } + Format::Parquet(_) => Ok(schema), + Format::RawString(_) => Ok(schema), + } +} + +async fn expand_avro_schema( name: &str, - schema: &ConnectionSchema, + connector: &str, + mut schema: ConnectionSchema, + table_config: &Value, + profile_config: &Value, ) -> Result { - let mut schema = schema.clone(); + if let Some(Format::Avro(AvroFormat { + confluent_schema_registry: true, + .. + })) = &schema.format + { + let schema_response = get_schema(connector, table_config, profile_config).await?; + + if schema_response.schema_type != ConfluentSchemaType::Avro { + return Err(bad_request(format!( + "Format configured is avro, but confluent schema repository returned a {:?} schema", + schema_response.schema_type + ))); + } + + schema.definition = Some(SchemaDefinition::AvroSchema(schema_response.schema)); + } + + let Some(SchemaDefinition::AvroSchema(definition)) = schema.definition.as_ref() else { + return Err(bad_request("avro format requires an avro schema be set")); + }; + + if let Some(Format::Avro(format)) = &mut schema.format { + format.add_reader_schema( + apache_avro::Schema::parse_str(&definition) + .map_err(|e| bad_request(format!("Avro schema is invalid: {:?}", e)))?, + ); + } + + let fields: Result<_, String> = avro::convert_avro_schema(&name, &definition) + .map_err(|e| bad_request(format!("Invalid avro schema: {}", e)))? + .into_iter() + .map(|f| f.try_into()) + .collect(); + + schema.fields = fields.map_err(|e| bad_request(format!("Failed to convert schema: {}", e)))?; + + Ok(schema) +} + +async fn expand_json_schema( + name: &str, + connector: &str, + mut schema: ConnectionSchema, + table_config: &Value, + profile_config: &Value, +) -> Result { + if let Some(Format::Json(JsonFormat { + confluent_schema_registry: true, + .. + })) = &schema.format + { + let schema_response = get_schema(connector, table_config, profile_config).await?; + + if schema_response.schema_type != ConfluentSchemaType::Json { + return Err(bad_request(format!( + "Format configured is json, but confluent schema repository returned a {:?} schema", + schema_response.schema_type + ))); + } + + schema.definition = Some(SchemaDefinition::JsonSchema(schema_response.schema)); + } if let Some(d) = &schema.definition { let fields = match d { - SchemaDefinition::JsonSchema(json) => convert_json_schema(name, &json) + SchemaDefinition::JsonSchema(json) => convert_json_schema(&name, &json) .map_err(|e| bad_request(format!("Invalid json-schema: {}", e)))?, - SchemaDefinition::ProtobufSchema(_) => { - return Err(bad_request( - "Protobuf schemas are not yet supported".to_string(), - )) - } - SchemaDefinition::AvroSchema(_) => { - return Err(bad_request( - "Avro schemas are not yet supported".to_string(), - )) - } SchemaDefinition::RawSchema(_) => vec![StructField::new( "value".to_string(), None, TypeDef::DataType(DataType::Utf8, false), )], + _ => return Err(bad_request("Invalid schema type for json format")), }; let fields: Result<_, String> = fields.into_iter().map(|f| f.try_into()).collect(); @@ -449,6 +533,44 @@ pub(crate) fn expand_schema( Ok(schema) } +async fn get_schema( + connector: &str, + table_config: &Value, + profile_config: &Value, +) -> Result { + if connector != "kafka" { + return Err(bad_request( + "confluent schema registry can only be used for Kafka connections", + )); + } + + // we unwrap here because this should already have been validated + let profile: KafkaConfig = + serde_json::from_value(profile_config.clone()).expect("invalid kafka config"); + + let table: KafkaTable = + serde_json::from_value(table_config.clone()).expect("invalid kafka table"); + + let schema_registry = profile.schema_registry.as_ref().ok_or_else(|| { + bad_request("schema registry must be configured on the Kafka connection profile") + })?; + + let resolver = + ConfluentSchemaResolver::new(&schema_registry.endpoint, &table.topic).map_err(|e| { + bad_request(format!( + "failed to fetch schemas from schema repository: {}", + e + )) + })?; + + resolver.get_schema(None).await.map_err(|e| { + bad_request(format!( + "failed to fetch schemas from schema repository: {}", + e + )) + }) +} + /// Test a Connection Schema #[utoipa::path( post, @@ -480,110 +602,3 @@ pub(crate) async fn test_schema( } } } - -/// Get a Confluent Schema -#[utoipa::path( - get, - path = "/v1/connection_tables/schemas/confluent", - tag = "connection_tables", - params( - ("topic" = String, Query, description = "Confluent topic name"), - ("endpoint" = String, Query, description = "Confluent schema registry endpoint"), - ), - responses( - (status = 200, description = "Got Confluent Schema", body = ConfluentSchema), - ), -)] -pub(crate) async fn get_confluent_schema( - query_params: Query, -) -> Result, ErrorResp> { - // TODO: ensure only external URLs can be hit - let url = format!( - "{}/subjects/{}-value/versions/latest", - query_params.endpoint, query_params.topic - ); - let resp = reqwest::get(url).await.map_err(|e| { - warn!("Got error response from schema registry: {:?}", e); - match e.status() { - Some(StatusCode::NOT_FOUND) => bad_request(format!( - "Could not find value schema for topic '{}'", - query_params.topic - )), - - Some(code) => bad_request(format!("Schema registry returned error: {}", code)), - None => { - warn!( - "Unknown error connecting to schema registry {}: {:?}", - query_params.endpoint, e - ); - bad_request(format!( - "Could not connect to Schema Registry at {}: unknown error", - query_params.endpoint - )) - } - } - })?; - - if !resp.status().is_success() { - let message = format!( - "Received an error status code from the provided endpoint: {} {}", - resp.status().as_u16(), - resp.bytes() - .await - .map(|bs| String::from_utf8_lossy(&bs).to_string()) - .unwrap_or_else(|_| "".to_string()) - ); - return Err(bad_request(message)); - } - - let value: serde_json::Value = resp.json().await.map_err(|e| { - warn!("Invalid json from schema registry: {:?}", e); - bad_request(format!( - "Schema registry returned invalid JSON: {}", - e.to_string() - )) - })?; - - let schema_type = value - .get("schemaType") - .ok_or_else(|| { - bad_request( - "The JSON returned from this endpoint was unexpected. Please confirm that the URL is correct." - .to_string(), - ) - })? - .as_str(); - - if schema_type != Some("JSON") { - return Err(bad_request( - "Only JSON schema types are supported currently".to_string(), - )); - } - - let schema = value - .get("schema") - .ok_or_else(|| { - return bad_request("Missing 'schema' field in schema registry response".to_string()); - })? - .as_str() - .ok_or_else(|| { - return bad_request( - "The 'schema' field in the schema registry response is not a string".to_string(), - ); - })?; - - if let Err(e) = convert_json_schema(&query_params.topic, schema) { - warn!( - "Schema from schema registry is not valid: '{}': {}", - schema, e - ); - return Err(bad_request(format!( - "Schema from schema registry is not valid: {}", - e - ))); - } - - Ok(Json(ConfluentSchema { - schema: schema.to_string(), - })) -} diff --git a/arroyo-api/src/jobs.rs b/arroyo-api/src/jobs.rs index f9c627663..ce6feca91 100644 --- a/arroyo-api/src/jobs.rs +++ b/arroyo-api/src/jobs.rs @@ -416,7 +416,7 @@ pub async fn get_checkpoint_details( .await .map_err(log_and_map)? .ok_or_else(|| { - not_found(format!( + not_found(&format!( "Checkpoint with epoch {} for job '{}'", epoch, job_pub_id )) diff --git a/arroyo-api/src/lib.rs b/arroyo-api/src/lib.rs index 1c3f2702e..43d9f4312 100644 --- a/arroyo-api/src/lib.rs +++ b/arroyo-api/src/lib.rs @@ -8,8 +8,8 @@ use crate::connection_profiles::{ __path_create_connection_profile, __path_get_connection_profiles, }; use crate::connection_tables::{ - __path_create_connection_table, __path_delete_connection_table, __path_get_confluent_schema, - __path_get_connection_tables, __path_test_connection_table, __path_test_schema, + __path_create_connection_table, __path_delete_connection_table, __path_get_connection_tables, + __path_test_connection_table, __path_test_schema, }; use crate::connectors::__path_get_connectors; use crate::jobs::{ @@ -148,7 +148,6 @@ pub(crate) fn to_micros(dt: OffsetDateTime) -> u64 { delete_connection_table, test_connection_table, test_schema, - get_confluent_schema, get_checkpoint_details, ), components(schemas( @@ -193,7 +192,6 @@ pub(crate) fn to_micros(dt: OffsetDateTime) -> u64 { PrimitiveType, SchemaDefinition, TestSourceMessage, - ConfluentSchema, JsonFormat, AvroFormat, ParquetFormat, diff --git a/arroyo-api/src/pipelines.rs b/arroyo-api/src/pipelines.rs index 63677a9b2..ccfe6e3c9 100644 --- a/arroyo-api/src/pipelines.rs +++ b/arroyo-api/src/pipelines.rs @@ -600,7 +600,7 @@ pub async fn patch_pipeline( .opt() .await .map_err(log_and_map)? - .ok_or_else(|| not_found("Job".to_string()))?; + .ok_or_else(|| not_found("Job"))?; let program = PipelineProgram::decode(&res.program[..]).map_err(log_and_map)?; let map: HashMap = program @@ -629,7 +629,7 @@ pub async fn patch_pipeline( .map_err(log_and_map)?; if res == 0 { - return Err(not_found("Job".to_string())); + return Err(not_found("Job")); } let pipeline = query_pipeline_by_pub_id(&pipeline_pub_id, &client, &auth_data).await?; @@ -683,7 +683,7 @@ pub async fn restart_pipeline( .map_err(log_and_map)?; if res == 0 { - return Err(not_found("Pipeline".to_string())); + return Err(not_found("Pipeline")); } let pipeline = query_pipeline_by_pub_id(&id, &client, &auth_data).await?; @@ -814,7 +814,7 @@ pub async fn delete_pipeline( .map_err(log_and_map)?; if count != 1 { - return Err(not_found("Pipeline".to_string())); + return Err(not_found("Pipeline")); } Ok(()) @@ -864,7 +864,7 @@ pub async fn query_pipeline_by_pub_id( .await .map_err(log_and_map)?; - let res = pipeline.ok_or_else(|| not_found("Pipeline".to_string()))?; + let res = pipeline.ok_or_else(|| not_found("Pipeline"))?; res.try_into() } @@ -884,7 +884,7 @@ pub async fn query_job_by_pub_id( .await .map_err(log_and_map)?; - let res: DbPipelineJob = job.ok_or_else(|| not_found("Job".to_string()))?; + let res: DbPipelineJob = job.ok_or_else(|| not_found("Job"))?; Ok(res.into()) } diff --git a/arroyo-api/src/rest.rs b/arroyo-api/src/rest.rs index b0403a327..8da8d0741 100644 --- a/arroyo-api/src/rest.rs +++ b/arroyo-api/src/rest.rs @@ -19,8 +19,8 @@ use utoipa_swagger_ui::SwaggerUi; use crate::connection_profiles::{create_connection_profile, get_connection_profiles}; use crate::connection_tables::{ - create_connection_table, delete_connection_table, get_confluent_schema, get_connection_tables, - test_connection_table, test_schema, + create_connection_table, delete_connection_table, get_connection_tables, test_connection_table, + test_schema, }; use crate::connectors::get_connectors; use crate::jobs::{ @@ -55,7 +55,7 @@ pub async fn ping() -> impl IntoResponse { } pub async fn api_fallback() -> impl IntoResponse { - not_found("Route".to_string()) + not_found("Route") } pub fn create_rest_app(pool: Pool, controller_addr: &str) -> Router { @@ -112,10 +112,6 @@ pub fn create_rest_app(pool: Pool, controller_addr: &str) -> Router { .route("/connection_tables", post(create_connection_table)) .route("/connection_tables/test", post(test_connection_table)) .route("/connection_tables/schemas/test", post(test_schema)) - .route( - "/connection_tables/schemas/confluent", - get(get_confluent_schema), - ) .route("/connection_tables/:id", delete(delete_connection_table)) .route("/pipelines", post(post_pipeline)) .route("/pipelines", get(get_pipelines)) diff --git a/arroyo-api/src/rest_utils.rs b/arroyo-api/src/rest_utils.rs index e09f1127f..02822f435 100644 --- a/arroyo-api/src/rest_utils.rs +++ b/arroyo-api/src/rest_utils.rs @@ -76,21 +76,21 @@ pub(crate) async fn authenticate( cloud::authenticate(client, bearer_auth).await } -pub(crate) fn bad_request(message: String) -> ErrorResp { +pub(crate) fn bad_request(message: impl Into) -> ErrorResp { ErrorResp { status_code: StatusCode::BAD_REQUEST, - message: message.to_string(), + message: message.into(), } } -pub(crate) fn unauthorized(message: String) -> ErrorResp { +pub(crate) fn unauthorized(message: impl Into) -> ErrorResp { ErrorResp { status_code: StatusCode::UNAUTHORIZED, - message: message.to_string(), + message: message.into(), } } -pub(crate) fn not_found(object: String) -> ErrorResp { +pub(crate) fn not_found(object: &str) -> ErrorResp { ErrorResp { status_code: StatusCode::NOT_FOUND, message: format!("{} not found", object), diff --git a/arroyo-connectors/src/kafka.rs b/arroyo-connectors/src/kafka.rs index eb524d839..a5e5262fa 100644 --- a/arroyo-connectors/src/kafka.rs +++ b/arroyo-connectors/src/kafka.rs @@ -151,9 +151,14 @@ impl Connector for KafkaConnector { Some(other) => bail!("unknown auth type '{}'", other), }; + let schema_registry = opts + .remove("schema_registry.endpoint") + .map(|endpoint| SchemaRegistry { endpoint }); + let connection = KafkaConfig { authentication: auth, bootstrap_servers: BootstrapServers(pull_opt("bootstrap_servers", opts)?), + schema_registry, }; let typ = pull_opt("type", opts)?; diff --git a/arroyo-console/package.json b/arroyo-console/package.json index 0fdab3292..766e8ed86 100644 --- a/arroyo-console/package.json +++ b/arroyo-console/package.json @@ -9,7 +9,7 @@ "preview": "vite preview", "format": "npx prettier --write src/ && npx eslint --fix --ext .js,.jsx,.ts,.tsx src", "check": "npx prettier --check src/ && npx eslint --ext .js,.jsx,.ts,.tsx src", - "openapi": "cargo build && npx openapi-typescript $(pwd)/../arroyo-openapi/api-spec.json --output $(pwd)/src/gen/api-types.ts" + "openapi": "cargo build --package arroyo-openapi && npx openapi-typescript $(pwd)/../arroyo-openapi/api-spec.json --output $(pwd)/src/gen/api-types.ts" }, "dependencies": { "@babel/core": "^7.22.5", diff --git a/arroyo-console/src/gen/api-types.ts b/arroyo-console/src/gen/api-types.ts index 002ff3817..9dfd605ae 100644 --- a/arroyo-console/src/gen/api-types.ts +++ b/arroyo-console/src/gen/api-types.ts @@ -34,13 +34,6 @@ export interface paths { */ post: operations["create_connection_table"]; }; - "/v1/connection_tables/schemas/confluent": { - /** - * Get a Confluent Schema - * @description Get a Confluent Schema - */ - get: operations["get_confluent_schema"]; - }; "/v1/connection_tables/schemas/test": { /** * Test a Connection Schema @@ -183,7 +176,10 @@ export type webhooks = Record; export interface components { schemas: { - AvroFormat: Record; + AvroFormat: { + confluentSchemaRegistry: boolean; + embeddedSchema: boolean; + }; Checkpoint: { backend: string; /** Format: int32 */ @@ -206,9 +202,6 @@ export interface components { }; /** @enum {string} */ CheckpointSpanType: "alignment" | "sync" | "async" | "committing"; - ConfluentSchema: { - schema: string; - }; ConnectionProfile: { config: unknown; connector: string; @@ -588,28 +581,6 @@ export interface operations { }; }; }; - /** - * Get a Confluent Schema - * @description Get a Confluent Schema - */ - get_confluent_schema: { - parameters: { - query: { - /** @description Confluent topic name */ - topic: string; - /** @description Confluent schema registry endpoint */ - endpoint: string; - }; - }; - responses: { - /** @description Got Confluent Schema */ - 200: { - content: { - "application/json": components["schemas"]["ConfluentSchema"]; - }; - }; - }; - }; /** * Test a Connection Schema * @description Test a Connection Schema diff --git a/arroyo-console/src/routes/connections/ConfluentSchemaEditor.tsx b/arroyo-console/src/routes/connections/ConfluentSchemaEditor.tsx index 7076001db..9e603dec2 100644 --- a/arroyo-console/src/routes/connections/ConfluentSchemaEditor.tsx +++ b/arroyo-console/src/routes/connections/ConfluentSchemaEditor.tsx @@ -1,20 +1,6 @@ -import { Dispatch, useState } from 'react'; +import { Dispatch } from 'react'; import { CreateConnectionState } from './CreateConnection'; -import { - Alert, - AlertIcon, - Box, - Button, - Code, - FormControl, - FormHelperText, - FormLabel, - Heading, - Input, - Stack, -} from '@chakra-ui/react'; -import { get } from '../../lib/data_fetching'; -import { formatError } from '../../lib/util'; +import { Button, Stack, Text } from '@chakra-ui/react'; export function ConfluentSchemaEditor({ state, @@ -25,99 +11,13 @@ export function ConfluentSchemaEditor({ setState: Dispatch; next: () => void; }) { - const [endpoint, setEndpoint] = useState(''); - const [schema, setSchema] = useState(null); - const [error, setError] = useState(null); - const [loading, setLoading] = useState(false); - - const topic = state.table.topic; - - const fetchSchema = async () => { - setError(null); - setLoading(true); - const { data, error } = await get('/v1/connection_tables/schemas/confluent', { - params: { - query: { - endpoint, - topic, - }, - }, - }); - - if (error) { - setError(formatError(error)); - } - - if (data) { - setSchema(data.schema); - setState({ - ...state, - schema: { - ...state.schema, - definition: { json_schema: data.schema }, - fields: [], - format: { json: { confluentSchemaRegistry: true } }, - }, - }); - } - setLoading(false); - }; - - let errorBox = null; - if (error != null) { - errorBox = ( - - - - {error} - - - ); - } - - let successBox = null; - if (schema != null) { - successBox = ( - - Fetched schema for topic {topic} - -
{JSON.stringify(JSON.parse(schema), null, 2)}
-
-
- ); - } - return ( - - Schema Registry Endpoint - setEndpoint(e.target.value)} - /> - Provide the endpoint for your Confluent Schema Registry - - - {errorBox} + Schemas will be loaded from the configured Confluent Schema Registry - - - {successBox} - - {schema != null && ( - - )} ); } diff --git a/arroyo-console/src/routes/connections/ConnectionTester.tsx b/arroyo-console/src/routes/connections/ConnectionTester.tsx index 1657ddc66..e9c7782ba 100644 --- a/arroyo-console/src/routes/connections/ConnectionTester.tsx +++ b/arroyo-console/src/routes/connections/ConnectionTester.tsx @@ -78,7 +78,6 @@ export function ConnectionTester({ const onClickTest = async () => { setTesting(true); setError(null); - console.log('config', state); if (!testing) { await useConnectionTableTest(sseHandler, createRequest); @@ -89,9 +88,10 @@ export function ConnectionTester({ const submit = async () => { setError(null); const { error } = await post('/v1/connection_tables', { body: createRequest }); - navigate('/connections'); if (error) { setError({ title: 'Failed to create connection', body: formatError(error) }); + } else { + navigate('/connections'); } }; diff --git a/arroyo-console/src/routes/connections/DefineSchema.tsx b/arroyo-console/src/routes/connections/DefineSchema.tsx index 1c0f4bb49..6d03bde54 100644 --- a/arroyo-console/src/routes/connections/DefineSchema.tsx +++ b/arroyo-console/src/routes/connections/DefineSchema.tsx @@ -11,82 +11,118 @@ import { } from '@chakra-ui/react'; import React, { ChangeEvent, ChangeEventHandler, Dispatch, ReactElement, useState } from 'react'; import { CreateConnectionState } from './CreateConnection'; -import { JsonSchemaEditor } from './JsonSchemaEditor'; -import { Connector } from '../../lib/data_fetching'; +import { SchemaEditor } from './SchemaEditor'; +import { + ConnectionProfile, + ConnectionSchema, + Connector, + useConnectionProfiles, +} from '../../lib/data_fetching'; import { ConfluentSchemaEditor } from './ConfluentSchemaEditor'; import { components } from '../../gen/api-types'; -const JsonEditor = ({ +const SchemaFormatEditor = ({ connector, + connectionProfiles, state, setState, next, + format, }: { connector: Connector; + connectionProfiles: Array; state: CreateConnectionState; setState: Dispatch; next: () => void; + format: 'json' | 'avro'; }) => { type SchemaTypeOption = { name: string; value: string }; - let schemaTypeOptions: SchemaTypeOption[] = [ - { name: 'JSON Schema', value: 'json' }, - { name: 'Unstructured JSON', value: 'unstructured' }, - ]; - if (connector.id == 'kafka') { - schemaTypeOptions.push({ name: 'Confluent Schema Registry', value: 'confluent' }); + let schemaTypeOptions: SchemaTypeOption[] = [{ name: format + ' schema', value: 'schema' }]; + + if (format == 'json') { + schemaTypeOptions.push({ name: 'Unstructured JSON', value: 'unstructured' }); } - const [selectedSchemaType, setSelectedSchemaType] = useState( - undefined - ); + let connectionProfile = null; + if (state.connectionProfileId != null) { + connectionProfile = connectionProfiles.find(c => c.id == state.connectionProfileId); + } - let editor: JSX.Element | null = null; - switch (selectedSchemaType?.value) { + if ( + connector.id == 'kafka' && + connectionProfile != null && + (connectionProfile.config as any).schemaRegistry != null + ) { + schemaTypeOptions.push({ name: 'Confluent Schema Registry', value: 'confluent' }); + } + + let def_name: 'json_schema' | 'avro_schema'; + switch (format) { case 'json': - editor = ; + def_name = 'json_schema'; break; - case 'confluent': - editor = ; - break; - case 'unstructured': - editor = ( - - - Connection tables configured with an unstructured JSON schema have a single{' '} - value column with the JSON value, which can be accessed using SQL{' '} - - JSON functions - - . - - - - ); + case 'avro': + def_name = 'avro_schema'; break; + default: + throw new Error('unknown format: ' + format); + } + + let editor; + let value; + if ((state.schema?.format![format] || {})['confluentSchemaRegistry']) { + editor = ; + value = 'confluent'; + } else if (state.schema?.format?.json?.unstructured) { + editor = ( + + + Connection tables configured with an unstructured JSON schema have a single{' '} + value column with the JSON value, which can be accessed using SQL{' '} + + JSON functions + + . + + + + ); + value = 'unstructured'; + } else if ((state.schema?.definition || {})[def_name] != undefined) { + editor = ; + value = 'schema'; + } else { + editor = null; + value = undefined; } const onChange = (e: React.ChangeEvent) => { - setSelectedSchemaType(schemaTypeOptions.find(o => o.value == e.target.value)); switch (e.target.value) { - case 'json': + case 'schema': { setState({ ...state, schema: { ...state.schema, - definition: { json_schema: '' }, + // @ts-ignore + definition: { + [def_name]: '', + }, fields: [], - format: { json: { unstructured: false, confluentSchemaRegistry: false } }, + // @ts-ignore + format: { [format]: {} }, }, }); break; + } case 'confluent': setState({ ...state, schema: { ...state.schema, - definition: { json_schema: '' }, + definition: null, fields: [], - format: { json: { unstructured: false, confluentSchemaRegistry: true } }, + // @ts-ignore + format: { [format]: { confluentSchemaRegistry: true } }, }, }); break; @@ -101,6 +137,17 @@ const JsonEditor = ({ }, }); break; + default: + setState({ + ...state, + schema: { + ...state.schema, + definition: undefined, + fields: [], + // @ts-ignore + format: { [format]: {} }, + }, + }); } }; @@ -108,12 +155,7 @@ const JsonEditor = ({ Schema type - {schemaTypeOptions.map(s => { return (