From c31460e4f54a865d00637c92003af9ee7c386d22 Mon Sep 17 00:00:00 2001 From: Jean Demeusy <61140535+jeandemeusy@users.noreply.github.com> Date: Tue, 5 Mar 2024 12:41:34 +0100 Subject: [PATCH] Prevent app from breaking when nodes are not reachable (#479) * added timeout on api calls * ensured all api calls are behind "connectguard" decorator * fix timeout * Catching any other exception from API * func renamed * node address access thread safe * ensure timeout it used on any http post request * Update ct-app/core/core.py Co-authored-by: Tibor <9529609+Teebor-Choka@users.noreply.github.com> --------- Co-authored-by: Tibor <9529609+Teebor-Choka@users.noreply.github.com> --- .gitignore | 5 +- ct-app/core/components/hoprd_api.py | 126 ++++++++++++++++++---------- ct-app/core/components/utils.py | 10 ++- ct-app/core/core.py | 6 +- ct-app/core/node.py | 70 +++++++++------- 5 files changed, 133 insertions(+), 84 deletions(-) diff --git a/.gitignore b/.gitignore index e5ca0339..ad1189ad 100755 --- a/.gitignore +++ b/.gitignore @@ -8,7 +8,7 @@ simulation.env .vscode/ *.env *.log - +*_rules.yaml # logs net_viz-*.png @@ -23,5 +23,4 @@ foo*.* foo*/ run_*.sh -.DS_Store -*/test_endurance_rules.yaml +.DS_Store \ No newline at end of file diff --git a/ct-app/core/components/hoprd_api.py b/ct-app/core/components/hoprd_api.py index 0cb7b5c6..17f1ab9b 100644 --- a/ct-app/core/components/hoprd_api.py +++ b/ct-app/core/components/hoprd_api.py @@ -1,4 +1,5 @@ -from typing import Callable, Optional +import asyncio +from typing import Callable, Optional, Union from hoprd_sdk import ApiClient, Configuration from hoprd_sdk.api import ( @@ -40,40 +41,69 @@ def _refresh_token_hook(self): def print_prefix(self) -> str: return "api" - def __call_api( - self, obj: Callable[..., object], method: str, *args, **kwargs + async def __call_api( + self, + obj: Callable[..., object], + method: str, + timeout: int = 60, + *args, + **kwargs, ) -> tuple[bool, Optional[object]]: self.debug( f"Calling {obj.__name__}.{method} with kwargs: {kwargs}, args: {args}" ) - try: - with ApiClient(self.configuration) as client: - api_callback = getattr(obj(client), method) - kwargs["async_req"] = True - thread = api_callback(*args, **kwargs) - response = thread.get() - except ApiException as e: - self.error( - f"ApiException calling {obj.__name__}.{method} " - + f"with kwargs: {kwargs}, args: {args}, error is: {e}" - ) - except OSError: - self.error( - f"OSError calling {obj.__name__}.{method} " - + f"with kwargs: {kwargs}, args: {args}:" + async def __call( + obj: Callable[..., object], + method: str, + *args, + **kwargs, + ): + try: + with ApiClient(self.configuration) as client: + api_callback = getattr(obj(client), method) + kwargs["async_req"] = True + thread = api_callback(*args, **kwargs) + response = thread.get() + + except ApiException as e: + self.error( + f"ApiException calling {obj.__name__}.{method} " + + f"with kwargs: {kwargs}, args: {args}, error is: {e}" + ) + except OSError: + self.error( + f"OSError calling {obj.__name__}.{method} " + + f"with kwargs: {kwargs}, args: {args}:" + ) + except MaxRetryError: + self.error( + f"MaxRetryError calling {obj.__name__}.{method} " + + f"with kwargs: {kwargs}, args: {args}" + ) + except Exception as e: + self.error( + f"Exception calling {obj.__name__}.{method} " + + f"with kwargs: {kwargs}, args: {args}, error is: {e}" + ) + else: + return (True, response) + + return (False, None) + + try: + return await asyncio.wait_for( + asyncio.create_task(__call(obj, method, *args, **kwargs)), + timeout=timeout, ) - except MaxRetryError: + except asyncio.TimeoutError: self.error( - f"MaxRetryError calling {obj.__name__}.{method} " + f"TimeoutError calling {obj.__name__}.{method} " + f"with kwargs: {kwargs}, args: {args}" ) - else: - return (True, response) + return (False, None) - return (False, None) - - async def balances(self, type: str or list[str] = "all"): + async def balances(self, type: Union[str, list[str]] = "all"): """ Returns the balance of the node. :param: type: str = "all" | "hopr" | "native" | "safe_native" | "safe_hopr" @@ -86,7 +116,7 @@ async def balances(self, type: str or list[str] = "all"): elif isinstance(type, str): type = [type] - is_ok, response = self.__call_api(AccountApi, "account_get_balances") + is_ok, response = await self.__call_api(AccountApi, "account_get_balances") if not is_ok: return None @@ -110,7 +140,7 @@ async def open_channel(self, peer_address: str, amount: str): """ body = ChannelsBody(peer_address, amount) - is_ok, response = self.__call_api( + is_ok, response = await self.__call_api( ChannelsApi, "channels_open_channel", body=body ) return response.channel_id if is_ok else None @@ -123,7 +153,7 @@ async def fund_channel(self, channel_id: str, amount: str): :return: bool """ body = ChannelidFundBody(amount=f"{amount:.0f}") - is_ok, _ = self.__call_api( + is_ok, _ = await self.__call_api( ChannelsApi, "channels_fund_channel", channel_id, body=body ) return is_ok @@ -134,7 +164,9 @@ async def close_channel(self, channel_id: str): :param: channel_id: str :return: bool """ - is_ok, _ = self.__call_api(ChannelsApi, "channels_close_channel", channel_id) + is_ok, _ = await self.__call_api( + ChannelsApi, "channels_close_channel", channel_id + ) return is_ok async def incoming_channels(self, only_id: bool = False) -> list: @@ -143,7 +175,7 @@ async def incoming_channels(self, only_id: bool = False) -> list: :return: channels: list """ - is_ok, response = self.__call_api( + is_ok, response = await self.__call_api( ChannelsApi, "channels_get_channels", full_topology="false", @@ -170,7 +202,7 @@ async def outgoing_channels(self, only_id: bool = False): Returns all open outgoing channels. :return: channels: list """ - is_ok, response = self.__call_api(ChannelsApi, "channels_get_channels") + is_ok, response = await self.__call_api(ChannelsApi, "channels_get_channels") if is_ok: if not hasattr(response, "outgoing"): self.warning("Response does not contain 'outgoing'") @@ -193,7 +225,9 @@ async def get_channel(self, channel_id: str): :param: channel_id: str :return: channel: response """ - _, response = self.__call_api(ChannelsApi, "channels_get_channel", channel_id) + _, response = await self.__call_api( + ChannelsApi, "channels_get_channel", channel_id + ) return response async def all_channels(self, include_closed: bool): @@ -202,7 +236,7 @@ async def all_channels(self, include_closed: bool): :param: include_closed: bool :return: channels: list """ - is_ok, response = self.__call_api( + is_ok, response = await self.__call_api( ChannelsApi, "channels_get_channels", full_topology="true", @@ -216,12 +250,12 @@ async def ping(self, peer_id: str): :param: peer_id: str :return: response: dict """ - _, response = self.__call_api(PeersApi, "peers_ping_peer", peer_id) + _, response = await self.__call_api(PeersApi, "peers_ping_peer", peer_id) return response async def peers( self, - params: list or str = "peer_id", + params: Union[list, str] = "peer_id", status: str = "connected", quality: float = 0.5, ): @@ -233,7 +267,9 @@ async def peers( :return: peers: list """ - is_ok, response = self.__call_api(NodeApi, "node_get_peers", quality=quality) + is_ok, response = await self.__call_api( + NodeApi, "node_get_peers", quality=quality + ) if not is_ok: return [] @@ -258,8 +294,8 @@ async def peers( return output_list async def get_address( - self, address: str or list[str] = "hopr" - ) -> Optional[dict[str, str]] or Optional[str]: + self, address: Union[str, list[str]] = "hopr" + ) -> Optional[Union[dict[str, str], str]]: """ Returns the address of the node. :param: address: str = "hopr" | "native" @@ -272,7 +308,7 @@ async def get_address( elif isinstance(address, str): address = [address] - is_ok, response = self.__call_api(AccountApi, "account_get_address") + is_ok, response = await self.__call_api(AccountApi, "account_get_address") if not is_ok: return None @@ -298,7 +334,9 @@ async def send_message( :return: bool """ body = MessagesBody(tag, message, destination, path=hops) - is_ok, _ = self.__call_api(MessagesApi, "messages_send_message", body=body) + is_ok, _ = await self.__call_api( + MessagesApi, "messages_send_message", body=body + ) return is_ok async def messages_pop(self, tag: int = MESSAGE_TAG) -> bool: @@ -309,7 +347,9 @@ async def messages_pop(self, tag: int = MESSAGE_TAG) -> bool: """ body = MessagesPopBody(tag=tag) - _, response = self.__call_api(MessagesApi, "messages_pop_message", body=body) + _, response = await self.__call_api( + MessagesApi, "messages_pop_message", body=body + ) return response async def messages_pop_all(self, tag: int = MESSAGE_TAG) -> list: @@ -320,13 +360,13 @@ async def messages_pop_all(self, tag: int = MESSAGE_TAG) -> list: """ body = MessagesPopallBody(tag=tag) - _, response = self.__call_api( + _, response = await self.__call_api( MessagesApi, "messages_pop_all_message", body=body ) return response.messages if hasattr(response, "messages") else [] async def node_info(self): - _, response = self.__call_api(NodeApi, "node_get_info") + _, response = await self.__call_api(NodeApi, "node_get_info") return response async def channel_balance(self, src_peer_id: str, dest_peer_id: str) -> float: diff --git a/ct-app/core/components/utils.py b/ct-app/core/components/utils.py index fc7ef45c..3ba6fdc8 100644 --- a/ct-app/core/components/utils.py +++ b/ct-app/core/components/utils.py @@ -68,16 +68,18 @@ def nodesAddresses( return list(addresses), list(keys) @classmethod - async def httpPOST(cls, url, data) -> tuple[int, dict]: - async def post(session: ClientSession, url: str, data: dict): - async with session.post(url, json=data) as response: + async def httpPOST( + cls, url: str, data: dict, timeout: int = 60 + ) -> tuple[int, dict]: + async def post(session: ClientSession, url: str, data: dict, timeout: int): + async with session.post(url, json=data, timeout=timeout) as response: status = response.status response = await response.json() return status, response async with aiohttp.ClientSession() as session: try: - status, response = await post(session, url, data) + status, response = await post(session, url, data, timeout) except Exception: return None, None else: diff --git a/ct-app/core/core.py b/ct-app/core/core.py index ce181bc3..c9c84736 100644 --- a/ct-app/core/core.py +++ b/ct-app/core/core.py @@ -73,8 +73,8 @@ def network_nodes(self) -> list[Node]: return self.nodes[:-1] @property - def network_nodes_addresses(self) -> list[Address]: - return [node.address for node in self.network_nodes] + async def network_nodes_addresses(self) -> list[Address]: + return await asyncio.gather(*[node.address.get() for node in self.network_nodes]) @property def safes_balance_subgraph_type(self) -> SubgraphType: @@ -262,7 +262,7 @@ async def apply_economic_model(self): excluded = Utils.excludeElements(eligibles, low_allowance_addresses) self.debug(f"Excluded nodes with low safe allowance ({len(excluded)} entries).") - excluded = Utils.excludeElements(eligibles, self.network_nodes_addresses) + excluded = Utils.excludeElements(eligibles, await self.network_nodes_addresses) self.debug(f"Excluded network nodes ({len(excluded)} entries).") self.debug(f"Eligible nodes ({len(eligibles)} entries).") diff --git a/ct-app/core/node.py b/ct-app/core/node.py index ce799c7f..0505f771 100644 --- a/ct-app/core/node.py +++ b/ct-app/core/node.py @@ -1,6 +1,5 @@ import asyncio from datetime import datetime -from typing import Optional from prometheus_client import Gauge @@ -63,8 +62,8 @@ def __init__(self, url: str, key: str): self.api: HoprdAPI = HoprdAPI(url, key) self.url = url - self.address: Optional[Address] = None + self.address = LockedVar("address", None, infer_type=False) self.peers = LockedVar("peers", set[Peer]()) self.outgoings = LockedVar("outgoings", []) self.incomings = LockedVar("incomings", []) @@ -76,15 +75,11 @@ def __init__(self, url: str, key: str): self.started = False - @property - async def balance(self) -> dict[str, int]: - return await self.api.balances() - @property def print_prefix(self): return ".".join(self.url.split("//")[-1].split(".")[:2]) - async def _retrieve_address(self): + async def _retrieve_address(self) -> Address: address = await self.api.get_address("all") if not isinstance(address, dict): @@ -93,15 +88,17 @@ async def _retrieve_address(self): if "hopr" not in address or "native" not in address: return - self.address = Address(address["hopr"], address["native"]) + await self.address.set(Address(address["hopr"], address["native"])) + + return await self.address.get() @flagguard @formalin(None) async def healthcheck(self): - await self._retrieve_address() - await self.connected.set(self.address is not None) + node_address = await self._retrieve_address() + await self.connected.set(node_address is not None) - if address := self.address: + if address := node_address: self.debug(f"Connection state: {await self.connected.get()}") HEALTH.labels(address.id).set(int(await self.connected.get())) else: @@ -111,7 +108,7 @@ async def healthcheck(self): @formalin("Retrieving balances") @connectguard async def retrieve_balances(self): - for token, balance in (await self.balance).items(): + for token, balance in (await self.api.balances()).items(): BALANCE.labels(self.address.id, token).set(balance) @flagguard @@ -121,6 +118,8 @@ async def open_channels(self): """ Open channels to discovered_peers. """ + node_address = await self.address.get() + out_opens = [ c for c in await self.outgoings.get() @@ -141,12 +140,12 @@ async def open_channels(self): ) if ok: self.debug(f"Opened channel to {address}") - CHANNELS_OPENED.labels(self.address.id).inc() + CHANNELS_OPENED.labels(node_address.id).inc() else: self.warning(f"Failed to open channel to {address}") - OPEN_CHANNELS_CALLS.labels(self.address.id).inc() + OPEN_CHANNELS_CALLS.labels(node_address.id).inc() - ADDRESSES_WOUT_CHANNELS.labels(self.address.id).set( + ADDRESSES_WOUT_CHANNELS.labels(node_address.id).set( len(addresses_without_channels) ) @@ -157,6 +156,7 @@ async def close_incoming_channels(self): """ Close incoming channels """ + node_address = await self.address.get() in_opens = [ c for c in await self.incomings.get() if ChannelStatus.isOpen(c.status) @@ -167,10 +167,10 @@ async def close_incoming_channels(self): ok = await self.api.close_channel(channel.channel_id) if ok: self.debug(f"Closed channel {channel.channel_id}") - INCOMING_CHANNELS_CLOSED.labels(self.address.id).inc() + INCOMING_CHANNELS_CLOSED.labels(node_address.id).inc() else: self.warning(f"Failed to close channel {channel.channel_id}") - CLOSE_INCOMING_CHANNELS_CALLS.labels(self.address.id).inc() + CLOSE_INCOMING_CHANNELS_CALLS.labels(node_address.id).inc() @flagguard @formalin("Closing pending channels") @@ -179,6 +179,7 @@ async def close_pending_channels(self): """ Close channels in PendingToClose state. """ + node_address = await self.address.get() out_pendings = [ c for c in await self.outgoings.get() if ChannelStatus.isPending(c.status) @@ -191,10 +192,10 @@ async def close_pending_channels(self): ok = await self.api.close_channel(channel.channel_id) if ok: self.debug(f"Closed pending channel {channel.channel_id}") - PENDING_CHANNELS_CLOSED.labels(self.address.id).inc() + PENDING_CHANNELS_CLOSED.labels(node_address.id).inc() else: self.warning(f"Failed to close pending channel {channel.channel_id}") - CLOSE_PENDING_CHANNELS_CALLS.labels(self.address.id).inc() + CLOSE_PENDING_CHANNELS_CALLS.labels(node_address.id).inc() @flagguard @formalin("Closing old channels") @@ -203,6 +204,8 @@ async def close_old_channels(self): """ Close channels that have been open for too long. """ + node_address = await self.address.get() + outgoings = await self.outgoings.get() peer_history: dict[str, datetime] = await self.peer_history.get() to_peer_history = dict[str, datetime]() @@ -238,11 +241,11 @@ async def close_old_channels(self): if ok: self.debug(f"Channel {channel} closed") - OLD_CHANNELS_CLOSED.labels(self.address.id).inc() + OLD_CHANNELS_CLOSED.labels(node_address.id).inc() else: self.warning(f"Failed to close channel {channel_id}") - CLOSE_OLD_CHANNELS_CALLS.labels(self.address.id).inc() + CLOSE_OLD_CHANNELS_CALLS.labels(node_address.id).inc() @flagguard @formalin("Funding channels") @@ -251,6 +254,7 @@ async def fund_channels(self): """ Fund channels that are below minimum threshold. """ + node_address = await self.address.get() out_opens = [ c for c in await self.outgoings.get() if ChannelStatus.isOpen(c.status) @@ -274,10 +278,10 @@ async def fund_channels(self): ) if ok: self.debug(f"Funded channel {channel.channel_id}") - FUNDED_CHANNELS.labels(self.address.id).inc() + FUNDED_CHANNELS.labels(node_address.id).inc() else: self.warning(f"Failed to fund channel {channel.channel_id}") - FUND_CHANNELS_CALLS.labels(self.address.id).inc() + FUND_CHANNELS_CALLS.labels(node_address.id).inc() @flagguard @formalin("Retrieving peers") @@ -286,6 +290,7 @@ async def retrieve_peers(self): """ Retrieve real peers from the network. """ + node_address = await self.address.get() results = await self.api.peers( params=["peer_id", "peer_address", "reported_version"], quality=0.5 @@ -301,7 +306,7 @@ async def retrieve_peers(self): await self.peer_history.update(addresses_w_timestamp) self.debug(f"Peers: {len(peers)}") - PEERS_COUNT.labels(self.address.id).set(len(peers)) + PEERS_COUNT.labels(node_address.id).set(len(peers)) @flagguard @formalin("Retrieving outgoing channels") @@ -311,17 +316,18 @@ async def retrieve_outgoing_channels(self): Retrieve all outgoing channels. """ channels = await self.api.all_channels(False) + node_address = await self.address.get() outgoings = [ c for c in channels.all - if c.source_peer_id == self.address.id + if c.source_peer_id == node_address.id and not ChannelStatus.isClosed(c.status) ] await self.outgoings.set(outgoings) self.debug(f"Outgoing channels: {len(outgoings)}") - OUTGOING_CHANNELS.labels(self.address.id).set(len(outgoings)) + OUTGOING_CHANNELS.labels(node_address.id).set(len(outgoings)) @flagguard @formalin("Retrieving incoming channels") @@ -331,17 +337,18 @@ async def retrieve_incoming_channels(self): Retrieve all incoming channels. """ channels = await self.api.all_channels(False) + node_address = await self.address.get() incomings = [ c for c in channels.all - if c.destination_peer_id == self.address.id + if c.destination_peer_id == node_address.id and not ChannelStatus.isClosed(c.status) ] await self.incomings.set(incomings) self.debug(f"Incoming channels: {len(incomings)}") - INCOMING_CHANNELS.labels(self.address.id).set(len(incomings)) + INCOMING_CHANNELS.labels(node_address.id).set(len(incomings)) @flagguard @formalin("Retrieving total funds") @@ -351,17 +358,18 @@ async def get_total_channel_funds(self): Retrieve total funds. """ channels = await self.outgoings.get() + node_address = await self.address.get() results = await Utils.aggregatePeerBalanceInChannels(channels) - if self.address.id not in results: + if node_address.id not in results: self.warning("Funding info not found") return - entry = TopologyEntry.fromDict(self.address.id, results[self.address.id]) + entry = TopologyEntry.fromDict(node_address.id, results[node_address.id]) self.debug(f"Channels funds: { entry.channels_balance}") - TOTAL_CHANNEL_FUNDS.labels(self.address.id).set(entry.channels_balance) + TOTAL_CHANNEL_FUNDS.labels(node_address.id).set(entry.channels_balance) def tasks(self): self.info("Starting node")