From fba6e6fcae2a9c8187064f112217693175dea6ed Mon Sep 17 00:00:00 2001
From: Jackson Newhouse
Date: Mon, 20 Nov 2023 17:15:31 -0800
Subject: [PATCH] Implement json sink for schema registry, use connection types
 for sinks, add auth for schema registry. (#416)

---
 arroyo-api/src/connection_tables.rs      |  23 ++--
 arroyo-connectors/src/kafka.rs           |  12 +-
 .../src/routes/connections/JsonForm.tsx  |   6 +-
 arroyo-rpc/src/formats.rs                |   8 ++
 arroyo-rpc/src/schema_resolver.rs        |  20 ++-
 arroyo-sql-macro/src/connectors.rs       |   1 +
 arroyo-sql/src/operators.rs              |   4 +-
 arroyo-sql/src/plan_graph.rs             | 120 +++++++++---------
 arroyo-sql/src/tables.rs                 |   1 +
 .../src/connectors/kafka/source/mod.rs   |   9 +-
 arroyo-worker/src/formats/mod.rs         |  21 +--
 connector-schemas/kafka/connection.json  |  20 ++-
 12 files changed, 155 insertions(+), 90 deletions(-)

diff --git a/arroyo-api/src/connection_tables.rs b/arroyo-api/src/connection_tables.rs
index b13788f31..d24f8537f 100644
--- a/arroyo-api/src/connection_tables.rs
+++ b/arroyo-api/src/connection_tables.rs
@@ -497,8 +497,9 @@ async fn expand_json_schema(
 ) -> Result<ConnectionSchema, ErrorResp> {
     if let Some(Format::Json(JsonFormat {
         confluent_schema_registry: true,
+        confluent_schema_version,
         ..
-    })) = &schema.format
+    })) = schema.format.as_mut()
     {
         let schema_response = get_schema(connector, table_config, profile_config).await?;
 
@@ -508,6 +509,7 @@ async fn expand_json_schema(
                 schema_response.schema_type
             )));
         }
+        confluent_schema_version.replace(schema_response.version);
 
         schema.definition = Some(SchemaDefinition::JsonSchema(schema_response.schema));
     }
@@ -555,13 +557,18 @@ async fn get_schema(
             bad_request("schema registry must be configured on the Kafka connection profile")
         })?;
 
-    let resolver =
-        ConfluentSchemaResolver::new(&schema_registry.endpoint, &table.topic).map_err(|e| {
-            bad_request(format!(
-                "failed to fetch schemas from schema repository: {}",
-                e
-            ))
-        })?;
+    let resolver = ConfluentSchemaResolver::new(
+        &schema_registry.endpoint,
+        &table.topic,
+        schema_registry.api_key.clone(),
+        schema_registry.api_secret.clone(),
+    )
+    .map_err(|e| {
+        bad_request(format!(
+            "failed to fetch schemas from schema repository: {}",
+            e
+        ))
+    })?;
 
     resolver.get_schema(None).await.map_err(|e| {
         bad_request(format!(
diff --git a/arroyo-connectors/src/kafka.rs b/arroyo-connectors/src/kafka.rs
index a5e5262fa..03b28bdb4 100644
--- a/arroyo-connectors/src/kafka.rs
+++ b/arroyo-connectors/src/kafka.rs
@@ -151,9 +151,15 @@ impl Connector for KafkaConnector {
             Some(other) => bail!("unknown auth type '{}'", other),
         };
 
-        let schema_registry = opts
-            .remove("schema_registry.endpoint")
-            .map(|endpoint| SchemaRegistry { endpoint });
+        let schema_registry = opts.remove("schema_registry.endpoint").map(|endpoint| {
+            let api_key = opts.remove("schema_registry.api_key");
+            let api_secret = opts.remove("schema_registry.api_secret");
+            SchemaRegistry {
+                endpoint,
+                api_key,
+                api_secret,
+            }
+        });
 
         let connection = KafkaConfig {
             authentication: auth,
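Note: the parsing above implies that the `SchemaRegistry` profile type now
carries optional credentials alongside the endpoint. A minimal sketch of that
shape (field names taken from the diff; the derive list and serde usage are
assumptions, since the real type is generated from the connection schema):

    use serde::{Deserialize, Serialize};

    // Assumed shape of the profile type constructed in the closure above.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct SchemaRegistry {
        pub endpoint: String,
        pub api_key: Option<String>,
        pub api_secret: Option<String>,
    }

Because `schema_registry.api_key` and `schema_registry.api_secret` are removed
inside the `map` closure, they are only consumed when an endpoint was supplied;
credentials passed without an endpoint are left behind in `opts`.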
diff --git a/arroyo-console/src/routes/connections/JsonForm.tsx b/arroyo-console/src/routes/connections/JsonForm.tsx
index 76aebe250..092020234 100644
--- a/arroyo-console/src/routes/connections/JsonForm.tsx
+++ b/arroyo-console/src/routes/connections/JsonForm.tsx
@@ -28,6 +28,7 @@ function StringWidget({
   description,
   placeholder,
   required,
+  password,
   maxLength,
   value,
   errors,
@@ -39,6 +40,7 @@ function StringWidget({
   placeholder?: string;
   maxLength?: number;
   required?: boolean;
+  password?: boolean;
   value: string;
   errors: any;
   onChange: (e: React.ChangeEvent) => void;
@@ -49,7 +51,7 @@ function StringWidget({
       {maxLength == null || maxLength < 100 ? (
          onChange(e)}
@@ -324,6 +326,8 @@ export function FormInner({
                   title={property.title || key}
                   description={property.description}
                   required={schema.required?.includes(key)}
+                  // @ts-ignore
+                  password={property.isSensitive || false}
                   maxLength={property.maxLength}
                   // @ts-ignore
                   placeholder={property.examples ? (property.examples[0] as string) : undefined}
diff --git a/arroyo-rpc/src/formats.rs b/arroyo-rpc/src/formats.rs
index bfc2b90de..6117987b8 100644
--- a/arroyo-rpc/src/formats.rs
+++ b/arroyo-rpc/src/formats.rs
@@ -36,6 +36,9 @@ pub struct JsonFormat {
     #[serde(default)]
     pub confluent_schema_registry: bool,
 
+    #[serde(default)]
+    pub confluent_schema_version: Option<u32>,
+
     #[serde(default)]
     pub include_schema: bool,
 
@@ -61,6 +64,10 @@ impl JsonFormat {
             .filter(|t| t == "true")
             .is_some();
 
+        if include_schema && confluent_schema_registry {
+            return Err("can't include schema in message if using schema registry".to_string());
+        }
+
         let unstructured = opts
             .remove("json.unstructured")
             .filter(|t| t == "true")
@@ -81,6 +88,7 @@ impl JsonFormat {
 
         Ok(Self {
             confluent_schema_registry,
+            confluent_schema_version: None,
            include_schema,
             debezium,
             unstructured,
diff --git a/arroyo-rpc/src/schema_resolver.rs b/arroyo-rpc/src/schema_resolver.rs
index 5309e8a90..2d2d69b8c 100644
--- a/arroyo-rpc/src/schema_resolver.rs
+++ b/arroyo-rpc/src/schema_resolver.rs
@@ -80,10 +80,17 @@ pub struct ConfluentSchemaResolver {
     endpoint: Url,
     topic: String,
     client: Client,
+    api_key: Option<String>,
+    api_secret: Option<String>,
 }
 
 impl ConfluentSchemaResolver {
-    pub fn new(endpoint: &str, topic: &str) -> anyhow::Result<Self> {
+    pub fn new(
+        endpoint: &str,
+        topic: &str,
+        api_key: Option<String>,
+        api_secret: Option<String>,
+    ) -> anyhow::Result<Self> {
         let client = Client::builder()
             .timeout(Duration::from_secs(5))
             .build()
@@ -98,6 +105,8 @@ impl ConfluentSchemaResolver {
             client,
             topic: topic.to_string(),
             endpoint,
+            api_key,
+            api_secret,
         })
     }
@@ -114,13 +123,18 @@ impl ConfluentSchemaResolver {
         )
         .unwrap();
 
-        let resp = self.client.get(url.clone()).send().await.map_err(|e| {
+        let mut get_call = self.client.get(url.clone());
+
+        if let Some(api_key) = self.api_key.as_ref() {
+            get_call = get_call.basic_auth(api_key, self.api_secret.as_ref());
+        }
+
+        let resp = get_call.send().await.map_err(|e| {
             warn!("Got error response from schema registry: {:?}", e);
             match e.status() {
                 Some(StatusCode::NOT_FOUND) => {
                     anyhow!("Could not find value schema for topic '{}'", self.topic)
                 }
-
                 Some(code) => anyhow!("Schema registry returned error: {}", code),
                 None => {
                     warn!(
diff --git a/arroyo-sql-macro/src/connectors.rs b/arroyo-sql-macro/src/connectors.rs
index c5cdd3ea4..b5970f858 100644
--- a/arroyo-sql-macro/src/connectors.rs
+++ b/arroyo-sql-macro/src/connectors.rs
@@ -72,6 +72,7 @@ pub fn get_json_schema_source() -> Result<Connection> {
     let connection_schema = ConnectionSchema::try_new(
         Some(Format::Json(JsonFormat {
             confluent_schema_registry: false,
+            confluent_schema_version: None,
             include_schema: false,
             debezium: false,
             unstructured: false,
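Note: the resolver change above uses reqwest's `RequestBuilder::basic_auth`,
which sends the API key as the username and the optional secret as the
password, and only attaches the header when a key is configured. A
self-contained sketch of the same request pattern (the function and its
names are illustrative, not Arroyo's API):

    use reqwest::Client;

    // Fetch a schema-registry URL, adding HTTP basic auth only when an
    // API key is present, mirroring ConfluentSchemaResolver::get_schema.
    async fn fetch_subject(
        client: &Client,
        url: &str,
        api_key: Option<&str>,
        api_secret: Option<&str>,
    ) -> reqwest::Result<String> {
        let mut req = client.get(url);
        if let Some(key) = api_key {
            req = req.basic_auth(key, api_secret);
        }
        req.send().await?.error_for_status()?.text().await
    }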
diff --git a/arroyo-sql/src/operators.rs b/arroyo-sql/src/operators.rs
index 9ea1c4a13..3f7edde2d 100644
--- a/arroyo-sql/src/operators.rs
+++ b/arroyo-sql/src/operators.rs
@@ -21,6 +21,7 @@ use syn::{parse_quote, parse_str};
 pub struct Projection {
     pub fields: Vec<(Column, Expression)>,
     pub format: Option<Format>,
+    pub struct_name: Option<String>,
 }
 
 impl Projection {
@@ -28,6 +29,7 @@ impl Projection {
         Self {
             fields,
             format: None,
+            struct_name: None,
         }
     }
 
@@ -72,7 +74,7 @@ impl CodeGenerator for Projection {
                 StructField::new(col.name.clone(), col.relation.clone(), field_type)
             })
             .collect();
-        StructDef::new(None, true, fields, self.format.clone())
+        StructDef::new(self.struct_name.clone(), true, fields, self.format.clone())
     }
 }
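Note: threading `struct_name` through `Projection` lets the generated output
struct keep the type name of the connector table it feeds (see the tables.rs
hunk below, which copies `self.type_name` into it), instead of always
synthesizing an anonymous struct. A toy illustration of the fallback pattern
(stand-in names, not Arroyo's types):

    // A projection output either reuses the table's registered type name
    // or falls back to a generated one.
    fn output_type_name(struct_name: Option<&str>, node_id: usize) -> String {
        struct_name
            .map(str::to_string)
            .unwrap_or_else(|| format!("generated_struct_{node_id}"))
    }

    fn main() {
        assert_eq!(output_type_name(Some("OrdersRow"), 7), "OrdersRow");
        assert_eq!(output_type_name(None, 7), "generated_struct_7");
    }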
diff --git a/arroyo-sql/src/plan_graph.rs b/arroyo-sql/src/plan_graph.rs
index 6490a49ae..ff801589b 100644
--- a/arroyo-sql/src/plan_graph.rs
+++ b/arroyo-sql/src/plan_graph.rs
@@ -1142,10 +1142,10 @@ pub struct PlanGraph {
     pub graph: DiGraph<PlanNode, PlanEdge>,
     pub types: HashSet<StructDef>,
     pub key_structs: HashSet<String>,
-    pub sources: HashMap<String, NodeIndex>,
+    pub connections: HashMap<String, NodeIndex>,
     pub named_tables: HashMap<String, NodeIndex>,
     pub sql_config: SqlConfig,
-    pub saved_sources_used: Vec<i64>,
+    pub saved_connections_used: Vec<i64>,
 }
 
 impl PlanGraph {
@@ -1154,10 +1154,10 @@ impl PlanGraph {
             graph: DiGraph::new(),
             types: HashSet::new(),
             key_structs: HashSet::new(),
-            sources: HashMap::new(),
+            connections: HashMap::new(),
             named_tables: HashMap::new(),
             sql_config,
-            saved_sources_used: vec![],
+            saved_connections_used: vec![],
         }
     }
@@ -1327,11 +1327,11 @@ impl PlanGraph {
     }
 
     fn add_sql_source(&mut self, source_operator: SourceOperator) -> NodeIndex {
-        if let Some(node_index) = self.sources.get(&source_operator.name) {
+        if let Some(node_index) = self.connections.get(&source_operator.name) {
             return *node_index;
         }
         if let Some(source_id) = source_operator.source.id {
-            self.saved_sources_used.push(source_id);
+            self.saved_connections_used.push(source_id);
         }
         let mut current_index = match source_operator.source.processing_mode {
             ProcessingMode::Update => self.add_debezium_source(&source_operator),
@@ -1408,7 +1408,8 @@ impl PlanGraph {
         };
         self.graph
             .add_edge(current_index, watermark_index, watermark_edge);
-        self.sources.insert(source_operator.name, watermark_index);
+        self.connections
+            .insert(source_operator.name, watermark_index);
         watermark_index
     }
@@ -1756,64 +1757,58 @@ impl PlanGraph {
     ) -> NodeIndex {
         let input_index = self.add_sql_operator(*input);
         let input_node = self.get_plan_node(input_index);
-        if let PlanType::Updating(inner) = &input_node.output_type {
-            let value_type = inner.as_syn_type();
-            let debezium_type = PlanType::Debezium {
-                key: None,
-                value_type: quote!(#value_type).to_string(),
-                values: inner.value_structs(),
-            };
-            let debezium_index =
-                self.insert_operator(PlanOperator::ToDebezium, debezium_type.clone());
-
-            let edge = PlanEdge {
-                edge_type: EdgeType::Forward,
-            };
-            self.graph.add_edge(input_index, debezium_index, edge);
-
-            let plan_node = PlanOperator::Sink(name, sql_sink);
-            let plan_node_index = self.insert_operator(plan_node, debezium_type);
-
-            let debezium_edge = PlanEdge {
-                edge_type: EdgeType::Forward,
-            };
-
-            self.graph
-                .add_edge(debezium_index, plan_node_index, debezium_edge);
-            plan_node_index
-        } else if matches!(sql_sink.updating_type, SinkUpdateType::Force) {
-            let value_type = input_node.output_type.as_syn_type();
-            let debezium_type = PlanType::Debezium {
-                key: None,
-                value_type: quote!(#value_type).to_string(),
-                values: input_node.output_type.value_structs(),
-            };
-            let debezium_index =
-                self.insert_operator(PlanOperator::ToDebezium, debezium_type.clone());
-            let edge = PlanEdge {
-                edge_type: EdgeType::Forward,
-            };
-            self.graph.add_edge(input_index, debezium_index, edge);
+        let (incoming_node_index, sink_node_type) =
+            if let PlanType::Updating(inner) = &input_node.output_type {
+                let value_type = inner.as_syn_type();
+                let debezium_type = PlanType::Debezium {
+                    key: None,
+                    value_type: quote!(#value_type).to_string(),
+                    values: inner.value_structs(),
+                };
+                let debezium_index =
+                    self.insert_operator(PlanOperator::ToDebezium, debezium_type.clone());
 
-            let plan_node = PlanOperator::Sink(name, sql_sink);
-            let plan_node_index = self.insert_operator(plan_node, debezium_type);
+                let edge = PlanEdge {
+                    edge_type: EdgeType::Forward,
+                };
+                self.graph.add_edge(input_index, debezium_index, edge);
+
+                (debezium_index, debezium_type)
+            } else if matches!(sql_sink.updating_type, SinkUpdateType::Force) {
+                let value_type = input_node.output_type.as_syn_type();
+                let debezium_type = PlanType::Debezium {
+                    key: None,
+                    value_type: quote!(#value_type).to_string(),
+                    values: input_node.output_type.value_structs(),
+                };
+                let debezium_index =
+                    self.insert_operator(PlanOperator::ToDebezium, debezium_type.clone());
+                let edge = PlanEdge {
+                    edge_type: EdgeType::Forward,
+                };
+                self.graph.add_edge(input_index, debezium_index, edge);
 
-            let debezium_edge = PlanEdge {
-                edge_type: EdgeType::Forward,
+                (debezium_index, debezium_type)
+            } else {
+                (input_index, input_node.output_type.clone())
             };
-
-            self.graph
-                .add_edge(debezium_index, plan_node_index, debezium_edge);
+        let sink_node = self.connections.get(&name).cloned().unwrap_or_else(|| {
+            if let Some(connection_id) = sql_sink.id.clone() {
+                self.saved_connections_used.push(connection_id);
+            }
+            let plan_node = PlanOperator::Sink(name.clone(), sql_sink);
+            let plan_node_index = self.insert_operator(plan_node, sink_node_type);
+            self.connections.insert(name.clone(), plan_node_index);
             plan_node_index
-        } else {
-            let plan_node = PlanOperator::Sink(name, sql_sink);
-            let plan_node_index = self.insert_operator(plan_node, input_node.output_type.clone());
-            let edge = PlanEdge {
+        });
+        self.graph.add_edge(
+            incoming_node_index,
+            sink_node,
+            PlanEdge {
                 edge_type: EdgeType::Forward,
-            };
-            self.graph.add_edge(input_index, plan_node_index, edge);
-            plan_node_index
-        }
+            },
+        );
+        sink_node
     }
 
     fn add_updating_aggregator(
@@ -1921,6 +1916,7 @@ impl PlanGraph {
         let projection = RecordTransform::ValueProjection(Projection {
             fields,
             format: None,
+            struct_name: None,
         });
         let input_node = self.get_plan_node(input_index);
         let plan_node = PlanNode::from_record_transform(projection, input_node);
@@ -1963,7 +1959,7 @@ pub fn get_program(
     optimize(&mut plan_graph.graph);
 
     let mut key_structs = HashSet::new();
-    let sources = plan_graph.saved_sources_used.clone();
+    let sources = plan_graph.saved_connections_used.clone();
     plan_graph.graph.node_weights().for_each(|node| {
         let key_names = node.output_type.get_key_struct_names();
         key_structs.extend(key_names);
@@ -2018,7 +2014,7 @@ pub fn get_program(
         schema_provider
             .source_defs
             .into_iter()
-            .filter(|(k, _)| plan_graph.sources.contains_key(k))
+            .filter(|(k, _)| plan_graph.connections.contains_key(k))
             .map(|(_, v)| v),
     );
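Note: the rewritten sink planning above now looks the sink up in `connections`
before creating it, so two INSERTs into the same named sink share one plan node
instead of producing duplicate operators. The core lookup-or-insert pattern,
sketched with petgraph (a sketch of the idea, not the actual PlanGraph code):

    use std::collections::HashMap;
    use petgraph::graph::{DiGraph, NodeIndex};

    // The first caller creates the sink node; later callers reuse it.
    fn sink_node(
        graph: &mut DiGraph<String, ()>,
        connections: &mut HashMap<String, NodeIndex>,
        name: &str,
    ) -> NodeIndex {
        *connections
            .entry(name.to_string())
            .or_insert_with(|| graph.add_node(format!("sink:{name}")))
    }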
diff --git a/arroyo-sql/src/tables.rs b/arroyo-sql/src/tables.rs
index 185d58f6d..d5593c343 100644
--- a/arroyo-sql/src/tables.rs
+++ b/arroyo-sql/src/tables.rs
@@ -452,6 +452,7 @@ impl ConnectorTable {
         );
 
         projection.format = Some(format.clone());
+        projection.struct_name = self.type_name.clone();
 
         input = SqlOperator::RecordTransform(
             Box::new(input),
diff --git a/arroyo-worker/src/connectors/kafka/source/mod.rs b/arroyo-worker/src/connectors/kafka/source/mod.rs
index 3da9d9374..e94602b17 100644
--- a/arroyo-worker/src/connectors/kafka/source/mod.rs
+++ b/arroyo-worker/src/connectors/kafka/source/mod.rs
@@ -108,8 +108,13 @@ where
         let schema_resolver: Arc<dyn SchemaResolver> =
             if let Some(schema_registry) = &connection.schema_registry {
                 Arc::new(
-                    ConfluentSchemaResolver::new(&schema_registry.endpoint, &table.topic)
-                        .expect("failed to construct confluent schema resolver"),
+                    ConfluentSchemaResolver::new(
+                        &schema_registry.endpoint,
+                        &table.topic,
+                        schema_registry.api_key.clone(),
+                        schema_registry.api_secret.clone(),
+                    )
+                    .expect("failed to construct confluent schema resolver"),
                 )
             } else {
                 Arc::new(FailingSchemaResolver::new())
diff --git a/arroyo-worker/src/formats/mod.rs b/arroyo-worker/src/formats/mod.rs
index b0960bca3..ae6af0990 100644
--- a/arroyo-worker/src/formats/mod.rs
+++ b/arroyo-worker/src/formats/mod.rs
@@ -180,22 +180,25 @@ impl DataSerializer {
     pub fn to_vec(&self, record: &T) -> Option<Vec<u8>> {
         match &self.format {
             Format::Json(json) => {
-                let v = if json.include_schema {
+                let mut writer: Vec<u8> = Vec::with_capacity(128);
+                if json.confluent_schema_registry {
+                    if json.include_schema {
+                        unreachable!("can't include schema when writing to confluent schema registry, should've been caught when creating JsonFormat");
+                    }
+                    writer.push(0);
+                    writer.extend(json.confluent_schema_version.expect("must have computed schema version to write using confluent schema registry").to_be_bytes());
+                }
+                if json.include_schema {
                     let record = json! {{
                         "schema": self.kafka_schema,
                         "payload": record
                     }};
-                    serde_json::to_vec(&record).unwrap()
+                    serde_json::to_writer(&mut writer, &record).unwrap();
                 } else {
-                    serde_json::to_vec(record).unwrap()
+                    serde_json::to_writer(&mut writer, record).unwrap();
                 };
-
-                if json.confluent_schema_registry {
-                    todo!("Serializing to confluent schema registry is not yet supported");
-                }
-
-                Some(v)
+                Some(writer)
             }
             Format::Avro(_) => todo!(),
             Format::Parquet(_) => todo!(),
diff --git a/connector-schemas/kafka/connection.json b/connector-schemas/kafka/connection.json
index 96405d6ad..caac41e8c 100644
--- a/connector-schemas/kafka/connection.json
+++ b/connector-schemas/kafka/connection.json
@@ -43,7 +43,8 @@
                 },
                 "password": {
                     "type": "string",
-                    "description": "The password to use for SASL authentication"
+                    "description": "The password to use for SASL authentication",
+                    "isSensitive": true
                 }
             },
             "additionalProperties": false
@@ -62,6 +63,23 @@
                     "http://localhost:8081"
                 ],
                 "format": "uri"
+            },
+            "apiKey": {
+                "title": "API Key",
+                "type": "string",
+                "description": "The API key for your Confluent Schema Registry if you have one",
+                "examples": [
+                    "ABCDEFGHIJK01234"
+                ]
+            },
+            "apiSecret": {
+                "title": "API Secret",
+                "type": "string",
+                "description": "Secret for your Confluent Schema Registry if you have one",
+                "isSensitive": true,
+                "examples": [
+                    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/="
+                ]
+            }
             }
         },
         "required": ["endpoint"]
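Note: the serializer change in arroyo-worker/src/formats/mod.rs writes the
Confluent wire format: a zero magic byte, the 4-byte big-endian identifier
stored in `confluent_schema_version`, then the JSON payload. A standalone
sketch of that framing (assuming the identifier is a `u32`, as the
`to_be_bytes()` call implies):

    // Frame a JSON payload the way the sink now does for the registry.
    fn frame_confluent(id: u32, payload: &[u8]) -> Vec<u8> {
        let mut out = Vec::with_capacity(5 + payload.len());
        out.push(0); // magic byte
        out.extend_from_slice(&id.to_be_bytes()); // big-endian identifier
        out.extend_from_slice(payload);
        out
    }

    fn main() {
        let framed = frame_confluent(3, br#"{"id":1}"#);
        assert_eq!(&framed[..5], &[0, 0, 0, 0, 3]);
    }

Consumers that speak this format (such as Confluent's deserializers) read the
first five bytes to locate the schema before parsing the JSON that follows,
which is why including an inline schema at the same time is rejected when the
JsonFormat is created.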