Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Make Kafka schema registry an enum #427

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions arroyo-api/src/connection_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
use tracing::warn;

use arroyo_connectors::kafka::{KafkaConfig, KafkaTable};
use arroyo_connectors::kafka::{KafkaConfig, KafkaTable, SchemaRegistry};
use arroyo_connectors::{connector_for_type, ErasedConnector};
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionTable, ConnectionTablePost, SchemaDefinition,
Expand Down Expand Up @@ -553,22 +553,25 @@ async fn get_schema(
let table: KafkaTable =
serde_json::from_value(table_config.clone()).expect("invalid kafka table");

let schema_registry = profile.schema_registry.as_ref().ok_or_else(|| {
bad_request("schema registry must be configured on the Kafka connection profile")
})?;

let resolver = ConfluentSchemaRegistry::new(
&schema_registry.endpoint,
&table.topic,
schema_registry.api_key.clone(),
schema_registry.api_secret.clone(),
)
.map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
))
})?;
let Some(SchemaRegistry::ConfluentSchemaRegistry {
endpoint,
api_key,
api_secret,
}) = profile.schema_registry_enum
else {
return Err(bad_request(
"schema registry must be configured on the Kafka connection profile",
));
};

let resolver =
ConfluentSchemaRegistry::new(&endpoint, &table.topic, api_key.clone(), api_secret.clone())
.map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
))
})?;

resolver.get_schema(None).await.map_err(|e| {
bad_request(format!(
Expand Down
17 changes: 9 additions & 8 deletions arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use arroyo_rpc::grpc::api::{
create_pipeline_req, CreateJobReq, CreatePipelineReq, CreateSqlJob, PipelineProgram,
};

use arroyo_connectors::kafka::{KafkaConfig, KafkaTable};
use arroyo_connectors::kafka::{KafkaConfig, KafkaTable, SchemaRegistry};
use arroyo_formats::avro::arrow_to_avro_schema;
use arroyo_rpc::formats::Format;
use arroyo_rpc::public_ids::{generate_id, IdTypes};
Expand Down Expand Up @@ -172,16 +172,17 @@ async fn try_register_confluent_schema(
return Ok(());
};

let Some(registry_config) = profile.schema_registry else {
let Some(SchemaRegistry::ConfluentSchemaRegistry {
endpoint,
api_key,
api_secret,
}) = profile.schema_registry_enum
else {
return Ok(());
};

let schema_registry = ConfluentSchemaRegistry::new(
&registry_config.endpoint,
&table.topic,
registry_config.api_key,
registry_config.api_secret,
)?;
let schema_registry =
ConfluentSchemaRegistry::new(&endpoint, &table.topic, api_key, api_secret)?;

match config.format.clone() {
Some(Format::Avro(mut avro)) => {
Expand Down
9 changes: 4 additions & 5 deletions arroyo-connectors/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ use std::collections::HashMap;
use std::convert::Infallible;
use typify::import_types;

use axum::response::sse::Event;
use std::time::{Duration, Instant};

use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage};
use axum::response::sse::Event;
use rdkafka::{
consumer::{BaseConsumer, Consumer},
message::BorrowedMessage,
ClientConfig, Offset, TopicPartitionList,
};
use std::time::{Duration, Instant};
use tokio::sync::mpsc::Sender;
use tonic::Status;
use tracing::{error, info, warn};
Expand Down Expand Up @@ -159,7 +158,7 @@ impl Connector for KafkaConnector {
let schema_registry = options.remove("schema_registry.endpoint").map(|endpoint| {
let api_key = options.remove("schema_registry.api_key");
let api_secret = options.remove("schema_registry.api_secret");
SchemaRegistry {
SchemaRegistry::ConfluentSchemaRegistry {
endpoint,
api_key,
api_secret,
Expand All @@ -168,7 +167,7 @@ impl Connector for KafkaConnector {
KafkaConfig {
authentication: auth,
bootstrap_servers: BootstrapServers(pull_opt("bootstrap_servers", options)?),
schema_registry,
schema_registry_enum: schema_registry,
}
}
};
Expand Down
6 changes: 4 additions & 2 deletions arroyo-console/src/routes/connections/JsonForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,9 @@ export function JsonForm({
error: string | null;
button?: string;
}) {
const ajv = useMemo(() => addFormats(new Ajv()), [schema]);
let ajv = new Ajv();
ajv.addKeyword('isSensitive');
const memoAjv = useMemo(() => addFormats(ajv), [schema]);

const formik = useFormik({
initialValues: initial,
Expand All @@ -645,7 +647,7 @@ export function JsonForm({
errors.name = 'Name is required';
}

let validate = ajv.compile(schema);
let validate = memoAjv.compile(schema);
let valid = validate(values);

if (!valid) {
Expand Down
4 changes: 2 additions & 2 deletions arroyo-sql-macro/src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub fn get_json_schema_source() -> Result<Connection> {
let config = KafkaConfig {
authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {},
bootstrap_servers: "localhost:9092".try_into().unwrap(),
schema_registry: None,
schema_registry_enum: None,
};
let table = KafkaTable {
topic: "test_topic".to_string(),
Expand Down Expand Up @@ -195,7 +195,7 @@ pub fn get_avro_source() -> Result<Connection> {
let config = KafkaConfig {
authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {},
bootstrap_servers: "localhost:9092".try_into().unwrap(),
schema_registry: None,
schema_registry_enum: None,
};
let table = KafkaTable {
topic: "test_topic".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion arroyo-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ pub fn get_test_expression(
KafkaConfig {
authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {},
bootstrap_servers: "localhost:9092".to_string().try_into().unwrap(),
schema_registry: None,
schema_registry_enum: None,
},
KafkaTable {
topic: "test_topic".to_string(),
Expand Down
15 changes: 10 additions & 5 deletions arroyo-worker/src/connectors/kafka/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::time::Duration;
use tokio::select;
use tracing::{debug, error, info, warn};

use super::{client_configs, KafkaConfig, KafkaTable, ReadMode, TableType};
use super::{client_configs, KafkaConfig, KafkaTable, ReadMode, SchemaRegistry, TableType};

#[cfg(test)]
mod test;
Expand Down Expand Up @@ -105,13 +105,18 @@ where
}

let schema_resolver: Arc<dyn SchemaResolver + Sync> =
if let Some(schema_registry) = &connection.schema_registry {
if let Some(SchemaRegistry::ConfluentSchemaRegistry {
endpoint,
api_key,
api_secret,
}) = &connection.schema_registry_enum
{
Arc::new(
ConfluentSchemaRegistry::new(
&schema_registry.endpoint,
&endpoint,
&table.topic,
schema_registry.api_key.clone(),
schema_registry.api_secret.clone(),
api_key.clone(),
api_secret.clone(),
)
.expect("failed to construct confluent schema resolver"),
)
Expand Down
73 changes: 45 additions & 28 deletions connector-schemas/kafka/connection.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
"type": "string",
"title": "Bootstrap Servers",
"description": "Comma-separated list of Kafka servers to connect to",
"examples": ["broker-1:9092,broker-2:9092"],
"examples": [
"broker-1:9092,broker-2:9092"
],
"pattern": "^(([\\w\\.\\-]+:\\d+),)*([\\w\\.\\-]+:\\d+)$"
},
"authentication": {
Expand Down Expand Up @@ -51,38 +53,53 @@
}
]
},
"schemaRegistry": {
"schemaRegistryEnum": {
"type": "object",
"title": "Schema Registry",
"properties": {
"endpoint": {
"title": "Endpoint",
"type": "string",
"description": "The endpoint for your Confluent Schema Registry if you have one",
"examples": [
"http://localhost:8081"
],
"format": "uri"
},
"apiKey": {
"title": "API Key",
"type": "string",
"description": "The API key for your Confluent Schema Registry if you have one",
"examples": [
"ABCDEFGHIJK01234"
"oneOf": [
{
"type": "object",
"title": "Confluent Schema Registry",
"properties": {
"endpoint": {
"title": "Endpoint",
"type": "string",
"description": "The endpoint for your Confluent Schema Registry",
"examples": [
"http://localhost:8081"
],
"format": "uri"
},
"apiKey": {
"title": "API Key",
"type": "string",
"description": "The API key for your Confluent Schema Registry",
"examples": [
"ABCDEFGHIJK01234"
]
},
"apiSecret": {
"title": "API Secret",
"type": "string",
"description": "Secret for your Confluent Schema Registry",
"isSensitive": true,
"examples": [
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/="
]
}
},
"required": [
"endpoint"
]
},
"apiSecret": {
"title": "API Secret",
"type": "string",
"description": "Secret for your Confluent Schema Registry if you have one",
"isSensitive": true,
"examples": [
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/="
]
{
"type": "object",
"title": "None",
"properties": {
},
"additionalProperties": false
}
},
"required": ["endpoint"]
]
}
},
"required": [
Expand Down
Loading