diff --git a/arroyo-api/src/connection_tables.rs b/arroyo-api/src/connection_tables.rs index 2e170c86a..ab2937813 100644 --- a/arroyo-api/src/connection_tables.rs +++ b/arroyo-api/src/connection_tables.rs @@ -24,7 +24,7 @@ use arroyo_rpc::api_types::{ConnectionTableCollection, PaginationQueryParams}; use arroyo_rpc::formats::{AvroFormat, Format, JsonFormat}; use arroyo_rpc::public_ids::{generate_id, IdTypes}; use arroyo_rpc::schema_resolver::{ - ConfluentSchemaRegistry, ConfluentSchemaResponse, ConfluentSchemaType, + ConfluentSchemaRegistry, ConfluentSchemaSubjectResponse, ConfluentSchemaType, }; use arroyo_sql::avro; use arroyo_sql::json_schema::convert_json_schema; @@ -539,7 +539,7 @@ async fn get_schema( connector: &str, table_config: &Value, profile_config: &Value, -) -> Result { +) -> Result { if connector != "kafka" { return Err(bad_request( "confluent schema registry can only be used for Kafka connections", @@ -573,7 +573,7 @@ async fn get_schema( )) })?; - resolver.get_schema(None).await.map_err(|e| { + resolver.get_schema_for_version(None).await.map_err(|e| { bad_request(format!( "failed to fetch schemas from schema repository: {}", e diff --git a/arroyo-api/src/pipelines.rs b/arroyo-api/src/pipelines.rs index 78fcebc34..497f55647 100644 --- a/arroyo-api/src/pipelines.rs +++ b/arroyo-api/src/pipelines.rs @@ -186,17 +186,18 @@ async fn try_register_confluent_schema( match config.format.clone() { Some(Format::Avro(mut avro)) => { - if avro.confluent_schema_registry && avro.schema_version.is_none() { + if avro.confluent_schema_registry && avro.schema_id.is_none() { let fields: Vec = schema.fields.iter().map(|f| f.clone().into()).collect(); let schema = arrow_to_avro_schema(&schema.struct_name_ident(), &fields.into()); - let version = schema_registry + let id = schema_registry .write_schema(schema.canonical_form(), ConfluentSchemaType::Avro) .await .map_err(|e| anyhow!("Failed to write schema to schema registry: {}", e))?; - avro.schema_version = Some(version as u32); + println!("Fetched id = {}", id); + avro.schema_id = Some(id as u32); config.format = Some(Format::Avro(avro)) } } @@ -210,6 +211,8 @@ async fn try_register_confluent_schema( sink.config = serde_json::to_string(&config).unwrap(); + println!("config = {}", sink.config); + Ok(()) } diff --git a/arroyo-formats/src/avro.rs b/arroyo-formats/src/avro.rs index 1692343eb..ffbe53dfb 100644 --- a/arroyo-formats/src/avro.rs +++ b/arroyo-formats/src/avro.rs @@ -97,7 +97,7 @@ pub fn to_vec( record: &T, format: &AvroFormat, schema: &Schema, - version: Option, + version: Option, ) -> Vec { let v = record.to_avro(schema); @@ -705,7 +705,7 @@ mod tests { raw_datums: false, into_unstructured_json: false, reader_schema: None, - schema_version: None, + schema_id: None, }; let schema = arrow_to_avro_schema("ArroyoAvroRoot", &ArroyoAvroRoot::schema().fields()); diff --git a/arroyo-formats/src/lib.rs b/arroyo-formats/src/lib.rs index d502356b9..d032d91d4 100644 --- a/arroyo-formats/src/lib.rs +++ b/arroyo-formats/src/lib.rs @@ -344,7 +344,7 @@ pub struct DataSerializer { #[allow(unused)] json_schema: Value, avro_schema: apache_avro::schema::Schema, - schema_id: Option, + schema_id: Option, format: Format, _t: PhantomData, } @@ -355,8 +355,11 @@ impl DataSerializer { kafka_schema: json::arrow_to_kafka_json(T::name(), T::schema().fields()), json_schema: json::arrow_to_json_schema(T::schema().fields()), avro_schema: avro::arrow_to_avro_schema(T::name(), T::schema().fields()), + schema_id: match &format { + Format::Avro(avro) => avro.schema_id, + _ => None, + }, format, - schema_id: Some(1), _t: PhantomData, } } diff --git a/arroyo-rpc/src/formats.rs b/arroyo-rpc/src/formats.rs index e4f5ae619..bbee05ca3 100644 --- a/arroyo-rpc/src/formats.rs +++ b/arroyo-rpc/src/formats.rs @@ -178,7 +178,7 @@ pub struct AvroFormat { #[serde(default)] #[schema(read_only)] - pub schema_version: Option, + pub schema_id: Option, } impl AvroFormat { @@ -192,7 +192,7 @@ impl AvroFormat { raw_datums, into_unstructured_json, reader_schema: None, - schema_version: None, + schema_id: None, } } diff --git a/arroyo-rpc/src/schema_resolver.rs b/arroyo-rpc/src/schema_resolver.rs index 8eb774516..e57c63291 100644 --- a/arroyo-rpc/src/schema_resolver.rs +++ b/arroyo-rpc/src/schema_resolver.rs @@ -2,6 +2,7 @@ use anyhow::{anyhow, bail}; use apache_avro::Schema; use async_trait::async_trait; use reqwest::{Client, StatusCode, Url}; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::time::Duration; use tracing::warn; @@ -67,7 +68,7 @@ pub enum ConfluentSchemaType { #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct ConfluentSchemaResponse { +pub struct ConfluentSchemaSubjectResponse { pub id: u32, pub schema: String, #[serde(default)] @@ -76,6 +77,14 @@ pub struct ConfluentSchemaResponse { pub version: u32, } +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ConfluentSchemaIdResponse { + pub schema: String, + #[serde(default)] + pub schema_type: ConfluentSchemaType, +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct PostSchemaRequest { @@ -109,8 +118,7 @@ impl ConfluentSchemaRegistry { .build() .unwrap(); - let endpoint: Url = format!("{}/subjects/{}-value/versions/", endpoint, topic) - .as_str() + let endpoint: Url = endpoint .try_into() .map_err(|_| anyhow!("{} is not a valid url", endpoint))?; @@ -123,6 +131,12 @@ impl ConfluentSchemaRegistry { }) } + fn topic_endpoint(&self) -> Url { + self.endpoint + .join(&format!("subjects/{}-value/versions/", self.topic)) + .unwrap() + } + pub async fn write_schema( &self, schema: impl Into, @@ -135,7 +149,7 @@ impl ConfluentSchemaRegistry { let resp = self .client - .post(self.endpoint.clone()) + .post(self.topic_endpoint()) .json(&req) .send() .await @@ -170,6 +184,9 @@ impl ConfluentSchemaRegistry { StatusCode::UNAUTHORIZED => { bail!("Invalid credentials for schema registry"); } + StatusCode::NOT_FOUND => { + bail!("Schema registry returned 404 for topic {}. Make sure that the topic exists.", self.topic) + } code => { bail!("Schema registry returned error {}: {}", code.as_u16(), body); } @@ -184,12 +201,18 @@ impl ConfluentSchemaRegistry { Ok(resp.id) } - pub async fn get_schema( + pub async fn get_schema_for_id(&self, id: u32) -> anyhow::Result { + let url = self.endpoint.join(&format!("/schemas/ids/{}", id)).unwrap(); + + self.get_schema_for_url(url).await + } + + pub async fn get_schema_for_version( &self, version: Option, - ) -> anyhow::Result { + ) -> anyhow::Result { let url = self - .endpoint + .topic_endpoint() .join( &version .map(|v| format!("{}", v)) @@ -197,6 +220,10 @@ impl ConfluentSchemaRegistry { ) .unwrap(); + self.get_schema_for_url(url).await + } + + async fn get_schema_for_url(&self, url: Url) -> anyhow::Result { let mut get_call = self.client.get(url.clone()); if let Some(api_key) = self.api_key.as_ref() { @@ -225,7 +252,8 @@ impl ConfluentSchemaRegistry { if !resp.status().is_success() { bail!( - "Received an error status code from the provided endpoint: {} {}", + "Received an error status code from the schema endpoint while fetching {}: {} {}", + url, resp.status().as_u16(), resp.bytes() .await @@ -247,7 +275,7 @@ impl ConfluentSchemaRegistry { #[async_trait] impl SchemaResolver for ConfluentSchemaRegistry { async fn resolve_schema(&self, id: u32) -> Result, String> { - self.get_schema(Some(id)) + self.get_schema_for_id(id) .await .map(|s| Some(s.schema)) .map_err(|e| e.to_string())