-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Manifest/Functional Store #427
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,22 @@ | ||
import json | ||
import re | ||
from collections.abc import ItemsView, Iterable, Iterator, KeysView, ValuesView | ||
from collections.abc import ( | ||
AsyncIterator, | ||
ItemsView, | ||
Iterable, | ||
Iterator, | ||
KeysView, | ||
ValuesView, | ||
) | ||
from pathlib import PosixPath | ||
from typing import Any, Callable, NewType, Tuple, TypedDict, cast | ||
from urllib.parse import urlparse, urlunparse | ||
|
||
import numpy as np | ||
import xarray as xr | ||
from fsspec.asyn import AsyncFileSystem | ||
from zarr.abc.store import RangeByteRequest, Store | ||
from zarr.core.buffer import Buffer, BufferPrototype | ||
|
||
from virtualizarr.types import ChunkKey | ||
|
||
|
@@ -54,6 +65,110 @@ | |
return ChunkEntry(path=path, offset=offset, length=length) | ||
|
||
|
||
class ManifestStore(Store): | ||
supports_writes: bool = False | ||
supports_deletes: bool = False | ||
supports_partial_writes: bool = False | ||
supports_listing: bool = True | ||
|
||
fs: AsyncFileSystem | ||
vds: xr.Dataset | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Technically this could also be a |
||
|
||
def __init__(self, fs, vds, read_only=True): | ||
super().__init__(read_only=read_only) | ||
self.fs = fs | ||
self.vds = vds | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if some of the variables in the dataset are not virtual? Do we have any need to support that case? |
||
|
||
async def clear(self) -> None: | ||
self.fs = None | ||
self.vds = None | ||
|
||
def __str__(self) -> str: | ||
return f"manifest://{id(self.vds)}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might want to build a custom repr using the |
||
|
||
def __repr__(self) -> str: | ||
return f"ManifestStore('{self}')" | ||
|
||
def __eq__(self, other: object) -> bool: | ||
return ( | ||
isinstance(other, type(self)) | ||
and self.fs == other.fs | ||
and xr.testing.assert_identical(self.vds, other.vds) | ||
) | ||
|
||
async def get( | ||
self, | ||
key: str, | ||
prototype: BufferPrototype, | ||
byte_range: None = None, # could this optionally accept a RangeByteRequest? | ||
) -> Buffer | None: | ||
if not self._is_open: | ||
await self._open() | ||
print("key: ", key) | ||
print("key split: ", key.split("/")) | ||
array_name, _, chunk_key = key.split("/") | ||
# TODO: is this the best way? | ||
url, offset, length = self.vds[array_name].data.manifest.dict()[chunk_key] | ||
Comment on lines
+110
to
+111
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is a reason to improve the API of |
||
value = prototype.buffer.from_bytes( | ||
await self.fs._cat_file( | ||
url, | ||
start=offset, | ||
end=offset + length, | ||
) | ||
) | ||
return value | ||
|
||
# TODO: need a get_v3_array_metadata method | ||
# to handle key="zarr.json" and return the metadata for the array | ||
|
||
async def get_partial_values( | ||
self, | ||
prototype: BufferPrototype, | ||
key_ranges: Iterable[tuple[str, RangeByteRequest | None]], | ||
) -> list[Buffer | None]: | ||
key_ranges = list(key_ranges) | ||
paths: list[str] = [] | ||
starts: list[int] = [] | ||
stops: list[int] = [] | ||
for key, _ in key_ranges: | ||
array_name, _, chunk_key = key.split("/") | ||
url, offset, length = self.vds[array_name].data.manifest.dict()[chunk_key] | ||
paths.append(url) | ||
starts.append(offset) | ||
stops.append(offset + length) | ||
res = await self.fs._cat_ranges(paths, starts, stops, on_error="return") | ||
return [prototype.buffer.from_bytes(r) for r in res] | ||
|
||
async def exists(self, key: str) -> bool: | ||
array_name, _, chunk_key = key.split("/") | ||
url, _, _ = self.vds[array_name].data.manifest.dict()[chunk_key] | ||
return await self.fs._exists(url) | ||
Comment on lines
+142
to
+145
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should instead look at the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But I think we need to check the path for that specific chunk. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What I mean is that there is a convention in the It's the readers' job to find out if a chunk doesn't actually exist, so by the time we get here we should already know which chunks exist in storage and which don't without having to reach out to the storage to find out. |
||
|
||
async def list(self) -> AsyncIterator[str]: | ||
for array_name in self.vds.data_vars: | ||
for chunk_key in self.vds[array_name].data.manifest: | ||
yield f"{array_name}/{chunk_key}" | ||
|
||
async def list_prefix(self, prefix: str) -> AsyncIterator[str]: | ||
raise NotImplementedError | ||
|
||
async def list_dir(self, prefix: str) -> AsyncIterator[str]: | ||
raise NotImplementedError | ||
|
||
async def delete(self, key: str) -> None: | ||
raise NotImplementedError | ||
|
||
async def set( | ||
self, key: str, value: Buffer, byte_range: tuple[int, int] | None = None | ||
) -> None: | ||
raise NotImplementedError | ||
|
||
async def set_partial_values( | ||
self, key_start_values: Iterable[tuple[str, int, bytes]] | ||
) -> None: | ||
raise NotImplementedError | ||
|
||
|
||
def validate_and_normalize_path_to_uri(path: str, fs_root: str | None = None) -> str: | ||
""" | ||
Makes all paths into fully-qualified absolute URIs, or raises. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this class should be defined in it's own file, and potentially even made public (developer) API.