diff --git a/README.md b/README.md index 06f9a02..44596c6 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ DISCLAIMER: This project is a private open source project and doesn't have any connection with Deutscher Wetterdienst. -# IMPORTANT: I will change the way to access the weather data shortly, to clean up the code and reduce the usage of reused code. The functionality will remain the same, but you have change a few function calls and some units of measurement change. +## IMPORTANT: v1 changes the methods to access the data This is a python package for simple access to hourly forecast data for the next 10 days. The data is updated every six hours and updated when needed. @@ -36,9 +36,9 @@ from datetime import datetime, timedelta, timezone dwd_weather = dwdforecast.Weather("10385") # Station-ID For BERLIN-SCHOENEFELD time_now = datetime.now(timezone.utc) -temperature_now = dwd_weather.get_forecast_temperature(time_now) +temperature_now = dwd_weather.get_forecast_data(dwdforecast.WeatherDataType.TEMPERATURE, time_now) time_tomorrow = datetime.now(timezone.utc)+timedelta(days=1) -temperature_tomorrow = dwd_weather.get_forecast_temperature(time_tomorrow) +temperature_tomorrow = dwd_weather.get_forecast_data(dwdforecast.WeatherDataType.TEMPERATURE, time_tomorrow) ``` ### Available methods @@ -52,33 +52,30 @@ dwdforecast.get_nearest_station_id(latitude, longitude) #Returns nearest Station get_station_name(optional bool shouldUpdate) #Return Station name -get_forecast_condition(datetime, optional bool shouldUpdate) #Result is condition as text - -get_forecast_temperature(datetime, optional bool shouldUpdate) #Result is in degrees Celcius - -get_forecast_pressure(datetime, optional bool shouldUpdate) #Result is in hPa - -get_forecast_wind_direction(datetime, optional bool shouldUpdate) #Result is in degrees magnetic - -get_forecast_wind_speed(datetime, optional bool shouldUpdate) #Result is in m/s - -get_forecast_precipitation(datetime, optional bool shouldUpdate) #Result is in kg/m^2 - -get_forecast_precipitation_probability(datetime, optional bool shouldUpdate) #Result is in percent +class WeatherDataType(Enum): + CONDITION = "condition" + TEMPERATURE = "TTT" # Unit: K + DEWPOINT = "Td" # Unit: K + PRESSURE = "PPPP" # Unit: Pa + WIND_SPEED = "FF" # Unit: m/s + WIND_DIRECTION = "DD" # Unit: Degrees + WIND_GUSTS = "FX1" # Unit: m/s + PRECIPITATION = "RR1c" # Unit: kg/m2 + PRECIPITATION_PROBABILITY = "wwP" # Unit: % (0..100) + PRECIPITATION_DURATION = "DRR1" # Unit: s + CLOUD_COVERAGE = "N" # Unit: % (0..100) + VISIBILITY = "VV" # Unit: m + SUN_DURATION = "SunD1" # Unit: s + SUN_IRRADIANCE = "Rad1h" # Unit: kJ/m2 + FOG_PROBABILITY = "wwM" # Unit: % (0..100) + +get_forecast_data(weatherDataType: see WeatherDataType , datetime, optional bool shouldUpdate) # Returns the requestes weather data + +get_daily_max(weatherDataType: see WeatherDataType , datetime, optional bool shouldUpdate) # Returns the maximum daily value + +get_daily_min(weatherDataType: see WeatherDataType , datetime, optional bool shouldUpdate) # Returns the minimum daily value -get_forecast_cloud_coverage(datetime, optional bool shouldUpdate) #Result is in percent - -get_forecast_visibility(datetime, optional bool shouldUpdate) #Result is in meters - -get_forecast_sun_duration(datetime, optional bool shouldUpdate) #Result is in minutes of the last hour - -get_daily_temp_max(datetime, optional bool shouldUpdate) #Result is in degrees Celcius - -get_daily_temp_min(datetime, optional bool shouldUpdate) #Result is in degrees Celcius - -get_daily_precipitation(datetime, optional bool shouldUpdate) #Result is in kg/m^2 - -get_daily_precipitation_probability(datetime, optional bool shouldUpdate) #Result is the largest probability in percent +get_forecast_condition(datetime, optional bool shouldUpdate) #Result is condition as text get_daily_condition(datetime, optional bool shouldUpdate) #Result is worst condition at this day ``` diff --git a/setup.py b/setup.py index e9c4fbb..752c3c4 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="simple_dwd_weatherforecast", - version="0.9.13", + version="1.0.0", author="Max Fermor", description="A simple tool to retrieve weather forecast from DWD OpenData", long_description=long_description, diff --git a/simple_dwd_weatherforecast/dwdforecast.py b/simple_dwd_weatherforecast/dwdforecast.py index 3b7b3b7..6b8b8db 100644 --- a/simple_dwd_weatherforecast/dwdforecast.py +++ b/simple_dwd_weatherforecast/dwdforecast.py @@ -1,6 +1,7 @@ import requests from io import BytesIO from zipfile import ZipFile +from enum import Enum from lxml import etree from datetime import datetime, timedelta, timezone import time @@ -8,6 +9,7 @@ from .stations import stations + def is_valid_station_id(station_id: str): for line in stations.splitlines(): if (len(line) > 0 and line[0].isdigit()): @@ -15,6 +17,7 @@ def is_valid_station_id(station_id: str): return True return False + def get_nearest_station_id(lat: float, lon: float): result = "" distance = 99999999 @@ -35,6 +38,24 @@ def get_distance(lat, lon, _lat, _lon): return math.sqrt(math.pow(lat_diff, 2) + math.pow(lon_diff, 2)) +class WeatherDataType(Enum): + CONDITION = "condition" + TEMPERATURE = "TTT" # Unit: K + DEWPOINT = "Td" # Unit: K + PRESSURE = "PPPP" # Unit: Pa + WIND_SPEED = "FF" # Unit: m/s + WIND_DIRECTION = "DD" # Unit: Degrees + WIND_GUSTS = "FX1" # Unit: m/s + PRECIPITATION = "RR1c" # Unit: kg/m2 + PRECIPITATION_PROBABILITY = "wwP" # Unit: % (0..100) + PRECIPITATION_DURATION = "DRR1" # Unit: s + CLOUD_COVERAGE = "N" # Unit: % (0..100) + VISIBILITY = "VV" # Unit: m + SUN_DURATION = "SunD1" # Unit: s + SUN_IRRADIANCE = "Rad1h" # Unit: kJ/m2 + FOG_PROBABILITY = "wwM" # Unit: % (0..100) + + class Weather: """A class for interacting with weather data from dwd.de""" station_id = "" @@ -42,6 +63,13 @@ class Weather: issue_time = None forecast_data = None + namespaces = { + 'kml': + 'http://www.opengis.net/kml/2.2', + 'dwd': + 'https://opendata.dwd.de/weather/lib/pointforecast_dwd_extension_V1_0.xsd' + } + weather_codes = { "1": ("sunny", 29), "0": ("sunny", 28), @@ -85,189 +113,83 @@ def get_station_name(self, shouldUpdate=True): self.update() return self.station_name - def get_forecast_condition(self, timestamp: datetime, shouldUpdate=True): - if (shouldUpdate): - self.update() - time = self.strip_to_hour_str(timestamp) - if list(self.forecast_data.keys())[0] <= time <= list( - self.forecast_data.keys())[-1]: - return str( - self.weather_codes[self.forecast_data[time]["condition"]][0]) - return None - - def get_forecast_temperature(self, timestamp: datetime, shouldUpdate=True): - if (shouldUpdate): - self.update() - time = self.strip_to_hour_str(timestamp) - if list(self.forecast_data.keys())[0] <= time <= list( - self.forecast_data.keys())[-1]: - return str(self.forecast_data[time]["temp"]) - return None - - def get_forecast_pressure(self, timestamp: datetime, shouldUpdate=True): - if (shouldUpdate): - self.update() - time = self.strip_to_hour_str(timestamp) - if list(self.forecast_data.keys())[0] <= time <= list( - self.forecast_data.keys())[-1]: - return str(self.forecast_data[time]["pressure"]) - return None - - def get_forecast_wind_direction(self, - timestamp: datetime, - shouldUpdate=True): - if (shouldUpdate): - self.update() - time = self.strip_to_hour_str(timestamp) - if list(self.forecast_data.keys())[0] <= time <= list( - self.forecast_data.keys())[-1]: - return str(self.forecast_data[time]["wind_dir"]) - return None - - def get_forecast_wind_speed(self, timestamp: datetime, shouldUpdate=True): - if (shouldUpdate): - self.update() - time = self.strip_to_hour_str(timestamp) - if list(self.forecast_data.keys())[0] <= time <= list( - self.forecast_data.keys())[-1]: - return str(self.forecast_data[time]["wind_speed"]) - return None - - def get_forecast_precipitation(self, - timestamp: datetime, - shouldUpdate=True): - if (shouldUpdate): - self.update() - time = self.strip_to_hour_str(timestamp) - if list(self.forecast_data.keys())[0] <= time <= list( - self.forecast_data.keys())[-1]: - return str(self.forecast_data[time]["prec_sum"]) - return None - - def get_forecast_precipitation_probability(self, - timestamp: datetime, - shouldUpdate=True): - if (shouldUpdate): - self.update() - time = self.strip_to_hour_str(timestamp) - if list(self.forecast_data.keys())[0] <= time <= list( - self.forecast_data.keys())[-1]: - return str(self.forecast_data[time]["prec_prop"]) - return None + def is_in_timerange(self, timestamp: datetime): + return list(self.forecast_data.keys())[0] <= self.strip_to_hour_str( + timestamp) <= list(self.forecast_data.keys())[-1] - def get_forecast_cloud_coverage(self, - timestamp: datetime, - shouldUpdate=True): + def get_forecast_data(self, + weatherDataType: WeatherDataType, + timestamp: datetime, + shouldUpdate=True): if (shouldUpdate): self.update() - time = self.strip_to_hour_str(timestamp) - if list(self.forecast_data.keys())[0] <= time <= list( - self.forecast_data.keys())[-1]: - return str(self.forecast_data[time]["cloud_cov"]) + if self.is_in_timerange(timestamp): + return self.forecast_data[self.strip_to_hour_str(timestamp)][ + weatherDataType.value] return None - def get_forecast_visibility(self, timestamp: datetime, shouldUpdate=True): + def get_forecast_condition(self, timestamp: datetime, shouldUpdate=True): if (shouldUpdate): self.update() - time = self.strip_to_hour_str(timestamp) - if list(self.forecast_data.keys())[0] <= time <= list( - self.forecast_data.keys())[-1]: - return str(self.forecast_data[time]["visibility"]) - return None - def get_forecast_sun_duration(self, timestamp: datetime, shouldUpdate=True): - if (shouldUpdate): - self.update() - time = self.strip_to_hour_str(timestamp) - if list(self.forecast_data.keys())[0] <= time <= list( - self.forecast_data.keys())[-1]: - return str(self.forecast_data[time]["sun_dur"]) + if self.is_in_timerange(timestamp): + return str( + self.weather_codes[self.forecast_data[self.strip_to_hour_str( + timestamp)][WeatherDataType.CONDITION.value]][0]) return None def get_daily_condition(self, timestamp: datetime, shouldUpdate=True): if (shouldUpdate): self.update() - if list(self.forecast_data.keys())[0] <= self.strip_to_hour_str( - timestamp) <= list(self.forecast_data.keys())[-1]: + if self.is_in_timerange(timestamp): weather_data = self.get_day_values(timestamp) priority = 99 condition_text = "" for item in weather_data: - condition = self.weather_codes[item["condition"]] + condition = self.weather_codes[item[ + WeatherDataType.CONDITION.value]] if condition[1] < priority: priority = condition[1] condition_text = condition[0] return str(condition_text) return None - def get_daily_temp_max(self, timestamp: datetime, shouldUpdate=True): + def get_daily_max(self, + weatherDataType: WeatherDataType, + timestamp: datetime, + shouldUpdate=True): if (shouldUpdate): self.update() - if list(self.forecast_data.keys())[0] <= self.strip_to_hour_str( - timestamp) <= list(self.forecast_data.keys())[-1]: + if self.is_in_timerange(timestamp): weather_data = self.get_day_values(timestamp) - temp = None + value = None for item in weather_data: - temp_new = item["temp"] - if temp_new: - if not temp: - temp = -9999999 - if temp_new > temp: - temp = temp_new - return str(temp) + value_new = item[weatherDataType.value] + if value_new: + if not value: + value = -9999999 + if value_new > value: + value = value_new + return round(value, 2) return None - def get_daily_temp_min(self, timestamp: datetime, shouldUpdate=True): + def get_daily_min(self, + weatherDataType: WeatherDataType, + timestamp: datetime, + shouldUpdate=True): if (shouldUpdate): self.update() - if list(self.forecast_data.keys())[0] <= self.strip_to_hour_str( - timestamp) <= list(self.forecast_data.keys())[-1]: + if self.is_in_timerange(timestamp): weather_data = self.get_day_values(timestamp) - temp = None + value = None for item in weather_data: - temp_new = item["temp"] - if temp_new: - if not temp: - temp = 9999999 - if temp_new < temp: - temp = temp_new - return str(temp) - return None - - def get_daily_precipitation(self, timestamp: datetime, shouldUpdate=True): - if (shouldUpdate): - self.update() - if list(self.forecast_data.keys())[0] <= self.strip_to_hour_str( - timestamp) <= list(self.forecast_data.keys())[-1]: - weather_data = self.get_day_values(timestamp) - precipitation = None - for item in weather_data: - value = item["prec_sum"] - if (value): - if not precipitation: - precipitation = 0.0 - precipitation += float(value) - return str(precipitation) - return None - - def get_daily_precipitation_probability(self, - timestamp: datetime, - shouldUpdate=True): - if (shouldUpdate): - self.update() - if list(self.forecast_data.keys())[0] <= self.strip_to_hour_str( - timestamp) <= list(self.forecast_data.keys())[-1]: - weather_data = self.get_day_values(timestamp) - prec_prop = None - for item in weather_data: - value = item["prec_prop"] - if value: - if not prec_prop: - prec_prop = 0.0 - prec_prop_new = float(value) - if prec_prop_new > prec_prop: - prec_prop = prec_prop_new - return str(int(prec_prop)) + value_new = item[weatherDataType.value] + if value_new: + if not value: + value = 9999999 + if value_new < value: + value = value_new + return round(value, 2) return None def get_day_values(self, timestamp: datetime): @@ -288,7 +210,7 @@ def get_day_values(self, timestamp: datetime): time += timedelta(hours=1) return result - def strip_to_hour_str(self, timestamp): + def strip_to_hour_str(self, timestamp: datetime): return timestamp.strftime("%Y-%m-%dT%H:00:00.000Z") def strip_to_day(self, timestamp): @@ -300,131 +222,93 @@ def update(self): kml = self.download_latest_kml(self.station_id) self.parse_kml(kml) + def get_weather_type(self, kmlTree, weatherDataType: WeatherDataType): + """ Parses the kml-File to the requested value anbd returns the items as array""" + + result = kmlTree.xpath( + '//kml:ExtendedData/dwd:Forecast[@dwd:elementName="{}"]/dwd:value'. + format(weatherDataType.value), + namespaces=self.namespaces)[0].text + items = [] + for elem in result.split(): + if (elem != "-"): + items.append(round(float(elem), 2)) + else: + items.append(None) + return items + def parse_kml(self, kml): - namespaces = { - 'kml': - 'http://www.opengis.net/kml/2.2', - 'dwd': - 'https://opendata.dwd.de/weather/lib/pointforecast_dwd_extension_V1_0.xsd' - } tree = etree.parse(BytesIO(kml)) - result = tree.xpath('//dwd:IssueTime', namespaces=namespaces)[0].text + result = tree.xpath('//dwd:IssueTime', + namespaces=self.namespaces)[0].text self.issue_time = datetime( *(time.strptime(result, '%Y-%m-%dT%H:%M:%S.%fZ')[0:6]), 0, timezone.utc) result = tree.xpath('//dwd:ForecastTimeSteps/dwd:TimeStep', - namespaces=namespaces) + namespaces=self.namespaces) timesteps = [] for elem in result: timesteps.append(elem.text) self.station_name = tree.xpath('//kml:Placemark/kml:description', - namespaces=namespaces)[0].text + namespaces=self.namespaces)[0].text result = tree.xpath( '//kml:ExtendedData/dwd:Forecast[@dwd:elementName="ww"]/dwd:value', - namespaces=namespaces)[0].text + namespaces=self.namespaces)[0].text conditions = [] for elem in result.split(): conditions.append(elem.split('.')[0]) - result = tree.xpath( - '//kml:ExtendedData/dwd:Forecast[@dwd:elementName="TTT"]/dwd:value', - namespaces=namespaces)[0].text - temperatures = [] - for elem in result.split(): - if (elem != "-"): - temperatures.append(round(float(elem) - 273.15, 2)) - else: - temperatures.append(None) + temperatures = self.get_weather_type(tree, WeatherDataType.TEMPERATURE) + + dewpoints = self.get_weather_type(tree, WeatherDataType.DEWPOINT) + + pressure = self.get_weather_type(tree, WeatherDataType.PRESSURE) + + wind_dir = self.get_weather_type(tree, WeatherDataType.WIND_DIRECTION) + + wind_speed = self.get_weather_type(tree, WeatherDataType.WIND_SPEED) + + wind_gusts = self.get_weather_type(tree, WeatherDataType.WIND_GUSTS) + + prec_sum = self.get_weather_type(tree, WeatherDataType.PRECIPITATION) + + prec_prop = self.get_weather_type( + tree, WeatherDataType.PRECIPITATION_PROBABILITY) + + prec_dur = self.get_weather_type(tree, + WeatherDataType.PRECIPITATION_DURATION) + + cloud_cov = self.get_weather_type(tree, WeatherDataType.CLOUD_COVERAGE) + + visibility = self.get_weather_type(tree, WeatherDataType.VISIBILITY) + + sun_dur = self.get_weather_type(tree, WeatherDataType.SUN_DURATION) + + sun_irr = self.get_weather_type(tree, WeatherDataType.SUN_IRRADIANCE) + + fog_prop = self.get_weather_type(tree, WeatherDataType.FOG_PROBABILITY) - result = tree.xpath( - '//kml:ExtendedData/dwd:Forecast[@dwd:elementName="PPPP"]/dwd:value', - namespaces=namespaces)[0].text - pressure = [] - for elem in result.split(): - if (elem != "-"): - pressure.append(float(elem) / 100) - else: - pressure.append(None) - result = tree.xpath( - '//kml:ExtendedData/dwd:Forecast[@dwd:elementName="DD"]/dwd:value', - namespaces=namespaces)[0].text - wind_dir = [] - for elem in result.split(): - if (elem != "-"): - wind_dir.append(elem) - else: - wind_dir.append(None) - result = tree.xpath( - '//kml:ExtendedData/dwd:Forecast[@dwd:elementName="FF"]/dwd:value', - namespaces=namespaces)[0].text - wind_speed = [] - for elem in result.split(): - if (elem != "-"): - wind_speed.append(elem) - else: - wind_speed.append(None) - result = tree.xpath( - '//kml:ExtendedData/dwd:Forecast[@dwd:elementName="RR1c"]/dwd:value', - namespaces=namespaces)[0].text - prec_sum = [] - for elem in result.split(): - if (elem != "-"): - prec_sum.append(elem) - else: - prec_sum.append(None) - result = tree.xpath( - '//kml:ExtendedData/dwd:Forecast[@dwd:elementName="wwP"]/dwd:value', - namespaces=namespaces)[0].text - prec_prop = [] - for elem in result.split(): - if (elem != "-"): - prec_prop.append(elem) - else: - prec_prop.append(None) - result = tree.xpath( - '//kml:ExtendedData/dwd:Forecast[@dwd:elementName="N"]/dwd:value', - namespaces=namespaces)[0].text - cloud_cov = [] - for elem in result.split(): - if (elem != "-"): - cloud_cov.append(elem) - else: - cloud_cov.append(None) - result = tree.xpath( - '//kml:ExtendedData/dwd:Forecast[@dwd:elementName="VV"]/dwd:value', - namespaces=namespaces)[0].text - visibility = [] - for elem in result.split(): - if (elem != "-"): - visibility.append(elem) - else: - visibility.append(None) - result = tree.xpath( - '//kml:ExtendedData/dwd:Forecast[@dwd:elementName="SunD1"]/dwd:value', - namespaces=namespaces)[0].text - sun_dur = [] - for elem in result.split(): - if (elem != "-"): - sun_dur.append(round(float(elem) / 60)) - else: - sun_dur.append(None) merged_list = {} for i in range(len(timesteps)): - item = { - "temp": temperatures[i], - "condition": conditions[i], - "pressure": pressure[i], - "wind_dir": wind_dir[i], - "wind_speed": wind_speed[i], - "prec_sum": prec_sum[i], - "prec_prop": prec_prop[i], - "cloud_cov": cloud_cov[i], - "visibility": visibility[i], - "sun_dur": sun_dur[i] + WeatherDataType.TEMPERATURE.value: temperatures[i], + WeatherDataType.DEWPOINT.value: dewpoints[i], + WeatherDataType.CONDITION.value: conditions[i], + WeatherDataType.PRESSURE.value: pressure[i], + WeatherDataType.WIND_DIRECTION.value: wind_dir[i], + WeatherDataType.WIND_SPEED.value: wind_speed[i], + WeatherDataType.WIND_GUSTS.value: wind_gusts[i], + WeatherDataType.PRECIPITATION.value: prec_sum[i], + WeatherDataType.PRECIPITATION_PROBABILITY.value: prec_prop[i], + WeatherDataType.PRECIPITATION_DURATION.value: prec_dur[i], + WeatherDataType.CLOUD_COVERAGE.value: cloud_cov[i], + WeatherDataType.VISIBILITY.value: visibility[i], + WeatherDataType.SUN_DURATION.value: sun_dur[i], + WeatherDataType.SUN_IRRADIANCE.value: sun_irr[i], + WeatherDataType.FOG_PROBABILITY.value: fog_prop[i] } merged_list[timesteps[i]] = item self.forecast_data = merged_list