-
Notifications
You must be signed in to change notification settings - Fork 72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(clp-package): Add support for extracting JSON streams from archives. #569
Conversation
WalkthroughThe pull request introduces significant changes across multiple files to support JSON extraction alongside existing IR extraction functionalities. Key modifications include the addition of new constants, classes, and methods, as well as renaming attributes and functions to reflect a shift from "IR" to "stream" terminology. These updates enhance command handling, job configuration, and overall flexibility in data extraction processes, ensuring that both IR and JSON types can be processed effectively. Changes
Possibly related PRs
Suggested reviewers
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 13
🧹 Outside diff range and nitpick comments (7)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)
75-75
: Consider adding docstring to document the new enum value.Since this is a new feature for JSON extraction, it would be helpful to add documentation explaining its purpose and usage.
Add a docstring to the enum class:
class QueryJobType(IntEnum): + """Enum representing different types of query jobs. + + SEARCH_OR_AGGREGATION: Standard search or aggregation query + EXTRACT_IR: Extract IR (Intermediate Representation) from logs + EXTRACT_JSON: Extract JSON content from logs + """ SEARCH_OR_AGGREGATION = 0 EXTRACT_IR = auto() EXTRACT_JSON = auto()components/job-orchestration/job_orchestration/scheduler/job_config.py (2)
52-55
: Add class documentation and field validation.The class implementation looks good, but could benefit from these improvements:
- Add a docstring describing the purpose of the class and its attributes
- Consider adding validation for
archive_id
to ensure it's not empty- Consider adding bounds validation for
target_chunk_size
when presentHere's a suggested implementation:
class ExtractJsonJobConfig(QueryJobConfig): + """Configuration for JSON extraction jobs. + + Attributes: + archive_id: Unique identifier for the archive to extract from + target_chunk_size: Optional size limit for extracted chunks in bytes + """ archive_id: str target_chunk_size: typing.Optional[int] = None + + @validator("archive_id") + def validate_archive_id(cls, v): + if not v.strip(): + raise ValueError("archive_id cannot be empty") + return v + + @validator("target_chunk_size") + def validate_chunk_size(cls, v): + if v is not None and v <= 0: + raise ValueError("target_chunk_size must be positive when specified") + return v
55-55
: Remove extra blank line.There's an unnecessary extra blank line after the class definition. Maintain consistent spacing with other classes in the file.
class ExtractJsonJobConfig(QueryJobConfig): archive_id: str target_chunk_size: typing.Optional[int] = None -
components/job-orchestration/job_orchestration/scheduler/scheduler_data.py (1)
71-79
: Consider adding class documentationPlease add a docstring to describe:
- The purpose of this job type
- Expected configuration parameters
- Any specific behaviour or constraints
Example addition:
class ExtractJsonJob(QueryJob): + """Represents a job for extracting JSON data from archives. + + This job type handles the extraction of JSON-formatted data using the specified + configuration parameters from ExtractJsonJobConfig. + """ extract_json_config: ExtractJsonJobConfigcomponents/job-orchestration/job_orchestration/executor/query/extract_ir_task.py (1)
34-34
: Standardize the capitalization in log messagesConsider standardizing the capitalization of 'JSON' in your log messages to align with common conventions. Use 'Start JSON extraction' instead of 'Start Json extraction' for consistency and clarity.
Also applies to: 52-52
components/clp-package-utils/clp_package_utils/scripts/decompress.py (1)
Line range hint
167-167
: Use the appropriateJobType
when generating the container name.At line 167, the
container_name
is generated usingJobType.IR_EXTRACTION
regardless of the actual job type. This may cause confusion or incorrect container labelling when handling JSON extraction commands.Modify the code to select the correct
JobType
based on thejob_command
. For example:-container_name = generate_container_name(JobType.IR_EXTRACTION) +if job_command == EXTRACT_IR_CMD: + job_type = JobType.IR_EXTRACTION +elif job_command == EXTRACT_JSON_CMD: + job_type = JobType.JSON_EXTRACTION +else: + logger.exception(f"Unexpected command: {job_command}") + return -1 + +container_name = generate_container_name(job_type)Ensure that
JobType.JSON_EXTRACTION
is defined inclp_package_utils.general
.components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (1)
940-952
: Address the TODO: improve variable names for better clarityThere's a TODO comment suggesting finding better names for
waiting_jobs_map
andjob_key
. Improving these names would enhance code readability and maintainability.Would you like assistance in proposing more descriptive variable names? I can help generate suggestions or open a GitHub issue to track this task.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (8)
components/clp-package-utils/clp_package_utils/general.py
(1 hunks)components/clp-package-utils/clp_package_utils/scripts/decompress.py
(4 hunks)components/clp-package-utils/clp_package_utils/scripts/native/decompress.py
(4 hunks)components/job-orchestration/job_orchestration/executor/query/extract_ir_task.py
(5 hunks)components/job-orchestration/job_orchestration/scheduler/constants.py
(1 hunks)components/job-orchestration/job_orchestration/scheduler/job_config.py
(1 hunks)components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
(14 hunks)components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
(2 hunks)
✅ Files skipped from review due to trivial changes (1)
- components/clp-package-utils/clp_package_utils/general.py
🔇 Additional comments (10)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)
75-75
: The enum addition looks good, let's verify its usage.
The addition of EXTRACT_JSON
to QueryJobType
follows the correct pattern and maintains enum value consistency.
Let's verify the implementation of this new job type across the codebase:
✅ Verification successful
The EXTRACT_JSON job type is properly integrated across the codebase
The verification shows comprehensive implementation of the EXTRACT_JSON functionality:
- Properly defined in job orchestration constants
- Fully implemented job config class
ExtractJsonJobConfig
with required fields - Integrated into the query scheduler with proper handling
- Command-line interface support in CLP package utils
- Consistent error handling and job state management
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the implementation of EXTRACT_JSON job type
# Expected: Find corresponding handler implementations and configurations
# Check for job type usage
echo "Checking for EXTRACT_JSON usage:"
rg "EXTRACT_JSON" -A 3
# Look for related JSON extraction implementations
echo -e "\nChecking for JSON extraction related implementations:"
rg "ExtractJson(Job|Config)" -A 3
Length of output: 12690
components/job-orchestration/job_orchestration/scheduler/job_config.py (1)
52-54
: Verify integration with dependent components.
The class looks well-structured, but let's verify its integration with other components.
✅ Verification successful
Integration verification completed successfully
The ExtractJsonJobConfig
class is properly integrated across the codebase:
- Correctly defined in
QueryJobType
enum asEXTRACT_JSON
- Properly used in query scheduler for job handling
- Consistently used in decompress utilities with correct configuration parameters
- Well-integrated with the job orchestration system
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the usage and integration of ExtractJsonJobConfig
# Test 1: Check for consistent usage in decompress.py
echo "Checking decompress.py usage patterns..."
rg -A 5 "ExtractJsonJobConfig" "components/clp-package-utils/clp_package_utils/scripts/native/decompress.py"
# Test 2: Verify QueryJobType enum includes EXTRACT_JSON
echo "Verifying QueryJobType enum..."
rg -A 10 "class QueryJobType" "components/job-orchestration/job_orchestration/scheduler/query/"
Length of output: 746
Script:
#!/bin/bash
# Let's try a different approach to verify the integration
# Check for QueryJobType enum in the entire codebase
echo "Searching for QueryJobType enum definition..."
rg -A 10 "class QueryJobType.*Enum"
# Check all imports and usages of ExtractJsonJobConfig
echo "Checking ExtractJsonJobConfig usages..."
rg "ExtractJsonJobConfig" -A 3
# Check for any job type constants or enums related to EXTRACT_JSON
echo "Checking for EXTRACT_JSON job type..."
rg "EXTRACT_JSON" -B 2 -A 2
Length of output: 14013
components/job-orchestration/job_orchestration/scheduler/scheduler_data.py (2)
14-14
: Import addition looks good!
The ExtractJsonJobConfig import follows the established pattern and is properly grouped with related job configuration imports.
71-79
: Implementation is correct and well-structured!
The ExtractJsonJob class properly implements the QueryJob interface and follows the established patterns seen in ExtractIrJob.
components/clp-package-utils/clp_package_utils/scripts/native/decompress.py (1)
147-150
: Ensure 'extraction_config' is initialized before use
If an unsupported job type is passed, extraction_config
might be undefined when calling submit_and_monitor_extraction_job_in_db
. Verify that extraction_config
is always initialized before it is used.
Run the following script to check for potential use of uninitialized variables:
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (5)
45-49
: Correctly imported ExtractJsonJobConfig
The addition of ExtractJsonJobConfig
to the imports ensures proper configuration handling for JSON extraction jobs.
58-58
: Added ExtractJsonJob to scheduler imports
Including ExtractJsonJob
in the imports allows the scheduler to manage JSON extraction jobs appropriately.
74-75
: Initialization of active_archive_json_extractions
The new dictionary active_archive_json_extractions
is properly initialized to track JSON extraction jobs and their associated waiting jobs.
Line range hint 380-389
: Ensure correct task function is used for EXTRACT_JSON jobs
The code dispatches both EXTRACT_JSON
and EXTRACT_IR
job types using the extract_ir.s
task function. Please verify if extract_ir.s
is appropriate for EXTRACT_JSON
jobs or if a separate task function should be used.
803-808
: Verify use of 'ir_collection' when checking for JSON files
In json_file_exists_for_archive
, the function checks the ir_collection
for orig_file_id
matching archive_id
to determine if the JSON file exists. Please verify if accessing ir_collection
is appropriate for JSON files or if a different collection should be used.
results_cache_uri: str, | ||
ir_collection: str, | ||
) -> Optional[List[str]]: | ||
if StorageEngine.CLP == storage_engine: | ||
logger.info("Start IR extraction") | ||
extract_ir_config = ExtractIrJobConfig.parse_obj(job_config_obj) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add error handling when parsing job configuration objects
When parsing job_config_obj
into ExtractIrJobConfig
and ExtractJsonJobConfig
, consider handling potential ValidationError
exceptions that Pydantic may raise if the input data is invalid. Implementing try-except blocks will enhance the robustness of your code by preventing unhandled exceptions.
Also applies to: 53-53
@@ -63,7 +84,7 @@ def extract_ir( | |||
clp_metadata_db_conn_params: dict, | |||
results_cache_uri: str, | |||
) -> Dict[str, Any]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Rename the extract_ir
function to reflect its generalized purpose
Since the extract_ir
function now handles both IR and JSON extraction tasks, consider renaming it to reflect its broader functionality. A name like extract_data
or perform_extraction
would more accurately describe the function's purpose.
components/job-orchestration/job_orchestration/executor/query/extract_ir_task.py
Outdated
Show resolved
Hide resolved
components/clp-package-utils/clp_package_utils/scripts/decompress.py
Outdated
Show resolved
Hide resolved
components/clp-package-utils/clp_package_utils/scripts/native/decompress.py
Show resolved
Hide resolved
components/clp-package-utils/clp_package_utils/scripts/native/decompress.py
Outdated
Show resolved
Hide resolved
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Outdated
Show resolved
Hide resolved
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Outdated
Show resolved
Hide resolved
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Outdated
Show resolved
Hide resolved
4a06a8d
to
be75c9c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (13)
components/clp-py-utils/clp_py_utils/create-results-cache-indices.py (1)
31-35
: Consider enhancing index creation with monitoring and performance options.While the index creation logic is sound, consider these improvements:
- Add success logging for better operational monitoring
- Consider adding index options like
background: true
for large collectionsHere's a suggested enhancement:
try: with MongoClient(results_cache_uri) as results_cache_client: stream_collection = results_cache_client.get_default_database()[stream_collection_name] - file_split_id_index = IndexModel(["file_split_id"]) - orig_file_id_index = IndexModel(["orig_file_id", "begin_msg_ix", "end_msg_ix"]) - stream_collection.create_indexes([file_split_id_index, orig_file_id_index]) + file_split_id_index = IndexModel(["file_split_id"], background=True) + orig_file_id_index = IndexModel( + ["orig_file_id", "begin_msg_ix", "end_msg_ix"], + background=True + ) + stream_collection.create_indexes([file_split_id_index, orig_file_id_index]) + logger.info("Successfully created indices for stream collection")components/log-viewer-webui/server/src/app.js (1)
40-40
: Consider using consistent casing for the property name.The property name
StreamFilesCollectionName
uses PascalCase, which is inconsistent with JavaScript/Node.js conventions where object properties typically use camelCase. Consider renaming it tostreamFilesCollectionName
.- StreamFilesCollectionName: settings.MongoDbStreamFilesCollectionName, + streamFilesCollectionName: settings.MongoDbStreamFilesCollectionName,components/log-viewer-webui/server/src/DbManager.js (3)
87-87
: Follow JavaScript naming conventions for private fieldsThe private field name
#StreamFilesCollection
should follow JavaScript camelCase naming conventions for variables and private fields.Apply this change:
- #StreamFilesCollection; + #streamFilesCollection;
Line range hint
141-145
: Update method name and documentation to use Stream terminologyThe method name and documentation still reference "Ir"/"IR" while the implementation uses the new Stream terminology. This inconsistency should be addressed for clarity and maintainability.
Consider these changes:
- Rename the method from
getExtractedIrFileMetadata
togetExtractedStreamFileMetadata
- Update the JSDoc comment to replace "IR file" with "Stream file"
- * Gets the metadata for an IR file extracted from part of an original file, where the original + * Gets the metadata for a Stream file extracted from part of an original file, where the original * file has the given ID and the extracted part contains the given log event index. * * @param {string} origFileId * @param {number} logEventIdx * @return {Promise<object>} A promise that resolves to the extracted IR file's metadata. */ - async getExtractedIrFileMetadata (origFileId, logEventIdx) { + async getExtractedStreamFileMetadata (origFileId, logEventIdx) {
Line range hint
180-191
: Improve parameter naming and code styleTwo suggestions for improvement:
- The parameter name should follow camelCase convention
- The collection assignment can be more concise
Apply these changes:
- * @param {string} config.StreamFilesCollectionName + * @param {string} config.streamFilesCollectionName */ #initMongo (config) { this.#fastify.register(fastifyMongo, { forceClose: true, url: `mongodb://${config.host}:${config.port}/${config.database}`, }).after((err) => { if (err) { throw err; } - this.#StreamFilesCollection = - this.#fastify.mongo.db.collection(config.StreamFilesCollectionName); + this.#streamFilesCollection = this.#fastify.mongo.db.collection(config.streamFilesCollectionName); }); }components/clp-package-utils/clp_package_utils/scripts/decompress.py (2)
Line range hint
169-171
: Update JobType for JSON extraction.The container name is generated using
JobType.IR_EXTRACTION
even when handling JSON extraction commands. Consider adding a new job type for JSON extraction or using a more generic type.- container_name = generate_container_name(JobType.IR_EXTRACTION) + container_name = generate_container_name( + JobType.IR_EXTRACTION if job_command == EXTRACT_IR_CMD else JobType.JSON_EXTRACTION + )
256-261
: Enhance JSON extraction parser configuration.Consider the following improvements:
- Move the default chunk size to a named constant for better maintainability
- Enhance the help text to provide more context about the parameters
+DEFAULT_JSON_CHUNK_SIZE = 100000 # Add at the top with other constants + # Json extraction command parser json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD) - json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID") + json_extraction_parser.add_argument("archive_id", type=str, help="Unique identifier for the archive to extract JSON from") json_extraction_parser.add_argument( "--target-chunk-size", type=int, - help="Target chunk size", - default=100000 + help="Target size in bytes for each JSON chunk during extraction", + default=DEFAULT_JSON_CHUNK_SIZE )components/clp-package-utils/clp_package_utils/scripts/native/decompress.py (1)
Line range hint
102-160
: Consider improving error handling and variable declarationsWhile the implementation is functionally correct, consider these improvements:
- Move variable declarations (
extraction_config
andjob_type
) closer to where they're first used- Add more specific error messages for unsupported commands
- extraction_config: QueryJobConfig - job_type: QueryJobType if EXTRACT_IR_CMD == command: - job_type = QueryJobType.EXTRACT_IR + job_type: QueryJobType = QueryJobType.EXTRACT_IR orig_file_id: str if parsed_args.orig_file_id: orig_file_id = parsed_args.orig_file_id else: orig_file_path = parsed_args.orig_file_path orig_file_id = get_orig_file_id(clp_config.database, parsed_args.orig_file_path) if orig_file_id is None: logger.error(f"Cannot find orig_file_id corresponding to {orig_file_path}") return -1 - extraction_config = ExtractIrJobConfig( + extraction_config: QueryJobConfig = ExtractIrJobConfig( orig_file_id=orig_file_id, msg_ix=parsed_args.msg_ix, target_uncompressed_size=parsed_args.target_uncompressed_size, ) elif EXTRACT_JSON_CMD == command: - job_type = QueryJobType.EXTRACT_JSON - extraction_config = ExtractJsonJobConfig( + job_type: QueryJobType = QueryJobType.EXTRACT_JSON + extraction_config: QueryJobConfig = ExtractJsonJobConfig( archive_id=parsed_args.archive_id, target_chunk_size=parsed_args.target_chunk_size ) else: - logger.exception(f"Unsupported stream extraction command: {command}") + logger.error(f"Unsupported stream extraction command: {command}. Supported commands: {EXTRACT_IR_CMD}, {EXTRACT_JSON_CMD}") return -1components/clp-py-utils/clp_py_utils/clp_config.py (1)
Line range hint
348-367
: Add docstring to explain the purpose of StreamOutput classWhile the implementation is solid, consider adding a docstring to explain:
- The purpose of this class in relation to stream extraction
- The significance of the target_uncompressed_size parameter
- The relationship between IR and JSON extraction outputs
Add this docstring:
class StreamOutput(BaseModel): + """Configures output settings for stream extraction operations (IR and JSON). + + Attributes: + directory: Path where extracted stream files will be stored + target_uncompressed_size: Target size in bytes for uncompressed output files + """ directory: pathlib.Path = pathlib.Path("var") / "data" / "stream" target_uncompressed_size: int = 128 * 1024 * 1024components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (3)
291-306
: Enhance error handling and logging in stream extraction ID retrieval.The function should provide more detailed error information when the archive or target IDs cannot be retrieved.
Apply this diff to improve error handling:
def get_archive_and_target_ids_for_stream_extraction( db_conn, job_config: Dict[str, Any], job_type: QueryJobType ) -> Tuple[Optional[str], Optional[str]]: if QueryJobType.EXTRACT_IR == job_type: extract_ir_config = ExtractIrJobConfig.parse_obj(job_config) return get_archive_and_file_split_ids_for_ir_extraction(db_conn, extract_ir_config) extract_json_config = ExtractJsonJobConfig.parse_obj(job_config) archive_id = extract_json_config.archive_id if check_if_archive_exists(db_conn, extract_json_config.archive_id): return archive_id, archive_id else: - logger.error(f"archive {archive_id} does not exist") + logger.error( + f"Archive {archive_id} does not exist. Job config: {extract_json_config}" + ) return None, None
Line range hint
630-708
: Reduce code duplication in stream extraction job handling.The job handling logic for IR and JSON extraction contains duplicated code blocks for status updates and logging. Consider extracting common functionality into helper methods.
Create a helper method for status updates:
+def update_job_status( + db_conn, + job_id: str, + status: QueryJobStatus, + prev_status: QueryJobStatus, + start_time: datetime.datetime, + num_tasks: int = 0, + duration: float = 0, +) -> bool: + """Helper method to update job status with consistent logging.""" + success = set_job_or_task_status( + db_conn, + QUERY_JOBS_TABLE_NAME, + job_id, + status, + prev_status, + start_time=start_time, + num_tasks=num_tasks, + duration=duration, + ) + if not success: + logger.error(f"Failed to set job {job_id} as {status.to_str()}") + return success elif job_type in [QueryJobType.EXTRACT_IR, QueryJobType.EXTRACT_JSON]: archive_id, target_id = get_archive_and_target_ids_for_stream_extraction( db_conn, job_config, job_type ) if not target_id: - if not set_job_or_task_status( + if not update_job_status( db_conn, job_id, - QueryJobStatus.FAILED, - QueryJobStatus.PENDING, - start_time=datetime.datetime.now(), - num_tasks=0, - duration=0, - ): - logger.error(f"Failed to set job {job_id} as failed") + status=QueryJobStatus.FAILED, + prev_status=QueryJobStatus.PENDING, + start_time=datetime.datetime.now() + ): continue
Line range hint
867-912
: Improve type hints and error handling in stream extraction completion handler.The function could benefit from more specific type hints and better error handling for edge cases.
Apply this diff to improve the code:
async def handle_finished_stream_extraction_job( - db_conn, job: QueryJob, task_results: List[Any] + db_conn, job: Union[ExtractIrJob, ExtractJsonJob], task_results: List[Dict[str, Any]] ) -> None: global active_jobs global active_archive_json_extractions global active_file_split_ir_extractions job_id = job.id new_job_status = QueryJobStatus.SUCCEEDED num_tasks = len(task_results) if 1 != num_tasks: + task_ids = [str(result.get('task_id', 'unknown')) for result in task_results] logger.error( - f"Unexpected number of tasks for extraction job {job_id}. " - f"Expected 1, got {num_tasks}." + f"Unexpected number of tasks for extraction job {job_id}. Expected 1, got {num_tasks}. " + f"Task IDs: {', '.join(task_ids)}" ) new_job_status = QueryJobStatus.FAILEDcomponents/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (1)
34-35
: Consistency in Logging MessagesConsider updating the log message to "Starting IR extraction" for grammatical consistency and clarity.
Apply this diff to adjust the log message:
-logger.info("Start IR extraction") +logger.info("Starting IR extraction")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (14)
components/clp-package-utils/clp_package_utils/general.py
(4 hunks)components/clp-package-utils/clp_package_utils/scripts/decompress.py
(4 hunks)components/clp-package-utils/clp_package_utils/scripts/native/decompress.py
(4 hunks)components/clp-package-utils/clp_package_utils/scripts/start_clp.py
(5 hunks)components/clp-py-utils/clp_py_utils/clp_config.py
(7 hunks)components/clp-py-utils/clp_py_utils/create-results-cache-indices.py
(1 hunks)components/job-orchestration/job_orchestration/executor/query/celeryconfig.py
(1 hunks)components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py
(6 hunks)components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
(19 hunks)components/log-viewer-webui/server/settings.json
(1 hunks)components/log-viewer-webui/server/src/DbManager.js
(4 hunks)components/log-viewer-webui/server/src/app.js
(1 hunks)components/log-viewer-webui/server/src/routes/static.js
(1 hunks)components/package-template/src/etc/clp-config.yml
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- components/clp-package-utils/clp_package_utils/general.py
🔇 Additional comments (33)
components/log-viewer-webui/server/settings.json (2)
9-9
: Verify MongoDB collection migration requirements.
The collection name change from "ir-files" to "stream-files" may require data migration for existing deployments.
#!/bin/bash
# Check if there are any references to the old collection name that need updating
rg -l "ir-files" --type js --type py
Consider adding a migration script to handle existing data in the "ir-files" collection if needed.
12-12
: Ensure proper stream directory setup.
The path change from "ir" to "stream" requires proper directory setup in new and existing deployments.
Consider:
- Adding directory creation in deployment scripts
- Documenting the path change in upgrade guides
- Ensuring proper permissions are set for the new directory
✅ Verification successful
Let's search more broadly for directory setup patterns to verify the implementation details and documentation.
Let's examine the configuration files and setup scripts to understand how the directory structure is managed.
Let's verify if the directory creation is handled in the startup or installation scripts.
Let's examine these Python scripts to verify if they handle the directory creation during startup.
Directory setup is properly handled by the system
After examining the codebase, I can confirm that:
- The
start_clp.py
script automatically creates all required directories including the stream directory (var/data/stream
) during system startup - Directory creation is handled through
mkdir(parents=True, exist_ok=True)
calls - Proper permissions are set through the container user management system
The path change from "ir" to "stream" is safely managed by the existing infrastructure and requires no additional setup in deployments.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any remaining references to the old directory path
rg -l "var/data/ir" --type js --type py --type sh
Length of output: 36
Script:
#!/bin/bash
# Search for directory references in deployment or setup scripts
fd -e sh -e py setup install deploy | xargs rg -l "var/data"
# Search for any documentation files mentioning directory paths
fd -e md -e txt | xargs rg -l "var/data"
# Check for any configuration files that might reference these paths
fd -e json -e yaml -e conf | xargs rg -l "var/data"
Length of output: 450
Script:
#!/bin/bash
# Let's examine the content of the config files we found
cat components/log-viewer-webui/server/settings.json
cat components/package-template/src/etc/clp-config.yml
# Check for any installation or setup scripts in the package template
fd -e sh -e py . components/package-template/src/
Length of output: 3262
Script:
#!/bin/bash
# Let's examine the startup scripts to see if they handle directory creation
cat components/package-template/src/sbin/start-clp.sh
# Check if there are any installation or setup scripts in the root of the project
fd -e sh -e py setup install init | xargs rg -l "mkdir"
Length of output: 2144
Script:
#!/bin/bash
# Let's check the content of the start_clp.py script which might handle directory creation
cat components/clp-package-utils/clp_package_utils/scripts/start_clp.py
# Also check the package utils for directory setup
cat components/clp-package-utils/clp_package_utils/scripts/native/compress.py
Length of output: 51358
components/job-orchestration/job_orchestration/executor/query/celeryconfig.py (2)
12-12
: LGTM: Task route updated to match new stream extraction pattern
The task route has been properly updated to use extract_stream_task.extract_stream
, maintaining the correct queue assignment to QueueName.QUERY
.
Let's verify the task registration:
#!/bin/bash
# Description: Verify the task registration is properly defined
# Expected: The new task should be properly defined in the extract_stream_task module
# Check for the task definition
ast-grep --pattern 'def extract_stream($$$)'
7-7
: LGTM: Import statement correctly updated for stream extraction
The change from extract_ir_task
to extract_stream_task
aligns with the PR's objective of refactoring extraction tasks to support both IR and JSON extraction under a unified "stream" concept.
Let's verify this change is consistent across the codebase:
✅ Verification successful
Import change is properly integrated and consistent
The verification confirms that:
- The
extract_stream_task.py
module exists in the correct location - All references to the task module are using the new
extract_stream_task
name - The task is properly imported in both the celery config and scheduler
- No legacy references to
extract_ir_task
remain in the codebase
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the module renaming is consistent
# Expected: No references to the old module name should exist
# Check for any remaining references to the old module name
rg "extract_ir_task" --type py
Length of output: 46
Script:
#!/bin/bash
# Let's verify the task module existence and its references
fd "extract_stream_task.py" --type f
# Check for any imports or references to extract_stream_task
rg "extract_stream_task" --type py
# Look for celery task definitions related to stream extraction
ast-grep --pattern 'app.task(name="extract_stream")'
Length of output: 681
components/clp-py-utils/clp_py_utils/create-results-cache-indices.py (1)
27-27
: LGTM! Variable renaming is consistent.
The variable renaming maintains consistency with the new stream-oriented terminology.
components/log-viewer-webui/server/src/app.js (1)
40-40
: Verify MongoDB collection name changes.
The change from irFilesCollectionName
to StreamFilesCollectionName
is part of the IR to Stream renaming. Let's verify this change is consistent across the codebase.
✅ Verification successful
MongoDB collection name changes are consistent across the codebase
The verification shows that:
- No references to the old
irfiles
naming pattern were found - All occurrences consistently use the new
StreamFiles
naming pattern - The settings.json correctly defines
MongoDbStreamFilesCollectionName
as "stream-files" - The change is properly reflected in both configuration and code files
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the MongoDB collection name changes are consistent
# Test 1: Check for any remaining references to the old collection name
rg -i 'irfiles(collection|dir)'
# Test 2: Check the new collection name usage
rg -i 'streamfiles(collection|dir)'
# Test 3: Verify settings.json has the updated property
cat components/log-viewer-webui/server/settings.json
Length of output: 2163
components/log-viewer-webui/server/src/routes/static.js (1)
25-28
: Consider updating the route prefix to match new terminology
The route prefix "/ir" might be misleading as we're moving away from IR-specific terminology to a more generic "stream" concept. This could affect API consistency and documentation.
Let's verify if this route is referenced elsewhere:
Consider updating the prefix to "/stream" to maintain consistency with the new terminology, but ensure all clients are updated accordingly.
components/package-template/src/etc/clp-config.yml (2)
86-87
: Validate stream output configuration impact.
The new stream_output
section correctly replaces the old ir_output
section. The directory path and configuration structure look appropriate.
Let's verify the deployment impact:
#!/bin/bash
# Description: Check for references to the old and new output paths
# Test: Search for references to ensure consistent updates
rg -g '!*.{log,md}' "(var/data/ir|var/data/stream)"
50-50
: Verify collection renaming migration strategy.
The change from ir_collection_name
to stream_collection_name
looks good, but ensure there's a migration strategy for existing deployments.
Let's check for any migration scripts or related changes:
components/log-viewer-webui/server/src/DbManager.js (1)
Line range hint 1-236
: Consider completing the IR to Stream terminology transition
The codebase is transitioning from IR-specific to Stream-generic terminology, but some references remain unchanged:
QUERY_JOB_TYPE.EXTRACT_IR
constantsubmitAndWaitForExtractIrJob
method name and implementation
This partial transition might cause confusion and maintenance issues.
Let's verify the usage of IR terminology in the codebase:
Consider creating a tracking issue to:
- Document all places where IR terminology needs to be updated
- Plan the migration strategy
- Update all references consistently
- Update related tests and documentation
components/clp-package-utils/clp_package_utils/scripts/decompress.py (3)
17-17
: LGTM! Import addition is well-placed.
The EXTRACT_JSON_CMD import is correctly grouped with related command imports.
Line range hint 150-162
: LGTM! Function rename reflects broader purpose.
The rename from handle_extract_ir_cmd
to handle_extract_stream_cmd
better represents its expanded functionality to handle both IR and JSON extraction.
268-269
: LGTM! Command handling is consistent.
The addition of JSON extraction command handling follows the established pattern and maintains consistency with the existing code structure.
components/clp-package-utils/clp_package_utils/scripts/native/decompress.py (5)
15-19
: LGTM: Import and constant additions are appropriate
The new imports and constant additions properly support the JSON extraction functionality.
Also applies to: 25-25
78-81
: LGTM: Function signature properly generalized
The updated signature appropriately supports multiple extraction job types through the generic job_type
and job_config
parameters.
Also applies to: 84-87
91-99
: LGTM: Job submission and monitoring logic properly updated
The changes maintain robust error handling while supporting multiple job types.
298-304
: LGTM: JSON extraction parser properly configured
The parser is well-structured with clear help messages and appropriate required flags.
310-311
: LGTM: Command routing properly updated
The changes correctly route both IR and JSON extraction commands to the stream extraction handler.
components/clp-py-utils/clp_py_utils/clp_config.py (2)
273-273
: LGTM: Consistent renaming from IR to stream collection
The renaming from ir_collection_name
to stream_collection_name
aligns well with the PR's objective to support both IR and JSON extraction under the unified "stream" terminology.
Also applies to: 287-292
430-430
: Verify stream output directory creation in deployment
The integration of StreamOutput into CLPConfig looks correct. However, ensure that:
- The stream output directory is properly created during deployment
- Appropriate permissions are set for the directory
Also applies to: 440-440, 460-464, 533-533
components/clp-package-utils/clp_package_utils/scripts/start_clp.py (6)
289-289
: LGTM: Collection name parameter updated consistently.
The change from --ir-collection
to --stream-collection
aligns with the broader refactoring to support both IR and JSON extraction under the stream terminology.
663-666
: LGTM: Environment variables updated consistently.
The environment variables have been correctly updated from IR-specific to stream-specific naming:
CLP_STREAM_OUTPUT_DIR
replaces IR-specific output directoryCLP_STREAM_COLLECTION
replaces IR-specific collection name
713-713
: LGTM: Directory creation updated consistently.
The change from IR output directory to stream output directory maintains consistency with the new terminology.
936-938
: LGTM: MongoDB settings updated consistently.
The MongoDB-related settings have been properly updated to use stream terminology:
MongoDbStreamFilesCollectionName
replaces IR-specific collection nameStreamFilesDir
replaces IR-specific directory path
964-964
: LGTM: Mount configuration updated consistently.
The mount configuration has been updated from IR output directory to stream output directory, maintaining consistency with the new terminology.
Line range hint 289-964
: Verify the impact of terminology changes across the codebase.
While the changes in this file are consistent, we should verify that all IR to stream terminology changes have been applied consistently across the entire codebase.
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (7)
16-16
: Import Statement Update Confirmed
The import of ExtractIrJobConfig
and ExtractJsonJobConfig
is necessary to handle both IR and JSON extraction configurations. This change is appropriate and aligns with the new functionality.
28-31
: Updated Parameters in make_command
Function
The addition of stream_output_dir
, job_config_obj
, and stream_collection
parameters to the make_command
function correctly accommodates the requirements for both IR and JSON extraction tasks.
51-69
: Proper Handling of JSON Extraction Command
The new conditional branch for StorageEngine.CLP_S
correctly introduces the logic for JSON extraction. The construction of the command and the parsing of ExtractJsonJobConfig
are implemented appropriately.
78-78
: Function Renaming Reflects Expanded Functionality
Renaming the function to extract_stream
appropriately reflects its support for both IR and JSON extraction, enhancing code readability and clarity.
87-87
: Updated Task Name Improves Clarity
Setting task_name
to "Stream Extraction" provides a clearer description of the task being executed, aligning with the function's broadened scope.
104-105
: Retrieval of New Environment Variables
The addition of stream_output_dir
and stream_collection
from environment variables ensures that the necessary configurations are accessible for the extraction processes.
112-115
: Correct Parameters Passed to make_command
Passing stream_output_dir
, job_config_obj
, and stream_collection
to the make_command
function ensures it has all required information to construct the appropriate extraction command.
let {StreamFilesDir} = settings; | ||
if (false === path.isAbsolute(StreamFilesDir)) { | ||
StreamFilesDir = path.resolve(rootDirname, StreamFilesDir); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Use consistent camelCase naming convention
The variable StreamFilesDir
should follow JavaScript naming conventions by using camelCase instead of PascalCase, as it's not a class name.
Apply this diff to maintain consistency:
- let {StreamFilesDir} = settings;
- if (false === path.isAbsolute(StreamFilesDir)) {
- StreamFilesDir = path.resolve(rootDirname, StreamFilesDir);
+ let {streamFilesDir} = settings;
+ if (false === path.isAbsolute(streamFilesDir)) {
+ streamFilesDir = path.resolve(rootDirname, streamFilesDir);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let {StreamFilesDir} = settings; | |
if (false === path.isAbsolute(StreamFilesDir)) { | |
StreamFilesDir = path.resolve(rootDirname, StreamFilesDir); | |
let {streamFilesDir} = settings; | |
if (false === path.isAbsolute(streamFilesDir)) { | |
streamFilesDir = path.resolve(rootDirname, streamFilesDir); |
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (2)
359-393
: Consider adding error handling for MongoDB operations.While the functions are well-structured and handle both extraction types effectively, the MongoDB operations could benefit from explicit error handling to gracefully handle connection issues or query failures.
Add try-catch blocks around MongoDB operations:
def is_target_extracted( results_cache_uri: str, stream_collection_name: str, target_id: str, job_type: QueryJobType ) -> bool: target_key: str if QueryJobType.EXTRACT_IR == job_type: target_key = "file_split_id" else: target_key = "orig_file_id" - with pymongo.MongoClient(results_cache_uri) as results_cache_client: - stream_collection = results_cache_client.get_default_database()[stream_collection_name] - results_count = stream_collection.count_documents({target_key: target_id}) - return 0 != results_count + try: + with pymongo.MongoClient(results_cache_uri) as results_cache_client: + stream_collection = results_cache_client.get_default_database()[stream_collection_name] + results_count = stream_collection.count_documents({target_key: target_id}) + return 0 != results_count + except Exception as e: + logger.error(f"Failed to check extraction status: {e}") + return False
Line range hint
867-912
: Consider extracting common logging patterns.While the function effectively handles job completion with proper error handling, the logging statements could be made more maintainable by extracting common patterns.
Extract logging into helper functions:
+def log_task_completion(job_id: str, task_id: str, duration: float, success: bool = True) -> None: + status = "succeeded" if success else "failed" + logger.info( + f"extraction task job-{job_id}-task-{task_id} {status} in {duration} second(s)." + ) + async def handle_finished_stream_extraction_job( db_conn, job: QueryJob, task_results: List[Any] ) -> None: # ... existing code ... - logger.info( - f"extraction task job-{job_id}-task-{task_id} succeeded in " - f"{task_result.duration} second(s)." - ) + log_task_completion(job_id, task_id, task_result.duration)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
(19 hunks)components/log-viewer-webui/server/src/routes/static.js
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- components/log-viewer-webui/server/src/routes/static.js
🔇 Additional comments (5)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (5)
291-306
: Well-structured stream extraction utility function!
The function effectively handles both IR and JSON extraction types while maintaining good separation of concerns and SQL safety.
395-418
: Well-implemented job creation function with proper type safety!
The function effectively handles both extraction types while maintaining type safety through proper configuration parsing and providing informative logging.
Line range hint 458-467
: Excellent integration of JSON extraction with existing task group creation!
The changes effectively integrate JSON extraction while maintaining code reusability and consistency with the existing IR extraction functionality.
973-974
: Clean integration of JSON extraction job status handling!
The changes effectively integrate JSON extraction job status handling while maintaining code consistency and reusability.
Line range hint 990-1004
: Clean configuration integration!
The stream collection configuration is properly integrated and propagated through the job handling system.
Also applies to: 1088-1088
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (2)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (2)
81-183
: Add docstrings and type hints to abstract methods.The
StreamExtractionHandle
class hierarchy is well-designed, but could be more maintainable with:
- Docstrings explaining the purpose and contract of each abstract method
- Return type hints for all methods
Apply this diff to improve the abstract class:
class StreamExtractionHandle(ABC): def __init__(self, job_id: str): self.job_id = job_id @abstractmethod - def get_stream_id(self) -> Optional[str]: ... + def get_stream_id(self) -> Optional[str]: + """ + Get the identifier for the stream being extracted. + + Returns: + str: The stream ID if available, None otherwise + """ + ... @abstractmethod - def get_archive_for_stream_extraction(self, db_conn) -> Optional[str]: ... + def get_archive_for_stream_extraction(self, db_conn) -> Optional[str]: + """ + Get the archive ID containing the stream to be extracted. + + Args: + db_conn: Database connection + + Returns: + str: The archive ID if found, None otherwise + """ + ...
Line range hint
912-957
: Improve error handling and logging consistency.The job completion handling could be improved in several ways:
- The logging messages at lines 954 and 956 are nearly identical
- The error handling for unexpected task counts could be more specific
Consider these improvements:
- logger.info(f"Completed stream extraction job {job_id}.") - else: - logger.info(f"Completed stream extraction job {job_id} with failing tasks.") + logger.info(f"Successfully completed stream extraction job {job_id}") + else: + logger.error( + f"Stream extraction job {job_id} failed: " + f"{'unexpected task count' if num_tasks != 1 else 'task execution failed'}" + )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
(19 hunks)
🔇 Additional comments (1)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (1)
454-469
: LGTM! Well-implemented archive existence check.
The function uses parameterized queries for SQL injection prevention and has clear error handling.
# NOTE: The following two if blocks of `is_stream_extraction_active` and | ||
# `is_stream_extracted` should not be reordered. | ||
# We must: | ||
# 1. First, check if the stream is in the process of being extracted | ||
# (`is_stream_extraction_active`). | ||
# 2. Then, check if the stream has already been extracted | ||
# (`is_stream_extracted`). | ||
# | ||
# This order ensures correctness because `is_stream_extracted` returns True if | ||
# any chunk of the stream has been extracted, but it does not guarantee that *all* | ||
# chunks are extracted. If `is_stream_extracted` is checked first and the job is | ||
# marked as successful based on its result, it is possible that the extraction job | ||
# is still in progress, meaning the specific chunk requested by the web UI might | ||
# not yet be ready. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider encapsulating the extraction state checks.
The ordering dependency between is_stream_extraction_active
and is_stream_extracted
is well documented but could be encapsulated to prevent future maintenance issues.
Consider creating a dedicated method to handle these checks:
+ def check_extraction_state(
+ self,
+ db_conn,
+ job_id: str,
+ results_cache_uri: str,
+ stream_collection_name: str
+ ) -> Tuple[bool, Optional[QueryJobStatus]]:
+ """
+ Check the extraction state in the correct order:
+ 1. First, check if stream is being extracted
+ 2. Then, check if stream has been extracted
+
+ Returns:
+ Tuple[bool, Optional[QueryJobStatus]]:
+ - bool: True if state handling is complete
+ - QueryJobStatus: The new job status if complete, None otherwise
+ """
+ if self.is_stream_extraction_active():
+ self.mark_job_as_waiting()
+ return True, QueryJobStatus.RUNNING
+
+ if self.is_stream_extracted(results_cache_uri, stream_collection_name):
+ return True, QueryJobStatus.SUCCEEDED
+
+ return False, None
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (4)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (4)
81-100
: Add docstrings to abstract methods for better maintainability.The abstract class provides a good interface, but adding docstrings would make it more maintainable and help future implementations.
Apply this diff to add docstrings:
class StreamExtractionHandle(ABC): def __init__(self, job_id: str): self.job_id = job_id self.archive_id: Optional[str] = None @abstractmethod def get_stream_id(self) -> Optional[str]: ... + """ + Returns the unique identifier for the stream being extracted. + :return: String identifier for the stream, or None if not available + """ @abstractmethod def is_stream_extraction_active(self) -> bool: ... + """ + Checks if the stream is currently being extracted. + :return: True if extraction is in progress, False otherwise + """ @abstractmethod def mark_job_as_waiting(self) -> None: ... + """ + Marks the current job as waiting for stream extraction to complete. + """ @abstractmethod def is_stream_extracted(self, results_cache_uri: str, stream_collection_name: str) -> bool: ... + """ + Checks if the stream has been extracted to the results cache. + :param results_cache_uri: URI of the results cache + :param stream_collection_name: Name of the collection storing extracted streams + :return: True if stream is extracted, False otherwise + """ @abstractmethod def create_stream_extraction_job(self) -> QueryJob: ... + """ + Creates a new job for extracting the stream. + :return: Configured QueryJob instance + """
Line range hint
903-948
: Improve error handling in stream extraction job completion.The error handling in
handle_finished_stream_extraction_job
could be more robust:
- Consider using early returns to reduce nesting
- Add more detailed error messages
Apply this diff to improve the error handling:
async def handle_finished_stream_extraction_job( db_conn, job: QueryJob, task_results: List[Any] ) -> None: global active_jobs global active_archive_json_extractions global active_file_split_ir_extractions job_id = job.id - new_job_status = QueryJobStatus.SUCCEEDED + new_job_status = QueryJobStatus.FAILED # Default to failed, set to success only if all checks pass num_tasks = len(task_results) if 1 != num_tasks: logger.error( - f"Unexpected number of tasks for extraction job {job_id}. " - f"Expected 1, got {num_tasks}." + f"Invalid task count for extraction job {job_id}: expected 1, got {num_tasks}. " + f"This indicates a potential scheduling issue." ) - new_job_status = QueryJobStatus.FAILED - else: - task_result = QueryTaskResult.parse_obj(task_results[0]) - task_id = task_result.task_id - if not QueryJobStatus.SUCCEEDED == task_result.status: - logger.error( - f"extraction task job-{job_id}-task-{task_id} failed. " - f"Check {task_result.error_log_path} for details." - ) - new_job_status = QueryJobStatus.FAILED - else: - logger.info( - f"extraction task job-{job_id}-task-{task_id} succeeded in " - f"{task_result.duration} second(s)." - ) + return + task_result = QueryTaskResult.parse_obj(task_results[0]) + task_id = task_result.task_id + + if not QueryJobStatus.SUCCEEDED == task_result.status: + logger.error( + f"Stream extraction task job-{job_id}-task-{task_id} failed. " + f"Error details available at: {task_result.error_log_path}" + ) + return + + logger.info( + f"Stream extraction task job-{job_id}-task-{task_id} completed successfully " + f"in {task_result.duration:.2f} second(s)." + ) + new_job_status = QueryJobStatus.SUCCEEDED
76-77
: Consider using a thread-safe data structure for job tracking.The use of global dictionaries for tracking active jobs and extractions could lead to race conditions in a multi-threaded environment.
Consider using a thread-safe data structure or encapsulating the state management in a class:
from threading import Lock class JobTracker: def __init__(self): self._lock = Lock() self._active_jobs = {} self._active_ir_extractions = {} self._active_json_extractions = {} def add_job(self, job_id: str, job: QueryJob) -> None: with self._lock: self._active_jobs[job_id] = job def remove_job(self, job_id: str) -> None: with self._lock: self._active_jobs.pop(job_id, None)
1009-1010
: Use tuple for job type comparison.Replace the list with a tuple for better performance when checking job types.
Apply this diff:
- elif job_type in [QueryJobType.EXTRACT_JSON, QueryJobType.EXTRACT_IR]: + elif job_type in (QueryJobType.EXTRACT_JSON, QueryJobType.EXTRACT_IR):
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
(19 hunks)
🔇 Additional comments (1)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (1)
677-691
: Consider encapsulating the extraction state checks.
The ordering dependency between is_stream_extraction_active
and is_stream_extracted
is well documented but could be encapsulated to prevent future maintenance issues.
def check_if_archive_exists( | ||
db_conn, | ||
archive_id: str, | ||
) -> bool: | ||
|
||
query = f"""SELECT 1 | ||
FROM {CLP_METADATA_TABLE_PREFIX}archives WHERE | ||
id = %s | ||
""" | ||
with contextlib.closing(db_conn.cursor(dictionary=True)) as cursor: | ||
cursor.execute(query, (archive_id,)) | ||
if cursor.fetchone(): | ||
return True | ||
|
||
return False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
SQL injection vulnerabilities found in multiple database operations
Several SQL queries are using string formatting (f-strings
) to construct queries instead of using parameterized queries, particularly in:
-
compression_scheduler.py
:update_compression_task_metadata
: Direct string interpolation fortask_id
update_compression_job_metadata
: Direct string interpolation for field values andjob_id
- Multiple UPDATE queries with interpolated status values and job IDs
-
query_scheduler.py
:- Multiple SELECT queries with interpolated table names and status values
- INSERT query in
insert_query_tasks_into_db
with interpolated values
These should be rewritten using parameterized queries similar to the pattern used in check_if_archive_exists
.
🔗 Analysis chain
Use parameterized queries consistently.
Good use of parameterized query in check_if_archive_exists
. However, this pattern should be applied consistently throughout the file.
Let's verify other SQL queries in the file:
Let me analyze the SQL queries in more detail by checking their full context.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for potential SQL injection vulnerabilities
rg -A 3 "execute\(" | rg -B 3 "SELECT|UPDATE|INSERT|DELETE"
Length of output: 4240
Script:
#!/bin/bash
# Get more context around SQL queries
rg -B 5 -A 10 "execute\(" components/job-orchestration/job_orchestration/scheduler/
Length of output: 23781
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Skimmed things and they generally look good. I won't have time to finish a complete review within 24 hours, but will likely get to it later tonight.
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (2)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (2)
81-103
: Add return type hints to abstract methods.The abstract methods in
StreamExtractionHandle
should include return type hints for better type safety and documentation.Apply this diff:
- def get_stream_id(self) -> Optional[str]: ... + @abstractmethod + def get_stream_id(self) -> Optional[str]: ... - def is_stream_extraction_active(self) -> bool: ... + @abstractmethod + def is_stream_extraction_active(self) -> bool: ... - def is_stream_extracted(self, results_cache_uri: str, stream_collection_name: str) -> bool: ... + @abstractmethod + def is_stream_extracted(self, results_cache_uri: str, stream_collection_name: str) -> bool: ... - def mark_job_as_waiting(self) -> None: ... + @abstractmethod + def mark_job_as_waiting(self) -> None: ... - def create_stream_extraction_job(self) -> QueryJob: ... + @abstractmethod + def create_stream_extraction_job(self) -> QueryJob: ...
Line range hint
908-938
: Improve error handling in stream extraction job completion.The error handling in
handle_finished_stream_extraction_job
could be enhanced:
- The assumption of exactly one task result is rigid and could be made more flexible
- Error messages could provide more context about the failure
Apply this diff:
async def handle_finished_stream_extraction_job( db_conn, job: QueryJob, task_results: List[Any] ) -> None: global active_jobs global active_archive_json_extractions global active_file_split_ir_extractions job_id = job.id new_job_status = QueryJobStatus.SUCCEEDED num_tasks = len(task_results) - if 1 != num_tasks: + if num_tasks == 0: logger.error( - f"Unexpected number of tasks for extraction job {job_id}. " - f"Expected 1, got {num_tasks}." + f"No task results received for extraction job {job_id}." ) new_job_status = QueryJobStatus.FAILED - else: + elif num_tasks > 1: + logger.warning( + f"Multiple task results ({num_tasks}) received for extraction job {job_id}. " + f"Processing only the first result." + ) + + if new_job_status != QueryJobStatus.FAILED: task_result = QueryTaskResult.parse_obj(task_results[0]) task_id = task_result.task_id if not QueryJobStatus.SUCCEEDED == task_result.status: logger.error( - f"extraction task job-{job_id}-task-{task_id} failed. " - f"Check {task_result.error_log_path} for details." + f"Stream extraction task job-{job_id}-task-{task_id} failed with status " + f"{task_result.status}. Check {task_result.error_log_path} for details." ) new_job_status = QueryJobStatus.FAILED
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
(19 hunks)
🔇 Additional comments (1)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (1)
682-696
: Consider encapsulating the extraction state checks.
The ordering dependency between is_stream_extraction_active
and is_stream_extracted
is well documented but could be encapsulated to prevent future maintenance issues.
# Dictionary that maps IDs of clp-s archives being extracted to IDs of jobs waiting for them | ||
active_archive_json_extractions: Dict[str, List[str]] = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider encapsulating global state in a JobStateManager class.
The use of global dictionaries for tracking job states could lead to maintainability and thread-safety issues. Consider encapsulating this state in a dedicated class.
Example implementation:
class JobStateManager:
def __init__(self):
self._active_jobs: Dict[str, QueryJob] = {}
self._active_file_split_ir_extractions: Dict[str, List[str]] = {}
self._active_archive_json_extractions: Dict[str, List[str]] = {}
self._lock = asyncio.Lock() # For thread safety
async def add_waiting_ir_job(self, file_split_id: str, job_id: str) -> None:
async with self._lock:
if file_split_id not in self._active_file_split_ir_extractions:
self._active_file_split_ir_extractions[file_split_id] = []
self._active_file_split_ir_extractions[file_split_id].append(job_id)
async def add_waiting_json_job(self, archive_id: str, job_id: str) -> None:
async with self._lock:
if archive_id not in self._active_archive_json_extractions:
self._active_archive_json_extractions[archive_id] = []
self._active_archive_json_extractions[archive_id].append(job_id)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not assuming multithreading for this file. we currently only use single threading
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@haiqi96, understood. Since the file is single-threaded, encapsulating the state for thread-safety may not be necessary.
(_/)
( •_•)
/ >💧
✏️ Learnings added
Learnt from: haiqi96
PR: y-scope/clp#569
File: components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py:76-77
Timestamp: 2024-11-17T23:24:08.758Z
Learning: The `query_scheduler.py` file operates with single threading, and multithreading is not used.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Show resolved
Hide resolved
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Outdated
Show resolved
Hide resolved
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Outdated
Show resolved
Hide resolved
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Outdated
Show resolved
Hide resolved
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Outdated
Show resolved
Hide resolved
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Outdated
Show resolved
Hide resolved
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Outdated
Show resolved
Hide resolved
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Outdated
Show resolved
Hide resolved
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Show resolved
Hide resolved
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Outdated
Show resolved
Hide resolved
Co-authored-by: kirkrodrigues <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (4)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (4)
76-77
: Add empty line before comment for better readabilityAdd an empty line before the comment to improve readability and maintain consistency with the project's commenting style.
active_file_split_ir_extractions: Dict[str, List[str]] = {} + # Dictionary that maps IDs of clp-s archives being extracted to IDs of jobs waiting for them active_archive_json_extractions: Dict[str, List[str]] = {}
182-186
: Rename function for clarityThe function name
does_document_exist
is too generic. Consider renaming it tomongodb_doc_exists
to better reflect its specific purpose and implementation.-def does_document_exist(mongodb_uri, collection_name, field, value): +def mongodb_doc_exists(mongodb_uri, collection_name, field, value):
113-114
: Improve error message clarityThe error message could be more specific about which parameters failed to resolve to an existing archive.
- raise ValueError("Job parameters don't resolve to an existing archive") + raise ValueError(f"Failed to resolve archive for file_split_id={self._file_split_id}")
451-464
: Rename function for consistencyThe function name
check_if_archive_exists
could be shortened toarchive_exists
for better consistency with other function names in the codebase.-def check_if_archive_exists( +def archive_exists( db_conn, archive_id: str, ) -> bool:
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
(19 hunks)
🔇 Additional comments (1)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (1)
955-961
:
Potential race condition in job cleanup
The code assumes that the job being completed is the one that did the extraction, but there could be a race condition if multiple jobs complete simultaneously.
Consider adding synchronization or using a more robust cleanup mechanism:
if QueryJobType.EXTRACT_IR == job.get_type():
extract_ir_config: ExtractIrJobConfig = job.get_config()
- waiting_jobs = active_file_split_ir_extractions.pop(extract_ir_config.file_split_id)
+ file_split_id = extract_ir_config.file_split_id
+ if file_split_id in active_file_split_ir_extractions:
+ waiting_jobs = active_file_split_ir_extractions.pop(file_split_id)
else:
extract_json_config: ExtractJsonJobConfig = job.get_config()
- waiting_jobs = active_archive_json_extractions.pop(extract_json_config.archive_id)
+ archive_id = extract_json_config.archive_id
+ if archive_id in active_archive_json_extractions:
+ waiting_jobs = active_archive_json_extractions.pop(archive_id)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (4)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (4)
82-104
: Add type hints to abstract methods for better type safety.The abstract methods in
StreamExtractionHandle
would benefit from type hints to ensure proper implementation in derived classes.Apply this diff to add type hints:
- def get_stream_id(self) -> str: ... + def get_stream_id(self) -> str: + """Returns the ID of the stream being extracted.""" + ... - def is_stream_extraction_active(self) -> bool: ... + def is_stream_extraction_active(self) -> bool: + """Returns True if the stream is currently being extracted.""" + ... - def is_stream_extracted(self, results_cache_uri: str, stream_collection_name: str) -> bool: ... + def is_stream_extracted(self, results_cache_uri: str, stream_collection_name: str) -> bool: + """Returns True if the stream has been extracted.""" + ... - def mark_job_as_waiting(self) -> None: ... + def mark_job_as_waiting(self) -> None: + """Marks the job as waiting for stream extraction to complete.""" + ... - def create_stream_extraction_job(self) -> QueryJob: ... + def create_stream_extraction_job(self) -> QueryJob: + """Creates and returns a new stream extraction job.""" + ...
182-186
: Rename function to be more descriptive.The function name
document_exist
could be more descriptive of its purpose in checking MongoDB documents.Apply this diff to rename the function:
-def document_exist(mongodb_uri, collection_name, field, value): +def mongodb_document_exists(mongodb_uri: str, collection_name: str, field: str, value: str) -> bool:
Line range hint
663-682
: Use specific exception handling.The current exception handling catches all
ValueError
instances. Consider catching specific exceptions for better error handling.Apply this diff to improve error handling:
try: if QueryJobType.EXTRACT_IR == job_type: job_handle = IrExtractionHandle(job_id, job_config, db_conn) else: job_handle = JsonExtractionHandle(job_id, job_config, db_conn) - except ValueError: + except ValueError as ve: + logger.exception(f"Invalid job parameters: {str(ve)}") + if not set_job_or_task_status( + db_conn, + QUERY_JOBS_TABLE_NAME, + job_id, + QueryJobStatus.FAILED, + QueryJobStatus.PENDING, + start_time=datetime.datetime.now(), + num_tasks=0, + duration=0, + ): + logger.error(f"Failed to set job {job_id} as failed") + continue + except Exception as e: logger.exception("Failed to initialize extraction job handle")
Line range hint
910-977
: Consider extracting job cleanup logic into a separate method.The job cleanup logic in
handle_finished_stream_extraction_job
could be extracted into a separate method for better maintainability.Consider creating a helper method:
def cleanup_waiting_jobs( db_conn, job: QueryJob, job_id: str, new_job_status: QueryJobStatus, start_time: datetime.datetime ) -> None: """Cleanup waiting jobs after stream extraction completion.""" waiting_jobs: List[str] if QueryJobType.EXTRACT_IR == job.get_type(): extract_ir_config: ExtractIrJobConfig = job.get_config() waiting_jobs = active_file_split_ir_extractions.pop(extract_ir_config.file_split_id) else: extract_json_config: ExtractJsonJobConfig = job.get_config() waiting_jobs = active_archive_json_extractions.pop(extract_json_config.archive_id) waiting_jobs.remove(job_id) for waiting_job in waiting_jobs: logger.info(f"Setting status to {new_job_status.to_str()} for waiting jobs: {waiting_job}.") set_job_or_task_status( db_conn, QUERY_JOBS_TABLE_NAME, waiting_job, new_job_status, QueryJobStatus.RUNNING, num_tasks_completed=0, duration=(datetime.datetime.now() - start_time).total_seconds(), )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
(19 hunks)
🔇 Additional comments (1)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (1)
106-145
: Use double underscores for private members in derived classes.
The derived classes IrExtractionHandle
and JsonExtractionHandle
use single underscore for private members. For stronger encapsulation, use double underscores.
Also applies to: 147-180
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Outdated
Show resolved
Hide resolved
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (3)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (3)
76-77
: Consider encapsulating extraction state managementWhile the code operates in a single-threaded context, managing state through global dictionaries can make the code harder to maintain and test. Consider creating a dedicated class to manage extraction state, even without thread-safety concerns.
Example implementation:
class ExtractionStateManager: def __init__(self): self._active_file_split_ir_extractions: Dict[str, List[str]] = {} self._active_archive_json_extractions: Dict[str, List[str]] = {} def add_waiting_ir_job(self, file_split_id: str, job_id: str) -> None: if file_split_id not in self._active_file_split_ir_extractions: self._active_file_split_ir_extractions[file_split_id] = [] self._active_file_split_ir_extractions[file_split_id].append(job_id) def add_waiting_json_job(self, archive_id: str, job_id: str) -> None: if archive_id not in self._active_archive_json_extractions: self._active_archive_json_extractions[archive_id] = [] self._active_archive_json_extractions[archive_id].append(job_id)
182-186
: Consider a more specific function nameThe function name
document_exists
could be more specific to its purpose, such ascheck_mongodb_document_exists
orverify_stream_document_exists
, to better indicate its role in checking MongoDB documents.
Line range hint
661-751
: Consider extracting job type handling into separate methodsThe job type handling logic in
handle_pending_query_jobs
is becoming complex. Consider extracting the handling of each job type into separate methods for better maintainability.Example refactor:
def handle_stream_extraction_job( self, db_conn, job_id: str, job_type: QueryJobType, job_config: Dict[str, Any], results_cache_uri: str, stream_collection_name: str, ) -> None: try: job_handle = self._create_job_handle(job_id, job_type, job_config, db_conn) except ValueError: self._handle_job_initialization_failure(db_conn, job_id) return if job_handle.is_stream_extraction_active(): self._handle_active_extraction(db_conn, job_id, job_handle) return if job_handle.is_stream_extracted(results_cache_uri, stream_collection_name): self._handle_completed_extraction(db_conn, job_id, job_handle) return self._create_and_dispatch_extraction_job(db_conn, job_id, job_handle)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
(19 hunks)
🔇 Additional comments (2)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (2)
82-104
: Well-designed abstraction for stream extraction handling
The StreamExtractionHandle
abstract base class provides a clean interface for handling different types of extractions. The abstraction effectively separates concerns and makes the code more maintainable.
1132-1134
: LGTM: Clean integration of stream collection configuration
The addition of the stream collection name parameter is well-integrated into the existing configuration flow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the PR title, how about:
feat(clp-package): Add support for extracting JSON streams from archives.
…ves. (y-scope#569) Co-authored-by: kirkrodrigues <[email protected]>
Description
This PR adds json extraction task for the clp-package to prepare for the log-viewer support. The PR contains the following changes:
Validation performed
For CLP-unstructured.
Confirmed that a compressed file and be opened in the log-viewer via CLP webui.
For CLP-Json
Confirmed that a Jsonl file gets can be extracted when submitting a job via decompress.sh script
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Refactor