Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev static fixes #225

Merged
merged 22 commits into the target branch from the feature branch
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5,157 changes: 5,157 additions & 0 deletions notebooks/static_fixes.ipynb

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions rook/director/director.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ def __init__(self, coll, inputs):
raise InvalidCollection()

self._resolve()
# check if a fix will be applied
self._check_apply_fixes()
# if enabled for the project then check if a fix will be applied
if CONFIG[f"project:{self.project}"].get("use_fixes", False):
self._check_apply_fixes()
else:
self.inputs["apply_fixes"] = False

def _check_apply_fixes(self):
if (
Expand Down
36 changes: 36 additions & 0 deletions rook/processes/wps_concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,26 @@ def __init__(self):
min_occurs=1,
max_occurs=100,
),
LiteralInput(
"time",
"Time Period",
abstract="The time interval (start/end) to subset over separated by '/'"
" or a list of time points separated by ','."
" The format is according to the ISO-8601 standard."
" Example: 1860-01-01/1900-12-30 or 1860-01-01, 1870-01-01, 1880-01-01",
data_type="string",
min_occurs=0,
max_occurs=1,
),
LiteralInput(
"time_components",
"Time Components",
abstract="Optional time components to describe parts of the time period (e.g. year, month and day)."
" Example: month:01,02,03 or year:1970,1980|month:01,02,03",
data_type="string",
min_occurs=0,
max_occurs=1,
),
LiteralInput(
"dims",
"Dimensions",
Expand All @@ -40,6 +60,15 @@ def __init__(self):
min_occurs=1,
max_occurs=1,
),
LiteralInput(
"apply_average",
"Apply Average over dims",
data_type="boolean",
abstract="Apply Average over dims.",
default="0",
min_occurs=1,
max_occurs=1,
),
LiteralInput(
"pre_checked",
"Pre-Checked",
Expand Down Expand Up @@ -116,6 +145,13 @@ def _handler(self, request, response):
"pre_checked": parse_wps_input(
request.inputs, "pre_checked", default=False
),
"apply_average": parse_wps_input(
request.inputs, "apply_average", default=False
),
"time": parse_wps_input(request.inputs, "time", default=None),
"time_components": parse_wps_input(
request.inputs, "time_components", default=None
),
"dims": parse_wps_input(
request.inputs, "dims", as_sequence=True, default=None
),
Expand Down
1 change: 1 addition & 0 deletions rook/provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def add_operator(self, operator, parameters, collection, output):
"dims",
"freq",
"apply_fixes",
"apply_average",
]:
if param in parameters:
value = parameters[param]
Expand Down
77 changes: 44 additions & 33 deletions rook/utils/concat_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@

from roocs_utils.parameter import collection_parameter
from roocs_utils.parameter import dimension_parameter
from roocs_utils.parameter import time_parameter
from roocs_utils.parameter import time_components_parameter

from roocs_utils.project_utils import derive_ds_id

from daops.ops.base import Operation
from daops.utils import normalise

from clisops.utils.file_namers import get_file_namer
from clisops.utils.output_utils import get_output, get_time_slices
from clisops.ops import subset

from clisops.core.average import average_over_dims as average

from .decadal_fixes import apply_decadal_fixes

coord_by_standard_name = {
"realization": "realization_index",
Expand All @@ -25,12 +30,19 @@ def _resolve_params(self, collection, **params):
Resolve the input parameters to `self.params` and parameterise
collection parameter and set to `self.collection`.
"""
time = time_parameter.TimeParameter(params.get("time"))
time_components = time_components_parameter.TimeComponentsParameter(
params.get("time_components")
)
dims = dimension_parameter.DimensionParameter(params.get("dims"))
collection = collection_parameter.CollectionParameter(collection)

self.collection = collection
self.params = {
"time": time,
"time_components": time_components,
"dims": dims,
"apply_average": params.get("apply_average", False),
"ignore_undetected_dims": params.get("ignore_undetected_dims"),
}

Expand All @@ -51,11 +63,19 @@ def _calculate(self):
new_collection[ds_id] = dset.file_paths

# Normalise (i.e. "fix") data inputs based on "character"
norm_collection = normalise.normalise(new_collection, self._apply_fixes)
norm_collection = normalise.normalise(
new_collection, False # self._apply_fixes
)

rs = normalise.ResultSet(vars())

datasets = list(norm_collection.values())
# datasets = list(norm_collection.values())
# apply decadal fixes
datasets = []
for ds_id in norm_collection.keys():
ds = norm_collection[ds_id]
ds_mod = apply_decadal_fixes(ds_id, ds)
datasets.append(ds_mod)

dims = dimension_parameter.DimensionParameter(
self.params.get("dims", None)
Expand All @@ -71,49 +91,34 @@ def _calculate(self):
{dim: (dim, np.array(processed_ds[dim].values, dtype="int32"))}
)
processed_ds.coords[dim].attrs = {"standard_name": standard_name}

namer = get_file_namer("standard")()
time_slices = get_time_slices(processed_ds, "time:auto")

outputs = list()
# Loop through each time slice
for tslice in time_slices:

# If there is only one time slice, and it is None:
# - then just set the result Dataset to the processed Dataset
if tslice is None:
result_ds = processed_ds
# If there is a time slice then extract the time slice from the
# processed Dataset
else:
result_ds = processed_ds.sel(time=slice(tslice[0], tslice[1]))

# print(f"for times: {tslice}")

# Get the output (file or xarray Dataset)
# When this is a file: xarray will read all the data and write the file
output = get_output(
result_ds,
output_type="nc",
output_dir=self._output_dir,
namer=namer,
)
outputs.append(output)

# optional: average
if self.params.get("apply_average", False):
processed_ds = average(processed_ds, dims=["realization"])
# subset
outputs = subset(
processed_ds,
time=self.params.get("time", None),
time_components=self.params.get("time_components", None),
output_type="nc",
)
# result
rs.add("output", outputs)

return rs


def _concat(
    collection,
    time=None,
    time_components=None,
    dims=None,
    ignore_undetected_dims=False,
    output_dir=None,
    output_type="netcdf",
    split_method="time:auto",
    file_namer="standard",
    apply_fixes=True,
    apply_average=False,
):
    """Run the Concat operation and return its ResultSet.

    All keyword arguments are forwarded verbatim to ``Concat`` via
    ``locals()``, which at this point contains exactly the function
    parameters (no other locals have been bound yet).
    """
    result_set = Concat(**locals())._calculate()
    return result_set
Expand All @@ -126,22 +131,28 @@ def run_concat(args):

def concat(
    collection,
    time=None,
    time_components=None,
    dims=None,
    ignore_undetected_dims=False,
    output_dir=None,
    output_type="netcdf",
    split_method="time:auto",
    file_namer="standard",
    apply_fixes=True,
    apply_average=False,
):
    """Public entry point for the concat operation.

    Collects every argument into a dict and delegates to ``_concat``,
    which instantiates the ``Concat`` operation. Parameters mirror the
    WPS inputs: ``time``/``time_components`` select a temporal subset,
    ``dims`` names the dimensions to concatenate over, and
    ``apply_average`` optionally averages over the realization dimension.
    """
    args = dict(
        collection=collection,
        time=time,
        time_components=time_components,
        dims=dims,
        ignore_undetected_dims=ignore_undetected_dims,
        output_dir=output_dir,
        output_type=output_type,
        split_method=split_method,
        file_namer=file_namer,
        apply_fixes=apply_fixes,
        apply_average=apply_average,
    )
    return _concat(**args)
132 changes: 132 additions & 0 deletions rook/utils/decadal_fixes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from daops.data_utils.attr_utils import (
edit_var_attrs,
edit_global_attrs,
remove_coord_attr,
)
from daops.data_utils.coord_utils import add_scalar_coord, add_coord
from daops.data_utils.var_utils import add_data_var

# Model-specific global attributes that are missing (or wrong) in the
# published decadal (DCPP) datasets.  Keyed by the model name (source_id);
# each entry holds the corrected description attributes that are injected
# into the dataset by decadal_fix_2().
model_specific_global_attrs = {
    "CMCC-CM2-SR5": {
        "forcing_description": "f1, CMIP6 historical forcings",
        "physics_description": "physics from the standard model configuration, with no additional tuning or different parametrization",  # noqa
        "initialization_description": "hindcast initialized based on observations and using historical forcing",  # noqa
    },
    "EC-Earth3": {
        "forcing_description": "f1, CMIP6 historical forcings",
        "physics_description": "physics from the standard model configuration, with no additional tuning or different parametrization",  # noqa
        "initialization_description": "Atmosphere initialization based on full-fields from ERA-Interim (s1979-s2018) or ERA-40 (s1960-s1978); ocean/sea-ice initialization based on full-fields from NEMO/LIM assimilation run nudged towards ORA-S4 (s1960-s2018)",  # noqa
    },
    "HadGEM3-GC31-MM": {
        "forcing_description": "f2, CMIP6 v6.2.0 forcings; no ozone remapping",
        "physics_description": "physics from the standard model configuration, with no additional tuning or different parametrization",  # noqa
        "initialization_description": "hindcast initialized based on observations and using historical forcing",  # noqa
    },
    "MPI-ESM1-2-HR": {
        "forcing_description": "f1, CMIP6 historical forcings",
        "physics_description": "physics from the standard model configuration, with no additional tuning or different parametrization",  # noqa
        "initialization_description": "hindcast initialized based on observations and using historical forcing",  # noqa
    },
    "MPI-ESM1-2-LR": {
        "forcing_description": "f1, CMIP6 historical forcings",
        "physics_description": "physics from the standard model configuration, with no additional tuning or different parametrization",  # noqa
        "initialization_description": "hindcast initialized based on observations and using historical forcing",  # noqa
    },
}


def get_decadal_model_attr_from_dict(ds_id, ds, attr):
    """Return the model-specific global attribute *attr* for dataset *ds_id*.

    The model name (source_id) is taken from the 4th dot-separated
    component of the dataset identifier. *ds* is unused and is kept only
    for signature compatibility with the other decadal fix helpers.

    Raises:
        KeyError: if the model or the attribute is not registered in
            ``model_specific_global_attrs``.
    """
    # TODO: method taken from daops.fix_utils.decadal_utils.py
    model = ds_id.split(".")[3]
    try:
        return model_specific_global_attrs[model][attr]
    except KeyError as err:
        # Fail with a pointed message instead of a bare KeyError so an
        # unregistered model is easy to diagnose from WPS logs.
        raise KeyError(
            f"No decadal fix attribute {attr!r} registered for model {model!r}"
        ) from err


def apply_decadal_fixes(ds_id, ds):
    """Apply all five decadal metadata fixes to *ds* and return the result.

    The fixes are applied in order, each one receiving the output of the
    previous, exactly as the daops decadal fix pipeline does.
    """
    for fix in (
        decadal_fix_1,
        decadal_fix_2,
        decadal_fix_3,
        decadal_fix_4,
        decadal_fix_5,
    ):
        ds = fix(ds_id, ds)
    return ds


def decadal_fix_1(ds_id, ds):
    """Set the long_name of the 'time' variable to 'valid_time'."""
    return edit_var_attrs(
        ds_id, ds, var_id="time", attrs={"long_name": "valid_time"}
    )


def decadal_fix_2(ds_id, ds):
    """Inject model-specific and start-date global attributes.

    The three description attributes are looked up per model; startdate
    and sub_experiment_id are left as "derive:" directives, resolved
    lazily by daops when the fix is applied.
    """
    attrs = {
        key: get_decadal_model_attr_from_dict(ds_id, ds, key)
        for key in (
            "forcing_description",
            "physics_description",
            "initialization_description",
        )
    }
    attrs["startdate"] = "derive: daops.fix_utils.decadal_utils.get_sub_experiment_id"
    attrs[
        "sub_experiment_id"
    ] = "derive: daops.fix_utils.decadal_utils.get_sub_experiment_id"
    return edit_global_attrs(ds_id, ds, attrs=attrs)


def decadal_fix_3(ds_id, ds):
    """Add a scalar 'reftime' coordinate (the forecast reference time).

    The value and calendar are "derive:" directives resolved by daops;
    the encoding stores the coordinate as int32 days since 1850-01-01.
    """
    reftime_attrs = {
        "long_name": "Start date of the forecast",
        "standard_name": "forecast_reference_time",
    }
    reftime_encoding = {
        "dtype": "int32",
        "units": "days since 1850-01-01",
        "calendar": "derive: daops.fix_utils.decadal_utils.get_time_calendar",
    }
    return add_scalar_coord(
        ds_id,
        ds,
        var_id="reftime",
        value="derive: daops.fix_utils.decadal_utils.get_reftime",
        dtype="datetime64[ns]",
        attrs=reftime_attrs,
        encoding=reftime_encoding,
    )


def decadal_fix_4(ds_id, ds):
    """Add a 'leadtime' coordinate along time (forecast period in days)."""
    leadtime_attrs = {
        "long_name": "Time elapsed since the start of the forecast",
        "standard_name": "forecast_period",
        "units": "days",
    }
    return add_coord(
        ds_id,
        ds,
        var_id="leadtime",
        value="derive: daops.fix_utils.decadal_utils.get_lead_times",
        dim=["time"],
        dtype="float64",
        attrs=leadtime_attrs,
        encoding={"dtype": "double"},
    )


def decadal_fix_5(ds_id, ds):
    """Add a 'realization' data variable identifying the ensemble member."""
    realization_attrs = {
        "long_name": "realization",
        "comment": "For more information on the ripf, refer to the variant_label, initialization_description, physics_description and forcing_description global attributes",  # noqa
    }
    return add_data_var(
        ds_id,
        ds,
        var_id="realization",
        value="1",
        dtype="int32",
        attrs=realization_attrs,
    )
return ds_mod
2 changes: 1 addition & 1 deletion rook/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def replace_inputs(wfdoc):
# fixes are only applied to start steps
if step_id in start_steps:
steps[step_id]["in"]["apply_fixes"] = steps[step_id]["in"].get(
"apply_fixes", True
"apply_fixes", False
)
else:
steps[step_id]["in"]["apply_fixes"] = False
Expand Down