diff --git a/CHANGES.md b/CHANGES.md index 07312f006..381272f05 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,10 @@ ### Enhancements +* Enhanced the `xcube.core.store.DataStore` class to support _preloading_ + of data resources. The preloading API is represented by + the new `xcube.core.store.DataPreloader` interface. + * A `xy_res` keyword argument was added to the `transform()` method of `xcube.core.gridmapping.GridMapping`, enabling users to set the grid-mapping resolution directly, which speeds up the method by avoiding time-consuming diff --git a/environment.yml b/environment.yml index 2f3808f34..dcaee5edd 100644 --- a/environment.yml +++ b/environment.yml @@ -38,6 +38,7 @@ dependencies: - s3fs >=2021.6 - setuptools >=41.0 - shapely >=1.6 + - tabulate >=0.9 - tornado >=6.0 - urllib3 >=1.26 - xarray >=2022.6 diff --git a/examples/notebooks/datastores/preload.ipynb b/examples/notebooks/datastores/preload.ipynb new file mode 100644 index 000000000..2aefd6e9a --- /dev/null +++ b/examples/notebooks/datastores/preload.ipynb @@ -0,0 +1,368 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "7f55e16a-46f3-4865-879a-a7cae151daa6", + "metadata": {}, + "source": [ + "### `ExecutorPreloadHandle` Demo\n", + "\n", + "This notebook is dedicated to developers wanting to enhance their\n", + "data store implementation with the new _data store preload API_. \n", + "This API has been added to `xcube.core.store.DataStore` in xcube 1.8.\n", + "Demonstrated here is the usage of the utility class `xcube.core.store.preload.ExecutorPreloadHandle` \n", + "for cases where the preload process can be concurrently performed for each individual data resource. 
" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "7e27d98a-6519-4c4f-9e59-ae6369e102b9", + "metadata": {}, + "outputs": [], + "source": [ + "import random\n", + "import time\n", + "\n", + "from xcube.core.store.preload import ExecutorPreloadHandle\n", + "from xcube.core.store.preload import PreloadHandle\n", + "from xcube.core.store.preload import PreloadState\n", + "from xcube.core.store.preload import PreloadStatus" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "92b57a80-14aa-4820-8acc-ab78fa45be0a", + "metadata": {}, + "outputs": [], + "source": [ + "data_ids = (\n", + " \"tt-data/tinky-winky.nc\", \n", + " \"tt-data/dipsy.zarr\", \n", + " \"tt-data/laa-laa.tiff\", \n", + " \"tt-data/po.zarr.zip\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "c3175fc6-084d-4da5-b7af-fe3f03758a7d", + "metadata": {}, + "outputs": [], + "source": [ + "def preload_data(handle: PreloadHandle, data_id: str):\n", + " duration = random.randint(5, 15) # seconds\n", + " num_ticks = 100 \n", + " for i in range(num_ticks):\n", + " time.sleep(duration / num_ticks)\n", + " if handle.cancelled:\n", + " # TODO: Note clear, why future.cancel() doesn't do the job\n", + " handle.notify(PreloadState(data_id, status=PreloadStatus.cancelled))\n", + " return\n", + " handle.notify(PreloadState(data_id, progress=i / num_ticks))\n", + " if i % 10 == 0:\n", + " handle.notify(PreloadState(data_id, message=f\"Step #{i // 10 + 1}\"))\n", + " handle.notify(PreloadState(data_id, progress=1.0, message=\"Done.\"))" + ] + }, + { + "cell_type": "markdown", + "id": "48792654-20e5-46db-aa47-37c03f46b0a7", + "metadata": {}, + "source": [ + "---\n", + "Synchronous / blocking call" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "c8bb6812-af18-4e5f-8df0-d77889552e99", + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "f88a78a4ffff41cd917f73405a1183e6", + 
"version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "VBox(children=(HTML(value='\\n\\n
Data ID Status Progress …" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "handle = ExecutorPreloadHandle(data_ids, preload_data=preload_data)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "8911ae1b-765c-4030-aeca-fc997e7a880d", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
Data ID Status Progress Message Exception
tt-data/tinky-winky.ncSTOPPED 100% Done. -
tt-data/dipsy.zarr STOPPED 100% Done. -
tt-data/laa-laa.tiff STOPPED 100% Done. -
tt-data/po.zarr.zip STOPPED 100% Done. -
" + ], + "text/plain": [ + "Data ID Status Progress Message Exception\n", + "---------------------- -------- ---------- --------- -----------\n", + "tt-data/tinky-winky.nc STOPPED 100% Done. -\n", + "tt-data/dipsy.zarr STOPPED 100% Done. -\n", + "tt-data/laa-laa.tiff STOPPED 100% Done. -\n", + "tt-data/po.zarr.zip STOPPED 100% Done. -" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "handle" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "5fe9a8e6-1543-469f-bb29-f1c7b3462953", + "metadata": {}, + "outputs": [], + "source": [ + "handle.close()" + ] + }, + { + "cell_type": "markdown", + "id": "fe45c10a-e20f-439f-85bb-4aa95a68f3d4", + "metadata": {}, + "source": [ + "---\n", + "Asynchronous / non-blocking call" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "69cb1b3d-3fcc-4c66-b077-d474a0083e6e", + "metadata": {}, + "outputs": [], + "source": [ + "async_handle = ExecutorPreloadHandle(data_ids, blocking=False, preload_data=preload_data)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "d5a50ec1-addf-46a4-90e1-abfc5b2e95de", + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "eec94f7b3cdd4d5e96411f980aa55a92", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "VBox(children=(HTML(value='\\n\\n
Data ID Status Progress …" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "async_handle.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "7a2f1f9a-9ea0-43b5-82ed-b90a851f938a", + "metadata": {}, + "outputs": [], + "source": [ + "time.sleep(2)" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "436e7042-abf2-4450-a334-eed75f045c37", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "PreloadState(data_id='tt-data/dipsy.zarr', status=PreloadStatus.started, progress=0.17, message='Step #2')" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "async_handle.get_state(\"tt-data/dipsy.zarr\")" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "07d81416-38e8-4b23-9220-d1f57160d9b6", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "data_id=tt-data/dipsy.zarr, status=STARTED, progress=0.17, message=Step #2\n" + ] + } + ], + "source": [ + "print(async_handle.get_state(\"tt-data/dipsy.zarr\"))" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "f17aeecb-6650-43d5-9c41-cedb46baaa3f", + "metadata": {}, + "outputs": [], + "source": [ + "time.sleep(2)" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "a911b93a-fb76-4d2f-88e8-0fd4e635246c", + "metadata": {}, + "outputs": [], + "source": [ + "async_handle.cancel()" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "df2f5a5c-82a8-43ae-ae84-5c9d882e3757", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Data ID Status Progress Message Exception\n", + "---------------------- -------- ---------- --------- -----------\n", + "tt-data/tinky-winky.nc STARTED 38% Step #4 -\n", + "tt-data/dipsy.zarr STARTED 35% Step #4 -\n", + "tt-data/laa-laa.tiff STARTED 55% Step #6 -\n", + "tt-data/po.zarr.zip STARTED 29% 
Step #3 -\n" + ] + } + ], + "source": [ + "print(async_handle)" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "769857c9-bad8-4824-92fb-a195e4d5ccb3", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
Data ID Status Progress Message Exception
tt-data/tinky-winky.ncCANCELLED38% Step #4 -
tt-data/dipsy.zarr STARTED 35% Step #4 -
tt-data/laa-laa.tiff STARTED 55% Step #6 -
tt-data/po.zarr.zip STARTED 29% Step #3 -
" + ], + "text/plain": [ + "Data ID Status Progress Message Exception\n", + "---------------------- --------- ---------- --------- -----------\n", + "tt-data/tinky-winky.nc CANCELLED 38% Step #4 -\n", + "tt-data/dipsy.zarr STARTED 35% Step #4 -\n", + "tt-data/laa-laa.tiff STARTED 55% Step #6 -\n", + "tt-data/po.zarr.zip STARTED 29% Step #3 -" + ] + }, + "execution_count": 15, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "async_handle" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "4d0d42ee-5caa-4eef-913a-1f0417670001", + "metadata": {}, + "outputs": [], + "source": [ + "async_handle.close()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5c3d89bf-f58f-495b-a5d1-c9fe0b9610a7", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/pyproject.toml b/pyproject.toml index 0e4a89c9e..158a4ae11 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ dependencies = [ "s3fs>=2021.6", "setuptools>=41.0", "shapely>=1.6", + "tabulate>=0.9", "tornado>=6.0", "urllib3>=1.26", "xarray>=2022.6,<=2024.6", diff --git a/test/core/store/test_preload.py b/test/core/store/test_preload.py new file mode 100644 index 000000000..136a9b9e8 --- /dev/null +++ b/test/core/store/test_preload.py @@ -0,0 +1,44 @@ +# Copyright (c) 2018-2024 by xcube team and contributors +# Permissions are hereby granted under the terms of the MIT License: +# https://opensource.org/licenses/MIT. 
+ +import unittest + + +from xcube.core.store import PreloadStatus +from xcube.core.store import PreloadState + + +class PreloadStateTest(unittest.TestCase): + def test_str(self): + state = PreloadState( + "test.zip", status=PreloadStatus.started, progress=0.71, message="Unzipping" + ) + self.assertEqual( + "data_id=test.zip, " + "status=STARTED, " + "progress=0.71, " + "message=Unzipping", + str(state), + ) + + def test_repr(self): + state = PreloadState( + "test.zip", status=PreloadStatus.started, progress=0.71, message="Unzipping" + ) + self.assertEqual( + "PreloadState(" + "data_id='test.zip', " + "status=PreloadStatus.started, " + "progress=0.71, " + "message='Unzipping')", + repr(state), + ) + + +class PreloadStatusTest(unittest.TestCase): + def test_str(self): + self.assertEqual("CANCELLED", str(PreloadStatus.cancelled)) + + def test_repr(self): + self.assertEqual("PreloadStatus.started", repr(PreloadStatus.started)) diff --git a/xcube/core/store/__init__.py b/xcube/core/store/__init__.py index f7f4a00ac..c567c5f97 100644 --- a/xcube/core/store/__init__.py +++ b/xcube/core/store/__init__.py @@ -4,6 +4,7 @@ from .accessor import DataDeleter from .accessor import DataOpener +from .accessor import DataPreloader from .accessor import DataTimeSliceUpdater from .accessor import DataWriter from .accessor import find_data_opener_extensions @@ -27,10 +28,9 @@ from .descriptor import new_data_descriptor from .error import DataStoreError from .fs.registry import new_fs_data_store -from .preload import PreloadEvent -from .preload import PreloadEventType from .preload import PreloadHandle -from .preload import PreloadMonitor +from .preload import PreloadState +from .preload import PreloadStatus from .search import DataSearcher from .search import DefaultSearchMixin from .store import DataStore diff --git a/xcube/core/store/accessor.py b/xcube/core/store/accessor.py index 7a6182576..335c41349 100644 --- a/xcube/core/store/accessor.py +++ b/xcube/core/store/accessor.py @@ 
-3,7 +3,7 @@ # https://opensource.org/licenses/MIT. from abc import abstractmethod, ABC -from typing import Any, List, Optional +from typing import Any, Optional import xarray as xr @@ -15,6 +15,7 @@ from xcube.util.extension import ExtensionRegistry from xcube.util.jsonschema import JsonObjectSchema from xcube.util.plugin import get_extension_registry +from .preload import PreloadHandle from .datatype import DataType from .datatype import DataTypeLike from .error import DataStoreError @@ -128,7 +129,7 @@ def find_data_writer_extensions( def get_data_accessor_predicate( data_type: DataTypeLike = None, format_id: str = None, storage_id: str = None ) -> ExtensionPredicate: - """Get a predicate that checks if a data accessor extensions's name is + """Get a predicate that checks if the name of a data accessor extension is compliant with *data_type*, *format_id*, *storage_id*. Args: @@ -171,7 +172,7 @@ def _predicate(extension: Extension) -> bool: ####################################################### -# Classes +# Interface Classes ####################################################### @@ -322,6 +323,62 @@ def write_data( """ +class DataPreloader(ABC): + """An interface that specifies a parameterized `preload_data()` operation. + + Warning: This is an experimental and potentially unstable API + introduced in xcube 1.8. + + Many data store implementations rely on remote data APIs. + Such APIs may provide only limited data access performance. + Hence, the approach taken by ``DataStore.open_data(data_id, ...)`` alone + is suboptimal from a user's perspective. This is because the method is + blocking as it is not asynchronous, it may take a long time before it + returns, and it cannot report any progress while doing so. + The reasons for slow and unresponsive data APIs are manifold: intended + access is by file download, access is bandwidth limited, or not allowing + for sub-setting. + + Data stores may differently implement the ``preload_data()`` method, + usually not at all. 
If preloading is required, the data will be + downloaded in most cases and made available via some temporary cache. + """ + + @abstractmethod + def preload_data( + self, + *data_ids: str, + **preload_params: Any, + ) -> PreloadHandle: + """Preload the given data items for faster access. + + The method is blocking by default. Use the returned handle to observe the + preload process (``get_state()``, ``show()``) or to cancel it + (``cancel()``). + + Args: + data_ids: Data identifiers to be preloaded. + preload_params: data store specific preload parameters. + See method ``get_preload_data_params_schema()`` for information + on the possible options. + + Returns: + A handle for the preload process. The default implementation + returns an empty preload handle. + """ + + # noinspection PyMethodMayBeStatic + @abstractmethod + def get_preload_data_params_schema(self) -> JsonObjectSchema: + """Get the JSON schema that describes the keyword + arguments that can be passed to ``preload_data()``. + + Returns: + A ``JsonObjectSchema`` object whose properties describe + the parameters of ``preload_data()``. + """ + + class DataTimeSliceUpdater(DataWriter, ABC): """An interface that specifies writing of time slice data.""" diff --git a/xcube/core/store/preload.py b/xcube/core/store/preload.py index 0df97cf3f..be714b03f 100644 --- a/xcube/core/store/preload.py +++ b/xcube/core/store/preload.py @@ -2,116 +2,105 @@ # Permissions are hereby granted under the terms of the MIT License: # https://opensource.org/licenses/MIT. 
+from abc import abstractmethod, ABC +from asyncio import CancelledError +from concurrent.futures import Executor, Future +from concurrent.futures.thread import ThreadPoolExecutor from enum import Enum -from typing import Callable +import threading +from typing import Any, Callable +import tabulate -class PreloadState(Enum): - """Preload process state.""" +from xcube.util.assertions import assert_given, assert_instance - created = "created" + +class PreloadStatus(Enum): + """Preload process status.""" + + waiting = "waiting" started = "started" stopped = "stopped" cancelled = "cancelled" failed = "failed" + def __str__(self): + return self.name.upper() -class PreloadEventType(Enum): - """Type of preload process event.""" - - state = "state" - progress = "progress" - info = "info" - warning = "warning" - error = "error" - - -class PreloadEvent: - """Event to occur during the preload process.""" - - @classmethod - def state(cls, state: PreloadState): - """Create an event of type ``state``.""" - return PreloadEvent(PreloadEventType.state, state=state) - - @classmethod - def progress(cls, progress: float): - """Create an event of type ``process``.""" - return PreloadEvent(PreloadEventType.progress, progress=progress) - - @classmethod - def info(cls, message: str): - """Create an event of type ``info``.""" - return PreloadEvent(PreloadEventType.info, message=message) + def __repr__(self): + return f"{self.__class__.__name__}.{self.name}" - @classmethod - def warning(cls, message: str, warning: Warning | None = None): - """Create an event of type ``warning``.""" - return PreloadEvent(PreloadEventType.warning, message=message, warning=warning) - @classmethod - def error(cls, message: str, exception: Exception | None = None): - """Create an event of type ``error``.""" - return PreloadEvent( - PreloadEventType.error, message=message, exception=exception - ) +class PreloadState: + """Preload state.""" - # noinspection PyShadowingBuiltins def __init__( self, - type: 
PreloadEventType, - state: PreloadState | None = None, + data_id: str, + status: PreloadStatus | None = None, progress: float | None = None, message: str | None = None, - warning: Warning | None = None, - exception: Exception | None = None, + exception: BaseException | None = None, ): - self.type = type - self.state = state + assert_given(data_id, name="data_id") + self.data_id = data_id + self.status = status self.progress = progress self.message = message - self.warning = warning self.exception = exception + def update(self, event: "PreloadState"): + """Update this state with the given partial state. -class PreloadMonitor: + Args: + event: the partial state. + """ + assert_instance(event, PreloadState, name="data_id") + if self.data_id == event.data_id: + if event.status is not None: + self.status = event.status + if event.progress is not None: + self.progress = event.progress + if event.message is not None: + self.message = event.message + if event.exception is not None: + self.exception = event.exception - def __init__( - self, - on_event: Callable[["PreloadMonitor", PreloadEvent], None] | None = None, - on_done: Callable[["PreloadMonitor"], None] | None = None, - ): - self._is_cancelled = False - if on_event: - self.on_event = on_event - if on_done: - self.on_done = on_done + def __str__(self): + return ", ".join(f"{k}={v}" for k, v in _to_dict(self).items()) - @property - def is_cancelled(self): - """Is cancellation requested?""" - return self._is_cancelled + def __repr__(self): + args = ", ".join(f"{k}={v!r}" for k, v in _to_dict(self).items()) + return f"{self.__class__.__name__}({args})" - def cancel(self): - """Request cancellation.""" - self._is_cancelled = True - def on_event(self, event: PreloadEvent): - """Called when an event occurs.""" +class PreloadHandle(ABC): + """A handle for a preload job. - def on_done(self): - """Called when the preload process is done and - the data is ready to be accessed. 
+ + TODO: add more doc + """ - The method is not called only on success. + @abstractmethod + def get_state(self, data_id: str) -> PreloadState: + """Get the preload state for the given *data_id*. + + Args: + data_id: The data identifier. + Returns: + The preload state. """ + @property + @abstractmethod + def cancelled(self) -> bool: + """``True`` if the preload job has been cancelled.""" -class PreloadHandle: - """Represents an ongoing preload process.""" + @abstractmethod + def cancel(self): + """Cancel the preload job.""" def close(self): - """Closes the preload. + """Close the preload job. Should be called if the preloaded data is no longer needed. @@ -121,6 +110,27 @@ def close(self): The default implementation does nothing. """ + @abstractmethod + def show(self) -> Any: + """Show the current state of the preload job. + + This method is useful for non-blocking / asynchronous preload + implementations, especially in a Jupyter Notebook context. + In this case an implementation might want to display a widget + suitable for in-place updating, e.g., an ``ipywidgets`` widget. + """ + + def notify(self, event: PreloadState): + """Notify about a preload state change. + + Updates the preload job using the given partial state + *event* that refers to state changes of a running + preload task. + + Args: + event: A partial state + """ + def __enter__(self) -> "PreloadHandle": """Enter the context. @@ -132,3 +142,263 @@ def __enter__(self) -> "PreloadHandle": def __exit__(self, exc_type, exc_val, exc_tb): """Exit the context. 
Calls ``close()``.""" self.close() + + +class NullPreloadHandle(PreloadHandle): + """Null-pattern implementation of a ``PreloadHandle``.""" + + def get_state(self, data_id: str) -> PreloadState: + return PreloadState(data_id) + + @property + def cancelled(self) -> bool: + return False + + def cancel(self): + pass + + def show(self) -> Any: + return None + + +class ExecutorPreloadHandle(PreloadHandle): + """TODO - Add docs""" + + def __init__( + self, + data_ids: tuple[str, ...], + preload_data: Callable[[PreloadHandle, str], None] | None = None, + executor: Executor | None = None, + blocking: bool = True, + ): + self._preload_data = preload_data + self._executor = executor or ThreadPoolExecutor() + self._blocking = blocking + + self._states = {data_id: PreloadState(data_id=data_id) for data_id in data_ids} + self._cancel_event = threading.Event() + self._display = PreloadDisplay.create(list(self._states.values())) + self._lock = threading.Lock() + self._futures: dict[str, Future[str]] = {} + for data_id in data_ids: + future: Future[str] = self._executor.submit(self._run_preload_data, data_id) + future.add_done_callback(self._handle_preload_data_done) + self._futures[data_id] = future + + if blocking: + self._display.show() + self._executor.shutdown(wait=True) + + def get_state(self, data_id: str) -> PreloadState: + return self._states[data_id] + + @property + def cancelled(self) -> bool: + """Return true if and only if the internal flag is true.""" + return self._cancel_event.is_set() + + def cancel(self): + self._cancel_event.set() + for future in self._futures.values(): + future.cancel() + self._executor.shutdown(wait=False) + + def notify(self, event: PreloadState): + state = self._states[event.data_id] + if ( + event.status is not None + and event.status != state.status + and state.status + in (PreloadStatus.stopped, PreloadStatus.cancelled, PreloadStatus.failed) + ): + # Status cannot be changed + return + with self._lock: + state.update(event) + 
self._display.update() + + def preload_data(self, data_id: str): + """Preload the data resource given by *data_id*. + + Concurrently executes the *preload_data* passed to the constructor, + if any. Otherwise, it does nothing. + + Can be overridden by clients to implement the actual preload operation. + + Args: + data_id: The data identifier of the data resource to be preloaded. + """ + if self._preload_data is not None: + self._preload_data(self, data_id) + + def _run_preload_data(self, data_id: str) -> str: + self.notify(PreloadState(data_id, status=PreloadStatus.started)) + self.preload_data(data_id) + return data_id + + def _handle_preload_data_done(self, f: Future[str]): + data_id: str | None = None + for data_id, future in self._futures.items(): + if f is future: + break + if data_id is None: + return + try: + _value = f.result() + self.notify(PreloadState(data_id, status=PreloadStatus.stopped)) + except CancelledError as e: + self.notify( + PreloadState(data_id, status=PreloadStatus.cancelled, exception=e) + ) + except Exception as e: + self.notify(PreloadState(data_id, status=PreloadStatus.failed, exception=e)) + + def show(self) -> Any: + return self._display.show() + + def _repr_html_(self): + return self._display.to_html() + + def __str__(self): + return self._display.to_text() + + def __repr__(self): + return self._display.to_text() + + def __enter__(self) -> "PreloadHandle": + """Enter the context. + + Does nothing but returning this handle. + Only useful when in blocking mode. + + Returns: + This object. + """ + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exit the context. + + Calls ``close()`` if in blocking mode. + Otherwise, does nothing. 
+ """ + if self._blocking: + self.close() + + +class PreloadDisplay(ABC): + + @classmethod + def create(cls, states: list[PreloadState]) -> "PreloadDisplay": + try: + # noinspection PyUnresolvedReferences + from IPython.display import display + + if display is not None: + try: + return IPyWidgetsPreloadDisplay(states) + except ImportError: + return IPyPreloadDisplay(states) + except ImportError: + pass + return PreloadDisplay(states) + + def __init__(self, states: list[PreloadState]): + self.states = states + + def _repr_html_(self) -> str: + return self.to_html() + + def to_text(self) -> str: + return self.tabulate(table_format="simple") + + def to_html(self) -> str: + return self.tabulate(table_format="html") + + def tabulate(self, table_format: str = "simple") -> str: + """Render the preload states as a table in the given *table_format*.""" + rows = [ + [ + state.data_id, + f"{state.status}" if state.status is not None else "-", + ( + f"{round(state.progress * 100)}%" + if state.progress is not None + else "-" + ), + state.message or "-", + state.exception or "-", + ] + for state in self.states + ] + + return tabulate.tabulate( + rows, + headers=["Data ID", "Status", "Progress", "Message", "Exception"], + tablefmt=table_format, + ) + + def show(self): + """Print the state table as plain text.""" + print(self.to_text()) + + def update(self): + """Update the display.""" + print(self.to_text()) + + def log(self, message: str): + """Print the given message.""" + print(message) + + +class IPyPreloadDisplay(PreloadDisplay): + def __init__(self, states: list[PreloadState]): + super().__init__(states) + from IPython import display + + self._ipy_display = display + + def show(self): + """Display the state table using IPython's ``display()``.""" + self._ipy_display.display(self.to_html()) + + def update(self): + """Update the display.""" + self._ipy_display.clear_output(wait=True) + self._ipy_display.display(self.to_html()) + + def log(self, message: str): + """Display the given message using IPython's ``display()``.""" + 
self._ipy_display.display(message) + + +class IPyWidgetsPreloadDisplay(IPyPreloadDisplay): + + def __init__(self, states: list[PreloadState]): + super().__init__(states) + import ipywidgets + + self._state_table = ipywidgets.HTML(self.to_html()) + self._output = ipywidgets.Output() # not used yet + self._container = ipywidgets.VBox([self._state_table, self._output]) + + def show(self): + """Display the widget container.""" + self._ipy_display.display(self._container) + + def update(self): + """Update the display.""" + self._state_table.value = self.to_html() + + def log(self, message: str): + """Log a message to the output widget.""" + with self._output: + print(message) + + +def _to_dict(obj: object): + return { + k: v + for k, v in obj.__dict__.items() + if isinstance(k, str) and not k.startswith("_") and v is not None + } diff --git a/xcube/core/store/store.py b/xcube/core/store/store.py index dbb4d9190..4fbcaa975 100644 --- a/xcube/core/store/store.py +++ b/xcube/core/store/store.py @@ -3,7 +3,7 @@ # https://opensource.org/licenses/MIT. 
from abc import abstractmethod, ABC -from typing import Tuple, Any, Optional, List, Type, Dict, Union +from typing import Any, Optional, Union from collections.abc import Iterator, Container from xcube.constants import EXTENSION_POINT_DATA_STORES @@ -13,13 +13,14 @@ from xcube.util.jsonschema import JsonObjectSchema from xcube.util.plugin import get_extension_registry from .accessor import DataOpener +from .accessor import DataPreloader from .accessor import DataWriter from .assertions import assert_valid_params from .datatype import DataTypeLike from .descriptor import DataDescriptor from .error import DataStoreError from .preload import PreloadHandle -from .preload import PreloadMonitor +from .preload import NullPreloadHandle from .search import DataSearcher @@ -141,7 +142,7 @@ def list_data_store_ids(detail: bool = False) -> Union[list[str], dict[str, Any] ####################################################### -class DataStore(DataOpener, DataSearcher, ABC): +class DataStore(DataOpener, DataSearcher, DataPreloader, ABC): """A data store represents a collection of data resources that can be enumerated, queried, and opened in order to obtain in-memory representations of the data. The same data resource may be @@ -466,11 +467,25 @@ def open_data( DataStoreError: If an error occurs. """ + def get_preload_data_params_schema(self) -> JsonObjectSchema: + """Get the JSON schema that describes the keyword + arguments that can be passed to ``preload_data()``. + + Refer to the ``DataPreloader`` interface for more information. + + Warning: This is an experimental and potentially unstable API + introduced in xcube 1.8. + + Returns: + A ``JsonObjectSchema`` object whose properties describe + the parameters of ``preload_data()``. 
+ """ + return JsonObjectSchema(additional_properties=False) + # noinspection PyMethodMayBeStatic def preload_data( self, *data_ids: str, - monitor: PreloadMonitor | None = None, **preload_params: Any, ) -> PreloadHandle: """Preload the given data items for faster access. @@ -478,24 +493,10 @@ def preload_data( Warning: This is an experimental and potentially unstable API introduced in xcube 1.8. - Many implementations of this ``DataStore`` interface rely on remote - data APIs. Such API may provide only limited data access performance. - Hence, the approach taken by ``store.open_data(data_id, ...)`` alone - is suboptimal for a user's perspective. This is because the method is - blocking as it is not asynchronous, it may take long time before it - returns, it cannot report any progress while doing so. - The reasons for slow and unresponsive data APIs are manifold: intended - access is by file download, access is bandwidth limited, or not allowing - for sub-setting. - - Data stores may differently implement the ``preload_data()`` method, - usually not at all. If preloading is required, the data will be - downloaded in most cases and made available via some temporary cache. + Refer to the ``DataPreloader`` interface for more information. Args: data_ids: Data identifiers to be preloaded. - monitor: A monitor that can be used to observe and/or cancel - the preload process. preload_params: data store specific preload parameters. See method ``get_preload_data_params_schema()`` for information on the possible options. @@ -507,18 +508,6 @@ def preload_data( return NullPreloadHandle() # noinspection PyMethodMayBeStatic - def get_preload_data_params_schema(self) -> JsonObjectSchema: - """Get the JSON schema that describes the keyword - arguments that can be passed to ``preload_data()``. - - Warning: This is an experimental and potentially unstable API - introduced in xcube 1.8. 
- - Returns: - A ``JsonObjectSchema`` object whose properties describe - the parameters of ``preload_data()``. - """ - return JsonObjectSchema(additional_properties=False) class MutableDataStore(DataStore, DataWriter, ABC):