Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(package)!: Add support for writing clp-s single file archives to S3. #634

Merged
merged 43 commits into from
Dec 19, 2024

Conversation

haiqi96
Copy link
Contributor

@haiqi96 haiqi96 commented Dec 12, 2024

Description

This PR adds support for writing clp-s single file archive to s3. The write destination is provided in the clp-config.yml file by user.
Currently, we support two ways of S3 authentication. If users provide the ACCESS_KEY credentials in the clp-config file, CLP will attempt to use the credentials. Otherwise, CLP will rely on boto3 to auto-locate the credentials. Note that CLP operate on containers so it won't automatically pick up the .aws/credentials.

Besides supporting S3 write, the PR introduces new config classes to make it easier for:

  1. User to specifcy S3 info in the clp-config file
  2. For CLP workers to obtain this information.

Note: This PR only supports S3 write for CLP-S. Search and extraction support for CLP-S is not included. The PR adds checks where necessary to return failurefor search and extraction with S3.

The PR includes the following changes:

  1. Implemented features in compression worker to support S3 upload using boto3.
  2. Added a few new helper classes.
  3. Introduced S3config class to support specfying s3 configuration in clp-config file. The goal is to reuse this configuration class in future S3 support.
  4. Introduced two storage configuration classes, FSStorage and S3Storage to represent the archive output configuration.
  5. Use WorkerConfig (instead of environmental variables) to pass parameters from starter scripts to workers. With more parameters related to S3, it could be error prone to pass all parameters using Envvars.

Due to time limitation, we leave the following items as todo for future:

  1. We didn't properly implement S3 verification. Currently, if user provides invalid S3 information (such as wrong credentials or unexisting bucket), it will only be caught when CLP-S attempts to upload a single file archive. Ideally, we should catch the invalid s3 configuration at package start up.
  2. Currently, if s3 upload fails, the package does not invalidate the archive's entry in the database. As a result, an archive will show up in the database even if it is not uploaded to S3. The job, however, will still report failure.

Validation performed

Manually validated different configurations

Summary by CodeRabbit

  • New Features

    • Introduced new StorageType, FsStorage, S3Storage, and S3Config classes for enhanced storage configuration.
    • Added generate_worker_config and s3_put functions for improved configuration handling and S3 uploads.
    • Enhanced worker configuration handling in multiple tasks, including extraction and compression processes.
    • Implemented storage type validations in various scripts to ensure compatibility with supported operations.
  • Bug Fixes

    • Enhanced error handling and logging for worker configuration and task failures, particularly with storage type validations.
  • Documentation

    • Updated configuration structure in clp-config.yml to reflect new storage settings.
  • Chores

    • Added dependencies boto3 and result to pyproject.toml.

Copy link
Contributor

coderabbitai bot commented Dec 12, 2024

Walkthrough

The pull request introduces substantial changes to the configuration management and storage handling across multiple components of the CLP (Compressed Log Processing) system. The modifications focus on enhancing type safety, introducing S3 storage support, and refactoring configuration handling. Key changes include adding a new StorageType enum, creating S3Storage and FsStorage classes, and updating configuration validation methods. The changes provide more flexible and robust configuration management, particularly for archive output and worker configurations.

Changes

File Change Summary
clp_py_utils/clp_config.py Added StorageType, FsStorage, S3Storage, S3Config classes; modified ArchiveOutput and CLPConfig to support new storage configurations.
clp_package_utils/general.py Added generate_worker_config function; updated validate_worker_config and methods for archive output handling.
clp_package_utils/scripts/start_clp.py Refactored worker configuration management; simplified worker start functions and adjusted parameter handling.
job_orchestration/executor/compress/fs_compression_task.py Added S3 upload functionality; updated run_clp to accept worker_config.
job_orchestration/executor/query/extract_stream_task.py Enhanced worker configuration handling; improved error reporting with load_worker_config.
job_orchestration/executor/query/fs_search_task.py Updated command generation with worker_config; improved error handling.
job_orchestration/executor/utils.py Added load_worker_config function for loading configurations.
s3_utils.py Added s3_put function for S3 file uploads.
pyproject.toml Added boto3 and result dependencies.
package-template/src/etc/clp-config.yml Updated archive_output structure to include storage type.
initialize-orchestration-db.py Increased status_msg column size in COMPRESSION_JOBS_TABLE_NAME.
job_orchestration/scheduler/compress/compression_scheduler.py Enhanced SQL query construction for metadata updates.
clp_package_utils/scripts/native/decompress.py Updated directory retrieval to use method call.
clp_package_utils/scripts/native/del_archives.py Updated directory retrieval to use method call.

Possibly related PRs


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@haiqi96
Copy link
Contributor Author

haiqi96 commented Dec 12, 2024

@coderabbitai review

Copy link
Contributor

coderabbitai bot commented Dec 12, 2024

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🧹 Outside diff range and nitpick comments (10)
components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (1)

92-92: Confirm deletion of local archive after successful upload

The local archive file is deleted after a successful S3 upload using src_file.unlink(). Ensure that this behaviour is intended and does not impact other processes that may require access to the local file.

components/clp-py-utils/clp_py_utils/clp_config.py (3)

546-557: Improve validation logic in validate_archive_output_config

The current validation does not provide detailed feedback when the storage type is unsupported with the chosen storage engine.

Consider specifying which storage types are supported with each storage engine for clearer error messages.


312-387: Consolidate storage classes and remove redundant code

There is duplicated code in the FSStorage and S3Storage classes, particularly in methods like make_config_path_absolute and dump_to_primitive_dict.

Consider creating a base class for shared functionality to adhere to the DRY (Don't Repeat Yourself) principle.


77-82: Ensure consistent use of Optional in type annotations

In the Database class, ssl_cert, username, and password use Optional without specifying a default value, which may lead to unexpected None values.

Initialize optional fields with None explicitly for clarity:

ssl_cert: Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None
components/clp-py-utils/clp_py_utils/result.py (1)

1-10: Consider using @dataclass for the Result class

The Result class could benefit from using the @dataclass decorator to automatically generate boilerplate code such as __init__ and __repr__.

Example:

from dataclasses import dataclass
from typing import Optional

@dataclass
class Result:
    success: bool
    error: Optional[str] = None
components/job-orchestration/job_orchestration/executor/s3_utils.py (1)

11-14: Enhance input validation

The file existence checks are good, but consider adding:

  • Size validation to prevent uploading empty files
  • File permissions check to ensure readability
 if not src_file.exists():
     return Result(success=False, error=f"{src_file} doesn't exist")
 if not src_file.is_file():
     return Result(success=False, error=f"{src_file} is not a file")
+if not os.access(src_file, os.R_OK):
+    return Result(success=False, error=f"{src_file} is not readable")
+if src_file.stat().st_size == 0:
+    return Result(success=False, error=f"{src_file} is empty")
components/package-template/src/etc/clp-config.yml (2)

69-71: Document available storage types

The new storage configuration structure is good, but needs documentation for available options.

 #archive_output:
+#  # Storage configuration for archives
+#  # Available types:
+#  # - "fs": Local filesystem storage
+#  # - "s3": Amazon S3 storage (requires additional S3 configuration)
   storage:
     type: "fs"
     directory: "var/data/archives"

69-71: Consider adding S3 configuration example

Since S3 storage is being implemented, it would be helpful to include a commented example.

   storage:
     type: "fs"
     directory: "var/data/archives"
+    # Example S3 configuration:
+    # type: "s3"
+    # bucket: "my-archive-bucket"
+    # key_prefix: "archives/"
+    # region_name: "us-west-2"
components/clp-package-utils/clp_package_utils/general.py (1)

487-495: Enhance error handling for S3 configuration validation.

The S3 configuration validation could be improved:

  1. Consider adding more specific error messages for different validation failures
  2. Add logging for validation failures to aid debugging

Apply this diff to enhance error handling:

-    if StorageType.S3 == storage_config.type:
-        result = verify_s3_config_for_archive_output(storage_config.s3_config)
-        if not result.success:
-            raise ValueError(f"S3 config verification failed: {result.error}")
+    if StorageType.S3 == storage_config.type:
+        result = verify_s3_config_for_archive_output(storage_config.s3_config)
+        if not result.success:
+            logger.error(f"S3 configuration validation failed: {result.error}")
+            raise ValueError(
+                f"S3 configuration validation failed: {result.error}. "
+                f"Please check your S3 credentials and configuration."
+            )
components/clp-package-utils/clp_package_utils/scripts/start_clp.py (1)

702-702: Use consistent directory creation pattern.

The directory creation should follow the same pattern as other similar operations in the codebase. Consider extracting this into a helper function to maintain consistency.

Apply this diff to improve consistency:

-    clp_config.archive_output.get_directory().mkdir(parents=True, exist_ok=True)
+    create_directory(clp_config.archive_output.get_directory())

+def create_directory(path: pathlib.Path):
+    """Creates a directory if it doesn't exist.
+    
+    Args:
+        path: The path to create.
+    """
+    path.mkdir(parents=True, exist_ok=True)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6dd8fc1 and 9ba0a38.

📒 Files selected for processing (9)
  • components/clp-package-utils/clp_package_utils/general.py (4 hunks)
  • components/clp-package-utils/clp_package_utils/scripts/start_clp.py (4 hunks)
  • components/clp-py-utils/clp_py_utils/clp_config.py (10 hunks)
  • components/clp-py-utils/clp_py_utils/result.py (1 hunks)
  • components/clp-py-utils/clp_py_utils/s3_utils.py (1 hunks)
  • components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (7 hunks)
  • components/job-orchestration/job_orchestration/executor/s3_utils.py (1 hunks)
  • components/job-orchestration/pyproject.toml (1 hunks)
  • components/package-template/src/etc/clp-config.yml (1 hunks)
🔇 Additional comments (5)
components/job-orchestration/pyproject.toml (1)

13-13: Verify boto3 version selection

The selected version ^1.35.76 seems relatively recent, but let's verify it's the latest stable version without known vulnerabilities.

components/job-orchestration/job_orchestration/executor/s3_utils.py (1)

24-36: 🛠️ Refactor suggestion

Improve file handling with context managers

The S3 client should be properly managed to ensure resources are released.

-    with open(src_file, "rb") as file_data:
-        try:
-            my_s3_client.put_object(
-                Bucket=s3_config.bucket, Body=file_data, Key=s3_config.key_prefix + dest_file_name
-            )
-        except ClientError as e:
-            error_code = e.response["Error"]["Code"]
-            error_message = e.response["Error"]["Message"]
-            return Result(success=False, error=f"ClientError: {error_code} - {error_message}")
-        except Exception as e:
-            return Result(success=False, error=f"An unexpected error occurred: {e}")
+    try:
+        with open(src_file, "rb") as file_data:
+            try:
+                my_s3_client.put_object(
+                    Bucket=s3_config.bucket,
+                    Body=file_data,
+                    Key=s3_config.key_prefix + dest_file_name
+                )
+            except ClientError as e:
+                error_code = e.response["Error"]["Code"]
+                error_message = e.response["Error"]["Message"]
+                return Result(success=False, error=f"ClientError: {error_code} - {error_message}")
+            except Exception as e:
+                return Result(success=False, error=f"An unexpected error occurred: {e}")
+    except IOError as e:
+        return Result(success=False, error=f"Failed to read file: {e}")

Likely invalid or redundant comment.

components/clp-package-utils/clp_package_utils/general.py (2)

23-23: LGTM: Import of StorageType is correctly placed.

The import is appropriately grouped with other imports from clp_py_utils.clp_config.


244-254: Verify the container path mapping for archive output directory.

The container path mapping for archive output directory is hardcoded to /mnt/archive-output. Consider making this path configurable through environment variables or configuration files for better flexibility.

components/clp-package-utils/clp_package_utils/scripts/start_clp.py (1)

1143-1148: LGTM: Worker configuration validation is correctly placed.

The worker configuration validation is appropriately added for the relevant targets.

last_line_decoded = True

if last_archive_stats is not None and (
None is stats or stats["id"] != last_archive_stats["id"]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not the best way to write the loop, but then we don't have to duplicate everything after the end of the loop.

@haiqi96
Copy link
Contributor Author

haiqi96 commented Dec 13, 2024

@coderabbitai review

Copy link
Contributor

coderabbitai bot commented Dec 13, 2024

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (2)

117-117: Consider adding validation for enable_s3_write parameter.

The function correctly adds the --single-file-archive flag when S3 write is enabled, but consider adding parameter validation.

def make_clp_s_command(
    clp_home: pathlib.Path,
    archive_output_dir: pathlib.Path,
    clp_config: ClpIoConfig,
    db_config_file_path: pathlib.Path,
    enable_s3_write: bool,
):
+   if not isinstance(enable_s3_write, bool):
+       raise TypeError("enable_s3_write must be a boolean")

Also applies to: 130-132


141-141: Update docstring parameter descriptions.

The docstring needs to be updated to include detailed descriptions for the new worker_config parameter and its usage.

    """
    Compresses files from an FS into archives on an FS

-   :param worker_config: WorkerConfig
+   :param worker_config: Configuration object containing worker-specific settings including
+                        storage engine, data directory, and archive output settings
    :param clp_config: ClpIoConfig

Also applies to: 155-172

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6ff92fc and a061a29.

📒 Files selected for processing (3)
  • components/clp-py-utils/pyproject.toml (2 hunks)
  • components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (12 hunks)
  • components/job-orchestration/job_orchestration/executor/utils.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • components/job-orchestration/job_orchestration/executor/utils.py
  • components/clp-py-utils/pyproject.toml
🔇 Additional comments (5)
components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (5)

7-7: LGTM! Import changes are well-structured.

The new imports for S3 and worker configuration types are properly organized and aligned with the new functionality.

Also applies to: 16-19


179-191: LGTM! S3 configuration handling is robust.

The code properly checks storage type compatibility and handles S3 configuration initialization.


314-323: LGTM! Error handling is well-structured.

The error handling combines compression and S3 upload errors effectively, providing clear error messages.


243-274: 🛠️ Refactor suggestion

Consider implementing retry logic for S3 uploads.

The S3 upload implementation is functional but lacks retry logic for transient failures. Also, consider moving the file deletion logic to after verifying the upload success.

    if enable_s3_write:
        archive_id = last_archive_stats["id"]
        src_archive_file = archive_output_dir / archive_id

        if s3_error is None:
            logger.info(f"Uploading archive {archive_id} to S3...")
+           max_retries = 3
+           retry_count = 0
+           while retry_count < max_retries:
+               try:
                    result = s3_put(s3_config, src_archive_file, archive_id)
                    if result.is_err():
+                       retry_count += 1
+                       if retry_count < max_retries:
+                           logger.warning(f"Retry {retry_count}/{max_retries} for archive {archive_id}")
+                           continue
                        logger.error(f"Failed to upload archive {archive_id}: {result.err_value}")
                        s3_error = result.err_value
                        proc.terminate()
+                       break
                    else:
                        logger.info(f"Finished uploading archive {archive_id} to S3.")
+                       src_archive_file.unlink()
+                       break
+               except Exception as e:
+                   retry_count += 1
+                   if retry_count < max_retries:
+                       logger.warning(f"Retry {retry_count}/{max_retries} for archive {archive_id}")
+                       continue
+                   logger.error(f"Unexpected error during S3 upload: {str(e)}")
+                   s3_error = str(e)
+                   proc.terminate()
+                   break

-           src_archive_file.unlink()

Likely invalid or redundant comment.


344-358: Consider adding configuration validation.

While the worker configuration loading is implemented correctly, consider adding validation for required S3 configuration when S3 storage is enabled.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (3)

179-191: Enhance error message clarity for storage engine compatibility.

The error message could be more helpful by specifying which storage engines support S3 storage.

Consider updating the error message:

-            error_msg = f"S3 storage is not supported for the {clp_storage_engine} storage engine."
+            error_msg = f"S3 storage is only supported for CLP-S storage engine. Current engine: {clp_storage_engine}"

243-290: Consider extracting S3 upload logic to improve readability.

The loop handles multiple responsibilities (compression monitoring, S3 upload, and database updates). Consider extracting the S3 upload logic into a separate function.

+def handle_s3_upload(
+    archive_stats: Dict[str, Any],
+    archive_output_dir: pathlib.Path,
+    s3_config: S3Config,
+) -> Optional[str]:
+    """
+    Handles S3 upload for a single archive.
+    Returns error message if upload fails, None otherwise.
+    """
+    archive_id = archive_stats["id"]
+    archive_path = archive_output_dir / archive_id
+
+    logger.info(f"Uploading archive {archive_id} to S3...")
+    result = s3_put(s3_config, archive_path, archive_id)
+
+    if result.is_err():
+        logger.error(f"Failed to upload archive {archive_id}: {result.err_value}")
+        return result.err_value
+
+    logger.info(f"Finished uploading archive {archive_id} to S3.")
+    return None

     while not last_line_decoded:
         # ... existing loop setup ...
         if last_archive_stats is not None and (
             None is stats or stats["id"] != last_archive_stats["id"]
         ):
             if enable_s3_write:
-                archive_id = last_archive_stats["id"]
-                archive_path = archive_output_dir / archive_id
-
-                if s3_error is None:
-                    logger.info(f"Uploading archive {archive_id} to S3...")
-                    result = s3_put(s3_config, archive_path, archive_id)
-
-                    if result.is_err():
-                        logger.error(f"Failed to upload archive {archive_id}: {result.err_value}")
-                        s3_error = result.err_value
-                        proc.terminate()
-                    else:
-                        logger.info(f"Finished uploading archive {archive_id} to S3.")
+                if s3_error is None:
+                    s3_error = handle_s3_upload(last_archive_stats, archive_output_dir, s3_config)
+                    if s3_error is not None:
+                        proc.terminate()

344-358: Include exception details in the error message.

The error message could be more helpful by including the actual exception details.

-        error_msg = "Failed to load worker config"
+        error_msg = f"Failed to load worker config: {str(ex)}"
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a061a29 and 8301748.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (12 hunks)
🔇 Additional comments (3)
components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (3)

Line range hint 7-23: LGTM! Import changes enhance type safety.

The additions of Optional type and new configuration classes improve type safety and code clarity.


Line range hint 117-132: LGTM! Command generation changes are well-implemented.

The addition of the enable_s3_write parameter and conditional flag handling is clean and logical.


314-323: LGTM! Error handling improvements provide better context.

The combination of error messages from different sources helps users better understand what went wrong.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
components/clp-package-utils/clp_package_utils/scripts/del_archives.py (1)

62-66: Enhance error message for better user guidance

The storage type validation is well-structured, but the error message could be more helpful for S3 users.

Consider updating the error message to be more specific:

-        logger.error(f"Archive deletion is not supported for storage type: {storage_type}.")
+        logger.error(f"Archive deletion for {storage_type} storage is not yet supported. Currently, only local filesystem (FS) archives can be deleted.")

This change would:

  1. Clarify that this is a temporary limitation
  2. Explicitly state which storage type is supported
  3. Help users understand their options
components/clp-package-utils/clp_package_utils/scripts/decompress.py (1)

164-168: Consider refactoring storage validation

The storage type validation is duplicated between file and stream extraction. Consider extracting this into a helper function:

+def validate_fs_storage(storage_type: StorageType, operation: str) -> bool:
+    """
+    Validates if the storage type is filesystem.
+    :param storage_type: The storage type to validate
+    :param operation: The operation being performed (for error messaging)
+    :return: True if valid, False otherwise
+    """
+    if StorageType.FS != storage_type:
+        logger.error(f"{operation} is not supported for storage type: {storage_type}.")
+        return False
+    return True

 def handle_extract_file_cmd(...):
     # ...
     storage_type = clp_config.archive_output.storage.type
-    if StorageType.FS != storage_type:
-        logger.error(f"File extraction is not supported for storage type: {storage_type}.")
-        return -1
+    if not validate_fs_storage(storage_type, "File extraction"):
+        return -1

 def handle_extract_stream_cmd(...):
     # ...
     storage_type = clp_config.archive_output.storage.type
-    if StorageType.FS != storage_type:
-        logger.error(f"Stream extraction is not supported for storage type: {storage_type}.")
-        return -1
+    if not validate_fs_storage(storage_type, "Stream extraction"):
+        return -1
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8301748 and 2ada464.

📒 Files selected for processing (3)
  • components/clp-package-utils/clp_package_utils/scripts/decompress.py (3 hunks)
  • components/clp-package-utils/clp_package_utils/scripts/del_archives.py (2 hunks)
  • components/clp-package-utils/clp_package_utils/scripts/native/decompress.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • components/clp-package-utils/clp_package_utils/scripts/native/decompress.py
🔇 Additional comments (4)
components/clp-package-utils/clp_package_utils/scripts/del_archives.py (1)

7-8: LGTM! Clean import addition

The StorageType import is properly placed and necessary for the new storage type validation.

components/clp-package-utils/clp_package_utils/scripts/decompress.py (3)

8-8: LGTM: Clean import addition

The StorageType import is properly placed and necessary for the new storage validation functionality.


84-88: Add documentation for storage type limitation

The storage type validation is well-implemented, but please add a docstring note about this limitation. Consider:

 def handle_extract_file_cmd(
     parsed_args, clp_home: pathlib.Path, default_config_file_path: pathlib.Path
 ) -> int:
     """
     Handles the file extraction command.
+    Note: Currently only supports filesystem (FS) storage type.
     :param parsed_args:
     :param clp_home:
     :param default_config_file_path:
     :return: 0 on success, -1 otherwise.
     """

Let's verify the error handling:

✅ Verification successful

Documentation suggestion is valid, but error handling is well-implemented

The error handling for storage type validation is consistently implemented across multiple components:

  • The error message is clear and follows the same pattern in related files (decompress.py, del_archives.py)
  • Similar storage type checks exist in other components (fs_search_task.py, extract_stream_task.py)
  • The StorageType enum is properly defined in clp_py_utils/clp_config.py
  • Early validation prevents unnecessary resource allocation

The original suggestion to add documentation about FS-only support remains valid, as it helps developers understand the limitation upfront.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if there are any error handling tests for storage type validation

# Search for test files related to decompress.py
fd -e py "test.*decompress" -x rg -l "StorageType" {}

# Search for related error handling patterns
rg -A 5 "storage_type.*StorageType.FS" --type py

Length of output: 106


Script:

#!/bin/bash
# Let's try a different approach to find test files and error handling

# Find all test files in the package
fd "test.*\.py" components/clp-package-utils

# Search for StorageType usage in Python files
rg "StorageType" --type py -B 2 -A 2

# Look for any test files containing "extract" or "decompress"
fd "test.*\.py" | rg -l "extract|decompress"

Length of output: 10939


84-88: Overall implementation is robust and well-aligned with PR objectives

The storage type validation changes effectively prevent extraction operations for S3 storage, which aligns with the PR objectives. The implementation is consistent across both extraction methods and provides clear error messages.

Consider adding integration tests to verify the behaviour with different storage types.

Let's check for existing integration tests:

Also applies to: 164-168

Comment on lines 164 to 167
storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"Stream extraction is not supported for storage type: {storage_type}.")
return -1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add this to the search script as well? Technically we're going to implement it soon, but better to have a working package for now until that happens.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (4)
components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (4)

Line range hint 141-172: Consider enhancing the docstring for WorkerConfig parameter

The docstring should include details about the expected structure and required fields of the WorkerConfig parameter.


179-191: Enhance error message for storage engine compatibility

Consider including more details in the error message about which storage engines are supported with S3 storage.

-            error_msg = f"S3 storage is not supported for the {clp_storage_engine} storage engine."
+            error_msg = f"S3 storage is only supported for CLP_S storage engine. Current engine: {clp_storage_engine}"

319-327: Consider categorizing errors for better debugging

The error message concatenation is good, but consider adding error categories or codes to help with debugging and monitoring.

         error_msgs = []
         if compression_successful is False:
-            error_msgs.append(f"See logs {stderr_log_path}")
+            error_msgs.append(f"[COMPRESSION_ERROR] See logs {stderr_log_path}")
         if s3_error is not None:
-            error_msgs.append(s3_error)
+            error_msgs.append(f"[S3_ERROR] {s3_error}")

348-362: Enhance worker config error reporting

Consider including the specific error details in the error message to aid in troubleshooting.

     except Exception as ex:
-        error_msg = "Failed to load worker config"
+        error_msg = f"Failed to load worker config: {str(ex)}"
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2ada464 and 6e5aad5.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (12 hunks)
🔇 Additional comments (2)
components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (2)

Line range hint 117-132: LGTM: Clean implementation of S3 write support

The conditional addition of the --single-file-archive flag based on enable_s3_write is well-implemented.


243-294: Verify handling of partial uploads and cleanup

The current implementation handles basic error cases, but consider verifying:

  1. Handling of partial uploads
  2. Cleanup of temporary files in all error scenarios
  3. Network timeout handling

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
components/clp-package-utils/clp_package_utils/scripts/search.py (2)

78-82: Enhance the error message for better user guidance.

The storage type validation is well-placed and correctly prevents unsupported operations. However, the error message could be more helpful to users.

Consider this improvement:

-        logger.error(f"Search is not supported for storage type: {storage_type}.")
+        logger.error(f"Search operation is currently only supported for filesystem storage (FS). Detected storage type: {storage_type}. Please ensure your archive_output.storage.type is set to FS in the configuration.")

78-82: Consider implementing a storage capability system.

As more storage types are added, managing feature support through simple conditionals might become unwieldy. Consider implementing a capability system where storage types can declare their supported operations.

This could be implemented as:

  1. Define a set of capabilities (e.g., SEARCH, WRITE, READ)
  2. Have each storage type declare its supported capabilities
  3. Use capability checking instead of storage type checking

Example implementation:

class StorageCapability(Enum):
    SEARCH = "search"
    WRITE = "write"
    READ = "read"

# In storage type classes:
def get_capabilities(self) -> Set[StorageCapability]:
    return {StorageCapability.WRITE, StorageCapability.READ}  # For S3
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6e5aad5 and 55c0f36.

📒 Files selected for processing (1)
  • components/clp-package-utils/clp_package_utils/scripts/search.py (2 hunks)
🔇 Additional comments (1)
components/clp-package-utils/clp_package_utils/scripts/search.py (1)

10-10: LGTM! Import statement is appropriate.

The StorageType import is correctly placed and necessary for the new storage validation functionality.

@@ -57,6 +59,11 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"Archive deletion is not supported for storage type: {storage_type}.")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not changing to "archive storage type" since we have "Archive" at the beginning of the sentence.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (5)
components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (5)

Line range hint 117-132: Add type annotation for enable_s3_write parameter.

The new parameter should have a type annotation for consistency with other parameters.

def make_clp_s_command(
    clp_home: pathlib.Path,
    archive_output_dir: pathlib.Path,
    clp_config: ClpIoConfig,
    db_config_file_path: pathlib.Path,
-   enable_s3_write: bool,
+   enable_s3_write: bool
):

Line range hint 141-172: Enhance docstring for worker_config parameter.

The docstring should explain what configuration options are expected in the WorkerConfig object and their impact on the function's behavior.

    """
    Compresses files from an FS into archives on an FS

-   :param worker_config: WorkerConfig
+   :param worker_config: WorkerConfig containing storage configuration (storage type, S3 config),
+                        data directory, and archive output settings
    :param clp_config: ClpIoConfig
    :param clp_home:

179-191: Consider restructuring S3 configuration validation.

The code could be more readable with early returns and clearer separation of concerns.

    s3_config: S3Config
    enable_s3_write = False
    storage_type = worker_config.archive_output.storage.type
-   if StorageType.S3 == storage_type:
-       if StorageEngine.CLP_S != clp_storage_engine:
-           error_msg = f"S3 storage is not supported for storage engine: {clp_storage_engine}."
-           logger.error(error_msg)
-           return False, {"error_message": error_msg}
-
-       s3_config = worker_config.archive_output.storage.s3_config
-       enable_s3_write = True
+   if StorageType.S3 != storage_type:
+       return True, {}
+
+   if StorageEngine.CLP_S != clp_storage_engine:
+       error_msg = f"S3 storage is not supported for storage engine: {clp_storage_engine}."
+       logger.error(error_msg)
+       return False, {"error_message": error_msg}
+
+   s3_config = worker_config.archive_output.storage.s3_config
+   enable_s3_write = True

319-327: Consider using structured error types.

The current error handling concatenates error messages as strings. Consider using a structured error type for better error handling and reporting.

from dataclasses import dataclass
from typing import List, Optional

@dataclass
class CompressionError:
    type: str
    message: str
    details: Optional[Dict[str, Any]] = None

def format_errors(errors: List[CompressionError]) -> str:
    return "\n".join(f"{error.type}: {error.message}" for error in errors)

348-362: Enhance error message for configuration loading failures.

The current error message doesn't provide enough context about what specifically failed during configuration loading.

    try:
        worker_config = WorkerConfig.parse_obj(
            read_yaml_config_file(pathlib.Path(os.getenv("CLP_CONFIG_PATH")))
        )
    except Exception as ex:
-       error_msg = "Failed to load worker config"
+       error_msg = f"Failed to load worker config: {str(ex)}"
        logger.exception(error_msg)
        return CompressionTaskResult(
            task_id=task_id,
            status=CompressionTaskStatus.FAILED,
            duration=0,
            error_message=error_msg,
        )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2d7443e and 120ffec.

📒 Files selected for processing (2)
  • components/clp-package-utils/clp_package_utils/scripts/search.py (2 hunks)
  • components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (12 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • components/clp-package-utils/clp_package_utils/scripts/search.py
🔇 Additional comments (3)
components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (3)

Line range hint 7-23: LGTM! Well-organized imports with proper type hints.

The addition of type hints and new imports for S3-related functionality improves code clarity and type safety.


205-205: LGTM! Proper parameter passing.

The enable_s3_write parameter is correctly passed to make_clp_s_command.


371-374: LGTM! Proper parameter updates.

The run_clp call correctly uses the new worker_config parameter and removes redundant parameters.

Comment on lines +243 to +294
# Handle job metadata update and s3 write if enabled
s3_error = None
while not last_line_decoded:
line = proc.stdout.readline()
if not line:
break
stats = json.loads(line.decode("ascii"))
if last_archive_stats is not None and stats["id"] != last_archive_stats["id"]:
# We've started a new archive so add the previous archive's last
# reported size to the total
total_uncompressed_size += last_archive_stats["uncompressed_size"]
total_compressed_size += last_archive_stats["size"]
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
update_job_metadata_and_tags(
db_cursor,
job_id,
clp_metadata_db_connection_config["table_prefix"],
tag_ids,
last_archive_stats,
)
db_conn.commit()
stats: Optional[Dict[str, Any]] = None
if "" == line:
# Skip empty lines that could be caused by potential errors in printing archive stats
continue

if line is not None:
stats = json.loads(line.decode("ascii"))
else:
last_line_decoded = True

if last_archive_stats is not None and (
None is stats or stats["id"] != last_archive_stats["id"]
):
if enable_s3_write:
archive_id = last_archive_stats["id"]
archive_path = archive_output_dir / archive_id

if s3_error is None:
logger.info(f"Uploading archive {archive_id} to S3...")
result = s3_put(s3_config, archive_path, archive_id)

if result.is_err():
logger.error(f"Failed to upload archive {archive_id}: {result.err_value}")
s3_error = result.err_value
# NOTE: It's possible `proc` finishes before we call `terminate` on it, in
# which case the process will still return success.
proc.terminate()
else:
logger.info(f"Finished uploading archive {archive_id} to S3.")

archive_path.unlink()

if s3_error is None:
# We've started a new archive so add the previous archive's last reported size to
# the total
total_uncompressed_size += last_archive_stats["uncompressed_size"]
total_compressed_size += last_archive_stats["size"]
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
update_job_metadata_and_tags(
db_cursor,
job_id,
clp_metadata_db_connection_config["table_prefix"],
tag_ids,
last_archive_stats,
)
db_conn.commit()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider improving error handling and transaction management.

The database operations within the loop could fail, and the current error handling might not properly rollback transactions.

  1. Wrap database operations in try-except blocks
  2. Ensure proper transaction handling
  3. Consider moving S3 upload logic to a separate function
            if s3_error is None:
                total_uncompressed_size += last_archive_stats["uncompressed_size"]
                total_compressed_size += last_archive_stats["size"]
+               try:
                    with closing(sql_adapter.create_connection(True)) as db_conn, closing(
                        db_conn.cursor(dictionary=True)
                    ) as db_cursor:
                        update_job_metadata_and_tags(
                            db_cursor,
                            job_id,
                            clp_metadata_db_connection_config["table_prefix"],
                            tag_ids,
                            last_archive_stats,
                        )
                        db_conn.commit()
+               except Exception as e:
+                   logger.error(f"Failed to update database: {e}")
+                   s3_error = str(e)
+                   proc.terminate()

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Member

@kirkrodrigues kirkrodrigues left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the PR title, how about:

feat(package)!: Add support for writing clp-s single file archives to S3.

@haiqi96 haiqi96 changed the title feat(package): Add support for writing clp-s single file archive to s3. feat(package)!: Add support for writing clp-s single file archives to S3. Dec 19, 2024
@haiqi96 haiqi96 merged commit 37263eb into y-scope:main Dec 19, 2024
7 checks passed
davidlion pushed a commit to Bill-hbrhbr/clp that referenced this pull request Dec 20, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants