Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Allow to synchronize via st_size if st_mtime is not present in UPath.stat() #913

Merged
merged 14 commits into from
Dec 17, 2024
82 changes: 82 additions & 0 deletions docs/hub-prod/test-cloud-sync.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,37 @@
"assert settings.paths.cloud_to_local_no_update(dir_sync.as_posix(), cache_key=\"dir_cache/key\") == settings.cache_dir / \"dir_cache/key\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "eda84820",
"metadata": {},
"outputs": [],
"source": [
"# for http urls\n",
"http_path = UPath(\"https://raw.githubusercontent.com/laminlabs/lamindb-setup/refs/heads/main/README.md\")\n",
"assert http_path.protocol == \"https\"\n",
"\n",
"http_stat = http_path.stat()\n",
"assert http_stat.st_size != 0\n",
"assert http_stat.st_mtime == 0\n",
"assert http_stat.as_info()[\"type\"] == \"file\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "df6a9be4",
"metadata": {},
"outputs": [],
"source": [
"http_key = \"raw.githubusercontent.com/laminlabs/lamindb-setup/refs/heads/main/README.md\"\n",
"\n",
"assert settings.paths.cloud_to_local_no_update(http_path) == settings.cache_dir / http_key\n",
"assert settings.paths.cloud_to_local_no_update(str(http_path)) == settings.cache_dir / http_key\n",
"assert settings.paths.cloud_to_local_no_update(http_path, cache_key=\"check/README.md\") == settings.cache_dir / \"check/README.md\""
]
},
{
"cell_type": "markdown",
"id": "0b79f2f7",
Expand Down Expand Up @@ -191,6 +222,57 @@
"dir_sync_local.rmdir()"
]
},
{
"cell_type": "markdown",
"id": "d2246f90",
"metadata": {},
"source": [
"Test `cloud_to_local` for http"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bc1b7736",
"metadata": {},
"outputs": [],
"source": [
"http_local = settings.paths.cloud_to_local(http_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ca9f0ba8",
"metadata": {},
"outputs": [],
"source": [
"assert isinstance(http_local, LocalPathClasses)\n",
"assert http_local.stat().st_size == http_path.stat().st_size"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6ae7cd2b",
"metadata": {},
"outputs": [],
"source": [
"http_local_mtime = http_local.stat().st_mtime\n",
"# no changes here because the file exists already\n",
"assert settings.paths.cloud_to_local(http_path).stat().st_mtime == http_local_mtime"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9da41c21",
"metadata": {},
"outputs": [],
"source": [
"http_local.unlink()"
]
},
{
"cell_type": "markdown",
"id": "574c3f95",
Expand Down
11 changes: 8 additions & 3 deletions lamindb_setup/core/_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,14 @@ def cloud_to_local_no_update(
# cache_key is ignored if filepath is a local path
if not isinstance(filepath, LocalPathClasses):
# settings is defined further in this file
local_filepath = settings.cache_dir / (
filepath.path if cache_key is None else cache_key # type: ignore
)
if cache_key is None:
local_key = filepath.path # type: ignore
protocol = filepath.protocol # type: ignore
if protocol in {"http", "https"}:
local_key = local_key.removeprefix(protocol + "://")
else:
local_key = cache_key
local_filepath = settings.cache_dir / local_key
else:
local_filepath = filepath
return local_filepath
Expand Down
29 changes: 15 additions & 14 deletions lamindb_setup/core/_settings_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

import fsspec
from lamin_utils import logger

from ._aws_credentials import HOSTED_REGIONS, get_aws_credentials_manager
Expand All @@ -24,6 +25,10 @@

IS_INITIALIZED_KEY = ".lamindb/_is_initialized"

# a list of supported fsspec protocols
# rename file to local before showing to a user
VALID_PROTOCOLS = ("file", "gs", "s3", "hf", "http", "https")


def base62(n_char: int) -> str:
"""Like nanoid without hyphen and underscore."""
Expand Down Expand Up @@ -114,16 +119,11 @@ def init_storage(
root_str = f"s3://lamin-{region}/{uid}"
else:
root_str = f"s3://lamin-hosted-test/{uid}"
elif root_str.startswith(("gs://", "s3://", "hf://")):
pass
else: # local path
try:
_ = Path(root_str)
except Exception as e:
logger.error(
"`storage` is not a valid local, GCP storage, AWS S3 path or Hugging Face path"
)
raise e
elif (input_protocol := fsspec.utils.get_protocol(root_str)) not in VALID_PROTOCOLS:
valid_protocols = ("local",) + VALID_PROTOCOLS[1:] # show local instead of file
raise ValueError(
f"Protocol {input_protocol} is not supported, valid protocols are {', '.join(valid_protocols)}"
)
ssettings = StorageSettings(
uid=uid,
root=root_str,
Expand Down Expand Up @@ -227,7 +227,7 @@ def _mark_storage_root(self) -> UPath:

@property
def record(self) -> Any:
"""Storage record in current instance."""
"""Storage record in the current instance."""
if self._record is None:
# dynamic import because of import order
from lnschema_core.models import Storage
Expand Down Expand Up @@ -299,14 +299,15 @@ def region(self) -> str | None:
return self._region

@property
def type(self) -> Literal["local", "s3", "gs"]:
def type(self) -> Literal["local", "s3", "gs", "hf", "http", "https"]:
"""AWS S3 vs. Google Cloud vs. local.

Returns the protocol as a string: "local", "s3", "gs".
Returns the protocol as a string: "local", "s3", "gs", "http", "https".
"""
import fsspec

convert = {"file": "local"}
# init_storage checks that the root protocol belongs to VALID_PROTOCOLS
protocol = fsspec.utils.get_protocol(self.root_as_str)
return convert.get(protocol, protocol) # type: ignore

Expand Down Expand Up @@ -345,5 +346,5 @@ def key_to_filepath(self, filekey: UPathStr) -> UPath:
return self.root / filekey

def local_filepath(self, filekey: UPathStr) -> UPath:
"""Local (cache) filepath from filekey: `local(filepath(...))`."""
"""Local (cache) filepath from filekey."""
return self.cloud_to_local(self.key_to_filepath(filekey))
62 changes: 36 additions & 26 deletions lamindb_setup/core/upath.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,13 @@
pass

def update_relative_value(self, inc=1):
self.value += inc
if inc != 0:
self.value += inc
# this is specific to http filesystem
# for some reason the last update is 0 always
# here 100% is forced manually in this case
elif self.value >= 0.999:
self.value = self.size

Check warning on line 199 in lamindb_setup/core/upath.py

View check run for this annotation

Codecov / codecov/patch

lamindb_setup/core/upath.py#L198-L199

Added lines #L198 - L199 were not covered by tests
self.call()

def branch(self, path_1, path_2, kwargs):
Expand Down Expand Up @@ -350,27 +356,19 @@
exists = True
cloud_mts = timestamp
else:
# hf requires special treatment
if protocol == "hf":
try:
stat_hf = self.stat().as_info()
is_dir = stat_hf["type"] == "directory"
exists = True
if not is_dir:
cloud_mts = stat_hf["last_commit"].date.timestamp()
except FileNotFoundError:
exists = False
else:
# perform only one network request to check existence, type and timestamp
try:
cloud_mts = self.modified.timestamp()
is_dir = False
exists = True
except FileNotFoundError:
exists = False
except IsADirectoryError:
is_dir = True
exists = True
try:
cloud_stat = self.stat()
cloud_info = cloud_stat.as_info()
exists = True
is_dir = cloud_info["type"] == "directory"
if not is_dir:
# hf requires special treatment
if protocol == "hf":
cloud_mts = cloud_info["last_commit"].date.timestamp()
else:
cloud_mts = cloud_stat.st_mtime
except FileNotFoundError:
exists = False

if not exists:
warn_or_error = f"The original path {self} does not exist anymore."
Expand All @@ -386,6 +384,7 @@
return None

# synchronization logic for directories
# to synchronize directories, it should be possible to get modification times
if is_dir:
files = self.fs.find(str(self), detail=True)
if protocol == "s3":
Expand Down Expand Up @@ -451,8 +450,16 @@
callback, print_progress, objectpath.name, "synchronizing"
)
if objectpath.exists():
local_mts_obj = objectpath.stat().st_mtime # type: ignore
need_synchronize = cloud_mts > local_mts_obj
if cloud_mts != 0:
local_mts_obj = objectpath.stat().st_mtime
need_synchronize = cloud_mts > local_mts_obj
else:
# this is true for http for example
# where size is present but st_mtime is not
# we assume that any change without the change in size is unlikely
cloud_size = cloud_stat.st_size
local_size_obj = objectpath.stat().st_size
need_synchronize = cloud_size != local_size_obj
else:
objectpath.parent.mkdir(parents=True, exist_ok=True)
need_synchronize = True
Expand All @@ -464,7 +471,8 @@
self.download_to(
objectpath, recursive=False, print_progress=False, callback=callback
)
os.utime(objectpath, times=(cloud_mts, cloud_mts))
if cloud_mts != 0:
os.utime(objectpath, times=(cloud_mts, cloud_mts))
else:
# nothing happens if parent_update is not defined
# because of Callback.no_op
Expand Down Expand Up @@ -739,7 +747,9 @@
hash = b16_to_b64(stat["blob_id"])
hash_type = "sha1"
# s3
elif "ETag" in stat:
# StorageClass is checked to be sure that it is indeed s3
# because http also has ETag
elif "ETag" in stat and "StorageClass" in stat:
etag = stat["ETag"]
# small files
if "-" not in etag:
Expand Down
6 changes: 6 additions & 0 deletions tests/hub-local/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,9 @@ def test_init_storage_with_non_existing_bucket(create_testadmin1_session):
)[0]
)
assert error.exconly().endswith("Not Found")


def test_init_storage_incorrect_protocol():
with pytest.raises(ValueError) as error:
init_storage_base("incorrect-protocol://some-path/some-path-level")
assert "Protocol incorrect-protocol is not supported" in error.exconly()
Loading