Merge pull request #124 from bento-platform/refact/remove-bundles
refact!: remove bundles (deprecated part of spec)
davidlougheed authored Aug 20, 2024
2 parents 46a3cf5 + 654976d commit 568d202
Showing 12 changed files with 138 additions and 289 deletions.
3 changes: 1 addition & 2 deletions README.md
@@ -15,8 +15,7 @@ or inside a MinIO instance (which is a s3-like software).
## TODO / Future considerations

- Ingesting is either through the command line or by the endpoint of the same name
(which will create a single object). There is currently no way to ingest an archive
or to package objects into bundles.
(which will create a single object).
- Consider how to be aware of http vs https depending on the deployment setup
(in singularity, docker, as is).

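For context on the endpoint half of that TODO item, here is a minimal, hypothetical sketch of posting a single file to the service's /ingest endpoint (which, per routes.py further down, reads form data). The host, form field names, and tag values are illustrative placeholders, not the service's documented contract.

```python
import requests

DRS_URL = "http://localhost:5000"  # placeholder address for a local instance

with open("report.pdf", "rb") as fh:
    resp = requests.post(
        f"{DRS_URL}/ingest",
        # Field names below are assumptions for illustration only; check the
        # object_ingest() handler in chord_drs/routes.py for the real contract.
        files={"file": fh},
        data={"project_id": "project-1", "dataset_id": "dataset-1"},
    )

resp.raise_for_status()
print(resp.json())  # metadata for the newly created DRS object
```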
50 changes: 5 additions & 45 deletions chord_drs/commands.py
@@ -7,46 +7,11 @@
from flask.cli import with_appcontext

from .db import db
from .models import DrsBlob, DrsBundle


def create_drs_bundle(
location: str,
parent: DrsBundle | None = None,
project_id: str | None = None,
dataset_id: str | None = None,
data_type: str | None = None,
exclude: frozenset[str] = frozenset({}),
) -> DrsBundle:
perms_kwargs = {"project_id": project_id, "dataset_id": dataset_id, "data_type": data_type}

bundle = DrsBundle(name=os.path.basename(location), **perms_kwargs)

if parent:
bundle.parent_bundle = parent

for f in os.listdir(location):
if exclude and f in exclude:
continue

f = os.path.abspath(os.path.join(location, f))

if os.path.isfile(f):
create_drs_blob(f, parent=bundle, **perms_kwargs)
else:
create_drs_bundle(f, parent=bundle, **perms_kwargs)

bundle.update_checksum_and_size()
db.session.add(bundle)

current_app.logger.info(f"Created a new bundle, name: {bundle.name}, ID : {bundle.id}, size: {bundle.size}")

return bundle
from .models import DrsBlob


def create_drs_blob(
location: str,
parent: DrsBundle | None = None,
project_id: str | None = None,
dataset_id: str | None = None,
data_type: str | None = None,
@@ -58,9 +23,6 @@ def create_drs_blob(
data_type=data_type,
)

if parent:
drs_blob.bundle = parent

db.session.add(drs_blob)

current_app.logger.info(f"Created a new blob, filename: {drs_blob.location} ID : {drs_blob.id}")
@@ -82,18 +44,16 @@ def ingest(source: str, project: str, dataset: str, data_type: str) -> None:

current_app.logger.setLevel(logging.INFO)
# TODO: ingestion for remote files or archives
# TODO: Create directories in minio when ingesting a bundle

if not os.path.exists(source):
raise ClickException("File or directory provided does not exist")
raise ClickException("Path provided does not exist")

source = os.path.abspath(source)

perms_kwargs = {"project_id": project or None, "dataset_id": dataset or None, "data_type": data_type or None}

if os.path.isfile(source):
create_drs_blob(source, **perms_kwargs)
else:
create_drs_bundle(source, **perms_kwargs)
if not os.path.isfile(source):
raise ClickException("Directories cannot be ingested")

create_drs_blob(source, **perms_kwargs)
db.session.commit()
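As a rough sketch of what ingestion looks like after this change, the snippet below drives create_drs_blob() directly inside an application context: directories are now rejected outright, and each regular file becomes exactly one blob. The chord_drs.app import, the example path, and the tag values are assumptions for illustration, not part of this diff.

```python
import os

from click import ClickException

from chord_drs.app import application  # assumed app entry point; adjust to the real factory
from chord_drs.commands import create_drs_blob
from chord_drs.db import db

source = "/data/exports/report.pdf"  # placeholder path

with application.app_context():
    if not os.path.exists(source):
        raise ClickException("Path provided does not exist")
    if not os.path.isfile(source):
        # Bundles are gone, so there is no longer a directory-ingestion path.
        raise ClickException("Directories cannot be ingested")

    create_drs_blob(os.path.abspath(source), project_id="project-1", dataset_id="dataset-1", data_type=None)
    db.session.commit()
```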
5 changes: 3 additions & 2 deletions chord_drs/constants.py
@@ -1,14 +1,15 @@
from bento_lib.service_info.helpers import build_service_type
from chord_drs import __version__

__all__ = [
"BENTO_SERVICE_KIND",
"SERVICE_NAME",
"SERVICE_ARTIFACT",
"DRS_SPEC_VERSION",
"SERVICE_TYPE",
]

BENTO_SERVICE_KIND = "drs"
SERVICE_NAME = "Bento Data Repository Service"
SERVICE_ARTIFACT = BENTO_SERVICE_KIND
SERVICE_TYPE = build_service_type("ca.c3g.chord", SERVICE_ARTIFACT, __version__)
DRS_SPEC_VERSION = "1.4.0" # update to match whatever version of the DRS spec is implemented.
SERVICE_TYPE = build_service_type("org.ga4gh", SERVICE_ARTIFACT, DRS_SPEC_VERSION)
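To make the service-type switch concrete: assuming bento_lib's build_service_type assembles a standard GA4GH service-info type object, the registered type moves from a Bento-versioned identifier to a GA4GH one pinned to the implemented DRS spec version. The rendered values below are illustrative only.

```python
# Before (hypothetical rendering): grouped under C3G/Bento and versioned by the service release.
old_service_type = {"group": "ca.c3g.chord", "artifact": "drs", "version": "<service __version__>"}

# After: grouped under GA4GH and versioned by the DRS spec the service implements.
new_service_type = {"group": "org.ga4gh", "artifact": "drs", "version": "1.4.0"}
```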
59 changes: 59 additions & 0 deletions chord_drs/migrations/versions/5e982af5cde4_remove_bundles.py
@@ -0,0 +1,59 @@
"""remove bundles
Revision ID: 5e982af5cde4
Revises: dcd501398d46
Create Date: 2024-07-17 19:54:03.564099
"""

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "5e982af5cde4"
down_revision = "dcd501398d46"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###

with op.batch_alter_table("drs_object", schema=None) as batch_op:
batch_op.drop_constraint(None, type_="foreignkey")
batch_op.drop_column("bundle_id")

op.drop_table("drs_bundle")

# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###

op.create_table(
"drs_bundle",
sa.Column("id", sa.VARCHAR(), nullable=False),
sa.Column("created", sa.DATETIME(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=True),
sa.Column("checksum", sa.VARCHAR(length=64), nullable=False),
sa.Column("size", sa.INTEGER(), nullable=True),
sa.Column("name", sa.VARCHAR(length=250), nullable=True),
sa.Column("description", sa.VARCHAR(length=1000), nullable=True),
sa.Column("parent_bundle_id", sa.INTEGER(), nullable=True),
sa.Column("project_id", sa.VARCHAR(length=64), nullable=True),
sa.Column("dataset_id", sa.VARCHAR(length=64), nullable=True),
sa.Column("data_type", sa.VARCHAR(length=24), nullable=True),
sa.Column("public", sa.BOOLEAN(), server_default=sa.text("0"), nullable=False),
sa.ForeignKeyConstraint(
["parent_bundle_id"],
["drs_bundle.id"],
),
sa.PrimaryKeyConstraint("id"),
)

with op.batch_alter_table("drs_object", schema=None) as batch_op:
batch_op.add_column(sa.Column("bundle_id", sa.INTEGER(), nullable=True))
batch_op.create_foreign_key(None, "drs_bundle", ["bundle_id"], ["id"])

# ### end Alembic commands ###
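If this migration ever needs to be driven outside the project's usual Flask-Migrate workflow, a sketch along these lines (using Alembic's command API; the config path is an assumption) would apply or revert it:

```python
from alembic import command
from alembic.config import Config

cfg = Config("migrations/alembic.ini")  # assumed location of the project's Alembic config

command.upgrade(cfg, "5e982af5cde4")      # drop drs_bundle and drs_object.bundle_id
# command.downgrade(cfg, "dcd501398d46")  # recreate them if a rollback is needed
```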
51 changes: 7 additions & 44 deletions chord_drs/models.py
@@ -17,67 +17,30 @@
"Base",
"DrsMixin",
"DrsBlob",
"DrsBundle",
]

Base = declarative_base()


class DrsMixin:
# IDs (PKs) must remain outside the mixin!
class DrsBlob(Base):
__tablename__ = "drs_object"

id = Column(String, primary_key=True)
location = Column(String(500), nullable=False)

created = Column(DateTime, server_default=func.now())
checksum = Column(String(64), nullable=False)
size = Column(Integer, default=0)
name = Column(String(250), nullable=True)
description = Column(String(1000), nullable=True)

# Permissions/Bento-specific project & dataset tagging for DRS items
# TODO: Make some of these not nullable in the future:
project_id = Column(String(64), nullable=True) # Nullable for backwards-compatibility
dataset_id = Column(String(64), nullable=True) # Nullable for backwards-compatibility / project-only stuff?
data_type = Column(String(24), nullable=True) # NULL if multi-data type or something else
public = Column(Boolean, default=False, nullable=False) # If true, the object is accessible by anyone


class DrsBundle(Base, DrsMixin):
__tablename__ = "drs_bundle"

id = Column(String, primary_key=True)
parent_bundle_id = Column(Integer, ForeignKey("drs_bundle.id"))
parent_bundle = relationship("DrsBundle", remote_side=[id])
objects = relationship("DrsBlob", cascade="all, delete-orphan", backref="bundle")

def __init__(self, *args, **kwargs):
self.id = str(uuid4())
super().__init__(*args, **kwargs)

def update_checksum_and_size(self):
# For bundle checksumming logic, see the `checksums` field in
# https://ga4gh.github.io/data-repository-service-schemas/preview/release/drs-1.3.0/docs/#tag/DrsObjectModel

checksums = []
total_size = 0

for obj in self.objects:
total_size += obj.size
checksums.append(obj.checksum)

checksums.sort()
concat_checksums = "".join(checksums)

hash_obj = sha256()
hash_obj.update(concat_checksums.encode())

self.checksum = hash_obj.hexdigest()
self.size = total_size


class DrsBlob(Base, DrsMixin):
__tablename__ = "drs_object"

id = Column(String, primary_key=True)
bundle_id = Column(Integer, ForeignKey(DrsBundle.id), nullable=True)
location = Column(String(500), nullable=False)

def __init__(self, *args, **kwargs):
logger = current_app.logger

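With DrsMixin and DrsBundle folded away, every column now lives directly on DrsBlob / drs_object. A small usage sketch follows (it assumes an active application context; the ID is a placeholder):

```python
from chord_drs.models import DrsBlob

blob = DrsBlob.query.filter_by(id="11111111-2222-3333-4444-555555555555").first()
if blob is not None:
    # Fields formerly provided by DrsMixin are plain columns on DrsBlob now.
    print(blob.name, blob.size, blob.checksum, blob.project_id, blob.public)
```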
66 changes: 21 additions & 45 deletions chord_drs/routes.py
@@ -28,8 +28,8 @@
from .backend import get_backend
from .constants import BENTO_SERVICE_KIND, SERVICE_NAME, SERVICE_TYPE
from .db import db
from .models import DrsBlob, DrsBundle
from .serialization import build_bundle_json, build_blob_json
from .models import DrsBlob
from .serialization import build_blob_json
from .utils import drs_file_checksum


@@ -65,7 +65,7 @@ def _post_headers_getter(r: Request) -> dict[str, str]:


def check_objects_permission(
drs_objs: list[DrsBlob | DrsBundle], permission: Permission, mark_authz_done: bool = False
drs_objs: list[DrsBlob], permission: Permission, mark_authz_done: bool = False
) -> tuple[bool, ...]:
if not authz_enabled():
return tuple([True] * len(drs_objs)) # Assume we have permission for everything if authz disabled
@@ -87,10 +87,10 @@
) # now a tuple of length len(drs_objs) of whether we have the permission for each object


def fetch_and_check_object_permissions(object_id: str, permission: Permission) -> tuple[DrsBlob | DrsBundle, bool]:
def fetch_and_check_object_permissions(object_id: str, permission: Permission) -> DrsBlob:
has_permission_on_everything = check_everything_permission(permission)

drs_object, is_bundle = get_drs_object(object_id)
drs_object = get_drs_object(object_id)

if not drs_object:
authz_middleware.mark_authz_done(request)
@@ -108,7 +108,7 @@ def fetch_and_check_object_permissions(object_id: str, permission: Permission) -
raise forbidden()
# -------------------------------------------------------------------

return drs_object, is_bundle
return drs_object


def bad_request_log_mark(err: str) -> BadRequest:
@@ -149,35 +149,25 @@ def service_info():
)


def get_drs_object(object_id: str) -> tuple[DrsBlob | DrsBundle | None, bool]:
if drs_bundle := DrsBundle.query.filter_by(id=object_id).first():
return drs_bundle, True

# Only try hitting the database for an object if no bundle was found
if drs_blob := DrsBlob.query.filter_by(id=object_id).first():
return drs_blob, False

return None, False
def get_drs_object(object_id: str) -> DrsBlob | None:
return DrsBlob.query.filter_by(id=object_id).first()


def delete_drs_object(object_id: str, logger: logging.Logger):
drs_object, is_bundle = fetch_and_check_object_permissions(object_id, P_DELETE_DATA)
drs_object = fetch_and_check_object_permissions(object_id, P_DELETE_DATA)

logger.info(f"Deleting object {drs_object.id}")

if not is_bundle:
q = DrsBlob.query.filter_by(location=drs_object.location)
n_using_file = q.count()
if n_using_file == 1 and q.first().id == drs_object.id:
# If this object is the only one using the file, delete the file too
# TODO: this can create a race condition and leave files undeleted... should we have a cleanup on start?
logger.info(
f"Deleting file at {drs_object.location}, since {drs_object.id} is the only object referring to it."
)
backend = get_backend()
backend.delete(drs_object.location)

# Don't bother with additional bundle deleting logic, they'll be removed soon anyway. TODO
q = DrsBlob.query.filter_by(location=drs_object.location)
n_using_file = q.count()
if n_using_file == 1 and q.first().id == drs_object.id:
# If this object is the only one using the file, delete the file too
# TODO: this can create a race condition and leave files undeleted... should we have a cleanup on start?
logger.info(
f"Deleting file at {drs_object.location}, since {drs_object.id} is the only object referring to it."
)
backend = get_backend()
backend.delete(drs_object.location)

db.session.delete(drs_object)
db.session.commit()
@@ -190,15 +180,11 @@ def object_info(object_id: str):
delete_drs_object(object_id, current_app.logger)
return current_app.response_class(status=204)

drs_object, is_bundle = fetch_and_check_object_permissions(object_id, P_QUERY_DATA)
drs_object = fetch_and_check_object_permissions(object_id, P_QUERY_DATA)

# The requester can ask for additional, non-spec-compliant Bento properties to be included in the response
with_bento_properties: bool = str_to_bool(request.args.get("with_bento_properties", ""))

if is_bundle:
expand: bool = str_to_bool(request.args.get("expand", ""))
return jsonify(build_bundle_json(drs_object, expand=expand, with_bento_properties=with_bento_properties))

# The requester can specify object internal path to be added to the response
use_internal_path: bool = str_to_bool(request.args.get("internal_path", ""))

@@ -222,8 +208,6 @@ def object_access(object_id: str, access_id: str):

@drs_service.route("/search", methods=["GET"])
def object_search():
# TODO: Enable search for bundles too

response = []

name: str | None = request.args.get("name")
@@ -262,12 +246,7 @@ def object_search():
def object_download(object_id: str):
logger = current_app.logger

# TODO: Bundle download

drs_object, is_bundle = fetch_and_check_object_permissions(object_id, P_DOWNLOAD_DATA)

if is_bundle:
raise BadRequest("Bundle download is currently unsupported")
drs_object = fetch_and_check_object_permissions(object_id, P_DOWNLOAD_DATA)

obj_name = drs_object.name
minio_obj = drs_object.return_minio_object()
@@ -336,9 +315,6 @@ def generate_bytes():

@drs_service.route("/ingest", methods=["POST"])
def object_ingest():
# TODO: Enable specifying a parent bundle
# TODO: If a parent is specified, make sure we have permissions to ingest into it? How to reconcile?

logger = current_app.logger
data = request.form or {}

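From a client's point of view, the simplification in routes.py is that every object is now a blob: there is no expand query parameter and no bundle check before download. A hedged sketch of the resulting flow follows; the route paths are assumed from the DRS convention, and the base URL and object ID are placeholders, so verify them against the actual blueprint registration.

```python
import requests

DRS_URL = "http://localhost:5000"  # placeholder
object_id = "11111111-2222-3333-4444-555555555555"  # placeholder

# Fetch the object record, then stream the bytes to disk.
info = requests.get(f"{DRS_URL}/objects/{object_id}")
info.raise_for_status()
record = info.json()

with requests.get(f"{DRS_URL}/objects/{object_id}/download", stream=True) as r:
    r.raise_for_status()
    with open(record.get("name", object_id), "wb") as fh:
        for chunk in r.iter_content(chunk_size=1 << 20):
            fh.write(chunk)
```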