diff --git a/.coveragerc b/.coveragerc index 4a68097e..e212c43d 100644 --- a/.coveragerc +++ b/.coveragerc @@ -5,8 +5,13 @@ omit = *dev* *docs* *tutorials* - gpm_api/tests/* + gpm_api/bucket/* gpm_api/cli/* + gpm_api/encoding/* + gpm_api/etc/* + gpm_api/retrieval/* + gpm_api/tests/* + gpm_api/_version.py [report] exclude_lines = diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index b7887361..6b26f5ae 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -31,6 +31,8 @@ jobs: steps: - uses: actions/checkout@v3 + with: + submodules: 'recursive' - name: Set up micromamba uses: mamba-org/setup-micromamba@v1 @@ -48,6 +50,8 @@ jobs: - name: Test with pytest run: | pytest + env: + HDF5_USE_FILE_LOCKING: FALSE - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v3 diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..0855cdca --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "gpm_api/tests/data"] + path = gpm_api/tests/data + url = git@github.com:ghiggi/gpm_api_test_data.git diff --git a/gpm_api/dataset/datatree.py b/gpm_api/dataset/datatree.py index c6d7d5f8..667d2c3d 100644 --- a/gpm_api/dataset/datatree.py +++ b/gpm_api/dataset/datatree.py @@ -72,7 +72,7 @@ def _identify_error(e, filepath): msg = f"The file {filepath} is corrupted and is being removed. It must be redownload." raise ValueError(msg) elif "[Errno -51] NetCDF: Unknown file format" in error_str: - msg = "The GPM-API is not currently able to read the file format of {filepath}. Report the issue please." + msg = f"The GPM-API is not currently able to read the file format of {filepath}. Report the issue please." raise ValueError(msg) elif "lock" in error_str: msg = "Unfortunately, HDF locking is occurring." diff --git a/gpm_api/etc/product_def.yml b/gpm_api/etc/product_def.yml index 6270cdbd..0b7a946e 100644 --- a/gpm_api/etc/product_def.yml +++ b/gpm_api/etc/product_def.yml @@ -1667,6 +1667,8 @@ pps_nrt_dir: null pps_rs_dir: radar ges_disc_dir: null + start_time: null + end_time: null available_versions: - 4 scan_modes: @@ -1680,6 +1682,8 @@ pps_nrt_dir: null pps_rs_dir: radar ges_disc_dir: null + start_time: null + end_time: null available_versions: - 4 scan_modes: diff --git a/gpm_api/io/checks.py b/gpm_api/io/checks.py index 4265d3c8..f8fa4fe7 100644 --- a/gpm_api/io/checks.py +++ b/gpm_api/io/checks.py @@ -9,14 +9,6 @@ import numpy as np -def is_not_empty(x): - return bool(x) - - -def is_empty(x): - return not x - - def check_base_dir(base_dir): """Check base directory path. 
@@ -192,7 +184,7 @@ def check_time(time): if isinstance(time, np.ndarray): if np.issubdtype(time.dtype, np.datetime64): if time.size == 1: - time = time.astype("datetime64[s]").tolist() + time = time[0].astype("datetime64[s]").tolist() else: raise ValueError("Expecting a single timestep!") else: diff --git a/gpm_api/io/download.py b/gpm_api/io/download.py index d04aa5b4..34ff4b9c 100644 --- a/gpm_api/io/download.py +++ b/gpm_api/io/download.py @@ -30,7 +30,6 @@ check_remote_storage, check_start_end_time, check_valid_time_request, - is_empty, ) from gpm_api.io.data_integrity import ( check_archive_integrity, @@ -267,6 +266,8 @@ def _get_single_file_cmd_function(transfer_tool, storage): "pps": {"wget": wget_pps_cmd, "curl": curl_pps_cmd}, "ges_disc": {"wget": wget_ges_disc_cmd, "curl": curl_ges_disc_cmd}, } + if transfer_tool not in dict_fun[storage].keys(): + raise NotImplementedError(f"Unsupported transfer tool: {transfer_tool}") func = dict_fun[storage][transfer_tool] return func @@ -513,7 +514,7 @@ def download_files( remote_filepaths=remote_filepaths, force_download=force_download, ) - if is_empty(new_remote_filepaths): + if len(new_remote_filepaths) == 0: if verbose: print(f"The requested files are already on disk at {local_filepaths}.") return None @@ -716,7 +717,7 @@ def _download_daily_data( ) # -------------------------------------------------------------------------. ## If no file to retrieve on NASA PPS, return None - if is_empty(remote_filepaths): + if len(remote_filepaths) == 0: if warn_missing_files: msg = f"No data found on PPS on date {date} for product {product}" warnings.warn(msg, GPMDownloadWarning) @@ -735,7 +736,7 @@ def _download_daily_data( remote_filepaths=remote_filepaths, force_download=force_download, ) - if is_empty(remote_filepaths): + if len(remote_filepaths) == 0: return [-1], available_version # flag for already on disk # -------------------------------------------------------------------------. diff --git a/gpm_api/io/filter.py b/gpm_api/io/filter.py index d701705e..df7a2a37 100644 --- a/gpm_api/io/filter.py +++ b/gpm_api/io/filter.py @@ -272,11 +272,6 @@ def filter_by_time(filepaths, start_time=None, end_time=None): # - Retrieve start_time and end_time of GPM granules l_start_time, l_end_time = get_start_end_time_from_filepaths(filepaths) - # -------------------------------------------------------------------------. - # Check file are available - if len(l_start_time) == 0: - return [] - # -------------------------------------------------------------------------. 
# Select granules with data within the start and end time # - Case 1 diff --git a/gpm_api/io/find.py b/gpm_api/io/find.py index fc354ce3..22e26fc7 100644 --- a/gpm_api/io/find.py +++ b/gpm_api/io/find.py @@ -20,7 +20,6 @@ check_start_end_time, check_storage, check_valid_time_request, - is_empty, ) from gpm_api.io.filter import filter_filepaths from gpm_api.io.ges_disc import get_gesdisc_daily_filepaths @@ -96,7 +95,7 @@ def _check_correct_version(filepaths, product, version): return filepaths, files_version -def ensure_valid_start_date(start_date, product): +def _ensure_valid_start_date(start_date, product): """Ensure that the product directory exists for start_date.""" if product == "2A-SAPHIR-MT1-CLIM": min_start_date = "2011-10-13 00:00:00" @@ -169,7 +168,7 @@ def find_daily_filepaths( version=version, verbose=verbose, ) - if is_empty(filepaths): + if len(filepaths) == 0: if storage == "local" and verbose: version_str = str(int(version)) print( @@ -188,7 +187,7 @@ def find_daily_filepaths( start_time=start_time, end_time=end_time, ) - if is_empty(filepaths): + if len(filepaths) == 0: return [], [] ## -----------------------------------------------------------------------. @@ -250,7 +249,7 @@ def find_filepaths( # --> Example granules starting at 23:XX:XX in the day before and extending to 01:XX:XX start_date = datetime.datetime(start_time.year, start_time.month, start_time.day) start_date = start_date - datetime.timedelta(days=1) - start_date = ensure_valid_start_date(start_date=start_date, product=product) + start_date = _ensure_valid_start_date(start_date=start_date, product=product) end_date = datetime.datetime(end_time.year, end_time.month, end_time.day) date_range = pd.date_range(start=start_date, end=end_date, freq="D") dates = list(date_range.to_pydatetime()) diff --git a/gpm_api/io/ges_disc.py b/gpm_api/io/ges_disc.py index e17f8d9a..342be4d2 100644 --- a/gpm_api/io/ges_disc.py +++ b/gpm_api/io/ges_disc.py @@ -5,7 +5,6 @@ @author: ghiggi """ import datetime -import os import re import subprocess @@ -42,7 +41,7 @@ def _get_href_value(input_string): return href_value -def _get_gesc_disc_list_path(url): +def _get_ges_disc_list_path(url): # Retrieve url content # - If it returns something, means url is correct wget_output = _get_ges_disc_url_content(url) @@ -51,7 +50,7 @@ def _get_gesc_disc_list_path(url): list_content = [s for s in list_content if s != ""] if len(list_content) == 0: raise ValueError(f"The GES DISC {url} directory is empty.") - list_path = [os.path.join(url, s) for s in list_content] + list_path = [f"{url}/{s}" for s in list_content] return list_path @@ -72,11 +71,11 @@ def _get_gesc_disc_list_path(url): def _get_ges_disc_server(product): # TRMM if is_trmm_product(product): - ges_disc_base_url = "https://disc2.gesdisc.eosdis.nasa.gov/data/" + ges_disc_base_url = "https://disc2.gesdisc.eosdis.nasa.gov/data" # GPM else: - ges_disc_base_url = "https://gpm1.gesdisc.eosdis.nasa.gov/data" + # ges_disc_base_url = "https://gpm1.gesdisc.eosdis.nasa.gov/data" ges_disc_base_url = "https://gpm2.gesdisc.eosdis.nasa.gov/data" return ges_disc_base_url @@ -114,9 +113,11 @@ def _get_ges_disc_product_directory_tree(product, date, version): # Specify the directory tree # --> TODO: currently specified only for L1 and L2 - directory_tree = os.path.join( - folder_name, - datetime.datetime.strftime(date, "%Y/%j"), + directory_tree = "/".join( + [ + folder_name, + datetime.datetime.strftime(date, "%Y/%j"), + ] ) return directory_tree @@ -148,7 +149,7 @@ def 
get_ges_disc_product_directory(product, date, version): product=product, date=date, version=version ) # Define product directory where data are listed - url_product_dir = os.path.join(url_server, dir_structure) + url_product_dir = f"{url_server}/{dir_structure}" return url_product_dir @@ -178,7 +179,7 @@ def _get_gesdisc_file_list(url_product_dir, product, date, version, verbose=True Default is False. Whether to specify when data are not available for a specific date. """ try: - filepaths = _get_gesc_disc_list_path(url_product_dir) + filepaths = _get_ges_disc_list_path(url_product_dir) except Exception as e: # If url not exist, raise an error if "was not found on the GES DISC server" in str(e): @@ -248,5 +249,5 @@ def define_gesdisc_filepath(product, product_type, date, version, filename): # Retrieve product directory url url_product_dir = get_ges_disc_product_directory(product=product, date=date, version=version) # Define GES DISC filepath - fpath = os.path.join(url_product_dir, filename) + fpath = f"{url_product_dir}/{filename}" return fpath diff --git a/gpm_api/io/info.py b/gpm_api/io/info.py index 5f3966d4..26ea4a4d 100644 --- a/gpm_api/io/info.py +++ b/gpm_api/io/info.py @@ -116,10 +116,7 @@ def _get_info_from_filename(fname): # Add product information # - ATTENTION: can not be inferred for products not defined in etc/product.yml - try: - info_dict["product"] = get_product_from_filepath(fname) - except Exception: - pass + info_dict["product"] = get_product_from_filepath(fname) # Return info dictionary return info_dict diff --git a/gpm_api/io/pps.py b/gpm_api/io/pps.py index 3ecf9548..028f40b1 100644 --- a/gpm_api/io/pps.py +++ b/gpm_api/io/pps.py @@ -5,7 +5,6 @@ @author: ghiggi """ import datetime -import os import subprocess from dateutil.relativedelta import relativedelta @@ -77,7 +76,7 @@ def _get_pps_nrt_product_dir(product, date): folder_name = _get_pps_nrt_product_folder_name(product) # Specify the directory tree if product in available_products(product_type="NRT", product_category="IMERG"): - directory_tree = os.path.join(folder_name, datetime.datetime.strftime(date, "%Y%m")) + directory_tree = f"{folder_name}/{datetime.datetime.strftime(date, '%Y%m')}" else: directory_tree = folder_name return directory_tree @@ -104,20 +103,24 @@ def _get_pps_rs_product_dir(product, date, version): # Specify the directory tree for current RS version if version == 7: - directory_tree = os.path.join( - "gpmdata", - datetime.datetime.strftime(date, "%Y/%m/%d"), - folder_name, + directory_tree = "/".join( + [ + "gpmdata", + datetime.datetime.strftime(date, "%Y/%m/%d"), + folder_name, + ] ) # Specify the directory tree for old RS version else: # version in [4, 5, 6]: version_str = "V0" + str(int(version)) - directory_tree = os.path.join( - "gpmallversions", - version_str, - datetime.datetime.strftime(date, "%Y/%m/%d"), - folder_name, + directory_tree = "/".join( + [ + "gpmallversions", + version_str, + datetime.datetime.strftime(date, "%Y/%m/%d"), + folder_name, + ] ) # Return the directory tree @@ -194,7 +197,7 @@ def get_pps_product_directory(product, product_type, date, version, server_type) product=product, product_type=product_type, date=date, version=version ) # Define product directory where data are listed - url_product_dir = os.path.join(url_server, dir_structure) + url_product_dir = f"{url_server}/{dir_structure}" return url_product_dir @@ -306,9 +309,9 @@ def get_pps_daily_filepaths(product, product_type, date, version, verbose=True): verbose=verbose, ) # Define the complete url of pps 
filepaths - # - Need to remove the starting "/" to each filepath + # Filepaths start with a "/" url_data_server = _get_pps_data_server(product_type) - filepaths = [os.path.join(url_data_server, filepath[1:]) for filepath in filepaths] + filepaths = [f"{url_data_server}{filepath}" for filepath in filepaths] return filepaths @@ -323,7 +326,7 @@ def define_pps_filepath(product, product_type, date, version, filename): server_type="data", ) # Define PPS filepath - fpath = os.path.join(url_product_dir, filename) + fpath = f"{url_product_dir}/{filename}" return fpath diff --git a/gpm_api/io/products.py b/gpm_api/io/products.py index 24d56fc3..ec0d78a7 100644 --- a/gpm_api/io/products.py +++ b/gpm_api/io/products.py @@ -99,8 +99,9 @@ def get_product_start_time(product): def get_product_end_time(product): """Provide the product end_time.""" - end_time = get_product_info(product)["end_time"] - end_time = datetime.datetime.utcnow() + end_time = get_info_dict()[product]["end_time"] + if end_time is None: + end_time = datetime.datetime.utcnow() return end_time diff --git a/gpm_api/tests/conftest.py b/gpm_api/tests/conftest.py index 9b9bf983..82ef6af2 100644 --- a/gpm_api/tests/conftest.py +++ b/gpm_api/tests/conftest.py @@ -1,6 +1,6 @@ import pytest import datetime -from typing import Any, List, Dict, Tuple +from typing import Any, List, Dict, Tuple, Iterable from gpm_api.io.products import get_info_dict, available_products import posixpath as pxp import ntpath as ntp @@ -10,7 +10,7 @@ @pytest.fixture(scope="session", autouse=True) -def mock_configuration(): +def mock_configuration() -> Iterable[Dict[str, str]]: """Patch the user configuration for entire session Doing this will retrieve the configuration from pytest memory and not @@ -20,6 +20,8 @@ def mock_configuration(): mocked_configuration = { "username_pps": "testuser", "password_pps": "testuser", + "username_earthdata": "testuser", + "password_earthdata": "testuser", "gpm_base_dir": os.path.join( os.getcwd(), "gpm_api", @@ -33,7 +35,7 @@ def mock_configuration(): "read_gpm_api_configs", return_value=mocked_configuration, ): - yield + yield mocked_configuration @pytest.fixture @@ -78,6 +80,13 @@ def products() -> List[str]: return available_products() +@pytest.fixture +def product_info() -> Dict[str, dict]: + """Return a dictionary of product info""" + + return get_info_dict() + + @pytest.fixture def remote_filepaths() -> Dict[str, Dict[str, Any]]: """Return a list of probable GPM server paths""" @@ -94,6 +103,7 @@ def remote_filepaths() -> Dict[str, Dict[str, Any]]: "start_time": datetime.datetime(2020, 7, 5, 17, 0, 44), "end_time": datetime.datetime(2020, 7, 5, 18, 33, 17), "version": 7, + "granule_id": 36092, }, "ftps://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2020/07/05/radar/2A.GPM.DPR.V9-20211125.20200705-S183318-E200550.036093.V07A.HDF5": { "year": 2020, @@ -105,6 +115,7 @@ def remote_filepaths() -> Dict[str, Dict[str, Any]]: "start_time": datetime.datetime(2020, 7, 5, 18, 33, 18), "end_time": datetime.datetime(2020, 7, 5, 20, 5, 50), "version": 7, + "granule_id": 36093, }, "ftps://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2020/07/05/radar/2A.GPM.DPR.V9-20211125.20200705-S200551-E213823.036094.V07A.HDF5": { "year": 2020, @@ -116,6 +127,70 @@ def remote_filepaths() -> Dict[str, Dict[str, Any]]: "start_time": datetime.datetime(2020, 7, 5, 20, 5, 51), "end_time": datetime.datetime(2020, 7, 5, 21, 38, 23), "version": 7, + "granule_id": 36094, + }, + # Over two days + 
"ftps://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2020/07/05/radar/2A.GPM.DPR.V9-20211125.20200705-S231057-E004329.036096.V07A.HDF5": { + "year": 2020, + "month": 7, + "day": 5, + "product": "2A-DPR", + "product_category": "radar", + "product_type": "RS", + "start_time": datetime.datetime(2020, 7, 5, 23, 10, 57), + "end_time": datetime.datetime(2020, 7, 6, 0, 43, 29), + "version": 7, + "granule_id": 36096, + }, + # NRT + "ftps://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2020/07/05/radar/2A.GPM.DPR.V9-20211125.20200705-S170044-E183317.V07A.HDF5": { + "year": 2020, + "month": 7, + "day": 5, + "product": "2A-DPR", + "product_category": "radar", + "product_type": "NRT", + "start_time": datetime.datetime(2020, 7, 5, 17, 0, 44), + "end_time": datetime.datetime(2020, 7, 5, 18, 33, 17), + "version": 7, + }, + # JAXA + "ftps://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2020/07/05/1B/GPMCOR_KAR_2007050002_0135_036081_1BS_DAB_07A.h5": { + "year": 2020, + "month": 7, + "day": 5, + "product": "1B-Ka", + "product_category": "radar", + "product_type": "RS", + "start_time": datetime.datetime(2020, 7, 5, 0, 2, 0), + "end_time": datetime.datetime(2020, 7, 5, 1, 35, 0), + "version": 7, + "granule_id": 36081, + }, + # JAXA over two days + "ftps://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2020/07/05/1B/GPMCOR_KUR_2007052310_0043_036096_1BS_DUB_07A.h5": { + "year": 2020, + "month": 7, + "day": 5, + "product": "1B-Ka", + "product_category": "radar", + "product_type": "RS", + "start_time": datetime.datetime(2020, 7, 5, 23, 10, 0), + "end_time": datetime.datetime(2020, 7, 6, 0, 43, 0), + "version": 7, + "granule_id": 36096, + }, + # JAXA NRT + "ftps://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2020/07/05/1B/GPMCOR_KAR_2007050002_0135_036081_1BR_DAB_07A.h5": { + "year": 2020, + "month": 7, + "day": 5, + "product": "1B-Ka", + "product_category": "radar", + "product_type": "NRT", + "start_time": datetime.datetime(2020, 7, 5, 0, 2, 0), + "end_time": datetime.datetime(2020, 7, 5, 1, 35, 0), + "version": 7, }, # Include non-ftps folders "ftp://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2020/07/05/radar/2A.GPM.DPR.V9-20211125.20200705-S213824-E231056.036095.V07A.HDF5": { @@ -128,6 +203,7 @@ def remote_filepaths() -> Dict[str, Dict[str, Any]]: "start_time": datetime.datetime(2020, 7, 5, 21, 38, 24), "end_time": datetime.datetime(2020, 7, 5, 23, 10, 56), "version": 7, + "granule_id": 36095, }, "ftp://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2020/07/05/radar/2A.GPM.DPR.V9-20211125.20200705-S231057-E004329.036096.V07A.HDF5": { "year": 2020, @@ -139,6 +215,7 @@ def remote_filepaths() -> Dict[str, Dict[str, Any]]: "start_time": datetime.datetime(2020, 7, 5, 23, 10, 57), "end_time": datetime.datetime(2020, 7, 6, 0, 43, 29), "version": 7, + "granule_id": 36096, }, "ftp://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2020/07/05/radar/2A.GPM.DPR.V9-20211125.20200705-S004330-E021602.036097.V07A.HDF5": { "year": 2020, @@ -150,6 +227,7 @@ def remote_filepaths() -> Dict[str, Dict[str, Any]]: "start_time": datetime.datetime(2020, 7, 5, 0, 43, 30), "end_time": datetime.datetime(2020, 7, 5, 2, 16, 2), "version": 7, + "granule_id": 36097, }, "ftp://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2019/07/05/radar/2A.GPM.DPR.V9-20211125.20190705-S004330-E021602.036097.V07A.HDF5": { "year": 2019, @@ -161,6 +239,7 @@ def remote_filepaths() -> Dict[str, Dict[str, Any]]: "start_time": datetime.datetime(2019, 7, 5, 0, 43, 30), "end_time": datetime.datetime(2019, 7, 5, 2, 16, 2), "version": 7, + "granule_id": 36097, }, # TODO: Add more products with varying 
attributes ... } diff --git a/gpm_api/tests/data b/gpm_api/tests/data new file mode 160000 index 00000000..82f11362 --- /dev/null +++ b/gpm_api/tests/data @@ -0,0 +1 @@ +Subproject commit 82f113623bf9748c8c6453f7289901b1075442e4 diff --git a/gpm_api/tests/data/GPM/RS/V07/RADAR/2A-DPR/2022/07/05/2A.GPM.DPR.V9-20211125.20220705-S144632-E161905.047447.V07A.HDF5 b/gpm_api/tests/data/GPM/RS/V07/RADAR/2A-DPR/2022/07/05/2A.GPM.DPR.V9-20211125.20220705-S144632-E161905.047447.V07A.HDF5 deleted file mode 100644 index bf22240d..00000000 Binary files a/gpm_api/tests/data/GPM/RS/V07/RADAR/2A-DPR/2022/07/05/2A.GPM.DPR.V9-20211125.20220705-S144632-E161905.047447.V07A.HDF5 and /dev/null differ diff --git a/gpm_api/tests/test_dataset/generate_test_granule_data.py b/gpm_api/tests/test_dataset/generate_test_granule_data.py deleted file mode 100644 index 061b8042..00000000 --- a/gpm_api/tests/test_dataset/generate_test_granule_data.py +++ /dev/null @@ -1,321 +0,0 @@ -import datetime -import h5py -import os - -from dateutil.relativedelta import relativedelta -from tqdm import tqdm - -from gpm_api.dataset.granule import open_granule -from gpm_api.io import download, products as gpm_products -from gpm_api.io.find import find_filepaths - - -RAW_DIRNAME = "raw" -CUT_DIRNAME = "cut" -PROCESSED_DIRNAME = "processed" -KEPT_PRODUCT_TYPES = ["RS"] - - -# Create granule directories ################################################### - - -# Create the granules directory -granules_dir_path = "test_granule_data" -os.makedirs(granules_dir_path, exist_ok=True) - -# Change current working directory to the directory of this script -os.chdir(os.path.dirname(os.path.abspath(__file__))) - - -# Check available versions and scan_modes ###################################### - - -def check_scan_mode_versions(products: dict): - for product, info in products.items(): - version = info["available_versions"][-1] - if f"V{version}" not in info["scan_modes"]: - print( - f"WARNING: {product} does not have scan modes listed for latest version {version}" - ) - - -products = gpm_products.get_info_dict() -check_scan_mode_versions(products) - - -# Download raw granules ######################################################## - - -def download_raw_granules(products: dict) -> None: - print("Listing files to download...") - print('Please ignore the "No data found" warnings') - - pps_filepaths = list_files_to_download(products) - filenames = [pps_filepath.split("/")[-1] for pps_filepath in pps_filepaths] - product_basenames = [os.path.splitext(filename)[0] for filename in filenames] - local_filepaths = [ - os.path.join(granules_dir_path, product_basename, RAW_DIRNAME, product_basename + ".HDF5") - for product_basename in product_basenames - ] - - print("Downloading raw granules...") - - download._download_files( - pps_filepaths, - local_filepaths, - storage="pps", - transfer_tool="wget", - verbose=True, - ) - - -def list_files_to_download(products: dict) -> list[str]: - pps_filepaths = list_pps_filepaths(products) - missing_pps_filepaths = [] - - # Filter out files that have already been downloaded - for pps_filepath in pps_filepaths: - filename = pps_filepath.split("/")[-1] - product_basename = os.path.splitext(filename)[0] - if not os.path.exists(os.path.join(granules_dir_path, product_basename, RAW_DIRNAME)): - missing_pps_filepaths.append(pps_filepath) - - return missing_pps_filepaths - - -def list_pps_filepaths(products: dict) -> list[str]: - pps_filepaths = [] - - for product, product_info in tqdm(products.items()): - if "start_time" not in 
product_info: - print(f"Skipping {product}: no start_time was provided") - continue - - for product_type in product_info["product_types"]: - if product_type not in KEPT_PRODUCT_TYPES: - continue - - pps_filepath = find_first_pps_filepath( - product, product_type, product_info["start_time"] - ) - if pps_filepath is not None: - pps_filepaths.append(pps_filepath) - - return pps_filepaths - - -def find_first_pps_filepath( - product: str, product_type: str, start_time: datetime.datetime -) -> str | None: - end_time = start_time + relativedelta(days=1) - - pps_filepaths = find_filepaths( - storage="pps", - product=product, - start_time=start_time, - # start_time gets extended to (start_time - 1 day) in find_filepaths. - # May produce "No data found" warning - end_time=end_time, - product_type=product_type, - ) - - if len(pps_filepaths) == 0: - print(f"WARNING: No PPS files found for {product}") - return None - - return pps_filepaths[0] - - -download_raw_granules(products) - - -# Cut raw granules ############################################################# - - -def _get_fixed_dimensions(): - """Dimensions over which to not subset the GPM HDF5 files.""" - fixed_dims = [ - # Elevations / Range - "nBnPSD", - "nBnPSDhi", - "nBnEnv", - "nbinMS", - "nbinHS", - "nbinFS", - "nbin", - # Radar frequency - "nKuKa", - "nfreq", - # PMW frequency - "nemiss", - "nchan1", - "nchan2", - "nchannel1", - "nchannel2", - "nchannel3", - "nchannel4", - "nchannel5", - "nchannel6", - ] - return fixed_dims - - -def _get_subset_shape_chunks(h5_obj, subset_size=5): - """Return the shape and chunks of the subsetted HDF5 file.""" - dimnames = h5_obj.attrs.get("DimensionNames", None) - fixed_dims = _get_fixed_dimensions() - chunks = h5_obj.chunks - if dimnames is not None: - # Get dimension names list - dimnames = dimnames.decode().split(",") - # Get dimension shape - shape = h5_obj.shape - # Create dimension dictionary - dict_dims = dict(zip(dimnames, shape)) - # Create chunks dictionary - dict_chunks = dict(zip(dimnames, chunks)) - # Define subset shape and chunks - subset_shape = [] - subset_chunks = [] - for dim, src_size in dict_dims.items(): - chunk = dict_chunks[dim] - if dim in fixed_dims: - subset_shape.append(src_size) - subset_chunks.append(chunk) - else: - subset_size = min(subset_size, src_size) - subset_chunk = min(chunk, subset_size) - subset_shape.append(subset_size) - subset_chunks.append(subset_chunk) - - # Determine subset shape - subset_shape = tuple(subset_shape) - subset_chunks = tuple(subset_chunks) - else: - subset_shape = h5_obj.shape - subset_chunks = h5_obj.chunks - return subset_shape, subset_chunks - - -def _copy_attrs(src_h5_obj, dst_h5_obj): - """Copy attributes from the source file to the destination file.""" - for key, value in src_h5_obj.attrs.items(): - dst_h5_obj.attrs[key] = value - - -def _copy_datasets(src_group, dst_group, subset_size=5): - for name, h5_obj in src_group.items(): - if isinstance(h5_obj, h5py.Dataset): - # Determine the subset shape (2 indices per dimension) - subset_shape, subset_chunks = _get_subset_shape_chunks(h5_obj, subset_size=subset_size) - - # Create a new dataset in the subset group with the subset shape - subset_dataset = dst_group.create_dataset( - name, subset_shape, dtype=h5_obj.dtype, chunks=subset_chunks - ) - - # Copy data from the src_h5_obj dataset to the subset dataset - subset_dataset[:] = h5_obj[tuple(slice(0, size) for size in subset_shape)] - - # Copy attributes from the src_h5_obj dataset to the subset dataset - _copy_attrs(h5_obj, subset_dataset) 
- - # Copy encoding information - if h5_obj.compression is not None and "compression" in h5_obj.compression: - subset_dataset.compression = h5_obj.compression - subset_dataset.compression_opts = h5_obj.compression_opts - - elif isinstance(h5_obj, h5py.Group): - # If the h5_obj is a group, create a corresponding group in the subset file and copy its datasets recursively - subgroup = dst_group.create_group(name) - # Copy group attributes - _copy_attrs(h5_obj, subgroup) - _copy_datasets(h5_obj, subgroup, subset_size=subset_size) - - -def create_test_hdf5(src_fpath, dst_fpath): - # Open source HDF5 file - src_file = h5py.File(src_fpath, "r") - - # Create empty HDF5 file - dst_file = h5py.File(dst_fpath, "w") - - # Write a subset of the source HDF5 groups and leafs into the new HDF5 file - _copy_datasets(src_file, dst_file, subset_size=10) - - # Write attributes from the source HDF5 root group to the new HDF5 file root group - _copy_attrs(src_file, dst_file) - - # Close connection - src_file.close() - dst_file.close() - - -def cut_raw_granules(): - product_basenames = os.listdir(granules_dir_path) - - for product_basename in product_basenames: - print(f"Cutting {product_basename}") - raw_filepath = os.path.join( - granules_dir_path, product_basename, RAW_DIRNAME, product_basename + ".HDF5" - ) - cut_dir_path = os.path.join(granules_dir_path, product_basename, CUT_DIRNAME) - cut_filepath = os.path.join(cut_dir_path, product_basename + ".HDF5") - os.makedirs(cut_dir_path, exist_ok=True) - try: - create_test_hdf5(raw_filepath, cut_filepath) - except Exception as e: - print(f"Failed to cut {product_basename}: {e}") - - -cut_raw_granules() - - -# Open granules with gpm_api and save as netCDF ################################ - - -def open_and_save_processed_granules(products: dict): - product_basenames = os.listdir(granules_dir_path) - - for product, product_info in products.items(): - if "start_time" not in product_info: - continue - - product_basename = find_product_basename_from_pattern( - product_basenames, product_info["pattern"] - ) - if product_basename is None: - print(f"Could not find {product} file") - continue - - version = gpm_products.get_last_product_version(product) - scan_modes = product_info["scan_modes"][f"V{version}"] - process_granule(product_basename, scan_modes) - - -def find_product_basename_from_pattern(product_basenames: list[str], pattern: str) -> str | None: - for product_basename in product_basenames: - if pattern.rstrip("*").rstrip("\\d-") in product_basename: # TODO: clarify specs of pattern - return product_basename - - return None - - -def process_granule(product_basename: str, scan_modes: list[str]): - granule_path = os.path.join( - granules_dir_path, product_basename, CUT_DIRNAME, product_basename + ".HDF5" - ) - processed_dir_path = os.path.join(granules_dir_path, product_basename, PROCESSED_DIRNAME) - os.makedirs(processed_dir_path, exist_ok=True) - - for scan_mode in scan_modes: - print(f"Processing {product_basename} with scan mode {scan_mode}") - processed_granule_filepath = os.path.join(processed_dir_path, f"{scan_mode}.nc") - try: - ds = open_granule(granule_path, scan_mode) - ds.to_netcdf(processed_granule_filepath) - except Exception as e: - print(f"Failed to process {product_basename} with scan mode {scan_mode}: {e}") - - -open_and_save_processed_granules(products) diff --git a/gpm_api/tests/test_dataset/generate_test_granule_data1.py b/gpm_api/tests/test_dataset/generate_test_granule_data1.py deleted file mode 100644 index 282983f0..00000000 --- 
a/gpm_api/tests/test_dataset/generate_test_granule_data1.py +++ /dev/null @@ -1,139 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Created on Fri Oct 20 13:04:39 2023 - -@author: ghiggi -""" -import os -import gpm_api -import shutil - -# from tqdm import tqdm -from gpm_api.io.products import ( - available_products, - available_scan_modes, -) -from gpm_api.io.pps import find_first_pps_granule_filepath -from gpm_api.io.download import _download_files -from gpm_api.tests.utils.hdf5 import create_test_hdf5 -from gpm_api import _root_path - -LOCAL_DIR_PATH = "/home/ghiggi/GPM_TEST_DATA" -RAW_DIRNAME = "raw" -CUT_DIRNAME = "cut" -PROCESSED_DIRNAME = "processed" - -VERSIONS = [7, 6, 5] -PRODUCT_TYPES = ["RS"] - -FORCE_DOWNLOAD = False -FORCE_CUT = False -FORCE_PROCESSED = True - - -gpm_api.config.set( - { - "warn_non_contiguous_scans": False, - "warn_non_regular_timesteps": False, - "warn_invalid_spatial_coordinates": False, - } -) - -# Debug -VERSIONS = [7, 6] - - -## Test directory structure -# .../gpm_api/tests/data/granules/ -# ////) -# ////.nc) - -############################################################################### -#### Create the test granules directory -local_granules_dir_path = os.path.join(LOCAL_DIR_PATH) -os.makedirs(local_granules_dir_path, exist_ok=True) - -#### Prepare the test data -for product_type in PRODUCT_TYPES: - for version in VERSIONS: - version_str = "V" + str(version) - for product in available_products(product_type="RS", version=version): - product_info = f"{product_type} {product} {version_str} product" - - # Retrieve a PPS filepath - try: - pps_filepath = find_first_pps_granule_filepath( - product=product, product_type=product_type, version=version - ) - except Exception as e: - print(e) - continue - - # Retrieve filename - filename = os.path.basename(pps_filepath) - # Define product dir - product_pattern = os.path.join(product_type, version_str, product) - # Define RAW filepath - raw_dir = os.path.join(local_granules_dir_path, RAW_DIRNAME, product_pattern) - raw_filepath = os.path.join(raw_dir, filename) - - # Download file - if FORCE_DOWNLOAD or not os.path.exists(raw_filepath): - print(f"Download {product_info}") - # Download raw data - _ = _download_files( - src_fpaths=[pps_filepath], - dst_fpaths=[raw_filepath], - storage="pps", - transfer_tool="wget", - verbose=True, - ) - - # Cut the granule - print(f"Cutting {product_info}") - cut_dir_path = os.path.join(local_granules_dir_path, CUT_DIRNAME, product_pattern) - cut_filepath = os.path.join(cut_dir_path, filename) - os.makedirs(cut_dir_path, exist_ok=True) - if FORCE_CUT or not os.path.exists(cut_filepath): - try: - create_test_hdf5(raw_filepath, cut_filepath) - except Exception as e: - print(f"Failed to cut {product_info}: {e}") - continue - - # Create the processed netCDF - print(f"Create {product_info} netCDF") - scan_modes = available_scan_modes(product=product, version=version) - processed_dir_path = os.path.join( - local_granules_dir_path, PROCESSED_DIRNAME, product_pattern - ) - os.makedirs(processed_dir_path, exist_ok=True) - - for scan_mode in scan_modes: - processed_filepath = os.path.join(processed_dir_path, f"{scan_mode}.nc") - if FORCE_PROCESSED or not os.path.exists(processed_filepath): - if os.path.exists(processed_filepath): - os.remove(processed_filepath) - try: - ds = gpm_api.open_granule(cut_filepath, scan_mode=scan_mode) - ds.to_netcdf(processed_filepath) - except Exception as e: - print(f"Failed to process {product_info} with scan mode {scan_mode}: {e}") - - 
-####---------------------------------------------------------------------------. -#### Move data from LOCAL directory to REPO directory -repo_granules_dir_path = os.path.join(_root_path, "gpm_api", "tests", "data", "granules") -repo_granules_dir_path = os.path.join("/home/ghiggi/GPM_TEST_DATA_DEMO") -os.makedirs(repo_granules_dir_path, exist_ok=True) - -# Move CUT directory -local_cut_dir = os.path.join(local_granules_dir_path, CUT_DIRNAME) -repo_cut_dir = os.path.join(repo_granules_dir_path, CUT_DIRNAME) -shutil.copytree(local_cut_dir, repo_cut_dir) - -# Move PROCESSED directory -local_cut_dir = os.path.join(local_granules_dir_path, PROCESSED_DIRNAME) -repo_cut_dir = os.path.join(repo_granules_dir_path, PROCESSED_DIRNAME) -shutil.copytree(local_cut_dir, repo_cut_dir) diff --git a/gpm_api/tests/test_dataset/test_granule.py b/gpm_api/tests/test_dataset/test_granule.py index a42a61dd..bd187a5b 100644 --- a/gpm_api/tests/test_dataset/test_granule.py +++ b/gpm_api/tests/test_dataset/test_granule.py @@ -86,12 +86,6 @@ def test_open_granule(monkeypatch): granule, "_get_relevant_groups_variables", lambda *args, **kwargs: ([""], []) ) - def patch_ensure_time_validity(ds, *args, **kwargs): - ds.attrs["time_validated"] = True - return ds - - monkeypatch.setattr(granule, "ensure_time_validity", patch_ensure_time_validity) - def patch_finalize_dataset(ds, *args, **kwargs): ds.attrs["finalized"] = True return ds @@ -102,7 +96,7 @@ def patch_finalize_dataset(ds, *args, **kwargs): monkeypatch.setattr(datatree, "open_datatree", lambda *args, **kwargs: dt) returned_dataset = granule.open_granule(filepath) - expected_attribute_keys = ["attribute", "ScanMode", "time_validated", "finalized"] + expected_attribute_keys = ["attribute", "ScanMode", "finalized"] expected_coordinate_keys = ["coord"] assert isinstance(returned_dataset, xr.Dataset) assert list(returned_dataset.attrs) == expected_attribute_keys @@ -297,31 +291,94 @@ def construct_dataset_and_check_validation(input_datetimes, expected_datetimes): construct_dataset_and_check_validation(datetimes_incomplete, datetimes) -def test_finalize_dataset(monkeypatch): +def _prepare_test_finalize_dataset(monkeypatch): + # Mock decoding coordinates + def mock_set_coordinates(ds, *args, **kwargs): + ds = ds.assign_coords({"decoding_coordinates": True}) + return ds + + monkeypatch.setattr(conventions, "set_coordinates", mock_set_coordinates) + + # Return a default dataset + da = xr.DataArray(np.random.rand(1, 1), dims=("other", "along_track")) + time = [0] + ds = xr.Dataset({"var": da, "time": time}) + return ds + + +def test_finalize_dataset_crs(monkeypatch): """Test finalize_dataset""" - # TODO: update for scan_mode argument - # Check reshaping + product = "product" + scan_mode = "scan_mode" + time = [0] + da = xr.DataArray(np.random.rand(1, 1), dims=("other", "along_track")) + _prepare_test_finalize_dataset(monkeypatch) + + # Check decoding coordinates + ds = xr.Dataset({"var": da, "time": time}) + ds = finalize_dataset(ds, product=product, scan_mode=scan_mode, decode_cf=False) + assert ds.coords["decoding_coordinates"].values + + # Check CF decoding + original_decode_cf = xr.decode_cf + + def mock_decode_cf(ds, *args, **kwargs): + ds.attrs["decoded"] = True + return original_decode_cf(ds, *args, **kwargs) + + monkeypatch.setattr(xr, "decode_cf", mock_decode_cf) + + ds = xr.Dataset({"var": da, "time": time}) + ds = finalize_dataset(ds, product=product, scan_mode=scan_mode, decode_cf=True) + assert ds.attrs["decoded"] + + # Check CRS information + def 
mock_set_dataset_crs(ds, *args, **kwargs): + ds.attrs["crs"] = True + return ds + + monkeypatch.setattr(conventions, "set_dataset_crs", mock_set_dataset_crs) + + ds = xr.Dataset({"var": da, "time": time}) + ds = finalize_dataset(ds, product=product, scan_mode=scan_mode, decode_cf=False) + assert ds.attrs["crs"] + + +def test_finalize_dataset_reshaping(monkeypatch): + """Test reshaping in finalize_dataset""" + + product = "product" + scan_mode = "scan_mode" + time = [0] + _prepare_test_finalize_dataset(monkeypatch) + da = xr.DataArray(np.random.rand(1, 1, 1), dims=("lat", "lon", "other")) expected_dims = ("other", "lat", "lon") - time = [0] ds = xr.Dataset({"var": da, "time": time}) - ds = finalize_dataset(ds, "product", decode_cf=False) + ds = finalize_dataset(ds, product=product, scan_mode=scan_mode, decode_cf=False) assert ds["var"].dims == expected_dims da = xr.DataArray(np.random.rand(1, 1, 1), dims=("other", "cross_track", "along_track")) expected_dims = ("cross_track", "along_track", "other") ds = xr.Dataset({"var": da, "time": time}) - ds = finalize_dataset(ds, "product", decode_cf=False) + ds = finalize_dataset(ds, product=product, scan_mode=scan_mode, decode_cf=False) assert ds["var"].dims == expected_dims da = xr.DataArray(np.random.rand(1, 1), dims=("other", "along_track")) expected_dims = ("along_track", "other") ds = xr.Dataset({"var": da, "time": time}) - ds = finalize_dataset(ds, "product", decode_cf=False) + ds = finalize_dataset(ds, product=product, scan_mode=scan_mode, decode_cf=False) assert ds["var"].dims == expected_dims - # Check time subsetting + +def test_finalize_dataset_time_subsetting(monkeypatch): + """Test time subsetting in finalize_dataset""" + + product = "product" + scan_mode = "scan_mode" + ds = _prepare_test_finalize_dataset(monkeypatch) + def mock_subset_by_time(ds, start_time, end_time): ds.attrs["start_time"] = start_time ds.attrs["end_time"] = end_time @@ -329,27 +386,42 @@ def mock_subset_by_time(ds, start_time, end_time): monkeypatch.setattr(conventions, "subset_by_time", mock_subset_by_time) - ds = xr.Dataset({"var": da, "time": time}) start_time = datetime.fromtimestamp(np.random.randint(0, MAX_TIMESTAMP)) end_time = datetime.fromtimestamp(np.random.randint(0, MAX_TIMESTAMP)) - ds = finalize_dataset(ds, "product", decode_cf=False, start_time=start_time, end_time=end_time) + ds = finalize_dataset( + ds, + product=product, + scan_mode=scan_mode, + decode_cf=False, + start_time=start_time, + end_time=end_time, + ) assert ds.attrs["start_time"] == start_time assert ds.attrs["end_time"] == end_time - # Check CF decoding - original_decode_cf = xr.decode_cf - def mock_decode_cf(ds, *args, **kwargs): - ds.attrs["decoded"] = True - return original_decode_cf(ds, *args, **kwargs) +def test_finalize_dataset_time_encoding(monkeypatch): + """Test time encoding int finalize_dataset""" - monkeypatch.setattr(xr, "decode_cf", mock_decode_cf) + product = "product" + scan_mode = "scan_mode" + ds = _prepare_test_finalize_dataset(monkeypatch) + + ds = finalize_dataset(ds, product=product, scan_mode=scan_mode, decode_cf=False) + expected_time_encoding = { + "units": "seconds since 1970-01-01 00:00:00", + "calendar": "proleptic_gregorian", + } + assert ds["time"].encoding == expected_time_encoding - ds = xr.Dataset({"var": da, "time": time}) - ds = finalize_dataset(ds, "product", decode_cf=True) - assert ds.attrs["decoded"] - # Check addition of attributes +def test_finalize_dataset_attrs(monkeypatch): + """Test addition of attributes in finalize_dataset""" + + product = 
"product" + scan_mode = "scan_mode" + ds = _prepare_test_finalize_dataset(monkeypatch) + def mock_set_coords_attrs(ds, *args, **kwargs): ds.attrs["coords_attrs"] = True return ds @@ -362,29 +434,7 @@ def mock_add_history(ds, *args, **kwargs): monkeypatch.setattr(conventions, "add_history", mock_add_history) - ds = xr.Dataset({"var": da, "time": time}) - ds = finalize_dataset(ds, "product", decode_cf=False) + ds = finalize_dataset(ds, product=product, scan_mode=scan_mode, decode_cf=False) assert ds.attrs["coords_attrs"] assert ds.attrs["history"] - assert ds.attrs["gpm_api_product"] == "product" - - # Check time encoding - ds = xr.Dataset({"var": da, "time": time}) - ds = finalize_dataset(ds, "product", decode_cf=False) - expected_time_encoding = { - "units": "seconds since 1970-01-01 00:00:00", - "calendar": "proleptic_gregorian", - } - assert ds.attrs["encoding"] - assert ds["time"].encoding == expected_time_encoding - - # Check CRS information - def mock_set_dataset_crs(ds, *args, **kwargs): - ds.attrs["crs"] = True - return ds - - monkeypatch.setattr(conventions, "set_dataset_crs", mock_set_dataset_crs) - - ds = xr.Dataset({"var": da, "time": time}) - ds = finalize_dataset(ds, "product", decode_cf=False) - assert ds.attrs["crs"] + assert ds.attrs["gpm_api_product"] == product diff --git a/gpm_api/tests/test_dataset/test_granule_files.py b/gpm_api/tests/test_dataset/test_granule_files.py index e454aae5..a754f64e 100644 --- a/gpm_api/tests/test_dataset/test_granule_files.py +++ b/gpm_api/tests/test_dataset/test_granule_files.py @@ -26,7 +26,7 @@ ) -def test_open_granule_on_real_files(tmp_path): +def test_open_granule_on_real_files(): """Test open_granule on real files. Load cut granules and check that the new file is identical to the saved reference. @@ -38,17 +38,17 @@ def test_open_granule_on_real_files(tmp_path): ├── cut │ ├── V7/RS/1A-GMI │ │ └── 1A.GPM.GMI.COUNT2021.20140304-S223658-E000925.000082.V07A.HDF5 - ├── processed - │ ├── V7/RS/1A-GMI - │ ├── S1.nc - │ ├── S2.nc - │ ├── S4.nc - │ └── S5.nc - └── ... + │ └── ... + └── processed + ├── V7/RS/1A-GMI + │ ├── S1.nc + │ ├── S2.nc + │ ├── S4.nc + │ └── S5.nc + └── ... """ granules_dir_path = os.path.join(_root_path, "gpm_api", "tests", "data", "granules") - granules_dir_path = os.path.join("/home/ghiggi/GPM_TEST_DATA_DEMO") if not os.path.exists(granules_dir_path): pytest.skip("Test granules not found. 
Please run `python generate_test_granule_data.py`.") @@ -76,9 +76,15 @@ def test_open_granule_on_real_files(tmp_path): ds = open_granule(cut_filepath, scan_mode=scan_mode).compute() ds_expected = xr.open_dataset(processed_filepath).compute() - # Remove history attribute - _ = ds.attrs.pop("history", None) - _ = ds_expected.attrs.pop("history", None) + for _ds in [ds, ds_expected]: + # Remove history attribute + _ds.attrs.pop("history") + + # Remove attributes conflicting between python versions + if "crsWGS84" in _ds.coords: + _ds.coords["crsWGS84"].attrs.pop("crs_wkt") + _ds.coords["crsWGS84"].attrs.pop("horizontal_datum_name") + _ds.coords["crsWGS84"].attrs.pop("spatial_ref") # Check equality xr.testing.assert_identical(ds, ds_expected) diff --git a/gpm_api/tests/test_io/test_data_integrity.py b/gpm_api/tests/test_io/test_data_integrity.py index 6ae06017..4a8f4804 100644 --- a/gpm_api/tests/test_io/test_data_integrity.py +++ b/gpm_api/tests/test_io/test_data_integrity.py @@ -3,6 +3,7 @@ import os import posixpath as pxp import ntpath as ntp +import xarray as xr def test_get_corrupted_filepaths( @@ -20,7 +21,28 @@ def test_get_corrupted_filepaths( ), "Corrupted paths array should be the same length as input paths" assert abs_paths == res, "Corrupted paths array should be the same as input paths" - # TODO: Test an actual HDF5 for OSError and empty list (success) + +def test_get_corrupted_filepaths_real_files( + tmpdir: str, +) -> None: + filepath = os.path.join(tmpdir, "test.h5") + + # Create hdf5 file + array = xr.DataArray([]) + array.to_netcdf(filepath) + + # Test that no paths are "corrupted" + res = di.get_corrupted_filepaths([filepath]) + assert len(res) == 0, "Corrupted paths array should be empty" + + # Corrupt file by truncating it + with open(filepath, "r+") as f: + file_size = os.path.getsize(filepath) + f.truncate(round(file_size / 2)) + + # Test that all paths are "corrupted" + res = di.get_corrupted_filepaths([filepath]) + assert len(res) == 1, "Corrupted paths array should have one path" def test_remove_corrupted_filepaths( diff --git a/gpm_api/tests/test_io/test_download.py b/gpm_api/tests/test_io/test_download.py index cc2bba1a..5620994e 100644 --- a/gpm_api/tests/test_io/test_download.py +++ b/gpm_api/tests/test_io/test_download.py @@ -1,17 +1,16 @@ import pytest import os import datetime -import ftplib from typing import Any, List, Dict from pytest_mock.plugin import MockerFixture from gpm_api.io import find from gpm_api.io import download as dl -from gpm_api.io.products import available_products +from gpm_api.io.products import available_products, get_product_start_time from gpm_api.utils.warnings import GPMDownloadWarning -from gpm_api import configs def test_construct_curl_pps_cmd( + mock_configuration: Dict[str, str], remote_filepaths: Dict[str, Dict[str, Any]], tmpdir: str, ) -> None: @@ -32,13 +31,14 @@ def test_construct_curl_pps_cmd( ), f"Folder {os.path.dirname(local_filepath)} already exists" curl_truth = ( - "curl --verbose --ipv4 --insecure " + "curl --ipv4 --insecure -n " "--user {username}:{password} --ftp-ssl " "--header 'Connection: close' --connect-timeout 20 " - "--retry 5 --retry-delay 10 -n {remote_filepath} -o {local_filepath}" + "--retry 5 --retry-delay 10 --url {remote_filepath} -o {local_filepath}" ) - username_pps, password_pps, gpm_base_dir = configs.read_gpm_api_configs().values() + username_pps = mock_configuration["username_pps"] + password_pps = mock_configuration["password_pps"] for remote_filepath in remote_filepaths: path = 
dl.curl_pps_cmd( @@ -67,8 +67,7 @@ def test_construct_curl_pps_cmd( def test_construct_wget_pps_cmd( - # username: str, - # password: str, + mock_configuration: Dict[str, str], remote_filepaths: Dict[str, Dict[str, Any]], tmpdir: str, ) -> None: @@ -94,7 +93,8 @@ def test_construct_wget_pps_cmd( "--tries=5 -O {local_filepath} {remote_filepath}" ) - username_pps, password_pps, gpm_base_dir = configs.read_gpm_api_configs().values() + username_pps = mock_configuration["username_pps"] + password_pps = mock_configuration["password_pps"] for remote_filepath in remote_filepaths: path = dl.wget_pps_cmd( @@ -117,17 +117,18 @@ def test_construct_wget_pps_cmd( ), f"Folder {os.path.dirname(local_filepath)} was not created" +@pytest.mark.parametrize("storage", ["pps", "ges_disc"]) def test_download_file_private( remote_filepaths: Dict[str, Dict[str, Any]], tmpdir: str, mocker: MockerFixture, + storage: str, ) -> None: """Build curl/wget calls for download, but don't actually download anything Uses tmpdir to create a unique path for each test and mocker to mock the download function """ - storage = "pps" # Don't actually download anything, so mock the run function mocker.patch.object(dl, "run", autospec=True, return_value=None) @@ -162,70 +163,102 @@ def test_download_file_private( ) -def test_download_data( - products: List[str], - product_types: List[str], - remote_filepaths: Dict[str, Dict[str, Any]], - mocker: MockerFixture, - versions: List[str], -): - """Test download_data function +class TestDownloadArchive: + @pytest.fixture(autouse=True) + def mock_download( + self, + mocker: MockerFixture, + remote_filepaths: Dict[str, Dict[str, Any]], + versions: List[str], + ) -> None: + mocker.patch.object(dl, "_download_files", autospec=True, return_value=[]) + mocker.patch.object(dl, "_download_daily_data", autospec=True, return_value=([], versions)) + mocker.patch.object(dl, "run", autospec=True, return_value=None) + from gpm_api.io import info, pps + + mocker.patch.object( + info, + "_get_info_from_filename", + autospec=True, + return_value={ + "product": "2A-CLIM", + "product_type": "CLIM", + "start_time": datetime.datetime(2022, 9, 7, 12, 0, 0), + "end_time": datetime.datetime(2022, 9, 7, 13, 0, 0), + "version": "V07A", + "satellite": "GPM", + "granule_id": "2A-CLIM.GPM.GMI.GPROF2021v1.20150301-S121433-E134706.005708.V07A.HDF5", + }, + ) + mocker.patch.object( + find, + "find_daily_filepaths", + autospec=True, + return_value=(remote_filepaths.keys(), versions), + ) - This test is somewhat redundant considering it is testing methods - bundled in another functions which need to be turned off in order to - test this function. However, it is useful to have a test that checks - the entire download process. + def test_download_data( + self, + check, # For non-failing asserts + products: List[str], + product_types: List[str], + ): + """Test download_data function - It may be useful as boilerplate to increase the number of tests here in the - future. - """ + This test is somewhat redundant considering it is testing methods + bundled in another functions which need to be turned off in order to + test this function. However, it is useful to have a test that checks + the entire download process. 
- mocker.patch.object(dl, "_download_files", autospec=True, return_value=[]) - mocker.patch.object(dl, "_download_daily_data", autospec=True, return_value=([], versions)) - mocker.patch.object(dl, "run", autospec=True, return_value=None) - from gpm_api.io import info, pps - - mocker.patch.object( - info, - "_get_info_from_filename", - autospec=True, - return_value={ - "product": "2A-CLIM", - "product_type": "CLIM", - "start_time": datetime.datetime(2022, 9, 7, 12, 0, 0), - "end_time": datetime.datetime(2022, 9, 7, 13, 0, 0), - "version": "V07A", - "satellite": "GPM", - "granule_id": "2A-CLIM.GPM.GMI.GPROF2021v1.20150301-S121433-E134706.005708.V07A.HDF5", - }, - ) - mocker.patch.object( - find, - "find_daily_filepaths", - autospec=True, - return_value=(remote_filepaths.keys(), versions), - ) + It may be useful as boilerplate to increase the number of tests here in the + future. + """ - # Assume files pass file integrity check by mocking return as empty - for product in products: + # Assume files pass file integrity check by mocking return as empty for product_type in product_types: - if product in available_products(product_type=product_type): + for product in available_products(product_type=product_type): + start_time = get_product_start_time(product) + if start_time is None: + continue res = dl.download_archive( product=product, - start_time=datetime.datetime(2022, 9, 7, 12, 0, 0), - end_time=datetime.datetime(2022, 9, 8, 12, 0, 0), + start_time=start_time, + end_time=start_time + datetime.timedelta(hours=1), product_type=product_type, ) - assert res is None # Assume data is downloaded + with check: + assert res is None # Assume data is downloaded + + def test_corrupted_archive( + self, + mocker: MockerFixture, + ) -> None: + product = "1A-GMI" + start_time = get_product_start_time(product) + end_time = start_time + datetime.timedelta(hours=1) + + # Mock download status as failed + mocker.patch.object(dl, "_check_download_status", autospec=True, return_value=False) + # Mock file integrity check as passed + mocker.patch.object(dl, "check_archive_integrity", autospec=True, return_value=[]) + dl.download_archive( + product=product, + start_time=start_time, + end_time=end_time, + ) + + +@pytest.mark.parametrize("storage", ["pps", "ges_disc"]) def test_download_daily_data_private( tmpdir: str, versions: List[str], products: List[str], product_types: List[str], mocker: MockerFixture, + storage: str, ) -> None: """Test download_daily_data function @@ -245,7 +278,7 @@ def test_download_daily_data_private( for product_type in product_types: for product in available_products(product_type=product_type): dl._download_daily_data( - storage="pps", + storage=storage, date=datetime.datetime(2022, 9, 7, 12, 0, 0), version=version, product=product, @@ -291,6 +324,10 @@ def test_download_files( for obj in [(), {}, 1, 1.0, "str", True]: dl.download_files(filepaths=obj) + # Test data already on disk (force_download=False) + mocker.patch.object(dl, "filter_download_list", autospec=True, return_value=([], [])) + assert dl.download_files(filepaths=list(remote_filepaths.keys())) == None + def test_check_download_status( products: List[str], @@ -305,35 +342,54 @@ def test_check_download_status( assert dl._check_download_status([1, 0, 1], product, True) is True # Some failed download -def test_get_fpaths_from_fnames( - remote_filepaths: Dict[str, Dict[str, Any]], - versions: List[str], - products: List[str], - product_types: List[str], - tmpdir: str, -) -> None: - """Test convert_pps_to_local_filepaths function - - 
Parameters - """ - # TODO: WRONG REDO ! - assert dl.get_fpaths_from_fnames( - filepaths=[ - "ftps://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2020/07/05/radar/2A.GPM.DPR.V9-20211125.20200705-S170044-E183317.036092.V07A.HDF5" - ], - storage="local", - product_type="RS", - ) == [ - os.path.join( - tmpdir, - "GPM", - "RS", - "V07", - "RADAR", - "2A-DPR", - "2020", - "07", - "05", - "2A.GPM.DPR.V9-20211125.20200705-S170044-E183317.036092.V07A.HDF5", - ) - ] +class TestGetFpathsFromFnames: + """Test get_fpaths_from_fnames function""" + + filename = "2A.GPM.DPR.V9-20211125.20200705-S170044-E183317.036092.V07A.HDF5" + + def test_local( + self, + mock_configuration: Dict[str, str], + ) -> None: + assert dl.get_fpaths_from_fnames( + filepaths=[self.filename], + storage="local", + product_type="RS", + ) == [ + os.path.join( + mock_configuration["gpm_base_dir"], + "GPM", + "RS", + "V07", + "RADAR", + "2A-DPR", + "2020", + "07", + "05", + self.filename, + ) + ] + + def test_pps(self) -> None: + assert dl.get_fpaths_from_fnames( + filepaths=[self.filename], + storage="pps", + product_type="RS", + ) == [f"ftps://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2020/07/05/radar/{self.filename}"] + + def test_ges_disc(self) -> None: + assert dl.get_fpaths_from_fnames( + filepaths=[self.filename], + storage="ges_disc", + product_type="RS", + ) == [ + f"https://gpm2.gesdisc.eosdis.nasa.gov/data/GPM_L2/GPM_2ADPR.07/2020/187/{self.filename}" + ] + + def test_invalid_filename(self) -> None: + with pytest.raises(ValueError): + dl.get_fpaths_from_fnames( + filepaths=["invalid_filename"], + storage="local", + product_type="RS", + ) diff --git a/gpm_api/tests/test_io/test_filter.py b/gpm_api/tests/test_io/test_filter.py index 421405d2..ad83f08e 100644 --- a/gpm_api/tests/test_io/test_filter.py +++ b/gpm_api/tests/test_io/test_filter.py @@ -1,8 +1,6 @@ from gpm_api.io import filter from typing import Dict, Any, List import datetime -import pytest -from pytest_mock.plugin import MockerFixture def test_granule_within_time() -> None: @@ -50,85 +48,126 @@ def test_granule_within_time() -> None: ) -def test_filter_filepaths( - remote_filepaths: Dict[str, Dict[str, Any]], - products: Dict[str, Dict[str, Any]], - mocker: MockerFixture, -) -> None: +class TestFilterFilepaths: """Test filter filepaths""" - # Test year filtering - # Count and assert 2019 paths - count_2019 = 0 - for remote_filepath, info_dict in remote_filepaths.items(): - if ( - info_dict["year"] == 2019 - and info_dict["product"] == "2A-DPR" - and info_dict["version"] == 7 - ): - count_2019 += 1 + product = "2A-DPR" - res = filter.filter_filepaths( - filepaths=list(remote_filepaths.keys()), - product="2A-DPR", - start_time=datetime.datetime(2019, 1, 1), - end_time=datetime.datetime(2019, 12, 31, 23, 59, 59), - version=7, - ) + def test_year_filtering( + self, + remote_filepaths: Dict[str, Dict[str, Any]], + ) -> None: + # Count and assert 2019 paths + count_2019 = 0 + for remote_filepath, info_dict in remote_filepaths.items(): + if ( + info_dict["year"] == 2019 + and info_dict["product"] == self.product + and info_dict["version"] == 7 + ): + count_2019 += 1 + + res = filter.filter_filepaths( + filepaths=list(remote_filepaths.keys()), + product=self.product, + start_time=datetime.datetime(2019, 1, 1), + end_time=datetime.datetime(2019, 12, 31, 23, 59, 59), + version=7, + ) - assert len(res) == count_2019 + assert len(res) == count_2019 - # Test None filepaths - res = filter.filter_filepaths( - filepaths=None, - product="2A-DPR", - 
start_time=datetime.datetime(2019, 1, 1), - end_time=datetime.datetime(2019, 12, 31, 23, 59, 59), - version=7, - ) - assert res == [] + def test_none_filepath( + self, + remote_filepaths: Dict[str, Dict[str, Any]], + ) -> None: + res = filter.filter_filepaths( + filepaths=None, + product=self.product, + start_time=datetime.datetime(2019, 1, 1), + end_time=datetime.datetime(2019, 12, 31, 23, 59, 59), + version=7, + ) + assert res == [] - # Test empty filepath list - res = filter.filter_filepaths( - filepaths=[], - product="2A-DPR", - start_time=datetime.datetime(2019, 1, 1), - end_time=datetime.datetime(2019, 12, 31, 23, 59, 59), - version=7, - ) - assert res == [] + def test_empty_filepath_list( + self, + remote_filepaths: Dict[str, Dict[str, Any]], + ) -> None: + res = filter.filter_filepaths( + filepaths=[], + product=self.product, + start_time=datetime.datetime(2019, 1, 1), + end_time=datetime.datetime(2019, 12, 31, 23, 59, 59), + version=7, + ) + assert res == [] - # Test empty start time - count_until_2019 = 0 - for remote_filepath, info_dict in remote_filepaths.items(): - if info_dict["year"] == 2019: - count_until_2019 += 1 - res = filter.filter_filepaths( - filepaths=list(remote_filepaths.keys()), - product="2A-DPR", - start_time=None, - end_time=datetime.datetime(2019, 12, 31, 23, 59, 59), - version=7, - ) + def test_empty_start_time( + self, + remote_filepaths: Dict[str, Dict[str, Any]], + ) -> None: + count_until_2019 = 0 + for remote_filepath, info_dict in remote_filepaths.items(): + if info_dict["year"] == 2019 and info_dict["product"] == self.product: + count_until_2019 += 1 + res = filter.filter_filepaths( + filepaths=list(remote_filepaths.keys()), + product=self.product, + start_time=None, + end_time=datetime.datetime(2019, 12, 31, 23, 59, 59), + version=7, + ) - assert len(res) == count_until_2019 + assert len(res) == count_until_2019 - # Test empty end time (Error as time given (datetime.datetime.now()) - # requires date to be less than now() in supportive - # function checks.check_start_end_time) - count_from_2019 = 0 - for remote_filepath, info_dict in remote_filepaths.items(): - if info_dict["year"] >= 2019: - count_from_2019 += 1 + def test_empty_end_time( + self, + remote_filepaths: Dict[str, Dict[str, Any]], + ) -> None: + """Test empty end time (Error as time given (datetime.datetime.now()) + requires date to be less than now() in supportive + function checks.check_start_end_time)""" - res = filter.filter_filepaths( - filepaths=list(remote_filepaths.keys()), - product="2A-DPR", - start_time=datetime.datetime(2019, 1, 1), - end_time=None, - version=7, - ) - assert len(res) == count_from_2019 + count_from_2019 = 0 + for remote_filepath, info_dict in remote_filepaths.items(): + if info_dict["year"] >= 2019 and info_dict["product"] == self.product: + count_from_2019 += 1 + + res = filter.filter_filepaths( + filepaths=list(remote_filepaths.keys()), + product=self.product, + start_time=datetime.datetime(2019, 1, 1), + end_time=None, + version=7, + ) + assert len(res) == count_from_2019 + + def test_unmatched_version( + self, + remote_filepaths: Dict[str, Dict[str, Any]], + ) -> None: + res = filter.filter_filepaths( + filepaths=list(remote_filepaths.keys()), + product=self.product, + start_time=datetime.datetime(2019, 1, 1), + end_time=datetime.datetime(2019, 12, 31, 23, 59, 59), + version=0, + ) + assert res == [] + + def test_unmatched_product( + self, + remote_filepaths: Dict[str, Dict[str, Any]], + ) -> None: + res = filter.filter_filepaths( + 
filepaths=list(remote_filepaths.keys()), + product="1A-GMI", + start_time=datetime.datetime(2019, 1, 1), + end_time=datetime.datetime(2019, 12, 31, 23, 59, 59), + version=7, + ) + assert res == [] def test_filter_by_time( @@ -195,6 +234,20 @@ def test_filter_by_time( end_time=None, ) + # Test granule starting on previous day + count_previous_day = 0 + for remote_filepath, info_dict in remote_filepaths.items(): + if info_dict["start_time"].day != info_dict["end_time"].day: + count_previous_day += 1 + + res = filter.filter_by_time( + filepaths=list(remote_filepaths.keys()), + start_time=datetime.datetime(2020, 7, 6, 0, 0, 20), + end_time=datetime.datetime(2020, 7, 6, 0, 0, 30), + ) + + assert len(res) == count_previous_day + def test_filter_by_product( remote_filepaths: Dict[str, Dict[str, Any]], @@ -213,12 +266,12 @@ def test_filter_by_product( assert products_2A_DPR > 0, "The test remote_filepaths fixture does not contain expected value" - filter.filter_by_product( + filtered_filepaths = filter.filter_by_product( filepaths=list(remote_filepaths.keys()), product="2A-DPR", ) - assert len(remote_filepaths) == products_2A_DPR + assert len(filtered_filepaths) == products_2A_DPR # Test None filepath assert ( diff --git a/gpm_api/tests/test_io/test_find.py b/gpm_api/tests/test_io/test_find.py new file mode 100644 index 00000000..12991ea8 --- /dev/null +++ b/gpm_api/tests/test_io/test_find.py @@ -0,0 +1,464 @@ +from datetime import datetime +import os +from typing import Any, Dict, List, Tuple + +import pytest +from pytest_mock.plugin import MockerFixture + +from gpm_api.io import find +from gpm_api.io.products import available_products +from gpm_api.utils.warnings import GPMDownloadWarning + + +class TestGetDailyFilepaths: + """Test _get_all_daily_filepaths""" + + date = datetime(2020, 12, 31) + mock_filenames = [ + "file1.HDF5", + "file2.HDF5", + ] + + def test_local_non_existent_files( + self, + ) -> None: + """Test _get_all_daily_filepaths for "local" storage with non-existent files""" + + storage = "local" + + returned_filepaths = find._get_all_daily_filepaths( + storage=storage, + date=self.date, + product="1C-GMI", + product_type="RS", + version=7, + verbose=True, + ) + assert returned_filepaths == [] + + def test_local_existing_files( + self, + check, # For non-failing asserts + mock_configuration: Dict[str, str], + mocker: MockerFixture, + product_info: Dict[str, dict], + ) -> None: + """Test _get_all_daily_filepaths for "local" storage with existing (mocked) files""" + + storage = "local" + + # Mock os.listdir to return a list of filenames + mocker.patch("gpm_api.io.local.os.listdir", return_value=self.mock_filenames) + mocker.patch("gpm_api.io.local.os.path.exists", return_value=True) + + # Test with existing files (mocked) + for product_type in ["RS", "NRT"]: + for product in available_products(product_type=product_type): + info = product_info[product] + version = info["available_versions"][-1] + product_category = info["product_category"] + + returned_filepaths = find._get_all_daily_filepaths( + storage=storage, + date=self.date, + product=product, + product_type=product_type, + version=version, + verbose=True, + ) + + expected_filepath_elements = [ + mock_configuration["gpm_base_dir"], + "GPM", + product_type, + ] + + if product_type == "RS": + expected_filepath_elements.append(f"V0{version}") + + expected_filepath_elements.extend( + [ + product_category, + product, + self.date.strftime("%Y"), + self.date.strftime("%m"), + self.date.strftime("%d"), + ] + ) + + expected_filepaths = [ 
+ os.path.join(*expected_filepath_elements, filename) + for filename in self.mock_filenames + ] + + with check: + assert returned_filepaths == expected_filepaths + + @pytest.fixture + def mock_get_pps_file_list( + self, + mocker: MockerFixture, + ) -> None: + """Mock gpm_api.io.pps.__get_pps_file_list, which uses curl to get a list of files""" + + def mocked_get_pps_file_list(url_product_dir: str) -> List[str]: + # Remove the base URL, assuming they have the following format: + # RS: https://arthurhouhttps.pps.eosdis.nasa.gov/text/... + # NRT: https://jsimpsonhttps.pps.eosdis.nasa.gov/text/... + url_without_base = url_product_dir.split("/text")[1] + return [f"{url_without_base}/{filename}" for filename in self.mock_filenames] + + mocker.patch("gpm_api.io.pps.__get_pps_file_list", side_effect=mocked_get_pps_file_list) + + def test_pps_rs_version_7( + self, + check, # For non-failing asserts + mock_get_pps_file_list: None, + product_info: Dict[str, dict], + ) -> None: + """Test _get_all_daily_filepaths for "pps" storage with RS version 7 products""" + + storage = "pps" + product_type = "RS" + version = 7 + + for product in available_products(product_type=product_type, version=version): + info = product_info[product] + pps_dir = info["pps_rs_dir"] + + returned_filepaths = find._get_all_daily_filepaths( + storage=storage, + date=self.date, + product=product, + product_type=product_type, + version=version, + verbose=True, + ) + base_url = f"ftps://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/{self.date.strftime('%Y/%m/%d')}/{pps_dir}/" + expected_filepaths = [f"{base_url}{filename}" for filename in self.mock_filenames] + with check: + assert returned_filepaths == expected_filepaths + + def test_pps_rs_lower_version( + self, + check, # For non-failing asserts + mock_get_pps_file_list: None, + product_info: Dict[str, dict], + ) -> None: + """Test _get_all_daily_filepaths for "pps" storage with RS lower version products""" + + storage = "pps" + product_type = "RS" + + for product in available_products(product_type=product_type): + info = product_info[product] + pps_dir = info["pps_rs_dir"] + + for version in info["available_versions"]: + if version == 7: + continue + + returned_filepaths = find._get_all_daily_filepaths( + storage=storage, + date=self.date, + product=product, + product_type=product_type, + version=version, + verbose=True, + ) + base_url = f"ftps://arthurhouftps.pps.eosdis.nasa.gov/gpmallversions/V0{version}/{self.date.strftime('%Y/%m/%d')}/{pps_dir}/" + expected_filepaths = [f"{base_url}{filename}" for filename in self.mock_filenames] + with check: + assert returned_filepaths == expected_filepaths + + def test_pps_nrt( + self, + check, # For non-failing asserts + mock_get_pps_file_list: None, + product_info: Dict[str, dict], + ) -> None: + """Test _get_all_daily_filepaths for "pps" storage with NRT products (except IMERG)""" + + storage = "pps" + product_type = "NRT" + + for product in available_products(product_type=product_type): + info = product_info[product] + if info["product_category"] == "IMERG": + continue + + version = info["available_versions"][-1] + pps_dir = info["pps_nrt_dir"] + + returned_filepaths = find._get_all_daily_filepaths( + storage=storage, + date=self.date, + product=product, + product_type=product_type, + version=version, + verbose=True, + ) + base_url = f"ftps://jsimpsonftps.pps.eosdis.nasa.gov/data/{pps_dir}/" + expected_filepaths = [f"{base_url}{filename}" for filename in self.mock_filenames] + with check: + assert returned_filepaths == expected_filepaths + 
+    def test_pps_nrt_imerg(
+        self,
+        check,  # For non-failing asserts
+        mock_get_pps_file_list: None,
+        product_info: Dict[str, dict],
+    ) -> None:
+        """Test _get_all_daily_filepaths for "pps" storage with NRT IMERG products"""
+
+        storage = "pps"
+        product_type = "NRT"
+        product_category = "IMERG"
+
+        for product in available_products(
+            product_type=product_type, product_category=product_category
+        ):
+            info = product_info[product]
+            version = info["available_versions"][-1]
+            pps_dir = info["pps_nrt_dir"]
+
+            returned_filepaths = find._get_all_daily_filepaths(
+                storage=storage,
+                date=self.date,
+                product=product,
+                product_type=product_type,
+                version=version,
+                verbose=True,
+            )
+            base_url = f"ftps://jsimpsonftps.pps.eosdis.nasa.gov/data/{pps_dir}/{self.date.strftime('%Y%m')}/"
+            expected_filepaths = [f"{base_url}{filename}" for filename in self.mock_filenames]
+            with check:
+                assert returned_filepaths == expected_filepaths
+
+    def test_pps_missing_pps_product_dir(
+        self,
+        product_info: Dict[str, dict],
+        mocker: MockerFixture,
+    ) -> None:
+        storage = "pps"
+        product = "1A-GMI"
+        version = 7
+        info = product_info[product]
+
+        # Mock missing dirs
+        del info["pps_rs_dir"]
+        del info["pps_nrt_dir"]
+        mocker.patch("gpm_api.io.products.get_product_info", return_value=info)
+
+        for product_type in ["RS", "NRT"]:
+            with pytest.raises(ValueError):
+                find._get_all_daily_filepaths(
+                    storage=storage,
+                    date=self.date,
+                    product=product,
+                    product_type=product_type,
+                    version=version,
+                    verbose=True,
+                )
+
+    @pytest.fixture
+    def mock_get_ges_disc_list_path(
+        self,
+        mocker: MockerFixture,
+    ) -> None:
+        """Mock gpm_api.io.ges_disc._get_ges_disc_list_path, which uses wget to get a list of files"""
+
+        def mocked_get_ges_disc_list_path(url: str) -> List[str]:
+            return [f"{url}/{filename}" for filename in self.mock_filenames]
+
+        mocker.patch(
+            "gpm_api.io.ges_disc._get_ges_disc_list_path", side_effect=mocked_get_ges_disc_list_path
+        )
+
+    def test_ges_disc(
+        self,
+        check,  # For non-failing asserts
+        mock_get_ges_disc_list_path: None,
+        product_info: Dict[str, dict],
+    ) -> None:
+        """Test _get_all_daily_filepaths for "ges_disc" storage"""
+
+        storage = "ges_disc"
+        version = 7
+
+        for product, info in product_info.items():
+            version = info["available_versions"][-1]
+            ges_disc_dir = info["ges_disc_dir"]
+            if ges_disc_dir is None:
+                continue
+
+            returned_filepaths = find._get_all_daily_filepaths(
+                storage=storage,
+                date=self.date,
+                product=product,
+                product_type=None,
+                version=version,
+                verbose=True,
+            )
+
+            if "TRMM" in ges_disc_dir:
+                subdomain = "disc2"
+            else:
+                subdomain = "gpm2"
+
+            base_url = f"https://{subdomain}.gesdisc.eosdis.nasa.gov/data/{ges_disc_dir}.0{version}/{self.date.strftime('%Y/%j')}"
+            expected_filepaths = [f"{base_url}/{filename}" for filename in self.mock_filenames]
+            with check:
+                assert returned_filepaths == expected_filepaths
+
+    def test_invalid_storage(self) -> None:
+        """Test _get_all_daily_filepaths for invalid "storage" argument"""
+
+        storage = "invalid"
+        product = "1C-GMI"
+        product_type = "RS"
+        version = 7
+
+        with pytest.raises(ValueError):
+            find._get_all_daily_filepaths(
+                storage=storage,
+                date=self.date,
+                product=product,
+                product_type=product_type,
+                version=version,
+                verbose=True,
+            )
+
+
+@pytest.mark.filterwarnings("error")
+def test_check_correct_version(
+    remote_filepaths: Dict[str, Dict[str, Any]],
+) -> None:
+    """Test _check_correct_version"""
+
+    product = "2A-DPR"
+    version = 7
+    filepath_template = 
"2A.GPM.DPR.V9-20211125.20200705-S170044-E183317.036092.V0{}A.HDF5" + + # Test correct version + files_version = [7] * 3 + filepaths = [filepath_template.format(v) for v in files_version] + returned_filepaths, returned_version = find._check_correct_version( + filepaths=filepaths, product=product, version=version + ) + assert returned_filepaths == filepaths + assert returned_version == version + + # Test incorrect version + files_version = [6] * 3 + filepaths = [filepath_template.format(v) for v in files_version] + with pytest.raises(GPMDownloadWarning): + find._check_correct_version(filepaths=filepaths, product=product, version=version) + + # Test multiple versions + files_version = [6, 7, 7] + filepaths = [filepath_template.format(v) for v in files_version] + with pytest.raises(ValueError): + find._check_correct_version(filepaths=filepaths, product=product, version=version) + + # Test empty list + filepaths = [] + _, returned_version = find._check_correct_version( + filepaths=filepaths, product=product, version=version + ) + assert returned_version == version + + +def test_find_daily_filepaths( + mocker: MockerFixture, +) -> None: + """Test find_daily_filepaths""" + + storage = "storage" + date = datetime(2020, 12, 31) + product = "product" + product_type = "product-type" + version = 7 + start_time = datetime(2020, 12, 31, 1, 2, 3) + end_time = datetime(2020, 12, 31, 4, 5, 6) + + date_checked = datetime(1900, 1, 1) + mocker.patch.object(find, "check_date", autospec=True, return_value=date_checked) + + # Mock _get_all_daily_filepaths, already tested above + def mock_get_all_daily_filepaths(**kwargs: Any) -> List[str]: + base_filepath = "_".join([f"{key}:{value}" for key, value in kwargs.items()]) + return [f"{base_filepath}_{i}" for i in range(3)] + + patch_get_all_daily_filepaths = mocker.patch.object( + find, "_get_all_daily_filepaths", autospec=True, side_effect=mock_get_all_daily_filepaths + ) + + # Mock filter_filepaths, already tested in test_filter.py + def mock_filter_filepaths(filepaths, **kwargs: Any) -> List[str]: + suffix = "_".join([f"{key}-filtered:{value}" for key, value in kwargs.items()]) + return [f"{filepath}_{suffix}" for filepath in filepaths] + + patch_filter_filepaths = mocker.patch.object( + find, "filter_filepaths", autospec=True, side_effect=mock_filter_filepaths + ) + + # Mock _check_correct_version, already tested above + def mock_check_correct_version(filepaths, **kwargs: Any) -> Tuple[List[str], int]: + suffix = "_".join([f"{key}-version-checked:{value}" for key, value in kwargs.items()]) + return [f"{filepath}_{suffix}" for filepath in filepaths], version + + mocker.patch.object( + find, "_check_correct_version", autospec=True, side_effect=mock_check_correct_version + ) + + kwargs = { + "storage": storage, + "date": date, + "product": product, + "product_type": product_type, + "version": version, + "start_time": start_time, + "end_time": end_time, + "verbose": True, + } + + returned_filepaths, returned_versions = find.find_daily_filepaths(**kwargs) + + assert returned_versions[0] == version + returned_filepath = returned_filepaths[0] + + # Check date checked + assert str(date_checked) in returned_filepath + + # Check all _get_all_daily_filepaths kwargs passed + assert f"storage:{storage}" in returned_filepath + assert f"product:{product}" in returned_filepath + assert f"product_type:{product_type}" in returned_filepath + assert f"date:{date_checked}" in returned_filepath + assert f"version:{version}" in returned_filepath + assert f"verbose:True" in 
returned_filepath + + # Check all filter_filepaths kwargs passed + assert f"product-filtered:{product}" in returned_filepath + assert f"product_type-filtered:{product_type}" in returned_filepath + assert f"version-filtered:None" in returned_filepath + assert f"start_time-filtered:{start_time}" in returned_filepath + assert f"end_time-filtered:{end_time}" in returned_filepath + + # Check all _check_correct_version kwargs passed + assert f"product-version-checked:{product}" in returned_filepath + assert f"version-version-checked:{version}" in returned_filepath + + # Check empty filtered filepaths list + patch_filter_filepaths.side_effect = lambda filepaths, **kwargs: [] + returned_filepaths, returned_versions = find.find_daily_filepaths(**kwargs) + assert returned_filepaths == [] + assert returned_versions == [] + + # Check empty filepaths list + patch_get_all_daily_filepaths.side_effect = lambda **kwargs: [] + kwargs["storage"] = "local" + returned_filepaths, returned_versions = find.find_daily_filepaths(**kwargs) + assert returned_filepaths == [] + assert returned_versions == [] diff --git a/gpm_api/tests/test_io/test_info.py b/gpm_api/tests/test_io/test_info.py index 853a788f..70b7b08f 100644 --- a/gpm_api/tests/test_io/test_info.py +++ b/gpm_api/tests/test_io/test_info.py @@ -14,6 +14,11 @@ def test_get_start_time_from_filepaths( generated_start_time = info.get_start_time_from_filepaths(remote_filepath) assert [info_dict["start_time"]] == generated_start_time + # Also test all the filepaths at once + generated_start_time = info.get_start_time_from_filepaths(list(remote_filepaths.keys())) + expected_start_time = [info_dict["start_time"] for info_dict in remote_filepaths.values()] + assert expected_start_time == generated_start_time + def test_get_end_time_from_filepaths( remote_filepaths: Dict[str, Dict[str, Any]], @@ -38,3 +43,50 @@ def test_get_version_from_filepaths( generated_version = info.get_version_from_filepaths(remote_filepath) assert [info_dict["version"]] == generated_version + + +def test_get_granule_from_filepaths( + remote_filepaths: Dict[str, Dict[str, Any]], +) -> None: + """Test get_granule_from_filepaths""" + + for remote_filepath, info_dict in remote_filepaths.items(): + if info_dict["product_type"] == "NRT": + continue + + generated_granule = info.get_granule_from_filepaths(remote_filepath) + assert [info_dict["granule_id"]] == generated_granule + + +def test_get_start_end_time_from_filepaths( + remote_filepaths: Dict[str, Dict[str, Any]], +) -> None: + """Test get_start_end_time_from_filepaths""" + + for remote_filepath, info_dict in remote_filepaths.items(): + generated_start_time, generated_end_time = info.get_start_end_time_from_filepaths( + remote_filepath + ) + assert [info_dict["start_time"]] == generated_start_time + assert [info_dict["end_time"]] == generated_end_time + + +def test_invalid_filepaths(): + with pytest.raises(ValueError): + info.get_info_from_filepath("invalid_filepath") + + # Invalid JAXA product type + with pytest.raises(ValueError): + info.get_info_from_filepath( + "ftps://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2020/07/05/1B/GPMCOR_KAR_2007050002_0135_036081_1B😵_DAB_07A.h5" + ) + + # Unknown product (not in product_def.yml) + with pytest.raises(ValueError): + info.get_info_from_filepath( + "ftps://arthurhouftps.pps.eosdis.nasa.gov/gpmdata/2020/07/05/radar/😥.GPM.DPR.V9-20211125.20200705-S170044-E183317.036092.V07A.HDF5" + ) + + # Filepath not a string + with pytest.raises(TypeError): + info.get_info_from_filepath(123) diff --git 
a/gpm_api/tests/test_io/test_checks.py b/gpm_api/tests/test_io/test_io_checks.py similarity index 77% rename from gpm_api/tests/test_io/test_checks.py rename to gpm_api/tests/test_io/test_io_checks.py index 8b26a6fc..7dbdc018 100644 --- a/gpm_api/tests/test_io/test_checks.py +++ b/gpm_api/tests/test_io/test_io_checks.py @@ -15,39 +15,11 @@ import posixpath as ptp import pytz import pandas as pd -from typing import List +from typing import List, Dict, Any from gpm_api.io import checks from gpm_api.io.products import available_products, available_scan_modes, available_versions -def test_is_not_empty() -> None: - """Test is_not_empty() which always returns a boolean""" - - # Test False responses: - for obj in [None, (), {}, []]: - res = checks.is_not_empty(obj) - assert res is False, "Function returned True, expected False" - - # Test True responses: - for obj in [[1, 2, 3], (1, 2, 3), (1), [1]]: - res = checks.is_not_empty(obj) - assert res is True, "Function returned False, expected True" - - -def test_is_empty() -> None: - """Test is_empty()""" - - # Test False responses: - for obj in [[1, 2, 3], (1, 2, 3), (1), [1]]: - res = checks.is_empty(obj) - assert res is False, "Function returned True, expected False" - - # Test True responses: - for obj in [None, (), {}, []]: - res = checks.is_empty(obj) - assert res is True, "Function returned False, expected True" - - def test_check_base_dir() -> None: """Check path constructor for base_dir""" @@ -166,6 +138,48 @@ def test_check_groups() -> None: checks.check_groups(123) +def test_check_storage() -> None: + """Test check_storage()""" + + # Check valid storage + valid_storage = ["ges_disc", "pps", "local", "GES_DISC", "PPS", "LOCAL"] + expected_return = ["ges_disc", "pps", "local", "ges_disc", "pps", "local"] + + for storage, expected in zip(valid_storage, expected_return): + returned_storage = checks.check_storage(storage) + assert ( + returned_storage == expected + ), f"Function returned '{returned_storage}' for storage '{storage}', expected '{expected}'" + + # Check invalid storage + with pytest.raises(ValueError): + checks.check_storage("invalid_storage") + + with pytest.raises(TypeError): + checks.check_storage(123) + + +def test_check_remote_storage() -> None: + """Test check_remote_storage()""" + + # Check valid storage + valid_storage = ["ges_disc", "pps", "GES_DISC", "PPS"] + expected_return = ["ges_disc", "pps", "ges_disc", "pps"] + + for storage, expected in zip(valid_storage, expected_return): + returned_storage = checks.check_remote_storage(storage) + assert ( + returned_storage == expected + ), f"Function returned '{returned_storage}' for storage '{storage}', expected '{expected}'" + + # Check invalid storage + with pytest.raises(ValueError): + checks.check_remote_storage("invalid_storage") + + with pytest.raises(TypeError): + checks.check_remote_storage(123) + + def test_check_version( versions: List[int], ) -> None: @@ -176,7 +190,7 @@ def test_check_version( # Check if None, None is returned with pytest.raises(ValueError): - res = checks.check_version(None) + checks.check_version(None) # Check if string, exception is raised with pytest.raises(ValueError): @@ -188,8 +202,8 @@ def test_check_version( # Check available range should not raise exception for version in versions: - res = checks.check_version(version) - assert res is None, f"Function returned {res} for version {version}, expected None" + checks.check_version(version) + # Should run without raising Exception # Try versions outside of range for version in list(range(0, 3)) 
+ list(range(8, 10)): @@ -197,6 +211,34 @@ def test_check_version( checks.check_version(version) +def test_check_product_version( + check, # For non-failing asserts + product_info: Dict[str, Any], + versions: List[int], +) -> None: + """Test check_product_version()""" + + for product, info in product_info.items(): + # Check valid versions + valid_versions = info.get("available_versions", []) + + for version in valid_versions: + with check: + assert checks.check_product_version(version, product) == version + + # Check last version return if None + last_version = info.get("available_versions", [])[-1] + with check: + assert checks.check_product_version(None, product) == last_version + + # Check invalid versions + invalid_versions = list(set(versions) - set(info.get("available_versions", []))) + + for version in invalid_versions: + with check.raises(ValueError): + checks.check_product_version(version, product) + + def test_check_product( product_types: List[str], ) -> None: @@ -209,8 +251,8 @@ def test_check_product( # Test a product that does exist for product_type in product_types: for product in available_products(product_type=product_type): - res = checks.check_product(product, product_type=product_type) - assert res is None, f"Function returned {res} for product {product} expected None" + checks.check_product(product, product_type=product_type) + # Should run without raising Exception # Test a product that isn't a string for product_type in product_types: @@ -226,10 +268,8 @@ def test_check_product_type( # Test a product_type that does exist for product_type in product_types: - res = checks.check_product_type(product_type) - assert res is None, ( - f"Function returned {res} for product_type {product_type}, " f"expected None" - ) + checks.check_product_type(product_type) + # Should run without raising Exception # Test a product_type that doesn't exist for product_type in ["IMERG", 123, None]: @@ -249,10 +289,8 @@ def test_check_product_category( # Test a product_category that does exist for product_category in product_categories: - res = checks.check_product_category(product_category) - assert res is None, ( - f"Function returned {res} for product_category {product_category}," f" expected None" - ) + checks.check_product_category(product_category) + # Should run without raising Exception # Test a product_category that doesn't exist for product_category in ["NOT", "A", "CATEGORY"]: @@ -272,10 +310,8 @@ def test_check_product_level( # Test a product_level that does exist for product_level in product_levels: - res = checks.check_product_level(product_level) - assert ( - res is None - ), f"Function returned {res} for product_level {product_level}, expected None" + checks.check_product_level(product_level) + # Should run without raising Exception # Test a product_level that doesn't exist for product_level in ["NOT", "A", "LEVEL"]: @@ -291,8 +327,8 @@ def test_check_product_validity( # Test a product that does exist for product_type in product_types: for product in available_products(product_type=product_type): - res = checks.check_product_validity(product, product_type=product_type) - assert res is None, f"Function returned {res} for product {product}, expected None" + checks.check_product_validity(product, product_type=product_type) + # Should run without raising Exception # Test a product that doesn't exist for product_type in product_types: @@ -376,6 +412,11 @@ def test_check_time() -> None: with pytest.raises(TypeError): checks.check_time(123) + # Check numpy single timestamp + res = 
checks.check_time(np.array(["2014-12-31"], dtype="datetime64[s]")) + assert isinstance(res, datetime.datetime) + assert res == datetime.datetime(2014, 12, 31) + # Check numpy multiple timestamp with pytest.raises(ValueError): checks.check_time(np.array(["2014-12-31", "2015-01-01"], dtype="datetime64[s]")) @@ -384,6 +425,12 @@ def test_check_time() -> None: with pytest.raises(ValueError): checks.check_time(np.array(["2014-12-31"])) + # Check non-UTC timezone + with pytest.raises(ValueError): + checks.check_time( + datetime.datetime(2014, 12, 31, 12, 30, 30, 300, tzinfo=pytz.timezone("Europe/Zurich")) + ) + def test_check_date() -> None: """Check date/datetime object is returned from varying inputs""" @@ -498,6 +545,36 @@ def test_check_start_end_time() -> None: ) +def test_check_valid_time_request( + check, # For non-failing asserts + product_info: Dict[str, Any], +) -> None: + """Test check_valid_time_request()""" + + for product, info in product_info.items(): + valid_start_time = info["start_time"] + valid_end_time = info["end_time"] + + if valid_start_time is not None: + # Check valid times + start_time = valid_start_time + end_time = valid_start_time + datetime.timedelta(days=1) + checks.check_valid_time_request(start_time, end_time, product) + + # Check invalid start time + start_time = valid_start_time - datetime.timedelta(days=1) + end_time = valid_start_time + datetime.timedelta(days=1) + with check.raises(ValueError): + checks.check_valid_time_request(start_time, end_time, product) + + # Check invalid end time + if valid_end_time is not None: + start_time = valid_end_time - datetime.timedelta(days=1) + end_time = valid_end_time + datetime.timedelta(days=1) + with check.raises(ValueError): + checks.check_valid_time_request(start_time, end_time, product) + + def test_check_scan_mode( products: List[str], ) -> None: diff --git a/gpm_api/tests/test_io/test_local.py b/gpm_api/tests/test_io/test_local.py new file mode 100644 index 00000000..9d9ecde8 --- /dev/null +++ b/gpm_api/tests/test_io/test_local.py @@ -0,0 +1,60 @@ +import os +from typing import Dict + +from pytest_mock import MockerFixture + +from gpm_api.io import local + + +def test_get_local_filepaths( + mock_configuration: Dict[str, str], + mocker: MockerFixture, +): + product = "2A-DPR" + product_category = "RADAR" + product_type = "RS" + version = 7 + + # Test with non-existent files + returned_filepaths = local.get_local_filepaths( + product=product, + product_type=product_type, + version=version, + ) + + assert returned_filepaths == [] + + # Mock glob.glob to return a list of filepaths + mock_filenames = [ + "file1.HDF5", + "file2.HDF5", + ] + + def mock_glob(pattern): + return [os.path.join(pattern.rstrip("*"), filename) for filename in mock_filenames] + + mocker.patch("gpm_api.io.local.glob.glob", side_effect=mock_glob) + mocker.patch("gpm_api.io.local.os.path.exists", return_value=True) + + returned_filepaths = local.get_local_filepaths( + product=product, + product_type=product_type, + version=version, + ) + + expected_filepaths = [ + os.path.join( + mock_configuration["gpm_base_dir"], + "GPM", + product_type, + f"V0{version}", + product_category, + product, + "*", + "*", + "*", + filename, + ) + for filename in mock_filenames + ] + assert returned_filepaths == expected_filepaths diff --git a/gpm_api/tests/test_io/test_pps.py b/gpm_api/tests/test_io/test_pps.py index d45f1ee0..fd0f1358 100644 --- a/gpm_api/tests/test_io/test_pps.py +++ b/gpm_api/tests/test_io/test_pps.py @@ -1,10 +1,7 @@ import datetime -import os 
-from typing import Dict, Any, List -from pytest_mock import MockerFixture +from typing import List from gpm_api.io import pps -from gpm_api.io import find -from gpm_api.io.products import available_products, available_versions +from gpm_api.io.products import available_products def test_get_pps_nrt_product_dir(products: List[str]) -> None: @@ -26,106 +23,6 @@ def test_get_pps_nrt_product_dir(products: List[str]) -> None: product_type="NRT", product_category="IMERG", ): - assert res == os.path.join( - foldername, - date.strftime("%Y%m"), - ) + assert res == f"{foldername}/{date.strftime('%Y%m')}" else: assert res == foldername - - -# def test_get_pps_product_directory( -# products: List[str], -# product_types: List[str], -# ) -> None: -# for product in products: -# for product_type in product_types: -# # Only work on NRT products -# if product in available_products(product_type=product_type): -# # Dependent on dir forming private function -# foldername = pps._get_pps_nrt_product_folder_name(product) - -# res = pps.get_pps_product_directory(product, product_type, date, version) -# assert res == foldername -# pass - - -def test_find_pps_daily_filepaths_private( - mocker: MockerFixture, - product_types: List[str], - remote_filepaths: Dict[str, Any], -) -> None: - """Test the find_pps_daily_filepaths function.""" - - # Mock server call, with a return of empty data - mocker.patch.object(pps, "get_pps_daily_filepaths", return_value=[]) - - for product_type in product_types: - for product in available_products(product_type=product_type): - for version in available_versions(product=product): - find.find_daily_filepaths( - storage="pps", - date="2021-01-01", - product=product, - version=version, - product_type=product_type, - ) - - # Return the curated remote_filepath list - mocker.patch.object( - pps, - "get_pps_daily_filepaths", - return_value=list(remote_filepaths), - ) - - for product_type in product_types: - for product in available_products(product_type=product_type): - find.find_daily_filepaths( - storage="pps", - date="2021-01-01", - product=product, - version=None, - product_type=product_type, - ) - - -def test_find_pps_filepaths( - product_types: List[str], - mocker: MockerFixture, - remote_filepaths: Dict[str, Any], -) -> None: - """Test the PPS find_filepaths function.""" - - sftp_paths = [x for x in list(remote_filepaths) if x.split("://")[0] == "sftp"] - mocker.patch.object( - find, - "find_daily_filepaths", - autospec=True, - return_value=(sftp_paths, []), - ) - - for product_type in product_types: - for product in available_products(product_type=product_type): - assert ( - find.find_filepaths( - storage="pps", - product=product, - product_type=product_type, - start_time="2021-01-01", - end_time="2021-01-01", - ) - == sftp_paths - ) - - # Non-parallel - assert ( - find.find_filepaths( - storage="pps", - product=product, - product_type=product_type, - start_time="2021-01-01", - end_time="2021-01-01", - parallel=False, - ) - == sftp_paths - ) diff --git a/pyproject.toml b/pyproject.toml index 478e7954..1ff35359 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,7 +59,7 @@ dynamic = ["version"] image = ["ximage"] dev = ["pre-commit", "black[jupyter]", "blackdoc", "codespell", "ruff", - "pytest", "pytest-cov", "pytest-mock", "pydantic", + "pytest", "pytest-cov", "pytest-mock", "pydantic", "pytest-check", "pytest-watcher", "pip-tools", "bumpver", "twine", "setuptools>=61.0.0", "wheel", "sphinx", "sphinx-gallery", "sphinx-book-theme", "nbsphinx", "sphinx_mdinclude"] @@ -90,7 +90,7 @@ 
download_gpm_files="gpm_api.scripts.download_gpm_files:download_gpm_files" [tool.pytest.ini_options] -addopts = "--ignore=gpm_api/tests/0_tmp/ --cov --cov-report term-missing" +addopts = "--ignore=gpm_api/tests/0_tmp/ --cov --cov-report term-missing --check-max-report=10" [tool.black] line-length = 100 @@ -147,13 +147,3 @@ exclude = [ [tool.codespell] ignore-words-list = "ges,nd" - - -[tool.coverage.run] -omit = [ - "gpm_api/_version.py", - "gpm_api/etc/*", - "gpm_api/encoding/*", - "gpm_api/retrieval/*", - "gpm_api/bucket/*", -]