Skip to content

Commit

Permalink
changing get_service_nodes to get_service_nodes_health adding the cat…
Browse files Browse the repository at this point in the history
…alog/service nodes list as the default, also adding get single NodeFull by service and name
  • Loading branch information
rrichardson committed Jun 26, 2023
1 parent d5e3b9b commit 2704704
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 15 deletions.
121 changes: 106 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ const GET_LOCK_METHOD_NAME: &str = "get_lock";
const REGISTER_ENTITY_METHOD_NAME: &str = "register_entity";
const DEREGISTER_ENTITY_METHOD_NAME: &str = "deregister_entity";
const GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME: &str = "get_all_registered_service_names";
const GET_SERVICE_NODES_HEALTH_METHOD_NAME: &str = "get_service_nodes_health";
const GET_SERVICE_NODES_METHOD_NAME: &str = "get_service_nodes";
const CREATE_SESSION_METHOD_NAME: &str = "create_session";
const GET_DATACENTERS: &str = "get_datacenters";
Expand Down Expand Up @@ -736,9 +737,51 @@ impl Consul {
.uri(uri.clone())
}

/// returns the Node by a given service and name
///
/// If the node is not present, returns none.
///
/// This uses the catalog#list-nodes function in the Consul API
/// then supplies a filter by the Node name
pub async fn get_node_by_name_and_service(
&self,
service: &str,
node_name: &str,
query_opts: Option<QueryOptions>,
) -> Result<Option<ResponseMeta<NodeFull>>> {
let query_opts = query_opts.unwrap_or_default();
let filter = format!("Node == {node_name}");
let request = GetServiceNodesRequest {
service,
near: None,
passing: false,
filter: Some(filter.as_str()),
};
let req = self.build_get_service_nodes_catalog_req(request, &query_opts);
let (mut response_body, index) = self
.execute_request(
req,
hyper::Body::empty(),
query_opts.timeout,
GET_SERVICE_NODES_METHOD_NAME,
)
.await?;
let bytes = response_body.copy_to_bytes(response_body.remaining());
let mut response = serde_json::from_slice::<Vec<NodeFull>>(&bytes)
.map_err(ConsulError::ResponseDeserializationFailed)?;
if let Some(node) = response.pop() {
Ok(Some(ResponseMeta {
response: node,
index,
}))
} else {
Ok(None)
}
}

/// returns the nodes providing the service indicated on the path.
/// Users can also build in support for dynamic load balancing and other features by incorporating the use of health checks.
/// See the [consul docs](https://www.consul.io/api-docs/health#list-nodes-for-service) for more information.
/// See the [consul docs](https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service) for more information.
/// # Arguments:
/// - request - the [GetServiceNodesRequest](consul::types::GetServiceNodesRequest)
/// # Errors:
Expand All @@ -747,9 +790,9 @@ impl Consul {
&self,
request: GetServiceNodesRequest<'_>,
query_opts: Option<QueryOptions>,
) -> Result<ResponseMeta<GetServiceNodesResponse>> {
) -> Result<ResponseMeta<Vec<NodeFull>>> {
let query_opts = query_opts.unwrap_or_default();
let req = self.build_get_service_nodes_req(request, &query_opts);
let req = self.build_get_service_nodes_catalog_req(request, &query_opts);
let (mut response_body, index) = self
.execute_request(
req,
Expand All @@ -759,6 +802,34 @@ impl Consul {
)
.await?;
let bytes = response_body.copy_to_bytes(response_body.remaining());
let response = serde_json::from_slice::<Vec<NodeFull>>(&bytes)
.map_err(ConsulError::ResponseDeserializationFailed)?;
Ok(ResponseMeta { response, index })
}

/// returns the nodes providing the service indicated on the path.
/// Users can also build in support for dynamic load balancing and other features by incorporating the use of health checks.
/// See the [consul docs](https://www.consul.io/api-docs/health#list-nodes-for-service) for more information.
/// # Arguments:
/// - request - the [GetServiceNodesRequest](consul::types::GetServiceNodesRequest)
/// # Errors:
/// [ConsulError](consul::ConsulError) describes all possible errors returned by this api.
pub async fn get_service_nodes_health(
&self,
request: GetServiceNodesRequest<'_>,
query_opts: Option<QueryOptions>,
) -> Result<ResponseMeta<GetServiceNodesResponse>> {
let query_opts = query_opts.unwrap_or_default();
let req = self.build_get_service_nodes_health_req(request, &query_opts);
let (mut response_body, index) = self
.execute_request(
req,
hyper::Body::empty(),
query_opts.timeout,
GET_SERVICE_NODES_HEALTH_METHOD_NAME,
)
.await?;
let bytes = response_body.copy_to_bytes(response_body.remaining());
let response = serde_json::from_slice::<GetServiceNodesResponse>(&bytes)
.map_err(ConsulError::ResponseDeserializationFailed)?;
Ok(ResponseMeta { response, index })
Expand All @@ -775,10 +846,9 @@ impl Consul {
passing: true,
..Default::default()
};
let services = self.get_service_nodes(request, query_opts).await.map_err(|e| {
let services = self.get_service_nodes_health(request, query_opts).await.map_err(|e| {
let err = format!(
"Unable to query consul to resolve service '{}' to a list of addresses and ports: {:?}",
service_name, e
"Unable to query consul to resolve service '{service_name}' to a list of addresses and ports: {e:?}"
);
error!("{}", err);
ConsulError::ServiceInstanceResolutionFailed(service_name.to_string())
Expand Down Expand Up @@ -881,7 +951,28 @@ impl Consul {
serde_json::from_slice(&bytes).map_err(ConsulError::ResponseDeserializationFailed)
}

fn build_get_service_nodes_req(
fn build_get_service_nodes_catalog_req(
&self,
request: GetServiceNodesRequest<'_>,
query_opts: &QueryOptions,
) -> http::request::Builder {
let req = hyper::Request::builder().method(Method::GET);
let mut url = String::new();
url.push_str(&format!(
"{}/v1/catalog/service/{}",
self.config.address, request.service
));
if request.passing {
url.push_str(&format!("?passing={}", request.passing));
}
if let Some(filter) = request.filter {
url.push_str(&format!("&filter={filter}"));
}
add_query_option_params(&mut url, query_opts, '&');
req.uri(url)
}

fn build_get_service_nodes_health_req(
&self,
request: GetServiceNodesRequest<'_>,
query_opts: &QueryOptions,
Expand All @@ -896,10 +987,10 @@ impl Consul {
url.push_str(&format!("?passing={}", request.passing));
}
if let Some(near) = request.near {
url.push_str(&format!("&near={}", near));
url.push_str(&format!("&near={near}"));
}
if let Some(filter) = request.filter {
url.push_str(&format!("&filter={}", filter));
url.push_str(&format!("&filter={filter}"));
}
add_query_option_params(&mut url, query_opts, '&');
req.uri(url)
Expand Down Expand Up @@ -985,7 +1076,7 @@ impl Consul {
fn build_create_txn_url(&self, datacenter: Option<&str>) -> String {
let mut url = format!("{}/v1/txn", self.config.address);
if let Some(dc) = datacenter {
url.push_str(&format!("?datacenter={}", dc));
url.push_str(&format!("?datacenter={dc}"));
}
url
}
Expand All @@ -1010,7 +1101,7 @@ impl Consul {
}
if let Some(cas_idx) = request.check_and_set {
url = add_query_param_separator(url, added_query_param);
url.push_str(&format!("cas={}", cas_idx));
url.push_str(&format!("cas={cas_idx}"));
}

add_namespace_and_datacenter(url, request.namespace, request.datacenter)
Expand All @@ -1020,18 +1111,18 @@ impl Consul {
fn add_query_option_params(uri: &mut String, query_opts: &QueryOptions, mut separator: char) {
if let Some(ns) = &query_opts.namespace {
if !ns.is_empty() {
uri.push_str(&format!("{}ns={}", separator, ns));
uri.push_str(&format!("{separator}ns={ns}"));
separator = '&';
}
}
if let Some(dc) = &query_opts.datacenter {
if !dc.is_empty() {
uri.push_str(&format!("{}dc={}", separator, dc));
uri.push_str(&format!("{separator}dc={dc}"));
separator = '&';
}
}
if let Some(idx) = query_opts.index {
uri.push_str(&format!("{}index={}", separator, idx));
uri.push_str(&format!("{separator}index={idx}"));
separator = '&';
if let Some(wait) = query_opts.wait {
uri.push_str(&format!(
Expand Down Expand Up @@ -1168,7 +1259,7 @@ mod tests {

for sn in list_response.response.iter() {
let dereg_request = DeregisterEntityRequest {
node: "local".into(),
node: "local",
service_id: Some(sn.service.id.as_str()),
..Default::default()
};
Expand Down
25 changes: 25 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ pub struct ServiceNode {
#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
/// The node information of an instance providing a Consul service.
/// provided by the Consul Health API
pub struct Node {
/// The ID of the service node.
#[serde(rename = "ID")]
Expand All @@ -525,6 +526,30 @@ pub struct Node {
pub meta: HashMap<String, String>,
}

#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
/// The node information as returned by the Consul Catalog API
pub struct NodeFull {
id: String,
node: String,
address: String,
datacenter: String,
tagged_addresses: HashMap<String, String>,
node_meta: HashMap<String, String>,
create_index: u64,
modify_index: u64,
service_address: Option<String>,
service_enable_tag_override: Option<bool>,
#[serde(rename = "Service_ID")]
service_id: Option<String>,
service_name: Option<String>,
service_port: Option<u16>,
service_meta: HashMap<String, String>,
service_tagged_addresses: HashMap<String, String>,
service_tags: Vec<String>,
namespace: Option<String>,
}

#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
/// The service information of an instance providing a Consul service.
Expand Down

0 comments on commit 2704704

Please sign in to comment.