Skip to content

Commit

Permalink
Add support for v2/sse/blocks #22.
Browse files Browse the repository at this point in the history
  • Loading branch information
nessshon committed Jul 21, 2024
1 parent a8bc4ef commit 9878cc0
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 15 deletions.
1 change: 0 additions & 1 deletion pytonapi/async_tonapi/methods/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ async def subscribe_to_blocks(
"""
method = "v2/sse/blocks"
params = {} if workchain is None else {'workchain': workchain}
print(params)
async for data in self._subscribe(method=method, params=params):
event = BlockEventData(**json.loads(data))
result = await handler(event, *args)
Expand Down
59 changes: 45 additions & 14 deletions pytonapi/tonapi/methods/sse.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
from typing import List, Callable, Any, Tuple
from typing import List, Callable, Any, Tuple, Optional, Union

from pytonapi.schema.events import TransactionEventData, TraceEventData, MempoolEventData
from pytonapi.schema.events import TransactionEventData, TraceEventData, MempoolEventData, BlockEventData
from pytonapi.tonapi import TonapiClient


Expand All @@ -10,18 +10,32 @@ class SSEMethod(TonapiClient):
def subscribe_to_transactions(
self,
accounts: List[str],
handler: Callable[[TransactionEventData, List[Any]], Any],
args: Tuple[Any, ...] = (),
handler: Callable[[TransactionEventData, Any], Any],
operations: Optional[List[str]] = None,
args: Tuple = (),
) -> None:
"""
Subscribes to transactions SSE events for the specified accounts.
:param handler: A callable function to handle the SSEEvent
:param accounts: A list of account addresses to subscribe to
:param accounts: A comma-separated list of account IDs.
A special value of "accounts" is ALL. TonAPI will stream transactions for all accounts in this case.
:param operations: A comma-separated list of operations, which makes it possible
to get transactions based on the `first 4 bytes of a message body of an inbound message(opens in a new tab)
<https://docs.ton.org/develop/smart-contracts/guidelines/internal-messages#internal-message-body>`_.
Each operation is a string containing either one of the supported names or a hex string
representing a message operation opcode which is an unsigned 32-bit integer.
A hex string must start with "0x" prefix and have exactly 8 hex digits.
An example of "operations" is &operations=JettonTransfer,0x0524c7ae,StonfiSwap.
The advantage of using hex strings is that it's possible to get transactions for operations
that are not yet present on `the list <https://github.com/tonkeeper/tongo/blob/master/abi/messages.md>`_.
:param args: Additional arguments to pass to the handler
"""
method = "v2/sse/accounts/transactions"
params = {'accounts': accounts}
params = {'accounts': ",".join(accounts)}
if operations:
params['operations'] = ",".join(operations)

for data in self._subscribe(method=method, params=params):
event = TransactionEventData(**json.loads(data))
result = handler(event, *args)
Expand All @@ -31,17 +45,14 @@ def subscribe_to_transactions(
def subscribe_to_traces(
self,
accounts: List[str],
handler: Callable[[TraceEventData, List[Any]], Any],
args: Tuple[Any, ...] = (),
handler: Callable[[TraceEventData, Any], Any],
args: Tuple = (),
) -> None:
"""
Subscribes to traces SSE events for the specified accounts.
:handler: A callable function to handle the SSEEvent
:accounts: A list of account addresses to subscribe to
Returns:
None
"""
method = "v2/sse/accounts/traces"
params = {'accounts': accounts}
Expand All @@ -54,8 +65,8 @@ def subscribe_to_traces(
def subscribe_to_mempool(
self,
accounts: List[str],
handler: Callable[[MempoolEventData, List[Any]], Any],
args: Tuple[Any, ...] = (),
handler: Callable[[MempoolEventData, Any], Any],
args: Tuple = (),
) -> None:
"""
Subscribes to mempool SSE events for the specified accounts.
Expand All @@ -64,9 +75,29 @@ def subscribe_to_mempool(
:accounts: A list of account addresses to subscribe to
"""
method = "v2/sse/mempool"
params = {'accounts': accounts}
params = {'accounts': ",".join(accounts)}
for data in self._subscribe(method=method, params=params):
event = MempoolEventData(**json.loads(data))
result = handler(event, *args)
if result is not None:
return result

def subscribe_to_blocks(
self,
workchain: Optional[Union[int, None]],
handler: Callable[[BlockEventData, Any], Any],
args: Tuple = (),
) -> Any:
"""
Subscribes to blocks SSE events for the specified workchains.
:handler: A callable function to handle the SSEEvent
:workchain: The ID of the workchain to subscribe to. If None, subscribes to all workchains.
"""
method = "v2/sse/blocks"
params = {} if workchain is None else {'workchain': workchain}
for data in self._subscribe(method=method, params=params):
event = BlockEventData(**json.loads(data))
result = handler(event, *args)
if result is not None:
return result

0 comments on commit 9878cc0

Please sign in to comment.