Add support for writing Avro data #422

Merged · 12 commits · Nov 27, 2023
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -6,6 +6,7 @@ members = [
"arroyo-controller",
"arroyo-connectors",
"arroyo-datastream",
"arroyo-formats",
"arroyo-macro",
"arroyo-metrics",
"arroyo-node",
1 change: 1 addition & 0 deletions arroyo-api/Cargo.toml
@@ -15,6 +15,7 @@ arroyo-connectors = { path = "../arroyo-connectors" }
arroyo-sql = { path = "../arroyo-sql" }
arroyo-datastream = { path = "../arroyo-datastream" }
arroyo-state = { path = "../arroyo-state" }
arroyo-formats = { path = "../arroyo-formats" }

tonic = { workspace = true }
tonic-reflection = { workspace = true }
6 changes: 3 additions & 3 deletions arroyo-api/src/connection_tables.rs
@@ -24,7 +24,7 @@ use arroyo_rpc::api_types::{ConnectionTableCollection, PaginationQueryParams};
use arroyo_rpc::formats::{AvroFormat, Format, JsonFormat};
use arroyo_rpc::public_ids::{generate_id, IdTypes};
use arroyo_rpc::schema_resolver::{
ConfluentSchemaResolver, ConfluentSchemaResponse, ConfluentSchemaType,
ConfluentSchemaRegistry, ConfluentSchemaResponse, ConfluentSchemaType,
};
use arroyo_sql::avro;
use arroyo_sql::json_schema::convert_json_schema;
@@ -452,7 +452,7 @@ async fn expand_avro_schema(
if let Some(Format::Avro(AvroFormat {
confluent_schema_registry: true,
..
})) = &schema.format
})) = &mut schema.format
{
let schema_response = get_schema(connector, table_config, profile_config).await?;

@@ -557,7 +557,7 @@ async fn get_schema(
bad_request("schema registry must be configured on the Kafka connection profile")
})?;

let resolver = ConfluentSchemaResolver::new(
let resolver = ConfluentSchemaRegistry::new(
&schema_registry.endpoint,
&table.topic,
schema_registry.api_key.clone(),
150 changes: 126 additions & 24 deletions arroyo-api/src/pipelines.rs
@@ -1,11 +1,16 @@
use anyhow::{anyhow, bail, Context};
use arrow_schema::Field;
use arroyo_connectors::connector_for_type;
use axum::extract::{Path, Query, State};
use axum::Json;
use axum_extra::extract::WithRejection;
use cornucopia_async::{GenericClient, Params};
use deadpool_postgres::{Object, Transaction};
use http::StatusCode;

use petgraph::Direction;
use std::collections::HashMap;

use std::time::Duration;

use crate::{connection_profiles, jobs, pipelines, types};
@@ -21,9 +26,15 @@ use arroyo_rpc::grpc::api::{
create_pipeline_req, CreateJobReq, CreatePipelineReq, CreateSqlJob, PipelineProgram,
};

use arroyo_connectors::kafka::{KafkaConfig, KafkaTable};
use arroyo_formats::avro::arrow_to_avro_schema;
use arroyo_rpc::formats::Format;
use arroyo_rpc::public_ids::{generate_id, IdTypes};
use arroyo_rpc::schema_resolver::{ConfluentSchemaRegistry, ConfluentSchemaType};
use arroyo_rpc::OperatorConfig;
use arroyo_server_common::log_event;
use arroyo_sql::{has_duplicate_udf_names, ArroyoSchemaProvider, SqlConfig};
use arroyo_sql::types::StructDef;
use arroyo_sql::{has_duplicate_udf_names, ArroyoSchemaProvider, CompiledSql, SqlConfig};
use petgraph::visit::EdgeRef;
use prost::Message;
use serde_json::json;
@@ -50,7 +61,7 @@ async fn compile_sql<'e, E>(
parallelism: usize,
auth_data: &AuthData,
tx: &E,
) -> anyhow::Result<(Program, Vec<i64>)>
) -> anyhow::Result<CompiledSql>
where
E: GenericClient,
{
@@ -126,7 +137,7 @@ where
schema_provider.add_connection_profile(profile);
}

let (program, connections) = arroyo_sql::parse_and_get_program(
arroyo_sql::parse_and_get_program(
&query,
schema_provider,
SqlConfig {
Expand All @@ -138,9 +149,7 @@ where
.map_err(|err| {
warn!("{:?}", err);
anyhow!(format!("{}", err.root_cause()))
})?;

Ok((program, connections))
})
}

fn set_parallelism(program: &mut Program, parallelism: usize) {
@@ -149,15 +158,93 @@ fn set_parallelism(program: &mut Program, parallelism: usize) {
}
}

async fn try_register_confluent_schema(
sink: &mut ConnectorOp,
schema: &StructDef,
) -> anyhow::Result<()> {
let mut config: OperatorConfig = serde_json::from_str(&sink.config).unwrap();

let Ok(profile) = serde_json::from_value::<KafkaConfig>(config.connection.clone()) else {
return Ok(());
};

let Ok(table) = serde_json::from_value::<KafkaTable>(config.table.clone()) else {
return Ok(());
};

let Some(registry_config) = profile.schema_registry else {
return Ok(());
};

let schema_registry = ConfluentSchemaRegistry::new(
&registry_config.endpoint,
&table.topic,
registry_config.api_key,
registry_config.api_secret,
)?;

match config.format.clone() {
Some(Format::Avro(mut avro)) => {
if avro.confluent_schema_registry && avro.schema_version.is_none() {
let fields: Vec<Field> = schema.fields.iter().map(|f| f.clone().into()).collect();

let schema = arrow_to_avro_schema(&schema.struct_name_ident(), &fields.into());

let version = schema_registry
.write_schema(schema.canonical_form(), ConfluentSchemaType::Avro)
.await
.map_err(|e| anyhow!("Failed to write schema to schema registry: {}", e))?;

avro.schema_version = Some(version as u32);
config.format = Some(Format::Avro(avro))
}
}
Some(Format::Json(_)) => {
// TODO: add json schema support
}
_ => {
// unsupported for schema registry
}
}

sink.config = serde_json::to_string(&config).unwrap();

Ok(())
}

async fn register_schemas(compiled_sql: &mut CompiledSql) -> anyhow::Result<()> {
for node in compiled_sql.program.graph.node_indices() {
let Some(input) = compiled_sql
.program
.graph
.edges_directed(node, Direction::Incoming)
.next()
else {
continue;
};

let Some(value_schema) = compiled_sql.schemas.get(&input.weight().value) else {
continue;
};

let node = compiled_sql.program.graph.node_weight_mut(node).unwrap();

if let Operator::ConnectorSink(connector) = &mut node.operator {
try_register_confluent_schema(connector, value_schema).await?;
}
}

Ok(())
}

pub(crate) async fn create_pipeline<'a>(
req: &CreatePipelineReq,
pub_id: &str,
auth: AuthData,
tx: &Transaction<'a>,
) -> Result<(i64, Program), ErrorResp> {
let pipeline_type;
let mut program;
let connections;
let mut compiled;
let text;
let udfs: Option<Vec<Udf>>;
let is_preview;
@@ -170,11 +257,14 @@ pub(crate) async fn create_pipeline<'a>(
));
}
pipeline_type = PipelineType::rust;
program = PipelineProgram::decode(&bytes[..])
.map_err(log_and_map)?
.try_into()
.map_err(log_and_map)?;
connections = vec![];
compiled = CompiledSql {
program: PipelineProgram::decode(&bytes[..])
.map_err(log_and_map)?
.try_into()
.map_err(log_and_map)?,
connection_ids: vec![],
schemas: HashMap::new(),
};
text = None;
udfs = None;
is_preview = false;
@@ -191,7 +281,7 @@ pub(crate) async fn create_pipeline<'a>(
let api_udfs = sql.udfs.into_iter().map(|t| t.into()).collect::<Vec<Udf>>();

pipeline_type = PipelineType::sql;
(program, connections) = compile_sql(
compiled = compile_sql(
sql.query.clone(),
&api_udfs,
sql.parallelism as usize,
@@ -206,15 +296,15 @@ pub(crate) async fn create_pipeline<'a>(
}
};

optimizations::optimize(&mut program.graph);
optimizations::optimize(&mut compiled.program.graph);

if program.graph.node_count() > auth.org_metadata.max_operators as usize {
if compiled.program.graph.node_count() > auth.org_metadata.max_operators as usize {
return Err(bad_request(
format!("This pipeline is too large to create under your plan, which only allows pipelines up to {} nodes;
contact [email protected] for an increase", auth.org_metadata.max_operators)));
}

let errors = program.validate_graph();
let errors = compiled.program.validate_graph();
if !errors.is_empty() {
let errs: Vec<String> = errors.iter().map(|s| format!(" * {}\n", s)).collect();

@@ -224,18 +314,30 @@ pub(crate) async fn create_pipeline<'a>(
)));
}

set_parallelism(&mut program, 1);
set_parallelism(&mut compiled.program, 1);

if is_preview {
for node in program.graph.node_weights_mut() {
// if it is a connector sink or switch to a web sink
for node in compiled.program.graph.node_weights_mut() {
// replace all sink connectors with websink for preview
if let Operator::ConnectorSink { .. } = node.operator {
node.operator = Operator::ConnectorSink(ConnectorOp::web_sink());
}
}
}

let proto_program: PipelineProgram = program.clone().try_into().map_err(log_and_map)?;
register_schemas(&mut compiled)
.await
.map_err(|e| ErrorResp {
status_code: StatusCode::BAD_REQUEST,
message: format!(
"Failed to register schemas with the schema registry. Make sure \
that the schema_registry is configured correctly and running.\nDetails: {}",
e
),
})?;

let proto_program: PipelineProgram =
compiled.program.clone().try_into().map_err(log_and_map)?;

let program_bytes = proto_program.encode_to_vec();

@@ -260,7 +362,7 @@ pub(crate) async fn create_pipeline<'a>(
.map_err(|e| handle_db_error("pipeline", e))?;

if !is_preview {
for connection in connections {
for connection in compiled.connection_ids {
api_queries::add_pipeline_connection_table()
.bind(
tx,
@@ -273,7 +375,7 @@ pub(crate) async fn create_pipeline<'a>(
}
}

Ok((pipeline_id, program))
Ok((pipeline_id, compiled.program))
}

impl TryInto<Pipeline> for DbPipeline {
@@ -363,7 +465,7 @@ pub async fn validate_query(

let pipeline_graph_validation_result =
match compile_sql(validate_query_post.query, &udfs, 1, &auth_data, &client).await {
Ok((mut program, _)) => {
Ok(CompiledSql { mut program, .. }) => {
optimizations::optimize(&mut program.graph);
let nodes = program
.graph
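As context for the pipelines.rs changes above, here is a minimal sketch of the schema-registration flow they introduce, using only the APIs that appear in this diff (`arrow_to_avro_schema`, `ConfluentSchemaRegistry::new`, `write_schema`). The helper function, field list, record name, endpoint, topic, and credentials are placeholders, and the Kafka/`OperatorConfig` plumbing from `try_register_confluent_schema` is omitted:

```rust
use anyhow::anyhow;
use arrow_schema::{DataType, Field};
use arroyo_formats::avro::arrow_to_avro_schema;
use arroyo_rpc::schema_resolver::{ConfluentSchemaRegistry, ConfluentSchemaType};

// Hypothetical helper, not part of this PR: derives an Avro schema from a
// sink's Arrow fields and registers it, returning the registry's version.
async fn register_example_schema() -> anyhow::Result<u32> {
    // Placeholder sink schema; in the PR these fields come from the StructDef
    // of the sink's value type.
    let fields: Vec<Field> = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ];

    // Convert the Arrow fields into an Avro record schema named after the struct.
    let record_name = "Example".to_string();
    let avro_schema = arrow_to_avro_schema(&record_name, &fields.into());

    // Placeholder endpoint, topic, and (absent) credentials; in the PR these
    // come from the Kafka connection profile's schema_registry config.
    let endpoint = "http://localhost:8081".to_string();
    let topic = "example-topic".to_string();
    let registry = ConfluentSchemaRegistry::new(&endpoint, &topic, None, None)?;

    // Write the schema's canonical form and get back the registered version,
    // which create_pipeline stores in AvroFormat::schema_version.
    let version = registry
        .write_schema(avro_schema.canonical_form(), ConfluentSchemaType::Avro)
        .await
        .map_err(|e| anyhow!("failed to write schema to schema registry: {}", e))?;

    Ok(version as u32)
}
```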
2 changes: 1 addition & 1 deletion arroyo-controller/src/states/scheduling.rs
@@ -202,7 +202,7 @@ impl Scheduling {
ctx.status.pipeline_path = None;
ctx.status.wasm_path = None;

// TODO: this introduces the possiblility of an infinite loop, if compiling succeeds but for some
// TODO: this introduces the possibility of an infinite loop, if compiling succeeds but for some
// reason we are not able to read the pipeline binary that it produces (e.g., we may have perms
// to write to S3, but not read). Addressing that will take a more sophisticated error handling
// system that is able to track errors across multiple states.