Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent app from breaking when nodes are not reachable #479

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ simulation.env
.vscode/
*.env
*.log

*_rules.yaml

# logs
net_viz-*.png
Expand All @@ -23,5 +23,4 @@ foo*.*
foo*/
run_*.sh

.DS_Store
*/test_endurance_rules.yaml
.DS_Store
126 changes: 83 additions & 43 deletions ct-app/core/components/hoprd_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Callable, Optional
import asyncio
from typing import Callable, Optional, Union

from hoprd_sdk import ApiClient, Configuration
from hoprd_sdk.api import (
Expand Down Expand Up @@ -40,40 +41,69 @@ def _refresh_token_hook(self):
def print_prefix(self) -> str:
return "api"

def __call_api(
self, obj: Callable[..., object], method: str, *args, **kwargs
async def __call_api(
self,
obj: Callable[..., object],
method: str,
timeout: int = 60,
*args,
**kwargs,
) -> tuple[bool, Optional[object]]:
self.debug(
f"Calling {obj.__name__}.{method} with kwargs: {kwargs}, args: {args}"
)
try:
with ApiClient(self.configuration) as client:
api_callback = getattr(obj(client), method)
kwargs["async_req"] = True
thread = api_callback(*args, **kwargs)
response = thread.get()

except ApiException as e:
self.error(
f"ApiException calling {obj.__name__}.{method} "
+ f"with kwargs: {kwargs}, args: {args}, error is: {e}"
)
except OSError:
self.error(
f"OSError calling {obj.__name__}.{method} "
+ f"with kwargs: {kwargs}, args: {args}:"
async def __call(
obj: Callable[..., object],
method: str,
*args,
**kwargs,
):
try:
with ApiClient(self.configuration) as client:
api_callback = getattr(obj(client), method)
kwargs["async_req"] = True
thread = api_callback(*args, **kwargs)
response = thread.get()

except ApiException as e:
self.error(
f"ApiException calling {obj.__name__}.{method} "
+ f"with kwargs: {kwargs}, args: {args}, error is: {e}"
)
except OSError:
self.error(
f"OSError calling {obj.__name__}.{method} "
+ f"with kwargs: {kwargs}, args: {args}:"
)
except MaxRetryError:
self.error(
f"MaxRetryError calling {obj.__name__}.{method} "
+ f"with kwargs: {kwargs}, args: {args}"
)
except Exception as e:
self.error(
f"Exception calling {obj.__name__}.{method} "
+ f"with kwargs: {kwargs}, args: {args}, error is: {e}"
)
else:
return (True, response)

return (False, None)

try:
return await asyncio.wait_for(
asyncio.create_task(__call(obj, method, *args, **kwargs)),
timeout=timeout,
)
except MaxRetryError:
except asyncio.TimeoutError:
self.error(
f"MaxRetryError calling {obj.__name__}.{method} "
f"TimeoutError calling {obj.__name__}.{method} "
+ f"with kwargs: {kwargs}, args: {args}"
)
else:
return (True, response)
return (False, None)

return (False, None)

async def balances(self, type: str or list[str] = "all"):
async def balances(self, type: Union[str, list[str]] = "all"):
"""
Returns the balance of the node.
:param: type: str = "all" | "hopr" | "native" | "safe_native" | "safe_hopr"
Expand All @@ -86,7 +116,7 @@ async def balances(self, type: str or list[str] = "all"):
elif isinstance(type, str):
type = [type]

is_ok, response = self.__call_api(AccountApi, "account_get_balances")
is_ok, response = await self.__call_api(AccountApi, "account_get_balances")
if not is_ok:
return None

Expand All @@ -110,7 +140,7 @@ async def open_channel(self, peer_address: str, amount: str):
"""
body = ChannelsBody(peer_address, amount)

is_ok, response = self.__call_api(
is_ok, response = await self.__call_api(
ChannelsApi, "channels_open_channel", body=body
)
return response.channel_id if is_ok else None
Expand All @@ -123,7 +153,7 @@ async def fund_channel(self, channel_id: str, amount: str):
:return: bool
"""
body = ChannelidFundBody(amount=f"{amount:.0f}")
is_ok, _ = self.__call_api(
is_ok, _ = await self.__call_api(
ChannelsApi, "channels_fund_channel", channel_id, body=body
)
return is_ok
Expand All @@ -134,7 +164,9 @@ async def close_channel(self, channel_id: str):
:param: channel_id: str
:return: bool
"""
is_ok, _ = self.__call_api(ChannelsApi, "channels_close_channel", channel_id)
is_ok, _ = await self.__call_api(
ChannelsApi, "channels_close_channel", channel_id
)
return is_ok

async def incoming_channels(self, only_id: bool = False) -> list:
Expand All @@ -143,7 +175,7 @@ async def incoming_channels(self, only_id: bool = False) -> list:
:return: channels: list
"""

is_ok, response = self.__call_api(
is_ok, response = await self.__call_api(
ChannelsApi,
"channels_get_channels",
full_topology="false",
Expand All @@ -170,7 +202,7 @@ async def outgoing_channels(self, only_id: bool = False):
Returns all open outgoing channels.
:return: channels: list
"""
is_ok, response = self.__call_api(ChannelsApi, "channels_get_channels")
is_ok, response = await self.__call_api(ChannelsApi, "channels_get_channels")
if is_ok:
if not hasattr(response, "outgoing"):
self.warning("Response does not contain 'outgoing'")
Expand All @@ -193,7 +225,9 @@ async def get_channel(self, channel_id: str):
:param: channel_id: str
:return: channel: response
"""
_, response = self.__call_api(ChannelsApi, "channels_get_channel", channel_id)
_, response = await self.__call_api(
ChannelsApi, "channels_get_channel", channel_id
)
return response

async def all_channels(self, include_closed: bool):
Expand All @@ -202,7 +236,7 @@ async def all_channels(self, include_closed: bool):
:param: include_closed: bool
:return: channels: list
"""
is_ok, response = self.__call_api(
is_ok, response = await self.__call_api(
ChannelsApi,
"channels_get_channels",
full_topology="true",
Expand All @@ -216,12 +250,12 @@ async def ping(self, peer_id: str):
:param: peer_id: str
:return: response: dict
"""
_, response = self.__call_api(PeersApi, "peers_ping_peer", peer_id)
_, response = await self.__call_api(PeersApi, "peers_ping_peer", peer_id)
return response

async def peers(
self,
params: list or str = "peer_id",
params: Union[list, str] = "peer_id",
status: str = "connected",
quality: float = 0.5,
):
Expand All @@ -233,7 +267,9 @@ async def peers(
:return: peers: list
"""

is_ok, response = self.__call_api(NodeApi, "node_get_peers", quality=quality)
is_ok, response = await self.__call_api(
NodeApi, "node_get_peers", quality=quality
)
if not is_ok:
return []

Expand All @@ -258,8 +294,8 @@ async def peers(
return output_list

async def get_address(
self, address: str or list[str] = "hopr"
) -> Optional[dict[str, str]] or Optional[str]:
self, address: Union[str, list[str]] = "hopr"
) -> Optional[Union[dict[str, str], str]]:
"""
Returns the address of the node.
:param: address: str = "hopr" | "native"
Expand All @@ -272,7 +308,7 @@ async def get_address(
elif isinstance(address, str):
address = [address]

is_ok, response = self.__call_api(AccountApi, "account_get_address")
is_ok, response = await self.__call_api(AccountApi, "account_get_address")
if not is_ok:
return None

Expand All @@ -298,7 +334,9 @@ async def send_message(
:return: bool
"""
body = MessagesBody(tag, message, destination, path=hops)
is_ok, _ = self.__call_api(MessagesApi, "messages_send_message", body=body)
is_ok, _ = await self.__call_api(
MessagesApi, "messages_send_message", body=body
)
return is_ok

async def messages_pop(self, tag: int = MESSAGE_TAG) -> bool:
Expand All @@ -309,7 +347,9 @@ async def messages_pop(self, tag: int = MESSAGE_TAG) -> bool:
"""

body = MessagesPopBody(tag=tag)
_, response = self.__call_api(MessagesApi, "messages_pop_message", body=body)
_, response = await self.__call_api(
MessagesApi, "messages_pop_message", body=body
)
return response

async def messages_pop_all(self, tag: int = MESSAGE_TAG) -> list:
Expand All @@ -320,13 +360,13 @@ async def messages_pop_all(self, tag: int = MESSAGE_TAG) -> list:
"""

body = MessagesPopallBody(tag=tag)
_, response = self.__call_api(
_, response = await self.__call_api(
MessagesApi, "messages_pop_all_message", body=body
)
return response.messages if hasattr(response, "messages") else []

async def node_info(self):
_, response = self.__call_api(NodeApi, "node_get_info")
_, response = await self.__call_api(NodeApi, "node_get_info")
return response

async def channel_balance(self, src_peer_id: str, dest_peer_id: str) -> float:
Expand Down
10 changes: 6 additions & 4 deletions ct-app/core/components/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,18 @@ def nodesAddresses(
return list(addresses), list(keys)

@classmethod
async def httpPOST(cls, url, data) -> tuple[int, dict]:
async def post(session: ClientSession, url: str, data: dict):
async with session.post(url, json=data) as response:
async def httpPOST(
cls, url: str, data: dict, timeout: int = 60
) -> tuple[int, dict]:
async def post(session: ClientSession, url: str, data: dict, timeout: int):
async with session.post(url, json=data, timeout=timeout) as response:
status = response.status
response = await response.json()
return status, response

async with aiohttp.ClientSession() as session:
try:
status, response = await post(session, url, data)
status, response = await post(session, url, data, timeout)
except Exception:
return None, None
else:
Expand Down
6 changes: 3 additions & 3 deletions ct-app/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ def network_nodes(self) -> list[Node]:
return self.nodes[:-1]

@property
def network_nodes_addresses(self) -> list[Address]:
return [node.address for node in self.network_nodes]
async def network_nodes_addresses(self) -> list[Address]:
return [await node.address.get() for node in self.network_nodes]

@property
def safes_balance_subgraph_type(self) -> SubgraphType:
Expand Down Expand Up @@ -262,7 +262,7 @@ async def apply_economic_model(self):
excluded = Utils.excludeElements(eligibles, low_allowance_addresses)
self.debug(f"Excluded nodes with low safe allowance ({len(excluded)} entries).")

excluded = Utils.excludeElements(eligibles, self.network_nodes_addresses)
excluded = Utils.excludeElements(eligibles, await self.network_nodes_addresses)
self.debug(f"Excluded network nodes ({len(excluded)} entries).")

self.debug(f"Eligible nodes ({len(eligibles)} entries).")
Expand Down
Loading
Loading