From d68beb0e5d866b35b11fdbe4b095d5a709d8677d Mon Sep 17 00:00:00 2001 From: Genady Postrilko Date: Sat, 19 Oct 2024 14:55:10 +0300 Subject: [PATCH 1/3] fix(sinks/gcp/pubsub.rs): remove pubsub log only restriction --- src/sinks/gcp/pubsub.rs | 43 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/src/sinks/gcp/pubsub.rs b/src/sinks/gcp/pubsub.rs index 3421b1bab975a..5e9e0c522c022 100644 --- a/src/sinks/gcp/pubsub.rs +++ b/src/sinks/gcp/pubsub.rs @@ -11,7 +11,7 @@ use vector_lib::configurable::configurable_component; use crate::{ codecs::{Encoder, EncodingConfig, Transformer}, - config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext}, + config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, event::Event, gcp::{GcpAuthConfig, GcpAuthenticator, Scope, PUBSUB_URL}, http::HttpClient, @@ -144,7 +144,7 @@ impl SinkConfig for PubsubConfig { } fn input(&self) -> Input { - Input::new(self.encoding.config().input_type() & DataType::Log) + Input::new(self.encoding.config().input_type()) } fn acknowledgements(&self) -> &AcknowledgementsConfig { @@ -278,7 +278,8 @@ mod integration_tests { use crate::test_util::components::{run_and_assert_sink_error, COMPONENT_ERROR_TAGS}; use crate::test_util::{ components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, - random_events_with_stream, random_string, trace_init, + random_events_with_stream, random_metrics_with_stream, + random_string, trace_init, }; const PROJECT: &str = "testproject"; @@ -305,6 +306,34 @@ mod integration_tests { config(topic).build(cx).await.expect("Building sink failed") } + #[tokio::test] + async fn publish_metrics() { + trace_init(); + + let (topic, subscription) = create_topic_subscription().await; + let (sink, healthcheck) = config_build(&topic).await; + + healthcheck.await.expect("Health check failed"); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (input, events) = random_metrics_with_stream(100, Some(batch), None); + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let response = pull_messages(&subscription, 1000).await; + let messages = response + .receivedMessages + .as_ref() + .expect("Response is missing messages"); + assert_eq!(input.len(), messages.len()); + for i in 0..input.len() { + let data = messages[i].message.decode_data_as_value(); + let data = serde_json::to_value(data).unwrap(); + let expected = serde_json::to_value(input[i].as_metric()).unwrap(); + assert_eq!(data, expected); + } + } + #[tokio::test] async fn publish_events() { trace_init(); @@ -434,6 +463,14 @@ mod integration_tests { let data = String::from_utf8_lossy(&data); serde_json::from_str(&data).expect("Invalid message structure") } + + fn decode_data_as_value(&self) -> Value { + let data = BASE64_STANDARD + .decode(&self.data) + .expect("Invalid base64 data"); + let data = String::from_utf8_lossy(&data); + serde_json::from_str(&data).expect("Invalid json") + } } #[derive(Debug, Deserialize, Serialize)] From dd679f3db1b5b89ab0ba9a5ed542860177060a8d Mon Sep 17 00:00:00 2001 From: Genady Postrilko Date: Mon, 21 Oct 2024 20:05:29 +0300 Subject: [PATCH 2/3] add changelog entry --- changelog.d/21557_remove_pubsub_log_only_restriction.fix.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/21557_remove_pubsub_log_only_restriction.fix.md diff --git a/changelog.d/21557_remove_pubsub_log_only_restriction.fix.md b/changelog.d/21557_remove_pubsub_log_only_restriction.fix.md new file mode 100644 index 0000000000000..aa340b2226408 --- /dev/null +++ b/changelog.d/21557_remove_pubsub_log_only_restriction.fix.md @@ -0,0 +1,3 @@ +Remove pubsub log only restriction. This allows using pubsub sink/source for metric and trace events. + +authors: genadipost From 7aa1c0ee0580eaa65935e187320bed60b38d11b1 Mon Sep 17 00:00:00 2001 From: Genady Postrilko Date: Mon, 21 Oct 2024 21:46:11 +0300 Subject: [PATCH 3/3] fix fmt --- src/sinks/gcp/pubsub.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sinks/gcp/pubsub.rs b/src/sinks/gcp/pubsub.rs index 5e9e0c522c022..3703962f00d9d 100644 --- a/src/sinks/gcp/pubsub.rs +++ b/src/sinks/gcp/pubsub.rs @@ -278,8 +278,7 @@ mod integration_tests { use crate::test_util::components::{run_and_assert_sink_error, COMPONENT_ERROR_TAGS}; use crate::test_util::{ components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, - random_events_with_stream, random_metrics_with_stream, - random_string, trace_init, + random_events_with_stream, random_metrics_with_stream, random_string, trace_init, }; const PROJECT: &str = "testproject";