From aea50bed589f1e8dc02fdb3e29dfe2679a06e205 Mon Sep 17 00:00:00 2001 From: Jonah Eisen Date: Mon, 27 Nov 2023 20:45:22 -0800 Subject: [PATCH] Make Kafka schema registry an enum The JSON form in the console does not behave properly when an optional object has an optional field, which is the case with schema registry. Making it an enum in the schema solves it for this case. --- arroyo-api/src/connection_tables.rs | 37 +++++----- arroyo-api/src/pipelines.rs | 17 +++-- arroyo-connectors/src/kafka.rs | 9 +-- .../src/routes/connections/JsonForm.tsx | 6 +- arroyo-sql-macro/src/connectors.rs | 4 +- arroyo-sql/src/lib.rs | 2 +- .../src/connectors/kafka/source/mod.rs | 15 ++-- connector-schemas/kafka/connection.json | 73 ++++++++++++------- 8 files changed, 95 insertions(+), 68 deletions(-) diff --git a/arroyo-api/src/connection_tables.rs b/arroyo-api/src/connection_tables.rs index 3a2167312..2e170c86a 100644 --- a/arroyo-api/src/connection_tables.rs +++ b/arroyo-api/src/connection_tables.rs @@ -15,7 +15,7 @@ use tokio::sync::mpsc::channel; use tokio_stream::wrappers::ReceiverStream; use tracing::warn; -use arroyo_connectors::kafka::{KafkaConfig, KafkaTable}; +use arroyo_connectors::kafka::{KafkaConfig, KafkaTable, SchemaRegistry}; use arroyo_connectors::{connector_for_type, ErasedConnector}; use arroyo_rpc::api_types::connections::{ ConnectionProfile, ConnectionSchema, ConnectionTable, ConnectionTablePost, SchemaDefinition, @@ -553,22 +553,25 @@ async fn get_schema( let table: KafkaTable = serde_json::from_value(table_config.clone()).expect("invalid kafka table"); - let schema_registry = profile.schema_registry.as_ref().ok_or_else(|| { - bad_request("schema registry must be configured on the Kafka connection profile") - })?; - - let resolver = ConfluentSchemaRegistry::new( - &schema_registry.endpoint, - &table.topic, - schema_registry.api_key.clone(), - schema_registry.api_secret.clone(), - ) - .map_err(|e| { - bad_request(format!( - "failed to fetch schemas from schema 
repository: {}", e )) })?; + let Some(SchemaRegistry::ConfluentSchemaRegistry { + endpoint, + api_key, + api_secret, + }) = profile.schema_registry_enum + else { + return Err(bad_request( + "schema registry must be configured on the Kafka connection profile", + )); + }; + + let resolver = + ConfluentSchemaRegistry::new(&endpoint, &table.topic, api_key.clone(), api_secret.clone()) + .map_err(|e| { + bad_request(format!( + "failed to fetch schemas from schema repository: {}", + e + )) + })?; resolver.get_schema(None).await.map_err(|e| { bad_request(format!( diff --git a/arroyo-api/src/pipelines.rs b/arroyo-api/src/pipelines.rs index d2cbdc1d6..78fcebc34 100644 --- a/arroyo-api/src/pipelines.rs +++ b/arroyo-api/src/pipelines.rs @@ -26,7 +26,7 @@ use arroyo_rpc::grpc::api::{ create_pipeline_req, CreateJobReq, CreatePipelineReq, CreateSqlJob, PipelineProgram, }; -use arroyo_connectors::kafka::{KafkaConfig, KafkaTable}; +use arroyo_connectors::kafka::{KafkaConfig, KafkaTable, SchemaRegistry}; use arroyo_formats::avro::arrow_to_avro_schema; use arroyo_rpc::formats::Format; use arroyo_rpc::public_ids::{generate_id, IdTypes}; @@ -172,16 +172,17 @@ async fn try_register_confluent_schema( return Ok(()); }; - let Some(registry_config) = profile.schema_registry else { + let Some(SchemaRegistry::ConfluentSchemaRegistry { + endpoint, + api_key, + api_secret, + }) = profile.schema_registry_enum + else { return Ok(()); }; - let schema_registry = ConfluentSchemaRegistry::new( - &registry_config.endpoint, - &table.topic, - registry_config.api_key, - registry_config.api_secret, - )?; + let schema_registry = + ConfluentSchemaRegistry::new(&endpoint, &table.topic, api_key, api_secret)?; match config.format.clone() { Some(Format::Avro(mut avro)) => { diff --git a/arroyo-connectors/src/kafka.rs b/arroyo-connectors/src/kafka.rs index 994d20369..27d6ebc3d 100644 --- a/arroyo-connectors/src/kafka.rs +++ b/arroyo-connectors/src/kafka.rs @@ -5,15 +5,14 @@ use std::collections::HashMap; use 
std::convert::Infallible; use typify::import_types; -use axum::response::sse::Event; -use std::time::{Duration, Instant}; - use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage}; +use axum::response::sse::Event; use rdkafka::{ consumer::{BaseConsumer, Consumer}, message::BorrowedMessage, ClientConfig, Offset, TopicPartitionList, }; +use std::time::{Duration, Instant}; use tokio::sync::mpsc::Sender; use tonic::Status; use tracing::{error, info, warn}; @@ -159,7 +158,7 @@ impl Connector for KafkaConnector { let schema_registry = options.remove("schema_registry.endpoint").map(|endpoint| { let api_key = options.remove("schema_registry.api_key"); let api_secret = options.remove("schema_registry.api_secret"); - SchemaRegistry { + SchemaRegistry::ConfluentSchemaRegistry { endpoint, api_key, api_secret, @@ -168,7 +167,7 @@ impl Connector for KafkaConnector { KafkaConfig { authentication: auth, bootstrap_servers: BootstrapServers(pull_opt("bootstrap_servers", options)?), - schema_registry, + schema_registry_enum: schema_registry, } } }; diff --git a/arroyo-console/src/routes/connections/JsonForm.tsx b/arroyo-console/src/routes/connections/JsonForm.tsx index 23046f77a..19ee2ecfc 100644 --- a/arroyo-console/src/routes/connections/JsonForm.tsx +++ b/arroyo-console/src/routes/connections/JsonForm.tsx @@ -633,7 +633,9 @@ export function JsonForm({ error: string | null; button?: string; }) { - const ajv = useMemo(() => addFormats(new Ajv()), [schema]); + let ajv = new Ajv(); + ajv.addKeyword('isSensitive'); + const memoAjv = useMemo(() => addFormats(ajv), [schema]); const formik = useFormik({ initialValues: initial, @@ -645,7 +647,7 @@ export function JsonForm({ errors.name = 'Name is required'; } - let validate = ajv.compile(schema); + let validate = memoAjv.compile(schema); let valid = validate(values); if (!valid) { diff --git a/arroyo-sql-macro/src/connectors.rs b/arroyo-sql-macro/src/connectors.rs index f54d3e937..0d56d2f63 100644 
--- a/arroyo-sql-macro/src/connectors.rs +++ b/arroyo-sql-macro/src/connectors.rs @@ -90,7 +90,7 @@ pub fn get_json_schema_source() -> Result { let config = KafkaConfig { authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {}, bootstrap_servers: "localhost:9092".try_into().unwrap(), - schema_registry: None, + schema_registry_enum: None, }; let table = KafkaTable { topic: "test_topic".to_string(), @@ -195,7 +195,7 @@ pub fn get_avro_source() -> Result { let config = KafkaConfig { authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {}, bootstrap_servers: "localhost:9092".try_into().unwrap(), - schema_registry: None, + schema_registry_enum: None, }; let table = KafkaTable { topic: "test_topic".to_string(), diff --git a/arroyo-sql/src/lib.rs b/arroyo-sql/src/lib.rs index 3e5112532..e0ad6722b 100644 --- a/arroyo-sql/src/lib.rs +++ b/arroyo-sql/src/lib.rs @@ -703,7 +703,7 @@ pub fn get_test_expression( KafkaConfig { authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {}, bootstrap_servers: "localhost:9092".to_string().try_into().unwrap(), - schema_registry: None, + schema_registry_enum: None, }, KafkaTable { topic: "test_topic".to_string(), diff --git a/arroyo-worker/src/connectors/kafka/source/mod.rs b/arroyo-worker/src/connectors/kafka/source/mod.rs index 01ead4f8a..8838f5538 100644 --- a/arroyo-worker/src/connectors/kafka/source/mod.rs +++ b/arroyo-worker/src/connectors/kafka/source/mod.rs @@ -22,7 +22,7 @@ use std::time::Duration; use tokio::select; use tracing::{debug, error, info, warn}; -use super::{client_configs, KafkaConfig, KafkaTable, ReadMode, TableType}; +use super::{client_configs, KafkaConfig, KafkaTable, ReadMode, SchemaRegistry, TableType}; #[cfg(test)] mod test; @@ -105,13 +105,18 @@ where } let schema_resolver: Arc = - if let Some(schema_registry) = &connection.schema_registry { + if let Some(SchemaRegistry::ConfluentSchemaRegistry { + endpoint, + api_key, + api_secret, + }) = 
&connection.schema_registry_enum + { Arc::new( ConfluentSchemaRegistry::new( - &schema_registry.endpoint, + &endpoint, &table.topic, - schema_registry.api_key.clone(), - schema_registry.api_secret.clone(), + api_key.clone(), + api_secret.clone(), ) .expect("failed to construct confluent schema resolver"), ) diff --git a/connector-schemas/kafka/connection.json b/connector-schemas/kafka/connection.json index caac41e8c..cbe9c05a7 100644 --- a/connector-schemas/kafka/connection.json +++ b/connector-schemas/kafka/connection.json @@ -6,7 +6,9 @@ "type": "string", "title": "Bootstrap Servers", "description": "Comma-separated list of Kafka servers to connect to", - "examples": ["broker-1:9092,broker-2:9092"], + "examples": [ + "broker-1:9092,broker-2:9092" + ], "pattern": "^(([\\w\\.\\-]+:\\d+),)*([\\w\\.\\-]+:\\d+)$" }, "authentication": { @@ -51,38 +53,53 @@ } ] }, - "schemaRegistry": { + "schemaRegistryEnum": { "type": "object", "title": "Schema Registry", - "properties": { - "endpoint": { - "title": "Endpoint", - "type": "string", - "description": "The endpoint for your Confluent Schema Registry if you have one", - "examples": [ - "http://localhost:8081" - ], - "format": "uri" - }, - "apiKey": { - "title": "API Key", - "type": "string", - "description": "The API key for your Confluent Schema Registry if you have one", - "examples": [ - "ABCDEFGHIJK01234" + "oneOf": [ + { + "type": "object", + "title": "Confluent Schema Registry", + "properties": { + "endpoint": { + "title": "Endpoint", + "type": "string", + "description": "The endpoint for your Confluent Schema Registry", + "examples": [ + "http://localhost:8081" + ], + "format": "uri" + }, + "apiKey": { + "title": "API Key", + "type": "string", + "description": "The API key for your Confluent Schema Registry", + "examples": [ + "ABCDEFGHIJK01234" + ] + }, + "apiSecret": { + "title": "API Secret", + "type": "string", + "description": "Secret for your Confluent Schema Registry", + "isSensitive": true, + "examples": [ + 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/=" + ] + } + }, + "required": [ + "endpoint" ] }, - "apiSecret": { - "title": "API Secret", - "type": "string", - "description": "Secret for your Confluent Schema Registry if you have one", - "isSensitive": true, - "examples": [ - "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/=" - ] + { + "type": "object", + "title": "None", + "properties": { + }, + "additionalProperties": false } - }, - "required": ["endpoint"] + ] } }, "required": [