Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Index internal txs in batch for safe events #2176

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 106 additions & 29 deletions safe_transaction_service/history/indexers/safe_events_indexer.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from functools import cached_property
from logging import getLogger
from typing import List, Optional
from typing import Any, Dict, List, Optional, OrderedDict, Sequence

from django.db import IntegrityError, transaction
from django.db import transaction

from eth_abi import decode as decode_abi
from eth_abi.exceptions import DecodingError
from eth_typing import ChecksumAddress
from hexbytes import HexBytes
from web3.contract.contract import ContractEvent
from web3.types import EventData
from web3.types import EventData, LogReceipt

from gnosis.eth import EthereumClient
from gnosis.eth.constants import NULL_ADDRESS
Expand Down Expand Up @@ -62,6 +62,10 @@ class SafeEventsIndexer(EventsIndexer):
True # Search for logs in every address (like the ProxyFactory)
)

def __init__(self, *args, **kwargs):
self.safe_setup_cache: Dict[ChecksumAddress, InternalTx] = {}
super().__init__(*args, **kwargs)

@cached_property
def contract_events(self) -> List[ContractEvent]:
"""
Expand Down Expand Up @@ -277,7 +281,10 @@ def _is_setup_indexed(self, safe_address: ChecksumAddress):
:param safe_address:
:return: ``True`` if ``SafeSetup`` event was processed, ``False`` otherwise
"""
return InternalTxDecoded.objects.filter(
return (
safe_address in self.safe_setup_cache
and self.safe_setup_cache[safe_address].contract_address is None
) or InternalTxDecoded.objects.filter(
function_name="setup",
internal_tx___from=safe_address,
internal_tx__contract_address=None,
Expand Down Expand Up @@ -350,6 +357,12 @@ def _process_decoded_element(
InternalTx.objects.filter(
contract_address=safe_address, decoded_tx__function_name="setup"
).update(to=to, contract_address=None, trace_address=new_trace_address)

setup_internal_tx = self.safe_setup_cache[safe_address]
setup_internal_tx.to = to
setup_internal_tx.contract_address = None
setup_internal_tx.trace_address = new_trace_address

# Add creation internal tx. _from is the address of the proxy instead of the safe_address
internal_tx.contract_address = safe_address
internal_tx.tx_type = InternalTxType.CREATE.value
Expand All @@ -369,6 +382,7 @@ def _process_decoded_element(
args["paymentReceiver"] = NULL_ADDRESS
args["_threshold"] = args.pop("threshold")
args["_owners"] = args.pop("owners")
self.safe_setup_cache[safe_address] = internal_tx
elif event_name == "SafeMultiSigTransaction":
internal_tx_decoded.function_name = "execTransaction"
data = HexBytes(args["data"])
Expand Down Expand Up @@ -449,32 +463,24 @@ def _process_decoded_element(
# 'ExecutionFromModuleSuccess', 'ExecutionFromModuleFailure'
internal_tx_decoded = None

internal_txs = []
internal_txs_decoded = []
safe_relevant_txs = []
if internal_tx:
with transaction.atomic():
try:
internal_tx.save()
if internal_tx.is_ether_transfer:
# Store Incoming Ether Transfers as relevant transactions for a Safe
SafeRelevantTransaction.objects.get_or_create(
ethereum_tx_id=ethereum_tx_hash,
safe=safe_address,
defaults={
"timestamp": ethereum_block["timestamp"],
},
)
if child_internal_tx:
child_internal_tx.save()
if internal_tx_decoded:
internal_tx_decoded.save()
except IntegrityError as exc:
logger.info(
"Ignoring already processed event %s for Safe %s on tx-hash=%s: %s",
event_name,
safe_address,
decoded_element["transactionHash"].hex(),
exc,
internal_txs.append(internal_tx)
if internal_tx.is_ether_transfer:
# Store Incoming Ether Transfers as relevant transactions for a Safe
safe_relevant_txs.append(
SafeRelevantTransaction(
ethereum_tx_id=ethereum_tx_hash,
safe=safe_address,
timestamp=ethereum_block["timestamp"],
)
return None
)
if child_internal_tx:
internal_txs.append(child_internal_tx)
if internal_tx_decoded:
internal_txs_decoded.append(internal_tx_decoded)

logger.debug(
"[%s] %s - tx-hash=%s - Processed event",
Expand All @@ -483,4 +489,75 @@ def _process_decoded_element(
ethereum_tx_hash_hex,
)

return internal_tx
return internal_txs, internal_txs_decoded, safe_relevant_txs

def process_elements(self, log_receipts: Sequence[LogReceipt]) -> List[Any]:
"""
Process all events found by `find_relevant_elements`

:param log_receipts: Events to store in database
:return: List of events already stored in database
"""
if not log_receipts:
return []

logger.debug("Excluding events processed recently")
# Ignore already processed events
not_processed_log_receipts = [
log_receipt
for log_receipt in log_receipts
if not self.element_already_processed_checker.is_processed(
log_receipt["transactionHash"],
log_receipt["blockHash"],
log_receipt["logIndex"],
)
]
logger.debug("Decoding `log_receipts` of the events")
decoded_elements: List[EventData] = self.decode_elements(
not_processed_log_receipts
)
logger.debug("Decoded `log_receipts` of the events")
tx_hashes = OrderedDict.fromkeys(
[event["transactionHash"] for event in not_processed_log_receipts]
).keys()
logger.debug("Prefetching and storing %d ethereum txs", len(tx_hashes))
self.index_service.txs_create_or_update_from_tx_hashes(tx_hashes)
logger.debug("End prefetching and storing of ethereum txs")
logger.debug("Processing %d decoded events", len(decoded_elements))
internal_txs_to_insert = []
internal_txs_decoded_to_insert = []
safe_relevant_txs_to_insert = []
for decoded_element in decoded_elements:
internal_txs, internal_txs_decoded, safe_relevant_txs = (
self._process_decoded_element(decoded_element)
)
internal_txs_to_insert.extend(internal_txs)
internal_txs_decoded_to_insert.extend(internal_txs_decoded)
safe_relevant_txs_to_insert.extend(safe_relevant_txs)
logger.debug("End processing %d decoded events", len(decoded_elements))

logger.info("Inserting %d elements on database", len(internal_txs_to_insert))
InternalTx.objects.filter(ethereum_tx_id__in=tx_hashes).delete()
InternalTx.objects.bulk_create(internal_txs_to_insert)
# If we use `ignore_conflicts`, things are not inserted
InternalTxDecoded.objects.bulk_create(
internal_txs_decoded_to_insert, ignore_conflicts=True
)
SafeRelevantTransaction.objects.bulk_create(
safe_relevant_txs_to_insert, ignore_conflicts=True
)
self.safe_setup_cache.clear()
logger.info(
"End inserting %d elements on database", len(internal_txs_to_insert)
)

logger.debug("Marking events as processed")
for log_receipt in not_processed_log_receipts:
self.element_already_processed_checker.mark_as_processed(
log_receipt["transactionHash"],
log_receipt["blockHash"],
log_receipt["logIndex"],
)
logger.debug("Marked events as processed")

return internal_txs_to_insert
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ def test_safe_events_indexer(self):
multisig_tx.sign(owner_account_1.key)
multisig_tx.execute(self.ethereum_test_account.key)
# Process events: SafeMultiSigTransaction, ExecutionSuccess
self.assertEqual(self.safe_events_indexer.start(), (2, 1))
self.assertEqual(self.safe_events_indexer.start(), (3, 1))
self.safe_tx_processor.process_decoded_transactions(txs_decoded_queryset.all())
# Add one SafeStatus increasing the nonce
self.assertEqual(SafeStatus.objects.count(), 15)
Expand Down Expand Up @@ -668,7 +668,7 @@ def test_element_already_processed_checker(self):
)
self.assertEqual(len(processed_element_cache), 0)
self.assertEqual(
len(self.safe_events_indexer.process_elements(safe_events_mock)), 28
len(self.safe_events_indexer.process_elements(safe_events_mock)), 29
)
self.assertEqual(len(processed_element_cache), 28)

Expand All @@ -680,10 +680,11 @@ def test_element_already_processed_checker(self):
len(self.safe_events_indexer.process_elements(safe_events_mock)), 0
)

# Even if we empty the cache, events will not be reprocessed again
# If we empty the cache, events but creation will be reprocessed again
self.safe_events_indexer.element_already_processed_checker.clear()
self.safe_events_indexer.safe_setup_cache.clear()
self.assertEqual(
len(self.safe_events_indexer.process_elements(safe_events_mock)), 0
len(self.safe_events_indexer.process_elements(safe_events_mock)), 27
)

def test_auto_adjust_block_limit(self):
Expand Down
Loading