Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
ausias-armesto committed Mar 5, 2024
2 parents 14b180b + c31460e commit 075f300
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 88 deletions.
5 changes: 2 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ simulation.env
.vscode/
*.env
*.log

*_rules.yaml

# logs
net_viz-*.png
Expand All @@ -23,5 +23,4 @@ foo*.*
foo*/
run_*.sh

.DS_Store
*/test_endurance_rules.yaml
.DS_Store
4 changes: 2 additions & 2 deletions ct-app/core/components/baseclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

logging.basicConfig()
logging.getLogger("asyncio").setLevel(logging.WARNING)
formatter = logging.Formatter("%(message)s")
formatter = logging.Formatter("%(asctime)s %(levelname)s:%(message)s")


class Base:
Expand All @@ -25,7 +25,7 @@ def class_prefix(cls) -> str:
return f"{cls.__name__.upper()}_"

def __format(self, message: str, color: str = "\033[0m"):
return f"{color}{self.print_prefix}\033[0m | {message}"
return f"{self.print_prefix} | {message}"

def _print(self, message: str, color: str = "\033[0m"):
print(self.__format(message, color))
Expand Down
129 changes: 86 additions & 43 deletions ct-app/core/components/hoprd_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Callable, Optional
import asyncio
from typing import Callable, Optional, Union

from hoprd_sdk import ApiClient, Configuration
from hoprd_sdk.api import (
Expand Down Expand Up @@ -40,37 +41,69 @@ def _refresh_token_hook(self):
def print_prefix(self) -> str:
return "api"

def __call_api(
self, obj: Callable[..., object], method: str, *args, **kwargs
async def __call_api(
self,
obj: Callable[..., object],
method: str,
timeout: int = 60,
*args,
**kwargs,
) -> tuple[bool, Optional[object]]:
try:
with ApiClient(self.configuration) as client:
api_callback = getattr(obj(client), method)
kwargs["async_req"] = True
thread = api_callback(*args, **kwargs)
response = thread.get()
self.debug(
f"Calling {obj.__name__}.{method} with kwargs: {kwargs}, args: {args}"
)

except ApiException as e:
self.error(
f"ApiException calling {obj.__name__}.{method} "
+ f"with kwargs: {kwargs}, args: {args}, error is: {e}"
)
except OSError:
self.error(
f"OSError calling {obj.__name__}.{method} "
+ f"with kwargs: {kwargs}, args: {args}:"
async def __call(
obj: Callable[..., object],
method: str,
*args,
**kwargs,
):
try:
with ApiClient(self.configuration) as client:
api_callback = getattr(obj(client), method)
kwargs["async_req"] = True
thread = api_callback(*args, **kwargs)
response = thread.get()

except ApiException as e:
self.error(
f"ApiException calling {obj.__name__}.{method} "
+ f"with kwargs: {kwargs}, args: {args}, error is: {e}"
)
except OSError:
self.error(
f"OSError calling {obj.__name__}.{method} "
+ f"with kwargs: {kwargs}, args: {args}:"
)
except MaxRetryError:
self.error(
f"MaxRetryError calling {obj.__name__}.{method} "
+ f"with kwargs: {kwargs}, args: {args}"
)
except Exception as e:
self.error(
f"Exception calling {obj.__name__}.{method} "
+ f"with kwargs: {kwargs}, args: {args}, error is: {e}"
)
else:
return (True, response)

return (False, None)

try:
return await asyncio.wait_for(
asyncio.create_task(__call(obj, method, *args, **kwargs)),
timeout=timeout,
)
except MaxRetryError:
except asyncio.TimeoutError:
self.error(
f"MaxRetryError calling {obj.__name__}.{method} "
f"TimeoutError calling {obj.__name__}.{method} "
+ f"with kwargs: {kwargs}, args: {args}"
)
else:
return (True, response)
return (False, None)

return (False, None)

async def balances(self, type: str or list[str] = "all"):
async def balances(self, type: Union[str, list[str]] = "all"):
"""
Returns the balance of the node.
:param: type: str = "all" | "hopr" | "native" | "safe_native" | "safe_hopr"
Expand All @@ -83,7 +116,7 @@ async def balances(self, type: str or list[str] = "all"):
elif isinstance(type, str):
type = [type]

is_ok, response = self.__call_api(AccountApi, "account_get_balances")
is_ok, response = await self.__call_api(AccountApi, "account_get_balances")
if not is_ok:
return None

Expand All @@ -107,7 +140,7 @@ async def open_channel(self, peer_address: str, amount: str):
"""
body = ChannelsBody(peer_address, amount)

is_ok, response = self.__call_api(
is_ok, response = await self.__call_api(
ChannelsApi, "channels_open_channel", body=body
)
return response.channel_id if is_ok else None
Expand All @@ -120,7 +153,7 @@ async def fund_channel(self, channel_id: str, amount: str):
:return: bool
"""
body = ChannelidFundBody(amount=f"{amount:.0f}")
is_ok, _ = self.__call_api(
is_ok, _ = await self.__call_api(
ChannelsApi, "channels_fund_channel", channel_id, body=body
)
return is_ok
Expand All @@ -131,7 +164,9 @@ async def close_channel(self, channel_id: str):
:param: channel_id: str
:return: bool
"""
is_ok, _ = self.__call_api(ChannelsApi, "channels_close_channel", channel_id)
is_ok, _ = await self.__call_api(
ChannelsApi, "channels_close_channel", channel_id
)
return is_ok

async def incoming_channels(self, only_id: bool = False) -> list:
Expand All @@ -140,7 +175,7 @@ async def incoming_channels(self, only_id: bool = False) -> list:
:return: channels: list
"""

is_ok, response = self.__call_api(
is_ok, response = await self.__call_api(
ChannelsApi,
"channels_get_channels",
full_topology="false",
Expand All @@ -167,7 +202,7 @@ async def outgoing_channels(self, only_id: bool = False):
Returns all open outgoing channels.
:return: channels: list
"""
is_ok, response = self.__call_api(ChannelsApi, "channels_get_channels")
is_ok, response = await self.__call_api(ChannelsApi, "channels_get_channels")
if is_ok:
if not hasattr(response, "outgoing"):
self.warning("Response does not contain 'outgoing'")
Expand All @@ -190,7 +225,9 @@ async def get_channel(self, channel_id: str):
:param: channel_id: str
:return: channel: response
"""
_, response = self.__call_api(ChannelsApi, "channels_get_channel", channel_id)
_, response = await self.__call_api(
ChannelsApi, "channels_get_channel", channel_id
)
return response

async def all_channels(self, include_closed: bool):
Expand All @@ -199,7 +236,7 @@ async def all_channels(self, include_closed: bool):
:param: include_closed: bool
:return: channels: list
"""
is_ok, response = self.__call_api(
is_ok, response = await self.__call_api(
ChannelsApi,
"channels_get_channels",
full_topology="true",
Expand All @@ -213,12 +250,12 @@ async def ping(self, peer_id: str):
:param: peer_id: str
:return: response: dict
"""
_, response = self.__call_api(PeersApi, "peers_ping_peer", peer_id)
_, response = await self.__call_api(PeersApi, "peers_ping_peer", peer_id)
return response

async def peers(
self,
params: list or str = "peer_id",
params: Union[list, str] = "peer_id",
status: str = "connected",
quality: float = 0.5,
):
Expand All @@ -230,7 +267,9 @@ async def peers(
:return: peers: list
"""

is_ok, response = self.__call_api(NodeApi, "node_get_peers", quality=quality)
is_ok, response = await self.__call_api(
NodeApi, "node_get_peers", quality=quality
)
if not is_ok:
return []

Expand All @@ -255,8 +294,8 @@ async def peers(
return output_list

async def get_address(
self, address: str or list[str] = "hopr"
) -> Optional[dict[str, str]] or Optional[str]:
self, address: Union[str, list[str]] = "hopr"
) -> Optional[Union[dict[str, str], str]]:
"""
Returns the address of the node.
:param: address: str = "hopr" | "native"
Expand All @@ -269,7 +308,7 @@ async def get_address(
elif isinstance(address, str):
address = [address]

is_ok, response = self.__call_api(AccountApi, "account_get_address")
is_ok, response = await self.__call_api(AccountApi, "account_get_address")
if not is_ok:
return None

Expand All @@ -295,7 +334,9 @@ async def send_message(
:return: bool
"""
body = MessagesBody(tag, message, destination, path=hops)
is_ok, _ = self.__call_api(MessagesApi, "messages_send_message", body=body)
is_ok, _ = await self.__call_api(
MessagesApi, "messages_send_message", body=body
)
return is_ok

async def messages_pop(self, tag: int = MESSAGE_TAG) -> bool:
Expand All @@ -306,7 +347,9 @@ async def messages_pop(self, tag: int = MESSAGE_TAG) -> bool:
"""

body = MessagesPopBody(tag=tag)
_, response = self.__call_api(MessagesApi, "messages_pop_message", body=body)
_, response = await self.__call_api(
MessagesApi, "messages_pop_message", body=body
)
return response

async def messages_pop_all(self, tag: int = MESSAGE_TAG) -> list:
Expand All @@ -317,13 +360,13 @@ async def messages_pop_all(self, tag: int = MESSAGE_TAG) -> list:
"""

body = MessagesPopallBody(tag=tag)
_, response = self.__call_api(
_, response = await self.__call_api(
MessagesApi, "messages_pop_all_message", body=body
)
return response.messages if hasattr(response, "messages") else []

async def node_info(self):
_, response = self.__call_api(NodeApi, "node_get_info")
_, response = await self.__call_api(NodeApi, "node_get_info")
return response

async def channel_balance(self, src_peer_id: str, dest_peer_id: str) -> float:
Expand Down
10 changes: 6 additions & 4 deletions ct-app/core/components/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,18 @@ def nodesAddresses(
return list(addresses), list(keys)

@classmethod
async def httpPOST(cls, url, data) -> tuple[int, dict]:
async def post(session: ClientSession, url: str, data: dict):
async with session.post(url, json=data) as response:
async def httpPOST(
cls, url: str, data: dict, timeout: int = 60
) -> tuple[int, dict]:
async def post(session: ClientSession, url: str, data: dict, timeout: int):
async with session.post(url, json=data, timeout=timeout) as response:
status = response.status
response = await response.json()
return status, response

async with aiohttp.ClientSession() as session:
try:
status, response = await post(session, url, data)
status, response = await post(session, url, data, timeout)
except Exception:
return None, None
else:
Expand Down
9 changes: 5 additions & 4 deletions ct-app/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ def network_nodes(self) -> list[Node]:
return self.nodes[:-1]

@property
def network_nodes_addresses(self) -> list[Address]:
return [node.address for node in self.network_nodes]
async def network_nodes_addresses(self) -> list[Address]:
return await asyncio.gather(*[node.address.get() for node in self.network_nodes])

@property
def safes_balance_subgraph_type(self) -> SubgraphType:
Expand Down Expand Up @@ -201,8 +201,8 @@ async def get_subgraph_data(self):
self.debug(f"Fetched subgraph data ({len(results)} entries).")

@flagguard
@connectguard
@formalin("Getting topology data")
@connectguard
async def get_topology_data(self):
"""
Gets a dictionary containing all unique source_peerId-source_address links
Expand Down Expand Up @@ -262,7 +262,7 @@ async def apply_economic_model(self):
excluded = Utils.excludeElements(eligibles, low_allowance_addresses)
self.debug(f"Excluded nodes with low safe allowance ({len(excluded)} entries).")

excluded = Utils.excludeElements(eligibles, self.network_nodes_addresses)
excluded = Utils.excludeElements(eligibles, await self.network_nodes_addresses)
self.debug(f"Excluded network nodes ({len(excluded)} entries).")

self.debug(f"Eligible nodes ({len(eligibles)} entries).")
Expand Down Expand Up @@ -346,6 +346,7 @@ async def distribute_rewards(self):

@flagguard
@formalin("Getting funding data")
@connectguard
async def get_fundings(self):
from_address = self.params.subgraph.from_address
ct_safe_addresses = {
Expand Down
Loading

0 comments on commit 075f300

Please sign in to comment.