diff --git a/source-kafka/src/configuration.rs b/source-kafka/src/configuration.rs
index d7efbc0b3..e2f900f2f 100644
--- a/source-kafka/src/configuration.rs
+++ b/source-kafka/src/configuration.rs
@@ -3,14 +3,14 @@ use rdkafka::client::{ClientContext, OAuthToken};
 use rdkafka::consumer::{BaseConsumer, ConsumerContext};
 use rdkafka::ClientConfig;
 use schemars::{schema::RootSchema, JsonSchema};
-use serde::{Deserialize, Serialize};
+use serde::{de, Deserialize, Deserializer, Serialize};
 
-#[derive(Serialize, Deserialize, Default)]
+#[derive(Serialize, Deserialize)]
 pub struct EndpointConfig {
     bootstrap_servers: String,
     credentials: Option<Credentials>,
     tls: Option<TlsSettings>,
-    pub schema_registry: Option<SchemaRegistryConfig>,
+    pub schema_registry: SchemaRegistryConfig,
 }
 
 #[derive(Serialize, Deserialize, Clone)]
@@ -55,10 +55,31 @@ pub enum TlsSettings {
 }
 
 #[derive(Serialize, Deserialize)]
-pub struct SchemaRegistryConfig {
-    pub endpoint: String,
-    pub username: String,
-    pub password: String,
+#[serde(tag = "schema_registry_type")]
+#[serde(rename_all = "snake_case")]
+pub enum SchemaRegistryConfig {
+    ConfluentSchemaRegistry {
+        endpoint: String,
+        username: String,
+        password: String,
+    },
+    NoSchemaRegistry {
+        #[serde(deserialize_with = "validate_json_only_true")]
+        enable_json_only: bool,
+    },
+}
+
+fn validate_json_only_true<'de, D>(deserializer: D) -> Result<bool, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    if bool::deserialize(deserializer)? {
+        Ok(true)
+    } else {
+        Err(de::Error::custom(
+            "'enable_json_only' must be set to true when no schema registry is configured",
+        ))
+    }
 }
 
 impl JsonSchema for EndpointConfig {
@@ -73,7 +94,8 @@
             "type": "object",
             "required": [
                 "bootstrap_servers",
-                "credentials"
+                "credentials",
+                "schema_registry"
             ],
             "properties": {
                 "bootstrap_servers": {
@@ -113,7 +135,6 @@
                         },
                         "username": {
                             "order": 2,
-                            "secret": true,
                             "title": "Username",
                             "type": "string"
                         },
@@ -176,35 +197,66 @@
                 },
                 "schema_registry": {
                     "title": "Schema Registry",
-                    "description": "Connection details for interacting with a schema registry. This is necessary for processing messages encoded with Avro.",
+                    "description": "Connection details for interacting with a schema registry.",
                     "type": "object",
-                    "properties": {
-                        "endpoint": {
-                            "type": "string",
-                            "title": "Schema Registry Endpoint",
-                            "description": "Schema registry API endpoint. For example: https://registry-id.us-east-2.aws.confluent.cloud",
-                            "order": 0
+                    "order": 3,
+                    "discriminator": {
+                        "propertyName": "schema_registry_type"
+                    },
+                    "oneOf": [{
+                        "title": "Confluent Schema Registry",
+                        "properties": {
+                            "schema_registry_type": {
+                                "type": "string",
+                                "default": "confluent_schema_registry",
+                                "const": "confluent_schema_registry",
+                                "order": 0
+                            },
+                            "endpoint": {
+                                "type": "string",
+                                "title": "Schema Registry Endpoint",
+                                "description": "Schema registry API endpoint. For example: https://registry-id.us-east-2.aws.confluent.cloud",
+                                "order": 1
+                            },
+                            "username": {
+                                "type": "string",
+                                "title": "Schema Registry Username",
+                                "description": "Schema registry username to use for authentication. If you are using Confluent Cloud, this will be the 'Key' from your schema registry API key.",
+                                "order": 2
+                            },
+                            "password": {
+                                "type": "string",
+                                "title": "Schema Registry Password",
+                                "description": "Schema registry password to use for authentication. If you are using Confluent Cloud, this will be the 'Secret' from your schema registry API key.",
+                                "order": 3,
+                                "secret": true
+                            }
                         },
-                        "username": {
-                            "type": "string",
-                            "title": "Schema Registry Username",
-                            "description": "Schema registry username to use for authentication. If you are using Confluent Cloud, this will be the 'Key' from your schema registry API key.",
-                            "order": 1
+                        "required": [
+                            "endpoint",
+                            "username",
+                            "password"
+                        ],
+                    }, {
+                        "title": "No Schema Registry",
+                        "properties": {
+                            "schema_registry_type": {
+                                "type": "string",
+                                "default": "no_schema_registry",
+                                "const": "no_schema_registry",
+                                "order": 0
+                            },
+                            "enable_json_only": {
+                                "type": "boolean",
+                                "title": "Capture Messages in JSON Format Only",
+                                "description": "If no schema registry is configured the capture will attempt to parse all data as JSON, and discovered collections will use a key of the message partition & offset. All available topics will be discovered, but if their messages are not encoded as JSON attempting to capture them will result in errors. If your topics contain messages encoded with a schema, you should configure the connector to use the schema registry for optimal results.",
+                                "order": 1
+                            }
                         },
-                        "password": {
-                            "type": "string",
-                            "title": "Schema Registry Password",
-                            "description": "Schema registry password to use for authentication. If you are using Confluent Cloud, this will be the 'Secret' from your schema registry API key.",
-                            "order": 2,
-                            "secret": true
-                        }
-                    },
-                    "required": [
-                        "endpoint",
-                        "username",
-                        "password"
-                    ],
-                    "order": 3
+                        "required": [
+                            "enable_json_only",
+                        ],
+                    }],
                 }
             }
         }))
diff --git a/source-kafka/src/discover.rs b/source-kafka/src/discover.rs
index 4b488129e..231e9907e 100644
--- a/source-kafka/src/discover.rs
+++ b/source-kafka/src/discover.rs
@@ -13,7 +13,7 @@ use schemars::schema::RootSchema;
 use serde_json::json;
 
 use crate::{
-    configuration::{EndpointConfig, Resource},
+    configuration::{EndpointConfig, Resource, SchemaRegistryConfig},
     schema_registry::{
         RegisteredSchema::{Avro, Json, Protobuf},
         SchemaRegistryClient, TopicSchema,
@@ -29,7 +29,7 @@ pub async fn do_discover(req: Discover) -> Result<Vec<Binding>> {
     let meta = consumer
         .fetch_metadata(None, KAFKA_TIMEOUT)
-        .context("could not fetch cluster metadata - double check your configuration")?;
+        .context("Could not connect to bootstrap server with the provided configuration. This may be due to an incorrect configuration for authentication or bootstrap servers. Double check your configuration and try again.")?;
 
     let mut all_topics: Vec<String> = meta
         .topics()
@@ -47,14 +47,18 @@ pub async fn do_discover(req: Discover) -> Result<Vec<Binding>> {
     all_topics.sort();
 
     let registered_schemas = match config.schema_registry {
-        Some(cfg) => {
-            let client = SchemaRegistryClient::new(cfg.endpoint, cfg.username, cfg.password);
+        SchemaRegistryConfig::ConfluentSchemaRegistry {
+            endpoint,
+            username,
+            password,
+        } => {
+            let client = SchemaRegistryClient::new(endpoint, username, password);
             client
                 .schemas_for_topics(&all_topics)
                 .await
-                .context("fetching topic schemas from schema registry")?
+                .context("Could not connect to the configured schema registry. Double check your configuration and try again.")?
         }
-        None => HashMap::new(),
+        SchemaRegistryConfig::NoSchemaRegistry { .. } => HashMap::new(),
     };
 
     all_topics
diff --git a/source-kafka/src/lib.rs b/source-kafka/src/lib.rs
index a080750a4..3b4f07dc1 100644
--- a/source-kafka/src/lib.rs
+++ b/source-kafka/src/lib.rs
@@ -1,7 +1,7 @@
 use std::io::Write;
 
 use anyhow::{Context, Result};
-use configuration::{schema_for, EndpointConfig, Resource};
+use configuration::{schema_for, EndpointConfig, Resource, SchemaRegistryConfig};
 use discover::do_discover;
 use proto_flow::capture::{
     request::Validate,
@@ -12,6 +12,7 @@ use proto_flow::capture::{
 };
 use pull::do_pull;
 use rdkafka::consumer::Consumer;
+use schema_registry::SchemaRegistryClient;
 use tokio::io::{self, AsyncBufReadExt};
 
 pub mod configuration;
@@ -129,7 +130,22 @@
     consumer
         .fetch_metadata(None, KAFKA_TIMEOUT)
-        .context("could not fetch cluster metadata - double check your configuration")?;
+        .context("Could not connect to bootstrap server with the provided configuration. This may be due to an incorrect configuration for authentication or bootstrap servers. Double check your configuration and try again.")?;
+
+    match config.schema_registry {
+        SchemaRegistryConfig::ConfluentSchemaRegistry {
+            endpoint,
+            username,
+            password,
+        } => {
+            let client = SchemaRegistryClient::new(endpoint, username, password);
+            client
+                .schemas_for_topics(&[])
+                .await
+                .context("Could not connect to the configured schema registry. Double check your configuration and try again.")?;
+        }
+        SchemaRegistryConfig::NoSchemaRegistry { .. } => (),
+    };
 
     req.bindings
         .iter()
diff --git a/source-kafka/src/main.rs b/source-kafka/src/main.rs
index 3531828ff..214079f37 100644
--- a/source-kafka/src/main.rs
+++ b/source-kafka/src/main.rs
@@ -11,14 +11,13 @@ fn main() -> anyhow::Result<()> {
     let result = runtime.block_on(run_connector(stdin, stdout));
 
     if let Err(err) = result.as_ref() {
-        tracing::error!(error = %err, "operation failed");
+        tracing::error!(error = ?err, "operation failed");
     } else {
         tracing::debug!("connector run successful");
     }
 
     runtime.shutdown_background();
 
-    tracing::info!(success = %result.is_ok(), "connector exiting");
 
     result
 }
@@ -47,7 +46,6 @@ fn start_runtime() -> anyhow::Result<tokio::runtime::Runtime> {
         .with_target(false)
         .init();
 
-    // These bits about the runtime and shutdown were cargo-culted over from connector-init
     let runtime = tokio::runtime::Builder::new_multi_thread()
         .enable_all()
         .build()
diff --git a/source-kafka/src/pull.rs b/source-kafka/src/pull.rs
index 51379fef7..7121d9877 100644
--- a/source-kafka/src/pull.rs
+++ b/source-kafka/src/pull.rs
@@ -1,5 +1,5 @@
 use crate::{
-    configuration::{EndpointConfig, FlowConsumerContext, Resource},
+    configuration::{EndpointConfig, FlowConsumerContext, Resource, SchemaRegistryConfig},
     schema_registry::{RegisteredSchema, SchemaRegistryClient},
     write_capture_response,
 };
@@ -85,12 +85,12 @@ pub async fn do_pull(req: Open, mut stdout: std::io::Stdout) -> Result<()> {
     let config: EndpointConfig = serde_json::from_str(&spec.config_json)?;
     let mut consumer = config.to_consumer().await?;
     let schema_client = match config.schema_registry {
-        Some(cfg) => Some(SchemaRegistryClient::new(
-            cfg.endpoint,
-            cfg.username,
-            cfg.password,
-        )),
-        None => None,
+        SchemaRegistryConfig::ConfluentSchemaRegistry {
+            endpoint,
+            username,
+            password,
+        } => Some(SchemaRegistryClient::new(endpoint, username, password)),
+        SchemaRegistryConfig::NoSchemaRegistry { .. } => None,
     };
 
     let mut schema_cache: HashMap<u32, RegisteredSchema> = HashMap::new();
diff --git a/source-kafka/src/schema_registry.rs b/source-kafka/src/schema_registry.rs
index f879f19eb..0d302586c 100644
--- a/source-kafka/src/schema_registry.rs
+++ b/source-kafka/src/schema_registry.rs
@@ -1,7 +1,7 @@
 use anyhow::{Context, Result};
 use futures::stream::{self, StreamExt};
 use reqwest::Client;
-use serde::Deserialize;
+use serde::{de::DeserializeOwned, Deserialize};
 use std::collections::{HashMap, HashSet};
 
 const TOPIC_KEY_SUFFIX: &str = "-key";
@@ -73,12 +73,7 @@ impl SchemaRegistryClient {
         let applicable_topics: HashSet<String> = topics.iter().cloned().collect();
 
         let subjects: Vec<String> = self
-            .http
-            .get(format!("{}/subjects", self.endpoint))
-            .basic_auth(&self.username, Some(&self.password))
-            .send()
-            .await?
-            .json()
+            .make_request(format!("{}/subjects", self.endpoint).as_str())
             .await?;
 
         let filter_by_suffix = |s: &str, suffix: &str| {
@@ -137,13 +132,7 @@ pub async fn fetch_schema(&self, id: u32) -> Result<RegisteredSchema> {
         let fetched: FetchedSchema = self
-            .http
-            .get(format!("{}/schemas/ids/{}", self.endpoint, id))
-            .basic_auth(&self.username, Some(&self.password))
-            .send()
-            .await
-            .context("fetching schema")?
-            .json()
+            .make_request(format!("{}/schemas/ids/{}", self.endpoint, id).as_str())
            .await?;
 
         if fetched.references.is_some() {
@@ -167,16 +156,9 @@ async fn fetch_latest_version(&self, subject: &str) -> Result<u32> {
         let fetched: FetchedLatestVersion = self
-            .http
-            .get(format!(
-                "{}/subjects/{}/versions/latest",
-                self.endpoint, subject
-            ))
-            .basic_auth(&self.username, Some(&self.password))
-            .send()
-            .await
-            .context("fetching latest schema version for subject")?
-            .json()
+            .make_request(
+                format!("{}/subjects/{}/versions/latest", self.endpoint, subject).as_str(),
+            )
             .await?;
         Ok(fetched.id)
     }
@@ -194,4 +176,29 @@
         let version = self.fetch_latest_version(subject.as_str()).await?;
         self.fetch_schema(version).await
     }
+
+    async fn make_request<T>(&self, url: &str) -> Result<T>
+    where
+        T: DeserializeOwned,
+    {
+        let res = self
+            .http
+            .get(url)
+            .basic_auth(&self.username, Some(&self.password))
+            .send()
+            .await?;
+
+        if !res.status().is_success() {
+            let status = res.status();
+            let body = res.text().await?;
+            anyhow::bail!(
+                "request GET {} failed with status {}: {}",
+                url,
+                status,
+                body
+            );
+        }
+
+        Ok(res.json().await?)
+    }
 }
diff --git a/source-kafka/tests/snapshots/test__spec.snap b/source-kafka/tests/snapshots/test__spec.snap
index efd0605cf..83e3558f9 100644
--- a/source-kafka/tests/snapshots/test__spec.snap
+++ b/source-kafka/tests/snapshots/test__spec.snap
@@ -46,7 +46,6 @@ expression: "serde_json::to_string_pretty(&got).unwrap()"
         },
         "username": {
           "order": 2,
-          "secret": true,
           "title": "Username",
           "type": "string"
         }
@@ -98,34 +97,68 @@ expression: "serde_json::to_string_pretty(&got).unwrap()"
         "type": "object"
       },
       "schema_registry": {
-        "description": "Connection details for interacting with a schema registry. This is necessary for processing messages encoded with Avro.",
-        "order": 3,
-        "properties": {
-          "endpoint": {
-            "description": "Schema registry API endpoint. For example: https://registry-id.us-east-2.aws.confluent.cloud",
-            "order": 0,
-            "title": "Schema Registry Endpoint",
-            "type": "string"
-          },
-          "password": {
-            "description": "Schema registry password to use for authentication. If you are using Confluent Cloud, this will be the 'Secret' from your schema registry API key.",
-            "order": 2,
-            "secret": true,
-            "title": "Schema Registry Password",
-            "type": "string"
+        "description": "Connection details for interacting with a schema registry.",
+        "discriminator": {
+          "propertyName": "schema_registry_type"
+        },
+        "oneOf": [
+          {
+            "properties": {
+              "endpoint": {
+                "description": "Schema registry API endpoint. For example: https://registry-id.us-east-2.aws.confluent.cloud",
+                "order": 1,
+                "title": "Schema Registry Endpoint",
+                "type": "string"
+              },
+              "password": {
+                "description": "Schema registry password to use for authentication. If you are using Confluent Cloud, this will be the 'Secret' from your schema registry API key.",
+                "order": 3,
+                "secret": true,
+                "title": "Schema Registry Password",
+                "type": "string"
+              },
+              "schema_registry_type": {
+                "const": "confluent_schema_registry",
+                "default": "confluent_schema_registry",
+                "order": 0,
+                "type": "string"
+              },
+              "username": {
+                "description": "Schema registry username to use for authentication. If you are using Confluent Cloud, this will be the 'Key' from your schema registry API key.",
+                "order": 2,
+                "title": "Schema Registry Username",
+                "type": "string"
+              }
+            },
+            "required": [
+              "endpoint",
+              "password",
+              "username"
+            ],
+            "title": "Confluent Schema Registry"
           },
-          "username": {
-            "description": "Schema registry username to use for authentication. If you are using Confluent Cloud, this will be the 'Key' from your schema registry API key.",
-            "order": 1,
-            "title": "Schema Registry Username",
-            "type": "string"
+          {
+            "properties": {
+              "enable_json_only": {
+                "description": "If no schema registry is configured the capture will attempt to parse all data as JSON, and discovered collections will use a key of the message partition & offset. All available topics will be discovered, but if their messages are not encoded as JSON attempting to capture them will result in errors. If your topics contain messages encoded with a schema, you should configure the connector to use the schema registry for optimal results.",
+                "order": 1,
+                "title": "Capture Messages in JSON Format Only",
+                "type": "boolean"
+              },
+              "schema_registry_type": {
+                "const": "no_schema_registry",
+                "default": "no_schema_registry",
+                "order": 0,
+                "type": "string"
+              }
+            },
+            "required": [
+              "enable_json_only"
+            ],
+            "title": "No Schema Registry"
           }
-        },
-        "required": [
-          "endpoint",
-          "password",
-          "username"
         ],
+        "order": 3,
         "title": "Schema Registry",
         "type": "object"
       },
@@ -142,7 +175,8 @@ expression: "serde_json::to_string_pretty(&got).unwrap()"
     },
     "required": [
       "bootstrap_servers",
-      "credentials"
+      "credentials",
+      "schema_registry"
     ],
     "title": "Kafka Source Configuration",
     "type": "object"
diff --git a/source-kafka/tests/test.flow.yaml b/source-kafka/tests/test.flow.yaml
index 054348c68..fe8a4d91c 100644
--- a/source-kafka/tests/test.flow.yaml
+++ b/source-kafka/tests/test.flow.yaml
@@ -11,6 +11,7 @@
         config:
           bootstrap_servers: "localhost:9092"
          schema_registry:
+            schema_registry_type: confluent_schema_registry
            endpoint: http://localhost:8081
            username: user
            password: password
diff --git a/tests/source-kafka/setup.sh b/tests/source-kafka/setup.sh
index d0fa1e339..7ce7d4595 100755
--- a/tests/source-kafka/setup.sh
+++ b/tests/source-kafka/setup.sh
@@ -3,7 +3,7 @@
 set -e
 
 export TEST_STREAM="estuary-test-$(shuf -zer -n6 {a..z} | tr -d '\0')"
 export RESOURCE="{\"topic\": \"${TEST_STREAM}\"}"
-export CONNECTOR_CONFIG='{"bootstrap_servers": "source-kafka-db-1.flow-test:9092"}'
+export CONNECTOR_CONFIG='{"bootstrap_servers": "source-kafka-db-1.flow-test:9092", "schema_registry": {"schema_registry_type": "no_schema_registry", "enable_json_only": true}}'
 LISTENER_HOST="source-kafka-db-1.flow-test" docker compose -f source-kafka/docker-compose.yaml up --wait --detach