Skip to content

Commit

Permalink
Improve batch log output (#442)
Browse files Browse the repository at this point in the history
  • Loading branch information
YoshihitoAso authored Dec 21, 2022
1 parent 1669dd7 commit d83acc8
Show file tree
Hide file tree
Showing 25 changed files with 128 additions and 94 deletions.
1 change: 0 additions & 1 deletion batch/indexer_block_tx_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ def main():
while True:
try:
processor.process()
LOG.debug("Processed")
except ServiceUnavailableError:
LOG.warning("An external service was unavailable")
except SQLAlchemyError as sa_err:
Expand Down
4 changes: 2 additions & 2 deletions batch/indexer_e2e_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def process(self):
db_session.commit()
finally:
db_session.close()
LOG.info("Sync job has been completed")

def __get_idx_e2e_messaging_block_number(self, db_session: Session):
_idx_e2e_messaging_block_number = db_session.query(IDXE2EMessagingBlockNumber). \
Expand All @@ -128,7 +129,7 @@ def __set_idx_e2e_messaging_block_number(self, db_session: Session, block_number
db_session.merge(_idx_e2e_messaging_block_number)

def __sync_all(self, db_session: Session, block_from: int, block_to: int):
LOG.info(f"syncing from={block_from}, to={block_to}")
LOG.info(f"Syncing from={block_from}, to={block_to}")
self.__sync_message(db_session, block_from, block_to)

def __sync_message(self, db_session: Session, block_from: int, block_to: int):
Expand Down Expand Up @@ -281,7 +282,6 @@ def main():
start_time = time.time()
try:
processor.process()
LOG.debug("Processed")
except ServiceUnavailableError:
LOG.warning("An external service was unavailable")
except SQLAlchemyError as sa_err:
Expand Down
4 changes: 2 additions & 2 deletions batch/indexer_issue_redeem.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def sync_new_logs(self):
db_session.commit()
finally:
db_session.close()
LOG.info("Sync job has been completed")

def __get_token_list(self, db_session: Session):
issued_token_address_list: tuple[str, ...] = tuple(
Expand Down Expand Up @@ -128,7 +129,7 @@ def __set_idx_transfer_block_number(self, db_session: Session, block_number: int
db_session.merge(_idx_transfer_block_number)

def __sync_all(self, db_session: Session, block_from: int, block_to: int):
LOG.info(f"syncing from={block_from}, to={block_to}")
LOG.info(f"Syncing from={block_from}, to={block_to}")
self.__sync_issue(db_session, block_from, block_to)
self.__sync_redeem(db_session, block_from, block_to)

Expand Down Expand Up @@ -232,7 +233,6 @@ def main():
while True:
try:
processor.sync_new_logs()
LOG.debug("Processed")
except ServiceUnavailableError:
LOG.warning("An external service was unavailable")
except SQLAlchemyError as sa_err:
Expand Down
6 changes: 3 additions & 3 deletions batch/indexer_personal_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def process(self):
latest_block = web3.eth.block_number # latest blockNumber

if block_number >= latest_block:
LOG.debug("skip Process")
LOG.debug("skip process")
else:
self.__sync_all(
db_session=db_session,
Expand All @@ -79,6 +79,7 @@ def process(self):
db_session.commit()
finally:
db_session.close()
LOG.info("Sync job has been completed")

def __refresh_personal_info_list(self, db_session: Session):
self.personal_info_contract_list.clear()
Expand Down Expand Up @@ -132,7 +133,7 @@ def __set_block_number(self, db_session: Session, block_number: int):
db_session.merge(_block_number)

def __sync_all(self, db_session: Session, block_from: int, block_to: int):
LOG.info(f"syncing from={block_from}, to={block_to}")
LOG.info(f"Syncing from={block_from}, to={block_to}")
self.__sync_personal_info_register(
db_session=db_session,
block_from=block_from,
Expand Down Expand Up @@ -228,7 +229,6 @@ def main():
while True:
try:
processor.process()
LOG.debug("Processed")
except ServiceUnavailableError:
LOG.warning("An external service was unavailable")
except SQLAlchemyError as sa_err:
Expand Down
4 changes: 2 additions & 2 deletions batch/indexer_position_bond.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def sync_new_logs(self):
db_session.commit()
finally:
db_session.close()
LOG.info("Sync job has been completed")

def __get_contract_list(self, db_session: Session):
self.exchange_address_list = []
Expand Down Expand Up @@ -149,7 +150,7 @@ def __set_idx_position_block_number(self, db_session: Session, block_number: int
db_session.merge(_idx_position_block_number)

def __sync_all(self, db_session: Session, block_from: int, block_to: int):
LOG.info("syncing from={}, to={}".format(block_from, block_to))
LOG.info("Syncing from={}, to={}".format(block_from, block_to))
self.__sync_issuer(db_session)
self.__sync_issue(db_session, block_from, block_to)
self.__sync_transfer(db_session, block_from, block_to)
Expand Down Expand Up @@ -748,7 +749,6 @@ def main():
while True:
try:
processor.sync_new_logs()
LOG.debug("Processed")
except ServiceUnavailableError:
LOG.warning("An external service was unavailable")
except SQLAlchemyError as sa_err:
Expand Down
4 changes: 2 additions & 2 deletions batch/indexer_position_share.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def sync_new_logs(self):
db_session.commit()
finally:
db_session.close()
LOG.info("Sync job has been completed")

def __get_contract_list(self, db_session: Session):
self.exchange_address_list = []
Expand Down Expand Up @@ -150,7 +151,7 @@ def __set_idx_position_block_number(self, db_session: Session, block_number: int
db_session.merge(_idx_position_block_number)

def __sync_all(self, db_session: Session, block_from: int, block_to: int):
LOG.info("syncing from={}, to={}".format(block_from, block_to))
LOG.info("Syncing from={}, to={}".format(block_from, block_to))
self.__sync_issuer(db_session)
self.__sync_issue(db_session, block_from, block_to)
self.__sync_transfer(db_session, block_from, block_to)
Expand Down Expand Up @@ -749,7 +750,6 @@ def main():
while True:
try:
processor.sync_new_logs()
LOG.debug("Processed")
except ServiceUnavailableError:
LOG.warning("An external service was unavailable")
except SQLAlchemyError as sa_err:
Expand Down
7 changes: 3 additions & 4 deletions batch/indexer_token_holders.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,14 @@ def collect(self):
self.__process_all(local_session, self.block_from, self.block_to)
self.__update_status(local_session, TokenHolderBatchStatus.DONE)
local_session.commit()
except Exception:
LOG.exception("An exception occurred during event synchronization")
LOG.info("Collect job has been completed")
except Exception as e:
local_session.rollback()
self.__update_status(local_session, TokenHolderBatchStatus.FAILED)
local_session.commit()
raise e
finally:
local_session.close()
LOG.info(f"<{process_name}> Collect job has been completed")

def __update_status(self, local_session: Session, status: TokenHolderBatchStatus):
self.target.batch_status = status.value
Expand Down Expand Up @@ -365,7 +365,6 @@ def main():
while True:
try:
processor.collect()
LOG.debug("Processed")
except ServiceUnavailableError:
LOG.warning("An external service was unavailable")
except SQLAlchemyError as sa_err:
Expand Down
4 changes: 2 additions & 2 deletions batch/indexer_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def sync_new_logs(self):
db_session.commit()
finally:
db_session.close()
LOG.info("Sync job has been completed")

def __get_token_list(self, db_session: Session):
issued_token_address_list: tuple[str, ...] = tuple(
Expand Down Expand Up @@ -127,7 +128,7 @@ def __set_idx_transfer_block_number(self, db_session: Session, block_number: int
db_session.merge(_idx_transfer_block_number)

def __sync_all(self, db_session: Session, block_from: int, block_to: int):
LOG.info(f"syncing from={block_from}, to={block_to}")
LOG.info(f"Syncing from={block_from}, to={block_to}")
self.__sync_transfer(db_session, block_from, block_to)

def __sync_transfer(self, db_session: Session, block_from: int, block_to: int):
Expand Down Expand Up @@ -191,7 +192,6 @@ def main():
while True:
try:
processor.sync_new_logs()
LOG.debug("Processed")
except ServiceUnavailableError:
LOG.warning("An external service was unavailable")
except SQLAlchemyError as sa_err:
Expand Down
4 changes: 2 additions & 2 deletions batch/indexer_transfer_approval.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
)
from app.model.db import (
Token,
TokenType,
IDXTransferApproval,
IDXTransferApprovalBlockNumber,
Notification,
Expand Down Expand Up @@ -106,6 +105,7 @@ def sync_new_logs(self):
db_session.commit()
finally:
db_session.close()
LOG.info("Sync job has been completed")

def __get_contract_list(self, db_session: Session):
self.exchange_list = []
Expand Down Expand Up @@ -171,7 +171,7 @@ def __set_idx_transfer_approval_block_number(self, db_session: Session, block_nu
db_session.merge(_idx_transfer_approval_block_number)

def __sync_all(self, db_session: Session, block_from: int, block_to: int):
LOG.info(f"syncing from={block_from}, to={block_to}")
LOG.info(f"Syncing from={block_from}, to={block_to}")
self.__sync_token_apply_for_transfer(db_session, block_from, block_to)
self.__sync_token_cancel_transfer(db_session, block_from, block_to)
self.__sync_token_approve_transfer(db_session, block_from, block_to)
Expand Down
11 changes: 10 additions & 1 deletion batch/processor_batch_issue_redeem.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@
)
import batch_log

"""
[PROCESSOR-Batch-Issue-Redeem]
Batch processing for additional issuance and redemption
"""

process_name = "PROCESSOR-Batch-Issue-Redeem"
LOG = batch_log.get_logger(process_name=process_name)

Expand All @@ -72,6 +78,8 @@ def process(self):
filter(BatchIssueRedeemUpload.processed == False). \
all()
for upload in upload_list:
LOG.info(f"Process start: upload_id={upload.upload_id}")

# Get issuer's private key
issuer_account = db_session.query(Account). \
filter(Account.issuer_address == upload.issuer_address). \
Expand Down Expand Up @@ -196,6 +204,8 @@ def process(self):
# Update to processed
upload.processed = True
db_session.commit()

LOG.info(f"Process end: upload_id={upload.upload_id}")
finally:
db_session.close()

Expand Down Expand Up @@ -230,7 +240,6 @@ def main():
while True:
try:
processor.process()
LOG.debug("Processed")
except SQLAlchemyError as sa_err:
LOG.error(f"A database error has occurred: code={sa_err.code}\n{sa_err}")
except Exception as ex:
Expand Down
25 changes: 21 additions & 4 deletions batch/processor_batch_register_personal_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@
path = os.path.join(os.path.dirname(__file__), "../")
sys.path.append(path)

from config import DATABASE_URL, ZERO_ADDRESS, BATCH_REGISTER_PERSONAL_INFO_WORKER_LOT_SIZE, BATCH_REGISTER_PERSONAL_INFO_INTERVAL, BATCH_REGISTER_PERSONAL_INFO_WORKER_COUNT
from config import (
DATABASE_URL,
ZERO_ADDRESS,
BATCH_REGISTER_PERSONAL_INFO_WORKER_LOT_SIZE,
BATCH_REGISTER_PERSONAL_INFO_INTERVAL,
BATCH_REGISTER_PERSONAL_INFO_WORKER_COUNT
)
from app.model.db import (
Account,
Token,
Expand All @@ -53,6 +59,12 @@
)
import batch_log

"""
[PROCESSOR-Batch-Register-Personal-Info]
Batch processing for force registration of investor's personal information
"""

process_name = "PROCESSOR-Batch-Register-Personal-Info"
LOG = batch_log.get_logger(process_name=process_name)

Expand All @@ -77,9 +89,12 @@ def process(self):
return

for _upload in upload_list:
LOG.info(f"<{self.thread_num}> Process start: upload_id={_upload.upload_id}")

# Get issuer's private key
issuer_account: Account | None = db_session.query(Account). \
filter(Account.issuer_address == _upload.issuer_address).first()
issuer_account: Account | None = db_session.query(Account).\
filter(Account.issuer_address == _upload.issuer_address).\
first()
if issuer_account is None: # If issuer does not exist, update the status of the upload to ERROR
LOG.warning(f"Issuer of the upload_id:{_upload.upload_id} does not exist")
self.__sink_on_finish_upload_process(
Expand Down Expand Up @@ -177,6 +192,8 @@ def process(self):
)
db_session.commit()
self.__release_processing_issuer(_upload.upload_id)

LOG.info(f"<{self.thread_num}> Process end: upload_id={_upload.upload_id}")
finally:
self.personal_info_contract_accessor_map = {}
db_session.close()
Expand Down Expand Up @@ -355,7 +372,7 @@ def main():
worker = Worker(i)
thread = threading.Thread(target=worker.run, daemon=True)
thread.start()
LOG.info(f"thread {i} started")
LOG.debug(f"Thread {i} started")

while True:
if len(err_bucket) > 0:
Expand Down
12 changes: 9 additions & 3 deletions batch/processor_bulk_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@
)
import batch_log

"""
[PROCESSOR-Bulk-Transfer]
Asynchronous batch processing for token bulk transfers
"""

process_name = "PROCESSOR-Bulk-Transfer"
LOG = batch_log.get_logger(process_name=process_name)

Expand All @@ -81,8 +87,7 @@ def process(self):
return

for _upload in upload_list:
LOG.info(
f"thread {self.thread_num} START upload_id:{_upload.upload_id} issuer_address:{_upload.issuer_address}")
LOG.info(f"<{self.thread_num}> Process start: upload_id={_upload.upload_id}")

# Get issuer's private key
try:
Expand Down Expand Up @@ -212,6 +217,7 @@ def process(self):

db_session.commit()
self.__release_processing_issuer(_upload.upload_id)
LOG.info(f"<{self.thread_num}> Process end: upload_id={_upload.upload_id}")
finally:
db_session.close()

Expand Down Expand Up @@ -354,7 +360,7 @@ def main():
worker = Worker(i)
thread = threading.Thread(target=worker.run, daemon=True)
thread.start()
LOG.info(f"thread {i} started")
LOG.debug(f"thread {i} started")

while True:
if len(err_bucket) > 0:
Expand Down
10 changes: 8 additions & 2 deletions batch/processor_create_utxo.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@
from app.exceptions import ServiceUnavailableError
import batch_log

"""
[PROCESSOR-Create-UTXO]
Batch processing for creation of ledger data
"""

process_name = "PROCESSOR-Create-UTXO"
LOG = batch_log.get_logger(process_name=process_name)

Expand Down Expand Up @@ -76,7 +82,7 @@ def process(self):
if block_to - block_from > CREATE_UTXO_BLOCK_LOT_MAX_SIZE - 1:
block_to = block_from + CREATE_UTXO_BLOCK_LOT_MAX_SIZE - 1
latest_synced = False
LOG.info(f"syncing from={block_from}, to={block_to}")
LOG.info(f"Syncing from={block_from}, to={block_to}")
for token_contract in self.token_contract_list:
event_triggered = False
event_triggered = event_triggered | self.__process_transfer(
Expand Down Expand Up @@ -106,6 +112,7 @@ def process(self):
db_session.commit()
finally:
db_session.close()
LOG.info("Sync job has been completed")

return latest_synced

Expand Down Expand Up @@ -423,7 +430,6 @@ def main():
latest_synced = True
try:
latest_synced = processor.process()
LOG.debug("Processed")
except ServiceUnavailableError:
LOG.warning("An external service was unavailable")
except SQLAlchemyError as sa_err:
Expand Down
Loading

0 comments on commit d83acc8

Please sign in to comment.