Skip to content

Commit

Permalink
Refactor all-transactions endpoint
Browse files Browse the repository at this point in the history
- Remove distinct from `ModuleTransaction`, as every `ModuleTransaction` is shown separately even if they share the same `ethereum txHash`
- Clean transaction_service, adding comments, renaming variables and refactoring typing
- Add logging to service and view
- Rename gevent variable for task locks (not very descriptive)
  • Loading branch information
Uxio0 committed Jul 26, 2023
1 parent 59d599a commit dd8eea6
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 36 deletions.
123 changes: 90 additions & 33 deletions safe_transaction_service/history/services/transaction_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
logger = logging.getLogger(__name__)


# Type alias covering every transaction model this service can return for a Safe:
# a plain EthereumTx, a MultisigTransaction, or a ModuleTransaction (PEP 604 union).
AnySafeTransaction = EthereumTx | MultisigTransaction | ModuleTransaction


class TransactionServiceException(Exception):
    """Exception raised by the transaction service when transactions cannot be merged/resolved."""

    pass

Expand Down Expand Up @@ -65,7 +68,7 @@ def get_cache_key(self, safe_address: str, tx_id: str):

def get_txs_from_cache(
self, safe_address: str, ids_to_search: Sequence[str]
) -> List[Union[EthereumTx, MultisigTransaction, ModuleTransaction]]:
) -> List[AnySafeTransaction]:
keys_to_search = [
self.get_cache_key(safe_address, id_to_search)
for id_to_search in ids_to_search
Expand All @@ -78,13 +81,11 @@ def get_txs_from_cache(
def store_txs_in_cache(
self,
safe_address: str,
ids_with_txs: Tuple[
str, List[Union[EthereumTx, MultisigTransaction, ModuleTransaction]]
],
ids_with_txs: Tuple[str, List[AnySafeTransaction]],
):
"""
Store executed transactions older than 10 minutes, using `ethereum_tx_hash` as key (for
MultisigTransaction it will be `SafeTxHash`) and expire then in one hour
MultisigTransaction it will be `SafeTxHash`) and expire them in one hour
:param safe_address:
:param ids_with_txs:
Expand Down Expand Up @@ -119,7 +120,7 @@ def get_all_tx_identifiers(
Build a queryset with identifiers (`safeTxHash` or `txHash`) for every tx for a Safe for paginated filtering.
In the case of Multisig Transactions, as some of them are not mined, we use the `safeTxHash`.
Criteria for building this list:
- Return only multisig txs with `nonce < current Safe Nonce`
- Return ``SafeTxHash`` for every MultisigTx (even not executed)
- The endpoint should only show incoming transactions that have been mined
- The transactions should be sorted by execution date. If an outgoing transaction doesn't have an execution
date the execution date of the transaction with the same nonce that has been executed should be taken.
Expand All @@ -134,7 +135,13 @@ def get_all_tx_identifiers(
sent by a delegate or indexed). With `False` all txs are returned
:return: List with tx hashes sorted by date (newest first)
"""

logger.debug(
"Safe=%s Getting all tx identifiers executed=%s queued=%s trusted=%s",
safe_address,
executed,
queued,
trusted,
)
# If tx is not mined, get the execution date of a tx mined with the same nonce
case = Case(
When(
Expand Down Expand Up @@ -197,7 +204,6 @@ def get_all_tx_identifiers(
"block",
"safe_nonce",
)
.distinct()
.order_by("-execution_date")
)

Expand Down Expand Up @@ -282,57 +288,87 @@ def get_all_tx_identifiers(

def get_all_txs_from_identifiers(
self, safe_address: str, ids_to_search: Sequence[str]
) -> List[Union[EthereumTx, MultisigTransaction, ModuleTransaction]]:
) -> List[AnySafeTransaction]:
"""
Now that we know how to paginate, we retrieve the real transactions
:param safe_address:
:param ids_to_search: `SafeTxHash` for MultisigTransactions, `txHash` for other transactions
:return:
"""
cached_txs = {
id_to_search: cached_tx
for id_to_search, cached_tx in zip(

logger.debug(
"Safe=%s Getting %d txs from identifiers", safe_address, len(ids_to_search)
)
ids_with_cached_txs = {
id_to_search: cached_txs
for id_to_search, cached_txs in zip(
ids_to_search,
self.get_txs_from_cache(safe_address, ids_to_search),
)
if cached_tx
if cached_txs
}
id_not_cached = [
logger.debug(
"Safe=%s Got %d cached txs from identifiers",
safe_address,
len(ids_with_cached_txs),
)
ids_not_cached = [
hash_to_search
for hash_to_search in ids_to_search
if hash_to_search not in cached_txs
if hash_to_search not in ids_with_cached_txs
]
id_with_multisig_txs: Dict[HexStr, List[MultisigTransaction]] = {
logger.debug(
"Safe=%s %d not cached txs from identifiers",
safe_address,
len(ids_not_cached),
)
ids_with_multisig_txs: Dict[HexStr, List[MultisigTransaction]] = {
multisig_tx.safe_tx_hash: [multisig_tx]
for multisig_tx in MultisigTransaction.objects.filter(
safe=safe_address, safe_tx_hash__in=id_not_cached
safe=safe_address, safe_tx_hash__in=ids_not_cached
)
.with_confirmations_required()
.prefetch_related("confirmations")
.select_related("ethereum_tx__block")
.order_by("-nonce", "-created")
}
logger.debug(
"Safe=%s Got %d Multisig txs from identifiers",
safe_address,
len(ids_with_multisig_txs),
)

id_with_module_txs: Dict[HexStr, List[ModuleTransaction]] = {}
ids_with_module_txs: Dict[HexStr, List[ModuleTransaction]] = {}
for module_tx in ModuleTransaction.objects.filter(
safe=safe_address, internal_tx__ethereum_tx__in=id_not_cached
safe=safe_address, internal_tx__ethereum_tx__in=ids_not_cached
).select_related("internal_tx"):
id_with_module_txs.setdefault(
ids_with_module_txs.setdefault(
module_tx.internal_tx.ethereum_tx_id, []
).append(module_tx)
logger.debug(
"Safe=%s Got %d Module txs from identifiers",
safe_address,
len(ids_with_module_txs),
)

id_with_plain_ethereum_txs: Dict[HexStr, List[EthereumTx]] = {
ids_with_plain_ethereum_txs: Dict[HexStr, List[EthereumTx]] = {
ethereum_tx.tx_hash: [ethereum_tx]
for ethereum_tx in EthereumTx.objects.filter(
tx_hash__in=id_not_cached
tx_hash__in=ids_not_cached
).select_related("block")
}
logger.debug(
"Safe=%s Got %d Plain Ethereum txs from identifiers",
safe_address,
len(ids_with_plain_ethereum_txs),
)

# We also need the in/out transfers for the MultisigTxs
all_ids = id_not_cached + [
# We also need the in/out transfers for the MultisigTxs, we add the MultisigTx Ethereum Tx hashes
# to not cached ids
all_ids = ids_not_cached + [
multisig_tx.ethereum_tx_id
for multisig_txs in id_with_multisig_txs.values()
for multisig_txs in ids_with_multisig_txs.values()
for multisig_tx in multisig_txs
]

Expand All @@ -358,6 +394,10 @@ def get_all_txs_from_identifiers(
for transfer in transfers:
transfer_dict[transfer["transaction_hash"]].append(transfer)

logger.debug(
"Safe=%s Got %d Transfers from identifiers", safe_address, len(transfers)
)

# Add available information about the token on database for the transfers
tokens = {
token.address: token
Expand All @@ -369,36 +409,42 @@ def get_all_txs_from_identifiers(
}
)
}
logger.debug(
"Safe=%s Got %d tokens for transfers from database",
safe_address,
len(tokens),
)

for transfer in transfers:
transfer["token"] = tokens.get(transfer["token_address"])

# Build the list
def get_the_transactions(
transaction_id: str,
) -> List[Union[MultisigTransaction, ModuleTransaction, EthereumTx]]:
) -> List[MultisigTransaction | ModuleTransaction | EthereumTx]:
"""
:param transaction_id:
:return: Transactions for the transaction id
:param transaction_id: SafeTxHash (in case of a ``MultisigTransaction``) or Ethereum ``TxHash`` for the rest
:return: Transactions for the transaction id, with transfers appended
"""
if result := cached_txs.get(transaction_id):
if result := ids_with_cached_txs.get(transaction_id):
return result

result: Optional[Union[MultisigTransaction, ModuleTransaction, EthereumTx]]
if result := id_with_multisig_txs.get(transaction_id):
if result := ids_with_multisig_txs.get(transaction_id):
for multisig_tx in result:
# Populate transfers
multisig_tx.transfers = transfer_dict[multisig_tx.ethereum_tx_id]
return result

if result := id_with_module_txs.get(transaction_id):
if result := ids_with_module_txs.get(transaction_id):
for module_tx in result:
# Populate transfers
module_tx.transfers = transfer_dict[
module_tx.internal_tx.ethereum_tx_id
]
return result

if result := id_with_plain_ethereum_txs.get(transaction_id):
if result := ids_with_plain_ethereum_txs.get(transaction_id):
# If no Multisig or Module tx found, fallback to simple tx
for ethereum_tx in result:
# Populate transfers
Expand All @@ -411,18 +457,27 @@ def get_the_transactions(
"Tx not found, problem merging all transactions together"
)

logger.debug(
"Safe=%s Got all transactions from tx identifiers. Storing in cache",
safe_address,
)
ids_with_txs = [
(id_to_search, get_the_transactions(id_to_search))
for id_to_search in ids_to_search
]
self.store_txs_in_cache(safe_address, ids_with_txs)
logger.debug(
"Safe=%s Got all transactions from tx identifiers. Stored in cache",
safe_address,
)
return list(
dict.fromkeys(tx for (_, txs) in ids_with_txs for tx in txs)
) # Sorted already by execution_date

def serialize_all_txs(
self, models: List[Union[EthereumTx, MultisigTransaction, ModuleTransaction]]
self, models: List[AnySafeTransaction]
) -> List[Dict[str, Any]]:
logger.debug("Serializing all transactions")
results = []
for model in models:
model_type = type(model)
Expand All @@ -437,4 +492,6 @@ def serialize_all_txs(
serialized = serializer(model)
# serialized.is_valid(raise_exception=True)
results.append(serialized.data)

logger.debug("Serialized all transactions")
return results
32 changes: 32 additions & 0 deletions safe_transaction_service/history/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,28 @@ def list(self, request, *args, **kwargs):
transaction_service = TransactionServiceProvider()
safe = self.kwargs["address"]
executed, queued, trusted = self.get_parameters()
logger.debug(
"%s: Getting all tx identifiers for Safe=%s executed=%s queued=%s trusted=%s",
self.__class__.__name__,
safe,
executed,
queued,
trusted,
)
queryset = self.filter_queryset(
transaction_service.get_all_tx_identifiers(
safe, executed=executed, queued=queued, trusted=trusted
)
)
page = self.paginate_queryset(queryset)
logger.debug(
"%s: Got all tx identifiers for Safe=%s executed=%s queued=%s trusted=%s",
self.__class__.__name__,
safe,
executed,
queued,
trusted,
)

if not page:
return self.get_paginated_response([])
Expand All @@ -251,7 +267,23 @@ def list(self, request, *args, **kwargs):
all_txs = transaction_service.get_all_txs_from_identifiers(
safe, all_tx_identifiers
)
logger.debug(
"%s: Got all txs from identifiers for Safe=%s executed=%s queued=%s trusted=%s",
self.__class__.__name__,
safe,
executed,
queued,
trusted,
)
all_txs_serialized = transaction_service.serialize_all_txs(all_txs)
logger.debug(
"%s: All txs from identifiers for Safe=%s executed=%s queued=%s trusted=%s were serialized",
self.__class__.__name__,
safe,
executed,
queued,
trusted,
)
return self.get_paginated_response(all_txs_serialized)

@swagger_auto_schema(
Expand Down
6 changes: 3 additions & 3 deletions safe_transaction_service/utils/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def only_one_running_task(
task: CeleryTask,
lock_name_suffix: Optional[str] = None,
lock_timeout: Optional[int] = LOCK_TIMEOUT,
gevent: bool = True,
gevent_enabled: bool = True,
):
"""
Ensures only one task is running at the same time, using `task` name as a unique key
Expand All @@ -67,7 +67,7 @@ def only_one_running_task(
when it has different arguments
:param lock_timeout: How long the lock will be stored, in case worker is halted so key is not stored forever
in Redis
:param gevent: If `True`, `close_gevent_db_connection` will be called at the end
:param gevent_enabled: If `True`, `close_gevent_db_connection` will be called at the end
:return: Instance of redis `Lock`
:raises: LockError if lock cannot be acquired
"""
Expand All @@ -83,6 +83,6 @@ def only_one_running_task(
yield lock
ACTIVE_LOCKS.remove(lock_name)
finally:
if gevent:
if gevent_enabled:
# Needed for django-db-geventpool
close_gevent_db_connection()

0 comments on commit dd8eea6

Please sign in to comment.