Initial support for Avro formats (ArroyoSystems#386)
mwylde authored and zh4ngx committed Nov 16, 2023
1 parent 743f130 commit 97e893a
Showing 38 changed files with 1,745 additions and 678 deletions.
205 changes: 171 additions & 34 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 1 addition & 7 deletions Cargo.toml
@@ -35,7 +35,6 @@ tonic-build = { version = "0.9" }
tonic-web = { version = "0.9" }
tonic-reflection = { version = "0.9" }
arrow = { version = "46.0.0" }
-arrow-buffer = { version = "46.0.0" }
arrow-array = { version = "46.0.0" }
arrow-schema = { version = "46.0.0" }
object_store = { version = "0.7.1" }
@@ -44,16 +43,11 @@ parquet = { version = "46.0.0" }
[profile.release]
debug = 1

-[profile.release.package.copy-artifacts]
-# optimize for small binary size
-strip = true
-opt-level = "z"

[patch.crates-io]
typify = { git = 'https://github.com/ArroyoSystems/typify.git', branch = 'arroyo' }
parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/parquet_bytes'}
arrow = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/parquet_bytes'}
-arrow-buffer = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/parquet_bytes'}
arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/parquet_bytes'}
arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/parquet_bytes'}
object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = 'object_store/put_part_api'}
3 changes: 2 additions & 1 deletion arroyo-api/Cargo.toml
@@ -37,7 +37,7 @@ petgraph = {version = "0.6", features = ["serde-1"]}

http = "0.2"
tower-http = {version = "0.4", features = ["trace", "fs", "cors", "validate-request", "auth"]}
-axum = {version = "0.6.12", features = ["headers", "tokio"]}
axum = {version = "0.6.12", features = ["headers", "tokio", "macros"]}
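# the "macros" feature gates #[axum::debug_handler], which is added below in connection_tables.rs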
axum-extra = "0.7.4"
thiserror = "1.0.40"
utoipa = "3"
@@ -77,6 +77,7 @@ cornucopia_async = { version = "0.4", features = ["with-serde_json-1"] }
jwt-simple = "0.11.4"
uuid = "1.3.3"
regress = "0.6.0"
apache-avro = "0.16.0"

[build-dependencies]
cornucopia = { version = "0.9" }
273 changes: 144 additions & 129 deletions arroyo-api/src/connection_tables.rs
@@ -9,20 +9,24 @@ use axum_extra::extract::WithRejection;
use cornucopia_async::GenericClient;
use cornucopia_async::Params;
use futures_util::stream::Stream;
use http::StatusCode;
-use serde_json::json;
use serde_json::{json, Value};
use std::convert::Infallible;
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
use tracing::warn;

use arroyo_connectors::kafka::{KafkaConfig, KafkaTable};
use arroyo_connectors::{connector_for_type, ErasedConnector};
use arroyo_rpc::api_types::connections::{
-ConfluentSchema, ConfluentSchemaQueryParams, ConnectionProfile, ConnectionSchema,
-ConnectionTable, ConnectionTablePost, SchemaDefinition,
ConnectionProfile, ConnectionSchema, ConnectionTable, ConnectionTablePost, SchemaDefinition,
};
use arroyo_rpc::api_types::{ConnectionTableCollection, PaginationQueryParams};
use arroyo_rpc::formats::{AvroFormat, Format, JsonFormat};
use arroyo_rpc::public_ids::{generate_id, IdTypes};
use arroyo_rpc::schema_resolver::{
ConfluentSchemaResolver, ConfluentSchemaResponse, ConfluentSchemaType,
};
use arroyo_sql::avro;
use arroyo_sql::json_schema::convert_json_schema;
use arroyo_sql::types::{StructField, TypeDef};

@@ -95,8 +99,9 @@ async fn get_and_validate_connector<E: GenericClient>(
.transpose()
.map_err(|e| bad_request(format!("Invalid schema: {}", e)))?;

-let schema = if let Some(schema) = &schema {
-Some(expand_schema(&req.name, schema)?)
let schema = if let Some(schema) = schema {
let name = connector.name();
Some(expand_schema(&req.name, name, schema, &req.config, &profile_config).await?)
} else {
None
};
@@ -130,13 +135,14 @@ pub(crate) async fn delete_connection_table(
.map_err(|e| handle_delete("connection_table", "pipelines", e))?;

if deleted == 0 {
-return Err(not_found("Connection table".to_string()));
return Err(not_found("Connection table"));
}

Ok(())
}

/// Test a Connection Table
#[axum::debug_handler]
#[utoipa::path(
post,
path = "/v1/connection_tables/test",
@@ -254,7 +260,7 @@ pub async fn create_connection_table(
.unwrap()
.to_string();

-if let Some(schema) = &req.schema {
if let Some(schema) = &schema {
if schema.definition.is_none() {
return Err(required_field("schema.definition"));
}
@@ -413,31 +419,109 @@

// attempts to fill in the SQL schema from a schema object that may just have a json-schema or
// other source schema. schemas stored in the database should always be expanded first.
-pub(crate) fn expand_schema(
pub(crate) async fn expand_schema(
name: &str,
connector: &str,
schema: ConnectionSchema,
table_config: &Value,
profile_config: &Value,
) -> Result<ConnectionSchema, ErrorResp> {
let Some(format) = schema.format.as_ref() else {
return Ok(schema);
};

match format {
Format::Json(_) => {
expand_json_schema(name, connector, schema, table_config, profile_config).await
}
Format::Avro(_) => {
expand_avro_schema(name, connector, schema, table_config, profile_config).await
}
Format::Parquet(_) => Ok(schema),
Format::RawString(_) => Ok(schema),
}
}
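// A minimal usage sketch of the dispatcher above, assuming hypothetical
// `table_config` / `profile_config` JSON values (not part of this commit):
//
// let expanded = expand_schema("orders", "kafka", schema, &table_config, &profile_config).await?;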

async fn expand_avro_schema(
name: &str,
-schema: &ConnectionSchema,
connector: &str,
mut schema: ConnectionSchema,
table_config: &Value,
profile_config: &Value,
) -> Result<ConnectionSchema, ErrorResp> {
-let mut schema = schema.clone();
if let Some(Format::Avro(AvroFormat {
confluent_schema_registry: true,
..
})) = &schema.format
{
let schema_response = get_schema(connector, table_config, profile_config).await?;

if schema_response.schema_type != ConfluentSchemaType::Avro {
return Err(bad_request(format!(
"Format configured is avro, but confluent schema repository returned a {:?} schema",
schema_response.schema_type
)));
}

schema.definition = Some(SchemaDefinition::AvroSchema(schema_response.schema));
}

let Some(SchemaDefinition::AvroSchema(definition)) = schema.definition.as_ref() else {
return Err(bad_request("avro format requires an avro schema be set"));
};

if let Some(Format::Avro(format)) = &mut schema.format {
format.add_reader_schema(
apache_avro::Schema::parse_str(&definition)
.map_err(|e| bad_request(format!("Avro schema is invalid: {:?}", e)))?,
);
}

let fields: Result<_, String> = avro::convert_avro_schema(&name, &definition)
.map_err(|e| bad_request(format!("Invalid avro schema: {}", e)))?
.into_iter()
.map(|f| f.try_into())
.collect();

schema.fields = fields.map_err(|e| bad_request(format!("Failed to convert schema: {}", e)))?;

Ok(schema)
}
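// For reference, a hedged example of an Avro definition that would pass the
// parse and convert steps above (record and field names are illustrative,
// not from this commit):
//
// let definition = r#"{
//     "type": "record",
//     "name": "Order",
//     "fields": [
//         {"name": "id", "type": "long"},
//         {"name": "customer", "type": "string"}
//     ]
// }"#;
// let parsed = apache_avro::Schema::parse_str(definition)
//     .map_err(|e| bad_request(format!("Avro schema is invalid: {:?}", e)))?;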

async fn expand_json_schema(
name: &str,
connector: &str,
mut schema: ConnectionSchema,
table_config: &Value,
profile_config: &Value,
) -> Result<ConnectionSchema, ErrorResp> {
if let Some(Format::Json(JsonFormat {
confluent_schema_registry: true,
..
})) = &schema.format
{
let schema_response = get_schema(connector, table_config, profile_config).await?;

if schema_response.schema_type != ConfluentSchemaType::Json {
return Err(bad_request(format!(
"Format configured is json, but confluent schema repository returned a {:?} schema",
schema_response.schema_type
)));
}

schema.definition = Some(SchemaDefinition::JsonSchema(schema_response.schema));
}

if let Some(d) = &schema.definition {
let fields = match d {
-SchemaDefinition::JsonSchema(json) => convert_json_schema(name, &json)
SchemaDefinition::JsonSchema(json) => convert_json_schema(&name, &json)
.map_err(|e| bad_request(format!("Invalid json-schema: {}", e)))?,
SchemaDefinition::ProtobufSchema(_) => {
return Err(bad_request(
"Protobuf schemas are not yet supported".to_string(),
))
}
-SchemaDefinition::AvroSchema(_) => {
-return Err(bad_request(
-"Avro schemas are not yet supported".to_string(),
-))
-}
SchemaDefinition::RawSchema(_) => vec![StructField::new(
"value".to_string(),
None,
TypeDef::DataType(DataType::Utf8, false),
)],
_ => return Err(bad_request("Invalid schema type for json format")),
};

let fields: Result<_, String> = fields.into_iter().map(|f| f.try_into()).collect();
@@ -449,6 +533,44 @@ pub(crate) fn expand_schema(
Ok(schema)
}
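// The JSON path is analogous; a sketch of a json-schema definition that
// convert_json_schema would accept (property names are illustrative):
//
// let definition = r#"{
//     "type": "object",
//     "properties": {
//         "id": {"type": "integer"},
//         "customer": {"type": "string"}
//     }
// }"#;
// let fields = convert_json_schema("orders", definition)
//     .map_err(|e| bad_request(format!("Invalid json-schema: {}", e)))?;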

async fn get_schema(
connector: &str,
table_config: &Value,
profile_config: &Value,
) -> Result<ConfluentSchemaResponse, ErrorResp> {
if connector != "kafka" {
return Err(bad_request(
"confluent schema registry can only be used for Kafka connections",
));
}

// we unwrap here because this should already have been validated
let profile: KafkaConfig =
serde_json::from_value(profile_config.clone()).expect("invalid kafka config");

let table: KafkaTable =
serde_json::from_value(table_config.clone()).expect("invalid kafka table");

let schema_registry = profile.schema_registry.as_ref().ok_or_else(|| {
bad_request("schema registry must be configured on the Kafka connection profile")
})?;

let resolver =
ConfluentSchemaResolver::new(&schema_registry.endpoint, &table.topic).map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
))
})?;

resolver.get_schema(None).await.map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
))
})
}
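// A usage sketch with an illustrative endpoint and topic, using the same
// calls as above (the registry URL is hypothetical):
//
// let resolver = ConfluentSchemaResolver::new("http://localhost:8081", "orders")?;
// let response = resolver.get_schema(None).await?;
// assert_eq!(response.schema_type, ConfluentSchemaType::Avro);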

/// Test a Connection Schema
#[utoipa::path(
post,
@@ -480,110 +602,3 @@ pub(crate) async fn test_schema(
}
}
}

/// Get a Confluent Schema
#[utoipa::path(
get,
path = "/v1/connection_tables/schemas/confluent",
tag = "connection_tables",
params(
("topic" = String, Query, description = "Confluent topic name"),
("endpoint" = String, Query, description = "Confluent schema registry endpoint"),
),
responses(
(status = 200, description = "Got Confluent Schema", body = ConfluentSchema),
),
)]
pub(crate) async fn get_confluent_schema(
query_params: Query<ConfluentSchemaQueryParams>,
) -> Result<Json<ConfluentSchema>, ErrorResp> {
// TODO: ensure only external URLs can be hit
let url = format!(
"{}/subjects/{}-value/versions/latest",
query_params.endpoint, query_params.topic
);
let resp = reqwest::get(url).await.map_err(|e| {
warn!("Got error response from schema registry: {:?}", e);
match e.status() {
Some(StatusCode::NOT_FOUND) => bad_request(format!(
"Could not find value schema for topic '{}'",
query_params.topic
)),

Some(code) => bad_request(format!("Schema registry returned error: {}", code)),
None => {
warn!(
"Unknown error connecting to schema registry {}: {:?}",
query_params.endpoint, e
);
bad_request(format!(
"Could not connect to Schema Registry at {}: unknown error",
query_params.endpoint
))
}
}
})?;

if !resp.status().is_success() {
let message = format!(
"Received an error status code from the provided endpoint: {} {}",
resp.status().as_u16(),
resp.bytes()
.await
.map(|bs| String::from_utf8_lossy(&bs).to_string())
.unwrap_or_else(|_| "<failed to read body>".to_string())
);
return Err(bad_request(message));
}

let value: serde_json::Value = resp.json().await.map_err(|e| {
warn!("Invalid json from schema registry: {:?}", e);
bad_request(format!(
"Schema registry returned invalid JSON: {}",
e.to_string()
))
})?;

let schema_type = value
.get("schemaType")
.ok_or_else(|| {
bad_request(
"The JSON returned from this endpoint was unexpected. Please confirm that the URL is correct."
.to_string(),
)
})?
.as_str();

if schema_type != Some("JSON") {
return Err(bad_request(
"Only JSON schema types are supported currently".to_string(),
));
}

let schema = value
.get("schema")
.ok_or_else(|| {
return bad_request("Missing 'schema' field in schema registry response".to_string());
})?
.as_str()
.ok_or_else(|| {
return bad_request(
"The 'schema' field in the schema registry response is not a string".to_string(),
);
})?;

if let Err(e) = convert_json_schema(&query_params.topic, schema) {
warn!(
"Schema from schema registry is not valid: '{}': {}",
schema, e
);
return Err(bad_request(format!(
"Schema from schema registry is not valid: {}",
e
)));
}

Ok(Json(ConfluentSchema {
schema: schema.to_string(),
}))
}
2 changes: 1 addition & 1 deletion arroyo-api/src/jobs.rs
@@ -416,7 +416,7 @@ pub async fn get_checkpoint_details(
.await
.map_err(log_and_map)?
.ok_or_else(|| {
-not_found(format!(
not_found(&format!(
"Checkpoint with epoch {} for job '{}'",
epoch, job_pub_id
))