Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sinks/gcp/pubsub.rs): remove pubsub log only restriction #21557

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/21557_remove_pubsub_log_only_restriction.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Remove pubsub log only restriction. This allows using pubsub sink/source for metric and trace events.

authors: genadipost
42 changes: 39 additions & 3 deletions src/sinks/gcp/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use vector_lib::configurable::configurable_component;

use crate::{
codecs::{Encoder, EncodingConfig, Transformer},
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
event::Event,
gcp::{GcpAuthConfig, GcpAuthenticator, Scope, PUBSUB_URL},
http::HttpClient,
Expand Down Expand Up @@ -144,7 +144,7 @@ impl SinkConfig for PubsubConfig {
}

fn input(&self) -> Input {
Input::new(self.encoding.config().input_type() & DataType::Log)
Input::new(self.encoding.config().input_type())
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
Expand Down Expand Up @@ -278,7 +278,7 @@ mod integration_tests {
use crate::test_util::components::{run_and_assert_sink_error, COMPONENT_ERROR_TAGS};
use crate::test_util::{
components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
random_events_with_stream, random_string, trace_init,
random_events_with_stream, random_metrics_with_stream, random_string, trace_init,
};

const PROJECT: &str = "testproject";
Expand All @@ -305,6 +305,34 @@ mod integration_tests {
config(topic).build(cx).await.expect("Building sink failed")
}

#[tokio::test]
async fn publish_metrics() {
trace_init();

let (topic, subscription) = create_topic_subscription().await;
let (sink, healthcheck) = config_build(&topic).await;

healthcheck.await.expect("Health check failed");

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input, events) = random_metrics_with_stream(100, Some(batch), None);
run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let response = pull_messages(&subscription, 1000).await;
let messages = response
.receivedMessages
.as_ref()
.expect("Response is missing messages");
assert_eq!(input.len(), messages.len());
for i in 0..input.len() {
let data = messages[i].message.decode_data_as_value();
let data = serde_json::to_value(data).unwrap();
let expected = serde_json::to_value(input[i].as_metric()).unwrap();
assert_eq!(data, expected);
}
}

#[tokio::test]
async fn publish_events() {
trace_init();
Expand Down Expand Up @@ -434,6 +462,14 @@ mod integration_tests {
let data = String::from_utf8_lossy(&data);
serde_json::from_str(&data).expect("Invalid message structure")
}

fn decode_data_as_value(&self) -> Value {
let data = BASE64_STANDARD
.decode(&self.data)
.expect("Invalid base64 data");
let data = String::from_utf8_lossy(&data);
serde_json::from_str(&data).expect("Invalid json")
}
}

#[derive(Debug, Deserialize, Serialize)]
Expand Down
Loading