Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clp-package: Add handling for IR extraction jobs to the query scheduler and workers. #460

Merged
merged 30 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
decfb0f
Add job types
haiqi96 Jun 18, 2024
e52877c
Add base class for SearchQuery
haiqi96 Jun 19, 2024
d28681c
Fix
haiqi96 Jun 19, 2024
d027279
Linter
haiqi96 Jun 19, 2024
92e69fd
fixes
haiqi96 Jun 19, 2024
f0ee686
fixes
haiqi96 Jun 19, 2024
14100c2
Initial support for IR task flow in the scheduler
haiqi96 Jun 19, 2024
133481f
Some renaming and remove debug print
haiqi96 Jun 19, 2024
7b1dfd5
First draft
haiqi96 Jun 21, 2024
afa26f9
First Refactoring
haiqi96 Jun 21, 2024
e2cdcb1
Refactoring
haiqi96 Jun 21, 2024
46431cf
refactor log messages and names
haiqi96 Jun 21, 2024
43dede4
remove unused imports
haiqi96 Jun 21, 2024
57492bb
fix
haiqi96 Jun 21, 2024
3a785f8
Polishing
haiqi96 Jun 21, 2024
f703001
Merge remote-tracking branch 'origin/main' into finalize_extraction_job
haiqi96 Jun 22, 2024
26ae8de
Merge branch 'main' of https://github.com/haiqi96/clp_fork into final…
haiqi96 Jun 24, 2024
d740476
Fixes
haiqi96 Jun 24, 2024
fbbf7f5
Fixes again
haiqi96 Jun 24, 2024
f00029c
linter
haiqi96 Jun 24, 2024
df88105
Add configurable option for target_uncompressed_size
haiqi96 Jun 25, 2024
bebde54
Apply suggestions from code review
haiqi96 Jun 26, 2024
b5d40ea
First batch of update
haiqi96 Jun 26, 2024
fb680c0
Another batch of change. DOCSTRING IS NOT READY YET
haiqi96 Jun 26, 2024
76e16b3
Refactor common functions
haiqi96 Jun 26, 2024
fe36599
Add doc string
haiqi96 Jun 26, 2024
0529b37
Linter
haiqi96 Jun 26, 2024
bd7083f
Update docstring.
kirkrodrigues Jun 28, 2024
4a4d5b2
Rename generic_run_query_task to run_query_task; Make search_task's m…
kirkrodrigues Jun 28, 2024
ba51583
remove str conversion for the envvars since they were already string …
haiqi96 Jun 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(self, clp_home: pathlib.Path, docker_clp_home: pathlib.Path):
self.data_dir: typing.Optional[DockerMount] = None
self.logs_dir: typing.Optional[DockerMount] = None
self.archives_output_dir: typing.Optional[DockerMount] = None
self.ir_output_dir: typing.Optional[DockerMount] = None


def get_clp_home():
Expand Down Expand Up @@ -224,6 +225,19 @@ def generate_container_config(clp_config: CLPConfig, clp_home: pathlib.Path):
container_clp_config.archive_output.directory,
)

container_clp_config.ir_output.directory = pathlib.Path("/") / "mnt" / "ir-output"
if not is_path_already_mounted(
clp_home,
CONTAINER_CLP_HOME,
clp_config.ir_output.directory,
container_clp_config.ir_output.directory,
):
docker_mounts.ir_output_dir = DockerMount(
DockerMountType.BIND,
clp_config.ir_output.directory,
container_clp_config.ir_output.directory,
)

return container_clp_config, docker_mounts


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import subprocess
import sys
import time
import typing
import uuid
from typing import Any, Dict, List, Optional

import yaml
from clp_py_utils.clp_config import (
Expand Down Expand Up @@ -526,6 +526,8 @@ def start_compression_worker(
clp_config.redis.compression_backend_database,
num_cpus,
mounts,
None,
None,
)


Expand All @@ -538,6 +540,13 @@ def start_query_worker(
):
celery_method = "job_orchestration.executor.query"
celery_route = f"{QueueName.QUERY}"

query_worker_mount = [mounts.ir_output_dir]
query_worker_env = {
"CLP_IR_OUTPUT_DIR": container_clp_config.ir_output.directory,
"CLP_IR_COLLECTION": clp_config.results_cache.ir_collection_name,
}

generic_start_worker(
QUERY_WORKER_COMPONENT_NAME,
instance_id,
Expand All @@ -549,6 +558,8 @@ def start_query_worker(
clp_config.redis.query_backend_database,
num_cpus,
mounts,
query_worker_env,
query_worker_mount,
)


Expand All @@ -563,6 +574,8 @@ def generic_start_worker(
redis_database: int,
num_cpus: int,
mounts: CLPDockerMounts,
worker_specific_env: Dict[str, Any],
worker_specific_mount: List[Optional[DockerMount]],
):
logger.info(f"Starting {component_name}...")

Expand All @@ -578,6 +591,7 @@ def generic_start_worker(

# Create necessary directories
clp_config.archive_output.directory.mkdir(parents=True, exist_ok=True)
clp_config.ir_output.directory.mkdir(parents=True, exist_ok=True)

clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
# fmt: off
Expand All @@ -588,6 +602,7 @@ def generic_start_worker(
"-w", str(CONTAINER_CLP_HOME),
"--name", container_name,
"--log-driver", "local",
"-u", f"{os.getuid()}:{os.getgid()}",
"-e", f"PYTHONPATH={clp_site_packages_dir}",
"-e", (
f"BROKER_URL=amqp://"
Expand All @@ -604,20 +619,28 @@ def generic_start_worker(
"-e", f"CLP_LOGS_DIR={container_logs_dir}",
"-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}",
"-e", f"CLP_STORAGE_ENGINE={clp_config.package.storage_engine}",
"-u", f"{os.getuid()}:{os.getgid()}",
"--mount", str(mounts.clp_home),
]
if worker_specific_env:
for env_name, env_value in worker_specific_env.items():
container_start_cmd.append("-e")
container_start_cmd.append(f"{env_name}={env_value}")

# fmt: on
necessary_mounts = [
mounts.clp_home,
mounts.data_dir,
mounts.logs_dir,
mounts.archives_output_dir,
mounts.input_logs_dir,
]
if worker_specific_mount:
necessary_mounts.extend(worker_specific_mount)

for mount in necessary_mounts:
if mount:
container_start_cmd.append("--mount")
container_start_cmd.append(str(mount))
if not mount:
raise ValueError(f"Required mount configuration is empty: {necessary_mounts}")
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
container_start_cmd.append("--mount")
container_start_cmd.append(str(mount))
container_start_cmd.append(clp_config.execution_container)

worker_cmd = [
Expand Down Expand Up @@ -645,8 +668,8 @@ def generic_start_worker(

def update_meteor_settings(
parent_key_prefix: str,
settings: typing.Dict[str, typing.Any],
updates: typing.Dict[str, typing.Any],
settings: Dict[str, Any],
updates: Dict[str, Any],
):
"""
Recursively updates the given Meteor settings object with the values from `updates`.
Expand Down
29 changes: 29 additions & 0 deletions components/clp-py-utils/clp_py_utils/clp_config.py
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ class ResultsCache(BaseModel):
host: str = "localhost"
port: int = 27017
db_name: str = "clp-search"
ir_collection_name: str = "clp-ir"
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

@validator("host")
def validate_host(cls, field):
Expand All @@ -268,6 +269,12 @@ def validate_db_name(cls, field):
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME}.db_name cannot be empty.")
return field

@validator("ir_collection_name")
def validate_ir_collection_name(cls, field):
if "" == field:
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME}.ir_collection_name cannot be empty.")
return field

def get_uri(self):
return f"mongodb://{self.host}:{self.port}/{self.db_name}"

Expand Down Expand Up @@ -321,6 +328,25 @@ def dump_to_primitive_dict(self):
return d


class IrOutput(BaseModel):
directory: pathlib.Path = pathlib.Path("var") / "data" / "ir"

@validator("directory")
def validate_directory(cls, field):
if "" == field:
raise ValueError("directory can not be empty")
return field

def make_config_paths_absolute(self, clp_home: pathlib.Path):
self.directory = make_config_path_absolute(clp_home, self.directory)

def dump_to_primitive_dict(self):
d = self.dict()
# Turn directory (pathlib.Path) into a primitive string
d["directory"] = str(d["directory"])
return d


class WebUi(BaseModel):
host: str = "localhost"
port: int = 4000
Expand Down Expand Up @@ -368,6 +394,7 @@ class CLPConfig(BaseModel):
credentials_file_path: pathlib.Path = CLP_DEFAULT_CREDENTIALS_FILE_PATH

archive_output: ArchiveOutput = ArchiveOutput()
ir_output: IrOutput = IrOutput()
data_directory: pathlib.Path = pathlib.Path("var") / "data"
logs_directory: pathlib.Path = pathlib.Path("var") / "log"

Expand All @@ -377,6 +404,7 @@ def make_config_paths_absolute(self, clp_home: pathlib.Path):
self.input_logs_directory = make_config_path_absolute(clp_home, self.input_logs_directory)
self.credentials_file_path = make_config_path_absolute(clp_home, self.credentials_file_path)
self.archive_output.make_config_paths_absolute(clp_home)
self.ir_output.make_config_paths_absolute(clp_home)
self.data_directory = make_config_path_absolute(clp_home, self.data_directory)
self.logs_directory = make_config_path_absolute(clp_home, self.logs_directory)
self._os_release_file_path = make_config_path_absolute(clp_home, self._os_release_file_path)
Expand Down Expand Up @@ -463,6 +491,7 @@ def load_redis_credentials_from_file(self):
def dump_to_primitive_dict(self):
d = self.dict()
d["archive_output"] = self.archive_output.dump_to_primitive_dict()
d["ir_output"] = self.ir_output.dump_to_primitive_dict()
# Turn paths into primitive strings
d["input_logs_directory"] = str(self.input_logs_directory)
d["credentials_file_path"] = str(self.credentials_file_path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

from job_orchestration.scheduler.constants import QueueName

imports = "job_orchestration.executor.query.fs_search_task"
imports = (
"job_orchestration.executor.query.fs_search_task",
"job_orchestration.executor.query.extract_ir_task",
)

task_routes = {
"job_orchestration.executor.query.fs_search_task.search": QueueName.QUERY,
"job_orchestration.executor.query.extract_ir_task.extract_ir": QueueName.QUERY,
}
task_create_missing_queues = True

Expand Down
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import datetime
import os
import subprocess
from pathlib import Path
from typing import Any, Dict

from celery.app.task import Task
from celery.utils.log import get_task_logger
from clp_py_utils.clp_config import Database, StorageEngine
from clp_py_utils.clp_logging import set_logging_level
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.executor.query.celery import app
from job_orchestration.scheduler.job_config import ExtractIrJobConfig
from job_orchestration.scheduler.scheduler_data import QueryTaskResult, QueryTaskStatus

from .utils import generate_final_task_results, get_logger_file_path, update_query_task_metadata

# Setup logging
logger = get_task_logger(__name__)


def make_command(
storage_engine: str,
clp_home: Path,
archives_dir: Path,
ir_output_dir: Path,
archive_id: str,
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
extract_ir_config: ExtractIrJobConfig,
results_cache_uri: str,
results_collection: str,
):
if StorageEngine.CLP == storage_engine:
if not extract_ir_config.file_split_id:
raise ValueError(f"file_split_id not supplied")
command = [
str(clp_home / "bin" / "clo"),
"i",
str(archives_dir / archive_id),
extract_ir_config.file_split_id,
str(ir_output_dir),
results_cache_uri,
results_collection,
]
if extract_ir_config.target_size is not None:
command.append("--target-size")
command.append(extract_ir_config.target_size)
else:
raise ValueError(f"Unsupported storage engine {storage_engine}")

return command


@app.task(bind=True)
def extract_ir(
self: Task,
job_id: str,
task_id: int,
job_config_obj: dict,
archive_id: str,
clp_metadata_db_conn_params: dict,
results_cache_uri: str,
) -> Dict[str, Any]:
clp_home = Path(os.getenv("CLP_HOME"))
archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR"))
ir_directory = Path(os.getenv("CLP_IR_OUTPUT_DIR"))
clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR"))
clp_logging_level = str(os.getenv("CLP_LOGGING_LEVEL"))
clp_storage_engine = str(os.getenv("CLP_STORAGE_ENGINE"))

ir_collection = str(os.getenv("CLP_IR_COLLECTION"))

# Setup logging to file
set_logging_level(logger, clp_logging_level)
clo_log_path = get_logger_file_path(clp_logs_dir, job_id, task_id)
clo_log_file = open(clo_log_path, "w")

logger.info(f"Started IR extraction task for job {job_id}")

extract_ir_config = ExtractIrJobConfig.parse_obj(job_config_obj)
sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params))

start_time = datetime.datetime.now()
task_status: QueryTaskStatus
try:
task_command = make_command(
storage_engine=clp_storage_engine,
clp_home=clp_home,
archives_dir=archive_directory,
ir_output_dir=ir_directory,
archive_id=archive_id,
extract_ir_config=extract_ir_config,
results_cache_uri=results_cache_uri,
results_collection=ir_collection,
)
except ValueError as e:
error_message = f"Error creating IR extraction command: {e}"
logger.error(error_message)
clo_log_file.write(error_message)

task_status = QueryTaskStatus.FAILED
update_query_task_metadata(
sql_adapter,
task_id,
dict(status=task_status, duration=0, start_time=start_time),
)

clo_log_file.close()
return QueryTaskResult(
task_id=task_id,
status=task_status,
duration=0,
error_log_path=str(clo_log_path),
).dict()

task_status = QueryTaskStatus.RUNNING
update_query_task_metadata(
sql_adapter, task_id, dict(status=task_status, start_time=start_time)
)

logger.info(f'Running: {" ".join(task_command)}')
extract_proc = subprocess.Popen(
task_command,
preexec_fn=os.setpgrp,
close_fds=True,
stdout=clo_log_file,
stderr=clo_log_file,
)

logger.info("Waiting for IR extraction to finish")
# communicate is equivalent to wait in this case, but avoids deadlocks if we switch to piping
# stdout/stderr in the future.
extract_proc.communicate()
return_code = extract_proc.returncode
if 0 != return_code:
task_status = QueryTaskStatus.FAILED
logger.error(
f"IR extraction task {task_id} failed for job {job_id} - return_code={return_code}"
)
else:
task_status = QueryTaskStatus.SUCCEEDED
logger.info(f"IR extraction task {task_id} completed for job {job_id}")

clo_log_file.close()
duration = (datetime.datetime.now() - start_time).total_seconds()

update_query_task_metadata(
sql_adapter, task_id, dict(status=task_status, start_time=start_time, duration=duration)
)

return generate_final_task_results(task_id, task_status, duration, clo_log_path)
Loading