Skip to content

Commit

Permalink
Implement json sink for schema registry, use connection types for sin…
Browse files Browse the repository at this point in the history
…ks, add auth for schema registry. (#416)
  • Loading branch information
Jackson Newhouse authored Nov 21, 2023
1 parent 47c6ab9 commit fba6e6f
Show file tree
Hide file tree
Showing 12 changed files with 155 additions and 90 deletions.
23 changes: 15 additions & 8 deletions arroyo-api/src/connection_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,9 @@ async fn expand_json_schema(
) -> Result<ConnectionSchema, ErrorResp> {
if let Some(Format::Json(JsonFormat {
confluent_schema_registry: true,
confluent_schema_version,
..
})) = &schema.format
})) = schema.format.as_mut()
{
let schema_response = get_schema(connector, table_config, profile_config).await?;

Expand All @@ -508,6 +509,7 @@ async fn expand_json_schema(
schema_response.schema_type
)));
}
confluent_schema_version.replace(schema_response.version);

schema.definition = Some(SchemaDefinition::JsonSchema(schema_response.schema));
}
Expand Down Expand Up @@ -555,13 +557,18 @@ async fn get_schema(
bad_request("schema registry must be configured on the Kafka connection profile")
})?;

let resolver =
ConfluentSchemaResolver::new(&schema_registry.endpoint, &table.topic).map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
))
})?;
let resolver = ConfluentSchemaResolver::new(
&schema_registry.endpoint,
&table.topic,
schema_registry.api_key.clone(),
schema_registry.api_secret.clone(),
)
.map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
))
})?;

resolver.get_schema(None).await.map_err(|e| {
bad_request(format!(
Expand Down
12 changes: 9 additions & 3 deletions arroyo-connectors/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,15 @@ impl Connector for KafkaConnector {
Some(other) => bail!("unknown auth type '{}'", other),
};

let schema_registry = opts
.remove("schema_registry.endpoint")
.map(|endpoint| SchemaRegistry { endpoint });
let schema_registry = opts.remove("schema_registry.endpoint").map(|endpoint| {
let api_key = opts.remove("schema_registry.api_key");
let api_secret = opts.remove("schema_registry.api_secret");
SchemaRegistry {
endpoint,
api_key,
api_secret,
}
});

let connection = KafkaConfig {
authentication: auth,
Expand Down
6 changes: 5 additions & 1 deletion arroyo-console/src/routes/connections/JsonForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ function StringWidget({
description,
placeholder,
required,
password,
maxLength,
value,
errors,
Expand All @@ -39,6 +40,7 @@ function StringWidget({
placeholder?: string;
maxLength?: number;
required?: boolean;
password?: boolean;
value: string;
errors: any;
onChange: (e: React.ChangeEvent<any>) => void;
Expand All @@ -49,7 +51,7 @@ function StringWidget({
{maxLength == null || maxLength < 100 ? (
<Input
name={path}
type="text"
type={password ? 'password' : 'text'}
placeholder={placeholder}
value={value || ''}
onChange={e => onChange(e)}
Expand Down Expand Up @@ -324,6 +326,8 @@ export function FormInner({
title={property.title || key}
description={property.description}
required={schema.required?.includes(key)}
// @ts-ignore
password={property.isSensitive || false}
maxLength={property.maxLength}
// @ts-ignore
placeholder={property.examples ? (property.examples[0] as string) : undefined}
Expand Down
8 changes: 8 additions & 0 deletions arroyo-rpc/src/formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub struct JsonFormat {
#[serde(default)]
pub confluent_schema_registry: bool,

#[serde(default)]
pub confluent_schema_version: Option<u32>,

#[serde(default)]
pub include_schema: bool,

Expand All @@ -61,6 +64,10 @@ impl JsonFormat {
.filter(|t| t == "true")
.is_some();

if include_schema && confluent_schema_registry {
return Err("can't include schema in message if using schema registry".to_string());
}

let unstructured = opts
.remove("json.unstructured")
.filter(|t| t == "true")
Expand All @@ -81,6 +88,7 @@ impl JsonFormat {

Ok(Self {
confluent_schema_registry,
confluent_schema_version: None,
include_schema,
debezium,
unstructured,
Expand Down
20 changes: 17 additions & 3 deletions arroyo-rpc/src/schema_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,17 @@ pub struct ConfluentSchemaResolver {
endpoint: Url,
topic: String,
client: Client,
api_key: Option<String>,
api_secret: Option<String>,
}

impl ConfluentSchemaResolver {
pub fn new(endpoint: &str, topic: &str) -> anyhow::Result<Self> {
pub fn new(
endpoint: &str,
topic: &str,
api_key: Option<String>,
api_secret: Option<String>,
) -> anyhow::Result<Self> {
let client = Client::builder()
.timeout(Duration::from_secs(5))
.build()
Expand All @@ -98,6 +105,8 @@ impl ConfluentSchemaResolver {
client,
topic: topic.to_string(),
endpoint,
api_key,
api_secret,
})
}

Expand All @@ -114,13 +123,18 @@ impl ConfluentSchemaResolver {
)
.unwrap();

let resp = self.client.get(url.clone()).send().await.map_err(|e| {
let mut get_call = self.client.get(url.clone());

if let Some(api_key) = self.api_key.as_ref() {
get_call = get_call.basic_auth(api_key, self.api_secret.as_ref());
}

let resp = get_call.send().await.map_err(|e| {
warn!("Got error response from schema registry: {:?}", e);
match e.status() {
Some(StatusCode::NOT_FOUND) => {
anyhow!("Could not find value schema for topic '{}'", self.topic)
}

Some(code) => anyhow!("Schema registry returned error: {}", code),
None => {
warn!(
Expand Down
1 change: 1 addition & 0 deletions arroyo-sql-macro/src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub fn get_json_schema_source() -> Result<Connection> {
let connection_schema = ConnectionSchema::try_new(
Some(Format::Json(JsonFormat {
confluent_schema_registry: false,
confluent_schema_version: None,
include_schema: false,
debezium: false,
unstructured: false,
Expand Down
4 changes: 3 additions & 1 deletion arroyo-sql/src/operators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ use syn::{parse_quote, parse_str};
pub struct Projection {
pub fields: Vec<(Column, Expression)>,
pub format: Option<Format>,
pub struct_name: Option<String>,
}

impl Projection {
pub fn new(fields: Vec<(Column, Expression)>) -> Self {
Self {
fields,
format: None,
struct_name: None,
}
}

Expand Down Expand Up @@ -72,7 +74,7 @@ impl CodeGenerator<ValuePointerContext, StructDef, syn::Expr> for Projection {
StructField::new(col.name.clone(), col.relation.clone(), field_type)
})
.collect();
StructDef::new(None, true, fields, self.format.clone())
StructDef::new(self.struct_name.clone(), true, fields, self.format.clone())
}
}

Expand Down
120 changes: 58 additions & 62 deletions arroyo-sql/src/plan_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1142,10 +1142,10 @@ pub struct PlanGraph {
pub graph: DiGraph<PlanNode, PlanEdge>,
pub types: HashSet<StructDef>,
pub key_structs: HashSet<String>,
pub sources: HashMap<String, NodeIndex>,
pub connections: HashMap<String, NodeIndex>,
pub named_tables: HashMap<String, NodeIndex>,
pub sql_config: SqlConfig,
pub saved_sources_used: Vec<i64>,
pub saved_connections_used: Vec<i64>,
}

impl PlanGraph {
Expand All @@ -1154,10 +1154,10 @@ impl PlanGraph {
graph: DiGraph::new(),
types: HashSet::new(),
key_structs: HashSet::new(),
sources: HashMap::new(),
connections: HashMap::new(),
named_tables: HashMap::new(),
sql_config,
saved_sources_used: vec![],
saved_connections_used: vec![],
}
}

Expand Down Expand Up @@ -1327,11 +1327,11 @@ impl PlanGraph {
}

fn add_sql_source(&mut self, source_operator: SourceOperator) -> NodeIndex {
if let Some(node_index) = self.sources.get(&source_operator.name) {
if let Some(node_index) = self.connections.get(&source_operator.name) {
return *node_index;
}
if let Some(source_id) = source_operator.source.id {
self.saved_sources_used.push(source_id);
self.saved_connections_used.push(source_id);
}
let mut current_index = match source_operator.source.processing_mode {
ProcessingMode::Update => self.add_debezium_source(&source_operator),
Expand Down Expand Up @@ -1408,7 +1408,8 @@ impl PlanGraph {
};
self.graph
.add_edge(current_index, watermark_index, watermark_edge);
self.sources.insert(source_operator.name, watermark_index);
self.connections
.insert(source_operator.name, watermark_index);
watermark_index
}

Expand Down Expand Up @@ -1756,64 +1757,58 @@ impl PlanGraph {
) -> NodeIndex {
let input_index = self.add_sql_operator(*input);
let input_node = self.get_plan_node(input_index);
if let PlanType::Updating(inner) = &input_node.output_type {
let value_type = inner.as_syn_type();
let debezium_type = PlanType::Debezium {
key: None,
value_type: quote!(#value_type).to_string(),
values: inner.value_structs(),
};
let debezium_index =
self.insert_operator(PlanOperator::ToDebezium, debezium_type.clone());

let edge = PlanEdge {
edge_type: EdgeType::Forward,
};
self.graph.add_edge(input_index, debezium_index, edge);

let plan_node = PlanOperator::Sink(name, sql_sink);
let plan_node_index = self.insert_operator(plan_node, debezium_type);

let debezium_edge = PlanEdge {
edge_type: EdgeType::Forward,
};

self.graph
.add_edge(debezium_index, plan_node_index, debezium_edge);
plan_node_index
} else if matches!(sql_sink.updating_type, SinkUpdateType::Force) {
let value_type = input_node.output_type.as_syn_type();
let debezium_type = PlanType::Debezium {
key: None,
value_type: quote!(#value_type).to_string(),
values: input_node.output_type.value_structs(),
};
let debezium_index =
self.insert_operator(PlanOperator::ToDebezium, debezium_type.clone());
let edge = PlanEdge {
edge_type: EdgeType::Forward,
};
self.graph.add_edge(input_index, debezium_index, edge);
let (incoming_node_index, sink_node_type) =
if let PlanType::Updating(inner) = &input_node.output_type {
let value_type = inner.as_syn_type();
let debezium_type = PlanType::Debezium {
key: None,
value_type: quote!(#value_type).to_string(),
values: inner.value_structs(),
};
let debezium_index =
self.insert_operator(PlanOperator::ToDebezium, debezium_type.clone());

let plan_node = PlanOperator::Sink(name, sql_sink);
let plan_node_index = self.insert_operator(plan_node, debezium_type);
let edge = PlanEdge {
edge_type: EdgeType::Forward,
};
self.graph.add_edge(input_index, debezium_index, edge);

(debezium_index, debezium_type)
} else if matches!(sql_sink.updating_type, SinkUpdateType::Force) {
let value_type = input_node.output_type.as_syn_type();
let debezium_type = PlanType::Debezium {
key: None,
value_type: quote!(#value_type).to_string(),
values: input_node.output_type.value_structs(),
};
let debezium_index =
self.insert_operator(PlanOperator::ToDebezium, debezium_type.clone());
let edge = PlanEdge {
edge_type: EdgeType::Forward,
};
self.graph.add_edge(input_index, debezium_index, edge);

let debezium_edge = PlanEdge {
edge_type: EdgeType::Forward,
(debezium_index, debezium_type)
} else {
(input_index, input_node.output_type.clone())
};

self.graph
.add_edge(debezium_index, plan_node_index, debezium_edge);
let sink_node = self.connections.get(&name).cloned().unwrap_or_else(|| {
if let Some(connection_id) = sql_sink.id.clone() {
self.saved_connections_used.push(connection_id);
}
let plan_node = PlanOperator::Sink(name.clone(), sql_sink);
let plan_node_index = self.insert_operator(plan_node, sink_node_type);
self.connections.insert(name.clone(), plan_node_index);
plan_node_index
} else {
let plan_node = PlanOperator::Sink(name, sql_sink);
let plan_node_index = self.insert_operator(plan_node, input_node.output_type.clone());
let edge = PlanEdge {
});
self.graph.add_edge(
incoming_node_index,
sink_node,
PlanEdge {
edge_type: EdgeType::Forward,
};
self.graph.add_edge(input_index, plan_node_index, edge);
plan_node_index
}
},
);
sink_node
}

fn add_updating_aggregator(
Expand Down Expand Up @@ -1921,6 +1916,7 @@ impl PlanGraph {
let projection = RecordTransform::ValueProjection(Projection {
fields,
format: None,
struct_name: None,
});
let input_node = self.get_plan_node(input_index);
let plan_node = PlanNode::from_record_transform(projection, input_node);
Expand Down Expand Up @@ -1963,7 +1959,7 @@ pub fn get_program(
optimize(&mut plan_graph.graph);

let mut key_structs = HashSet::new();
let sources = plan_graph.saved_sources_used.clone();
let sources = plan_graph.saved_connections_used.clone();
plan_graph.graph.node_weights().for_each(|node| {
let key_names = node.output_type.get_key_struct_names();
key_structs.extend(key_names);
Expand Down Expand Up @@ -2018,7 +2014,7 @@ pub fn get_program(
schema_provider
.source_defs
.into_iter()
.filter(|(k, _)| plan_graph.sources.contains_key(k))
.filter(|(k, _)| plan_graph.connections.contains_key(k))
.map(|(_, v)| v),
);

Expand Down
Loading

0 comments on commit fba6e6f

Please sign in to comment.