Skip to content

Commit

Permalink
feat(tap): Add duration flag to vector tap (#20815)
Browse files Browse the repository at this point in the history
* add duration flag to vector tap

* added suggested changes + documentation/changelog

* modified duration flag to duration_ms

* add changelog author

* remove trailing space

* use rustfmt to format changes
  • Loading branch information
ArunPiduguDD authored Jul 10, 2024
1 parent 6680b15 commit 1579627
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 23 deletions.
5 changes: 5 additions & 0 deletions changelog.d/vector_tap_duration_flag.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
The `vector tap` command now has an optional `duration_ms` flag that allows you to specify the duration of the
tap. By default, the tap will run indefinitely, but if a duration is specified (in milliseconds) the tap will
automatically stop after that duration has elapsed.

authors: ArunPiduguDD
65 changes: 42 additions & 23 deletions src/tap/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{borrow::Cow, collections::BTreeMap, time::Duration};

use colored::{ColoredString, Colorize};
use std::time::Instant;
use std::{borrow::Cow, collections::BTreeMap, time::Duration};
use tokio::time::timeout;
use tokio_stream::StreamExt;
use url::Url;
use vector_lib::api_client::{
Expand Down Expand Up @@ -98,34 +99,52 @@ async fn run(
);
};

let start_time = Instant::now();
let stream_duration = opts
.duration_ms
.map(Duration::from_millis)
.unwrap_or(Duration::MAX);

// Loop over the returned results, printing out tap events.
#[allow(clippy::print_stdout)]
#[allow(clippy::print_stderr)]
loop {
let message = stream.next().await;
if let Some(Some(res)) = message {
if let Some(d) = res.data {
for tap_event in d.output_events_by_component_id_patterns.iter() {
match tap_event {
OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::Log(ev) => {
println!("{}", formatter.format(ev.component_id.as_ref(), ev.component_kind.as_ref(), ev.component_type.as_ref(), ev.string.as_ref()));
},
OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::Metric(ev) => {
println!("{}", formatter.format(ev.component_id.as_ref(), ev.component_kind.as_ref(), ev.component_type.as_ref(), ev.string.as_ref()));
},
OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::Trace(ev) => {
println!("{}", formatter.format(ev.component_id.as_ref(), ev.component_kind.as_ref(), ev.component_type.as_ref(), ev.string.as_ref()));
},
OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::EventNotification(ev) => {
if !opts.quiet {
eprintln!("{}", ev.message);
}
},
let time_elapsed = start_time.elapsed();
if time_elapsed >= stream_duration {
return exitcode::OK;
}

let message = timeout(stream_duration - time_elapsed, stream.next()).await;
match message {
Ok(Some(Some(res))) => {
if let Some(d) = res.data {
for tap_event in d.output_events_by_component_id_patterns.iter() {
match tap_event {
OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::Log(ev) => {
println!("{}", formatter.format(ev.component_id.as_ref(), ev.component_kind.as_ref(), ev.component_type.as_ref(), ev.string.as_ref()));
},
OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::Metric(ev) => {
println!("{}", formatter.format(ev.component_id.as_ref(), ev.component_kind.as_ref(), ev.component_type.as_ref(), ev.string.as_ref()));
},
OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::Trace(ev) => {
println!("{}", formatter.format(ev.component_id.as_ref(), ev.component_kind.as_ref(), ev.component_type.as_ref(), ev.string.as_ref()));
},
OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::EventNotification(ev) => {
if !opts.quiet {
eprintln!("{}", ev.message);
}
},
}
}
}
}
} else {
return exitcode::TEMPFAIL;
Err(_) =>
// If the stream times out, that indicates the duration specified by the user
// has elapsed. We should exit gracefully.
{
return exitcode::OK
}
Ok(_) => return exitcode::TEMPFAIL,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/tap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub struct Opts {
/// Whether to reconnect if the underlying API connection drops. By default, tap will attempt to reconnect if the connection drops.
#[arg(short, long)]
no_reconnect: bool,

/// Specifies a duration (in milliseconds) to sample logs (e.g. specifying 10000 will sample logs for 10 seconds then exit)
#[arg(short = 'd', long)]
duration_ms: Option<u64>,
}

impl Opts {
Expand Down
4 changes: 4 additions & 0 deletions website/cue/reference/cli.cue
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ cli: {
_short: "n"
description: "Whether to reconnect if the underlying Vector API connection drops. By default, tap will attempt to reconnect if the connection drops."
}
"duration_ms": {
_short: "d"
description: "Specifies a duration (in milliseconds) to sample logs (e.g. passing in 10000 will sample logs for 10 seconds then exit)."
}
}

options: {
Expand Down

0 comments on commit 1579627

Please sign in to comment.