Skip to content

Commit

Permalink
Make Kafka schema registry an enum
Browse files Browse the repository at this point in the history
The JSON form in the console does not behave properly when an optional
object contains an optional field, which is the case with the schema
registry configuration. Modeling the schema registry as an enum in the
schema resolves the problem for this case.
  • Loading branch information
jbeisen committed Nov 28, 2023
1 parent f516ce8 commit aea50be
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 68 deletions.
37 changes: 20 additions & 17 deletions arroyo-api/src/connection_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
use tracing::warn;

use arroyo_connectors::kafka::{KafkaConfig, KafkaTable};
use arroyo_connectors::kafka::{KafkaConfig, KafkaTable, SchemaRegistry};
use arroyo_connectors::{connector_for_type, ErasedConnector};
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionTable, ConnectionTablePost, SchemaDefinition,
Expand Down Expand Up @@ -553,22 +553,25 @@ async fn get_schema(
let table: KafkaTable =
serde_json::from_value(table_config.clone()).expect("invalid kafka table");

let schema_registry = profile.schema_registry.as_ref().ok_or_else(|| {
bad_request("schema registry must be configured on the Kafka connection profile")
})?;

let resolver = ConfluentSchemaRegistry::new(
&schema_registry.endpoint,
&table.topic,
schema_registry.api_key.clone(),
schema_registry.api_secret.clone(),
)
.map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
))
})?;
let Some(SchemaRegistry::ConfluentSchemaRegistry {
endpoint,
api_key,
api_secret,
}) = profile.schema_registry_enum
else {
return Err(bad_request(
"schema registry must be configured on the Kafka connection profile",
));
};

let resolver =
ConfluentSchemaRegistry::new(&endpoint, &table.topic, api_key.clone(), api_secret.clone())
.map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
))
})?;

resolver.get_schema(None).await.map_err(|e| {
bad_request(format!(
Expand Down
17 changes: 9 additions & 8 deletions arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use arroyo_rpc::grpc::api::{
create_pipeline_req, CreateJobReq, CreatePipelineReq, CreateSqlJob, PipelineProgram,
};

use arroyo_connectors::kafka::{KafkaConfig, KafkaTable};
use arroyo_connectors::kafka::{KafkaConfig, KafkaTable, SchemaRegistry};
use arroyo_formats::avro::arrow_to_avro_schema;
use arroyo_rpc::formats::Format;
use arroyo_rpc::public_ids::{generate_id, IdTypes};
Expand Down Expand Up @@ -172,16 +172,17 @@ async fn try_register_confluent_schema(
return Ok(());
};

let Some(registry_config) = profile.schema_registry else {
let Some(SchemaRegistry::ConfluentSchemaRegistry {
endpoint,
api_key,
api_secret,
}) = profile.schema_registry_enum
else {
return Ok(());
};

let schema_registry = ConfluentSchemaRegistry::new(
&registry_config.endpoint,
&table.topic,
registry_config.api_key,
registry_config.api_secret,
)?;
let schema_registry =
ConfluentSchemaRegistry::new(&endpoint, &table.topic, api_key, api_secret)?;

match config.format.clone() {
Some(Format::Avro(mut avro)) => {
Expand Down
9 changes: 4 additions & 5 deletions arroyo-connectors/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ use std::collections::HashMap;
use std::convert::Infallible;
use typify::import_types;

use axum::response::sse::Event;
use std::time::{Duration, Instant};

use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage};
use axum::response::sse::Event;
use rdkafka::{
consumer::{BaseConsumer, Consumer},
message::BorrowedMessage,
ClientConfig, Offset, TopicPartitionList,
};
use std::time::{Duration, Instant};
use tokio::sync::mpsc::Sender;
use tonic::Status;
use tracing::{error, info, warn};
Expand Down Expand Up @@ -159,7 +158,7 @@ impl Connector for KafkaConnector {
let schema_registry = options.remove("schema_registry.endpoint").map(|endpoint| {
let api_key = options.remove("schema_registry.api_key");
let api_secret = options.remove("schema_registry.api_secret");
SchemaRegistry {
SchemaRegistry::ConfluentSchemaRegistry {
endpoint,
api_key,
api_secret,
Expand All @@ -168,7 +167,7 @@ impl Connector for KafkaConnector {
KafkaConfig {
authentication: auth,
bootstrap_servers: BootstrapServers(pull_opt("bootstrap_servers", options)?),
schema_registry,
schema_registry_enum: schema_registry,
}
}
};
Expand Down
6 changes: 4 additions & 2 deletions arroyo-console/src/routes/connections/JsonForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,9 @@ export function JsonForm({
error: string | null;
button?: string;
}) {
const ajv = useMemo(() => addFormats(new Ajv()), [schema]);
let ajv = new Ajv();
ajv.addKeyword('isSensitive');
const memoAjv = useMemo(() => addFormats(ajv), [schema]);

const formik = useFormik({
initialValues: initial,
Expand All @@ -645,7 +647,7 @@ export function JsonForm({
errors.name = 'Name is required';
}

let validate = ajv.compile(schema);
let validate = memoAjv.compile(schema);
let valid = validate(values);

if (!valid) {
Expand Down
4 changes: 2 additions & 2 deletions arroyo-sql-macro/src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub fn get_json_schema_source() -> Result<Connection> {
let config = KafkaConfig {
authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {},
bootstrap_servers: "localhost:9092".try_into().unwrap(),
schema_registry: None,
schema_registry_enum: None,
};
let table = KafkaTable {
topic: "test_topic".to_string(),
Expand Down Expand Up @@ -195,7 +195,7 @@ pub fn get_avro_source() -> Result<Connection> {
let config = KafkaConfig {
authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {},
bootstrap_servers: "localhost:9092".try_into().unwrap(),
schema_registry: None,
schema_registry_enum: None,
};
let table = KafkaTable {
topic: "test_topic".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion arroyo-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ pub fn get_test_expression(
KafkaConfig {
authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {},
bootstrap_servers: "localhost:9092".to_string().try_into().unwrap(),
schema_registry: None,
schema_registry_enum: None,
},
KafkaTable {
topic: "test_topic".to_string(),
Expand Down
15 changes: 10 additions & 5 deletions arroyo-worker/src/connectors/kafka/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::time::Duration;
use tokio::select;
use tracing::{debug, error, info, warn};

use super::{client_configs, KafkaConfig, KafkaTable, ReadMode, TableType};
use super::{client_configs, KafkaConfig, KafkaTable, ReadMode, SchemaRegistry, TableType};

#[cfg(test)]
mod test;
Expand Down Expand Up @@ -105,13 +105,18 @@ where
}

let schema_resolver: Arc<dyn SchemaResolver + Sync> =
if let Some(schema_registry) = &connection.schema_registry {
if let Some(SchemaRegistry::ConfluentSchemaRegistry {
endpoint,
api_key,
api_secret,
}) = &connection.schema_registry_enum
{
Arc::new(
ConfluentSchemaRegistry::new(
&schema_registry.endpoint,
&endpoint,
&table.topic,
schema_registry.api_key.clone(),
schema_registry.api_secret.clone(),
api_key.clone(),
api_secret.clone(),
)
.expect("failed to construct confluent schema resolver"),
)
Expand Down
73 changes: 45 additions & 28 deletions connector-schemas/kafka/connection.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
"type": "string",
"title": "Bootstrap Servers",
"description": "Comma-separated list of Kafka servers to connect to",
"examples": ["broker-1:9092,broker-2:9092"],
"examples": [
"broker-1:9092,broker-2:9092"
],
"pattern": "^(([\\w\\.\\-]+:\\d+),)*([\\w\\.\\-]+:\\d+)$"
},
"authentication": {
Expand Down Expand Up @@ -51,38 +53,53 @@
}
]
},
"schemaRegistry": {
"schemaRegistryEnum": {
"type": "object",
"title": "Schema Registry",
"properties": {
"endpoint": {
"title": "Endpoint",
"type": "string",
"description": "The endpoint for your Confluent Schema Registry if you have one",
"examples": [
"http://localhost:8081"
],
"format": "uri"
},
"apiKey": {
"title": "API Key",
"type": "string",
"description": "The API key for your Confluent Schema Registry if you have one",
"examples": [
"ABCDEFGHIJK01234"
"oneOf": [
{
"type": "object",
"title": "Confluent Schema Registry",
"properties": {
"endpoint": {
"title": "Endpoint",
"type": "string",
"description": "The endpoint for your Confluent Schema Registry",
"examples": [
"http://localhost:8081"
],
"format": "uri"
},
"apiKey": {
"title": "API Key",
"type": "string",
"description": "The API key for your Confluent Schema Registry",
"examples": [
"ABCDEFGHIJK01234"
]
},
"apiSecret": {
"title": "API Secret",
"type": "string",
"description": "Secret for your Confluent Schema Registry",
"isSensitive": true,
"examples": [
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/="
]
}
},
"required": [
"endpoint"
]
},
"apiSecret": {
"title": "API Secret",
"type": "string",
"description": "Secret for your Confluent Schema Registry if you have one",
"isSensitive": true,
"examples": [
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/="
]
{
"type": "object",
"title": "None",
"properties": {
},
"additionalProperties": false
}
},
"required": ["endpoint"]
]
}
},
"required": [
Expand Down

0 comments on commit aea50be

Please sign in to comment.