diff --git a/docs/reference.md b/docs/reference.md
index b9764a1..1f797c3 100644
--- a/docs/reference.md
+++ b/docs/reference.md
@@ -4,30 +4,28 @@
 
 #### `hat-extract-timeseries`
 
-Extract timeseries from a collection of simulation raster files. Timeseries extraction requires two variables
+Extract timeseries from a collection of simulation raster files. Timeseries extraction requires a JSON configuration file, which can be provided on the command line using `--config`
 
-1. simulation data directory
-2. station metadata filepath
+    $ hat-extract-timeseries --config timeseries.json
 
-They can be defined at the command line using `--simulation-datadir` and `--station-metadata`
-
-    $ hat-extract-timeseries --simulation-datadir $GRIB_DATADIR --station-metadata $STATION_METADATA_FILEPATH
-
-This will use the default configuration, which you can show using `--show-default-config`
+You can show an example config file using `--show-default-config`
 
     $ hat-extract-timeseries --show-default-config
 
-To use a custom configuration there is `--config`
-
-    $ hat-extract-timeseries --config $CUSTOM_CONFIG
-
 To create your own configuration JSON file you might want to start with the default configuration as a template. Default values will be used where possible if not defined in the custom configuration file. Here is an example custom configuration file.
 
     # example custom configuration .json file
     {
-        "simulation_datadir": "/path/to/simulation_datadir",
-        "station_metadata": "path/to/Qgis_World_outlet_20221102.csv",
-        "input_file_extension": ".nc"
+        "station_metadata_filepath": "/path/to/stations.csv",
+        "simulation": {
+            "type": "file",
+            "files": "*.nc"
+        },
+        "simulation_output_filepath": "./simulation_timeseries.nc",
+        "station_epsg": 4326,
+        "station_id_column_name": "obsid",
+        "station_filters": "",
+        "station_coordinates": ["Lon", "Lat"]
     }
 
 #### `hat-hydrostats`
diff --git a/docs/usage.md b/docs/usage.md
index 7b40fd5..92a8477 100644
--- a/docs/usage.md
+++ b/docs/usage.md
@@ -10,7 +10,7 @@ If you have already [installed](installation.md) hat then
 
 Run a command line tool, for example
 
-    $ extract_simulation_timeseries --help
+    $ hat-extract-timeseries --help
 
 For more information on individual command line tools, use the `--help` option at the command line or read the [reference](reference.md) documentation
diff --git a/hat/config.py b/hat/config.py
index c0e0fed..49318da 100644
--- a/hat/config.py
+++ b/hat/config.py
@@ -109,45 +109,3 @@ def read_config(custom_config_filepath: str):
     config = valid_custom_config(custom_config)
 
     return config
-
-
-def timeseries_config(
-    simulation_datadir: str, station_metadata: str, config_filepath: str
-):
-    """Manage configuration settings for timeseries extraction.
-    Priority order:
-    1) command line arguments
-    2) custom config file
-    3) default config"""
-
-    # custom or default configutation file
-    if not config_filepath:
-        config = DEFAULT_CONFIG
-    else:
-        config = read_config(config_filepath)
-
-    # root directory for search of input files
-    if not simulation_datadir and not config["simulation_datadir"]:
-        raise ValueError(
-            """Please provide a data directory of grib or netcdf
-            simulation files for timeseries extraction"""
-        )
-    elif simulation_datadir:
-        config["simulation_datadir"] = simulation_datadir
-
-    # station metadata filepath
-    if not station_metadata and not config["station_metadata_filepath"]:
-        raise ValueError("Please provide a station metadata filepath")
-    elif station_metadata:
-        config["station_metadata_filepath"] = station_metadata
-
-    # check output filepath is valid before running timeseries extraction
-    config["simulation_output_file_extension"] = config[
-        "simulation_output_filepath"
-    ].split(".")[-1]
-    if config["simulation_output_file_extension"] not in ["nc", "csv"]:
-        raise ValueError(
-            "output_filepath must be .nc or .csv, please update config", config_filepath
-        )
-
-    return config
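Note for reviewers: removing `timeseries_config` also removes the early check that `simulation_output_filepath` ends in `.nc` or `.csv`. If that guard is still wanted, a minimal sketch of a hypothetical helper (not part of this diff) could live next to `read_config`:

```python
# Hypothetical guard, not part of this diff: re-adds the output-extension
# check that timeseries_config used to run before extraction started.
def validate_output_filepath(config: dict) -> None:
    extension = config["simulation_output_filepath"].split(".")[-1]
    if extension not in ["nc", "csv"]:
        raise ValueError(
            "simulation_output_filepath must be .nc or .csv, please update config"
        )
```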
diff --git a/hat/config_json/timeseries.json b/hat/config_json/timeseries.json
index b8794a3..52f8cb4 100644
--- a/hat/config_json/timeseries.json
+++ b/hat/config_json/timeseries.json
@@ -1,11 +1,12 @@
 {
     "station_metadata_filepath": "",
-    "simulation_datadir": "",
-    "simulation_input_file_extension": ".grb",
+    "simulation": {
+        "type": "file",
+        "files": "*.nc"
+    },
     "simulation_output_filepath": "./simulation_timeseries.nc",
-    "recursive_search": "False",
-    "river_network_name": "cama_3min",
     "station_epsg": 4326,
     "station_id_column_name": "station_id",
-    "station_filters":""
+    "station_filters": "",
+    "station_coordinates": ["LisfloodX", "LisfloodY"]
 }
diff --git a/hat/data.py b/hat/data.py
index d74a01e..4f25eee 100644
--- a/hat/data.py
+++ b/hat/data.py
@@ -1,11 +1,12 @@
+import glob
 import json
 import os
 import pathlib
 import sys
-from pathlib import Path
 from tempfile import TemporaryDirectory
 from typing import List, Union
 
+import earthkit.data
 import geopandas as gpd
 import humanize
 import pandas as pd
@@ -67,27 +68,16 @@ def get_tmp_filepath(
     return os.path.join(tmpdir, f"{fname}{extension}")
 
 
-def find_files(simulation_datadir, file_extension: str = ".nc", recursive=False):
-    """Find files in directory by file extension. Optionally recursive
-    (i.e. search all subdirectory too)"""
+def find_files(simulation_files):
+    """Find files matching a glob pattern"""
 
-    if not os.path.exists(simulation_datadir):
-        raise FileNotFoundError(f"Directory does not exist: {simulation_datadir}")
-
-    if recursive:
-        search_string = f"**/*{file_extension}"
-    else:
-        search_string = f"*{file_extension}"
-
-    fpaths = sorted(
-        [str(file) for file in Path(simulation_datadir).glob(search_string)]
-    )
+    fpaths = glob.glob(simulation_files)
 
     if not fpaths:
-        raise FileNotFoundError(
-            f"""No {file_extension} files found,
-            with recursive search as {recursive}, in: {simulation_datadir}"""
-        )
+        raise Exception(f"Could not find any file matching pattern {simulation_files}")
+    else:
+        print("Found the following simulation files:")
+        print(*fpaths, sep="\n")
 
     return fpaths
 
@@ -265,3 +255,48 @@ def save_dataset_to_netcdf(ds: xr.Dataset, fpath: str):
         os.remove(fpath)
 
     ds.to_netcdf(fpath)
+
+
+def find_main_var(ds):
+    variable_names = [k for k in ds.variables if len(ds.variables[k].dims) >= 3]
+    if len(variable_names) > 1:
+        raise Exception("More than one variable in dataset")
+    elif len(variable_names) == 0:
+        raise Exception("Could not find a valid variable in dataset")
+    else:
+        var_name = variable_names[0]
+    return var_name
+
+
+def read_simulation_as_xarray(options):
+    if options["type"] == "file":
+        source_type = "file"
+        # find simulation files from the glob pattern
+        files = find_files(
+            options["files"],
+        )
+        args = [files]
+    elif options["type"] in ["mars", "fdb"]:
+        source_type = options["type"]
+        args = [options["request"]]
+    else:
+        raise Exception(
+            f"Simulation type {options['type']} not supported. Currently supported: file, mars, fdb"  # noqa: E501
+        )
+
+    # earthkit-data source (local files, MARS archive or FDB)
+    fs = earthkit.data.from_source(source_type, *args)
+
+    xarray_kwargs = {}
+    if isinstance(fs, earthkit.data.readers.netcdf.NetCDFFieldListReader):
+        xarray_kwargs["xarray_open_mfdataset_kwargs"] = {"chunks": {"time": "auto"}}
+    else:
+        xarray_kwargs["xarray_open_dataset_kwargs"] = {"chunks": {"time": "auto"}}
+
+    # xarray dataset
+    ds = fs.to_xarray(**xarray_kwargs)
+
+    var = find_main_var(ds)
+
+    da = ds[var]
+    return da
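The new `find_main_var` assumes exactly one variable in the dataset has three or more dimensions. A quick sketch of that contract on a toy dataset (variable names here are illustrative only):

```python
import numpy as np
import xarray as xr

from hat.data import find_main_var

# toy dataset: one 3-D field plus a scalar metadata variable
ds = xr.Dataset(
    {
        "dis": (("time", "latitude", "longitude"), np.zeros((4, 3, 2))),
        "crs": ((), 0),
    }
)
assert find_main_var(ds) == "dis"  # the single >=3-D variable is picked
```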
diff --git a/hat/extract_simulation_timeseries.py b/hat/extract_simulation_timeseries.py
index 7eaf177..3d64323 100644
--- a/hat/extract_simulation_timeseries.py
+++ b/hat/extract_simulation_timeseries.py
@@ -1,18 +1,14 @@
 """
 Python module for extracting simulation timeseries from grids (i.e. rasters)
 """
-from typing import List
-
 import geopandas as gpd
-import pandas as pd
 import xarray as xr
 
 from hat.config import load_package_config, valid_custom_config
 
 # hat modules
-from hat.data import find_files
 from hat.geo import geopoints_to_array, latlon_coords
-from hat.timeseries import all_timeseries, extract_timeseries_from_filepaths
+from hat.timeseries import assign_stations, extract_timeseries_using_mask
 
 # Read configuration files
 # NOTE config files are installed as part of hat package using setup.py
@@ -21,81 +17,25 @@
 DEFAULT_CONFIG = load_package_config("timeseries.json")
 
 
-def geopandas_to_xarray(station_metadata: gpd.GeoDataFrame, timeseries: pd.DataFrame):
-    """Convert results from geopandas geodataframe to xarray dataset"""
-
-    # NOTE we do not use this inbuilt method..
-    # ds = xr.Dataset.from_dataframe(timeseries)
-    # because it can be extremely slow (5 mins) when extracting all stations
-    # i.e. because it creates a separate data array for each station
-    # here instead we create a single data array for all stations
-
-    # 2D numpy array of timeseries
-    arr = timeseries.to_numpy()
-
-    # labels for the numpy array
-    coords = {
-        "time": list(timeseries.index),
-        "station": list(timeseries.columns),
-    }
-
-    # xarray data array is essentially "just" a numpy array with labels
-    da = xr.DataArray(
-        arr, dims=["time", "station"], coords=coords, name="simulation_timeseries"
-    )
-
-    # parse point geometries data into lists
-    lons = [point.x for point in station_metadata.geometry]
-    lats = [point.y for point in station_metadata.geometry]
-
-    # add to dataarray
-    da["longitude"] = ("station", lons)
-    da["latitude"] = ("station", lats)
-
-    # create xarray data set (in this case with just one data array)
-    ds = da.to_dataset()
-
-    return ds
-
-
 def extract_timeseries(
-    station_metadata: gpd.GeoDataFrame,
-    simulation_datadir: str = "",
-    simulation_fpaths: List = [],
+    stations: gpd.GeoDataFrame,
+    simulations_da: xr.DataArray,
     config: dict = DEFAULT_CONFIG,
 ):
     config = valid_custom_config(config)
 
-    if not simulation_datadir and not simulation_fpaths:
-        raise TypeError(
-            """extract_timeseries() missing 1 required variable:
-            'simulation_datadir' or 'simulation_fpaths' """
-        )
-
-    if not simulation_fpaths:
-        simulation_fpaths = find_files(
-            simulation_datadir,
-            file_extension=config["simulation_input_file_extension"],
-            recursive=config["recursive_search"],
-        )
-
-    # infer coordinates for all grids from first the grid
-    coords = latlon_coords(simulation_fpaths[0])
+    # infer coordinates for all grids from the first grid
+    coords = latlon_coords(simulations_da)
 
     # numpy array of station locations
-    station_mask = geopoints_to_array(station_metadata, coords)
+    station_mask = geopoints_to_array(stations, coords)
 
-    # extract timeseries from files using mask
-    timeseries_from_mask = extract_timeseries_from_filepaths(
-        simulation_fpaths, station_mask
-    )
+    # extract timeseries from the grid using mask
+    da_points = extract_timeseries_using_mask(simulations_da, station_mask)
 
     # all timeseries (i.e. handle proximal and duplicate stations)
-    timeseries = all_timeseries(
-        station_metadata, station_mask, timeseries_from_mask, coords
+    da_stations = assign_stations(
+        stations, station_mask, da_points, coords, config["station_id_column_name"]
     )
 
-    # convert to xarray dataset
-    timeseries = geopandas_to_xarray(station_metadata, timeseries)
-
-    return timeseries
+    return da_stations
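For reference, the new calling convention end to end, as a hedged sketch (the path, coordinate columns, and glob pattern are placeholders):

```python
from hat.data import read_simulation_as_xarray
from hat.extract_simulation_timeseries import extract_timeseries
from hat.observations import read_station_metadata_file

# placeholder station file and coordinate column names
stations = read_station_metadata_file("/path/to/stations.csv", ["Lon", "Lat"], 4326)

# simulation grid loaded lazily via earthkit-data
simulations_da = read_simulation_as_xarray({"type": "file", "files": "*.nc"})

# returns an xarray DataArray with a "station" dimension
da_stations = extract_timeseries(stations, simulations_da)
```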
diff --git a/hat/geo.py b/hat/geo.py
index c9c8faf..6c0408b 100644
--- a/hat/geo.py
+++ b/hat/geo.py
@@ -6,7 +6,6 @@
 import geopandas as gpd
 import numpy as np
 import shapely
-import xarray as xr
 from jsonschema import ValidationError, validate
 from pyproj import CRS, Transformer
 
@@ -331,13 +330,29 @@ def geopoints_from_csv(fpath: str, lat_name: str, lon_name: str) -> gpd.GeoDataFrame:
     return gdf
 
 
-def latlon_coords(fpath):
+def get_latlon_keys(ds):
+    lat_key = None
+    lon_key = None
+    if "lat" in ds.coords and "lon" in ds.coords:
+        lat_key = "lat"
+        lon_key = "lon"
+    elif "latitude" in ds.coords and "longitude" in ds.coords:
+        lat_key = "latitude"
+        lon_key = "longitude"
+    else:
+        raise Exception(
+            f"Lat/lon coordinates could not be detected in dataset with coords {ds.coords}"  # noqa: E501
+        )
+
+    return lat_key, lon_key
+
+
+def latlon_coords(ds):
     """Latitude and longitude coordinates of an xarray"""
 
-    with xr.open_dataset(fpath) as ds:
-        lat_coords = ds.coords.get("latitude").data
-        lon_coords = ds.coords.get("longitude").data
-        coords = {"x": lon_coords, "y": lat_coords}
-        del ds
+    lat_key, lon_key = get_latlon_keys(ds)
+    lat_coords = ds.coords.get(lat_key).data
+    lon_coords = ds.coords.get(lon_key).data
+    coords = {"x": lon_coords, "y": lat_coords}
 
     return coords
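`get_latlon_keys` only recognises the two spellings below; anything else raises. A toy check:

```python
import numpy as np
import xarray as xr

from hat.geo import get_latlon_keys

# both naming schemes supported by the detection logic
ds = xr.Dataset(coords={"latitude": np.arange(3.0), "longitude": np.arange(2.0)})
assert get_latlon_keys(ds) == ("latitude", "longitude")

ds2 = xr.Dataset(coords={"lat": np.arange(3.0), "lon": np.arange(2.0)})
assert get_latlon_keys(ds2) == ("lat", "lon")
```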
diff --git a/hat/observations.py b/hat/observations.py
index ebb1a7e..f895201 100644
--- a/hat/observations.py
+++ b/hat/observations.py
@@ -1,19 +1,9 @@
 import geopandas as gpd
 import pandas as pd
 
-from hat.config import load_package_config
 from hat.data import is_csv, read_csv_and_cache
 from hat.filters import filter_dataframe
 
-# Read configutation files
-# NOTE config files are installed as part of hat package using setup.py
-# (just updating the .json files will not necessarily work
-# (i.e. would require pip install -e .)
-COORD_NAMES = load_package_config("river_network_coordinate_names.json")
-DEFAULT_CONFIG = load_package_config("timeseries.json")
-RIVER_NETWORK_COORD_NAMES = COORD_NAMES[DEFAULT_CONFIG["river_network_name"]]
-EPSG = DEFAULT_CONFIG["station_epsg"]
-
 
 def add_geometry_column(gdf: gpd.GeoDataFrame, coord_names):
     """add geometry column to stations geodataframe. station must have valid
@@ -42,29 +32,25 @@ def add_geometry_column(gdf: gpd.GeoDataFrame, coord_names):
 
 
 def read_station_metadata_file(
-    fpath: str, coord_names: str, epsg: int = EPSG
+    fpath: str, coord_names: str, epsg: int, filters: str = None
 ) -> gpd.GeoDataFrame:
     """read hydrological stations from file. will cache as pickle object
     because .csv file used by the team takes 12 seconds to load"""
 
-    if is_csv(fpath):
-        gdf = read_csv_and_cache(fpath)
-        gdf = add_geometry_column(gdf, coord_names)
-        gdf = gdf.set_crs(epsg=epsg)
-    else:
-        gdf = gpd.read_file(fpath)
-
-    return gdf
-
+    print("station file")
+    print(fpath)
 
-def read_station_metadata(
-    station_metadata_filepath, filters="", coord_names: str = RIVER_NETWORK_COORD_NAMES
-):
-    # read station metadata from file
-    stations = read_station_metadata_file(station_metadata_filepath, coord_names)
+    try:
+        if is_csv(fpath):
+            gdf = read_csv_and_cache(fpath)
+            gdf = add_geometry_column(gdf, coord_names)
+            gdf = gdf.set_crs(epsg=epsg)
+        else:
+            gdf = gpd.read_file(fpath)
+    except Exception as e:
+        raise Exception(f"Could not open file {fpath}") from e
 
     # (optionally) filter the stations, e.g. 'Continent == Europe'
-    if filters:
-        stations = filter_dataframe(stations, filters)
-
-    return stations
+    if filters is not None:
+        gdf = filter_dataframe(gdf, filters)
+    return gdf
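`filters` is now threaded through explicitly rather than read from package config. A sketch with a filter string in the style of the inline comment above (the CSV path and column names are placeholders):

```python
from hat.observations import read_station_metadata_file

stations = read_station_metadata_file(
    "/path/to/stations.csv",          # placeholder path
    coord_names=["LisfloodX", "LisfloodY"],  # placeholder column names
    epsg=4326,
    filters="Continent == Europe",    # same style as the comment in the code
)
```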
diff --git a/hat/timeseries.py b/hat/timeseries.py
index 272273e..1fb1958 100644
--- a/hat/timeseries.py
+++ b/hat/timeseries.py
@@ -1,12 +1,11 @@
-from typing import List
-
-import earthkit.data
 import geopandas as gpd
 import numpy as np
 import pandas as pd
 import xarray as xr
+from dask.diagnostics import ProgressBar
+from tqdm import tqdm
 
-""" NETCDF"""
+from hat.geo import get_latlon_keys
 
 
 def mask_array_np(arr, mask):
@@ -17,13 +16,10 @@
 def extract_timeseries_using_mask(
-    da: xr.DataArray, mask: np.ndarray, core_dims=["y", "x"]
+    da: xr.DataArray, mask: np.ndarray, station_dim="station"
 ):
     """extract timeseries using a station mask with xarray.apply_ufunc()
-
-    input_core_dims
-    list of core dimensions on each input argument
-    (needs to be same length as number of input arguments)
 
     output_core_dims
     list of core dimensions on each output
    (needs to be same length as number of output variables)
@@ -41,39 +37,24 @@
     """
 
+    core_dims = get_latlon_keys(da)
+
     # dask computational graph (i.e. lazy)
     task = xr.apply_ufunc(
         mask_array_np,
         da,
         mask,
         input_core_dims=[core_dims, core_dims],
-        output_core_dims=[["station"]],
+        output_core_dims=[[station_dim]],
         output_dtypes=[da.dtype],
         exclude_dims=set(core_dims),
         dask="parallelized",
-        dask_gufunc_kwargs={"output_sizes": {"station": int(mask.sum())}},
+        dask_gufunc_kwargs={"output_sizes": {station_dim: int(mask.sum())}},
     )
 
     # extract timeseries (i.e. compute graph)
-    timeseries = task.compute()
-
-    return timeseries
-
-
-def extract_timeseries_from_filepaths(fpaths: List[str], mask: np.ndarray):
-    """extract timeseries from a collection of files and given a boolean mask
-    to represent point locations"""
-
-    # earthkit data file source
-    fs = earthkit.data.from_source("file", fpaths)
-
-    # xarray dataset
-    ds = fs.to_xarray()
-
-    # timeseries extraction using masking algorithm
-    timeseries = extract_timeseries_using_mask(
-        ds.dis, mask, core_dims=["latitude", "longitude"]
-    )
+    with ProgressBar(dt=10):
+        timeseries = task.compute()
 
     return timeseries
 
@@ -91,8 +72,12 @@ def station_timeseries_index(
     return idx
 
 
-def all_timeseries(
-    stations: gpd.GeoDataFrame, mask, masked_timeseries: xr.DataArray, coords: dict
+def assign_stations(
+    stations: gpd.GeoDataFrame,
+    mask: np.ndarray,
+    da_stations: xr.DataArray,
+    coords: dict,
+    station_dim: str,
-) -> pd.DataFrame:
+) -> xr.DataArray:
     """
     Fixed bug in timeseries extraction by mask where:
@@ -107,42 +92,32 @@
     The idea is to ensure we preserve ordering so that
     we can re-map station_ids to latlon coordinates
     """
-
     # broadcast the coord arrays to same shape as mask
     lon2d, lat2d = np.meshgrid(coords["x"], coords["y"])
 
     # get lats and lons of True values in mask
     lat_in_mask = lat2d[mask]
     lon_in_mask = lon2d[mask]
+
     """
     We can now get the complete collection of timeseries
     (even where there are duplicates of proximal stations)
     """
-
-    # table of all station timeseries
-    complete_station_timeseries = {}
-
-    for _, station in stations.iterrows():
+    stations_list = []
+    stations_id = []
+    for _, station in tqdm(stations.iterrows(), total=stations.shape[0]):
         # station id
-        station_id = station["station_id"]
+        station_id = station[station_dim]
 
         # timeseries index for a given station
         timeseries_index = station_timeseries_index(station, lon_in_mask, lat_in_mask)
 
-        # timeseries values for a given station
-        station_timeseries = masked_timeseries.sel(station=timeseries_index)
-
-        # update complete station timeseries
-        complete_station_timeseries[station_id] = station_timeseries.values.flatten()
-
-    """
-    Finally, create a pandas dataframe with a datetime index
-    """
-
-    # stationIDs and discharge values
-    df = pd.DataFrame(complete_station_timeseries)
+        # add to the list
+        stations_id += [station_id]
+        stations_list += [timeseries_index]
 
-    # datetime index
-    df["datetime"] = pd.to_datetime(station_timeseries.time.values)
-    df = df.set_index("datetime")
+    da_with_duplicate = da_stations.sel(station=stations_list)
+    da_with_duplicate = da_with_duplicate.assign_coords(station=stations_id)
 
-    return df
+    return da_with_duplicate
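A toy demonstration of the masking path, assuming `mask_array_np` keeps the masked cells as a trailing station dimension (its body is unchanged by this diff, so this is a sketch of the expected behaviour):

```python
import numpy as np
import xarray as xr

from hat.timeseries import extract_timeseries_using_mask

da = xr.DataArray(
    np.arange(24.0).reshape(2, 3, 4),
    dims=["time", "lat", "lon"],
    coords={"lat": np.linspace(50, 52, 3), "lon": np.linspace(4, 7, 4)},
)
mask = np.zeros((3, 4), dtype=bool)
mask[0, 0] = mask[2, 3] = True  # two grid cells marked as stations

points = extract_timeseries_using_mask(da, mask)
print(points.sizes)  # expected: {'time': 2, 'station': 2}
```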
diff --git a/hat/tools/extract_simulation_timeseries_cli.py b/hat/tools/extract_simulation_timeseries_cli.py
index 87e9742..dd7b53a 100644
--- a/hat/tools/extract_simulation_timeseries_cli.py
+++ b/hat/tools/extract_simulation_timeseries_cli.py
@@ -10,21 +10,20 @@
 import json
 import os
-from typing import List
 
 import geopandas as gpd
 import typer
 
 from hat.cli import prettyprint, title
-from hat.config import timeseries_config
+from hat.config import read_config
 
 # hat modules
-from hat.data import find_files, save_dataset_to_netcdf
+from hat.data import read_simulation_as_xarray, save_dataset_to_netcdf
 from hat.extract_simulation_timeseries import DEFAULT_CONFIG, extract_timeseries
-from hat.observations import read_station_metadata
+from hat.observations import read_station_metadata_file
 
 
-def print_overview(config: dict, station_metadata: gpd.GeoDataFrame, fpaths: List[str]):
+def print_overview(config: dict, station_metadata: gpd.GeoDataFrame, simulation):
     """Print overview of relevant information for user"""
 
     title("Configuration", color="cyan")
@@ -39,9 +38,7 @@
     print(f"number of stations = {len(station_metadata)}")
 
     title("Simulation", color="cyan")
-    print(
-        f"number of simulation files = {len(fpaths)}\n",
-    )
+    print(simulation)
 
 
 def print_default_config(DEFAULT_CONFIG):
@@ -61,15 +58,7 @@
 def command_line_tool(
-    simulation_datadir: str = typer.Option(
-        "",
-        help="Directory of simulation files",
-    ),
-    station_metadata_filepath: str = typer.Option(
-        "",
-        help="Path to station metadata file",
-    ),
-    config_filepath: str = typer.Option(
+    config: str = typer.Option(
         "",
         help="Path to configuration file",
     ),
@@ -87,23 +76,27 @@
     title("STARTING TIME SERIES EXTRACTION")
 
-    config = timeseries_config(
-        simulation_datadir, station_metadata_filepath, config_filepath
-    )
-    station_metadata = read_station_metadata(station_metadata_filepath)
-    simulation_fpaths = find_files(
-        config["simulation_datadir"],
-        file_extension=config["simulation_input_file_extension"],
-        recursive=config["recursive_search"],
+    cfg = read_config(config)
+
+    # read station file
+    stations = read_station_metadata_file(
+        cfg["station_metadata_filepath"],
+        cfg["station_coordinates"],
+        cfg["station_epsg"],
+        cfg["station_filters"],
     )
 
-    print_overview(config, station_metadata, simulation_fpaths)
+    # read simulated data
+    simulation = read_simulation_as_xarray(cfg["simulation"])
 
-    timeseries = extract_timeseries(
-        station_metadata, simulation_fpaths=simulation_fpaths
-    )
+    print_overview(cfg, stations, simulation)
+
+    # Extract time series
+    timeseries = extract_timeseries(stations, simulation)
+
+    title("Timeseries extracted")
+    print(timeseries)
 
-    save_dataset_to_netcdf(timeseries, "./simulation_timeseries.nc")
+    save_dataset_to_netcdf(timeseries, cfg["simulation_output_filepath"])
 
     title("TIMESERIES EXTRACTION COMPLETE", background="cyan", bold=True)
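An end-to-end sketch of the new single-option interface: write a config file and call the CLI (file paths are placeholders; `subprocess` stands in for a shell):

```python
import json
import subprocess

config = {
    "station_metadata_filepath": "/path/to/stations.csv",  # placeholder
    "simulation": {"type": "file", "files": "*.nc"},
    "simulation_output_filepath": "./simulation_timeseries.nc",
    "station_epsg": 4326,
    "station_id_column_name": "station_id",
    "station_filters": "",
    "station_coordinates": ["Lon", "Lat"],
}
with open("timeseries.json", "w") as f:
    json.dump(config, f, indent=4)

# invoke the renamed console script with the config file
subprocess.run(["hat-extract-timeseries", "--config", "timeseries.json"], check=True)
```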
diff --git a/hat/version.py b/hat/version.py
index 3d26edf..3d18726 100644
--- a/hat/version.py
+++ b/hat/version.py
@@ -1 +1 @@
-__version__ = "0.4.1"
+__version__ = "0.5.0"
diff --git a/tests/test_config.py b/tests/test_config.py
index 9a36546..941a014 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -8,13 +8,7 @@
 # See https://setuptools.pypa.io/en/latest/pkg_resources.html
 with warnings.catch_warnings():
     warnings.filterwarnings("ignore", category=DeprecationWarning)
-    from hat.config import (
-        DEFAULT_CONFIG,
-        booleanify,
-        read_config,
-        timeseries_config,
-        valid_custom_config,
-    )
+    from hat.config import DEFAULT_CONFIG, booleanify, read_config, valid_custom_config
 
 
 def test_DEFAULT_CONFIG():
@@ -56,10 +50,3 @@ def test_read_config():
 
     with pytest.raises(ValueError):
         _ = read_config(filepath_exists_but_is_not_json)
-
-
-def test_timeseries_config():
-    empty_path = ""
-
-    with pytest.raises(ValueError):
-        _ = timeseries_config(empty_path, empty_path, empty_path)
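Deleting `test_timeseries_config` leaves the new config-driven entry points without direct coverage. A possible pytest replacement that targets behaviour this diff does define (the new `find_files` raising when its glob matches nothing):

```python
import pytest

from hat.data import find_files


def test_find_files_no_match(tmp_path):
    # the new find_files raises when the glob pattern matches nothing
    with pytest.raises(Exception):
        _ = find_files(str(tmp_path / "*.nc"))
```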