Skip to content

Commit

Permalink
Fix confusion around schema registry ids vs version (#430)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Nov 28, 2023
1 parent 61691c8 commit 9b1c17a
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 21 deletions.
6 changes: 3 additions & 3 deletions arroyo-api/src/connection_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arroyo_rpc::api_types::{ConnectionTableCollection, PaginationQueryParams};
use arroyo_rpc::formats::{AvroFormat, Format, JsonFormat};
use arroyo_rpc::public_ids::{generate_id, IdTypes};
use arroyo_rpc::schema_resolver::{
ConfluentSchemaRegistry, ConfluentSchemaResponse, ConfluentSchemaType,
ConfluentSchemaRegistry, ConfluentSchemaSubjectResponse, ConfluentSchemaType,
};
use arroyo_sql::avro;
use arroyo_sql::json_schema::convert_json_schema;
Expand Down Expand Up @@ -539,7 +539,7 @@ async fn get_schema(
connector: &str,
table_config: &Value,
profile_config: &Value,
) -> Result<ConfluentSchemaResponse, ErrorResp> {
) -> Result<ConfluentSchemaSubjectResponse, ErrorResp> {
if connector != "kafka" {
return Err(bad_request(
"confluent schema registry can only be used for Kafka connections",
Expand Down Expand Up @@ -573,7 +573,7 @@ async fn get_schema(
))
})?;

resolver.get_schema(None).await.map_err(|e| {
resolver.get_schema_for_version(None).await.map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
Expand Down
9 changes: 6 additions & 3 deletions arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,17 +186,18 @@ async fn try_register_confluent_schema(

match config.format.clone() {
Some(Format::Avro(mut avro)) => {
if avro.confluent_schema_registry && avro.schema_version.is_none() {
if avro.confluent_schema_registry && avro.schema_id.is_none() {
let fields: Vec<Field> = schema.fields.iter().map(|f| f.clone().into()).collect();

let schema = arrow_to_avro_schema(&schema.struct_name_ident(), &fields.into());

let version = schema_registry
let id = schema_registry
.write_schema(schema.canonical_form(), ConfluentSchemaType::Avro)
.await
.map_err(|e| anyhow!("Failed to write schema to schema registry: {}", e))?;

avro.schema_version = Some(version as u32);
println!("Fetched id = {}", id);
avro.schema_id = Some(id as u32);
config.format = Some(Format::Avro(avro))
}
}
Expand All @@ -210,6 +211,8 @@ async fn try_register_confluent_schema(

sink.config = serde_json::to_string(&config).unwrap();

println!("config = {}", sink.config);

Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions arroyo-formats/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub fn to_vec<T: SchemaData>(
record: &T,
format: &AvroFormat,
schema: &Schema,
version: Option<i32>,
version: Option<u32>,
) -> Vec<u8> {
let v = record.to_avro(schema);

Expand Down Expand Up @@ -705,7 +705,7 @@ mod tests {
raw_datums: false,
into_unstructured_json: false,
reader_schema: None,
schema_version: None,
schema_id: None,
};

let schema = arrow_to_avro_schema("ArroyoAvroRoot", &ArroyoAvroRoot::schema().fields());
Expand Down
7 changes: 5 additions & 2 deletions arroyo-formats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ pub struct DataSerializer<T: SchemaData> {
#[allow(unused)]
json_schema: Value,
avro_schema: apache_avro::schema::Schema,
schema_id: Option<i32>,
schema_id: Option<u32>,
format: Format,
_t: PhantomData<T>,
}
Expand All @@ -355,8 +355,11 @@ impl<T: SchemaData> DataSerializer<T> {
kafka_schema: json::arrow_to_kafka_json(T::name(), T::schema().fields()),
json_schema: json::arrow_to_json_schema(T::schema().fields()),
avro_schema: avro::arrow_to_avro_schema(T::name(), T::schema().fields()),
schema_id: match &format {
Format::Avro(avro) => avro.schema_id,
_ => None,
},
format,
schema_id: Some(1),
_t: PhantomData,
}
}
Expand Down
4 changes: 2 additions & 2 deletions arroyo-rpc/src/formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ pub struct AvroFormat {

#[serde(default)]
#[schema(read_only)]
pub schema_version: Option<u32>,
pub schema_id: Option<u32>,
}

impl AvroFormat {
Expand All @@ -192,7 +192,7 @@ impl AvroFormat {
raw_datums,
into_unstructured_json,
reader_schema: None,
schema_version: None,
schema_id: None,
}
}

Expand Down
46 changes: 37 additions & 9 deletions arroyo-rpc/src/schema_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::{anyhow, bail};
use apache_avro::Schema;
use async_trait::async_trait;
use reqwest::{Client, StatusCode, Url};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::warn;
Expand Down Expand Up @@ -67,7 +68,7 @@ pub enum ConfluentSchemaType {

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfluentSchemaResponse {
pub struct ConfluentSchemaSubjectResponse {
pub id: u32,
pub schema: String,
#[serde(default)]
Expand All @@ -76,6 +77,14 @@ pub struct ConfluentSchemaResponse {
pub version: u32,
}

/// Response payload for a schema looked up by its id; this is the type
/// returned by `get_schema_for_id`.
///
/// Unlike `ConfluentSchemaSubjectResponse`, this struct carries no `id` or
/// `version` field. Field names are (de)serialized in camelCase.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfluentSchemaIdResponse {
/// The schema text.
pub schema: String,
/// The schema's type; takes `ConfluentSchemaType`'s default value when the
/// field is missing from the payload.
#[serde(default)]
pub schema_type: ConfluentSchemaType,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostSchemaRequest {
Expand Down Expand Up @@ -109,8 +118,7 @@ impl ConfluentSchemaRegistry {
.build()
.unwrap();

let endpoint: Url = format!("{}/subjects/{}-value/versions/", endpoint, topic)
.as_str()
let endpoint: Url = endpoint
.try_into()
.map_err(|_| anyhow!("{} is not a valid url", endpoint))?;

Expand All @@ -123,6 +131,12 @@ impl ConfluentSchemaRegistry {
})
}

/// Builds the URL of the versions collection for this topic's value subject.
///
/// The relative path `subjects/{topic}-value/versions/` is resolved against
/// the configured registry endpoint.
///
/// # Panics
///
/// Panics if the path cannot be joined onto the endpoint.
fn topic_endpoint(&self) -> Url {
    let versions_path = format!("subjects/{}-value/versions/", self.topic);
    let joined = self.endpoint.join(&versions_path);
    joined.unwrap()
}

pub async fn write_schema(
&self,
schema: impl Into<String>,
Expand All @@ -135,7 +149,7 @@ impl ConfluentSchemaRegistry {

let resp = self
.client
.post(self.endpoint.clone())
.post(self.topic_endpoint())
.json(&req)
.send()
.await
Expand Down Expand Up @@ -170,6 +184,9 @@ impl ConfluentSchemaRegistry {
StatusCode::UNAUTHORIZED => {
bail!("Invalid credentials for schema registry");
}
StatusCode::NOT_FOUND => {
bail!("Schema registry returned 404 for topic {}. Make sure that the topic exists.", self.topic)
}
code => {
bail!("Schema registry returned error {}: {}", code.as_u16(), body);
}
Expand All @@ -184,19 +201,29 @@ impl ConfluentSchemaRegistry {
Ok(resp.id)
}

pub async fn get_schema(
pub async fn get_schema_for_id(&self, id: u32) -> anyhow::Result<ConfluentSchemaIdResponse> {
let url = self.endpoint.join(&format!("/schemas/ids/{}", id)).unwrap();

self.get_schema_for_url(url).await
}

pub async fn get_schema_for_version(
&self,
version: Option<u32>,
) -> anyhow::Result<ConfluentSchemaResponse> {
) -> anyhow::Result<ConfluentSchemaSubjectResponse> {
let url = self
.endpoint
.topic_endpoint()
.join(
&version
.map(|v| format!("{}", v))
.unwrap_or_else(|| "latest".to_string()),
)
.unwrap();

self.get_schema_for_url(url).await
}

async fn get_schema_for_url<T: DeserializeOwned>(&self, url: Url) -> anyhow::Result<T> {
let mut get_call = self.client.get(url.clone());

if let Some(api_key) = self.api_key.as_ref() {
Expand Down Expand Up @@ -225,7 +252,8 @@ impl ConfluentSchemaRegistry {

if !resp.status().is_success() {
bail!(
"Received an error status code from the provided endpoint: {} {}",
"Received an error status code from the schema endpoint while fetching {}: {} {}",
url,
resp.status().as_u16(),
resp.bytes()
.await
Expand All @@ -247,7 +275,7 @@ impl ConfluentSchemaRegistry {
#[async_trait]
impl SchemaResolver for ConfluentSchemaRegistry {
async fn resolve_schema(&self, id: u32) -> Result<Option<String>, String> {
self.get_schema(Some(id))
self.get_schema_for_id(id)
.await
.map(|s| Some(s.schema))
.map_err(|e| e.to_string())
Expand Down

0 comments on commit 9b1c17a

Please sign in to comment.