diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index da45237b4..5c112c44c 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -128,7 +128,7 @@ jobs:
           repository: laminlabs/laminapp-ui
           token: ${{ secrets.GH_TOKEN_DEPLOY_LAMINAPP }}
           path: laminhub
-          ref: setup
+          ref: main
       - uses: actions/setup-python@v4
         with:
           python-version: '3.10'
diff --git a/lamindb_setup/core/hashing.py b/lamindb_setup/core/hashing.py
new file mode 100644
index 000000000..0819252be
--- /dev/null
+++ b/lamindb_setup/core/hashing.py
@@ -0,0 +1,62 @@
+"""Hashing.
+
+.. autosummary::
+   :toctree: .
+
+   hash_set
+   hash_file
+
+"""
+
+import base64
+import hashlib
+from typing import List, Set, Tuple
+
+
+def to_b64_str(bstr: bytes):
+    b64 = base64.urlsafe_b64encode(bstr).decode().strip("=")
+    return b64
+
+
+def b16_to_b64(s: str):
+    return to_b64_str(base64.b16decode(s.strip('"'), casefold=True))
+
+
+# a lot to read about this: lamin-notes/2022/hashing
+def hash_set(s: Set[str]) -> str:
+    bstr = ":".join(sorted(s)).encode("utf-8")
+    # as we're truncating at 20 b64, we choose md5 over sha512
+    return to_b64_str(hashlib.md5(bstr).digest())[:20]
+
+
+def hash_md5s_from_dir(etags: List[str]) -> Tuple[str, str]:
+    # need to sort below because we don't want the order of parsing the dir to
+    # affect the hash
+    digests = b"".join(
+        hashlib.md5(etag.encode("utf-8")).digest() for etag in sorted(etags)
+    )
+    digest = hashlib.md5(digests).digest()
+    return to_b64_str(digest)[:22], "md5-d"
+
+
+def hash_file(file_path, chunk_size=50 * 1024 * 1024) -> Tuple[str, str]:
+    chunks = []
+    with open(file_path, "rb") as fp:
+        # read first chunk
+        chunks = [fp.read(chunk_size)]
+        # try reading the 2nd chunk
+        data = fp.read(chunk_size)
+        if data:
+            # go to end of file
+            fp.seek(-chunk_size, 2)
+            # read last chunk
+            data = fp.read(chunk_size)
+            chunks.append(data)
+    if len(chunks) == 1:
+        digest = hashlib.md5(chunks[0]).digest()
+        hash_type = "md5"
+    else:
+        digests = b"".join(hashlib.sha1(chunk).digest() for chunk in chunks)
+        digest = hashlib.sha1(digests).digest()
+        hash_type = "sha1-fl"  # sha1 first last chunk
+    return to_b64_str(digest)[:22], hash_type
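
For orientation on the hashing scheme introduced in hashing.py: hash_file computes a plain md5 for files that fit into a single chunk and, for larger files, a sha1 over the concatenated sha1 digests of only the first and last chunks ("sha1-fl"). A minimal sketch of that second path, assuming a hypothetical local file example.bin larger than the default 50 MB chunk size (the file name and size are illustrative, not part of this diff):

    import hashlib

    from lamindb_setup.core.hashing import hash_file, to_b64_str

    chunk_size = 50 * 1024 * 1024
    path = "example.bin"  # hypothetical file larger than one 50 MB chunk
    with open(path, "rb") as fp:
        first = fp.read(chunk_size)  # first chunk
        fp.seek(-chunk_size, 2)      # jump to the start of the last chunk
        last = fp.read(chunk_size)   # last chunk
    # sha1 over the two per-chunk sha1 digests, b64-encoded and truncated to 22 chars
    expected = to_b64_str(
        hashlib.sha1(
            hashlib.sha1(first).digest() + hashlib.sha1(last).digest()
        ).digest()
    )[:22]
    assert hash_file(path) == (expected, "sha1-fl")
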
diff --git a/lamindb_setup/core/upath.py b/lamindb_setup/core/upath.py
index ae4b371de..0d0441cfb 100644
--- a/lamindb_setup/core/upath.py
+++ b/lamindb_setup/core/upath.py
@@ -9,7 +9,7 @@
 from typing import Literal, Dict
 import fsspec
 from itertools import islice
-from typing import Optional, Set, Any
+from typing import Optional, Set, Any, Tuple
 from collections import defaultdict
 from dateutil.parser import isoparse  # type: ignore
 from lamin_utils import logger
@@ -17,6 +17,7 @@
 from upath.implementations.cloud import CloudPath, S3Path  # noqa  # keep CloudPath!
 from upath.implementations.local import LocalPath, PosixUPath, WindowsUPath
 from .types import UPathStr
+from .hashing import b16_to_b64, hash_md5s_from_dir
 
 LocalPathClasses = (PosixUPath, WindowsUPath, LocalPath)
 
@@ -484,3 +485,65 @@ def create_path(path: UPath, access_token: Optional[str] = None) -> UPath:
         secret=credentials["secret"],
         token=credentials["token"],
     )
+
+
+def get_stat_file_cloud(stat: Dict) -> Tuple[int, str, str]:
+    size = stat["size"]
+    # small files
+    if "-" not in stat["ETag"]:
+        # only store hash for non-multipart uploads
+        # we can't rapidly validate multi-part uploaded files client-side
+        # we can add more logic later down-the-road
+        hash = b16_to_b64(stat["ETag"])
+        hash_type = "md5"
+    else:
+        stripped_etag, suffix = stat["ETag"].split("-")
+        suffix = suffix.strip('"')
+        hash = f"{b16_to_b64(stripped_etag)}-{suffix}"
+        hash_type = "md5-n"  # this is the S3 chunk-hashing strategy
+    return size, hash, hash_type
+
+
+def get_stat_dir_s3(path: UPath) -> Tuple[int, str, str, int]:
+    import boto3
+
+    if not AWS_CREDENTIALS_PRESENT:
+        # passing the following param directly to Session() doesn't
+        # work, unfortunately: botocore_session=path.fs.session
+        from botocore import UNSIGNED
+        from botocore.config import Config
+
+        config = Config(signature_version=UNSIGNED)
+        s3 = boto3.session.Session().resource("s3", config=config)
+    else:
+        s3 = boto3.session.Session().resource("s3")
+    bucket, key, _ = path.fs.split_path(path.as_posix())
+    # assuming this here is the fastest way of querying for many objects
+    objects = s3.Bucket(bucket).objects.filter(Prefix=key)
+    size = sum([object.size for object in objects])
+    md5s = [
+        # skip leading and trailing quotes
+        object.e_tag[1:-1]
+        for object in objects
+    ]
+    n_objects = len(md5s)
+    hash, hash_type = hash_md5s_from_dir(md5s)
+    return size, hash, hash_type, n_objects
+
+
+def get_stat_dir_gs(path: UPath) -> Tuple[int, str, str, int]:
+    import google.cloud.storage as gc_storage
+
+    bucket, key, _ = path.fs.split_path(path.as_posix())
+    # assuming this here is the fastest way of querying for many objects
+    client = gc_storage.Client(
+        credentials=path.fs.credentials.credentials, project=path.fs.project
+    )
+    objects = client.Bucket(bucket).list_blobs(prefix=key)
+    sizes, md5s = [], []
+    for object in objects:
+        sizes.append(object.size)
+        md5s.append(object.md5_hash)
+    n_objects = len(md5s)
+    hash, hash_type = hash_md5s_from_dir(md5s)
+    return sum(sizes), hash, hash_type, n_objects
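
The stat helpers added to upath.py translate cloud metadata into the same hash vocabulary: a single-part S3 ETag is the hex md5 of the content and becomes an "md5" hash, while a multipart ETag of the form <md5-of-part-md5s>-<n> becomes "md5-n". A small sketch with made-up stat dicts (the sizes and ETag values are illustrative; the expected b64 strings are the same constants exercised by the new tests below):

    from lamindb_setup.core.upath import get_stat_file_cloud

    # single-part upload: the ETag is the md5 of the content, hex-encoded
    stat_small = {"size": 1, "ETag": '"0cc175b9c0f1b6a831c399e269772661"'}  # md5("a")
    assert get_stat_file_cloud(stat_small) == (1, "DMF1ucDxtqgxw5niaXcmYQ", "md5")

    # multipart upload: md5 of the per-part md5s, followed by "-<number of parts>"
    stat_multi = {"size": 123, "ETag": '"9b89c8c1acf79dba5b5341d1fff9806f-2"'}
    assert get_stat_file_cloud(stat_multi) == (123, "m4nIwaz3nbpbU0HR__mAbw-2", "md5-n")
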
diff --git a/tests/test_hashing.py b/tests/test_hashing.py
new file mode 100644
index 000000000..d2688ce6b
--- /dev/null
+++ b/tests/test_hashing.py
@@ -0,0 +1,35 @@
+import base64
+from pathlib import Path
+
+from lamindb_setup.core.hashing import b16_to_b64, hash_file, to_b64_str
+
+
+def test_compute_hash():
+    files = [
+        # file_content, hash, chunk_size, hash_type
+        ("a", "DMF1ucDxtqgxw5niaXcmYQ", None, "md5"),
+        ("b", "kutf_uauL-w61xx3dTFXjw", None, "md5"),
+        ("abc", "kAFQmDzST7DWlj99KOF_cg", None, "md5"),
+        ("a", "DMF1ucDxtqgxw5niaXcmYQ", 1, "md5"),
+        ("b", "kutf_uauL-w61xx3dTFXjw", 1, "md5"),
+        # the last case here triggers multi-chunk compute with sha1
+        ("abc", "p0EbDbQEP1wS-Tw6TuBjKS", 1, "sha1-fl"),
+    ]
+    for content, hash, chunk_size, hash_type in files:
+        filepath = Path("file_1")
+        filepath.write_text(content)
+        computed_hash, computed_hash_type = hash_file(filepath, chunk_size=chunk_size)
+        assert computed_hash == hash
+        assert computed_hash_type == hash_type
+        filepath.unlink()
+
+
+def test_base64():
+    mytest = b"test"
+    b64_str = to_b64_str(mytest)
+    b64_str_padded = f"{b64_str}=="
+    assert base64.urlsafe_b64decode(b64_str_padded.encode()).hex() == mytest.hex()
+
+
+def test_b16_to_b64():
+    assert b16_to_b64("9b89c8c1acf79dba5b5341d1fff9806f") == "m4nIwaz3nbpbU0HR__mAbw"
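
The new tests cover hash_file and the base64 helpers but not the directory-level digest used by get_stat_dir_s3 and get_stat_dir_gs. A sketch of what an additional test for hash_md5s_from_dir could look like (the test name and etag values are illustrative, not part of this diff):

    import hashlib

    from lamindb_setup.core.hashing import hash_md5s_from_dir, to_b64_str


    def test_hash_md5s_from_dir():
        etags = ["etag-b", "etag-a"]  # made-up etags
        hash, hash_type = hash_md5s_from_dir(etags)
        # order-independent: etags are sorted before their md5 digests are concatenated
        assert hash_md5s_from_dir(list(reversed(etags))) == (hash, hash_type)
        expected = to_b64_str(
            hashlib.md5(
                b"".join(hashlib.md5(e.encode()).digest() for e in sorted(etags))
            ).digest()
        )[:22]
        assert (hash, hash_type) == (expected, "md5-d")
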