From b97584c8c8f3e0e2059d0262ede4022c67acb7f0 Mon Sep 17 00:00:00 2001 From: Big Herc Date: Tue, 11 Jul 2023 22:53:28 +0100 Subject: [PATCH] webhook: add raw mode to send data as is without wrapping --- sink-webhook/src/bin.rs | 6 +++++- sink-webhook/src/lib.rs | 31 ++++++++++++++++++++----------- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/sink-webhook/src/bin.rs b/sink-webhook/src/bin.rs index 588fa97d..599dd98e 100644 --- a/sink-webhook/src/bin.rs +++ b/sink-webhook/src/bin.rs @@ -14,6 +14,10 @@ struct Cli { /// Additional headers to send with the request. #[arg(long, short = 'H', env, value_delimiter = ',')] header: Vec, + #[arg(long, env, action)] + /// Send the data received from the transform step as is, this is useful for + /// Discord/Telegram/... APIs + raw: bool, #[command(flatten)] configuration: ConfigurationArgs, } @@ -23,7 +27,7 @@ async fn main() -> anyhow::Result<()> { init_opentelemetry()?; let args = Cli::parse(); - let sink = WebhookSink::new(args.target_url)?.with_headers(&args.header)?; + let sink = WebhookSink::new(args.target_url, args.raw)?.with_headers(&args.header)?; let ct = CancellationToken::new(); let connector = SinkConnector::::from_configuration_args(args.configuration)?; diff --git a/sink-webhook/src/lib.rs b/sink-webhook/src/lib.rs index 2ff31ddd..d250ce5b 100644 --- a/sink-webhook/src/lib.rs +++ b/sink-webhook/src/lib.rs @@ -33,15 +33,17 @@ pub struct WebhookSink { client: Client, target_url: String, headers: HeaderMap, + raw: bool, } impl WebhookSink { - pub fn new(target_url: String) -> Result { + pub fn new(target_url: String, raw: bool) -> Result { let _ = Uri::from_str(&target_url)?; Ok(Self { client: Client::new(), target_url, headers: HeaderMap::new(), + raw: raw, }) } @@ -109,20 +111,27 @@ impl Sink for WebhookSink { "webhook: calling with data" ); - let body = json!({ - "data": { - "cursor": cursor, - "end_cursor": end_cursor, - "finality": finality, - "batch": batch, - }, - }); - - self.send(&body).await + if self.raw { + self.send(&batch).await + } else { + let body = &json!({ + "data": { + "cursor": cursor, + "end_cursor": end_cursor, + "finality": finality, + "batch": batch, + }, + }); + self.send(&body).await + } } #[instrument(skip(self), err(Debug))] async fn handle_invalidate(&mut self, cursor: &Option) -> Result<(), Self::Error> { + if self.raw { + return Ok(()); + } + let cursor_str = cursor .clone() .map(|c| c.to_string())