Skip to content

Commit

Permalink
BROKEN - start task-collection refactor (ref #1853, ref #1854)
Browse files Browse the repository at this point in the history
  • Loading branch information
tcompa committed Oct 8, 2024
1 parent c47cbcf commit b1f29c9
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 173 deletions.
176 changes: 79 additions & 97 deletions fractal_server/app/routes/api/v2/task_collection.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
from pathlib import Path
from shutil import copy as shell_copy
from tempfile import TemporaryDirectory
Expand All @@ -21,26 +20,24 @@
from ....db import AsyncSession
from ....db import get_async_db
from ....models.v2 import CollectionStateV2
from ....models.v2 import TaskGroupV2
from ....models.v2 import TaskV2
from ....schemas.v2 import CollectionStateReadV2
from ....schemas.v2 import CollectionStatusV2
from ....schemas.v2 import TaskCollectPipV2
from ....schemas.v2 import TaskReadV2
from ....schemas.v2 import TaskGroupCreateV2
from ...aux.validate_user_settings import validate_user_settings
from ._aux_functions_tasks import _get_valid_user_group_id
from fractal_server.app.models import UserOAuth
from fractal_server.app.routes.auth import current_active_user
from fractal_server.app.routes.auth import current_active_verified_user
from fractal_server.string_tools import slugify_task_name_for_source
from fractal_server.tasks.utils import _normalize_package_name
from fractal_server.tasks.utils import get_absolute_venv_path
from fractal_server.tasks.utils import get_collection_log
from fractal_server.tasks.utils import get_collection_path
from fractal_server.tasks.v2._TaskCollectPip import _TaskCollectPip
from fractal_server.tasks.v2.background_operations import (
background_collect_pip,
)
from fractal_server.tasks.v2.endpoint_operations import create_package_dir_pip
from fractal_server.tasks.v2.endpoint_operations import download_package
from fractal_server.tasks.v2.endpoint_operations import (
get_package_version_from_pypi,
Expand Down Expand Up @@ -107,19 +104,32 @@ async def collect_tasks_pip(
),
)

# Populate fields
# Populate task-group attributes
task_group_attrs = dict(
user_id=user.id,
python_version=task_collect.python_version,
)
if task_collect.package.startswith("/") and task_collect.package.endswith(
".whl"
):
wheel_info = _parse_wheel_filename(task_collect.package)
task_group_attrs["pkg_name"] = wheel_info["distribution"]
if task_collect.package.endswith(".whl"):
try:
wheel_filename = Path(task_collect.package).name
wheel_info = _parse_wheel_filename(wheel_filename)
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
f"Invalid wheel-file name {wheel_filename}. "
f"Original error: {str(e)}",
),
)
task_group_attrs["pkg_name"] = _normalize_package_name(
wheel_info["distribution"]
)
task_group_attrs["version"] = wheel_info["version"]
task_group_attrs["origin"] = "wheel-file"
else:
pkg_name = task_collect.package
task_group_attrs["pkg_name"] = _normalize_package_name(pkg_name)
task_group_attrs["origin"] = "pypi"
if task_collect.package_version is None:
latest_version = await get_package_version_from_pypi(
task_collect.package
Expand Down Expand Up @@ -150,6 +160,55 @@ async def collect_tasks_pip(
user_id=user.id,
db=db,
)
task_group_attrs["user_group_id"] = user_group_id

# Construct task_group.path
if settings.FRACTAL_RUNNER_BACKEND == "slurm_ssh":
base_tasks_path = user_settings.ssh_tasks_dir
else:
base_tasks_path = settings.FRACTAL_TASKS_DIR.as_posix()
task_group_path = (
Path(base_tasks_path)
/ str(user.id)
/ task_group_attrs["pkg_name"]
/ task_group_attrs["version"]
).as_posix()
task_group_attrs["path"] = task_group_path
task_group_attrs["venv_path"] = Path(task_group_path, "venv").as_posix()

# Validate TaskGroupV2 attributes
# FIXME: add try/except?
TaskGroupCreateV2(**task_group_attrs)

# FIXME: verify non-duplication constraint
pass

# Verify that task-group path is unique
stm = select(TaskGroupV2).where(TaskGroupV2.path == task_group_path)
res = await db.execute(stm)
for conflicting_task_group in res.scalars().all():
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
f"Another task-group already has path={task_group_path}.\n"
f"{conflicting_task_group=}"
),
)

# Verify that folder does not exist (for local collection)
if settings.FRACTAL_RUNNER_BACKEND != "slurm_ssh":
if Path(task_group_path).exists():
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"{task_group_path} already exists.",
)

# Create TaskGroupV2 object
task_group = TaskGroupV2(**task_group_attrs)
db.add(task_group)
await db.commit()
await db.refresh(task_group)
db.expunge(task_group)

# END of SSH/non-SSH common part

Expand Down Expand Up @@ -180,11 +239,9 @@ async def collect_tasks_pip(
background_tasks.add_task(
background_collect_pip_ssh,
state_id=state.id,
task_pkg=task_pkg,
task_group=task_group,
fractal_ssh=fractal_ssh,
tasks_base_dir=user_settings.ssh_tasks_dir,
user_id=user.id,
user_group_id=user_group_id,
task_pkg=task_pkg, # FIXME: move to task_group if possible
)

response.status_code = status.HTTP_201_CREATED
Expand All @@ -194,6 +251,7 @@ async def collect_tasks_pip(

logger = set_logger(logger_name="collect_tasks_pip")

# Read manifest - FIXME: move to background task
with TemporaryDirectory() as tmpdir:
try:
# Copy or download the package wheel file to tmpdir
Expand All @@ -217,83 +275,6 @@ async def collect_tasks_pip(
detail=f"Invalid package or manifest. Original error: {e}",
)

try:
venv_path = create_package_dir_pip(task_pkg=task_pkg)
except FileExistsError:
venv_path = create_package_dir_pip(task_pkg=task_pkg, create=False)
try:
package_path = get_absolute_venv_path(venv_path)
collection_path = get_collection_path(package_path)
with collection_path.open("r") as f:
task_collect_data = json.load(f)

err_msg = (
"Cannot collect package, possible reason: an old version of "
"the same package has already been collected.\n"
f"{str(collection_path)} has invalid content: "
)
if not isinstance(task_collect_data, dict):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"{err_msg} it's not a Python dictionary.",
)
if "task_list" not in task_collect_data.keys():
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"{err_msg} it has no key 'task_list'.",
)
if not isinstance(task_collect_data["task_list"], list):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"{err_msg} 'task_list' is not a Python list.",
)

for task_dict in task_collect_data["task_list"]:

task = TaskReadV2(**task_dict)
db_task = await db.get(TaskV2, task.id)
if (
(not db_task)
or db_task.source != task.source
or db_task.name != task.name
):
await db.close()
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
"Cannot collect package. Folder already exists, "
f"but task {task.id} does not exists or it does "
f"not have the expected source ({task.source}) or "
f"name ({task.name})."
),
)
except FileNotFoundError as e:
await db.close()
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
"Cannot collect package. Possible reason: another "
"collection of the same package is in progress. "
f"Original FileNotFoundError: {e}"
),
)
except ValidationError as e:
await db.close()
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
"Cannot collect package. Possible reason: an old version "
"of the same package has already been collected. "
f"Original ValidationError: {e}"
),
)
task_collect_data["info"] = "Already installed"
state = CollectionStateV2(data=task_collect_data)
response.status_code == status.HTTP_200_OK
await db.close()
return state
settings = Inject(get_settings)

# Check that tasks are not already in the DB
for new_task in task_pkg.package_manifest.task_list:
new_task_name_slug = slugify_task_name_for_source(new_task.name)
Expand All @@ -312,7 +293,7 @@ async def collect_tasks_pip(
# All checks are OK, proceed with task collection
collection_status = dict(
status=CollectionStatusV2.PENDING,
venv_path=venv_path.relative_to(settings.FRACTAL_TASKS_DIR).as_posix(),
venv_path=task_group_attrs["venv_path"],
package=task_pkg.package,
)
state = CollectionStateV2(data=collection_status)
Expand All @@ -323,10 +304,11 @@ async def collect_tasks_pip(
background_tasks.add_task(
background_collect_pip,
state_id=state.id,
venv_path=venv_path,
task_pkg=task_pkg,
user_id=user.id,
user_group_id=user_group_id,
task_group=task_group,
venv_path=Path(
task_group_attrs["venv_path"]
), # FIXME: move to task_group if possible
task_pkg=task_pkg, # FIXME: move to task_group if possible
)
logger.debug(
"Task-collection endpoint: start background collection "
Expand Down
20 changes: 14 additions & 6 deletions fractal_server/app/routes/api/v2/task_collection_custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from fractal_server.app.db import get_sync_db
from fractal_server.app.models import UserOAuth
from fractal_server.app.models.v1 import Task as TaskV1
from fractal_server.app.models.v2 import TaskGroupV2
from fractal_server.app.models.v2 import TaskV2
from fractal_server.app.routes.auth import current_active_verified_user
from fractal_server.app.routes.aux.validate_user_settings import (
Expand All @@ -33,7 +34,7 @@
_prepare_tasks_metadata,
)
from fractal_server.tasks.v2.database_operations import (
create_db_task_group_and_tasks,
create_db_tasks_and_update_task_group,
)

router = APIRouter()
Expand Down Expand Up @@ -189,14 +190,21 @@ async def collect_task_custom(
# Prepare task-group attributes
task_group_attrs = dict(
origin="other",
pkg_name=task_collect.source, # FIXME
pkg_name=task_collect.source, # FIXME: pick one
user_id=user.id,
user_group_id=user_group_id,
)
TaskGroupCreateV2(**task_group_attrs)

task_group = TaskGroupV2(**task_group_attrs)
db.add(task_group)
await db.commit()
await db.refresh(task_group)
db.expunge(task_group)

task_group = create_db_task_group_and_tasks(
task_group = create_db_tasks_and_update_task_group(
task_list=task_list,
task_group_obj=TaskGroupCreateV2(**task_group_attrs),
user_id=user.id,
user_group_id=user_group_id,
task_group_id=task_group.id,
db=db_sync,
)

Expand Down
Loading

0 comments on commit b1f29c9

Please sign in to comment.