Skip to content

Commit

Permalink
Support resumable chunked uploads, Bump dependencies, WIP: Versioned.…
Browse files Browse the repository at this point in the history
…is_latest -> Issue opened on SQLA
  • Loading branch information
Etienne Jodry authored and Etienne Jodry committed Mar 5, 2025
1 parent 5b0e584 commit b00e9ee
Show file tree
Hide file tree
Showing 36 changed files with 293 additions and 166 deletions.
16 changes: 7 additions & 9 deletions docs/developer_manual/table_schema.rst
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,17 @@ Custom Schema Component
~~~~~~~~~~~~~~~~~~~~~~~

``BioDM`` provides a custom ``Schema`` component that may be used by importing
``from biodm.components.schema import Schema``.
``from biodm.utils.biodm import Schema``.

The use of this component is **optional** but provides some performance improvements.
The use of this component is **discouraged** but may provides some performance improvements by
turning SQLALchemy objects to ``transient`` state effectively disabling further lazy loading
of nested attributes upon serializing.


1. Removes `None` or equivalent (`{}`, `[]`,...) values from output JSON
This can be an alternative to circular dumping policies described
below. However this approach can be quite risky and result in missing data down the line.

2. Turn SQLALchemy objects to ``transient`` state effectively disabling further lazy loading
of nested attributes.


The latter in particular needs to be taken into account for the nested configuration
discussed down below.
This component remains provided as it may be of interest for testing purposes.


Nested flags policy
Expand Down
2 changes: 1 addition & 1 deletion src/biodm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""BioDM framework."""
__version__ = '0.8.5'
__version__ = '0.8.9'
__version_info__ = ([int(num) for num in __version__.split('.')])


Expand Down
2 changes: 0 additions & 2 deletions src/biodm/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from typing import Callable, List, Optional, Dict, Any, Type

from apispec import APISpec
# from apispec.ext.marshmallow import MarshmallowPlugin
from starlette_apispec import APISpecSchemaGenerator
from starlette.applications import Starlette
from starlette.middleware.base import BaseHTTPMiddleware
Expand All @@ -26,7 +25,6 @@
from biodm.components.controllers import Controller
from biodm.components.services import UnaryEntityService, CompositeEntityService
from biodm.error import onerror
from biodm.exceptions import RequestError
from biodm.utils.security import AuthenticationMiddleware, PermissionLookupTables
from biodm.utils.utils import to_it
from biodm.utils.apispec import BDMarshmallowPlugin
Expand Down
1 change: 0 additions & 1 deletion src/biodm/components/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Explicit re-export for mypy strict.
from .schema import Schema as Schema
from .table import Base as Base
from .table import S3File as S3File
from .table import Versioned as Versioned
Expand Down
12 changes: 7 additions & 5 deletions src/biodm/components/controllers/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,19 @@
from io import BytesIO
from typing import Any, Iterable, List, Dict, TYPE_CHECKING, Optional

# from marshmallow.schema import Schema
from marshmallow.schema import Schema
from marshmallow.exceptions import ValidationError
from sqlalchemy.exc import MissingGreenlet
from starlette.requests import Request
from starlette.responses import Response
import starlette.routing as sr

from biodm import config
from biodm.components import Schema
from biodm.component import ApiComponent
from biodm.exceptions import (
DataError, PayloadJSONDecodingError, AsyncDBError, SchemaError
)
from biodm.utils.utils import json_response
from biodm.utils.utils import json_response, remove_empty

if TYPE_CHECKING:
from biodm.component import Base
Expand Down Expand Up @@ -149,10 +148,13 @@ def serialize(
if key in only
}

# SQLA result -> python dict
serialized = cls.schema.dump(data, many=many)

# Restore to normal afterwards.
# Cleanup python dict
serialized = remove_empty(serialized)
# Restore Schema to normal
cls.schema.dump_fields = dump_fields
# python dict -> str
return json.dumps(serialized, indent=config.INDENT)

except MissingGreenlet as e:
Expand Down
4 changes: 2 additions & 2 deletions src/biodm/components/controllers/s3controller.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import List, Type

from marshmallow import RAISE, ValidationError
from marshmallow import RAISE, ValidationError, Schema
import starlette.routing as sr
from starlette.requests import Request
from starlette.responses import Response, PlainTextResponse

from biodm.components import S3File, Schema
from biodm.components import S3File
from biodm.components.services import S3Service
from biodm.components.table import Base
from biodm.schemas import PartsEtagSchema
Expand Down
48 changes: 0 additions & 48 deletions src/biodm/components/schema.py

This file was deleted.

59 changes: 37 additions & 22 deletions src/biodm/components/services/dbservice.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Database service: Translates requests data into SQLA statements and execute."""
from abc import ABCMeta
from dataclasses import dataclass
from typing import Callable, List, Sequence, Any, Dict, overload, Literal, Type, Set
from uuid import uuid4

Expand All @@ -10,7 +9,7 @@
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import (
load_only, selectinload, joinedload, ONETOMANY, MANYTOONE, Relationship, Load
selectinload, joinedload, ONETOMANY, MANYTOONE, Relationship, Load
)
from sqlalchemy.sql import Delete, Select
from biodm import config
Expand All @@ -24,27 +23,16 @@
from biodm.tables import ListGroup, Group
from biodm.tables.asso import asso_list_group
from biodm.utils.security import UserInfo, PermissionLookupTables
from biodm.utils.sqla import CompositeInsert, UpsertStmt, UpsertStmtValuesHolder, get_max_id
from biodm.utils.sqla import (
CompositeInsert, UpsertStmt, UpsertStmtValuesHolder, get_max_id, Operator, ValuedOperator
)
from biodm.utils.utils import unevalled_all, unevalled_or, to_it, partition


NUM_OPERATORS = ("gt", "ge", "lt", "le")
AGG_OPERATORS = ("min", "max", "min_v", "max_v", "min_a", "max_a")


# TODO: [prio-low]: improve those classes to enforce lists above.
@dataclass
class Operator:
"""Contains operators parsed from query parameters."""
op: str


@dataclass
class ValuedOperator(Operator):
"""Operator special case, taking a value."""
value: Any


class DatabaseService(ApiService, metaclass=ABCMeta):
"""DB Service abstract class: manages database transactions for entities.
This class holds atomic database statement execution and utility functions plus
Expand Down Expand Up @@ -600,10 +588,13 @@ def _restrict_select_on_fields(
target = rel.mapper.entity

# Get relationship fields
stmt = stmt.options(joinedload(getattr(self.table, n)))
stmt = stmt.options(
Load(target).load_only(*[getattr(target, f) for f in nested_fields.get(n)])
) if nested_fields.get(n) else stmt
if nested_fields.get(n):
stmt = stmt.options(
joinedload(getattr(self.table, n))
.load_only(*[getattr(target, f) for f in nested_fields.get(n)])
)
else:
stmt = stmt.options(joinedload(getattr(self.table, n)))

# Filter based on permissions.
if rel.direction in (MANYTOONE, ONETOMANY): # TODO: Handle else ? -> MANYTOMANY
Expand Down Expand Up @@ -830,6 +821,26 @@ def _filter_apply_parameters(

return stmt

@overload
async def filter(
self,
fields: List[str],
params: Dict[str, str],
stmt_only: Literal[True],
user_info: UserInfo | None = None,
**kwargs
) -> Select: ...

@overload
async def filter(
self,
fields: List[str],
params: Dict[str, str],
stmt_only: Literal[False],
user_info: UserInfo | None = None,
**kwargs
) -> Sequence[Base]: ...

async def filter(
self,
fields: List[str],
Expand All @@ -838,7 +849,7 @@ async def filter(
stmt_only: bool = False,
user_info: UserInfo | None = None,
**kwargs
) -> List[Base]:
) -> Select | Sequence[Base]:
"""READ rows filted on query parameters."""
# Get special parameters
offset = int(params.pop('start', 0))
Expand Down Expand Up @@ -894,8 +905,9 @@ async def filter(
"filter arguments: count cannot be used in conjunction with stmt_only !"
)
stmt = select(func.count()).select_from(stmt.subquery())
return await self._select(stmt)
return await super()._select(stmt)

# Apply limit/offset
stmt = stmt.offset(offset).limit(limit)
# stmt = stmt.slice(offset-1, limit-1) # TODO [prio-low] investigate
return stmt if stmt_only else await self._select_many(stmt, **kwargs)
Expand Down Expand Up @@ -1105,6 +1117,9 @@ def patch(ins, mapping):
#  so it impacts performance.
await session.flush()
await session.refresh(item, rels.keys())
# if 'Dataset' in str(self):
# x = await (await (getattr(item.awaitable_attrs, 'is_latest')))
# pass
return item

# pylint: disable=arguments-differ
Expand Down
15 changes: 12 additions & 3 deletions src/biodm/components/services/kcservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ async def read_or_create(
) -> None:
"""READ group from keycloak, CREATE if missing, UPDATE if exists.
For regular users with no read permissions on groups, this method will result in a
dictionary with no 'id' field, which is required when keycloak is enabled.
Ultimately leading the insert statement
:param data: Group data
:type data: Dict[str, Any]
:param user_info: requesting user info
Expand All @@ -113,15 +117,20 @@ async def read_or_create(
return

parent_id = None
failed_parent = False
if not path.parent.parts == ('/',):
parent = await self.kc.get_group_by_path(str(path.parent), user_info=user_info)
if not parent:
raise DataError("Input path does not match any parent group.")
parent_id = parent['id']
if parent:
parent_id = parent['id']
else:
failed_parent = True

cr_id = await self.kc.create_group(path.name, parent_id, user_info=user_info)
if cr_id:
data['id'] = cr_id
if failed_parent:
# Had right to see/create group but not parent, it means parent only failed.
raise DataError("Input path does not match any parent group.")

async def write(
self,
Expand Down
39 changes: 32 additions & 7 deletions src/biodm/components/services/s3service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pathlib import Path
from typing import List, Any, Sequence, Dict

from sqlalchemy import Insert
from sqlalchemy import Insert, Select, select
from sqlalchemy.ext.asyncio import AsyncSession

from biodm.components.table import Base, S3File
Expand Down Expand Up @@ -111,6 +111,31 @@ async def _insert_list(
await self.gen_upload_form(file, session=session)
return files

async def _check_partial_upload(self, file: S3File, session: AsyncSession):
"""Query the bucket to check what parts have been uploaded. Populates etags."""
if not await getattr(file.awaitable_attrs, 'ready'):
completed_parts = self.s3.list_multipart_parts(
await self.gen_key(file, session=session),
file.upload.s3_uploadId
)
for completed in completed_parts['Parts']:
for upart in file.upload.parts:
if upart.part_number == completed['PartNumber']:
upart.etag = completed['ETag'].strip('"')

@DatabaseManager.in_session
async def _select(self, stmt: Select, session: AsyncSession) -> Base:
file: S3File = await super()._select(stmt, session=session)
await self._check_partial_upload(file, session=session)
return file

@DatabaseManager.in_session
async def _select_many(self, stmt: Select, session: AsyncSession) -> Base:
files: S3File = await super()._select_many(stmt, session=session)
for file in files:
await self._check_partial_upload(file, session=session)
return files

@DatabaseManager.in_session
async def complete_multipart(
self,
Expand All @@ -119,11 +144,12 @@ async def complete_multipart(
session: AsyncSession
):
# parts should take the form of [{'PartNumber': part_number, 'ETag': etag}, ...]
file = await self.read(
pk_val,
fields=['ready', 'upload'] + self.key_fields,
session=session
)
# Optim: Read that calls super()._select instead of our custom select
stmt = select(self.table)
stmt = stmt.where(self.gen_cond(pk_val))
stmt = self._restrict_select_on_fields(stmt, ['ready', 'upload'] + self.key_fields, None)
file = await super()._select(stmt, session=session)

upload = await getattr(file.awaitable_attrs, 'upload')
upload_id = await getattr(upload.awaitable_attrs, 's3_uploadId')

Expand Down Expand Up @@ -168,7 +194,6 @@ async def download(
assert isinstance(file, S3File) # mypy.

await self._check_permissions("download", user_info, file.__dict__, session=session)

if not file.ready:
raise FileNotUploadedError("File exists but has not been uploaded yet.")

Expand Down
Loading

0 comments on commit b00e9ee

Please sign in to comment.