Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize safe events indexer #2194

Merged
merged 24 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime
from collections import OrderedDict
from logging import getLogger
from typing import Iterator, List, NamedTuple, Optional, Sequence
from typing import Any, Iterator, List, NamedTuple, Optional, Sequence

from django.db.models import QuerySet
from django.db.models.query import EmptyQuerySet
Expand Down Expand Up @@ -139,6 +139,9 @@ def _process_decoded_element(self, decoded_element: EventData) -> None:
"""
pass

def _process_decoded_elements(self, decoded_element: List[EventData]) -> List[Any]:
pass

def events_to_erc20_transfer(
self, log_receipts: Sequence[EventData]
) -> Iterator[ERC20Transfer]:
Expand Down
9 changes: 5 additions & 4 deletions safe_transaction_service/history/indexers/events_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ def _find_elements_using_topics(
def _process_decoded_element(self, decoded_element: EventData) -> Any:
pass

@abstractmethod
def _process_decoded_elements(self, decoded_element: List[EventData]) -> List[Any]:
pass

def find_relevant_elements(
self,
addresses: set[ChecksumAddress],
Expand Down Expand Up @@ -281,10 +285,7 @@ def process_elements(self, log_receipts: Sequence[LogReceipt]) -> List[Any]:
self.index_service.txs_create_or_update_from_tx_hashes(tx_hashes)
logger.debug("End prefetching and storing of ethereum txs")
logger.debug("Processing %d decoded events", len(decoded_elements))
processed_elements = []
for decoded_element in decoded_elements:
if processed_element := self._process_decoded_element(decoded_element):
processed_elements.append(processed_element)
processed_elements = self._process_decoded_elements(decoded_elements)
logger.debug("End processing %d decoded events", len(decoded_elements))

logger.debug("Marking events as processed")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from functools import cached_property
from logging import getLogger
from typing import List, Optional, Sequence
from typing import Any, List, Optional, Sequence

from web3.contract.contract import ContractEvent
from web3.types import EventData, LogReceipt
Expand Down Expand Up @@ -85,6 +85,13 @@ def _process_decoded_element(
ethereum_tx_id=decoded_element["transactionHash"],
)

def _process_decoded_elements(self, decoded_elements: List[EventData]) -> List[Any]:
moisses89 marked this conversation as resolved.
Show resolved Hide resolved
processed_elements = []
for decoded_element in decoded_elements:
if processed_element := self._process_decoded_element(decoded_element):
processed_elements.append(processed_element)
return processed_elements

def process_elements(
self, log_receipts: Sequence[LogReceipt]
) -> List[SafeContract]:
Expand Down
261 changes: 194 additions & 67 deletions safe_transaction_service/history/indexers/safe_events_indexer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from functools import cached_property
from logging import getLogger
from typing import List, Optional
from typing import Any, Dict, List, Optional

from django.db import IntegrityError, transaction

Expand Down Expand Up @@ -287,6 +287,54 @@ def _is_setup_indexed(self, safe_address: ChecksumAddress):
def decode_elements(self, *args) -> List[EventData]:
return super().decode_elements(*args)

def _get_internal_tx_from_decoded_event(
self, decoded_event: EventData, **kwargs
) -> InternalTx:
"""
Creates an InternalTx instance from the given decoded_event.
Allows overriding object parameters with additional keyword arguments.

:param decoded_event:
:param kwargs:
:return:
"""
ethereum_tx_hash = HexBytes(decoded_event["transactionHash"])
ethereum_block_number = decoded_event["blockNumber"]
ethereum_block_timestamp = EthereumBlock.objects.get_timestamp_by_hash(
decoded_event["blockHash"]
)
address = decoded_event["address"]
default_trace_address = str(decoded_event["logIndex"])

# Setting default values
internal_tx = InternalTx(
ethereum_tx_id=ethereum_tx_hash,
timestamp=ethereum_block_timestamp,
block_number=ethereum_block_number,
_from=address,
gas=50000,
data=b"",
to=decoded_event["args"].get("to", NULL_ADDRESS),
value=decoded_event["args"].get("value", 0),
gas_used=50000,
contract_address=None,
code=None,
output=None,
refund_address=None,
tx_type=InternalTxType.CALL.value,
call_type=EthereumTxCallType.CALL.value,
trace_address=default_trace_address,
error=None,
)
# Overriding passed keys
for key, value in kwargs.items():
if hasattr(internal_tx, key):
setattr(internal_tx, key, value)
else:
raise AttributeError(f"Invalid atribute {key} for InternalTx")
moisses89 marked this conversation as resolved.
Show resolved Hide resolved

return internal_tx

@transaction.atomic
def _process_decoded_element(
self, decoded_element: EventData
Expand All @@ -299,7 +347,6 @@ def _process_decoded_element(
args = dict(decoded_element["args"])
ethereum_tx_hash = HexBytes(decoded_element["transactionHash"])
ethereum_tx_hash_hex = ethereum_tx_hash.hex()
ethereum_block_number = decoded_element["blockNumber"]
ethereum_block_timestamp = EthereumBlock.objects.get_timestamp_by_hash(
decoded_element["blockHash"]
)
Expand All @@ -311,65 +358,23 @@ def _process_decoded_element(
decoded_element,
)

internal_tx = InternalTx(
ethereum_tx_id=ethereum_tx_hash,
timestamp=ethereum_block_timestamp,
block_number=ethereum_block_number,
_from=safe_address,
gas=50000,
data=b"",
to=NULL_ADDRESS, # It should be Master copy address but we cannot detect it
value=0,
gas_used=50000,
contract_address=None,
code=None,
output=None,
refund_address=None,
tx_type=InternalTxType.CALL.value,
internal_tx = self._get_internal_tx_from_decoded_event(
decoded_element,
call_type=EthereumTxCallType.DELEGATE_CALL.value,
trace_address=trace_address,
error=None,
to=NULL_ADDRESS,
value=0,
)
child_internal_tx: Optional[InternalTx] = None # For Ether transfers
internal_tx_decoded = InternalTxDecoded(
internal_tx=internal_tx,
function_name="",
arguments=args,
)
if event_name == "ProxyCreation":
# Should be the 2nd event to be indexed, after `SafeSetup`
safe_address = args.pop("proxy")

if self._is_setup_indexed(safe_address):
internal_tx = None
else:
new_trace_address = f"{trace_address},0"
to = args.pop("singleton")

# Try to update InternalTx created by SafeSetup (if Safe was created using the ProxyFactory) with
# the master copy used. Without tracing it cannot be detected otherwise
InternalTx.objects.filter(
contract_address=safe_address, decoded_tx__function_name="setup"
).update(to=to, contract_address=None, trace_address=new_trace_address)
# Add creation internal tx. _from is the address of the proxy instead of the safe_address
internal_tx.contract_address = safe_address
internal_tx.tx_type = InternalTxType.CREATE.value
internal_tx.call_type = None
internal_tx_decoded = None
elif event_name == "SafeSetup":
# Should be the 1st event to be indexed, unless custom `to` and `data` are set
if self._is_setup_indexed(safe_address):
internal_tx = None
else:
# Usually ProxyCreation is called before SafeSetup, but it can be the opposite if someone
# creates a Safe and configure it in the next transaction. Remove it if that's the case
InternalTx.objects.filter(contract_address=safe_address).delete()
internal_tx.contract_address = safe_address
internal_tx_decoded.function_name = "setup"
args["payment"] = 0
args["paymentReceiver"] = NULL_ADDRESS
args["_threshold"] = args.pop("threshold")
args["_owners"] = args.pop("owners")
if event_name == "ProxyCreation" or event_name == "SafeSetup":
# Will ignore this events because were indexed in process_safe_creation_events
moisses89 marked this conversation as resolved.
Show resolved Hide resolved
internal_tx = None

elif event_name == "SafeMultiSigTransaction":
internal_tx_decoded.function_name = "execTransaction"
data = HexBytes(args["data"])
Expand All @@ -392,25 +397,13 @@ def _process_decoded_element(
additional_info.hex(),
)
if args["value"] and not data: # Simulate ether transfer
child_internal_tx = InternalTx(
ethereum_tx_id=ethereum_tx_hash,
timestamp=ethereum_block_timestamp,
block_number=ethereum_block_number,
_from=safe_address,
child_internal_tx = self._get_internal_tx_from_decoded_event(
decoded_element,
gas=23000,
data=b"",
to=args["to"],
value=args["value"],
gas_used=23000,
contract_address=None,
code=None,
output=None,
refund_address=None,
tx_type=InternalTxType.CALL.value,
call_type=EthereumTxCallType.CALL.value,
trace_address=f"{trace_address},0",
error=None,
)

elif event_name == "SafeModuleTransaction":
internal_tx_decoded.function_name = "execTransactionFromModule"
args["data"] = HexBytes(args["data"]).hex()
Expand Down Expand Up @@ -485,3 +478,137 @@ def _process_decoded_element(
)

return internal_tx

def _get_safe_creation_events(
self, decoded_elements: List[EventData]
) -> Dict[ChecksumAddress, List[EventData]]:
"""
Get the creation events (ProxyCreation and SafeSetup) from decoded elements and generates a dictionary that groups these events by Safe address.

:param decoded_elements:
:return:
"""
safe_setup_events: Dict[ChecksumAddress, List[Dict]] = {}
moisses89 marked this conversation as resolved.
Show resolved Hide resolved
for decoded_element in decoded_elements:
event_name = decoded_element["event"]
if event_name == "SafeSetup":
safe_address = decoded_element["address"]
safe_setup_events.setdefault(safe_address, []).append(decoded_element)
elif event_name == "ProxyCreation":
safe_address = decoded_element["args"].get("proxy")
safe_setup_events.setdefault(safe_address, []).append(decoded_element)

return safe_setup_events

@transaction.atomic
def _process_safe_creation_events(
self, safe_setup_events: Dict[ChecksumAddress, List[EventData]]
moisses89 marked this conversation as resolved.
Show resolved Hide resolved
) -> List[InternalTx]:
"""
Process creation events (ProxyCreation and SafeSetup).

:param safe_setup_events:
:return:
"""
internal_txs = []
internal_decoded_txs = []
# Check if were indexed
safe_setup_events_addresses = list(safe_setup_events.keys())
moisses89 marked this conversation as resolved.
Show resolved Hide resolved
indexed_addresses = InternalTxDecoded.objects.filter(
moisses89 marked this conversation as resolved.
Show resolved Hide resolved
internal_tx___from__in=safe_setup_events_addresses,
function_name="setup",
internal_tx__contract_address=None,
).values_list("internal_tx___from", flat=True)
addresses_to_index = set(safe_setup_events_addresses) - set(indexed_addresses)
for safe_address in addresses_to_index:
events = safe_setup_events[safe_address]
for event_position, event in enumerate(events):
if event["event"] == "SafeSetup":
setup_event = event
# If we have both events we should extract Singleton and trace_address from ProxyCreation event
if len(events) > 1:
if (
event_position == 0
and events[1]["event"] == "ProxyCreation"
):
# Usually SafeSetup is the first event and next is ProxyCreation when ProxyCreation is called with initializer.
moisses89 marked this conversation as resolved.
Show resolved Hide resolved
proxy_creation_event = events[1]
elif (
event_position == 1
and events[0]["event"] == "ProxyCreation"
):
# ProxyCreation first and SafeSetup later
proxy_creation_event = events[1]
moisses89 marked this conversation as resolved.
Show resolved Hide resolved
else:
# Proxy was created in previous blocks.
Copy link
Member Author

@moisses89 moisses89 Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous implementation always tried to remove the InternalTx generated from ProxyCreation just in case the ProxyCreation came first. https://github.com/safe-global/safe-transaction-service/blob/main/safe_transaction_service/history/indexers/safe_events_indexer.py#L366
In such case, because the ProxyCreation never comes later, the InternalTx generated by SafeSetup leaves without singleton (to) forever.
Maybe would be better try to get the Proxy InternalTx from database to insert the correct parameters of SafeSetup what do you think? @safe-global/core-api
In the current implementation the ProxyCreation is storing the singleton address in the to field of InternalTx, not sure if is completely right.

proxy_creation_event = None
# TODO store decoded ProxyCreation event to get the singleton address
moisses89 marked this conversation as resolved.
Show resolved Hide resolved

# Generate InternalTx and internalDecodedTx for SafeSetup event
setup_trace_address = (
f"{str(proxy_creation_event['logIndex'])},0"
if proxy_creation_event
else str(setup_event["logIndex"])
)
singleton = (
proxy_creation_event["args"].get("singleton")
if proxy_creation_event
else NULL_ADDRESS
)
internal_tx = self._get_internal_tx_from_decoded_event(
setup_event,
to=singleton,
trace_address=setup_trace_address,
call_type=EthereumTxCallType.DELEGATE_CALL.value,
)
# Generate InternalDecodedTx for SafeSetup event
setup_args = dict(event["args"])
setup_args["payment"] = 0
setup_args["paymentReceiver"] = NULL_ADDRESS
setup_args["_threshold"] = setup_args.pop("threshold")
setup_args["_owners"] = setup_args.pop("owners")
internal_tx_decoded = InternalTxDecoded(
internal_tx=internal_tx,
function_name="setup",
arguments=setup_args,
)
internal_txs.append(internal_tx)
internal_decoded_txs.append(internal_tx_decoded)
elif event["event"] == "ProxyCreation":
proxy_creation_event = event
# Generate InternalTx for ProxyCreation
internal_tx = self._get_internal_tx_from_decoded_event(
proxy_creation_event,
contract_address=proxy_creation_event["args"].get("proxy"),
tx_type=InternalTxType.CREATE.value,
call_type=None,
)
internal_txs.append(internal_tx)
else:
logger.error(f"Event is not a Safe creation event {event['event']}")

with transaction.atomic():
InternalTx.objects.bulk_create(internal_txs)
InternalTxDecoded.objects.bulk_create(internal_decoded_txs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if they are already inserted?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raise an exception.
Ignoring conflicts in such case would fix the issue, 5eeabd4

Copy link
Member Author

@moisses89 moisses89 Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Uxio0 ignore_conflicts fail because InternalDecodedTx have relation with internal_tx, basically the ORM try to prevent lose data.
We can use instead of that update_conflicts, but I'm concerned because I just discovered that bulk_create does not call save, and in that case post_save will not be fired.
Will this affect our signals handle?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bulk_create does in case we inherit our BulkCreateMixin

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a workaround in case that would be a collision inserting the same instance, shouldn't be any common because the bulk create is in atomic section and we are checking if the contract were indexed, basically if the setup and proxyevent were correctly inserted, shouldn't reach the insert section if would be indexed again.

logger.info(f"Inserted {len(internal_txs)} internal_txs ")
logger.info(f"Inserted {len(internal_decoded_txs)} internal_decoded_txs")

return internal_txs

def _process_decoded_elements(self, decoded_elements: list[EventData]) -> List[Any]:
moisses89 marked this conversation as resolved.
Show resolved Hide resolved
processed_elements = []
# Extract Safe creation events from decoded_elements list
moisses89 marked this conversation as resolved.
Show resolved Hide resolved
safe_setup_events = self._get_safe_creation_events(decoded_elements)
if safe_setup_events:
# Process safe creation events
creation_events_processed = self._process_safe_creation_events(
safe_setup_events
)
processed_elements.extend(creation_events_processed)

# Process the rest of Safe events
for decoded_element in decoded_elements:
if processed_element := self._process_decoded_element(decoded_element):
processed_elements.append(processed_element)

return processed_elements
Loading
Loading