Skip to content

Commit

Permalink
Optimize safe events indexer (#2194)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Uxío <[email protected]>
  • Loading branch information
moisses89 and Uxio0 authored Sep 13, 2024
1 parent 40b875e commit 8f96b0b
Show file tree
Hide file tree
Showing 4 changed files with 575 additions and 74 deletions.
12 changes: 8 additions & 4 deletions safe_transaction_service/history/indexers/events_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,13 @@ def decode_elements(self, log_receipts: Sequence[LogReceipt]) -> List[EventData]
decoded_elements.append(decoded_element)
return decoded_elements

def _process_decoded_elements(self, decoded_elements: List[EventData]) -> List[Any]:
processed_elements = []
for decoded_element in decoded_elements:
if processed_element := self._process_decoded_element(decoded_element):
processed_elements.append(processed_element)
return processed_elements

def process_elements(self, log_receipts: Sequence[LogReceipt]) -> List[Any]:
"""
Process all events found by `find_relevant_elements`
Expand Down Expand Up @@ -281,10 +288,7 @@ def process_elements(self, log_receipts: Sequence[LogReceipt]) -> List[Any]:
self.index_service.txs_create_or_update_from_tx_hashes(tx_hashes)
logger.debug("End prefetching and storing of ethereum txs")
logger.debug("Processing %d decoded events", len(decoded_elements))
processed_elements = []
for decoded_element in decoded_elements:
if processed_element := self._process_decoded_element(decoded_element):
processed_elements.append(processed_element)
processed_elements = self._process_decoded_elements(decoded_elements)
logger.debug("End processing %d decoded events", len(decoded_elements))

logger.debug("Marking events as processed")
Expand Down
313 changes: 246 additions & 67 deletions safe_transaction_service/history/indexers/safe_events_indexer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from functools import cached_property
from logging import getLogger
from typing import List, Optional
from typing import Any, Dict, List, Optional

from django.db import IntegrityError, transaction

Expand Down Expand Up @@ -287,6 +287,54 @@ def _is_setup_indexed(self, safe_address: ChecksumAddress):
def decode_elements(self, *args) -> List[EventData]:
return super().decode_elements(*args)

def _get_internal_tx_from_decoded_event(
self, decoded_event: EventData, **kwargs
) -> InternalTx:
"""
Creates an InternalTx instance from the given decoded_event.
Allows overriding object parameters with additional keyword arguments.
:param decoded_event:
:param kwargs:
:return:
"""
ethereum_tx_hash = HexBytes(decoded_event["transactionHash"])
ethereum_block_number = decoded_event["blockNumber"]
ethereum_block_timestamp = EthereumBlock.objects.get_timestamp_by_hash(
decoded_event["blockHash"]
)
address = decoded_event["address"]
default_trace_address = str(decoded_event["logIndex"])

# Setting default values
internal_tx = InternalTx(
ethereum_tx_id=ethereum_tx_hash,
timestamp=ethereum_block_timestamp,
block_number=ethereum_block_number,
_from=address,
gas=50000,
data=b"",
to=decoded_event["args"].get("to", NULL_ADDRESS),
value=decoded_event["args"].get("value", 0),
gas_used=50000,
contract_address=None,
code=None,
output=None,
refund_address=None,
tx_type=InternalTxType.CALL.value,
call_type=EthereumTxCallType.CALL.value,
trace_address=default_trace_address,
error=None,
)
# Overriding passed keys
for key, value in kwargs.items():
if hasattr(internal_tx, key):
setattr(internal_tx, key, value)
else:
raise AttributeError(f"Invalid atribute {key} for InternalTx")

return internal_tx

@transaction.atomic
def _process_decoded_element(
self, decoded_element: EventData
Expand All @@ -299,7 +347,6 @@ def _process_decoded_element(
args = dict(decoded_element["args"])
ethereum_tx_hash = HexBytes(decoded_element["transactionHash"])
ethereum_tx_hash_hex = ethereum_tx_hash.hex()
ethereum_block_number = decoded_element["blockNumber"]
ethereum_block_timestamp = EthereumBlock.objects.get_timestamp_by_hash(
decoded_element["blockHash"]
)
Expand All @@ -311,65 +358,23 @@ def _process_decoded_element(
decoded_element,
)

internal_tx = InternalTx(
ethereum_tx_id=ethereum_tx_hash,
timestamp=ethereum_block_timestamp,
block_number=ethereum_block_number,
_from=safe_address,
gas=50000,
data=b"",
to=NULL_ADDRESS, # It should be Master copy address but we cannot detect it
value=0,
gas_used=50000,
contract_address=None,
code=None,
output=None,
refund_address=None,
tx_type=InternalTxType.CALL.value,
internal_tx = self._get_internal_tx_from_decoded_event(
decoded_element,
call_type=EthereumTxCallType.DELEGATE_CALL.value,
trace_address=trace_address,
error=None,
to=NULL_ADDRESS,
value=0,
)
child_internal_tx: Optional[InternalTx] = None # For Ether transfers
internal_tx_decoded = InternalTxDecoded(
internal_tx=internal_tx,
function_name="",
arguments=args,
)
if event_name == "ProxyCreation":
# Should be the 2nd event to be indexed, after `SafeSetup`
safe_address = args.pop("proxy")

if self._is_setup_indexed(safe_address):
internal_tx = None
else:
new_trace_address = f"{trace_address},0"
to = args.pop("singleton")

# Try to update InternalTx created by SafeSetup (if Safe was created using the ProxyFactory) with
# the master copy used. Without tracing it cannot be detected otherwise
InternalTx.objects.filter(
contract_address=safe_address, decoded_tx__function_name="setup"
).update(to=to, contract_address=None, trace_address=new_trace_address)
# Add creation internal tx. _from is the address of the proxy instead of the safe_address
internal_tx.contract_address = safe_address
internal_tx.tx_type = InternalTxType.CREATE.value
internal_tx.call_type = None
internal_tx_decoded = None
elif event_name == "SafeSetup":
# Should be the 1st event to be indexed, unless custom `to` and `data` are set
if self._is_setup_indexed(safe_address):
internal_tx = None
else:
# Usually ProxyCreation is called before SafeSetup, but it can be the opposite if someone
# creates a Safe and configure it in the next transaction. Remove it if that's the case
InternalTx.objects.filter(contract_address=safe_address).delete()
internal_tx.contract_address = safe_address
internal_tx_decoded.function_name = "setup"
args["payment"] = 0
args["paymentReceiver"] = NULL_ADDRESS
args["_threshold"] = args.pop("threshold")
args["_owners"] = args.pop("owners")
if event_name == "ProxyCreation" or event_name == "SafeSetup":
# Will ignore these events because were indexed in process_safe_creation_events
internal_tx = None

elif event_name == "SafeMultiSigTransaction":
internal_tx_decoded.function_name = "execTransaction"
data = HexBytes(args["data"])
Expand All @@ -392,25 +397,13 @@ def _process_decoded_element(
additional_info.hex(),
)
if args["value"] and not data: # Simulate ether transfer
child_internal_tx = InternalTx(
ethereum_tx_id=ethereum_tx_hash,
timestamp=ethereum_block_timestamp,
block_number=ethereum_block_number,
_from=safe_address,
child_internal_tx = self._get_internal_tx_from_decoded_event(
decoded_element,
gas=23000,
data=b"",
to=args["to"],
value=args["value"],
gas_used=23000,
contract_address=None,
code=None,
output=None,
refund_address=None,
tx_type=InternalTxType.CALL.value,
call_type=EthereumTxCallType.CALL.value,
trace_address=f"{trace_address},0",
error=None,
)

elif event_name == "SafeModuleTransaction":
internal_tx_decoded.function_name = "execTransactionFromModule"
args["data"] = HexBytes(args["data"]).hex()
Expand Down Expand Up @@ -485,3 +478,189 @@ def _process_decoded_element(
)

return internal_tx

def _get_safe_creation_events(
self, decoded_elements: List[EventData]
) -> Dict[ChecksumAddress, List[EventData]]:
"""
Get the creation events (ProxyCreation and SafeSetup) from decoded elements and generates a dictionary that groups these events by Safe address.
:param decoded_elements:
:return:
"""
safe_creation_events: Dict[ChecksumAddress, List[Dict]] = {}
for decoded_element in decoded_elements:
event_name = decoded_element["event"]
if event_name == "SafeSetup":
safe_address = decoded_element["address"]
safe_creation_events.setdefault(safe_address, []).append(
decoded_element
)
elif event_name == "ProxyCreation":
safe_address = decoded_element["args"].get("proxy")
safe_creation_events.setdefault(safe_address, []).append(
decoded_element
)

return safe_creation_events

def _process_safe_creation_events(
self,
safe_addresses_with_creation_events: Dict[ChecksumAddress, List[EventData]],
) -> List[InternalTx]:
"""
Process creation events (ProxyCreation and SafeSetup).
:param safe_addresses_with_creation_events:
:return:
"""
internal_txs = []
internal_decoded_txs = []
# Check if were indexed
safe_creation_events_addresses = set(safe_addresses_with_creation_events.keys())
indexed_addresses = InternalTxDecoded.objects.filter(
internal_tx___from__in=safe_creation_events_addresses,
function_name="setup",
internal_tx__contract_address=None,
).values_list("internal_tx___from", flat=True)
# Ignoring the already indexed contracts
addresses_to_index = safe_creation_events_addresses - set(indexed_addresses)

for safe_address in addresses_to_index:
events = safe_addresses_with_creation_events[safe_address]
for event_position, event in enumerate(events):
if event["event"] == "SafeSetup":
setup_event = event
# If we have both events we should extract Singleton and trace_address from ProxyCreation event
if len(events) > 1:
if (
event_position == 0
and events[1]["event"] == "ProxyCreation"
):
# Usually SafeSetup is the first event and next is ProxyCreation when ProxyFactory is called with initializer.
proxy_creation_event = events[1]
elif (
event_position == 1
and events[0]["event"] == "ProxyCreation"
):
# ProxyCreation first and SafeSetup later
proxy_creation_event = events[0]
else:
# Proxy was created in previous blocks.
proxy_creation_event = None
# Safe was created and configure it in the next transaction. Remove it if that's the case
InternalTx.objects.filter(
contract_address=safe_address
).delete()

# Generate InternalTx and internalDecodedTx for SafeSetup event
setup_trace_address = (
f"{str(proxy_creation_event['logIndex'])},0"
if proxy_creation_event
else str(setup_event["logIndex"])
)
singleton = (
proxy_creation_event["args"].get("singleton")
if proxy_creation_event
else NULL_ADDRESS
)
# Keep previous implementation
contract_address = None if proxy_creation_event else safe_address
internal_tx = self._get_internal_tx_from_decoded_event(
setup_event,
contract_address=contract_address,
to=singleton,
trace_address=setup_trace_address,
call_type=EthereumTxCallType.DELEGATE_CALL.value,
)
# Generate InternalDecodedTx for SafeSetup event
setup_args = dict(event["args"])
setup_args["payment"] = 0
setup_args["paymentReceiver"] = NULL_ADDRESS
setup_args["_threshold"] = setup_args.pop("threshold")
setup_args["_owners"] = setup_args.pop("owners")
internal_tx_decoded = InternalTxDecoded(
internal_tx=internal_tx,
function_name="setup",
arguments=setup_args,
)
internal_txs.append(internal_tx)
internal_decoded_txs.append(internal_tx_decoded)
elif event["event"] == "ProxyCreation":
proxy_creation_event = event
# Generate InternalTx for ProxyCreation
internal_tx = self._get_internal_tx_from_decoded_event(
proxy_creation_event,
contract_address=proxy_creation_event["args"].get("proxy"),
tx_type=InternalTxType.CREATE.value,
call_type=None,
)
internal_txs.append(internal_tx)
else:
logger.error(f"Event is not a Safe creation event {event['event']}")

# Store which internal txs where inserted into database
stored_internal_txs: List[InternalTx] = []
with transaction.atomic():
try:
InternalTx.objects.bulk_create(internal_txs)
InternalTxDecoded.objects.bulk_create(internal_decoded_txs)
stored_internal_txs = internal_txs
except IntegrityError:
logger.info(
"Cannot bulk create the provided Safe Creation Events, trying one by one"
)

if internal_txs and not stored_internal_txs:
# Fallback handler in case of integrity error
for internal_decoded_tx in internal_decoded_txs:
try:
with transaction.atomic():
# Insert first the internal_tx with internalDecodedTx relation
InternalTx.save(internal_decoded_tx.internal_tx)
InternalTxDecoded.save(internal_decoded_tx)
# Remove inserted internal transactions from the list
internal_txs.remove(internal_decoded_tx.internal_tx)
stored_internal_txs.append(internal_tx)
except IntegrityError:
logger.info(
"Ignoring already processed InternalTx with tx-hash=%s and trace-address=%s",
internal_decoded_tx.internal_tx.ethereum_tx_id.hex(),
internal_decoded_tx.internal_tx.trace_address,
)
pass
# Insert the remaining InternalTxs
for internal_tx in internal_txs:
try:
with transaction.atomic():
InternalTx.save(internal_tx)
stored_internal_txs.append(internal_tx)
except IntegrityError:
logger.info(
"Ignoring already processed InternalTx with tx-hash=%s and trace-address=%s",
internal_tx.ethereum_tx_id.hex(),
internal_tx.trace_address,
)
pass

return stored_internal_txs

def _process_decoded_elements(self, decoded_elements: list[EventData]) -> list[Any]:
processed_elements = []
# Extract Safe creation events by Safe from decoded_elements list
safe_addresses_creation_events = self._get_safe_creation_events(
decoded_elements
)
if safe_addresses_creation_events:
# Process safe creation events
creation_events_processed = self._process_safe_creation_events(
safe_addresses_creation_events
)
processed_elements.extend(creation_events_processed)

# Process the rest of Safe events
for decoded_element in decoded_elements:
if processed_element := self._process_decoded_element(decoded_element):
processed_elements.append(processed_element)

return processed_elements
Loading

0 comments on commit 8f96b0b

Please sign in to comment.