From f3832052994add050956987fc6250498af0d3ef5 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Tue, 7 Nov 2023 13:27:55 -0500 Subject: [PATCH 1/5] feat: optimized sub-optimal scan_token_activity cron --- app/api/v1/cron.py | 71 ++++++++++++++++++++++++++++++---------------- app/config.py | 1 + config.yaml | 1 + 3 files changed, 48 insertions(+), 25 deletions(-) diff --git a/app/api/v1/cron.py b/app/api/v1/cron.py index dd073f0..91123d7 100644 --- a/app/api/v1/cron.py +++ b/app/api/v1/cron.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import json from typing import List from blspy import G1Element @@ -72,37 +73,57 @@ async def _scan_token_activity( logger.info(f"Scanning blocks {start_height} - {end_height} for activity") - climate_units = climate_warehouse.combine_climate_units_and_metadata(search={}) - for unit in climate_units: - token = unit.get("token") - - # is None or empty - if not token: - logger.warning(f"Can not get token in climate warehouse unit. unit:{unit}") - continue - - public_key = G1Element.from_bytes(hexstr_to_bytes(token["public_key"])) - - activities: List[schemas.Activity] = await blockchain.get_activities( - org_uid=token["org_uid"], - warehouse_project_id=token["warehouse_project_id"], - vintage_year=token["vintage_year"], - sequence_num=token["sequence_num"], - public_key=public_key, - start_height=start_height, - end_height=end_height, - peak_height=state.peak_height, - ) - - if len(activities) == 0: + # Check if SCAN_ALL_ORGANIZATIONS is defined and True, otherwise treat as False + scan_all = getattr(settings, 'SCAN_ALL_ORGANIZATIONS', False) + + all_organizations = climate_warehouse.get_climate_organizations() + if not scan_all: + # Convert to a list of organizations where `isHome` is True + climate_organizations = [org for org in all_organizations.values() if org.get('isHome', False)] + else: + # Convert to a list of all organizations + climate_organizations = list(all_organizations.values()) + + for org_uid, org_name in climate_organizations.items(): + org_metadata = climate_warehouse.get_climate_organizations_metadata(org_uid) + if not org_metadata: + logger.warning(f"Cannot get metadata in CADT organization: {org_name}") continue - db_crud.batch_insert_ignore_activity(activities) + for key, value_str in org_metadata.items(): + try: + tokenization_dict = json.loads(value_str) + required_fields = ['org_uid', 'warehouse_project_id', 'vintage_year', 'sequence_num', 'public_key', 'index'] + optional_fields = ['permissionless_retirement', 'detokenization'] + + if not all(field in tokenization_dict for field in required_fields) or not any(field in tokenization_dict for field in optional_fields): + # not a tokenization record + continue + + public_key = G1Element.from_bytes(hexstr_to_bytes(tokenization_dict["public_key"])) + activities = await blockchain.get_activities( + org_uid=tokenization_dict["org_uid"], + warehouse_project_id=tokenization_dict["warehouse_project_id"], + vintage_year=tokenization_dict["vintage_year"], + sequence_num=tokenization_dict["sequence_num"], + public_key=public_key, + start_height=state.current_height, + end_height=end_height, + peak_height=state.peak_height + ) + + if activities: + db_crud.batch_insert_ignore_activity(activities) + logger.info(f"Activities for {org_name} and asset id: {key} added to the database.") + + except json.JSONDecodeError as e: + logger.error(f"Failed to parse JSON for key {key} in organization {org_name}: {str(e)}") + except Exception as e: + logger.error(f"An error occurred for organization {org_name} under key {key}: {str(e)}") db_crud.update_block_state(current_height=target_start_height) return True - @router.on_event("startup") @repeat_every(seconds=60, logger=logger) @disallow([ExecutionMode.REGISTRY, ExecutionMode.CLIENT]) diff --git a/app/config.py b/app/config.py index acb02c5..d05866d 100644 --- a/app/config.py +++ b/app/config.py @@ -57,6 +57,7 @@ class Settings(BaseSettings): CLIMATE_TOKEN_CLIENT_PORT: Optional[int] = None CLIMATE_TOKEN_REGISTRY_PORT: Optional[int] = None DEV_PORT: Optional[int] = None + SCAN_ALL_ORGANIZATIONS: Optional[bool] = False _instance: Optional[Settings] = None diff --git a/config.yaml b/config.yaml index de415d2..22e2ddc 100644 --- a/config.yaml +++ b/config.yaml @@ -14,3 +14,4 @@ CLIMATE_TOKEN_REGISTRY_PORT: 31312 CLIMATE_EXPLORER_PORT: 31313 CLIMATE_TOKEN_CLIENT_PORT: 31314 DEV_PORT: 31999 +SCAN_ALL_ORGANIZATIONS: false From 0c125b36fb4f467cd3d59332365db507a20f7396 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Wed, 8 Nov 2023 14:07:03 -0500 Subject: [PATCH 2/5] fix org loop --- app/api/v1/cron.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/app/api/v1/cron.py b/app/api/v1/cron.py index 91123d7..ca22a81 100644 --- a/app/api/v1/cron.py +++ b/app/api/v1/cron.py @@ -84,7 +84,10 @@ async def _scan_token_activity( # Convert to a list of all organizations climate_organizations = list(all_organizations.values()) - for org_uid, org_name in climate_organizations.items(): + for org in climate_organizations: + org_uid = org.orgUid; + org_name = org.name; + org_metadata = climate_warehouse.get_climate_organizations_metadata(org_uid) if not org_metadata: logger.warning(f"Cannot get metadata in CADT organization: {org_name}") From 2cd1fc6c5d0348c836a99000494296cd5f5a7621 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Wed, 8 Nov 2023 14:11:01 -0500 Subject: [PATCH 3/5] fix org loop --- app/api/v1/cron.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/api/v1/cron.py b/app/api/v1/cron.py index ca22a81..e695460 100644 --- a/app/api/v1/cron.py +++ b/app/api/v1/cron.py @@ -85,8 +85,8 @@ async def _scan_token_activity( climate_organizations = list(all_organizations.values()) for org in climate_organizations: - org_uid = org.orgUid; - org_name = org.name; + org_uid = org["orgUid"] + org_name = org["name"] org_metadata = climate_warehouse.get_climate_organizations_metadata(org_uid) if not org_metadata: From e5f276ddeebb0136a8d94e95e9d592a3ce079be3 Mon Sep 17 00:00:00 2001 From: Earle Lowe Date: Wed, 8 Nov 2023 11:49:00 -0800 Subject: [PATCH 4/5] style: linting update --- app/api/v1/cron.py | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/app/api/v1/cron.py b/app/api/v1/cron.py index e695460..c393532 100644 --- a/app/api/v1/cron.py +++ b/app/api/v1/cron.py @@ -74,12 +74,12 @@ async def _scan_token_activity( logger.info(f"Scanning blocks {start_height} - {end_height} for activity") # Check if SCAN_ALL_ORGANIZATIONS is defined and True, otherwise treat as False - scan_all = getattr(settings, 'SCAN_ALL_ORGANIZATIONS', False) + scan_all = getattr(settings, "SCAN_ALL_ORGANIZATIONS", False) all_organizations = climate_warehouse.get_climate_organizations() if not scan_all: # Convert to a list of organizations where `isHome` is True - climate_organizations = [org for org in all_organizations.values() if org.get('isHome', False)] + climate_organizations = [org for org in all_organizations.values() if org.get("isHome", False)] else: # Convert to a list of all organizations climate_organizations = list(all_organizations.values()) @@ -96,10 +96,19 @@ async def _scan_token_activity( for key, value_str in org_metadata.items(): try: tokenization_dict = json.loads(value_str) - required_fields = ['org_uid', 'warehouse_project_id', 'vintage_year', 'sequence_num', 'public_key', 'index'] - optional_fields = ['permissionless_retirement', 'detokenization'] - - if not all(field in tokenization_dict for field in required_fields) or not any(field in tokenization_dict for field in optional_fields): + required_fields = [ + "org_uid", + "warehouse_project_id", + "vintage_year", + "sequence_num", + "public_key", + "index", + ] + optional_fields = ["permissionless_retirement", "detokenization"] + + if not all(field in tokenization_dict for field in required_fields) or not any( + field in tokenization_dict for field in optional_fields + ): # not a tokenization record continue @@ -112,9 +121,9 @@ async def _scan_token_activity( public_key=public_key, start_height=state.current_height, end_height=end_height, - peak_height=state.peak_height + peak_height=state.peak_height, ) - + if activities: db_crud.batch_insert_ignore_activity(activities) logger.info(f"Activities for {org_name} and asset id: {key} added to the database.") @@ -127,6 +136,7 @@ async def _scan_token_activity( db_crud.update_block_state(current_height=target_start_height) return True + @router.on_event("startup") @repeat_every(seconds=60, logger=logger) @disallow([ExecutionMode.REGISTRY, ExecutionMode.CLIENT]) From e69d540b8b6d32a31dda2a9f5ebd4e6f30d1d0a6 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Wed, 8 Nov 2023 15:21:11 -0500 Subject: [PATCH 5/5] fix: activities scan --- app/api/v1/cron.py | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/app/api/v1/cron.py b/app/api/v1/cron.py index e695460..68e0ffa 100644 --- a/app/api/v1/cron.py +++ b/app/api/v1/cron.py @@ -74,12 +74,12 @@ async def _scan_token_activity( logger.info(f"Scanning blocks {start_height} - {end_height} for activity") # Check if SCAN_ALL_ORGANIZATIONS is defined and True, otherwise treat as False - scan_all = getattr(settings, 'SCAN_ALL_ORGANIZATIONS', False) + scan_all = getattr(settings, "SCAN_ALL_ORGANIZATIONS", False) all_organizations = climate_warehouse.get_climate_organizations() if not scan_all: # Convert to a list of organizations where `isHome` is True - climate_organizations = [org for org in all_organizations.values() if org.get('isHome', False)] + climate_organizations = [org for org in all_organizations.values() if org.get("isHome", False)] else: # Convert to a list of all organizations climate_organizations = list(all_organizations.values()) @@ -96,15 +96,24 @@ async def _scan_token_activity( for key, value_str in org_metadata.items(): try: tokenization_dict = json.loads(value_str) - required_fields = ['org_uid', 'warehouse_project_id', 'vintage_year', 'sequence_num', 'public_key', 'index'] - optional_fields = ['permissionless_retirement', 'detokenization'] - - if not all(field in tokenization_dict for field in required_fields) or not any(field in tokenization_dict for field in optional_fields): + required_fields = [ + "org_uid", + "warehouse_project_id", + "vintage_year", + "sequence_num", + "public_key", + "index", + ] + optional_fields = ["permissionless_retirement", "detokenization"] + + if not all(field in tokenization_dict for field in required_fields) or not any( + field in tokenization_dict for field in optional_fields + ): # not a tokenization record continue public_key = G1Element.from_bytes(hexstr_to_bytes(tokenization_dict["public_key"])) - activities = await blockchain.get_activities( + activities: List[schemas.Activity] = await blockchain.get_activities( org_uid=tokenization_dict["org_uid"], warehouse_project_id=tokenization_dict["warehouse_project_id"], vintage_year=tokenization_dict["vintage_year"], @@ -112,12 +121,14 @@ async def _scan_token_activity( public_key=public_key, start_height=state.current_height, end_height=end_height, - peak_height=state.peak_height + peak_height=state.peak_height, ) - - if activities: - db_crud.batch_insert_ignore_activity(activities) - logger.info(f"Activities for {org_name} and asset id: {key} added to the database.") + + if len(activities) == 0: + continue + + db_crud.batch_insert_ignore_activity(activities) + logger.info(f"Activities for {org_name} and asset id: {key} added to the database.") except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON for key {key} in organization {org_name}: {str(e)}") @@ -127,6 +138,7 @@ async def _scan_token_activity( db_crud.update_block_state(current_height=target_start_height) return True + @router.on_event("startup") @repeat_every(seconds=60, logger=logger) @disallow([ExecutionMode.REGISTRY, ExecutionMode.CLIENT])