Skip to content

Commit

Permalink
Feature/upload multiple files at once to bytes (#3476)
Browse files Browse the repository at this point in the history
Signed-off-by: Donny Peeters <[email protected]>
Co-authored-by: ammar92 <[email protected]>
  • Loading branch information
Donnype and ammar92 authored Sep 10, 2024
1 parent b098d8d commit 60283ac
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 72 deletions.
23 changes: 16 additions & 7 deletions boefjes/boefjes/clients/bytes_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import typing
import uuid
from base64 import b64encode
from collections.abc import Callable, Set
from functools import wraps
from typing import Any
Expand Down Expand Up @@ -99,17 +100,25 @@ def get_normalizer_meta(self, normalizer_meta_id: uuid.UUID) -> NormalizerMeta:

@retry_with_login
def save_raw(self, boefje_meta_id: str, raw: str | bytes, mime_types: Set[str] = frozenset()) -> UUID:
    """Upload a single raw file for a boefje run and return the id of the stored raw.

    The raw payload is sent as one entry of the JSON ``files`` list accepted by
    ``POST /bytes/raw``: the content is base64 encoded and the mime types are sent
    as the entry's ``tags``.

    Args:
        boefje_meta_id: Id of the boefje meta this raw file belongs to.
        raw: The raw output; a ``str`` is encoded to bytes before base64 encoding.
        mime_types: Mime types to attach to the raw file.

    Returns:
        The id found under the uploaded file's name in the response body.
    """
    # The name provides a key for all ids returned, so this is arbitrary as we only upload 1 file
    file_name = "raw"

    payload = raw if isinstance(raw, bytes) else raw.encode()
    uploaded_file = {
        "name": file_name,
        "content": b64encode(payload).decode(),
        "tags": list(mime_types),
    }

    response = self._session.post(
        "/bytes/raw",
        json={"files": [uploaded_file]},
        headers=self.headers,
        params={"boefje_meta_id": str(boefje_meta_id)},
    )
    self._verify_response(response)

    # The response maps every uploaded file name to the id of its raw file.
    return UUID(response.json()[file_name])

@retry_with_login
def get_raw(self, raw_data_id: str) -> bytes:
Expand Down
14 changes: 12 additions & 2 deletions bytes/bytes/api/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
from pydantic import BaseModel
from pydantic import BaseModel, Field


class RawResponse(BaseModel):
    """Status payload describing the outcome of a raw-file request."""

    # Outcome marker and accompanying human-readable message.
    status: str
    message: str
    # NOTE(review): presumably the id of a single stored raw file - confirm against callers.
    id: str | None = None
    # NOTE(review): presumably the ids of several stored raw files - confirm against callers.
    ids: list[str] | None = None


class File(BaseModel):
    """One uploaded file: a name, its base64-encoded content and a list of tags."""

    # Key under which callers identify this file in a multi-file upload.
    name: str
    # Base64 text. The encoding hint is published through `json_schema_extra`,
    # because passing arbitrary extra keyword arguments to `Field` is deprecated in pydantic v2.
    content: str = Field(..., json_schema_extra={"contentEncoding": "base64"})
    # Free-form labels for the file; empty when none are supplied.
    tags: list[str] = Field(default_factory=list)


class BoefjeOutput(BaseModel):
    """Request body bundling every file uploaded in one call."""

    # Defaults to an empty list when the request carries no files.
    files: list[File] = Field(default_factory=list)
97 changes: 59 additions & 38 deletions bytes/bytes/api/router.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from base64 import b64decode
from uuid import UUID

import structlog
from asgiref.sync import async_to_sync
from cachetools import TTLCache, cached
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import Response
from httpx import codes
from starlette.responses import JSONResponse

from bytes.api.models import RawResponse
from bytes.api.models import BoefjeOutput
from bytes.auth import authenticate_token
from bytes.config import get_settings
from bytes.database.sql_meta_repository import MetaIntegrityError, ObjectNotFoundException, create_meta_data_repository
Expand All @@ -34,10 +35,11 @@ def create_boefje_meta(
meta_repository.save_boefje_meta(boefje_meta)
except MetaIntegrityError:
return JSONResponse(
{"status": "failed", "message": "Integrity error: object might already exist"}, status_code=400
{"status": "failed", "message": "Integrity error: object might already exist"},
status_code=codes.BAD_REQUEST,
)

return JSONResponse({"status": "success"}, status_code=201)
return JSONResponse({"status": "success"}, status_code=codes.CREATED)


@router.get("/boefje_meta/{boefje_meta_id}", response_model=BoefjeMeta, tags=[BOEFJE_META_TAG])
Expand Down Expand Up @@ -95,10 +97,11 @@ def create_normalizer_meta(
meta_repository.save_normalizer_meta(normalizer_meta)
except MetaIntegrityError:
return JSONResponse(
{"status": "failed", "message": "Integrity error: object might already exist"}, status_code=400
{"status": "failed", "message": "Integrity error: object might already exist"},
status_code=codes.BAD_REQUEST,
)

return JSONResponse({"status": "success"}, status_code=201)
return JSONResponse({"status": "success"}, status_code=codes.CREATED)


@router.get("/normalizer_meta/{normalizer_meta_id}", response_model=NormalizerMeta, tags=[NORMALIZER_META_TAG])
Expand All @@ -109,7 +112,7 @@ def get_normalizer_meta_by_id(
try:
return meta_repository.get_normalizer_meta_by_id(normalizer_meta_id)
except ObjectNotFoundException as error:
raise HTTPException(status_code=404, detail="Normalizer meta not found") from error
raise HTTPException(status_code=codes.NOT_FOUND, detail="Normalizer meta not found") from error


@router.get("/normalizer_meta", response_model=list[NormalizerMeta], tags=[NORMALIZER_META_TAG])
Expand Down Expand Up @@ -148,42 +151,60 @@ def get_normalizer_meta(

@router.post("/raw", tags=[RAW_TAG])
def create_raw(
    boefje_meta_id: UUID,
    boefje_output: BoefjeOutput,
    meta_repository: MetaDataRepository = Depends(create_meta_data_repository),
    event_manager: EventManager = Depends(create_event_manager),
) -> dict[str, UUID]:
    """Store all raw files from the request and return a mapping of file name to raw id.

    The `name` field of each file is assumed to be unique within the request, since it
    is used as the key of the returned mapping. A file whose set of mime types equals
    that of a raw file already stored for this boefje meta is not saved again: the id
    of the existing raw file is returned for it instead.

    Raises:
        HTTPException: 400 when two new files in the same request share the same set
            of mime types, 500 when saving a file or publishing its event fails.
    """

    raw_ids = {}
    # Mime-type sets of the raw files already stored for this boefje meta, keyed by raw id.
    mime_types_by_id = {
        raw.id: set(raw.mime_types) for raw in meta_repository.get_raw(RawDataFilter(boefje_meta_id=boefje_meta_id))
    }
    # Every mime-type set seen so far: the stored ones plus the ones saved during this request.
    all_parsed_mime_types = list(mime_types_by_id.values())

    for file in boefje_output.files:
        parsed_mime_types = {MimeType(value=tag) for tag in file.tags}

        # Reuse the id of an already stored raw file with the same mime-type set (first match wins).
        existing_ids = [raw_id for raw_id, stored in mime_types_by_id.items() if stored == parsed_mime_types]
        if existing_ids:
            raw_ids[file.name] = existing_ids[0]
            continue

        # Not stored before the request started, so a hit here means a duplicate within this request.
        if parsed_mime_types in all_parsed_mime_types:
            raise HTTPException(
                status_code=codes.BAD_REQUEST, detail="Content types do not define unique sets of mime types."
            )

        try:
            meta = meta_repository.get_boefje_meta_by_id(boefje_meta_id)
            raw_data = RawData(value=b64decode(file.content.encode()), boefje_meta=meta, mime_types=parsed_mime_types)

            with meta_repository:
                raw_id = meta_repository.save_raw(raw_data)
                raw_ids[file.name] = raw_id

            # Record the set once, right after the save succeeded.
            all_parsed_mime_types.append(parsed_mime_types)

            event = RawFileReceived(
                organization=meta.organization,
                raw_data=RawDataMeta(
                    id=raw_id,
                    boefje_meta=raw_data.boefje_meta,
                    mime_types=raw_data.mime_types,
                ),
            )
            event_manager.publish(event)
        except Exception as error:
            logger.exception("Error saving raw data")
            raise HTTPException(status_code=codes.INTERNAL_SERVER_ERROR, detail="Could not save raw data") from error

    return raw_ids


@router.get("/raw/{raw_id}", tags=[RAW_TAG])
Expand All @@ -194,7 +215,7 @@ def get_raw_by_id(
try:
raw_data = meta_repository.get_raw_by_id(raw_id)
except ObjectNotFoundException as error:
raise HTTPException(status_code=404, detail="No raw data found") from error
raise HTTPException(status_code=codes.NOT_FOUND, detail="No raw data found") from error

return Response(raw_data.value, media_type="application/octet-stream")

Expand All @@ -207,7 +228,7 @@ def get_raw_meta_by_id(
try:
raw_meta = meta_repository.get_raw_meta_by_id(raw_id)
except ObjectNotFoundException as error:
raise HTTPException(status_code=404, detail="No raw data found") from error
raise HTTPException(status_code=codes.NOT_FOUND, detail="No raw data found") from error

return raw_meta

Expand Down
10 changes: 8 additions & 2 deletions bytes/bytes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ def _validate_timezone_aware_datetime(value: datetime) -> datetime:
class MimeType(BaseModel):
    """A mime type wrapped in a model, hashable and orderable by its string value."""

    value: str

    def __hash__(self) -> int:
        # Hash on the value so instances can be stored in sets and used as dict keys.
        return hash(self.value)

    def __lt__(self, other: object) -> bool:
        # Let Python try the reflected comparison (and raise TypeError) for foreign
        # types, instead of failing with an AttributeError on `other.value`.
        if not isinstance(other, MimeType):
            return NotImplemented

        return self.value < other.value


class Job(BaseModel):
id: UUID
Expand Down Expand Up @@ -69,7 +75,7 @@ class RawDataMeta(BaseModel):

id: UUID
boefje_meta: BoefjeMeta
mime_types: list[MimeType] = Field(default_factory=list)
mime_types: set[MimeType] = Field(default_factory=set)

# These are set once the raw is saved
secure_hash: SecureHash | None = None
Expand All @@ -80,7 +86,7 @@ class RawDataMeta(BaseModel):
class RawData(BaseModel):
value: bytes
boefje_meta: BoefjeMeta
mime_types: list[MimeType] = Field(default_factory=list)
mime_types: set[MimeType] = Field(default_factory=set)

# These are set once the raw is saved
secure_hash: SecureHash | None = None
Expand Down
31 changes: 24 additions & 7 deletions bytes/tests/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import typing
from base64 import b64encode
from collections.abc import Callable
from functools import wraps
from typing import Any
Expand All @@ -7,6 +8,7 @@
import httpx
from httpx import HTTPError

from bytes.api.models import BoefjeOutput
from bytes.models import BoefjeMeta, NormalizerMeta
from bytes.repositories.meta_repository import BoefjeMetaFilter, NormalizerMetaFilter, RawDataFilter

Expand Down Expand Up @@ -126,19 +128,34 @@ def save_raw(self, boefje_meta_id: UUID, raw: bytes, mime_types: list[str] | Non
if not mime_types:
mime_types = []

headers = {"content-type": "application/octet-stream"}

file_name = "raw" # The name provides a key for all ids returned, so this is arbitrary as we only upload 1 file
response = self.client.post(
"/bytes/raw",
content=raw,
headers=headers,
params={"mime_types": mime_types, "boefje_meta_id": str(boefje_meta_id)},
json={
"files": [
{
"name": file_name,
"content": b64encode(raw).decode(),
"tags": mime_types,
}
],
},
params={"boefje_meta_id": str(boefje_meta_id)},
)
self._verify_response(response)

return response.json()[file_name]

@retry_with_login
def save_raws(self, boefje_meta_id: UUID, boefje_output: BoefjeOutput) -> dict[str, str]:
    """Upload several raw files in one request.

    Args:
        boefje_meta_id: Id of the boefje meta the files belong to.
        boefje_output: The files to upload, sent as the JSON request body.

    Returns:
        The decoded JSON response body, a mapping of each file name to its raw id.
    """
    response = self.client.post(
        "/bytes/raw",
        content=boefje_output.model_dump_json(),
        params={"boefje_meta_id": str(boefje_meta_id)},
    )
    self._verify_response(response)

    # The endpoint answers with a name -> id mapping; there is no top-level "id" key to read.
    return response.json()

@retry_with_login
def get_raw(self, raw_id: UUID) -> bytes:
Expand Down
Loading

0 comments on commit 60283ac

Please sign in to comment.