Merge branch 'main' into release/v2.0-beta-3
lmarini committed Sep 17, 2024
2 parents a28c5fb + 7d054e0 commit 1712540
Showing 37 changed files with 1,456 additions and 310 deletions.
28 changes: 25 additions & 3 deletions backend/app/deps/authorization_deps.py
@@ -423,8 +423,8 @@ class ListenerAuthorization:
    For more info see https://fastapi.tiangolo.com/advanced/advanced-dependencies/.
    Regular users are not allowed to run non-active listeners."""

    # def __init__(self, optional_arg: str = None):
    #     self.optional_arg = optional_arg
    # def __init__(self, role: str = "viewer"):
    #     self.role = role

    async def __call__(
        self,
@@ -438,10 +438,32 @@ async def __call__(
        if admin and admin_mode:
            return True

        # Else check if listener is active or current user is the creator of the extractor
        if (
            listener := await EventListenerDB.get(PydanticObjectId(listener_id))
        ) is not None:
            # If listener has access restrictions, evaluate them against requesting user
            if listener.access is not None:
                group_q = await GroupDB.find(
                    Or(
                        GroupDB.creator == current_user,
                        GroupDB.users.email == current_user,
                    ),
                ).to_list()
                user_groups = [g.id for g in group_q]

                valid_modification = (
                    (admin and admin_mode)
                    or (listener.creator and listener.creator.email == current_user)
                    or (listener.access.owner == current_user)
                    or (current_user in listener.access.users)
                    or (not set(user_groups).isdisjoint(listener.access.groups))
                )
                if not valid_modification:
                    raise HTTPException(
                        status_code=403,
                        detail=f"User `{current_user}` does not have permission on listener `{listener_id}`",
                    )

            if listener.active is True or (
                listener.creator and listener.creator.email == current_user
            ):
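The docstring above points at FastAPI's class-based ("advanced") dependencies. For readers unfamiliar with the pattern, here is a minimal, hypothetical sketch of how a dependency like ListenerAuthorization gets attached to a route; the router, path, and handler names are illustrative assumptions, not part of this commit.

# Hypothetical usage sketch: ListenerAuthorization is instantiated once and
# FastAPI calls its async __call__ on every request, resolving listener_id
# from the path. Route path and handler name are assumptions.
from fastapi import APIRouter, Depends

from app.deps.authorization_deps import ListenerAuthorization

router = APIRouter()


@router.post("/listeners/{listener_id}/submit")
async def submit_to_listener(
    listener_id: str,
    allow: bool = Depends(ListenerAuthorization()),
):
    # Reaching this body means __call__ returned True instead of raising 403,
    # so the caller is the owner, a listed user, or a member of a listed group.
    return {"listener_id": listener_id, "authorized": allow}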
27 changes: 19 additions & 8 deletions backend/app/heartbeat_listener_sync.py
@@ -1,7 +1,8 @@
import json
import logging

import pika
from beanie import PydanticObjectId

from app.config import settings
from app.models.listeners import EventListenerDB, EventListenerOut, ExtractorInfo
from app.models.search import SearchCriteria
@@ -29,22 +30,32 @@ def callback(ch, method, properties, body):
    extractor_db = EventListenerDB(
        **extractor_info, properties=ExtractorInfo(**extractor_info)
    )
    owner = msg["owner"]
    if owner is not None:
        extractor_db.access = {"owner": owner}

    mongo_client = MongoClient(settings.MONGODB_URL)
    db = mongo_client[settings.MONGO_DATABASE]

    # check to see if extractor already exists
    existing_extractor = db["listeners"].find_one({"name": msg["queue"]})
    if owner is None:
        existing_extractor = EventListenerDB.find_one(
            EventListenerDB.name == msg["queue"], EventListenerDB.access == None
        )
    else:
        existing_extractor = EventListenerDB.find_one(
            EventListenerDB.name == msg["queue"], EventListenerDB.access.owner == owner
        )
    if existing_extractor is not None:
        # Update existing listener
        existing_version = existing_extractor["version"]
        new_version = extractor_db.version
        if version.parse(new_version) > version.parse(existing_version):
            # if this is a new version, add it to the database
            new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
            found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
            new_extractor = EventListenerDB.insert_one(extractor_db.to_mongo())
            found = EventListenerDB.get(PydanticObjectId(new_extractor.inserted_id))
            # TODO - for now we are not deleting an older version of the extractor, just adding a new one
            # removed = db["listeners"].delete_one({"_id": existing_extractor["_id"]})
            # removed = EventListenerDB.delete_one(EventListenerDB.id == PydanticObjectId(existing_extractor["_id"]))
            extractor_out = EventListenerOut.from_mongo(found)
            logger.info(
                "%s updated from %s to %s"
@@ -53,8 +64,8 @@ def callback(ch, method, properties, body):
            return extractor_out
    else:
        # Register new listener
        new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
        found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
        new_extractor = EventListenerDB.insert_one(extractor_db.to_mongo())
        found = EventListenerDB.get(PydanticObjectId(new_extractor.inserted_id))
        extractor_out = EventListenerOut.from_mongo(found)
        logger.info("New extractor registered: " + extractor_name)

@@ -97,7 +108,7 @@ def callback(ch, method, properties, body):
            FeedListener(listener_id=extractor_out.id, automatic=True)
        ],
    )
    db["feeds"].insert_one(new_feed.to_mongo())
    FeedDB.insert_one(new_feed.to_mongo())

    return extractor_out

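For context, the callback above consumes extractor heartbeats from RabbitMQ. Below is a rough sketch of the publishing side; the payload shape is inferred from msg["queue"] and msg["owner"] in this diff, and the exchange and routing-key names are assumptions, not from this commit.

# Sketch of an extractor heartbeat publisher, assuming the message carries
# the queue name, an optional owner email, and the extractor's metadata.
import json

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

heartbeat = {
    "queue": "ncsa.wordcount",     # listener queue name matched by find_one above
    "owner": "owner@example.com",  # None would register a globally visible listener
    "name": "ncsa.wordcount",
    "version": "2.1",
    "description": "Counts words in text files",
}
channel.basic_publish(
    exchange="extractors",              # assumed exchange name
    routing_key="extractors.heartbeat",  # assumed routing key
    body=json.dumps(heartbeat),
)
connection.close()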
19 changes: 16 additions & 3 deletions backend/app/models/listeners.py
@@ -3,12 +3,13 @@
from typing import List, Optional, Union

import pymongo
from beanie import Document, PydanticObjectId, View
from pydantic import AnyUrl, BaseModel, Field

from app.config import settings
from app.models.authorization import AuthorizationDB
from app.models.mongomodel import MongoDBRef
from app.models.users import UserOut
from beanie import Document, PydanticObjectId, View
from pydantic import AnyUrl, BaseModel, Field


class Repository(BaseModel):
@@ -37,6 +38,17 @@ class ExtractorInfo(BaseModel):
    categories: Optional[List[str]] = []
    parameters: Optional[dict] = None
    version: Optional[str] = "1.0"
    unique_key: Optional[str] = None


class AccessList(BaseModel):
    """Container for the lists of user emails, group IDs, and dataset IDs that may submit to a listener.
    The single owner is the only user permitted to modify the other lists."""

    owner: str
    users: List[str] = []
    groups: List[PydanticObjectId] = []
    datasets: List[PydanticObjectId] = []


class EventListenerBase(BaseModel):
@@ -45,6 +57,7 @@ class EventListenerBase(BaseModel):
    name: str
    version: str = "1.0"
    description: str = ""
    access: Optional[AccessList] = None


class EventListenerIn(EventListenerBase):
@@ -68,7 +81,7 @@ class EventListenerDB(Document, EventListenerBase):
    created: datetime = Field(default_factory=datetime.now)
    modified: datetime = Field(default_factory=datetime.now)
    lastAlive: datetime = None
    alive: Optional[bool] = None  # made up field to indicate if extractor is alive
    alive: Optional[bool] = None
    active: bool = False
    properties: Optional[ExtractorInfo] = None

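A quick sketch of the new model in use, constructing an access-restricted listener; all emails and the 24-hex-character group ID below are fabricated for illustration.

# Illustrative only: builds an EventListenerDB whose new access field limits
# who may submit to it.
from beanie import PydanticObjectId

from app.models.listeners import AccessList, EventListenerDB

access = AccessList(
    owner="owner@example.com",  # the only user allowed to edit these lists
    users=["collaborator@example.com"],
    groups=[PydanticObjectId("64f1c0ffee0123456789abcd")],
    datasets=[],
)

listener = EventListenerDB(
    name="private.extractor",
    version="1.0",
    description="Only visible to the owner, listed users, and listed groups",
    access=access,  # access=None keeps the listener unrestricted
)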
2 changes: 1 addition & 1 deletion backend/app/routers/authorization.py
@@ -437,7 +437,7 @@ async def get_dataset_roles(
    roles = DatasetRoles(dataset_id=str(dataset.id))

    async for auth in AuthorizationDB.find(
        AuthorizationDB.dataset_id == ObjectId(dataset_id)
        AuthorizationDB.dataset_id == PydanticObjectId(dataset_id)
    ):
        # First, fetch all groups that have a role on the dataset
        group_user_counts = {}
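This one-line change is part of a sweep replacing bson's ObjectId with Beanie's PydanticObjectId in query expressions; the same substitution repeats throughout datasets.py below. A minimal sketch of the resulting pattern, with the wrapper function assumed for illustration:

# Beanie documents type their ID fields as PydanticObjectId, so queries
# should compare with the same type; it also validates the incoming hex
# string. The helper function is an assumption, not from this commit.
from beanie import PydanticObjectId

from app.models.authorization import AuthorizationDB


async def dataset_authorizations(dataset_id: str):
    return await AuthorizationDB.find(
        AuthorizationDB.dataset_id == PydanticObjectId(dataset_id)
    ).to_list()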
38 changes: 21 additions & 17 deletions backend/app/routers/datasets.py
@@ -323,18 +323,18 @@ async def get_dataset_files(
    ) is not None:
        if authenticated or public or (admin and admin_mode):
            query = [
                FileDBViewList.dataset_id == ObjectId(dataset_id),
                FileDBViewList.dataset_id == PydanticObjectId(dataset_id),
            ]
        else:
            query = [
                FileDBViewList.dataset_id == ObjectId(dataset_id),
                FileDBViewList.dataset_id == PydanticObjectId(dataset_id),
                Or(
                    FileDBViewList.creator.email == user_id,
                    FileDBViewList.auth.user_ids == user_id,
                ),
            ]
        if folder_id is not None:
            query.append(FileDBViewList.folder_id == ObjectId(folder_id))
            query.append(FileDBViewList.folder_id == PydanticObjectId(folder_id))

        files_and_count = (
            await FileDBViewList.find(*query)
@@ -404,7 +404,7 @@ async def patch_dataset(

    if dataset_info.status is not None:
        query = [
            FileDBViewList.dataset_id == ObjectId(dataset_id),
            FileDBViewList.dataset_id == PydanticObjectId(dataset_id),
        ]
        files_views = await FileDBViewList.find(*query).to_list()
        for file_view in files_views:
@@ -710,18 +710,20 @@ async def get_dataset_folders(
    ) is not None:
        if authenticated or public:
            query = [
                FolderDBViewList.dataset_id == ObjectId(dataset_id),
                FolderDBViewList.dataset_id == PydanticObjectId(dataset_id),
            ]
        else:
            query = [
                FolderDBViewList.dataset_id == ObjectId(dataset_id),
                FolderDBViewList.dataset_id == PydanticObjectId(dataset_id),
                Or(
                    FolderDBViewList.creator.email == user_id,
                    FolderDBViewList.auth.user_ids == user_id,
                ),
            ]
        if parent_folder is not None:
            query.append(FolderDBViewList.parent_folder == ObjectId(parent_folder))
            query.append(
                FolderDBViewList.parent_folder == PydanticObjectId(parent_folder)
            )
        else:
            query.append(FolderDBViewList.parent_folder == None)  # noqa: E711

@@ -769,11 +771,11 @@ async def get_dataset_folders_and_files(
    ) is not None:
        if authenticated or public or (admin and admin_mode):
            query = [
                FolderFileViewList.dataset_id == ObjectId(dataset_id),
                FolderFileViewList.dataset_id == PydanticObjectId(dataset_id),
            ]
        else:
            query = [
                FolderFileViewList.dataset_id == ObjectId(dataset_id),
                FolderFileViewList.dataset_id == PydanticObjectId(dataset_id),
                Or(
                    FolderFileViewList.creator.email == user_id,
                    FolderFileViewList.auth.user_ids == user_id,
@@ -791,8 +793,8 @@ async def get_dataset_folders_and_files(
        else:
            query.append(
                Or(
                    FolderFileViewList.folder_id == ObjectId(folder_id),
                    FolderFileViewList.parent_folder == ObjectId(folder_id),
                    FolderFileViewList.folder_id == PydanticObjectId(folder_id),
                    FolderFileViewList.parent_folder == PydanticObjectId(folder_id),
                )
            )

@@ -839,15 +841,17 @@ async def delete_folder(
    if (await DatasetDB.get(PydanticObjectId(dataset_id))) is not None:
        if (folder := await FolderDB.get(PydanticObjectId(folder_id))) is not None:
            # delete current folder and files
            async for file in FileDB.find(FileDB.folder_id == ObjectId(folder_id)):
            async for file in FileDB.find(
                FileDB.folder_id == PydanticObjectId(folder_id)
            ):
                await remove_file_entry(file.id, fs, es)

            # recursively delete child folder and files
            async def _delete_nested_folders(parent_folder_id):
                while (
                    await FolderDB.find_one(
                        FolderDB.dataset_id == ObjectId(dataset_id),
                        FolderDB.parent_folder == ObjectId(parent_folder_id),
                        FolderDB.dataset_id == PydanticObjectId(dataset_id),
                        FolderDB.parent_folder == PydanticObjectId(parent_folder_id),
                    )
                ) is not None:
                    async for subfolder in FolderDB.find(
@@ -1212,7 +1216,7 @@ async def download_dataset(

    # Write dataset metadata if found
    metadata = await MetadataDB.find(
        MetadataDB.resource.resource_id == ObjectId(dataset_id)
        MetadataDB.resource.resource_id == PydanticObjectId(dataset_id)
    ).to_list()
    if len(metadata) > 0:
        datasetmetadata_path = os.path.join(
@@ -1235,7 +1239,7 @@ async def download_dataset(
    file_count = 0

    async for file in FileDBViewList.find(
        FileDBViewList.dataset_id == ObjectId(dataset_id)
        FileDBViewList.dataset_id == PydanticObjectId(dataset_id)
    ):
        # find the bytes id
        # if it's working draft file_id == origin_id
@@ -1273,7 +1277,7 @@ async def download_dataset(
        bag_size += current_file_size

        metadata = await MetadataDB.find(
            MetadataDB.resource.resource_id == ObjectId(dataset_id)
            MetadataDB.resource.resource_id == PydanticObjectId(dataset_id)
        ).to_list()
        if len(metadata) > 0:
            metadata_filename = file_name + "_metadata.json"
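Finally, a condensed, hypothetical sketch of the incremental query-building idiom this router uses, with the PydanticObjectId change applied; the function signature and the FileDBViewList import path are assumptions.

# Condensed sketch of the pattern above: start from a base filter list,
# append optional clauses, then splat the list into find(). Import path
# and function name are assumptions for illustration.
from typing import Optional

from beanie import PydanticObjectId
from beanie.operators import Or

from app.models.files import FileDBViewList  # assumed module path


async def list_files(dataset_id: str, user_id: str, folder_id: Optional[str] = None):
    query = [
        FileDBViewList.dataset_id == PydanticObjectId(dataset_id),
        Or(
            FileDBViewList.creator.email == user_id,
            FileDBViewList.auth.user_ids == user_id,
        ),
    ]
    if folder_id is not None:
        # Narrow to a single folder only when one was requested.
        query.append(FileDBViewList.folder_id == PydanticObjectId(folder_id))
    return await FileDBViewList.find(*query).to_list()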