Initial support for Avro formats #386

Merged 12 commits on Oct 31, 2023
Changes from 11 commits
205 changes: 171 additions & 34 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 1 addition & 7 deletions Cargo.toml
@@ -35,7 +35,6 @@ tonic-build = { version = "0.9" }
tonic-web = { version = "0.9" }
tonic-reflection = { version = "0.9" }
arrow = { version = "46.0.0" }
-arrow-buffer = { version = "46.0.0" }
arrow-array = { version = "46.0.0" }
arrow-schema = { version = "46.0.0" }
object_store = { version = "0.7.1" }
@@ -44,16 +43,11 @@ parquet = { version = "46.0.0" }
[profile.release]
debug = 1

-[profile.release.package.copy-artifacts]
-# optimize for small binary size
-strip = true
-opt-level = "z"

[patch.crates-io]
typify = { git = 'https://github.com/ArroyoSystems/typify.git', branch = 'arroyo' }
parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/parquet_bytes'}
arrow = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/parquet_bytes'}
-arrow-buffer = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/parquet_bytes'}
arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/parquet_bytes'}
arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '46.0.0/parquet_bytes'}
-object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = 'object_store/put_part_api'}
+object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = 'object_store/put_part_api'}
3 changes: 2 additions & 1 deletion arroyo-api/Cargo.toml
@@ -37,7 +37,7 @@ petgraph = {version = "0.6", features = ["serde-1"]}

http = "0.2"
tower-http = {version = "0.4", features = ["trace", "fs", "cors", "validate-request", "auth"]}
-axum = {version = "0.6.12", features = ["headers", "tokio"]}
+axum = {version = "0.6.12", features = ["headers", "tokio", "macros"]}
axum-extra = "0.7.4"
thiserror = "1.0.40"
utoipa = "3"
@@ -77,6 +77,7 @@ cornucopia_async = { version = "0.4", features = ["with-serde_json-1"] }
jwt-simple = "0.11.4"
uuid = "1.3.3"
regress = "0.6.0"
+apache-avro = "0.16.0"

[build-dependencies]
cornucopia = { version = "0.9" }
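
As context for the new apache-avro dependency: the API code below validates Avro definitions with apache_avro's parser. A minimal, hedged sketch of that entry point (the Order schema is invented for illustration and is not part of this PR):

use apache_avro::Schema;

fn main() {
    // Hypothetical Avro record schema, for illustration only.
    let definition = r#"{
        "type": "record",
        "name": "Order",
        "fields": [
            {"name": "id", "type": "long"},
            {"name": "amount", "type": "double"}
        ]
    }"#;

    // Schema::parse_str is the same apache_avro entry point the new
    // expand_avro_schema code below uses to validate a definition.
    match Schema::parse_str(definition) {
        Ok(schema) => println!("parsed: {}", schema.canonical_form()),
        Err(e) => eprintln!("Avro schema is invalid: {:?}", e),
    }
}
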
273 changes: 144 additions & 129 deletions arroyo-api/src/connection_tables.rs
@@ -9,20 +9,24 @@ use axum_extra::extract::WithRejection;
use cornucopia_async::GenericClient;
use cornucopia_async::Params;
use futures_util::stream::Stream;
-use http::StatusCode;
-use serde_json::json;
+use serde_json::{json, Value};
use std::convert::Infallible;
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
use tracing::warn;

+use arroyo_connectors::kafka::{KafkaConfig, KafkaTable};
use arroyo_connectors::{connector_for_type, ErasedConnector};
use arroyo_rpc::api_types::connections::{
-ConfluentSchema, ConfluentSchemaQueryParams, ConnectionProfile, ConnectionSchema,
-ConnectionTable, ConnectionTablePost, SchemaDefinition,
+ConnectionProfile, ConnectionSchema, ConnectionTable, ConnectionTablePost, SchemaDefinition,
};
use arroyo_rpc::api_types::{ConnectionTableCollection, PaginationQueryParams};
use arroyo_rpc::formats::{AvroFormat, Format, JsonFormat};
use arroyo_rpc::public_ids::{generate_id, IdTypes};
+use arroyo_rpc::schema_resolver::{
+ConfluentSchemaResolver, ConfluentSchemaResponse, ConfluentSchemaType,
+};
+use arroyo_sql::avro;
use arroyo_sql::json_schema::convert_json_schema;
use arroyo_sql::types::{StructField, TypeDef};

@@ -95,8 +99,9 @@ async fn get_and_validate_connector<E: GenericClient>(
.transpose()
.map_err(|e| bad_request(format!("Invalid schema: {}", e)))?;

-let schema = if let Some(schema) = &schema {
-Some(expand_schema(&req.name, schema)?)
+let schema = if let Some(schema) = schema {
+let name = connector.name();
+Some(expand_schema(&req.name, name, schema, &req.config, &profile_config).await?)
} else {
None
};
@@ -130,13 +135,14 @@ pub(crate) async fn delete_connection_table(
.map_err(|e| handle_delete("connection_table", "pipelines", e))?;

if deleted == 0 {
-return Err(not_found("Connection table".to_string()));
+return Err(not_found("Connection table"));
}

Ok(())
}

/// Test a Connection Table
+#[axum::debug_handler]
#[utoipa::path(
post,
path = "/v1/connection_tables/test",
@@ -254,7 +260,7 @@ pub async fn create_connection_table(
.unwrap()
.to_string();

-if let Some(schema) = &req.schema {
+if let Some(schema) = &schema {
if schema.definition.is_none() {
return Err(required_field("schema.definition"));
}
@@ -413,31 +419,109 @@

// Attempts to fill in the SQL schema from a schema object that may just have a JSON schema or
// other source schema. Schemas stored in the database should always be expanded first.
-pub(crate) fn expand_schema(
+pub(crate) async fn expand_schema(
name: &str,
connector: &str,
schema: ConnectionSchema,
table_config: &Value,
profile_config: &Value,
) -> Result<ConnectionSchema, ErrorResp> {
let Some(format) = schema.format.as_ref() else {
return Ok(schema);
};

match format {
Format::Json(_) => {
expand_json_schema(name, connector, schema, table_config, profile_config).await
}
Format::Avro(_) => {
expand_avro_schema(name, connector, schema, table_config, profile_config).await
}
Format::Parquet(_) => Ok(schema),
Format::RawString(_) => Ok(schema),
}
}

async fn expand_avro_schema(
name: &str,
-schema: &ConnectionSchema,
connector: &str,
mut schema: ConnectionSchema,
table_config: &Value,
profile_config: &Value,
) -> Result<ConnectionSchema, ErrorResp> {
-let mut schema = schema.clone();
if let Some(Format::Avro(AvroFormat {
confluent_schema_registry: true,
..
})) = &schema.format
{
let schema_response = get_schema(connector, table_config, profile_config).await?;

if schema_response.schema_type != ConfluentSchemaType::Avro {
return Err(bad_request(format!(
"Format configured is avro, but confluent schema repository returned a {:?} schema",
schema_response.schema_type
)));
}

schema.definition = Some(SchemaDefinition::AvroSchema(schema_response.schema));
}

let Some(SchemaDefinition::AvroSchema(definition)) = schema.definition.as_ref() else {
return Err(bad_request("avro format requires an avro schema be set"));
};

if let Some(Format::Avro(format)) = &mut schema.format {
format.add_reader_schema(
apache_avro::Schema::parse_str(&definition)
.map_err(|e| bad_request(format!("Avro schema is invalid: {:?}", e)))?,
);
}

let fields: Result<_, String> = avro::convert_avro_schema(&name, &definition)
.map_err(|e| bad_request(format!("Invalid avro schema: {}", e)))?
.into_iter()
.map(|f| f.try_into())
.collect();

schema.fields = fields.map_err(|e| bad_request(format!("Failed to convert schema: {}", e)))?;

Ok(schema)
}
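
For orientation: format.add_reader_schema(...) above hands the parsed schema to the Avro deserialization path so records written with an older writer schema can be resolved against it. A rough sketch of that resolution mechanism using apache-avro directly (the Event schemas are invented for illustration; this is not Arroyo's actual wiring):

use apache_avro::{from_avro_datum, to_avro_datum, types::Value, Schema};
use std::io::Cursor;

fn main() -> Result<(), apache_avro::Error> {
    // Writer schema: what a producer used to encode the datum.
    let writer = Schema::parse_str(
        r#"{"type": "record", "name": "Event",
            "fields": [{"name": "id", "type": "long"}]}"#,
    )?;

    // Reader schema: a newer version with a defaulted field, as a schema
    // registry might return for the latest subject version.
    let reader = Schema::parse_str(
        r#"{"type": "record", "name": "Event",
            "fields": [
                {"name": "id", "type": "long"},
                {"name": "source", "type": "string", "default": "unknown"}
            ]}"#,
    )?;

    let datum = Value::Record(vec![("id".to_string(), Value::Long(42))]);
    let encoded = to_avro_datum(&writer, datum)?;

    // Schema resolution fills in the defaulted field while decoding.
    let decoded = from_avro_datum(&writer, &mut Cursor::new(encoded), Some(&reader))?;
    println!("{:?}", decoded);
    Ok(())
}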

async fn expand_json_schema(
name: &str,
connector: &str,
mut schema: ConnectionSchema,
table_config: &Value,
profile_config: &Value,
) -> Result<ConnectionSchema, ErrorResp> {
if let Some(Format::Json(JsonFormat {
confluent_schema_registry: true,
..
})) = &schema.format
{
let schema_response = get_schema(connector, table_config, profile_config).await?;

if schema_response.schema_type != ConfluentSchemaType::Json {
return Err(bad_request(format!(
"Format configured is json, but confluent schema repository returned a {:?} schema",
schema_response.schema_type
)));
}

schema.definition = Some(SchemaDefinition::JsonSchema(schema_response.schema));
}

if let Some(d) = &schema.definition {
let fields = match d {
-SchemaDefinition::JsonSchema(json) => convert_json_schema(name, &json)
+SchemaDefinition::JsonSchema(json) => convert_json_schema(&name, &json)
.map_err(|e| bad_request(format!("Invalid json-schema: {}", e)))?,
-SchemaDefinition::ProtobufSchema(_) => {
-return Err(bad_request(
-"Protobuf schemas are not yet supported".to_string(),
-))
-}
-SchemaDefinition::AvroSchema(_) => {
-return Err(bad_request(
-"Avro schemas are not yet supported".to_string(),
-))
-}
SchemaDefinition::RawSchema(_) => vec![StructField::new(
"value".to_string(),
None,
TypeDef::DataType(DataType::Utf8, false),
)],
+_ => return Err(bad_request("Invalid schema type for json format")),
};

let fields: Result<_, String> = fields.into_iter().map(|f| f.try_into()).collect();
@@ -449,6 +533,44 @@ pub(crate) fn expand_schema(
Ok(schema)
}

async fn get_schema(
connector: &str,
table_config: &Value,
profile_config: &Value,
) -> Result<ConfluentSchemaResponse, ErrorResp> {
if connector != "kafka" {
return Err(bad_request(
"confluent schema registry can only be used for Kafka connections",
));
}

// we unwrap here because this should already have been validated
let profile: KafkaConfig =
serde_json::from_value(profile_config.clone()).expect("invalid kafka config");

let table: KafkaTable =
serde_json::from_value(table_config.clone()).expect("invalid kafka table");

let schema_registry = profile.schema_registry.as_ref().ok_or_else(|| {
bad_request("schema registry must be configured on the Kafka connection profile")
})?;

let resolver =
ConfluentSchemaResolver::new(&schema_registry.endpoint, &table.topic).map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
))
})?;

resolver.get_schema(None).await.map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
))
})
}
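
The resolver above wraps the Confluent Schema Registry REST API. For reference, a hedged sketch of the underlying request, using the same {endpoint}/subjects/{topic}-value/versions/latest URL pattern the removed get_confluent_schema handler below builds by hand (assumes reqwest with its "json" feature; RegistryResponse is a simplified guess at the response shape, not an Arroyo type):

use serde::Deserialize;

// Simplified view of a registry "latest version" response; the registry
// omits schemaType when the schema is plain Avro.
#[derive(Debug, Deserialize)]
struct RegistryResponse {
    id: u32,
    version: u32,
    #[serde(rename = "schemaType", default)]
    schema_type: Option<String>,
    schema: String,
}

async fn fetch_latest(endpoint: &str, topic: &str) -> Result<RegistryResponse, reqwest::Error> {
    // Same URL shape the removed handler constructed with format!().
    let url = format!("{}/subjects/{}-value/versions/latest", endpoint, topic);
    reqwest::get(url).await?.error_for_status()?.json().await
}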

/// Test a Connection Schema
#[utoipa::path(
post,
@@ -480,110 +602,3 @@ pub(crate) async fn test_schema(
}
}
}

-/// Get a Confluent Schema
-#[utoipa::path(
-get,
-path = "/v1/connection_tables/schemas/confluent",
-tag = "connection_tables",
-params(
-("topic" = String, Query, description = "Confluent topic name"),
-("endpoint" = String, Query, description = "Confluent schema registry endpoint"),
-),
-responses(
-(status = 200, description = "Got Confluent Schema", body = ConfluentSchema),
-),
-)]
-pub(crate) async fn get_confluent_schema(
-query_params: Query<ConfluentSchemaQueryParams>,
-) -> Result<Json<ConfluentSchema>, ErrorResp> {
-// TODO: ensure only external URLs can be hit
-let url = format!(
-"{}/subjects/{}-value/versions/latest",
-query_params.endpoint, query_params.topic
-);
-let resp = reqwest::get(url).await.map_err(|e| {
-warn!("Got error response from schema registry: {:?}", e);
-match e.status() {
-Some(StatusCode::NOT_FOUND) => bad_request(format!(
-"Could not find value schema for topic '{}'",
-query_params.topic
-)),
-
-Some(code) => bad_request(format!("Schema registry returned error: {}", code)),
-None => {
-warn!(
-"Unknown error connecting to schema registry {}: {:?}",
-query_params.endpoint, e
-);
-bad_request(format!(
-"Could not connect to Schema Registry at {}: unknown error",
-query_params.endpoint
-))
-}
-}
-})?;
-
-if !resp.status().is_success() {
-let message = format!(
-"Received an error status code from the provided endpoint: {} {}",
-resp.status().as_u16(),
-resp.bytes()
-.await
-.map(|bs| String::from_utf8_lossy(&bs).to_string())
-.unwrap_or_else(|_| "<failed to read body>".to_string())
-);
-return Err(bad_request(message));
-}
-
-let value: serde_json::Value = resp.json().await.map_err(|e| {
-warn!("Invalid json from schema registry: {:?}", e);
-bad_request(format!(
-"Schema registry returned invalid JSON: {}",
-e.to_string()
-))
-})?;
-
-let schema_type = value
-.get("schemaType")
-.ok_or_else(|| {
-bad_request(
-"The JSON returned from this endpoint was unexpected. Please confirm that the URL is correct."
-.to_string(),
-)
-})?
-.as_str();
-
-if schema_type != Some("JSON") {
-return Err(bad_request(
-"Only JSON schema types are supported currently".to_string(),
-));
-}
-
-let schema = value
-.get("schema")
-.ok_or_else(|| {
-return bad_request("Missing 'schema' field in schema registry response".to_string());
-})?
-.as_str()
-.ok_or_else(|| {
-return bad_request(
-"The 'schema' field in the schema registry response is not a string".to_string(),
-);
-})?;
-
-if let Err(e) = convert_json_schema(&query_params.topic, schema) {
-warn!(
-"Schema from schema registry is not valid: '{}': {}",
-schema, e
-);
-return Err(bad_request(format!(
-"Schema from schema registry is not valid: {}",
-e
-)));
-}
-
-Ok(Json(ConfluentSchema {
-schema: schema.to_string(),
-}))
-}
2 changes: 1 addition & 1 deletion arroyo-api/src/jobs.rs
@@ -416,7 +416,7 @@ pub async fn get_checkpoint_details(
.await
.map_err(log_and_map)?
.ok_or_else(|| {
-not_found(format!(
+not_found(&format!(
"Checkpoint with epoch {} for job '{}'",
epoch, job_pub_id
))
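
This call-site tweak, together with not_found("Connection table") in connection_tables.rs above, suggests the error helpers now take &str rather than an owned String. A toy sketch of that signature change; ErrorResp here is a hypothetical stand-in, not arroyo-api's real type:

// Hypothetical stand-in for arroyo-api's error response type.
struct ErrorResp {
    status: u16,
    message: String,
}

// Accepting &str lets call sites pass literals directly and borrow
// formatted strings instead of allocating a String at every call site.
fn not_found(what: &str) -> ErrorResp {
    ErrorResp {
        status: 404,
        message: format!("{} not found", what),
    }
}

fn main() {
    let a = not_found("Connection table");
    let b = not_found(&format!("Checkpoint with epoch {} for job '{}'", 3, "job_abc"));
    println!("{} {} | {} {}", a.status, a.message, b.status, b.message);
}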