Skip to content

Commit

Permalink
Working on RPC fallback and fault tolerance
Browse files Browse the repository at this point in the history
  • Loading branch information
miohtama committed Jul 22, 2023
1 parent 90b6fcd commit bf8fb41
Show file tree
Hide file tree
Showing 7 changed files with 330 additions and 3 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# Current

- Add MEV blocking support
- Add JSON-RPC fallback switching

# 0.21.8

- Add test coverage for `extract_timestamps_json_rpc_lazy`
Expand Down
37 changes: 36 additions & 1 deletion eth_defi/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import requests
from web3 import Web3, HTTPProvider
from web3.middleware import geth_poa_middleware
from web3.providers import JSONBaseProvider
from web3.types import RPCEndpoint, RPCResponse
from web3.datastructures import NamedElementOnion

from eth_defi.event_reader.conversion import convert_jsonrpc_value_to_int
from eth_defi.middleware import http_retry_request_with_sleep_middleware
Expand Down Expand Up @@ -106,7 +108,7 @@ def install_api_call_counter_middleware(web3: Web3) -> Counter:
assert counter["eth_blockNumber"] == 1
:return:
Counter object with columns per RPC endpoint and "toal"
Counter object with columns per RPC endpoint and "total"
"""
api_counter = Counter()

Expand All @@ -122,6 +124,39 @@ def middleware(method: RPCEndpoint, params: Any) -> Optional[RPCResponse]:
return api_counter


def install_api_call_counter_middleware_on_provider(provider: JSONBaseProvider) -> Counter:
"""Install API call counter middleware on a specific API provider.
Allows per-provider API call counting when using complex
provider setups.
See also
- :py:func:`install_api_call_counter_middleware`
- :py:class:`eth_defi.fallback_provider.FallbackProvider`
:return:
Counter object with columns per RPC endpoint and "total"
"""

assert isinstance(provider, JSONBaseProvider), f"Got {provider.__class__}"

api_counter = Counter()

def factory(make_request: Callable[[RPCEndpoint, Any], Any], web3: "Web3"):
import ipdb; ipdb.set_trace()
def middleware(method: RPCEndpoint, params: Any) -> Optional[RPCResponse]:
api_counter[method] += 1
api_counter["total"] += 1
return make_request(method, params)

return middleware

provider.middlewares.add("api_counter_middleware", factory)
return api_counter


def has_graphql_support(provider: HTTPProvider) -> bool:
"""Check if a node has GoEthereum GraphQL API turned on.
Expand Down
158 changes: 158 additions & 0 deletions eth_defi/fallback_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""Fallback JSON-RPC provider mechanics.
If one JSON-RPC endpoint fails, automatically move to the next one.
"""
import enum
import time
from collections import defaultdict, Counter
from typing import List, Any
import logging

from web3 import HTTPProvider
from web3.providers import JSONBaseProvider
from web3.types import RPCEndpoint, RPCResponse

from eth_defi.middleware import is_retryable_http_exception, DEFAULT_RETRYABLE_EXCEPTIONS, DEFAULT_RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_RPC_ERROR_CODES
from eth_defi.utils import get_url_domain


logger = logging.getLogger(__name__)


class FallbackStrategy(enum.Enum):

#: Automatically switch to the next provider on an error
#:
cycle_on_error = "cycle_on_error"



class FallbackProvider(JSONBaseProvider):
"""Fall back to the next provder in the list if a JSON-RPC request fails.
Contains build-in retry logic in round robin manner.
See also
- :py:func:`eth_defi.middlware.exception_retry_middleware`
.. warning::
:py:class:`FallbackProvider` does not call any middlewares installed on providers themselves.
"""

def __init__(
self,
providers: List[JSONBaseProvider],
strategy=FallbackStrategy.cycle_on_error,
retryable_exceptions=DEFAULT_RETRYABLE_EXCEPTIONS,
retryable_status_codes=DEFAULT_RETRYABLE_HTTP_STATUS_CODES,
retryable_rpc_error_codes= DEFAULT_RETRYABLE_RPC_ERROR_CODES,
sleep: float = 5.0,
backoff: float = 1.6,
retries: int = 6,
):
"""
:param providers:
List of provider we cycle through.
:param strategy:
:param retryable_exceptions:
:param retryable_status_codes:
:param retryable_rpc_error_codes:
:param sleep:
:param backoff:
:param retries:
"""
self.providers = providers

for provider in providers:
assert "http_retry_request" not in provider.middlewares, "http_retry_request middleware cannot be used with FallbackProvider"

#: Currently active provider
self.currently_active_provider = 0

self.strategy = strategy

self.retryable_exceptions = retryable_exceptions
self.retryable_status_codes = retryable_status_codes
self.retryable_rpc_error_codes = retryable_rpc_error_codes
self.sleep = sleep
self.backoff = backoff
self.retries = retries

#: provider number -> API name -> call count mappings.
# This tracks completed API requests.
self.api_call_counts = defaultdict(Counter)
self.retry_count = 0

def switch_provider(self):
""""""
self.currently_active_provider = (self.currently_active_provider + 1) % len(self.providers)

def get_provider(self) -> JSONBaseProvider:
"""Get currently active provider."""
return self.providers[self.currently_active_provider]

def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
"""Make a request.
- By default use the current active provider
- If there are errors try cycle through providers and sleep
between cycles until one provider works
"""
current_sleep = self.sleep
for i in range(self.retries):
provider = self.get_provider()
try:

# Call the underlying provider
val = provider.make_request(method, params)

# Track API counts
self.api_call_counts[self.currently_active_provider][method] += 1

return val

except Exception as e:

if is_retryable_http_exception(
e,
retryable_rpc_error_codes=self.retryable_rpc_error_codes,
retryable_status_codes=self.retryable_status_codes,
retryable_exceptions=self.retryable_exceptions,
):

old_provider_name = _get_provider_name(provider)
self.switch_provider()
new_provider_name = _get_provider_name(self.get_provider())

if i < self.retries - 1:
logger.warning(
"Encountered JSON-RPC retryable error %s when calling method %s.\n"
"Switching providers %s -> %s\n"
"Retrying in %f seconds, retry #%d",
e, method,
old_provider_name, new_provider_name,
current_sleep, i)
time.sleep(current_sleep)
current_sleep *= self.backoff
self.retry_count += 1
continue
else:
raise # Out of retries
raise # Not retryable exception


def _get_provider_name(provider: JSONBaseProvider) -> str:
"""Get loggable name of the JSON-RPC provider.
:return:
HTTP provider URL's domain name if available.
Assume any API keys are not part of the domain name.
"""
if isinstance(provider, HTTPProvider):
return get_url_domain(provider.endpoint_uri)
return str(provider)
24 changes: 22 additions & 2 deletions eth_defi/hotwallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,28 @@ def sign_transaction_with_new_nonce(self, tx: dict) -> SignedTransactionWithNonc
Example:
.. code-block:: python
:param: Ethereum transaction data as a dict. This is modified in-place to include nonce.
web3 = Web3(mev_blocker_provider)
wallet = HotWallet.create_for_testing(web3)
# Send some ETH to zero address from
# the hot wallet
signed_tx = wallet.sign_transaction_with_new_nonce({
"from": wallet.address,
"to": ZERO_ADDRESS,
"value": 1,
"gas": 100_000,
"gasPrice": web3.eth.gas_price,
})
tx_hash = web3.eth.send_raw_transaction(signed_tx.rawTransaction)
:param tx:
Ethereum transaction data as a dict.
This is modified in-place to include nonce.
:return:
A transaction payload and nonce with used to generate this transaction.
"""
assert type(tx) == dict
assert "nonce" not in tx
Expand Down Expand Up @@ -174,6 +193,7 @@ def create_for_testing(web3: Web3, test_account_n=0, eth_amount=10):
"""Creates a new hot wallet and seeds it with ETH from one of well-known test accounts.
Shortcut method for unit testing.
"""
wallet = HotWallet.from_private_key("0x" + secrets.token_hex(32))
tx_hash = web3.eth.send_transaction({
Expand Down
8 changes: 8 additions & 0 deletions eth_defi/mev_blocker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
"""MEV blocking RPC provider functionality.
Malicious Extractable Value (MEV) is a nuisance on all
EVM-based blockchains. It can be mitigated by using a special
JSON-RPC node that provides a private mempool.
This module provides methods to create special
:py:class:`web3.Web3` instances that use MEV blocking
JSON-RPC endpoint for all transactions, but a normal JSON-RPC
node for reading data from the blockchain.
"""
from collections import Counter
from typing import Any
Expand Down
10 changes: 10 additions & 0 deletions eth_defi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import socket
import time
from typing import Optional, Tuple
from urllib.parse import urlparse

import psutil

Expand Down Expand Up @@ -125,3 +126,12 @@ def to_unix_timestamp(dt: datetime.datetime) -> float:
"""
# https://stackoverflow.com/a/5499906/315168
return calendar.timegm(dt.utctimetuple())


def get_url_domain(url: str) -> str:
"""Redact URL so that only domain is displayed.
Some services e.g. infura use path as an API key.
"""
parsed = urlparse(url)
return parsed.hostname
91 changes: 91 additions & 0 deletions tests/test_fallback_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""Test JSON-RPC provider fallback mechanism."""
from unittest.mock import patch

import pytest
import requests
from requests import HTTPError
from web3 import HTTPProvider, Web3

from eth_defi.anvil import launch_anvil, AnvilLaunch
from eth_defi.fallback_provider import FallbackProvider


@pytest.fixture(scope="session")
def anvil() -> AnvilLaunch:
"""Launch Anvil for the test backend."""
anvil = launch_anvil()
try:
yield anvil
finally:
anvil.close()


@pytest.fixture()
def provider_1(anvil):
provider = HTTPProvider(anvil.json_rpc_url)
provider.middlewares.clear()
return provider


@pytest.fixture()
def provider_2(anvil):
provider = HTTPProvider(anvil.json_rpc_url)
provider.middlewares.clear()
return provider


@pytest.fixture()
def fallback_provider(provider_1, provider_2) -> FallbackProvider:
provider = FallbackProvider([provider_1, provider_2], sleep=0.1, backoff=1)
return provider


def test_fallback_no_issue(fallback_provider: FallbackProvider):
"""Callback goes through the first provider """
web3 = Web3(fallback_provider)
assert fallback_provider.api_call_counts[0]["eth_blockNumber"] == 0
assert fallback_provider.api_call_counts[1]["eth_blockNumber"] == 0
assert fallback_provider.currently_active_provider == 0
web3.eth.block_number
assert fallback_provider.api_call_counts[0]["eth_blockNumber"] == 1
assert fallback_provider.api_call_counts[1]["eth_blockNumber"] == 0
assert fallback_provider.currently_active_provider == 0


def test_fallback_single_fault(fallback_provider: FallbackProvider, provider_1):
"""Fallback goes through the second provider when first fails"""

web3 = Web3(fallback_provider)

with patch.object(provider_1, "make_request", side_effect=requests.exceptions.ConnectionError):
web3.eth.block_number

assert fallback_provider.api_call_counts[0]["eth_blockNumber"] == 0
assert fallback_provider.api_call_counts[1]["eth_blockNumber"] == 1
assert fallback_provider.currently_active_provider == 1


def test_fallback_double_fault(fallback_provider: FallbackProvider, provider_1, provider_2):
"""Fallback fails on both providers."""

web3 = Web3(fallback_provider)

with patch.object(provider_1, "make_request", side_effect=requests.exceptions.ConnectionError), \
patch.object(provider_2, "make_request", side_effect=requests.exceptions.ConnectionError):

with pytest.raises(requests.exceptions.ConnectionError):
web3.eth.block_number

assert fallback_provider.retry_count == 5


def test_fallback_double_fault_recovery(fallback_provider: FallbackProvider, provider_1, provider_2):
"""Fallback fails on both providers, but then recover."""

web3 = Web3(fallback_provider)

with patch.object(provider_1, "make_request", side_effect=requests.exceptions.ConnectionError), \
patch.object(provider_2, "make_request", side_effect=requests.exceptions.ConnectionError):

with pytest.raises(requests.exceptions.ConnectionError):
web3.eth.block_number

0 comments on commit bf8fb41

Please sign in to comment.