Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clp-package): Add support for clp-json to ingest logs from S3. #651

Merged
merged 79 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 63 commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
ca46dca
First version backup
haiqi96 Dec 11, 2024
b763e8b
Small refactor
haiqi96 Dec 11, 2024
4e9529c
First trial for new config
haiqi96 Dec 11, 2024
e9cdea4
Further refactor and polishing
haiqi96 Dec 11, 2024
9ba0a38
Another small refactor
haiqi96 Dec 12, 2024
58befef
small refactor again
haiqi96 Dec 12, 2024
35ec0c3
Combine s3 utils
haiqi96 Dec 12, 2024
5d57b10
Support handling S3 error message
haiqi96 Dec 12, 2024
9991307
Slight logging modification
haiqi96 Dec 12, 2024
5d23790
Linter
haiqi96 Dec 12, 2024
b4bb2af
Add extra verification
haiqi96 Dec 12, 2024
f41c558
Update components/clp-py-utils/clp_py_utils/clp_config.py
haiqi96 Dec 12, 2024
ce5a667
do nothing for now
haiqi96 Dec 12, 2024
f05dc88
backup changes for worker config
haiqi96 Dec 12, 2024
abf5dde
More support
haiqi96 Dec 13, 2024
7d34456
Remove unnecssary change
haiqi96 Dec 13, 2024
a7afd0d
Linter
haiqi96 Dec 13, 2024
99d3094
Handle mount for fs & S3
haiqi96 Dec 13, 2024
1afed1a
Linter
haiqi96 Dec 13, 2024
1de661a
Remove unused functions
haiqi96 Dec 13, 2024
ce3de98
Update components/job-orchestration/job_orchestration/executor/compre…
haiqi96 Dec 13, 2024
f49664f
simplify worker config
haiqi96 Dec 13, 2024
046cdcb
polishing
haiqi96 Dec 13, 2024
242dec2
linter
haiqi96 Dec 14, 2024
ed280cb
Apply suggestions from code review
haiqi96 Dec 16, 2024
0788e59
Fix easier ones
haiqi96 Dec 16, 2024
c198f27
Backup changes
haiqi96 Dec 16, 2024
4819f76
Small fixes
haiqi96 Dec 16, 2024
e5f43fb
fixes
haiqi96 Dec 16, 2024
1246062
add safeguard for archive update failure
haiqi96 Dec 17, 2024
3b870a4
Add docstrings
haiqi96 Dec 17, 2024
214ae3f
Apply suggestions from code review
haiqi96 Dec 18, 2024
6ff92fc
Clean up
haiqi96 Dec 18, 2024
9e07d37
update pyproject.toml
haiqi96 Dec 18, 2024
915b49d
Add docstrings
haiqi96 Dec 18, 2024
a061a29
Apply suggestions from code review
haiqi96 Dec 18, 2024
8301748
Update name as suggested by the code review
haiqi96 Dec 18, 2024
2ada464
a few small fixes to ensure other scripts still work
haiqi96 Dec 18, 2024
6e5aad5
adding safeguard for empty stdout line from clp.
haiqi96 Dec 18, 2024
55c0f36
add safe guard for search
haiqi96 Dec 18, 2024
2d7443e
Polish error messages.
haiqi96 Dec 18, 2024
6f907b2
Linter
haiqi96 Dec 18, 2024
120ffec
Slighlty improve the error message
haiqi96 Dec 18, 2024
d5eae21
Back up
haiqi96 Dec 17, 2024
ce2b440
Backup
haiqi96 Dec 19, 2024
6d2b815
Merge branch 'main' into s3_scheduler
haiqi96 Dec 19, 2024
b8f715d
Update execution image dependency
haiqi96 Dec 19, 2024
57e1912
simplify the code a little bit
haiqi96 Dec 19, 2024
27b8612
fix a previous mistake
haiqi96 Dec 19, 2024
d55f1ad
Keep fixing previous mistake
haiqi96 Dec 19, 2024
4de4fee
add url parsing helper
haiqi96 Dec 19, 2024
4224bd6
Linter
haiqi96 Dec 20, 2024
1cf3d01
Some refactor
haiqi96 Dec 20, 2024
b1655cd
Refactor compress scripts
haiqi96 Dec 20, 2024
d12e173
Initial support for cmdline
haiqi96 Jan 2, 2025
6833ee9
Linter fixes
haiqi96 Jan 2, 2025
a4e92ae
add argument checks
haiqi96 Jan 2, 2025
a638f2d
Polishing
haiqi96 Jan 3, 2025
5685224
Add some docstrings
haiqi96 Jan 3, 2025
fd9dba2
fixes
haiqi96 Jan 3, 2025
f7a175c
Rename task script
haiqi96 Jan 3, 2025
20488a1
fixes
haiqi96 Jan 3, 2025
12d6b97
Some captilization and update to the docstrings
haiqi96 Jan 3, 2025
709dfb0
Merge branch 'main' into s3_scheduler
haiqi96 Jan 13, 2025
fa5ace1
Apply suggestions from code review
haiqi96 Jan 15, 2025
a4675fd
First batch of required updates
haiqi96 Jan 15, 2025
642eb81
Changes missing from the first batch
haiqi96 Jan 15, 2025
d3c0065
Fixing path processing logic
haiqi96 Jan 15, 2025
c7d629e
Fix
haiqi96 Jan 15, 2025
2ddd9c3
Use config parser (tested)
haiqi96 Jan 15, 2025
b7c746d
move config argument around
haiqi96 Jan 15, 2025
3805d36
Linter
haiqi96 Jan 15, 2025
16ad596
Update error reporting
haiqi96 Jan 15, 2025
af732b6
Small touches
haiqi96 Jan 15, 2025
356a1c3
Updatew docstrings
haiqi96 Jan 15, 2025
57afa5f
Use default user for now
haiqi96 Jan 15, 2025
99ea6dd
Linter
haiqi96 Jan 15, 2025
996692e
Fix
haiqi96 Jan 16, 2025
351f239
Apply suggestions from code review
haiqi96 Jan 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 175 additions & 49 deletions components/clp-package-utils/clp_package_utils/scripts/compress.py
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
import subprocess
import sys
import uuid
from typing import List

from clp_py_utils.clp_config import CLPConfig, StorageEngine
from clp_py_utils.s3_utils import parse_aws_credentials_file
from job_orchestration.scheduler.job_config import InputType

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
CONTAINER_INPUT_LOGS_ROOT_DIR,
dump_container_config,
generate_container_config,
generate_container_name,
Expand All @@ -21,28 +25,165 @@
logger = logging.getLogger(__file__)


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH
def _generate_targets_list(
container_targets_list_path: pathlib.Path,
parsed_args: argparse.Namespace,
) -> None:
input_type = parsed_args.input_type

if InputType.FS == input_type:
compression_targets_list_file = parsed_args.path_list
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
with open(container_targets_list_path, "w") as container_targets_list_file:
if compression_targets_list_file is not None:
with open(compression_targets_list_file, "r") as targets_list_file:
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
for line in targets_list_file:
resolved_path = pathlib.Path(line.rstrip()).resolve()
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
container_targets_list_file.write(f"{resolved_path}\n")

for path in parsed_args.paths:
resolved_path = pathlib.Path(path).resolve()
container_targets_list_file.write(f"{resolved_path}\n")

elif InputType.S3 == input_type:
with open(container_targets_list_path, "w") as container_targets_list_file:
container_targets_list_file.write(f"{parsed_args.url}\n")

else:
raise ValueError(f"Unsupported input type: {input_type}.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add input validation for paths and URLs.

The target list generation should validate:

  1. Empty paths for FS input
  2. Path existence for FS input
  3. S3 URL format for S3 input
def _generate_targets_list(
    container_targets_list_path: pathlib.Path,
    parsed_args: argparse.Namespace,
) -> None:
    input_type = parsed_args.input_type

    if InputType.FS == input_type:
        compression_targets_list_file = parsed_args.path_list
        with open(container_targets_list_path, "w") as container_targets_list_file:
            if compression_targets_list_file is not None:
                with open(compression_targets_list_file, "r") as targets_list_file:
                    for line in targets_list_file:
+                        path_str = line.rstrip()
+                        if not path_str:
+                            continue
                         resolved_path = pathlib.Path(line.rstrip()).resolve()
+                        if not resolved_path.exists():
+                            logger.warning(f"Path does not exist: {resolved_path}")
+                            continue
                         container_targets_list_file.write(f"{resolved_path}\n")

            for path in parsed_args.paths:
+                if not path:
+                    continue
                 resolved_path = pathlib.Path(path).resolve()
+                if not resolved_path.exists():
+                    logger.warning(f"Path does not exist: {resolved_path}")
+                    continue
                 container_targets_list_file.write(f"{resolved_path}\n")

    elif InputType.S3 == input_type:
        with open(container_targets_list_path, "w") as container_targets_list_file:
+            try:
+                from clp_py_utils.s3_utils import parse_s3_url
+                parse_s3_url(parsed_args.url)  # Validate URL format
                 container_targets_list_file.write(f"{parsed_args.url}\n")
+            except ValueError as e:
+                raise ValueError(f"Invalid S3 URL: {e}")

    else:
        raise ValueError(f"Unsupported input type: {input_type}.")

Committable suggestion skipped: line range outside the PR's diff.



args_parser = argparse.ArgumentParser(description="Compresses files/directories")
def _generate_compress_cmd(
parsed_args: argparse.Namespace, config_path: pathlib.Path, target_list_path: pathlib.Path
) -> List[str]:
input_type = parsed_args.input_type

# fmt: off
compress_cmd = [
"python3",
"-m", "clp_package_utils.scripts.native.compress",
input_type,
"--config", str(config_path)
]
# fmt: on
if parsed_args.timestamp_key is not None:
compress_cmd.append("--timestamp-key")
compress_cmd.append(parsed_args.timestamp_key)
if parsed_args.tags is not None:
compress_cmd.append("--tags")
compress_cmd.append(parsed_args.tags)
if parsed_args.no_progress_reporting is True:
compress_cmd.append("--no-progress-reporting")

if InputType.FS == input_type:
pass
elif InputType.S3 == input_type:
aws_access_key_id = parsed_args.aws_access_key_id
aws_secret_access_key = parsed_args.aws_secret_access_key
if parsed_args.aws_credentials_file:
aws_access_key_id, aws_secret_access_key = parse_aws_credentials_file(
pathlib.Path(parsed_args.aws_credentials_file)
)
if aws_access_key_id and aws_secret_access_key:
compress_cmd.append("--aws-access-key-id")
compress_cmd.append(aws_access_key_id)
compress_cmd.append("--aws-secret-access-key")
compress_cmd.append(aws_secret_access_key)
else:
raise ValueError(f"Unsupported input type: {input_type}.")

compress_cmd.append("--target-list")
compress_cmd.append(str(target_list_path))

return compress_cmd


def _add_common_arguments(
args_parser: argparse.ArgumentParser, default_config_file_path: pathlib.Path
) -> None:
args_parser.add_argument(
"--config",
"-c",
default=str(default_config_file_path),
help="CLP package configuration file.",
)
args_parser.add_argument("paths", metavar="PATH", nargs="*", help="Paths to compress.")
args_parser.add_argument(
"-f", "--path-list", dest="path_list", help="A file listing all paths to compress."
)
args_parser.add_argument(
"--timestamp-key",
help="The path (e.g. x.y) for the field containing the log event's timestamp.",
)
args_parser.add_argument(
"-t", "--tags", help="A comma-separated list of tags to apply to the compressed archives."
)
args_parser.add_argument(
"--no-progress-reporting", action="store_true", help="Disables progress reporting."
)


def _validate_fs_input_args(
parsed_args: argparse.Namespace,
args_parser: argparse.ArgumentParser,
) -> None:
# Validate some input paths were specified
if len(parsed_args.paths) == 0 and parsed_args.path_list is None:
args_parser.error("No paths specified.")

# Validate paths were specified using only one method
if len(parsed_args.paths) > 0 and parsed_args.path_list is not None:
args_parser.error("Paths cannot be specified on the command line AND through a file.")


def _validate_s3_input_args(
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
parsed_args: argparse.Namespace, args_parser: argparse.ArgumentParser, clp_config: CLPConfig
) -> None:
if StorageEngine.CLP_S != clp_config.package.storage_engine:
raise ValueError(
f"input type {InputType.S3} is only supported for the storage engine {StorageEngine.CLP_S}."
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
)

# Validate aws credentials were specified using only one method
aws_credential_file = parsed_args.aws_credentials_file
aws_access_key_id = parsed_args.aws_access_key_id
aws_secret_access_key = parsed_args.aws_secret_access_key
if aws_credential_file is not None:
if not pathlib.Path(aws_credential_file).exists():
raise ValueError(f"credentials file {aws_credential_file} doesn't exist.")
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

if aws_access_key_id is not None or aws_secret_access_key is not None:
args_parser.error(
"aws_credentials_file can not be specified together with aws_access_key_id or aws_secret_access_key."
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
)

elif bool(aws_access_key_id) != bool(aws_secret_access_key):
args_parser.error(
"aws_access_key_id and aws_secret_access_key must be both specified or left unspecified."
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
)


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

args_parser = argparse.ArgumentParser(description="Compresses files from filesystem/s3")
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
input_type_args_parser = args_parser.add_subparsers(dest="input_type")

fs_compressor_parser = input_type_args_parser.add_parser(InputType.FS)
_add_common_arguments(fs_compressor_parser, default_config_file_path)
fs_compressor_parser.add_argument("paths", metavar="PATH", nargs="*", help="Paths to compress.")
fs_compressor_parser.add_argument(
"-f", "--path-list", dest="path_list", help="A file listing all paths to compress."
)

s3_compressor_parser = input_type_args_parser.add_parser(InputType.S3)
_add_common_arguments(s3_compressor_parser, default_config_file_path)
s3_compressor_parser.add_argument("url", metavar="URL", help="URL of object to be compressed")
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
s3_compressor_parser.add_argument(
"--aws-access-key-id", type=str, default=None, help="AWS access key id."
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
)
s3_compressor_parser.add_argument(
"--aws-secret-access-key", type=str, default=None, help="AWS secret access key."
)
s3_compressor_parser.add_argument(
"--aws-credentials-file", type=str, default=None, help="Path to AWS credentials file."
)

parsed_args = args_parser.parse_args(argv[1:])

Expand All @@ -58,6 +199,14 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

input_type = parsed_args.input_type
if InputType.FS == input_type:
_validate_fs_input_args(parsed_args, args_parser)
elif InputType.S3 == input_type:
_validate_s3_input_args(parsed_args, args_parser, clp_config)
else:
raise ValueError(f"Unsupported input type: {input_type}.")

container_name = generate_container_name(str(JobType.COMPRESSION))

container_clp_config, mounts = generate_container_config(clp_config, clp_home)
Expand All @@ -66,49 +215,26 @@ def main(argv):
)

necessary_mounts = [mounts.clp_home, mounts.input_logs_dir, mounts.data_dir, mounts.logs_dir]
container_start_cmd = generate_container_start_cmd(
container_name, necessary_mounts, clp_config.execution_container
)

# fmt: off
compress_cmd = [
"python3",
"-m", "clp_package_utils.scripts.native.compress",
"--config", str(generated_config_path_on_container),
"--remove-path-prefix", str(CONTAINER_INPUT_LOGS_ROOT_DIR),
]
# fmt: on
if parsed_args.timestamp_key is not None:
compress_cmd.append("--timestamp-key")
compress_cmd.append(parsed_args.timestamp_key)
if parsed_args.tags is not None:
compress_cmd.append("--tags")
compress_cmd.append(parsed_args.tags)
for path in parsed_args.paths:
# Resolve path and prefix it with CONTAINER_INPUT_LOGS_ROOT_DIR
resolved_path = pathlib.Path(path).resolve()
path = str(CONTAINER_INPUT_LOGS_ROOT_DIR / resolved_path.relative_to(resolved_path.anchor))
compress_cmd.append(path)
if parsed_args.path_list is not None:
# Write compression targets to a file
while True:
# Get unused output path
while True:
container_path_list_filename = f"{uuid.uuid4()}.txt"
container_path_list_path = clp_config.logs_directory / container_path_list_filename
if not container_path_list_path.exists():
break

with open(parsed_args.path_list, "r") as path_list_file:
with open(container_path_list_path, "w") as container_path_list_file:
for line in path_list_file:
resolved_path = pathlib.Path(line.rstrip()).resolve()
path = CONTAINER_INPUT_LOGS_ROOT_DIR / resolved_path.relative_to(
resolved_path.anchor
)
container_path_list_file.write(f"{path}\n")

compress_cmd.append("--path-list")
compress_cmd.append(container_clp_config.logs_directory / container_path_list_filename)
container_target_list_filename = f"{uuid.uuid4()}.txt"
container_target_list_path = clp_config.logs_directory / container_target_list_filename
path_list_path_on_container = (
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
container_clp_config.logs_directory / container_target_list_filename
)
if not container_target_list_path.exists():
break

_generate_targets_list(container_target_list_path, parsed_args)

container_start_cmd = generate_container_start_cmd(
container_name, necessary_mounts, clp_config.execution_container
)
compress_cmd = _generate_compress_cmd(
parsed_args, generated_config_path_on_container, path_list_path_on_container
)
cmd = container_start_cmd + compress_cmd
subprocess.run(cmd, check=True)

Expand Down
Loading
Loading