Skip to content

Commit

Permalink
fix(sinks/gcp/pubsub.rs): remove pubsub log only restriction
Browse files Browse the repository at this point in the history
  • Loading branch information
Genady Postrilko authored and genadipost committed Oct 19, 2024
1 parent 4588cec commit d68beb0
Showing 1 changed file with 40 additions and 3 deletions.
43 changes: 40 additions & 3 deletions src/sinks/gcp/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use vector_lib::configurable::configurable_component;

use crate::{
codecs::{Encoder, EncodingConfig, Transformer},
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
event::Event,
gcp::{GcpAuthConfig, GcpAuthenticator, Scope, PUBSUB_URL},
http::HttpClient,
Expand Down Expand Up @@ -144,7 +144,7 @@ impl SinkConfig for PubsubConfig {
}

fn input(&self) -> Input {
Input::new(self.encoding.config().input_type() & DataType::Log)
Input::new(self.encoding.config().input_type())
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
Expand Down Expand Up @@ -278,7 +278,8 @@ mod integration_tests {
use crate::test_util::components::{run_and_assert_sink_error, COMPONENT_ERROR_TAGS};
use crate::test_util::{
components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
random_events_with_stream, random_string, trace_init,
random_events_with_stream, random_metrics_with_stream,
random_string, trace_init,
};

const PROJECT: &str = "testproject";
Expand All @@ -305,6 +306,34 @@ mod integration_tests {
config(topic).build(cx).await.expect("Building sink failed")
}

#[tokio::test]
async fn publish_metrics() {
trace_init();

let (topic, subscription) = create_topic_subscription().await;
let (sink, healthcheck) = config_build(&topic).await;

healthcheck.await.expect("Health check failed");

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input, events) = random_metrics_with_stream(100, Some(batch), None);
run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let response = pull_messages(&subscription, 1000).await;
let messages = response
.receivedMessages
.as_ref()
.expect("Response is missing messages");
assert_eq!(input.len(), messages.len());
for i in 0..input.len() {
let data = messages[i].message.decode_data_as_value();
let data = serde_json::to_value(data).unwrap();
let expected = serde_json::to_value(input[i].as_metric()).unwrap();
assert_eq!(data, expected);
}
}

#[tokio::test]
async fn publish_events() {
trace_init();
Expand Down Expand Up @@ -434,6 +463,14 @@ mod integration_tests {
let data = String::from_utf8_lossy(&data);
serde_json::from_str(&data).expect("Invalid message structure")
}

fn decode_data_as_value(&self) -> Value {
let data = BASE64_STANDARD
.decode(&self.data)
.expect("Invalid base64 data");
let data = String::from_utf8_lossy(&data);
serde_json::from_str(&data).expect("Invalid json")
}
}

#[derive(Debug, Deserialize, Serialize)]
Expand Down

0 comments on commit d68beb0

Please sign in to comment.