Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Avro single-object encoding #203

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6,189 changes: 6,189 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2018"
flate2 = "1.0"
anyhow = "1"
async-trait = "0.1"
apache-avro = "^0.14"
apache-avro = "^0.17"
base64 = "0.13"
bytes = "1"
chrono = "0.4.31"
Expand All @@ -32,6 +32,7 @@ tokio-stream = { version = "0", features = ["fs"] }
tokio-util = "0.6.3"
uuid = { version = "0.8", features = ["serde", "v4"] }
url = "2.3"
dashmap = "6.0.1"

# datafusion feature is required for writer version 2
deltalake-core = { version = "0.21.0", features = ["json", "datafusion"]}
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ pub enum MessageFormat {

/// Parses avro messages using provided schema, schema registry or schema within file
Avro(SchemaSource),

/// Parses Avro messages in the single-object encoding format. The `PathBuf` can point either to a single Avro schema file or to a directory containing only Avro schema files.
SoeAvro(PathBuf),
}

/// Source for schema
Expand Down
32 changes: 31 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,24 @@ fn to_schema_source(
}
}

/// Turns the optional `--soe-avro` argument into a path that is known to exist.
///
/// # Errors
///
/// * `SchemaSourceError::NoFileSpecified` when the argument is absent or empty.
/// * `SchemaSourceError::FileNotFound` when nothing exists at the given path.
fn to_schema_path(input: Option<&String>) -> Result<PathBuf, SchemaSourceError> {
    // A missing argument and an empty one are reported the same way.
    let raw = input
        .filter(|candidate| !candidate.is_empty())
        .ok_or(SchemaSourceError::NoFileSpecified)?;

    let schema_path = PathBuf::from_str(raw)?;
    if schema_path.exists() {
        Ok(schema_path)
    } else {
        Err(SchemaSourceError::FileNotFound {
            file_name: raw.clone(),
        })
    }
}

fn init_logger(app_id: String) {
let app_id: &'static str = Box::leak(app_id.into_boxed_str());
let log_level = std::env::var("RUST_LOG")
Expand Down Expand Up @@ -272,6 +290,9 @@ enum SchemaSourceError {
},
#[error("File not found error: {file_name}")]
FileNotFound { file_name: String },

#[error("No file specified error")]
NoFileSpecified,
}

fn parse_kafka_property(val: &str) -> Result<(String, String), KafkaPropertySyntaxError> {
Expand Down Expand Up @@ -444,8 +465,12 @@ This can be used to provide TLS configuration as in:
.env("AVRO_REGISTRY")
.required(false)
.help("Schema registry endpoint, local path, or empty string"))
.arg(Arg::new("soe-avro")
.long("soe-avro")
.required(false)
.help("Local path to either a single Avro schema file or a directory containing (only) Avro schematas"))
.group(ArgGroup::new("format")
.args(["json", "avro"])
.args(["json", "avro","soe-avro"])
.required(false))
.arg(Arg::new("end")
.short('e')
Expand Down Expand Up @@ -476,6 +501,11 @@ fn convert_matches_to_message_format(
.map(MessageFormat::Avro);
}

if ingest_matches.contains_id("soe-avro") {
return to_schema_path(ingest_matches.get_one::<String>("soe-avro"))
.map(MessageFormat::SoeAvro);
}

return to_schema_source(ingest_matches.get_one::<String>("json"), true)
.map(MessageFormat::Json);
}
Expand Down
148 changes: 147 additions & 1 deletion src/serialization.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat};
use async_trait::async_trait;
use dashmap::DashMap;
use flate2::read::GzDecoder;
use schema_registry_converter::async_impl::{
easy_avro::EasyAvroDecoder, easy_json::EasyJsonDecoder, schema_registry::SrSettings,
};
use serde_json::Value;
use std::{borrow::BorrowMut, convert::TryFrom, io::Cursor, io::Read, path::PathBuf};

// use crate::avro_canonical_schema_workaround::parse_into_canonical_form;
use apache_avro::{rabin::Rabin, GenericSingleObjectReader, Schema};
use std::{
borrow::BorrowMut,
convert::{TryFrom, TryInto},
io::{Cursor, Read},
path::PathBuf,
};

use log::debug;

#[async_trait]
pub(crate) trait MessageDeserializer {
Expand Down Expand Up @@ -48,6 +59,10 @@ impl MessageDeserializerFactory {
}
}
},
MessageFormat::SoeAvro(path) => match SoeAvroDeserializer::try_from_path(path) {
Ok(s) => Ok(Box::new(s)),
Err(e) => Err(e),
},
_ => Ok(Box::new(DefaultDeserializer::new(decompress_gzip))),
}
}
Expand Down Expand Up @@ -128,6 +143,11 @@ struct AvroDeserializer {
decoder: EasyAvroDecoder,
}

/// Deserializer for messages in the Avro single-object encoding.
struct SoeAvroDeserializer {
/// One reader per loaded schema, keyed by the schema's 8-byte Rabin
/// fingerprint converted to an `i64` (see `fingerprint_to_i64`).
decoders: DashMap<i64, GenericSingleObjectReader>,
}

#[derive(Default)]
struct AvroSchemaDeserializer {
schema: Option<apache_avro::Schema>,
Expand All @@ -137,6 +157,58 @@ struct JsonDeserializer {
decoder: EasyJsonDecoder,
}

#[async_trait]
impl MessageDeserializer for SoeAvroDeserializer {
    /// Decodes one single-object-encoded Avro message into a JSON value.
    ///
    /// The schema fingerprint is read from the message, the matching reader is
    /// looked up in `self.decoders`, and the decoded Avro value is converted to
    /// `serde_json::Value`.
    ///
    /// # Errors
    ///
    /// Returns `MessageDeserializationError::AvroDeserialization` carrying a dead
    /// letter built from the raw message when:
    /// * the fingerprint cannot be extracted from the message,
    /// * no schema with that fingerprint has been loaded,
    /// * the reader fails to decode the message, or
    /// * the decoded Avro value cannot be converted to JSON.
    async fn deserialize(
        &mut self,
        message_bytes: &[u8],
    ) -> Result<Value, MessageDeserializationError> {
        // Every failure path wraps the raw message plus a reason in the same variant.
        let dead_letter_error = |reason: String| MessageDeserializationError::AvroDeserialization {
            dead_letter: DeadLetter::from_failed_deserialization(message_bytes, reason),
        };

        let key = Self::extract_message_fingerprint(message_bytes)
            .map_err(|e| dead_letter_error(e.to_string()))?;

        // `ok_or_else` keeps the hex formatting and dead-letter construction off
        // the success path; this runs once per consumed message.
        let decoder = self.decoders.get(&key).ok_or_else(|| {
            // Bytes 2..10 are the ones `extract_message_fingerprint` just read;
            // `skip`/`take` avoids a panicking slice index.
            let fingerprint_hex = message_bytes
                .iter()
                .skip(2)
                .take(8)
                .map(|byte| format!("{:02x}", byte))
                .collect::<String>();
            dead_letter_error(format!("Unkown schema with fingerprint {}", fingerprint_hex))
        })?;

        let mut reader = Cursor::new(message_bytes);
        let avro_value = decoder
            .read_value(&mut reader)
            .map_err(|e| dead_letter_error(e.to_string()))?;

        Value::try_from(avro_value).map_err(|e| dead_letter_error(e.to_string()))
    }
}

#[async_trait]
impl MessageDeserializer for AvroDeserializer {
async fn deserialize(
Expand Down Expand Up @@ -293,5 +365,79 @@ impl AvroDeserializer {
}
}

impl SoeAvroDeserializer {
pub(crate) fn try_from_path(path: &PathBuf) -> Result<Self, anyhow::Error> {
if path.is_file() {
let (key, seo_reader) = Self::read_single_schema_file(path)?;
debug!(
"Loaded schema {:?} with key (i64 rep of fingerprint) {:?}",
path, key
);
let map: DashMap<i64, GenericSingleObjectReader> = DashMap::with_capacity(1);
map.insert(key, seo_reader);
Ok(SoeAvroDeserializer { decoders: map })
} else if path.is_dir() {
let decoders = path
.read_dir()?
.map(|file| {
let file_path = file?.path();
let value = Self::read_single_schema_file(&file_path)?;
Ok(value)
})
.collect::<anyhow::Result<DashMap<_, _>>>()?;

Ok(SoeAvroDeserializer { decoders })
} else {
Err(anyhow::format_err!("Path '{:?}' does not exists", path))
}
}

fn read_single_schema_file(
path: &PathBuf,
) -> Result<(i64, GenericSingleObjectReader), anyhow::Error> {
match std::fs::read_to_string(path) {
Ok(content) => match Schema::parse_str(&content) {
Ok(s) => {
let fingerprint = s.fingerprint::<Rabin>().bytes;
let fingerprint = fingerprint
.try_into()
.expect("Rabin fingerprints are 8 bytes");
let key = Self::fingerprint_to_i64(fingerprint);
match GenericSingleObjectReader::new(s) {
Ok(decoder) => Ok((key, decoder)),
Err(e) => Err(anyhow::format_err!(
"Schema file '{:?}'; Error: {}",
path,
e.to_string()
)),
}
}
Err(e) => Err(anyhow::format_err!(
"Schema file '{:?}'; Error: {}",
path,
e.to_string()
)),
},
Err(e) => Err(anyhow::format_err!(
"Schema file '{:?}'; Error: {}",
path,
e.to_string()
)),
}
}

fn extract_message_fingerprint(msg: &[u8]) -> Result<i64, anyhow::Error> {
msg.get(2..10)
.ok_or(anyhow::anyhow!(
"Message does not contain a valid fingerprint"
))
.map(|x| Self::fingerprint_to_i64(x.try_into().expect("Slice must be 8 bytes long")))
}

fn fingerprint_to_i64(msg: [u8; 8]) -> i64 {
i64::from_le_bytes(msg)
}
}

#[cfg(test)]
mod tests {}
54 changes: 54 additions & 0 deletions src/transforms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ lazy_static! {
"epoch_micros_to_iso8601",
Box::new(create_epoch_micros_to_iso8601_fn()),
);
runtime.register_function(
"epoch_millis_to_micro",
Box::new(create_epoch_millis_to_micro_fn()),
);
runtime
};
}
Expand Down Expand Up @@ -202,6 +206,13 @@ fn create_epoch_micros_to_iso8601_fn() -> CustomFunction {
)
}

/// Builds the JMESPath custom function registered as `epoch_millis_to_micro`.
///
/// The function takes exactly one numeric argument and has no variadic tail.
fn create_epoch_millis_to_micro_fn() -> CustomFunction {
    let signature = Signature::new(vec![ArgumentType::Number], None);
    CustomFunction::new(signature, Box::new(jmespath_epoch_millis_to_micro))
}

fn substr(args: &[Rcvar], context: &mut Context) -> Result<Rcvar, JmespathError> {
let s = args[0].as_string().ok_or_else(|| {
InvalidTypeError::new(context, "string", args[0].get_type().to_string(), 0)
Expand Down Expand Up @@ -268,6 +279,14 @@ fn jmespath_epoch_micros_to_iso8601(
let variable = Variable::try_from(value)?;
Ok(Arc::new(variable))
}
/// JMESPath implementation of `epoch_millis_to_micro`.
///
/// Reads the first argument through `i64_from_args` as epoch milliseconds and
/// returns it multiplied by 1000 (epoch microseconds) as a JSON number.
///
/// Values too large to be represented in microseconds as an `i64` are clamped
/// to `i64::MIN` / `i64::MAX`. A plain `* 1000` would panic when overflow
/// checks are enabled and silently wrap to an unrelated timestamp otherwise.
///
/// # Errors
///
/// Propagates the error from `i64_from_args` when the argument cannot be read.
fn jmespath_epoch_millis_to_micro(
    args: &[Rcvar],
    context: &mut Context,
) -> Result<Rcvar, JmespathError> {
    let millis = i64_from_args(args, context, 0)?;
    let micros = millis.saturating_mul(1000);
    Ok(Arc::new(Variable::Number(micros.into())))
}

fn i64_from_args(
args: &[Rcvar],
Expand Down Expand Up @@ -586,6 +605,41 @@ mod tests {
assert_eq!(expected_iso, dt);
}

#[test]
fn test_epoch_millis_to_micro() {
    // `modified` is an epoch timestamp in milliseconds.
    let mut payload = json!({
        "name": "A",
        "modified": 1732279537028u64,
    });

    // Kafka envelope carrying the same JSON document.
    let raw_payload = payload.to_string().into_bytes();
    let kafka_message = OwnedMessage::new(
        Some(raw_payload),
        None,
        "test".to_string(),
        rdkafka::Timestamp::NotAvailable,
        0,
        0,
        None,
    );

    // A single transform that derives `modified_micros` from `modified`.
    let mut transforms = HashMap::new();
    transforms.insert(
        "modified_micros".to_string(),
        "epoch_millis_to_micro(modified)".to_string(),
    );
    let transformer = Transformer::from_transforms(&transforms).unwrap();

    transformer
        .transform(&mut payload, Some(&kafka_message))
        .unwrap();

    // The derived field must be the input scaled by 1000.
    let modified_micros = payload
        .get("modified_micros")
        .unwrap()
        .as_u64()
        .unwrap();
    assert_eq!(1732279537028000u64, modified_micros);
}

#[test]
fn test_transforms_with_epoch_seconds_to_iso8601() {
let expected_iso = "2021-07-20T23:18:18Z";
Expand Down
1 change: 1 addition & 0 deletions src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ impl DataWriter {
let mut adds = self.write_parquet_files(&table.table_uri()).await?;
let actions = adds.drain(..).map(Action::Add).collect();
let commit = deltalake_core::operations::transaction::CommitBuilder::default()
.with_max_retries(100) // Raised from the default of 15 because (at least on Azure) commits may fail when writes are too frequent, which happens when many messages arrive in the dead letter queue.
.with_actions(actions)
.build(
table.state.as_ref().map(|s| s as &dyn TableReference),
Expand Down
Loading
Loading