Skip to content

Commit

Permalink
Block and transaction monitorting (#2)
Browse files Browse the repository at this point in the history
* basic implementation

* examples added and sync/async problems fixed

* examples lint fix + exclude from setup

* block monitoring is covered by tests now

* base class added for client and monitoring to prevent future code duplication

* transaction input decoding logic added and covered by tests

* monitoring is now async only

* two clients example added
  • Loading branch information
Grommash9 authored May 3, 2024
1 parent 22b04ed commit f8c144e
Show file tree
Hide file tree
Showing 15 changed files with 732 additions and 32 deletions.
79 changes: 79 additions & 0 deletions aiotx/clients/_base_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import asyncio
import concurrent.futures


class AioTxClient:
def __init__(self, node_url):
self.node_url = node_url
self.monitor = BlockMonitor(self)
self._monitoring_task = None

async def start_monitoring(self, monitoring_start_block: int = None):
if self._monitoring_task is None:
self._monitoring_task = asyncio.create_task(self._run_monitoring(monitoring_start_block))
return self._monitoring_task

def stop_monitoring(self):
if self._monitoring_task is not None:
self._monitoring_task.cancel()
try:
asyncio.get_event_loop().run_until_complete(self._monitoring_task)
except asyncio.CancelledError:
pass
self._monitoring_task = None

async def _run_monitoring(self, monitoring_start_block):
try:
async with self.monitor:
await self.monitor.start(monitoring_start_block)
except asyncio.CancelledError:
pass

async def get_block_by_number(self, block_number: int):
pass

async def get_last_block(self) -> int:
pass

class BlockMonitor:
def __init__(self, client: AioTxClient):
self.client = client
self.block_handlers = []
self.transaction_handlers = []
self.running = False
self._latest_block = None

def on_block(self, func):
self.block_handlers.append(func)
return func

def on_transaction(self, func):
self.transaction_handlers.append(func)
return func

async def __aenter__(self):
self.running = True
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
self.stop()

async def start(self, monitoring_start_block):
self.running = True
self._latest_block = monitoring_start_block
while self.running:
try:
await self.poll_blocks()
await asyncio.sleep(1)
except Exception as e:
print(f"Error during polling: {e}")
await asyncio.sleep(2)

def stop(self):
self.running = False

async def poll_blocks(self):
pass

async def process_block(self, block):
pass
5 changes: 4 additions & 1 deletion aiotx/clients/_bitcoin_base_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
class AioTxUTXOClient:
from aiotx.clients._base_client import AioTxClient


class AioTxUTXOClient(AioTxClient):

def __init__(self, node_url):
self.node_url = node_url
Expand Down
93 changes: 88 additions & 5 deletions aiotx/clients/_evm_base_client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import asyncio
import json
import secrets

import aiohttp
from eth_abi import encode
from eth_abi import decode, encode
from eth_account import Account
from eth_typing import HexStr
from eth_utils import keccak, to_checksum_address, to_hex
from eth_utils import (
decode_hex,
function_signature_to_4byte_selector,
keccak,
to_checksum_address,
to_hex,
)

from aiotx.clients._base_client import AioTxClient, BlockMonitor
from aiotx.exceptions import (
AioTxError,
BlockNotFoundError,
Expand All @@ -30,10 +38,15 @@
from aiotx.types import BlockParam


class AioTxEVMClient:
class AioTxEVMClient(AioTxClient):
def __init__(self, node_url, chain_id):
self.node_url = node_url
self.chain_id = chain_id
self.monitor = EvmMonitor(self)
self._monitoring_task = None
with open("aiotx/utils/bep20_abi.json") as file:
bep_20_abi = json.loads(file.read())
self._bep20_abi = bep_20_abi

def generate_address(self):
private_key_bytes = secrets.token_hex(32)
Expand All @@ -45,7 +58,7 @@ def get_address_from_private_key(self, private_key: str):
sender_address = Account.from_key(private_key).address
return to_checksum_address(sender_address)

async def get_last_block(self):
async def get_last_block(self) -> int:
payload = {"method": "eth_blockNumber", "params": []}
result = await self._make_rpc_call(payload)
last_block = result["result"]
Expand All @@ -58,11 +71,49 @@ async def get_balance(self, address, block_parameter: BlockParam = BlockParam.LA
balance = result["result"]
return 0 if balance == "0x" else int(result["result"], 16)

async def get_transaction(self, hash):
async def get_transaction(self, hash) -> dict:
payload = {"method": "eth_getTransactionByHash", "params": [hash]}
result = await self._make_rpc_call(payload)
if result["result"] is None:
raise TransactionNotFound(f"Transaction {hash} not found!")
tx_data = result["result"]
tx_data["aiotx_decoded_input"] = self.decode_transaction_input(tx_data["input"])
return tx_data

def decode_transaction_input(self, input_data: str) -> dict:
if input_data == "0x":
return {
'function_name': None,
'parameters': None
}
for abi_entry in self._bep20_abi:
function_name = abi_entry['name']
input_types = [inp['type'] for inp in abi_entry['inputs']]
function_signature = f"{function_name}({','.join(input_types)})"
function_selector = function_signature_to_4byte_selector(function_signature)

if input_data.startswith('0x' + function_selector.hex()):
decoded_data = decode(input_types, decode_hex(input_data[10:]))
decoded_params = {}
for i, param in enumerate(decoded_data):
param_name = abi_entry["inputs"][i]["name"]
param_value = param
decoded_params[param_name] = param_value

return {
'function_name': function_name,
'parameters': decoded_params
}

return {
'function_name': None,
'parameters': None
}

async def get_block_by_number(self, block_number: int, transaction_detail_flag: bool = True):
payload = {"method": "eth_getBlockByNumber", "params": [hex(block_number), transaction_detail_flag]}
result = await self._make_rpc_call(payload)
return result

async def get_token_balance(self, address, contract_address, block_parameter: BlockParam = BlockParam.LATEST) -> int:
function_signature = "balanceOf(address)".encode("UTF-8")
Expand Down Expand Up @@ -197,3 +248,35 @@ async def _make_rpc_call(self, payload) -> dict:
raise InternalJSONRPCError(error_message)
else:
raise AioTxError(f"Error {error_code}: {error_message}")



class EvmMonitor(BlockMonitor):
def __init__(self, client: AioTxEVMClient):
self.client = client
self.block_handlers = []
self.transaction_handlers = []
self.running = False
self._latest_block = None

async def poll_blocks(self,):
if self._latest_block is None:
self._latest_block = await self.client.get_last_block()
block = await self.client.get_block_by_number(self._latest_block)
await self.process_block(block["result"])
self._latest_block = self._latest_block + 1

async def process_block(self, block):
for handler in self.block_handlers:
if asyncio.iscoroutinefunction(handler):
await handler(int(block["number"], 16))
else:
handler(int(block["number"], 16))

for transaction in block["transactions"]:
for handler in self.transaction_handlers:
transaction["aiotx_decoded_input"] = self.client.decode_transaction_input(transaction["input"])
if asyncio.iscoroutinefunction(handler):
await handler(transaction)
else:
handler(transaction)
Loading

0 comments on commit f8c144e

Please sign in to comment.