diff --git a/src/lib.rs b/src/lib.rs index 23393c9..c8e8edb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -155,6 +155,7 @@ const GET_LOCK_METHOD_NAME: &str = "get_lock"; const REGISTER_ENTITY_METHOD_NAME: &str = "register_entity"; const DEREGISTER_ENTITY_METHOD_NAME: &str = "deregister_entity"; const GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME: &str = "get_all_registered_service_names"; +const GET_SERVICE_NODES_HEALTH_METHOD_NAME: &str = "get_service_nodes_health"; const GET_SERVICE_NODES_METHOD_NAME: &str = "get_service_nodes"; const CREATE_SESSION_METHOD_NAME: &str = "create_session"; const GET_DATACENTERS: &str = "get_datacenters"; @@ -736,9 +737,51 @@ impl Consul { .uri(uri.clone()) } + /// returns the Node by a given service and name + /// + /// If the node is not present, returns none. + /// + /// This uses the catalog#list-nodes function in the Consul API + /// then supplies a filter by the Node name + pub async fn get_node_by_name_and_service( + &self, + service: &str, + node_name: &str, + query_opts: Option, + ) -> Result>> { + let query_opts = query_opts.unwrap_or_default(); + let filter = format!("Node == {node_name}"); + let request = GetServiceNodesRequest { + service, + near: None, + passing: false, + filter: Some(filter.as_str()), + }; + let req = self.build_get_service_nodes_catalog_req(request, &query_opts); + let (mut response_body, index) = self + .execute_request( + req, + hyper::Body::empty(), + query_opts.timeout, + GET_SERVICE_NODES_METHOD_NAME, + ) + .await?; + let bytes = response_body.copy_to_bytes(response_body.remaining()); + let mut response = serde_json::from_slice::>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed)?; + if let Some(node) = response.pop() { + Ok(Some(ResponseMeta { + response: node, + index, + })) + } else { + Ok(None) + } + } + /// returns the nodes providing the service indicated on the path. /// Users can also build in support for dynamic load balancing and other features by incorporating the use of health checks. - /// See the [consul docs](https://www.consul.io/api-docs/health#list-nodes-for-service) for more information. + /// See the [consul docs](https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service) for more information. /// # Arguments: /// - request - the [GetServiceNodesRequest](consul::types::GetServiceNodesRequest) /// # Errors: @@ -747,9 +790,9 @@ impl Consul { &self, request: GetServiceNodesRequest<'_>, query_opts: Option, - ) -> Result> { + ) -> Result>> { let query_opts = query_opts.unwrap_or_default(); - let req = self.build_get_service_nodes_req(request, &query_opts); + let req = self.build_get_service_nodes_catalog_req(request, &query_opts); let (mut response_body, index) = self .execute_request( req, @@ -759,6 +802,34 @@ impl Consul { ) .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); + let response = serde_json::from_slice::>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed)?; + Ok(ResponseMeta { response, index }) + } + + /// returns the nodes providing the service indicated on the path. + /// Users can also build in support for dynamic load balancing and other features by incorporating the use of health checks. + /// See the [consul docs](https://www.consul.io/api-docs/health#list-nodes-for-service) for more information. + /// # Arguments: + /// - request - the [GetServiceNodesRequest](consul::types::GetServiceNodesRequest) + /// # Errors: + /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. + pub async fn get_service_nodes_health( + &self, + request: GetServiceNodesRequest<'_>, + query_opts: Option, + ) -> Result> { + let query_opts = query_opts.unwrap_or_default(); + let req = self.build_get_service_nodes_health_req(request, &query_opts); + let (mut response_body, index) = self + .execute_request( + req, + hyper::Body::empty(), + query_opts.timeout, + GET_SERVICE_NODES_HEALTH_METHOD_NAME, + ) + .await?; + let bytes = response_body.copy_to_bytes(response_body.remaining()); let response = serde_json::from_slice::(&bytes) .map_err(ConsulError::ResponseDeserializationFailed)?; Ok(ResponseMeta { response, index }) @@ -775,10 +846,9 @@ impl Consul { passing: true, ..Default::default() }; - let services = self.get_service_nodes(request, query_opts).await.map_err(|e| { + let services = self.get_service_nodes_health(request, query_opts).await.map_err(|e| { let err = format!( - "Unable to query consul to resolve service '{}' to a list of addresses and ports: {:?}", - service_name, e + "Unable to query consul to resolve service '{service_name}' to a list of addresses and ports: {e:?}" ); error!("{}", err); ConsulError::ServiceInstanceResolutionFailed(service_name.to_string()) @@ -881,7 +951,28 @@ impl Consul { serde_json::from_slice(&bytes).map_err(ConsulError::ResponseDeserializationFailed) } - fn build_get_service_nodes_req( + fn build_get_service_nodes_catalog_req( + &self, + request: GetServiceNodesRequest<'_>, + query_opts: &QueryOptions, + ) -> http::request::Builder { + let req = hyper::Request::builder().method(Method::GET); + let mut url = String::new(); + url.push_str(&format!( + "{}/v1/catalog/service/{}", + self.config.address, request.service + )); + if request.passing { + url.push_str(&format!("?passing={}", request.passing)); + } + if let Some(filter) = request.filter { + url.push_str(&format!("&filter={filter}")); + } + add_query_option_params(&mut url, query_opts, '&'); + req.uri(url) + } + + fn build_get_service_nodes_health_req( &self, request: GetServiceNodesRequest<'_>, query_opts: &QueryOptions, @@ -896,10 +987,10 @@ impl Consul { url.push_str(&format!("?passing={}", request.passing)); } if let Some(near) = request.near { - url.push_str(&format!("&near={}", near)); + url.push_str(&format!("&near={near}")); } if let Some(filter) = request.filter { - url.push_str(&format!("&filter={}", filter)); + url.push_str(&format!("&filter={filter}")); } add_query_option_params(&mut url, query_opts, '&'); req.uri(url) @@ -985,7 +1076,7 @@ impl Consul { fn build_create_txn_url(&self, datacenter: Option<&str>) -> String { let mut url = format!("{}/v1/txn", self.config.address); if let Some(dc) = datacenter { - url.push_str(&format!("?datacenter={}", dc)); + url.push_str(&format!("?datacenter={dc}")); } url } @@ -1010,7 +1101,7 @@ impl Consul { } if let Some(cas_idx) = request.check_and_set { url = add_query_param_separator(url, added_query_param); - url.push_str(&format!("cas={}", cas_idx)); + url.push_str(&format!("cas={cas_idx}")); } add_namespace_and_datacenter(url, request.namespace, request.datacenter) @@ -1020,18 +1111,18 @@ impl Consul { fn add_query_option_params(uri: &mut String, query_opts: &QueryOptions, mut separator: char) { if let Some(ns) = &query_opts.namespace { if !ns.is_empty() { - uri.push_str(&format!("{}ns={}", separator, ns)); + uri.push_str(&format!("{separator}ns={ns}")); separator = '&'; } } if let Some(dc) = &query_opts.datacenter { if !dc.is_empty() { - uri.push_str(&format!("{}dc={}", separator, dc)); + uri.push_str(&format!("{separator}dc={dc}")); separator = '&'; } } if let Some(idx) = query_opts.index { - uri.push_str(&format!("{}index={}", separator, idx)); + uri.push_str(&format!("{separator}index={idx}")); separator = '&'; if let Some(wait) = query_opts.wait { uri.push_str(&format!( @@ -1168,7 +1259,7 @@ mod tests { for sn in list_response.response.iter() { let dereg_request = DeregisterEntityRequest { - node: "local".into(), + node: "local", service_id: Some(sn.service.id.as_str()), ..Default::default() }; diff --git a/src/types.rs b/src/types.rs index 662d668..70c6966 100644 --- a/src/types.rs +++ b/src/types.rs @@ -507,6 +507,7 @@ pub struct ServiceNode { #[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] /// The node information of an instance providing a Consul service. +/// provided by the Consul Health API pub struct Node { /// The ID of the service node. #[serde(rename = "ID")] @@ -525,6 +526,30 @@ pub struct Node { pub meta: HashMap, } +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "PascalCase")] +/// The node information as returned by the Consul Catalog API +pub struct NodeFull { + id: String, + node: String, + address: String, + datacenter: String, + tagged_addresses: HashMap, + node_meta: HashMap, + create_index: u64, + modify_index: u64, + service_address: Option, + service_enable_tag_override: Option, + #[serde(rename = "Service_ID")] + service_id: Option, + service_name: Option, + service_port: Option, + service_meta: HashMap, + service_tagged_addresses: HashMap, + service_tags: Vec, + namespace: Option, +} + #[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] /// The service information of an instance providing a Consul service.