Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clp-package): Add support for extracting JSON streams from archives. #569

Merged
merged 40 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
97bcbd2
Make it work with a lot of duplications
haiqi96 Oct 30, 2024
db1bf0a
Deduplicate the code in the task-launching scripts
haiqi96 Oct 30, 2024
aea5630
Remove redundant task for json. Combine them into one
haiqi96 Oct 30, 2024
d0e3b12
Linter
haiqi96 Oct 30, 2024
6af5d5a
refactor
haiqi96 Oct 30, 2024
496e8d8
Linter
haiqi96 Oct 31, 2024
c4e118d
refactor
haiqi96 Nov 1, 2024
a75c42d
proposal
haiqi96 Nov 2, 2024
66fb365
Add actual refactor
haiqi96 Nov 4, 2024
310f0d5
Linter
haiqi96 Nov 4, 2024
069f568
deduplication
haiqi96 Nov 4, 2024
30c03e3
Decompressor end renaming
haiqi96 Nov 4, 2024
acb0237
Renaming for task and scheduler
haiqi96 Nov 4, 2024
5f12b38
Mass renaming
haiqi96 Nov 4, 2024
3c23fd2
Renaming for webui
haiqi96 Nov 5, 2024
be75c9c
missing change
haiqi96 Nov 5, 2024
383781a
polishing
haiqi96 Nov 5, 2024
9518074
Update comments
haiqi96 Nov 5, 2024
ebe6395
fixes
haiqi96 Nov 5, 2024
7b79dff
fixes
haiqi96 Nov 7, 2024
7498c69
fixes2
haiqi96 Nov 7, 2024
cc9b5c2
update webui part
haiqi96 Nov 8, 2024
cda0fd2
Merge branch 'main' into json_extraction_temp
haiqi96 Nov 12, 2024
3c30054
Apply suggestions from code review
haiqi96 Nov 15, 2024
90e55d5
Address code review suggestions
haiqi96 Nov 15, 2024
63a18d7
Address more code review suggestions
haiqi96 Nov 15, 2024
b900553
fix for previous commit
haiqi96 Nov 15, 2024
6b5e60f
Apply suggestions from code review
haiqi96 Nov 15, 2024
ff15840
fix for previous commit (again)
haiqi96 Nov 15, 2024
549d3c4
First refactor
haiqi96 Nov 15, 2024
972cb2b
Further refactor
haiqi96 Nov 15, 2024
5ec988f
Update comments
haiqi96 Nov 15, 2024
e65799e
small polishing
haiqi96 Nov 15, 2024
fb8e2a6
Further polishing
haiqi96 Nov 15, 2024
314e008
Use proper encapsulation for class data member variables
haiqi96 Nov 16, 2024
3d86291
Linter
haiqi96 Nov 16, 2024
4b84812
Apply suggestions from code review
haiqi96 Nov 17, 2024
7a3398f
Address code review
haiqi96 Nov 17, 2024
1d1599e
Fix silly mistakes
haiqi96 Nov 18, 2024
a5eb217
Update comment.
kirkrodrigues Nov 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
# CONSTANTS
EXTRACT_FILE_CMD = "x"
EXTRACT_IR_CMD = "i"
EXTRACT_JSON_CMD = "j"

# Paths
CONTAINER_CLP_HOME = pathlib.Path("/") / "opt" / "clp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
dump_container_config,
EXTRACT_FILE_CMD,
EXTRACT_IR_CMD,
EXTRACT_JSON_CMD,
generate_container_config,
generate_container_name,
generate_container_start_cmd,
Expand Down Expand Up @@ -146,11 +147,11 @@ def handle_extract_file_cmd(
return 0


def handle_extract_ir_cmd(
def handle_extract_cmd(
parsed_args, clp_home: pathlib.Path, default_config_file_path: pathlib.Path
) -> int:
"""
Handles the IR extraction command.
Handles the extraction command.
:param parsed_args:
:param clp_home:
:param default_config_file_path:
Expand All @@ -174,23 +175,34 @@ def handle_extract_ir_cmd(
)

# fmt: off
job_command = parsed_args.command
extract_cmd = [
"python3",
"-m", "clp_package_utils.scripts.native.decompress",
"--config", str(generated_config_path_on_container),
EXTRACT_IR_CMD,
str(parsed_args.msg_ix),
job_command
]
# fmt: on
if parsed_args.orig_file_id:
extract_cmd.append("--orig-file-id")
extract_cmd.append(str(parsed_args.orig_file_id))

if EXTRACT_IR_CMD == job_command:
extract_cmd.append(str(parsed_args.msg_ix))
if parsed_args.orig_file_id:
extract_cmd.append("--orig-file-id")
extract_cmd.append(str(parsed_args.orig_file_id))
else:
extract_cmd.append("--orig-file-path")
extract_cmd.append(str(parsed_args.orig_file_path))
if parsed_args.target_uncompressed_size:
extract_cmd.append("--target-uncompressed-size")
extract_cmd.append(str(parsed_args.target_uncompressed_size))
elif EXTRACT_JSON_CMD == job_command:
extract_cmd.append(str(parsed_args.archive_id))
if parsed_args.target_chunk_size:
extract_cmd.append("--target-chunk-size")
extract_cmd.append(str(parsed_args.target_chunk_size))
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
else:
extract_cmd.append("--orig-file-path")
extract_cmd.append(str(parsed_args.orig_file_path))
if parsed_args.target_uncompressed_size:
extract_cmd.append("--target-uncompressed-size")
extract_cmd.append(str(parsed_args.target_uncompressed_size))
logger.exception(f"Unexpected command: {job_command}")
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
cmd = container_start_cmd + extract_cmd

try:
Expand Down Expand Up @@ -241,13 +253,20 @@ def main(argv):
group.add_argument("--orig-file-id", type=str, help="Original file's ID.")
group.add_argument("--orig-file-path", type=str, help="Original file's path.")

# JSON extraction command parser
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
json_extraction_parser.add_argument(
"--target-chunk-size", type=int, help="Target chunk size", default=100000
)

parsed_args = args_parser.parse_args(argv[1:])

command = parsed_args.command
if EXTRACT_FILE_CMD == command:
return handle_extract_file_cmd(parsed_args, clp_home, default_config_file_path)
elif EXTRACT_IR_CMD == command:
return handle_extract_ir_cmd(parsed_args, clp_home, default_config_file_path)
elif command in [EXTRACT_IR_CMD, EXTRACT_JSON_CMD]:
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
return handle_extract_cmd(parsed_args, clp_home, default_config_file_path)
else:
logger.exception(f"Unexpected command: {command}")
return -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@
from clp_py_utils.clp_config import CLP_METADATA_TABLE_PREFIX, CLPConfig, Database
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType
from job_orchestration.scheduler.job_config import ExtractIrJobConfig
from job_orchestration.scheduler.job_config import (
ExtractIrJobConfig,
ExtractJsonJobConfig,
QueryJobConfig,
)

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
EXTRACT_FILE_CMD,
EXTRACT_IR_CMD,
EXTRACT_JSON_CMD,
get_clp_home,
load_config_file,
)
Expand Down Expand Up @@ -70,42 +75,35 @@ def get_orig_file_id(db_config: Database, path: str) -> Optional[str]:
return results[0]["orig_file_id"]


def submit_and_monitor_ir_extraction_job_in_db(
def submit_and_monitor_extraction_job_in_db(
db_config: Database,
orig_file_id: str,
msg_ix: int,
target_uncompressed_size: Optional[int],
job_type: QueryJobType,
job_config: QueryJobConfig,
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
) -> int:
"""
Submits an IR extraction job to the scheduler and waits until the job finishes.
Submits an extraction job to the scheduler and waits until the job finishes.
:param db_config:
:param orig_file_id:
:param msg_ix:
:param target_uncompressed_size:
:param job_type:
:param job_config:
:return: 0 on success, -1 otherwise.
"""
extract_ir_config = ExtractIrJobConfig(
orig_file_id=orig_file_id,
msg_ix=msg_ix,
target_uncompressed_size=target_uncompressed_size,
)

sql_adapter = SQL_Adapter(db_config)
job_id = submit_query_job(sql_adapter, extract_ir_config, QueryJobType.EXTRACT_IR)
job_id = submit_query_job(sql_adapter, job_config, job_type)
job_status = wait_for_query_job(sql_adapter, job_id)

if QueryJobStatus.SUCCEEDED == job_status:
logger.info(f"Finished IR extraction job {job_id}.")
logger.info(f"Finished extraction job {job_id}.")
return 0

logger.error(
f"IR extraction job {job_id} finished with unexpected status: {job_status.to_str()}."
)
logger.error(f"Extraction job {job_id} finished with unexpected status: {job_status.to_str()}.")
return -1


def handle_extract_ir_cmd(
parsed_args: argparse.Namespace, clp_home: pathlib.Path, default_config_file_path: pathlib.Path
def handle_extract_cmd(
parsed_args: argparse.Namespace,
job_type: QueryJobType,
clp_home: pathlib.Path,
default_config_file_path: pathlib.Path,
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
) -> int:
"""
Handles the IR extraction command.
Expand All @@ -121,22 +119,35 @@ def handle_extract_ir_cmd(
if clp_config is None:
return -1

orig_file_id: str
if parsed_args.orig_file_id:
orig_file_id = parsed_args.orig_file_id
extraction_config: QueryJobConfig
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
if QueryJobType.EXTRACT_IR == job_type:
orig_file_id: str
if parsed_args.orig_file_id:
orig_file_id = parsed_args.orig_file_id
else:
orig_file_path = parsed_args.orig_file_path
orig_file_id = get_orig_file_id(clp_config.database, parsed_args.orig_file_path)
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
if orig_file_id is None:
logger.error(f"Cannot find orig_file_id corresponding to {orig_file_path}")
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
return -1
extraction_config = ExtractIrJobConfig(
orig_file_id=orig_file_id,
msg_ix=parsed_args.msg_ix,
target_uncompressed_size=parsed_args.target_uncompressed_size,
)
elif QueryJobType.EXTRACT_JSON == job_type:
extraction_config = ExtractJsonJobConfig(
archive_id=parsed_args.archive_id, target_chunk_size=parsed_args.target_chunk_size
)
else:
orig_file_id = get_orig_file_id(clp_config.database, parsed_args.orig_file_path)
if orig_file_id is None:
return -1

logger.exception(f"Unsupported extraction job type: {job_type}")
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
try:
return asyncio.run(
run_function_in_process(
submit_and_monitor_ir_extraction_job_in_db,
submit_and_monitor_extraction_job_in_db,
clp_config.database,
orig_file_id,
parsed_args.msg_ix,
parsed_args.target_uncompressed_size,
job_type,
extraction_config,
)
)
except asyncio.CancelledError:
Expand Down Expand Up @@ -278,13 +289,26 @@ def main(argv):
group.add_argument("--orig-file-id", type=str, help="Original file's ID.")
group.add_argument("--orig-file-path", type=str, help="Original file's path.")

# JSON extraction command parser
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
json_extraction_parser.add_argument(
"--target-chunk-size", type=int, help="Target chunk size.", required=True
)

parsed_args = args_parser.parse_args(argv[1:])

command = parsed_args.command
if EXTRACT_FILE_CMD == command:
return handle_extract_file_cmd(parsed_args, clp_home, default_config_file_path)
elif EXTRACT_IR_CMD == command:
return handle_extract_ir_cmd(parsed_args, clp_home, default_config_file_path)
return handle_extract_cmd(
parsed_args, QueryJobType.EXTRACT_IR, clp_home, default_config_file_path
)
elif EXTRACT_JSON_CMD == command:
return handle_extract_cmd(
parsed_args, QueryJobType.EXTRACT_JSON, clp_home, default_config_file_path
)
else:
logger.exception(f"Unexpected command: {command}")
return -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
report_command_creation_failure,
run_query_task,
)
from job_orchestration.scheduler.job_config import ExtractIrJobConfig
from job_orchestration.scheduler.job_config import ExtractIrJobConfig, ExtractJsonJobConfig
from job_orchestration.scheduler.scheduler_data import QueryTaskStatus

# Setup logging
Expand All @@ -26,11 +26,13 @@ def make_command(
archives_dir: Path,
archive_id: str,
ir_output_dir: Path,
extract_ir_config: ExtractIrJobConfig,
job_config_obj: dict,
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
results_cache_uri: str,
ir_collection: str,
) -> Optional[List[str]]:
if StorageEngine.CLP == storage_engine:
logger.info("Start IR extraction")
extract_ir_config = ExtractIrJobConfig.parse_obj(job_config_obj)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling when parsing job configuration objects

When parsing job_config_obj into ExtractIrJobConfig and ExtractJsonJobConfig, consider handling potential ValidationError exceptions that Pydantic may raise if the input data is invalid. Implementing try-except blocks will enhance the robustness of your code by preventing unhandled exceptions.

Also applies to: 53-53

if not extract_ir_config.file_split_id:
logger.error("file_split_id not supplied")
return None
Expand All @@ -46,6 +48,25 @@ def make_command(
if extract_ir_config.target_uncompressed_size is not None:
command.append("--target-size")
command.append(str(extract_ir_config.target_uncompressed_size))
elif StorageEngine.CLP_S == storage_engine:
logger.info("Start Json extraction")
extract_json_config = ExtractJsonJobConfig.parse_obj(job_config_obj)
command = [
str(clp_home / "bin" / "clp-s"),
"x",
str(archives_dir),
str(ir_output_dir),
"--ordered",
"--archive-id",
archive_id,
"--mongodb-uri",
results_cache_uri,
"--mongodb-collection",
ir_collection,
]
if extract_json_config.target_chunk_size is not None:
command.append("--ordered-chunk-size")
command.append(str(extract_json_config.target_chunk_size))
else:
logger.error(f"Unsupported storage engine {storage_engine}")
return None
Expand All @@ -63,7 +84,7 @@ def extract_ir(
clp_metadata_db_conn_params: dict,
results_cache_uri: str,
) -> Dict[str, Any]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Rename the extract_ir function to reflect its generalized purpose

Since the extract_ir function now handles both IR and JSON extraction tasks, consider renaming it to reflect its broader functionality. A name like extract_data or perform_extraction would more accurately describe the function's purpose.

task_name = "IR extraction"
task_name = "Extraction"

# Setup logging to file
clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR"))
Expand All @@ -82,15 +103,14 @@ def extract_ir(
clp_storage_engine = os.getenv("CLP_STORAGE_ENGINE")
ir_output_dir = Path(os.getenv("CLP_IR_OUTPUT_DIR"))
ir_collection = os.getenv("CLP_IR_COLLECTION")
extract_ir_config = ExtractIrJobConfig.parse_obj(job_config_obj)

task_command = make_command(
storage_engine=clp_storage_engine,
clp_home=clp_home,
archives_dir=archive_directory,
archive_id=archive_id,
ir_output_dir=ir_output_dir,
extract_ir_config=extract_ir_config,
job_config_obj=job_config_obj,
results_cache_uri=results_cache_uri,
ir_collection=ir_collection,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def to_str(self) -> str:
# Types of jobs handled by the query scheduler.
class QueryJobType(IntEnum):
    SEARCH_OR_AGGREGATION = 0
    EXTRACT_IR = auto()
    # Extraction of JSON streams from archives (added for clp-s support).
    EXTRACT_JSON = auto()

    def __str__(self) -> str:
        # Render as the underlying integer value, e.g. for serialization into
        # CLI arguments or database rows.
        return str(self.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ class ExtractIrJobConfig(QueryJobConfig):
target_uncompressed_size: typing.Optional[int] = None


class ExtractJsonJobConfig(QueryJobConfig):
    """Configuration for a job that extracts JSON streams from an archive."""

    # ID of the archive to extract JSON from.
    archive_id: str
    # Approximate target size for each extracted JSON chunk; None leaves
    # chunking to the extractor's default behaviour — presumably a single
    # unchunked stream. NOTE(review): confirm against the clp-s "x" command.
    target_chunk_size: typing.Optional[int] = None


class SearchJobConfig(QueryJobConfig):
query_string: str
max_num_results: int
Expand Down
Loading