From 25f0f8083b94106eeb1cad510c56647364e4067e Mon Sep 17 00:00:00 2001 From: jdegerickx Date: Wed, 18 Dec 2024 17:34:24 +0100 Subject: [PATCH 1/8] introduce legend functionality + example script --- scripts/misc/legend.py | 29 +++++++ src/worldcereal/utils/legend.py | 134 ++++++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+) create mode 100644 scripts/misc/legend.py create mode 100644 src/worldcereal/utils/legend.py diff --git a/scripts/misc/legend.py b/scripts/misc/legend.py new file mode 100644 index 0000000..00c1485 --- /dev/null +++ b/scripts/misc/legend.py @@ -0,0 +1,29 @@ +""" +Example script showing how to upload, download and delete the WorldCereal crop type legend from Artifactory. +""" + +from pathlib import Path + +from worldcereal.utils.legend import ( + delete_legend_file, + download_latest_legend_from_artifactory, + upload_legend_csv_artifactory, +) + +if __name__ == "__main__": + + # Example usage + srcpath = Path( + "/vitodata/worldcereal/data/legend/WorldCereal_LC_CT_legend_20241216.csv" + ) + date = "20241216" + download_path = Path("/vitodata/worldcereal/data/legend_v2") + + # Upload the legend to Artifactory + link = upload_legend_csv_artifactory(srcpath, date) + + # Download the latest legend from Artifactory + download_latest_legend_from_artifactory(download_path) + + # Delete the uploaded legend from Artifactory + delete_legend_file(link) diff --git a/src/worldcereal/utils/legend.py b/src/worldcereal/utils/legend.py new file mode 100644 index 0000000..3687a2a --- /dev/null +++ b/src/worldcereal/utils/legend.py @@ -0,0 +1,134 @@ +import json +import os +import subprocess +from pathlib import Path + +from loguru import logger + +ARTIFACTORY_BASE_URL = ( + "https://artifactory.vgt.vito.be/artifactory/auxdata-public/worldcereal/" +) + + +def _get_artifactory_credentials(): + """Get credentials for upload and delete operations on Artifactory. + Returns + ------- + tuple (str, str) + Tuple containing the Artifactory username and password. + Raises + ------ + ValueError + if ARTIFACTORY_USERNAME or ARTIFACTORY_PASSWORD are not set as environment variables. + """ + + artifactory_username = os.getenv("ARTIFACTORY_USERNAME") + artifactory_password = os.getenv("ARTIFACTORY_PASSWORD") + + if not artifactory_username or not artifactory_password: + raise ValueError( + "Artifactory credentials not found. " + "Please set ARTIFACTORY_USERNAME and ARTIFACTORY_PASSWORD environment variables." + ) + + return artifactory_username, artifactory_password + + +def upload_legend_csv_artifactory( + srcpath: Path, + date: str, +) -> str: + """Uploads a CSV file containing the worldcereal land cover/crop type legend to Artifactory. + Parameters + ---------- + srcpath : Path + Path to csv file that needs to be uploaded to Artifactory. + date : str + Date tag to be added to the filename. Should be in format YYYYMMDD. + Returns + ------- + str + artifactory download link + Raises + ------ + FileNotFoundError + if srcpath does not exist + """ + if not srcpath.is_file(): + raise FileNotFoundError(f"Required file `{srcpath}` not found.") + + # Get Artifactory credentials + artifactory_username, artifactory_password = _get_artifactory_credentials() + + # We upload the file with a specific date tag and also with a "latest" tag + target_names = [ + f"WorldCereal_LC_CT_legend_{date}.csv", + "WorldCereal_LC_CT_legend_latest.csv", + ] + targetpaths = [f"{ARTIFACTORY_BASE_URL}legend/{n}" for n in target_names] + + for targetpath in targetpaths: + logger.info(f"Uploading `{srcpath}` to `{targetpath}`") + + cmd = ( + f"curl -u{artifactory_username}:{artifactory_password} -T {srcpath} " + f'"{targetpath}"' + ) + + output, _ = subprocess.Popen( + cmd, stdout=subprocess.PIPE, shell=True + ).communicate() + decoded_output = output.decode("utf-8") + + # Parse as JSON if applicable + try: + parsed_output = json.loads(decoded_output) + except json.JSONDecodeError as e: + raise ValueError(f"Failed to parse output as JSON: {decoded_output}") from e + + # Access the desired value + return parsed_output.get("downloadUri") + + +def download_latest_legend_from_artifactory(download_path: Path) -> Path: + """Downloads the latest version of the WorldCereal land cover/crop type legend from Artifactory. + Parameters + ---------- + download_path : Path + Folder where the legend needs to be downloaded to. + Returns + ------- + Path + Path to the downloaded legend file. + Raises + ------ + FileNotFoundError + Raises if no legend files are found in Artifactory. + """ + latest_file = "WorldCereal_LC_CT_legend_latest.csv" + link = f"{ARTIFACTORY_BASE_URL}legend/{latest_file}" + + logger.info(f"Downloading latest legend file: {latest_file}") + + download_path.mkdir(parents=True, exist_ok=True) + download_file = download_path / latest_file + + cmd = f'curl -o {download_file} "{link}"' + + subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True).communicate() + + return download_file + + +def delete_legend_file(path: str) -> None: + """Deletes a legend file from Artifactory. + Parameters + ---------- + path : str + Path to the legend file in Artifactory. + """ + # Get Artifactory credentials + artifactory_username, artifactory_password = _get_artifactory_credentials() + + cmd = f"curl -u{artifactory_username}:{artifactory_password} -X DELETE {path}" + subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True).communicate() From 692f95407b0427e78ef3500c668b9b2a26ad7e83 Mon Sep 17 00:00:00 2001 From: jdegerickx Date: Mon, 6 Jan 2025 12:58:42 +0100 Subject: [PATCH 2/8] added functionality to get the legend as pandas dataframe --- src/worldcereal/utils/legend.py | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/worldcereal/utils/legend.py b/src/worldcereal/utils/legend.py index 3687a2a..15089ec 100644 --- a/src/worldcereal/utils/legend.py +++ b/src/worldcereal/utils/legend.py @@ -1,8 +1,10 @@ import json import os import subprocess +import tempfile from pathlib import Path +import pandas as pd from loguru import logger ARTIFACTORY_BASE_URL = ( @@ -90,8 +92,34 @@ def upload_legend_csv_artifactory( return parsed_output.get("downloadUri") -def download_latest_legend_from_artifactory(download_path: Path) -> Path: - """Downloads the latest version of the WorldCereal land cover/crop type legend from Artifactory. +def get_latest_legend_from_artifactory() -> pd.DataFrame: + """Get the latest version of the WorldCereal land cover/crop type legend from Artifactory + as a Pandas Dataframe. + + Returns + ------- + pd.DataFrame + The WorldCereal land cover/crop type legend. + """ + # create temporary folder + with tempfile.TemporaryDirectory() as tmpdirname: + tmpdir = Path(tmpdirname) + # download the latest legend file + legend_path = _download_latest_legend_from_artifactory(tmpdir) + # read the legend file + legend = pd.read_csv(legend_path, header=0, sep=";") + + # clean up the legend file + legend = legend[legend["ewoc_code"].notna()] + drop_columns = [c for c in legend.columns if "Unnamed:" in c] + legend.drop(columns=drop_columns, inplace=True) + + return legend + + +def _download_latest_legend_from_artifactory(download_path: Path) -> Path: + """Downloads the latest version of the WorldCereal land cover/crop type legend from Artifactory + to a specified file path. Parameters ---------- download_path : Path From f235194424306c69b29eeaabe2bda2bd7a2f5cda Mon Sep 17 00:00:00 2001 From: jdegerickx Date: Mon, 6 Jan 2025 13:36:31 +0100 Subject: [PATCH 3/8] added retries to upload/download/delete --- src/worldcereal/utils/legend.py | 147 ++++++++++++++++++++++++++------ 1 file changed, 120 insertions(+), 27 deletions(-) diff --git a/src/worldcereal/utils/legend.py b/src/worldcereal/utils/legend.py index 15089ec..e0a9da8 100644 --- a/src/worldcereal/utils/legend.py +++ b/src/worldcereal/utils/legend.py @@ -2,6 +2,7 @@ import os import subprocess import tempfile +import time from pathlib import Path import pandas as pd @@ -36,6 +37,66 @@ def _get_artifactory_credentials(): return artifactory_username, artifactory_password +def _upload_with_retries( + srcpath, + targetpath, + username, + password, + retries=3, + wait=2, +) -> str: + """_summary_ + + Parameters + ---------- + srcpath : Path + Path to csv file that needs to be uploaded to Artifactory. + targetpath : str + Full link to the target location in Artifactory. + username : str + Artifactory username. + password : str + Artifactory password. + retries : int, optional + Number of retries, by default 3 + wait : int, optional + Seconds to wait in between retries, by default 2 + + Returns + ------- + str + Full link to the target location in Artifactory. + """ + # construct the curl command + cmd = f"curl -u{username}:{password} -T {srcpath} " f'"{targetpath}"' + + # execute the command with retries + for attempt in range(retries): + try: + logger.info( + f"Uploading `{srcpath}` to `{targetpath}` (Attempt {attempt + 1})" + ) + + output, _ = subprocess.Popen( + cmd, stdout=subprocess.PIPE, shell=True + ).communicate() + decoded_output = output.decode("utf-8") + + # Parse as JSON if applicable + parsed_output = json.loads(decoded_output) + logger.info("Upload successful") + return parsed_output.get("downloadUri") + + except (subprocess.CalledProcessError, json.JSONDecodeError) as e: + logger.error(f"Attempt {attempt + 1} failed: {e}") + if attempt < retries - 1: + time.sleep(wait) + else: + raise + + raise RuntimeError("Failed to upload file") + + def upload_legend_csv_artifactory( srcpath: Path, date: str, @@ -70,26 +131,12 @@ def upload_legend_csv_artifactory( targetpaths = [f"{ARTIFACTORY_BASE_URL}legend/{n}" for n in target_names] for targetpath in targetpaths: - logger.info(f"Uploading `{srcpath}` to `{targetpath}`") - - cmd = ( - f"curl -u{artifactory_username}:{artifactory_password} -T {srcpath} " - f'"{targetpath}"' + artifactory_link = _upload_with_retries( + srcpath, targetpath, artifactory_username, artifactory_password ) - output, _ = subprocess.Popen( - cmd, stdout=subprocess.PIPE, shell=True - ).communicate() - decoded_output = output.decode("utf-8") - - # Parse as JSON if applicable - try: - parsed_output = json.loads(decoded_output) - except json.JSONDecodeError as e: - raise ValueError(f"Failed to parse output as JSON: {decoded_output}") from e - - # Access the desired value - return parsed_output.get("downloadUri") + # Return the download link of latest uploaded file + return artifactory_link def get_latest_legend_from_artifactory() -> pd.DataFrame: @@ -117,13 +164,21 @@ def get_latest_legend_from_artifactory() -> pd.DataFrame: return legend -def _download_latest_legend_from_artifactory(download_path: Path) -> Path: +def _download_latest_legend_from_artifactory( + download_path: Path, + retries=3, + wait=2, +) -> Path: """Downloads the latest version of the WorldCereal land cover/crop type legend from Artifactory to a specified file path. Parameters ---------- download_path : Path Folder where the legend needs to be downloaded to. + retries : int, optional + Number of retries, by default 3 + wait : int, optional + Seconds to wait in between retries, by default 2 Returns ------- Path @@ -133,30 +188,68 @@ def _download_latest_legend_from_artifactory(download_path: Path) -> Path: FileNotFoundError Raises if no legend files are found in Artifactory. """ + # Construct the download link and curl command latest_file = "WorldCereal_LC_CT_legend_latest.csv" link = f"{ARTIFACTORY_BASE_URL}legend/{latest_file}" - - logger.info(f"Downloading latest legend file: {latest_file}") - download_path.mkdir(parents=True, exist_ok=True) download_file = download_path / latest_file - cmd = f'curl -o {download_file} "{link}"' - subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True).communicate() + for attempt in range(retries): + try: + logger.info( + f"Downloading latest legend file: {latest_file} (Attempt {attempt + 1})" + ) + subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True).communicate() + logger.info("Download successful!") + + return download_file - return download_file + except subprocess.CalledProcessError as e: + logger.error(f"Attempt {attempt + 1} failed: {e}") + if attempt < retries - 1: + time.sleep(wait) + else: + raise + raise RuntimeError("Failed to download file") -def delete_legend_file(path: str) -> None: + +def delete_legend_file( + path: str, + retries=3, + wait=2, +) -> None: """Deletes a legend file from Artifactory. Parameters ---------- path : str Path to the legend file in Artifactory. + retries : int, optional + Number of retries, by default 3 + wait : int, optional + Seconds to wait in between retries, by default 2 """ # Get Artifactory credentials artifactory_username, artifactory_password = _get_artifactory_credentials() + # construct the curl command cmd = f"curl -u{artifactory_username}:{artifactory_password} -X DELETE {path}" - subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True).communicate() + + # execute the command with retries + for attempt in range(retries): + try: + logger.info(f"Deleting legend file: {path} (Attempt {attempt + 1})") + subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True).communicate() + + logger.info("Deletion successful") + return + + except subprocess.CalledProcessError as e: + logger.error(f"Attempt {attempt + 1} failed: {e}") + if attempt < retries - 1: + time.sleep(wait) + else: + raise + + raise RuntimeError("Failed to delete file") From c559ae188d53c9e74892b00bde3f3676b21a3f57 Mon Sep 17 00:00:00 2001 From: jdegerickx Date: Mon, 6 Jan 2025 13:36:48 +0100 Subject: [PATCH 4/8] updated example script --- scripts/misc/legend.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/scripts/misc/legend.py b/scripts/misc/legend.py index 00c1485..bb40b80 100644 --- a/scripts/misc/legend.py +++ b/scripts/misc/legend.py @@ -6,7 +6,7 @@ from worldcereal.utils.legend import ( delete_legend_file, - download_latest_legend_from_artifactory, + get_latest_legend_from_artifactory, upload_legend_csv_artifactory, ) @@ -14,16 +14,15 @@ # Example usage srcpath = Path( - "/vitodata/worldcereal/data/legend/WorldCereal_LC_CT_legend_20241216.csv" + "/vitodata/worldcereal/data/legend/WorldCereal_LC_CT_legend_20241231.csv" ) - date = "20241216" - download_path = Path("/vitodata/worldcereal/data/legend_v2") + date = "20241231" # Upload the legend to Artifactory link = upload_legend_csv_artifactory(srcpath, date) # Download the latest legend from Artifactory - download_latest_legend_from_artifactory(download_path) + legend = get_latest_legend_from_artifactory() # Delete the uploaded legend from Artifactory delete_legend_file(link) From 7a5cc6b86c967b55df6183f1adbfff6aab09a069 Mon Sep 17 00:00:00 2001 From: jdegerickx Date: Tue, 7 Jan 2025 10:44:44 +0100 Subject: [PATCH 5/8] renaming function names and paths --- scripts/misc/legend.py | 10 ++----- src/worldcereal/utils/legend.py | 50 ++++++++++++++++----------------- 2 files changed, 27 insertions(+), 33 deletions(-) diff --git a/scripts/misc/legend.py b/scripts/misc/legend.py index bb40b80..342dc0b 100644 --- a/scripts/misc/legend.py +++ b/scripts/misc/legend.py @@ -4,11 +4,7 @@ from pathlib import Path -from worldcereal.utils.legend import ( - delete_legend_file, - get_latest_legend_from_artifactory, - upload_legend_csv_artifactory, -) +from worldcereal.utils.legend import delete_legend_file, get_legend, upload_legend if __name__ == "__main__": @@ -19,10 +15,10 @@ date = "20241231" # Upload the legend to Artifactory - link = upload_legend_csv_artifactory(srcpath, date) + link = upload_legend(srcpath, date) # Download the latest legend from Artifactory - legend = get_latest_legend_from_artifactory() + legend = get_legend() # Delete the uploaded legend from Artifactory delete_legend_file(link) diff --git a/src/worldcereal/utils/legend.py b/src/worldcereal/utils/legend.py index e0a9da8..bed3fc7 100644 --- a/src/worldcereal/utils/legend.py +++ b/src/worldcereal/utils/legend.py @@ -37,21 +37,21 @@ def _get_artifactory_credentials(): return artifactory_username, artifactory_password -def _upload_with_retries( +def _upload_file( srcpath, - targetpath, + dstpath, username, password, retries=3, wait=2, ) -> str: - """_summary_ + """Function taking care of file upload to Artifactory with retries. Parameters ---------- srcpath : Path Path to csv file that needs to be uploaded to Artifactory. - targetpath : str + dstpath : str Full link to the target location in Artifactory. username : str Artifactory username. @@ -68,14 +68,12 @@ def _upload_with_retries( Full link to the target location in Artifactory. """ # construct the curl command - cmd = f"curl -u{username}:{password} -T {srcpath} " f'"{targetpath}"' + cmd = f"curl -u{username}:{password} -T {srcpath} " f'"{dstpath}"' # execute the command with retries for attempt in range(retries): try: - logger.info( - f"Uploading `{srcpath}` to `{targetpath}` (Attempt {attempt + 1})" - ) + logger.info(f"Uploading `{srcpath}` to `{dstpath}` (Attempt {attempt + 1})") output, _ = subprocess.Popen( cmd, stdout=subprocess.PIPE, shell=True @@ -97,7 +95,7 @@ def _upload_with_retries( raise RuntimeError("Failed to upload file") -def upload_legend_csv_artifactory( +def upload_legend( srcpath: Path, date: str, ) -> str: @@ -124,22 +122,22 @@ def upload_legend_csv_artifactory( artifactory_username, artifactory_password = _get_artifactory_credentials() # We upload the file with a specific date tag and also with a "latest" tag - target_names = [ + dst_names = [ f"WorldCereal_LC_CT_legend_{date}.csv", "WorldCereal_LC_CT_legend_latest.csv", ] - targetpaths = [f"{ARTIFACTORY_BASE_URL}legend/{n}" for n in target_names] + dstpaths = [f"{ARTIFACTORY_BASE_URL}legend/{n}" for n in dst_names] - for targetpath in targetpaths: - artifactory_link = _upload_with_retries( - srcpath, targetpath, artifactory_username, artifactory_password + for dstpath in dstpaths: + artifactory_link = _upload_file( + srcpath, dstpath, artifactory_username, artifactory_password ) # Return the download link of latest uploaded file return artifactory_link -def get_latest_legend_from_artifactory() -> pd.DataFrame: +def get_legend() -> pd.DataFrame: """Get the latest version of the WorldCereal land cover/crop type legend from Artifactory as a Pandas Dataframe. @@ -150,9 +148,9 @@ def get_latest_legend_from_artifactory() -> pd.DataFrame: """ # create temporary folder with tempfile.TemporaryDirectory() as tmpdirname: - tmpdir = Path(tmpdirname) + dstpath = Path(tmpdirname) # download the latest legend file - legend_path = _download_latest_legend_from_artifactory(tmpdir) + legend_path = _download_legend(dstpath) # read the legend file legend = pd.read_csv(legend_path, header=0, sep=";") @@ -164,8 +162,8 @@ def get_latest_legend_from_artifactory() -> pd.DataFrame: return legend -def _download_latest_legend_from_artifactory( - download_path: Path, +def _download_legend( + dstpath: Path, retries=3, wait=2, ) -> Path: @@ -173,7 +171,7 @@ def _download_latest_legend_from_artifactory( to a specified file path. Parameters ---------- - download_path : Path + dstpath : Path Folder where the legend needs to be downloaded to. retries : int, optional Number of retries, by default 3 @@ -191,8 +189,8 @@ def _download_latest_legend_from_artifactory( # Construct the download link and curl command latest_file = "WorldCereal_LC_CT_legend_latest.csv" link = f"{ARTIFACTORY_BASE_URL}legend/{latest_file}" - download_path.mkdir(parents=True, exist_ok=True) - download_file = download_path / latest_file + dstpath.mkdir(parents=True, exist_ok=True) + download_file = dstpath / latest_file cmd = f'curl -o {download_file} "{link}"' for attempt in range(retries): @@ -216,14 +214,14 @@ def _download_latest_legend_from_artifactory( def delete_legend_file( - path: str, + srcpath: str, retries=3, wait=2, ) -> None: """Deletes a legend file from Artifactory. Parameters ---------- - path : str + srcpath : str Path to the legend file in Artifactory. retries : int, optional Number of retries, by default 3 @@ -234,12 +232,12 @@ def delete_legend_file( artifactory_username, artifactory_password = _get_artifactory_credentials() # construct the curl command - cmd = f"curl -u{artifactory_username}:{artifactory_password} -X DELETE {path}" + cmd = f"curl -u{artifactory_username}:{artifactory_password} -X DELETE {srcpath}" # execute the command with retries for attempt in range(retries): try: - logger.info(f"Deleting legend file: {path} (Attempt {attempt + 1})") + logger.info(f"Deleting legend file: {srcpath} (Attempt {attempt + 1})") subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True).communicate() logger.info("Deletion successful") From 92cf46488e33603c96093100c988a1fbc02a7bc3 Mon Sep 17 00:00:00 2001 From: jdegerickx Date: Tue, 7 Jan 2025 10:49:58 +0100 Subject: [PATCH 6/8] reworked logging logic --- src/worldcereal/utils/legend.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/worldcereal/utils/legend.py b/src/worldcereal/utils/legend.py index bed3fc7..84ab116 100644 --- a/src/worldcereal/utils/legend.py +++ b/src/worldcereal/utils/legend.py @@ -73,7 +73,9 @@ def _upload_file( # execute the command with retries for attempt in range(retries): try: - logger.info(f"Uploading `{srcpath}` to `{dstpath}` (Attempt {attempt + 1})") + logger.debug( + f"Uploading `{srcpath}` to `{dstpath}` (Attempt {attempt + 1})" + ) output, _ = subprocess.Popen( cmd, stdout=subprocess.PIPE, shell=True @@ -82,14 +84,15 @@ def _upload_file( # Parse as JSON if applicable parsed_output = json.loads(decoded_output) - logger.info("Upload successful") + logger.debug("Upload successful") return parsed_output.get("downloadUri") except (subprocess.CalledProcessError, json.JSONDecodeError) as e: - logger.error(f"Attempt {attempt + 1} failed: {e}") + logger.warning(f"Attempt {attempt + 1} failed: {e}") if attempt < retries - 1: time.sleep(wait) else: + logger.error(f"Failed to upload file to: {dstpath}") raise raise RuntimeError("Failed to upload file") @@ -195,19 +198,20 @@ def _download_legend( for attempt in range(retries): try: - logger.info( + logger.debug( f"Downloading latest legend file: {latest_file} (Attempt {attempt + 1})" ) subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True).communicate() - logger.info("Download successful!") + logger.debug("Download successful!") return download_file except subprocess.CalledProcessError as e: - logger.error(f"Attempt {attempt + 1} failed: {e}") + logger.warning(f"Attempt {attempt + 1} failed: {e}") if attempt < retries - 1: time.sleep(wait) else: + logger.error("Failed to download latest legend from Artifactory") raise raise RuntimeError("Failed to download file") @@ -237,17 +241,18 @@ def delete_legend_file( # execute the command with retries for attempt in range(retries): try: - logger.info(f"Deleting legend file: {srcpath} (Attempt {attempt + 1})") + logger.debug(f"Deleting legend file: {srcpath} (Attempt {attempt + 1})") subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True).communicate() - logger.info("Deletion successful") + logger.debug("Deletion successful") return except subprocess.CalledProcessError as e: - logger.error(f"Attempt {attempt + 1} failed: {e}") + logger.warning(f"Attempt {attempt + 1} failed: {e}") if attempt < retries - 1: time.sleep(wait) else: + logger.error(f"Failed to delete legend from Artifactory: {srcpath}") raise raise RuntimeError("Failed to delete file") From c61890b890ffbff33b6e0717494a922e7c35d2ea Mon Sep 17 00:00:00 2001 From: jdegerickx Date: Tue, 7 Jan 2025 11:15:18 +0100 Subject: [PATCH 7/8] retry mechanism captured in one function --- src/worldcereal/utils/legend.py | 124 +++++++++++++++++--------------- 1 file changed, 66 insertions(+), 58 deletions(-) diff --git a/src/worldcereal/utils/legend.py b/src/worldcereal/utils/legend.py index 84ab116..e6383e6 100644 --- a/src/worldcereal/utils/legend.py +++ b/src/worldcereal/utils/legend.py @@ -37,6 +37,56 @@ def _get_artifactory_credentials(): return artifactory_username, artifactory_password +def _run_curl_cmd(cmd: str, logging_msg: str, retries=3, wait=2) -> dict: + """Run a curl command with retries and return the output. + + Parameters + ---------- + cmd : str + The curl command to be executed + logging_msg : str + Message to be logged + retries : int, optional + Number of retries, by default 3 + wait : int, optional + Seconds to wait in between retries, by default 2 + Raises + ------ + RuntimeError + if the command fails after all retries + Returns + ------- + dict + The parsed output of the curl command + """ + + for attempt in range(retries): + try: + logger.debug(f"{logging_msg} (Attempt {attempt + 1})") + output, _ = subprocess.Popen( + cmd, stdout=subprocess.PIPE, shell=True + ).communicate() + decoded_output = output.decode("utf-8") + + # Parse as JSON if applicable + if decoded_output != "": + parsed_output = json.loads(decoded_output) + else: + parsed_output = {} + logger.debug("Execution successful") + return parsed_output + + except (subprocess.CalledProcessError, json.JSONDecodeError) as e: + logger.warning(f"Attempt {attempt + 1} failed: {e}") + if attempt < retries - 1: + time.sleep(wait) + else: + logger.error(f"Failed to execute command: {cmd}") + raise + + raise RuntimeError(f"Failed to execute command: {cmd}") + + def _upload_file( srcpath, dstpath, @@ -70,32 +120,14 @@ def _upload_file( # construct the curl command cmd = f"curl -u{username}:{password} -T {srcpath} " f'"{dstpath}"' - # execute the command with retries - for attempt in range(retries): - try: - logger.debug( - f"Uploading `{srcpath}` to `{dstpath}` (Attempt {attempt + 1})" - ) + # construct logging message + logging_msg = f"Uploading `{srcpath}` to `{dstpath}`" - output, _ = subprocess.Popen( - cmd, stdout=subprocess.PIPE, shell=True - ).communicate() - decoded_output = output.decode("utf-8") - - # Parse as JSON if applicable - parsed_output = json.loads(decoded_output) - logger.debug("Upload successful") - return parsed_output.get("downloadUri") - - except (subprocess.CalledProcessError, json.JSONDecodeError) as e: - logger.warning(f"Attempt {attempt + 1} failed: {e}") - if attempt < retries - 1: - time.sleep(wait) - else: - logger.error(f"Failed to upload file to: {dstpath}") - raise + # execute the command with retries + output = _run_curl_cmd(cmd, logging_msg, retries=retries, wait=wait) - raise RuntimeError("Failed to upload file") + # return the download link + return output["downloadUri"] def upload_legend( @@ -196,25 +228,14 @@ def _download_legend( download_file = dstpath / latest_file cmd = f'curl -o {download_file} "{link}"' - for attempt in range(retries): - try: - logger.debug( - f"Downloading latest legend file: {latest_file} (Attempt {attempt + 1})" - ) - subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True).communicate() - logger.debug("Download successful!") - - return download_file + # construct logging message + logging_msg = f"Downloading latest legend file: {latest_file}" - except subprocess.CalledProcessError as e: - logger.warning(f"Attempt {attempt + 1} failed: {e}") - if attempt < retries - 1: - time.sleep(wait) - else: - logger.error("Failed to download latest legend from Artifactory") - raise + # execute the command with retries + _run_curl_cmd(cmd, logging_msg, retries=retries, wait=wait) - raise RuntimeError("Failed to download file") + # return the path to the downloaded file + return download_file def delete_legend_file( @@ -238,21 +259,8 @@ def delete_legend_file( # construct the curl command cmd = f"curl -u{artifactory_username}:{artifactory_password} -X DELETE {srcpath}" - # execute the command with retries - for attempt in range(retries): - try: - logger.debug(f"Deleting legend file: {srcpath} (Attempt {attempt + 1})") - subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True).communicate() + # construct logging message + logging_msg = f"Deleting legend file: {srcpath}" - logger.debug("Deletion successful") - return - - except subprocess.CalledProcessError as e: - logger.warning(f"Attempt {attempt + 1} failed: {e}") - if attempt < retries - 1: - time.sleep(wait) - else: - logger.error(f"Failed to delete legend from Artifactory: {srcpath}") - raise - - raise RuntimeError("Failed to delete file") + # execute the command with retries + _run_curl_cmd(cmd, logging_msg, retries=retries, wait=wait) From 2f96d1ec5bcb3e91e3a594c4a294b931f7c26606 Mon Sep 17 00:00:00 2001 From: jdegerickx Date: Tue, 7 Jan 2025 12:26:01 +0100 Subject: [PATCH 8/8] switch from curl to http requests --- src/worldcereal/utils/legend.py | 161 +++++++++++++------------------- 1 file changed, 66 insertions(+), 95 deletions(-) diff --git a/src/worldcereal/utils/legend.py b/src/worldcereal/utils/legend.py index e6383e6..6f331b0 100644 --- a/src/worldcereal/utils/legend.py +++ b/src/worldcereal/utils/legend.py @@ -1,11 +1,10 @@ -import json import os -import subprocess import tempfile import time from pathlib import Path import pandas as pd +import requests from loguru import logger ARTIFACTORY_BASE_URL = ( @@ -37,66 +36,48 @@ def _get_artifactory_credentials(): return artifactory_username, artifactory_password -def _run_curl_cmd(cmd: str, logging_msg: str, retries=3, wait=2) -> dict: - """Run a curl command with retries and return the output. - +def _run_request(method: str, url: str, **kwargs) -> requests.Response: + """Run an HTTP request with retries and return the response. Parameters ---------- - cmd : str - The curl command to be executed - logging_msg : str - Message to be logged - retries : int, optional - Number of retries, by default 3 - wait : int, optional - Seconds to wait in between retries, by default 2 + method : str + HTTP method to be used + url : str + URL to send the request to + kwargs : dict + Additional keyword arguments, may include `retries`, `wait` and `logging_msg` Raises ------ RuntimeError if the command fails after all retries Returns ------- - dict - The parsed output of the curl command + requests.Response + The response of the http request """ + retries = kwargs.pop("retries", 3) + wait = kwargs.pop("wait", 2) + logging_msg = kwargs.pop("logging_msg", "Request") for attempt in range(retries): try: logger.debug(f"{logging_msg} (Attempt {attempt + 1})") - output, _ = subprocess.Popen( - cmd, stdout=subprocess.PIPE, shell=True - ).communicate() - decoded_output = output.decode("utf-8") - - # Parse as JSON if applicable - if decoded_output != "": - parsed_output = json.loads(decoded_output) - else: - parsed_output = {} + response = requests.request(method, url, **kwargs) + response.raise_for_status() logger.debug("Execution successful") - return parsed_output - - except (subprocess.CalledProcessError, json.JSONDecodeError) as e: + return response + except requests.RequestException as e: logger.warning(f"Attempt {attempt + 1} failed: {e}") if attempt < retries - 1: time.sleep(wait) else: - logger.error(f"Failed to execute command: {cmd}") + logger.error(f"Failed to execute request: {url}") raise - - raise RuntimeError(f"Failed to execute command: {cmd}") + raise RuntimeError(f"Failed to execute request: {url}") -def _upload_file( - srcpath, - dstpath, - username, - password, - retries=3, - wait=2, -) -> str: - """Function taking care of file upload to Artifactory with retries. - +def _upload_file(srcpath, dstpath, username, password, retries=3, wait=2): + """Upload a file to Artifactory. Parameters ---------- srcpath : Path @@ -111,30 +92,33 @@ def _upload_file( Number of retries, by default 3 wait : int, optional Seconds to wait in between retries, by default 2 - Returns ------- str Full link to the target location in Artifactory. - """ - # construct the curl command - cmd = f"curl -u{username}:{password} -T {srcpath} " f'"{dstpath}"' - - # construct logging message - logging_msg = f"Uploading `{srcpath}` to `{dstpath}`" - - # execute the command with retries - output = _run_curl_cmd(cmd, logging_msg, retries=retries, wait=wait) - # return the download link - return output["downloadUri"] + """ + url = dstpath + with open(srcpath, "rb") as f: + file_content = f.read() # Read the file content as binary + headers = { + "Content-Type": "application/octet-stream", # Set the appropriate content type + } + response = _run_request( + "PUT", + url, + data=file_content, # Send raw file content in the request body + headers=headers, + auth=(username, password), + logging_msg=f"Uploading `{srcpath}` to `{dstpath}`", + retries=retries, + wait=wait, + ) + return response.json()["downloadUri"] -def upload_legend( - srcpath: Path, - date: str, -) -> str: - """Uploads a CSV file containing the worldcereal land cover/crop type legend to Artifactory. +def upload_legend(srcpath: Path, date: str) -> str: + """Upload a CSV file containing the WorldCereal land cover/crop type legend to Artifactory. Parameters ---------- srcpath : Path @@ -173,23 +157,16 @@ def upload_legend( def get_legend() -> pd.DataFrame: - """Get the latest version of the WorldCereal land cover/crop type legend from Artifactory - as a Pandas Dataframe. + """Get the latest version of the WorldCereal land cover/crop type legend as a Pandas DataFrame.""" - Returns - ------- - pd.DataFrame - The WorldCereal land cover/crop type legend. - """ # create temporary folder with tempfile.TemporaryDirectory() as tmpdirname: dstpath = Path(tmpdirname) - # download the latest legend file legend_path = _download_legend(dstpath) # read the legend file legend = pd.read_csv(legend_path, header=0, sep=";") - # clean up the legend file + # clean up the legend for use legend = legend[legend["ewoc_code"].notna()] drop_columns = [c for c in legend.columns if "Unnamed:" in c] legend.drop(columns=drop_columns, inplace=True) @@ -197,13 +174,8 @@ def get_legend() -> pd.DataFrame: return legend -def _download_legend( - dstpath: Path, - retries=3, - wait=2, -) -> Path: - """Downloads the latest version of the WorldCereal land cover/crop type legend from Artifactory - to a specified file path. +def _download_legend(dstpath: Path, retries=3, wait=2) -> Path: + """Download the latest version of the WorldCereal legend from Artifactory. Parameters ---------- dstpath : Path @@ -221,29 +193,28 @@ def _download_legend( FileNotFoundError Raises if no legend files are found in Artifactory. """ - # Construct the download link and curl command + # Construct the download link latest_file = "WorldCereal_LC_CT_legend_latest.csv" link = f"{ARTIFACTORY_BASE_URL}legend/{latest_file}" dstpath.mkdir(parents=True, exist_ok=True) download_file = dstpath / latest_file - cmd = f'curl -o {download_file} "{link}"' - # construct logging message - logging_msg = f"Downloading latest legend file: {latest_file}" + response = _run_request( + "GET", + link, + logging_msg=f"Downloading latest legend file: {latest_file}", + retries=retries, + wait=wait, + ) - # execute the command with retries - _run_curl_cmd(cmd, logging_msg, retries=retries, wait=wait) + with open(download_file, "wb") as f: + f.write(response.content) - # return the path to the downloaded file return download_file -def delete_legend_file( - srcpath: str, - retries=3, - wait=2, -) -> None: - """Deletes a legend file from Artifactory. +def delete_legend_file(srcpath: str, retries=3, wait=2): + """Delete a legend file from Artifactory. Parameters ---------- srcpath : str @@ -256,11 +227,11 @@ def delete_legend_file( # Get Artifactory credentials artifactory_username, artifactory_password = _get_artifactory_credentials() - # construct the curl command - cmd = f"curl -u{artifactory_username}:{artifactory_password} -X DELETE {srcpath}" - - # construct logging message - logging_msg = f"Deleting legend file: {srcpath}" - - # execute the command with retries - _run_curl_cmd(cmd, logging_msg, retries=retries, wait=wait) + _run_request( + "DELETE", + srcpath, + auth=(artifactory_username, artifactory_password), + logging_msg=f"Deleting legend file: {srcpath}", + retries=retries, + wait=wait, + )