Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release 1.0.37 #148

Merged
merged 11 commits into from
Dec 12, 2023
5 changes: 3 additions & 2 deletions app/api/v1/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ async def _scan_token_activity(
db_crud.batch_insert_ignore_activity(activities)
logger.info(f"Activities for {org_name} and asset id: {key} added to the database.")

except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON for key {key} in organization {org_name}: {str(e)}")
# This is causing logging for benign errors, so commenting out for now
# except json.JSONDecodeError as e:
# logger.error(f"Failed to parse JSON for key {key} in organization {org_name}: {str(e)}")
except Exception as e:
logger.error(f"An error occurred for organization {org_name} under key {key}: {str(e)}")

Expand Down
80 changes: 58 additions & 22 deletions app/crud/chia.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,36 +35,72 @@ def _headers(self) -> Dict[str, str]:

return headers

def get_climate_units(self, search: Dict[str, Any]) -> Any:
def _get_paginated_data(self, path: str, search_params: Dict[str, Any]) -> List[Any]:
"""
Generic function to retrieve paginated data from a given path.

Args:
path: API endpoint path.
search_params: A dictionary of search parameters including pagination.

Returns:
A list of all data retrieved from the paginated API.
"""
all_data = []
page = 1
limit = 10

try:
params = urlencode(search)
url = urlparse(self.url + "/v1/units")
while True:
# Update search parameters with current page and limit
params = {**search_params, "page": page, "limit": limit}
encoded_params = urlencode(params)

r = requests.get(url.geturl(), params=params, headers=self._headers())
if r.status_code != requests.codes.ok:
logger.error(f"Request Url: {r.url} Error Message: {r.text}")
raise error_code.internal_server_error(message="Call Climate API Failure")
# Construct the URL
url = urlparse(f"{self.url}{path}?{encoded_params}")

return r.json()
response = requests.get(url.geturl(), headers=self._headers())
if response.status_code != requests.codes.ok:
logger.error(f"Request Url: {response.url} Error Message: {response.text}")
raise error_code.internal_server_error(message="API Call Failure")

except TimeoutError as e:
logger.error("Call Climate API Timeout, ErrorMessage: " + str(e))
raise error_code.internal_server_error("Call Climate API Timeout")
data = response.json()

def get_climate_projects(self) -> Any:
try:
url = urlparse(self.url + "/v1/projects")
all_data.extend(data["data"]) # Add data from the current page

r = requests.get(url.geturl(), headers=self._headers())
if r.status_code != requests.codes.ok:
logger.error(f"Request Url: {r.url} Error Message: {r.text}")
raise error_code.internal_server_error(message="Call Climate API Failure")
if page >= data["pageCount"]:
break # Exit loop if all pages have been processed

return r.json()
page += 1

return all_data

except TimeoutError as e:
logger.error("Call Climate API Timeout, ErrorMessage: " + str(e))
raise error_code.internal_server_error("Call Climate API Timeout")
logger.error("API Call Timeout, ErrorMessage: " + str(e))
raise error_code.internal_server_error("API Call Timeout")

def get_climate_units(self, search: Dict[str, Any]) -> Any:
"""
Retrieves all climate units using pagination and given search parameters.

Args:
search: A dictionary of search parameters.

Returns:
A JSON object containing all the climate units.
"""
search_with_marketplace = {**search, "hasMarketplaceIdentifier": True}
return self._get_paginated_data("/v1/units", search_with_marketplace)

def get_climate_projects(self) -> Any:
"""
Retrieves all climate projects using pagination.

Returns:
A JSON object containing all the climate projects.
"""
search_params = {"onlyMarketplaceProjects": True}
return self._get_paginated_data("/v1/projects", search_params)

def get_climate_organizations(self) -> Any:
try:
Expand Down Expand Up @@ -142,7 +178,7 @@ def combine_climate_units_and_metadata(self, search: Dict[str, Any]) -> List[Dic
try:
warehouse_project_id = unit["issuance"]["warehouseProjectId"]
project = project_by_id[warehouse_project_id]
except KeyError:
except (KeyError, TypeError):
logger.warning(f"Can not get project by warehouse_project_id: {warehouse_project_id}")
continue

Expand Down
10 changes: 8 additions & 2 deletions app/logger.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
from __future__ import annotations

import importlib.metadata
import logging

import uvicorn

from app.config import settings

version = importlib.metadata.version("Chia Climate Token Driver")

# Define the log format with version
log_format = f"%(asctime)s,%(msecs)d {version} %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s"

logging.basicConfig(
level=logging.INFO,
filename=settings.LOG_PATH,
format="%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s",
format=log_format,
filemode="w",
)

Expand All @@ -18,7 +24,7 @@

log_config["formatters"]["default"].update(
{
"fmt": "%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s",
"fmt": log_format,
"datefmt": "%Y-%m-%d:%H:%M:%S",
}
)
Loading