Skip to content

Commit

Permalink
same aiohttp session reuse (#138)
Browse files Browse the repository at this point in the history
* same aiohttp session reuse

* tests update changelog added

* docs updates

* readme and more docs some tests fixes

* fix ton tests, revert utxo changes

* fix ton tests, revert utxo changes

* remove btc and ltc connect from docs

* ruff

* mute utxo fix examples

---------

Co-authored-by: Oleksandr Prudnikov <[email protected]>
  • Loading branch information
Grommash9 and Oleksandr Prudnikov authored Dec 29, 2024
1 parent f72a19a commit d8ded9a
Show file tree
Hide file tree
Showing 35 changed files with 611 additions and 227 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [7.0.0]
- Added connection lifecycle management with new connect() and disconnect() methods
- Implemented async context manager support for cleaner resource management
- Improved RPC call performance by ~40-50% through session reuse

## [6.0.0]
- Move timeout between blocks for TON outside of shards logic
- Before
Expand Down
31 changes: 25 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,27 @@ or for all of them:
pip install aiotx[utxo,evm]
```

Important Note

^^^^^^^^^^^^^
All AioTx clients require an active connection before use. You can establish a connection in two ways:

```python

# Method 1: Using async context manager (recommended)
async with AioTxETHClient("NODE_URL") as client:
result = await client.get_balance("address")

# Method 2: Explicit connection management
client = AioTxETHClient("NODE_URL")
await client.connect()
try:
result = await client.get_balance("address")
finally:
await client.disconnect()

```

Once installed, you can import the desired client and start interacting with the respective blockchain. Here's a quick example of how to use the EVM client:

Sending Bulk Transactions (Bitcoin):
Expand All @@ -95,7 +116,6 @@ from aiotx.clients import AioTxBTCClient

async def main():
btc_client = AioTxBTCClient("NODE_URL")

destinations = {
"1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa": 1000000,
"1BvBMSEYstWetqTFn5Au4m4GFg7xJaNVN2": 500000
Expand All @@ -114,7 +134,7 @@ from aiotx.clients import AioTxETHClient

async def main():
eth_client = AioTxETHClient("NODE_URL")

await eth_client.connect()
private_key = "YOUR_PRIVATE_KEY"
to_address = "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
token_address = "0x1f9840a85d5aF5bf1D1762F925BDADdC4201F984" # Example token address (Uniswap)
Expand All @@ -132,6 +152,7 @@ Sending TON (for bulk please check [/examples](https://github.com/Grommash9/aiot
import asyncio

ton_client = AioTxTONClient("https://testnet.toncenter.com/api/v2", workchain=0)
await ton_client.connect()
# We are adding workchain here because testnet.toncenter working bad and identify itself as -1 but it should be 0
# If you are using any other provider it should work fine without workchain param

Expand All @@ -153,7 +174,7 @@ Transferring Jettons:
import asyncio

ton_client = AioTxTONClient("https://testnet.toncenter.com/api/v2", workchain=0)

await ton_client.connect()
mnemonic_str = "your wallet mnemonic phrase here"
recipient_address = "EQCc39VS5jcptHL8vMjEXrzGaRcCVYto7HUn4bpAOg8xqB2e"
jetton_master_address = "EQAiboDEv_qRrcEdrYdwbVLNOXBHwShFbtKGbQVJ2OKxY_Di"
Expand All @@ -177,7 +198,7 @@ from aiotx.clients import AioTxETHClient

async def main():
eth_client = AioTxETHClient("NODE_URL")

await eth_client.connect()
private_key = "YOUR_PRIVATE_KEY"
to_address = "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
amount = eth_client.to_wei(0.5, "ether") # Sending 0.5 ETH
Expand All @@ -194,7 +215,6 @@ from aiotx.clients import AioTxBTCClient

async def main():
btc_client = AioTxBTCClient("NODE_URL")

private_key = "YOUR_PRIVATE_KEY"
to_address = "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa"
amount = 1000000 # Sending 0.01 BTC (amount is in satoshis)
Expand All @@ -211,7 +231,6 @@ from aiotx.clients import AioTxBTCClient

async def main():
btc_client = AioTxBTCClient("NODE_URL")

# Converting satoshis to BTC
satoshis = 1000000
btc = btc_client.from_satoshi(satoshis)
Expand Down
46 changes: 46 additions & 0 deletions aiotx/clients/_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
from contextlib import suppress
from typing import List, Optional

import aiohttp


class NotConnectedError(Exception):
"""Raised when trying to use client methods before connecting."""

pass


class AioTxClient:
def __init__(self, node_url: str, headers: dict = {}):
Expand All @@ -13,6 +21,35 @@ def __init__(self, node_url: str, headers: dict = {}):
self._stop_signal: Optional[asyncio.Event] = None
self._stopped_signal: Optional[asyncio.Event] = None
self._running_lock = asyncio.Lock()
self._session: Optional[aiohttp.ClientSession] = None
self._connected = False

async def connect(self) -> None:
"""Establish connection and create session."""
if not self._connected:
self._session = aiohttp.ClientSession()
self._connected = True

async def disconnect(self) -> None:
"""Close connection and cleanup session."""
if self._connected and self._session:
await self._session.close()
self._session = None
self._connected = False

def _check_connection(self) -> None:
"""Check if client is connected before making requests."""
if not self._connected or not self._session:
raise NotConnectedError("Client is not connected. Call connect() first.")

async def __aenter__(self):
"""Async context manager entry."""
await self.connect()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.disconnect()

def _setup_signal_handlers(self):
if os.name == "nt":
Expand All @@ -28,6 +65,8 @@ async def start_monitoring(
timeout_between_blocks: int = 1,
**kwargs,
) -> None:
self._check_connection()

if not self.monitor:
raise ValueError(
"BlockMonitor instance must be set before starting monitoring"
Expand Down Expand Up @@ -86,6 +125,13 @@ async def wait_closed(self):
if self._stopped_signal:
await self._stopped_signal.wait()

async def _make_request(
self, method: str, url: str, **kwargs
) -> aiohttp.ClientResponse:
"""Make HTTP request using the shared session."""
self._check_connection()
return await self._session.request(method, url, **kwargs)


class BlockMonitor:
def __init__(self, client: AioTxClient):
Expand Down
136 changes: 69 additions & 67 deletions aiotx/clients/_evm_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import sys
from typing import Union

import aiohttp

from aiotx.clients._base_client import AioTxClient, BlockMonitor
from aiotx.exceptions import (
AioTxError,
Expand Down Expand Up @@ -347,74 +345,78 @@ async def get_chain_id(self) -> int:
return 0 if tx_count == "0x" else int(tx_count, 16)

async def _make_rpc_call(self, payload) -> dict:
self._check_connection()
payload["jsonrpc"] = "2.0"
payload["id"] = 1
logger.info(f"rpc call payload: {payload}")
async with aiohttp.ClientSession() as session:
async with session.post(
self.node_url, data=json.dumps(payload), headers=self._headers
) as response:
response_text = await response.text()
logger.info(f"rpc call result: {response_text}")
if response.status != 200:
raise RpcConnectionError(response_text)
result = await response.json()
if "error" not in result.keys():
return result["result"]
error_code = result["error"]["code"]
error_message = result["error"]["message"]
if error_code == -32000:
if (
"header not found" in error_message
or "could not find block" in error_message
):
raise BlockNotFoundError(error_message)
elif "stack limit reached" in error_message:
raise StackLimitReachedError(error_message)
elif "method handler crashed" in error_message:
raise MethodHandlerCrashedError(error_message)
elif "execution timeout" in error_message:
raise ExecutionTimeoutError(error_message)
elif "nonce too low" in error_message:
raise NonceTooLowError(error_message)
elif "filter not found" in error_message:
raise FilterNotFoundError(error_message)
elif "replacement transaction underpriced" in error_message:
raise ReplacementTransactionUnderpriced(error_message)
else:
raise AioTxError(f"Error {error_code}: {error_message}")
elif error_code == -32009:
raise TraceRequestLimitExceededError(error_message)
elif error_code == -32010:
raise TransactionCostExceedsGasLimitError(error_message)
elif error_code == -32011:
raise NetworkError(error_message)
elif error_code == -32015:
raise VMExecutionError(error_message)
elif error_code == -32601:
if "method not found" in error_message:
raise MethodNotFoundError(error_message)
elif "failed to parse request" in error_message:
raise InvalidRequestError(error_message)
elif error_code == -32602:
if (
"invalid argument" in error_message
and "cannot unmarshal hex string without 0x prefix"
in error_message
or "cannot unmarshal hex string of odd length into"
in error_message
or "hex string has length" in error_message
):
raise InvalidArgumentError(error_message)
elif (
"eth_getLogs and eth_newFilter are limited to a 10,000 blocks range"
in error_message
):
raise BlockRangeLimitExceededError(error_message)
elif error_code == -32603:
raise InternalJSONRPCError(error_message)
else:
raise RpcConnectionError(f"Error {error_code}: {error_message}")

response = await self._make_request(
"POST", self.node_url, data=json.dumps(payload), headers=self._headers
)

response_text = await response.text()
logger.info(f"rpc call result: {response_text}")

if response.status != 200:
raise RpcConnectionError(response_text)

result = await response.json()
if "error" not in result.keys():
return result["result"]

error_code = result["error"]["code"]
error_message = result["error"]["message"]

if error_code == -32000:
if (
"header not found" in error_message
or "could not find block" in error_message
):
raise BlockNotFoundError(error_message)
elif "stack limit reached" in error_message:
raise StackLimitReachedError(error_message)
elif "method handler crashed" in error_message:
raise MethodHandlerCrashedError(error_message)
elif "execution timeout" in error_message:
raise ExecutionTimeoutError(error_message)
elif "nonce too low" in error_message:
raise NonceTooLowError(error_message)
elif "filter not found" in error_message:
raise FilterNotFoundError(error_message)
elif "replacement transaction underpriced" in error_message:
raise ReplacementTransactionUnderpriced(error_message)
else:
raise AioTxError(f"Error {error_code}: {error_message}")
elif error_code == -32009:
raise TraceRequestLimitExceededError(error_message)
elif error_code == -32010:
raise TransactionCostExceedsGasLimitError(error_message)
elif error_code == -32011:
raise NetworkError(error_message)
elif error_code == -32015:
raise VMExecutionError(error_message)
elif error_code == -32601:
if "method not found" in error_message:
raise MethodNotFoundError(error_message)
elif "failed to parse request" in error_message:
raise InvalidRequestError(error_message)
elif error_code == -32602:
if (
"invalid argument" in error_message
and "cannot unmarshal hex string without 0x prefix" in error_message
or "cannot unmarshal hex string of odd length into" in error_message
or "hex string has length" in error_message
):
raise InvalidArgumentError(error_message)
elif (
"eth_getLogs and eth_newFilter are limited to a 10,000 blocks range"
in error_message
):
raise BlockRangeLimitExceededError(error_message)
elif error_code == -32603:
raise InternalJSONRPCError(error_message)
else:
raise RpcConnectionError(f"Error {error_code}: {error_message}")


class EvmMonitor(BlockMonitor):
Expand Down
Loading

0 comments on commit d8ded9a

Please sign in to comment.