Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Oct 26, 2023
1 parent 63bad7e commit d2d2698
Show file tree
Hide file tree
Showing 12 changed files with 338 additions and 182 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

123 changes: 81 additions & 42 deletions arroyo-api/src/connection_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@ use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
use tracing::warn;

use arroyo_connectors::{Connector, connector_for_type, ErasedConnector};
use arroyo_connectors::kafka::{KafkaConfig, KafkaConnector, KafkaTable};
use arroyo_connectors::{connector_for_type, Connector, ErasedConnector};
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema,
ConnectionTable, ConnectionTablePost, SchemaDefinition,
ConnectionProfile, ConnectionSchema, ConnectionTable, ConnectionTablePost, SchemaDefinition,
};
use arroyo_rpc::api_types::{ConnectionTableCollection, PaginationQueryParams};
use arroyo_rpc::formats::{AvroFormat, Format, JsonFormat};
use arroyo_rpc::public_ids::{generate_id, IdTypes};
use arroyo_rpc::schema_resolver::{ConfluentSchemaResolver, ConfluentSchemaResponse, ConfluentSchemaType};
use arroyo_rpc::schema_resolver::{
ConfluentSchemaResolver, ConfluentSchemaResponse, ConfluentSchemaType,
};
use arroyo_sql::avro;
use arroyo_sql::json_schema::convert_json_schema;
use arroyo_sql::types::{StructField, TypeDef};
Expand Down Expand Up @@ -431,48 +432,75 @@ pub(crate) async fn expand_schema(
};

match format {
Format::Json(_) => expand_json_schema(name, connector, schema, table_config, profile_config).await,
Format::Avro(_) => expand_avro_schema(name, connector, schema, table_config, profile_config).await,
Format::Json(_) => {
expand_json_schema(name, connector, schema, table_config, profile_config).await
}
Format::Avro(_) => {
expand_avro_schema(name, connector, schema, table_config, profile_config).await
}
Format::Parquet(_) => Ok(schema),
Format::RawString(_) => Ok(schema),
}
}

async fn expand_avro_schema(name: &str, connector: &str, mut schema: ConnectionSchema, table_config: &Value, profile_config: &Value) -> Result<ConnectionSchema, ErrorResp> {
if let Some(Format::Avro(AvroFormat { confluent_schema_registry: true, .. })) = &schema.format {
async fn expand_avro_schema(
name: &str,
connector: &str,
mut schema: ConnectionSchema,
table_config: &Value,
profile_config: &Value,
) -> Result<ConnectionSchema, ErrorResp> {
if let Some(Format::Avro(AvroFormat {
confluent_schema_registry: true,
..
})) = &schema.format
{
let schema_response = get_schema(connector, table_config, profile_config).await?;

if schema_response.schema_type != ConfluentSchemaType::Avro {
return Err(bad_request(format!("Format configured is avro, but confluent schema repository returned a {:?} schema",
schema_response.schema_type)));
return Err(bad_request(format!(
"Format configured is avro, but confluent schema repository returned a {:?} schema",
schema_response.schema_type
)));
}

schema.definition = Some(SchemaDefinition::AvroSchema(schema_response.schema));
}


let Some(SchemaDefinition::AvroSchema(definition)) = schema.definition.as_ref() else {
return Err(bad_request("avro format requires an avro schema be set"));
};

let fields: Result<_, String> = avro::convert_avro_schema(&name, &definition)
.map_err(|e| bad_request(format!("Invalid avro schema: {}", e)))?
.into_iter().map(|f| f.try_into()).collect();

.into_iter()
.map(|f| f.try_into())
.collect();

schema.fields = fields
.map_err(|e| bad_request(format!("Failed to convert schema: {}", e)))?;
schema.fields = fields.map_err(|e| bad_request(format!("Failed to convert schema: {}", e)))?;

Ok(schema)
}

async fn expand_json_schema(name: &str, connector: &str, mut schema: ConnectionSchema, table_config: &Value, profile_config: &Value) -> Result<ConnectionSchema, ErrorResp> {
if let Some(Format::Json(JsonFormat { confluent_schema_registry: true, .. })) = &schema.format {
async fn expand_json_schema(
name: &str,
connector: &str,
mut schema: ConnectionSchema,
table_config: &Value,
profile_config: &Value,
) -> Result<ConnectionSchema, ErrorResp> {
if let Some(Format::Json(JsonFormat {
confluent_schema_registry: true,
..
})) = &schema.format
{
let schema_response = get_schema(connector, table_config, profile_config).await?;

if schema_response.schema_type != ConfluentSchemaType::Json {
return Err(bad_request(format!("Format configured is json, but confluent schema repository returned a {:?} schema",
schema_response.schema_type)));
return Err(bad_request(format!(
"Format configured is json, but confluent schema repository returned a {:?} schema",
schema_response.schema_type
)));
}

schema.definition = Some(SchemaDefinition::JsonSchema(schema_response.schema));
Expand All @@ -487,11 +515,7 @@ async fn expand_json_schema(name: &str, connector: &str, mut schema: ConnectionS
None,
TypeDef::DataType(DataType::Utf8, false),
)],
_ => {
return Err(bad_request(
"Invalid schema type for json format",
))
}
_ => return Err(bad_request("Invalid schema type for json format")),
};

let fields: Result<_, String> = fields.into_iter().map(|f| f.try_into()).collect();
Expand All @@ -501,29 +525,44 @@ async fn expand_json_schema(name: &str, connector: &str, mut schema: ConnectionS
}

Ok(schema)

}

async fn get_schema(connector: &str, table_config: &Value, profile_config: &Value) -> Result<ConfluentSchemaResponse, ErrorResp> {
async fn get_schema(
connector: &str,
table_config: &Value,
profile_config: &Value,
) -> Result<ConfluentSchemaResponse, ErrorResp> {
if connector != "kafka" {
return Err(bad_request("confluent schema registry can only be used for Kafka connections"));
return Err(bad_request(
"confluent schema registry can only be used for Kafka connections",
));
}

// we unwrap here because this should already have been validated
let profile: KafkaConfig = serde_json::from_value(profile_config.clone())
.expect("invalid kafka config");

let table: KafkaTable = serde_json::from_value(table_config.clone())
.expect("invalid kafka table");

let schema_registry = profile.schema_registry.as_ref().ok_or_else(||
bad_request("schema registry must be configured on the Kafka connection profile"))?;

let resolver = ConfluentSchemaResolver::new(&schema_registry.endpoint, &table.topic)
.map_err(|e| bad_request(format!("failed to fetch schemas from schema repository: {}", e)))?;

resolver.get_schema(None).await
.map_err(|e| bad_request(format!("failed to fetch schemas from schema repository: {}", e)))
let profile: KafkaConfig =
serde_json::from_value(profile_config.clone()).expect("invalid kafka config");

let table: KafkaTable =
serde_json::from_value(table_config.clone()).expect("invalid kafka table");

let schema_registry = profile.schema_registry.as_ref().ok_or_else(|| {
bad_request("schema registry must be configured on the Kafka connection profile")
})?;

let resolver =
ConfluentSchemaResolver::new(&schema_registry.endpoint, &table.topic).map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
))
})?;

resolver.get_schema(None).await.map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
))
})
}

/// Test a Connection Schema
Expand Down Expand Up @@ -556,4 +595,4 @@ pub(crate) async fn test_schema(
Ok(())
}
}
}
}
4 changes: 2 additions & 2 deletions arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use crate::connection_profiles::{
__path_create_connection_profile, __path_get_connection_profiles,
};
use crate::connection_tables::{
__path_create_connection_table, __path_delete_connection_table,
__path_get_connection_tables, __path_test_connection_table, __path_test_schema,
__path_create_connection_table, __path_delete_connection_table, __path_get_connection_tables,
__path_test_connection_table, __path_test_schema,
};
use crate::connectors::__path_get_connectors;
use crate::jobs::{
Expand Down
4 changes: 2 additions & 2 deletions arroyo-api/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use utoipa_swagger_ui::SwaggerUi;

use crate::connection_profiles::{create_connection_profile, get_connection_profiles};
use crate::connection_tables::{
create_connection_table, delete_connection_table, get_connection_tables,
test_connection_table, test_schema,
create_connection_table, delete_connection_table, get_connection_tables, test_connection_table,
test_schema,
};
use crate::connectors::get_connectors;
use crate::jobs::{
Expand Down
3 changes: 2 additions & 1 deletion arroyo-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ serde_json = "1.0"
nanoid = "0.4"
utoipa = "3"
anyhow = "1.0.75"
reqwest = "0.11.22"
reqwest = { version = "0.11.22", features = ["default", "serde_json", "json"] }
log = "0.4.20"
tracing = "0.1.40"
async-trait = "0.1.74"

[build-dependencies]
tonic-build = { workspace = true }
10 changes: 6 additions & 4 deletions arroyo-rpc/src/formats.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use arroyo_types::UserError;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::str::FromStr;
use serde_json::Value;
use utoipa::ToSchema;
use arroyo_types::UserError;

#[derive(
Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Default, Hash, PartialOrd, ToSchema,
Expand Down Expand Up @@ -102,13 +102,15 @@ pub struct ConfluentSchemaRegistryConfig {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AvroFormat {
pub confluent_schema_registry: bool
pub confluent_schema_registry: bool,
pub embedded_schema: bool,
}

impl AvroFormat {
pub fn from_opts(opts: &mut HashMap<String, String>) -> Result<Self, String> {
Ok(Self {
confluent_schema_registry: false
confluent_schema_registry: false,
embedded_schema: false,
})
}
}
Expand Down
3 changes: 1 addition & 2 deletions arroyo-rpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
pub mod api_types;
pub mod formats;
pub mod schema_resolver;
pub mod public_ids;

pub mod schema_resolver;

use std::{fs, time::SystemTime};

Expand Down
Loading

0 comments on commit d2d2698

Please sign in to comment.