♻️ Move generic hashing and storage utilities from lamindb into lamindb-setup #661

Merged · 5 commits · Mar 6, 2024
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -128,7 +128,7 @@ jobs:
repository: laminlabs/laminapp-ui
token: ${{ secrets.GH_TOKEN_DEPLOY_LAMINAPP }}
path: laminhub
- ref: setup
+ ref: main
- uses: actions/setup-python@v4
with:
python-version: '3.10'
62 changes: 62 additions & 0 deletions lamindb_setup/core/hashing.py
@@ -0,0 +1,62 @@
"""Hashing.

.. autosummary::
:toctree: .

hash_set
hash_file

"""

import base64
import hashlib
from typing import List, Set, Tuple


def to_b64_str(bstr: bytes):
b64 = base64.urlsafe_b64encode(bstr).decode().strip("=")
return b64


def b16_to_b64(s: str):
return to_b64_str(base64.b16decode(s.strip('"'), casefold=True))


# a lot to read about this: lamin-notes/2022/hashing
def hash_set(s: Set[str]) -> str:
bstr = ":".join(sorted(s)).encode("utf-8")
# as we're truncating at 20 b64, we choose md5 over sha512
return to_b64_str(hashlib.md5(bstr).digest())[:20]


def hash_md5s_from_dir(etags: List[str]) -> Tuple[str, str]:
# need to sort below because we don't want the order of parsing the dir to
# affect the hash
digests = b"".join(
hashlib.md5(etag.encode("utf-8")).digest() for etag in sorted(etags)
)
digest = hashlib.md5(digests).digest()
return to_b64_str(digest)[:22], "md5-d"


def hash_file(file_path, chunk_size=50 * 1024 * 1024) -> Tuple[str, str]:
chunks = []
with open(file_path, "rb") as fp:
# read first chunk
chunks = [fp.read(chunk_size)]
# try reading the 2nd chunk
data = fp.read(chunk_size)
if data:
# go to end of file
fp.seek(-chunk_size, 2)
# read last chunk
data = fp.read(chunk_size)
chunks.append(data)
if len(chunks) == 1:
digest = hashlib.md5(chunks[0]).digest()
hash_type = "md5"
else:
digests = b"".join(hashlib.sha1(chunk).digest() for chunk in chunks)
digest = hashlib.sha1(digests).digest()
hash_type = "sha1-fl" # sha1 first last chunk
return to_b64_str(digest)[:22], hash_type
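
As a quick illustration of how hash_file picks its hash type, here is a minimal usage sketch (not part of the diff); the file names, contents, and the chunk_size of 1024 are illustrative only.

# illustrative usage sketch for hash_file, not PR code
from pathlib import Path

from lamindb_setup.core.hashing import hash_file

small = Path("small.txt")
small.write_text("abc")
# content fits into a single chunk -> plain md5
assert hash_file(small)[1] == "md5"

big = Path("big.bin")
big.write_bytes(b"x" * 4096)
# more than one chunk -> sha1 over first and last chunk
assert hash_file(big, chunk_size=1024)[1] == "sha1-fl"

small.unlink()
big.unlink()
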
65 changes: 64 additions & 1 deletion lamindb_setup/core/upath.py
@@ -9,14 +9,15 @@
from typing import Literal, Dict
import fsspec
from itertools import islice
-from typing import Optional, Set, Any
+from typing import Optional, Set, Any, Tuple
from collections import defaultdict
from dateutil.parser import isoparse # type: ignore
from lamin_utils import logger
from upath import UPath
from upath.implementations.cloud import CloudPath, S3Path # noqa # keep CloudPath!
from upath.implementations.local import LocalPath, PosixUPath, WindowsUPath
from .types import UPathStr
from .hashing import b16_to_b64, hash_md5s_from_dir

LocalPathClasses = (PosixUPath, WindowsUPath, LocalPath)

@@ -484,3 +485,65 @@ def create_path(path: UPath, access_token: Optional[str] = None) -> UPath:
        secret=credentials["secret"],
        token=credentials["token"],
    )


def get_stat_file_cloud(stat: Dict) -> Tuple[int, str, str]:

[Review comment · Member]
We need tests for this, @bpenteado!

In lamindb, this was covered; now, I fear, it's uncovered by CI.

My guess is that there were no fine-grained unit tests in lamindb for this, but coverage came through higher-level functionality.

Given this is in lamindb-setup now and the interfaces are used by laminhub and lamindb, we need a more fine-grained test that reflects the needs of both downstream repos (they should be the same if we design the code well).

[Review comment · Member]
Here is an issue for this: #665

    size = stat["size"]
    # small files
    if "-" not in stat["ETag"]:
        # only store hash for non-multipart uploads
        # we can't rapidly validate multi-part uploaded files client-side
        # we can add more logic later down-the-road
        hash = b16_to_b64(stat["ETag"])
        hash_type = "md5"
    else:
        stripped_etag, suffix = stat["ETag"].split("-")
        suffix = suffix.strip('"')
        hash = f"{b16_to_b64(stripped_etag)}-{suffix}"
        hash_type = "md5-n"  # this is the S3 chunk-hashing strategy
    return size, hash, hash_type


def get_stat_dir_s3(path: UPath) -> Tuple[int, str, str, int]:
    import boto3

    if not AWS_CREDENTIALS_PRESENT:
        # passing the following param directly to Session() doesn't
        # work, unfortunately: botocore_session=path.fs.session
        from botocore import UNSIGNED
        from botocore.config import Config

        config = Config(signature_version=UNSIGNED)
        s3 = boto3.session.Session().resource("s3", config=config)
    else:
        s3 = boto3.session.Session().resource("s3")
    bucket, key, _ = path.fs.split_path(path.as_posix())
    # assuming this here is the fastest way of querying for many objects
    objects = s3.Bucket(bucket).objects.filter(Prefix=key)
    size = sum([object.size for object in objects])
    md5s = [
        # skip leading and trailing quotes
        object.e_tag[1:-1]
        for object in objects
    ]
    n_objects = len(md5s)
    hash, hash_type = hash_md5s_from_dir(md5s)
    return size, hash, hash_type, n_objects


def get_stat_dir_gs(path: UPath) -> Tuple[int, str, str, int]:
    import google.cloud.storage as gc_storage

    bucket, key, _ = path.fs.split_path(path.as_posix())
    # assuming this here is the fastest way of querying for many objects
    client = gc_storage.Client(
        credentials=path.fs.credentials.credentials, project=path.fs.project
    )
    objects = client.bucket(bucket).list_blobs(prefix=key)
    sizes, md5s = [], []
    for object in objects:
        sizes.append(object.size)
        md5s.append(object.md5_hash)
    n_objects = len(md5s)
    hash, hash_type = hash_md5s_from_dir(md5s)
    return sum(sizes), hash, hash_type, n_objects
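
The review thread above asks for fine-grained tests of this interface (tracked in #665). A minimal sketch of what such a test could look like, not part of this PR: it hand-builds stat dicts with only the "size" and "ETag" keys the function reads, and the quoted ETag values are illustrative.

# sketch of a unit test for get_stat_file_cloud, not PR code
from lamindb_setup.core.hashing import b16_to_b64
from lamindb_setup.core.upath import get_stat_file_cloud


def test_stat_file_cloud_single_part():
    etag = "9b89c8c1acf79dba5b5341d1fff9806f"
    size, hash, hash_type = get_stat_file_cloud({"size": 3, "ETag": f'"{etag}"'})
    assert size == 3
    assert hash == b16_to_b64(etag)
    assert hash_type == "md5"


def test_stat_file_cloud_multipart():
    # multipart uploads carry a "-<n_parts>" suffix on the ETag
    size, hash, hash_type = get_stat_file_cloud(
        {"size": 3, "ETag": '"9b89c8c1acf79dba5b5341d1fff9806f-2"'}
    )
    assert hash.endswith("-2")
    assert hash_type == "md5-n"
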
35 changes: 35 additions & 0 deletions tests/test_hashing.py
@@ -0,0 +1,35 @@
import base64

[Review comment · Member]
I fear these tests don't get executed; see the noxfile. I'll take care of this.

[Review comment · Member]
Fixed here: #666

from pathlib import Path

from lamindb_setup.core.hashing import b16_to_b64, hash_file, to_b64_str


def test_compute_hash():
    files = [
        # file_content, hash, chunk_size, hash_type
        ("a", "DMF1ucDxtqgxw5niaXcmYQ", None, "md5"),
        ("b", "kutf_uauL-w61xx3dTFXjw", None, "md5"),
        ("abc", "kAFQmDzST7DWlj99KOF_cg", None, "md5"),
        ("a", "DMF1ucDxtqgxw5niaXcmYQ", 1, "md5"),
        ("b", "kutf_uauL-w61xx3dTFXjw", 1, "md5"),
        # the last case here triggers multi-chunk compute with sha1
        ("abc", "p0EbDbQEP1wS-Tw6TuBjKS", 1, "sha1-fl"),
    ]
    for content, hash, chunk_size, hash_type in files:
        filepath = Path("file_1")
        filepath.write_text(content)
        computed_hash, computed_hash_type = hash_file(filepath, chunk_size=chunk_size)
        assert computed_hash == hash
        assert computed_hash_type == hash_type
        filepath.unlink()


def test_base64():
    mytest = b"test"
    b64_str = to_b64_str(mytest)
    b64_str_padded = f"{b64_str}=="
    assert base64.urlsafe_b64decode(b64_str_padded.encode()).hex() == mytest.hex()


def test_b16_to_b64():
    assert b16_to_b64("9b89c8c1acf79dba5b5341d1fff9806f") == "m4nIwaz3nbpbU0HR__mAbw"
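
One more hedged sketch, not part of the PR: since hash_set sorts its input before hashing, a test could assert that the result is independent of set ordering and truncated to 20 characters.

# sketch of an order-invariance test for hash_set, not PR code
from lamindb_setup.core.hashing import hash_set


def test_hash_set_order_invariant():
    assert hash_set({"a", "b", "c"}) == hash_set({"c", "b", "a"})
    assert len(hash_set({"a", "b", "c"})) == 20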