diff --git a/ct-app/README.md b/ct-app/README.md index 18074c51..80611dac 100644 --- a/ct-app/README.md +++ b/ct-app/README.md @@ -139,7 +139,6 @@ Parameter | Recommanded value (staging) | Description `GCP_FILE_PREFIX` | `expected_reward` | File prefix for GCP distribution list storage `GCP_FOLDER` | `staging` | Folder on GCP where to store distribution list `PEER_MIN_VERSION` | `2.0.0` | Minimum node version to be eligible - `RABBITMQ_HOST` | (check Bitwarden) | `RABBITMQ_PASSWORD` | (check Bitwarden) | `RABBITMQ_PROJECT_NAME` | `ct-app` | Name of the RabbitMQ project @@ -155,6 +154,8 @@ Parameter | Recommanded value (staging) | Description `NODE_ADDRESS_X` (multiple, min. 2) | (check Bitwarden) | `NODE_KEY_X` | (check Bitwarden) | +Then there's a bunch of optional flags to enable features of the app (the name of the feature should be explicit enough). +The following enables `Core` features: Then there's a bunch of optional flags to enable features of the app Flag | Recommanded value (staging) diff --git a/ct-app/core/__main__.py b/ct-app/core/__main__.py index d4fceb7b..e1642a44 100644 --- a/ct-app/core/__main__.py +++ b/ct-app/core/__main__.py @@ -23,7 +23,6 @@ def main(configfile: str = None): params.from_env("SUBGRAPH_", "PG", "RABBITMQ_") params.overrides("OVERRIDE_") - Utils.stringArrayToGCP( params.gcp.bucket, Utils.generateFilename("", "startup", "csv"), diff --git a/ct-app/core/components/baseclass.py b/ct-app/core/components/baseclass.py index ae59055a..75451ab5 100644 --- a/ct-app/core/components/baseclass.py +++ b/ct-app/core/components/baseclass.py @@ -9,6 +9,7 @@ class Base: """ Base class for logging and printing messages with different colors. """ + handler = logging.StreamHandler() handler.setFormatter(formatter) @@ -19,7 +20,10 @@ class Base: @property def print_prefix(self) -> str: - return "" + cls = self.__class__ + raise NotImplementedError( + f"print_prefix not implemented for class '{cls.__name__}'" + ) @classmethod def class_prefix(cls) -> str: diff --git a/ct-app/core/components/decorators.py b/ct-app/core/components/decorators.py index e3c131d4..1277c9c3 100644 --- a/ct-app/core/components/decorators.py +++ b/ct-app/core/components/decorators.py @@ -31,10 +31,10 @@ async def wrapper(self, *args, **kwargs): if not hasattr(self.params, "flags"): self.error("No flags available") return - + if not hasattr(self.params.flags, self.class_prefix()): raise AttributeError(f"Feature `{func.__name__}` not in config file") - + class_flags = getattr(self.params.flags, self.class_prefix()) params_raw = dir(class_flags) @@ -71,7 +71,7 @@ async def wrapper(self, *args, **kwargs): if func_name_clean not in params_clean: self.error(f"Feature `{func.__name__}` not regonized") return - + index = params_clean.index(func_name_clean) delay = getattr(class_flags, params_raw[index]) @@ -85,11 +85,10 @@ async def wrapper(self, *args, **kwargs): if delay == 0: break - + if delay is not None: await asyncio.sleep(delay) - return wrapper return decorator diff --git a/ct-app/core/components/environment_utils.py b/ct-app/core/components/environment_utils.py new file mode 100644 index 00000000..42bba94e --- /dev/null +++ b/ct-app/core/components/environment_utils.py @@ -0,0 +1,25 @@ +from os import environ +from typing import Any + +from .baseclass import Base + + +class EnvironmentUtils(Base): + @property + def print_prefix(self) -> str: + return "EnvUtils" + + @classmethod + def envvar(cls, var_name: str, default: Any = None, type: type = str): + if var_name in environ: + return type(environ[var_name]) + else: + return default + + @classmethod + def envvarWithPrefix(cls, prefix: str, type=str) -> dict[str, Any]: + var_dict = { + key: type(v) for key, v in environ.items() if key.startswith(prefix) + } + + return dict(sorted(var_dict.items())) diff --git a/ct-app/core/components/graphql_providers.py b/ct-app/core/components/graphql_providers.py index dc2b2dbf..f6d83e50 100644 --- a/ct-app/core/components/graphql_providers.py +++ b/ct-app/core/components/graphql_providers.py @@ -58,7 +58,9 @@ async def _test_query(self, key: str, **kwargs) -> bool: # call `self._execute(self._sku_query, vars)` with a timeout try: - response = await asyncio.wait_for(self._execute(self._sku_query, vars), timeout=30) + response = await asyncio.wait_for( + self._execute(self._sku_query, vars), timeout=30 + ) except asyncio.TimeoutError: self.error("Query timeout occurred") return False @@ -81,10 +83,12 @@ async def _get(self, key: str, **kwargs) -> dict: vars.update(kwargs) try: - response = await asyncio.wait_for(self._execute(self._sku_query, vars), timeout=30) + response = await asyncio.wait_for( + self._execute(self._sku_query, vars), timeout=30 + ) except asyncio.TimeoutError: self.error("Timeout error while fetching data from subgraph.") - break + break if response is None: break @@ -136,9 +140,10 @@ async def test(self, **kwargs): if result is None: return False - + return result + class SafesProvider(GraphQLProvider): def __init__(self, url: str): super().__init__(url) @@ -175,14 +180,13 @@ def __init__(self, url: str): def print_prefix(self) -> str: return "transaction-provider" + class RewardsProvider(GraphQLProvider): def __init__(self, url: str): super().__init__(url) self._default_key = "accounts" - self._sku_query = self._load_query( - "core/subgraph_queries/rewards.graphql" - ) + self._sku_query = self._load_query("core/subgraph_queries/rewards.graphql") @property def print_prefix(self) -> str: - return "rewards-provider" \ No newline at end of file + return "rewards-provider" diff --git a/ct-app/core/components/hoprd_api.py b/ct-app/core/components/hoprd_api.py index 062083a6..47ea9c6e 100644 --- a/ct-app/core/components/hoprd_api.py +++ b/ct-app/core/components/hoprd_api.py @@ -10,13 +10,7 @@ SendMessageBodyRequest, TagQueryRequest, ) -from hoprd_sdk.api import ( - AccountApi, - ChannelsApi, - MessagesApi, - NetworkApi, - NodeApi, -) +from hoprd_sdk.api import AccountApi, ChannelsApi, MessagesApi, NetworkApi, NodeApi from hoprd_sdk.rest import ApiException from requests import Response from urllib3.exceptions import MaxRetryError @@ -40,7 +34,7 @@ def _refresh_token_hook(self): self.configuration.refresh_api_key_hook = _refresh_token_hook @property - def print_prefix(self) -> str: + def print_prefix(cls) -> str: return "api" async def __call_api( @@ -120,7 +114,7 @@ async def balances(self, type: Union[str, list[str]] = "all"): is_ok, response = await self.__call_api(AccountApi, "balances") if not is_ok: - return None + return {} return_dict = {} @@ -142,9 +136,7 @@ async def open_channel(self, peer_address: str, amount: str): """ body = OpenChannelBodyRequest(amount, peer_address) - is_ok, response = await self.__call_api( - ChannelsApi, "open_channel", body=body - ) + is_ok, response = await self.__call_api(ChannelsApi, "open_channel", body=body) return response.channel_id if is_ok else None @@ -212,7 +204,6 @@ async def outgoing_channels(self, only_id: bool = False): full_topology=False, including_closed=False, ) - if is_ok: if not hasattr(response, "outgoing"): self.warning("Response does not contain 'outgoing'") @@ -241,6 +232,7 @@ async def all_channels(self, include_closed: bool): full_topology="true", including_closed="true" if include_closed else "false", ) + return response if is_ok else [] async def peers( @@ -248,7 +240,7 @@ async def peers( params: Union[list, str] = "peer_id", status: str = "connected", quality: float = 0.5, - ): + ) -> list[dict]: """ Returns a list of peers. :param: param: list or str = "peer_id" @@ -335,7 +327,7 @@ async def messages_pop(self, tag: int = MESSAGE_TAG) -> bool: """ body = TagQueryRequest(tag=tag) _, response = await self.__call_api(MessagesApi, "pop", body=body) - + return response async def messages_pop_all(self, tag: int = MESSAGE_TAG) -> list: @@ -350,7 +342,7 @@ async def messages_pop_all(self, tag: int = MESSAGE_TAG) -> list: async def node_info(self): _, response = await self.__call_api(NodeApi, "info") - + return response async def ticket_price(self) -> int: diff --git a/ct-app/core/components/parameters.py b/ct-app/core/components/parameters.py index a4dbcb67..aa4a33eb 100644 --- a/ct-app/core/components/parameters.py +++ b/ct-app/core/components/parameters.py @@ -1,11 +1,12 @@ from .baseclass import Base -from .utils import Utils +from .environment_utils import EnvironmentUtils class Parameters(Base): """ Class that represents a set of parameters that can be accessed and modified. The parameters are stored in a dictionary and can be accessed and modified using the dot notation. The parameters can be loaded from environment variables with a specified prefix. """ + def __init__(self): super().__init__() @@ -21,7 +22,7 @@ def parse(self, data: dict): setattr(self, key, value) def overrides(self, prefix: str): - for key, value in Utils.envvarWithPrefix(prefix).items(): + for key, value in EnvironmentUtils.envvarWithPrefix(prefix).items(): path = key.replace(prefix, "").lower().split("_") parent = self @@ -41,7 +42,6 @@ def overrides(self, prefix: str): else: raise KeyError(f"Key {key} not found in parameters") - def from_env(self, *prefixes: list[str]): for prefix in prefixes: subparams_name = prefix.lower() @@ -55,29 +55,22 @@ def from_env(self, *prefixes: list[str]): else: subparams = type(self)() - for key, value in Utils.envvarWithPrefix(prefix).items(): - k = key.replace(prefix, "").lower() - - # convert snake case to camel case - k = k.replace("_", " ").title().replace(" ", "") - k = k[0].lower() + k[1:] - - try: - value = float(value) - except ValueError: - pass - - try: - integer = int(value) - if integer == value: - value = integer - except ValueError: - pass - - setattr(subparams, k, value) + self._parse_env_vars(prefix, subparams) setattr(self, subparams_name, subparams) + def _parse_env_vars(self, prefix, subparams): + for key, value in EnvironmentUtils.envvarWithPrefix(prefix).items(): + k = self._format_key(key, prefix) + value = self._convert(value) + setattr(subparams, k, value) + + def _format_key(self, key, prefix): + k = key.replace(prefix, "").lower() + k = k.replace("_", " ").title().replace(" ", "") + k = k[0].lower() + k[1:] + return k + def _convert(self, value: str): try: value = float(value) diff --git a/ct-app/core/components/utils.py b/ct-app/core/components/utils.py index 2b22ade1..e732e0fc 100644 --- a/ct-app/core/components/utils.py +++ b/ct-app/core/components/utils.py @@ -1,45 +1,35 @@ import csv -import os +import random import time from datetime import datetime, timedelta -from os import environ +from os import path from typing import Any -import aiohttp from aiohttp import ClientSession -from celery import Celery -from google.cloud import storage - from core.model.address import Address from core.model.peer import Peer from core.model.subgraph_entry import SubgraphEntry from core.model.topology_entry import TopologyEntry +from google.cloud import storage from .baseclass import Base +from .channelstatus import ChannelStatus +from .environment_utils import EnvironmentUtils class Utils(Base): - @classmethod - def envvar(cls, var_name: str, default: Any = None, type: type = str): - if var_name in environ: - return type(environ[var_name]) - else: - return default - - @classmethod - def envvarWithPrefix(cls, prefix: str, type=str) -> dict[str, Any]: - var_dict = { - key: type(v) for key, v in environ.items() if key.startswith(prefix) - } - - return dict(sorted(var_dict.items())) - @classmethod def nodesAddresses( cls, address_prefix: str, keyenv: str ) -> tuple[list[str], list[str]]: - addresses = Utils.envvarWithPrefix(address_prefix).values() - keys = Utils.envvarWithPrefix(keyenv).values() + """ + Returns a tuple containing the addresses and keys of the nodes. + :param address_prefix: The prefix of the environment variables containing addresses. + :param keyenv: The prefix of the environment variables containing keys. + :returns: A tuple containing the addresses and keys. + """ + addresses = EnvironmentUtils.envvarWithPrefix(address_prefix).values() + keys = EnvironmentUtils.envvarWithPrefix(keyenv).values() return list(addresses), list(keys) @@ -47,13 +37,20 @@ def nodesAddresses( async def httpPOST( cls, url: str, data: dict, timeout: int = 60 ) -> tuple[int, dict]: + """ + Performs an HTTP POST request. + :param url: The URL to send the request to. + :param data: The data to be sent. + :returns: A tuple containing the status code and the response. + """ + async def post(session: ClientSession, url: str, data: dict, timeout: int): async with session.post(url, json=data, timeout=timeout) as response: status = response.status response = await response.json() return status, response - async with aiohttp.ClientSession() as session: + async with ClientSession() as session: try: status, response = await post(session, url, data, timeout) except Exception: @@ -76,11 +73,16 @@ def mergeDataSources( topo = next(filter(lambda t: t.node_address == address, topology), None) safe = next(filter(lambda s: s.node_address == address, safes), None) - ## TEMP SOLUTION TO ENFORCE DISTRIBUTION TO PEERS NOT LISTED BY THE SUBGRAPH ON STAGING - # if safe is None: - # safe = SubgraphEntry(address, "0.000015", "0x0", "10000") + # TEMP SOLUTION TO ENFORCE DISTRIBUTION TO PEERS NOT LISTED BY THE SUBGRAPH ON STAGING + if safe is None: + safe = SubgraphEntry(address, "0", "0x0", "0") - if peer is None or topo is None or safe is None: + if ( + peer is None + or topo is None + or safe is None + or safe.wxHoprBalance is None + ): continue peer.safe_address = safe.safe_address @@ -97,7 +99,7 @@ def allowManyNodePerSafe(cls, peers: list[Peer]): """ Split the stake managed by a safe address equaly between the nodes that the safe manages. - :param: peer: list of peers + :param peer: list of peers :returns: nothing. """ safe_counts = {peer.safe_address: 0 for peer in peers} @@ -114,9 +116,9 @@ def allowManyNodePerSafe(cls, peers: list[Peer]): def exclude(cls, source_data: list[Peer], blacklist: list[Address]) -> list[Peer]: """ Removes elements from a dictionary based on a blacklist. - :param: source_data (dict): The dictionary to be updated. - :param: blacklist (list): A list containing the keys to be removed. - :returns: nothing. + :param source_data (dict): The dictionary to be updated. + :param blacklist (list): A list containing the keys to be removed. + :returns: A list containing the removed elements. """ addresses = [peer.address for peer in source_data] @@ -134,8 +136,8 @@ def exclude(cls, source_data: list[Peer], blacklist: list[Address]) -> list[Peer def rewardProbability(cls, peers: list[Peer]) -> list[int]: """ Evaluate the function for each stake value in the eligible_peers dictionary. - :param eligible_peers: A dict containing the data. - :returns: nothing. + :param peers: A dict containing the data. + :returns: A list containing the excluded elements due to low stake. """ indexes_to_remove = [ @@ -173,19 +175,28 @@ def stringArrayToGCP(cls, bucket_name: str, blob_name: str, data: list[str]): @classmethod def generateFilename(cls, prefix: str, foldername: str, extension: str = "csv"): + """ + Generates a filename with the following format: + _. + :param prefix: The prefix of the filename + :param foldername: The folder where the file will be stored + :param extension: The extension of the file + :returns: The filename + """ timestamp = time.strftime("%Y%m%d%H%M%S") if extension.startswith("."): extension = extension[1:] filename = f"{prefix}_{timestamp}.{extension}" - return os.path.join(foldername, filename) + return path.join(foldername, filename) @classmethod def nextEpoch(cls, seconds: int) -> datetime: """ Calculates the delay until the next whole `minutes`min and `seconds`sec. :param seconds: next whole second to trigger the function + :returns: The next epoch """ if seconds == 0: raise ValueError("'seconds' must be greater than 0") @@ -200,6 +211,7 @@ def nextDelayInSeconds(cls, seconds: int) -> int: """ Calculates the delay until the next whole `minutes`min and `seconds`sec. :param seconds: next whole second to trigger the function + :returns: The delay in seconds. """ if seconds == 0: return 1 @@ -215,6 +227,8 @@ def nextDelayInSeconds(cls, seconds: int) -> int: async def aggregatePeerBalanceInChannels(cls, channels: list) -> dict[str, dict]: """ Returns a dict containing all unique source_peerId-source_address links. + :param channels: The list of channels. + :returns: A dict containing all peerIds-balanceInChannels links. """ results: dict[str, dict] = {} @@ -223,10 +237,11 @@ async def aggregatePeerBalanceInChannels(cls, channels: list) -> dict[str, dict] hasattr(c, "source_peer_id") and hasattr(c, "source_address") and hasattr(c, "status") + and hasattr(c, "balance") ): continue - if c.status != "Open": + if ChannelStatus(c.status) != ChannelStatus.Open: continue if c.source_peer_id not in results: @@ -240,18 +255,18 @@ async def aggregatePeerBalanceInChannels(cls, channels: list) -> dict[str, dict] return results @classmethod - def taskSendMessage( - cls, - app: Celery, - relayer_id: str, - expected: int, - ticket_price: float, - timestamp: float = None, - attempts: int = 0, - task_name: str = "send_1_hop_message", - ): - app.send_task( - task_name, - args=(relayer_id, expected, ticket_price, timestamp, attempts), - queue="send_messages", - ) + def splitDict(cls, src: dict[str, Any], bins: int) -> list[dict[str, Any]]: + """ + Splits randomly a dict into multiple sub-dictionary of almost equal sizes. + :param src: The dict to be split. + :param bins: The number of sub-dictionaries. + :returns: A list containing the sub-dictionaries. + """ + # Split the dictionary into multiple sub-dictionaries + split = [{} for _ in range(bins)] + + # Assign a random number to each element in the dictionary + for idx, (key, value) in enumerate(random.sample(src.items(), len(src))): + split[idx % bins][key] = value + + return split diff --git a/ct-app/core/core.py b/ct-app/core/core.py index 96d876c3..34566ce7 100644 --- a/ct-app/core/core.py +++ b/ct-app/core/core.py @@ -1,18 +1,23 @@ import asyncio -from copy import deepcopy import random +import time +from copy import deepcopy +from datetime import datetime +from typing import Any -from celery import Celery +from database import Utils as DBUtils +from database.database_connection import DatabaseConnection +from database.models import Reward from prometheus_client import Gauge from .components.baseclass import Base from .components.decorators import connectguard, flagguard, formalin from .components.graphql_providers import ( ProviderError, + RewardsProvider, SafesProvider, StakingProvider, wxHOPRTransactionProvider, - RewardsProvider ) from .components.hoprd_api import HoprdAPI from .components.lockedvar import LockedVar @@ -114,9 +119,7 @@ def api(self) -> HoprdAPI: @property async def network_nodes_addresses(self) -> list[Address]: - return await asyncio.gather( - *[node.address.get() for node in self.nodes] - ) + return await asyncio.gather(*[node.address.get() for node in self.nodes]) @property def subgraph_type(self) -> SubgraphType: @@ -137,7 +140,7 @@ def wxhopr_txs_subgraph_url(self) -> str: @property def rewards_subgraph_url(self) -> str: return self._rewards_subgraph_url(self.subgraph_type) - + @subgraph_type.setter def subgraph_type(self, value: SubgraphType): if value != self.subgraph_type: @@ -266,6 +269,7 @@ async def get_topology_data(self): including the aggregated balance of "Open" outgoing payment channels. """ channels = await self.api.all_channels(False) + if channels is None: self.warning("Topology data not available") return @@ -298,7 +302,7 @@ async def apply_economic_model(self): if not ready: self.warning("Not enough data to apply economic model.") return - + eligibles = Utils.mergeDataSources(topology, peers, registered_nodes) self.debug(f"Merged topology and subgraph data ({len(eligibles)} entries).") @@ -348,7 +352,9 @@ async def apply_economic_model(self): for peer in eligibles: peer.economic_model = deepcopy(model) - peer.economic_model.coefficients.c += redeemed_rewards.get(peer.address.address,0.0) + peer.economic_model.coefficients.c += redeemed_rewards.get( + peer.address.address, 0.0 + ) peer.max_apr = self.params.economicModel.maxAPRPercentage self.debug("Assigned economic model to eligible nodes.") @@ -373,7 +379,7 @@ async def apply_economic_model(self): PEER_SPLIT_STAKE.labels(peer.address.id).set(peer.split_stake) PEER_SAFE_COUNT.labels(peer.address.id).set(peer.safe_address_count) PEER_TF_STAKE.labels(peer.address.id).set(peer.transformed_stake) - PEER_VERSION.labels(peer.address.id, peer.version).set(1) + PEER_VERSION.labels(peer.address.id, str(peer.version)).set(1) @flagguard @formalin("Distributing rewards") @@ -409,21 +415,46 @@ async def distribute_rewards(self): lines = Peer.toCSV(peers) Utils.stringArrayToGCP(self.params.gcp.bucket, filename, lines) - # create celery tasks - app = Celery( - name=self.params.rabbitmq.projectName, - broker=f"amqp://{self.params.rabbitmq.username}:{self.params.rabbitmq.password}@{self.params.rabbitmq.host}/{self.params.rabbitmq.virtualhost}", + # distribute rewards + # randomly split peers into groups, one group per node + self.info("Initiating distribution.") + + t: tuple[dict[str, dict[str, Any]], int] = await self.multiple_attempts_sending( + peers, self.params.distribution.maxIterations ) - app.autodiscover_tasks(force=True) - - for peer in peers: - Utils.taskSendMessage( - app, - peer.address.id, - peer.message_count_for_reward, - peer.economic_model.budget.ticket_price, - task_name=self.params.rabbitmq.taskName, - ) + rewards, iterations = t # trick for typehinting tuple unpacking + self.info("Distribution completed.") + + self.debug(rewards) + self.debug(iterations) + + with DatabaseConnection(self.params.pg) as session: + entries = set[Reward]() + + for peer, values in rewards.items(): + expected = values.get("expected", 0) + remaining = values.get("remaining", 0) + issued = values.get("issued", 0) + effective = expected - remaining + status = "SUCCESS" if remaining < 1 else "TIMEOUT" + + entry = Reward( + peer_id=peer, + node_address="", + expected_count=expected, + effective_count=effective, + status=status, + timestamp=datetime.fromtimestamp(time.time()), + issued_count=issued, + ) + + entries.add(entry) + + session.add_all(entries) + session.commit() + + self.debug(f"Stored {len(entries)} reward entries in database: {entry}") + self.info(f"Distributed rewards to {len(peers)} peers.") EXECUTIONS_COUNTER.inc() @@ -455,6 +486,70 @@ async def get_fundings(self): self.debug(f"Total funding: {total_funding}") TOTAL_FUNDING.set(total_funding) + async def multiple_attempts_sending( + self, peers: list[Peer], max_iterations: int = 4 + ) -> dict[str, dict[str, Any]]: + def _total_messages_to_send(rewards: dict[str, dict[str, int]]) -> int: + return sum( + [max(value.get("remaining", 0), 0) for value in rewards.values()] + ) + + iteration: int = 0 + reward_per_peer = { + peer.address.id: { + "expected": peer.message_count_for_reward, + "remaining": peer.message_count_for_reward, + "issued": 0, + "tag": DBUtils.peerIDToInt(peer.address.id, self.params.pg), + "ticket-price": peer.economic_model.budget.ticket_price, + } # will be retrieved from the API once the endpoint is available in 2.1 + for peer in peers + } + + self.debug(f"Distribution summary: {reward_per_peer}") + + while ( + iteration < max_iterations and _total_messages_to_send(reward_per_peer) > 0 + ): + self.debug("Splitting peers into groups") + peers_groups = Utils.splitDict(reward_per_peer, len(reward_per_peer)) + + # send rewards to peers + self.debug("Sending rewards to peers") + tasks = set[asyncio.Task]() + for node, peers_group in zip(self.nodes, peers_groups): + tasks.add(asyncio.create_task(node.distribute_rewards(peers_group))) + issued_counts: list[dict] = await asyncio.gather(*tasks) + + # wait for message delivery (if needed) + self.debug( + f"Waiting {self.params.distribution.messageDeliveryDelay} for message delivery" + ) + await asyncio.sleep(self.params.distribution.messageDeliveryDelay) + + # check inboxes for relayed messages + self.debug("Checking inboxes") + tasks = set[asyncio.Task]() + for node, peers_group in zip(self.nodes, peers_groups): + tasks.add(asyncio.create_task(node.check_inbox(peers_group))) + relayed_counts: list[dict] = await asyncio.gather(*tasks) + + # for every peer, substract the relayed count from the total count + self.debug("Updating remaining counts") + for peer in reward_per_peer: + reward_per_peer[peer]["remaining"] -= sum( + [res.get(peer, 0) for res in relayed_counts] + ) + reward_per_peer[peer]["issued"] += sum( + [res.get(peer, 0) for res in issued_counts] + ) + + self.debug(f"Iteration {iteration} completed.") + + iteration += 1 + + return reward_per_peer, iteration + @flagguard @formalin("Getting peers rewards amounts") async def get_peers_rewards(self): diff --git a/ct-app/core/model/economic_model.py b/ct-app/core/model/economic_model.py index 54da3456..c4856ce4 100644 --- a/ct-app/core/model/economic_model.py +++ b/ct-app/core/model/economic_model.py @@ -17,6 +17,7 @@ def __init__(self, formula: str, condition: str): def fromParameters(cls, parameters: Parameters): return cls(parameters.formula, parameters.condition) + class Equations: def __init__(self, f_x: Equation, g_x: Equation): self.f_x = f_x @@ -74,7 +75,7 @@ def period(self): @property def distribution_per_period(self): return self._distribution_per_period - + @property def ticket_price(self): return self._ticket_price @@ -108,7 +109,7 @@ def ticket_price(self, value): def winning_probability(self, value): self._winning_probability = value TICKET_WINNING_PROB.set(value) - + @classmethod def fromParameters(cls, parameters: Parameters): return cls( @@ -154,10 +155,10 @@ def delay_between_distributions(self): @classmethod def fromParameters(cls, parameters: Parameters): return EconomicModel( - Equations.fromParameters(parameters.equations), - Coefficients.fromParameters(parameters.coefficients), + Equations.fromParameters(parameters.equations), + Coefficients.fromParameters(parameters.coefficients), Budget.fromParameters(parameters.budget), ) - + def __repr__(self): return f"EconomicModel({self.equations}, {self.coefficients}, {self.budget})" diff --git a/ct-app/core/model/peer.py b/ct-app/core/model/peer.py index 58576119..7836cc20 100644 --- a/ct-app/core/model/peer.py +++ b/ct-app/core/model/peer.py @@ -156,7 +156,7 @@ def message_count_for_reward(self): budget = self.economic_model.budget denominator = budget.ticket_price * budget.winning_probability - return round(self.protocol_reward_per_distribution / denominator) + return round(self.protocol_reward_per_distribution / denominator) @property def apr_percentage(self): diff --git a/ct-app/core/model/subgraph_url.py b/ct-app/core/model/subgraph_url.py index 258c44d4..1b5cd0f2 100644 --- a/ct-app/core/model/subgraph_url.py +++ b/ct-app/core/model/subgraph_url.py @@ -1,4 +1,5 @@ from core.components.parameters import Parameters + from .subgraph_type import SubgraphType @@ -24,4 +25,4 @@ def _construct_backup(self): return self.param_set.URLBackup def __call__(self, type: SubgraphType) -> str: - return self._urls[type] + return self._urls.get(type, None) diff --git a/ct-app/core/node.py b/ct-app/core/node.py index f62ccbcf..b6d023d9 100644 --- a/ct-app/core/node.py +++ b/ct-app/core/node.py @@ -1,4 +1,5 @@ import asyncio +import time from datetime import datetime from prometheus_client import Gauge @@ -6,13 +7,12 @@ from .components.baseclass import Base from .components.channelstatus import ChannelStatus from .components.decorators import connectguard, flagguard, formalin -from .components.hoprd_api import HoprdAPI +from .components.hoprd_api import MESSAGE_TAG, HoprdAPI from .components.lockedvar import LockedVar from .components.parameters import Parameters from .components.utils import Utils from .model.address import Address from .model.peer import Peer -from .model.topology_entry import TopologyEntry BALANCE = Gauge("balance", "Node balance", ["peer_id", "token"]) PEERS_COUNT = Gauge("peers_count", "Node peers", ["peer_id"]) @@ -138,6 +138,8 @@ async def retrieve_balances(self): for token, balance in balances.items(): BALANCE.labels(addr.id, token).set(balance) + return balances + @flagguard @formalin("Opening channels") @connectguard @@ -154,7 +156,11 @@ async def open_channels(self): ] addresses_with_channels = {c.destination_address for c in out_opens} - all_addresses = {p.address.address for p in await self.peers.get()} + all_addresses = { + p.address.address + for p in await self.peers.get() + if not p.version_is_old(self.params.peer.minVersion) + } addresses_without_channels = all_addresses - addresses_with_channels self.debug(f"Addresses without channels: {len(addresses_without_channels)}") @@ -209,7 +215,6 @@ async def close_pending_channels(self): """ Close channels in PendingToClose state. """ - node_address = await self.address.get() out_pendings = [ c for c in await self.outgoings.get() if ChannelStatus.isPending(c.status) @@ -331,6 +336,7 @@ async def retrieve_peers(self): results = await self.api.peers( params=["peer_id", "peer_address", "reported_version"], quality=0.5 ) + peers = { Peer(item["peer_id"], item["peer_address"], item["reported_version"]) for item in results @@ -364,8 +370,7 @@ async def retrieve_outgoing_channels(self): outgoings = [ c for c in channels.all - if c.source_peer_id == addr.id - and not ChannelStatus.isClosed(c.status) + if c.source_peer_id == addr.id and not ChannelStatus.isClosed(c.status) ] await self.outgoings.set(outgoings) @@ -410,7 +415,7 @@ async def get_total_channel_funds(self): if node_address is None: return - + results = await Utils.aggregatePeerBalanceInChannels(channels) self.debug(f"Results of channels balance aggregation: {results}") @@ -419,10 +424,125 @@ async def get_total_channel_funds(self): self.warning("Funding info not found") return - entry = TopologyEntry.fromDict(node_address.id, results[node_address.id]) + funds = results[node_address.id].get("channels_balance", 0) + + self.debug(f"Channels funds: {funds}") + TOTAL_CHANNEL_FUNDS.labels(node_address.id).set(funds) + + return funds + + async def _delay_message( + self, + relayer: str, + message: str, + tag: int, + sleep: float, + ): + await asyncio.sleep(sleep) + + if node_address := await self.address.get(): + return await self.api.send_message(node_address.id, message, [relayer], tag) + return False + + async def distribute_rewards( + self, peer_group: dict[str, dict[str, int]] + ) -> dict[str, int]: + # format of peer group is: + # { + # "0x1212": { + # "remaining": 10, + # "tag": 49, + # "ticket-price": 0.1, + # ... + # }, + # ... + # } + + issued_count = {peer_id: 0 for peer_id in peer_group.keys()} + node_address = await self.address.get() + + if node_address is None: + return issued_count + + for relayer, data in peer_group.items(): + remaining = data.get("remaining", 0) + tag = data.get("tag", None) + ticket_price = data.get("ticket-price", None) + + if remaining == 0: + continue + + if tag is None or ticket_price is None: + self.error( + "Invalid peer in group when sending messages (missing data)" + ) # should never happen + continue + + channel_balance = await self.api.channel_balance(node_address.id, relayer) + max_possible = int(min(remaining, channel_balance // ticket_price)) + + if max_possible == 0: + self.warning( + f"Channel balance to {relayer} doesn't allow to send any message" + ) + continue + + tasks = set[asyncio.Task]() + + for it in range(max_possible): + sleep = it * self.params.distribution.delayBetweenTwoMessages + message = f"{relayer}//{time.time()}-{it + 1}/{max_possible}" + + task = asyncio.create_task( + self._delay_message( + relayer, + message, + MESSAGE_TAG + tag, + sleep, + ) + ) + tasks.add(task) + + issued = await asyncio.gather(*tasks) + issued_count[relayer] = sum(issued) + + self.debug(f"Sent {issued_count[relayer]} messages through {relayer}") + + return issued_count + + async def check_inbox( + self, peer_group: dict[str, dict[str, int]] + ) -> dict[str, int]: + # format of peer group is: + # { + # "0x1212": { + # "remaining": 10, + # "tag": 49, + # "ticket-price": 0.1, + # ... + # }, + # ... + # } + relayed_count = {peer_id: 0 for peer_id in peer_group.keys()} + + for relayer, data in peer_group.items(): + tag = data.get("tag", None) + if tag is None: + self.error( + "Invalid peer in group when querying inbox (missing tag)" + ) # should never happen + continue + if not isinstance(tag, int): + self.error( + f"Invalid peer in group when querying inbox (invalid tag: '{tag}')" + ) + continue + + messages = await self.api.messages_pop_all(MESSAGE_TAG + tag) + relayed_count[relayer] = len(messages) + self.debug(f"{relayed_count[relayer]} messages relayed by {relayer}") - self.debug(f"Channels funds: { entry.channels_balance}") - TOTAL_CHANNEL_FUNDS.labels(node_address.id).set(entry.channels_balance) + return relayed_count def tasks(self): self.info("Starting node") diff --git a/ct-app/database/__init__.py b/ct-app/database/__init__.py index b6f5ab28..b89b8e1b 100644 --- a/ct-app/database/__init__.py +++ b/ct-app/database/__init__.py @@ -1,4 +1,12 @@ from .database_connection import DatabaseConnection from .models import Base, NodePeerConnection, Peer, Reward +from .utils import Utils -__all__ = ["DatabaseConnection", "Base", "NodePeerConnection", "Reward", "Peer"] +__all__ = [ + "DatabaseConnection", + "Base", + "NodePeerConnection", + "Reward", + "Peer", + "Utils", +] diff --git a/ct-app/database/database_connection.py b/ct-app/database/database_connection.py index 3f981108..6f944764 100644 --- a/ct-app/database/database_connection.py +++ b/ct-app/database/database_connection.py @@ -27,7 +27,7 @@ def __init__(self, params: Parameters): host=params.host, port=params.port, database=params.database, - query={} + query={}, ) self.engine = create_engine(url) diff --git a/ct-app/database/utils.py b/ct-app/database/utils.py new file mode 100644 index 00000000..bc7122e5 --- /dev/null +++ b/ct-app/database/utils.py @@ -0,0 +1,19 @@ +from core.components.parameters import Parameters + +from .database_connection import DatabaseConnection +from .models import Peer + + +class Utils: + @classmethod + def peerIDToInt(cls, peer_id: str, db_params: Parameters) -> int: + with DatabaseConnection(db_params) as session: + existing_peer = session.query(Peer).filter_by(peer_id=peer_id).first() + + if existing_peer: + return existing_peer.id + else: + new_peer = Peer(peer_id=peer_id) + session.add(new_peer) + session.commit() + return new_peer.id diff --git a/ct-app/postman/__init__.py b/ct-app/postman/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/ct-app/postman/postman_tasks.py b/ct-app/postman/postman_tasks.py deleted file mode 100644 index cd15f3bb..00000000 --- a/ct-app/postman/postman_tasks.py +++ /dev/null @@ -1,188 +0,0 @@ -import asyncio -import logging -import time -from datetime import datetime - -from billiard import current_process -from celery import Celery -from core.components.hoprd_api import HoprdAPI -from core.components.parameters import Parameters -from core.components.utils import Utils -from database import DatabaseConnection, Reward - -from .task_status import TaskStatus -from .utils import Utils as PMUtils -import yaml - -log = logging.getLogger() -log.setLevel(logging.INFO) - - -with open(Utils.envvar("CONFIG_FILE_PATH", type= str), "r") as file: - config = yaml.safe_load(file) - -params = Parameters() -params.parse(config) -params.from_env("PG", "RABBITMQ_") -params.overrides("OVERRIDE_") - -app = Celery( - name=params.rabbitmq.projectName, - broker=f"amqp://{params.rabbitmq.username}:{params.rabbitmq.password}@{params.rabbitmq.host}/{params.rabbitmq.virtualhost}", -) -app.autodiscover_tasks(force=True) - - -@app.task(name="send_1_hop_message") -def send_1_hop_message( - peer: str, - expected: int, - ticket_price: float, - timestamp: float = None, - attempts: int = 0, -): - """ - Celery task to send `messages_count` 1-hop messages to a peer. This method is the - entry point for the celery worker. As the task that is executed - relies on asyncio, we need to run it in a dedicated event loop. The only call this - method does is to run the async method `async_send_1_hop_message`. - :param peer: Peer ID to send messages to. - :param expected: Number of messages to send. - :param ticket_price: Cost of sending a message. - :param timestamp: Timestamp at first iteration. For timeout purposes. - :param attempts: Number of attempts to send the message regardless of the node. - """ - - if timestamp is None: - timestamp = time.time() - - send_status, node_peer_id, (issued, relayed) = asyncio.run( - async_send_1_hop_message(peer, expected, ticket_price, timestamp) - ) - - attempts += 1 # send_status in [TaskStatus.SPLITTED, TaskStatus.SUCCESS] - - if attempts >= params.distribution.maxIterations: - send_status = TaskStatus.TIMEOUT - - if send_status in [TaskStatus.RETRIED, TaskStatus.SPLIT]: - Utils.taskSendMessage( - app, peer, expected - relayed, ticket_price, timestamp, attempts - ) - - # store results in database - if send_status != TaskStatus.RETRIED: - with DatabaseConnection(params.pg) as session: - entry = Reward( - peer_id=peer, - node_address=node_peer_id, - expected_count=expected, - effective_count=relayed, - status=send_status.value, - timestamp=datetime.fromtimestamp(timestamp), - issued_count=issued, - ) - - session.add(entry) - session.commit() - - log.debug(f"Stored reward entry in database: {entry}") - - return send_status - - -async def async_send_1_hop_message( - peer_id: str, - expected_count: int, - ticket_price: float, - timestamp: float, -) -> tuple[TaskStatus, tuple]: - """ - Celery task to send `count` 1-hop messages to a peer in an async manner. A timeout - mecanism is implemented to stop the task if sending a given bunch of messages takes - too long. - :param peer_id: Peer ID to send messages to. - :param expected_count: Number of messages to send. - :param ticket_price: Cost of sending a message. - :param timestamp: Timestamp at first iteration. For timeout purposes. - :param attempts: Number of attempts to send the message regardless of the node. - """ - - # pick the associated node - address = Utils.envvar(f"NODE_ADDRESS_{current_process().index+1}") - key = Utils.envvar(f"NODE_KEY_{current_process().index+1}") - api = HoprdAPI(address, key) - - node_peer_id = await api.get_address("hopr") - - if node_peer_id is None: - log.error("Can't connect to the node") - return TaskStatus.RETRIED, None, (0, 0) - else: - log.info(f"Node peer id: {node_peer_id}") - - # validate balance of peer - balance = await api.channel_balance(node_peer_id, peer_id) - print( - f"Should send {expected_count} messages to {peer_id} with balance {balance=} (ticket price: {ticket_price})" - ) - max_possible = min(expected_count, balance // ticket_price) - - if max_possible == 0: - log.error(f"Balance of {peer_id} doesn't allow to send any message") - return TaskStatus.RETRIED, node_peer_id, (0, 0) - - relayed, issued = await PMUtils.send_messages_in_batches( - api, - peer_id, - max_possible, - node_peer_id, - timestamp, - params, - ) - - status = TaskStatus.SPLIT if relayed < expected_count else TaskStatus.SUCCESS - - log.info( - f"From {node_peer_id} through {peer_id}: relayed {relayed}/{expected_count} (possible: {max_possible})" - ) - - return status, node_peer_id, (issued, relayed) - - -@app.task(name="fake_task") -def fake_task( - peer: str, - expected: int, - ticket_price: float, - timestamp: float = None, - attempts: int = 0, -) -> TaskStatus: - """ - Fake celery task to test if queues are working as expected. - method does is to run the async method `async_send_1_hop_message`. - :param peer: Peer ID to send messages to. - :param expected: Number of messages to send. - :param ticket_price: Cost of sending a message. - :param timestamp: Timestamp at first iteration. For timeout purposes. - :param attempts: Number of attempts to send the message regardless of the node. - """ - - if timestamp is None: - timestamp = time.time() - - address = Utils.envvar(f"NODE_ADDRESS_{current_process().index+1}") - - log.info(f"Fake task execution started at {timestamp}") - log.info(f"{expected} messages ment to be sent through {peer} by {address}") - - Utils.taskStoreFeedback( - app, - peer, - "fake_node", - expected, - 0, - 0, - TaskStatus.SUCCESS.value, - timestamp, - ) diff --git a/ct-app/postman/task_status.py b/ct-app/postman/task_status.py deleted file mode 100644 index d2824867..00000000 --- a/ct-app/postman/task_status.py +++ /dev/null @@ -1,15 +0,0 @@ -from enum import Enum - - -class TaskStatus(Enum): - """ - Enum to represent the status of a task. This status is also used when creating a - task in the outgoing queue. - """ - - DEFAULT = "DEFAULT" - SUCCESS = "SUCCESS" - RETRIED = "RETRIED" - SPLIT = "SPLIT" - TIMEOUT = "TIMEOUT" - FAILED = "FAILED" diff --git a/ct-app/postman/utils.py b/ct-app/postman/utils.py deleted file mode 100644 index 6b1c9da2..00000000 --- a/ct-app/postman/utils.py +++ /dev/null @@ -1,90 +0,0 @@ -import asyncio - -from core.components.hoprd_api import MESSAGE_TAG, HoprdAPI -from core.components.parameters import Parameters -from database import DatabaseConnection, Peer - - -class Utils: - @classmethod - def createBatches(cls, total_count: int, batch_size: int) -> list[int]: - if total_count <= 0: - return [] - - full_batches: int = total_count // batch_size - remainder: int = total_count % batch_size - - return [batch_size] * full_batches + [remainder] * bool(remainder) - - @classmethod - def peerIDToInt(cls, peer_id: str, parameters: Parameters) -> int: - with DatabaseConnection(parameters) as session: - existing_peer = session.query(Peer).filter_by(peer_id=peer_id).first() - - if existing_peer: - return existing_peer.id - else: - new_peer = Peer(peer_id=peer_id) - session.add(new_peer) - session.commit() - return new_peer.id - - @classmethod - async def delayedMessageSend( - cls, - api: HoprdAPI, - recipient: str, - relayer: str, - message: str, - tag: int, - sleep: float, - ): - await asyncio.sleep(sleep) - - return await api.send_message(recipient, message, [relayer], tag) - - @classmethod - async def send_messages_in_batches( - cls, - api: HoprdAPI, - relayer: str, - expected_count: int, - recipient: str, - timestamp: float, - params: Parameters, - ): - - batch_size = params.distribution.batchSize - delay_between_two_messages = params.distribution.delayBetweenTwoMessages - message_delivery_timeout = params.distribution.messageDeliveryDelay - - relayed_count = 0 - issued_count = 0 - - tag = MESSAGE_TAG + cls.peerIDToInt(relayer, params.pg) - - batches = cls.createBatches(expected_count, batch_size) - - for batch_index, batch in enumerate(batches): - tasks = set[asyncio.Task]() - for it in range(batch): - global_index = it + batch_index * batch_size - message = f"{relayer}//{timestamp}-{global_index + 1}/{expected_count}" - sleep = it * delay_between_two_messages - - tasks.add( - asyncio.create_task( - cls.delayedMessageSend( - api, recipient, relayer, message, tag, sleep - ) - ) - ) - - issued_count += sum(await asyncio.gather(*tasks)) - - await asyncio.sleep(message_delivery_timeout) - - messages = await api.messages_pop_all(tag) - relayed_count += len(messages) - - return relayed_count, issued_count diff --git a/ct-app/pytest.ini b/ct-app/pytest.ini new file mode 100644 index 00000000..37d650e8 --- /dev/null +++ b/ct-app/pytest.ini @@ -0,0 +1,11 @@ +[pytest] +filterwarnings = ignore::DeprecationWarning + +minversion = 7.0 + +asyncio_mode = auto + +addopts = -ra -q + +log_cli = true +log_cli_level = INFO diff --git a/ct-app/requirements.txt b/ct-app/requirements.txt index 387eb119..eebcb638 100644 --- a/ct-app/requirements.txt +++ b/ct-app/requirements.txt @@ -1,9 +1,10 @@ -git+https://github.com/hoprnet/hoprd-sdk-python.git@v2.1.0-rc.3 +git+https://github.com/hoprnet/hoprd-sdk-python.git@v2.1.0 aiohttp==3.8.4 numpy==1.25.0 pytest==7.1.3 pytest-mock==3.10.0 pytest-asyncio==0.21.0 +pyYAML>=6.0.1 Requests==2.31.0 validators==0.20.0 celery==5.2.2 diff --git a/ct-app/scripts/core_prod_config.yaml b/ct-app/scripts/core_prod_config.yaml index 47dd9e54..7b9c48a5 100644 --- a/ct-app/scripts/core_prod_config.yaml +++ b/ct-app/scripts/core_prod_config.yaml @@ -16,7 +16,6 @@ flags: getPeersRewards: 300 applyEconomicModel: 120 distributeRewards: 1 - node: healthcheck: 60 @@ -37,7 +36,7 @@ flags: economicModel: minSafeAllowance: -1 maxAPRPercentage: 15.0 - NFTThreshold: ~ + NFTThreshold: 30000 coefficients: a: 1 @@ -54,6 +53,7 @@ economicModel: condition: "x > c" budget: + # one distribution every 30min amount: 190000 period: 2628000 # in seconds s: 1 @@ -65,10 +65,9 @@ economicModel: # ============================================================================= distribution: minEligiblePeers: 100 - messageDeliveryDelay: 10.0 - delayBetweenTwoMessages: 0.1 - batchSize: 200 - maxIterations: 4 + messageDeliveryDelay: 5.0 + delayBetweenTwoMessages: 0.0005 + maxIterations: 3 # ============================================================================= # @@ -92,13 +91,6 @@ channel: fundingAmount: 35 maxAgeSeconds: 172800 -# ============================================================================= -# -# ============================================================================= -rabbitmq: - taskName: send_1_hop_message - projectName: ct-app - # ============================================================================= # # ============================================================================= diff --git a/ct-app/scripts/core_staging_config.yaml b/ct-app/scripts/core_staging_config.yaml index f2dacbb8..3830232e 100644 --- a/ct-app/scripts/core_staging_config.yaml +++ b/ct-app/scripts/core_staging_config.yaml @@ -15,7 +15,7 @@ flags: getNFTHolders: 30 getPeersRewards: 30 applyEconomicModel: 30 - distributeRewards: ~ + distributeRewards: 1 node: healthcheck: 10 @@ -23,12 +23,12 @@ flags: retrieveIncomingChannels: 30 retrieveOutgoingChannels: 30 retrieveBalances: 30 - openChannels: ~ + openChannels: 300 closeOldChannels: ~ closePendingChannels: ~ - fundChannels: ~ + fundChannels: 300 closeIncomingChannels: ~ - getTotalChannelFunds: ~ + getTotalChannelFunds: 300 # ============================================================================= # @@ -40,9 +40,9 @@ economicModel: coefficients: a: 1 - b: 1 + b: 2 c: 3 - l: 0 + l: 0.00001 equations: fx: @@ -53,21 +53,21 @@ economicModel: condition: "x > c" budget: - amount: 400 - period: 1200 - s: 0.25 - countsInPeriod: 1 - ticketPrice: 0.5 # deprecated + # one distribution every 15min + amount: 0.00000000005 + period: 86400 + s: 1 + countsInPeriod: 96 winningProbability: 1 # ============================================================================= # # ============================================================================= distribution: - minEligiblePeers: 500 - messageDeliveryDelay: 10.0 - delayBetweenTwoMessages: 0.2 - maxIterations: 6 + minEligiblePeers: 5 + messageDeliveryDelay: 5.0 + delayBetweenTwoMessages: 0.0005 + maxIterations: 3 # ============================================================================= # @@ -89,14 +89,7 @@ peer: channel: minBalance: 0.05 fundingAmount: 0.2 - maxAgeSeconds: 60 - -# ============================================================================= -# -# ============================================================================= -rabbitmq: - taskName: fake_task - projectName: ct-app + maxAgeSeconds: 86400 # ============================================================================= # diff --git a/ct-app/test/components/test_channelstatus.py b/ct-app/test/components/test_channelstatus.py index ab016260..a249535b 100644 --- a/ct-app/test/components/test_channelstatus.py +++ b/ct-app/test/components/test_channelstatus.py @@ -2,14 +2,14 @@ def test_channelstatus(): - assert ChannelStatus.isPending("PendingToClose") - assert not ChannelStatus.isPending("Open") - assert not ChannelStatus.isPending("Closed") - - assert not ChannelStatus.isOpen("PendingToClose") assert ChannelStatus.isOpen("Open") + assert not ChannelStatus.isOpen("PendingToClose") assert not ChannelStatus.isOpen("Closed") - assert not ChannelStatus.isClosed("PendingToClose") + assert not ChannelStatus.isPending("Open") + assert ChannelStatus.isPending("PendingToClose") + assert not ChannelStatus.isPending("Closed") + assert not ChannelStatus.isClosed("Open") + assert not ChannelStatus.isClosed("PendingToClose") assert ChannelStatus.isClosed("Closed") diff --git a/ct-app/test/components/test_code_for_environment/test_main1.py b/ct-app/test/components/test_code_for_environment/test_main1.py new file mode 100644 index 00000000..14f62f71 --- /dev/null +++ b/ct-app/test/components/test_code_for_environment/test_main1.py @@ -0,0 +1,14 @@ +from core.components.parameters import Parameters + + +def main(): + params = Parameters() + + _var1 = params.group1.var1 + _var2 = params.group1.var2 + _var3 = params.group2.var1 + _var4 = params.var1 + + +if __name__ == "__main__": + main() diff --git a/ct-app/test/components/test_code_for_environment/test_main2.py b/ct-app/test/components/test_code_for_environment/test_main2.py new file mode 100644 index 00000000..278c1fa9 --- /dev/null +++ b/ct-app/test/components/test_code_for_environment/test_main2.py @@ -0,0 +1,12 @@ +from core.components.parameters import Parameters + + +def main(): + params = Parameters() + + _foo1 = params.group1.var1 + _foo2 = params.var2 + + +if __name__ == "__main__": + main() diff --git a/ct-app/test/components/test_decorators.py b/ct-app/test/components/test_decorators.py index 1c645f8d..0f0c6fdd 100644 --- a/ct-app/test/components/test_decorators.py +++ b/ct-app/test/components/test_decorators.py @@ -1,5 +1,4 @@ import asyncio -from copy import deepcopy import pytest from core.components.baseclass import Base @@ -7,22 +6,21 @@ from core.components.lockedvar import LockedVar from core.components.parameters import Parameters -flag_dictionary = { - "flags": { - "fooclass": { - "fooFlagguardFunc": 1, - "fooFormalinFunc": 1 - } - } -} +flag_dictionary = {"flags": {"fooclass": {"fooFlagguardFunc": 1, "fooFormalinFunc": 1}}} + class FooClass(Base): + @property + def print_prefix(self): + return "FooClass" + def __init__(self): super().__init__() self.connected = LockedVar("connected", False) self.started = False self.counter = 0 self.params = Parameters() + self.params.parse(flag_dictionary) @connectguard async def foo_connectguard_func(self): @@ -40,6 +38,7 @@ async def foo_formalin_func(self): self.counter += 1 await asyncio.sleep(0.1) + @pytest.fixture def foo_class(): return FooClass() @@ -58,84 +57,24 @@ async def test_connectguard(foo_class: FooClass): @pytest.mark.asyncio async def test_flagguard(foo_class: FooClass): - res = await foo_class.foo_flagguard_func() - assert res is None - - # delete flag cache so that new flags are retrieved from env - foo_class.params.parse(flag_dictionary) - - res = await foo_class.foo_flagguard_func() - assert res is True - -@pytest.mark.asyncio -async def test_flagguard_missing_flags(foo_class: FooClass): - # reset instance counter - flags = deepcopy(flag_dictionary) - # # # # # # # # # # # # # # # # # # # # - - # should not run - del flags["flags"]["fooclass"]["fooFlagguardFunc"] - - foo_class.params.parse(flags) - - with pytest.raises(AttributeError): - await foo_class.foo_flagguard_func() - - - # reset instance counter - flags = deepcopy(flag_dictionary) - foo_class.params = Parameters() - # # # # # # # # # # # # # # # # # # # # - - del flags["flags"]["fooclass"] - foo_class.params.parse(flags) - - with pytest.raises(AttributeError): - await foo_class.foo_flagguard_func() - - - # reset instance counter - flags = deepcopy(flag_dictionary) - foo_class.params = Parameters() - # # # # # # # # # # # # # # # # # # # # - - foo_class.params.parse(flag_dictionary) foo_class.params.flags.fooclass.fooFlagguardFunc = None + assert await foo_class.foo_flagguard_func() is None - await foo_class.foo_flagguard_func() - - assert foo_class.counter == 0 # counter increased only once + foo_class.params.flags.fooclass.fooFlagguardFunc = 1 + assert await foo_class.foo_flagguard_func() is True @pytest.mark.asyncio async def test_formalin(foo_class: FooClass): - # reset instance counter - foo_class.counter = 0 - # # # # # # # # # # # # # # # # # # # # - - # should run only once - foo_class.params.parse(flag_dictionary) - foo_class.params.flags.fooclass.fooFormalinFunc = 0 - - foo_class.started = True - asyncio.create_task(foo_class.foo_formalin_func()) - await asyncio.sleep(1) - foo_class.started = False - await asyncio.sleep(0.5) - - assert foo_class.counter == 1 # counter increased only once - - # reset instance counter - foo_class.counter = 0 - # # # # # # # # # # # # # # # # # # # # - - # should run twice (every 0.5s in 1.1s) - foo_class.params.flags.fooclass.fooFormalinFunc = 0.5 - - foo_class.started = True - asyncio.create_task(foo_class.foo_formalin_func()) - await asyncio.sleep(1.1) - foo_class.started = False - await asyncio.sleep(0.5) - - assert foo_class.counter == 2 # counter increased twice \ No newline at end of file + async def setup_test(run_time: float, sleep_time: float, expected_count: int): + foo_class.params.flags.fooclass.foo_formalin_func = sleep_time + foo_class.started = True + asyncio.create_task(foo_class.foo_formalin_func()) + await asyncio.sleep(run_time) + foo_class.started = False + await asyncio.sleep(0.5) + assert foo_class.counter == expected_count + foo_class.counter = 0 + + setup_test(1, 0, 1) + setup_test(1.3, 0.5, 2) diff --git a/ct-app/test/components/test_environment_utils.py b/ct-app/test/components/test_environment_utils.py new file mode 100644 index 00000000..150a28e4 --- /dev/null +++ b/ct-app/test/components/test_environment_utils.py @@ -0,0 +1,32 @@ +from test.components.utils import handle_envvars + +from core.components.environment_utils import EnvironmentUtils + + +def test_envvar(): + with handle_envvars( + string_envvar="string-envvar", int_envvar="1", float_envvar="1.0" + ): + assert EnvironmentUtils.envvar("FAKE_STRING_ENVVAR", "default") == "default" + assert EnvironmentUtils.envvar("STRING_ENVVAR", type=str) == "string-envvar" + assert EnvironmentUtils.envvar("INT_ENVVAR", type=int) == 1 + assert EnvironmentUtils.envvar("FLOAT_ENVVAR", type=float) == 1.0 + + +def test_envvarWithPrefix(): + with handle_envvars( + test_envvar_2="2", + test_envvar_1="1", + test_envvar_3="3", + test_envvor_4="4", + ): + assert EnvironmentUtils.envvarWithPrefix("TEST_ENVVAR_") == { + "TEST_ENVVAR_1": "1", + "TEST_ENVVAR_2": "2", + "TEST_ENVVAR_3": "3", + } + assert EnvironmentUtils.envvarWithPrefix("TEST_ENVVAR_", type=int) == { + "TEST_ENVVAR_1": 1, + "TEST_ENVVAR_2": 2, + "TEST_ENVVAR_3": 3, + } diff --git a/ct-app/test/components/test_lockedvar.py b/ct-app/test/components/test_lockedvar.py index f2ab4def..928c7974 100644 --- a/ct-app/test/components/test_lockedvar.py +++ b/ct-app/test/components/test_lockedvar.py @@ -1,7 +1,6 @@ import asyncio import pytest - from core.components.lockedvar import LockedVar @@ -37,6 +36,7 @@ async def test_locker_var_infer_type(): await locked_var.set("string") assert await locked_var.get() == "string" + @pytest.mark.asyncio async def test_locked_var_inc_with_infer_type(): locked_var = LockedVar("test_var", 0, infer_type=True) @@ -45,6 +45,7 @@ async def test_locked_var_inc_with_infer_type(): assert await locked_var.get() == 1.0 + @pytest.mark.asyncio async def test_locked_var_update_with_infer_type(): locked_var = LockedVar("test_var", {}, infer_type=True) @@ -53,4 +54,4 @@ async def test_locked_var_update_with_infer_type(): assert (await locked_var.get())["key"] == 1.0 with pytest.raises(TypeError): - await locked_var.update(10) \ No newline at end of file + await locked_var.update(10) diff --git a/ct-app/test/components/test_parameters.py b/ct-app/test/components/test_parameters.py index 1644ea1c..fc1e7095 100644 --- a/ct-app/test/components/test_parameters.py +++ b/ct-app/test/components/test_parameters.py @@ -1,17 +1,13 @@ import os -from core.components.parameters import Parameters import pytest +from core.components.parameters import Parameters params_from_yaml = { - "parent1": "value1", - "parent2": { - "child1": "value2", - "child2": { - "grandchild1": "value3" - } - } - } + "parent1": "value1", + "parent2": {"child1": "value2", "child2": {"grandchild1": "value3"}}, +} + def test_parse(): params = Parameters() @@ -21,6 +17,7 @@ def test_parse(): assert params.parent2.child1 == "value2" assert params.parent2.child2.grandchild1 == "value3" + def test_overrides(): os.environ["OVERRIDES_PARENT1"] = "value1-override" os.environ["OVERRIDES_PARENT2_CHILD1"] = "value2-override" @@ -38,16 +35,18 @@ def test_overrides(): del os.environ["OVERRIDES_PARENT2_CHILD1"] del os.environ["OVERRIDES_PARENT2_CHILD2_GRANDCHILD1"] + def test_overrides_raises(): os.environ["OVERRIDES_PARENT3"] = "value1-override" params = Parameters() params.parse(params_from_yaml) - + with pytest.raises(KeyError): params.overrides("OVERRIDES_") del os.environ["OVERRIDES_PARENT3"] + def test_from_env(): os.environ["ENVPREFIX_STRING"] = "random-string" os.environ["ENVPREFIX_VALUE"] = "2" @@ -72,9 +71,10 @@ def test_from_env(): del os.environ["ENVPREFIX_DECIMAL"] del os.environ["ENVPREFIX_URL"] + def test__convert(): params = Parameters() assert isinstance(params._convert("1"), int) assert isinstance(params._convert("1.2"), float) - assert isinstance(params._convert("http://localhost:8000"), str) \ No newline at end of file + assert isinstance(params._convert("http://localhost:8000"), str) diff --git a/ct-app/test/components/test_utils.py b/ct-app/test/components/test_utils.py index 27536ec7..14fc2545 100644 --- a/ct-app/test/components/test_utils.py +++ b/ct-app/test/components/test_utils.py @@ -1,59 +1,92 @@ import datetime -import os +import random +from test.components.utils import handle_envvars import pytest from core.components.utils import Utils from core.model.address import Address from core.model.peer import Peer - - -def test_envvar(): - os.environ["STRING_ENVVAR"] = "string-envvar" - os.environ["INT_ENVVAR"] = "1" - os.environ["FLOAT_ENVVAR"] = "1.0" - - assert Utils.envvar("FAKE_STRING_ENVVAR", "default") == "default" - assert Utils.envvar("STRING_ENVVAR", type=str) == "string-envvar" - assert Utils.envvar("INT_ENVVAR", type=int) == 1 - assert Utils.envvar("FLOAT_ENVVAR", type=float) == 1.0 - - del os.environ["STRING_ENVVAR"] - del os.environ["INT_ENVVAR"] - del os.environ["FLOAT_ENVVAR"] - - -def test_envvarWithPrefix(): - os.environ["TEST_ENVVAR_2"] = "2" - os.environ["TEST_ENVVAR_1"] = "1" - os.environ["TEST_ENVVAR_3"] = "3" - os.environ["TEST_ENVVOR_4"] = "3" - - assert Utils.envvarWithPrefix("TEST_ENVVAR_", type=int) == { - "TEST_ENVVAR_1": 1, - "TEST_ENVVAR_2": 2, - "TEST_ENVVAR_3": 3, - } - - del os.environ["TEST_ENVVAR_1"] - del os.environ["TEST_ENVVAR_2"] - del os.environ["TEST_ENVVAR_3"] - del os.environ["TEST_ENVVOR_4"] +from core.model.subgraph_entry import SubgraphEntry +from core.model.topology_entry import TopologyEntry +from hoprd_sdk.models import ChannelInfoResponse + + +@pytest.fixture +def channel_topology(): + return [ + ChannelInfoResponse( + f"{1*1e18:.0f}", + 1, + "channel_1", + 5, + "dst_addr_1", + "dst_1", + "src_addr_1", + "src_1", + "Open", + 0, + ), + ChannelInfoResponse( + f"{2*1e18:.0f}", + 1, + "channel_2", + 5, + "dst_addr_2", + "dst_2", + "src_addr_1", + "src_1", + "Open", + 0, + ), + ChannelInfoResponse( + f"{3*1e18:.0f}", + 1, + "channel_3", + 5, + "dst_addr_3", + "dst_3", + "src_addr_1", + "src_1", + "Closed", + 0, + ), + ChannelInfoResponse( + f"{4*1e18:.0f}", + 1, + "channel_4", + 5, + "dst_addr_1", + "dst_1", + "src_addr_2", + "src_2", + "Open", + 0, + ), + ChannelInfoResponse( + f"{1*1e18:.0f}", + 1, + "channel_5", + 5, + "dst_addr_2", + "dst_2", + "src_addr_2", + "src_2", + "Open", + 0, + ), + ] def test_nodeAddresses(): - os.environ["NODE_ADDRESS_1"] = "address_1" - os.environ["NODE_ADDRESS_2"] = "address_2" - os.environ["NODE_KEY_1"] = "address_1_key" - os.environ["NODE_KEY_2"] = "address_2_key" - - addresses, keys = Utils.nodesAddresses("NODE_ADDRESS_", "NODE_KEY_") - assert addresses == ["address_1", "address_2"] - assert keys == ["address_1_key", "address_2_key"] - - del os.environ["NODE_ADDRESS_1"] - del os.environ["NODE_ADDRESS_2"] - del os.environ["NODE_KEY_1"] - del os.environ["NODE_KEY_2"] + with handle_envvars( + node_address_1="address_1", + node_address_2="address_2", + node_key_1="address_1_key", + node_key_2="address_2_key", + ): + addresses, keys = Utils.nodesAddresses("NODE_ADDRESS_", "NODE_KEY_") + assert addresses == ["address_1", "address_2"] + assert keys == ["address_1_key", "address_2_key"] def test_httpPOST(): @@ -61,7 +94,27 @@ def test_httpPOST(): def test_mergeDataSources(): - pytest.skip("Not implemented") + topology_list = [ + TopologyEntry(None, None, 1), + TopologyEntry("peer_id_2", "address_2", 2), + TopologyEntry("peer_id_3", "address_3", 3), + TopologyEntry("peer_id_4", "address_4", 4), + ] + peers_list = [ + Peer("peer_id_1", "address_1", "1.0.0"), + Peer("peer_id_2", "address_2", "1.1.0"), + Peer("peer_id_3", "address_3", "1.0.2"), + ] + subgraph_list = [ + SubgraphEntry("address_1", "10", "safe_address_1", "1"), + SubgraphEntry("address_2", "10", "safe_address_2", "2"), + SubgraphEntry("address_3", None, "safe_address_3", "3"), + ] + + merged = Utils.mergeDataSources(topology_list, peers_list, subgraph_list) + + print(merged) + assert len(merged) == 1 def test_allowManyNodePerSafe(): @@ -104,8 +157,12 @@ def test_exclude(): assert item.address in blacklist -def test_rewardProbability(): - pass +def test_rewardProbability(peers: list[Peer]): + Utils.rewardProbability(peers) + + splits = [peer.reward_probability for peer in peers] + assert sum(splits) == pytest.approx(1.0) + assert all(splits) def test_stringArrayToGCP(): @@ -141,9 +198,22 @@ def test_nextDelayInSeconds(): assert delay == 1 -def test_aggregatePeerBalanceInChannels(): - pytest.skip("Not implemented") +@pytest.mark.asyncio +async def test_aggregatePeerBalanceInChannels(channel_topology): + results = await Utils.aggregatePeerBalanceInChannels(channel_topology) + + assert len(results) == 2 + assert results["src_1"]["channels_balance"] == 3 + assert results["src_2"]["channels_balance"] == 5 + + +def test_splitDict(): + bins = random.randint(2, 10) + num_elements = random.randint(50, 100) + source_dict = {f"key_{i}": f"value_{i}" for i in range(num_elements)} + result = Utils.splitDict(source_dict, bins) + key_counts = [len(item.keys()) for item in result] -def test_taskSendMessage(): - pytest.skip("Not implemented") \ No newline at end of file + assert len(result) == bins + assert max(key_counts) - min(key_counts) <= 1 diff --git a/ct-app/test/components/utils.py b/ct-app/test/components/utils.py new file mode 100644 index 00000000..8966dfad --- /dev/null +++ b/ct-app/test/components/utils.py @@ -0,0 +1,14 @@ +import os +from contextlib import contextmanager + + +@contextmanager +def handle_envvars(**kwargs): + for key, value in kwargs.items(): + os.environ[key.upper()] = value + + try: + yield + finally: + for key in kwargs.keys(): + del os.environ[key.upper()] diff --git a/ct-app/test/conftest.py b/ct-app/test/conftest.py new file mode 100644 index 00000000..ff7cd398 --- /dev/null +++ b/ct-app/test/conftest.py @@ -0,0 +1,263 @@ +from itertools import repeat +from random import choice, choices, randint +from test.decorators_patches import patches + +import pytest +import yaml +from core.components.parameters import Parameters +from core.model.economic_model import Budget +from core.model.economic_model import Coefficients as Coefficients +from core.model.economic_model import EconomicModel, Equation, Equations +from core.model.peer import Peer +from database import Utils as DBUtils +from hoprd_sdk.models import ChannelInfoResponse, NodeChannelsResponse +from pytest_mock import MockerFixture + +for p in patches: + p.start() + +# needs to be imported after the patches are applied +from core.core import Core # noqa: E402 +from core.node import Node # noqa: E402 + + +class SideEffect: + def __init__(self): + self.it_send_message_success = self.generator_send_message_success() + self.it_channel_balance = self.generator_channel_balance() + self.it_node_balance = self.generator_node_balance() + self.it_inbox_messages = self.generator_inbox_messages() + + @staticmethod + def generator_send_message_success(): + # yields 1 95% of the time and 0 5% of the time + rate = 0.95 + zeros = int(100 * (1 - rate)) + ones = int(100 * rate) + yield from repeat(choice([0] * zeros + [1] * ones)) + + @staticmethod + def generator_channel_balance(): + # yields a random integer between 50 and 100 + yield from repeat(randint(50, 100)) + + @staticmethod + def generator_node_balance(): + # yields a dict with 2 random integers between 1 and 10 + yield from repeat({"hopr": randint(1, 10), "native": randint(1, 10)}) + + @staticmethod + def generator_inbox_messages(): + # yields a list of 10 random characters repeated 2 to 10 times + yield from repeat( + [ + choices("abcdefghijklmnopqrstuvwxyz ", k=10) + for _ in range(randint(2, 10)) + ] + ) + + def send_message_success(self, *args, **kwargs): + return next(self.it_send_message_success) + + def channel_balance(self, *args, **kwargs): + return next(self.it_channel_balance) + + def node_balance(self, *args, **kwargs): + return next(self.it_node_balance) + + def inbox_messages(self, *args, **kwargs): + return next(self.it_inbox_messages) + + +@pytest.fixture +def economic_model() -> EconomicModel: + equations = Equations( + Equation("a * x", "l <= x <= c"), + Equation("a * c + (x - c) ** (1 / b)", "x > c"), + ) + parameters = Coefficients(1, 1, 3, 0) + budget = Budget(100, 15, 0.25, 2, 1) + return EconomicModel(equations, parameters, budget) + + +@pytest.fixture +def peers_raw() -> list[dict]: + return [ + {"peer_id": "id_0", "peer_address": "address_0", "reported_version": "2.0.0"}, + {"peer_id": "id_1", "peer_address": "address_1", "reported_version": "1.7.0"}, + {"peer_id": "id_2", "peer_address": "address_2", "reported_version": "1.0.3"}, + { + "peer_id": "id_3", + "peer_address": "address_3", + "reported_version": "1.0.0-rc.3", + }, + {"peer_id": "id_4", "peer_address": "address_4", "reported_version": "1.0.0"}, + ] + + +@pytest.fixture +def peers(peers_raw: list[dict], economic_model: EconomicModel) -> list[Peer]: + peers = [ + Peer(peer["peer_id"], peer["peer_address"], peer["reported_version"]) + for peer in peers_raw + ] + for peer in peers: + peer.economic_model = economic_model + peer.reward_probability = 0.02 + peer.safe_balance = randint(100, 200) + peer.channel_balance = randint(10, 50) + peer.economic_model.budget.ticket_price = 0.01 + + return peers + + +@pytest.fixture +def addresses() -> list[dict]: + return [ + {"hopr": "id_0", "native": "address_0"}, + {"hopr": "id_1", "native": "address_1"}, + {"hopr": "id_2", "native": "address_2"}, + {"hopr": "id_3", "native": "address_3"}, + {"hopr": "id_4", "native": "address_4"}, + ] + + +@pytest.fixture +async def nodes( + mocker: MockerFixture, + peers: list[Peer], + addresses: list[dict], + peers_raw: list[dict], + channels: NodeChannelsResponse, +) -> list[Node]: + nodes = [ + Node("localhost:9000", "random_key"), + Node("localhost:9001", "random_key"), + Node("localhost:9002", "random_key"), + Node("localhost:9003", "random_key"), + Node("localhost:9004", "random_key"), + ] + for idx, node in enumerate(nodes): + mocker.patch.object( + node.peers, "get", return_value=peers[:idx] + peers[idx + 1 :] + ) + mocker.patch.object(node.api, "get_address", return_value=addresses[idx]) + mocker.patch.object(node.api, "all_channels", return_value=channels) + mocker.patch.object( + node.api, "channel_balance", side_effect=SideEffect().channel_balance + ) + + mocker.patch.object( + node.api, "send_message", side_effect=SideEffect().send_message_success + ) + mocker.patch.object( + node.api, "messages_pop_all", side_effect=SideEffect().inbox_messages + ) + mocker.patch.object(node.api, "balances", side_effect=SideEffect().node_balance) + mocker.patch.object( + node.api, "peers", return_value=peers_raw[:idx] + peers_raw[idx + 1 :] + ) + + mocker.patch.object(node.api, "healthyz", return_value=True) + mocker.patch.object(node.api, "startedz", return_value=True) + mocker.patch.object(node.api, "ticket_price", return_value=0.01) + + setattr(node.params, "distribution", Parameters()) + setattr(node.params.distribution, "delay_between_two_messages", 0.001) + + await node._retrieve_address() + + return nodes + + +@pytest.fixture +def channels(peers: list[Peer]) -> NodeChannelsResponse: + channels = list[ChannelInfoResponse]() + index = 0 + + for src in peers: + for dest in peers: + if src.address == dest.address: + continue + + channels.append( + ChannelInfoResponse( + f"{1*1e18:.0f}", + 1, + f"channel_{index}", + 5, + dest.address.address, + dest.address.id, + src.address.address, + src.address.id, + "Open", + 0, + ) + ) + + index += 1 + + return NodeChannelsResponse(all=channels, incoming=[], outgoing=[]) + + +@pytest.fixture +async def core(mocker: MockerFixture, nodes: list[Node]) -> Core: + core = Core() + + mocker.patch.object(DBUtils, "peerIDToInt", return_value=0) + + params = Parameters() + with open("./test/test_config.yaml", "r") as file: + params.parse(yaml.safe_load(file)) + setattr(params.subgraph, "deployerKey", "foo_deployer_key") + + setattr(params, "pg", Parameters()) + setattr(params.pg, "user", "user") + setattr(params.pg, "password", "password") + setattr(params.pg, "host", "host") + setattr(params.pg, "port", "port") + setattr(params.pg, "database", "database") + + core.post_init(nodes, params) + + await core._retrieve_address() + + return core + + +@pytest.fixture +async def node( + nodes: list[Node], + mocker: MockerFixture, + peers_raw: list[dict], + channels: NodeChannelsResponse, + addresses: dict, +) -> Node: + node = Node("localhost", "random_key") + + mocker.patch.object(node.api, "all_channels", return_value=channels) + mocker.patch.object(node.api, "peers", return_value=peers_raw[1:]) + mocker.patch.object(node.api, "get_address", return_value=addresses[0]) + mocker.patch.object(node.api, "balances", side_effect=SideEffect().node_balance) + mocker.patch.object( + node.api, "channel_balance", side_effect=SideEffect().channel_balance + ) + mocker.patch.object(node.api, "send_message", return_value=1) + mocker.patch.object(node.api, "healthyz", return_value=True) + mocker.patch.object(node.api, "startedz", return_value=True) + + params = Parameters() + with open("./test/test_config.yaml", "r") as file: + params.parse(yaml.safe_load(file)) + setattr(params.subgraph, "deployerKey", "foo_deployer_key") + + node.params = params + + await node.healthcheck() + await node._retrieve_address() + + return node + + +for p in patches: + p.stop() diff --git a/ct-app/test/decorators_patches.py b/ct-app/test/decorators_patches.py new file mode 100644 index 00000000..b04c4e87 --- /dev/null +++ b/ct-app/test/decorators_patches.py @@ -0,0 +1,16 @@ +from unittest.mock import patch + + +def _mock_decorator(f): + def decorated_function(g): + return g + + if callable(f): + return decorated_function(f) + return decorated_function + + +patches = [ + patch("core.components.decorators.formalin", _mock_decorator), + patch("core.components.decorators.flagguard", _mock_decorator), +] diff --git a/ct-app/test/test_config.yaml b/ct-app/test/test_config.yaml new file mode 100644 index 00000000..65c3fcdd --- /dev/null +++ b/ct-app/test/test_config.yaml @@ -0,0 +1,120 @@ +--- +# ============================================================================= +# +# ============================================================================= +flags: + core: + healthcheck: 10 + checkSubgraphURLs: 30 + getFundings: 30 + getTicketPrice: 30 + aggregatePeers: 30 + getTopologyData: 30 + getSubgraphData: 30 + getRegisteredNodes: 30 + getNFTHolders: 30 + getPeersRewards: 30 + applyEconomicModel: 30 + distributeRewards: ~ + + node: + healthcheck: 10 + retrievePeers: 30 + retrieveIncomingChannels: 30 + retrieveOutgoingChannels: 30 + retrieveBalances: 30 + openChannels: ~ + closeOldChannels: ~ + closePendingChannels: ~ + fundChannels: ~ + closeIncomingChannels: ~ + getTotalChannelFunds: ~ + +# ============================================================================= +# +# ============================================================================= +economicModel: + minSafeAllowance: -1 + maxAPRPercentage: 15.0 + NFTThreshold: ~ + + coefficients: + a: 1 + b: 1 + c: 3 + l: 0 + + equations: + fx: + formula: "a * x" + condition: "l <= x <= c" + gx: + formula: "a * c + (x - c) ** (1 / b)" + condition: "x > c" + + budget: + amount: 400 + period: 1200 + s: 1 + countsInPeriod: 1 + ticketPrice: 0.5 # deprecated + winningProbability: 1 + +# ============================================================================= +# +# ============================================================================= +distribution: + minEligiblePeers: 500 + messageDeliveryDelay: 0.2 + delayBetweenTwoMessages: 0.01 + maxIterations: 4 + +# ============================================================================= +# +# ============================================================================= +gcp: + filePrefix: expected_reward + folder: expected_rewards + bucket: hoprnet-ctdapp-staging + +# ============================================================================= +# +# ============================================================================= +peer: + minVersion: '2.0.7' + +# ============================================================================= +# +# ============================================================================= +channel: + minBalance: 0.05 + fundingAmount: 0.2 + maxAgeSeconds: 60 + +# ============================================================================= +# +# ============================================================================= +rabbitmq: + taskName: fake_task + projectName: ct-app + +# ============================================================================= +# +# ============================================================================= +subgraph: + safesBalance: + queryID: query-id-safes + URLBackup: safes_backup_url + + staking: + queryID: query-id-staking + URLBackup: staking_backp_url + + wxHOPRTxs: + queryID: ~ + URLBackup: wxHOPRTxs_backp_url + + rewards: + queryID: ~ + URLBackup: URLBackup_backp_url +... \ No newline at end of file diff --git a/ct-app/test/test_core.py b/ct-app/test/test_core.py new file mode 100644 index 00000000..6bb7afd8 --- /dev/null +++ b/ct-app/test/test_core.py @@ -0,0 +1,92 @@ +import pytest +from core.model.address import Address +from core.model.peer import Peer +from core.model.subgraph_type import SubgraphType + +from .conftest import Core + + +def test__safe_subgraph_url(core: Core): + assert "query-id-safes" in core._safe_subgraph_url(SubgraphType.DEFAULT) + assert core._safe_subgraph_url(SubgraphType.BACKUP) == "safes_backup_url" + assert core._safe_subgraph_url("random") is None + + +@pytest.mark.asyncio +async def test__retrieve_address(core: Core, addresses: list[dict]): + await core._retrieve_address() + + assert core.address in [Address(addr["hopr"], addr["native"]) for addr in addresses] + + +@pytest.mark.asyncio +async def test_core_healthcheck(core: Core): + await core.healthcheck() + + assert await core.connected.get() + + +@pytest.mark.asyncio +async def test_check_subgraph_urls(core: Core): + pytest.skip("Not implemented") + + +@pytest.mark.asyncio +async def test_aggregate_peers(core: Core, peers: list[Peer]): + assert len(await core.all_peers.get()) == 0 + + # drop manually some peers from nodes + await core.nodes[0].peers.set(set(list(await core.nodes[0].peers.get())[:3])) + await core.nodes[1].peers.set(set(list(await core.nodes[1].peers.get())[2:])) + await core.nodes[2].peers.set(set(list(await core.nodes[2].peers.get())[::2])) + + await core.aggregate_peers() + + assert len(await core.all_peers.get()) == len(peers) + + +@pytest.mark.asyncio +async def test_get_subgraph_data(core: Core): + pytest.skip("Not implemented") + + +@pytest.mark.asyncio +async def test_get_topology_data(core: Core, peers: list[Peer]): + await core.connected.set(True) + await core.get_topology_data() + + assert len(await core.topology_list.get()) == len(peers) + + +@pytest.mark.asyncio +async def test_apply_economic_model(core: Core): + pytest.skip("Not implemented") + + +@pytest.mark.asyncio +async def test_get_fundings(core: Core): + pytest.skip("Not implemented") + + +@pytest.mark.asyncio +async def test_multiple_attempts_sending_stops_by_reward(core: Core, peers: list[Peer]): + max_iter = 20 + rewards, iter = await core.multiple_attempts_sending(peers[:-1], max_iter) + + assert iter < max_iter + assert len(rewards) == len(peers) - 1 + assert all([reward["remaining"] <= 0 for reward in rewards.values()]) + assert all([reward["issued"] >= reward["expected"] for reward in rewards.values()]) + + +@pytest.mark.asyncio +async def test_multiple_attempts_sending_stops_by_max_iter( + core: Core, peers: list[Peer] +): + max_iter = 2 + await core.get_ticket_price() + rewards, iter = await core.multiple_attempts_sending(peers[:-1], max_iter) + + assert iter == max_iter + assert len(rewards) == len(peers) - 1 + assert all([reward["remaining"] >= 0 for reward in rewards.values()]) diff --git a/ct-app/test/test_node.py b/ct-app/test/test_node.py new file mode 100644 index 00000000..46b5e6d0 --- /dev/null +++ b/ct-app/test/test_node.py @@ -0,0 +1,181 @@ +import asyncio +import time +from random import randint, random + +import pytest +from hoprd_sdk.models import NodeChannelsResponse + +from .conftest import Node, Peer + + +@pytest.mark.asyncio +async def test__retrieve_address(node: Node, addresses: dict): + await node._retrieve_address() + address = await node.address.get() + + assert address.address in [addr["native"] for addr in addresses] + assert address.id in [addr["hopr"] for addr in addresses] + + +@pytest.mark.asyncio +async def test_node_healthcheck(node: Node): + await node.healthcheck() + + assert await node.connected.get() + + +@pytest.mark.asyncio +async def test_retrieve_balances(node: Node): + await node.healthcheck() + + balances = await node.retrieve_balances() + + assert balances.get("hopr", None) is not None + assert balances.get("native", None) is not None + assert isinstance(balances.get("hopr"), int) + assert isinstance(balances.get("native"), int) + + +@pytest.mark.asyncio +async def test_open_channels(node: Node): + pytest.skip("Not implemented") + + +@pytest.mark.asyncio +async def test_close_incoming_channels(node: Node): + pytest.skip("Not implemented") + + +@pytest.mark.asyncio +async def test_close_pending_channels(node: Node): + pytest.skip("Not implemented") + + +@pytest.mark.asyncio +async def test_close_old_channels(node: Node): + pytest.skip("Not implemented") + + +@pytest.mark.asyncio +async def test_fund_channels(node: Node): + pytest.skip("Not implemented") + + +@pytest.mark.asyncio +async def test_retrieve_peers(node: Node, peers: list[Peer]): + await node.peers.set(set()) + await node.peer_history.set(dict()) + await node.retrieve_peers() + + assert len(await node.peers.get()) == len(peers) - 1 + assert await node.peer_history.get() != dict() + + +@pytest.mark.asyncio +async def test_retrieve_outgoing_channels(node: Node, channels: NodeChannelsResponse): + assert await node.outgoings.get() == [] + + await node.retrieve_outgoing_channels() + + outgoings_from_node = await node.outgoings.get() + outgoings_from_fixture = [ + c for c in channels.all if c.source_peer_id == (await node.address.get()).id + ] + + assert [c.channel_id for c in outgoings_from_node] == [ + c.channel_id for c in outgoings_from_fixture + ] + + +@pytest.mark.asyncio +async def test_retrieve_incoming_channels(node: Node, channels: NodeChannelsResponse): + assert await node.incomings.get() == [] + + await node.retrieve_incoming_channels() + + incomings_from_node = await node.incomings.get() + incomings_from_fixture = [ + c + for c in channels.all + if c.destination_peer_id == (await node.address.get()).id + ] + + assert [c.channel_id for c in incomings_from_node] == [ + c.channel_id for c in incomings_from_fixture + ] + + +@pytest.mark.asyncio +async def test_get_total_channel_funds(node: Node, channels: NodeChannelsResponse): + await node.retrieve_outgoing_channels() + + total_funds_from_node = await node.get_total_channel_funds() + total_funds_from_fixture = sum( + [ + int(c.balance) + for c in channels.all + if c.source_peer_id == (await node.address.get()).id + ] + ) + + assert total_funds_from_fixture / 1e18 == total_funds_from_node + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "num_tasks,sleep", [(randint(10, 20), round(random() * 0.3 + 0.2, 2))] +) +async def test__delay_message(node: Node, num_tasks: int, sleep: float): + tasks = set[asyncio.Task]() + + for idx in range(num_tasks): + tasks.add( + asyncio.create_task( + node._delay_message(idx, "random_relayer", 0, sleep * idx) + ) + ) + + before = time.time() + issued = await asyncio.gather(*tasks) + after = time.time() + + assert after - before <= sleep * num_tasks + assert after - before >= sleep * (num_tasks - 1) + assert sum(issued) == num_tasks + + +@pytest.mark.asyncio +async def test_distribute_rewards(node: Node): + await node.retrieve_peers() + + peer_group = {} + for idx, peer in enumerate(await node.peers.get()): + message_count = randint(4, 10) + + peer_group[peer.address.id] = { + "expected": message_count, + "remaining": message_count, + "issued": 0, + "tag": idx, + "ticket-price": 0.01, + } + + issued_count = await node.distribute_rewards(peer_group) + + assert len(issued_count) == len(peer_group) + assert all([v != 0 for v in issued_count.values()]) + + +@pytest.mark.asyncio +async def test_check_inbox(node: Node): + pytest.skip("Not implemented") + + +@pytest.mark.asyncio +async def test_fromAddressAndKeyLists(node: Node): + addresses = ["LOCALHOST:9091", "LOCALHOST:9092", "LOCALHOST:9093"] + keys = ["key1", "key2", "key3"] + + nodes = Node.fromAddressAndKeyLists(addresses, keys) + + assert len(nodes) == len(addresses) == len(keys) diff --git a/ct-app/tests_endurance/__main__.py b/ct-app/tests_endurance/__main__.py index cf73c47d..164b80da 100644 --- a/ct-app/tests_endurance/__main__.py +++ b/ct-app/tests_endurance/__main__.py @@ -1,7 +1,7 @@ -import yaml import os import click +import yaml from . import * # noqa: F403 from . import EnduranceTest @@ -65,12 +65,12 @@ def main(configfile: str): success = eval(value.get("executor"))(**stage)() except Exception as e: EnduranceTest.error(f"{e.__class__.__name__}: {e}", prefix="\t") - success = False + success = False, "Exception raised" stage_results.append(success) display_success(success) - successful_stages = sum(stage_results) + successful_stages = sum([result[0] for result in stage_results]) scenarios_results.append(successful_stages == num_stages) display_results(successful_stages, num_stages, "test") @@ -85,11 +85,12 @@ def main(configfile: str): del_envvars(global_env.keys()) -def display_success(success: bool): +def display_success(result: tuple[bool, str]): + success, message = result if success: EnduranceTest.success("Test successful", prefix="\t", end="\n" * 2) else: - EnduranceTest.error("Test failed", prefix="\t", end="\n" * 2) + EnduranceTest.error(f"Test failed: {message}", prefix="\t", end="\n" * 2) def display_results(hit: int, total: int, element: str): diff --git a/ct-app/tests_endurance/test_fund_channels.py b/ct-app/tests_endurance/test_fund_channels.py index dbfcc2a5..766d64a6 100644 --- a/ct-app/tests_endurance/test_fund_channels.py +++ b/ct-app/tests_endurance/test_fund_channels.py @@ -4,7 +4,6 @@ from core.components.hoprd_api import HoprdAPI from core.components.utils import Utils -# tools import HoprdAPIHelper, envvar from . import EnduranceTest, Metric diff --git a/ct-app/tests_endurance/test_send_messages.py b/ct-app/tests_endurance/test_send_messages.py index ae5deb3b..7dc35fe2 100644 --- a/ct-app/tests_endurance/test_send_messages.py +++ b/ct-app/tests_endurance/test_send_messages.py @@ -10,6 +10,7 @@ class SendMessages(EnduranceTest): async def on_start(self): self.results = [] + self.tag = random.randint(0, 2**16 - 1) self.api = HoprdAPI(Utils.envvar("API_URL"), Utils.envvar("API_KEY")) self.recipient = await self.api.get_address("hopr") @@ -40,6 +41,7 @@ async def on_start(self): self.info(f"channel: {channel.channel_id}", prefix="\t") self.info(f"status : {channel.status}", prefix="\t") self.info(f"balance: {channel.balance}HOPR", prefix="\t") + self.info(f"tag : {self.tag}", prefix="\t") await self.api.messages_pop_all(Utils.envvar("MESSAGE_TAG", type=int)) diff --git a/helm/ctdapp/templates/configmap-postman.yaml b/helm/ctdapp/templates/configmap-postman.yaml deleted file mode 100644 index 2bfccae2..00000000 --- a/helm/ctdapp/templates/configmap-postman.yaml +++ /dev/null @@ -1,11 +0,0 @@ -apiVersion: v1 -kind: ConfigMap -metadata: - annotations: - argocd.argoproj.io/sync-wave: "1" - name: postman-config -data: - CONFIG_FILE_PATH: ./scripts/core_{{ .Values.environmentName }}_config.yaml - {{- if .Values.ctdapp.postman.extraEnvVars -}} - {{ .Values.ctdapp.postman.extraEnvVars | toYaml | nindent 2 }} - {{- end }} \ No newline at end of file diff --git a/helm/ctdapp/templates/deployment-core.yaml b/helm/ctdapp/templates/deployment-core.yaml index d2744968..40b85cbf 100644 --- a/helm/ctdapp/templates/deployment-core.yaml +++ b/helm/ctdapp/templates/deployment-core.yaml @@ -44,7 +44,7 @@ spec: - secretRef: name: subgraph - secretRef: - name: rabbitmq-ctdapp + name: postgres args: - python - '-m' diff --git a/helm/ctdapp/templates/deployment-postman.yaml b/helm/ctdapp/templates/deployment-postman.yaml deleted file mode 100644 index 1e342c81..00000000 --- a/helm/ctdapp/templates/deployment-postman.yaml +++ /dev/null @@ -1,50 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - annotations: - argocd.argoproj.io/sync-wave: "4" - name: {{ include "ctdapp.fullname" . }}-postman - labels: - {{- include "ctdapp.labels" . | nindent 4 }} -spec: - replicas: {{ .Values.ctdapp.postman.replicas }} - selector: - matchLabels: - {{- include "ctdapp.selectorLabels" . | nindent 6 }} - app.kubernetes.io/component: postman - template: - metadata: - {{- with .Values.ctdapp.podAnnotations }} - annotations: - {{- toYaml . | nindent 8 }} - {{- end }} - labels: - app.kubernetes.io/component: postman - {{- include "ctdapp.labels" . | nindent 8 }} - {{- with .Values.ctdapp.podLabels }} - {{- toYaml . | nindent 8 }} - {{- end }} - spec: - serviceAccountName: {{ include "ctdapp.serviceAccountName" . }} - containers: - - name: {{ .Chart.Name }} - image: "{{ .Values.ctdapp.postman.repository }}:{{ .Values.ctdapp.postman.tag }}" - imagePullPolicy: {{ .Values.ctdapp.postman.pullPolicy }} - envFrom: - - configMapRef: - name: postman-config - - configMapRef: - name: hoprd-nodes - - secretRef: - name: hoprd-nodes - - secretRef: - name: postgres - - secretRef: - name: rabbitmq-ctdapp - args: - - /bin/sh - - -c - - | - celery -A postman.postman_tasks.app worker -c 5 -n worker@%h -l info -E -Q "send_messages" - resources: - {{- toYaml .Values.ctdapp.postman.resources | nindent 12 }} diff --git a/helm/ctdapp/values.yaml b/helm/ctdapp/values.yaml index 7f617495..cd6fa78b 100644 --- a/helm/ctdapp/values.yaml +++ b/helm/ctdapp/values.yaml @@ -45,16 +45,3 @@ ctdapp: requests: cpu: 250m memory: 256Mi - postman: - repository: europe-west3-docker.pkg.dev/hoprassociation/docker-images/cover-traffic - pullPolicy: Always - tag: "" - replicas: 1 - extraEnvVars: {} - resources: - limits: - cpu: '1' - memory: 1Gi - requests: - cpu: 500m - memory: 512Mi diff --git a/helm/values-prod-blue.yaml b/helm/values-prod-blue.yaml index 0f13aa23..dc5970af 100644 --- a/helm/values-prod-blue.yaml +++ b/helm/values-prod-blue.yaml @@ -11,6 +11,8 @@ config: | on_fail_continue: true allow_recursive: false strategies: [] + inbox: + capacity: 2048 identityPool: funding: diff --git a/helm/values-prod-green.yaml b/helm/values-prod-green.yaml index ab794ac3..8af469dc 100644 --- a/helm/values-prod-green.yaml +++ b/helm/values-prod-green.yaml @@ -11,6 +11,8 @@ config: | on_fail_continue: true allow_recursive: false strategies: [] + inbox: + capacity: 2048 identityPool: funding: diff --git a/helm/values-prod.yaml b/helm/values-prod.yaml index 735d69e2..4520b1c0 100644 --- a/helm/values-prod.yaml +++ b/helm/values-prod.yaml @@ -10,10 +10,7 @@ backup: ctdapp: core: replicas: 1 - tag: v2.2.1 - postman: - replicas: 1 - tag: v2.2.1 + tag: v3.0.0 nodes: NODE_ADDRESS_1: http://ctdapp-blue-node-1:3001 NODE_ADDRESS_2: http://ctdapp-blue-node-2:3001 diff --git a/helm/values-staging-blue.yaml b/helm/values-staging-blue.yaml index a129b908..dda9feb7 100644 --- a/helm/values-staging-blue.yaml +++ b/helm/values-staging-blue.yaml @@ -11,6 +11,8 @@ config: | on_fail_continue: true allow_recursive: false strategies: [] + inbox: + capacity: 2048 identityPool: funding: diff --git a/helm/values-staging-green.yaml b/helm/values-staging-green.yaml index 10f9a3bb..43fdeec0 100644 --- a/helm/values-staging-green.yaml +++ b/helm/values-staging-green.yaml @@ -11,6 +11,8 @@ config: | on_fail_continue: true allow_recursive: false strategies: [] + inbox: + capacity: 2048 identityPool: funding: diff --git a/helm/values-staging.yaml b/helm/values-staging.yaml index 7b6d1ff4..4713bfa3 100644 --- a/helm/values-staging.yaml +++ b/helm/values-staging.yaml @@ -11,9 +11,6 @@ ctdapp: core: replicas: 1 tag: staging - postman: - replicas: 1 - tag: staging nodes: NODE_ADDRESS_1: http://ctdapp-blue-node-1.ctdapp.svc:3001 NODE_ADDRESS_2: http://ctdapp-blue-node-2.ctdapp.svc:3001