From 5e739922de37455eef9034732d0d837cc3a3b862 Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Wed, 6 Nov 2024 23:39:57 -0800 Subject: [PATCH] enum Signed-off-by: Derek Wang --- rust/numaflow-core/src/metrics.rs | 25 ++- rust/numaflow-core/src/monovertex.rs | 13 +- rust/numaflow-core/src/shared/server_info.rs | 208 ++++++++++--------- 3 files changed, 136 insertions(+), 110 deletions(-) diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics.rs index a1561d0a2..7250cd4d2 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics.rs @@ -400,8 +400,7 @@ pub(crate) fn forward_pipeline_metrics() -> &'static PipelineMetrics { PIPELINE_METRICS.get_or_init(PipelineMetrics::new) } -// forward_metrics_labels is a helper function used to fetch the -// SDK_INFO_LABELS object +// sdk_info_labels is a helper function used to build the labels used in sdk_info pub(crate) fn sdk_info_labels( language: String, version: String, @@ -1058,6 +1057,28 @@ mod tests { let _ = exponential_buckets_range(-1.0, 100.0, 10).collect::>(); } + #[test] + fn test_global_metrics() { + let metrics = global_metrics(); + let sdk_labels = sdk_info_labels( + "language".to_string(), + "version".to_string(), + "container_type".to_string(), + ); + metrics.sdk_info.get_or_create(&sdk_labels).set(1); + + let state = global_registry().registry.lock(); + let mut buffer = String::new(); + encode(&mut buffer, &state).unwrap(); + let got = buffer.trim().lines().collect::>().join("\n"); + + let expected = r#"# HELP sdk_info A metric with a constant value '1', labeled by SDK information such as version, language, and type. +# TYPE sdk_info gauge +sdk_info{language="language",version="version",type="container_type"} 1 +# EOF"#; + assert_eq!(got.trim(), expected); + } + #[test] fn test_metric_names() { let metrics = forward_mvtx_metrics(); diff --git a/rust/numaflow-core/src/monovertex.rs b/rust/numaflow-core/src/monovertex.rs index 2949b16f3..f498646f1 100644 --- a/rust/numaflow-core/src/monovertex.rs +++ b/rust/numaflow-core/src/monovertex.rs @@ -10,10 +10,7 @@ use crate::config::components::{sink, source, transformer}; use crate::config::monovertex::MonovertexConfig; use crate::error::{self, Error}; use crate::metrics; -use crate::shared::server_info::{ - sdk_server_info, CONTAINER_TYPE_FB_SINKER, CONTAINER_TYPE_SINKER, CONTAINER_TYPE_SOURCER, - CONTAINER_TYPE_SOURCE_TRANSFORMER, -}; +use crate::shared::server_info::{sdk_server_info, ContainerType}; use crate::shared::utils; use crate::shared::utils::{ create_rpc_channel, wait_until_sink_ready, wait_until_source_ready, @@ -50,7 +47,7 @@ pub(crate) async fn start_forwarder( let metric_labels = metrics::sdk_info_labels( server_info.language, server_info.version, - CONTAINER_TYPE_SOURCER.to_string(), + ContainerType::Sourcer.to_string(), ); metrics::global_metrics() .sdk_info @@ -81,7 +78,7 @@ pub(crate) async fn start_forwarder( let metric_labels = metrics::sdk_info_labels( server_info.language, server_info.version, - CONTAINER_TYPE_SINKER.to_string(), + ContainerType::Sinker.to_string(), ); metrics::global_metrics() .sdk_info @@ -111,7 +108,7 @@ pub(crate) async fn start_forwarder( let metric_labels = metrics::sdk_info_labels( server_info.language, server_info.version, - CONTAINER_TYPE_FB_SINKER.to_string(), + ContainerType::FbSinker.to_string(), ); metrics::global_metrics() .sdk_info @@ -147,7 +144,7 @@ pub(crate) async fn start_forwarder( let metric_labels = metrics::sdk_info_labels( server_info.language, server_info.version, - CONTAINER_TYPE_SOURCE_TRANSFORMER.to_string(), + ContainerType::SourceTransformer.to_string(), ); metrics::global_metrics() diff --git a/rust/numaflow-core/src/shared/server_info.rs b/rust/numaflow-core/src/shared/server_info.rs index 9531e4897..af3f6256d 100644 --- a/rust/numaflow-core/src/shared/server_info.rs +++ b/rust/numaflow-core/src/shared/server_info.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::fmt; use std::fs; use std::path::{Path, PathBuf}; use std::str::FromStr; @@ -18,16 +19,59 @@ use crate::shared::server_info::version::SdkConstraints; // Equivalent to U+005C__END__. const END: &str = "U+005C__END__"; -pub const CONTAINER_TYPE_SOURCER: &str = "sourcer"; -pub const CONTAINER_TYPE_SOURCE_TRANSFORMER: &str = "sourcetransformer"; -pub const CONTAINER_TYPE_SINKER: &str = "sinker"; -pub const CONTAINER_TYPE_MAPPER: &str = "mapper"; -pub const CONTAINER_TYPE_REDUCER: &str = "reducer"; -pub const CONTAINER_TYPE_REDUCE_STREAMER: &str = "reducestreamer"; -pub const CONTAINER_TYPE_SESSION_REDUCER: &str = "sessionreducer"; -pub const CONTAINER_TYPE_SIDE_INPUT: &str = "sideinput"; -pub const CONTAINER_TYPE_FB_SINKER: &str = "fb-sinker"; -pub const CONTAINER_TYPE_UNKNOWN: &str = "unknown"; +#[derive(Debug, Eq, PartialEq, Clone, Hash)] +pub enum ContainerType { + Sourcer, + SourceTransformer, + Sinker, + Mapper, + Reducer, + ReduceStreamer, + SessionReducer, + SideInput, + FbSinker, + Unknown, +} + +impl ContainerType { + fn as_str(&self) -> &'static str { + match self { + ContainerType::Sourcer => "sourcer", + ContainerType::SourceTransformer => "sourcetransformer", + ContainerType::Sinker => "sinker", + ContainerType::Mapper => "mapper", + ContainerType::Reducer => "reducer", + ContainerType::ReduceStreamer => "reducestreamer", + ContainerType::SessionReducer => "sessionreducer", + ContainerType::SideInput => "sideinput", + ContainerType::FbSinker => "fb-sinker", + ContainerType::Unknown => "unknown", + } + } +} + +impl fmt::Display for ContainerType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +impl From for ContainerType { + fn from(s: String) -> Self { + match s.as_str() { + "sourcer" => ContainerType::Sourcer, + "sourcetransformer" => ContainerType::SourceTransformer, + "sinker" => ContainerType::Sinker, + "mapper" => ContainerType::Mapper, + "reducer" => ContainerType::Reducer, + "reducestreamer" => ContainerType::ReduceStreamer, + "sessionreducer" => ContainerType::SessionReducer, + "sideinput" => ContainerType::SideInput, + "fb-sinker" => ContainerType::FbSinker, + _ => ContainerType::Unknown, + } + } +} /// ServerInfo structure to store server-related information #[derive(Serialize, Deserialize, Debug)] @@ -60,14 +104,11 @@ pub(crate) async fn sdk_server_info( let sdk_version = &server_info.version; let min_numaflow_version = &server_info.minimum_numaflow_version; let sdk_language = &server_info.language; - let container_type = get_container_type(&file_path).unwrap_or(CONTAINER_TYPE_UNKNOWN); + let container_type = get_container_type(&file_path).unwrap_or(ContainerType::Unknown); // Get version information let version_info = version::get_version_info(); let numaflow_version = &version_info.version; - info!("Version_info: {:?}", version_info); - - // Check minimum numaflow version compatibility if specified if min_numaflow_version.is_empty() { warn!("Failed to get the minimum numaflow version, skipping numaflow version compatibility check"); } else if !numaflow_version.contains("latest") @@ -87,7 +128,7 @@ pub(crate) async fn sdk_server_info( check_sdk_compatibility( sdk_version, sdk_language, - container_type, + &container_type, min_supported_sdk_versions, )?; } @@ -126,7 +167,7 @@ fn check_numaflow_compatibility( fn check_sdk_compatibility( sdk_version: &str, sdk_language: &str, - container_type: &str, + container_type: &ContainerType, min_supported_sdk_versions: &SdkConstraints, ) -> error::Result<()> { // Check if the SDK language is present in the minimum supported SDK versions @@ -275,25 +316,10 @@ fn trim_after_dash(input: &str) -> &str { /// Extracts the container type from the server info file. /// The file name is in the format of -server-info. -fn get_container_type(server_info_file: &Path) -> Option<&str> { +fn get_container_type(server_info_file: &Path) -> Option { let file_name = server_info_file.file_name()?; let container_type = file_name.to_str()?.trim_end_matches("-server-info"); - if container_type.is_empty() { - None - } else { - match container_type { - CONTAINER_TYPE_FB_SINKER - | CONTAINER_TYPE_MAPPER - | CONTAINER_TYPE_REDUCER - | CONTAINER_TYPE_REDUCE_STREAMER - | CONTAINER_TYPE_SESSION_REDUCER - | CONTAINER_TYPE_SIDE_INPUT - | CONTAINER_TYPE_SINKER - | CONTAINER_TYPE_SOURCER - | CONTAINER_TYPE_SOURCE_TRANSFORMER => Some(container_type), - _ => Some(CONTAINER_TYPE_UNKNOWN), - } - } + Some(ContainerType::from(container_type.to_string())) } /// Reads the server info file and returns the parsed ServerInfo struct. @@ -370,12 +396,9 @@ mod version { use std::env; use std::sync::LazyLock; - use super::{ - CONTAINER_TYPE_FB_SINKER, CONTAINER_TYPE_SINKER, CONTAINER_TYPE_SOURCER, - CONTAINER_TYPE_SOURCE_TRANSFORMER, - }; + use super::ContainerType; - pub(crate) type SdkConstraints = HashMap>; + pub(crate) type SdkConstraints = HashMap>; // MINIMUM_SUPPORTED_SDK_VERSIONS is the minimum supported version of each SDK for the current numaflow version. static MINIMUM_SUPPORTED_SDK_VERSIONS: LazyLock = LazyLock::new(|| { @@ -384,41 +407,26 @@ mod version { // please follow the instruction there to update the value // NOTE: the string content of the keys matches the corresponding server info file name. // DO NOT change it unless the server info file name is changed. - let mut go_version_map = HashMap::new(); - go_version_map.insert(CONTAINER_TYPE_SOURCER.to_string(), "0.8.0-z".to_string()); - go_version_map.insert( - CONTAINER_TYPE_SOURCE_TRANSFORMER.to_string(), - "0.8.0-z".to_string(), - ); - go_version_map.insert(CONTAINER_TYPE_SINKER.to_string(), "0.8.0-z".to_string()); - go_version_map.insert(CONTAINER_TYPE_FB_SINKER.to_string(), "0.8.0-z".to_string()); + let mut go_version_map: HashMap = HashMap::new(); + go_version_map.insert(ContainerType::Sourcer, "0.8.0-z".to_string()); + go_version_map.insert(ContainerType::SourceTransformer, "0.8.0-z".to_string()); + go_version_map.insert(ContainerType::Sinker, "0.8.0-z".to_string()); + go_version_map.insert(ContainerType::FbSinker, "0.8.0-z".to_string()); let mut python_version_map = HashMap::new(); - python_version_map.insert(CONTAINER_TYPE_SOURCER.to_string(), "0.8.0rc100".to_string()); - python_version_map.insert( - CONTAINER_TYPE_SOURCE_TRANSFORMER.to_string(), - "0.8.0rc100".to_string(), - ); - python_version_map.insert(CONTAINER_TYPE_SINKER.to_string(), "0.8.0rc100".to_string()); - python_version_map.insert( - CONTAINER_TYPE_FB_SINKER.to_string(), - "0.8.0rc100".to_string(), - ); + python_version_map.insert(ContainerType::Sourcer, "0.8.0rc100".to_string()); + python_version_map.insert(ContainerType::SourceTransformer, "0.8.0rc100".to_string()); + python_version_map.insert(ContainerType::Sinker, "0.8.0rc100".to_string()); + python_version_map.insert(ContainerType::FbSinker, "0.8.0rc100".to_string()); let mut java_version_map = HashMap::new(); - java_version_map.insert(CONTAINER_TYPE_SOURCER.to_string(), "0.8.0-z".to_string()); - java_version_map.insert( - CONTAINER_TYPE_SOURCE_TRANSFORMER.to_string(), - "0.8.0-z".to_string(), - ); - java_version_map.insert(CONTAINER_TYPE_SINKER.to_string(), "0.8.0-z".to_string()); - java_version_map.insert(CONTAINER_TYPE_FB_SINKER.to_string(), "0.8.0-z".to_string()); + java_version_map.insert(ContainerType::Sourcer, "0.8.0-z".to_string()); + java_version_map.insert(ContainerType::SourceTransformer, "0.8.0-z".to_string()); + java_version_map.insert(ContainerType::Sinker, "0.8.0-z".to_string()); + java_version_map.insert(ContainerType::FbSinker, "0.8.0-z".to_string()); let mut rust_version_map = HashMap::new(); - rust_version_map.insert(CONTAINER_TYPE_SOURCER.to_string(), "0.1.0-z".to_string()); - rust_version_map.insert( - CONTAINER_TYPE_SOURCE_TRANSFORMER.to_string(), - "0.1.0-z".to_string(), - ); - rust_version_map.insert(CONTAINER_TYPE_SINKER.to_string(), "0.1.0-z".to_string()); - rust_version_map.insert(CONTAINER_TYPE_FB_SINKER.to_string(), "0.1.0-z".to_string()); + rust_version_map.insert(ContainerType::Sourcer, "0.1.0-z".to_string()); + rust_version_map.insert(ContainerType::SourceTransformer, "0.1.0-z".to_string()); + rust_version_map.insert(ContainerType::Sinker, "0.1.0-z".to_string()); + rust_version_map.insert(ContainerType::FbSinker, "0.1.0-z".to_string()); let mut m = HashMap::new(); m.insert("go".to_string(), go_version_map); @@ -513,7 +521,7 @@ mod tests { const TCP: &str = "tcp"; const PYTHON: &str = "python"; const GOLANG: &str = "go"; - const TEST_CONTAINER_TYPE: &str = "sourcer"; + const TEST_CONTAINER_TYPE: ContainerType = ContainerType::Sourcer; async fn write_server_info( svr_info: &ServerInfo, @@ -565,13 +573,13 @@ mod tests { // Helper function to create a SdkConstraints struct with minimum supported SDK versions all being stable releases fn create_sdk_constraints_stable_versions() -> SdkConstraints { let mut go_version_map = HashMap::new(); - go_version_map.insert(TEST_CONTAINER_TYPE.to_string(), "0.10.0-z".to_string()); + go_version_map.insert(TEST_CONTAINER_TYPE, "0.10.0-z".to_string()); let mut python_version_map = HashMap::new(); - python_version_map.insert(TEST_CONTAINER_TYPE.to_string(), "1.2.0rc100".to_string()); + python_version_map.insert(TEST_CONTAINER_TYPE, "1.2.0rc100".to_string()); let mut java_version_map = HashMap::new(); - java_version_map.insert(TEST_CONTAINER_TYPE.to_string(), "2.0.0-z".to_string()); + java_version_map.insert(TEST_CONTAINER_TYPE, "2.0.0-z".to_string()); let mut rust_version_map = HashMap::new(); - rust_version_map.insert(TEST_CONTAINER_TYPE.to_string(), "0.1.0-z".to_string()); + rust_version_map.insert(TEST_CONTAINER_TYPE, "0.1.0-z".to_string()); let mut m = HashMap::new(); m.insert("go".to_string(), go_version_map); @@ -584,13 +592,13 @@ mod tests { // Helper function to create a SdkConstraints struct with minimum supported SDK versions all being pre-releases fn create_sdk_constraints_pre_release_versions() -> SdkConstraints { let mut go_version_map = HashMap::new(); - go_version_map.insert(TEST_CONTAINER_TYPE.to_string(), "0.10.0-rc2".to_string()); + go_version_map.insert(TEST_CONTAINER_TYPE, "0.10.0-rc2".to_string()); let mut python_version_map = HashMap::new(); - python_version_map.insert(TEST_CONTAINER_TYPE.to_string(), "1.2.0b2".to_string()); + python_version_map.insert(TEST_CONTAINER_TYPE, "1.2.0b2".to_string()); let mut java_version_map = HashMap::new(); - java_version_map.insert(TEST_CONTAINER_TYPE.to_string(), "2.0.0-rc2".to_string()); + java_version_map.insert(TEST_CONTAINER_TYPE, "2.0.0-rc2".to_string()); let mut rust_version_map = HashMap::new(); - rust_version_map.insert(TEST_CONTAINER_TYPE.to_string(), "0.1.0-rc3".to_string()); + rust_version_map.insert(TEST_CONTAINER_TYPE, "0.1.0-rc3".to_string()); let mut m = HashMap::new(); m.insert("go".to_string(), go_version_map); @@ -609,7 +617,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -625,7 +633,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -644,7 +652,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -660,7 +668,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -679,7 +687,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -695,7 +703,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -714,7 +722,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -730,7 +738,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -749,7 +757,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -765,7 +773,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -784,7 +792,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -800,7 +808,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -819,7 +827,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -835,7 +843,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -854,7 +862,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -870,7 +878,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -889,7 +897,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -905,7 +913,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -924,7 +932,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -940,7 +948,7 @@ mod tests { let result = check_sdk_compatibility( sdk_version, sdk_language, - TEST_CONTAINER_TYPE, + &TEST_CONTAINER_TYPE, &min_supported_sdk_versions, ); @@ -1054,7 +1062,7 @@ mod tests { async fn test_get_container_type_from_file_valid() { let file_path = PathBuf::from("/var/run/numaflow/sourcer-server-info"); let container_type = get_container_type(&file_path); - assert_eq!("sourcer", container_type.unwrap()); + assert_eq!(ContainerType::Sourcer, container_type.unwrap()); } #[tokio::test]