diff --git a/ci/environment-docs.yml b/ci/environment-docs.yml index b0a7c71..ccb5167 100644 --- a/ci/environment-docs.yml +++ b/ci/environment-docs.yml @@ -4,13 +4,16 @@ channels: - nodefaults dependencies: - cf_xarray + - fsspec - furo + - gcsfs - joblib - jupyterlab - myst-nb - netcdf4 - pip - pydantic + - s3fs - sphinx-copybutton - sphinx-inline-tabs - xarray @@ -18,4 +21,5 @@ dependencies: - sphinxext-opengraph - autodoc_pydantic - -r ../requirements.txt + - git+https://github.com/intake/intake-esm - -e .. diff --git a/ci/environment.yml b/ci/environment.yml index 77df547..d1a13c8 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -4,11 +4,11 @@ channels: - nodefaults dependencies: - cf_xarray + - fsspec + - gcsfs - intake-esm - joblib - - nbsphinx - netcdf4 - - numpydoc - pip - pre-commit - pydantic @@ -17,7 +17,7 @@ dependencies: - pytest-sugar - pytest-xdist - pyyaml - - typer + - s3fs - xarray - pip: - git+https://github.com/intake/intake-esm diff --git a/ecgtools/__init__.py b/ecgtools/__init__.py index 59dc8c0..a19b2d5 100644 --- a/ecgtools/__init__.py +++ b/ecgtools/__init__.py @@ -3,7 +3,7 @@ """Top-level module for ecgtools .""" from pkg_resources import DistributionNotFound, get_distribution -from .builder import Builder +from .builder import Builder, RootDirectory, glob_to_regex try: __version__ = get_distribution(__name__).version diff --git a/ecgtools/builder.py b/ecgtools/builder.py index 0c66f7c..bba69ea 100644 --- a/ecgtools/builder.py +++ b/ecgtools/builder.py @@ -1,149 +1,156 @@ -import datetime -import enum import fnmatch -import itertools -import json -import pathlib +import os.path +import re +import tempfile import typing import warnings +import fsspec import joblib import pandas as pd import pydantic +import toolz +from intake_esm.cat import ( + Aggregation, + AggregationControl, + Assets, + Attribute, + DataFormat, + ESMCatalogModel, +) INVALID_ASSET = 'INVALID_ASSET' TRACEBACK = 'TRACEBACK' -class DataFormatEnum(str, enum.Enum): - netcdf = 'netcdf' - zarr = 'zarr' +def glob_to_regex(*, include_patterns, exclude_patterns): + include_regex = r'|'.join([fnmatch.translate(x) for x in include_patterns]) + exclude_regex = r'|'.join([fnmatch.translate(x) for x in exclude_patterns]) or r'$.' 
+ return include_regex, exclude_regex -class Attribute(pydantic.BaseModel): - column_name: str - vocabulary: str = None - - -class Assets(pydantic.BaseModel): - column_name: str - format: DataFormatEnum - - -class Aggregation(pydantic.BaseModel): - type: str - attribute_name: str - options: typing.Optional[typing.Dict[str, typing.Any]] - +class RootDirectory(pydantic.BaseModel): + path: str + depth: int = 0 + storage_options: typing.Dict[typing.Any, typing.Any] = pydantic.Field(default_factory=dict) + exclude_regex: str = pydantic.Field(default_factory=str) + include_regex: str = pydantic.Field(default_factory=str) + + def __hash__(self): + return hash(f'{self.path}{self.raw_path}') + + @property + def mapper(self): + return fsspec.get_mapper(self.path, **self.storage_options) + + @property + def protocol(self): + protocol = self.mapper.fs.protocol + if isinstance(protocol, (list, tuple)): + protocol = protocol[0] + return protocol + + @property + def raw_path(self): + return self.mapper.fs._strip_protocol(self.path) + + def walk(self): + all_assets = [] + for root, dirs, files in self.mapper.fs.walk(self.raw_path, maxdepth=self.depth + 1): + # exclude dirs + dirs[:] = [os.path.join(root, directory) for directory in dirs] + dirs[:] = [ + directory for directory in dirs if not re.match(self.exclude_regex, directory) + ] -class AggregationControl(pydantic.BaseModel): - variable_column_name: str - groupby_attrs: typing.List[str] = None - aggregations: typing.List[Aggregation] = None + if files: + # exclude/include assets + if self.protocol != 'file': + files = [f'{self.protocol}://{os.path.join(root, file)}' for file in files] + else: + files = [os.path.join(root, file) for file in files] + files = [file for file in files if not re.match(self.exclude_regex, file)] + files = [file for file in files if re.match(self.include_regex, file)] + all_assets.extend(files) + # Look for zarr assets. This works for zarr stores created with consolidated metadata + # print(all_assets) + for directory in dirs: + if self.mapper.fs.exists(f'{directory}/.zmetadata'): + path = ( + f'{self.protocol}://{directory}' if self.protocol != 'file' else directory + ) + all_assets.append(path) -class ESMCollection(pydantic.BaseModel): - catalog_file: typing.Union[str, pathlib.Path, pydantic.AnyUrl] - attributes: typing.List[Attribute] - assets: Assets - aggregation_control: AggregationControl - esmcat_version: str = '0.0.1' - id: str = None - description: str = None - last_updated: typing.Union[datetime.datetime, datetime.date] = None + return all_assets @pydantic.dataclasses.dataclass class Builder: - """Generates a catalog from a list of files. + """Generates a catalog from a list of netCDF files or zarr stores Parameters ---------- - root_path : str or list - Path(s) of root directory. - extension : str, optional - File extension, by default None. If None, the builder will look for files with - `*.nc` extension. + paths : list of str + List of paths to crawl for assets/files. + storage_options : dict, optional + Parameters passed to the backend file-system such as Google Cloud Storage, + Amazon Web Service S3 depth : int, optional - Recursion depth. Recursively crawl `root_path` up to a specified depth, by default 0 - exclude_patterns : list, optional - Directory, file patterns to exclude during catalog generation. - These could be substring or regular expressions. by default None - njobs : int, optional - The maximum number of concurrently running jobs, - by default -1 meaning all CPUs are used. 
- + Maximum depth to crawl for assets. Default is 0. + exclude_patterns : list of str, optional + List of glob patterns to exclude from crawling. + include_patterns : list of str, optional + List of glob patterns to include from crawling. + joblib_parallel_kwargs : dict, optional + Parameters passed to joblib.Parallel. Default is {}. """ - root_path: typing.Union[pydantic.DirectoryPath, typing.List[pydantic.DirectoryPath]] - extension: str = '.nc' + paths: typing.List[str] + storage_options: typing.Dict[typing.Any, typing.Any] = None depth: int = 0 exclude_patterns: typing.List[str] = None - njobs: int = -1 - INVALID_ASSET: typing.ClassVar[str] = INVALID_ASSET - TRACEBACK: typing.ClassVar[str] = TRACEBACK + include_patterns: typing.List[str] = None + joblib_parallel_kwargs: typing.Dict[str, typing.Any] = None def __post_init_post_parse__(self): - self.df = pd.DataFrame() + self.storage_options = self.storage_options or {} + self.joblib_parallel_kwargs = self.joblib_parallel_kwargs or {} + self.exclude_patterns = self.exclude_patterns or [] + self.include_patterns = self.include_patterns or [] + # transform glob patterns to regular expressions + self.include_regex, self.exclude_regex = glob_to_regex( + include_patterns=self.include_patterns, exclude_patterns=self.exclude_patterns + ) + + self._root_dirs = [ + RootDirectory( + path=path, + storage_options=self.storage_options, + depth=self.depth, + exclude_regex=self.exclude_regex, + include_regex=self.include_regex, + ) + for path in self.paths + ] + self.assets = None self.invalid_assets = pd.DataFrame() - self.dirs = None - self.filelist = None self.entries = None + self.df = pd.DataFrame() - def get_directories(self): - """Walk `root_path`'s subdirectories and returns a list of directories - up to the specified depth from `root_path`. - - Returns - ------- - `ecgtools.Builder` - """ - pattern = '*/' * (self.depth + 1) - - if isinstance(self.root_path, pathlib.PosixPath): - dirs = [x for x in self.root_path.glob(pattern) if x.is_dir()] - - elif isinstance(self.root_path, list): - dirs = [x for path in self.root_path for x in path.glob(pattern) if x.is_dir()] - - if not dirs: - - if not isinstance(self.root_path, list): - dirs = [self.root_path] - - else: - dirs = self.root_path - - self.dirs = dirs + def get_assets(self): + assets = [directory.walk() for directory in self._root_dirs] + self.assets = sorted(toolz.unique(toolz.concat(assets))) return self - def get_filelist(self): - """Get a list of files from a list of directories.""" + @pydantic.validate_arguments + def parse(self, *, parsing_func: typing.Callable, parsing_func_kwargs: dict = None): + if not self.assets: + raise ValueError('asset list provided is None. Please run `.get_assets()` first') - def _filter_files(filelist): - return not any( - fnmatch.fnmatch(filelist, pat=exclude_pattern) - for exclude_pattern in self.exclude_patterns - ) - - def _glob_dir(directory, extension): - return sorted(list(directory.rglob(f'*{extension}'))) - - filelist = joblib.Parallel(n_jobs=self.njobs, verbose=5)( - joblib.delayed(_glob_dir)(directory, self.extension) for directory in self.dirs - ) - filelist = itertools.chain(*filelist) - if self.exclude_patterns: - filelist = list(filter(_filter_files, filelist)) - self.filelist = sorted(list(filelist)) - return self - - def _parse(self, parsing_func, parsing_func_kwargs=None): parsing_func_kwargs = {} if parsing_func_kwargs is None else parsing_func_kwargs - if parsing_func is None: - raise ValueError(f'`parsing_func` must a valid Callable. 
Got {type(parsing_func)}') - entries = joblib.Parallel(n_jobs=self.njobs, verbose=5)( - joblib.delayed(parsing_func)(file, **parsing_func_kwargs) for file in self.filelist + entries = joblib.Parallel(**self.joblib_parallel_kwargs)( + joblib.delayed(parsing_func)(asset, **parsing_func_kwargs) for asset in self.assets ) self.entries = entries self.df = pd.DataFrame(entries) @@ -151,71 +158,79 @@ def _parse(self, parsing_func, parsing_func_kwargs=None): def clean_dataframe(self): """Clean the dataframe by excluding invalid assets and removing duplicate entries.""" - if self.INVALID_ASSET in self.df.columns: - invalid_assets = self.df[self.df[self.INVALID_ASSET].notnull()][ - [self.INVALID_ASSET, self.TRACEBACK] - ] - df = self.df[self.df[self.INVALID_ASSET].isnull()].drop( - columns=[self.INVALID_ASSET, self.TRACEBACK] - ) + if INVALID_ASSET in self.df.columns: + invalid_assets = self.df[self.df[INVALID_ASSET].notnull()][[INVALID_ASSET, TRACEBACK]] + df = self.df[self.df[INVALID_ASSET].isnull()].drop(columns=[INVALID_ASSET, TRACEBACK]) self.invalid_assets = invalid_assets if not self.invalid_assets.empty: warnings.warn( - f'Unable to parse {len(self.invalid_assets)} assets/files. A list of these assets can be found in `.invalid_assets` attribute.', + f'Unable to parse {len(self.invalid_assets)} assets. A list of these assets can be found in `.invalid_assets` attribute.', stacklevel=2, ) self.df = df return self + @pydantic.validate_arguments def build( self, + *, parsing_func: typing.Callable, parsing_func_kwargs: dict = None, postprocess_func: typing.Callable = None, + postprocess_func_kwargs: dict = None, ): - """Collect a list of files and harvest attributes from them. + """Builds a catalog from a list of netCDF files or zarr stores. Parameters ---------- parsing_func : callable - A function that will be called to parse attributes from a given file/filepath - parsing_func_kwargs: dict, optional - Additional named arguments passed to `parsing_func` - postprocess_func: Callable, optional - A function that will be used to postprocess the built dataframe. + Function that parses the asset and returns a dictionary of metadata. + parsing_func_kwargs : dict, optional + Parameters passed to the parsing function. Default is {}. + postprocess_func : callable, optional + Function that post-processes the built dataframe and returns a pandas dataframe. + Default is None. + postprocess_func_kwargs : dict, optional + Parameters passed to the post-processing function. Default is {}. Returns ------- - `ecgtools.Builder` + :py:class:`~ecgtools.Builder` + The builder object. 
+ """ - self.get_directories().get_filelist()._parse( - parsing_func, parsing_func_kwargs + self.get_assets().parse( + parsing_func=parsing_func, parsing_func_kwargs=parsing_func_kwargs ).clean_dataframe() + if postprocess_func: - self.df = postprocess_func(self.df) + postprocess_func_kwargs = postprocess_func_kwargs or {} + self.df = postprocess_func(self.df, **postprocess_func_kwargs) return self + @pydantic.validate_arguments def save( self, - catalog_file: typing.Union[str, pathlib.Path, pydantic.AnyUrl], + *, + name: str, path_column_name: str, variable_column_name: str, - data_format: DataFormatEnum, + data_format: DataFormat, groupby_attrs: typing.List[str] = None, aggregations: typing.List[Aggregation] = None, esmcat_version: str = '0.0.1', - id: str = None, description: str = None, - last_updated: typing.Union[datetime.datetime, datetime.date] = None, - use_relative_path: bool = True, - **kwargs, + directory: str = None, + catalog_type: str = 'file', + to_csv_kwargs: dict = None, + json_dump_kwargs: dict = None, ): """Persist catalog contents to files. Parameters ---------- - catalog_file : str - Path to a the CSV file in which catalog contents will be persisted. + name: str + The name of the file to save the catalog to. path_column_name : str The name of the column containing the path to the asset. Must be in the header of the CSV file. @@ -227,31 +242,30 @@ def save( List of aggregations to apply to query results, default None esmcat_version : str The ESM Catalog version the collection implements, default None - id : str - Identifier for the collection, default None description : str Detailed multi-line description to fully explain the collection, default None - use_relative_path: bool - Whether to use a relative path for the catalog file (csv file) - entry in the json file, default True - kwargs : dict - Additional keyword arguments are passed through to the - :py:meth:`~pandas.DataFrame.to_csv` method. + directory: str + The directory to save the catalog to. If None, use the current directory + catalog_type: str + The type of catalog to save. Whether to save the catalog table as a dictionary + in the JSON file or as a separate CSV file. Valid options are 'dict' and 'file'. + to_csv_kwargs : dict, optional + Additional keyword arguments passed through to the :py:meth:`~pandas.DataFrame.to_csv` method. + json_dump_kwargs : dict, optional + Additional keyword arguments passed through to the :py:func:`~json.dump` function. + Returns ------- - `ecgtools.Builder` + :py:class:`~ecgtools.Builder` + The builder object. Notes ----- See https://github.com/NCAR/esm-collection-spec/blob/master/collection-spec/collection-spec.md for more - """ - last_updated = last_updated or datetime.datetime.now().utcnow().strftime( - '%Y-%m-%dT%H:%M:%SZ' - ) for col in {variable_column_name, path_column_name}.union(set(groupby_attrs or [])): assert col in self.df.columns, f'{col} must be a column in the dataframe.' 
@@ -264,31 +278,28 @@ def save( aggregations=aggregations, ) - _catalog_file = pathlib.Path(catalog_file) - catalog_file_location = _catalog_file.name if use_relative_path else str(_catalog_file) - esmcol_data = ESMCollection( - catalog_file=catalog_file_location, - attributes=attributes, - assets=Assets(column_name=path_column_name, format=data_format), - aggregation_control=_aggregation_control, + cat = ESMCatalogModel( esmcat_version=esmcat_version, - id=id, description=description, - last_updated=last_updated, + attributes=attributes, + aggregation_control=_aggregation_control, + assets=Assets(column_name=path_column_name, format=data_format), ) - esmcol_data = json.loads(esmcol_data.json()) - index = kwargs.pop('index') if 'index' in kwargs else False - self.df.to_csv(_catalog_file, index=index, **kwargs) + + cat._df = self.df + + cat.save( + name=name, + directory=directory, + catalog_type=catalog_type, + to_csv_kwargs=to_csv_kwargs, + json_dump_kwargs=json_dump_kwargs, + ) + if not self.invalid_assets.empty: - invalid_assets_report_file = ( - _catalog_file.parent / f'invalid_assets_{_catalog_file.stem}.csv' - ) + invalid_assets_report_file = f'{tempfile.gettempdir()}/{name}_invalid_assets.csv' warnings.warn( f'Unable to parse {len(self.invalid_assets)} assets/files. A list of these assets can be found in {invalid_assets_report_file}.', stacklevel=2, ) self.invalid_assets.to_csv(invalid_assets_report_file, index=False) - json_path = _catalog_file.parent / f'{_catalog_file.stem}.json' - with open(json_path, mode='w') as outfile: - json.dump(esmcol_data, outfile, indent=2) - print(f'Saved catalog location: {json_path} and {_catalog_file}') diff --git a/requirements.txt b/requirements.txt index 90a1a3b..7f203b8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,9 @@ cf_xarray joblib netCDF4 -typer xarray pyyaml pydantic pandas -dask +fsspec +intake-esm diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..ef3c509 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,9 @@ +import os +import pathlib + +import pytest + + +@pytest.fixture +def sample_data_directory(): + return pathlib.Path(os.path.dirname(__file__)).parent / 'sample_data' diff --git a/tests/parsers/test_cesm.py b/tests/parsers/test_cesm.py new file mode 100644 index 0000000..91f2213 --- /dev/null +++ b/tests/parsers/test_cesm.py @@ -0,0 +1,82 @@ +import pytest + +from ecgtools.parsers.cesm import parse_cesm_timeseries + + +@pytest.mark.parametrize( + 'file_path, variable, frequency, component, case', + [ + ( + 'cesm-le/b.e11.B1850C5CN.f09_g16.005.pop.h.SHF.040001-049912.nc', + 'SHF', + 'month_1', + 'ocn', + 'b.e11.B1850C5CN.f09_g16.005', + ), + ( + 'cesm/g.e11_LENS.GECOIAF.T62_g16.009.pop.h.ECOSYS_XKW.024901-031612.nc', + 'ECOSYS_XKW', + 'month_1', + 'ocn', + 'g.e11_LENS.GECOIAF.T62_g16.009', + ), + ( + 'cesm/g.e11_LENS.GECOIAF.T62_g16.009.pop.h.ECOSYS_XKW.024901-031612.nc', + 'ECOSYS_XKW', + 'month_1', + 'ocn', + 'g.e11_LENS.GECOIAF.T62_g16.009', + ), + ( + 'cesm/g.e11_LENS.GECOIAF.T62_g16.009.pop.h.ECOSYS_XKW.024901-031612.nc', + 'ECOSYS_XKW', + 'month_1', + 'ocn', + 'g.e11_LENS.GECOIAF.T62_g16.009', + ), + ( + 'cesm/g.e11_LENS.GECOIAF.T62_g16.009.pop.h.ECOSYS_XKW.024901-031612.nc', + 'ECOSYS_XKW', + 'month_1', + 'ocn', + 'g.e11_LENS.GECOIAF.T62_g16.009', + ), + ( + 'cesm/g.e11_LENS.GECOIAF.T62_g16.009.pop.h.ECOSYS_XKW.024901-031612.nc', + 'ECOSYS_XKW', + 'month_1', + 'ocn', + 
'g.e11_LENS.GECOIAF.T62_g16.009', + ), + ( + 'cesm/g.e11_LENS.GECOIAF.T62_g16.009.pop.h.ECOSYS_XKW.024901-031612.nc', + 'ECOSYS_XKW', + 'month_1', + 'ocn', + 'g.e11_LENS.GECOIAF.T62_g16.009', + ), + ], +) +def test_cesm_timeseries(sample_data_directory, file_path, variable, frequency, component, case): + path = sample_data_directory / file_path + entry = parse_cesm_timeseries(path) + assert { + 'component', + 'stream', + 'case', + 'member_id', + 'variable', + 'start_time', + 'end_time', + 'time_range', + 'long_name', + 'units', + 'vertical_levels', + 'frequency', + 'path', + } == set(entry.keys()) + assert entry['variable'] == variable + assert entry['frequency'] == frequency + assert entry['path'] == str(path) + assert entry['case'] == case + assert entry['component'] == component diff --git a/tests/parsers/test_cmip.py b/tests/parsers/test_cmip.py new file mode 100644 index 0000000..032cee9 --- /dev/null +++ b/tests/parsers/test_cmip.py @@ -0,0 +1,20 @@ +import pytest + +from ecgtools.parsers.cmip import parse_cmip6 + + +@pytest.mark.parametrize( + 'file_path', + [ + 'cmip/CMIP6/CMIP/BCC/BCC-ESM1/piControl/r1i1p1f1/Amon/tasmax/gn/v20181214/tasmax/tasmax_Amon_BCC-ESM1_piControl_r1i1p1f1_gn_185001-230012.nc' + ], +) +def test_parse_cmip6(sample_data_directory, file_path): + path = sample_data_directory / file_path + entry = parse_cmip6(path) + assert {'activity_id', 'variable_id', 'table_id'}.issubset(set(list(entry.keys()))) + assert entry['experiment_id'] == 'piControl' + assert entry['member_id'] == 'r1i1p1f1' + assert entry['grid_label'] == 'gn' + assert entry['table_id'] == 'Amon' + assert entry['variable_id'] == 'tasmax' diff --git a/tests/parsers/test_obs.py b/tests/parsers/test_obs.py new file mode 100644 index 0000000..c361974 --- /dev/null +++ b/tests/parsers/test_obs.py @@ -0,0 +1,28 @@ +import pandas as pd +import pytest + +from ecgtools import Builder +from ecgtools.parsers.observations import parse_amwg_obs + + +@pytest.mark.parametrize( + 'file_path', + [ + 'cesm_obs/AIRS_01_climo.nc', + 'cesm_obs/MODIS_ANN_climo.nc', + ], +) +def test_obs_parser(sample_data_directory, file_path): + parsed_dict = parse_amwg_obs(sample_data_directory / file_path) + assert isinstance(parsed_dict, dict) + assert 'path' in parsed_dict + + +@pytest.mark.parametrize( + 'path', + ['cesm_obs'], +) +def test_obs_builder(sample_data_directory, path): + b = Builder(paths=[str(sample_data_directory / path)]) + b.build(parsing_func=parse_amwg_obs) + assert isinstance(b.df, pd.DataFrame) diff --git a/tests/test_builder.py b/tests/test_builder.py index 6faa7c2..2170fdf 100644 --- a/tests/test_builder.py +++ b/tests/test_builder.py @@ -1,128 +1,150 @@ -import json import os import pathlib -import traceback +import intake import pandas as pd -import pydantic import pytest -from ecgtools import Builder +from ecgtools import Builder, RootDirectory, glob_to_regex +from ecgtools.parsers.cesm import parse_cesm_history sample_data_dir = pathlib.Path(os.path.dirname(__file__)).parent / 'sample_data' -def parsing_func(file): - return {'path': file, 'variable': 'placeholder'} - - -def parsing_func_errors(file): - try: - file.is_valid() - except: - return {Builder.INVALID_ASSET: file.as_posix(), Builder.TRACEBACK: traceback.format_exc()} - - -def test_root_path_error(): - with pytest.raises(pydantic.ValidationError): - Builder('test_directory') - - @pytest.mark.parametrize( - 'root_path', + 'path, depth, storage_options,include_patterns, exclude_patterns, num_assets', [ - sample_data_dir / 'cmip' / 'CMIP6', - 
sample_data_dir / 'cmip' / 'cmip5', - sample_data_dir / 'cesm', - [sample_data_dir / 'cmip' / 'CMIP6', sample_data_dir / 'cmip' / 'cmip5'], - [ - sample_data_dir / 'cmip' / 'CMIP6' / 'CMIP' / 'IPSL', - sample_data_dir / 'cmip' / 'cmip5' / 'output1' / 'IPSL', - ], + (str(sample_data_dir / 'cmip' / 'CMIP6'), 10, {}, ['*.nc'], [], 59), + (str(sample_data_dir / 'cmip' / 'cmip5'), 10, {}, ['*.nc'], ['*/esmControl/*'], 27), + ('s3://ncar-cesm-lens/atm/monthly', 0, {'anon': True}, [], ['*cesmLE-20C*'], 75), ], ) -def test_init(root_path): - _ = Builder(root_path) +def test_directory(path, depth, storage_options, include_patterns, exclude_patterns, num_assets): + include_regex, exclude_regex = glob_to_regex( + include_patterns=include_patterns, exclude_patterns=exclude_patterns + ) + directory = RootDirectory( + path=path, + depth=depth, + storage_options=storage_options, + include_regex=include_regex, + exclude_regex=exclude_regex, + ) + assets = directory.walk() + assert len(assets) == num_assets @pytest.mark.parametrize( - 'root_path', + 'paths, depth, storage_options, include_patterns, exclude_patterns, num_assets', [ - sample_data_dir / 'cmip' / 'CMIP6', - sample_data_dir / 'cmip' / 'cmip5', - sample_data_dir / 'cesm', - [sample_data_dir / 'cmip' / 'CMIP6', sample_data_dir / 'cmip' / 'cmip5'], - [ - sample_data_dir / 'cmip' / 'CMIP6' / 'CMIP' / 'IPSL', - sample_data_dir / 'cmip' / 'cmip5' / 'output1' / 'IPSL', - ], + ( + [ + str(sample_data_dir / 'cmip' / 'CMIP6' / 'CMIP' / 'BCC'), + str(sample_data_dir / 'cmip' / 'CMIP6' / 'CMIP' / 'IPSL'), + ], + 8, + {}, + ['*.nc'], + [], + 27, + ), + ( + ['s3://ncar-cesm-lens/lnd/monthly', 's3://ncar-cesm-lens/ocn/monthly'], + 0, + {'anon': True}, + [], + ['*cesmLE-20C*', '*cesmLE-RCP85*'], + 78, + ), ], ) -def test_get_filelist(root_path): - b = Builder( - root_path, - exclude_patterns=['*/files/*', '*/latest/*'], - ).get_directories() - assert b.dirs - assert isinstance(b.dirs[0], pathlib.Path) +def test_builder_init( + paths, depth, storage_options, include_patterns, exclude_patterns, num_assets +): + builder = Builder( + paths=paths, + depth=depth, + storage_options=storage_options, + include_patterns=include_patterns, + exclude_patterns=exclude_patterns, + ) + builder.get_assets() + assert isinstance(builder.assets, list) + assert len(builder.assets) == num_assets - b = b.get_filelist() - assert b.filelist - assert isinstance(b.filelist[0], pathlib.Path) +def parsing_func(file): + return {'path': file, 'variable': 'placeholder'} -def test_parse_error(): - b = Builder(sample_data_dir / 'cesm').get_directories().get_filelist() - with pytest.raises(ValueError): - b._parse(None) +def post_process_func(df, times=10): + df['my_column'] = 1 * times + return df @pytest.mark.parametrize( - 'root_path', + 'paths, depth, storage_options, include_patterns, exclude_patterns, num_assets', [ - sample_data_dir / 'cmip' / 'CMIP6', - sample_data_dir / 'cmip' / 'cmip5', - sample_data_dir / 'cesm', - [sample_data_dir / 'cmip' / 'CMIP6', sample_data_dir / 'cmip' / 'cmip5'], + ( + [ + str(sample_data_dir / 'cmip' / 'CMIP6' / 'CMIP' / 'BCC'), + str(sample_data_dir / 'cesm'), + ], + 1, + {}, + ['*.nc'], + [], + 3, + ), + ( + ['s3://ncar-cesm-lens/lnd/static', 's3://ncar-cesm-lens/ocn/static'], + 0, + {'anon': True}, + [], + ['*cesmLE-20C*', '*cesmLE-RCP85*'], + 4, + ), ], ) -def test_build(root_path): - def func(df): - df['my_column'] = 'test' - return df - - b = Builder(root_path, exclude_patterns=['*/files/*', '*/latest/*']).build( - parsing_func=parsing_func, 
postprocess_func=func +def test_builder_build( + paths, depth, storage_options, include_patterns, exclude_patterns, num_assets +): + builder = Builder( + paths=paths, + depth=depth, + storage_options=storage_options, + include_patterns=include_patterns, + exclude_patterns=exclude_patterns, + ) + builder.get_assets() + assert len(builder.assets) == num_assets + builder.build( + parsing_func=parsing_func, + postprocess_func=post_process_func, + postprocess_func_kwargs={'times': 100}, ) - assert 'my_column' in b.df.columns - assert b.entries - assert isinstance(b.entries[0], dict) - assert isinstance(b.df, pd.DataFrame) - assert not b.df.empty + assert isinstance(builder.df, pd.DataFrame) + assert len(builder.df) == num_assets + assert set(builder.df.columns) == {'path', 'variable', 'my_column'} -def test_parse_invalid_assets(): +def test_builder_save(tmp_path): + builder = Builder(paths=[str(sample_data_dir / 'cesm')], depth=5, include_patterns=['*.nc']) + builder.get_assets() + builder.assets.append('cesm/nonexistent_file.nc') # Add an invalid file with pytest.warns(UserWarning): - b = Builder(sample_data_dir / 'cesm').build(parsing_func=parsing_func_errors) - - assert not b.invalid_assets.empty - assert set(b.invalid_assets.columns) == {Builder.INVALID_ASSET, Builder.TRACEBACK} - - -def test_save(tmp_path): - catalog_file = tmp_path / 'test_catalog.csv' - - b = Builder(sample_data_dir / 'cesm').build(parsing_func=parsing_func) - b.save(catalog_file, 'path', 'variable', 'netcdf') - - df = pd.read_csv(catalog_file) - assert len(df) == len(b.df) - assert set(df.columns) == set(b.df.columns) - - json_path = tmp_path / 'test_catalog.json' - data = json.load(json_path.open()) - assert {'catalog_file', 'assets', 'aggregation_control', 'attributes'}.issubset( - set(data.keys()) - ) + builder.parse(parsing_func=parse_cesm_history).clean_dataframe() + with pytest.warns(UserWarning): + builder.save( + name='test', + path_column_name='path', + directory=str(tmp_path), + data_format='netcdf', + variable_column_name='variables', + aggregations=[], + groupby_attrs=[], + ) + assert not builder.invalid_assets.empty + cat = intake.open_esm_datastore(str(tmp_path / 'test.json')) + assert isinstance(cat.df, pd.DataFrame) diff --git a/tests/test_obs.py b/tests/test_obs.py deleted file mode 100644 index bccbf5b..0000000 --- a/tests/test_obs.py +++ /dev/null @@ -1,35 +0,0 @@ -import os -import pathlib - -import pandas as pd -import pytest - -from ecgtools import Builder -from ecgtools.parsers.observations import parse_amwg_obs - -sample_data_dir = pathlib.Path(os.path.dirname(__file__)).parent / 'sample_data' - -df = pd.DataFrame() - - -@pytest.mark.parametrize( - 'file_path', - [ - sample_data_dir / 'cesm_obs' / 'AIRS_01_climo.nc', - sample_data_dir / 'cesm_obs' / 'MODIS_ANN_climo.nc', - ], -) -def test_obs_parser(file_path): - parse_dict = parse_amwg_obs(file_path) - assert isinstance(parse_dict, dict) - assert isinstance(df.append(parse_dict, ignore_index=True), pd.DataFrame) - - -@pytest.mark.parametrize( - 'file_directory', - [sample_data_dir / 'cesm_obs'], -) -def test_obs_builder(file_directory): - b = Builder(file_directory) - b.build(parse_amwg_obs) - assert isinstance(b.df, pd.DataFrame)
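
A minimal usage sketch of the API this diff introduces (untested; the local root path, exclude pattern, and catalog name below are illustrative assumptions, while `Builder`, `RootDirectory`, `glob_to_regex`, the `s3://ncar-cesm-lens` bucket, and `parse_cesm_timeseries` come from this change and the existing parsers/tests):

    from ecgtools import Builder, RootDirectory, glob_to_regex
    from ecgtools.parsers.cesm import parse_cesm_timeseries

    # Crawl a (hypothetical) local directory of CESM time-series output.
    # Remote roots work the same way once s3fs/gcsfs are installed, e.g.
    # paths=['s3://ncar-cesm-lens/lnd/monthly'] with storage_options={'anon': True}.
    builder = Builder(
        paths=['/data/cesm/timeseries'],          # assumed root, replace as needed
        depth=3,
        include_patterns=['*.nc'],
        exclude_patterns=['*/restart/*'],         # assumed exclusion
        joblib_parallel_kwargs={'n_jobs': -1, 'verbose': 5},
    )

    # get_assets() -> parse() -> clean_dataframe() are chained internally by build().
    builder.build(parsing_func=parse_cesm_timeseries)

    # Persist an intake-esm catalog (a JSON description plus the catalog table),
    # delegated to intake_esm.cat.ESMCatalogModel.save() by Builder.save().
    builder.save(
        name='cesm-timeseries',
        path_column_name='path',
        variable_column_name='variable',
        data_format='netcdf',
        groupby_attrs=['component', 'stream', 'case'],
        aggregations=[],
        directory='.',
        catalog_type='file',
    )

    # The lower-level pieces are exported as well; this mirrors tests/test_builder.py.
    include_regex, exclude_regex = glob_to_regex(include_patterns=['*.nc'], exclude_patterns=[])
    root = RootDirectory(
        path='sample_data/cmip/CMIP6',            # path assumed relative to the repo root
        depth=10,
        storage_options={},
        include_regex=include_regex,
        exclude_regex=exclude_regex,
    )
    assets = root.walk()                          # list of matching netCDF assets

The same `RootDirectory.walk()` call also picks up zarr stores with consolidated metadata (directories containing a `.zmetadata` entry), which is how the S3 CESM-LENS cases in the new tests are discovered.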