Skip to content

Commit

Permalink
pull ton shards in gather parallel mode and add retry logic for tron … (
Browse files Browse the repository at this point in the history
#139)

* pull ton shards in gather parallel mode and add retry logic for tron an ton monitorings

* make retry more generic

* reamde

---------

Co-authored-by: Oleksandr Prudnikov <[email protected]>
  • Loading branch information
Grommash9 and Oleksandr Prudnikov authored Dec 29, 2024
1 parent d8ded9a commit fa8164f
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 10 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [8.0.0]
- Added retry logic to TRON and TON block monitoring's
- In TON during monitoring shards now pulled in gather (parallel) mode to be able to keep up with network

## [7.0.0]
- Added connection lifecycle management with new connect() and disconnect() methods
- Implemented async context manager support for cleaner resource management
Expand Down
20 changes: 20 additions & 0 deletions aiotx/clients/_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

import aiohttp

from aiotx.exceptions import RpcConnectionError
from aiotx.log import logger


class NotConnectedError(Exception):
"""Raised when trying to use client methods before connecting."""
Expand Down Expand Up @@ -142,6 +145,8 @@ def __init__(self, client: AioTxClient):
self.block_transactions_handlers: List[callable] = []
self._stop_signal: Optional[asyncio.Event] = None
self._latest_block: Optional[int] = None
self.max_retries: Optional[int] = 10
self.retry_delay: Optional[float] = 0.2

def on_block(self, func):
self.block_handlers.append(func)
Expand All @@ -159,6 +164,21 @@ def on_new_utxo_transaction(self, func):
self.new_utxo_transaction_handlers.append(func)
return func

async def _make_request_with_retry(self, request_func, *args, **kwargs):
"""Make a request with retry logic."""
for attempt in range(self.max_retries):
try:
return await request_func(*args, **kwargs)
except RpcConnectionError as e:
if attempt < self.max_retries - 1:
delay = self.retry_delay * (2**attempt)
logger.warning(
f"RpcConnectionError {e}, retrying in {delay} seconds... (Attempt {attempt + 1}/{self.max_retries})"
)
await asyncio.sleep(delay)
else:
raise

async def start(
self,
monitoring_start_block: Optional[int],
Expand Down
40 changes: 33 additions & 7 deletions aiotx/clients/_ton_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ async def run_get_method(self, method: str, address: str, stack: list):

async def _make_rpc_call(self, payload) -> dict:
self._check_connection()

payload["jsonrpc"] = "2.0"
payload["id"] = 1
payload_json = json.dumps(payload)
Expand Down Expand Up @@ -409,28 +410,53 @@ async def _make_rpc_call(self, payload) -> dict:


class TonMonitor(BlockMonitor):
def __init__(self, client: AioTxTONClient, last_master_block: Optional[int] = None):
def __init__(
self,
client: AioTxTONClient,
last_master_block: Optional[int] = None,
max_retries: int = 10,
retry_delay: float = 0.2,
):
self.client = client
self.block_handlers = []
self.transaction_handlers = []
self.block_transactions_handlers = []
self.running = False
self._last_master_block = last_master_block
self.max_retries = max_retries
self.retry_delay = retry_delay

async def _process_shard(self, shard: dict, timeout_between_blocks: int):
"""Process a single shard and its transactions."""
shard_transactions = await self._make_request_with_retry(
self.client.get_block_transactions,
shard["workchain"],
shard["shard"],
shard["seqno"],
1000,
)
await self.process_shard_transactions(shard_transactions)

async def poll_blocks(self, timeout_between_blocks: int):
workchain, shard, seqno = await self.client._get_network_params()
if self.client.workchain is None:
self.client.workchain = workchain

target_block = seqno if self._latest_block is None else self._latest_block
if target_block > seqno:
await asyncio.sleep(timeout_between_blocks)
return
shards = await self.client.get_master_block_shards(target_block)
for shard in shards:
shard_transactions = await self.client.get_block_transactions(
shard["workchain"], shard["shard"], shard["seqno"], 1000
)
await self.process_shard_transactions(shard_transactions)

shards = await self._make_request_with_retry(
self.client.get_master_block_shards, target_block
)

# Process all shards concurrently
shard_tasks = [
self._process_shard(shard, timeout_between_blocks) for shard in shards
]
await asyncio.gather(*shard_tasks)

await self.process_master_block(target_block)
self._latest_block = target_block + 1

Expand Down
19 changes: 16 additions & 3 deletions aiotx/clients/_tron_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,22 +400,35 @@ async def _make_rpc_call(self, payload, path="/jsonrpc") -> dict:


class TronMonitor(BlockMonitor):
def __init__(self, client: AioTxTRONClient, last_block: Optional[int] = None):
def __init__(
self,
client: AioTxTRONClient,
last_block: Optional[int] = None,
max_retries: int = 3,
retry_delay: float = 1,
):
self.client = client
self.block_handlers = []
self.transaction_handlers = []
self.block_transactions_handlers = []
self.running = False
self._last_block = last_block
self.max_retries = max_retries
self.retry_delay = retry_delay

async def poll_blocks(self, _: int):
network_last_block = await self.client.get_last_block_number()
network_last_block = await self._make_request_with_retry(
self.client.get_last_block_number
)
target_block = (
network_last_block if self._latest_block is None else self._latest_block
)
if target_block > network_last_block:
return
block_data = await self.client.get_block_by_number(target_block)
block_data = await self._make_request_with_retry(
self.client.get_block_by_number,
target_block,
)
await self.process_transactions(block_data["transactions"])
await self.process_block(target_block, network_last_block)
self._latest_block = target_block + 1
Expand Down

0 comments on commit fa8164f

Please sign in to comment.