Merge pull request #451 from dcs4cop/toniof-450-recognize-data-ids
Toniof 450 recognize data ids
TonioF authored May 26, 2021
2 parents f4d7dd0 + 1f5eb72 commit c102e1e
Showing 5 changed files with 275 additions and 38 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -6,6 +6,8 @@
`latitude_centers` and to invert decreasing latitude coordinate values.
* Introduced the `xcube.core.normalize.cubify_dataset()` function to normalize a dataset
and finally assert that the result complies with the [xcube dataset conventions](https://github.com/dcs4cop/xcube/blob/master/docs/source/cubespec.md).
* Fixed an issue where the data stores `directory` and `s3` could not handle data identifiers
that they had assigned themselves during `write_data()`. (#450)

## Changes in 0.8.1

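The fix for #450 above means a store can find data again under an identifier it assigned itself. Below is a minimal sketch of that round trip for the `directory` store, assuming the `new_data_store` and `new_cube` helpers used in the tests that follow; the temporary base directory is purely illustrative.

```python
# Sketch: write a cube without a data id, let the store assign one,
# then look it up and reopen it under that id (the behaviour fixed by #450).
import tempfile

from xcube.core.new import new_cube
from xcube.core.store import new_data_store

base_dir = tempfile.mkdtemp()  # illustrative location for the store
store = new_data_store('directory', base_dir=base_dir)

cube = new_cube(variables=dict(a=4.1, b=7.4))
cube_id = store.write_data(cube)   # no data_id given; per this commit the store
                                   # assigns "<uuid>.zarr" for a plain dataset
assert store.has_data(cube_id)     # before the fix, this lookup could fail
reopened = store.open_data(cube_id)
assert reopened is not None
```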
98 changes: 97 additions & 1 deletion test/core/store/stores/test_directory.py
@@ -1,7 +1,9 @@
import json
import os.path
import shutil
import unittest

from xcube.core.new import new_cube
from xcube.core.store import DataStoreError, DatasetDescriptor
from xcube.core.store import TYPE_SPECIFIER_CUBE
from xcube.core.store import TYPE_SPECIFIER_DATASET
@@ -13,7 +15,8 @@
class DirectoryDataStoreTest(unittest.TestCase):

def setUp(self) -> None:
base_dir = os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', 'examples', 'serve', 'demo')
base_dir = os.path.join(os.path.dirname(__file__), '..', '..', '..', '..',
'examples', 'serve', 'demo')
self._store = new_data_store('directory', base_dir=base_dir)
self.assertIsInstance(self.store, DirectoryDataStore)
# noinspection PyUnresolvedReferences
@@ -280,3 +283,96 @@ def test_get_filename_ext(self):

self.assertIsNone(self.store._get_filename_ext(None))
self.assertIsNone(self.store._get_filename_ext(DataStoreError('A nonsense object')))


class WritableDirectoryDataStoreTest(unittest.TestCase):

def setUp(self) -> None:
self._base_dir = os.path.join(os.path.dirname(__file__), 'dir_store_test')
# make sure self._base_dir is empty
if os.path.exists(self._base_dir):
shutil.rmtree(self._base_dir)
os.mkdir(self._base_dir)
self._store = new_data_store('directory', base_dir=self._base_dir)
self.assertIsInstance(self.store, DirectoryDataStore)
# noinspection PyUnresolvedReferences
self.assertEqual(self._base_dir, self._store.base_dir)
# noinspection PyUnresolvedReferences
self.assertEqual(False, self._store.read_only)

def tearDown(self) -> None:
for data_id in self.store.get_data_ids():
self.store.delete_data(data_id)
shutil.rmtree(self._base_dir)

@property
def store(self) -> DirectoryDataStore:
# noinspection PyTypeChecker
return self._store

def test_write_dataset_only_data(self):
cube = new_cube()
cube_id = self.store.write_data(cube)
self.assertIsNotNone(cube_id)
self.assertTrue(self.store.has_data(cube_id))
cube_from_store = self.store.open_data(cube_id)
self.assertIsNotNone(cube_from_store)

def test_write_dataset_data_id(self):
cube = new_cube()
cube_id = self.store.write_data(cube, data_id='newcube.nc')
self.assertEqual('newcube.nc', cube_id)
self.assertTrue(self.store.has_data(cube_id))
cube_from_store = self.store.open_data(cube_id)
self.assertIsNotNone(cube_from_store)

def test_write_dataset_data_id_without_extension(self):
cube = new_cube()
cube_id = self.store.write_data(cube, data_id='newcube')
self.assertEqual('newcube', cube_id)

def test_write_dataset_invalid_data_id(self):
cube = new_cube()
try:
self.store.write_data(cube, data_id='newcube.shp')
except DataStoreError as dse:
self.assertEqual('Data id "newcube.shp" is not suitable for data of type '
'"dataset[cube]".',
str(dse))

def test_write_dataset_writer_id(self):
cube = new_cube()
cube_id = self.store.write_data(cube, writer_id='dataset:netcdf:posix')
self.assertTrue(cube_id.endswith('.nc'))
self.assertTrue(self.store.has_data(cube_id))
cube_from_store = self.store.open_data(cube_id)
self.assertIsNotNone(cube_from_store)

def test_write_dataset_invalid_writer_id(self):
cube = new_cube()
try:
self.store.write_data(cube, writer_id='geodataframe:shapefile:posix')
except TypeError as te:
self.assertEqual("data must be an instance of "
"(<class 'geopandas.geodataframe.GeoDataFrame'>, "
"<class 'pandas.core.frame.DataFrame'>), was "
"<class 'xarray.core.dataset.Dataset'>", str(te))

def test_write_dataset_data_id_and_writer_id(self):
cube = new_cube()
cube_id = self.store.write_data(cube,
data_id='newcube.nc',
writer_id='dataset:netcdf:posix')
self.assertEqual('newcube.nc', cube_id)
self.assertTrue(self.store.has_data(cube_id))
cube_from_store = self.store.open_data(cube_id)
self.assertIsNotNone(cube_from_store)

def test_write_dataset_invalid_data_id_and_writer_id(self):
cube = new_cube()
try:
self.store.write_data(cube, data_id='newcube.nc', writer_id='dataset:zarr:posix')
except DataStoreError as dse:
self.assertEqual('Writer ID "dataset:zarr:posix" seems inappropriate for '
'data id "newcube.nc". Try writer id "dataset:netcdf:posix" instead.',
str(dse))
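The invalid-combination tests above use a bare `try`/`except`, which passes silently when no error is raised. A sketch of the same expectation as an additional method of `WritableDirectoryDataStoreTest`, using `assertRaises` so a missing error also fails the test; the method name is illustrative, and the message matches the one asserted above:

```python
def test_write_dataset_invalid_data_id_and_writer_id_alt(self):
    # Same expectation as above, but assertRaises fails the test
    # if no DataStoreError is raised at all.
    cube = new_cube()
    with self.assertRaises(DataStoreError) as cm:
        self.store.write_data(cube,
                              data_id='newcube.nc',
                              writer_id='dataset:zarr:posix')
    self.assertEqual('Writer ID "dataset:zarr:posix" seems inappropriate for '
                     'data id "newcube.nc". Try writer id "dataset:netcdf:posix" instead.',
                     str(cm.exception))
```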
112 changes: 84 additions & 28 deletions test/core/store/stores/test_s3.py
@@ -126,8 +126,29 @@ def test_get_data_writer_ids(self):
self.store.get_data_writer_ids(type_specifier='dataset[cube]')
self.assertEqual("type_specifier must be one of ('dataset',)", f'{cm.exception}')

def test_data_registration(self):

class WritingToS3DataStoreTest(S3Test):

def setUp(self) -> None:
super().setUp()
self._store = new_data_store('s3',
aws_access_key_id='test_fake_id',
aws_secret_access_key='test_fake_secret',
bucket_name=BUCKET_NAME,
endpoint_url=MOTO_SERVER_ENDPOINT_URL)
self.assertIsInstance(self.store, S3DataStore)
self.store.s3.mkdir(BUCKET_NAME)

def tearDown(self) -> None:
for data_id in self.store.get_data_ids():
self.store.delete_data(data_id)

@property
def store(self) -> S3DataStore:
# noinspection PyTypeChecker
return self._store

def test_data_registration(self):
dataset = new_cube(variables=dict(a=4.1, b=7.4))
self.store.register_data(data_id='cube', data=dataset)
self.assertTrue(self.store.has_data(data_id='cube'))
@@ -141,7 +162,6 @@ def test_data_registration(self):
self.assertFalse(self.store.has_data(data_id='cube'))

def test_write_and_describe_data_from_registry(self):
self.store.s3.mkdir(BUCKET_NAME)
dataset_1 = new_cube(variables=dict(a=4.1, b=7.4))
self.store.write_data(dataset_1, data_id='cube-1.zarr')

@@ -161,22 +181,16 @@ def test_write_and_describe_data_from_registry(self):
json.dumps(d)

def test_write_and_get_type_specifiers_for_data(self):
self.store.s3.mkdir(BUCKET_NAME)
dataset_1 = new_cube(variables=dict(a=4.1, b=7.4))
self.store.write_data(dataset_1, data_id='cube-1.zarr')

type_specifiers = self.store.get_type_specifiers_for_data('cube-1.zarr')
self.assertEqual(1, len(type_specifiers))
self.assertEqual(('dataset',), type_specifiers)
self.assertIsInstance(type_specifiers[0], str)
from xcube.core.store import TypeSpecifier
TypeSpecifier.parse(type_specifiers[0])

@unittest.skip('Currently fails on appveyor but not locally, execute on demand only')
def test_write_and_has_data(self):
self.assertFalse(self.store.has_data('cube-1.zarr'))

self.store.s3.mkdir(BUCKET_NAME)
dataset_1 = new_cube(variables=dict(a=4.1, b=7.4))
self.store.write_data(dataset_1, data_id='cube-1.zarr')

@@ -185,32 +199,20 @@ def test_write_and_has_data(self):
self.assertFalse(self.store.has_data('cube-1.zarr', type_specifier='geodataframe'))
self.assertFalse(self.store.has_data('cube-2.zarr'))

d = data_descriptor.to_dict()
self.assertIsInstance(d, dict)
# Assert is JSON-serializable
json.dumps(d)

@unittest.skip('Currently fails on appveyor but not locally, execute on demand only')
def test_write_and_describe_data_from_zarr_describer(self):
self.store.s3.mkdir(BUCKET_NAME)
dataset_1 = new_cube(variables=dict(a=4.1, b=7.4))
self.store.write_data(dataset_1, data_id='cube-1.zarr')
self.store.deregister_data('cube-1.zarr')

data_descriptor = self.store.describe_data('cube-1.zarr')
self.assertIsInstance(data_descriptor, DatasetDescriptor)
self.assertEqual('cube-1.zarr', data_descriptor.data_id)
self.assertEqual(TYPE_SPECIFIER_DATASET, data_descriptor.type_specifier)
self.assertEqual((-90.0, -180.0, 90.0, 180.0), data_descriptor.bbox)
self.assertEqual(TYPE_SPECIFIER_CUBE, data_descriptor.type_specifier)
self.assertEqual((-180.0, -90.0, 180.0, 90.0), data_descriptor.bbox)
self.assertDictEqual(dict(bnds=2, lat=180, lon=360, time=5), data_descriptor.dims)
self.assertEqual(('2010-01-01T00:00:00', '2010-01-06T00:00:00'),
data_descriptor.time_range)
self.assertEqual(('2010-01-01', '2010-01-06'), data_descriptor.time_range)
self.assertEqual({'a', 'b'}, set(data_descriptor.data_vars.keys()))

@unittest.skip('Currently fails on appveyor but not locally, execute on demand only')
def test_write_and_read_and_delete(self):
self.store.s3.mkdir(BUCKET_NAME)

dataset_1 = new_cube(variables=dict(a=4.1, b=7.4))
dataset_2 = new_cube(variables=dict(c=5.2, d=8.5))
dataset_3 = new_cube(variables=dict(e=6.3, f=9.6))
@@ -224,9 +226,9 @@ def test_write_and_read_and_delete(self):
self.assertTrue(self.store.has_data('cube-2.zarr'))
self.assertTrue(self.store.has_data('cube-3.zarr'))

self.assertIn(('cube-1.zarr', None), set(self.store.get_data_ids()))
self.assertIn(('cube-2.zarr', None), set(self.store.get_data_ids()))
self.assertIn(('cube-3.zarr', None), set(self.store.get_data_ids()))
self.assertIn('cube-1.zarr', set(self.store.get_data_ids()))
self.assertIn('cube-2.zarr', set(self.store.get_data_ids()))
self.assertIn('cube-3.zarr', set(self.store.get_data_ids()))
self.assertEqual(3, len(set(self.store.get_data_ids())))

# Open the 3 written cubes
@@ -254,9 +256,63 @@ def test_write_and_read_and_delete(self):

# Try deleting cube 1
self.store.delete_data('cube-1.zarr')
self.assertEqual({('cube-2.zarr', None), ('cube-3.zarr', None)},
set(self.store.get_data_ids()))
self.assertEqual({'cube-2.zarr', 'cube-3.zarr'}, set(self.store.get_data_ids()))
self.assertFalse(self.store.has_data('cube-1.zarr'))

# Now it should be safe to also write with replace=False
self.store.write_data(dataset_1, data_id='cube-1.zarr', replace=False)

def test_write_dataset_only_data(self):
cube = new_cube()
cube_id = self.store.write_data(cube)
self.assertIsNotNone(cube_id)
self.assertTrue(self.store.has_data(cube_id))
cube_from_store = self.store.open_data(cube_id)
self.assertIsNotNone(cube_from_store)

def test_write_dataset_data_id(self):
cube = new_cube()
cube_id = self.store.write_data(cube, data_id='newcube.zarr')
self.assertEqual('newcube.zarr', cube_id)
self.assertTrue(self.store.has_data(cube_id))
cube_from_store = self.store.open_data(cube_id)
self.assertIsNotNone(cube_from_store)

def test_write_dataset_data_id_without_extension(self):
cube = new_cube()
cube_id = self.store.write_data(cube, data_id='newcube')
self.assertEqual('newcube', cube_id)

def test_write_dataset_invalid_data_id(self):
cube = new_cube()
try:
self.store.write_data(cube, data_id='newcube.levels')
except DataStoreError as dse:
self.assertEqual('Data id "newcube.levels" is not suitable for data of type '
'"dataset[cube]".',
str(dse))

def test_write_dataset_writer_id(self):
cube = new_cube()
cube_id = self.store.write_data(cube, writer_id='dataset:zarr:s3')
self.assertTrue(cube_id.endswith('.zarr'))
self.assertTrue(self.store.has_data(cube_id))
cube_from_store = self.store.open_data(cube_id)
self.assertIsNotNone(cube_from_store)

def test_write_dataset_invalid_writer_id(self):
cube = new_cube()
try:
self.store.write_data(cube, writer_id='dataset:zarr:posix')
except DataStoreError as dse:
self.assertEqual('Invalid writer id "dataset:zarr:posix"', str(dse))

def test_write_dataset_data_id_and_writer_id(self):
cube = new_cube()
cube_id = self.store.write_data(cube,
data_id='newcube.zarr',
writer_id='dataset:zarr:s3')
self.assertEqual('newcube.zarr', cube_id)
self.assertTrue(self.store.has_data(cube_id))
cube_from_store = self.store.open_data(cube_id)
self.assertIsNotNone(cube_from_store)
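The auto-assignment also holds for the `s3` store: when only a writer id is given, the assigned identifier carries that writer's filename extension, as in `test_write_dataset_writer_id` above. A sketch against a moto-backed endpoint like the one in the setUp above; the credentials, bucket name, and endpoint URL are placeholders:

```python
# Sketch: the s3 store assigns an id itself; with writer_id 'dataset:zarr:s3'
# the id ends in '.zarr' and can be used for lookup right away.
from xcube.core.new import new_cube
from xcube.core.store import new_data_store

store = new_data_store('s3',
                       aws_access_key_id='test_fake_id',          # placeholder
                       aws_secret_access_key='test_fake_secret',  # placeholder
                       bucket_name='xcube-test-bucket',           # placeholder
                       endpoint_url='http://127.0.0.1:5000')      # placeholder moto endpoint
store.s3.mkdir('xcube-test-bucket')

cube = new_cube(variables=dict(a=4.1, b=7.4))
cube_id = store.write_data(cube, writer_id='dataset:zarr:s3')
assert cube_id.endswith('.zarr')
assert store.has_data(cube_id)
```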
55 changes: 49 additions & 6 deletions xcube/core/store/stores/directory.py
@@ -22,6 +22,7 @@
import os.path
import uuid
from typing import Optional, Iterator, Any, Tuple, List, Dict, Union, Container
import warnings

import geopandas as gpd
import xarray as xr
@@ -63,6 +64,14 @@
'.geojson': (str(TYPE_SPECIFIER_GEODATAFRAME), 'geojson', _STORAGE_ID),
}

_FORMAT_TO_FILENAME_EXT = {
'zarr': '.zarr',
'levels': '.levels',
'netcdf': '.nc',
'shapefile': '.shp',
'geojson': '.geojson'
}


# TODO: write tests
# TODO: complete docs
@@ -231,11 +240,37 @@ def write_data(self,
replace: bool = False,
**write_params) -> str:
assert_instance(data, (xr.Dataset, MultiLevelDataset, gpd.GeoDataFrame))
if writer_id and writer_id.split(':')[2] != _STORAGE_ID:
raise DataStoreError(f'Invalid writer id "{writer_id}"')
if data_id:
accessor_id_parts = self._get_accessor_id_parts(data_id, require=False)
if accessor_id_parts:
data_type_specifier = get_type_specifier(data)
if not data_type_specifier.satisfies(accessor_id_parts[0]):
raise DataStoreError(f'Data id "{data_id}" is not suitable for data of type '
f'"{data_type_specifier}".')
predicate = get_data_accessor_predicate(type_specifier=accessor_id_parts[0],
format_id=accessor_id_parts[1],
storage_id=accessor_id_parts[2])
extensions = find_data_writer_extensions(predicate=predicate)
assert extensions
writer_id_from_data_id = extensions[0].name
if writer_id is not None:
# check that the requested writer id is consistent with the format
# implied by the user-provided data id
if writer_id_from_data_id.split(':')[0] != writer_id.split(':')[0] or \
writer_id_from_data_id.split(':')[1] != writer_id.split(':')[1]:
raise DataStoreError(f'Writer ID "{writer_id}" seems inappropriate for '
f'data id "{data_id}". Try writer id '
f'"{writer_id_from_data_id}" instead.')
else:
writer_id = writer_id_from_data_id
if not writer_id:
if isinstance(data, MultiLevelDataset):
predicate = get_data_accessor_predicate(type_specifier=TYPE_SPECIFIER_MULTILEVEL_DATASET,
format_id='levels',
storage_id=_STORAGE_ID)
predicate = get_data_accessor_predicate(
type_specifier=TYPE_SPECIFIER_MULTILEVEL_DATASET,
format_id='levels',
storage_id=_STORAGE_ID)
elif isinstance(data, xr.Dataset):
predicate = get_data_accessor_predicate(type_specifier=TYPE_SPECIFIER_DATASET,
format_id='zarr',
@@ -249,7 +284,8 @@ def write_data(self,
extensions = find_data_writer_extensions(predicate=predicate)
assert extensions
writer_id = extensions[0].name
data_id = self._ensure_valid_data_id(data_id, data)
data_format = writer_id.split(':')[1]
data_id = self._ensure_valid_data_id(data_format, data_id)
path = self._resolve_data_id_to_path(data_id)
new_data_writer(writer_id).write_data(data, path, replace=replace, **write_params)
return data_id
@@ -272,8 +308,15 @@ def deregister_data(self, data_id: str):
# Implementation helpers

@classmethod
def _ensure_valid_data_id(cls, data_id: Optional[str], data: Any) -> str:
return data_id or str(uuid.uuid4()) + cls._get_filename_ext(data)
def _ensure_valid_data_id(cls, data_format: str, data_id: Optional[str]) -> str:
extension = _FORMAT_TO_FILENAME_EXT[data_format]
if data_id is not None:
if not data_id.endswith(extension):
warnings.warn(f'Data id "{data_id}" does not have the expected file extension. '
f'It will be written as "{data_format}". '
f'You may encounter issues when accessing the data from the store.')
return data_id
return str(uuid.uuid4()) + extension

def _assert_valid_data_id(self, data_id):
if not self.has_data(data_id):
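The rule introduced in `_ensure_valid_data_id` can be read on its own: derive the filename extension from the writer's format, keep a user-supplied id (warning if its extension looks wrong), and otherwise invent a UUID-based id. A standalone sketch mirroring the diff above; the free-standing function name is illustrative:

```python
# Standalone sketch of the id normalisation added above.
import uuid
import warnings
from typing import Optional

_FORMAT_TO_FILENAME_EXT = {
    'zarr': '.zarr',
    'levels': '.levels',
    'netcdf': '.nc',
    'shapefile': '.shp',
    'geojson': '.geojson',
}


def ensure_valid_data_id(data_format: str, data_id: Optional[str] = None) -> str:
    extension = _FORMAT_TO_FILENAME_EXT[data_format]
    if data_id is not None:
        if not data_id.endswith(extension):
            warnings.warn(f'Data id "{data_id}" does not have the expected file '
                          f'extension; it will be written as "{data_format}".')
        return data_id
    return str(uuid.uuid4()) + extension


print(ensure_valid_data_id('netcdf'))                # e.g. '2f1a0c3e-....nc'
print(ensure_valid_data_id('zarr', 'newcube.zarr'))  # 'newcube.zarr'
```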
