From 1af054417e1b6ab872d0c777aff7a176801cd924 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Thu, 28 Nov 2024 15:25:38 +0000 Subject: [PATCH 1/9] Implementing the functionality to publish to HDX --- config/hdx_dataset_static.yml | 15 ++ config/project_configuration.yaml | 26 +++ exploration/02_wrangle_floodscan.py | 4 +- hdx_floodscan.py | 207 ++++++++++++++++++++ requirements.txt | 1 + run.py | 280 ++++++++++++++++++++++++++++ src/datasources/floodscan.py | 7 +- src/utils/cloud_utils.py | 40 ++++ 8 files changed, 574 insertions(+), 6 deletions(-) create mode 100644 config/hdx_dataset_static.yml create mode 100644 config/project_configuration.yaml create mode 100644 hdx_floodscan.py create mode 100644 run.py diff --git a/config/hdx_dataset_static.yml b/config/hdx_dataset_static.yml new file mode 100644 index 0000000..a127b20 --- /dev/null +++ b/config/hdx_dataset_static.yml @@ -0,0 +1,15 @@ +license_id: "cc-by-igo" +methodology: " +1. Raster \n +- a. Daily FloodScan SFED(flood fraction) band obtained directly from AER.\n +- b. Daily historical SFED band smoothed with 11 day centered rolling mean (+/-5 days).\n +- c. Average day-of-year (DOY) raster calculated using last 10 years of smoothed data (2013-2023) to create SFED Baseline band\n +- d. SFED & SFED Baseline band merged per day.\n +- e. Last 90 days of merged SFED + SFED Baseline bands zipped\n +\n +2. Tabular \n +-a. Daily zonal stats (mean) for all historical SFED data 1998-01-01 - 2023-12-31 for each Admin 2 covered by FloodScan extent\n +-b. Return periods calculated form zonal mean data per admin 2 using the Log III Pearson distribution." +dataset_source: "AER FloodScan: Atmospheric and Environmental Research (AER). Modified and Simplified by UN OCHA" +package_creator: "isabelle-tot" +private: False \ No newline at end of file diff --git a/config/project_configuration.yaml b/config/project_configuration.yaml new file mode 100644 index 0000000..6ee1928 --- /dev/null +++ b/config/project_configuration.yaml @@ -0,0 +1,26 @@ +# Collector specific configuration +#TODO add details below to github secrets +url: "test" +account: "account" +container: "container" +last90_days_filename: "{dir}baseline_v2024-01-31_v05r01.nc4" +stats_filename: "20240701_som_floodscan_adm1.xlsx" +key: "key" + +dataset_names: + HDX-FLOODSCAN: "hdx-floodscan" + +allowed_tags: + - "climate hazards" + - "climate-weather" + - "hydrology" + - "natural disasters" + +# Metadata +title: "HDX FloodScan" +update_frequency: "daily" +maintainer_id: "2f9fd160-2a16-49c0-89d6-0bc3230599bf" +organization_id: "hdx" +description_90days_file: "Daily zip file containing previous 90 days of raster both SFED and SFED baseline bands." +description_stats_file: "Excel file containing date, admin metadata, raw SFED mean values (per admin 2), and equivalent approximate calculated return period." +notes: "HDX FloodScan Notes" \ No newline at end of file diff --git a/exploration/02_wrangle_floodscan.py b/exploration/02_wrangle_floodscan.py index e180126..e798831 100644 --- a/exploration/02_wrangle_floodscan.py +++ b/exploration/02_wrangle_floodscan.py @@ -38,12 +38,12 @@ import xarray as xr from src.datasources import floodscan -from src.utils import gen_utils +from src.utils import date_utils # %% # just a quick utility function to get recent floodscan date that will # will be in blob. Only doing a couple days just as proof of concept. 
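# The methodology block in config/hdx_dataset_static.yml above describes how the
# SFED_BASELINE band is derived: smooth the daily historical SFED with an 11-day
# centered rolling mean (+/- 5 days), then average per day-of-year over the
# 2013-2023 window. A minimal sketch of that calculation with xarray, assuming a
# dataset `ds_sfed` with a "time" dimension and an "SFED" variable (illustrative
# names, not the pipeline's actual identifiers):
import xarray as xr

def build_sfed_baseline(ds_sfed: xr.Dataset) -> xr.Dataset:
    # 11-day centered rolling mean over the daily SFED band.
    smoothed = ds_sfed["SFED"].rolling(time=11, center=True, min_periods=1).mean()
    # Restrict to the baseline window named in the static metadata (2013-2023).
    smoothed = smoothed.sel(time=slice("2013-01-01", "2023-12-31"))
    # Average per day-of-year to get one SFED_BASELINE value per DOY and pixel.
    return smoothed.groupby("time.dayofyear").mean("time").to_dataset(name="SFED_BASELINE")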
-end_date = gen_utils.date_to_run() +end_date = date_utils.date_to_run() start_date = end_date - datetime.timedelta(days=3) da_current = floodscan.load_floodscan_cogs( start_date=start_date, end_date=end_date diff --git a/hdx_floodscan.py b/hdx_floodscan.py new file mode 100644 index 0000000..9e34ce2 --- /dev/null +++ b/hdx_floodscan.py @@ -0,0 +1,207 @@ +#!/usr/bin/python +""" +HDX Pipeline: +------------ + +TODO +- Add summary about this dataset pipeline + +""" +import logging +import os +from copy import copy +from datetime import datetime, timezone +import pandas as pd +from hdx.data.dataset import Dataset +from slugify import slugify +import xarray as xr +import shutil + +from src.utils.date_utils import create_date_range, get_start_and_last_date_from_90_days + +logger = logging.getLogger(__name__) +DATE_FORMAT = "%Y-%m-%d" + + +class HDXFloodscan: + def __init__(self, configuration, retriever, folder, errors): + self.configuration = configuration + self.retriever = retriever + self.folder = folder + self.manual_url = None + self.dataset_data = {} + self.errors = errors + self.created_date = None + self.start_date = None + self.latest_date = None + + def get_data(self): + + try: + url = os.environ["BLOB_URL"] + account = os.environ["STORAGE_ACCOUNT"] + container = os.environ["CONTAINER"] + key = os.environ["KEY"] + except Exception: + url = self.configuration["url"] + account = self.configuration["account"] + container = self.configuration["container"] + key = self.configuration["key"] + + last90_days_filename = self.configuration["last90_days_filename"] + stats_filename = self.configuration["stats_filename"] + dataset_name = self.configuration["dataset_names"]["HDX-FLOODSCAN"] + + last90_days_files = self._get_latest_90_days_geotiffs(account, container, key) + historical_baseline = self._get_historical_baseline(account, container, key, last90_days_filename) + last90_days_file = self._generate_zipped_file(last90_days_files, historical_baseline) + + # Find the minimum and maximum dates + self.start_date, self.latest_date = get_start_and_last_date_from_90_days(last90_days_file) + + # Save all geotiffs as one zipped file + last90_days_file = shutil.make_archive("baseline_zipped_file", 'zip', "geotiffs") + shutil.rmtree("geotiffs") + + stats_file = self.retriever.download_file( + url=url, + account=account, + container="dsci", # TODO change this to parameter before pushing changes to prod + key=key, + blob=stats_filename) + + data_df_stats = pd.read_excel(stats_file, sheet_name="FloodScan", keep_default_na=False).replace('[“”]', '', regex=True) + + self.dataset_data[dataset_name] = [data_df_stats.apply(lambda x: x.to_dict(), axis=1), + last90_days_file] + + self.created_date = datetime.fromtimestamp((os.path.getctime(stats_file)), tz=timezone.utc) + return [{"name": dataset_name}] + + def generate_dataset_and_showcase(self, dataset_name): + + # Setting metadata and configurations + name = self.configuration["dataset_names"]["HDX-FLOODSCAN"] + title = self.configuration["title"] + update_frequency = self.configuration["update_frequency"] + dataset = Dataset({"name": slugify(name), "title": title}) + rows = self.dataset_data[dataset_name][0] + dataset.set_maintainer(self.configuration["maintainer_id"]) + dataset.set_organization(self.configuration["organization_id"]) + dataset.set_expected_update_frequency(update_frequency) + dataset.set_subnational(False) + dataset.add_other_location("world") + dataset["notes"] = self.configuration["notes"] + filename = "hdx_floodscan_zonal_stats.xlsx" + 
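# The stats workbook read in get_data above carries an "equivalent approximate
# calculated return period" per admin 2 (see description_stats_file in
# config/project_configuration.yaml). As a rough illustration of how such a value
# can be derived from yearly maxima, here is an empirical (plotting-position)
# sketch. It is only a simplified stand-in: the static metadata refers to a
# Log Pearson III fit, and the pipeline later imports src.utils.return_periods
# for the real calculation, which is not shown in this patch.
import numpy as np

def approx_return_period(yearly_maxima: np.ndarray, value: float) -> float:
    # Empirical exceedance probability via the Weibull plotting position:
    # the largest of n maxima gets probability 1/(n+1) of being exceeded.
    sorted_max = np.sort(yearly_maxima)[::-1]                        # descending
    exceed_prob = np.arange(1, len(sorted_max) + 1) / (len(sorted_max) + 1)
    # Interpolate the exceedance probability of the observed value, then invert.
    prob = np.interp(value, sorted_max[::-1], exceed_prob[::-1])
    return 1.0 / prob

# e.g. approx_return_period(np.array([0.12, 0.30, 0.08, 0.25, 0.40]), 0.35)
# gives roughly a 1-in-4 year event for these made-up maxima.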
resource_data = {"name": filename, + "description": self.configuration["description_stats_file"]} + tags = sorted([t for t in self.configuration["allowed_tags"]]) + dataset.add_tags(tags) + + # Setting time period + start_date = self.start_date + ongoing = False + if not start_date: + logger.error(f"Start date missing for {dataset_name}") + return None, None + dataset.set_time_period(start_date, self.latest_date, ongoing) + + headers = rows[0].keys() + date_headers = [h for h in headers if "date" in h.lower() and type(rows[0][h]) == int] + for row in rows: + for date_header in date_headers: + row_date = row[date_header] + if not row_date: + continue + if len(str(row_date)) > 9: + row_date = row_date / 1000 + row_date = datetime.utcfromtimestamp(row_date) + row_date = row_date.strftime("%Y-%m-%d") + row[date_header] = row_date + + rows + dataset.generate_resource_from_rows( + self.folder, + filename, + rows, + resource_data, + list(rows[0].keys()), + encoding='utf-8' + ) + + second_filename = "aer_floodscan_300s_SFED_90d.zip" + resource_data = {"name": second_filename, + "description": self.configuration["description_90days_file"]} + + res = copy(dataset.get_resource(0)) + dataset.resources.append(res) + resource = dataset.get_resource(1) + resource.set_format("zipped geotiff") + resource['name'] = resource_data['name'] + resource['description'] = resource_data['description'] + resource.set_file_to_upload(self.dataset_data[dataset_name][1]) + dataset.add_update_resource(resource) + + return dataset + + def _get_latest_90_days_geotiffs(self, account, container, key): + + geotiffs = [] + # TODO add back yesterday = datetime.today() - pd.DateOffset(days=1) + yesterday = datetime.strptime("2024-01-01", "%Y-%m-%d") + dates = create_date_range(90, yesterday) + + for date in dates: + blob = f"floodscan/daily/v5/processed/aer_area_300s_v{date.strftime(DATE_FORMAT)}_v05r01.tif" + + geotiff_file_for_date = self.retriever.download_file( + url=blob, + account=account, + container=container, + key=key, + blob=blob) + ds = xr.open_dataset(geotiff_file_for_date) + ds['date'] = date + geotiffs.append(ds.sel({"band": 1}, drop=True)) + + return geotiffs + + def _get_historical_baseline(self, account, container, key, blob): + + historical_baseline_file = self.retriever.download_file( + url=blob, + account=account, + container=container, + key=key, + blob=blob) + ds_historical_baseline = xr.open_dataset(historical_baseline_file) + + return ds_historical_baseline + + def _generate_zipped_file(self, last90_days_geotiffs, ds_historical_baseline): + + os.makedirs('geotiffs', exist_ok=True) + + out_files = [] + for ds_current_sfed in last90_days_geotiffs: + ds_historical_baseline = ds_historical_baseline.persist() + + dt_temp_str = ds_current_sfed.date.dt.strftime("%Y%m%d") + doy_temp = int(ds_current_sfed["date"].dt.dayofyear) + ds_current_sfed = ds_current_sfed.drop_vars(["date"]) + h_sfed_temp = ds_historical_baseline.sel({"dayofyear": doy_temp}, drop=True) + + h_sfed_temp_interp = h_sfed_temp.interp_like(ds_current_sfed, method="nearest") + merged_temp = xr.merge([ds_current_sfed, h_sfed_temp_interp]) + merged_temp= merged_temp.drop_vars(["band_data"]) + merged_temp = merged_temp.rename({'__xarray_dataarray_variable__' : "SFED_BASELINE"}) + + out_file = f"geotiffs/{int(dt_temp_str)}_aer_floodscan_sfed.tif" + + # Save geotiff + merged_temp.rio.to_raster(out_file, driver="COG") + out_files.append(out_file) + + return out_files + + diff --git a/requirements.txt b/requirements.txt index eb0c5b7..18fea7d 100644 --- 
a/requirements.txt +++ b/requirements.txt @@ -13,3 +13,4 @@ netCDF4==1.7.2 scipy==1.14.1 lmoments3==1.0.7 pyarrow==17.0.0 +hdx-python-api==6.2.6 \ No newline at end of file diff --git a/run.py b/run.py new file mode 100644 index 0000000..911130d --- /dev/null +++ b/run.py @@ -0,0 +1,280 @@ +#!/usr/bin/python +""" +Top level script. Calls other functions that generate datasets that this script then creates in HDX. + +""" +import logging +from os.path import expanduser, join, exists +from hdx.utilities.downloader import Download +from hdx.utilities.errors_onexit import ErrorsOnExit +from hdx.utilities.path import progress_storing_folder, wheretostart_tempdir_batch +from hdx.utilities.retriever import Retrieve +from hdx.data.hdxobject import HDXError +from datetime import datetime +import hmac +import hashlib +import base64 +from hdx_floodscan import HDXFloodscan + +"""Facade to simplify project setup that calls project main function with kwargs""" + +import sys +from inspect import getdoc +from typing import Any, Callable, Optional # noqa: F401 +import defopt +from makefun import with_signature +from hdx.api import __version__ +from hdx.api.configuration import Configuration +from hdx.utilities.useragent import UserAgent + +logger = logging.getLogger(__name__) + +lookup = "hdx-floodscan" +updated_by_script = "HDX Scraper: HDX FloodScan" + + +class AzureBlobDownload(Download): + + def download_file( + self, + url: str, + account: str, + container: str, + key: str, + blob: None, + **kwargs: Any, + ) -> str: + """Download file from blob storage and store in provided folder or temporary + folder if no folder supplied. + + Args: + url (str): URL for the exact blob location + account (str): Storage account to access the blob + container (str): Container to download from + key (str): Key to access the blob + blob (str): Name of the blob to be downloaded. If empty, then it is assumed to download the whole container. + **kwargs: See below + folder (str): Folder to download it to. Defaults to temporary folder. + filename (str): Filename to use for downloaded file. Defaults to deriving from url. + path (str): Full path to use for downloaded file instead of folder and filename. + overwrite (bool): Whether to overwrite existing file. Defaults to False. + keep (bool): Whether to keep already downloaded file. Defaults to False. + post (bool): Whether to use POST instead of GET. Defaults to False. + parameters (Dict): Parameters to pass. Defaults to None. + timeout (float): Timeout for connecting to URL. Defaults to None (no timeout). + headers (Dict): Headers to pass. Defaults to None. + encoding (str): Encoding to use for text response. Defaults to None (best guess). 
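# A hedged usage sketch for the download_file override documented above, based on
# how the retriever is called elsewhere in this patch (account/container/key with
# the blob path doubling as the URL). The credential values are placeholders, and
# the blob name only follows the processed-COG naming pattern used in this patch.
from hdx.utilities.retriever import Retrieve  # the retriever wraps AzureBlobDownload

def fetch_one_blob(retriever: Retrieve) -> str:
    blob = "floodscan/daily/v5/processed/aer_area_300s_v2024-01-01_v05r01.tif"
    return retriever.download_file(
        url=blob,                     # the pipeline passes the blob path as the URL
        account="storageaccount",     # placeholder storage account
        container="container",        # placeholder container name
        key="base64-account-key",     # placeholder access key
        blob=blob,
    )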
+ + Returns: + str: Path of downloaded file + """ + folder = kwargs.get("folder") + filename = kwargs.get("filename") + path = kwargs.get("path") + overwrite = kwargs.get("overwrite", False) + keep = kwargs.get("keep", False) + + request_time = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') + api_version = '2018-03-28' + parameters = { + 'verb': 'GET', + 'Content-Encoding': '', + 'Content-Language': '', + 'Content-Length': '', + 'Content-MD5': '', + 'Content-Type': '', + 'Date': '', + 'If-Modified-Since': '', + 'If-Match': '', + 'If-None-Match': '', + 'If-Unmodified-Since': '', + 'Range': '', + 'CanonicalizedHeaders': 'x-ms-date:' + request_time + '\nx-ms-version:' + api_version + '\n', + 'CanonicalizedResource': '/' + account + '/' + container + '/' + blob + } + + signature = (parameters['verb'] + '\n' + + parameters['Content-Encoding'] + '\n' + + parameters['Content-Language'] + '\n' + + parameters['Content-Length'] + '\n' + + parameters['Content-MD5'] + '\n' + + parameters['Content-Type'] + '\n' + + parameters['Date'] + '\n' + + parameters['If-Modified-Since'] + '\n' + + parameters['If-Match'] + '\n' + + parameters['If-None-Match'] + '\n' + + parameters['If-Unmodified-Since'] + '\n' + + parameters['Range'] + '\n' + + parameters['CanonicalizedHeaders'] + + parameters['CanonicalizedResource']) + + signed_string = base64.b64encode(hmac.new(base64.b64decode(key), msg=signature.encode('utf-8'), + digestmod=hashlib.sha256).digest()).decode() + + headers = { + 'x-ms-date': request_time, + 'x-ms-version': api_version, + 'Authorization': ('SharedKey ' + account + ':' + signed_string) + } + + url = ('https://' + account + '.blob.core.windows.net/' + container + '/' + blob) + + if keep and exists(url): + print(f"The blob URL exists: {url}") + return path + self.setup( + url=url, + stream=True, + post=kwargs.get("post", False), + parameters=kwargs.get("parameters"), + timeout=kwargs.get("timeout"), + headers=headers, + encoding=kwargs.get("encoding"), + ) + return self.stream_path( + path, f"Download of {url} failed in retrieval of stream!" + ) + + # TODO Improve logic around facade +def facade(projectmainfn: Callable[[Any], None], **kwargs: Any): + """Facade to simplify project setup that calls project main function. It infers + command line arguments from the passed in function using defopt. The function passed + in should have type hints and a docstring from which to infer the command line + arguments. Any **kwargs given will be merged with command line arguments, with the + command line arguments taking precedence. 
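# The override above builds the SharedKey Authorization header by hand. Elsewhere
# in this repo (src/utils/cloud_utils.py) the azure-storage-blob SDK is used, which
# signs requests itself. A hedged sketch of the SDK-based equivalent for a single
# blob download; the account name, key and paths are placeholders.
from azure.storage.blob import BlobServiceClient

def download_blob_with_sdk(account: str, key: str, container: str, blob: str, out_path: str) -> str:
    service = BlobServiceClient(
        account_url=f"https://{account}.blob.core.windows.net", credential=key
    )
    blob_client = service.get_blob_client(container=container, blob=blob)
    # Stream the blob to a local file and return its path.
    with open(out_path, "wb") as f:
        f.write(blob_client.download_blob().readall())
    return out_path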
+ + Args: + projectmainfn ((Any) -> None): main function of project + **kwargs: Configuration parameters to pass to HDX Configuration & other parameters to pass to main function + + Returns: + None + """ + + # + # Setting up configuration + # + + parsed_main_doc = defopt._parse_docstring(getdoc(projectmainfn)) + main_doc = [f"{parsed_main_doc.first_line}\n\nArgs:"] + no_main_params = len(parsed_main_doc.params) + for param_name, param_info in parsed_main_doc.params.items(): + main_doc.append( + f"\n {param_name} ({param_info.type}): {param_info.text}" + ) + create_config_doc = getdoc(Configuration.create) + kwargs_index = create_config_doc.index("**kwargs") + kwargs_index = create_config_doc.index("\n", kwargs_index) + args_doc = create_config_doc[kwargs_index:] + main_doc.append(args_doc) + main_doc = "".join(main_doc) + + main_sig = defopt.signature(projectmainfn) + param_names = [] + for param in main_sig.parameters.values(): + param_names.append(str(param)) + + parsed_main_doc = defopt._parse_docstring(main_doc) + main_doc = [f"{parsed_main_doc.first_line}\n\nArgs:"] + count = 0 + for param_name, param_info in parsed_main_doc.params.items(): + param_type = param_info.type + if param_type == "dict": + continue + if count < no_main_params: + count += 1 + else: + if param_type == "str": + param_type = "Optional[str]" + default = None + elif param_type == "bool": + default = False + else: + raise ValueError( + "Configuration.create has new parameter with unknown type!" + ) + param_names.append(f"{param_name}: {param_type} = {default}") + main_doc.append( + f"\n {param_name} ({param_type}): {param_info.text}" + ) + main_doc = "".join(main_doc) + + projectmainname = projectmainfn.__name__ + main_sig = f"{projectmainname}({','.join(param_names)})" + + argv = sys.argv[1:] + for key in kwargs: + name = f"--{key.replace('_', '-')}" + if name not in argv: + argv.append(name) + argv.append(kwargs[key]) + + @with_signature(main_sig) + def gen_func(*args, **kwargs): + """docstring""" + site_url = Configuration._create(*args, **kwargs) + logger.info("--------------------------------------------------") + logger.info(f"> Using HDX Python API Library {__version__}") + logger.info(f"> HDX Site: {site_url}") + + gen_func.__doc__ = main_doc + + configuration_create = defopt.bind(gen_func, argv=argv, cli_options="all") + main_func, _ = defopt.bind_known( + projectmainfn, argv=argv, cli_options="all" + ) + + configuration_create() + UserAgent.user_agent = Configuration.read().user_agent + main_func() + + +def main(save: bool = False, use_saved: bool = False) -> None: + """Generate datasets and create them in HDX""" + with ErrorsOnExit() as errors: + with wheretostart_tempdir_batch(lookup) as info: + folder = info["folder"] + with AzureBlobDownload() as downloader: + retriever = Retrieve( + downloader, folder, "saved_data", folder, save, use_saved + ) + folder = info["folder"] + batch = info["batch"] + configuration = Configuration.read() + floodscan = HDXFloodscan(configuration, retriever, folder, errors) + dataset_names = floodscan.get_data() + logger.info(f"Number of datasets to upload: {len(dataset_names)}") + + for _, nextdict in progress_storing_folder(info, dataset_names, "name"): + dataset_name = nextdict["name"] + dataset = floodscan.generate_dataset_and_showcase(dataset_name=dataset_name) + if dataset: + dataset.update_from_yaml() + dataset["notes"] = dataset["notes"].replace( + "\n", " \n" + ) # ensure markdown has line breaks + try: + dataset.create_in_hdx( + remove_additional_resources=True, + 
hxl_update=False, + updated_by_script=updated_by_script, + batch=batch, + ignore_fields=["resource:description", "extras"], + ) + except HDXError as err: + errors.add(f"Could not upload {dataset_name}: {err}") + continue + + + +if __name__ == "__main__": + logging.basicConfig() + logging.getLogger().setLevel(logging.INFO) + facade( + main, + user_agent_config_yaml="/Workspace/Shared/init-scripts/.useragents.yaml", + hdx_config_yaml="/Workspace/Shared/init-scripts/.hdx_configuration.yaml", + user_agent_lookup=lookup, + project_config_yaml=join("config", "project_configuration.yaml"), + + ) \ No newline at end of file diff --git a/src/datasources/floodscan.py b/src/datasources/floodscan.py index 426e6cc..ed3ec49 100644 --- a/src/datasources/floodscan.py +++ b/src/datasources/floodscan.py @@ -5,9 +5,8 @@ import rioxarray as rxr import xarray as xr -from src.utils import cloud_utils, cog_utils, gen_utils +from src.utils import cloud_utils, cog_utils, date_utils -# from src.utils import blob DATA_DIR_GDRIVE = Path(os.getenv("AA_DATA_DIR_NEW")) FP_FS_HISTORICAL = ( @@ -44,7 +43,7 @@ def load_floodscan_cogs( cogs_list = [ x.name for x in container_client.list_blobs(name_starts_with=prefix) - if (gen_utils.extract_date(x.name).date() >= start_date) + if (date_utils.extract_date(x.name).date() >= start_date) & (gen_utils.extract_date(x.name).date() <= end_date) # noqa ] @@ -72,7 +71,7 @@ def process_floodscan_cog(cog_name, mode, container_name): cog_name=cog_name, mode=mode, container_name=container_name ) da_in = rxr.open_rasterio(url_str_tmp, chunks="auto") - da_in["date"] = gen_utils.extract_date(url_str_tmp) + da_in["date"] = date_utils.extract_date(url_str_tmp) da_in = da_in.expand_dims(["date"]) da_in = da_in.persist() return da_in diff --git a/src/utils/cloud_utils.py b/src/utils/cloud_utils.py index 18c1096..50d1eee 100644 --- a/src/utils/cloud_utils.py +++ b/src/utils/cloud_utils.py @@ -1,10 +1,13 @@ import os +import logging from io import BytesIO +from azure.core.exceptions import ResourceNotFoundError from azure.storage.blob import ContainerClient from dotenv import load_dotenv load_dotenv() +logger = logging.getLogger(__name__) def get_container_client(mode, container_name): @@ -71,3 +74,40 @@ def write_output_stats(df, fname, mode="dev"): data = parquet_buffer.getvalue() container_client.upload_blob(name=fname, data=data, overwrite=True) return + + +def download_from_azure( + blob_service_client, container_name, blob_path, local_file_path +): + """ + Download a file from Azure Blob Storage. 
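# src/datasources/floodscan.py above now resolves dates via date_utils.extract_date,
# which (as defined later in this patch series) pulls the first 8-digit YYYYMMDD run
# out of a blob name. A tiny usage sketch with a hypothetical file name containing
# such a run; real blob names in the container may differ.
from src.utils import date_utils

day = date_utils.extract_date("aer_floodscan_sfed_20240101.tif")  # hypothetical name
assert day.strftime("%Y-%m-%d") == "2024-01-01"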
+ + Args: + blob_service_client (BlobServiceClient): The Azure Blob Service Client + container_name (str): The name of the container + blob_path (str or Path): The path of the blob in the container + local_file_path (str or Path): The local path where the file should be saved + + Returns: + bool: True if download was successful, False otherwise + """ + try: + # Get the blob client + blob_client = blob_service_client.get_blob_client( + container=container_name, blob=str(blob_path) + ) + + # Download the blob + with open(local_file_path, "wb") as download_file: + download_file.write(blob_client.download_blob().readall()) + + logger.info(f"Successfully downloaded blob {blob_path} to {local_file_path}") + return True + + except ResourceNotFoundError: + logger.warning(f"Blob {blob_path} not found") + return False + + except Exception as e: + logger.error(f"An error occurred while downloading {blob_path}: {str(e)}") + return False \ No newline at end of file From d2e71aaa3fd2da0cc2273e367c9135ab20895d77 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Thu, 28 Nov 2024 16:00:09 +0000 Subject: [PATCH 2/9] Adding missing changes --- src/utils/date_utils.py | 61 +++++++++++++++++++++++++++++++++++++++++ src/utils/gen_utils.py | 17 ------------ 2 files changed, 61 insertions(+), 17 deletions(-) create mode 100644 src/utils/date_utils.py delete mode 100644 src/utils/gen_utils.py diff --git a/src/utils/date_utils.py b/src/utils/date_utils.py new file mode 100644 index 0000000..5fbf8eb --- /dev/null +++ b/src/utils/date_utils.py @@ -0,0 +1,61 @@ +import argparse + +import re +from datetime import datetime, timedelta +from zipfile import ZipFile + + +def extract_date(x): + # Extract the date string in the format YYYYMMDD + date_str = re.search(r"\d{8}", x).group() + # Convert the extracted string to a date object + return datetime.strptime(date_str, "%Y%m%d") + + +def date_to_run(date=None): + if date: + ret = datetime.strptime(date, "%Y-%m-%d") # .date() + else: + ret = datetime.today() - timedelta(days=5) # .date() + return ret + +def create_date_range(days, last_date): + """ + Method to create a range of dates fof the past N days. + + Args: + last_date: Which date to use as the end date. + days (int): Number of the days before the last_date to generate range of. 
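# A hedged usage sketch for the download_from_azure helper added above. The client
# construction mirrors the account-URL pattern used elsewhere in the repo; the
# account name, key, container and blob/local paths are placeholders.
from azure.storage.blob import BlobServiceClient
from src.utils.cloud_utils import download_from_azure

service = BlobServiceClient(
    account_url="https://storageaccount.blob.core.windows.net",  # placeholder account
    credential="base64-account-key",                             # placeholder key
)
ok = download_from_azure(
    blob_service_client=service,
    container_name="raster",                                     # placeholder container
    blob_path="floodscan/daily/v5/processed/example.tif",        # placeholder blob
    local_file_path="example.tif",
)
# `ok` is True on success, False if the blob is missing or the download fails.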
+ + Returns: + dates (list): List with the range dates + """ + + if days < 1: + raise argparse.ArgumentTypeError("Days cannot be lower than 2.") + + date_range = [] + current_date = last_date + while len(date_range) <= days: + date_range.append(current_date) + current_date -= timedelta(days=1) + + return date_range + + +def get_start_and_last_date_from_90_days_file(zipped_file): + search_str = "([0-9]{4}[0-9]{2}[0-9]{2})" + with ZipFile(zipped_file, "r") as zipobj: + filenames = zipobj.namelist() + newest = max(filenames) + oldest = min(filenames) + end_date_str = re.search(search_str, newest) + start_date_str = re.search(search_str, oldest) + return datetime.strptime(start_date_str[0], "%Y%m%d"), datetime.strptime(end_date_str[0], "%Y%m%d") + +def get_start_and_last_date_from_90_days(files): + search_str = "([0-9]{4}[0-9]{2}[0-9]{2})" + end_date_str = re.search(search_str, max(files)) + start_date_str = re.search(search_str, min(files)) + + return datetime.strptime(start_date_str[0], "%Y%m%d"), datetime.strptime(end_date_str[0], "%Y%m%d") \ No newline at end of file diff --git a/src/utils/gen_utils.py b/src/utils/gen_utils.py deleted file mode 100644 index 2ffe57e..0000000 --- a/src/utils/gen_utils.py +++ /dev/null @@ -1,17 +0,0 @@ -import datetime -import re - - -def extract_date(x): - # Extract the date string in the format YYYYMMDD - date_str = re.search(r"\d{8}", x).group() - # Convert the extracted string to a date object - return datetime.datetime.strptime(date_str, "%Y%m%d") - - -def date_to_run(date=None): - if date: - ret = datetime.datetime.strptime(date, "%Y-%m-%d") # .date() - else: - ret = datetime.date.today() - datetime.timedelta(days=5) # .date() - return ret From febb625e792706f6af4c124d242878e019dad817 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Thu, 28 Nov 2024 16:15:18 +0000 Subject: [PATCH 3/9] Adding filename --- config/project_configuration.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/project_configuration.yaml b/config/project_configuration.yaml index 6ee1928..e904a43 100644 --- a/config/project_configuration.yaml +++ b/config/project_configuration.yaml @@ -3,7 +3,7 @@ url: "test" account: "account" container: "container" -last90_days_filename: "{dir}baseline_v2024-01-31_v05r01.nc4" +last90_days_filename: "floodscan/daily/v5/raw/baseline_v2024-01-31_v05r01.nc4" stats_filename: "20240701_som_floodscan_adm1.xlsx" key: "key" From 841843adca1a460f82b441a9b1be5ffc005f8a1a Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Thu, 28 Nov 2024 16:30:34 +0000 Subject: [PATCH 4/9] Adding path to create dir --- hdx_floodscan.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hdx_floodscan.py b/hdx_floodscan.py index 9e34ce2..6f708db 100644 --- a/hdx_floodscan.py +++ b/hdx_floodscan.py @@ -9,6 +9,7 @@ """ import logging import os +from pathlib import Path from copy import copy from datetime import datetime, timezone import pandas as pd @@ -180,7 +181,7 @@ def _get_historical_baseline(self, account, container, key, blob): def _generate_zipped_file(self, last90_days_geotiffs, ds_historical_baseline): - os.makedirs('geotiffs', exist_ok=True) + Path("geottifs").mkdir(parents=True, exist_ok=True) out_files = [] for ds_current_sfed in last90_days_geotiffs: From d828e1f0818f269eb95efca1ab74f57ce47a1449 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Thu, 28 Nov 2024 16:45:10 +0000 Subject: [PATCH 5/9] Trying to fix databricks path issue --- hdx_floodscan.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hdx_floodscan.py 
b/hdx_floodscan.py index 6f708db..f9d7acf 100644 --- a/hdx_floodscan.py +++ b/hdx_floodscan.py @@ -9,6 +9,7 @@ """ import logging import os +import sys from pathlib import Path from copy import copy from datetime import datetime, timezone @@ -181,7 +182,8 @@ def _get_historical_baseline(self, account, container, key, blob): def _generate_zipped_file(self, last90_days_geotiffs, ds_historical_baseline): - Path("geottifs").mkdir(parents=True, exist_ok=True) + cur_dir = os.path.dirname(os.path.abspath(sys.argv[0])) + os.mkdir(os.path.join(cur_dir, "geotiffs")) out_files = [] for ds_current_sfed in last90_days_geotiffs: From f8cd39d4555257d797b5ef792f3eb651a1a7d8d3 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Thu, 28 Nov 2024 17:01:31 +0000 Subject: [PATCH 6/9] Trying to fix databricks path issue [2] --- hdx_floodscan.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hdx_floodscan.py b/hdx_floodscan.py index f9d7acf..954f5e5 100644 --- a/hdx_floodscan.py +++ b/hdx_floodscan.py @@ -182,8 +182,10 @@ def _get_historical_baseline(self, account, container, key, blob): def _generate_zipped_file(self, last90_days_geotiffs, ds_historical_baseline): - cur_dir = os.path.dirname(os.path.abspath(sys.argv[0])) - os.mkdir(os.path.join(cur_dir, "geotiffs")) + base_dir = os.path.dirname(os.path.abspath(sys.argv[0])) + geotiffs_dir = Path("geotiffs") + new_dir = base_dir / geotiffs_dir + new_dir.mkdir(exist_ok=True) out_files = [] for ds_current_sfed in last90_days_geotiffs: From 2a66030ef5f467cb3db489e4f5ecc48b9d1fa919 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Thu, 28 Nov 2024 17:11:51 +0000 Subject: [PATCH 7/9] Trying to fix databricks path issue [3] --- hdx_floodscan.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/hdx_floodscan.py b/hdx_floodscan.py index 954f5e5..b449945 100644 --- a/hdx_floodscan.py +++ b/hdx_floodscan.py @@ -182,10 +182,7 @@ def _get_historical_baseline(self, account, container, key, blob): def _generate_zipped_file(self, last90_days_geotiffs, ds_historical_baseline): - base_dir = os.path.dirname(os.path.abspath(sys.argv[0])) - geotiffs_dir = Path("geotiffs") - new_dir = base_dir / geotiffs_dir - new_dir.mkdir(exist_ok=True) + os.mkdir("geotiffs") out_files = [] for ds_current_sfed in last90_days_geotiffs: From 15053a7053488d12ca7d84b4897d2bf920dedfc7 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Mon, 6 Jan 2025 20:08:20 -0300 Subject: [PATCH 8/9] Fixing the merge of the SFED and SFED_BASELINE bands in the aer_floodscan_300s_SFED_90d.zip file --- config/project_configuration.yaml | 19 +++++- hdx_floodscan.py | 103 +++++++++++++++++++++--------- 2 files changed, 89 insertions(+), 33 deletions(-) diff --git a/config/project_configuration.yaml b/config/project_configuration.yaml index e904a43..c56a8d0 100644 --- a/config/project_configuration.yaml +++ b/config/project_configuration.yaml @@ -4,17 +4,17 @@ url: "test" account: "account" container: "container" last90_days_filename: "floodscan/daily/v5/raw/baseline_v2024-01-31_v05r01.nc4" -stats_filename: "20240701_som_floodscan_adm1.xlsx" key: "key" dataset_names: - HDX-FLOODSCAN: "hdx-floodscan" + HDX-FLOODSCAN: "hdx-floodscan-test" allowed_tags: - "climate hazards" - "climate-weather" - "hydrology" - "natural disasters" + - "flooding" # Metadata title: "HDX FloodScan" @@ -23,4 +23,17 @@ maintainer_id: "2f9fd160-2a16-49c0-89d6-0bc3230599bf" organization_id: "hdx" description_90days_file: "Daily zip file containing previous 90 days of raster both SFED and SFED baseline bands." 
description_stats_file: "Excel file containing date, admin metadata, raw SFED mean values (per admin 2), and equivalent approximate calculated return period." -notes: "HDX FloodScan Notes" \ No newline at end of file +notes: "Tabular Admin 2: \n +A daily dataset providing average FloodScan Standard Flood Extent Depiction (SFED) flood fraction (0-100%) per admin 2 +level. Historical baseline values (SFED_BASELINE) are calculated per day-of-year from the last 10 years of historical +data (non-inclusive of current year) after applying an 11 day smoothing mean window. Return Period (RP) is calculated +empirically based on all historical data up to the current year (non-inclusive).\n +\n + +Raster Data\n +Zipped Cloud Optimized Geotifs (COGs) containing AER FloodScan estimated daily flood fraction (0-100%) gridded data at +approximately 10 km resolution (0.083 degrees) for the last 90 days. These data are provided as an aggregated version of + FloodScan’s higher resolution data set. Each COG represents the estimates for a single day and includes 2 bands: SFED + (Standard Flood Event Depiction) and SFED_BASELINE. The baseline band provides users an easy way to compare current + values with historical averages. The baseline is calculated per day-of-year from the last 10 years of historical data + (non-inclusive of current year) after applying an 11 day temporal smoothing mean window." diff --git a/hdx_floodscan.py b/hdx_floodscan.py index b449945..bbb9a11 100644 --- a/hdx_floodscan.py +++ b/hdx_floodscan.py @@ -9,16 +9,19 @@ """ import logging import os -import sys -from pathlib import Path from copy import copy from datetime import datetime, timezone import pandas as pd from hdx.data.dataset import Dataset from slugify import slugify import xarray as xr +import rioxarray as rxr +import numpy as np import shutil +from src.utils import pg +from src.utils import return_periods as rp + from src.utils.date_utils import create_date_range, get_start_and_last_date_from_90_days logger = logging.getLogger(__name__) @@ -55,7 +58,7 @@ def get_data(self): dataset_name = self.configuration["dataset_names"]["HDX-FLOODSCAN"] last90_days_files = self._get_latest_90_days_geotiffs(account, container, key) - historical_baseline = self._get_historical_baseline(account, container, key, last90_days_filename) + historical_baseline = self._get_historical_baseline(account, container, key) last90_days_file = self._generate_zipped_file(last90_days_files, historical_baseline) # Find the minimum and maximum dates @@ -65,21 +68,46 @@ def get_data(self): last90_days_file = shutil.make_archive("baseline_zipped_file", 'zip', "geotiffs") shutil.rmtree("geotiffs") - stats_file = self.retriever.download_file( - url=url, - account=account, - container="dsci", # TODO change this to parameter before pushing changes to prod - key=key, - blob=stats_filename) - - data_df_stats = pd.read_excel(stats_file, sheet_name="FloodScan", keep_default_na=False).replace('[“”]', '', regex=True) + merged_zonal_stats_admin1 = self.get_zonal_stats_for_admin(mode="prod", admin_level=1, band='SFED') + merged_zonal_stats_admin2 = self.get_zonal_stats_for_admin(mode="prod", admin_level=2, band='SFED') - self.dataset_data[dataset_name] = [data_df_stats.apply(lambda x: x.to_dict(), axis=1), - last90_days_file] + with pd.ExcelWriter('hdx_floodscan_zonal_stats.xlsx') as excel_merged_file: + pd.DataFrame({'README': ['This is a placeholder']}).to_excel(excel_merged_file, sheet_name='readme') + merged_zonal_stats_admin1.to_excel(excel_merged_file, 
sheet_name='admin1') + merged_zonal_stats_admin2.to_excel(excel_merged_file, sheet_name='admin2') + self.dataset_data[dataset_name] = [merged_zonal_stats_admin1.apply(lambda x: x.to_dict(), axis=1), + last90_days_file, + excel_merged_file] - self.created_date = datetime.fromtimestamp((os.path.getctime(stats_file)), tz=timezone.utc) + self.created_date = datetime.today().date() return [{"name": dataset_name}] + def get_zonal_stats_for_admin(self, mode, admin_level, band): + df_current = pg.fs_last_90_days(mode=mode, admin_level=admin_level, band=band) + df_yr_max = pg.fs_year_max(mode=mode, admin_level=admin_level, band=band) + df_w_rps = rp.fs_add_rp( + df=df_current, df_maxima=df_yr_max, by=["iso3", "pcode"] + ) + df_w_rps = df_w_rps.rename(columns={'value': band}) + df_rolling_11_day_mean = pg.fs_rolling_11_day_mean( + mode=mode, admin_level=admin_level, band=band + ) + df_rolling_11_day_mean['valid_date'] = pd.to_datetime( + datetime.today().year * 1000 + df_rolling_11_day_mean['doy'], format='%Y%j').dt.strftime(DATE_FORMAT) + df_rolling_11_day_mean = df_rolling_11_day_mean.drop('doy', axis=1) + df_rolling_11_day_mean.valid_date = df_rolling_11_day_mean.valid_date.astype(str) + df_rolling_11_day_mean.iso3 = df_rolling_11_day_mean.iso3.astype(str) + df_rolling_11_day_mean.pcode = df_rolling_11_day_mean.pcode.astype(str) + + df_w_rps.valid_date = df_w_rps.valid_date.astype(str) + df_w_rps.iso3 = df_w_rps.iso3.astype(str) + df_w_rps.pcode = df_w_rps.pcode.astype(str) + + merged_zonal_stats = df_w_rps.merge(df_rolling_11_day_mean, on=["iso3", "pcode", "valid_date"], how="left") + merged_zonal_stats = merged_zonal_stats.rename(columns={'sfed_baseline': "SFED_BASELINE"}) + + return merged_zonal_stats + def generate_dataset_and_showcase(self, dataset_name): # Setting metadata and configurations @@ -130,6 +158,12 @@ def generate_dataset_and_showcase(self, dataset_name): list(rows[0].keys()), encoding='utf-8' ) + res = dataset.get_resource(0) + res['name'] = "hdx_floodscan_zonal_stats.xlsx" + res['description'] = self.configuration["description_stats_file"] + res.set_file_to_upload(self.dataset_data[dataset_name][2]) + res.set_format("xlsx") + dataset.add_update_resource(res) second_filename = "aer_floodscan_300s_SFED_90d.zip" resource_data = {"name": second_filename, @@ -148,7 +182,7 @@ def generate_dataset_and_showcase(self, dataset_name): def _get_latest_90_days_geotiffs(self, account, container, key): - geotiffs = [] + das = {} # TODO add back yesterday = datetime.today() - pd.DateOffset(days=1) yesterday = datetime.strptime("2024-01-01", "%Y-%m-%d") dates = create_date_range(90, yesterday) @@ -162,41 +196,48 @@ def _get_latest_90_days_geotiffs(self, account, container, key): container=container, key=key, blob=blob) - ds = xr.open_dataset(geotiff_file_for_date) - ds['date'] = date - geotiffs.append(ds.sel({"band": 1}, drop=True)) + da_in = rxr.open_rasterio(geotiff_file_for_date, chunks="auto") + das[date] = da_in.sel({'band' : 1}, drop=True) - return geotiffs + return das - def _get_historical_baseline(self, account, container, key, blob): + def _get_historical_baseline(self, account, container, key): + blob = f"floodscan/daily/v5/raw/baseline_v2024-01-31_v05r01.nc4" historical_baseline_file = self.retriever.download_file( url=blob, account=account, container=container, key=key, blob=blob) - ds_historical_baseline = xr.open_dataset(historical_baseline_file) + + chunks = {"lat": 1080, "lon": 1080, "time": 1} + ds_historical_baseline = xr.open_dataset(historical_baseline_file, chunks=chunks) 
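# The rolling-baseline frame above turns its day-of-year column into a calendar
# date with pd.to_datetime(year * 1000 + doy, format="%Y%j"): the integer 2024032
# reads as "year 2024, day 32". A tiny standalone check of that encoding:
import pandas as pd

doy = pd.Series([1, 32, 365])
dates = pd.to_datetime(2024 * 1000 + doy, format="%Y%j").dt.strftime("%Y-%m-%d")
# -> ["2024-01-01", "2024-02-01", "2024-12-30"]  (2024 is a leap year)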
+ ds_historical_baseline = ds_historical_baseline.rename_vars( + {"__xarray_dataarray_variable__": "SFED_BASELINE"} + ) return ds_historical_baseline def _generate_zipped_file(self, last90_days_geotiffs, ds_historical_baseline): - os.mkdir("geotiffs") + os.makedirs('geotiffs', exist_ok=True) out_files = [] - for ds_current_sfed in last90_days_geotiffs: + for tif_date in last90_days_geotiffs: + da_current = last90_days_geotiffs[tif_date] ds_historical_baseline = ds_historical_baseline.persist() - dt_temp_str = ds_current_sfed.date.dt.strftime("%Y%m%d") - doy_temp = int(ds_current_sfed["date"].dt.dayofyear) - ds_current_sfed = ds_current_sfed.drop_vars(["date"]) + dt_temp_str = tif_date.strftime("%Y%m%d") + doy_temp = int(tif_date.strftime('%j')) h_sfed_temp = ds_historical_baseline.sel({"dayofyear": doy_temp}, drop=True) - h_sfed_temp_interp = h_sfed_temp.interp_like(ds_current_sfed, method="nearest") - merged_temp = xr.merge([ds_current_sfed, h_sfed_temp_interp]) - merged_temp= merged_temp.drop_vars(["band_data"]) - merged_temp = merged_temp.rename({'__xarray_dataarray_variable__' : "SFED_BASELINE"}) + ds_current_sfed = da_current.to_dataset(name="SFED") + + merged_temp = xr.merge([ds_current_sfed.SFED, h_sfed_temp.SFED_BASELINE], combine_attrs="drop") + merged_temp['SFED'] = merged_temp.SFED.rio.write_nodata(np.nan, inplace=True) + merged_temp = merged_temp.rio.set_spatial_dims(y_dim="y", x_dim="x") + merged_temp = merged_temp.rio.write_crs(4326) out_file = f"geotiffs/{int(dt_temp_str)}_aer_floodscan_sfed.tif" @@ -204,6 +245,8 @@ def _generate_zipped_file(self, last90_days_geotiffs, ds_historical_baseline): merged_temp.rio.to_raster(out_file, driver="COG") out_files.append(out_file) + logger.info("Finished adding baseline geotiffs!") return out_files + From 4fe6adbc02a7b694ef3e8d26a9904c84fed79cf4 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Thu, 30 Jan 2025 14:58:56 -0300 Subject: [PATCH 9/9] Merging all changes to get the pipeline running on github actions --- .github/workflows/run-python-script.yaml | 58 ++++ config/hdx_dataset_static.yml | 23 +- config/project_configuration.yaml | 33 +- floodscan.py | 392 +++++++++++++++++++++++ hdx_floodscan.py | 252 --------------- run.py | 279 +++++++--------- src/datasources/floodscan.py | 1 - src/utils/cloud_utils.py | 12 +- src/utils/date_utils.py | 11 +- src/utils/pg.py | 17 +- 10 files changed, 620 insertions(+), 458 deletions(-) create mode 100644 .github/workflows/run-python-script.yaml create mode 100644 floodscan.py delete mode 100644 hdx_floodscan.py diff --git a/.github/workflows/run-python-script.yaml b/.github/workflows/run-python-script.yaml new file mode 100644 index 0000000..9c31731 --- /dev/null +++ b/.github/workflows/run-python-script.yaml @@ -0,0 +1,58 @@ +# This workflow will install Python dependencies and run the script + +name: Run script + +on: + workflow_dispatch: # add run button in github + schedule: + - cron: "0 0 * * *" + +jobs: + run: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Set up Python 3.x + uses: actions/setup-python@v4 + with: + python-version: "3.x" + - name: Install dependencies + run: | + python -m pip install --upgrade pip + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Run script + env: + HDX_SITE: ${{ secrets.HDX_SITE }} + HDX_KEY: ${{ secrets.HDX_BOT_SCRAPERS_API_TOKEN }} + PREPREFIX: ${{ secrets.PREPREFIX }} + USER_AGENT: ${{ secrets.USER_AGENT }} + BLOB_URL: ${{ secrets.BLOB_URL }} + STORAGE_ACCOUNT: ${{ secrets.STORAGE_ACCOUNT }} + 
CONTAINER: ${{ secrets.CONTAINER }} + BLOB: ${{ secrets.BLOB }} + KEY: ${{ secrets.KEY }} + AZURE_DB_PW_PROD: ${{ secrets.DB_PW }} + run: | + python run.py + - name: Commit updated data bundle + if: success() + uses: stefanzweifel/git-auto-commit-action@v4 + with: + file_pattern: "dataset_dates.txt" + commit_message: automatic - Data bundle updated + push_options: "--force" + skip_dirty_check: false + - name: Send mail + if: failure() + uses: dawidd6/action-send-mail@v3 + with: + server_address: ${{secrets.EMAIL_SERVER}} + server_port: ${{secrets.EMAIL_PORT}} + username: ${{secrets.EMAIL_USERNAME}} + password: ${{secrets.EMAIL_PASSWORD}} + subject: "FAILED: ${{github.repository}} run job" + body: GitHub Actions run job for ${{github.repository}} failed! + to: ${{secrets.EMAIL_LIST}} + from: ${{secrets.EMAIL_FROM}} + content_type: text/html diff --git a/config/hdx_dataset_static.yml b/config/hdx_dataset_static.yml index a127b20..abe0288 100644 --- a/config/hdx_dataset_static.yml +++ b/config/hdx_dataset_static.yml @@ -1,15 +1,16 @@ license_id: "cc-by-igo" methodology: " -1. Raster \n -- a. Daily FloodScan SFED(flood fraction) band obtained directly from AER.\n -- b. Daily historical SFED band smoothed with 11 day centered rolling mean (+/-5 days).\n -- c. Average day-of-year (DOY) raster calculated using last 10 years of smoothed data (2013-2023) to create SFED Baseline band\n -- d. SFED & SFED Baseline band merged per day.\n -- e. Last 90 days of merged SFED + SFED Baseline bands zipped\n +Raster \n +\n- Daily FloodScan SFED(flood fraction) band obtained directly from AER.\n +\n- Daily historical SFED band smoothed with 11 day centered rolling mean (+/-5 days).\n +\n- Average day-of-year (DOY) raster calculated using last 10 years of smoothed data (2013-2023) to create SFED Baseline band\n +\n- SFED & SFED Baseline band merged per day.\n +\n- Last 90 days of merged SFED + SFED Baseline bands zipped\n \n -2. Tabular \n --a. Daily zonal stats (mean) for all historical SFED data 1998-01-01 - 2023-12-31 for each Admin 2 covered by FloodScan extent\n --b. Return periods calculated form zonal mean data per admin 2 using the Log III Pearson distribution." -dataset_source: "AER FloodScan: Atmospheric and Environmental Research (AER). Modified and Simplified by UN OCHA" +Tabular \n +\n- Daily zonal stats (mean) for all historical SFED data 1998-01-01 - 2023-12-31 for each Admin 2 covered by FloodScan extent\n +\n- Return periods calculated form zonal mean data per admin 2 using the Log III Pearson distribution." +dataset_source: "AER FloodScan: Atmospheric and Environmental Research (AER)." 
package_creator: "isabelle-tot" -private: False \ No newline at end of file +private: False +caveats: "Modified and Simplified by UN OCHA" diff --git a/config/project_configuration.yaml b/config/project_configuration.yaml index c56a8d0..5324282 100644 --- a/config/project_configuration.yaml +++ b/config/project_configuration.yaml @@ -3,11 +3,11 @@ url: "test" account: "account" container: "container" -last90_days_filename: "floodscan/daily/v5/raw/baseline_v2024-01-31_v05r01.nc4" key: "key" +baseline_filename: "floodscan/daily/v5/raw/baseline_v2025-01-01_v05r01.nc4" dataset_names: - HDX-FLOODSCAN: "hdx-floodscan-test" + HDX-FLOODSCAN: "floodscan" allowed_tags: - "climate hazards" @@ -17,23 +17,16 @@ allowed_tags: - "flooding" # Metadata -title: "HDX FloodScan" +title: "FloodScan" update_frequency: "daily" maintainer_id: "2f9fd160-2a16-49c0-89d6-0bc3230599bf" -organization_id: "hdx" -description_90days_file: "Daily zip file containing previous 90 days of raster both SFED and SFED baseline bands." -description_stats_file: "Excel file containing date, admin metadata, raw SFED mean values (per admin 2), and equivalent approximate calculated return period." -notes: "Tabular Admin 2: \n -A daily dataset providing average FloodScan Standard Flood Extent Depiction (SFED) flood fraction (0-100%) per admin 2 -level. Historical baseline values (SFED_BASELINE) are calculated per day-of-year from the last 10 years of historical -data (non-inclusive of current year) after applying an 11 day smoothing mean window. Return Period (RP) is calculated -empirically based on all historical data up to the current year (non-inclusive).\n -\n - -Raster Data\n -Zipped Cloud Optimized Geotifs (COGs) containing AER FloodScan estimated daily flood fraction (0-100%) gridded data at -approximately 10 km resolution (0.083 degrees) for the last 90 days. These data are provided as an aggregated version of - FloodScan’s higher resolution data set. Each COG represents the estimates for a single day and includes 2 bands: SFED - (Standard Flood Event Depiction) and SFED_BASELINE. The baseline band provides users an easy way to compare current - values with historical averages. The baseline is calculated per day-of-year from the last 10 years of historical data - (non-inclusive of current year) after applying an 11 day temporal smoothing mean window." +organization_id: "53acf7a0-29d5-453d-82ab-20daa6645128" +90days_filename: "aer_floodscan_300s_SFED_90d.zip" +stats_filename: "hdx_floodscan_zonal_stats.xlsx" +description_90days_file: "Daily zip file containing previous 90 days of raster data with both SFED and SFED baseline bands." +description_stats_file: "Daily Excel file containing date, admin metadata, raw SFED mean values (per admin 1 and 2), the approximate calculated return period and the baseline value calculated from the past 10 years of data." +notes: "FloodScan uses satellite data to map and monitor floods daily, helping compare current flood conditions with historical averages. This dataset contains two resources: +\n\n +The first (hdx_floodscan_zonal_stats.xlsx) is a daily tabular dataset providing average FloodScan Standard Flood Extent Depiction (SFED) flood fraction (0-100%) per admin 1 and 2 level. Historical baseline values (SFED_BASELINE) are calculated per day-of-year from the last 10 years of historical data (non-inclusive of current year) after applying an 11 day smoothing mean window. Return Period (RP) is calculated empirically based on all historical data up to the current year (non-inclusive). 
+\n\n +The second resource (aer_floodscan_300s_SFED_90d.zip) is a zipped file containing AER FloodScan estimated daily flood fraction (0-100%) gridded data at approximately 10 km resolution (300 arcseconds equivalent to approximately 0.083 degrees) for the last 90 days. Each file represents the estimates for a single day and includes 2 bands: SFED and SFED_BASELINE. The baseline band provides users an easy way to compare current values with historical averages. The baseline is calculated per day-of-year from the last 10 years of historical data (non-inclusive of current year) after applying an 11 day temporal smoothing mean window." diff --git a/floodscan.py b/floodscan.py new file mode 100644 index 0000000..002ea3f --- /dev/null +++ b/floodscan.py @@ -0,0 +1,392 @@ +# !/usr/bin/python +""" +HDX Pipeline: +------------ + +- This pipeline produces two datasets for FloodScan: + - Zonal stats for the most recently available data + - Geotiffs for the past 90 days + +""" +import logging +import os +import os.path +import re +import shutil +from copy import copy +from datetime import datetime + +import numpy as np +import pandas as pd +import rioxarray as rxr +import xarray as xr +from azure.storage.blob import BlobServiceClient +from hdx.data.dataset import Dataset +from hdx.location.country import Country +from slugify import slugify + +from src.utils import pg +from src.utils import return_periods as rp +from src.utils.date_utils import ( + create_date_range, + get_start_and_last_date_from_90_days, +) + +logger = logging.getLogger(__name__) +DATE_FORMAT = "%Y-%m-%d" + + +class Floodscan: + def __init__(self, configuration, retriever, folder, errors): + self.configuration = configuration + self.retriever = retriever + self.folder = folder + self.manual_url = None + self.dataset_data = {} + self.errors = errors + self.created_date = None + self.start_date = None + self.latest_date = None + + def get_data(self): + try: + account = os.environ["STORAGE_ACCOUNT"] + container = os.environ["CONTAINER"] + key = os.environ["KEY"] + except Exception: + account = self.configuration["account"] + container = self.configuration["container"] + key = self.configuration["key"] + + dataset_name = self.configuration["dataset_names"]["HDX-FLOODSCAN"] + + last90_days_files = self._get_latest_90_days_geotiffs( + account, container, key + ) + historical_baseline = self._get_historical_baseline( + account, container, key + ) + last90_days_file = self._generate_zipped_file( + last90_days_files, historical_baseline + ) + + # Find the minimum and maximum dates + ( + self.start_date, + self.latest_date, + ) = get_start_and_last_date_from_90_days(last90_days_file) + + # save all geotiffs as one zipped file + last90_days_file = shutil.make_archive( + "baseline_zipped_file", "zip", "geotiffs" + ) + shutil.rmtree("geotiffs") + + merged_zonal_stats_admin1 = self.get_zonal_stats_for_admin( + mode="prod", admin_level=1, band="SFED" + ) + merged_zonal_stats_admin2 = self.get_zonal_stats_for_admin( + mode="prod", admin_level=2, band="SFED" + ) + + with pd.ExcelWriter( + "files" + os.sep + "floodscan_readme.xlsx", + mode="a", + engine="openpyxl", + if_sheet_exists="replace", + ) as excel_merged_file: + merged_zonal_stats_admin1.to_excel( + excel_merged_file, sheet_name="admin1", index=False + ) + merged_zonal_stats_admin2.to_excel( + excel_merged_file, sheet_name="admin2", index=False + ) + + self.dataset_data[dataset_name] = [ + merged_zonal_stats_admin2.apply(lambda x: x.to_dict(), axis=1), + last90_days_file, + 
excel_merged_file, + ] + + self.created_date = datetime.today().date() + return [{"name": dataset_name}] + + def get_adm2_labels(self, df_adm2_90d, level): + admin_lookup = self.retriever.download_file( + url="admin_lookup.parquet", + account=self.configuration["account"], + container="polygon", + key=self.configuration["key"], + blob="admin_lookup.parquet", + ) + + df_parquet_labels = pd.read_parquet(admin_lookup) + + # %% + df_labels_adm2 = df_parquet_labels[df_parquet_labels.ADM_LEVEL == 2] + + df_fs_labelled = pd.merge( + df_adm2_90d, + df_labels_adm2, + left_on=["iso3", "pcode"], + right_on=["ISO3", f"ADM{level}_PCODE"], + how="left", + ) + cols_subset = [ + "iso3", + "ADM0_PCODE", + "ADM0_NAME", + "ADM1_PCODE", + "ADM1_NAME", + "ADM2_PCODE", + "ADM2_NAME", + "valid_date", + "value", + ] + + df_fs_labelled_subset = df_fs_labelled[cols_subset] + + countries = [] + for iso3 in df_fs_labelled_subset["iso3"]: + countries.append(Country.get_country_name_from_iso3(iso3)) + + df_fs_labelled_subset["ADM0_NAME"] = countries + + return df_fs_labelled_subset + + def get_zonal_stats_for_admin(self, mode, admin_level, band): + df_current = pg.fs_last_90_days( + mode=mode, admin_level=admin_level, band=band, only_HRP=True + ) + df_with_labels = self.get_adm2_labels(df_current, admin_level) + df_current = df_with_labels.rename( + columns={f"ADM{admin_level}_PCODE": "pcode"} + ) + df_yr_max = pg.fs_year_max( + mode=mode, admin_level=admin_level, band=band + ) + df_w_rps = rp.fs_add_rp( + df=df_current, df_maxima=df_yr_max, by=["iso3", "pcode"] + ) + df_w_rps = df_w_rps.rename(columns={"value": band}) + df_w_rps["doy"] = pd.to_datetime(df_w_rps["valid_date"]).dt.dayofyear + df_rolling_11_day_mean = pg.fs_rolling_11_day_mean( + mode=mode, admin_level=admin_level, band=band, only_HRP=True + ) + + df_rolling_11_day_mean.iso3 = df_rolling_11_day_mean.iso3.astype(str) + df_rolling_11_day_mean.pcode = df_rolling_11_day_mean.pcode.astype(str) + + df_w_rps.valid_date = df_w_rps.valid_date.astype(str) + df_w_rps.iso3 = df_w_rps.iso3.astype(str) + df_w_rps.pcode = df_w_rps.pcode.astype(str) + + merged_zonal_stats = df_w_rps.merge( + df_rolling_11_day_mean, on=["iso3", "pcode", "doy"], how="left" + ) + merged_zonal_stats = merged_zonal_stats.rename( + columns={"sfed_baseline": "SFED_BASELINE"} + ) + merged_zonal_stats = merged_zonal_stats.drop("doy", axis=1) + + return merged_zonal_stats + + def generate_dataset_and_showcase(self, dataset_name): + # Setting metadata and configurations + name = self.configuration["dataset_names"]["HDX-FLOODSCAN"] + title = self.configuration["title"] + dataset = Dataset({"name": slugify(name), "title": title}) + rows = self.dataset_data[dataset_name][0] + dataset.set_maintainer(self.configuration["maintainer_id"]) + dataset.set_organization(self.configuration["organization_id"]) + dataset.set_expected_update_frequency( + self.configuration["update_frequency"] + ) + dataset.set_subnational(False) + dataset["notes"] = self.configuration["notes"] + + resource_data = { + "name": self.configuration["stats_filename"], + "description": self.configuration["description_stats_file"], + } + + tags = sorted([t for t in self.configuration["allowed_tags"]]) + dataset.add_tags(tags) + + # Setting time period + start_date = self.start_date + ongoing = False + if not start_date: + logger.error(f"Start date missing for {dataset_name}") + return None, None + dataset.set_time_period(start_date, self.latest_date, ongoing) + + headers = rows[0].keys() + date_headers = [ + h + for h in headers + if 
"date" in h.lower() and type(rows[0][h]) == int + ] + for row in rows: + dataset.add_other_location(row["iso3"]) + for date_header in date_headers: + row_date = row[date_header] + if not row_date: + continue + if len(str(row_date)) > 9: + row_date = row_date / 1000 + row_date = datetime.utcfromtimestamp(row_date) + row_date = row_date.strftime("%Y-%m-%d") + row[date_header] = row_date + + rows + dataset.generate_resource_from_rows( + self.folder, + resource_data["name"], + rows, + resource_data, + list(rows[0].keys()), + encoding="utf-8", + ) + res = dataset.get_resource(0) + res["name"] = self.configuration["stats_filename"] + res["description"] = self.configuration["description_stats_file"] + res.set_file_to_upload(self.dataset_data[dataset_name][2]) + res.set_format("xlsx") + dataset.add_update_resource(res) + + resource_data = { + "name": self.configuration["90days_filename"], + "description": self.configuration["description_90days_file"], + } + + res = copy(dataset.get_resource(0)) + dataset.resources.append(res) + resource = dataset.get_resource(1) + resource.set_format("zipped geotiff") + resource["name"] = resource_data["name"] + resource["description"] = resource_data["description"] + resource.set_file_to_upload(self.dataset_data[dataset_name][1]) + dataset.add_update_resource(resource) + + return dataset + + def subset_band(self, da, band="SFED"): + long_name = np.array(da.attrs["long_name"]) + index_band = np.where(long_name == band)[0] + da_subset = da.isel(band=index_band) + da_subset.attrs["long_name"] = band + return da_subset + + def blob_client(self): + key = self.configuration["key"] + storage_account = self.configuration["account"] + account_url = f"https://{storage_account}.blob.core.windows.net" + return BlobServiceClient(account_url=account_url, credential=key) + + def _get_latest_90_days_geotiffs(self, account, container, key): + das = {} + existing_files = [ + x.name + for x in self.blob_client() + .get_container_client(self.configuration["container"]) + .list_blobs( + name_starts_with="floodscan/daily/v5/processed/aer_area" + ) + ] + + latest_available_file = sorted(existing_files)[-1] + search_str = "([0-9]{4}-[0-9]{2}-[0-9]{2})" + search_res = re.search(search_str, latest_available_file) + latest_available_date = datetime.strptime(search_res[0], "%Y-%m-%d") + dates = create_date_range(90, latest_available_date) + + for date in dates: + blob = f"floodscan/daily/v5/processed/aer_area_300s_v{date.strftime(DATE_FORMAT)}_v05r01.tif" + + if not os.path.isfile(blob): + try: + geotiff_file_for_date = self.retriever.download_file( + url=blob, + account=account, + container=container, + key=key, + blob=blob, + ) + shutil.move(geotiff_file_for_date, blob) + logger.info( + f"Moving downloaded blob {blob} to the cached folder." 
+ ) + except Exception as e: + logger.error(f"Missing geotiff {blob}: {e}") + return None + + da_in = rxr.open_rasterio(blob, chunks="auto") + das[date] = da_in.sel({"band": 1}, drop=True) + + return das + + def _get_historical_baseline(self, account, container, key): + blob = self.configuration["baseline_filename"] + + if not os.path.isfile(blob): + historical_baseline_file = self.retriever.download_file( + url=blob, + account=account, + container=container, + key=key, + blob=blob, + ) + else: + historical_baseline_file = blob + + chunks = {"lat": 1080, "lon": 1080, "time": 1} + ds_historical_baseline = xr.open_dataset( + historical_baseline_file, chunks=chunks + ) + ds_historical_baseline = ds_historical_baseline.rename_vars( + {"__xarray_dataarray_variable__": "SFED_BASELINE"} + ) + + return ds_historical_baseline + + def _generate_zipped_file( + self, last90_days_geotiffs, ds_historical_baseline + ): + os.makedirs("geotiffs", exist_ok=True) + out_files = [] + + logger.info("Calculating baseline...") + for tif_date in last90_days_geotiffs: + da_current = last90_days_geotiffs[tif_date] + ds_historical_baseline = ds_historical_baseline.persist() + + dt_temp_str = tif_date.strftime("%Y%m%d") + doy_temp = int(tif_date.strftime("%j")) + h_sfed_temp = ds_historical_baseline.sel( + {"dayofyear": doy_temp}, drop=True + ) + ds_current_sfed = da_current.to_dataset(name="SFED") + + merged_temp = xr.merge( + [ds_current_sfed.SFED, h_sfed_temp.SFED_BASELINE], + combine_attrs="drop", + ) + merged_temp["SFED"] = merged_temp.SFED.rio.write_nodata( + np.nan, inplace=True + ) + merged_temp = merged_temp.rio.set_spatial_dims( + y_dim="y", x_dim="x" + ) + merged_temp = merged_temp.rio.write_crs(4326) + + # Save geotiff + out_file = f"geotiffs/{int(dt_temp_str)}_aer_floodscan_sfed.tif" + merged_temp.rio.to_raster(out_file, driver="COG") + out_files.append(out_file) + + logger.info( + f"Finished adding baseline geotiffs to {len(out_files)} files." 
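# --- Illustrative sketch (not part of the patch) ----------------------------
# The per-day merge performed in _generate_zipped_file, shown on a tiny
# synthetic grid: the daily SFED band and the matching day-of-year baseline
# are merged into one dataset and written out as a two-band cloud-optimized
# GeoTIFF. Assumes rioxarray is installed and the underlying GDAL build has
# the COG driver; the values and output path are invented.
import numpy as np
import rioxarray  # noqa: F401  (registers the .rio accessor)
import xarray as xr

y = np.linspace(10.0, 9.0, 5)
x = np.linspace(40.0, 41.0, 5)
rng = np.random.default_rng(0)
sfed = xr.DataArray(rng.random((5, 5)), coords={"y": y, "x": x}, dims=("y", "x"), name="SFED")
baseline = xr.DataArray(rng.random((5, 5)), coords={"y": y, "x": x}, dims=("y", "x"), name="SFED_BASELINE")

merged = xr.merge([sfed, baseline], combine_attrs="drop")
merged["SFED"] = merged.SFED.rio.write_nodata(np.nan, inplace=True)
merged = merged.rio.set_spatial_dims(y_dim="y", x_dim="x").rio.write_crs(4326)
merged.rio.to_raster("example_sfed_with_baseline.tif", driver="COG")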
+ ) + + return out_files diff --git a/hdx_floodscan.py b/hdx_floodscan.py deleted file mode 100644 index bbb9a11..0000000 --- a/hdx_floodscan.py +++ /dev/null @@ -1,252 +0,0 @@ -#!/usr/bin/python -""" -HDX Pipeline: ------------- - -TODO -- Add summary about this dataset pipeline - -""" -import logging -import os -from copy import copy -from datetime import datetime, timezone -import pandas as pd -from hdx.data.dataset import Dataset -from slugify import slugify -import xarray as xr -import rioxarray as rxr -import numpy as np -import shutil - -from src.utils import pg -from src.utils import return_periods as rp - -from src.utils.date_utils import create_date_range, get_start_and_last_date_from_90_days - -logger = logging.getLogger(__name__) -DATE_FORMAT = "%Y-%m-%d" - - -class HDXFloodscan: - def __init__(self, configuration, retriever, folder, errors): - self.configuration = configuration - self.retriever = retriever - self.folder = folder - self.manual_url = None - self.dataset_data = {} - self.errors = errors - self.created_date = None - self.start_date = None - self.latest_date = None - - def get_data(self): - - try: - url = os.environ["BLOB_URL"] - account = os.environ["STORAGE_ACCOUNT"] - container = os.environ["CONTAINER"] - key = os.environ["KEY"] - except Exception: - url = self.configuration["url"] - account = self.configuration["account"] - container = self.configuration["container"] - key = self.configuration["key"] - - last90_days_filename = self.configuration["last90_days_filename"] - stats_filename = self.configuration["stats_filename"] - dataset_name = self.configuration["dataset_names"]["HDX-FLOODSCAN"] - - last90_days_files = self._get_latest_90_days_geotiffs(account, container, key) - historical_baseline = self._get_historical_baseline(account, container, key) - last90_days_file = self._generate_zipped_file(last90_days_files, historical_baseline) - - # Find the minimum and maximum dates - self.start_date, self.latest_date = get_start_and_last_date_from_90_days(last90_days_file) - - # Save all geotiffs as one zipped file - last90_days_file = shutil.make_archive("baseline_zipped_file", 'zip', "geotiffs") - shutil.rmtree("geotiffs") - - merged_zonal_stats_admin1 = self.get_zonal_stats_for_admin(mode="prod", admin_level=1, band='SFED') - merged_zonal_stats_admin2 = self.get_zonal_stats_for_admin(mode="prod", admin_level=2, band='SFED') - - with pd.ExcelWriter('hdx_floodscan_zonal_stats.xlsx') as excel_merged_file: - pd.DataFrame({'README': ['This is a placeholder']}).to_excel(excel_merged_file, sheet_name='readme') - merged_zonal_stats_admin1.to_excel(excel_merged_file, sheet_name='admin1') - merged_zonal_stats_admin2.to_excel(excel_merged_file, sheet_name='admin2') - self.dataset_data[dataset_name] = [merged_zonal_stats_admin1.apply(lambda x: x.to_dict(), axis=1), - last90_days_file, - excel_merged_file] - - self.created_date = datetime.today().date() - return [{"name": dataset_name}] - - def get_zonal_stats_for_admin(self, mode, admin_level, band): - df_current = pg.fs_last_90_days(mode=mode, admin_level=admin_level, band=band) - df_yr_max = pg.fs_year_max(mode=mode, admin_level=admin_level, band=band) - df_w_rps = rp.fs_add_rp( - df=df_current, df_maxima=df_yr_max, by=["iso3", "pcode"] - ) - df_w_rps = df_w_rps.rename(columns={'value': band}) - df_rolling_11_day_mean = pg.fs_rolling_11_day_mean( - mode=mode, admin_level=admin_level, band=band - ) - df_rolling_11_day_mean['valid_date'] = pd.to_datetime( - datetime.today().year * 1000 + df_rolling_11_day_mean['doy'], 
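# --- Illustrative aside (not part of the patch) ------------------------------
# The %Y%j trick used just above to turn a day-of-year column back into a
# calendar date: year * 1000 + doy yields e.g. 2024005, which "%Y%j" parses as
# year 2024, day 005 (cast to str here for clarity).
import pandas as pd

doy = pd.Series([1, 5, 60])
dates = pd.to_datetime((2024 * 1000 + doy).astype(str), format="%Y%j")
print(dates.dt.strftime("%Y-%m-%d").tolist())  # ['2024-01-01', '2024-01-05', '2024-02-29']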
format='%Y%j').dt.strftime(DATE_FORMAT) - df_rolling_11_day_mean = df_rolling_11_day_mean.drop('doy', axis=1) - df_rolling_11_day_mean.valid_date = df_rolling_11_day_mean.valid_date.astype(str) - df_rolling_11_day_mean.iso3 = df_rolling_11_day_mean.iso3.astype(str) - df_rolling_11_day_mean.pcode = df_rolling_11_day_mean.pcode.astype(str) - - df_w_rps.valid_date = df_w_rps.valid_date.astype(str) - df_w_rps.iso3 = df_w_rps.iso3.astype(str) - df_w_rps.pcode = df_w_rps.pcode.astype(str) - - merged_zonal_stats = df_w_rps.merge(df_rolling_11_day_mean, on=["iso3", "pcode", "valid_date"], how="left") - merged_zonal_stats = merged_zonal_stats.rename(columns={'sfed_baseline': "SFED_BASELINE"}) - - return merged_zonal_stats - - def generate_dataset_and_showcase(self, dataset_name): - - # Setting metadata and configurations - name = self.configuration["dataset_names"]["HDX-FLOODSCAN"] - title = self.configuration["title"] - update_frequency = self.configuration["update_frequency"] - dataset = Dataset({"name": slugify(name), "title": title}) - rows = self.dataset_data[dataset_name][0] - dataset.set_maintainer(self.configuration["maintainer_id"]) - dataset.set_organization(self.configuration["organization_id"]) - dataset.set_expected_update_frequency(update_frequency) - dataset.set_subnational(False) - dataset.add_other_location("world") - dataset["notes"] = self.configuration["notes"] - filename = "hdx_floodscan_zonal_stats.xlsx" - resource_data = {"name": filename, - "description": self.configuration["description_stats_file"]} - tags = sorted([t for t in self.configuration["allowed_tags"]]) - dataset.add_tags(tags) - - # Setting time period - start_date = self.start_date - ongoing = False - if not start_date: - logger.error(f"Start date missing for {dataset_name}") - return None, None - dataset.set_time_period(start_date, self.latest_date, ongoing) - - headers = rows[0].keys() - date_headers = [h for h in headers if "date" in h.lower() and type(rows[0][h]) == int] - for row in rows: - for date_header in date_headers: - row_date = row[date_header] - if not row_date: - continue - if len(str(row_date)) > 9: - row_date = row_date / 1000 - row_date = datetime.utcfromtimestamp(row_date) - row_date = row_date.strftime("%Y-%m-%d") - row[date_header] = row_date - - rows - dataset.generate_resource_from_rows( - self.folder, - filename, - rows, - resource_data, - list(rows[0].keys()), - encoding='utf-8' - ) - res = dataset.get_resource(0) - res['name'] = "hdx_floodscan_zonal_stats.xlsx" - res['description'] = self.configuration["description_stats_file"] - res.set_file_to_upload(self.dataset_data[dataset_name][2]) - res.set_format("xlsx") - dataset.add_update_resource(res) - - second_filename = "aer_floodscan_300s_SFED_90d.zip" - resource_data = {"name": second_filename, - "description": self.configuration["description_90days_file"]} - - res = copy(dataset.get_resource(0)) - dataset.resources.append(res) - resource = dataset.get_resource(1) - resource.set_format("zipped geotiff") - resource['name'] = resource_data['name'] - resource['description'] = resource_data['description'] - resource.set_file_to_upload(self.dataset_data[dataset_name][1]) - dataset.add_update_resource(resource) - - return dataset - - def _get_latest_90_days_geotiffs(self, account, container, key): - - das = {} - # TODO add back yesterday = datetime.today() - pd.DateOffset(days=1) - yesterday = datetime.strptime("2024-01-01", "%Y-%m-%d") - dates = create_date_range(90, yesterday) - - for date in dates: - blob = 
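# --- Illustrative sketch (not part of the patch) ----------------------------
# How the per-day blob paths for the trailing 90-day window are assembled.
# create_date_range lives in src.utils.date_utils and is not reproduced here;
# the stand-in below only approximates its behaviour (the N days up to and
# including last_date).
from datetime import datetime, timedelta

DATE_FORMAT = "%Y-%m-%d"

def date_range(days, last_date):
    # stand-in for date_utils.create_date_range (assumed behaviour)
    return [last_date - timedelta(days=d) for d in range(days)]

last_date = datetime.strptime("2024-01-01", DATE_FORMAT)
blobs = [
    f"floodscan/daily/v5/processed/aer_area_300s_v{d.strftime(DATE_FORMAT)}_v05r01.tif"
    for d in date_range(90, last_date)
]
print(len(blobs), blobs[0])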
f"floodscan/daily/v5/processed/aer_area_300s_v{date.strftime(DATE_FORMAT)}_v05r01.tif" - - geotiff_file_for_date = self.retriever.download_file( - url=blob, - account=account, - container=container, - key=key, - blob=blob) - da_in = rxr.open_rasterio(geotiff_file_for_date, chunks="auto") - das[date] = da_in.sel({'band' : 1}, drop=True) - - return das - - def _get_historical_baseline(self, account, container, key): - - blob = f"floodscan/daily/v5/raw/baseline_v2024-01-31_v05r01.nc4" - historical_baseline_file = self.retriever.download_file( - url=blob, - account=account, - container=container, - key=key, - blob=blob) - - chunks = {"lat": 1080, "lon": 1080, "time": 1} - ds_historical_baseline = xr.open_dataset(historical_baseline_file, chunks=chunks) - ds_historical_baseline = ds_historical_baseline.rename_vars( - {"__xarray_dataarray_variable__": "SFED_BASELINE"} - ) - - return ds_historical_baseline - - def _generate_zipped_file(self, last90_days_geotiffs, ds_historical_baseline): - - os.makedirs('geotiffs', exist_ok=True) - - out_files = [] - for tif_date in last90_days_geotiffs: - da_current = last90_days_geotiffs[tif_date] - ds_historical_baseline = ds_historical_baseline.persist() - - dt_temp_str = tif_date.strftime("%Y%m%d") - doy_temp = int(tif_date.strftime('%j')) - h_sfed_temp = ds_historical_baseline.sel({"dayofyear": doy_temp}, drop=True) - - ds_current_sfed = da_current.to_dataset(name="SFED") - - merged_temp = xr.merge([ds_current_sfed.SFED, h_sfed_temp.SFED_BASELINE], combine_attrs="drop") - merged_temp['SFED'] = merged_temp.SFED.rio.write_nodata(np.nan, inplace=True) - merged_temp = merged_temp.rio.set_spatial_dims(y_dim="y", x_dim="x") - merged_temp = merged_temp.rio.write_crs(4326) - - out_file = f"geotiffs/{int(dt_temp_str)}_aer_floodscan_sfed.tif" - - # Save geotiff - merged_temp.rio.to_raster(out_file, driver="COG") - out_files.append(out_file) - - logger.info("Finished adding baseline geotiffs!") - return out_files - - - diff --git a/run.py b/run.py index 911130d..c2918d4 100644 --- a/run.py +++ b/run.py @@ -3,46 +3,46 @@ Top level script. Calls other functions that generate datasets that this script then creates in HDX. 
""" +import base64 +import hashlib +import hmac import logging -from os.path import expanduser, join, exists +from datetime import datetime +from os.path import exists, expanduser, join + +from hdx.data.hdxobject import HDXError +from hdx.facades.infer_arguments import facade from hdx.utilities.downloader import Download from hdx.utilities.errors_onexit import ErrorsOnExit -from hdx.utilities.path import progress_storing_folder, wheretostart_tempdir_batch +from hdx.utilities.path import ( + progress_storing_folder, + wheretostart_tempdir_batch, +) from hdx.utilities.retriever import Retrieve -from hdx.data.hdxobject import HDXError -from datetime import datetime -import hmac -import hashlib -import base64 -from hdx_floodscan import HDXFloodscan + +from floodscan import Floodscan """Facade to simplify project setup that calls project main function with kwargs""" -import sys -from inspect import getdoc from typing import Any, Callable, Optional # noqa: F401 -import defopt -from makefun import with_signature -from hdx.api import __version__ + from hdx.api.configuration import Configuration -from hdx.utilities.useragent import UserAgent logger = logging.getLogger(__name__) -lookup = "hdx-floodscan" -updated_by_script = "HDX Scraper: HDX FloodScan" +lookup = "floodscan" +updated_by_script = "HDX Scraper: FloodScan" class AzureBlobDownload(Download): - def download_file( - self, - url: str, - account: str, - container: str, - key: str, - blob: None, - **kwargs: Any, + self, + url: str, + account: str, + container: str, + key: str, + blob: None, + **kwargs: Any, ) -> str: """Download file from blob storage and store in provided folder or temporary folder if no folder supplied. @@ -74,50 +74,85 @@ def download_file( overwrite = kwargs.get("overwrite", False) keep = kwargs.get("keep", False) - request_time = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') - api_version = '2018-03-28' + request_time = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT") + api_version = "2018-03-28" parameters = { - 'verb': 'GET', - 'Content-Encoding': '', - 'Content-Language': '', - 'Content-Length': '', - 'Content-MD5': '', - 'Content-Type': '', - 'Date': '', - 'If-Modified-Since': '', - 'If-Match': '', - 'If-None-Match': '', - 'If-Unmodified-Since': '', - 'Range': '', - 'CanonicalizedHeaders': 'x-ms-date:' + request_time + '\nx-ms-version:' + api_version + '\n', - 'CanonicalizedResource': '/' + account + '/' + container + '/' + blob + "verb": "GET", + "Content-Encoding": "", + "Content-Language": "", + "Content-Length": "", + "Content-MD5": "", + "Content-Type": "", + "Date": "", + "If-Modified-Since": "", + "If-Match": "", + "If-None-Match": "", + "If-Unmodified-Since": "", + "Range": "", + "CanonicalizedHeaders": "x-ms-date:" + + request_time + + "\nx-ms-version:" + + api_version + + "\n", + "CanonicalizedResource": "/" + + account + + "/" + + container + + "/" + + blob, } - signature = (parameters['verb'] + '\n' - + parameters['Content-Encoding'] + '\n' - + parameters['Content-Language'] + '\n' - + parameters['Content-Length'] + '\n' - + parameters['Content-MD5'] + '\n' - + parameters['Content-Type'] + '\n' - + parameters['Date'] + '\n' - + parameters['If-Modified-Since'] + '\n' - + parameters['If-Match'] + '\n' - + parameters['If-None-Match'] + '\n' - + parameters['If-Unmodified-Since'] + '\n' - + parameters['Range'] + '\n' - + parameters['CanonicalizedHeaders'] - + parameters['CanonicalizedResource']) + signature = ( + parameters["verb"] + + "\n" + + parameters["Content-Encoding"] + + "\n" + + 
parameters["Content-Language"] + + "\n" + + parameters["Content-Length"] + + "\n" + + parameters["Content-MD5"] + + "\n" + + parameters["Content-Type"] + + "\n" + + parameters["Date"] + + "\n" + + parameters["If-Modified-Since"] + + "\n" + + parameters["If-Match"] + + "\n" + + parameters["If-None-Match"] + + "\n" + + parameters["If-Unmodified-Since"] + + "\n" + + parameters["Range"] + + "\n" + + parameters["CanonicalizedHeaders"] + + parameters["CanonicalizedResource"] + ) - signed_string = base64.b64encode(hmac.new(base64.b64decode(key), msg=signature.encode('utf-8'), - digestmod=hashlib.sha256).digest()).decode() + signed_string = base64.b64encode( + hmac.new( + base64.b64decode(key), + msg=signature.encode("utf-8"), + digestmod=hashlib.sha256, + ).digest() + ).decode() headers = { - 'x-ms-date': request_time, - 'x-ms-version': api_version, - 'Authorization': ('SharedKey ' + account + ':' + signed_string) + "x-ms-date": request_time, + "x-ms-version": api_version, + "Authorization": ("SharedKey " + account + ":" + signed_string), } - url = ('https://' + account + '.blob.core.windows.net/' + container + '/' + blob) + url = ( + "https://" + + account + + ".blob.core.windows.net/" + + container + + "/" + + blob + ) if keep and exists(url): print(f"The blob URL exists: {url}") @@ -135,99 +170,6 @@ def download_file( path, f"Download of {url} failed in retrieval of stream!" ) - # TODO Improve logic around facade -def facade(projectmainfn: Callable[[Any], None], **kwargs: Any): - """Facade to simplify project setup that calls project main function. It infers - command line arguments from the passed in function using defopt. The function passed - in should have type hints and a docstring from which to infer the command line - arguments. Any **kwargs given will be merged with command line arguments, with the - command line arguments taking precedence. - - Args: - projectmainfn ((Any) -> None): main function of project - **kwargs: Configuration parameters to pass to HDX Configuration & other parameters to pass to main function - - Returns: - None - """ - - # - # Setting up configuration - # - - parsed_main_doc = defopt._parse_docstring(getdoc(projectmainfn)) - main_doc = [f"{parsed_main_doc.first_line}\n\nArgs:"] - no_main_params = len(parsed_main_doc.params) - for param_name, param_info in parsed_main_doc.params.items(): - main_doc.append( - f"\n {param_name} ({param_info.type}): {param_info.text}" - ) - create_config_doc = getdoc(Configuration.create) - kwargs_index = create_config_doc.index("**kwargs") - kwargs_index = create_config_doc.index("\n", kwargs_index) - args_doc = create_config_doc[kwargs_index:] - main_doc.append(args_doc) - main_doc = "".join(main_doc) - - main_sig = defopt.signature(projectmainfn) - param_names = [] - for param in main_sig.parameters.values(): - param_names.append(str(param)) - - parsed_main_doc = defopt._parse_docstring(main_doc) - main_doc = [f"{parsed_main_doc.first_line}\n\nArgs:"] - count = 0 - for param_name, param_info in parsed_main_doc.params.items(): - param_type = param_info.type - if param_type == "dict": - continue - if count < no_main_params: - count += 1 - else: - if param_type == "str": - param_type = "Optional[str]" - default = None - elif param_type == "bool": - default = False - else: - raise ValueError( - "Configuration.create has new parameter with unknown type!" 
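# --- Illustrative sketch (not part of the patch) ----------------------------
# A compact version of the SharedKey signing done in
# AzureBlobDownload.download_file: for a plain GET all standard header slots
# are empty, so the string-to-sign collapses to the shape below. Account,
# container, blob and key are placeholders only.
import base64
import hashlib
import hmac
from datetime import datetime, timezone

account, container, blob = "myaccount", "mycontainer", "path/to/file.tif"
key = base64.b64encode(b"not-a-real-storage-key").decode()  # placeholder key

request_time = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT")
api_version = "2018-03-28"

string_to_sign = (
    "GET\n"
    + "\n" * 11  # the 11 standard header fields are empty, each newline-terminated
    + f"x-ms-date:{request_time}\nx-ms-version:{api_version}\n"
    + f"/{account}/{container}/{blob}"
)
signed_string = base64.b64encode(
    hmac.new(base64.b64decode(key), string_to_sign.encode("utf-8"), hashlib.sha256).digest()
).decode()

headers = {
    "x-ms-date": request_time,
    "x-ms-version": api_version,
    "Authorization": f"SharedKey {account}:{signed_string}",
}
url = f"https://{account}.blob.core.windows.net/{container}/{blob}"
print(headers["Authorization"], url)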
- ) - param_names.append(f"{param_name}: {param_type} = {default}") - main_doc.append( - f"\n {param_name} ({param_type}): {param_info.text}" - ) - main_doc = "".join(main_doc) - - projectmainname = projectmainfn.__name__ - main_sig = f"{projectmainname}({','.join(param_names)})" - - argv = sys.argv[1:] - for key in kwargs: - name = f"--{key.replace('_', '-')}" - if name not in argv: - argv.append(name) - argv.append(kwargs[key]) - - @with_signature(main_sig) - def gen_func(*args, **kwargs): - """docstring""" - site_url = Configuration._create(*args, **kwargs) - logger.info("--------------------------------------------------") - logger.info(f"> Using HDX Python API Library {__version__}") - logger.info(f"> HDX Site: {site_url}") - - gen_func.__doc__ = main_doc - - configuration_create = defopt.bind(gen_func, argv=argv, cli_options="all") - main_func, _ = defopt.bind_known( - projectmainfn, argv=argv, cli_options="all" - ) - - configuration_create() - UserAgent.user_agent = Configuration.read().user_agent - main_func() - def main(save: bool = False, use_saved: bool = False) -> None: """Generate datasets and create them in HDX""" @@ -241,13 +183,19 @@ def main(save: bool = False, use_saved: bool = False) -> None: folder = info["folder"] batch = info["batch"] configuration = Configuration.read() - floodscan = HDXFloodscan(configuration, retriever, folder, errors) + floodscan = Floodscan(configuration, retriever, folder, errors) dataset_names = floodscan.get_data() - logger.info(f"Number of datasets to upload: {len(dataset_names)}") + logger.info( + f"Number of datasets to upload: {len(dataset_names)}" + ) - for _, nextdict in progress_storing_folder(info, dataset_names, "name"): + for _, nextdict in progress_storing_folder( + info, dataset_names, "name" + ): dataset_name = nextdict["name"] - dataset = floodscan.generate_dataset_and_showcase(dataset_name=dataset_name) + dataset = floodscan.generate_dataset_and_showcase( + dataset_name=dataset_name + ) if dataset: dataset.update_from_yaml() dataset["notes"] = dataset["notes"].replace( @@ -259,22 +207,25 @@ def main(save: bool = False, use_saved: bool = False) -> None: hxl_update=False, updated_by_script=updated_by_script, batch=batch, - ignore_fields=["resource:description", "extras"], + ignore_fields=[ + "resource:description", + "extras", + ], ) except HDXError as err: - errors.add(f"Could not upload {dataset_name}: {err}") + errors.add( + f"Could not upload {dataset_name}: {err}" + ) continue - if __name__ == "__main__": + print() logging.basicConfig() logging.getLogger().setLevel(logging.INFO) facade( main, - user_agent_config_yaml="/Workspace/Shared/init-scripts/.useragents.yaml", - hdx_config_yaml="/Workspace/Shared/init-scripts/.hdx_configuration.yaml", + user_agent_config_yaml=join(expanduser("~"), ".useragents.yaml"), user_agent_lookup=lookup, project_config_yaml=join("config", "project_configuration.yaml"), - - ) \ No newline at end of file + ) diff --git a/src/datasources/floodscan.py b/src/datasources/floodscan.py index ed3ec49..4c63623 100644 --- a/src/datasources/floodscan.py +++ b/src/datasources/floodscan.py @@ -7,7 +7,6 @@ from src.utils import cloud_utils, cog_utils, date_utils - DATA_DIR_GDRIVE = Path(os.getenv("AA_DATA_DIR_NEW")) FP_FS_HISTORICAL = ( DATA_DIR_GDRIVE diff --git a/src/utils/cloud_utils.py b/src/utils/cloud_utils.py index 50d1eee..54f85af 100644 --- a/src/utils/cloud_utils.py +++ b/src/utils/cloud_utils.py @@ -1,5 +1,5 @@ -import os import logging +import os from io import BytesIO from azure.core.exceptions 
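# --- Illustrative sketch (not part of the patch) ----------------------------
# The download pattern used by src/utils/cloud_utils.download_from_azure in
# the hunk below: fetch a blob with the SDK client and treat a missing blob as
# a soft failure. Account, key, container and blob names would have to point
# at a real storage account for this to run end to end.
import logging

from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import BlobServiceClient

logger = logging.getLogger(__name__)

def download_blob(account, key, container, blob_path, local_path):
    client = BlobServiceClient(
        account_url=f"https://{account}.blob.core.windows.net", credential=key
    )
    blob_client = client.get_blob_client(container=container, blob=blob_path)
    try:
        with open(local_path, "wb") as f:
            f.write(blob_client.download_blob().readall())
        logger.info(f"Successfully downloaded blob {blob_path} to {local_path}")
        return True
    except ResourceNotFoundError:
        logger.error(f"Blob {blob_path} not found in container {container}")
        return False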
import ResourceNotFoundError @@ -101,7 +101,9 @@ def download_from_azure( with open(local_file_path, "wb") as download_file: download_file.write(blob_client.download_blob().readall()) - logger.info(f"Successfully downloaded blob {blob_path} to {local_file_path}") + logger.info( + f"Successfully downloaded blob {blob_path} to {local_file_path}" + ) return True except ResourceNotFoundError: @@ -109,5 +111,7 @@ def download_from_azure( return False except Exception as e: - logger.error(f"An error occurred while downloading {blob_path}: {str(e)}") - return False \ No newline at end of file + logger.error( + f"An error occurred while downloading {blob_path}: {str(e)}" + ) + return False diff --git a/src/utils/date_utils.py b/src/utils/date_utils.py index 5fbf8eb..77c50b3 100644 --- a/src/utils/date_utils.py +++ b/src/utils/date_utils.py @@ -1,5 +1,4 @@ import argparse - import re from datetime import datetime, timedelta from zipfile import ZipFile @@ -19,6 +18,7 @@ def date_to_run(date=None): ret = datetime.today() - timedelta(days=5) # .date() return ret + def create_date_range(days, last_date): """ Method to create a range of dates fof the past N days. @@ -51,11 +51,16 @@ def get_start_and_last_date_from_90_days_file(zipped_file): oldest = min(filenames) end_date_str = re.search(search_str, newest) start_date_str = re.search(search_str, oldest) - return datetime.strptime(start_date_str[0], "%Y%m%d"), datetime.strptime(end_date_str[0], "%Y%m%d") + return datetime.strptime( + start_date_str[0], "%Y%m%d" + ), datetime.strptime(end_date_str[0], "%Y%m%d") + def get_start_and_last_date_from_90_days(files): search_str = "([0-9]{4}[0-9]{2}[0-9]{2})" end_date_str = re.search(search_str, max(files)) start_date_str = re.search(search_str, min(files)) - return datetime.strptime(start_date_str[0], "%Y%m%d"), datetime.strptime(end_date_str[0], "%Y%m%d") \ No newline at end of file + return datetime.strptime(start_date_str[0], "%Y%m%d"), datetime.strptime( + end_date_str[0], "%Y%m%d" + ) diff --git a/src/utils/pg.py b/src/utils/pg.py index b0203a3..c5be190 100644 --- a/src/utils/pg.py +++ b/src/utils/pg.py @@ -34,9 +34,15 @@ def fs_year_max(mode, admin_level, band="SFED"): return pd.read_sql(sql=query_yr_max, con=engine) -def fs_rolling_11_day_mean(mode, admin_level, band="SFED"): +def fs_rolling_11_day_mean(mode, admin_level, band="SFED", only_HRP=False): engine = get_engine(mode) + only_HRP_clause = ( + f" AND iso3 IN (SELECT iso3 FROM iso3 WHERE has_active_hrp=true)" + if only_HRP + else None + ) + query_rolling_mean = f""" WITH filtered_data AS ( SELECT iso3, pcode, valid_date, mean @@ -44,7 +50,7 @@ def fs_rolling_11_day_mean(mode, admin_level, band="SFED"): WHERE adm_level = {admin_level} AND band = '{band}' AND valid_date >= DATE_TRUNC('year', NOW()) - INTERVAL '10 years' - AND valid_date < DATE_TRUNC('year', NOW()) + AND valid_date < DATE_TRUNC('year', NOW()) {only_HRP_clause} ), rolling_mean AS ( SELECT iso3, pcode, valid_date, @@ -60,10 +66,11 @@ def fs_rolling_11_day_mean(mode, admin_level, band="SFED"): ) SELECT * FROM doy_mean """ # noqa: E202 E231 + return pd.read_sql(sql=query_rolling_mean, con=engine) -def fs_last_90_days(mode, admin_level, band="SFED"): +def fs_last_90_days(mode, admin_level, band="SFED", only_HRP=False): engine = get_engine(mode) query_last_90_days = f""" @@ -73,6 +80,10 @@ def fs_last_90_days(mode, admin_level, band="SFED"): AND band = '{band}' AND valid_date >= NOW() - INTERVAL '90 days' """ + if only_HRP: + query_last_90_days += ( + f" AND iso3 IN (SELECT iso3 FROM iso3 
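# --- Illustrative sketch (not part of the patch) ----------------------------
# How get_start_and_last_date_from_90_days derives the dataset time period
# from the generated geotiff names: lexicographic min/max works because the
# filenames start with a zero-padded YYYYMMDD stamp. The names below are
# invented but follow the "<YYYYMMDD>_aer_floodscan_sfed.tif" pattern produced
# earlier in the pipeline.
import re
from datetime import datetime

files = [
    "geotiffs/20240101_aer_floodscan_sfed.tif",
    "geotiffs/20240102_aer_floodscan_sfed.tif",
    "geotiffs/20240330_aer_floodscan_sfed.tif",
]
search_str = "([0-9]{4}[0-9]{2}[0-9]{2})"
start = datetime.strptime(re.search(search_str, min(files))[0], "%Y%m%d")
end = datetime.strptime(re.search(search_str, max(files))[0], "%Y%m%d")
print(start.date(), end.date())  # 2024-01-01 2024-03-30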
WHERE has_active_hrp=true)" + ) return pd.read_sql(sql=query_last_90_days, con=engine)
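# --- Illustrative aside (not part of the patch) ------------------------------
# Building the optional HRP filter as a SQL fragment. Note that
# fs_rolling_11_day_mean above falls back to None rather than "" when only_HRP
# is False; interpolating None into an f-string would put the literal text
# "None" into the query, so an empty-string default is the safer choice, as
# sketched here. The base query below uses a placeholder table name and only
# loosely mirrors the real queries.
def hrp_filter(only_HRP: bool) -> str:
    return (
        " AND iso3 IN (SELECT iso3 FROM iso3 WHERE has_active_hrp=true)"
        if only_HRP
        else ""
    )

base_query = "SELECT iso3, pcode, valid_date, mean FROM zonal_stats WHERE adm_level = 2"
print(base_query + hrp_filter(True))
print(base_query + hrp_filter(False))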