From 14361c0b396e716efcb84781a3b52d660d57a4bd Mon Sep 17 00:00:00 2001 From: MarcoLugo Date: Fri, 16 Aug 2024 03:59:41 -0400 Subject: [PATCH] AVRO-4031: [Rust] builder header (#3096) * AVRO-4031: [Rust] allow setting has_header in builder * AVRO-4031: [Rust] add test for appending with multiple writers * AVRO-4031: [Rust] Prefix the new IT test name with `avro_4031_` Signed-off-by: Martin Tzvetanov Grigorov --------- Signed-off-by: Martin Tzvetanov Grigorov Co-authored-by: Martin Tzvetanov Grigorov --- lang/rust/avro/src/writer.rs | 6 +-- lang/rust/avro/tests/append_to_existing.rs | 48 ++++++++++++++++------ 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs index 976193a812a..5010bfff552 100644 --- a/lang/rust/avro/src/writer.rs +++ b/lang/rust/avro/src/writer.rs @@ -49,7 +49,7 @@ pub struct Writer<'a, W> { num_values: usize, #[builder(default = generate_sync_marker())] marker: [u8; 16], - #[builder(default = false, setter(skip))] + #[builder(default = false)] has_header: bool, #[builder(default)] user_metadata: HashMap, @@ -114,8 +114,8 @@ impl<'a, W: Write> Writer<'a, W> { .writer(writer) .codec(codec) .marker(marker) + .has_header(true) .build(); - w.has_header = true; w.resolved_schema = ResolvedSchema::try_from(schema).ok(); w } @@ -134,8 +134,8 @@ impl<'a, W: Write> Writer<'a, W> { .writer(writer) .codec(codec) .marker(marker) + .has_header(true) .build(); - w.has_header = true; w.resolved_schema = ResolvedSchema::try_from(schemata).ok(); w } diff --git a/lang/rust/avro/tests/append_to_existing.rs b/lang/rust/avro/tests/append_to_existing.rs index 2ea59d95c56..d378ad68416 100644 --- a/lang/rust/avro/tests/append_to_existing.rs +++ b/lang/rust/avro/tests/append_to_existing.rs @@ -22,19 +22,17 @@ use apache_avro::{ }; use apache_avro_test_helper::TestResult; +const SCHEMA: &str = r#"{ + "type": "record", + "name": "append_to_existing_file", + "fields": [ + {"name": "a", "type": "int"} + ] +}"#; + #[test] fn avro_3630_append_to_an_existing_file() -> TestResult { - let schema_str = r#" - { - "type": "record", - "name": "append_to_existing_file", - "fields": [ - {"name": "a", "type": "int"} - ] - } - "#; - - let schema = Schema::parse_str(schema_str).expect("Cannot parse the schema"); + let schema = Schema::parse_str(SCHEMA).expect("Cannot parse the schema"); let bytes = get_avro_bytes(&schema); @@ -51,13 +49,37 @@ fn avro_3630_append_to_an_existing_file() -> TestResult { let reader = Reader::new(&*new_bytes).expect("Cannot read the new bytes"); let mut i = 1; for value in reader { - check(value, i); + check(&value, i); i += 1 } Ok(()) } +#[test] +fn avro_4031_append_to_file_using_multiple_writers() -> TestResult { + let schema = Schema::parse_str(SCHEMA).expect("Cannot parse the schema"); + + let mut first_writer = Writer::builder().schema(&schema).writer(Vec::new()).build(); + first_writer.append(create_datum(&schema, -42))?; + let mut resulting_bytes = first_writer.into_inner()?; + let first_marker = read_marker(&resulting_bytes); + + let mut second_writer = Writer::builder() + .schema(&schema) + .has_header(true) + .marker(first_marker) + .writer(Vec::new()) + .build(); + second_writer.append(create_datum(&schema, 42))?; + resulting_bytes.append(&mut second_writer.into_inner()?); + + let values: Vec<_> = Reader::new(&resulting_bytes[..])?.collect(); + check(&values[0], -42); + check(&values[1], 42); + Ok(()) +} + /// Simulates reading from a pre-existing .avro file and returns its bytes fn get_avro_bytes(schema: &Schema) -> Vec { let mut writer = Writer::new(schema, Vec::new()); @@ -75,7 +97,7 @@ fn create_datum(schema: &Schema, value: i32) -> Record { } /// Checks the read values -fn check(value: AvroResult, expected: i32) { +fn check(value: &AvroResult, expected: i32) { match value { Ok(value) => match value { Value::Record(fields) => match &fields[0] {