Skip to content

Commit

Permalink
pull ton shards in gather parallel mode and add retry logic for tron …
Browse files Browse the repository at this point in the history
…an ton monitorings
  • Loading branch information
Oleksandr Prudnikov committed Dec 29, 2024
1 parent d8ded9a commit 5928c76
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 12 deletions.
18 changes: 17 additions & 1 deletion aiotx/clients/_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import signal
from contextlib import suppress
from typing import List, Optional

from aiotx.log import logger
from aiotx.exceptions import RpcConnectionError
import aiohttp


Expand Down Expand Up @@ -142,6 +143,8 @@ def __init__(self, client: AioTxClient):
self.block_transactions_handlers: List[callable] = []
self._stop_signal: Optional[asyncio.Event] = None
self._latest_block: Optional[int] = None
self.max_retries: Optional[int] = 10
self.retry_delay: Optional[float] = 0.2

def on_block(self, func):
self.block_handlers.append(func)
Expand All @@ -158,6 +161,19 @@ def on_transaction(self, func):
def on_new_utxo_transaction(self, func):
self.new_utxo_transaction_handlers.append(func)
return func

async def _make_request_with_retry(self, request_func, *args, **kwargs):
"""Make a request with retry logic."""
for attempt in range(self.max_retries):
try:
return await request_func(*args, **kwargs)
except RpcConnectionError as e:
if "No working liteservers" in str(e) and attempt < self.max_retries - 1:
delay = self.retry_delay * (2 ** attempt)
logger.warning(f"Lite server unavailable, retrying in {delay} seconds... (Attempt {attempt + 1}/{self.max_retries})")
await asyncio.sleep(delay)
else:
raise

async def start(
self,
Expand Down
43 changes: 35 additions & 8 deletions aiotx/clients/_ton_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,28 +409,55 @@ async def _make_rpc_call(self, payload) -> dict:


class TonMonitor(BlockMonitor):
def __init__(self, client: AioTxTONClient, last_master_block: Optional[int] = None):
def __init__(
self,
client: AioTxTONClient,
last_master_block: Optional[int] = None,
max_retries: int = 10,
retry_delay: float = 0.2
):
self.client = client
self.block_handlers = []
self.transaction_handlers = []
self.block_transactions_handlers = []
self.running = False
self._last_master_block = last_master_block
self.max_retries = max_retries
self.retry_delay = retry_delay

async def _process_shard(self, shard: dict, timeout_between_blocks: int):
"""Process a single shard and its transactions."""
shard_transactions = await self._make_request_with_retry(
self.client.get_block_transactions,
shard["workchain"],
shard["shard"],
shard["seqno"],
1000
)
await self.process_shard_transactions(shard_transactions)

async def poll_blocks(self, timeout_between_blocks: int):
workchain, shard, seqno = await self.client._get_network_params()
if self.client.workchain is None:
self.client.workchain = workchain

target_block = seqno if self._latest_block is None else self._latest_block
if target_block > seqno:
await asyncio.sleep(timeout_between_blocks)
return
shards = await self.client.get_master_block_shards(target_block)
for shard in shards:
shard_transactions = await self.client.get_block_transactions(
shard["workchain"], shard["shard"], shard["seqno"], 1000
)
await self.process_shard_transactions(shard_transactions)

shards = await self._make_request_with_retry(
self.client.get_master_block_shards,
target_block
)

# Process all shards concurrently
shard_tasks = [
self._process_shard(shard, timeout_between_blocks)
for shard in shards
]
await asyncio.gather(*shard_tasks)

await self.process_master_block(target_block)
self._latest_block = target_block + 1

Expand All @@ -444,4 +471,4 @@ async def process_shard_transactions(self, shard_transactions):

for transaction in shard_transactions:
for handler in self.transaction_handlers:
await handler(transaction)
await handler(transaction)
14 changes: 11 additions & 3 deletions aiotx/clients/_tron_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,22 +400,30 @@ async def _make_rpc_call(self, payload, path="/jsonrpc") -> dict:


class TronMonitor(BlockMonitor):
def __init__(self, client: AioTxTRONClient, last_block: Optional[int] = None):
def __init__(self, client: AioTxTRONClient, last_block: Optional[int] = None, max_retries: int = 3,
retry_delay: float = 1):
self.client = client
self.block_handlers = []
self.transaction_handlers = []
self.block_transactions_handlers = []
self.running = False
self._last_block = last_block
self.max_retries = max_retries
self.retry_delay = retry_delay

async def poll_blocks(self, _: int):
network_last_block = await self.client.get_last_block_number()
network_last_block = await self._make_request_with_retry(
self.client.get_last_block_number
)
target_block = (
network_last_block if self._latest_block is None else self._latest_block
)
if target_block > network_last_block:
return
block_data = await self.client.get_block_by_number(target_block)
block_data = await self._make_request_with_retry(
self.client.get_block_by_number,
target_block,
)
await self.process_transactions(block_data["transactions"])
await self.process_block(target_block, network_last_block)
self._latest_block = target_block + 1
Expand Down

0 comments on commit 5928c76

Please sign in to comment.