Skip to content

Commit

Permalink
source-kafka: minor adjustments to error messages and schema registry…
Browse files Browse the repository at this point in the history
… configuration

* Force a more explicit distinction for using a schema registry vs. not using
  one, and add opt-in clarification that not using a schema registry requires
  that all messages be encoded as JSON and collections will use the generic keys
  of partition and offset values.

* Minor adjustments to error messages for common cases of incorrect auth or
  connection details.
  • Loading branch information
williamhbaker committed Nov 15, 2024
1 parent ad63c42 commit c48bf72
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 105 deletions.
122 changes: 87 additions & 35 deletions source-kafka/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use rdkafka::client::{ClientContext, OAuthToken};
use rdkafka::consumer::{BaseConsumer, ConsumerContext};
use rdkafka::ClientConfig;
use schemars::{schema::RootSchema, JsonSchema};
use serde::{Deserialize, Serialize};
use serde::{de, Deserialize, Deserializer, Serialize};

#[derive(Serialize, Deserialize, Default)]
#[derive(Serialize, Deserialize)]
pub struct EndpointConfig {
bootstrap_servers: String,
credentials: Option<Credentials>,
tls: Option<TlsSettings>,
pub schema_registry: Option<SchemaRegistryConfig>,
pub schema_registry: SchemaRegistryConfig,
}

#[derive(Serialize, Deserialize, Clone)]
Expand Down Expand Up @@ -55,10 +55,31 @@ pub enum TlsSettings {
}

#[derive(Serialize, Deserialize)]
pub struct SchemaRegistryConfig {
pub endpoint: String,
pub username: String,
pub password: String,
#[serde(tag = "schema_registry_type")]
#[serde(rename_all = "snake_case")]
/// Selects whether the connector uses a schema registry.
///
/// Serialized as an internally tagged object: the `schema_registry_type`
/// property holds the snake_case variant name (`confluent_schema_registry` or
/// `no_schema_registry`) alongside that variant's own fields.
pub enum SchemaRegistryConfig {
/// Use a schema registry reachable at `endpoint`, authenticating with the
/// given `username` and `password`.
ConfluentSchemaRegistry {
endpoint: String,
username: String,
password: String,
},
/// Do not use a schema registry.
NoSchemaRegistry {
// Explicit opt-in flag: deserialization goes through
// `validate_json_only_true`, which rejects any value other than `true`.
#[serde(deserialize_with = "validate_json_only_true")]
enable_json_only: bool,
},
}

fn validate_json_only_true<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
D: Deserializer<'de>,
{
if bool::deserialize(deserializer)? {
Ok(true)
} else {
Err(de::Error::custom(
"'enable_json_only' must be set to true when no schema registry is configured",
))
}
}

impl JsonSchema for EndpointConfig {
Expand All @@ -73,7 +94,8 @@ impl JsonSchema for EndpointConfig {
"type": "object",
"required": [
"bootstrap_servers",
"credentials"
"credentials",
"schema_registry"
],
"properties": {
"bootstrap_servers": {
Expand Down Expand Up @@ -113,7 +135,6 @@ impl JsonSchema for EndpointConfig {
},
"username": {
"order": 2,
"secret": true,
"title": "Username",
"type": "string"
},
Expand Down Expand Up @@ -176,35 +197,66 @@ impl JsonSchema for EndpointConfig {
},
"schema_registry": {
"title": "Schema Registry",
"description": "Connection details for interacting with a schema registry. This is necessary for processing messages encoded with Avro.",
"description": "Connection details for interacting with a schema registry.",
"type": "object",
"properties": {
"endpoint": {
"type": "string",
"title": "Schema Registry Endpoint",
"description": "Schema registry API endpoint. For example: https://registry-id.us-east-2.aws.confluent.cloud",
"order": 0
"order": 3,
"discriminator": {
"propertyName": "schema_registry_type"
},
"oneOf": [{
"title": "Confluent Schema Registry",
"properties": {
"schema_registry_type": {
"type": "string",
"default": "confluent_schema_registry",
"const": "confluent_schema_registry",
"order": 0
},
"endpoint": {
"type": "string",
"title": "Schema Registry Endpoint",
"description": "Schema registry API endpoint. For example: https://registry-id.us-east-2.aws.confluent.cloud",
"order": 1
},
"username": {
"type": "string",
"title": "Schema Registry Username",
"description": "Schema registry username to use for authentication. If you are using Confluent Cloud, this will be the 'Key' from your schema registry API key.",
"order": 2
},
"password": {
"type": "string",
"title": "Schema Registry Password",
"description": "Schema registry password to use for authentication. If you are using Confluent Cloud, this will be the 'Secret' from your schema registry API key.",
"order": 3,
"secret": true
}
},
"username": {
"type": "string",
"title": "Schema Registry Username",
"description": "Schema registry username to use for authentication. If you are using Confluent Cloud, this will be the 'Key' from your schema registry API key.",
"order": 1
"required": [
"endpoint",
"username",
"password"
],
}, {
"title": "No Schema Registry",
"properties": {
"schema_registry_type": {
"type": "string",
"default": "no_schema_registry",
"const": "no_schema_registry",
"order": 0
},
"enable_json_only": {
"type": "boolean",
"title": "Capture Messages in JSON Format Only",
"description": "If no schema registry is configured the capture will attempt to parse all data as JSON, and discovered collections will use a key of the message partition & offset. All available topics will be discovered, but if their messages are not encoded as JSON attempting to capture them will result in errors. If your topics contain messages encoded with a schema, you should configure the connector to use the schema registry for optimal results.",
"order": 1
}
},
"password": {
"type": "string",
"title": "Schema Registry Password",
"description": "Schema registry password to use for authentication. If you are using Confluent Cloud, this will be the 'Secret' from your schema registry API key.",
"order": 2,
"secret": true
}
},
"required": [
"endpoint",
"username",
"password"
],
"order": 3
"required": [
"enable_json_only",
],
}],
}
}
}))
Expand Down
16 changes: 10 additions & 6 deletions source-kafka/src/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use schemars::schema::RootSchema;
use serde_json::json;

use crate::{
configuration::{EndpointConfig, Resource},
configuration::{EndpointConfig, Resource, SchemaRegistryConfig},
schema_registry::{
RegisteredSchema::{Avro, Json, Protobuf},
SchemaRegistryClient, TopicSchema,
Expand All @@ -29,7 +29,7 @@ pub async fn do_discover(req: Discover) -> Result<Vec<discovered::Binding>> {

let meta = consumer
.fetch_metadata(None, KAFKA_TIMEOUT)
.context("could not fetch cluster metadata - double check your configuration")?;
.context("Could not connect to bootstrap server with the provided configuration. This may be due to an incorrect configuration for authentication or bootstrap servers. Double check your configuration and try again.")?;

let mut all_topics: Vec<String> = meta
.topics()
Expand All @@ -47,14 +47,18 @@ pub async fn do_discover(req: Discover) -> Result<Vec<discovered::Binding>> {
all_topics.sort();

let registered_schemas = match config.schema_registry {
Some(cfg) => {
let client = SchemaRegistryClient::new(cfg.endpoint, cfg.username, cfg.password);
SchemaRegistryConfig::ConfluentSchemaRegistry {
endpoint,
username,
password,
} => {
let client = SchemaRegistryClient::new(endpoint, username, password);
client
.schemas_for_topics(&all_topics)
.await
.context("fetching topic schemas from schema registry")?
.context("Could not connect to the configured schema registry. Double check your configuration and try again.")?
}
None => HashMap::new(),
SchemaRegistryConfig::NoSchemaRegistry { .. } => HashMap::new(),
};

all_topics
Expand Down
20 changes: 18 additions & 2 deletions source-kafka/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io::Write;

use anyhow::{Context, Result};
use configuration::{schema_for, EndpointConfig, Resource};
use configuration::{schema_for, EndpointConfig, Resource, SchemaRegistryConfig};
use discover::do_discover;
use proto_flow::capture::{
request::Validate,
Expand All @@ -12,6 +12,7 @@ use proto_flow::capture::{
};
use pull::do_pull;
use rdkafka::consumer::Consumer;
use schema_registry::SchemaRegistryClient;
use tokio::io::{self, AsyncBufReadExt};

pub mod configuration;
Expand Down Expand Up @@ -129,7 +130,22 @@ async fn do_validate(req: Validate) -> Result<Vec<ValidatedBinding>> {

consumer
.fetch_metadata(None, KAFKA_TIMEOUT)
.context("could not fetch cluster metadata - double check your configuration")?;
.context("Could not connect to bootstrap server with the provided configuration. This may be due to an incorrect configuration for authentication or bootstrap servers. Double check your configuration and try again.")?;

match config.schema_registry {
SchemaRegistryConfig::ConfluentSchemaRegistry {
endpoint,
username,
password,
} => {
let client = SchemaRegistryClient::new(endpoint, username, password);
client
.schemas_for_topics(&[])
.await
.context("Could not connect to the configured schema registry. Double check your configuration and try again.")?;
}
SchemaRegistryConfig::NoSchemaRegistry { .. } => (),
};

req.bindings
.iter()
Expand Down
4 changes: 1 addition & 3 deletions source-kafka/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ fn main() -> anyhow::Result<()> {
let result = runtime.block_on(run_connector(stdin, stdout));

if let Err(err) = result.as_ref() {
tracing::error!(error = %err, "operation failed");
tracing::error!(error = ?err, "operation failed");
} else {
tracing::debug!("connector run successful");
}

runtime.shutdown_background();

tracing::info!(success = %result.is_ok(), "connector exiting");
result
}

Expand Down Expand Up @@ -47,7 +46,6 @@ fn start_runtime() -> anyhow::Result<tokio::runtime::Runtime> {
.with_target(false)
.init();

// These bits about the runtime and shutdown were cargo-culted over from connector-init
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
Expand Down
14 changes: 7 additions & 7 deletions source-kafka/src/pull.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
configuration::{EndpointConfig, FlowConsumerContext, Resource},
configuration::{EndpointConfig, FlowConsumerContext, Resource, SchemaRegistryConfig},
schema_registry::{RegisteredSchema, SchemaRegistryClient},
write_capture_response,
};
Expand Down Expand Up @@ -85,12 +85,12 @@ pub async fn do_pull(req: Open, mut stdout: std::io::Stdout) -> Result<()> {
let config: EndpointConfig = serde_json::from_str(&spec.config_json)?;
let mut consumer = config.to_consumer().await?;
let schema_client = match config.schema_registry {
Some(cfg) => Some(SchemaRegistryClient::new(
cfg.endpoint,
cfg.username,
cfg.password,
)),
None => None,
SchemaRegistryConfig::ConfluentSchemaRegistry {
endpoint,
username,
password,
} => Some(SchemaRegistryClient::new(endpoint, username, password)),
SchemaRegistryConfig::NoSchemaRegistry { .. } => None,
};
let mut schema_cache: HashMap<u32, RegisteredSchema> = HashMap::new();

Expand Down
55 changes: 31 additions & 24 deletions source-kafka/src/schema_registry.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{Context, Result};
use futures::stream::{self, StreamExt};
use reqwest::Client;
use serde::Deserialize;
use serde::{de::DeserializeOwned, Deserialize};
use std::collections::{HashMap, HashSet};

const TOPIC_KEY_SUFFIX: &str = "-key";
Expand Down Expand Up @@ -73,12 +73,7 @@ impl SchemaRegistryClient {
let applicable_topics: HashSet<String> = topics.iter().cloned().collect();

let subjects: Vec<String> = self
.http
.get(format!("{}/subjects", self.endpoint))
.basic_auth(&self.username, Some(&self.password))
.send()
.await?
.json()
.make_request(format!("{}/subjects", self.endpoint).as_str())
.await?;

let filter_by_suffix = |s: &str, suffix: &str| {
Expand Down Expand Up @@ -137,13 +132,7 @@ impl SchemaRegistryClient {

pub async fn fetch_schema(&self, id: u32) -> Result<RegisteredSchema> {
let fetched: FetchedSchema = self
.http
.get(format!("{}/schemas/ids/{}", self.endpoint, id))
.basic_auth(&self.username, Some(&self.password))
.send()
.await
.context("fetching schema")?
.json()
.make_request(format!("{}/schemas/ids/{}", self.endpoint, id).as_str())
.await?;

if fetched.references.is_some() {
Expand All @@ -167,16 +156,9 @@ impl SchemaRegistryClient {

async fn fetch_latest_version(&self, subject: &str) -> Result<u32> {
let fetched: FetchedLatestVersion = self
.http
.get(format!(
"{}/subjects/{}/versions/latest",
self.endpoint, subject
))
.basic_auth(&self.username, Some(&self.password))
.send()
.await
.context("fetching latest schema version for subject")?
.json()
.make_request(
format!("{}/subjects/{}/versions/latest", self.endpoint, subject).as_str(),
)
.await?;
Ok(fetched.id)
}
Expand All @@ -194,4 +176,29 @@ impl SchemaRegistryClient {
let version = self.fetch_latest_version(subject.as_str()).await?;
self.fetch_schema(version).await
}

/// Sends a GET request to `url` using HTTP basic auth with the client's
/// username and password, and deserializes the JSON response body into `T`.
///
/// # Errors
///
/// Fails if the request cannot be sent, if the response status is not a
/// success status (the error message includes the URL, the status, and the
/// response body text), or if the body cannot be read or deserialized.
async fn make_request<T>(&self, url: &str) -> Result<T>
where
    T: DeserializeOwned,
{
    let response = self
        .http
        .get(url)
        .basic_auth(&self.username, Some(&self.password))
        .send()
        .await?;

    // Capture the status up front; reading the body below consumes the response.
    let status = response.status();
    if status.is_success() {
        return Ok(response.json().await?);
    }

    // Failure path: include the response body text in the error message.
    let body = response.text().await?;
    anyhow::bail!(
        "request GET {} failed with status {}: {}",
        url,
        status,
        body
    );
}
}
Loading

0 comments on commit c48bf72

Please sign in to comment.