diff --git a/CHANGELOG.md b/CHANGELOG.md index 016c2286..399975a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,13 @@ -# 0.21.9 +# 0.22 +- Refactor a lot of functionality to a new submodule `eth_defi.provider` +- Add MEV blocking support in the form of `eth_defi.mev_blocker.MEVBlockerProvider` +- Add JSON-RPC fallback switching in the form of `eth_defi.fallback_provider.FallbackProvider` +- Add `HotWallet.create_for_testing` - Add utility function `get_onchain_price()` to ask on-chain price of a Uniswap v3 pool at any given block number +- Deprecate `eth_defi.anvil` -> `eth_defi.provider.anvil` +- Deprecate `eth_defi.ganache` -> `eth_defi.provider.ganache` # 0.21.8 diff --git a/docs/source/api/core/index.rst b/docs/source/api/core/index.rst index db3c5206..8b4125de 100644 --- a/docs/source/api/core/index.rst +++ b/docs/source/api/core/index.rst @@ -20,8 +20,6 @@ The core API is built on the top of Web3.py library. eth_defi.confirmation eth_defi.revert_reason eth_defi.hotwallet - eth_defi.anvil - eth_defi.ganache eth_defi.middleware eth_defi.tx eth_defi.trace diff --git a/docs/source/api/index.rst b/docs/source/api/index.rst index 43ee3380..30c78ff3 100644 --- a/docs/source/api/index.rst +++ b/docs/source/api/index.rst @@ -14,6 +14,7 @@ API documentation :maxdepth: 1 core/index + provider/index usdc/index uniswap_v2/index uniswap_v3/index diff --git a/docs/source/api/provider/index.rst b/docs/source/api/provider/index.rst new file mode 100644 index 00000000..db1b9f2e --- /dev/null +++ b/docs/source/api/provider/index.rst @@ -0,0 +1,24 @@ +JSON-RPC provider API +--------------------- + +This submodule offers functionality to connect and enhance robustness of various EVM JSON-RPC API providers.. + +- Support for test and mainnet fork backends like :py:mod:`eth_defi.provider.anvil` and :py:mod:`eth_defi.provider.ganache` + +- `Malicious Extractable Value (MEV) `__ mitigations + in :py:mod:`eth_defi.provider.mev_blocker` + +- Using multiple JSON-APRC providers and fallback providers in :py:mod:`eth_defi.provider.fallback` + +- For the list of available Ethereum, Binance Smart Chain and such API providers please see `ethereumnodes.com `__ + +.. autosummary:: + :toctree: _autosummary_provider + :recursive: + + eth_defi.provider.mev_blocker + eth_defi.provider.fallback + eth_defi.provider.anvil + eth_defi.provider.ganache + eth_defi.provider.named + diff --git a/eth_defi/anvil.py b/eth_defi/anvil.py index a48e6a5d..27f02e56 100644 --- a/eth_defi/anvil.py +++ b/eth_defi/anvil.py @@ -1,487 +1,5 @@ -"""Anvil integration. - -- `Anvil `__ - is a blazing-fast local testnet node implementation in Rust. - -- Anvil can replace :py:class:`eth_tester.main.EthereumTester` as the unit/integration test backend. - -- Anvil is mostly used in mainnet fork test cases. - -- Anvil is a more stable an alternative to Ganache (:py:mod:`eth_defi.ganache`) - -- Anvil is part of `Foundry `__, - a toolkit for Ethereum application development. - -To install Anvil on your UNIX computer: - -.. code-block:: shell - - curl -L https://foundry.paradigm.xyz | bash - PATH=~/.foundry/bin:$PATH - foundryup # Needs to be in path, or installation fails - -This will install `foundryup`, `anvil` at `~/.foundry/bin` and adds the folder to your shell rc file `PATH`. - -For more information see `Anvil reference `__. - -See also :py:mod:`eth_defi.trace` for Solidity tracebacks using Anvil. - -This code was originally lifted from Brownie project. -""" - -import logging -import sys -import time import warnings -from dataclasses import dataclass -from subprocess import DEVNULL, PIPE -from typing import Any, Optional, Union - -import psutil -import requests -from eth_typing import HexAddress -from requests.exceptions import ConnectionError as RequestsConnectionError -from web3 import HTTPProvider, Web3 - -from eth_defi.utils import is_localhost_port_listening, shutdown_hard - -logger = logging.getLogger(__name__) - - -class InvalidArgumentWarning(Warning): - """Lifted from Brownie.""" - - -class RPCRequestError(Exception): - """Lifted from Brownie.""" - - -#: Mappings between Anvil command line parameters and our internal argument names -CLI_FLAGS = { - "port": "--port", - "host": "--host", - "fork": "--fork-url", - "fork_block": "--fork-block-number", - "hardfork": "--hardfork", - "chain_id": "--chain-id", - "default_balance": "--balance", - "gas_limit": "--gas-limit", - "block_time": "--block-time", - "steps_tracing": "--steps-tracing", -} - - -def _launch(cmd: str, **kwargs) -> tuple[psutil.Popen, list[str]]: - """Launches the RPC client. - - Args: - cmd: command string to execute as subprocess""" - if sys.platform == "win32" and not cmd.split(" ")[0].endswith(".cmd"): - if " " in cmd: - cmd = cmd.replace(" ", ".cmd ", 1) - else: - cmd += ".cmd" - cmd_list = cmd.split(" ") - for key, value in [(k, v) for k, v in kwargs.items() if v]: - try: - if value is True or value is False: - # GNU style flags like --step-tracing - if value: - cmd_list.append(CLI_FLAGS[key]) - else: - cmd_list.extend([CLI_FLAGS[key], str(value)]) - except KeyError: - warnings.warn( - f"Ignoring invalid commandline setting for anvil: " f'"{key}" with value "{value}".', - InvalidArgumentWarning, - ) - - # USDC hack - # Some contracts are too large to deploy when they are compiled unoptimized - # TODO: Move to argument - cmd_list += ["--code-size-limit", "99999"] - - final_cmd_str = " ".join(cmd_list) - logger.info("Launching anvil: %s", final_cmd_str) - out = DEVNULL if sys.platform == "win32" else PIPE - - return psutil.Popen(cmd_list, stdin=DEVNULL, stdout=out, stderr=out), cmd_list - - -def make_anvil_custom_rpc_request(web3: Web3, method: str, args: Optional[list] = None) -> Any: - """Make a request to special named EVM JSON-RPC endpoint. - - - `See the Anvil custom RPC methods here `__. - - :param method: - RPC endpoint name - - :param args: - JSON-RPC call arguments - - :return: - RPC result - - :raise RPCRequestError: - In the case RPC method errors - """ - - if args is None: - args = () - - try: - response = web3.provider.make_request(method, args) # type: ignore - if "result" in response: - return response["result"] - - except (AttributeError, RequestsConnectionError): - raise RPCRequestError("Web3 is not connected.") - - raise RPCRequestError(response["error"]["message"]) - - -@dataclass -class AnvilLaunch: - """Control Anvil processes launched on background. - - Comes with a helpful :py:meth:`close` method when it is time to put Anvil rest. - """ - - #: Which port was bound by the Anvil - port: int - - #: Used command-line to spin up anvil - cmd: list[str] - - #: Where does Anvil listen to JSON-RPC - json_rpc_url: str - - #: UNIX process that we opened - process: psutil.Popen - - def close(self, log_level: Optional[int] = None, block=True, block_timeout=30) -> tuple[bytes, bytes]: - """Close the background Anvil process. - - :param log_level: - Dump Anvil messages to logging - - :param block: - Block the execution until anvil is gone - - :param block_timeout: - How long time we try to kill Anvil until giving up. - - :return: - Anvil stdout, stderr as string - """ - stdout, stderr = shutdown_hard( - self.process, - log_level=log_level, - block=block, - block_timeout=block_timeout, - check_port=self.port, - ) - logger.info("Anvil shutdown %s", self.json_rpc_url) - return stdout, stderr - - -def launch_anvil( - fork_url: Optional[str] = None, - unlocked_addresses: list[Union[HexAddress, str]] = None, - cmd="anvil", - port: int = 19999, - block_time=0, - launch_wait_seconds=20.0, - attempts=3, - hardfork="london", - gas_limit: Optional[int] = None, - steps_tracing=False, - test_request_timeout=3.0, -) -> AnvilLaunch: - """Creates Anvil unit test backend or mainnet fork. - - - Anvil can be used as web3.py test backend instead of `EthereumTester`. - Anvil offers faster execution and tracing - see :py:mod:`eth_defi.trace`. - - - Forking a mainnet is a common way to test against live deployments. - This function invokes `anvil` command and tells it to fork a given JSON-RPC endpoint. - - When called, a subprocess is started on the background. - To stop this process, call :py:meth:`eth_defi.anvil.AnvilLaunch.close`. - - This function waits `launch_wait_seconds` in order to `anvil` process to start - and complete the chain fork. - - **Unit test backend**: - - - See `eth_defi.tests.enzyme.conftest `__ for an example - how to use Anvil in your Python based unit test suite - - **Mainnet fork**: Here is an example that forks BNB chain mainnet and transfer 500 BUSD stablecoin to a test - account we control: - - .. code-block:: python - - from eth_defi.anvil import fork_network_anvil - from eth_defi.chain import install_chain_middleware - from eth_defi.gas import node_default_gas_price_strategy - - @pytest.fixture() - def large_busd_holder() -> HexAddress: - # An onchain address with BUSD balance - # Binance Hot Wallet 6 - return HexAddress(HexStr("0x8894E0a0c962CB723c1976a4421c95949bE2D4E3")) - - - @pytest.fixture() - def user_1() -> LocalAccount: - # Create a test account - return Account.create() - - - @pytest.fixture() - def anvil_bnb_chain_fork(request, large_busd_holder, user_1, user_2) -> str: - # Create a testable fork of live BNB chain. - mainnet_rpc = os.environ["BNB_CHAIN_JSON_RPC"] - launch = fork_network_anvil(mainnet_rpc, unlocked_addresses=[large_busd_holder]) - try: - yield launch.json_rpc_url - finally: - # Wind down Anvil process after the test is complete - launch.close(log_level=logging.ERROR) - - - @pytest.fixture() - def web3(anvil_bnb_chain_fork: str): - # Set up a local unit testing blockchain - # https://web3py.readthedocs.io/en/stable/examples.html#contract-unit-tests-in-python - web3 = Web3(HTTPProvider(anvil_bnb_chain_fork)) - # Anvil needs POA middlware if parent chain needs POA middleware - install_chain_middleware(web3) - web3.eth.set_gas_price_strategy(node_default_gas_price_strategy) - return web3 - - def test_anvil_fork_transfer_busd(web3: Web3, large_busd_holder: HexAddress, user_1: LocalAccount): - # Forks the BNB chain mainnet and transfers from USDC to the user. - - # BUSD deployment on BNB chain - # https://bscscan.com/token/0xe9e7cea3dedca5984780bafc599bd69add087d56 - busd_details = fetch_erc20_details(web3, "0xe9e7CEA3DedcA5984780Bafc599bD69ADd087D56") - busd = busd_details.contract - - # Transfer 500 BUSD to the user 1 - tx_hash = busd.functions.transfer(user_1.address, 500 * 10**18).transact({"from": large_busd_holder}) - - # Because Ganache has instamine turned on by default, we do not need to wait for the transaction - receipt = web3.eth.get_transaction_receipt(tx_hash) - assert receipt.status == 1, "BUSD transfer reverted" - - assert busd.functions.balanceOf(user_1.address).call() == 500 * 10**18 - - `See the full example in tests source code `_. - - If `anvil` refuses to terminate properly, you can kill a process by a port in your terminal: - - .. code-block:: shell - - # Kill any process listening to localhost:19999 - kill -SIGKILL $(lsof -ti:19999) - - See also - - - :py:func:`eth_defi.trace.assert_transaction_success_with_explanation` - - - :py:func:`eth_defi.trace.print_symbolic_trace` - - :param cmd: - Override `anvil` command. If not given we look up from `PATH`. - - :param fork_url: - HTTP JSON-RPC URL of the network we want to fork. - - If not given launch an empty test backend. - - :param unlocked_addresses: - List of addresses of which ownership we take to allow test code to transact as them - - :param port: - Localhost port we bind for Anvil JSON-RPC - - :param launch_wait_seconds: - How long we wait anvil to start until giving up - - :param block_time: - - How long Anvil takes to mine a block. Default is zero: - Anvil is in `automining mode `__ - and creates a new block for each new transaction. - - Set to `1` or higher so that you can poll the transaction as you would do with - a live JSON-RPC node. - - :param attempts: - How many attempts we do to start anvil. - - Anvil launch may fail without any output. This could be because the given JSON-RPC - node is throttling your API requests. In this case we just try few more times - again by killing the Anvil process and starting it again. - - :param gas_limit: - Set the block gas limit. - - :param hardfork: - EVM version to use - - :param step_tracing: - Enable Anvil step tracing. - - Needed to get structured logs. - - Only needed on GoEthereum style tracing, not needed for Parity style tracing. - - See https://book.getfoundry.sh/reference/anvil/ - - :param test_request_timeout: - Set the timeout fro the JSON-RPC requests that attempt to determine if Anvil was successfully launched. - - """ - - assert not is_localhost_port_listening(port), f"localhost port {port} occupied.\n" f"You might have a zombie Anvil process around.\nRun to kill: kill -SIGKILL $(lsof -ti:{port})" - - url = f"http://localhost:{port}" - - attempts_left = attempts - process = None - final_cmd = None - current_block = 0 - web3 = None - - if unlocked_addresses is None: - unlocked_addresses = [] - - # https://book.getfoundry.sh/reference/anvil/ - args = dict( - port=port, - fork=fork_url, - hardfork=hardfork, - gas_limit=gas_limit, - steps_tracing=steps_tracing, - ) - - if block_time not in (0, None): - assert block_time > 0, f"Got bad block time {block_time}" - args["block_time"] = block_time - - current_block = chain_id = None - - while attempts_left > 0: - process, final_cmd = _launch( - cmd, - **args, - ) - - # Wait until Anvil is responsive - timeout = time.time() + launch_wait_seconds - - # Use shorter read timeout here - otherwise requests will wait > 10s if something is wrong - web3 = Web3(HTTPProvider(url, request_kwargs={"timeout": test_request_timeout})) - while time.time() < timeout: - try: - # See if web3 RPC works - current_block = web3.eth.block_number - chain_id = web3.eth.chain_id - break - except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout) as e: - logger.info("Anvil not ready, got exception %s", e) - # requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(54, 'Connection reset by peer')) - time.sleep(0.1) - continue - - if current_block is None: - logger.error("Could not read the latest block from anvil %s within %f seconds, shutting down and dumping output", url, launch_wait_seconds) - stdout, stderr = shutdown_hard( - process, - log_level=logging.ERROR, - block=True, - check_port=port, - ) - - if len(stdout) == 0: - attempts_left -= 1 - if attempts_left > 0: - logger.info("anvil did not start properly, try again, attempts left %d", attempts_left) - continue - - raise AssertionError(f"Could not read block number from Anvil after the launch {cmd}: at {url}, stdout is {len(stdout)} bytes, stderr is {len(stderr)} bytes") - else: - # We have a successful launch - break - # Use f-string for a thousand separator formatting - logger.info(f"anvil forked network {chain_id}, the current block is {current_block:,}, Anvil JSON-RPC is {url}") - - # Perform unlock accounts for all accounts - for account in unlocked_addresses: - unlock_account(web3, account) - - return AnvilLaunch(port, final_cmd, url, process) - - -def unlock_account(web3: Web3, address: str): - """Make Anvil mainnet fork to accept transactions to any Ethereum account. - - This is even when we do not have a private key for the account. - - :param web3: - Web3 instance - - :param address: - Account to unlock - """ - web3.provider.make_request("anvil_impersonateAccount", [address]) # type: ignore - - -def sleep(web3: Web3, seconds: int) -> int: - """Call emv_increaseTime on Anvil""" - make_anvil_custom_rpc_request(web3, "evm_increaseTime", [hex(seconds)]) - return seconds - - -def mine(web3: Web3, timestamp: Optional[int] = None) -> None: - """Call evm_setNextBlockTimestamp on Anvil""" - - if timestamp is None: - block = web3.eth.get_block(web3.eth.block_number) - timestamp = block["timestamp"] + 1 - - make_anvil_custom_rpc_request(web3, "evm_mine", [timestamp]) - - -def snapshot(web3: Web3) -> int: - """Call evm_snapshot on Anvil""" - return int(make_anvil_custom_rpc_request(web3, "evm_snapshot", []), 16) - - -def revert(web3: Web3, snapshot_id: int) -> bool: - """Call evm_revert on Anvil - - https://book.getfoundry.sh/reference/anvil/ - - :return: - True if a snapshot was reverted - """ - ret_val = make_anvil_custom_rpc_request(web3, "evm_revert", [snapshot_id]) - return ret_val - - -def dump_state(web3: Web3) -> int: - """Call evm_snapshot on Anvil""" - return make_anvil_custom_rpc_request(web3, "anvil_dumpState") - - -def load_state(web3: Web3, state: str) -> int: - """Call evm_snapshot on Anvil""" - return make_anvil_custom_rpc_request(web3, "anvil_loadState", [state]) +warnings.warn("eth_defi.anvil has been moved to eth_defi.provider.anvil", DeprecationWarning, stacklevel=2) -# Backwards compatibility -fork_network_anvil = launch_anvil +from eth_defi.provider.anvil import * diff --git a/eth_defi/chain.py b/eth_defi/chain.py index ed2e4028..7b0cb07c 100644 --- a/eth_defi/chain.py +++ b/eth_defi/chain.py @@ -13,7 +13,9 @@ import requests from web3 import Web3, HTTPProvider from web3.middleware import geth_poa_middleware +from web3.providers import JSONBaseProvider from web3.types import RPCEndpoint, RPCResponse +from web3.datastructures import NamedElementOnion from eth_defi.event_reader.conversion import convert_jsonrpc_value_to_int from eth_defi.middleware import http_retry_request_with_sleep_middleware @@ -106,7 +108,7 @@ def install_api_call_counter_middleware(web3: Web3) -> Counter: assert counter["eth_blockNumber"] == 1 :return: - Counter object with columns per RPC endpoint and "toal" + Counter object with columns per RPC endpoint and "total" """ api_counter = Counter() @@ -122,6 +124,42 @@ def middleware(method: RPCEndpoint, params: Any) -> Optional[RPCResponse]: return api_counter +def install_api_call_counter_middleware_on_provider(provider: JSONBaseProvider) -> Counter: + """Install API call counter middleware on a specific API provider. + + Allows per-provider API call counting when using complex + provider setups. + + See also + + - :py:func:`install_api_call_counter_middleware` + + - :py:class:`eth_defi.fallback_provider.FallbackProvider` + + :return: + Counter object with columns per RPC endpoint and "total" + """ + + assert isinstance(provider, JSONBaseProvider), f"Got {provider.__class__}" + + api_counter = Counter() + + def factory(make_request: Callable[[RPCEndpoint, Any], Any], web3: "Web3"): + import ipdb + + ipdb.set_trace() + + def middleware(method: RPCEndpoint, params: Any) -> Optional[RPCResponse]: + api_counter[method] += 1 + api_counter["total"] += 1 + return make_request(method, params) + + return middleware + + provider.middlewares.add("api_counter_middleware", factory) + return api_counter + + def has_graphql_support(provider: HTTPProvider) -> bool: """Check if a node has GoEthereum GraphQL API turned on. diff --git a/eth_defi/ganache.py b/eth_defi/ganache.py index 19751f89..a8b308ea 100644 --- a/eth_defi/ganache.py +++ b/eth_defi/ganache.py @@ -1,434 +1,5 @@ -"""Ganache EVM test backend and mainnet forking. - -This module contains utilities to automatically launch and -manipulate `ganache-cli` process. - -You need to have `ganache-cli` installed in order to use these. - -How to install ganache-cli using npm: - -.. code-block:: shell - - npm install -g ganache - -For more information about Ganache see - -- `Ganache CLI command line documentation `_ - -- `Aave Web.py example `_ - -- `QuickNode how to fork mainnet with Ganache tutorial `_ - -`Most of this code is lifted from Brownie project (MIT) `_ -and it is not properly cleaned up yet. - -""" - -import datetime -import logging -import re -import shutil -import sys -import time import warnings -from dataclasses import dataclass -from subprocess import DEVNULL, PIPE -from typing import Dict, List, Tuple, Union - -import psutil -import requests - -from eth_typing import HexAddress -from hexbytes import HexBytes -from psutil import NoSuchProcess -from web3 import HTTPProvider, Web3 -from web3.types import Wei - -from eth_defi.utils import is_localhost_port_listening - - -logger = logging.getLogger(__name__) - - -EVM_EQUIVALENTS = {"atlantis": "byzantium", "agharta": "petersburg"} - -# https://github.com/trufflesuite/ganache -CLI_FLAGS = { - "7": { - "port": "--server.port", - "gas_limit": "--miner.blockGasLimit", - "accounts": "--wallet.totalAccounts", - "evm_version": "--hardfork", - "fork": "--fork.url", - "mnemonic": "--wallet.mnemonic", - "account_keys_path": "--wallet.accountKeysPath", - "block_time": "--miner.blockTime", - "default_balance": "--wallet.defaultBalance", - "time": "--chain.time", - "unlock": "--wallet.unlockedAccounts", - "network_id": "--chain.networkId", - "chain_id": "--chain.chainId", - "unlimited_contract_size": "--chain.allowUnlimitedContractSize", - "quiet": "--logging.quiet", - }, - "<=6": { - "port": "--port", - "gas_limit": "--gasLimit", - "accounts": "--accounts", - "evm_version": "--hardfork", - "fork": "--fork", - "mnemonic": "--mnemonic", - "account_keys_path": "--acctKeys", - "block_time": "--blockTime", - "default_balance": "--defaultBalanceEther", - "time": "--time", - "unlock": "--unlock", - "network_id": "--networkId", - "chain_id": "--chainId", - "unlimited_contract_size": "--allowUnlimitedContractSize", - }, -} - -EVM_VERSIONS = ["byzantium", "constantinople", "petersburg", "istanbul"] - -#: The default hardfork rules used by Ganache -EVM_DEFAULT = "london" - - -class NoGanacheInstalled(Exception): - """We could not launch because ganache-cli command is missing""" - - -class InvalidArgumentWarning(Warning): - """Warned when there are issued with ganache-cli command line.""" - - -def _launch(cmd: str, **kwargs: Dict) -> Tuple[psutil.Popen, List[str]]: - """Launches the RPC client. - - Args: - cmd: command string to execute as subprocess""" - if sys.platform == "win32" and not cmd.split(" ")[0].endswith(".cmd"): - if " " in cmd: - cmd = cmd.replace(" ", ".cmd ", 1) - else: - cmd += ".cmd" - cmd_list = cmd.split(" ") - - ganache_executable = cmd_list[0] - - found = shutil.which(ganache_executable) - if not found: - raise NoGanacheInstalled(f"Could not find ganache-cli installation: {ganache_executable} - are you sure it is installed?") - - ganache_version = _get_ganache_version(ganache_executable) - - if ganache_version <= 6: - cli_flags = CLI_FLAGS["<=6"] - else: - cli_flags = CLI_FLAGS["7"] - # this flag must be true so that reverting tx's return a - # more verbose output similar to what ganache 6 produced - cmd_list.extend(["--chain.vmErrorsOnRPCResponse", "true"]) - - kwargs.setdefault("evm_version", EVM_DEFAULT) # type: ignore - if kwargs["evm_version"] in EVM_EQUIVALENTS: - kwargs["evm_version"] = EVM_EQUIVALENTS[kwargs["evm_version"]] # type: ignore - kwargs = _validate_cmd_settings(kwargs) - for key, value in [(k, v) for k, v in kwargs.items() if v]: - if key == "unlock": - if not isinstance(value, list): - value = [value] # type: ignore - for address in value: - if isinstance(address, int): - address = HexBytes(address.to_bytes(20, "big")).hex() - cmd_list.extend([cli_flags[key], address]) - else: - try: - # Handle boolean options - if value is True: - cmd_list.append(cli_flags[key]) - elif value is not False: - cmd_list.extend([cli_flags[key], str(value)]) - except KeyError: - warnings.warn( - f"Ignoring invalid commandline setting for ganache-cli: '{key}' with value '{value}'.", - InvalidArgumentWarning, - ) - out = DEVNULL if sys.platform == "win32" else PIPE - - logger.info("Launching ganache-cli: %s", " ".join(cmd_list)) - return psutil.Popen(cmd_list, stdin=DEVNULL, stdout=out, stderr=out), cmd_list - - -def _get_ganache_version(ganache_executable: str) -> int: - ganache_version_proc = psutil.Popen([ganache_executable, "--version"], stdout=PIPE) - ganache_version_stdout, _ = ganache_version_proc.communicate() - ganache_version_match = re.search(r"v([0-9]+)\.", ganache_version_stdout.decode()) - if not ganache_version_match: - raise ValueError("could not read ganache version: {}".format(ganache_version_stdout)) - return int(ganache_version_match.group(1)) - - -def _validate_cmd_settings(cmd_settings: dict) -> dict: - ganache_keys = set(k for f in CLI_FLAGS.values() for k in f.keys()) - - CMD_TYPES = { - "port": int, - "gas_limit": int, - "block_time": int, - "time": datetime.datetime, - "accounts": int, - "evm_version": str, - "mnemonic": str, - "account_keys_path": str, - "fork": str, - "network_id": int, - "chain_id": int, - "quiet": bool, - } - for cmd, value in cmd_settings.items(): - if cmd in ganache_keys and cmd in CMD_TYPES.keys() and not isinstance(value, CMD_TYPES[cmd]): - raise TypeError(f"Wrong type for cmd_settings '{cmd}': {value}. Found {type(value).__name__}, but expected {CMD_TYPES[cmd].__name__}.") - - if "default_balance" in cmd_settings: - try: - cmd_settings["default_balance"] = int(cmd_settings["default_balance"]) - except ValueError: - # convert any input to ether, then format it properly - default_eth = Wei(cmd_settings["default_balance"]).to("ether") - cmd_settings["default_balance"] = default_eth.quantize(1) if default_eth > 1 else default_eth.normalize() - return cmd_settings - - -@dataclass -class GanacheLaunch: - """Control ganache-cli processes launched on background. - - Comes with a helpful :py:meth:`close` method when it is time to put Ganache rest. - """ - - #: Which port was bound by the ganache - port: int - - #: Used command-line to spin up ganache-cli - cmd: List[str] - - #: Where does Ganache listen to JSON-RPC - json_rpc_url: str - - #: UNIX process that we opened - process: psutil.Popen - - def close(self, verbose=False, block=True, block_timeout=30): - """Kill the ganache-cli process. - - Ganache is pretty hard to kill, so keep killing it until it dies and the port is free again. - - :param block: Block the execution until Ganache has terminated - :param block_timeout: How long we give for Ganache to clean up after itself - :param verbose: If set, dump anything in Ganache stdout to the Python logging using level `INFO`. - """ - - process = self.process - if verbose: - # TODO: This does not seem to work on macOS, - # but is fine on Ubuntu on Github CI - logger.info("Dumping Ganache output") - if process.poll() is not None: - output = process.communicate()[0].decode("utf-8") - for line in output.split("\n"): - logger.info(line) - - # process.terminate() - # Hahahahah, this is Ganache, do you think terminate signal is enough - try: - process.kill() - except NoSuchProcess: - raise AssertionError("ganache died on its own :(") - - if block: - deadline = time.time() + 30 - while time.time() < deadline: - if not is_localhost_port_listening(self.port): - # Port released, assume Ganache is gone - return - - raise AssertionError(f"Could not terminate ganache in {block_timeout} seconds") - - -def fork_network( - json_rpc_url: str, - unlocked_addresses: List[Union[HexAddress, str]] = [], - cmd="ganache-cli", - port=19999, - evm_version=EVM_DEFAULT, - block_time=0, - quiet=False, - launch_wait_seconds=20.0, -) -> GanacheLaunch: - """Creates the ganache "fork" of given JSON-RPC endpoint. - - .. warning:: - - This function is not recommended due to stability issues with Ganache. - Use :py:func:`eth_defi.anvil.fork_network_anvil` instead. - - Forking a mainnet is common way to test against live deployments. - This function invokes `ganache-cli` command and tells it to fork a given JSON-RPC endpoint. - - A subprocess is started on the background. To stop this process, call :py:meth:`eth_defi.ganache.GanacheLaunch.close`. - This function waits `launch_wait_seconds` in order to `ganache-cli` process to start - and complete the chain fork. - - .. note :: - - Currently only supports HTTP JSON-RPC connections. - - .. warning :: - - Forking a network with ganache-cli is a slow process. It is recommended - that you use fast Ethereum Tester based testing if possible. - - Here is an example that forks BNB chain mainnet and transfer 500 BUSD stablecoin to a test - account we control: - - .. code-block:: python - - @pytest.fixture() - def large_busd_holder() -> HexAddress: - # A random account picked from BNB Smart chain that holds a lot of BUSD. - # Binance Hot Wallet 6 - return HexAddress(HexStr("0x8894E0a0c962CB723c1976a4421c95949bE2D4E3")) - - - @pytest.fixture() - def ganache_bnb_chain_fork(large_busd_holder) -> str: - # Create a testable fork of live BNB chain. - mainnet_rpc = os.environ["BNB_CHAIN_JSON_RPC"] - launch = fork_network( - mainnet_rpc, - unlocked_addresses=[large_busd_holder]) - yield launch.json_rpc_url - # Wind down Ganache process after the test is complete - launch.close() - - - @pytest.fixture - def web3(ganache_bnb_chain_fork: str): - # Set up a local unit testing blockchain - return Web3(HTTPProvider(ganache_bnb_chain_fork)) - - - def test_mainnet_fork_transfer_busd(web3: Web3, large_busd_holder: HexAddress, user_1: LocalAccount): - - # BUSD deployment on BNB chain - # https://bscscan.com/token/0xe9e7cea3dedca5984780bafc599bd69add087d56 - busd_details = fetch_erc20_details(web3, "0xe9e7CEA3DedcA5984780Bafc599bD69ADd087D56") - busd = busd_details.contract - - # Transfer 500 BUSD to the user 1 - tx_hash = busd.functions.transfer(user_1.address, 500*10**18).transact({"from": large_busd_holder}) - - # Because Ganache has instamine turned on by default, we do not need to wait for the transaction - receipt = web3.eth.get_transaction_receipt(tx_hash) - assert receipt.status == 1, "BUSD transfer reverted" - - assert busd.functions.balanceOf(user_1.address).call() == 500*10**18 - - `See the full example in tests source code `_. - - Polygon needs to set a specific EVM version: - - .. code-block:: python - - mainnet_rpc = os.environ["POLYGON_JSON_RPC"] - launch = fork_network(mainnet_rpc, evm_version="istanbul") - - If `ganache-cli` refuses to terminate properly, you can kill a process by a port with: - - .. code-block:: shell - - # Kill any process listening to localhost:19999 - kill -SIGKILL $(lsof -ti:19999) - - This function uses Python logging subsystem. If you want to see error/info/debug logs with `pytest` you can do: - - .. code-block:: shell - - pytest --log-cli-level=debug - - For public JSON-RPC endpoints check - - - `BNB chain documentation `_ - - - `ethereumnodes.com `_ - - :param cmd: Override `ganache-cli` command. If not given we look up from `PATH`. - :param json_rpc_url: HTTP JSON-RPC URL of the network we want to fork - :param unlocked_addresses: List of addresses of which ownership we take to allow test code to transact as them - :param port: Localhost port we bind for Ganache JSON-RPC - :param launch_wait_seconds: How long we wait ganache-cli to start until giving up - :param evm_version: "london" for the default hard fork - :param block_time: - How long Ganache takes to mine a block. Default is zero and any RPC transaction - will immediately return with the transaction inclusion. - Set to `1` so that you can poll the transaction as you would do with - a live JSON-RPC node. - :param quiet: - Disable extensive logging. If there is a lot of Ganache logging it seems to crash - on Github CI. - - """ - - assert not is_localhost_port_listening(port), f"localhost port {port} occupied - you might have a zombie Ganache around" - - url = f"http://localhost:{port}" - - process, final_cmd = _launch( - cmd, - port=port, - fork=json_rpc_url, - unlock=unlocked_addresses, - evm_version=evm_version, - block_time=block_time, - quiet=quiet, - ) - - # Wait until Ganache is responsive - timeout = time.time() + launch_wait_seconds - current_block = None - - # Use short 1.0s HTTP read timeout here - otherwise requests will wa-it > 10s if something is wrong - web3 = Web3(HTTPProvider(url, request_kwargs={"timeout": 1.0})) - while time.time() < timeout: - if process.poll() is not None: - output = process.communicate()[0].decode("utf-8") - for line in output.split("\n"): - logger.error(line) - raise AssertionError(f"ganache-cli died on launch, used command was {final_cmd}") - - try: - current_block = web3.eth.block_number - break - except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout): - # requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(54, 'Connection reset by peer')) - time.sleep(0.1) - continue - - if current_block is None: - if process.poll() is not None: - output = process.communicate()[0].decode("utf-8") - for line in output.split("\n"): - logger.error(line) - raise AssertionError(f"ganache-cli died on launch, used command was {final_cmd}") - - logger.error("Could not read the latest block from ganache-cli within %f seconds", launch_wait_seconds) - raise AssertionError(f"Could not connect to ganache-cli {cmd}: at {url}") - - chain_id = web3.eth.chain_id - # Use f-string for thousand separator formatting - logger.info(f"ganache-cli forked network %d, the current block is {current_block:,}, Ganache JSON-RPC is %s", chain_id, url) +warnings.warn("eth_defi.ganache has been moved to eth_defi.provider.ganache", DeprecationWarning, stacklevel=2) - return GanacheLaunch(port, final_cmd, url, process) +from eth_defi.provider.ganache import * diff --git a/eth_defi/hotwallet.py b/eth_defi/hotwallet.py index 8ac191b7..1c41a0fe 100644 --- a/eth_defi/hotwallet.py +++ b/eth_defi/hotwallet.py @@ -1,6 +1,13 @@ -"""Utilities for managing hot wallets.""" +"""Utilities for managing hot wallets. + +- Create local wallets from a private key + +- Sign transactions in batches + +""" import logging +import secrets from decimal import Decimal from typing import Optional, NamedTuple @@ -58,7 +65,7 @@ class HotWallet: .. note :: - Not thread safe. Manages consumed nonce counter locally. + Not thread safe. This class manages consumed nonce counter locally. """ @@ -90,7 +97,30 @@ def allocate_nonce(self) -> int: def sign_transaction_with_new_nonce(self, tx: dict) -> SignedTransactionWithNonce: """Signs a transaction and allocates a nonce for it. - :param: Ethereum transaction data as a dict. This is modified in-place to include nonce. + Example: + + .. code-block:: python + + web3 = Web3(mev_blocker_provider) + wallet = HotWallet.create_for_testing(web3) + + # Send some ETH to zero address from + # the hot wallet + signed_tx = wallet.sign_transaction_with_new_nonce({ + "from": wallet.address, + "to": ZERO_ADDRESS, + "value": 1, + "gas": 100_000, + "gasPrice": web3.eth.gas_price, + }) + tx_hash = web3.eth.send_raw_transaction(signed_tx.rawTransaction) + + :param tx: + Ethereum transaction data as a dict. + This is modified in-place to include nonce. + + :return: + A transaction payload and nonce with used to generate this transaction. """ assert type(tx) == dict assert "nonce" not in tx @@ -163,3 +193,42 @@ def from_private_key(key: str) -> "HotWallet": assert key.startswith("0x") account = Account.from_key(key) return HotWallet(account) + + @staticmethod + def create_for_testing(web3: Web3, test_account_n=0, eth_amount=10) -> "HotWallet": + """Creates a new hot wallet and seeds it with ETH from one of well-known test accounts. + + Shortcut method for unit testing. + + Example: + + .. code-block:: python + + web3 = Web3(test_provider) + wallet = HotWallet.create_for_testing(web3) + + signed_tx = wallet.sign_transaction_with_new_nonce( + { + "from": wallet.address, + "to": ZERO_ADDRESS, + "value": 1, + "gas": 100_000, + "gasPrice": web3.eth.gas_price, + } + ) + + tx_hash = web3.eth.send_raw_transaction(signed_tx.rawTransaction) + assert_transaction_success_with_explanation(web3, tx_hash) + + """ + wallet = HotWallet.from_private_key("0x" + secrets.token_hex(32)) + tx_hash = web3.eth.send_transaction( + { + "from": web3.eth.accounts[test_account_n], + "to": wallet.address, + "value": eth_amount * 10**18, + } + ) + web3.eth.wait_for_transaction_receipt(tx_hash) + wallet.sync_nonce(web3) + return wallet diff --git a/eth_defi/provider/anvil.py b/eth_defi/provider/anvil.py new file mode 100644 index 00000000..6df3c6ef --- /dev/null +++ b/eth_defi/provider/anvil.py @@ -0,0 +1,488 @@ +"""Anvil integration. + +- `Anvil `__ + is a blazing-fast local testnet node implementation in Rust from + `Foundry projet <>`__ + +- Anvil can replace :py:class:`eth_tester.main.EthereumTester` as the unit/integration test backend. + +- Anvil is mostly used in mainnet fork test cases. + +- Anvil is a more stable an alternative to Ganache (:py:mod:`eth_defi.ganache`) + +- Anvil is part of `Foundry `__, + a toolkit for Ethereum application development. + +To install Anvil on your UNIX computer: + +.. code-block:: shell + + curl -L https://foundry.paradigm.xyz | bash + PATH=~/.foundry/bin:$PATH + foundryup # Needs to be in path, or installation fails + +This will install `foundryup`, `anvil` at `~/.foundry/bin` and adds the folder to your shell rc file `PATH`. + +For more information see `Anvil reference `__. + +See also :py:mod:`eth_defi.trace` for Solidity tracebacks using Anvil. + +This code was originally lifted from Brownie project. +""" + +import logging +import sys +import time +import warnings +from dataclasses import dataclass +from subprocess import DEVNULL, PIPE +from typing import Any, Optional, Union + +import psutil +import requests +from eth_typing import HexAddress +from requests.exceptions import ConnectionError as RequestsConnectionError +from web3 import HTTPProvider, Web3 + +from eth_defi.utils import is_localhost_port_listening, shutdown_hard + +logger = logging.getLogger(__name__) + + +class InvalidArgumentWarning(Warning): + """Lifted from Brownie.""" + + +class RPCRequestError(Exception): + """Lifted from Brownie.""" + + +#: Mappings between Anvil command line parameters and our internal argument names +CLI_FLAGS = { + "port": "--port", + "host": "--host", + "fork": "--fork-url", + "fork_block": "--fork-block-number", + "hardfork": "--hardfork", + "chain_id": "--chain-id", + "default_balance": "--balance", + "gas_limit": "--gas-limit", + "block_time": "--block-time", + "steps_tracing": "--steps-tracing", +} + + +def _launch(cmd: str, **kwargs) -> tuple[psutil.Popen, list[str]]: + """Launches the RPC client. + + Args: + cmd: command string to execute as subprocess""" + if sys.platform == "win32" and not cmd.split(" ")[0].endswith(".cmd"): + if " " in cmd: + cmd = cmd.replace(" ", ".cmd ", 1) + else: + cmd += ".cmd" + cmd_list = cmd.split(" ") + for key, value in [(k, v) for k, v in kwargs.items() if v]: + try: + if value is True or value is False: + # GNU style flags like --step-tracing + if value: + cmd_list.append(CLI_FLAGS[key]) + else: + cmd_list.extend([CLI_FLAGS[key], str(value)]) + except KeyError: + warnings.warn( + f"Ignoring invalid commandline setting for anvil: " f'"{key}" with value "{value}".', + InvalidArgumentWarning, + ) + + # USDC hack + # Some contracts are too large to deploy when they are compiled unoptimized + # TODO: Move to argument + cmd_list += ["--code-size-limit", "99999"] + + final_cmd_str = " ".join(cmd_list) + logger.info("Launching anvil: %s", final_cmd_str) + out = DEVNULL if sys.platform == "win32" else PIPE + + return psutil.Popen(cmd_list, stdin=DEVNULL, stdout=out, stderr=out), cmd_list + + +def make_anvil_custom_rpc_request(web3: Web3, method: str, args: Optional[list] = None) -> Any: + """Make a request to special named EVM JSON-RPC endpoint. + + - `See the Anvil custom RPC methods here `__. + + :param method: + RPC endpoint name + + :param args: + JSON-RPC call arguments + + :return: + RPC result + + :raise RPCRequestError: + In the case RPC method errors + """ + + if args is None: + args = () + + try: + response = web3.provider.make_request(method, args) # type: ignore + if "result" in response: + return response["result"] + + except (AttributeError, RequestsConnectionError): + raise RPCRequestError("Web3 is not connected.") + + raise RPCRequestError(response["error"]["message"]) + + +@dataclass +class AnvilLaunch: + """Control Anvil processes launched on background. + + Comes with a helpful :py:meth:`close` method when it is time to put Anvil rest. + """ + + #: Which port was bound by the Anvil + port: int + + #: Used command-line to spin up anvil + cmd: list[str] + + #: Where does Anvil listen to JSON-RPC + json_rpc_url: str + + #: UNIX process that we opened + process: psutil.Popen + + def close(self, log_level: Optional[int] = None, block=True, block_timeout=30) -> tuple[bytes, bytes]: + """Close the background Anvil process. + + :param log_level: + Dump Anvil messages to logging + + :param block: + Block the execution until anvil is gone + + :param block_timeout: + How long time we try to kill Anvil until giving up. + + :return: + Anvil stdout, stderr as string + """ + stdout, stderr = shutdown_hard( + self.process, + log_level=log_level, + block=block, + block_timeout=block_timeout, + check_port=self.port, + ) + logger.info("Anvil shutdown %s", self.json_rpc_url) + return stdout, stderr + + +def launch_anvil( + fork_url: Optional[str] = None, + unlocked_addresses: list[Union[HexAddress, str]] = None, + cmd="anvil", + port: int = 19999, + block_time=0, + launch_wait_seconds=20.0, + attempts=3, + hardfork="london", + gas_limit: Optional[int] = None, + steps_tracing=False, + test_request_timeout=3.0, +) -> AnvilLaunch: + """Creates Anvil unit test backend or mainnet fork. + + - Anvil can be used as web3.py test backend instead of `EthereumTester`. + Anvil offers faster execution and tracing - see :py:mod:`eth_defi.trace`. + + - Forking a mainnet is a common way to test against live deployments. + This function invokes `anvil` command and tells it to fork a given JSON-RPC endpoint. + + When called, a subprocess is started on the background. + To stop this process, call :py:meth:`eth_defi.anvil.AnvilLaunch.close`. + + This function waits `launch_wait_seconds` in order to `anvil` process to start + and complete the chain fork. + + **Unit test backend**: + + - See `eth_defi.tests.enzyme.conftest `__ for an example + how to use Anvil in your Python based unit test suite + + **Mainnet fork**: Here is an example that forks BNB chain mainnet and transfer 500 BUSD stablecoin to a test + account we control: + + .. code-block:: python + + from eth_defi.anvil import fork_network_anvil + from eth_defi.chain import install_chain_middleware + from eth_defi.gas import node_default_gas_price_strategy + + @pytest.fixture() + def large_busd_holder() -> HexAddress: + # An onchain address with BUSD balance + # Binance Hot Wallet 6 + return HexAddress(HexStr("0x8894E0a0c962CB723c1976a4421c95949bE2D4E3")) + + + @pytest.fixture() + def user_1() -> LocalAccount: + # Create a test account + return Account.create() + + + @pytest.fixture() + def anvil_bnb_chain_fork(request, large_busd_holder, user_1, user_2) -> str: + # Create a testable fork of live BNB chain. + mainnet_rpc = os.environ["BNB_CHAIN_JSON_RPC"] + launch = fork_network_anvil(mainnet_rpc, unlocked_addresses=[large_busd_holder]) + try: + yield launch.json_rpc_url + finally: + # Wind down Anvil process after the test is complete + launch.close(log_level=logging.ERROR) + + + @pytest.fixture() + def web3(anvil_bnb_chain_fork: str): + # Set up a local unit testing blockchain + # https://web3py.readthedocs.io/en/stable/examples.html#contract-unit-tests-in-python + web3 = Web3(HTTPProvider(anvil_bnb_chain_fork)) + # Anvil needs POA middlware if parent chain needs POA middleware + install_chain_middleware(web3) + web3.eth.set_gas_price_strategy(node_default_gas_price_strategy) + return web3 + + def test_anvil_fork_transfer_busd(web3: Web3, large_busd_holder: HexAddress, user_1: LocalAccount): + # Forks the BNB chain mainnet and transfers from USDC to the user. + + # BUSD deployment on BNB chain + # https://bscscan.com/token/0xe9e7cea3dedca5984780bafc599bd69add087d56 + busd_details = fetch_erc20_details(web3, "0xe9e7CEA3DedcA5984780Bafc599bD69ADd087D56") + busd = busd_details.contract + + # Transfer 500 BUSD to the user 1 + tx_hash = busd.functions.transfer(user_1.address, 500 * 10**18).transact({"from": large_busd_holder}) + + # Because Ganache has instamine turned on by default, we do not need to wait for the transaction + receipt = web3.eth.get_transaction_receipt(tx_hash) + assert receipt.status == 1, "BUSD transfer reverted" + + assert busd.functions.balanceOf(user_1.address).call() == 500 * 10**18 + + `See the full example in tests source code `_. + + If `anvil` refuses to terminate properly, you can kill a process by a port in your terminal: + + .. code-block:: shell + + # Kill any process listening to localhost:19999 + kill -SIGKILL $(lsof -ti:19999) + + See also + + - :py:func:`eth_defi.trace.assert_transaction_success_with_explanation` + + - :py:func:`eth_defi.trace.print_symbolic_trace` + + :param cmd: + Override `anvil` command. If not given we look up from `PATH`. + + :param fork_url: + HTTP JSON-RPC URL of the network we want to fork. + + If not given launch an empty test backend. + + :param unlocked_addresses: + List of addresses of which ownership we take to allow test code to transact as them + + :param port: + Localhost port we bind for Anvil JSON-RPC + + :param launch_wait_seconds: + How long we wait anvil to start until giving up + + :param block_time: + + How long Anvil takes to mine a block. Default is zero: + Anvil is in `automining mode `__ + and creates a new block for each new transaction. + + Set to `1` or higher so that you can poll the transaction as you would do with + a live JSON-RPC node. + + :param attempts: + How many attempts we do to start anvil. + + Anvil launch may fail without any output. This could be because the given JSON-RPC + node is throttling your API requests. In this case we just try few more times + again by killing the Anvil process and starting it again. + + :param gas_limit: + Set the block gas limit. + + :param hardfork: + EVM version to use + + :param step_tracing: + Enable Anvil step tracing. + + Needed to get structured logs. + + Only needed on GoEthereum style tracing, not needed for Parity style tracing. + + See https://book.getfoundry.sh/reference/anvil/ + + :param test_request_timeout: + Set the timeout fro the JSON-RPC requests that attempt to determine if Anvil was successfully launched. + + """ + + assert not is_localhost_port_listening(port), f"localhost port {port} occupied.\n" f"You might have a zombie Anvil process around.\nRun to kill: kill -SIGKILL $(lsof -ti:{port})" + + url = f"http://localhost:{port}" + + attempts_left = attempts + process = None + final_cmd = None + current_block = 0 + web3 = None + + if unlocked_addresses is None: + unlocked_addresses = [] + + # https://book.getfoundry.sh/reference/anvil/ + args = dict( + port=port, + fork=fork_url, + hardfork=hardfork, + gas_limit=gas_limit, + steps_tracing=steps_tracing, + ) + + if block_time not in (0, None): + assert block_time > 0, f"Got bad block time {block_time}" + args["block_time"] = block_time + + current_block = chain_id = None + + while attempts_left > 0: + process, final_cmd = _launch( + cmd, + **args, + ) + + # Wait until Anvil is responsive + timeout = time.time() + launch_wait_seconds + + # Use shorter read timeout here - otherwise requests will wait > 10s if something is wrong + web3 = Web3(HTTPProvider(url, request_kwargs={"timeout": test_request_timeout})) + while time.time() < timeout: + try: + # See if web3 RPC works + current_block = web3.eth.block_number + chain_id = web3.eth.chain_id + break + except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout) as e: + logger.info("Anvil not ready, got exception %s", e) + # requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(54, 'Connection reset by peer')) + time.sleep(0.1) + continue + + if current_block is None: + logger.error("Could not read the latest block from anvil %s within %f seconds, shutting down and dumping output", url, launch_wait_seconds) + stdout, stderr = shutdown_hard( + process, + log_level=logging.ERROR, + block=True, + check_port=port, + ) + + if len(stdout) == 0: + attempts_left -= 1 + if attempts_left > 0: + logger.info("anvil did not start properly, try again, attempts left %d", attempts_left) + continue + + raise AssertionError(f"Could not read block number from Anvil after the launch {cmd}: at {url}, stdout is {len(stdout)} bytes, stderr is {len(stderr)} bytes") + else: + # We have a successful launch + break + # Use f-string for a thousand separator formatting + logger.info(f"anvil forked network {chain_id}, the current block is {current_block:,}, Anvil JSON-RPC is {url}") + + # Perform unlock accounts for all accounts + for account in unlocked_addresses: + unlock_account(web3, account) + + return AnvilLaunch(port, final_cmd, url, process) + + +def unlock_account(web3: Web3, address: str): + """Make Anvil mainnet fork to accept transactions to any Ethereum account. + + This is even when we do not have a private key for the account. + + :param web3: + Web3 instance + + :param address: + Account to unlock + """ + web3.provider.make_request("anvil_impersonateAccount", [address]) # type: ignore + + +def sleep(web3: Web3, seconds: int) -> int: + """Call emv_increaseTime on Anvil""" + make_anvil_custom_rpc_request(web3, "evm_increaseTime", [hex(seconds)]) + return seconds + + +def mine(web3: Web3, timestamp: Optional[int] = None) -> None: + """Call evm_setNextBlockTimestamp on Anvil""" + + if timestamp is None: + block = web3.eth.get_block(web3.eth.block_number) + timestamp = block["timestamp"] + 1 + + make_anvil_custom_rpc_request(web3, "evm_mine", [timestamp]) + + +def snapshot(web3: Web3) -> int: + """Call evm_snapshot on Anvil""" + return int(make_anvil_custom_rpc_request(web3, "evm_snapshot", []), 16) + + +def revert(web3: Web3, snapshot_id: int) -> bool: + """Call evm_revert on Anvil + + https://book.getfoundry.sh/reference/anvil/ + + :return: + True if a snapshot was reverted + """ + ret_val = make_anvil_custom_rpc_request(web3, "evm_revert", [snapshot_id]) + return ret_val + + +def dump_state(web3: Web3) -> int: + """Call evm_snapshot on Anvil""" + return make_anvil_custom_rpc_request(web3, "anvil_dumpState") + + +def load_state(web3: Web3, state: str) -> int: + """Call evm_snapshot on Anvil""" + return make_anvil_custom_rpc_request(web3, "anvil_loadState", [state]) + + +# Backwards compatibility +fork_network_anvil = launch_anvil diff --git a/eth_defi/provider/fallback.py b/eth_defi/provider/fallback.py new file mode 100644 index 00000000..d9117246 --- /dev/null +++ b/eth_defi/provider/fallback.py @@ -0,0 +1,161 @@ +"""JSON-RPC provider fallback and redundancy mechanisms. + +- See :py:class:`FallbackProvider` +""" +import enum +import time +from collections import defaultdict, Counter +from typing import List, Any +import logging + +from web3.types import RPCEndpoint, RPCResponse + +from eth_defi.middleware import is_retryable_http_exception, DEFAULT_RETRYABLE_EXCEPTIONS, DEFAULT_RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_RPC_ERROR_CODES +from eth_defi.provider.named import BaseNamedProvider, NamedProvider, get_provider_name + +logger = logging.getLogger(__name__) + + +class FallbackStrategy(enum.Enum): + """Different supported fallback strategies.""" + + #: Automatically switch to the next provider on an error + #: + cycle_on_error = "cycle_on_error" + + +class FallbackProvider(BaseNamedProvider): + """Fault-tolerance for JSON-RPC requests with multiple providers. + + Fall back to the next provider on the list if a JSON-RPC request fails. + Contains build-in retry logic in round-robin manner. + + See also + + - :py:func:`eth_defi.middlware.exception_retry_middleware` + + .. warning:: + + :py:class:`FallbackProvider` does not call any middlewares installed on the providers themselves. + """ + + def __init__( + self, + providers: List[NamedProvider], + strategy=FallbackStrategy.cycle_on_error, + retryable_exceptions=DEFAULT_RETRYABLE_EXCEPTIONS, + retryable_status_codes=DEFAULT_RETRYABLE_HTTP_STATUS_CODES, + retryable_rpc_error_codes=DEFAULT_RETRYABLE_RPC_ERROR_CODES, + sleep: float = 5.0, + backoff: float = 1.6, + retries: int = 6, + ): + """ + :param providers: + List of provider we cycle through. + + :param strategy: + What is the strategy to deal with errors. + + Currently on cycling supported. + + :param retryable_exceptions: + List of exceptions we can retry. + + :param retryable_status_codes: + List of HTTP status codes we can retry. + + :param retryable_rpc_error_codes: + List of GoEthereum error codes we can retry. + + :param sleep: + Seconds between retries. + + :param backoff: + Multiplier to increase sleep. + + :param retries: + How many retries we attempt before giving up. + + """ + + super().__init__() + + self.providers = providers + + for provider in providers: + assert "http_retry_request" not in provider.middlewares, "http_retry_request middleware cannot be used with FallbackProvider" + + #: Currently active provider + self.currently_active_provider = 0 + + self.strategy = strategy + + self.retryable_exceptions = retryable_exceptions + self.retryable_status_codes = retryable_status_codes + self.retryable_rpc_error_codes = retryable_rpc_error_codes + self.sleep = sleep + self.backoff = backoff + self.retries = retries + + #: provider number -> API name -> call count mappings. + # This tracks completed API requests. + self.api_call_counts = defaultdict(Counter) + self.retry_count = 0 + + @property + def endpoint_uri(self): + """Return the active node URI endpoint. + + For :py:class:`HTTPProvider` compatibility. + """ + return self.get_provider().endpoint_uri + + def switch_provider(self): + """""" + self.currently_active_provider = (self.currently_active_provider + 1) % len(self.providers) + + def get_provider(self) -> NamedProvider: + """Get currently active provider.""" + return self.providers[self.currently_active_provider] + + def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: + """Make a request. + + - By default use the current active provider + + - If there are errors try cycle through providers and sleep + between cycles until one provider works + """ + current_sleep = self.sleep + for i in range(self.retries): + provider = self.get_provider() + try: + # Call the underlying provider + val = provider.make_request(method, params) + + # Track API counts + self.api_call_counts[self.currently_active_provider][method] += 1 + + return val + + except Exception as e: + if is_retryable_http_exception( + e, + retryable_rpc_error_codes=self.retryable_rpc_error_codes, + retryable_status_codes=self.retryable_status_codes, + retryable_exceptions=self.retryable_exceptions, + ): + old_provider_name = get_provider_name(provider) + self.switch_provider() + new_provider_name = get_provider_name(self.get_provider()) + + if i < self.retries - 1: + logger.warning("Encountered JSON-RPC retryable error %s when calling method %s.\n" "Switching providers %s -> %s\n" "Retrying in %f seconds, retry #%d", e, method, old_provider_name, new_provider_name, current_sleep, i) + time.sleep(current_sleep) + current_sleep *= self.backoff + self.retry_count += 1 + continue + else: + raise # Out of retries + raise # Not retryable exception diff --git a/eth_defi/provider/ganache.py b/eth_defi/provider/ganache.py new file mode 100644 index 00000000..b0d93774 --- /dev/null +++ b/eth_defi/provider/ganache.py @@ -0,0 +1,437 @@ +"""Ganache integration. + +Ganache is an EVM test backend and mainnet forking +written in JavaScript from Truffle project. + +This module contains utilities to automatically launch and +manipulate `ganache-cli` process. + +You need to have `ganache-cli` installed in order to use these. + +How to install ganache-cli using npm: + +.. code-block:: shell + + npm install -g ganache + +For more information about Ganache see + +- `Ganache CLI command line documentation `_ + +- `Aave Web.py example `_ + +- `QuickNode how to fork mainnet with Ganache tutorial `_ + +`Most of this code is lifted from Brownie project (MIT) `_ +and it is not properly cleaned up yet. + +""" + +import datetime +import logging +import re +import shutil +import sys +import time +import warnings +from dataclasses import dataclass +from subprocess import DEVNULL, PIPE +from typing import Dict, List, Tuple, Union + +import psutil +import requests + +from eth_typing import HexAddress +from hexbytes import HexBytes +from psutil import NoSuchProcess +from web3 import HTTPProvider, Web3 +from web3.types import Wei + +from eth_defi.utils import is_localhost_port_listening + + +logger = logging.getLogger(__name__) + + +EVM_EQUIVALENTS = {"atlantis": "byzantium", "agharta": "petersburg"} + +# https://github.com/trufflesuite/ganache +CLI_FLAGS = { + "7": { + "port": "--server.port", + "gas_limit": "--miner.blockGasLimit", + "accounts": "--wallet.totalAccounts", + "evm_version": "--hardfork", + "fork": "--fork.url", + "mnemonic": "--wallet.mnemonic", + "account_keys_path": "--wallet.accountKeysPath", + "block_time": "--miner.blockTime", + "default_balance": "--wallet.defaultBalance", + "time": "--chain.time", + "unlock": "--wallet.unlockedAccounts", + "network_id": "--chain.networkId", + "chain_id": "--chain.chainId", + "unlimited_contract_size": "--chain.allowUnlimitedContractSize", + "quiet": "--logging.quiet", + }, + "<=6": { + "port": "--port", + "gas_limit": "--gasLimit", + "accounts": "--accounts", + "evm_version": "--hardfork", + "fork": "--fork", + "mnemonic": "--mnemonic", + "account_keys_path": "--acctKeys", + "block_time": "--blockTime", + "default_balance": "--defaultBalanceEther", + "time": "--time", + "unlock": "--unlock", + "network_id": "--networkId", + "chain_id": "--chainId", + "unlimited_contract_size": "--allowUnlimitedContractSize", + }, +} + +EVM_VERSIONS = ["byzantium", "constantinople", "petersburg", "istanbul"] + +#: The default hardfork rules used by Ganache +EVM_DEFAULT = "london" + + +class NoGanacheInstalled(Exception): + """We could not launch because ganache-cli command is missing""" + + +class InvalidArgumentWarning(Warning): + """Warned when there are issued with ganache-cli command line.""" + + +def _launch(cmd: str, **kwargs: Dict) -> Tuple[psutil.Popen, List[str]]: + """Launches the RPC client. + + Args: + cmd: command string to execute as subprocess""" + if sys.platform == "win32" and not cmd.split(" ")[0].endswith(".cmd"): + if " " in cmd: + cmd = cmd.replace(" ", ".cmd ", 1) + else: + cmd += ".cmd" + cmd_list = cmd.split(" ") + + ganache_executable = cmd_list[0] + + found = shutil.which(ganache_executable) + if not found: + raise NoGanacheInstalled(f"Could not find ganache-cli installation: {ganache_executable} - are you sure it is installed?") + + ganache_version = _get_ganache_version(ganache_executable) + + if ganache_version <= 6: + cli_flags = CLI_FLAGS["<=6"] + else: + cli_flags = CLI_FLAGS["7"] + # this flag must be true so that reverting tx's return a + # more verbose output similar to what ganache 6 produced + cmd_list.extend(["--chain.vmErrorsOnRPCResponse", "true"]) + + kwargs.setdefault("evm_version", EVM_DEFAULT) # type: ignore + if kwargs["evm_version"] in EVM_EQUIVALENTS: + kwargs["evm_version"] = EVM_EQUIVALENTS[kwargs["evm_version"]] # type: ignore + kwargs = _validate_cmd_settings(kwargs) + for key, value in [(k, v) for k, v in kwargs.items() if v]: + if key == "unlock": + if not isinstance(value, list): + value = [value] # type: ignore + for address in value: + if isinstance(address, int): + address = HexBytes(address.to_bytes(20, "big")).hex() + cmd_list.extend([cli_flags[key], address]) + else: + try: + # Handle boolean options + if value is True: + cmd_list.append(cli_flags[key]) + elif value is not False: + cmd_list.extend([cli_flags[key], str(value)]) + except KeyError: + warnings.warn( + f"Ignoring invalid commandline setting for ganache-cli: '{key}' with value '{value}'.", + InvalidArgumentWarning, + ) + out = DEVNULL if sys.platform == "win32" else PIPE + + logger.info("Launching ganache-cli: %s", " ".join(cmd_list)) + return psutil.Popen(cmd_list, stdin=DEVNULL, stdout=out, stderr=out), cmd_list + + +def _get_ganache_version(ganache_executable: str) -> int: + ganache_version_proc = psutil.Popen([ganache_executable, "--version"], stdout=PIPE) + ganache_version_stdout, _ = ganache_version_proc.communicate() + ganache_version_match = re.search(r"v([0-9]+)\.", ganache_version_stdout.decode()) + if not ganache_version_match: + raise ValueError("could not read ganache version: {}".format(ganache_version_stdout)) + return int(ganache_version_match.group(1)) + + +def _validate_cmd_settings(cmd_settings: dict) -> dict: + ganache_keys = set(k for f in CLI_FLAGS.values() for k in f.keys()) + + CMD_TYPES = { + "port": int, + "gas_limit": int, + "block_time": int, + "time": datetime.datetime, + "accounts": int, + "evm_version": str, + "mnemonic": str, + "account_keys_path": str, + "fork": str, + "network_id": int, + "chain_id": int, + "quiet": bool, + } + for cmd, value in cmd_settings.items(): + if cmd in ganache_keys and cmd in CMD_TYPES.keys() and not isinstance(value, CMD_TYPES[cmd]): + raise TypeError(f"Wrong type for cmd_settings '{cmd}': {value}. Found {type(value).__name__}, but expected {CMD_TYPES[cmd].__name__}.") + + if "default_balance" in cmd_settings: + try: + cmd_settings["default_balance"] = int(cmd_settings["default_balance"]) + except ValueError: + # convert any input to ether, then format it properly + default_eth = Wei(cmd_settings["default_balance"]).to("ether") + cmd_settings["default_balance"] = default_eth.quantize(1) if default_eth > 1 else default_eth.normalize() + return cmd_settings + + +@dataclass +class GanacheLaunch: + """Control ganache-cli processes launched on background. + + Comes with a helpful :py:meth:`close` method when it is time to put Ganache rest. + """ + + #: Which port was bound by the ganache + port: int + + #: Used command-line to spin up ganache-cli + cmd: List[str] + + #: Where does Ganache listen to JSON-RPC + json_rpc_url: str + + #: UNIX process that we opened + process: psutil.Popen + + def close(self, verbose=False, block=True, block_timeout=30): + """Kill the ganache-cli process. + + Ganache is pretty hard to kill, so keep killing it until it dies and the port is free again. + + :param block: Block the execution until Ganache has terminated + :param block_timeout: How long we give for Ganache to clean up after itself + :param verbose: If set, dump anything in Ganache stdout to the Python logging using level `INFO`. + """ + + process = self.process + if verbose: + # TODO: This does not seem to work on macOS, + # but is fine on Ubuntu on Github CI + logger.info("Dumping Ganache output") + if process.poll() is not None: + output = process.communicate()[0].decode("utf-8") + for line in output.split("\n"): + logger.info(line) + + # process.terminate() + # Hahahahah, this is Ganache, do you think terminate signal is enough + try: + process.kill() + except NoSuchProcess: + raise AssertionError("ganache died on its own :(") + + if block: + deadline = time.time() + 30 + while time.time() < deadline: + if not is_localhost_port_listening(self.port): + # Port released, assume Ganache is gone + return + + raise AssertionError(f"Could not terminate ganache in {block_timeout} seconds") + + +def fork_network( + json_rpc_url: str, + unlocked_addresses: List[Union[HexAddress, str]] = [], + cmd="ganache-cli", + port=19999, + evm_version=EVM_DEFAULT, + block_time=0, + quiet=False, + launch_wait_seconds=20.0, +) -> GanacheLaunch: + """Creates the ganache "fork" of given JSON-RPC endpoint. + + .. warning:: + + This function is not recommended due to stability issues with Ganache. + Use :py:func:`eth_defi.anvil.fork_network_anvil` instead. + + Forking a mainnet is common way to test against live deployments. + This function invokes `ganache-cli` command and tells it to fork a given JSON-RPC endpoint. + + A subprocess is started on the background. To stop this process, call :py:meth:`eth_defi.ganache.GanacheLaunch.close`. + This function waits `launch_wait_seconds` in order to `ganache-cli` process to start + and complete the chain fork. + + .. note :: + + Currently only supports HTTP JSON-RPC connections. + + .. warning :: + + Forking a network with ganache-cli is a slow process. It is recommended + that you use fast Ethereum Tester based testing if possible. + + Here is an example that forks BNB chain mainnet and transfer 500 BUSD stablecoin to a test + account we control: + + .. code-block:: python + + @pytest.fixture() + def large_busd_holder() -> HexAddress: + # A random account picked from BNB Smart chain that holds a lot of BUSD. + # Binance Hot Wallet 6 + return HexAddress(HexStr("0x8894E0a0c962CB723c1976a4421c95949bE2D4E3")) + + + @pytest.fixture() + def ganache_bnb_chain_fork(large_busd_holder) -> str: + # Create a testable fork of live BNB chain. + mainnet_rpc = os.environ["BNB_CHAIN_JSON_RPC"] + launch = fork_network( + mainnet_rpc, + unlocked_addresses=[large_busd_holder]) + yield launch.json_rpc_url + # Wind down Ganache process after the test is complete + launch.close() + + + @pytest.fixture + def web3(ganache_bnb_chain_fork: str): + # Set up a local unit testing blockchain + return Web3(HTTPProvider(ganache_bnb_chain_fork)) + + + def test_mainnet_fork_transfer_busd(web3: Web3, large_busd_holder: HexAddress, user_1: LocalAccount): + + # BUSD deployment on BNB chain + # https://bscscan.com/token/0xe9e7cea3dedca5984780bafc599bd69add087d56 + busd_details = fetch_erc20_details(web3, "0xe9e7CEA3DedcA5984780Bafc599bD69ADd087D56") + busd = busd_details.contract + + # Transfer 500 BUSD to the user 1 + tx_hash = busd.functions.transfer(user_1.address, 500*10**18).transact({"from": large_busd_holder}) + + # Because Ganache has instamine turned on by default, we do not need to wait for the transaction + receipt = web3.eth.get_transaction_receipt(tx_hash) + assert receipt.status == 1, "BUSD transfer reverted" + + assert busd.functions.balanceOf(user_1.address).call() == 500*10**18 + + `See the full example in tests source code `_. + + Polygon needs to set a specific EVM version: + + .. code-block:: python + + mainnet_rpc = os.environ["POLYGON_JSON_RPC"] + launch = fork_network(mainnet_rpc, evm_version="istanbul") + + If `ganache-cli` refuses to terminate properly, you can kill a process by a port with: + + .. code-block:: shell + + # Kill any process listening to localhost:19999 + kill -SIGKILL $(lsof -ti:19999) + + This function uses Python logging subsystem. If you want to see error/info/debug logs with `pytest` you can do: + + .. code-block:: shell + + pytest --log-cli-level=debug + + For public JSON-RPC endpoints check + + - `BNB chain documentation `_ + + - `ethereumnodes.com `_ + + :param cmd: Override `ganache-cli` command. If not given we look up from `PATH`. + :param json_rpc_url: HTTP JSON-RPC URL of the network we want to fork + :param unlocked_addresses: List of addresses of which ownership we take to allow test code to transact as them + :param port: Localhost port we bind for Ganache JSON-RPC + :param launch_wait_seconds: How long we wait ganache-cli to start until giving up + :param evm_version: "london" for the default hard fork + :param block_time: + How long Ganache takes to mine a block. Default is zero and any RPC transaction + will immediately return with the transaction inclusion. + Set to `1` so that you can poll the transaction as you would do with + a live JSON-RPC node. + :param quiet: + Disable extensive logging. If there is a lot of Ganache logging it seems to crash + on Github CI. + + """ + + assert not is_localhost_port_listening(port), f"localhost port {port} occupied - you might have a zombie Ganache around" + + url = f"http://localhost:{port}" + + process, final_cmd = _launch( + cmd, + port=port, + fork=json_rpc_url, + unlock=unlocked_addresses, + evm_version=evm_version, + block_time=block_time, + quiet=quiet, + ) + + # Wait until Ganache is responsive + timeout = time.time() + launch_wait_seconds + current_block = None + + # Use short 1.0s HTTP read timeout here - otherwise requests will wa-it > 10s if something is wrong + web3 = Web3(HTTPProvider(url, request_kwargs={"timeout": 1.0})) + while time.time() < timeout: + if process.poll() is not None: + output = process.communicate()[0].decode("utf-8") + for line in output.split("\n"): + logger.error(line) + raise AssertionError(f"ganache-cli died on launch, used command was {final_cmd}") + + try: + current_block = web3.eth.block_number + break + except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout): + # requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(54, 'Connection reset by peer')) + time.sleep(0.1) + continue + + if current_block is None: + if process.poll() is not None: + output = process.communicate()[0].decode("utf-8") + for line in output.split("\n"): + logger.error(line) + raise AssertionError(f"ganache-cli died on launch, used command was {final_cmd}") + + logger.error("Could not read the latest block from ganache-cli within %f seconds", launch_wait_seconds) + raise AssertionError(f"Could not connect to ganache-cli {cmd}: at {url}") + + chain_id = web3.eth.chain_id + + # Use f-string for thousand separator formatting + logger.info(f"ganache-cli forked network %d, the current block is {current_block:,}, Ganache JSON-RPC is %s", chain_id, url) + + return GanacheLaunch(port, final_cmd, url, process) diff --git a/eth_defi/provider/mev_blocker.py b/eth_defi/provider/mev_blocker.py new file mode 100644 index 00000000..4ed4baae --- /dev/null +++ b/eth_defi/provider/mev_blocker.py @@ -0,0 +1,76 @@ +"""MEV blocking RPC provider functionality. + +`Malicious Extractable Value (MEV) `__ +is a problem for all trading activity on EVM-based blockchains. + +It can be mitigated by using a special +JSON-RPC node that provides a private mempool. + +This module provides methods to create special +:py:class:`web3.Web3` instances that + +- Use MEV blocking JSON-RPC endpoint for all transactions + +- Normal JSON-RPC node for reading data from the blockchain + +""" +from collections import Counter +from typing import Any + +from web3.providers import JSONBaseProvider +from web3.types import RPCEndpoint, RPCResponse + +from eth_defi.provider.named import BaseNamedProvider + +#: List of RPC methods that execution transactions +#: +TRANSACT_METHODS = ( + "eth_sendTransaction", + "eth_sendRawTransaction", +) + + +class MEVBlockerProvider(BaseNamedProvider): + """Routes methods that execute transaction through a special MEV proof endpoint. + + - Depending on whether we are sending a transaction or reading from the blockchain, + switch between the JSON-RPC endpoint. + + - Route all outgoing transactions through a special MEV blocker endpoint + """ + + def __init__( + self, + call_provider: JSONBaseProvider, + transact_provivder: JSONBaseProvider, + transact_methods=TRANSACT_METHODS, + ): + super().__init__() + self.call_provider = call_provider + self.transact_provider = transact_provivder + self.transact_methods = transact_methods + + #: Keep tabs on how much API traffic we generate through each endpoint + self.provider_counter = Counter( + { + "call": 0, + "transact": 0, + } + ) + + def is_transact_method(self, method: RPCEndpoint) -> bool: + """Does this RPC method do a transaction""" + return method in self.transact_methods + + def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: + if self.is_transact_method(method): + self.provider_counter["transact"] += 1 + return self.transact_provider.make_request(method, params) + else: + self.provider_counter["call"] += 1 + return self.call_provider.make_request(method, params) + + @property + def endpoint_uri(self) -> str: + """Map us to the transact provider by the default""" + return self.transact_provider.endpoint_uri diff --git a/eth_defi/provider/named.py b/eth_defi/provider/named.py new file mode 100644 index 00000000..c1052b15 --- /dev/null +++ b/eth_defi/provider/named.py @@ -0,0 +1,55 @@ +"""Helper methods to extract the URL endpoint and name of a provider. + +See also + +- :py:mod:`eth_defi.provider.mev_blocker` + +- :py:mod:`eth_defi.provider.fallback` + +""" + +from abc import abstractproperty, ABC, abstractmethod +from typing import TypeAlias + +from web3 import HTTPProvider +from web3.providers import JSONBaseProvider + +from eth_defi.utils import get_url_domain + + +class BaseNamedProvider(ABC, JSONBaseProvider): + """A base class for getting a JSON-RPC provider name and URL.""" + + @property + @abstractmethod + def endpoint_uri(self) -> str: + """Return the active node URI endpoint. + + .. warning:: + + Endpoint URIs often contain API keys. + They should be never publicly displayed as is. + + """ + + +#: Named providers including web3.py core providers +NamedProvider: TypeAlias = BaseNamedProvider | HTTPProvider + + +def get_provider_name(provider: NamedProvider) -> str: + """Get loggable name of the JSON-RPC provider. + + Strips out API keys from the URL of a JSON-RPC API provider. + + Supports :py:mod:`eth_defi` customer provider classes + as well as :py:mod:`web3` core providers. + + :return: + HTTP provider URL's domain name if available. + + Assume any API keys are not part of the domain name. + """ + if isinstance(provider, HTTPProvider): + return get_url_domain(provider.endpoint_uri) + return str(provider) diff --git a/eth_defi/utils.py b/eth_defi/utils.py index 523c524a..9afdee6e 100644 --- a/eth_defi/utils.py +++ b/eth_defi/utils.py @@ -5,6 +5,7 @@ import socket import time from typing import Optional, Tuple +from urllib.parse import urlparse import psutil @@ -125,3 +126,12 @@ def to_unix_timestamp(dt: datetime.datetime) -> float: """ # https://stackoverflow.com/a/5499906/315168 return calendar.timegm(dt.utctimetuple()) + + +def get_url_domain(url: str) -> str: + """Redact URL so that only domain is displayed. + + Some services e.g. infura use path as an API key. + """ + parsed = urlparse(url) + return parsed.hostname diff --git a/tests/test_event_reader.py b/tests/test_event_reader.py index caa10859..3a71fffc 100644 --- a/tests/test_event_reader.py +++ b/tests/test_event_reader.py @@ -157,11 +157,8 @@ def test_read_events_concurrent_two_blocks_concurrent(web3): assert max(blocks) == 37898276 - def test_read_events_lazy_timestamp(web3): - """Read events but extract timestamps only for events, not whole block ranges. - - """ + """Read events but extract timestamps only for events, not whole block ranges.""" # Get contracts Pair = get_contract(web3, "sushi/UniswapV2Pair.json") diff --git a/tests/test_fallback_provider.py b/tests/test_fallback_provider.py new file mode 100644 index 00000000..857d7cde --- /dev/null +++ b/tests/test_fallback_provider.py @@ -0,0 +1,110 @@ +"""Test JSON-RPC provider fallback mechanism.""" +from unittest.mock import patch, DEFAULT + +import pytest +import requests +from web3 import HTTPProvider, Web3 + +from eth_defi.anvil import launch_anvil, AnvilLaunch +from eth_defi.provider.fallback import FallbackProvider + + +@pytest.fixture(scope="module") +def anvil() -> AnvilLaunch: + """Launch Anvil for the test backend.""" + anvil = launch_anvil() + try: + yield anvil + finally: + anvil.close() + + +@pytest.fixture() +def provider_1(anvil): + provider = HTTPProvider(anvil.json_rpc_url) + provider.middlewares.clear() + return provider + + +@pytest.fixture() +def provider_2(anvil): + provider = HTTPProvider(anvil.json_rpc_url) + provider.middlewares.clear() + return provider + + +@pytest.fixture() +def fallback_provider(provider_1, provider_2) -> FallbackProvider: + provider = FallbackProvider([provider_1, provider_2], sleep=0.1, backoff=1) + return provider + + +def test_fallback_no_issue(anvil: AnvilLaunch, fallback_provider: FallbackProvider): + """Callback goes through the first provider""" + web3 = Web3(fallback_provider) + assert fallback_provider.api_call_counts[0]["eth_blockNumber"] == 0 + assert fallback_provider.api_call_counts[1]["eth_blockNumber"] == 0 + assert fallback_provider.currently_active_provider == 0 + assert fallback_provider.endpoint_uri == anvil.json_rpc_url + web3.eth.block_number + assert fallback_provider.api_call_counts[0]["eth_blockNumber"] == 1 + assert fallback_provider.api_call_counts[1]["eth_blockNumber"] == 0 + assert fallback_provider.currently_active_provider == 0 + + +def test_fallback_single_fault(fallback_provider: FallbackProvider, provider_1): + """Fallback goes through the second provider when first fails""" + + web3 = Web3(fallback_provider) + + with patch.object(provider_1, "make_request", side_effect=requests.exceptions.ConnectionError): + web3.eth.block_number + + assert fallback_provider.api_call_counts[0]["eth_blockNumber"] == 0 + assert fallback_provider.api_call_counts[1]["eth_blockNumber"] == 1 + assert fallback_provider.currently_active_provider == 1 + + +def test_fallback_double_fault(fallback_provider: FallbackProvider, provider_1, provider_2): + """Fallback fails on both providers.""" + + web3 = Web3(fallback_provider) + + with patch.object(provider_1, "make_request", side_effect=requests.exceptions.ConnectionError), patch.object(provider_2, "make_request", side_effect=requests.exceptions.ConnectionError): + with pytest.raises(requests.exceptions.ConnectionError): + web3.eth.block_number + + assert fallback_provider.retry_count == 5 + + +def test_fallback_double_fault_recovery(fallback_provider: FallbackProvider, provider_1, provider_2): + """Fallback fails on both providers, but then recover.""" + + web3 = Web3(fallback_provider) + + count = 0 + + def borg_start(*args, **kwargs): + nonlocal count + count += 1 + if count <= 2: + raise requests.exceptions.ConnectionError() + return DEFAULT + + with patch.object(provider_1, "make_request", side_effect=borg_start), patch.object(provider_2, "make_request", side_effect=borg_start): + web3.eth.block_number + + assert fallback_provider.api_call_counts[0]["eth_blockNumber"] == 1 + assert fallback_provider.api_call_counts[1]["eth_blockNumber"] == 0 + assert fallback_provider.retry_count == 2 + assert fallback_provider.currently_active_provider == 0 + + +def test_fallback_unhandled_exception(fallback_provider: FallbackProvider, provider_1): + """Exception fallback provider cannot handle""" + + web3 = Web3(fallback_provider) + + with patch.object(provider_1, "make_request", side_effect=RuntimeError): + with pytest.raises(RuntimeError): + web3.eth.block_number diff --git a/tests/test_mev_blocker.py b/tests/test_mev_blocker.py new file mode 100644 index 00000000..9eef9427 --- /dev/null +++ b/tests/test_mev_blocker.py @@ -0,0 +1,75 @@ +"""Test MEV blocker provider switching.""" +import pytest +from web3 import HTTPProvider, Web3 + +from eth_defi.provider.anvil import launch_anvil, AnvilLaunch +from eth_defi.provider.mev_blocker import MEVBlockerProvider + +from eth_defi.hotwallet import HotWallet +from eth_defi.trace import assert_transaction_success_with_explanation +from eth_defi.uniswap_v2.utils import ZERO_ADDRESS + + +@pytest.fixture(scope="module") +def anvil() -> AnvilLaunch: + """Launch Anvil for the test backend.""" + anvil = launch_anvil() + try: + yield anvil + finally: + anvil.close() + + +@pytest.fixture() +def mev_blocker_provider(anvil: AnvilLaunch) -> MEVBlockerProvider: + provider = MEVBlockerProvider( + call_provider=HTTPProvider(anvil.json_rpc_url), + transact_provivder=HTTPProvider(anvil.json_rpc_url), + ) + return provider + + +def test_mev_blocker_call(mev_blocker_provider: MEVBlockerProvider): + """Read only methods route through the call provider""" + web3 = Web3(mev_blocker_provider) + block_number = web3.eth.block_number + assert block_number == 0 + assert mev_blocker_provider.provider_counter["call"] == 1 + assert mev_blocker_provider.provider_counter["transact"] == 0 + + +def test_mev_blocker_send_transaction(mev_blocker_provider: MEVBlockerProvider): + """eth_sendTransaction goes through the MEV blocker""" + web3 = Web3(mev_blocker_provider) + account = web3.eth.accounts[0] + assert mev_blocker_provider.provider_counter["call"] == 1 + assert mev_blocker_provider.provider_counter["transact"] == 0 + tx_hash = web3.eth.send_transaction({"to": ZERO_ADDRESS, "from": account, "value": 1}) + assert_transaction_success_with_explanation(web3, tx_hash) + assert mev_blocker_provider.provider_counter["call"] == 8 # Account for various gas cost methods + assert mev_blocker_provider.provider_counter["transact"] == 1 + + +def test_mev_blocker_send_transaction_raw(mev_blocker_provider: MEVBlockerProvider): + """eth_sendTransactionRaw goes through the MEV blocker""" + + web3 = Web3(mev_blocker_provider) + wallet = HotWallet.create_for_testing(web3) + + signed_tx = wallet.sign_transaction_with_new_nonce( + { + "from": wallet.address, + "to": ZERO_ADDRESS, + "value": 1, + "gas": 100_000, + "gasPrice": web3.eth.gas_price, + } + ) + + # Account for setup API counts from create_for_testing() + assert mev_blocker_provider.provider_counter["call"] == 10 + assert mev_blocker_provider.provider_counter["transact"] == 1 + tx_hash = web3.eth.send_raw_transaction(signed_tx.rawTransaction) + assert_transaction_success_with_explanation(web3, tx_hash) + assert mev_blocker_provider.provider_counter["call"] == 11 + assert mev_blocker_provider.provider_counter["transact"] == 2