♻️ Move generic hashing and storage utilities from lamindb into `lamindb-setup` (#661)

* ♻️ Move generic hashing utilities into lamindb-setup

* ♻️ Move generic storage utilities from lamindb into lamindb-setup

* 🩹 Fix redundant import

* ✅ Move hashing tests into lamindb-setup

* 👷 Checkout main branch in laminapp-ui
bpenteado authored Mar 6, 2024
1 parent a05d049 commit 27d761c
Showing 3 changed files with 161 additions and 1 deletion.
62 changes: 62 additions & 0 deletions lamindb_setup/core/hashing.py
@@ -0,0 +1,62 @@
"""Hashing.
.. autosummary::
:toctree: .
hash_set
hash_file
"""

import base64
import hashlib
from typing import List, Set, Tuple


def to_b64_str(bstr: bytes) -> str:
    # URL-safe base64 without the trailing "=" padding
    b64 = base64.urlsafe_b64encode(bstr).decode().strip("=")
    return b64


def b16_to_b64(s: str) -> str:
    # decode a hex (base16) digest such as an S3 ETag and re-encode it as b64
    return to_b64_str(base64.b16decode(s.strip('"'), casefold=True))


# a lot to read about this: lamin-notes/2022/hashing
def hash_set(s: Set[str]) -> str:
    bstr = ":".join(sorted(s)).encode("utf-8")
    # as we're truncating at 20 b64 characters, we choose md5 over sha512
    return to_b64_str(hashlib.md5(bstr).digest())[:20]


def hash_md5s_from_dir(etags: List[str]) -> Tuple[str, str]:
    # need to sort below because we don't want the order of parsing the dir to
    # affect the hash
    digests = b"".join(
        hashlib.md5(etag.encode("utf-8")).digest() for etag in sorted(etags)
    )
    digest = hashlib.md5(digests).digest()
    return to_b64_str(digest)[:22], "md5-d"


def hash_file(file_path, chunk_size=50 * 1024 * 1024) -> Tuple[str, str]:
    with open(file_path, "rb") as fp:
        # read first chunk
        chunks = [fp.read(chunk_size)]
        # try reading the 2nd chunk
        data = fp.read(chunk_size)
        if data:
            # go to end of file
            fp.seek(-chunk_size, 2)
            # read last chunk
            data = fp.read(chunk_size)
            chunks.append(data)
    if len(chunks) == 1:
        digest = hashlib.md5(chunks[0]).digest()
        hash_type = "md5"
    else:
        digests = b"".join(hashlib.sha1(chunk).digest() for chunk in chunks)
        digest = hashlib.sha1(digests).digest()
        hash_type = "sha1-fl"  # sha1 of first and last chunk
    return to_b64_str(digest)[:22], hash_type
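
For orientation, a minimal usage sketch of the two public helpers added above; the file name and contents are illustrative only and not part of the commit:

# usage sketch (not part of the commit diff)
from pathlib import Path

from lamindb_setup.core.hashing import hash_file, hash_set

path = Path("example.txt")  # hypothetical file for illustration
path.write_text("hello world")

# small file -> single chunk -> plain md5, truncated to 22 b64 characters
digest, hash_type = hash_file(path)
print(digest, hash_type)  # ('XrY7u-Ae7tCTyyK7j1rNww', 'md5')

# order-independent hash of a set of strings, truncated to 20 b64 characters
print(hash_set({"a", "b", "c"}))

path.unlink()
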
65 changes: 64 additions & 1 deletion lamindb_setup/core/upath.py
@@ -9,14 +9,15 @@
from typing import Literal, Dict
import fsspec
from itertools import islice
from typing import Optional, Set, Any
from typing import Optional, Set, Any, Tuple
from collections import defaultdict
from dateutil.parser import isoparse # type: ignore
from lamin_utils import logger
from upath import UPath
from upath.implementations.cloud import CloudPath, S3Path # noqa # keep CloudPath!
from upath.implementations.local import LocalPath, PosixUPath, WindowsUPath
from .types import UPathStr
from .hashing import b16_to_b64, hash_md5s_from_dir

LocalPathClasses = (PosixUPath, WindowsUPath, LocalPath)

@@ -484,3 +485,65 @@ def create_path(path: UPath, access_token: Optional[str] = None) -> UPath:
secret=credentials["secret"],
token=credentials["token"],
)


def get_stat_file_cloud(stat: Dict) -> Tuple[int, str, str]:
    size = stat["size"]
    # small files
    if "-" not in stat["ETag"]:
        # only store hash for non-multipart uploads
        # we can't rapidly validate multi-part uploaded files client-side
        # we can add more logic later down-the-road
        hash = b16_to_b64(stat["ETag"])
        hash_type = "md5"
    else:
        stripped_etag, suffix = stat["ETag"].split("-")
        suffix = suffix.strip('"')
        hash = f"{b16_to_b64(stripped_etag)}-{suffix}"
        hash_type = "md5-n"  # this is the S3 chunk-hashing strategy
    return size, hash, hash_type


def get_stat_dir_s3(path: UPath) -> Tuple[int, str, str, int]:
    import boto3

    if not AWS_CREDENTIALS_PRESENT:
        # passing the following param directly to Session() doesn't
        # work, unfortunately: botocore_session=path.fs.session
        from botocore import UNSIGNED
        from botocore.config import Config

        config = Config(signature_version=UNSIGNED)
        s3 = boto3.session.Session().resource("s3", config=config)
    else:
        s3 = boto3.session.Session().resource("s3")
    bucket, key, _ = path.fs.split_path(path.as_posix())
    # assuming this here is the fastest way of querying for many objects
    objects = s3.Bucket(bucket).objects.filter(Prefix=key)
    size = sum([object.size for object in objects])
    md5s = [
        # skip leading and trailing quotes
        object.e_tag[1:-1]
        for object in objects
    ]
    n_objects = len(md5s)
    hash, hash_type = hash_md5s_from_dir(md5s)
    return size, hash, hash_type, n_objects


def get_stat_dir_gs(path: UPath) -> Tuple[int, str, str, int]:
    import google.cloud.storage as gc_storage

    bucket, key, _ = path.fs.split_path(path.as_posix())
    # assuming this here is the fastest way of querying for many objects
    client = gc_storage.Client(
        credentials=path.fs.credentials.credentials, project=path.fs.project
    )
    objects = client.Bucket(bucket).list_blobs(prefix=key)
    sizes, md5s = [], []
    for object in objects:
        sizes.append(object.size)
        md5s.append(object.md5_hash)
    n_objects = len(md5s)
    hash, hash_type = hash_md5s_from_dir(md5s)
    return sum(sizes), hash, hash_type, n_objects
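
As a rough sketch of how the new get_stat_file_cloud helper interprets a cloud stat dict; the ETag values below are made up for illustration, and the dict keys ("size", "ETag") are assumed from the function body rather than documented API:

# usage sketch (not part of the commit diff)
from lamindb_setup.core.upath import get_stat_file_cloud

# single-part upload: the ETag is a plain (quoted) md5 hex digest
stat_single = {"size": 11, "ETag": '"9b89c8c1acf79dba5b5341d1fff9806f"'}
size, hash, hash_type = get_stat_file_cloud(stat_single)
assert hash_type == "md5"

# multipart upload: the ETag carries a "-<n_parts>" suffix, hence "md5-n"
stat_multi = {"size": 2**30, "ETag": '"9b89c8c1acf79dba5b5341d1fff9806f-13"'}
size, hash, hash_type = get_stat_file_cloud(stat_multi)
assert hash_type == "md5-n"
assert hash.endswith("-13")
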
35 changes: 35 additions & 0 deletions tests/test_hashing.py
@@ -0,0 +1,35 @@
import base64
from pathlib import Path

from lamindb_setup.core.hashing import b16_to_b64, hash_file, to_b64_str


def test_compute_hash():
    files = [
        # file_content, hash, chunk_size, hash_type
        ("a", "DMF1ucDxtqgxw5niaXcmYQ", None, "md5"),
        ("b", "kutf_uauL-w61xx3dTFXjw", None, "md5"),
        ("abc", "kAFQmDzST7DWlj99KOF_cg", None, "md5"),
        ("a", "DMF1ucDxtqgxw5niaXcmYQ", 1, "md5"),
        ("b", "kutf_uauL-w61xx3dTFXjw", 1, "md5"),
        # the last case here triggers multi-chunk compute with sha1
        ("abc", "p0EbDbQEP1wS-Tw6TuBjKS", 1, "sha1-fl"),
    ]
    for content, hash, chunk_size, hash_type in files:
        filepath = Path("file_1")
        filepath.write_text(content)
        computed_hash, computed_hash_type = hash_file(filepath, chunk_size=chunk_size)
        assert computed_hash == hash
        assert computed_hash_type == hash_type
        filepath.unlink()


def test_base64():
    mytest = b"test"
    b64_str = to_b64_str(mytest)
    b64_str_padded = f"{b64_str}=="
    assert base64.urlsafe_b64decode(b64_str_padded.encode()).hex() == mytest.hex()


def test_b16_to_b64():
    assert b16_to_b64("9b89c8c1acf79dba5b5341d1fff9806f") == "m4nIwaz3nbpbU0HR__mAbw"
