From 9c05f5df2ec0f1f14f3ff6897df52fbad19b324a Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Sat, 26 Oct 2024 19:05:21 -0600 Subject: [PATCH] chore(deps): Drop use of `infer` crate (#21623) * chore(deps): Drop use of `infer` crate * Add reference * Fix spelling --- Cargo.lock | 9 +--- Cargo.toml | 3 +- src/sources/aws_kinesis_firehose/handlers.rs | 53 +++++++++++++++----- 3 files changed, 42 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5259d73832987..dd1a36e72a908 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4392,7 +4392,7 @@ dependencies = [ "async-channel", "base64 0.13.1", "futures-lite", - "infer 0.2.3", + "infer", "pin-project-lite", "rand 0.7.3", "serde", @@ -4740,12 +4740,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" -[[package]] -name = "infer" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc150e5ce2330295b8616ce0e3f53250e53af31759a9dbedad1621ba29151847" - [[package]] name = "influxdb-line-protocol" version = "2.0.0" @@ -10552,7 +10546,6 @@ dependencies = [ "hyper-proxy", "indexmap 2.6.0", "indoc", - "infer 0.16.0", "inventory", "ipnet", "itertools 0.13.0", diff --git a/Cargo.toml b/Cargo.toml index cb3f4a80dbd7d..ef781ce2841e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -313,7 +313,6 @@ hyper = { version = "0.14.28", default-features = false, features = ["client", " hyper-openssl = { version = "0.9.2", default-features = false } hyper-proxy = { version = "0.9.1", default-features = false, features = ["openssl-tls"] } indexmap.workspace = true -infer = { version = "0.16.0", default-features = false, optional = true } indoc = { version = "2.0.5", default-features = false } inventory = { version = "0.3.15", default-features = false } ipnet = { version = "2", default-features = false, optional = true, features = ["serde", "std"] } @@ -568,7 +567,7 @@ sources-metrics = [ sources-amqp = ["lapin"] sources-apache_metrics = ["sources-utils-http-client"] sources-aws_ecs_metrics = ["sources-utils-http-client"] -sources-aws_kinesis_firehose = ["dep:base64", "dep:infer"] +sources-aws_kinesis_firehose = ["dep:base64"] sources-aws_s3 = ["aws-core", "dep:aws-sdk-sqs", "dep:aws-sdk-s3", "dep:semver", "dep:async-compression", "sources-aws_sqs", "tokio-util/io"] sources-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"] sources-datadog_agent = ["sources-utils-http-error", "protobuf-build", "dep:prost"] diff --git a/src/sources/aws_kinesis_firehose/handlers.rs b/src/sources/aws_kinesis_firehose/handlers.rs index 3b81662d542b8..d623dc297842f 100644 --- a/src/sources/aws_kinesis_firehose/handlers.rs +++ b/src/sources/aws_kinesis_firehose/handlers.rs @@ -219,24 +219,32 @@ fn decode_record( compression: compression.to_owned(), }), Compression::Auto => { - match infer::get(&buf) { - Some(filetype) => match filetype.mime_type() { - "application/gzip" => decode_gzip(&buf[..]).or_else(|error| { - emit!(AwsKinesisFirehoseAutomaticRecordDecodeError { - compression: Compression::Gzip, - error - }); - Ok(Bytes::from(buf)) - }), - // only support gzip for now - _ => Ok(Bytes::from(buf)), - }, - None => Ok(Bytes::from(buf)), + if is_gzip(&buf) { + decode_gzip(&buf[..]).or_else(|error| { + emit!(AwsKinesisFirehoseAutomaticRecordDecodeError { + compression: Compression::Gzip, + error + }); + Ok(Bytes::from(buf)) + }) + } else { + // only support gzip for now + Ok(Bytes::from(buf)) } } } } +fn is_gzip(data: &[u8]) -> bool { + // The header length of a GZIP file is 10 bytes. The first two bytes of the constant comes from + // the GZIP file format specification, which is the fixed member header identification bytes. + // The third byte is the compression method, of which only one is defined which is 8 for the + // deflate algorithm. + // + // Reference: https://datatracker.ietf.org/doc/html/rfc1952 Section 2.3 + data.len() >= 10 && &data[..3] == b"\x1f\x8b\x08" +} + fn decode_gzip(data: &[u8]) -> std::io::Result { let mut decoded = Vec::new(); @@ -245,3 +253,22 @@ fn decode_gzip(data: &[u8]) -> std::io::Result { Ok(Bytes::from(decoded)) } + +#[cfg(test)] +mod tests { + use flate2::{write::GzEncoder, Compression}; + use std::io::Write as _; + + use super::*; + + const CONTENT: &[u8] = b"Example"; + + #[test] + fn correctly_detects_gzipped_content() { + assert!(!is_gzip(CONTENT)); + let mut encoder = GzEncoder::new(Vec::new(), Compression::fast()); + encoder.write_all(CONTENT).unwrap(); + let compressed = encoder.finish().unwrap(); + assert!(is_gzip(&compressed)); + } +}