Skip to content

Commit

Permalink
Improve empty data array handling
Browse files Browse the repository at this point in the history
ONC API v3 has more/different ways of returning empty responses to data requests
that we have to handle. It's not as clean as I would like, but it is good
enough for the purpose.
  • Loading branch information
douglatornell committed Feb 29, 2024
1 parent cfaed7c commit 2df8de1
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 22 deletions.
51 changes: 35 additions & 16 deletions nowcast/workers/get_onc_ferry.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,13 @@ def _get_water_data(ferry_platform, device_category, ymd, devices_config):
return device_data


def _empty_device_data(ferry_platform, device_category, ymd, sensors):
def _empty_device_data(
ferry_platform, device_category, ymd, sensors, time_coord="sampleTime"
):
# Response from ONC contains no sensor data, so return an
# empty DataArray
logger.warning(
f"No ONC {ferry_platform} {device_category} data for {ymd}; "
f"No ONC {ferry_platform} {device_category} {sensors} data for {ymd}; "
f"substituting empty dataset"
)
onc_units = {
Expand All @@ -321,15 +323,21 @@ def _empty_device_data(ferry_platform, device_category, ymd, sensors):
sensor: xarray.DataArray(
name=sensor,
data=numpy.array([], dtype=float),
coords={"sampleTime": numpy.array([], dtype="datetime64[ns]")},
dims="sampleTime",
coords={time_coord: numpy.array([], dtype="datetime64[ns]")},
dims=time_coord,
attrs={
"device_category": device_category,
"qaqcFlag": numpy.array([], dtype=numpy.int64),
"unitOfMeasure": onc_units[sensor],
"units": "degrees_Celcius"
if sensor in {"temperature", "air_temperature"}
else onc_units[sensor],
},
)
for sensor in sensors.split(",")
}
if len(data_arrays) == 1:
return data_arrays[sensors]
return xarray.Dataset(data_arrays)


Expand All @@ -341,27 +349,34 @@ def _qaqc_filter(ferry_platform, device, device_data, ymd, devices_config):
f"filtering ONC {ferry_platform} {device} {onc_sensor} data "
f"for {ymd} to exclude 1<qaqcFlag<7"
)
onc_data = getattr(device_data, onc_sensor)
not_nan_mask = numpy.logical_not(numpy.isnan(onc_data.values))
sensor_qaqc_mask = onc_data.attrs["qaqcFlag"] <= 1
try:
onc_data = getattr(device_data, onc_sensor)
except AttributeError:
data_array = _empty_device_data(

Check warning on line 355 in nowcast/workers/get_onc_ferry.py

View check run for this annotation

Codecov / codecov/patch

nowcast/workers/get_onc_ferry.py#L352-L355

Added lines #L352 - L355 were not covered by tests
ferry_platform, device, ymd, onc_sensor, time_coord="time"
)
sensor_data_arrays.append(data_array)
continue
sensor_qaqc_mask = numpy.logical_or(

Check warning on line 360 in nowcast/workers/get_onc_ferry.py

View check run for this annotation

Codecov / codecov/patch

nowcast/workers/get_onc_ferry.py#L358-L360

Added lines #L358 - L360 were not covered by tests
onc_data.attrs["qaqcFlag"] <= 1, onc_data.attrs["qaqcFlag"] >= 7
)
try:
cf_units = cf_units_mapping[onc_data.unitOfMeasure]
except KeyError:
cf_units = onc_data.unitOfMeasure
sensor_data_arrays.append(
xarray.DataArray(
if not sensor_qaqc_mask.any():
data_array = _empty_device_data(

Check warning on line 368 in nowcast/workers/get_onc_ferry.py

View check run for this annotation

Codecov / codecov/patch

nowcast/workers/get_onc_ferry.py#L368

Added line #L368 was not covered by tests
ferry_platform, device, ymd, onc_sensor, time_coord="time"
)
else:
data_array = xarray.DataArray(

Check warning on line 372 in nowcast/workers/get_onc_ferry.py

View check run for this annotation

Codecov / codecov/patch

nowcast/workers/get_onc_ferry.py#L372

Added line #L372 was not covered by tests
name=sensor,
data=onc_data[not_nan_mask][sensor_qaqc_mask].values,
coords={
"time": onc_data.sampleTime[not_nan_mask][sensor_qaqc_mask].values
},
data=onc_data[sensor_qaqc_mask].values,
coords={"time": onc_data.sampleTime[sensor_qaqc_mask].values},
dims="time",
attrs={"device_category": device, "units": cf_units},
)
)
sensor_data_arrays.append(data_array)

Check warning on line 379 in nowcast/workers/get_onc_ferry.py

View check run for this annotation

Codecov / codecov/patch

nowcast/workers/get_onc_ferry.py#L379

Added line #L379 was not covered by tests
return sensor_data_arrays


Expand All @@ -387,7 +402,7 @@ def count(values, axis):
else:
try:
data_array = array.resample(time="1Min").mean()
except IndexError:
except (IndexError, ValueError):

Check warning on line 405 in nowcast/workers/get_onc_ferry.py

View check run for this annotation

Codecov / codecov/patch

nowcast/workers/get_onc_ferry.py#L405

Added line #L405 was not covered by tests
# array is empty, meaning there are no observations with
# qaqcFlag<=1 or qaqcFlac>=7, so substitute a DataArray full of NaNs
logger.warning(
Expand Down Expand Up @@ -418,7 +433,11 @@ def count(values, axis):
sample_count_var = f"{var}_sample_count"
sample_count_array = array.resample(time="1Min").count()
sample_count_array.attrs = array.attrs
del sample_count_array.attrs["units"]
try:
del sample_count_array.attrs["units"]
except KeyError:

Check warning on line 438 in nowcast/workers/get_onc_ferry.py

View check run for this annotation

Codecov / codecov/patch

nowcast/workers/get_onc_ferry.py#L436-L438

Added lines #L436 - L438 were not covered by tests
# empty data arrays lack units attributes
pass

Check warning on line 440 in nowcast/workers/get_onc_ferry.py

View check run for this annotation

Codecov / codecov/patch

nowcast/workers/get_onc_ferry.py#L440

Added line #L440 was not covered by tests
data_vars[sample_count_var] = _create_dataarray(
sample_count_var, sample_count_array, ferry_platform, location_config
)
Expand Down
55 changes: 49 additions & 6 deletions tests/workers/test_get_onc_ferry.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,16 +291,30 @@ class TestGetWaterData:
pass


@pytest.mark.parametrize(
"ferry_platform, device, sensors",
[("TWDP", "TSG", "temperature,conductivity,salinity")],
)
class TestEmptyDeviceData:
"""Unit tests for _empty_device_data() function."""

def test_empty_device_data(self, ferry_platform, device, sensors, caplog):
def test_msg(self, caplog):
caplog.set_level(logging.DEBUG)

get_onc_ferry._empty_device_data(
"TWDP", "TSG", "2024-02-08", "temperature,conductivity,salinity"
)

expected = (
f"No ONC TWDP TSG temperature,conductivity,salinity data for 2024-02-08; "
f"substituting empty dataset"
)
assert caplog.records[0].levelname == "WARNING"
assert caplog.messages[0] == expected

@pytest.mark.parametrize(
"ferry_platform, device_category, sensors",
[("TWDP", "TSG", "temperature,conductivity,salinity")],
)
def test_empty_device_data(self, ferry_platform, device_category, sensors, caplog):
dataset = get_onc_ferry._empty_device_data(
ferry_platform, device, "2017-12-01", sensors
ferry_platform, device_category, "2017-12-01", sensors
)
for sensor in sensors.split(","):
assert sensor in dataset.data_vars
Expand All @@ -311,6 +325,35 @@ def test_empty_device_data(self, ferry_platform, device, sensors, caplog):
assert dataset.sampleTime.dtype == "datetime64[ns]"
assert "sampleTime" in dataset.dims

@pytest.mark.parametrize(
"ferry_platform, device_category, sensors, uom, units",
[
("TWDP", "TSG", "temperature", "C", "degrees_Celcius"),
("TWDP", "TSG", "conductivity", "S/m", "S/m"),
("TWDP", "TSG", "salinity", "g/kg", "g/kg"),
("TWDP", "OXYSENSOR", "oxygen_saturation", "percent", "percent"),
("TWDP", "OXYSENSOR", "oxygen_corrected", "ml/l", "ml/l"),
("TWDP", "OXYSENSOR", "temperature", "C", "degrees_Celcius"),
("TWDP", "TURBCHLFL", "cdom_fluorescence", "ppb", "ppb"),
("TWDP", "TURBCHLFL", "chlorophyll", "ug/l", "ug/l"),
("TWDP", "TURBCHLFL", "turbidity", "NTU", "NTU"),
("TWDP", "CO2SENSOR", "partial_pressure", "pCO2 uatm", "pCO2 uatm"),
("TWDP", "CO2SENSOR", "co2", "umol/mol", "umol/mol"),
("TWDP", "TEMPHUMID", "air_temperature", "C", "degrees_Celcius"),
("TWDP", "TEMPHUMID", "rel_humidity", "%", "%"),
("TWDP", "BARPRESS", "barometric_pressure", "hPa", "hPa"),
("TWDP", "PYRANOMETER", "solar_radiation", "W/m^2", "W/m^2"),
("TWDP", "PYRGEOMETER", "downward_radiation", "W/m^2", "W/m^2"),
],
)
def test_attrs(self, ferry_platform, device_category, sensors, uom, units, caplog):
dataset = get_onc_ferry._empty_device_data(
ferry_platform, device_category, "2024-02-08", sensors
)
assert dataset.attrs["device_category"] == device_category
assert dataset.attrs["unitOfMeasure"] == uom
assert dataset.attrs["units"] == units


@pytest.mark.parametrize("ferry_platform", ["TWDP"])
class TestQaqcFilter:
Expand Down

0 comments on commit 2df8de1

Please sign in to comment.