Refactor processing of Internal Txs #2220

Merged 2 commits on Sep 25, 2024
6 changes: 5 additions & 1 deletion safe_transaction_service/history/models.py
@@ -1927,7 +1927,11 @@ def is_corrupted(self) -> bool:
"""
SafeStatus nonce must be incremental. If the current nonce is bigger than the number of SafeStatus entries
for that Safe, something is wrong. There could be more SafeStatus entries than the nonce (e.g. a call to a MultiSend
adding owners and enabling a Module in the same contract `execTransaction`)
adding owners and enabling a Module in the same contract `execTransaction`), but never less.

However, it is possible that nothing is actually wrong with the indexer. For example,
on an L2 network a Safe could have been migrated from L1 to L2, so some transactions will never be detected
by the indexer.

:return: `True` if corrupted, `False` otherwise
"""
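A minimal sketch of the check this docstring describes, assuming a model that exposes `address` and `nonce` fields (a simplification for illustration, not necessarily the method's exact body):

    # Hedged sketch: every executed Safe transaction bumps the nonce by one and
    # stores at least one SafeStatus, so having fewer snapshots than the nonce
    # means some transactions were never indexed.
    def is_corrupted(self) -> bool:
        return SafeStatus.objects.filter(address=self.address).count() < self.nonce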
45 changes: 44 additions & 1 deletion safe_transaction_service/history/services/index_service.py
@@ -67,6 +67,7 @@ def __new__(cls):
get_auto_ethereum_client(),
settings.ETH_REORG_BLOCKS,
settings.ETH_L2_NETWORK,
settings.ETH_INTERNAL_TX_DECODED_PROCESS_BATCH,
)
return cls.instance

@@ -82,10 +83,19 @@ def __init__(
ethereum_client: EthereumClient,
eth_reorg_blocks: int,
eth_l2_network: bool,
eth_internal_tx_decoded_process_batch: int,
):
self.ethereum_client = ethereum_client
self.eth_reorg_blocks = eth_reorg_blocks
self.eth_l2_network = eth_l2_network
self.eth_internal_tx_decoded_process_batch = (
eth_internal_tx_decoded_process_batch
)

# Prevent circular import
from ..indexers.tx_processor import SafeTxProcessor, SafeTxProcessorProvider

self.tx_processor: SafeTxProcessor = SafeTxProcessorProvider()

def block_get_or_create_from_block_hash(self, block_hash: int):
try:
@@ -358,7 +368,8 @@ def fix_out_of_order(
) -> None:
"""
Fix a Safe that has transactions out of order (not-processed transactions
in between processed ones, usually due to a reindex), by reprocessing all of them
in between processed ones, usually due to a reindex), by marking
them as not processed from the `internal_tx` where the issue was detected.

:param address: Safe to fix
:param internal_tx: Only reprocess transactions from `internal_tx` and newer
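The core of that fix is a bulk update. A hedged sketch under assumed lookups (`internal_tx___from` mirrors the test factories in this PR, and `ethereum_tx.block_id` appears in the old task's log messages); the cached `SafeLastStatus` is also dropped, as the hunk below shows:

    # Hedged sketch, not this PR's exact code: flag every decoded tx for the
    # Safe from the problematic internal tx's block onwards as pending again,
    # so they get reprocessed in order on the next run.
    InternalTxDecoded.objects.filter(
        internal_tx___from=address,
        internal_tx__ethereum_tx__block_id__gte=internal_tx.ethereum_tx.block_id,
        processed=True,
    ).update(processed=False)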
@@ -388,6 +399,38 @@
SafeLastStatus.objects.filter(address=address).delete()
logger.info("[%s] Ended fixing out of order", address)

def process_decoded_txs(self, safe_address: ChecksumAddress) -> int:
"""
Process all the pending `InternalTxDecoded` for a Safe

:param safe_address: Address of the Safe to process
:return: Number of `InternalTxDecoded` processed
"""

# Check if a new decoded tx appeared before others that were already processed (due to a reindex)
if InternalTxDecoded.objects.out_of_order_for_safe(safe_address):
logger.error("[%s] Found out of order transactions", safe_address)
self.fix_out_of_order(
safe_address,
InternalTxDecoded.objects.pending_for_safe(safe_address)[0].internal_tx,
)
self.tx_processor.clear_cache(safe_address)

# Process in chunks to keep memory usage bounded
total_processed_txs = 0
while True:
internal_txs_decoded_queryset = InternalTxDecoded.objects.pending_for_safe(
safe_address
)[: self.eth_internal_tx_decoded_process_batch]
if not internal_txs_decoded_queryset:
break
total_processed_txs += len(
self.tx_processor.process_decoded_transactions(
internal_txs_decoded_queryset
)
)
return total_processed_txs

def reprocess_addresses(self, addresses: List[ChecksumAddress]):
"""
Given a list of safe addresses it will delete all `SafeStatus`, conflicting `MultisigTxs` and will mark
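A hypothetical way to invoke the new method from a Django shell; the import path is an assumption based on the imports used in tasks.py below:

    from safe_transaction_service.history.services import IndexServiceProvider

    safe_address = "0x0000000000000000000000000000000000000000"  # placeholder Safe address
    index_service = IndexServiceProvider()
    # Processes all pending InternalTxDecoded for the Safe in batches and
    # returns how many were handled.
    processed = index_service.process_decoded_txs(safe_address)
    print(f"Processed {processed} InternalTxDecoded")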
146 changes: 24 additions & 122 deletions safe_transaction_service/history/tasks.py
@@ -5,7 +5,6 @@
import random
from typing import Optional, Tuple

from django.conf import settings
from django.utils import timezone

from celery import app
@@ -24,15 +23,7 @@
ProxyFactoryIndexerProvider,
SafeEventsIndexerProvider,
)
from .indexers.tx_processor import SafeTxProcessor, SafeTxProcessorProvider
from .models import (
EthereumBlock,
InternalTxDecoded,
MultisigTransaction,
SafeContract,
SafeLastStatus,
SafeStatus,
)
from .models import EthereumBlock, InternalTxDecoded, MultisigTransaction, SafeContract
from .services import (
CollectiblesServiceProvider,
IndexingException,
@@ -278,6 +269,29 @@ def process_decoded_internal_txs_task(self) -> Optional[int]:
logger.info("%d Safes to process", count)


@app.shared_task(bind=True, soft_time_limit=SOFT_TIMEOUT, time_limit=LOCK_TIMEOUT)
def process_decoded_internal_txs_for_safe_task(
self, safe_address: ChecksumAddress, reindex_master_copies: bool = True
) -> Optional[int]:
"""
Process decoded internal txs for one Safe. Processing decoded transactions
can be slow, so handling each Safe in its own task lets multiple Safes be processed at the same time

:param safe_address: Address of the Safe to process
:param reindex_master_copies: Trigger auto reindexing if a problem is found
:return: Number of `InternalTxDecoded` processed
"""
with contextlib.suppress(LockError):
with only_one_running_task(self, lock_name_suffix=safe_address):
logger.info("[%s] Start processing decoded internal txs", safe_address)
index_service: IndexService = IndexServiceProvider()
number_processed = index_service.process_decoded_txs(safe_address)
logger.info(
"[%s] Processed %d decoded transactions", safe_address, number_processed
)
return number_processed


@app.shared_task(bind=True, soft_time_limit=SOFT_TIMEOUT, time_limit=LOCK_TIMEOUT)
def reindex_mastercopies_last_hours_task(self, hours: float = 2.5) -> Optional[int]:
"""
@@ -384,118 +398,6 @@ def reindex_erc20_events_task(
)


@app.shared_task(bind=True, soft_time_limit=SOFT_TIMEOUT, time_limit=LOCK_TIMEOUT)
def process_decoded_internal_txs_for_safe_task(
self, safe_address: ChecksumAddress, reindex_master_copies: bool = True
) -> Optional[int]:
"""
Process decoded internal txs for one Safe. Processing decoded transactions is very slow and this way multiple
Safes can be processed at the same time

:param safe_address:
:param reindex_master_copies: Trigger auto reindexing if a problem is found
:return:
"""
with contextlib.suppress(LockError):
with only_one_running_task(self, lock_name_suffix=safe_address):
logger.info("[%s] Start processing decoded internal txs", safe_address)

tx_processor: SafeTxProcessor = SafeTxProcessorProvider()
index_service: IndexService = IndexServiceProvider()

# Check if something is wrong during indexing
try:
safe_last_status = SafeLastStatus.objects.get_or_generate(
address=safe_address
)
except SafeLastStatus.DoesNotExist:
safe_last_status = None

if safe_last_status and safe_last_status.is_corrupted():
try:
# Find first corrupted safe status
previous_safe_status: Optional[SafeStatus] = None
for safe_status in SafeStatus.objects.filter(
address=safe_address
).sorted_reverse_by_mined():
if safe_status.is_corrupted():
message = (
f"[{safe_address}] A problem was found in SafeStatus "
f"with nonce={safe_status.nonce} "
f"on internal-tx-id={safe_status.internal_tx_id} "
f"tx-hash={safe_status.internal_tx.ethereum_tx_id} "
)
logger.error(message)
logger.info(
"[%s] Processing traces again",
safe_address,
)
if reindex_master_copies and previous_safe_status:
block_number = previous_safe_status.block_number
to_block_number = safe_last_status.block_number
logger.info(
"[%s] Last known not corrupted SafeStatus with nonce=%d on block=%d , "
"reindexing until block=%d",
safe_address,
previous_safe_status.nonce,
block_number,
to_block_number,
)
# Setting the safe address reindexing should be very fast
reindex_master_copies_task.delay(
block_number,
to_block_number=to_block_number,
addresses=[safe_address],
)
logger.info(
"[%s] Processing traces again after reindexing",
safe_address,
)
raise ValueError(message)
previous_safe_status = safe_status
finally:
tx_processor.clear_cache(safe_address)
index_service.reprocess_addresses([safe_address])

# Check if a new decoded tx appeared before other already processed (due to a reindex)
if InternalTxDecoded.objects.out_of_order_for_safe(safe_address):
logger.error("[%s] Found out of order transactions", safe_address)
index_service.fix_out_of_order(
safe_address,
InternalTxDecoded.objects.pending_for_safe(safe_address)[
0
].internal_tx,
)
tx_processor.clear_cache(safe_address)

# Use chunks for memory issues
number_processed = 0
while True:
internal_txs_decoded_queryset = (
InternalTxDecoded.objects.pending_for_safe(safe_address)[
: settings.ETH_INTERNAL_TX_DECODED_PROCESS_BATCH
]
)
if not internal_txs_decoded_queryset:
break
number_processed += len(
tx_processor.process_decoded_transactions(
internal_txs_decoded_queryset
)
)

logger.info(
"[%s] Processed %d decoded transactions", safe_address, number_processed
)
if number_processed:
logger.info(
"[%s] %d decoded internal txs successfully processed",
safe_address,
number_processed,
)
return number_processed


@app.shared_task(
soft_time_limit=SOFT_TIMEOUT,
time_limit=LOCK_TIMEOUT,
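With the heavy lifting moved into `IndexService.process_decoded_txs`, the per-Safe task stays thin and the parent task can fan work out across workers. A hedged sketch of that dispatch (`safes_pending_to_be_processed` is an assumed manager helper; any iterable of Safe addresses would work the same way):

    # Hedged sketch: enqueue one lightweight task per Safe with pending work,
    # so several Safes are processed concurrently under their own per-Safe locks.
    for safe_address in InternalTxDecoded.objects.safes_pending_to_be_processed():
        process_decoded_internal_txs_for_safe_task.delay(safe_address)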
46 changes: 46 additions & 0 deletions safe_transaction_service/history/tests/test_index_service.py
@@ -23,6 +23,7 @@
)
from .factories import (
EthereumTxFactory,
InternalTxDecodedFactory,
MultisigTransactionFactory,
SafeMasterCopyFactory,
SafeStatusFactory,
@@ -125,6 +126,51 @@ def test_is_service_synced(self, current_block_number_mock: PropertyMock):
current_block_number_mock.side_effect = RequestsConnectionError
self.assertFalse(self.index_service.is_service_synced())

def test_process_decoded_txs(self):
safe_address = Account.create().address
with mock.patch.object(
IndexService, "fix_out_of_order"
) as fix_out_of_order_mock:
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0)
fix_out_of_order_mock.assert_not_called()

# A `setup` for a random, unrelated Safe should not be processed
InternalTxDecodedFactory(
function_name="setup",
)
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0)

setup_internal_tx = InternalTxDecodedFactory(
function_name="setup",
internal_tx___from=safe_address,
)
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 1)
fix_out_of_order_mock.assert_not_called()
# Once processed, it should not be processed again
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0)

exec_transactions = [
InternalTxDecodedFactory(
function_name="execTransaction",
internal_tx___from=safe_address,
)
for _ in range(3)
]

self.assertEqual(self.index_service.process_decoded_txs(safe_address), 3)
fix_out_of_order_mock.assert_not_called()
# Once processed, they should not be processed again
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0)

# Add a transaction out of order
exec_transactions[1].processed = False
exec_transactions[1].save(update_fields=["processed"])
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 1)
# Out of order transaction was detected
fix_out_of_order_mock.assert_called_with(
safe_address, exec_transactions[1].internal_tx
)

def test_reprocess_addresses(self):
index_service: IndexService = self.index_service
self.assertIsNone(index_service.reprocess_addresses([]))
53 changes: 15 additions & 38 deletions safe_transaction_service/history/tests/test_tasks.py
@@ -189,46 +189,23 @@ def test_process_decoded_internal_txs_for_banned_safe(self):
self.assertEqual(SafeStatus.objects.filter(address=safe_address).count(), 0)

def test_process_decoded_internal_txs_for_safe_task(self):
# Test corrupted SafeStatus
safe_status_0 = SafeStatusFactory(nonce=0)
safe_address = safe_status_0.address
safe_status_2 = SafeStatusFactory(nonce=2, address=safe_address)
safe_status_5 = SafeStatusFactory(nonce=5, address=safe_address)
SafeLastStatus.objects.update_or_create_from_safe_status(safe_status_5)
with patch.object(IndexService, "reindex_master_copies") as reindex_mock:
with patch.object(IndexService, "reprocess_addresses") as reprocess_mock:
with self.assertLogs(logger=task_logger) as cm:
process_decoded_internal_txs_for_safe_task.delay(safe_address)
reprocess_mock.assert_called_with([safe_address])
reindex_mock.assert_called_with(
safe_status_0.block_number,
to_block_number=safe_status_5.block_number,
addresses=[safe_address],
)
self.assertIn(
f"[{safe_address}] A problem was found in SafeStatus "
f"with nonce=2 on internal-tx-id={safe_status_2.internal_tx_id}",
cm.output[1],
)
self.assertIn(
f"[{safe_address}] Processing traces again",
cm.output[2],
)
self.assertIn(
f"[{safe_address}] Last known not corrupted SafeStatus with nonce=0 on "
f"block={safe_status_0.internal_tx.ethereum_tx.block_id} , "
f"reindexing until block={safe_status_5.block_number}",
cm.output[3],
)
self.assertIn(
f"Reindexing master copies from-block={safe_status_0.internal_tx.ethereum_tx.block_id} "
f"to-block={safe_status_5.block_number} addresses={[safe_address]}",
cm.output[4],
)
self.assertIn(
f"[{safe_address}] Processing traces again after reindexing",
cm.output[5],
)
SafeLastStatus.objects.update_or_create_from_safe_status(safe_status_0)
with self.assertLogs(logger=task_logger) as cm:
with patch.object(
IndexService, "process_decoded_txs", return_value=5
) as process_decoded_txs_mock:
process_decoded_internal_txs_for_safe_task.delay(safe_address)
process_decoded_txs_mock.assert_called_with(safe_address)
self.assertIn(
f"[{safe_address}] Start processing decoded internal txs",
cm.output[0],
)
self.assertIn(
f"[{safe_address}] Processed 5 decoded transactions",
cm.output[1],
)

@patch.object(CollectiblesService, "get_metadata", autospec=True, return_value={})
def test_retry_get_metadata_task(self, get_metadata_mock: MagicMock):