From 3573ead292c45a0d97509d5024f8d3216692d02f Mon Sep 17 00:00:00 2001 From: benjamin Date: Thu, 27 Jan 2022 00:09:15 +0100 Subject: [PATCH] Use fsspec for all types of file access using a monkey patch --- .github/workflows/tests.yml | 2 +- CHANGELOG.rst | 1 + THIRD_PARTY_NOTICES | 26 -- docs/conf.py | 3 +- docs/img/readme_img.py | 9 +- poetry.lock | 34 ++- pyproject.toml | 6 +- requirements.txt | 4 +- tests/__init__.py | 2 + tests/provider/dwd/test_index.py | 5 +- tests/ui/explorer/test_ui.py | 3 + wetterdienst/__init__.py | 9 + wetterdienst/provider/dwd/forecast/api.py | 22 +- wetterdienst/provider/dwd/index.py | 16 +- .../provider/dwd/observation/fileindex.py | 2 - .../provider/dwd/observation/metaindex.py | 12 +- wetterdienst/provider/dwd/radar/index.py | 6 +- wetterdienst/provider/eccc/observation/api.py | 7 +- wetterdienst/util/cache.py | 55 ---- wetterdienst/util/fsspec_monkeypatch.py | 243 ++++++++++++++++++ wetterdienst/util/network.py | 80 +----- 21 files changed, 330 insertions(+), 217 deletions(-) create mode 100644 wetterdienst/util/fsspec_monkeypatch.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index d571f5d3f..7c2f13596 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -48,7 +48,7 @@ jobs: uses: actions/cache@v2 env: # Increase this value to reset cache if `poetry.lock` has not changed. - CACHE_NUMBER: 1 + CACHE_NUMBER: 2 with: path: ${{ steps.poetry-cache-dir.outputs.dir }} key: poetry-${{ runner.os }}-py${{ matrix.python-version }}-${{ hashFiles('poetry.lock') }}-${{ env.CACHE_NUMBER }} diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 6e73da2b8..317759b42 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -5,6 +5,7 @@ Development *********** - Fix access to ECCC stations listing using Google Drive storage +- Remove/replace caching entirely by fsspec (+monkeypatch) 0.24.0 (24.01.2022) ******************* diff --git a/THIRD_PARTY_NOTICES b/THIRD_PARTY_NOTICES index c9766b673..50859d5dd 100644 --- a/THIRD_PARTY_NOTICES +++ b/THIRD_PARTY_NOTICES @@ -4315,32 +4315,6 @@ licenses_ directory. .. _GPL-compatible: http://www.gnu.org/licenses/license-list.html -dogpile.cache -1.1.5 -MIT License -Mike Bayer -https://github.com/sqlalchemy/dogpile.cache -Copyright 2005-2022 Michael Bayer. - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies -of the Software, and to permit persons to whom the Software is furnished to do -so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. 
- - duckdb 0.2.8 MIT License diff --git a/docs/conf.py b/docs/conf.py index 527d6317d..25a643fbc 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -15,8 +15,9 @@ # for how to get the version from the pyproject.toml import os import sys -import tomlkit + import sphinx_material +import tomlkit sys.path.insert(0, os.path.abspath("..")) diff --git a/docs/img/readme_img.py b/docs/img/readme_img.py index 6fa58cf82..ba00b7578 100644 --- a/docs/img/readme_img.py +++ b/docs/img/readme_img.py @@ -10,7 +10,14 @@ from matplotlib.colors import ListedColormap from matplotlib.patches import Rectangle -from wetterdienst.provider.dwd.observation import DwdObservationRequest, DwdObservationValues, DwdObservationParameter, DwdObservationDataset, DwdObservationResolution, DwdObservationPeriod +from wetterdienst.provider.dwd.observation import ( + DwdObservationDataset, + DwdObservationParameter, + DwdObservationPeriod, + DwdObservationRequest, + DwdObservationResolution, + DwdObservationValues, +) plt.style.use('ggplot') diff --git a/poetry.lock b/poetry.lock index 0bef07ed8..c003a1bc4 100644 --- a/poetry.lock +++ b/poetry.lock @@ -576,6 +576,14 @@ docs = ["Sphinx (>=3)", "sphinx-rtd-theme (>=0.2)"] numpy = ["numpy (>=1.13.0)", "numpy (>=1.15.0)", "numpy (>=1.18.0)", "numpy (>=1.20.0)"] tests = ["check-manifest (>=0.42)", "mock (>=1.3.0)", "pytest-cov (>=2.10.1)", "pytest-isort (>=1.2.0)", "sphinx (>=3)", "tox (>=3.7.0)", "pytest (==5.4.3)", "pytest-pycodestyle (>=2)", "pytest-pydocstyle (>=2)", "pytest (>=6)", "pytest-pycodestyle (>=2.2.0)", "pytest-pydocstyle (>=2.2.0)"] +[[package]] +name = "diskcache" +version = "5.4.0" +description = "Disk Cache -- Disk and file backed persistent cache." +category = "main" +optional = false +python-versions = ">=3" + [[package]] name = "docformatter" version = "1.4" @@ -595,18 +603,6 @@ category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" -[[package]] -name = "dogpile.cache" -version = "1.1.5" -description = "A caching front-end based on the Dogpile lock." 
-category = "main" -optional = false -python-versions = ">=3.6" - -[package.dependencies] -decorator = ">=4.0.0" -stevedore = ">=3.0.0" - [[package]] name = "duckdb" version = "0.2.9" @@ -1716,7 +1712,7 @@ python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7" name = "pbr" version = "5.8.0" description = "Python Build Reasonableness" -category = "main" +category = "dev" optional = false python-versions = ">=2.6" @@ -2546,7 +2542,7 @@ full = ["aiofiles", "graphene", "itsdangerous", "jinja2", "python-multipart", "p name = "stevedore" version = "3.5.0" description = "Manage dynamic plugins for Python applications" -category = "main" +category = "dev" optional = false python-versions = ">=3.6" @@ -2968,7 +2964,7 @@ sql = ["duckdb"] [metadata] lock-version = "1.1" python-versions = "^3.7.1" -content-hash = "f7f42dcfb5d1b31dfdcd4eddb5bbd93e6721012435a3cc48dd9863b54fdca1a4" +content-hash = "95dbb05eba4867ab17a7470df74239a0b46ec19e7e853d7897890af358aea873" [metadata.files] aenum = [ @@ -3440,6 +3436,10 @@ dictdiffer = [ {file = "dictdiffer-0.9.0-py2.py3-none-any.whl", hash = "sha256:442bfc693cfcadaf46674575d2eba1c53b42f5e404218ca2c2ff549f2df56595"}, {file = "dictdiffer-0.9.0.tar.gz", hash = "sha256:17bacf5fbfe613ccf1b6d512bd766e6b21fb798822a133aa86098b8ac9997578"}, ] +diskcache = [ + {file = "diskcache-5.4.0-py3-none-any.whl", hash = "sha256:af3ec6d7f167bbef7b6c33d9ee22f86d3e8f2dd7131eb7c4703d8d91ccdc0cc4"}, + {file = "diskcache-5.4.0.tar.gz", hash = "sha256:8879eb8c9b4a2509a5e633d2008634fb2b0b35c2b36192d89655dbde02419644"}, +] docformatter = [ {file = "docformatter-1.4.tar.gz", hash = "sha256:064e6d81f04ac96bc0d176cbaae953a0332482b22d3ad70d47c8a7f2732eef6f"}, ] @@ -3447,10 +3447,6 @@ docutils = [ {file = "docutils-0.16-py2.py3-none-any.whl", hash = "sha256:0c5b78adfbf7762415433f5515cd5c9e762339e23369dbe8000d84a4bf4ab3af"}, {file = "docutils-0.16.tar.gz", hash = "sha256:c2de3a60e9e7d07be26b7f2b00ca0309c207e06c100f9cc2a94931fc75a478fc"}, ] -"dogpile.cache" = [ - {file = "dogpile.cache-1.1.5-py3-none-any.whl", hash = "sha256:5f9dcf99087240c7733fad5539b0806b52555917dccad1ef43499eaca8b459d9"}, - {file = "dogpile.cache-1.1.5.tar.gz", hash = "sha256:0f01bdc329329a8289af9705ff40fadb1f82a28c336f3174e12142b70d31c756"}, -] duckdb = [ {file = "duckdb-0.2.9-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:ac886826b8e2add255aa3ae3101e65da34c32125799c3a460f5abedaaaa09749"}, {file = "duckdb-0.2.9-cp36-cp36m-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:2ecd22aef9009604786029a434c38454a18b003ecbf8eecd78962c014a3f3039"}, diff --git a/pyproject.toml b/pyproject.toml index b1e17259c..2891058a6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -93,7 +93,6 @@ beautifulsoup4 = "^4.9" requests = "^2.20" requests-ftp = "^0.3" python-dateutil = "^2.8" -"dogpile.cache" = "^1.0" appdirs = "^1.4" lxml = "^4.5" tqdm = "^4.47" @@ -145,6 +144,7 @@ ipython-genutils = { version = "^0.2", optional = true } zarr = { version = "^2.7", optional = true, markers = "sys_platform != 'darwin' or (sys_platform == 'darwin' and platform_machine != 'arm64')" } # not supported through numcodecs xarray = { version = "^0.17", optional = true } timezonefinder = "^5.2" +diskcache = "^5.4.0" [tool.poetry.dev-dependencies] @@ -258,8 +258,12 @@ flake8-print = ["+*"] flake8-return = ["+*"] flake8-2020 = ["+*"] +[tool.flakeheaven.exceptions."wetterdienst/__init__.py"] +pycodestyle = ["-E402"] [tool.flakeheaven.exceptions."**/__init__.py"] pyflakes = ["-F401"] 
+[tool.flakeheaven.exceptions."wetterdienst/util/fsspec_monkeypatch.py"] +flake8-bugbear = ["-B301"] [tool.flakeheaven.exceptions."example/"] flake8-print = ["-*"] [tool.flakeheaven.exceptions."tests/"] diff --git a/requirements.txt b/requirements.txt index 1321e72fc..accb39b7e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,9 +32,9 @@ decorator==5.1.1; python_version >= "3.6" and python_version < "4.0" defusedxml==0.7.1; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" and python_version >= "3.6" deprecation==2.1.0 dictdiffer==0.9.0 +diskcache==5.4.0; python_version >= "3" docformatter==1.4 docutils==0.16; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" and python_version >= "3.6" -dogpile.cache==1.1.5; python_version >= "3.6" entrypoints==0.3; python_full_version >= "3.6.2" and python_full_version < "4.0.0" and python_version >= "3.6" and (python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" and python_version >= "3.6") eradicate==2.0.0; python_version >= "3.6" and python_version < "4.0" execnet==1.9.0; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" and python_version >= "3.6" @@ -94,7 +94,7 @@ pandas==1.3.5; python_full_version >= "3.7.1" pandocfilters==1.5.0; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" and python_version >= "3.6" pastel==0.2.1; python_version >= "3.6" and python_full_version < "3.0.0" and python_version < "4.0" or python_version >= "3.6" and python_version < "4.0" and python_full_version >= "3.4.0" pathspec==0.9.0; python_full_version >= "3.6.2" -pbr==5.8.0; python_version >= "3.6" +pbr==5.8.0; python_version >= "3.7" percy==2.0.2 pint==0.17; python_version >= "3.6" pip-licenses==3.5.3; python_version >= "3.6" and python_version < "4.0" diff --git a/tests/__init__.py b/tests/__init__.py index 7ba634768..e1c7edaf7 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -15,3 +15,5 @@ windows_unsupported = pytest.mark.skipif( windows, reason="can't be tested under windows due to unsupported wradlib library" ) + +mac_py39 = sys.platform == "darwin" and sys.version_info.major == 3 and sys.version_info.minor == 9 diff --git a/tests/provider/dwd/test_index.py b/tests/provider/dwd/test_index.py index cf978f4fe..05b9eb26f 100644 --- a/tests/provider/dwd/test_index.py +++ b/tests/provider/dwd/test_index.py @@ -14,6 +14,7 @@ DwdObservationPeriod, DwdObservationResolution, ) +from wetterdienst.util.cache import CacheExpiry from wetterdienst.util.network import list_remote_files_fsspec @@ -29,8 +30,8 @@ def test_build_index_path(): @pytest.mark.remote def test_list_files_of_climate_observations(): files_server = list_remote_files_fsspec( - "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/" "annual/kl/recent", - recursive=False, + "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/annual/kl/recent", + CacheExpiry.NO_CACHE, ) assert ( diff --git a/tests/ui/explorer/test_ui.py b/tests/ui/explorer/test_ui.py index 04c7a2ace..da7874abb 100644 --- a/tests/ui/explorer/test_ui.py +++ b/tests/ui/explorer/test_ui.py @@ -22,6 +22,8 @@ import pytest from bs4 import BeautifulSoup +from tests import mac_py39 + @pytest.mark.slow @pytest.mark.cflake @@ -37,6 +39,7 @@ def test_app_layout(wetterdienst_ui, dash_tre): assert dash_tre.find_element("#graph") +@pytest.mark.skipif(mac_py39, reason="problem with selenium 
and mac on py39") @pytest.mark.slow @pytest.mark.cflake @pytest.mark.explorer diff --git a/wetterdienst/__init__.py b/wetterdienst/__init__.py index 7cc5c5ad6..93d77b941 100644 --- a/wetterdienst/__init__.py +++ b/wetterdienst/__init__.py @@ -4,6 +4,15 @@ # Distributed under the MIT License. See LICENSE for more info. __appname__ = "wetterdienst" + +# TODO: MONKEY PATCH FSSPEC +def monkey_patch(): + import wetterdienst.util.fsspec_monkeypatch + + +monkey_patch() + + from wetterdienst.api import Wetterdienst from wetterdienst.metadata.kind import Kind from wetterdienst.metadata.parameter import Parameter diff --git a/wetterdienst/provider/dwd/forecast/api.py b/wetterdienst/provider/dwd/forecast/api.py index 834cbfc1b..3f78b13f8 100644 --- a/wetterdienst/provider/dwd/forecast/api.py +++ b/wetterdienst/provider/dwd/forecast/api.py @@ -9,7 +9,8 @@ from urllib.parse import urljoin import pandas as pd -import requests +from fsspec.implementations.cached import WholeFileCacheFileSystem +from fsspec.implementations.http import HTTPFileSystem from requests import HTTPError from wetterdienst.core.scalar.request import ScalarRequestCore @@ -38,7 +39,7 @@ DWD_SERVER, ) from wetterdienst.provider.dwd.metadata.datetime import DatetimeFormat -from wetterdienst.util.cache import metaindex_cache +from wetterdienst.util.cache import CacheExpiry, cache_dir from wetterdienst.util.enumeration import parse_enumeration_from_template from wetterdienst.util.geo import convert_dm_to_dd from wetterdienst.util.network import list_remote_files_fsspec @@ -273,7 +274,7 @@ def get_url_for_date(url: str, date: Union[datetime, DwdForecastDate]) -> str: :param date: date used for filtering of the available files :return: file url based on the filtering """ - urls = list_remote_files_fsspec(url, recursive=False) + urls = list_remote_files_fsspec(url, CacheExpiry.NO_CACHE) if date == DwdForecastDate.LATEST: try: @@ -325,7 +326,7 @@ class DwdMosmixRequest(ScalarRequestCore): _dataset_base = DwdMosmixDataset _unit_tree = DwdMosmixUnit - _url = "https://www.dwd.de/DE/leistungen/met_verfahren_mosmix/" "mosmix_stationskatalog.cfg?view=nasPublication" + _url = "https://www.dwd.de/DE/leistungen/met_verfahren_mosmix/mosmix_stationskatalog.cfg?view=nasPublication" _colspecs = [ (0, 5), @@ -477,18 +478,23 @@ def issue_end(self): """Required for typing""" return self.issue_end - @metaindex_cache.cache_on_arguments() def _all(self) -> pd.DataFrame: """ Create meta data DataFrame from available station list :return: """ - # TODO: Cache payload with FSSPEC - payload = requests.get(self._url, headers={"User-Agent": ""}) + + fs = WholeFileCacheFileSystem( + fs=HTTPFileSystem(client_kwargs={"headers": {"User-Agent": ""}}), + cache_storage=cache_dir, + expiry_time=CacheExpiry.METAINDEX.value, + ) + + payload = fs.cat(self._url) df = pd.read_fwf( - StringIO(payload.text), + StringIO(payload.decode(encoding="latin-1")), skiprows=4, skip_blank_lines=True, colspecs=self._colspecs, diff --git a/wetterdienst/provider/dwd/index.py b/wetterdienst/provider/dwd/index.py index c62861baa..9788e501c 100644 --- a/wetterdienst/provider/dwd/index.py +++ b/wetterdienst/provider/dwd/index.py @@ -15,7 +15,7 @@ DWDCDCBase, ) from wetterdienst.provider.dwd.observation.metadata.dataset import DwdObservationDataset -from wetterdienst.util.cache import fileindex_cache_five_minutes +from wetterdienst.util.cache import CacheExpiry from wetterdienst.util.network import list_remote_files_fsspec @@ -40,18 +40,12 @@ def _create_file_index_for_dwd_server( url = 
reduce(urljoin, [DWD_SERVER, DWD_CDC_PATH, cdc_base.value, parameter_path]) - if resolution in [Resolution.MINUTE_1] and period in [Period.HISTORICAL]: - recursive = True - else: - recursive = False - files_server = list_remote_files_fsspec(url, recursive=recursive) - - return pd.DataFrame(files_server, columns=[DwdColumns.FILENAME.value], dtype=str) + files_server = list_remote_files_fsspec(url, ttl=CacheExpiry.TWELVE_HOURS) + if not files_server: + raise FileNotFoundError(f"url {url} does not have a list of files") -def reset_file_index_cache() -> None: - """Function to reset the cached file index for all kinds of parameters""" - fileindex_cache_five_minutes.invalidate() + return pd.DataFrame(files_server, columns=[DwdColumns.FILENAME.value], dtype=str) def build_path_to_parameter( diff --git a/wetterdienst/provider/dwd/observation/fileindex.py b/wetterdienst/provider/dwd/observation/fileindex.py index 80a0026c2..51ae8493e 100644 --- a/wetterdienst/provider/dwd/observation/fileindex.py +++ b/wetterdienst/provider/dwd/observation/fileindex.py @@ -18,7 +18,6 @@ from wetterdienst.provider.dwd.metadata.datetime import DatetimeFormat from wetterdienst.provider.dwd.observation.metadata.dataset import DwdObservationDataset from wetterdienst.provider.dwd.observation.metadata.resolution import HIGH_RESOLUTIONS -from wetterdienst.util.cache import fileindex_cache_twelve_hours def create_file_list_for_climate_observations( @@ -52,7 +51,6 @@ def create_file_list_for_climate_observations( return file_index[DwdColumns.FILENAME.value].values.tolist() -@fileindex_cache_twelve_hours.cache_on_arguments() def create_file_index_for_climate_observations( parameter_set: DwdObservationDataset, resolution: Resolution, diff --git a/wetterdienst/provider/dwd/observation/metaindex.py b/wetterdienst/provider/dwd/observation/metaindex.py index 057a6110d..004c00c13 100644 --- a/wetterdienst/provider/dwd/observation/metaindex.py +++ b/wetterdienst/provider/dwd/observation/metaindex.py @@ -31,7 +31,7 @@ DWDCDCBase, ) from wetterdienst.provider.dwd.observation.metadata.dataset import DwdObservationDataset -from wetterdienst.util.cache import CacheExpiry, metaindex_cache +from wetterdienst.util.cache import CacheExpiry from wetterdienst.util.network import download_file, list_remote_files_fsspec METADATA_COLUMNS = [ @@ -63,7 +63,6 @@ ] -@metaindex_cache.cache_on_arguments() def create_meta_index_for_climate_observations( parameter_set: DwdObservationDataset, resolution: Resolution, @@ -141,7 +140,7 @@ def _create_meta_index_for_climate_observations( ], ) - files_server = list_remote_files_fsspec(url, recursive=True) + files_server = list_remote_files_fsspec(url, ttl=CacheExpiry.METAINDEX) # Find the one meta file from the files listed on the server meta_file = _find_meta_file(files_server, url) @@ -212,7 +211,7 @@ def _create_meta_index_for_1minute_historical_precipitation() -> pd.DataFrame: ], ) - metadata_file_paths = list_remote_files_fsspec(url, recursive=False) + metadata_file_paths = list_remote_files_fsspec(url, ttl=CacheExpiry.METAINDEX) station_ids = [re.findall(STATION_ID_REGEX, file).pop(0) for file in metadata_file_paths] @@ -333,8 +332,3 @@ def _parse_zipped_data_into_df(file: BytesIO) -> pd.DataFrame: ) return file - - -def reset_meta_index_cache() -> None: - """Function to reset cache of meta index""" - metaindex_cache.invalidate() diff --git a/wetterdienst/provider/dwd/radar/index.py b/wetterdienst/provider/dwd/radar/index.py index e23c055bc..078b7c5ac 100644 --- a/wetterdienst/provider/dwd/radar/index.py 
+++ b/wetterdienst/provider/dwd/radar/index.py @@ -29,7 +29,7 @@ RADOLAN_DT_PATTERN, get_date_from_filename, ) -from wetterdienst.util.cache import fileindex_cache_five_minutes +from wetterdienst.util.cache import CacheExpiry from wetterdienst.util.network import list_remote_files_fsspec @@ -50,7 +50,6 @@ def use_cache() -> int: # pragma: no cover return 0 -@fileindex_cache_five_minutes.cache_on_arguments(expiration_time=use_cache) def create_fileindex_radar( parameter: DwdRadarParameter, site: Optional[DwdRadarSite] = None, @@ -89,7 +88,7 @@ def create_fileindex_radar( url = urljoin(DWD_SERVER, parameter_path) - files_server = list_remote_files_fsspec(url, recursive=True) + files_server = list_remote_files_fsspec(url, ttl=CacheExpiry.NO_CACHE) files_server = pd.DataFrame(files_server, columns=[DwdColumns.FILENAME.value], dtype="str") @@ -114,7 +113,6 @@ def create_fileindex_radar( return files_server -@fileindex_cache_five_minutes.cache_on_arguments() def create_fileindex_radolan_cdc(resolution: Resolution, period: Period) -> pd.DataFrame: """ Function used to create a file index for the RADOLAN_CDC product. The file index diff --git a/wetterdienst/provider/eccc/observation/api.py b/wetterdienst/provider/eccc/observation/api.py index 47052b215..07c29a43c 100644 --- a/wetterdienst/provider/eccc/observation/api.py +++ b/wetterdienst/provider/eccc/observation/api.py @@ -360,17 +360,14 @@ def _download_stations() -> BytesIO: ) try: - response = fs.cat(gdrive_url) - - payload = response + payload = fs.cat(gdrive_url) except Exception: log.exception(f"Unable to access Google drive server at {gdrive_url}") # Fall back to different source. try: response = fs.cat(http_url) - response.raise_for_status() - with gzip.open(BytesIO(response.content), mode="rb") as f: + with gzip.open(BytesIO(response), mode="rb") as f: payload = f.read() except Exception: log.exception(f"Unable to access HTTP server at {http_url}") diff --git a/wetterdienst/util/cache.py b/wetterdienst/util/cache.py index 45746672e..cf1230f7e 100644 --- a/wetterdienst/util/cache.py +++ b/wetterdienst/util/cache.py @@ -1,17 +1,11 @@ # -*- coding: utf-8 -*- # Copyright (c) 2018-2021, earthobservations developers. # Distributed under the MIT License. See LICENSE for more info. -import logging import os -import platform import sys from enum import Enum import appdirs -from dogpile.cache import make_region -from dogpile.cache.util import kwarg_function_key_generator - -log = logging.getLogger() # FSSPEC aiohttp client kwargs, may be used to pass extra arguments # such as proxies etc to aiohttp @@ -29,29 +23,6 @@ # Early reporting. if WD_CACHE_DISABLE: sys.stderr.write("INFO: Wetterdienst cache is disabled\n") -else: - sys.stderr.write("INFO: Wetterdienst cache directory is %s\n" % cache_dir) - -# Ensure cache directories exist. -# FIXME: Get rid of this as it executes "os.makedirs()" on the module level. -# This is not really good style but it is needed for the dogpile setup. -if not WD_CACHE_DISABLE: - cache_directories = [ - os.path.join(cache_dir, "dogpile"), - os.path.join(cache_dir, "fsspec"), - ] - for cache_directory in cache_directories: - if not os.path.exists(cache_directory): - os.makedirs(cache_directory, exist_ok=True) - -# Configure cache backend. -# TODO: Make cache backend configurable, e.g. optionally use Redis for running -# in multi-threaded environments. -platform = platform.system() -backend = "dogpile.cache.dbm" -# Python on Windows has no "fcntl", which is required by the dbm backend. 
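(The dogpile regions removed in this cache.py hunk are superseded by fsspec's caching filesystems. A minimal sketch of the replacement pattern, the same one used in forecast/api.py above and in network.py below; the URL is hypothetical:

    from fsspec.implementations.cached import WholeFileCacheFileSystem
    from fsspec.implementations.http import HTTPFileSystem

    from wetterdienst.util.cache import CacheExpiry, cache_dir

    # Whole files are cached on disk and transparently re-fetched once the
    # expiry time (here CacheExpiry.METAINDEX, i.e. twelve hours) has passed.
    fs = WholeFileCacheFileSystem(
        fs=HTTPFileSystem(),
        cache_storage=cache_dir,
        expiry_time=CacheExpiry.METAINDEX.value,
    )

    payload = fs.cat("https://example.org/stations.cfg")  # hypothetical URL; returns bytes
)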
-if WD_CACHE_DISABLE or platform == "Windows": - backend = "dogpile.cache.memory" class CacheExpiry(Enum): @@ -70,29 +41,3 @@ class CacheExpiry(Enum): METAINDEX = TWELVE_HOURS FILEINDEX = FIVE_MINUTES - - -# Define cache regions. -metaindex_cache = make_region(function_key_generator=kwarg_function_key_generator).configure( - backend, - expiration_time=60 * 60 * 12, - arguments={"filename": os.path.join(cache_dir, "dogpile", "metaindex.dbm")}, -) - -fileindex_cache_five_minutes = make_region(function_key_generator=kwarg_function_key_generator).configure( - backend, - expiration_time=60 * 5, - arguments={"filename": os.path.join(cache_dir, "dogpile", "fileindex_5m.dbm")}, -) - -fileindex_cache_twelve_hours = make_region(function_key_generator=kwarg_function_key_generator).configure( - backend, - expiration_time=60 * 60 * 12, - arguments={"filename": os.path.join(cache_dir, "dogpile", "fileindex_12h.dbm")}, -) - -payload_cache_twelve_hours = make_region(function_key_generator=kwarg_function_key_generator).configure( - backend, - expiration_time=60 * 60 * 12, - arguments={"filename": os.path.join(cache_dir, "dogpile", "payload_12h.dbm")}, -) diff --git a/wetterdienst/util/fsspec_monkeypatch.py b/wetterdienst/util/fsspec_monkeypatch.py new file mode 100644 index 000000000..acdb34180 --- /dev/null +++ b/wetterdienst/util/fsspec_monkeypatch.py @@ -0,0 +1,243 @@ +""" +FSSPEC MONKEY PATCH + +Code taken from +- FSSPEC (https://github.com/fsspec/filesystem_spec) +and +- our PR at (https://github.com/fsspec/filesystem_spec/pull/895) + +For FSSPEC related code the license of the project (https://github.com/fsspec/filesystem_spec/blob/master/LICENSE) +.applies + +TODO: remove this as soon as our PR is merged +""" +import logging +import warnings +from collections.abc import MutableMapping +from copy import copy +from pathlib import Path + +import aiohttp +from fsspec import asyn +from fsspec.asyn import sync +from fsspec.dircache import DirCache +from fsspec.implementations import http + +logger = logging.getLogger(__name__) + + +async def get_client(**kwargs): + return aiohttp.ClientSession(**kwargs) + + +MemDirCache = DirCache + + +class HTTPFileSystem(http.HTTPFileSystem): + def __init__( + self, + simple_links=True, + block_size=None, + same_scheme=True, + size_policy=None, + cache_type="bytes", + cache_options=None, + asynchronous=False, + loop=None, + client_kwargs=None, + get_client=get_client, + **storage_options, + ): + """ + NB: if this is called async, you must await set_client + + Parameters + ---------- + block_size: int + Blocks to read bytes; if 0, will default to raw requests file-like + objects instead of HTTPFile instances + simple_links: bool + If True, will consider both HTML tags and anything that looks + like a URL; if False, will consider only the former. + same_scheme: True + When doing ls/glob, if this is True, only consider paths that have + http/https matching the input URLs. + size_policy: this argument is deprecated + client_kwargs: dict + Passed to aiohttp.ClientSession, see + https://docs.aiohttp.org/en/stable/client_reference.html + For example, ``{'auth': aiohttp.BasicAuth('user', 'pass')}`` + get_client: Callable[..., aiohttp.ClientSession] + A callable which takes keyword arguments and constructs + an aiohttp.ClientSession. It's state will be managed by + the HTTPFileSystem class. 
+ storage_options: key-value + Any other parameters passed on to requests + cache_type, cache_options: defaults used in open + """ + super().__init__( + simple_links=simple_links, + block_size=block_size, + same_scheme=same_scheme, + size_policy=size_policy, + cache_type=cache_type, + cache_options=cache_options, + asynchronous=asynchronous, + loop=loop, + client_kwargs=client_kwargs, + get_client=get_client, + **storage_options, + ) + request_options = copy(storage_options) + self.use_listings_cache = request_options.pop("use_listings_cache", False) + request_options.pop("listings_expiry_time", None) + request_options.pop("max_paths", None) + request_options.pop("skip_instance_cache", None) + listings_cache_type = request_options.pop("listings_cache_type", None) + listings_cache_location = request_options.pop("listings_cache_location", None) + + if self.use_listings_cache: + if listings_cache_type == "filedircache": + logger.warning(f"Dircache located at {listings_cache_location}") + + self.kwargs = request_options + + if not asynchronous: + sync(self.loop, self.set_session) + + +http.HTTPFileSystem = HTTPFileSystem + + +class FileDirCache(MutableMapping): + def __init__( + self, + use_listings_cache=True, + listings_expiry_time=None, + listings_cache_location=None, + **kwargs, + ): + """ + + Parameters + ---------- + use_listings_cache: bool + If False, this cache never returns items, but always reports KeyError, + and setting items has no effect + listings_expiry_time: int or float (optional) + Time in seconds that a listing is considered valid. If None, + listings do not expire. + listings_cache_location: str (optional) + Directory path at which the listings cache file is stored. If None, + an autogenerated path at the user folder is created. + + """ + import appdirs + from diskcache import Cache + + listings_expiry_time = listings_expiry_time and float(listings_expiry_time) + + if listings_cache_location: + listings_cache_location = Path(listings_cache_location) / str(listings_expiry_time) + listings_cache_location.mkdir(exist_ok=True, parents=True) + else: + listings_cache_location = Path(appdirs.user_cache_dir(appname="fsspec_dircache")) / str( + listings_expiry_time + ) + + try: + listings_cache_location.mkdir(exist_ok=True, parents=True) + except Exception: + logger.error(f"folder for dircache could not be created at {listings_cache_location}") + + self.cache_location = listings_cache_location + + self._cache = Cache(directory=listings_cache_location) + self.use_listings_cache = use_listings_cache + self.listings_expiry_time = listings_expiry_time + + def __getitem__(self, item): + """Draw item as fileobject from cache, retry if timeout occurs""" + return self._cache.get(key=item, read=True, retry=True) + + def clear(self): + self._cache.clear() + + def __len__(self): + return len(list(self._cache.iterkeys())) + + def __contains__(self, item): + value = self._cache.get(item, retry=True) # None, if expired + if value: + return True + return False + + def __setitem__(self, key, value): + if not self.use_listings_cache: + return + self._cache.set(key=key, value=value, expire=self.listings_expiry_time, retry=True) + + def __delitem__(self, key): + del self._cache[key] + + def __iter__(self): + return (k for k in self._cache.iterkeys() if k in self) + + def __reduce__(self): + return ( + FileDirCache, + (self.use_listings_cache, self.listings_expiry_time, self.cache_location), + ) + + +class AsyncFileSystem(asyn.AsyncFileSystem): + def __init__(self, *args, **storage_options): + 
"""Create and configure file-system instance + + Instances may be cachable, so if similar enough arguments are seen + a new instance is not required. The token attribute exists to allow + implementations to cache instances if they wish. + + A reasonable default should be provided if there are no arguments. + + Subclasses should call this method. + + Parameters + ---------- + use_listings_cache, listings_expiry_time, max_paths: + passed to ``MemDirCache``, if the implementation supports + directory listing caching. Pass use_listings_cache=False + to disable such caching. + skip_instance_cache: bool + If this is a cachable implementation, pass True here to force + creating a new instance even if a matching instance exists, and prevent + storing this instance. + asynchronous: bool + loop: asyncio-compatible IOLoop or None + """ + if self._cached: + # reusing instance, don't change + return + self._cached = True + self._intrans = False + self._transaction = None + self._invalidated_caches_in_transaction = [] + + listings_cache_type = storage_options.get("listings_cache_type", "memdircache") + if listings_cache_type not in ("memdircache", "filedircache"): + raise ValueError("'listings_cache_type' has to be one of ('memdircache', 'filedircache')") + if listings_cache_type == "memdircache": + self.dircache = MemDirCache(**storage_options) + else: + self.dircache = FileDirCache(**storage_options) + + if storage_options.pop("add_docs", None): + warnings.warn("add_docs is no longer supported.", FutureWarning) + + if storage_options.pop("add_aliases", None): + warnings.warn("add_aliases has been removed.", FutureWarning) + # This is set in _Cached + self._fs_token_ = None + + +asyn.AsyncFileSystem = AsyncFileSystem diff --git a/wetterdienst/util/network.py b/wetterdienst/util/network.py index 9f80a88f9..e77ebf79c 100644 --- a/wetterdienst/util/network.py +++ b/wetterdienst/util/network.py @@ -4,10 +4,7 @@ import os from io import BytesIO from typing import List, Optional, Union -from urllib.parse import urljoin -import requests -from bs4 import BeautifulSoup from fsspec.implementations.cached import WholeFileCacheFileSystem from fsspec.implementations.http import HTTPFileSystem @@ -18,11 +15,7 @@ cache_dir, ) -# v1: Global HTTP session object for custom implementation based on "requests". -session = requests.Session() - -# v2: Remote filesystem access through FSSPEC. class NetworkFilesystemManager: """ Manage multiple FSSPEC instances keyed by cache expiration time. @@ -52,9 +45,7 @@ def register(cls, ttl=CacheExpiry.NO_CACHE): filesystem_effective = filesystem_real else: filesystem_effective = WholeFileCacheFileSystem( - fs=filesystem_real, - cache_storage=real_cache_dir, - expiry_time=ttl_value, + fs=filesystem_real, cache_storage=real_cache_dir, expiry_time=ttl_value ) cls.filesystems[key] = filesystem_effective @@ -67,50 +58,7 @@ def get(cls, ttl=CacheExpiry.NO_CACHE): return cls.filesystems[key] -# v1: Custom "remote directory index" implementation. 
-def list_remote_files_legacy(url: str, recursive: bool) -> List[str]: - """ - A function used to create a listing of all files of a given path on the server - - Args: - url: the url which should be searched for files - recursive: definition if the function should iteratively list files - from sub folders - - Returns: - a list of strings representing the files from the path - """ - - if not url.endswith("/"): - url += "/" - - r = session.get(url) - r.raise_for_status() - - soup = BeautifulSoup(r.text, "lxml") - - files_and_folders = [link.get("href") for link in soup.find_all("a") if link.get("href") != "../"] - - files = [] - folders = [] - - for f in files_and_folders: - if not f.endswith("/"): - files.append(urljoin(url, f)) - else: - folders.append(urljoin(url, f)) - - if recursive: - files_in_folders = [list_remote_files_legacy(folder, recursive) for folder in folders] - - for files_in_folder in files_in_folders: - files.extend(files_in_folder) - - return files - - -# v2: "Remote directory index" implementation based on FSSPEC. -def list_remote_files_fsspec(url: str, recursive: bool = False, ttl: CacheExpiry = CacheExpiry.FILEINDEX) -> List[str]: +def list_remote_files_fsspec(url: str, ttl: CacheExpiry = CacheExpiry.FILEINDEX) -> List[str]: """ A function used to create a listing of all files of a given path on the server. @@ -125,22 +73,14 @@ def list_remote_files_fsspec(url: str, recursive: bool = False, ttl: CacheExpiry Returns: A list of strings representing the files from the path. """ - - # Acquire filesystem instance. - filesystem = NetworkFilesystemManager.get(ttl=ttl) - - # Recursively list remote directory. - if not recursive: - remote_urls = filesystem.find(url) - else: - remote_urls = filesystem.expand_path(url, recursive=recursive) - - # Only list files, so remove all directories. - try: - remote_urls.remove(url) - except ValueError: - pass - return [i for i in remote_urls if not i.endswith("/")] + fs = HTTPFileSystem( + use_listings_cache=True, + listings_expiry_time=not WD_CACHE_DISABLE and ttl.value, + listings_cache_type="filedircache", + listings_cache_location=cache_dir, + ) + + return fs.find(url) def download_file(url: str, ttl: Optional[int] = CacheExpiry.NO_CACHE) -> BytesIO: