Skip to content

Commit

Permalink
webhook: add raw mode to send data as is without wrapping
Browse files Browse the repository at this point in the history
  • Loading branch information
bigherc18 committed Jul 12, 2023
1 parent 7f00306 commit b97584c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
6 changes: 5 additions & 1 deletion sink-webhook/src/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ struct Cli {
/// Additional headers to send with the request.
#[arg(long, short = 'H', env, value_delimiter = ',')]
header: Vec<String>,
#[arg(long, env, action)]
/// Send the data received from the transform step as is, this is useful for
/// Discord/Telegram/... APIs
raw: bool,
#[command(flatten)]
configuration: ConfigurationArgs,
}
Expand All @@ -23,7 +27,7 @@ async fn main() -> anyhow::Result<()> {
init_opentelemetry()?;
let args = Cli::parse();

let sink = WebhookSink::new(args.target_url)?.with_headers(&args.header)?;
let sink = WebhookSink::new(args.target_url, args.raw)?.with_headers(&args.header)?;
let ct = CancellationToken::new();
let connector = SinkConnector::<Filter, Block>::from_configuration_args(args.configuration)?;

Expand Down
31 changes: 20 additions & 11 deletions sink-webhook/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@ pub struct WebhookSink {
client: Client,
target_url: String,
headers: HeaderMap,
raw: bool,
}

impl WebhookSink {
pub fn new(target_url: String) -> Result<Self, InvalidUri> {
pub fn new(target_url: String, raw: bool) -> Result<Self, InvalidUri> {
let _ = Uri::from_str(&target_url)?;
Ok(Self {
client: Client::new(),
target_url,
headers: HeaderMap::new(),
raw: raw,
})
}

Expand Down Expand Up @@ -109,20 +111,27 @@ impl Sink for WebhookSink {
"webhook: calling with data"
);

let body = json!({
"data": {
"cursor": cursor,
"end_cursor": end_cursor,
"finality": finality,
"batch": batch,
},
});

self.send(&body).await
if self.raw {
self.send(&batch).await
} else {
let body = &json!({
"data": {
"cursor": cursor,
"end_cursor": end_cursor,
"finality": finality,
"batch": batch,
},
});
self.send(&body).await
}
}

#[instrument(skip(self), err(Debug))]
async fn handle_invalidate(&mut self, cursor: &Option<Cursor>) -> Result<(), Self::Error> {
if self.raw {
return Ok(());
}

let cursor_str = cursor
.clone()
.map(|c| c.to_string())
Expand Down

0 comments on commit b97584c

Please sign in to comment.