Skip to content

Commit

Permalink
enum
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Nov 7, 2024
1 parent 16ba618 commit 5e73992
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 110 deletions.
25 changes: 23 additions & 2 deletions rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,7 @@ pub(crate) fn forward_pipeline_metrics() -> &'static PipelineMetrics {
PIPELINE_METRICS.get_or_init(PipelineMetrics::new)
}

// forward_metrics_labels is a helper function used to fetch the
// SDK_INFO_LABELS object
// sdk_info_labels is a helper function used to build the labels used in sdk_info
pub(crate) fn sdk_info_labels(
language: String,
version: String,
Expand Down Expand Up @@ -1058,6 +1057,28 @@ mod tests {
let _ = exponential_buckets_range(-1.0, 100.0, 10).collect::<Vec<f64>>();
}

#[test]
fn test_global_metrics() {
let metrics = global_metrics();
let sdk_labels = sdk_info_labels(
"language".to_string(),
"version".to_string(),
"container_type".to_string(),
);
metrics.sdk_info.get_or_create(&sdk_labels).set(1);

let state = global_registry().registry.lock();
let mut buffer = String::new();
encode(&mut buffer, &state).unwrap();
let got = buffer.trim().lines().collect::<Vec<&str>>().join("\n");

let expected = r#"# HELP sdk_info A metric with a constant value '1', labeled by SDK information such as version, language, and type.
# TYPE sdk_info gauge
sdk_info{language="language",version="version",type="container_type"} 1
# EOF"#;
assert_eq!(got.trim(), expected);
}

#[test]
fn test_metric_names() {
let metrics = forward_mvtx_metrics();
Expand Down
13 changes: 5 additions & 8 deletions rust/numaflow-core/src/monovertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ use crate::config::components::{sink, source, transformer};
use crate::config::monovertex::MonovertexConfig;
use crate::error::{self, Error};
use crate::metrics;
use crate::shared::server_info::{
sdk_server_info, CONTAINER_TYPE_FB_SINKER, CONTAINER_TYPE_SINKER, CONTAINER_TYPE_SOURCER,
CONTAINER_TYPE_SOURCE_TRANSFORMER,
};
use crate::shared::server_info::{sdk_server_info, ContainerType};
use crate::shared::utils;
use crate::shared::utils::{
create_rpc_channel, wait_until_sink_ready, wait_until_source_ready,
Expand Down Expand Up @@ -50,7 +47,7 @@ pub(crate) async fn start_forwarder(
let metric_labels = metrics::sdk_info_labels(
server_info.language,
server_info.version,
CONTAINER_TYPE_SOURCER.to_string(),
ContainerType::Sourcer.to_string(),
);
metrics::global_metrics()
.sdk_info
Expand Down Expand Up @@ -81,7 +78,7 @@ pub(crate) async fn start_forwarder(
let metric_labels = metrics::sdk_info_labels(
server_info.language,
server_info.version,
CONTAINER_TYPE_SINKER.to_string(),
ContainerType::Sinker.to_string(),
);
metrics::global_metrics()
.sdk_info
Expand Down Expand Up @@ -111,7 +108,7 @@ pub(crate) async fn start_forwarder(
let metric_labels = metrics::sdk_info_labels(
server_info.language,
server_info.version,
CONTAINER_TYPE_FB_SINKER.to_string(),
ContainerType::FbSinker.to_string(),
);
metrics::global_metrics()
.sdk_info
Expand Down Expand Up @@ -147,7 +144,7 @@ pub(crate) async fn start_forwarder(
let metric_labels = metrics::sdk_info_labels(
server_info.language,
server_info.version,
CONTAINER_TYPE_SOURCE_TRANSFORMER.to_string(),
ContainerType::SourceTransformer.to_string(),
);

metrics::global_metrics()
Expand Down
Loading

0 comments on commit 5e73992

Please sign in to comment.