Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fallback provider dianogstics info #177

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eth_defi/event_reader/lazy_timestamp_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,4 @@ def on_block_data(self, block_hash, block_number, timestamp):
self.count += 1

def get_count(self) -> int:
return self.count
return self.count
32 changes: 16 additions & 16 deletions eth_defi/event_reader/multithread.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,15 @@ class MultithreadEventReader(Web3EventReader):
"""

def __init__(
self,
json_rpc_url: str,
max_threads=10,
reader_context: Any = None,
api_counter=True,
max_blocks_once=50_000,
reorg_mon: Optional[ReorganisationMonitor] = None,
notify: Optional[ProgressUpdate] = None,
auto_close_notify=True,
self,
json_rpc_url: str,
max_threads=10,
reader_context: Any = None,
api_counter=True,
max_blocks_once=50_000,
reorg_mon: Optional[ReorganisationMonitor] = None,
notify: Optional[ProgressUpdate] = None,
auto_close_notify=True,
):
"""Creates a multithreaded JSON-RPC reader pool.

Expand Down Expand Up @@ -227,13 +227,13 @@ def close(self):
self.notify.close()

def __call__(
self,
web3: ReaderConnection,
start_block: int,
end_block: int,
events: Optional[List[ContractEvent]] = None,
filter: Optional[Filter] = None,
extract_timestamps: Optional[Callable] = None,
self,
web3: ReaderConnection,
start_block: int,
end_block: int,
events: Optional[List[ContractEvent]] = None,
filter: Optional[Filter] = None,
extract_timestamps: Optional[Callable] = None,
) -> Iterable[LogResult]:
"""Wrap the underlying low-level function.

Expand Down
1 change: 0 additions & 1 deletion eth_defi/event_reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,6 @@ def read_events(
last_timestamp = None

for block_num in range(start_block, end_block + 1, chunk_size):

last_of_chunk = min(end_block, block_num + chunk_size - 1)

logger.debug("Extracting eth_getLogs from %d - %d", block_num, last_of_chunk)
Expand Down
10 changes: 2 additions & 8 deletions eth_defi/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,12 @@
# cannot handle gracefully.
# ValueError: {'message': 'Internal JSON-RPC error.', 'code': -32603}
-32603,

# ValueError: {'code': -32000, 'message': 'nonce too low'}.
# Might happen when we are broadcasting multiple transactions through multiple RPC providers
# using eth_sendRawTransaction
# One provide has not yet seeing a transaction broadcast through the other provider.
# CRAP! -32000 is also Execution reverted on Alchemy.
# -32000,

# ValueError: {'code': -32003, 'message': 'nonce too low'}.
# Anvil variant for nonce too low, same as above
-32003,
Expand All @@ -128,9 +126,7 @@
#:
#: See :py:data:`DEFAULT_RETRYABLE_RPC_ERROR_CODES`.
#:
DEFAULT_RETRYABLE_RPC_ERROR_MESSAGES = {
"nonce too low"
}
DEFAULT_RETRYABLE_RPC_ERROR_MESSAGES = {"nonce too low"}

#: Ethereum JSON-RPC calls where the value never changes
#:
Expand Down Expand Up @@ -184,7 +180,6 @@ def is_retryable_http_exception(
if len(exc.args) > 0:
arg = exc.args[0]
if type(arg) == dict:

code = arg.get("code")
message = arg.get("message", "")

Expand Down Expand Up @@ -411,8 +406,8 @@ def static_call_cache_middleware(
The cache is web3 instance itself, to allow sharing the cache
between different JSON-RPC providers.
"""
def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:

def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
cache = getattr(web3, "static_call_cache", {})
if method in STATIC_CALL_LIST:
cached = cache.get(method)
Expand All @@ -425,4 +420,3 @@ def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
return resp

return middleware

84 changes: 83 additions & 1 deletion eth_defi/provider/fallback.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,60 @@

- See :py:class:`FallbackProvider`
"""
import datetime
import enum
import time
from collections import defaultdict, Counter
from typing import List, Any, cast, Dict
from typing import List, Any, cast, Dict, TypedDict
import logging

from web3 import Web3
from web3.types import RPCEndpoint, RPCResponse

from eth_defi.event_reader.conversion import convert_jsonrpc_value_to_int
from eth_defi.middleware import is_retryable_http_exception, DEFAULT_RETRYABLE_EXCEPTIONS, DEFAULT_RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_RPC_ERROR_CODES, ProbablyNodeHasNoBlock
from eth_defi.provider.named import BaseNamedProvider, NamedProvider, get_provider_name

logger = logging.getLogger(__name__)


class DiagnosticsInfo(TypedDict):
"""Fallback provider diangnostics info.

Per JSON-RPC provider stats in a human-readable format to give helpful diagnostics
information in troubleshooting.

Designed to be human-readable.

See py:meth:`FallbackProvider.get_diagnostics_info`.
"""

index: int

name: str

#: JSON-RPC provider URL, including the API key
url: str

#: Is this currently selected provider
active: bool

#: Is this provider up-to-date with the chain
last_block_number: int | None

#: If getting the last block number failed
last_block_error: str | None

#: API call count
call_count: int

#: API retry count
retry_count: int

#: When did we call this provider last time
last_call: datetime.datetime | None


class FallbackStrategy(enum.Enum):
"""Different supported fallback strategies."""

Expand Down Expand Up @@ -120,6 +159,9 @@ def __init__(
#: provider number-> api method name -> retry counts dict
self.api_retry_counts = defaultdict(Counter)

#: provider number -> UTC datetime mapping
self.api_last_call = defaultdict(lambda: None)

self.retry_count = 0
self.switchover_noisiness = switchover_noisiness

Expand Down Expand Up @@ -194,6 +236,7 @@ def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:

# Track API counts
self.api_call_counts[self.currently_active_provider][method] += 1
self.api_last_call[self.currently_active_provider] = datetime.datetime.utcnow()

return resp_data

Expand Down Expand Up @@ -223,6 +266,45 @@ def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:

raise AssertionError("Should never be reached")

def get_diagnostics_info(self) -> List[DiagnosticsInfo]:
"""Get the diagnostic info of all providers.

:return:
List of dicts of debug info per provider.
"""

result = []

active = self.get_active_provider()

for idx, provider in enumerate(self.providers):
try:
resp = provider.make_request("eth_blockNumber", [])
last_block_number = convert_jsonrpc_value_to_int(resp["result"])
last_block_error = None
except Exception as e:
last_block_number = None
last_block_error = str(e)

total_api_call_count = sum(self.api_call_counts[idx].values())
total_api_retry_count = sum(self.api_retry_counts[idx].values())

result.append(
{
"index": idx,
"name": get_provider_name(provider),
"active": active == provider,
"last_block_number": last_block_number,
"last_block_error": last_block_error,
"url": provider.endpoint_uri,
"call_count": total_api_call_count,
"retry_count": total_api_retry_count,
"last_call": self.api_last_call[idx],
}
)

return result


def _check_faulty_rpc_response(
method: str,
Expand Down
4 changes: 2 additions & 2 deletions eth_defi/provider/multi_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ def create_multi_provider_web3(
session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
session.mount("http://", adapter)
session.mount("https://", adapter)

call_providers = [HTTPProvider(url, request_kwargs=request_kwargs, session=session) for url in call_endpoints]

Expand Down
19 changes: 7 additions & 12 deletions eth_defi/uniswap_v3/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
convert_uint256_string_to_address,
convert_uint256_string_to_int,
decode_data,
convert_int256_bytes_to_int, convert_jsonrpc_value_to_int,
convert_int256_bytes_to_int,
convert_jsonrpc_value_to_int,
)
from eth_defi.event_reader.logresult import LogContext
from eth_defi.event_reader.reader import LogResult
Expand Down Expand Up @@ -64,7 +65,7 @@ def _decode_base(log: LogResult) -> dict:
block_time = datetime.datetime.utcfromtimestamp(log["timestamp"])

return {
"block_number": convert_jsonrpc_value_to_int(log["blockNumber"]),
"block_number": convert_jsonrpc_value_to_int(log["blockNumber"]),
"timestamp": datetime.datetime.utcfromtimestamp(log["timestamp"]),
"tx_hash": log["transactionHash"],
"log_index": convert_jsonrpc_value_to_int(log["logIndex"]),
Expand Down Expand Up @@ -351,19 +352,14 @@ def fetch_events_to_csv(
contract_events = [event_data["contract_event"] for event_data in event_mapping.values()]

# Create a filter for any Uniswap v3 pool contract, all our events we are interested in
filter = Filter.create_filter(
address=None,
event_types=contract_events
)
filter = Filter.create_filter(address=None, event_types=contract_events)

# Start scanning
restored, restored_start_block = state.restore_state(start_block)
original_block_range = end_block - start_block

if restored:
log_info(
f"Restored previous scan state, data until block {restored_start_block:,}, we are skipping {restored_start_block - start_block:,} blocks out of {original_block_range:,} total"
)
log_info(f"Restored previous scan state, data until block {restored_start_block:,}, we are skipping {restored_start_block - start_block:,} blocks out of {original_block_range:,} total")
else:
log_info(
f"No previous scan done, starting fresh from block {start_block:,}, total {original_block_range:,} blocks",
Expand All @@ -375,15 +371,14 @@ def fetch_events_to_csv(
buffers = {}

for event_name, mapping in event_mapping.items():

# Each event type gets its own CSV
file_path = f"{output_folder}/uniswap-v3-{event_name.lower()}.csv"

exists_already = Path(file_path).exists()
file_handler = open(file_path, "a", encoding="utf-8")
csv_writer = csv.DictWriter(file_handler, fieldnames=mapping["field_names"])
if not restored:
headers = ", ".join(mapping['field_names'])
headers = ", ".join(mapping["field_names"])
log_info(f"Creating a new CSV file: {file_path}, with headers: {headers}")
csv_writer.writeheader()

Expand Down Expand Up @@ -474,4 +469,4 @@ def update_progress(
buffer["file_handler"].close()
log_info(f"Wrote {buffer['total']} {event_name} events to {buffer['file_path']}")

return web3
return web3
26 changes: 26 additions & 0 deletions tests/rpc/test_fallback_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,29 @@ def test_broadcast_and_wait_multiple_nonce_too_high(web3: Web3, deployer: str):
node_switch_timeout=datetime.timedelta(seconds=1),
check_nonce_validity=True,
)


def test_get_fallback_provider_diagnosics(web3, fallback_provider: FallbackProvider, deployer):
"""Get diagnostics output."""

web3.eth.set_gas_price_strategy(node_default_gas_price_strategy)

# Fill in user wallet
tx1_hash = web3.eth.send_transaction({"from": deployer, "to": deployer, "value": 5 * 10**18})
assert_transaction_success_with_explanation(web3, tx1_hash)

diagnostics = fallback_provider.get_diagnostics_info()

assert len(diagnostics) == 2

fallback1 = diagnostics[0]
assert fallback1["active"] == True
assert fallback1["last_call"] is not None
assert fallback1["call_count"] == 10
assert fallback1["retry_count"] == 0
assert fallback1["last_block_number"] == 1
assert fallback1["last_block_error"] is None

fallback2 = diagnostics[1]
assert fallback2["last_call"] is None
assert fallback1["last_block_number"] == 1
2 changes: 1 addition & 1 deletion tests/test_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,4 @@ def test_cache_reset_erc_20_details(web3: Web3, deployer: str):
reset_default_token_cache()
assert len(DEFAULT_TOKEN_CACHE) == 0
fetch_erc20_details(web3, token.address)
assert len(DEFAULT_TOKEN_CACHE) == 1
assert len(DEFAULT_TOKEN_CACHE) == 1
1 change: 0 additions & 1 deletion tests/uniswap_v2/test_uniswap_v2_price.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def web3(tester_provider):
yield Web3(tester_provider)



@pytest.fixture()
def deployer(web3) -> str:
"""Deploy account.
Expand Down
Loading