Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize safe events indexer #2194

Merged
merged 24 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions safe_transaction_service/history/indexers/events_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,13 @@ def decode_elements(self, log_receipts: Sequence[LogReceipt]) -> List[EventData]
decoded_elements.append(decoded_element)
return decoded_elements

def _process_decoded_elements(self, decoded_elements: List[EventData]) -> List[Any]:
processed_elements = []
for decoded_element in decoded_elements:
if processed_element := self._process_decoded_element(decoded_element):
processed_elements.append(processed_element)
return processed_elements

def process_elements(self, log_receipts: Sequence[LogReceipt]) -> List[Any]:
"""
Process all events found by `find_relevant_elements`
Expand Down Expand Up @@ -281,10 +288,7 @@ def process_elements(self, log_receipts: Sequence[LogReceipt]) -> List[Any]:
self.index_service.txs_create_or_update_from_tx_hashes(tx_hashes)
logger.debug("End prefetching and storing of ethereum txs")
logger.debug("Processing %d decoded events", len(decoded_elements))
processed_elements = []
for decoded_element in decoded_elements:
if processed_element := self._process_decoded_element(decoded_element):
processed_elements.append(processed_element)
processed_elements = self._process_decoded_elements(decoded_elements)
logger.debug("End processing %d decoded events", len(decoded_elements))

logger.debug("Marking events as processed")
Expand Down
313 changes: 246 additions & 67 deletions safe_transaction_service/history/indexers/safe_events_indexer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from functools import cached_property
from logging import getLogger
from typing import List, Optional
from typing import Any, Dict, List, Optional

from django.db import IntegrityError, transaction

Expand Down Expand Up @@ -287,6 +287,54 @@ def _is_setup_indexed(self, safe_address: ChecksumAddress):
def decode_elements(self, *args) -> List[EventData]:
return super().decode_elements(*args)

def _get_internal_tx_from_decoded_event(
self, decoded_event: EventData, **kwargs
) -> InternalTx:
"""
Creates an InternalTx instance from the given decoded_event.
Allows overriding object parameters with additional keyword arguments.

:param decoded_event:
:param kwargs:
:return:
"""
ethereum_tx_hash = HexBytes(decoded_event["transactionHash"])
ethereum_block_number = decoded_event["blockNumber"]
ethereum_block_timestamp = EthereumBlock.objects.get_timestamp_by_hash(
decoded_event["blockHash"]
)
address = decoded_event["address"]
default_trace_address = str(decoded_event["logIndex"])

# Setting default values
internal_tx = InternalTx(
ethereum_tx_id=ethereum_tx_hash,
timestamp=ethereum_block_timestamp,
block_number=ethereum_block_number,
_from=address,
gas=50000,
data=b"",
to=decoded_event["args"].get("to", NULL_ADDRESS),
value=decoded_event["args"].get("value", 0),
gas_used=50000,
contract_address=None,
code=None,
output=None,
refund_address=None,
tx_type=InternalTxType.CALL.value,
call_type=EthereumTxCallType.CALL.value,
trace_address=default_trace_address,
error=None,
)
# Overriding passed keys
for key, value in kwargs.items():
if hasattr(internal_tx, key):
setattr(internal_tx, key, value)
else:
raise AttributeError(f"Invalid atribute {key} for InternalTx")
moisses89 marked this conversation as resolved.
Show resolved Hide resolved

return internal_tx

@transaction.atomic
def _process_decoded_element(
self, decoded_element: EventData
Expand All @@ -299,7 +347,6 @@ def _process_decoded_element(
args = dict(decoded_element["args"])
ethereum_tx_hash = HexBytes(decoded_element["transactionHash"])
ethereum_tx_hash_hex = ethereum_tx_hash.hex()
ethereum_block_number = decoded_element["blockNumber"]
ethereum_block_timestamp = EthereumBlock.objects.get_timestamp_by_hash(
decoded_element["blockHash"]
)
Expand All @@ -311,65 +358,23 @@ def _process_decoded_element(
decoded_element,
)

internal_tx = InternalTx(
ethereum_tx_id=ethereum_tx_hash,
timestamp=ethereum_block_timestamp,
block_number=ethereum_block_number,
_from=safe_address,
gas=50000,
data=b"",
to=NULL_ADDRESS, # It should be Master copy address but we cannot detect it
value=0,
gas_used=50000,
contract_address=None,
code=None,
output=None,
refund_address=None,
tx_type=InternalTxType.CALL.value,
internal_tx = self._get_internal_tx_from_decoded_event(
decoded_element,
call_type=EthereumTxCallType.DELEGATE_CALL.value,
trace_address=trace_address,
error=None,
to=NULL_ADDRESS,
value=0,
)
child_internal_tx: Optional[InternalTx] = None # For Ether transfers
internal_tx_decoded = InternalTxDecoded(
internal_tx=internal_tx,
function_name="",
arguments=args,
)
if event_name == "ProxyCreation":
# Should be the 2nd event to be indexed, after `SafeSetup`
safe_address = args.pop("proxy")

if self._is_setup_indexed(safe_address):
internal_tx = None
else:
new_trace_address = f"{trace_address},0"
to = args.pop("singleton")

# Try to update InternalTx created by SafeSetup (if Safe was created using the ProxyFactory) with
# the master copy used. Without tracing it cannot be detected otherwise
InternalTx.objects.filter(
contract_address=safe_address, decoded_tx__function_name="setup"
).update(to=to, contract_address=None, trace_address=new_trace_address)
# Add creation internal tx. _from is the address of the proxy instead of the safe_address
internal_tx.contract_address = safe_address
internal_tx.tx_type = InternalTxType.CREATE.value
internal_tx.call_type = None
internal_tx_decoded = None
elif event_name == "SafeSetup":
# Should be the 1st event to be indexed, unless custom `to` and `data` are set
if self._is_setup_indexed(safe_address):
internal_tx = None
else:
# Usually ProxyCreation is called before SafeSetup, but it can be the opposite if someone
# creates a Safe and configure it in the next transaction. Remove it if that's the case
InternalTx.objects.filter(contract_address=safe_address).delete()
internal_tx.contract_address = safe_address
internal_tx_decoded.function_name = "setup"
args["payment"] = 0
args["paymentReceiver"] = NULL_ADDRESS
args["_threshold"] = args.pop("threshold")
args["_owners"] = args.pop("owners")
if event_name == "ProxyCreation" or event_name == "SafeSetup":
# Will ignore these events because were indexed in process_safe_creation_events
internal_tx = None

elif event_name == "SafeMultiSigTransaction":
internal_tx_decoded.function_name = "execTransaction"
data = HexBytes(args["data"])
Expand All @@ -392,25 +397,13 @@ def _process_decoded_element(
additional_info.hex(),
)
if args["value"] and not data: # Simulate ether transfer
child_internal_tx = InternalTx(
ethereum_tx_id=ethereum_tx_hash,
timestamp=ethereum_block_timestamp,
block_number=ethereum_block_number,
_from=safe_address,
child_internal_tx = self._get_internal_tx_from_decoded_event(
decoded_element,
gas=23000,
data=b"",
to=args["to"],
value=args["value"],
gas_used=23000,
contract_address=None,
code=None,
output=None,
refund_address=None,
tx_type=InternalTxType.CALL.value,
call_type=EthereumTxCallType.CALL.value,
trace_address=f"{trace_address},0",
error=None,
)

elif event_name == "SafeModuleTransaction":
internal_tx_decoded.function_name = "execTransactionFromModule"
args["data"] = HexBytes(args["data"]).hex()
Expand Down Expand Up @@ -485,3 +478,189 @@ def _process_decoded_element(
)

return internal_tx

def _get_safe_creation_events(
self, decoded_elements: List[EventData]
) -> Dict[ChecksumAddress, List[EventData]]:
"""
Get the creation events (ProxyCreation and SafeSetup) from decoded elements and generates a dictionary that groups these events by Safe address.

:param decoded_elements:
:return:
"""
safe_creation_events: Dict[ChecksumAddress, List[Dict]] = {}
for decoded_element in decoded_elements:
event_name = decoded_element["event"]
if event_name == "SafeSetup":
safe_address = decoded_element["address"]
safe_creation_events.setdefault(safe_address, []).append(
decoded_element
)
elif event_name == "ProxyCreation":
safe_address = decoded_element["args"].get("proxy")
safe_creation_events.setdefault(safe_address, []).append(
decoded_element
)

return safe_creation_events

def _process_safe_creation_events(
self,
safe_addresses_with_creation_events: Dict[ChecksumAddress, List[EventData]],
) -> List[InternalTx]:
"""
Process creation events (ProxyCreation and SafeSetup).

:param safe_addresses_with_creation_events:
:return:
"""
internal_txs = []
internal_decoded_txs = []
# Check if were indexed
safe_creation_events_addresses = set(safe_addresses_with_creation_events.keys())
indexed_addresses = InternalTxDecoded.objects.filter(
moisses89 marked this conversation as resolved.
Show resolved Hide resolved
internal_tx___from__in=safe_creation_events_addresses,
function_name="setup",
internal_tx__contract_address=None,
).values_list("internal_tx___from", flat=True)
# Ignoring the already indexed contracts
addresses_to_index = safe_creation_events_addresses - set(indexed_addresses)

for safe_address in addresses_to_index:
events = safe_addresses_with_creation_events[safe_address]
for event_position, event in enumerate(events):
if event["event"] == "SafeSetup":
setup_event = event
# If we have both events we should extract Singleton and trace_address from ProxyCreation event
if len(events) > 1:
if (
event_position == 0
and events[1]["event"] == "ProxyCreation"
):
# Usually SafeSetup is the first event and next is ProxyCreation when ProxyFactory is called with initializer.
proxy_creation_event = events[1]
elif (
event_position == 1
and events[0]["event"] == "ProxyCreation"
):
# ProxyCreation first and SafeSetup later
proxy_creation_event = events[0]
else:
# Proxy was created in previous blocks.
Copy link
Member Author

@moisses89 moisses89 Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous implementation always tried to remove the InternalTx generated from ProxyCreation just in case the ProxyCreation came first. https://github.com/safe-global/safe-transaction-service/blob/main/safe_transaction_service/history/indexers/safe_events_indexer.py#L366
In such case, because the ProxyCreation never comes later, the InternalTx generated by SafeSetup leaves without singleton (to) forever.
Maybe would be better try to get the Proxy InternalTx from database to insert the correct parameters of SafeSetup what do you think? @safe-global/core-api
In the current implementation the ProxyCreation is storing the singleton address in the to field of InternalTx, not sure if is completely right.

proxy_creation_event = None
# Safe was created and configure it in the next transaction. Remove it if that's the case
InternalTx.objects.filter(
contract_address=safe_address
).delete()

# Generate InternalTx and internalDecodedTx for SafeSetup event
setup_trace_address = (
f"{str(proxy_creation_event['logIndex'])},0"
if proxy_creation_event
else str(setup_event["logIndex"])
)
singleton = (
proxy_creation_event["args"].get("singleton")
if proxy_creation_event
else NULL_ADDRESS
)
# Keep previous implementation
contract_address = None if proxy_creation_event else safe_address
internal_tx = self._get_internal_tx_from_decoded_event(
setup_event,
contract_address=contract_address,
to=singleton,
trace_address=setup_trace_address,
call_type=EthereumTxCallType.DELEGATE_CALL.value,
)
# Generate InternalDecodedTx for SafeSetup event
setup_args = dict(event["args"])
setup_args["payment"] = 0
setup_args["paymentReceiver"] = NULL_ADDRESS
setup_args["_threshold"] = setup_args.pop("threshold")
setup_args["_owners"] = setup_args.pop("owners")
internal_tx_decoded = InternalTxDecoded(
internal_tx=internal_tx,
function_name="setup",
arguments=setup_args,
)
internal_txs.append(internal_tx)
internal_decoded_txs.append(internal_tx_decoded)
elif event["event"] == "ProxyCreation":
proxy_creation_event = event
# Generate InternalTx for ProxyCreation
internal_tx = self._get_internal_tx_from_decoded_event(
proxy_creation_event,
contract_address=proxy_creation_event["args"].get("proxy"),
tx_type=InternalTxType.CREATE.value,
call_type=None,
)
internal_txs.append(internal_tx)
else:
logger.error(f"Event is not a Safe creation event {event['event']}")

# Store which internal txs where inserted into database
stored_internal_txs: List[InternalTx] = []
with transaction.atomic():
try:
InternalTx.objects.bulk_create(internal_txs)
InternalTxDecoded.objects.bulk_create(internal_decoded_txs)
stored_internal_txs = internal_txs
except IntegrityError:
logger.info(
"Cannot bulk create the provided Safe Creation Events, trying one by one"
)

if internal_txs and not stored_internal_txs:
# Fallback handler in case of integrity error
for internal_decoded_tx in internal_decoded_txs:
try:
with transaction.atomic():
# Insert first the internal_tx with internalDecodedTx relation
InternalTx.save(internal_decoded_tx.internal_tx)
InternalTxDecoded.save(internal_decoded_tx)
# Remove inserted internal transactions from the list
internal_txs.remove(internal_decoded_tx.internal_tx)
stored_internal_txs.append(internal_tx)
except IntegrityError:
logger.info(
"Ignoring already processed InternalTx with tx-hash=%s and trace-address=%s",
internal_decoded_tx.internal_tx.ethereum_tx_id.hex(),
internal_decoded_tx.internal_tx.trace_address,
)
pass
# Insert the remaining InternalTxs
for internal_tx in internal_txs:
try:
with transaction.atomic():
InternalTx.save(internal_tx)
stored_internal_txs.append(internal_tx)
except IntegrityError:
logger.info(
"Ignoring already processed InternalTx with tx-hash=%s and trace-address=%s",
internal_tx.ethereum_tx_id.hex(),
internal_tx.trace_address,
)
pass

return stored_internal_txs

def _process_decoded_elements(self, decoded_elements: list[EventData]) -> list[Any]:
processed_elements = []
# Extract Safe creation events by Safe from decoded_elements list
safe_addresses_creation_events = self._get_safe_creation_events(
decoded_elements
)
if safe_addresses_creation_events:
# Process safe creation events
creation_events_processed = self._process_safe_creation_events(
safe_addresses_creation_events
)
processed_elements.extend(creation_events_processed)

# Process the rest of Safe events
for decoded_element in decoded_elements:
if processed_element := self._process_decoded_element(decoded_element):
processed_elements.append(processed_element)

return processed_elements
Loading