diff --git a/batch/indexer_block_tx_data.py b/batch/indexer_block_tx_data.py index 9412ced2..46effed4 100644 --- a/batch/indexer_block_tx_data.py +++ b/batch/indexer_block_tx_data.py @@ -154,7 +154,6 @@ def main(): while True: try: processor.process() - LOG.debug("Processed") except ServiceUnavailableError: LOG.warning("An external service was unavailable") except SQLAlchemyError as sa_err: diff --git a/batch/indexer_e2e_messaging.py b/batch/indexer_e2e_messaging.py index 5681f0fa..63d24dbe 100644 --- a/batch/indexer_e2e_messaging.py +++ b/batch/indexer_e2e_messaging.py @@ -109,6 +109,7 @@ def process(self): db_session.commit() finally: db_session.close() + LOG.info("Sync job has been completed") def __get_idx_e2e_messaging_block_number(self, db_session: Session): _idx_e2e_messaging_block_number = db_session.query(IDXE2EMessagingBlockNumber). \ @@ -128,7 +129,7 @@ def __set_idx_e2e_messaging_block_number(self, db_session: Session, block_number db_session.merge(_idx_e2e_messaging_block_number) def __sync_all(self, db_session: Session, block_from: int, block_to: int): - LOG.info(f"syncing from={block_from}, to={block_to}") + LOG.info(f"Syncing from={block_from}, to={block_to}") self.__sync_message(db_session, block_from, block_to) def __sync_message(self, db_session: Session, block_from: int, block_to: int): @@ -281,7 +282,6 @@ def main(): start_time = time.time() try: processor.process() - LOG.debug("Processed") except ServiceUnavailableError: LOG.warning("An external service was unavailable") except SQLAlchemyError as sa_err: diff --git a/batch/indexer_issue_redeem.py b/batch/indexer_issue_redeem.py index 7c05cfd3..241ccdc8 100644 --- a/batch/indexer_issue_redeem.py +++ b/batch/indexer_issue_redeem.py @@ -82,6 +82,7 @@ def sync_new_logs(self): db_session.commit() finally: db_session.close() + LOG.info("Sync job has been completed") def __get_token_list(self, db_session: Session): issued_token_address_list: tuple[str, ...] = tuple( @@ -128,7 +129,7 @@ def __set_idx_transfer_block_number(self, db_session: Session, block_number: int db_session.merge(_idx_transfer_block_number) def __sync_all(self, db_session: Session, block_from: int, block_to: int): - LOG.info(f"syncing from={block_from}, to={block_to}") + LOG.info(f"Syncing from={block_from}, to={block_to}") self.__sync_issue(db_session, block_from, block_to) self.__sync_redeem(db_session, block_from, block_to) @@ -232,7 +233,6 @@ def main(): while True: try: processor.sync_new_logs() - LOG.debug("Processed") except ServiceUnavailableError: LOG.warning("An external service was unavailable") except SQLAlchemyError as sa_err: diff --git a/batch/indexer_personal_info.py b/batch/indexer_personal_info.py index a4b59711..bc457e35 100644 --- a/batch/indexer_personal_info.py +++ b/batch/indexer_personal_info.py @@ -67,7 +67,7 @@ def process(self): latest_block = web3.eth.block_number # latest blockNumber if block_number >= latest_block: - LOG.debug("skip Process") + LOG.debug("skip process") else: self.__sync_all( db_session=db_session, @@ -79,6 +79,7 @@ def process(self): db_session.commit() finally: db_session.close() + LOG.info("Sync job has been completed") def __refresh_personal_info_list(self, db_session: Session): self.personal_info_contract_list.clear() @@ -132,7 +133,7 @@ def __set_block_number(self, db_session: Session, block_number: int): db_session.merge(_block_number) def __sync_all(self, db_session: Session, block_from: int, block_to: int): - LOG.info(f"syncing from={block_from}, to={block_to}") + LOG.info(f"Syncing from={block_from}, to={block_to}") self.__sync_personal_info_register( db_session=db_session, block_from=block_from, @@ -228,7 +229,6 @@ def main(): while True: try: processor.process() - LOG.debug("Processed") except ServiceUnavailableError: LOG.warning("An external service was unavailable") except SQLAlchemyError as sa_err: diff --git a/batch/indexer_position_bond.py b/batch/indexer_position_bond.py index 0cc5b903..bbd4c8dc 100644 --- a/batch/indexer_position_bond.py +++ b/batch/indexer_position_bond.py @@ -87,6 +87,7 @@ def sync_new_logs(self): db_session.commit() finally: db_session.close() + LOG.info("Sync job has been completed") def __get_contract_list(self, db_session: Session): self.exchange_address_list = [] @@ -149,7 +150,7 @@ def __set_idx_position_block_number(self, db_session: Session, block_number: int db_session.merge(_idx_position_block_number) def __sync_all(self, db_session: Session, block_from: int, block_to: int): - LOG.info("syncing from={}, to={}".format(block_from, block_to)) + LOG.info("Syncing from={}, to={}".format(block_from, block_to)) self.__sync_issuer(db_session) self.__sync_issue(db_session, block_from, block_to) self.__sync_transfer(db_session, block_from, block_to) @@ -748,7 +749,6 @@ def main(): while True: try: processor.sync_new_logs() - LOG.debug("Processed") except ServiceUnavailableError: LOG.warning("An external service was unavailable") except SQLAlchemyError as sa_err: diff --git a/batch/indexer_position_share.py b/batch/indexer_position_share.py index bfab196d..a754f92e 100644 --- a/batch/indexer_position_share.py +++ b/batch/indexer_position_share.py @@ -87,6 +87,7 @@ def sync_new_logs(self): db_session.commit() finally: db_session.close() + LOG.info("Sync job has been completed") def __get_contract_list(self, db_session: Session): self.exchange_address_list = [] @@ -150,7 +151,7 @@ def __set_idx_position_block_number(self, db_session: Session, block_number: int db_session.merge(_idx_position_block_number) def __sync_all(self, db_session: Session, block_from: int, block_to: int): - LOG.info("syncing from={}, to={}".format(block_from, block_to)) + LOG.info("Syncing from={}, to={}".format(block_from, block_to)) self.__sync_issuer(db_session) self.__sync_issue(db_session, block_from, block_to) self.__sync_transfer(db_session, block_from, block_to) @@ -749,7 +750,6 @@ def main(): while True: try: processor.sync_new_logs() - LOG.debug("Processed") except ServiceUnavailableError: LOG.warning("An external service was unavailable") except SQLAlchemyError as sa_err: diff --git a/batch/indexer_token_holders.py b/batch/indexer_token_holders.py index 02af33c4..dca4e196 100644 --- a/batch/indexer_token_holders.py +++ b/batch/indexer_token_holders.py @@ -156,14 +156,14 @@ def collect(self): self.__process_all(local_session, self.block_from, self.block_to) self.__update_status(local_session, TokenHolderBatchStatus.DONE) local_session.commit() - except Exception: - LOG.exception("An exception occurred during event synchronization") + LOG.info("Collect job has been completed") + except Exception as e: local_session.rollback() self.__update_status(local_session, TokenHolderBatchStatus.FAILED) local_session.commit() + raise e finally: local_session.close() - LOG.info(f"<{process_name}> Collect job has been completed") def __update_status(self, local_session: Session, status: TokenHolderBatchStatus): self.target.batch_status = status.value @@ -365,7 +365,6 @@ def main(): while True: try: processor.collect() - LOG.debug("Processed") except ServiceUnavailableError: LOG.warning("An external service was unavailable") except SQLAlchemyError as sa_err: diff --git a/batch/indexer_transfer.py b/batch/indexer_transfer.py index d6b662f5..9b850c23 100644 --- a/batch/indexer_transfer.py +++ b/batch/indexer_transfer.py @@ -81,6 +81,7 @@ def sync_new_logs(self): db_session.commit() finally: db_session.close() + LOG.info("Sync job has been completed") def __get_token_list(self, db_session: Session): issued_token_address_list: tuple[str, ...] = tuple( @@ -127,7 +128,7 @@ def __set_idx_transfer_block_number(self, db_session: Session, block_number: int db_session.merge(_idx_transfer_block_number) def __sync_all(self, db_session: Session, block_from: int, block_to: int): - LOG.info(f"syncing from={block_from}, to={block_to}") + LOG.info(f"Syncing from={block_from}, to={block_to}") self.__sync_transfer(db_session, block_from, block_to) def __sync_transfer(self, db_session: Session, block_from: int, block_to: int): @@ -191,7 +192,6 @@ def main(): while True: try: processor.sync_new_logs() - LOG.debug("Processed") except ServiceUnavailableError: LOG.warning("An external service was unavailable") except SQLAlchemyError as sa_err: diff --git a/batch/indexer_transfer_approval.py b/batch/indexer_transfer_approval.py index 871f6193..75beb0c4 100644 --- a/batch/indexer_transfer_approval.py +++ b/batch/indexer_transfer_approval.py @@ -40,7 +40,6 @@ ) from app.model.db import ( Token, - TokenType, IDXTransferApproval, IDXTransferApprovalBlockNumber, Notification, @@ -106,6 +105,7 @@ def sync_new_logs(self): db_session.commit() finally: db_session.close() + LOG.info("Sync job has been completed") def __get_contract_list(self, db_session: Session): self.exchange_list = [] @@ -171,7 +171,7 @@ def __set_idx_transfer_approval_block_number(self, db_session: Session, block_nu db_session.merge(_idx_transfer_approval_block_number) def __sync_all(self, db_session: Session, block_from: int, block_to: int): - LOG.info(f"syncing from={block_from}, to={block_to}") + LOG.info(f"Syncing from={block_from}, to={block_to}") self.__sync_token_apply_for_transfer(db_session, block_from, block_to) self.__sync_token_cancel_transfer(db_session, block_from, block_to) self.__sync_token_approve_transfer(db_session, block_from, block_to) diff --git a/batch/processor_batch_issue_redeem.py b/batch/processor_batch_issue_redeem.py index 16433f44..addca215 100644 --- a/batch/processor_batch_issue_redeem.py +++ b/batch/processor_batch_issue_redeem.py @@ -57,6 +57,12 @@ ) import batch_log +""" +[PROCESSOR-Batch-Issue-Redeem] + +Batch processing for additional issuance and redemption +""" + process_name = "PROCESSOR-Batch-Issue-Redeem" LOG = batch_log.get_logger(process_name=process_name) @@ -72,6 +78,8 @@ def process(self): filter(BatchIssueRedeemUpload.processed == False). \ all() for upload in upload_list: + LOG.info(f"Process start: upload_id={upload.upload_id}") + # Get issuer's private key issuer_account = db_session.query(Account). \ filter(Account.issuer_address == upload.issuer_address). \ @@ -196,6 +204,8 @@ def process(self): # Update to processed upload.processed = True db_session.commit() + + LOG.info(f"Process end: upload_id={upload.upload_id}") finally: db_session.close() @@ -230,7 +240,6 @@ def main(): while True: try: processor.process() - LOG.debug("Processed") except SQLAlchemyError as sa_err: LOG.error(f"A database error has occurred: code={sa_err.code}\n{sa_err}") except Exception as ex: diff --git a/batch/processor_batch_register_personal_info.py b/batch/processor_batch_register_personal_info.py index 5e8dd3b5..f6d88f36 100644 --- a/batch/processor_batch_register_personal_info.py +++ b/batch/processor_batch_register_personal_info.py @@ -30,7 +30,13 @@ path = os.path.join(os.path.dirname(__file__), "../") sys.path.append(path) -from config import DATABASE_URL, ZERO_ADDRESS, BATCH_REGISTER_PERSONAL_INFO_WORKER_LOT_SIZE, BATCH_REGISTER_PERSONAL_INFO_INTERVAL, BATCH_REGISTER_PERSONAL_INFO_WORKER_COUNT +from config import ( + DATABASE_URL, + ZERO_ADDRESS, + BATCH_REGISTER_PERSONAL_INFO_WORKER_LOT_SIZE, + BATCH_REGISTER_PERSONAL_INFO_INTERVAL, + BATCH_REGISTER_PERSONAL_INFO_WORKER_COUNT +) from app.model.db import ( Account, Token, @@ -53,6 +59,12 @@ ) import batch_log +""" +[PROCESSOR-Batch-Register-Personal-Info] + +Batch processing for force registration of investor's personal information +""" + process_name = "PROCESSOR-Batch-Register-Personal-Info" LOG = batch_log.get_logger(process_name=process_name) @@ -77,9 +89,12 @@ def process(self): return for _upload in upload_list: + LOG.info(f"<{self.thread_num}> Process start: upload_id={_upload.upload_id}") + # Get issuer's private key - issuer_account: Account | None = db_session.query(Account). \ - filter(Account.issuer_address == _upload.issuer_address).first() + issuer_account: Account | None = db_session.query(Account).\ + filter(Account.issuer_address == _upload.issuer_address).\ + first() if issuer_account is None: # If issuer does not exist, update the status of the upload to ERROR LOG.warning(f"Issuer of the upload_id:{_upload.upload_id} does not exist") self.__sink_on_finish_upload_process( @@ -177,6 +192,8 @@ def process(self): ) db_session.commit() self.__release_processing_issuer(_upload.upload_id) + + LOG.info(f"<{self.thread_num}> Process end: upload_id={_upload.upload_id}") finally: self.personal_info_contract_accessor_map = {} db_session.close() @@ -355,7 +372,7 @@ def main(): worker = Worker(i) thread = threading.Thread(target=worker.run, daemon=True) thread.start() - LOG.info(f"thread {i} started") + LOG.debug(f"Thread {i} started") while True: if len(err_bucket) > 0: diff --git a/batch/processor_bulk_transfer.py b/batch/processor_bulk_transfer.py index 97137cd0..616c33fe 100644 --- a/batch/processor_bulk_transfer.py +++ b/batch/processor_bulk_transfer.py @@ -62,6 +62,12 @@ ) import batch_log +""" +[PROCESSOR-Bulk-Transfer] + +Asynchronous batch processing for token bulk transfers +""" + process_name = "PROCESSOR-Bulk-Transfer" LOG = batch_log.get_logger(process_name=process_name) @@ -81,8 +87,7 @@ def process(self): return for _upload in upload_list: - LOG.info( - f"thread {self.thread_num} START upload_id:{_upload.upload_id} issuer_address:{_upload.issuer_address}") + LOG.info(f"<{self.thread_num}> Process start: upload_id={_upload.upload_id}") # Get issuer's private key try: @@ -212,6 +217,7 @@ def process(self): db_session.commit() self.__release_processing_issuer(_upload.upload_id) + LOG.info(f"<{self.thread_num}> Process end: upload_id={_upload.upload_id}") finally: db_session.close() @@ -354,7 +360,7 @@ def main(): worker = Worker(i) thread = threading.Thread(target=worker.run, daemon=True) thread.start() - LOG.info(f"thread {i} started") + LOG.debug(f"thread {i} started") while True: if len(err_bucket) > 0: diff --git a/batch/processor_create_utxo.py b/batch/processor_create_utxo.py index d05156d0..58110923 100644 --- a/batch/processor_create_utxo.py +++ b/batch/processor_create_utxo.py @@ -45,6 +45,12 @@ from app.exceptions import ServiceUnavailableError import batch_log +""" +[PROCESSOR-Create-UTXO] + +Batch processing for creation of ledger data +""" + process_name = "PROCESSOR-Create-UTXO" LOG = batch_log.get_logger(process_name=process_name) @@ -76,7 +82,7 @@ def process(self): if block_to - block_from > CREATE_UTXO_BLOCK_LOT_MAX_SIZE - 1: block_to = block_from + CREATE_UTXO_BLOCK_LOT_MAX_SIZE - 1 latest_synced = False - LOG.info(f"syncing from={block_from}, to={block_to}") + LOG.info(f"Syncing from={block_from}, to={block_to}") for token_contract in self.token_contract_list: event_triggered = False event_triggered = event_triggered | self.__process_transfer( @@ -106,6 +112,7 @@ def process(self): db_session.commit() finally: db_session.close() + LOG.info("Sync job has been completed") return latest_synced @@ -423,7 +430,6 @@ def main(): latest_synced = True try: latest_synced = processor.process() - LOG.debug("Processed") except ServiceUnavailableError: LOG.warning("An external service was unavailable") except SQLAlchemyError as sa_err: diff --git a/batch/processor_generate_rsa_key.py b/batch/processor_generate_rsa_key.py index a8e03032..b2cc07e0 100644 --- a/batch/processor_generate_rsa_key.py +++ b/batch/processor_generate_rsa_key.py @@ -40,6 +40,12 @@ from app.utils.e2ee_utils import E2EEUtils import batch_log +""" +[PROCESSOR-Generate-RSA-Key] + +Process for generating and updating issuer RSA keys +""" + process_name = "PROCESSOR-Generate-RSA-Key" LOG = batch_log.get_logger(process_name=process_name) @@ -54,20 +60,25 @@ def process(self): account_list = self.__get_account_list(db_session=db_session) for account in account_list: + LOG.info(f"Process start: issuer_address={account.issuer_address}") + # rsa_passphrase is encrypted, so decrypt it. passphrase = E2EEUtils.decrypt(account.rsa_passphrase) - LOG.info(f"Generate Start: issuer_address={account.issuer_address}") + # Generate RSA key rsa_private_pem, rsa_public_pem = self.__generate_rsa_key(passphrase) - LOG.info(f"Generate End: issuer_address={account.issuer_address}") + # Update the issuer's RSA key data self.__sink_on_account( db_session=db_session, issuer_address=account.issuer_address, rsa_private_pem=rsa_private_pem, rsa_public_pem=rsa_public_pem ) + db_session.commit() + LOG.info(f"Process end: issuer_address={account.issuer_address}") + finally: db_session.close() @@ -113,7 +124,6 @@ def main(): while True: try: processor.process() - LOG.debug("Processed") except SQLAlchemyError as sa_err: LOG.error(f"A database error has occurred: code={sa_err.code}\n{sa_err}") except Exception as ex: diff --git a/batch/processor_modify_personal_info.py b/batch/processor_modify_personal_info.py index dad2424b..7a8d4db1 100644 --- a/batch/processor_modify_personal_info.py +++ b/batch/processor_modify_personal_info.py @@ -52,6 +52,13 @@ from app.exceptions import ServiceUnavailableError import batch_log +""" +[PROCESSOR-Modify-Personal-Info] + +Re-encryption of investor's personal information +when the issuer's RSA key is updated. +""" + process_name = "PROCESSOR-Modify-Personal-Info" LOG = batch_log.get_logger(process_name=process_name) @@ -65,6 +72,7 @@ def process(self): try: temporary_list = self.__get_temporary_list(db_session=db_session) for temporary in temporary_list: + LOG.info(f"Process start: issuer={temporary.issuer_address}") contract_accessor_list = self.__get_personal_info_contract_accessor_list( db_session=db_session, @@ -72,13 +80,13 @@ def process(self): ) # Get target PersonalInfo account address - idx_personal_info_list = db_session.query(IDXPersonalInfo).filter( - IDXPersonalInfo.issuer_address == temporary.issuer_address).all() + idx_personal_info_list = db_session.query(IDXPersonalInfo).\ + filter(IDXPersonalInfo.issuer_address == temporary.issuer_address).\ + all() count = len(idx_personal_info_list) completed_count = 0 for idx_personal_info in idx_personal_info_list: - # Get target PersonalInfo contract accessor for contract_accessor in contract_accessor_list: is_registered = ContractUtils.call_function( @@ -101,12 +109,16 @@ def process(self): # Confirm to that modify data is not modified in the next process. completed_count += 1 + # Once all investors' personal information has been updated, + # update the status of the issuer's RSA key. if count == completed_count: self.__sink_on_account( db_session=db_session, issuer_address=temporary.issuer_address ) db_session.commit() + + LOG.info(f"Process end: issuer={temporary.issuer_address}") finally: db_session.close() @@ -170,14 +182,10 @@ def __modify_personal_info(self, personal_info_contract_accessor.issuer.rsa_private_key = temporary.rsa_private_key personal_info_contract_accessor.issuer.rsa_passphrase = temporary.rsa_passphrase # Modify - LOG.info( - f"Modify Start: issuer_address={temporary.issuer_address}, account_address={idx_personal_info.account_address}") info = personal_info_contract_accessor.get_info( idx_personal_info.account_address, default_value=None ) - LOG.info( - f"Modify End: issuer_address={temporary.issuer_address}, account_address={idx_personal_info.account_address}") # Back RSA personal_info_contract_accessor.issuer.rsa_private_key = org_rsa_private_key personal_info_contract_accessor.issuer.rsa_passphrase = org_rsa_passphrase @@ -224,7 +232,6 @@ def main(): while True: try: processor.process() - LOG.debug("Processed") except ServiceUnavailableError: LOG.warning("An external service was unavailable") except SQLAlchemyError as sa_err: diff --git a/batch/processor_monitor_block_sync.py b/batch/processor_monitor_block_sync.py index ee774169..eda293ed 100644 --- a/batch/processor_monitor_block_sync.py +++ b/batch/processor_monitor_block_sync.py @@ -47,6 +47,12 @@ from app.exceptions import ServiceUnavailableError import batch_log +""" +[PROCESSOR-Monitor-Block-Sync] + +Processor for block synchronization monitoring +""" + process_name = "PROCESSOR-Monitor-Block-Sync" LOG = batch_log.get_logger(process_name=process_name) diff --git a/batch/processor_rotate_e2e_messaging_rsa_key.py b/batch/processor_rotate_e2e_messaging_rsa_key.py index 8bf9fb89..2b38e9d0 100644 --- a/batch/processor_rotate_e2e_messaging_rsa_key.py +++ b/batch/processor_rotate_e2e_messaging_rsa_key.py @@ -54,6 +54,12 @@ ) import batch_log +""" +[PROCESSOR-Rotate-E2E-Messaging-RSA-Key] + +Processor for key rotation for encrypted E2E messaging on the blockchain +""" + process_name = "PROCESSOR-Rotate-E2E-Messaging-RSA-Key" LOG = batch_log.get_logger(process_name=process_name) @@ -135,6 +141,7 @@ def __auto_generate_rsa_key(self, db_session: Session, base_time: int, e2e_messa tx_from=e2e_messaging_account.account_address, private_key=private_key ) + LOG.info(f"New RSA key created: account_address={e2e_messaging_account.account_address}") except ContractRevertError as e: LOG.warning(f"Transaction reverted: account_address=<{e2e_messaging_account.account_address}> error_code:<{e.code}> error_msg:<{e.message}>") return diff --git a/batch/processor_scheduled_events.py b/batch/processor_scheduled_events.py index 4c21bc84..7f9c15f6 100644 --- a/batch/processor_scheduled_events.py +++ b/batch/processor_scheduled_events.py @@ -66,6 +66,12 @@ ) import batch_log +""" +[PROCESSOR-Scheduled-Events] + +Processor for scheduled token update events +""" + process_name = "PROCESSOR-Scheduled-Events" LOG = batch_log.get_logger(process_name=process_name) @@ -129,7 +135,8 @@ def __release_processing_issuer(self, issuer_address: str): def __process(self, db_session: Session, events_list: List[ScheduledEvents]): for _event in events_list: - LOG.info(f"START event_id:{_event.id}") + LOG.info(f"<{self.thread_num}> Process start: upload_id={_event.id}") + _upload_status = 1 # Get issuer's private key @@ -236,6 +243,8 @@ def __process(self, db_session: Session, events_list: List[ScheduledEvents]): ) db_session.commit() + LOG.info(f"<{self.thread_num}> Process end: upload_id={_event.id}") + @staticmethod def __sink_on_finish_event_process(db_session: Session, record_id: int, status: int): scheduled_event_record = db_session.query(ScheduledEvents). \ @@ -295,7 +304,7 @@ def main(): worker = Worker(i) thread = threading.Thread(target=worker.run, daemon=True) thread.start() - LOG.info(f"thread {i} started") + LOG.debug(f"thread {i} started") while True: time.sleep(99999) diff --git a/batch/processor_update_token.py b/batch/processor_update_token.py index 870f5eeb..2bf55cff 100644 --- a/batch/processor_update_token.py +++ b/batch/processor_update_token.py @@ -64,6 +64,12 @@ ) import batch_log +""" +[PROCESSOR-Update-token] + +Processor for asynchronous updating of update items when issuing new tokens +""" + process_name = "PROCESSOR-Update-token" LOG = batch_log.get_logger(process_name=process_name) @@ -77,6 +83,7 @@ def process(self): try: _update_token_list = self.__get_update_token_list(db_session=db_session) for _update_token in _update_token_list: + LOG.info(f"Process start: upload_id={_update_token.token_address}") notice_type = "" if _update_token.trigger == "Issue": @@ -232,6 +239,7 @@ def process(self): arguments=_update_token.arguments) db_session.commit() + LOG.info(f"Process end: upload_id={_update_token.token_address}") finally: db_session.close() diff --git a/tests/test_batch_indexer_issue_redeem.py b/tests/test_batch_indexer_issue_redeem.py index cd739a48..78634c0f 100644 --- a/tests/test_batch_indexer_issue_redeem.py +++ b/tests/test_batch_indexer_issue_redeem.py @@ -703,14 +703,6 @@ def test_error_1(self, main_func, db: Session, personal_info_contract, caplog: p db.commit() - # Run mainloop once successfully - with patch("batch.indexer_issue_redeem.INDEXER_SYNC_INTERVAL", None),\ - patch.object(Processor, "sync_new_logs", return_value=True),\ - pytest.raises(TypeError): - main_func() - assert 1 == caplog.record_tuples.count((LOG.name, logging.DEBUG, "Processed")) - caplog.clear() - # Run mainloop once and fail with web3 utils error with patch("batch.indexer_issue_redeem.INDEXER_SYNC_INTERVAL", None),\ patch.object(web3.eth, "contract", side_effect=ServiceUnavailableError()), \ diff --git a/tests/test_batch_indexer_personal_info.py b/tests/test_batch_indexer_personal_info.py index 72d96936..2ec6fa26 100644 --- a/tests/test_batch_indexer_personal_info.py +++ b/tests/test_batch_indexer_personal_info.py @@ -804,7 +804,7 @@ def test_normal_5(self, processor: Processor, db: Session, caplog: pytest.LogCap db.commit() processor.process() - assert 1 == caplog.record_tuples.count((LOG.name, logging.DEBUG, "skip Process")) + assert 1 == caplog.record_tuples.count((LOG.name, logging.DEBUG, "skip process")) # # If DB session fails in phase sinking register/modify events, batch logs exception message. @@ -942,14 +942,6 @@ def test_error_1(self, main_func, db: Session, personal_info_contract, caplog: p db.commit() - # Run mainloop once successfully - with patch("batch.indexer_personal_info.INDEXER_SYNC_INTERVAL", None),\ - patch.object(Processor, "process", return_value=True), \ - pytest.raises(TypeError): - main_func() - assert 1 == caplog.record_tuples.count((LOG.name, logging.DEBUG, "Processed")) - caplog.clear() - # Run mainloop once and fail with web3 utils error with patch("batch.indexer_personal_info.INDEXER_SYNC_INTERVAL", None),\ patch.object(web3.eth, "contract", side_effect=ServiceUnavailableError()), \ diff --git a/tests/test_batch_indexer_position_bond.py b/tests/test_batch_indexer_position_bond.py index 2e88dd6f..6bc368aa 100644 --- a/tests/test_batch_indexer_position_bond.py +++ b/tests/test_batch_indexer_position_bond.py @@ -2708,14 +2708,6 @@ def test_error_1(self, main_func, db:Session, personal_info_contract, caplog: py db.commit() - # Run mainloop once successfully - with patch("batch.indexer_position_bond.INDEXER_SYNC_INTERVAL", None),\ - patch.object(Processor, "sync_new_logs", return_value=True),\ - pytest.raises(TypeError): - main_func() - assert 1 == caplog.record_tuples.count((LOG.name, logging.DEBUG, "Processed")) - caplog.clear() - # Run mainloop once and fail with web3 utils error with patch("batch.indexer_position_bond.INDEXER_SYNC_INTERVAL", None),\ patch.object(web3.eth, "contract", side_effect=ServiceUnavailableError()), \ diff --git a/tests/test_batch_indexer_position_share.py b/tests/test_batch_indexer_position_share.py index de50d30f..3b420718 100644 --- a/tests/test_batch_indexer_position_share.py +++ b/tests/test_batch_indexer_position_share.py @@ -2708,14 +2708,6 @@ def test_error_1(self, main_func, db :Session, personal_info_contract, caplog: p db.commit() - # Run mainloop once successfully - with patch("batch.indexer_position_share.INDEXER_SYNC_INTERVAL", None),\ - patch.object(Processor, "sync_new_logs", return_value=True),\ - pytest.raises(TypeError): - main_func() - assert 1 == caplog.record_tuples.count((LOG.name, logging.DEBUG, "Processed")) - caplog.clear() - # Run mainloop once and fail with web3 utils error with patch("batch.indexer_position_share.INDEXER_SYNC_INTERVAL", None),\ patch.object(web3.eth, "contract", side_effect=ServiceUnavailableError()), \ diff --git a/tests/test_batch_indexer_token_holders.py b/tests/test_batch_indexer_token_holders.py index 2259a687..ca33b484 100644 --- a/tests/test_batch_indexer_token_holders.py +++ b/tests/test_batch_indexer_token_holders.py @@ -987,7 +987,7 @@ def test_normal_7( processor.collect() assert 1 == caplog.record_tuples.count((LOG.name, logging.INFO, f"Token holder list({_token_holders_list1.list_id}) status changes to be done.")) - assert 2 == caplog.record_tuples.count((LOG.name, logging.INFO, f" Collect job has been completed")) + assert 1 == caplog.record_tuples.count((LOG.name, logging.INFO, "Collect job has been completed")) # # StraightBond @@ -1175,7 +1175,6 @@ def test_error_1( processor.collect() assert 1 == caplog.record_tuples.count((LOG.name, logging.DEBUG, "There are no pending collect batch")) - assert 1 == caplog.record_tuples.count((LOG.name, logging.INFO, f" Collect job has been completed")) # # There is target token holders list id with batch_status PENDING. @@ -1201,7 +1200,6 @@ def test_error_2( processor.collect() assert 1 == caplog.record_tuples.count((LOG.name, logging.DEBUG, "Token contract must be listed to TokenList contract.")) assert 1 == caplog.record_tuples.count((LOG.name, logging.INFO, f"Token holder list({target_token_holders_list.list_id}) status changes to be failed.")) - assert 1 == caplog.record_tuples.count((LOG.name, logging.INFO, f" Collect job has been completed")) # Batch status of token holders list expects to be "ERROR" error_record_num = len(list(db.query(TokenHoldersList).filter(TokenHoldersList.batch_status == TokenHolderBatchStatus.FAILED.value))) @@ -1311,13 +1309,6 @@ def test_error_4(self, main_func, db: Session, ibet_security_token_escrow_contra db.add(_token_holders_list) db.commit() - with patch("batch.indexer_token_holders.INDEXER_SYNC_INTERVAL", None),\ - patch.object(Processor, "collect", return_value=True),\ - pytest.raises(TypeError): - main_func() - assert 1 == caplog.record_tuples.count((LOG.name, logging.DEBUG, "Processed")) - caplog.clear() - with patch("batch.indexer_token_holders.INDEXER_SYNC_INTERVAL", None),\ patch.object(Session, "close", side_effect=SQLAlchemyError()), \ pytest.raises(TypeError): diff --git a/tests/test_batch_indexer_transfer.py b/tests/test_batch_indexer_transfer.py index d516ff44..3ef8636e 100644 --- a/tests/test_batch_indexer_transfer.py +++ b/tests/test_batch_indexer_transfer.py @@ -746,14 +746,6 @@ def test_error_1(self, main_func, db:Session, personal_info_contract, caplog: py db.commit() - # Run mainloop once successfully - with patch("batch.indexer_transfer.INDEXER_SYNC_INTERVAL", None),\ - patch.object(Processor, "sync_new_logs", return_value=True),\ - pytest.raises(TypeError): - main_func() - assert 1 == caplog.record_tuples.count((LOG.name, logging.DEBUG, "Processed")) - caplog.clear() - # Run mainloop once and fail with web3 utils error with patch("batch.indexer_transfer.INDEXER_SYNC_INTERVAL", None),\ patch.object(web3.eth, "contract", side_effect=ServiceUnavailableError()), \