From 8754f3d1c970bfe33ed6ff1e1274db3da5ff10d0 Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Fri, 7 Jul 2023 08:30:49 +0000 Subject: [PATCH 1/9] test --- src/hat/images.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/hat/images.py b/src/hat/images.py index 14ebeae..8eee99e 100644 --- a/src/hat/images.py +++ b/src/hat/images.py @@ -1,8 +1,7 @@ import matplotlib + import numpy as np from quicklook import quicklook - - def arr_to_image(arr: np.array) -> np.array: """modify array so that it is optimized for viewing""" From 23dead763c289cd78d959905527d5432d130f55f Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Mon, 2 Oct 2023 15:08:51 +0000 Subject: [PATCH 2/9] fix conflicts --- hat/observations.py | 29 +++++++---------------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/hat/observations.py b/hat/observations.py index ebb1a7e..035bc12 100644 --- a/hat/observations.py +++ b/hat/observations.py @@ -5,15 +5,6 @@ from hat.data import is_csv, read_csv_and_cache from hat.filters import filter_dataframe -# Read configutation files -# NOTE config files are installed as part of hat package using setup.py -# (just updating the .json files will not necessarily work -# (i.e. would require pip install -e .) -COORD_NAMES = load_package_config("river_network_coordinate_names.json") -DEFAULT_CONFIG = load_package_config("timeseries.json") -RIVER_NETWORK_COORD_NAMES = COORD_NAMES[DEFAULT_CONFIG["river_network_name"]] -EPSG = DEFAULT_CONFIG["station_epsg"] - def add_geometry_column(gdf: gpd.GeoDataFrame, coord_names): """add geometry column to stations geodataframe. station must have valid @@ -42,7 +33,10 @@ def add_geometry_column(gdf: gpd.GeoDataFrame, coord_names): def read_station_metadata_file( - fpath: str, coord_names: str, epsg: int = EPSG + fpath: str, + coord_names: str, + epsg: int, + filters: str = None ) -> gpd.GeoDataFrame: """read hydrological stations from file. will cache as pickle object because .csv file used by the team takes 12 seconds to load""" @@ -54,17 +48,8 @@ def read_station_metadata_file( else: gdf = gpd.read_file(fpath) - return gdf - - -def read_station_metadata( - station_metadata_filepath, filters="", coord_names: str = RIVER_NETWORK_COORD_NAMES -): - # read station metadata from file - stations = read_station_metadata_file(station_metadata_filepath, coord_names) - # (optionally) filter the stations, e.g. 
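
NOTE: the refactor above is easier to follow with a concrete sketch. A minimal example of what add_geometry_column() plus set_crs() amount to; "stations.csv" is hypothetical, and the coordinate column names here follow the "station_coordinates" config entry introduced later in this series:

    import geopandas as gpd
    import pandas as pd

    df = pd.read_csv("stations.csv")  # hypothetical station metadata file
    # one shapely Point per station, built from its x/y columns
    gdf = gpd.GeoDataFrame(
        df, geometry=gpd.points_from_xy(df["LisfloodX"], df["LisfloodY"])
    ).set_crs(epsg=4326)  # "station_epsg" in the config
    # optional row filter; the column name depends on the metadata file
    gdf = gdf[gdf["Continent"] == "Europe"]
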
'Contintent == Europe' - if filters: - stations = filter_dataframe(stations, filters) + if filters is not None: + gdf = filter_dataframe(gdf, filters) + return gdf - return stations From 7c32476bb88bccf6a3858e8487a484069f40955d Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Thu, 5 Oct 2023 09:16:32 +0000 Subject: [PATCH 3/9] update extract_timeseries tool --- hat/config.py | 25 ++++++---------- hat/config_json/timeseries.json | 8 ++--- hat/data.py | 26 +++++------------ hat/extract_simulation_timeseries.py | 14 ++------- hat/geo.py | 21 ++++++++++++-- hat/observations.py | 19 ++++++++---- hat/timeseries.py | 26 ++++++++++++----- .../extract_simulation_timeseries_cli.py | 29 ++++++++++++------- 8 files changed, 93 insertions(+), 75 deletions(-) diff --git a/hat/config.py b/hat/config.py index c0e0fed..17d1470 100644 --- a/hat/config.py +++ b/hat/config.py @@ -112,7 +112,9 @@ def read_config(custom_config_filepath: str): def timeseries_config( - simulation_datadir: str, station_metadata: str, config_filepath: str + simulation_files: str, + station_metadata: str, + config_filepath: str ): """Manage configuration settings for timeseries extraction. Priority order: @@ -127,27 +129,18 @@ def timeseries_config( config = read_config(config_filepath) # root directory for search of input files - if not simulation_datadir and not config["simulation_datadir"]: + if simulation_files is None and not config["simulation_files"]: raise ValueError( - """Please provide a data directory of grib or netcdf + """Please provide a list of grib or netcdf simulation files for timeseries extraction""" ) - elif simulation_datadir: - config["simulation_datadir"] = simulation_datadir + elif simulation_files is not None: + config["simulation_datadir"] = simulation_files # station metadata filepath - if not station_metadata and not config["station_metadata_filepath"]: + if station_metadata is None and not config["station_metadata_filepath"]: raise ValueError("Please provide a station metadata filepath") - elif station_metadata: + elif station_metadata is not None: config["station_metadata_filepath"] = station_metadata - # check output filepath is valid before running timeseries extraction - config["simulation_output_file_extension"] = config[ - "simulation_output_filepath" - ].split(".")[-1] - if config["simulation_output_file_extension"] not in ["nc", "csv"]: - raise ValueError( - "output_filepath must be .nc or .csv, please update config", config_filepath - ) - return config diff --git a/hat/config_json/timeseries.json b/hat/config_json/timeseries.json index b8794a3..af44f39 100644 --- a/hat/config_json/timeseries.json +++ b/hat/config_json/timeseries.json @@ -1,11 +1,9 @@ { "station_metadata_filepath": "", - "simulation_datadir": "", - "simulation_input_file_extension": ".grb", + "simulation_files": "*.nc", "simulation_output_filepath": "./simulation_timeseries.nc", - "recursive_search": "False", - "river_network_name": "cama_3min", "station_epsg": 4326, "station_id_column_name": "station_id", - "station_filters":"" + "station_filters":"", + "station_coordinates": ["LisfloodX", "LisfloodY"] } diff --git a/hat/data.py b/hat/data.py index d74a01e..befe4ab 100644 --- a/hat/data.py +++ b/hat/data.py @@ -2,6 +2,7 @@ import os import pathlib import sys +import glob from pathlib import Path from tempfile import TemporaryDirectory from typing import List, Union @@ -67,27 +68,16 @@ def get_tmp_filepath( return os.path.join(tmpdir, f"{fname}{extension}") -def find_files(simulation_datadir, file_extension: str = 
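
NOTE: the replacement find_files() below reduces the directory/extension/recursive machinery to a single glob call. The pattern is a shell-style glob rather than a regex, despite the wording in the new docstring. A minimal sketch with an illustrative pattern:

    import glob

    # hypothetical pattern; find_files() passes it straight to glob.glob
    fpaths = sorted(glob.glob("/data/simulations/dis_*.nc"))
    if not fpaths:
        raise FileNotFoundError("no simulation files matched the pattern")
    print(*fpaths, sep="\n")
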
".nc", recursive=False): - """Find files in directory by file extension. Optionally recursive - (i.e. search all subdirectory too)""" +def find_files(simulation_files): + """Find files matching regex""" - if not os.path.exists(simulation_datadir): - raise FileNotFoundError(f"Directory does not exist: {simulation_datadir}") - - if recursive: - search_string = f"**/*{file_extension}" - else: - search_string = f"*{file_extension}" - - fpaths = sorted( - [str(file) for file in Path(simulation_datadir).glob(search_string)] - ) + fpaths = glob.glob(simulation_files) if not fpaths: - raise FileNotFoundError( - f"""No {file_extension} files found, - with recursive search as {recursive}, in: {simulation_datadir}""" - ) + raise Exception(f"Could not find any file from regex {simulation_files}") + else: + print("Found following simulation files:") + print(*fpaths, sep="\n") return fpaths diff --git a/hat/extract_simulation_timeseries.py b/hat/extract_simulation_timeseries.py index 7eaf177..791abbf 100644 --- a/hat/extract_simulation_timeseries.py +++ b/hat/extract_simulation_timeseries.py @@ -60,23 +60,15 @@ def geopandas_to_xarray(station_metadata: gpd.GeoDataFrame, timeseries: pd.DataF def extract_timeseries( station_metadata: gpd.GeoDataFrame, - simulation_datadir: str = "", - simulation_fpaths: List = [], + simulation_fpaths: List, config: dict = DEFAULT_CONFIG, ): config = valid_custom_config(config) - if not simulation_datadir and not simulation_fpaths: + if not simulation_fpaths: raise TypeError( """extract_timeseries() missing 1 required variable: - 'simulation_datadir' or 'simulation_fpaths' """ - ) - - if not simulation_fpaths: - simulation_fpaths = find_files( - simulation_datadir, - file_extension=config["simulation_input_file_extension"], - recursive=config["recursive_search"], + 'simulation_fpaths' """ ) # infer coordinates for all grids from first the grid diff --git a/hat/geo.py b/hat/geo.py index c9c8faf..248d3aa 100644 --- a/hat/geo.py +++ b/hat/geo.py @@ -331,12 +331,29 @@ def geopoints_from_csv(fpath: str, lat_name: str, lon_name: str) -> gpd.GeoDataF return gdf +def get_latlon_keys(ds): + + lat_key = None + lon_key = None + if 'lat' in ds.coords and 'lon' in ds.coords: + lat_key = 'lat' + lon_key = 'lon' + elif 'latitude' in ds.coords and 'longitude' in ds.coords: + lat_key = 'latitude' + lon_key = 'longitude' + else: + raise Exception(f"Lat/lon coordinates could not be detected in dataset with coords {ds.coords}") + + return lat_key, lon_key + + def latlon_coords(fpath): """Latitude and longitude coordinates of an xarray""" with xr.open_dataset(fpath) as ds: - lat_coords = ds.coords.get("latitude").data - lon_coords = ds.coords.get("longitude").data + lat_key, lon_key = get_latlon_keys(ds) + lat_coords = ds.coords.get(lat_key).data + lon_coords = ds.coords.get(lon_key).data coords = {"x": lon_coords, "y": lat_coords} del ds diff --git a/hat/observations.py b/hat/observations.py index 035bc12..faf09e2 100644 --- a/hat/observations.py +++ b/hat/observations.py @@ -41,12 +41,19 @@ def read_station_metadata_file( """read hydrological stations from file. 
will cache as pickle object because .csv file used by the team takes 12 seconds to load""" - if is_csv(fpath): - gdf = read_csv_and_cache(fpath) - gdf = add_geometry_column(gdf, coord_names) - gdf = gdf.set_crs(epsg=epsg) - else: - gdf = gpd.read_file(fpath) + print('station file') + print(fpath) + + try: + if is_csv(fpath): + gdf = read_csv_and_cache(fpath) + gdf = add_geometry_column(gdf, coord_names) + gdf = gdf.set_crs(epsg=epsg) + else: + gdf = gpd.read_file(fpath) + except Exception: + raise Exception(f"Could not open file {fpath}") + # (optionally) filter the stations, e.g. 'Contintent == Europe' if filters is not None: diff --git a/hat/timeseries.py b/hat/timeseries.py index 272273e..4484917 100644 --- a/hat/timeseries.py +++ b/hat/timeseries.py @@ -5,8 +5,9 @@ import numpy as np import pandas as pd import xarray as xr +from dask.diagnostics import ProgressBar -""" NETCDF""" +from hat.geo import get_latlon_keys def mask_array_np(arr, mask): @@ -17,7 +18,10 @@ def mask_array_np(arr, mask): def extract_timeseries_using_mask( - da: xr.DataArray, mask: np.ndarray, core_dims=["y", "x"] + da: xr.DataArray, + mask: np.ndarray, + core_dims=["y", "x"], + station_dim = "station" ): """extract timeseries using a station mask with xarray.apply_ufunc() @@ -47,15 +51,16 @@ def extract_timeseries_using_mask( da, mask, input_core_dims=[core_dims, core_dims], - output_core_dims=[["station"]], + output_core_dims=[[station_dim]], output_dtypes=[da.dtype], exclude_dims=set(core_dims), dask="parallelized", - dask_gufunc_kwargs={"output_sizes": {"station": int(mask.sum())}}, + dask_gufunc_kwargs={"output_sizes": {station_dim: int(mask.sum())}}, ) # extract timeseries (i.e. compute graph) - timeseries = task.compute() + with ProgressBar(dt=10): + timeseries = task.compute() return timeseries @@ -67,12 +72,19 @@ def extract_timeseries_from_filepaths(fpaths: List[str], mask: np.ndarray): # earthkit data file source fs = earthkit.data.from_source("file", fpaths) + xarray_kwargs = {} + if isinstance(fs, earthkit.data.readers.netcdf.NetCDFFieldListReader): + xarray_kwargs["xarray_open_mfdataset_kwargs"] = {"chunks": {'time': 'auto'}} + # xarray dataset - ds = fs.to_xarray() + ds = fs.to_xarray(**xarray_kwargs) + print(ds) + + core_dims = get_latlon_keys(ds) # timeseries extraction using masking algorithm timeseries = extract_timeseries_using_mask( - ds.dis, mask, core_dims=["latitude", "longitude"] + ds.dis, mask, core_dims=core_dims ) return timeseries diff --git a/hat/tools/extract_simulation_timeseries_cli.py b/hat/tools/extract_simulation_timeseries_cli.py index 87e9742..dec7a6e 100644 --- a/hat/tools/extract_simulation_timeseries_cli.py +++ b/hat/tools/extract_simulation_timeseries_cli.py @@ -21,7 +21,7 @@ # hat modules from hat.data import find_files, save_dataset_to_netcdf from hat.extract_simulation_timeseries import DEFAULT_CONFIG, extract_timeseries -from hat.observations import read_station_metadata +from hat.observations import read_station_metadata_file def print_overview(config: dict, station_metadata: gpd.GeoDataFrame, fpaths: List[str]): @@ -61,12 +61,12 @@ def print_default_config(DEFAULT_CONFIG): def command_line_tool( - simulation_datadir: str = typer.Option( - "", - help="Directory of simulation files", + simulation_files: str = typer.Option( + None, + help="List of simulation files", ), station_metadata_filepath: str = typer.Option( - "", + None, help="Path to station metadata file", ), config_filepath: str = typer.Option( @@ -88,20 +88,29 @@ def command_line_tool( title("STARTING TIME 
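
NOTE: the apply_ufunc masking pattern above is compact enough to demonstrate end-to-end on a toy in-memory array (no dask, so no compute step needed):

    import numpy as np
    import xarray as xr

    # toy grid: 3 time steps on a 4x5 y/x grid, two cells marked as stations
    da = xr.DataArray(np.arange(60).reshape(3, 4, 5), dims=["time", "y", "x"])
    mask = np.zeros((4, 5), dtype=bool)
    mask[1, 2] = mask[3, 0] = True

    ts = xr.apply_ufunc(
        lambda arr, m: arr[..., m],  # same idea as mask_array_np
        da,
        mask,
        input_core_dims=[["y", "x"], ["y", "x"]],
        output_core_dims=[["station"]],
        exclude_dims={"y", "x"},
    )
    print(ts.sizes)  # {'time': 3, 'station': 2}
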
SERIES EXTRACTION") config = timeseries_config( - simulation_datadir, station_metadata_filepath, config_filepath + simulation_files, station_metadata_filepath, config_filepath ) - station_metadata = read_station_metadata(station_metadata_filepath) + + # read station file + station_metadata = read_station_metadata_file( + config["station_metadata_filepath"], + config['station_coordinates'], + config['station_epsg'], + config["station_filters"] + ) + + # find station files simulation_fpaths = find_files( - config["simulation_datadir"], - file_extension=config["simulation_input_file_extension"], - recursive=config["recursive_search"], + config["simulation_files"], ) print_overview(config, station_metadata, simulation_fpaths) + # Extract time series timeseries = extract_timeseries( station_metadata, simulation_fpaths=simulation_fpaths ) + print(timeseries) save_dataset_to_netcdf(timeseries, "./simulation_timeseries.nc") From 92ac911b5bff2585f20d6f0b49050df192684d3b Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Thu, 5 Oct 2023 16:13:37 +0000 Subject: [PATCH 4/9] update timeseries extraction to support mars and fdb --- hat/config.py | 24 +----- hat/config_json/timeseries.json | 5 +- hat/data.py | 45 +++++++++++ hat/extract_simulation_timeseries.py | 69 ++++------------- hat/geo.py | 12 ++- hat/observations.py | 2 - hat/timeseries.py | 75 ++++++------------- .../extract_simulation_timeseries_cli.py | 38 +++------- 8 files changed, 102 insertions(+), 168 deletions(-) diff --git a/hat/config.py b/hat/config.py index 17d1470..9f69e14 100644 --- a/hat/config.py +++ b/hat/config.py @@ -112,35 +112,15 @@ def read_config(custom_config_filepath: str): def timeseries_config( - simulation_files: str, - station_metadata: str, config_filepath: str ): """Manage configuration settings for timeseries extraction. 
Priority order: 1) command line arguments 2) custom config file - 3) default config""" + """ # custom or default configutation file - if not config_filepath: - config = DEFAULT_CONFIG - else: - config = read_config(config_filepath) - - # root directory for search of input files - if simulation_files is None and not config["simulation_files"]: - raise ValueError( - """Please provide a list of grib or netcdf - simulation files for timeseries extraction""" - ) - elif simulation_files is not None: - config["simulation_datadir"] = simulation_files - - # station metadata filepath - if station_metadata is None and not config["station_metadata_filepath"]: - raise ValueError("Please provide a station metadata filepath") - elif station_metadata is not None: - config["station_metadata_filepath"] = station_metadata + config = read_config(config_filepath) return config diff --git a/hat/config_json/timeseries.json b/hat/config_json/timeseries.json index af44f39..52f8cb4 100644 --- a/hat/config_json/timeseries.json +++ b/hat/config_json/timeseries.json @@ -1,6 +1,9 @@ { "station_metadata_filepath": "", - "simulation_files": "*.nc", + "simulation": { + "type": "file", + "files": "*.nc" + }, "simulation_output_filepath": "./simulation_timeseries.nc", "station_epsg": 4326, "station_id_column_name": "station_id", diff --git a/hat/data.py b/hat/data.py index befe4ab..e14aac0 100644 --- a/hat/data.py +++ b/hat/data.py @@ -7,6 +7,7 @@ from tempfile import TemporaryDirectory from typing import List, Union +import earthkit.data import geopandas as gpd import humanize import pandas as pd @@ -255,3 +256,47 @@ def save_dataset_to_netcdf(ds: xr.Dataset, fpath: str): os.remove(fpath) ds.to_netcdf(fpath) + + +def find_main_var(ds): + variable_names = [k for k in ds.variables if len(ds.variables[k].dims) >= 3] + if len(variable_names) > 1: + raise Exception('More than one variable in dataset') + elif len(variable_names) == 0: + raise Exception('Could not find a valid variable in dataset') + else: + var_name = variable_names[0] + return var_name + + +def read_simulation_as_xarray(options): + + if options["type"] == "file": + type = "file" + # find station files + files = find_files( + options["files"], + ) + args = [files] + elif options["type"] in ["mars", 'fdb']: + type = options["type"] + args = [options["request"]] + else: + raise Exception(f"Simulation type {options['type']} not supported. 
Currently supported: file, mars, fdb") + + # earthkit data file source + fs = earthkit.data.from_source(type, *args) + + xarray_kwargs = {} + if isinstance(fs, earthkit.data.readers.netcdf.NetCDFFieldListReader): + xarray_kwargs["xarray_open_mfdataset_kwargs"] = {"chunks": {'time': 'auto'}} + else: + xarray_kwargs["xarray_open_dataset_kwargs"] = {"chunks": {'time': 'auto'}} + + # xarray dataset + ds = fs.to_xarray(**xarray_kwargs) + + var = find_main_var(ds) + + da = ds[var] + return da diff --git a/hat/extract_simulation_timeseries.py b/hat/extract_simulation_timeseries.py index 791abbf..17afcc2 100644 --- a/hat/extract_simulation_timeseries.py +++ b/hat/extract_simulation_timeseries.py @@ -12,7 +12,7 @@ # hat modules from hat.data import find_files from hat.geo import geopoints_to_array, latlon_coords -from hat.timeseries import all_timeseries, extract_timeseries_from_filepaths +from hat.timeseries import extract_timeseries_using_mask, assign_stations # Read configutation files # NOTE config files are installed as part of hat package using setup.py @@ -21,73 +21,30 @@ DEFAULT_CONFIG = load_package_config("timeseries.json") -def geopandas_to_xarray(station_metadata: gpd.GeoDataFrame, timeseries: pd.DataFrame): - """Convert results from geopandas geodataframe to xarray dataset""" - - # NOTE we do not use this inbuilt method.. - # ds = xr.Dataset.from_dataframe(timeseries) - # because it can be extremely slow (5 mins) when extracting all stations - # i.e. because it creates a separate data array for each station - # here instead we create a single data array for all stations - - # 2D numpy array of timeseries - arr = timeseries.to_numpy() - - # labels for the numpy array - coords = { - "time": list(timeseries.index), - "station": list(timeseries.columns), - } - - # xarray data array is essentially "just" a numpy array with labels - da = xr.DataArray( - arr, dims=["time", "station"], coords=coords, name="simulation_timeseries" - ) - - # parse point geometries data into lists - lons = [point.x for point in station_metadata.geometry] - lats = [point.y for point in station_metadata.geometry] - - # add to dataarray - da["longitude"] = ("station", lons) - da["latitude"] = ("station", lats) - - # create xarray data set (in this case with just one data array) - ds = da.to_dataset() - - return ds - def extract_timeseries( - station_metadata: gpd.GeoDataFrame, - simulation_fpaths: List, + stations: gpd.GeoDataFrame, + simulations_da: xr.DataArray, config: dict = DEFAULT_CONFIG, ): config = valid_custom_config(config) - if not simulation_fpaths: - raise TypeError( - """extract_timeseries() missing 1 required variable: - 'simulation_fpaths' """ - ) - # infer coordinates for all grids from first the grid - coords = latlon_coords(simulation_fpaths[0]) + coords = latlon_coords(simulations_da) # numpy array of station locations - station_mask = geopoints_to_array(station_metadata, coords) + station_mask = geopoints_to_array(stations, coords) # extract timeseries from files using mask - timeseries_from_mask = extract_timeseries_from_filepaths( - simulation_fpaths, station_mask - ) + da_points = extract_timeseries_using_mask(simulations_da, station_mask) # all timeseries (i.e. 
handle proximal and duplicate stations) - timeseries = all_timeseries( - station_metadata, station_mask, timeseries_from_mask, coords + da_stations = assign_stations( + stations, + station_mask, + da_points, + coords, + config["station_id_column_name"] ) - # convert to xarray dataset - timeseries = geopandas_to_xarray(station_metadata, timeseries) - - return timeseries + return da_stations diff --git a/hat/geo.py b/hat/geo.py index 248d3aa..3db6af4 100644 --- a/hat/geo.py +++ b/hat/geo.py @@ -347,14 +347,12 @@ def get_latlon_keys(ds): return lat_key, lon_key -def latlon_coords(fpath): +def latlon_coords(ds): """Latitude and longitude coordinates of an xarray""" - with xr.open_dataset(fpath) as ds: - lat_key, lon_key = get_latlon_keys(ds) - lat_coords = ds.coords.get(lat_key).data - lon_coords = ds.coords.get(lon_key).data - coords = {"x": lon_coords, "y": lat_coords} - del ds + lat_key, lon_key = get_latlon_keys(ds) + lat_coords = ds.coords.get(lat_key).data + lon_coords = ds.coords.get(lon_key).data + coords = {"x": lon_coords, "y": lat_coords} return coords diff --git a/hat/observations.py b/hat/observations.py index faf09e2..2de58ad 100644 --- a/hat/observations.py +++ b/hat/observations.py @@ -54,9 +54,7 @@ def read_station_metadata_file( except Exception: raise Exception(f"Could not open file {fpath}") - # (optionally) filter the stations, e.g. 'Contintent == Europe' if filters is not None: gdf = filter_dataframe(gdf, filters) return gdf - diff --git a/hat/timeseries.py b/hat/timeseries.py index 4484917..ef89561 100644 --- a/hat/timeseries.py +++ b/hat/timeseries.py @@ -1,6 +1,6 @@ from typing import List -import earthkit.data +from tqdm import tqdm import geopandas as gpd import numpy as np import pandas as pd @@ -20,14 +20,10 @@ def mask_array_np(arr, mask): def extract_timeseries_using_mask( da: xr.DataArray, mask: np.ndarray, - core_dims=["y", "x"], station_dim = "station" ): """extract timeseries using a station mask with xarray.apply_ufunc() - - input_core_dims - list of core dimensions on each input argument - (needs to be same length as number of input arguments) - output_core_dims list of core dimensions on each output (needs to be same length as number of output variables) @@ -45,6 +41,8 @@ def extract_timeseries_using_mask( """ + core_dims = get_latlon_keys(da) + # dask computational graph (i.e. 
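
NOTE: for intuition, the station mask threaded through these functions is just a boolean grid with True at each station's cell. Assuming geopoints_to_array() does roughly a nearest-cell assignment (not spelled out in this patch), a hand-rolled equivalent with illustrative coordinates:

    import numpy as np

    lons = np.arange(0.0, 10.0, 0.1)    # grid x coordinates
    lats = np.arange(60.0, 50.0, -0.1)  # grid y coordinates
    mask = np.zeros((lats.size, lons.size), dtype=bool)

    for x, y in [(2.34, 55.71), (7.05, 52.20)]:  # illustrative station lon/lat
        j = np.abs(lons - x).argmin()  # nearest column
        i = np.abs(lats - y).argmin()  # nearest row
        mask[i, j] = True
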
lazy) task = xr.apply_ufunc( mask_array_np, @@ -65,31 +63,6 @@ def extract_timeseries_using_mask( return timeseries -def extract_timeseries_from_filepaths(fpaths: List[str], mask: np.ndarray): - """extract timeseries from a collection of files and given a boolean mask - to represent point locations""" - - # earthkit data file source - fs = earthkit.data.from_source("file", fpaths) - - xarray_kwargs = {} - if isinstance(fs, earthkit.data.readers.netcdf.NetCDFFieldListReader): - xarray_kwargs["xarray_open_mfdataset_kwargs"] = {"chunks": {'time': 'auto'}} - - # xarray dataset - ds = fs.to_xarray(**xarray_kwargs) - print(ds) - - core_dims = get_latlon_keys(ds) - - # timeseries extraction using masking algorithm - timeseries = extract_timeseries_using_mask( - ds.dis, mask, core_dims=core_dims - ) - - return timeseries - - def station_timeseries_index( station: gpd.GeoDataFrame, lon_in_mask: np.ndarray, lat_in_mask: np.ndarray ): @@ -103,8 +76,12 @@ def station_timeseries_index( return idx -def all_timeseries( - stations: gpd.GeoDataFrame, mask, masked_timeseries: xr.DataArray, coords: dict +def assign_stations( + stations: gpd.GeoDataFrame, + mask: np.ndarray, + da_stations: xr.DataArray, + coords: dict, + station_dim: str, ) -> pd.DataFrame: """ Fixed bug in timeseries extraction by mask where: @@ -119,42 +96,32 @@ def all_timeseries( The idea is to ensure we preserve ordering so that we can re-map station_ids to latlon coordinates """ - # broadcast the coord arrays to same shape as mask lon2d, lat2d = np.meshgrid(coords["x"], coords["y"]) # get lats and lons of True values in mask lat_in_mask = lat2d[mask] lon_in_mask = lon2d[mask] + """ We can now get the complete collection of timeseries (even where there are duplicates of proximal stations) """ - - # table of all station timeseries - complete_station_timeseries = {} - - for _, station in stations.iterrows(): + da_with_duplicate = None + stations_list = [] + stations_id = [] + for _, station in tqdm(stations.iterrows(), total=stations.shape[0]): # station id - station_id = station["station_id"] + station_id = station[station_dim] # timeseries index for a given station timeseries_index = station_timeseries_index(station, lon_in_mask, lat_in_mask) - # timeseries values for a given station - station_timeseries = masked_timeseries.sel(station=timeseries_index) - - # update complete station timeseries - complete_station_timeseries[station_id] = station_timeseries.values.flatten() - """ - Finally, create a pandas dataframe with a datetime index - """ - - # stationIDs and discharge values - df = pd.DataFrame(complete_station_timeseries) + # add to the list + stations_id += [station_id] + stations_list += [timeseries_index] - # datetime index - df["datetime"] = pd.to_datetime(station_timeseries.time.values) - df = df.set_index("datetime") + da_with_duplicate = da_stations.sel(station=stations_list) + da_with_duplicate = da_with_duplicate.assign_coords(station=stations_id) - return df + return da_with_duplicate diff --git a/hat/tools/extract_simulation_timeseries_cli.py b/hat/tools/extract_simulation_timeseries_cli.py index dec7a6e..c6256ee 100644 --- a/hat/tools/extract_simulation_timeseries_cli.py +++ b/hat/tools/extract_simulation_timeseries_cli.py @@ -19,12 +19,13 @@ from hat.config import timeseries_config # hat modules -from hat.data import find_files, save_dataset_to_netcdf +from hat.data import save_dataset_to_netcdf, read_simulation_as_xarray from hat.extract_simulation_timeseries import DEFAULT_CONFIG, extract_timeseries from 
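
NOTE: the re-selection trick in assign_stations() below is worth spelling out: selecting the same point index twice duplicates that timeseries, which is how two stations sharing a grid cell both get data. A toy version (isel is used here since the toy array has no station labels yet):

    import numpy as np
    import xarray as xr

    # three extracted point-timeseries (station dim), two time steps
    da = xr.DataArray(np.array([[1, 2, 3], [4, 5, 6]]), dims=["time", "station"])

    # stations A and C map to mask points 0 and 2; B shares C's grid cell
    picked = da.isel(station=[0, 2, 2]).assign_coords(station=["A", "C", "B"])
    print(picked.sel(station="B").values)  # [3 6], duplicated from point 2
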
hat.observations import read_station_metadata_file -def print_overview(config: dict, station_metadata: gpd.GeoDataFrame, fpaths: List[str]): + +def print_overview(config: dict, station_metadata: gpd.GeoDataFrame, simulation): """Print overview of relevant information for user""" title("Configuration", color="cyan") @@ -39,9 +40,7 @@ def print_overview(config: dict, station_metadata: gpd.GeoDataFrame, fpaths: Lis print(f"number of stations = {len(station_metadata)}") title("Simulation", color="cyan") - print( - f"number of simulation files = {len(fpaths)}\n", - ) + print(simulation) def print_default_config(DEFAULT_CONFIG): @@ -61,14 +60,6 @@ def print_default_config(DEFAULT_CONFIG): def command_line_tool( - simulation_files: str = typer.Option( - None, - help="List of simulation files", - ), - station_metadata_filepath: str = typer.Option( - None, - help="Path to station metadata file", - ), config_filepath: str = typer.Option( "", help="Path to configuration file", @@ -87,32 +78,27 @@ def command_line_tool( title("STARTING TIME SERIES EXTRACTION") - config = timeseries_config( - simulation_files, station_metadata_filepath, config_filepath - ) + config = timeseries_config(config_filepath) # read station file - station_metadata = read_station_metadata_file( + stations = read_station_metadata_file( config["station_metadata_filepath"], config['station_coordinates'], config['station_epsg'], config["station_filters"] ) - # find station files - simulation_fpaths = find_files( - config["simulation_files"], - ) + # read simulated data + simulation = read_simulation_as_xarray(config['simulation']) - print_overview(config, station_metadata, simulation_fpaths) + print_overview(config, stations, simulation) # Extract time series - timeseries = extract_timeseries( - station_metadata, simulation_fpaths=simulation_fpaths - ) + timeseries = extract_timeseries(stations, simulation) + title("Timeseries extracted") print(timeseries) - save_dataset_to_netcdf(timeseries, "./simulation_timeseries.nc") + save_dataset_to_netcdf(timeseries, config["simulation_output_filepath"]) title("TIMESERIES EXTRACTION COMPLETE", background="cyan", bold=True) From dec7827f253c68a33160b49cfddb42f692b335ae Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Mon, 9 Oct 2023 13:35:30 +0000 Subject: [PATCH 5/9] fix formatting --- hat/config.py | 4 +--- hat/data.py | 19 ++++++++++--------- hat/extract_simulation_timeseries.py | 13 ++----------- hat/geo.py | 17 +++++++++-------- hat/images.py | 3 ++- hat/observations.py | 10 +++------- hat/timeseries.py | 14 +++++--------- .../extract_simulation_timeseries_cli.py | 12 +++++------- 8 files changed, 37 insertions(+), 55 deletions(-) diff --git a/hat/config.py b/hat/config.py index 9f69e14..3a662c9 100644 --- a/hat/config.py +++ b/hat/config.py @@ -111,9 +111,7 @@ def read_config(custom_config_filepath: str): return config -def timeseries_config( - config_filepath: str -): +def timeseries_config(config_filepath: str): """Manage configuration settings for timeseries extraction. 
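
NOTE: the same flow the CLI runs above is available from Python; a sketch under the assumption of a hypothetical stations.csv and local NetCDF files (paths and column names are illustrative):

    from hat.data import read_simulation_as_xarray, save_dataset_to_netcdf
    from hat.extract_simulation_timeseries import extract_timeseries
    from hat.observations import read_station_metadata_file

    stations = read_station_metadata_file(
        "stations.csv",  # hypothetical station metadata file
        coord_names=["LisfloodX", "LisfloodY"],
        epsg=4326,
        filters=None,
    )
    simulation = read_simulation_as_xarray({"type": "file", "files": "*.nc"})
    timeseries = extract_timeseries(stations, simulation)
    save_dataset_to_netcdf(timeseries, "./simulation_timeseries.nc")
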
Priority order: 1) command line arguments diff --git a/hat/data.py b/hat/data.py index e14aac0..40f0b9f 100644 --- a/hat/data.py +++ b/hat/data.py @@ -1,9 +1,8 @@ +import glob import json import os import pathlib import sys -import glob -from pathlib import Path from tempfile import TemporaryDirectory from typing import List, Union @@ -261,16 +260,16 @@ def save_dataset_to_netcdf(ds: xr.Dataset, fpath: str): def find_main_var(ds): variable_names = [k for k in ds.variables if len(ds.variables[k].dims) >= 3] if len(variable_names) > 1: - raise Exception('More than one variable in dataset') + raise Exception("More than one variable in dataset") elif len(variable_names) == 0: - raise Exception('Could not find a valid variable in dataset') + raise Exception("Could not find a valid variable in dataset") else: var_name = variable_names[0] return var_name def read_simulation_as_xarray(options): - + if options["type"] == "file": type = "file" # find station files @@ -278,20 +277,22 @@ def read_simulation_as_xarray(options): options["files"], ) args = [files] - elif options["type"] in ["mars", 'fdb']: + elif options["type"] in ["mars", "fdb"]: type = options["type"] args = [options["request"]] else: - raise Exception(f"Simulation type {options['type']} not supported. Currently supported: file, mars, fdb") + raise Exception( + f"Simulation type {options['type']} not supported. Currently supported: file, mars, fdb" # noqa: E501 + ) # earthkit data file source fs = earthkit.data.from_source(type, *args) xarray_kwargs = {} if isinstance(fs, earthkit.data.readers.netcdf.NetCDFFieldListReader): - xarray_kwargs["xarray_open_mfdataset_kwargs"] = {"chunks": {'time': 'auto'}} + xarray_kwargs["xarray_open_mfdataset_kwargs"] = {"chunks": {"time": "auto"}} else: - xarray_kwargs["xarray_open_dataset_kwargs"] = {"chunks": {'time': 'auto'}} + xarray_kwargs["xarray_open_dataset_kwargs"] = {"chunks": {"time": "auto"}} # xarray dataset ds = fs.to_xarray(**xarray_kwargs) diff --git a/hat/extract_simulation_timeseries.py b/hat/extract_simulation_timeseries.py index 17afcc2..3d64323 100644 --- a/hat/extract_simulation_timeseries.py +++ b/hat/extract_simulation_timeseries.py @@ -1,18 +1,14 @@ """ Python module for extracting simulation timeseries from grids (i.e. rasters) """ -from typing import List - import geopandas as gpd -import pandas as pd import xarray as xr from hat.config import load_package_config, valid_custom_config # hat modules -from hat.data import find_files from hat.geo import geopoints_to_array, latlon_coords -from hat.timeseries import extract_timeseries_using_mask, assign_stations +from hat.timeseries import assign_stations, extract_timeseries_using_mask # Read configutation files # NOTE config files are installed as part of hat package using setup.py @@ -21,7 +17,6 @@ DEFAULT_CONFIG = load_package_config("timeseries.json") - def extract_timeseries( stations: gpd.GeoDataFrame, simulations_da: xr.DataArray, @@ -40,11 +35,7 @@ def extract_timeseries( # all timeseries (i.e. 
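
NOTE: a small check of the variable-detection heuristic reformatted above; find_main_var() keys on dimensionality, so 2D ancillary variables are skipped:

    import numpy as np
    import xarray as xr

    from hat.data import find_main_var

    ds = xr.Dataset(
        {
            "dis": (("time", "lat", "lon"), np.zeros((2, 3, 4))),  # 3D: selected
            "elevation": (("lat", "lon"), np.zeros((3, 4))),       # 2D: ignored
        }
    )
    assert find_main_var(ds) == "dis"
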
handle proximal and duplicate stations) da_stations = assign_stations( - stations, - station_mask, - da_points, - coords, - config["station_id_column_name"] + stations, station_mask, da_points, coords, config["station_id_column_name"] ) return da_stations diff --git a/hat/geo.py b/hat/geo.py index 3db6af4..4813bf7 100644 --- a/hat/geo.py +++ b/hat/geo.py @@ -6,7 +6,6 @@ import geopandas as gpd import numpy as np import shapely -import xarray as xr from jsonschema import ValidationError, validate from pyproj import CRS, Transformer @@ -335,14 +334,16 @@ def get_latlon_keys(ds): lat_key = None lon_key = None - if 'lat' in ds.coords and 'lon' in ds.coords: - lat_key = 'lat' - lon_key = 'lon' - elif 'latitude' in ds.coords and 'longitude' in ds.coords: - lat_key = 'latitude' - lon_key = 'longitude' + if "lat" in ds.coords and "lon" in ds.coords: + lat_key = "lat" + lon_key = "lon" + elif "latitude" in ds.coords and "longitude" in ds.coords: + lat_key = "latitude" + lon_key = "longitude" else: - raise Exception(f"Lat/lon coordinates could not be detected in dataset with coords {ds.coords}") + raise Exception( + f"Lat/lon coordinates could not be detected in dataset with coords {ds.coords}" # noqa: E501 + ) return lat_key, lon_key diff --git a/hat/images.py b/hat/images.py index 8eee99e..14ebeae 100644 --- a/hat/images.py +++ b/hat/images.py @@ -1,7 +1,8 @@ import matplotlib - import numpy as np from quicklook import quicklook + + def arr_to_image(arr: np.array) -> np.array: """modify array so that it is optimized for viewing""" diff --git a/hat/observations.py b/hat/observations.py index 2de58ad..f895201 100644 --- a/hat/observations.py +++ b/hat/observations.py @@ -1,7 +1,6 @@ import geopandas as gpd import pandas as pd -from hat.config import load_package_config from hat.data import is_csv, read_csv_and_cache from hat.filters import filter_dataframe @@ -33,17 +32,14 @@ def add_geometry_column(gdf: gpd.GeoDataFrame, coord_names): def read_station_metadata_file( - fpath: str, - coord_names: str, - epsg: int, - filters: str = None + fpath: str, coord_names: str, epsg: int, filters: str = None ) -> gpd.GeoDataFrame: """read hydrological stations from file. 
will cache as pickle object because .csv file used by the team takes 12 seconds to load""" - print('station file') + print("station file") print(fpath) - + try: if is_csv(fpath): gdf = read_csv_and_cache(fpath) diff --git a/hat/timeseries.py b/hat/timeseries.py index ef89561..b19f723 100644 --- a/hat/timeseries.py +++ b/hat/timeseries.py @@ -1,11 +1,9 @@ -from typing import List - -from tqdm import tqdm import geopandas as gpd import numpy as np import pandas as pd import xarray as xr from dask.diagnostics import ProgressBar +from tqdm import tqdm from hat.geo import get_latlon_keys @@ -18,9 +16,7 @@ def mask_array_np(arr, mask): def extract_timeseries_using_mask( - da: xr.DataArray, - mask: np.ndarray, - station_dim = "station" + da: xr.DataArray, mask: np.ndarray, station_dim="station" ): """extract timeseries using a station mask with xarray.apply_ufunc() @@ -71,15 +67,15 @@ def station_timeseries_index( dx = station.geometry.x - lon_in_mask dy = station.geometry.y - lat_in_mask - idx = (dx**2 + dy**2).argmin() + idx = (dx ** 2 + dy ** 2).argmin() return idx def assign_stations( stations: gpd.GeoDataFrame, - mask: np.ndarray, - da_stations: xr.DataArray, + mask: np.ndarray, + da_stations: xr.DataArray, coords: dict, station_dim: str, ) -> pd.DataFrame: diff --git a/hat/tools/extract_simulation_timeseries_cli.py b/hat/tools/extract_simulation_timeseries_cli.py index c6256ee..c497de0 100644 --- a/hat/tools/extract_simulation_timeseries_cli.py +++ b/hat/tools/extract_simulation_timeseries_cli.py @@ -10,7 +10,6 @@ import json import os -from typing import List import geopandas as gpd import typer @@ -19,12 +18,11 @@ from hat.config import timeseries_config # hat modules -from hat.data import save_dataset_to_netcdf, read_simulation_as_xarray +from hat.data import read_simulation_as_xarray, save_dataset_to_netcdf from hat.extract_simulation_timeseries import DEFAULT_CONFIG, extract_timeseries from hat.observations import read_station_metadata_file - def print_overview(config: dict, station_metadata: gpd.GeoDataFrame, simulation): """Print overview of relevant information for user""" @@ -83,13 +81,13 @@ def command_line_tool( # read station file stations = read_station_metadata_file( config["station_metadata_filepath"], - config['station_coordinates'], - config['station_epsg'], - config["station_filters"] + config["station_coordinates"], + config["station_epsg"], + config["station_filters"], ) # read simulated data - simulation = read_simulation_as_xarray(config['simulation']) + simulation = read_simulation_as_xarray(config["simulation"]) print_overview(config, stations, simulation) From b757294b9f34f6c100b7befc4025751d49122453 Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Mon, 9 Oct 2023 13:44:40 +0000 Subject: [PATCH 6/9] update extract tool interface and doc --- docs/reference.md | 28 +++++++++---------- docs/usage.md | 2 +- .../extract_simulation_timeseries_cli.py | 18 ++++++------ 3 files changed, 23 insertions(+), 25 deletions(-) diff --git a/docs/reference.md b/docs/reference.md index b9764a1..1f797c3 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -4,30 +4,28 @@ #### `hat-extract-timeseries` -Extract timeseries from a collection of simulation raster files. Timeseries extraction requires two variables +Extract timeseries from a collection of simulation raster files. Timeseries extraction requires a json configuration file, which can be provided on the command line using `--config` -1. simulation data directory -2. 
station metadata filepath + $ hat-extract-timeseries --config timeseries.json -They can be defined at the command line using `--simulation-datadir` and `--station-metadata` - - $ hat-extract-timeseries --simulation-datadir $GRIB_DATADIR --station-metadata $STATION_METADATA_FILEPATH - -This will use the default configuration, which you can show using `--show-default-config` +You can show an example of config file using `--show-default-config` $ extract_simulation_timeseries --show-default-config -To use a custom configuration there is `--config` - - $ hat-extract-timeseries --config $CUSTOM_CONFIG - To create your own configuration json file you might want to start with the default configuration as a template. Default values will be used where possible if not defined in the custom figuration file. Here is an example custom configuration file. # example custom configuration .json file { - "simulation_datadir": "/path/to/simulation_datadir", - "station_metadata": "path/to/Qgis_World_outlet_20221102.csv", - "input_file_extension": ".nc" + "station_metadata_filepath": "/path/to/stations.csv", + "simulation": { + "type": "file", + "files": "*.nc" + }, + "simulation_output_filepath": "./simulation_timeseries.nc", + "station_epsg": 4326, + "station_id_column_name": "obsid", + "station_filters":"", + "station_coordinates": ["Lon", "Lat"] } #### `hat-hydrostats` diff --git a/docs/usage.md b/docs/usage.md index 7b40fd5..92a8477 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -10,7 +10,7 @@ If you have already [installed](installation.md) hat then Run a command line tool, for example - $ extract_simulation_timeseries --help + $ hat-extract-timeseries --help For more information on individual command line tools, use the `--help` option at the command line or read the [reference](reference.md) documentation diff --git a/hat/tools/extract_simulation_timeseries_cli.py b/hat/tools/extract_simulation_timeseries_cli.py index c497de0..ffcdbc3 100644 --- a/hat/tools/extract_simulation_timeseries_cli.py +++ b/hat/tools/extract_simulation_timeseries_cli.py @@ -58,7 +58,7 @@ def print_default_config(DEFAULT_CONFIG): def command_line_tool( - config_filepath: str = typer.Option( + config: str = typer.Option( "", help="Path to configuration file", ), @@ -76,27 +76,27 @@ def command_line_tool( title("STARTING TIME SERIES EXTRACTION") - config = timeseries_config(config_filepath) + cfg = timeseries_config(config) # read station file stations = read_station_metadata_file( - config["station_metadata_filepath"], - config["station_coordinates"], - config["station_epsg"], - config["station_filters"], + cfg["station_metadata_filepath"], + cfg["station_coordinates"], + cfg["station_epsg"], + cfg["station_filters"], ) # read simulated data - simulation = read_simulation_as_xarray(config["simulation"]) + simulation = read_simulation_as_xarray(cfg["simulation"]) - print_overview(config, stations, simulation) + print_overview(cfg, stations, simulation) # Extract time series timeseries = extract_timeseries(stations, simulation) title("Timeseries extracted") print(timeseries) - save_dataset_to_netcdf(timeseries, config["simulation_output_filepath"]) + save_dataset_to_netcdf(timeseries, cfg["simulation_output_filepath"]) title("TIMESERIES EXTRACTION COMPLETE", background="cyan", bold=True) From 5c4e12c5d81084e50d64bafce6615472bbef82b5 Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Mon, 9 Oct 2023 13:51:27 +0000 Subject: [PATCH 7/9] fix formatting --- hat/data.py | 1 - hat/geo.py | 1 - hat/timeseries.py | 2 +- 3 
files changed, 1 insertion(+), 3 deletions(-) diff --git a/hat/data.py b/hat/data.py index 40f0b9f..4f25eee 100644 --- a/hat/data.py +++ b/hat/data.py @@ -269,7 +269,6 @@ def find_main_var(ds): def read_simulation_as_xarray(options): - if options["type"] == "file": type = "file" # find station files diff --git a/hat/geo.py b/hat/geo.py index 4813bf7..6c0408b 100644 --- a/hat/geo.py +++ b/hat/geo.py @@ -331,7 +331,6 @@ def geopoints_from_csv(fpath: str, lat_name: str, lon_name: str) -> gpd.GeoDataF def get_latlon_keys(ds): - lat_key = None lon_key = None if "lat" in ds.coords and "lon" in ds.coords: diff --git a/hat/timeseries.py b/hat/timeseries.py index b19f723..1fb1958 100644 --- a/hat/timeseries.py +++ b/hat/timeseries.py @@ -67,7 +67,7 @@ def station_timeseries_index( dx = station.geometry.x - lon_in_mask dy = station.geometry.y - lat_in_mask - idx = (dx ** 2 + dy ** 2).argmin() + idx = (dx**2 + dy**2).argmin() return idx From 59bfe1aab00544e72570fbd87b1db11388012ba2 Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Mon, 9 Oct 2023 14:11:17 +0000 Subject: [PATCH 8/9] fix testts --- hat/config.py | 13 ------------- hat/tools/extract_simulation_timeseries_cli.py | 4 ++-- tests/test_config.py | 15 +-------------- 3 files changed, 3 insertions(+), 29 deletions(-) diff --git a/hat/config.py b/hat/config.py index 3a662c9..49318da 100644 --- a/hat/config.py +++ b/hat/config.py @@ -109,16 +109,3 @@ def read_config(custom_config_filepath: str): config = valid_custom_config(custom_config) return config - - -def timeseries_config(config_filepath: str): - """Manage configuration settings for timeseries extraction. - Priority order: - 1) command line arguments - 2) custom config file - """ - - # custom or default configutation file - config = read_config(config_filepath) - - return config diff --git a/hat/tools/extract_simulation_timeseries_cli.py b/hat/tools/extract_simulation_timeseries_cli.py index ffcdbc3..dd7b53a 100644 --- a/hat/tools/extract_simulation_timeseries_cli.py +++ b/hat/tools/extract_simulation_timeseries_cli.py @@ -15,7 +15,7 @@ import typer from hat.cli import prettyprint, title -from hat.config import timeseries_config +from hat.config import read_config # hat modules from hat.data import read_simulation_as_xarray, save_dataset_to_netcdf @@ -76,7 +76,7 @@ def command_line_tool( title("STARTING TIME SERIES EXTRACTION") - cfg = timeseries_config(config) + cfg = read_config(config) # read station file stations = read_station_metadata_file( diff --git a/tests/test_config.py b/tests/test_config.py index 9a36546..941a014 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -8,13 +8,7 @@ # See https://setuptools.pypa.io/en/latest/pkg_resources.html with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=DeprecationWarning) - from hat.config import ( - DEFAULT_CONFIG, - booleanify, - read_config, - timeseries_config, - valid_custom_config, - ) + from hat.config import DEFAULT_CONFIG, booleanify, read_config, valid_custom_config def test_DEFAULT_CONFIG(): @@ -56,10 +50,3 @@ def test_read_config(): with pytest.raises(ValueError): _ = read_config(filepath_exists_but_is_not_json) - - -def test_timeseries_config(): - empty_path = "" - - with pytest.raises(ValueError): - _ = timeseries_config(empty_path, empty_path, empty_path) From e661941a1a50cda28b2221a11bbb3c48b2db17b2 Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Mon, 9 Oct 2023 15:35:58 +0000 Subject: [PATCH 9/9] update version --- hat/version.py | 2 +- 1 file changed, 1 
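
NOTE: with timeseries_config() gone, the config surface under test shrinks to read_config(); a condensed, adapted form of the retained check from tests/test_config.py (the temporary file here is illustrative):

    import pytest

    from hat.config import read_config

    def test_read_config_rejects_non_json(tmp_path):
        # a file that exists but is not json should raise
        not_json = tmp_path / "config.txt"
        not_json.write_text("not json")
        with pytest.raises(ValueError):
            read_config(str(not_json))
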
insertion(+), 1 deletion(-) diff --git a/hat/version.py b/hat/version.py index 3d26edf..3d18726 100644 --- a/hat/version.py +++ b/hat/version.py @@ -1 +1 @@ -__version__ = "0.4.1" +__version__ = "0.5.0"
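
NOTE: one loose end worth recording: PATCH 4 added "mars" and "fdb" simulation types alongside "file", but only the "file" form appears in the docs. Based on read_simulation_as_xarray(), which passes options["request"] straight to earthkit.data.from_source, a MARS-backed config would look roughly like the sketch below; the request keys are illustrative placeholders, not values prescribed by this series:

    {
        "station_metadata_filepath": "/path/to/stations.csv",
        "simulation": {
            "type": "mars",
            "request": {
                "class": "...",
                "stream": "...",
                "param": "...",
                "date": "..."
            }
        },
        "simulation_output_filepath": "./simulation_timeseries.nc",
        "station_epsg": 4326,
        "station_id_column_name": "station_id",
        "station_filters": "",
        "station_coordinates": ["LisfloodX", "LisfloodY"]
    }
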