diff --git a/crates/fluvio-controlplane/tests/k8_remote_cluster_v1.json b/crates/fluvio-controlplane-metadata/tests/k8_remote_cluster_v1.json similarity index 100% rename from crates/fluvio-controlplane/tests/k8_remote_cluster_v1.json rename to crates/fluvio-controlplane-metadata/tests/k8_remote_cluster_v1.json diff --git a/crates/fluvio-controlplane/tests/k8_remote_cluster_v1.yaml b/crates/fluvio-controlplane-metadata/tests/k8_remote_cluster_v1.yaml similarity index 100% rename from crates/fluvio-controlplane/tests/k8_remote_cluster_v1.yaml rename to crates/fluvio-controlplane-metadata/tests/k8_remote_cluster_v1.yaml diff --git a/crates/fluvio-controlplane/tests/k8_upstream_cluster.yaml b/crates/fluvio-controlplane-metadata/tests/k8_upstream_cluster.yaml similarity index 100% rename from crates/fluvio-controlplane/tests/k8_upstream_cluster.yaml rename to crates/fluvio-controlplane-metadata/tests/k8_upstream_cluster.yaml diff --git a/crates/fluvio-controlplane/tests/k8_upstream_cluster_v1.json b/crates/fluvio-controlplane-metadata/tests/k8_upstream_cluster_v1.json similarity index 100% rename from crates/fluvio-controlplane/tests/k8_upstream_cluster_v1.json rename to crates/fluvio-controlplane-metadata/tests/k8_upstream_cluster_v1.json diff --git a/crates/fluvio-controlplane/src/lib.rs b/crates/fluvio-controlplane/src/lib.rs index e5aa305b2c..3a46945a55 100644 --- a/crates/fluvio-controlplane/src/lib.rs +++ b/crates/fluvio-controlplane/src/lib.rs @@ -3,8 +3,6 @@ pub mod spu_api; pub mod replica; pub mod message; pub mod requests; -// pub mod remote_cluster; -// pub mod upstream_cluster; pub use alias::*; mod alias { diff --git a/crates/fluvio-sc/src/services/private_api/private_server.rs b/crates/fluvio-sc/src/services/private_api/private_server.rs index 039a2a65d4..bdcb019adb 100644 --- a/crates/fluvio-sc/src/services/private_api/private_server.rs +++ b/crates/fluvio-sc/src/services/private_api/private_server.rs @@ -106,12 +106,18 @@ where info!(spu_id, "SPU connected"); + let health_check = context.health().clone(); + + health_check.update(spu_id, true).await; + if let Err(err) = dispatch_loop(context, spu_id, api_stream, sink).await { error!("error with SPU <{}>, error: {}", spu_id, err); } info!(spu_id, "Terminating connection to SPU"); + health_check.update(spu_id, false).await; + Ok(()) } } @@ -127,10 +133,6 @@ async fn dispatch_loop( where C: MetadataItem, { - let health_check = context.health().clone(); - - health_check.update(spu_id, true).await; - let mut spu_spec_listener = context.spus().change_listener(); let mut partition_spec_listener = context.partitions().change_listener(); let mut sm_spec_listener = context.smartmodules().change_listener(); @@ -169,8 +171,6 @@ where match req_message { InternalScRequest::UpdateLrsRequest(msg) => { receive_lrs_update(&context,msg.request).await; - // health_check.update(spu_id, true).await; - }, InternalScRequest::RegisterSpuRequest(msg) => { error!("registration req only valid during initialization: {:#?}",msg); @@ -205,10 +205,6 @@ where }, - _ = sm_spec_listener.listen() => { - debug!("smartmodule lister changed"); - }, - _ = rm_cluster_listener.listen() => { debug!("remote cluster lister changed"); }, @@ -220,8 +216,6 @@ where } } - health_check.update(spu_id, false).await; - Ok(()) }