Merge pull request #19 from ecmwf/develop
Refactoring of the time series extraction tool
corentincarton authored Oct 9, 2023
2 parents e823fe7 + e661941 commit 058bf6d
Showing 12 changed files with 173 additions and 285 deletions.

docs/reference.md (13 additions, 15 deletions)

@@ -4,30 +4,28 @@

 #### `hat-extract-timeseries`

-Extract timeseries from a collection of simulation raster files. Timeseries extraction requires two variables
+Extract timeseries from a collection of simulation raster files. Timeseries extraction requires a json configuration file, which can be provided on the command line using `--config`

-1. simulation data directory
-2. station metadata filepath
+    $ hat-extract-timeseries --config timeseries.json

-They can be defined at the command line using `--simulation-datadir` and `--station-metadata`
-
-    $ hat-extract-timeseries --simulation-datadir $GRIB_DATADIR --station-metadata $STATION_METADATA_FILEPATH
-
-This will use the default configuration, which you can show using `--show-default-config`
+You can show an example of config file using `--show-default-config`

     $ extract_simulation_timeseries --show-default-config

-To use a custom configuration there is `--config`
-
-    $ hat-extract-timeseries --config $CUSTOM_CONFIG
-
-To create your own configuration json file you might want to start with the default configuration as a template. Default values will be used where possible if not defined in the custom configuration file. Here is an example custom configuration file.

     # example custom configuration .json file
     {
-        "simulation_datadir": "/path/to/simulation_datadir",
-        "station_metadata": "path/to/Qgis_World_outlet_20221102.csv",
-        "input_file_extension": ".nc"
+        "station_metadata_filepath": "/path/to/stations.csv",
+        "simulation": {
+            "type": "file",
+            "files": "*.nc"
+        },
+        "simulation_output_filepath": "./simulation_timeseries.nc",
+        "station_epsg": 4326,
+        "station_id_column_name": "obsid",
+        "station_filters":"",
+        "station_coordinates": ["Lon", "Lat"]
     }

 #### `hat-hydrostats`
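
For context on the docs change above: the old `--simulation-datadir`/`--station-metadata` flags are replaced by a single `--config` json file. The unchanged `$ extract_simulation_timeseries --show-default-config` line appears to keep the old entry-point name; per the docs/usage.md change below, the command is now `hat-extract-timeseries`. A minimal sketch (not part of this commit) for inspecting a run's output with xarray, assuming the default `simulation_output_filepath` from the example config:

```python
# Hedged sketch: inspect the timeseries written by `hat-extract-timeseries`.
# Assumes the default "simulation_output_filepath" from the example config above.
import xarray as xr

ds = xr.open_dataset("./simulation_timeseries.nc")
print(ds)  # dimensions, coordinates and data variables of the extracted timeseries
```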

docs/usage.md (1 addition, 1 deletion)

@@ -10,7 +10,7 @@ If you have already [installed](installation.md) hat then

 Run a command line tool, for example

-    $ extract_simulation_timeseries --help
+    $ hat-extract-timeseries --help

 For more information on individual command line tools, use the `--help` option at the command line or read the [reference](reference.md) documentation


hat/config.py (0 additions, 42 deletions)

@@ -109,45 +109,3 @@ def read_config(custom_config_filepath: str):
     config = valid_custom_config(custom_config)

     return config
-
-
-def timeseries_config(
-    simulation_datadir: str, station_metadata: str, config_filepath: str
-):
-    """Manage configuration settings for timeseries extraction.
-    Priority order:
-    1) command line arguments
-    2) custom config file
-    3) default config"""
-
-    # custom or default configuration file
-    if not config_filepath:
-        config = DEFAULT_CONFIG
-    else:
-        config = read_config(config_filepath)
-
-    # root directory for search of input files
-    if not simulation_datadir and not config["simulation_datadir"]:
-        raise ValueError(
-            """Please provide a data directory of grib or netcdf
-            simulation files for timeseries extraction"""
-        )
-    elif simulation_datadir:
-        config["simulation_datadir"] = simulation_datadir
-
-    # station metadata filepath
-    if not station_metadata and not config["station_metadata_filepath"]:
-        raise ValueError("Please provide a station metadata filepath")
-    elif station_metadata:
-        config["station_metadata_filepath"] = station_metadata
-
-    # check output filepath is valid before running timeseries extraction
-    config["simulation_output_file_extension"] = config[
-        "simulation_output_filepath"
-    ].split(".")[-1]
-    if config["simulation_output_file_extension"] not in ["nc", "csv"]:
-        raise ValueError(
-            "output_filepath must be .nc or .csv, please update config", config_filepath
-        )
-
-    return config

hat/config_json/timeseries.json (6 additions, 5 deletions)

@@ -1,11 +1,12 @@
 {
     "station_metadata_filepath": "",
-    "simulation_datadir": "",
-    "simulation_input_file_extension": ".grb",
+    "simulation": {
+        "type": "file",
+        "files": "*.nc"
+    },
     "simulation_output_filepath": "./simulation_timeseries.nc",
-    "recursive_search": "False",
-    "river_network_name": "cama_3min",
     "station_epsg": 4326,
     "station_id_column_name": "station_id",
-    "station_filters":""
+    "station_filters":"",
+    "station_coordinates": ["LisfloodX", "LisfloodY"]
 }
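
The new `simulation` block replaces `simulation_datadir`, `simulation_input_file_extension` and `recursive_search`, and is dispatched on by `read_simulation_as_xarray` (added to `hat/data.py` below): `"type": "file"` expects a `files` glob pattern, while `"mars"` and `"fdb"` expect a `request` mapping. A hedged sketch of both shapes as Python dicts; the MARS request keys shown are hypothetical placeholders, not taken from this commit:

```python
# Illustrative "simulation" config blocks for the new schema.

# "file": the "files" value is a pattern handed to glob.glob().
file_simulation = {"type": "file", "files": "/path/to/simulations/*.nc"}

# "mars"/"fdb": the "request" mapping is forwarded to earthkit.data.from_source().
# These request keys are hypothetical examples, not defined by this commit.
mars_simulation = {
    "type": "mars",
    "request": {"param": "dis24", "date": "20230101/to/20230110"},
}
```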

hat/data.py (54 additions, 19 deletions)

@@ -1,11 +1,12 @@
+import glob
 import json
 import os
-import pathlib
 import sys
 from pathlib import Path
 from tempfile import TemporaryDirectory
 from typing import List, Union

+import earthkit.data
 import geopandas as gpd
 import humanize
 import pandas as pd

@@ -67,27 +68,16 @@ def get_tmp_filepath(
     return os.path.join(tmpdir, f"{fname}{extension}")


-def find_files(simulation_datadir, file_extension: str = ".nc", recursive=False):
-    """Find files in directory by file extension. Optionally recursive
-    (i.e. search all subdirectory too)"""
+def find_files(simulation_files):
+    """Find files matching regex"""

-    if not os.path.exists(simulation_datadir):
-        raise FileNotFoundError(f"Directory does not exist: {simulation_datadir}")
-
-    if recursive:
-        search_string = f"**/*{file_extension}"
-    else:
-        search_string = f"*{file_extension}"
-
-    fpaths = sorted(
-        [str(file) for file in Path(simulation_datadir).glob(search_string)]
-    )
+    fpaths = glob.glob(simulation_files)

     if not fpaths:
-        raise FileNotFoundError(
-            f"""No {file_extension} files found,
-            with recursive search as {recursive}, in: {simulation_datadir}"""
-        )
+        raise Exception(f"Could not find any file from regex {simulation_files}")
+    else:
+        print("Found following simulation files:")
+        print(*fpaths, sep="\n")

     return fpaths

@@ -265,3 +255,48 @@ def save_dataset_to_netcdf(ds: xr.Dataset, fpath: str):
         os.remove(fpath)

     ds.to_netcdf(fpath)
+
+
+def find_main_var(ds):
+    variable_names = [k for k in ds.variables if len(ds.variables[k].dims) >= 3]
+    if len(variable_names) > 1:
+        raise Exception("More than one variable in dataset")
+    elif len(variable_names) == 0:
+        raise Exception("Could not find a valid variable in dataset")
+    else:
+        var_name = variable_names[0]
+    return var_name
+
+
+def read_simulation_as_xarray(options):
+    if options["type"] == "file":
+        type = "file"
+        # find station files
+        files = find_files(
+            options["files"],
+        )
+        args = [files]
+    elif options["type"] in ["mars", "fdb"]:
+        type = options["type"]
+        args = [options["request"]]
+    else:
+        raise Exception(
+            f"Simulation type {options['type']} not supported. Currently supported: file, mars, fdb"  # noqa: E501
+        )
+
+    # earthkit data file source
+    fs = earthkit.data.from_source(type, *args)
+
+    xarray_kwargs = {}
+    if isinstance(fs, earthkit.data.readers.netcdf.NetCDFFieldListReader):
+        xarray_kwargs["xarray_open_mfdataset_kwargs"] = {"chunks": {"time": "auto"}}
+    else:
+        xarray_kwargs["xarray_open_dataset_kwargs"] = {"chunks": {"time": "auto"}}
+
+    # xarray dataset
+    ds = fs.to_xarray(**xarray_kwargs)
+
+    var = find_main_var(ds)
+
+    da = ds[var]
+    return da
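
On the two new helpers: `find_main_var` picks the single variable with three or more dimensions (raising if there are zero or several candidates), and `read_simulation_as_xarray` turns a `simulation` config block into one `xarray.DataArray` via earthkit-data. Note that although the `find_files` docstring says "regex", `glob.glob` matches shell-style glob patterns. A minimal usage sketch, assuming the files contain exactly one gridded variable; the path is a placeholder:

```python
# Hedged usage sketch for the new data helpers (path is a placeholder).
from hat.data import read_simulation_as_xarray

options = {"type": "file", "files": "/data/simulations/*.nc"}
da = read_simulation_as_xarray(options)  # DataArray of the single main variable
print(da.dims)  # e.g. ("time", "latitude", "longitude") for a gridded field
```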

hat/extract_simulation_timeseries.py (9 additions, 69 deletions)

@@ -1,18 +1,14 @@
"""
Python module for extracting simulation timeseries from grids (i.e. rasters)
"""
from typing import List

import geopandas as gpd
import pandas as pd
import xarray as xr

from hat.config import load_package_config, valid_custom_config

# hat modules
from hat.data import find_files
from hat.geo import geopoints_to_array, latlon_coords
from hat.timeseries import all_timeseries, extract_timeseries_from_filepaths
from hat.timeseries import assign_stations, extract_timeseries_using_mask

# Read configutation files
# NOTE config files are installed as part of hat package using setup.py

@@ -21,81 +17,25 @@
 DEFAULT_CONFIG = load_package_config("timeseries.json")


-def geopandas_to_xarray(station_metadata: gpd.GeoDataFrame, timeseries: pd.DataFrame):
-    """Convert results from geopandas geodataframe to xarray dataset"""
-
-    # NOTE we do not use this inbuilt method..
-    # ds = xr.Dataset.from_dataframe(timeseries)
-    # because it can be extremely slow (5 mins) when extracting all stations
-    # i.e. because it creates a separate data array for each station
-    # here instead we create a single data array for all stations
-
-    # 2D numpy array of timeseries
-    arr = timeseries.to_numpy()
-
-    # labels for the numpy array
-    coords = {
-        "time": list(timeseries.index),
-        "station": list(timeseries.columns),
-    }
-
-    # xarray data array is essentially "just" a numpy array with labels
-    da = xr.DataArray(
-        arr, dims=["time", "station"], coords=coords, name="simulation_timeseries"
-    )
-
-    # parse point geometries data into lists
-    lons = [point.x for point in station_metadata.geometry]
-    lats = [point.y for point in station_metadata.geometry]
-
-    # add to dataarray
-    da["longitude"] = ("station", lons)
-    da["latitude"] = ("station", lats)
-
-    # create xarray data set (in this case with just one data array)
-    ds = da.to_dataset()
-
-    return ds
-
-
 def extract_timeseries(
-    station_metadata: gpd.GeoDataFrame,
-    simulation_datadir: str = "",
-    simulation_fpaths: List = [],
+    stations: gpd.GeoDataFrame,
+    simulations_da: xr.DataArray,
     config: dict = DEFAULT_CONFIG,
 ):
     config = valid_custom_config(config)

-    if not simulation_datadir and not simulation_fpaths:
-        raise TypeError(
-            """extract_timeseries() missing 1 required variable:
-            'simulation_datadir' or 'simulation_fpaths' """
-        )
-
-    if not simulation_fpaths:
-        simulation_fpaths = find_files(
-            simulation_datadir,
-            file_extension=config["simulation_input_file_extension"],
-            recursive=config["recursive_search"],
-        )
-
     # infer coordinates for all grids from the first grid
-    coords = latlon_coords(simulation_fpaths[0])
+    coords = latlon_coords(simulations_da)

     # numpy array of station locations
-    station_mask = geopoints_to_array(station_metadata, coords)
+    station_mask = geopoints_to_array(stations, coords)

     # extract timeseries from files using mask
-    timeseries_from_mask = extract_timeseries_from_filepaths(
-        simulation_fpaths, station_mask
-    )
+    da_points = extract_timeseries_using_mask(simulations_da, station_mask)

     # all timeseries (i.e. handle proximal and duplicate stations)
-    timeseries = all_timeseries(
-        station_metadata, station_mask, timeseries_from_mask, coords
+    da_stations = assign_stations(
+        stations, station_mask, da_points, coords, config["station_id_column_name"]
     )

-    # convert to xarray dataset
-    timeseries = geopandas_to_xarray(station_metadata, timeseries)
-
-    return timeseries
+    return da_stations
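
The refactored `extract_timeseries` no longer discovers files itself: callers pass a stations GeoDataFrame and an already-loaded simulations DataArray. A hedged end-to-end sketch (the csv path and `Lat`/`Lon` column names are placeholders; `geopoints_from_csv` comes from `hat/geo.py`, whose diff follows):

```python
# Hedged end-to-end sketch of the refactored API (paths and columns are placeholders).
from hat.data import read_simulation_as_xarray
from hat.extract_simulation_timeseries import DEFAULT_CONFIG, extract_timeseries
from hat.geo import geopoints_from_csv

stations = geopoints_from_csv("/path/to/stations.csv", "Lat", "Lon")
simulations_da = read_simulation_as_xarray({"type": "file", "files": "/data/*.nc"})
da_stations = extract_timeseries(stations, simulations_da, config=DEFAULT_CONFIG)
```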

hat/geo.py (22 additions, 7 deletions)

@@ -6,7 +6,6 @@
 import geopandas as gpd
 import numpy as np
 import shapely
-import xarray as xr
 from jsonschema import ValidationError, validate
 from pyproj import CRS, Transformer

@@ -331,13 +330,29 @@ def geopoints_from_csv(fpath: str, lat_name: str, lon_name: str) -> gpd.GeoDataFrame:
     return gdf


-def latlon_coords(fpath):
+def get_latlon_keys(ds):
+    lat_key = None
+    lon_key = None
+    if "lat" in ds.coords and "lon" in ds.coords:
+        lat_key = "lat"
+        lon_key = "lon"
+    elif "latitude" in ds.coords and "longitude" in ds.coords:
+        lat_key = "latitude"
+        lon_key = "longitude"
+    else:
+        raise Exception(
+            f"Lat/lon coordinates could not be detected in dataset with coords {ds.coords}"  # noqa: E501
+        )
+
+    return lat_key, lon_key
+
+
+def latlon_coords(ds):
     """Latitude and longitude coordinates of an xarray"""

-    with xr.open_dataset(fpath) as ds:
-        lat_coords = ds.coords.get("latitude").data
-        lon_coords = ds.coords.get("longitude").data
-        coords = {"x": lon_coords, "y": lat_coords}
-        del ds
+    lat_key, lon_key = get_latlon_keys(ds)
+    lat_coords = ds.coords.get(lat_key).data
+    lon_coords = ds.coords.get(lon_key).data
+    coords = {"x": lon_coords, "y": lat_coords}

     return coords
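
With `get_latlon_keys`, `latlon_coords` now accepts an in-memory xarray object instead of a file path and handles both `lat`/`lon` and `latitude`/`longitude` spellings. A small synthetic demonstration (not from this commit):

```python
# Synthetic example of the new coordinate-name detection.
import numpy as np
import xarray as xr

ds = xr.Dataset(
    {"dis": (("time", "lat", "lon"), np.zeros((2, 3, 4)))},
    coords={"time": [0, 1], "lat": [10.0, 11.0, 12.0], "lon": [0.0, 1.0, 2.0, 3.0]},
)
# With this commit's helpers:
#   get_latlon_keys(ds)  -> ("lat", "lon")
#   latlon_coords(ds)    -> {"x": lon values, "y": lat values}
```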