diff --git a/safe_transaction_service/history/models.py b/safe_transaction_service/history/models.py index 4b85a7641..c41611274 100644 --- a/safe_transaction_service/history/models.py +++ b/safe_transaction_service/history/models.py @@ -1927,7 +1927,11 @@ def is_corrupted(self) -> bool: """ SafeStatus nonce must be incremental. If current nonce is bigger than the number of SafeStatus for that Safe something is wrong. There could be more SafeStatus than nonce (e.g. a call to a MultiSend - adding owners and enabling a Module in the same contract `execTransaction`) + adding owners and enabling a Module in the same contract `execTransaction`), but never less. + + However, there's the possibility that there isn't a problem in the indexer. For example, + in a L2 network a Safe could be migrated from L1 to L2 and some transactions will never be detected + by the indexer. :return: `True` if corrupted, `False` otherwise """ diff --git a/safe_transaction_service/history/services/index_service.py b/safe_transaction_service/history/services/index_service.py index e519b7817..2fc0ec6f7 100644 --- a/safe_transaction_service/history/services/index_service.py +++ b/safe_transaction_service/history/services/index_service.py @@ -67,6 +67,7 @@ def __new__(cls): get_auto_ethereum_client(), settings.ETH_REORG_BLOCKS, settings.ETH_L2_NETWORK, + settings.ETH_INTERNAL_TX_DECODED_PROCESS_BATCH, ) return cls.instance @@ -82,10 +83,19 @@ def __init__( ethereum_client: EthereumClient, eth_reorg_blocks: int, eth_l2_network: bool, + eth_internal_tx_decoded_process_batch: int, ): self.ethereum_client = ethereum_client self.eth_reorg_blocks = eth_reorg_blocks self.eth_l2_network = eth_l2_network + self.eth_internal_tx_decoded_process_batch = ( + eth_internal_tx_decoded_process_batch + ) + + # Prevent circular import + from ..indexers.tx_processor import SafeTxProcessor, SafeTxProcessorProvider + + self.tx_processor: SafeTxProcessor = SafeTxProcessorProvider() def block_get_or_create_from_block_hash(self, block_hash: int): try: @@ -358,7 +368,8 @@ def fix_out_of_order( ) -> None: """ Fix a Safe that has transactions out of order (not processed transactions - in between processed ones, usually due a reindex), by reprocessing all of them + in between processed ones, usually due a reindex), by marking + them as not processed from the `internal_tx` where the issue was detected. :param address: Safe to fix :param internal_tx: Only reprocess transactions from `internal_tx` and newer @@ -388,6 +399,38 @@ def fix_out_of_order( SafeLastStatus.objects.filter(address=address).delete() logger.info("[%s] Ended fixing out of order", address) + def process_decoded_txs(self, safe_address: ChecksumAddress) -> int: + """ + Process all the pending `InternalTxDecoded` for a Safe + + :param safe_address: + :return: Number of `InternalTxDecoded` processed + """ + + # Check if a new decoded tx appeared before other already processed (due to a reindex) + if InternalTxDecoded.objects.out_of_order_for_safe(safe_address): + logger.error("[%s] Found out of order transactions", safe_address) + self.fix_out_of_order( + safe_address, + InternalTxDecoded.objects.pending_for_safe(safe_address)[0].internal_tx, + ) + self.tx_processor.clear_cache(safe_address) + + # Use chunks for memory issues + total_processed_txs = 0 + while True: + internal_txs_decoded_queryset = InternalTxDecoded.objects.pending_for_safe( + safe_address + )[: self.eth_internal_tx_decoded_process_batch] + if not internal_txs_decoded_queryset: + break + total_processed_txs += len( + self.tx_processor.process_decoded_transactions( + internal_txs_decoded_queryset + ) + ) + return total_processed_txs + def reprocess_addresses(self, addresses: List[ChecksumAddress]): """ Given a list of safe addresses it will delete all `SafeStatus`, conflicting `MultisigTxs` and will mark diff --git a/safe_transaction_service/history/tasks.py b/safe_transaction_service/history/tasks.py index 27033f61e..f63487077 100644 --- a/safe_transaction_service/history/tasks.py +++ b/safe_transaction_service/history/tasks.py @@ -5,7 +5,6 @@ import random from typing import Optional, Tuple -from django.conf import settings from django.utils import timezone from celery import app @@ -24,15 +23,7 @@ ProxyFactoryIndexerProvider, SafeEventsIndexerProvider, ) -from .indexers.tx_processor import SafeTxProcessor, SafeTxProcessorProvider -from .models import ( - EthereumBlock, - InternalTxDecoded, - MultisigTransaction, - SafeContract, - SafeLastStatus, - SafeStatus, -) +from .models import EthereumBlock, InternalTxDecoded, MultisigTransaction, SafeContract from .services import ( CollectiblesServiceProvider, IndexingException, @@ -278,6 +269,29 @@ def process_decoded_internal_txs_task(self) -> Optional[int]: logger.info("%d Safes to process", count) +@app.shared_task(bind=True, soft_time_limit=SOFT_TIMEOUT, time_limit=LOCK_TIMEOUT) +def process_decoded_internal_txs_for_safe_task( + self, safe_address: ChecksumAddress, reindex_master_copies: bool = True +) -> Optional[int]: + """ + Process decoded internal txs for one Safe. Processing decoded transactions + could be slow and this way multiple Safes can be processed at the same time + + :param safe_address: + :param reindex_master_copies: Trigger auto reindexing if a problem is found + :return: Number of `InternalTxDecoded` processed + """ + with contextlib.suppress(LockError): + with only_one_running_task(self, lock_name_suffix=safe_address): + logger.info("[%s] Start processing decoded internal txs", safe_address) + index_service: IndexService = IndexServiceProvider() + number_processed = index_service.process_decoded_txs(safe_address) + logger.info( + "[%s] Processed %d decoded transactions", safe_address, number_processed + ) + return number_processed + + @app.shared_task(bind=True, soft_time_limit=SOFT_TIMEOUT, time_limit=LOCK_TIMEOUT) def reindex_mastercopies_last_hours_task(self, hours: float = 2.5) -> Optional[int]: """ @@ -384,118 +398,6 @@ def reindex_erc20_events_task( ) -@app.shared_task(bind=True, soft_time_limit=SOFT_TIMEOUT, time_limit=LOCK_TIMEOUT) -def process_decoded_internal_txs_for_safe_task( - self, safe_address: ChecksumAddress, reindex_master_copies: bool = True -) -> Optional[int]: - """ - Process decoded internal txs for one Safe. Processing decoded transactions is very slow and this way multiple - Safes can be processed at the same time - - :param safe_address: - :param reindex_master_copies: Trigger auto reindexing if a problem is found - :return: - """ - with contextlib.suppress(LockError): - with only_one_running_task(self, lock_name_suffix=safe_address): - logger.info("[%s] Start processing decoded internal txs", safe_address) - - tx_processor: SafeTxProcessor = SafeTxProcessorProvider() - index_service: IndexService = IndexServiceProvider() - - # Check if something is wrong during indexing - try: - safe_last_status = SafeLastStatus.objects.get_or_generate( - address=safe_address - ) - except SafeLastStatus.DoesNotExist: - safe_last_status = None - - if safe_last_status and safe_last_status.is_corrupted(): - try: - # Find first corrupted safe status - previous_safe_status: Optional[SafeStatus] = None - for safe_status in SafeStatus.objects.filter( - address=safe_address - ).sorted_reverse_by_mined(): - if safe_status.is_corrupted(): - message = ( - f"[{safe_address}] A problem was found in SafeStatus " - f"with nonce={safe_status.nonce} " - f"on internal-tx-id={safe_status.internal_tx_id} " - f"tx-hash={safe_status.internal_tx.ethereum_tx_id} " - ) - logger.error(message) - logger.info( - "[%s] Processing traces again", - safe_address, - ) - if reindex_master_copies and previous_safe_status: - block_number = previous_safe_status.block_number - to_block_number = safe_last_status.block_number - logger.info( - "[%s] Last known not corrupted SafeStatus with nonce=%d on block=%d , " - "reindexing until block=%d", - safe_address, - previous_safe_status.nonce, - block_number, - to_block_number, - ) - # Setting the safe address reindexing should be very fast - reindex_master_copies_task.delay( - block_number, - to_block_number=to_block_number, - addresses=[safe_address], - ) - logger.info( - "[%s] Processing traces again after reindexing", - safe_address, - ) - raise ValueError(message) - previous_safe_status = safe_status - finally: - tx_processor.clear_cache(safe_address) - index_service.reprocess_addresses([safe_address]) - - # Check if a new decoded tx appeared before other already processed (due to a reindex) - if InternalTxDecoded.objects.out_of_order_for_safe(safe_address): - logger.error("[%s] Found out of order transactions", safe_address) - index_service.fix_out_of_order( - safe_address, - InternalTxDecoded.objects.pending_for_safe(safe_address)[ - 0 - ].internal_tx, - ) - tx_processor.clear_cache(safe_address) - - # Use chunks for memory issues - number_processed = 0 - while True: - internal_txs_decoded_queryset = ( - InternalTxDecoded.objects.pending_for_safe(safe_address)[ - : settings.ETH_INTERNAL_TX_DECODED_PROCESS_BATCH - ] - ) - if not internal_txs_decoded_queryset: - break - number_processed += len( - tx_processor.process_decoded_transactions( - internal_txs_decoded_queryset - ) - ) - - logger.info( - "[%s] Processed %d decoded transactions", safe_address, number_processed - ) - if number_processed: - logger.info( - "[%s] %d decoded internal txs successfully processed", - safe_address, - number_processed, - ) - return number_processed - - @app.shared_task( soft_time_limit=SOFT_TIMEOUT, time_limit=LOCK_TIMEOUT, diff --git a/safe_transaction_service/history/tests/test_index_service.py b/safe_transaction_service/history/tests/test_index_service.py index c2061503c..8f023727d 100644 --- a/safe_transaction_service/history/tests/test_index_service.py +++ b/safe_transaction_service/history/tests/test_index_service.py @@ -23,6 +23,7 @@ ) from .factories import ( EthereumTxFactory, + InternalTxDecodedFactory, MultisigTransactionFactory, SafeMasterCopyFactory, SafeStatusFactory, @@ -125,6 +126,51 @@ def test_is_service_synced(self, current_block_number_mock: PropertyMock): current_block_number_mock.side_effect = RequestsConnectionError self.assertFalse(self.index_service.is_service_synced()) + def test_process_decoded_txs(self): + safe_address = Account.create().address + with mock.patch.object( + IndexService, "fix_out_of_order" + ) as fix_out_of_order_mock: + self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0) + fix_out_of_order_mock.assert_not_called() + + # Setup for a random Safe should not be processed + InternalTxDecodedFactory( + function_name="setup", + ) + self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0) + + setup_internal_tx = InternalTxDecodedFactory( + function_name="setup", + internal_tx___from=safe_address, + ) + self.assertEqual(self.index_service.process_decoded_txs(safe_address), 1) + fix_out_of_order_mock.assert_not_called() + # After processed, it should not be processed again + self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0) + + exec_transactions = [ + InternalTxDecodedFactory( + function_name="execTransaction", + internal_tx___from=safe_address, + ) + for _ in range(3) + ] + + self.assertEqual(self.index_service.process_decoded_txs(safe_address), 3) + fix_out_of_order_mock.assert_not_called() + # After processed, they should not be processed again + self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0) + + # Add a transaction out of order + exec_transactions[1].processed = False + exec_transactions[1].save(update_fields=["processed"]) + self.assertEqual(self.index_service.process_decoded_txs(safe_address), 1) + # Out of order transaction was detected + fix_out_of_order_mock.assert_called_with( + safe_address, exec_transactions[1].internal_tx + ) + def test_reprocess_addresses(self): index_service: IndexService = self.index_service self.assertIsNone(index_service.reprocess_addresses([])) diff --git a/safe_transaction_service/history/tests/test_tasks.py b/safe_transaction_service/history/tests/test_tasks.py index e9c2ad96e..ed4b9b854 100644 --- a/safe_transaction_service/history/tests/test_tasks.py +++ b/safe_transaction_service/history/tests/test_tasks.py @@ -189,46 +189,23 @@ def test_process_decoded_internal_txs_for_banned_safe(self): self.assertEqual(SafeStatus.objects.filter(address=safe_address).count(), 0) def test_process_decoded_internal_txs_for_safe_task(self): - # Test corrupted SafeStatus safe_status_0 = SafeStatusFactory(nonce=0) safe_address = safe_status_0.address - safe_status_2 = SafeStatusFactory(nonce=2, address=safe_address) - safe_status_5 = SafeStatusFactory(nonce=5, address=safe_address) - SafeLastStatus.objects.update_or_create_from_safe_status(safe_status_5) - with patch.object(IndexService, "reindex_master_copies") as reindex_mock: - with patch.object(IndexService, "reprocess_addresses") as reprocess_mock: - with self.assertLogs(logger=task_logger) as cm: - process_decoded_internal_txs_for_safe_task.delay(safe_address) - reprocess_mock.assert_called_with([safe_address]) - reindex_mock.assert_called_with( - safe_status_0.block_number, - to_block_number=safe_status_5.block_number, - addresses=[safe_address], - ) - self.assertIn( - f"[{safe_address}] A problem was found in SafeStatus " - f"with nonce=2 on internal-tx-id={safe_status_2.internal_tx_id}", - cm.output[1], - ) - self.assertIn( - f"[{safe_address}] Processing traces again", - cm.output[2], - ) - self.assertIn( - f"[{safe_address}] Last known not corrupted SafeStatus with nonce=0 on " - f"block={safe_status_0.internal_tx.ethereum_tx.block_id} , " - f"reindexing until block={safe_status_5.block_number}", - cm.output[3], - ) - self.assertIn( - f"Reindexing master copies from-block={safe_status_0.internal_tx.ethereum_tx.block_id} " - f"to-block={safe_status_5.block_number} addresses={[safe_address]}", - cm.output[4], - ) - self.assertIn( - f"[{safe_address}] Processing traces again after reindexing", - cm.output[5], - ) + SafeLastStatus.objects.update_or_create_from_safe_status(safe_status_0) + with self.assertLogs(logger=task_logger) as cm: + with patch.object( + IndexService, "process_decoded_txs", return_value=5 + ) as process_decoded_txs_mock: + process_decoded_internal_txs_for_safe_task.delay(safe_address) + process_decoded_txs_mock.assert_called_with(safe_address) + self.assertIn( + f"[{safe_address}] Start processing decoded internal txs", + cm.output[0], + ) + self.assertIn( + f"[{safe_address}] Processed 5 decoded transactions", + cm.output[1], + ) @patch.object(CollectiblesService, "get_metadata", autospec=True, return_value={}) def test_retry_get_metadata_task(self, get_metadata_mock: MagicMock):