Implement remainder of data fetch and clean algorithm.
bolny committed Nov 30, 2024
1 parent b074c9d commit 9b44191
Showing 5 changed files with 788 additions and 23 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -3,3 +3,4 @@
*.pyc
__pycache__
.pytest_cache
+tmp
6 changes: 5 additions & 1 deletion pyproject.toml
@@ -4,7 +4,11 @@ version = "0.1.0"
description = "BC ventilation index bulletin generator"
readme = "README.md"
requires-python = ">=3.9"
-dependencies = []
+dependencies = [
+    "cfgrib>=0.9.14.1",
+    "geopandas>=1.0.1",
+    "xarray>=2024.7.0",
+]

[tool.ruff]
line-length = 120
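For context, a minimal smoke-test sketch (not part of the commit) of how the three new dependencies fit together: cfgrib acts as xarray's GRIB2 decoding engine, and geopandas layers point geometry onto the resulting pandas frame. The file name "sample.grib2" is a hypothetical placeholder.

import xarray as xr
from geopandas import GeoDataFrame, points_from_xy

# cfgrib is not imported directly; xarray loads it as the GRIB2 engine.
ds = xr.load_dataset("sample.grib2", engine="cfgrib")  # hypothetical local file
df = ds.to_dataframe().reset_index()
# Assumes the dataset carries latitude/longitude variables, as the HRDPS files here do.
gdf = GeoDataFrame(df, geometry=points_from_xy(df["longitude"], df["latitude"]), crs="EPSG:4326")
print(gdf.head())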
240 changes: 222 additions & 18 deletions tests/test_data_fetch.py
@@ -1,24 +1,228 @@
import pytest
from datetime import datetime
from pandas import DataFrame
from freezegun import freeze_time
from xarray import Dataset
from geopandas import GeoDataFrame

-from utils.data_fetch import _build_urls
+from utils.data_fetch import _build_geo_data_frames, _build_urls, _build_data_frames, _clean_data, _filter_data


-class TestBuildUrls:
-    @freeze_time("2024-11-19")
-    def test_urls_correct(self):
-        offset1 = "012"
-        offset2 = "024"
-        offset3 = "048"
-        url1 = "https://dd.weather.gc.ca/model_hrdps/continental/2.5km/00/012/20241119T00Z_MSC_HRDPS_VI_Sfc_RLatLon0.0225_PT012H.grib2"
-        url2 = "https://dd.weather.gc.ca/model_hrdps/continental/2.5km/00/024/20241119T00Z_MSC_HRDPS_VI_Sfc_RLatLon0.0225_PT024H.grib2"
-        url3 = "https://dd.weather.gc.ca/model_hrdps/continental/2.5km/00/048/20241119T00Z_MSC_HRDPS_VI_Sfc_RLatLon0.0225_PT048H.grib2"
+@freeze_time("2024-11-19")
+def test_build_urls():
+    offset_1 = "012"
+    offset_2 = "024"
+    offset_3 = "048"
+    url_1 = "https://dd.weather.gc.ca/model_hrdps/continental/2.5km/00/012/20241119T00Z_MSC_HRDPS_VI_Sfc_RLatLon0.0225_PT012H.grib2"
+    url_2 = "https://dd.weather.gc.ca/model_hrdps/continental/2.5km/00/024/20241119T00Z_MSC_HRDPS_VI_Sfc_RLatLon0.0225_PT024H.grib2"
+    url_3 = "https://dd.weather.gc.ca/model_hrdps/continental/2.5km/00/048/20241119T00Z_MSC_HRDPS_VI_Sfc_RLatLon0.0225_PT048H.grib2"

-        urls = _build_urls()
+    urls = _build_urls()

-        assert offset1 in urls.keys()
-        assert offset2 in urls.keys()
-        assert offset3 in urls.keys()
-        assert urls[offset1] == url1
-        assert urls[offset2] == url2
-        assert urls[offset3] == url3
+    assert offset_1 in urls.keys()
+    assert offset_2 in urls.keys()
+    assert offset_3 in urls.keys()
+    assert urls[offset_1] == url_1
+    assert urls[offset_2] == url_2
+    assert urls[offset_3] == url_3


def test_build_dataframes():
# This is an extremely minimal example of the structure of the data I've
# seen from Environment Canada for my purposes. I'm no data scientist,
# however.
dataset_1 = Dataset(
{
"unknown": (["loc"], [0.50]),
"latitude": (["loc"], [49]),
"longitude": (["loc"], [-130]),
},
coords={
"x": (["loc"], [1]),
"y": (["loc"], [1]),
},
)
dataset_2 = Dataset(
{
"unknown": (["loc"], [0.43]),
"latitude": (["loc"], [49]),
"longitude": (["loc"], [-130]),
},
coords={
"x": (["loc"], [1]),
"y": (["loc"], [1]),
},
)

datasets = {
"012": dataset_1,
"024": dataset_2,
}

result = _build_data_frames(datasets)

assert "012" in result
assert isinstance(result["012"], DataFrame)
assert "024" in result
assert isinstance(result["024"], DataFrame)


def test_filter_data():
data_frame_1 = DataFrame(
[
{
"y": 1,
"x": 1,
"latitude": 49,
"longitude": -130,
"unknown": 0.50,
},
{
"y": 2,
"x": 1,
"latitude": 49,
"longitude": -150,
"unknown": 0.67,
},
]
)
data_frame_2 = DataFrame(
[
{
"y": 1,
"x": 1,
"latitude": 49,
"longitude": -130,
"unknown": 0.43,
},
{
"y": 2,
"x": 1,
"latitude": 49,
"longitude": -150,
"unknown": 0.21,
},
]
)
data_frames = {
"012": data_frame_1,
"024": data_frame_2,
}

result = _filter_data(data_frames)

assert "012" in result
test_frame_1 = result["012"]
assert test_frame_1[test_frame_1["longitude"] == -150].empty

assert "024" in result
test_frame_2 = result["024"]
assert test_frame_2[test_frame_2["longitude"] == -150].empty


def test_clean_data():
data_frame_1 = DataFrame(
[
{
"y": 1,
"x": 1,
"time": datetime.now(),
"step": 1,
"surface": 0,
"valid_time": 24,
"latitude": 49,
"longitude": -130,
"unknown": 0.50,
},
]
)
data_frame_2 = DataFrame(
[
{
"y": 1,
"x": 1,
"time": datetime.now(),
"step": 1,
"surface": 0,
"valid_time": 24,
"latitude": 49,
"longitude": -130,
"unknown": 0.43,
},
]
)
data_frames = {
"012": data_frame_1,
"024": data_frame_2,
}

result = _clean_data(data_frames)

assert "012" in result
test_frame_1 = result["012"]
assert "time" not in test_frame_1.columns.values
assert "step" not in test_frame_1.columns.values
assert "surface" not in test_frame_1.columns.values
assert "valid_time" not in test_frame_1.columns.values
assert "x" in test_frame_1.columns.values
assert "y" in test_frame_1.columns.values
assert "latitude" in test_frame_1.columns.values
assert "longitude" in test_frame_1.columns.values
assert "ventilation_index" in test_frame_1.columns.values

assert "024" in result
    test_frame_2 = result["024"]
assert "time" not in test_frame_2.columns.values
assert "step" not in test_frame_2.columns.values
assert "surface" not in test_frame_2.columns.values
assert "valid_time" not in test_frame_2.columns.values
assert "x" in test_frame_2.columns.values
assert "y" in test_frame_2.columns.values
assert "latitude" in test_frame_2.columns.values
assert "longitude" in test_frame_2.columns.values
assert "ventilation_index" in test_frame_2.columns.values


def test_build_geo_data_frames():
# This test generates false warnings. See:
# https://github.com/geopandas/geopandas/issues/3430
# https://github.com/pyproj4/pyproj/issues/1307
data_frame_1 = DataFrame(
[
{
"y": 1,
"x": 1,
"latitude": 49,
"longitude": -130,
"ventilation_index": 0.50,
},
]
)
data_frame_2 = DataFrame(
[
{
"y": 1,
"x": 1,
"latitude": 49,
"longitude": -130,
"ventilation_index": 0.43,
},
]
)
data_frames = {
"012": data_frame_1,
"024": data_frame_2,
}

result = _build_geo_data_frames(data_frames)

assert "012" in result
geo_data_frame_1 = result["012"]
assert isinstance(geo_data_frame_1, GeoDataFrame)
assert "geometry" in geo_data_frame_1.columns.values
assert geo_data_frame_1.crs.name == "WGS 84 / Pseudo-Mercator"

assert "024" in result
geo_data_frame_2 = result["024"]
assert isinstance(geo_data_frame_2, GeoDataFrame)
assert "geometry" in geo_data_frame_2.columns.values
assert geo_data_frame_2.crs.name == "WGS 84 / Pseudo-Mercator"
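As an aside, a standalone sketch (assuming only geopandas is installed) of the CRS round-trip the final test asserts: points are built from longitude/latitude in EPSG:4326, then reprojected to EPSG:3857, whose pyproj name is "WGS 84 / Pseudo-Mercator".

from geopandas import GeoDataFrame, points_from_xy

# One point off the BC coast, mirroring the fixtures above.
gdf = GeoDataFrame(
    {"ventilation_index": [0.50]},
    geometry=points_from_xy([-130], [49]),  # x=longitude, y=latitude
    crs="EPSG:4326",  # 1984 GPS standard (lat/lon degrees)
)
gdf = gdf.to_crs(epsg=3857)  # web mercator, metres
print(gdf.crs.name)  # "WGS 84 / Pseudo-Mercator"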
112 changes: 108 additions & 4 deletions utils/data_fetch.py
@@ -1,4 +1,9 @@
-import datetime
+from datetime import date
+from os import makedirs
+from urllib.request import urlretrieve
+from pandas import DataFrame
+from xarray import load_dataset, Dataset
+from geopandas import points_from_xy, GeoDataFrame


BASE_URL = "https://dd.weather.gc.ca/model_hrdps/continental/2.5km"
@@ -13,17 +18,116 @@ def _build_urls() -> dict[str, str]:
# for the data posted at 00 (midnight).
urls: dict[str, str] = {}
# The date in the filename is represented as "YYYYmmdd"
-    today = datetime.date.today().strftime("%Y%m%d")
+    today = date.today().strftime("%Y%m%d")
for offset in OFFSETS:
urls[offset] = (
f"{BASE_URL}/{POSTING_TIME}/{offset}/{today}T{POSTING_TIME}Z_MSC_HRDPS_VI_Sfc_RLatLon0.0225_PT{offset}H.grib2"
)

info = "\n".join([url for _, url in urls.items()])
print(f"Generated URLs:\n{info}\n")

return urls


-def fetch_and_clean_data():
+def _fetch_data(urls: dict[str, str]):
    # Download the 12h, 24h, and 48h ventilation index forecast files from the
# Environment Canada datamart.
for offset, url in urls.items():
print(f"Fetching {url}...")
urlretrieve(url, f"./tmp/{offset}.grib2")
print("Done.\n")


def _read_data() -> dict[str, Dataset]:
# Load the data from files into memory.
datasets: dict[str, Dataset] = {}
for offset in OFFSETS:
datasets[offset] = load_dataset(f"./tmp/{offset}.grib2", engine="cfgrib")

num_data_points = sum([dataset.sizes["x"] * dataset.sizes["y"] for dataset in datasets.values()])
print(f"Loaded {num_data_points} data points.")

return datasets


def _build_data_frames(datasets: dict[str, Dataset]) -> dict[str, DataFrame]:
data_frames: dict[str, DataFrame] = {}
for offset in datasets.keys():
data_frames[offset] = datasets[offset].to_dataframe().reset_index()

return data_frames


def _filter_data(canada_data_frames: dict[str, DataFrame]) -> dict[str, DataFrame]:
# Filter the data to only include BC.
# From https://catalogue.data.gov.bc.ca/dataset/legally-defined-administrative-areas-of-bc/resource/d70a242e-ebc5-4b3d-a418-38abefa03fb2
bc_data_frames: dict[str, DataFrame] = {}
for offset in canada_data_frames.keys():
canada_data_frame = canada_data_frames[offset]

# Filter to the legal bounds of BC.
bc_data_frames[offset] = canada_data_frame[
(canada_data_frame["latitude"] > 48.0)
& (canada_data_frame["latitude"] < 60.0)
& (canada_data_frame["longitude"] > -139.5)
& (canada_data_frame["longitude"] < -113.5)
]

num_data_points = sum([dataframe.shape[0] for dataframe in bc_data_frames.values()])
print(f"Filtered data to {num_data_points} data points.")

return bc_data_frames


def _clean_data(data_frames: dict[str, DataFrame]) -> dict[str, DataFrame]:
# Filter unused columns.
trimmed_data_frames: dict[str, DataFrame] = {}
for offset in data_frames.keys():
data_frame = data_frames[offset]
        trimmed_data_frames[offset] = data_frame.drop(columns=["time", "step", "surface", "valid_time"])

# There is no label for the ventilation index column, so it gets the default label of "unknown".
renamed_data_frames: dict[str, DataFrame] = {}
    for offset in trimmed_data_frames.keys():
data_frame = trimmed_data_frames[offset]
renamed_data_frames[offset] = data_frame.rename(columns={"unknown": "ventilation_index"})

print("Cleaned DataFrames.")
return renamed_data_frames


def _build_geo_data_frames(data_frames: dict[str, DataFrame]) -> dict[str, GeoDataFrame]:
geo_data_frames: dict[str, GeoDataFrame] = {}
for offset in data_frames.keys():
geo_data_frame = GeoDataFrame(
data_frames[offset],
geometry=points_from_xy(data_frames[offset]["longitude"], data_frames[offset]["latitude"]),
)
# This is the CRS for the 1984 GPS standard (latitude/longitude).
geo_data_frame.set_crs(epsg=4326, inplace=True)
# Converting it to web mercator to be compatible with ventilation index
# zones.
geo_data_frame.to_crs(epsg=3857, inplace=True)
geo_data_frames[offset] = geo_data_frame

print("Built GeoDataFrames.")
return geo_data_frames


def fetch_and_clean_data() -> dict[str, GeoDataFrame]:
makedirs("./tmp", exist_ok=True)

urls = _build_urls()
-    pass
+    _fetch_data(urls)
datasets = _read_data()
data_frames = _build_data_frames(datasets)
filtered_data_frames = _filter_data(data_frames)
clean_data_frames = _clean_data(filtered_data_frames)
geo_data_frames = _build_geo_data_frames(clean_data_frames)

return geo_data_frames


if __name__ == "__main__":
fetch_and_clean_data()
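A usage sketch for the finished pipeline (assuming network access to the datamart and that today's 00Z HRDPS files have been posted): the returned mapping is keyed by forecast offset, each value a web-mercator GeoDataFrame.

from utils.data_fetch import fetch_and_clean_data

geo_frames = fetch_and_clean_data()
vi_12h = geo_frames["012"]  # 12-hour forecast offset
print(vi_12h[["latitude", "longitude", "ventilation_index"]].head())
print(vi_12h.crs.name)  # "WGS 84 / Pseudo-Mercator"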