Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: DRS object deletion #108

Merged
merged 11 commits into from
Apr 17, 2024
2 changes: 1 addition & 1 deletion chord_drs/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
def _get_backend() -> Backend | None:
# Instantiate backend if needed
backend_class = DATA_SOURCE_BACKENDS.get(current_app.config["SERVICE_DATA_SOURCE"])
return backend_class() if backend_class else None
return backend_class(current_app.config) if backend_class else None


def get_backend() -> Backend | None:
Expand Down
18 changes: 9 additions & 9 deletions chord_drs/backends/base.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
from abc import ABC, abstractmethod


__all__ = ["Backend", "FakeBackend"]
__all__ = ["Backend"]


# noinspection PyUnusedLocal
class Backend(ABC):
@abstractmethod
def save(self, current_location: str, filename: str) -> str: # pragma: no cover
def __init__(self, config: dict): # pragma: no cover
pass

@abstractmethod
def save(self, current_location: str, filename: str) -> str: # pragma: no cover
pass

class FakeBackend(Backend):
"""
For the tests
"""

def save(self, current_location: str, filename: str) -> str:
return current_location
@abstractmethod
def delete(self, location: str) -> None: # pragma: no cover
pass
14 changes: 10 additions & 4 deletions chord_drs/backends/local.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from shutil import copy
from flask import current_app
from pathlib import Path

from .base import Backend
Expand All @@ -15,12 +14,19 @@ class LocalBackend(Backend):
specified by the DATA var env, the default being in ~/chord_drs_data
"""

def __init__(self):
self.base_location = Path(current_app.config["SERVICE_DATA"])
def __init__(self, config: dict): # config is dict or flask.Config, which is a subclass of dict.
self.base_location = Path(config["SERVICE_DATA"])
# We can use mkdir, since resolve has been called in config.py
self.base_location.mkdir(exist_ok=True)
self.base_location.mkdir(parents=True, exist_ok=True)

def save(self, current_location: str | Path, filename: str) -> str:
new_location = self.base_location / filename
copy(current_location, new_location)
return str(new_location.resolve())

def delete(self, location: str | Path) -> None:
loc = location if isinstance(location, Path) else Path(location)
if self.base_location in loc.parents:
loc.unlink()
return
raise ValueError(f"Location {loc} is not a subpath of backend base location {self.base_location}")
30 changes: 18 additions & 12 deletions chord_drs/backends/minio.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import boto3

from flask import current_app
from urllib.parse import urlparse

from .base import Backend
Expand All @@ -10,26 +9,33 @@


class MinioBackend(Backend):
def __init__(self, resource=None):
def __init__(self, config: dict, resource=None): # config is dict or flask.Config, which is a subclass of dict.
self._minio_url = config["MINIO_URL"]

self.minio = resource or boto3.resource(
"s3",
endpoint_url=current_app.config["MINIO_URL"],
aws_access_key_id=current_app.config["MINIO_USERNAME"],
aws_secret_access_key=current_app.config["MINIO_PASSWORD"],
endpoint_url=self._minio_url,
aws_access_key_id=config["MINIO_USERNAME"],
aws_secret_access_key=config["MINIO_PASSWORD"],
)

self.bucket = self.minio.Bucket(current_app.config["MINIO_BUCKET"])
self.bucket = self.minio.Bucket(config["MINIO_BUCKET"])

@staticmethod
def build_minio_location(obj):
host = urlparse(current_app.config["MINIO_URL"]).netloc
def build_minio_location(self, obj):
host = urlparse(self._minio_url).netloc
return f"s3://{host}/{obj.bucket_name}/{obj.key}"

def get_minio_object(self, location: str):
obj = self.bucket.Object(location.split("/")[-1])
return obj.get()
return self.bucket.Object(location.split("/")[-1])

def get_minio_object_dict(self, location: str) -> dict:
    """
    Look up the bucket object for location via get_minio_object() and return the result of
    calling get() on it.
    """
    minio_object = self.get_minio_object(location)
    return minio_object.get()

def save(self, current_location: str, filename: str) -> str:
with open(current_location, "rb") as f:
obj = self.bucket.put_object(Key=filename, Body=f)
return MinioBackend.build_minio_location(obj)
return self.build_minio_location(obj)

def delete(self, location: str) -> None:
    """
    Delete the bucket object which get_minio_object() returns for location.
    """
    self.get_minio_object(location).delete()
4 changes: 2 additions & 2 deletions chord_drs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def __init__(self, *args, **kwargs):

super().__init__(*args, **kwargs)

def return_minio_object(self):
def return_minio_object(self) -> dict:
parsed_url = urlparse(self.location)

if parsed_url.scheme != "s3":
Expand All @@ -139,4 +139,4 @@ def return_minio_object(self):
if not backend or not isinstance(backend, MinioBackend):
raise Exception("The backend for this instance is not properly configured.")

return backend.get_minio_object(self.location)
return backend.get_minio_object_dict(self.location)
180 changes: 37 additions & 143 deletions chord_drs/routes.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
import os
import re
import tempfile
import urllib.parse

from asgiref.sync import async_to_sync
from bento_lib.auth.permissions import Permission, P_INGEST_DATA, P_QUERY_DATA, P_DOWNLOAD_DATA
from bento_lib.auth.permissions import Permission, P_INGEST_DATA, P_QUERY_DATA, P_DELETE_DATA, P_DOWNLOAD_DATA
from bento_lib.auth.resources import RESOURCE_EVERYTHING, build_resource
from bento_lib.service_info.constants import SERVICE_ORGANIZATION_C3G
from bento_lib.service_info.helpers import build_service_info
Expand All @@ -13,22 +14,20 @@
Request,
current_app,
jsonify,
url_for,
request,
send_file,
make_response,
)
from sqlalchemy import or_
from urllib.parse import urlparse
from werkzeug.exceptions import BadRequest, Forbidden, NotFound, InternalServerError, RequestedRangeNotSatisfiable

from . import __version__
from .authz import authz_middleware
from .backend import get_backend
from .constants import BENTO_SERVICE_KIND, SERVICE_NAME, SERVICE_TYPE
from .data_sources import DATA_SOURCE_LOCAL, DATA_SOURCE_MINIO
from .db import db
from .models import DrsMixin, DrsBlob, DrsBundle
from .types import DRSAccessMethodDict, DRSContentsDict, DRSObjectBentoDict, DRSObjectDict
from .models import DrsBlob, DrsBundle
from .serialization import build_bundle_json, build_blob_json
from .utils import drs_file_checksum


Expand Down Expand Up @@ -84,18 +83,18 @@ def _post_headers_getter(r: Request) -> dict[str, str]:


def fetch_and_check_object_permissions(object_id: str, permission: Permission) -> tuple[DrsBlob | DrsBundle, bool]:
view_data_everything = check_everything_permission(permission)
has_permission_on_everything = check_everything_permission(permission)

drs_object, is_bundle = get_drs_object(object_id)

if not drs_object:
authz_middleware.mark_authz_done(request)
if authz_enabled() and not view_data_everything: # Don't leak if this object exists
if authz_enabled() and not has_permission_on_everything: # Don't leak if this object exists
raise forbidden()
raise NotFound("No object found for this ID")

# Check permissions -------------------------------------------------
if view_data_everything:
if has_permission_on_everything:
# Good to go already!
authz_middleware.mark_authz_done(request)
else:
Expand All @@ -119,138 +118,6 @@ def range_not_satisfiable_log_mark(description: str, length: int) -> RequestedRa
return RequestedRangeNotSatisfiable(description=description, length=length)


def get_drs_host() -> str:
    """
    Return the network-location (netloc) component of the configured SERVICE_BASE_URL.
    """
    service_base_url = current_app.config["SERVICE_BASE_URL"]
    return urlparse(service_base_url).netloc


def create_drs_uri(object_id: str) -> str:
    """
    Build the drs:// URI for object_id, using the host returned by get_drs_host().
    """
    host = get_drs_host()
    return f"drs://{host}/{object_id}"


def build_contents(bundle: DrsBundle, expand: bool) -> list[DRSContentsDict]:
    """
    Build the list of contents entries for a bundle.

    Entries for bundles whose parent_bundle is this bundle come first, followed by entries for
    the items in bundle.objects. When expand is true, each nested-bundle entry also carries its
    own recursively-built "contents" list.
    """
    child_bundles = DrsBundle.query.filter_by(parent_bundle=bundle).all()

    bundle_entries: list[DRSContentsDict] = [
        {
            **({"contents": build_contents(child, expand)} if expand else {}),
            "drs_uri": create_drs_uri(child.id),
            "id": child.id,
            "name": child.name,  # TODO: Can overwrite... see spec
        }
        for child in child_bundles
    ]

    object_entries: list[DRSContentsDict] = [
        {
            "drs_uri": create_drs_uri(obj.id),
            "id": obj.id,
            "name": obj.name,  # TODO: Can overwrite... see spec
        }
        for obj in bundle.objects
    ]

    return bundle_entries + object_entries


def build_bento_object_json(drs_object: DrsMixin) -> DRSObjectBentoDict:
    """
    Wrap the object's project_id, dataset_id, data_type and public attributes in a dictionary
    under the single key "bento".
    """
    bento_properties = {
        "project_id": drs_object.project_id,
        "dataset_id": drs_object.dataset_id,
        "data_type": drs_object.data_type,
        "public": drs_object.public,
    }
    return {"bento": bento_properties}


def build_bundle_json(
    drs_bundle: DrsBundle,
    expand: bool = False,
    with_bento_properties: bool = False,
) -> DRSObjectDict:
    """
    Serialize a bundle to its JSON-ready dictionary form.

    expand is passed through to build_contents(). The "description" key is only present when the
    bundle has a non-None description, and the "bento" key (from build_bento_object_json()) is
    only present when with_bento_properties is true.
    """
    bundle_json = {
        "contents": build_contents(drs_bundle, expand),
        "checksums": [{"checksum": drs_bundle.checksum, "type": "sha-256"}],
        "created_time": f"{drs_bundle.created.isoformat('T')}Z",
        "size": drs_bundle.size,
        "name": drs_bundle.name,
    }

    # Description should be excluded if null in the database
    if drs_bundle.description is not None:
        bundle_json["description"] = drs_bundle.description

    bundle_json["id"] = drs_bundle.id
    bundle_json["self_uri"] = create_drs_uri(drs_bundle.id)

    if with_bento_properties:
        bundle_json.update(build_bento_object_json(drs_bundle))

    return bundle_json


def build_blob_json(
    drs_blob: DrsBlob,
    inside_container: bool = False,
    with_bento_properties: bool = False,
) -> DRSObjectDict:
    """
    Serialize a blob to its JSON-ready dictionary form.

    An "https" access method pointing at the object_download route is always included. A "file"
    access method is added when inside_container is true and the data source is
    DATA_SOURCE_LOCAL; otherwise an "s3" access method is added when the data source is
    DATA_SOURCE_MINIO. The "description" key is only present when the blob has a non-None
    description, and the "bento" key is only present when with_bento_properties is true.
    """
    config = current_app.config
    data_source = config["SERVICE_DATA_SOURCE"]

    # url_for external was giving weird results - build the URL by hand instead using the internal url_for
    base_url = config["SERVICE_BASE_URL"] + "/"
    download_path = url_for("drs_service.object_download", object_id=drs_blob.id)
    blob_url: str = urllib.parse.urljoin(base_url, download_path.lstrip("/"))

    # No headers on the HTTPS access URL --> auth will have to be obtained via some out-of-band
    # method, or the object's contents are public. This will depend on how the service is deployed.
    access_methods: list[DRSAccessMethodDict] = [
        {"access_url": {"url": blob_url}, "type": "https"},
    ]

    if inside_container and data_source == DATA_SOURCE_LOCAL:
        extra_method: DRSAccessMethodDict = {
            "access_url": {"url": f"file://{drs_blob.location}"},
            "type": "file",
        }
        access_methods.append(extra_method)
    elif data_source == DATA_SOURCE_MINIO:
        extra_method = {
            "access_url": {"url": drs_blob.location},
            "type": "s3",
        }
        access_methods.append(extra_method)

    blob_json = {
        "access_methods": access_methods,
        "checksums": [{"checksum": drs_blob.checksum, "type": "sha-256"}],
        "created_time": f"{drs_blob.created.isoformat('T')}Z",
        "size": drs_blob.size,
        "name": drs_blob.name,
    }

    # Description should be excluded if null in the database
    if drs_blob.description is not None:
        blob_json["description"] = drs_blob.description

    blob_json["id"] = drs_blob.id
    blob_json["self_uri"] = create_drs_uri(drs_blob.id)

    if with_bento_properties:
        blob_json.update(build_bento_object_json(drs_blob))

    return blob_json


@drs_service.route("/service-info", methods=["GET"])
@drs_service.route("/ga4gh/drs/v1/service-info", methods=["GET"])
@authz_middleware.deco_public_endpoint
Expand Down Expand Up @@ -288,9 +155,36 @@ def get_drs_object(object_id: str) -> tuple[DrsBlob | DrsBundle | None, bool]:
return None, False


@drs_service.route("/objects/<string:object_id>", methods=["GET"])
@drs_service.route("/ga4gh/drs/v1/objects/<string:object_id>", methods=["GET"])
def delete_drs_object(object_id: str, logger: logging.Logger):
    """
    Delete a DRS object record and, when no other blob refers to the same location, the
    underlying file as well.

    The object is fetched through fetch_and_check_object_permissions() with the P_DELETE_DATA
    permission, so any exception raised by that helper propagates to the caller.

    Raises InternalServerError if a file needs to be deleted but get_backend() returns None.
    """
    drs_object, is_bundle = fetch_and_check_object_permissions(object_id, P_DELETE_DATA)

    logger.info(f"Deleting object {drs_object.id}")

    if not is_bundle:
        q = DrsBlob.query.filter_by(location=drs_object.location)
        n_using_file = q.count()
        if n_using_file == 1 and q.first().id == drs_object.id:
            # If this object is the only one using the file, delete the file too
            # TODO: this can create a race condition and leave files undeleted... should we have a cleanup on start?
            backend = get_backend()
            if backend is None:
                # get_backend() is declared to return Backend | None; fail with an explicit error
                # before touching anything rather than with an AttributeError on None.
                raise InternalServerError("The backend for this instance is not properly configured.")

            logger.info(
                f"Deleting file at {drs_object.location}, since {drs_object.id} is the only object referring to it."
            )
            backend.delete(drs_object.location)

    # TODO: No additional bundle-deletion logic is implemented here; bundles are expected to be removed soon anyway.

    db.session.delete(drs_object)
    db.session.commit()


@drs_service.route("/objects/<string:object_id>", methods=["GET", "DELETE"])
@drs_service.route("/ga4gh/drs/v1/objects/<string:object_id>", methods=["GET", "DELETE"])
def object_info(object_id: str):
if request.method == "DELETE":
delete_drs_object(object_id, current_app.logger)
return current_app.response_class(status=204)

drs_object, is_bundle = fetch_and_check_object_permissions(object_id, P_QUERY_DATA)

# The requester can ask for additional, non-spec-compliant Bento properties to be included in the response
Expand Down
Loading