Skip to content

Commit

Permalink
avro writing mostly done
Browse files Browse the repository at this point in the history
mwylde committed Nov 26, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent a045422 commit e8ab5ca
Showing 26 changed files with 212 additions and 182 deletions.
72 changes: 42 additions & 30 deletions arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use anyhow::{anyhow, bail, Context};
use arrow_schema::{Field, Fields};
use arroyo_connectors::connector_for_type;
use axum::extract::{Path, Query, State};
use axum::Json;
use axum_extra::extract::WithRejection;
use cornucopia_async::{GenericClient, Params};
use deadpool_postgres::{Object, Transaction};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use arrow_schema::{Field, Fields};
use http::StatusCode;
use jwt_simple::prelude::coarsetime::clock_gettime_nsec_np;
use petgraph::Direction;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use crate::{jobs, pipelines, types};
use arroyo_datastream::{ConnectorOp, Operator, Program, StreamNode};
@@ -26,20 +26,20 @@ use arroyo_rpc::grpc::api::{
create_pipeline_req, CreateJobReq, CreatePipelineReq, CreateSqlJob, PipelineProgram,
};

use arroyo_connectors::kafka::{KafkaConfig, KafkaTable};
use arroyo_formats::avro::arrow_to_avro_schema;
use arroyo_rpc::formats::Format;
use arroyo_rpc::public_ids::{generate_id, IdTypes};
use arroyo_rpc::schema_resolver::{ConfluentSchemaRegistry, ConfluentSchemaType};
use arroyo_rpc::OperatorConfig;
use arroyo_server_common::log_event;
use arroyo_sql::{has_duplicate_udf_names, ArroyoSchemaProvider, SqlConfig, CompiledSql};
use arroyo_sql::types::StructDef;
use arroyo_sql::{has_duplicate_udf_names, ArroyoSchemaProvider, CompiledSql, SqlConfig};
use petgraph::visit::EdgeRef;
use prost::Message;
use serde_json::json;
use time::OffsetDateTime;
use tracing::warn;
use arroyo_connectors::kafka::{KafkaConfig, KafkaTable};
use arroyo_formats::avro::arrow_to_avro_schema;
use arroyo_rpc::formats::Format;
use arroyo_rpc::OperatorConfig;
use arroyo_rpc::schema_resolver::{ConfluentSchemaRegistry, ConfluentSchemaType};
use arroyo_sql::types::StructDef;

use crate::jobs::get_action;
use crate::queries::api_queries;
@@ -151,7 +151,10 @@ fn set_parallelism(program: &mut Program, parallelism: usize) {
}
}

async fn try_register_confluent_schema(sink: &mut ConnectorOp, schema: &StructDef) -> anyhow::Result<()> {
async fn try_register_confluent_schema(
sink: &mut ConnectorOp,
schema: &StructDef,
) -> anyhow::Result<()> {
let mut config: OperatorConfig = serde_json::from_str(&sink.config).unwrap();

let Ok(profile) = serde_json::from_value::<KafkaConfig>(config.connection.clone()) else {
@@ -163,27 +166,27 @@ async fn try_register_confluent_schema(sink: &mut ConnectorOp, schema: &StructDe
};

let Some(registry_config) = profile.schema_registry else {
return Ok(())
return Ok(());
};

let schema_registry = ConfluentSchemaRegistry::new(&registry_config.endpoint,
&table.topic,
registry_config.api_key,
registry_config.api_secret)?;
let schema_registry = ConfluentSchemaRegistry::new(
&registry_config.endpoint,
&table.topic,
registry_config.api_key,
registry_config.api_secret,
)?;

match config.format.clone() {
Some(Format::Avro(mut avro)) => {
if avro.confluent_schema_registry && avro.schema_version.is_none() {
let fields: Vec<Field> = schema.fields.iter().map(|f|
f.clone().into()
).collect();
let fields: Vec<Field> = schema.fields.iter().map(|f| f.clone().into()).collect();

let schema = arrow_to_avro_schema(&schema.struct_name_ident(),
&fields.into());
let schema = arrow_to_avro_schema(&schema.struct_name_ident(), &fields.into());

println!("Schema: {}", schema.canonical_form());

let version = schema_registry.write_schema(schema.canonical_form(), ConfluentSchemaType::Avro)
let version = schema_registry
.write_schema(schema.canonical_form(), ConfluentSchemaType::Avro)
.await
.map_err(|e| anyhow!("Failed to write schema to schema registry: {}", e))?;

@@ -206,8 +209,12 @@ async fn try_register_confluent_schema(sink: &mut ConnectorOp, schema: &StructDe

async fn register_schemas(compiled_sql: &mut CompiledSql) -> anyhow::Result<()> {
for node in compiled_sql.program.graph.node_indices() {
let Some(input) = compiled_sql.program.graph.edges_directed(node, Direction::Incoming)
.next() else {
let Some(input) = compiled_sql
.program
.graph
.edges_directed(node, Direction::Incoming)
.next()
else {
continue;
};

@@ -313,14 +320,19 @@ pub(crate) async fn create_pipeline<'a>(
}
}

register_schemas(&mut compiled).await
register_schemas(&mut compiled)
.await
.map_err(|e| ErrorResp {
status_code: StatusCode::BAD_REQUEST,
message: format!("Failed to register schemas with the schema registry. Make sure \
that the schema_registry is configured correctly and running.\nDetails: {}", e)
message: format!(
"Failed to register schemas with the schema registry. Make sure \
that the schema_registry is configured correctly and running.\nDetails: {}",
e
),
})?;

let proto_program: PipelineProgram = compiled.program.clone().try_into().map_err(log_and_map)?;
let proto_program: PipelineProgram =
compiled.program.clone().try_into().map_err(log_and_map)?;

let program_bytes = proto_program.encode_to_vec();

@@ -448,7 +460,7 @@ pub async fn validate_query(

let pipeline_graph_validation_result =
match compile_sql(validate_query_post.query, &udfs, 1, &auth_data, &client).await {
Ok(CompiledSql{ mut program, ..}) => {
Ok(CompiledSql { mut program, .. }) => {
optimizations::optimize(&mut program.graph);
let nodes = program
.graph
61 changes: 28 additions & 33 deletions arroyo-formats/src/avro.rs
Original file line number Diff line number Diff line change
@@ -141,7 +141,7 @@ fn convert_float(f: f64) -> JsonValue {
} else {
"NaN"
})
.to_string(),
.to_string(),
),
}
}
@@ -252,7 +252,7 @@ fn arrow_to_avro(name: &str, dt: &DataType) -> serde_json::value::Value {
})
}

fn field_to_avro(index: usize, name: &str, field: &Field) -> serde_json::value::Value {
fn field_to_avro(name: &str, field: &Field) -> serde_json::value::Value {
let next_name = format!("{}_{}", name, &field.name());
let mut schema = arrow_to_avro(&AvroFormat::sanitize_field(&next_name), field.data_type());

@@ -273,11 +273,7 @@ fn field_to_avro(index: usize, name: &str, field: &Field) -> serde_json::value::
/// Note this must align with the generated code created in
/// `arroyo_sql::avro::generate_serializer_items`!
pub fn arrow_to_avro_schema(name: &str, fields: &Fields) -> Schema {
let fields: Vec<_> = fields
.iter()
.enumerate()
.map(|(i, f)| field_to_avro(i, name, &**f))
.collect();
let fields: Vec<_> = fields.iter().map(|f| field_to_avro(name, &**f)).collect();

let schema = json!({
"type": "record",
@@ -288,12 +284,11 @@ pub fn arrow_to_avro_schema(name: &str, fields: &Fields) -> Schema {
Schema::parse_str(&schema.to_string()).unwrap()
}


#[cfg(test)]
mod tests {
use super::{arrow_to_avro_schema, to_vec};
use crate::{DataDeserializer, SchemaData};
use apache_avro::{Schema};
use apache_avro::Schema;
use arroyo_rpc::formats::{AvroFormat, Format};
use arroyo_rpc::schema_resolver::{FailingSchemaResolver, FixedSchemaResolver};
use arroyo_types::RawJson;
@@ -371,14 +366,14 @@ mod tests {
#[tokio::test]
async fn test_avro_deserialization() {
#[derive(
Clone,
Debug,
bincode::Encode,
bincode::Decode,
PartialEq,
PartialOrd,
serde::Serialize,
serde::Deserialize,
Clone,
Debug,
bincode::Encode,
bincode::Decode,
PartialEq,
PartialOrd,
serde::Serialize,
serde::Deserialize,
)]
pub struct ArroyoAvroRoot {
pub store_id: i32,
@@ -476,14 +471,14 @@ mod tests {
#[tokio::test]
async fn test_backwards_compatible() {
#[derive(
Clone,
Debug,
bincode::Encode,
bincode::Decode,
PartialEq,
PartialOrd,
serde::Serialize,
serde::Deserialize,
Clone,
Debug,
bincode::Encode,
bincode::Decode,
PartialEq,
PartialOrd,
serde::Serialize,
serde::Deserialize,
)]
pub struct ArroyoAvroRoot {
pub name: String,
@@ -625,14 +620,14 @@ mod tests {
#[tokio::test]
async fn test_writing() {
#[derive(
Clone,
Debug,
bincode::Encode,
bincode::Decode,
PartialEq,
PartialOrd,
serde::Serialize,
serde::Deserialize,
Clone,
Debug,
bincode::Encode,
bincode::Decode,
PartialEq,
PartialOrd,
serde::Serialize,
serde::Deserialize,
)]
pub struct ArroyoAvroRoot {
pub name: String,
48 changes: 24 additions & 24 deletions arroyo-formats/src/json.rs
Original file line number Diff line number Diff line change
@@ -55,15 +55,15 @@ pub mod timestamp_as_millis {
use super::MilliSecondsSystemTimeVisitor;

pub fn serialize<S>(t: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
where
S: Serializer,
{
serializer.serialize_u64(to_millis(*t))
}

pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
where
D: Deserializer<'de>,
where
D: Deserializer<'de>,
{
deserializer.deserialize_i64(MilliSecondsSystemTimeVisitor)
}
@@ -77,8 +77,8 @@ pub mod timestamp_as_millis {

/// Deserialize a timestamp in milliseconds since the epoch
fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
where
E: de::Error,
where
E: de::Error,
{
if value >= 0 {
Ok(from_millis(value as u64))
@@ -89,8 +89,8 @@ pub mod timestamp_as_millis {

/// Deserialize a timestamp in milliseconds since the epoch
fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
where
E: de::Error,
where
E: de::Error,
{
Ok(from_millis(value))
}
@@ -110,8 +110,8 @@ pub mod opt_timestamp_as_millis {
use super::{MilliSecondsSystemTimeVisitor, OptMilliSecondsSystemTimeVisitor};

pub fn serialize<S>(t: &Option<SystemTime>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
where
S: Serializer,
{
if let Some(t) = t {
serializer.serialize_some(&to_millis(*t))
@@ -121,8 +121,8 @@ pub mod opt_timestamp_as_millis {
}

pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<SystemTime>, D::Error>
where
D: Deserializer<'de>,
where
D: Deserializer<'de>,
{
deserializer.deserialize_option(OptMilliSecondsSystemTimeVisitor)
}
@@ -136,17 +136,17 @@ pub mod opt_timestamp_as_millis {

/// Deserialize a timestamp in milliseconds since the epoch
fn visit_some<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: Deserializer<'de>,
where
D: Deserializer<'de>,
{
Ok(Some(
deserializer.deserialize_any(MilliSecondsSystemTimeVisitor)?,
))
}

fn visit_none<E>(self) -> Result<Self::Value, E>
where
E: de::Error,
where
E: de::Error,
{
Ok(None)
}
@@ -164,16 +164,16 @@ pub mod timestamp_as_rfc3339 {
use serde::{Deserialize, Deserializer, Serializer};

pub fn serialize<S>(t: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
where
S: Serializer,
{
let dt: DateTime<Utc> = (*t).into();
serializer.serialize_str(&dt.to_rfc3339())
}

pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
where
D: Deserializer<'de>,
where
D: Deserializer<'de>,
{
let raw: chrono::DateTime<Utc> = DateTime::deserialize(deserializer)?;
Ok(from_nanos(
@@ -191,8 +191,8 @@ pub mod opt_timestamp_as_rfc3339 {
use serde::{Deserialize, Deserializer, Serializer};

pub fn serialize<S>(t: &Option<SystemTime>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
where
S: Serializer,
{
if let Some(t) = *t {
let dt: DateTime<Utc> = t.into();
@@ -203,8 +203,8 @@ pub mod opt_timestamp_as_rfc3339 {
}

pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<SystemTime>, D::Error>
where
D: Deserializer<'de>,
where
D: Deserializer<'de>,
{
let raw = Option::<DateTime<Utc>>::deserialize(deserializer)?;
Ok(raw.map(|raw| {
Loading

0 comments on commit e8ab5ca

Please sign in to comment.