diff --git a/CHANGELOG.md b/CHANGELOG.md index d0a4411..ce82f10 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [8.0.0] +- Added retry logic to TRON and TON block monitoring's +- In TON during monitoring shards now pulled in gather (parallel) mode to be able to keep up with network + ## [7.0.0] - Added connection lifecycle management with new connect() and disconnect() methods - Implemented async context manager support for cleaner resource management diff --git a/aiotx/clients/_base_client.py b/aiotx/clients/_base_client.py index 524c8c3..cff6d68 100644 --- a/aiotx/clients/_base_client.py +++ b/aiotx/clients/_base_client.py @@ -6,6 +6,9 @@ import aiohttp +from aiotx.exceptions import RpcConnectionError +from aiotx.log import logger + class NotConnectedError(Exception): """Raised when trying to use client methods before connecting.""" @@ -142,6 +145,8 @@ def __init__(self, client: AioTxClient): self.block_transactions_handlers: List[callable] = [] self._stop_signal: Optional[asyncio.Event] = None self._latest_block: Optional[int] = None + self.max_retries: Optional[int] = 10 + self.retry_delay: Optional[float] = 0.2 def on_block(self, func): self.block_handlers.append(func) @@ -159,6 +164,21 @@ def on_new_utxo_transaction(self, func): self.new_utxo_transaction_handlers.append(func) return func + async def _make_request_with_retry(self, request_func, *args, **kwargs): + """Make a request with retry logic.""" + for attempt in range(self.max_retries): + try: + return await request_func(*args, **kwargs) + except RpcConnectionError as e: + if attempt < self.max_retries - 1: + delay = self.retry_delay * (2**attempt) + logger.warning( + f"RpcConnectionError {e}, retrying in {delay} seconds... (Attempt {attempt + 1}/{self.max_retries})" + ) + await asyncio.sleep(delay) + else: + raise + async def start( self, monitoring_start_block: Optional[int], diff --git a/aiotx/clients/_ton_base_client.py b/aiotx/clients/_ton_base_client.py index 933dd00..08d603a 100644 --- a/aiotx/clients/_ton_base_client.py +++ b/aiotx/clients/_ton_base_client.py @@ -382,6 +382,7 @@ async def run_get_method(self, method: str, address: str, stack: list): async def _make_rpc_call(self, payload) -> dict: self._check_connection() + payload["jsonrpc"] = "2.0" payload["id"] = 1 payload_json = json.dumps(payload) @@ -409,28 +410,53 @@ async def _make_rpc_call(self, payload) -> dict: class TonMonitor(BlockMonitor): - def __init__(self, client: AioTxTONClient, last_master_block: Optional[int] = None): + def __init__( + self, + client: AioTxTONClient, + last_master_block: Optional[int] = None, + max_retries: int = 10, + retry_delay: float = 0.2, + ): self.client = client self.block_handlers = [] self.transaction_handlers = [] self.block_transactions_handlers = [] self.running = False self._last_master_block = last_master_block + self.max_retries = max_retries + self.retry_delay = retry_delay + + async def _process_shard(self, shard: dict, timeout_between_blocks: int): + """Process a single shard and its transactions.""" + shard_transactions = await self._make_request_with_retry( + self.client.get_block_transactions, + shard["workchain"], + shard["shard"], + shard["seqno"], + 1000, + ) + await self.process_shard_transactions(shard_transactions) async def poll_blocks(self, timeout_between_blocks: int): workchain, shard, seqno = await self.client._get_network_params() if self.client.workchain is None: self.client.workchain = workchain + target_block = seqno if self._latest_block is None else self._latest_block if target_block > seqno: await asyncio.sleep(timeout_between_blocks) return - shards = await self.client.get_master_block_shards(target_block) - for shard in shards: - shard_transactions = await self.client.get_block_transactions( - shard["workchain"], shard["shard"], shard["seqno"], 1000 - ) - await self.process_shard_transactions(shard_transactions) + + shards = await self._make_request_with_retry( + self.client.get_master_block_shards, target_block + ) + + # Process all shards concurrently + shard_tasks = [ + self._process_shard(shard, timeout_between_blocks) for shard in shards + ] + await asyncio.gather(*shard_tasks) + await self.process_master_block(target_block) self._latest_block = target_block + 1 diff --git a/aiotx/clients/_tron_base_client.py b/aiotx/clients/_tron_base_client.py index 6ced433..8e58a2c 100644 --- a/aiotx/clients/_tron_base_client.py +++ b/aiotx/clients/_tron_base_client.py @@ -400,22 +400,35 @@ async def _make_rpc_call(self, payload, path="/jsonrpc") -> dict: class TronMonitor(BlockMonitor): - def __init__(self, client: AioTxTRONClient, last_block: Optional[int] = None): + def __init__( + self, + client: AioTxTRONClient, + last_block: Optional[int] = None, + max_retries: int = 3, + retry_delay: float = 1, + ): self.client = client self.block_handlers = [] self.transaction_handlers = [] self.block_transactions_handlers = [] self.running = False self._last_block = last_block + self.max_retries = max_retries + self.retry_delay = retry_delay async def poll_blocks(self, _: int): - network_last_block = await self.client.get_last_block_number() + network_last_block = await self._make_request_with_retry( + self.client.get_last_block_number + ) target_block = ( network_last_block if self._latest_block is None else self._latest_block ) if target_block > network_last_block: return - block_data = await self.client.get_block_by_number(target_block) + block_data = await self._make_request_with_retry( + self.client.get_block_by_number, + target_block, + ) await self.process_transactions(block_data["transactions"]) await self.process_block(target_block, network_last_block) self._latest_block = target_block + 1