Skip to content

Commit

Permalink
Added shard sequence tracking for TON monitoring (#143)
Browse files Browse the repository at this point in the history
* Added shard sequence tracking for TON monitoring

* add gap skipping fix for shards processed between master blocks

* ruff format

* drop python 3.9 support

* changelog update

* update ruff and fix lint

---------

Co-authored-by: Oleksandr Prudnikov <[email protected]>
  • Loading branch information
Grommash9 and Oleksandr Prudnikov authored Jan 24, 2025
1 parent 7c07c43 commit bf9bb41
Show file tree
Hide file tree
Showing 15 changed files with 4,287 additions and 850 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python-version: ["3.9", "3.10", "3.11", "3.12"]
python-version: ["3.10", "3.11", "3.12"]

steps:
- uses: actions/checkout@v4
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [9.0.0]
- Added shard sequence tracking for TON monitoring
- Now processing all intermediate shard blocks between masterchain references
- Improved transaction completeness by filling shard block gaps
- Added error handling for failed shard block fetches
- Remove tests for python3.9 (it's recommended to stop using python 3.9 from now)

## [8.1.0]
- cover _get_network_params by retry as well

Expand Down
69 changes: 48 additions & 21 deletions aiotx/clients/_ton_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ async def generate_address(self) -> tuple[str, str, str]:
)

def _unpack_mnemonic(self, mnemonic_str: str):
assert isinstance(
mnemonic_str, str
), "Mnemonic should be represented as string!"
assert isinstance(mnemonic_str, str), (
"Mnemonic should be represented as string!"
)
mnemonic_list = mnemonic_str.split(" ")
if not mnemonic_is_valid(mnemonic_list):
raise WrongPrivateKey("mnemonic phrase not valid!")
Expand Down Expand Up @@ -146,9 +146,9 @@ async def send(
seqno: int = None,
memo: str = None,
) -> str:
assert isinstance(
amount, int
), "Amount should be integer! Please use to_nano for convert it!"
assert isinstance(amount, int), (
"Amount should be integer! Please use to_nano for convert it!"
)
if self.workchain is None:
await self._get_network_params()

Expand Down Expand Up @@ -180,9 +180,9 @@ async def deploy_wallet(self, mnemonic_str: str) -> str:
def _create_bulk_transfer_boc(
self, mnemonic_str: str, recipients_list: dict
) -> str:
assert (
self.wallet_version == WalletVersionEnum.hv2
), "For using that method you should use HighloadWalletV2Contract"
assert self.wallet_version == WalletVersionEnum.hv2, (
"For using that method you should use HighloadWalletV2Contract"
)
mnemonic_list = self._unpack_mnemonic(mnemonic_str)
_, _, _, wallet = Wallets.from_mnemonics(
mnemonic_list,
Expand All @@ -208,9 +208,9 @@ def generate_query_id(self, timeout):
def _create_transfer_boc(
self, mnemonic_str: str, to_address, amount, seqno, memo
) -> str:
assert (
self.wallet_version != WalletVersionEnum.hv2
), "For using that method you should not use HighloadWalletV2Contract"
assert self.wallet_version != WalletVersionEnum.hv2, (
"For using that method you should not use HighloadWalletV2Contract"
)
mnemonic_list = self._unpack_mnemonic(mnemonic_str)
_, _, _, wallet = Wallets.from_mnemonics(
mnemonic_list, self.wallet_version, self.workchain
Expand Down Expand Up @@ -425,17 +425,44 @@ def __init__(
self._last_master_block = last_master_block
self.max_retries = max_retries
self.retry_delay = retry_delay
self.shard_last_seqno = {} # (workchain, shard) -> last_seqno

async def _process_shard(self, shard: dict, timeout_between_blocks: int):
"""Process a single shard and its transactions."""
shard_transactions = await self._make_request_with_retry(
self.client.get_block_transactions,
shard["workchain"],
shard["shard"],
shard["seqno"],
1000,
)
await self.process_shard_transactions(shard_transactions)
"""Process all blocks in shard chain since last seen seqno"""
shard_id = (shard["workchain"], shard["shard"])
current_seqno = shard["seqno"]

# Get last processed seqno for this shard
last_seqno = self.shard_last_seqno.get(shard_id, current_seqno - 1)

# If we're up-to-date, process just the latest block
if current_seqno <= last_seqno:
return

# Process all missing blocks in order
tasks = []
for seqno in range(last_seqno + 1, current_seqno + 1):
tasks.append(
self._make_request_with_retry(
self.client.get_block_transactions,
shard["workchain"],
shard["shard"],
seqno,
1000,
)
)

# Process all blocks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
for seqno, transactions in enumerate(results, start=last_seqno + 1):
if isinstance(transactions, Exception):
logger.error(f"Failed to process shard block {shard_id}:{seqno}")
continue

await self.process_shard_transactions(transactions)

# Update last processed seqno
self.shard_last_seqno[shard_id] = current_seqno

async def poll_blocks(self, timeout_between_blocks: int):
workchain, shard, seqno = await self._make_request_with_retry(
Expand Down
14 changes: 13 additions & 1 deletion docs/monitoring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,20 @@ Inside the `handle_block` handler, you can process the master block as needed. T

Inside the `handle_transaction` handler, you can process each transaction encountered. The `transaction` parameter contains basic transaction information such as the account address, logical time, and transaction hash.

The `handle_block_transactions` handler receives a list of transactions for each shard. Note that for TON, this handler will be triggered multiple times for each master block, as it processes shard transactions separately.
The `handle_block_transactions` handler receives a list of transactions for each shard. Note that for TON:
- This handler will be triggered for every shard block, even if not directly referenced by masterchain
- Full transaction continuity is maintained by tracking shard sequence numbers
- All intermediate blocks between masterchain updates are automatically processed
- Missed blocks due to network issues are retried up to 10 times

Example output showing gap handling:
.. code-block:: text
ton_client: block 38104588 # Master block
ton_client: shard transactions [...] # Shard 49232614
ton_client: shard transactions [...] # Shard 49232615 (auto-filled gap)
ton_client: block 38104589 # Next master block
By default, the transaction details are not fetched for every transaction to avoid consuming a large number of API calls. If you want to retrieve more details about a specific transaction, you can use the `get_transactions` method of `AioTxTONClient`, as shown in the example:

.. code-block:: python
Expand Down
8 changes: 4 additions & 4 deletions examples/two_block_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,22 @@


@bsc_client.monitor.on_block
async def handle_block(block, latest_block):
async def handle_bsc_block(block, latest_block):
print("bsc_client: block", block)


@bsc_client.monitor.on_transaction
async def handle_transaction(transaction):
async def handle_bsc_transaction(transaction):
print("bsc_client: transaction", transaction)


@eth_client.monitor.on_block
async def handle_block(block, latest_block):
async def handle_eth_block(block, latest_block):
print("eth_client: block", block)


@eth_client.monitor.on_transaction
async def handle_transaction(transaction):
async def handle_eth_transaction(transaction):
print("eth_client: transaction", transaction)


Expand Down
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
async def ton_client() -> AioTxTONClient:
# current test rpc connection returning -1 as workchain but it should be 0,
# so we are setting that param by ourself
client = AioTxTONClient(TON_TEST_NODE_URL, workchain=0)
client = AioTxTONClient(TON_TEST_NODE_URL, workchain=0, headers={"X-API-KEY": "d"})
await client.connect()
return client

Expand Down
Loading

0 comments on commit bf9bb41

Please sign in to comment.