From 12d6b971cab9ba2f59443dfe8ffad69996ee4920 Mon Sep 17 00:00:00 2001
From: Haiqi Xu <14502009+haiqi96@users.noreply.github.com>
Date: Fri, 3 Jan 2025 15:53:43 -0500
Subject: [PATCH] Some capitalization fixes and updates to the docstrings

---
 components/clp-py-utils/clp_py_utils/s3_utils.py            | 5 ++++-
 .../job_orchestration/executor/compress/compression_task.py | 4 ++--
 .../scheduler/compress/compression_scheduler.py             | 6 +++---
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/components/clp-py-utils/clp_py_utils/s3_utils.py b/components/clp-py-utils/clp_py_utils/s3_utils.py
index f2d7de605..90b5ead09 100644
--- a/components/clp-py-utils/clp_py_utils/s3_utils.py
+++ b/components/clp-py-utils/clp_py_utils/s3_utils.py
@@ -47,7 +47,7 @@ def parse_aws_credentials_file(credentials_file_path: Path) -> Tuple[str, str]:
 
 def parse_s3_url(s3_url: str) -> Tuple[str, str, str]:
     """
-    Parses the region_code, bucket and key_prefix from the given s3 url. The url must be either a
+    Parses the region_code, bucket and key_prefix from the given S3 url. The url must be either a
     host_style_url or path_style_url.
     :param s3_url: a host_style_url or path_style_url.
     :return: A tuple of (region_code, bucket, key_prefix)
@@ -90,6 +90,9 @@ def generate_s3_virtual_hosted_style_url(
 def get_s3_object_metadata(s3_input_config: S3InputConfig) -> Result[List[FileMetadata], str]:
     """
     Gets the metadata of objects under the <bucket>/<key_prefix> specified by s3_input_config.
+    Note: We reuse FileMetadata class to store the metadata of S3 objects. The object_key is stored
+    as path in FileMetadata.
+
     :param s3_input_config: S3 configuration specifying the bucket, key_prefix and credentials.
     :return: Result.OK(List[FileMetadata]) containing the object metadata on success, otherwise
         Result.Err(str) with the error message.
diff --git a/components/job-orchestration/job_orchestration/executor/compress/compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/compression_task.py
index b35bffe86..fdc541361 100644
--- a/components/job-orchestration/job_orchestration/executor/compress/compression_task.py
+++ b/components/job-orchestration/job_orchestration/executor/compress/compression_task.py
@@ -246,7 +246,7 @@ def run_clp(
     yaml.safe_dump(clp_metadata_db_connection_config, db_config_file)
     db_config_file.close()
 
-    # Get s3 config
+    # Get S3 config
     s3_config: S3Config
     enable_s3_write = False
     storage_type = worker_config.archive_output.storage.type
@@ -310,7 +310,7 @@ def run_clp(
     total_uncompressed_size = 0
     total_compressed_size = 0
 
-    # Handle job metadata update and s3 write if enabled
+    # Handle job metadata update and S3 write if enabled
     s3_error = None
     while not last_line_decoded:
         stats: Optional[Dict[str, Any]] = None
diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
index c9cb5a9cf..8e69c3ce9 100644
--- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
+++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
@@ -128,7 +128,7 @@ def _process_s3_input(
 ) -> Result[bool, str]:
     """
     Iterate through all objects under the <bucket>/<key_prefix> specified by s3_input_config,
-    and adds their metadata to the paths_to_compress_buffer
+    and adds their metadata to the paths_to_compress_buffer.
     :param s3_input_config: S3 configuration specifying the bucket, key_prefix and credentials.
     :param paths_to_compress_buffer: PathsToCompressBuffer containing the scheduling information
     :return: Result.OK(True) on success, or Result.Err(str) with the error message otherwise.
@@ -137,7 +137,7 @@ def _process_s3_input(
     res = get_s3_object_metadata(s3_input_config)
 
     if res.is_err():
-        logger.error(f"Failed to process s3 input: {res.err_value}")
+        logger.error(f"Failed to process S3 input: {res.err_value}")
         return res
 
     object_metadata_list = res.ok_value
@@ -189,7 +189,7 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, clp_metadata_db_connection
                 job_id,
                 {
                     "status": CompressionJobStatus.FAILED,
-                    "status_msg": f"Scheduler Failed for s3 input: {res.err_value}",
+                    "status_msg": f"Scheduler Failed for S3 input: {res.err_value}",
                 },
             )
             db_conn.commit()
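
Note for reviewers: parse_s3_url's docstring names two URL forms, virtual-hosted-style
(https://<bucket>.s3.<region>.amazonaws.com/<key_prefix>) and path-style
(https://s3.<region>.amazonaws.com/<bucket>/<key_prefix>). The sketch below illustrates one way
such parsing can work; it is an illustrative assumption, not the repository's actual
parse_s3_url implementation, and it only covers standard amazonaws.com endpoints.

# Illustrative sketch only -- NOT the repository's parse_s3_url implementation.
# Shows one way to extract (region_code, bucket, key_prefix) from the two URL
# styles named in the docstring. The regexes and function name are assumptions.
import re
from typing import Tuple

_HOST_STYLE_REGEX = re.compile(
    r"https?://(?P<bucket>[^/.]+)\.s3\.(?P<region>[^/.]+)\.amazonaws\.com/(?P<key_prefix>.*)"
)
_PATH_STYLE_REGEX = re.compile(
    r"https?://s3\.(?P<region>[^/.]+)\.amazonaws\.com/(?P<bucket>[^/]+)/(?P<key_prefix>.*)"
)


def parse_s3_url_sketch(s3_url: str) -> Tuple[str, str, str]:
    """Returns (region_code, bucket, key_prefix) parsed from s3_url."""
    # Try the host-style form first since its bucket appears in the hostname.
    for regex in (_HOST_STYLE_REGEX, _PATH_STYLE_REGEX):
        match = regex.fullmatch(s3_url)
        if match is not None:
            return match.group("region"), match.group("bucket"), match.group("key_prefix")
    raise ValueError(f"{s3_url} is not a supported S3 URL.")


# Example:
# parse_s3_url_sketch("https://my-bucket.s3.us-east-2.amazonaws.com/logs/")
# -> ("us-east-2", "my-bucket", "logs/")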