Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clp-package): Add support for clp-s s3 ingestion #651

Open
wants to merge 63 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
ca46dca
First version backup
haiqi96 Dec 11, 2024
b763e8b
Small refactor
haiqi96 Dec 11, 2024
4e9529c
First trial for new config
haiqi96 Dec 11, 2024
e9cdea4
Further refactor and polishing
haiqi96 Dec 11, 2024
9ba0a38
Another small refactor
haiqi96 Dec 12, 2024
58befef
small refactor again
haiqi96 Dec 12, 2024
35ec0c3
Combine s3 utils
haiqi96 Dec 12, 2024
5d57b10
Support handling S3 error message
haiqi96 Dec 12, 2024
9991307
Slight logging modification
haiqi96 Dec 12, 2024
5d23790
Linter
haiqi96 Dec 12, 2024
b4bb2af
Add extra verification
haiqi96 Dec 12, 2024
f41c558
Update components/clp-py-utils/clp_py_utils/clp_config.py
haiqi96 Dec 12, 2024
ce5a667
do nothing for now
haiqi96 Dec 12, 2024
f05dc88
backup changes for worker config
haiqi96 Dec 12, 2024
abf5dde
More support
haiqi96 Dec 13, 2024
7d34456
Remove unnecssary change
haiqi96 Dec 13, 2024
a7afd0d
Linter
haiqi96 Dec 13, 2024
99d3094
Handle mount for fs & S3
haiqi96 Dec 13, 2024
1afed1a
Linter
haiqi96 Dec 13, 2024
1de661a
Remove unused functions
haiqi96 Dec 13, 2024
ce3de98
Update components/job-orchestration/job_orchestration/executor/compre…
haiqi96 Dec 13, 2024
f49664f
simplify worker config
haiqi96 Dec 13, 2024
046cdcb
polishing
haiqi96 Dec 13, 2024
242dec2
linter
haiqi96 Dec 14, 2024
ed280cb
Apply suggestions from code review
haiqi96 Dec 16, 2024
0788e59
Fix easier ones
haiqi96 Dec 16, 2024
c198f27
Backup changes
haiqi96 Dec 16, 2024
4819f76
Small fixes
haiqi96 Dec 16, 2024
e5f43fb
fixes
haiqi96 Dec 16, 2024
1246062
add safeguard for archive update failure
haiqi96 Dec 17, 2024
3b870a4
Add docstrings
haiqi96 Dec 17, 2024
214ae3f
Apply suggestions from code review
haiqi96 Dec 18, 2024
6ff92fc
Clean up
haiqi96 Dec 18, 2024
9e07d37
update pyproject.toml
haiqi96 Dec 18, 2024
915b49d
Add docstrings
haiqi96 Dec 18, 2024
a061a29
Apply suggestions from code review
haiqi96 Dec 18, 2024
8301748
Update name as suggested by the code review
haiqi96 Dec 18, 2024
2ada464
a few small fixes to ensure other scripts still work
haiqi96 Dec 18, 2024
6e5aad5
adding safeguard for empty stdout line from clp.
haiqi96 Dec 18, 2024
55c0f36
add safe guard for search
haiqi96 Dec 18, 2024
2d7443e
Polish error messages.
haiqi96 Dec 18, 2024
6f907b2
Linter
haiqi96 Dec 18, 2024
120ffec
Slighlty improve the error message
haiqi96 Dec 18, 2024
d5eae21
Back up
haiqi96 Dec 17, 2024
ce2b440
Backup
haiqi96 Dec 19, 2024
6d2b815
Merge branch 'main' into s3_scheduler
haiqi96 Dec 19, 2024
b8f715d
Update execution image dependency
haiqi96 Dec 19, 2024
57e1912
simplify the code a little bit
haiqi96 Dec 19, 2024
27b8612
fix a previous mistake
haiqi96 Dec 19, 2024
d55f1ad
Keep fixing previous mistake
haiqi96 Dec 19, 2024
4de4fee
add url parsing helper
haiqi96 Dec 19, 2024
4224bd6
Linter
haiqi96 Dec 20, 2024
1cf3d01
Some refactor
haiqi96 Dec 20, 2024
b1655cd
Refactor compress scripts
haiqi96 Dec 20, 2024
d12e173
Initial support for cmdline
haiqi96 Jan 2, 2025
6833ee9
Linter fixes
haiqi96 Jan 2, 2025
a4e92ae
add argument checks
haiqi96 Jan 2, 2025
a638f2d
Polishing
haiqi96 Jan 3, 2025
5685224
Add some docstrings
haiqi96 Jan 3, 2025
fd9dba2
fixes
haiqi96 Jan 3, 2025
f7a175c
Rename task script
haiqi96 Jan 3, 2025
20488a1
fixes
haiqi96 Jan 3, 2025
12d6b97
Some captilization and update to the docstrings
haiqi96 Jan 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 175 additions & 49 deletions components/clp-package-utils/clp_package_utils/scripts/compress.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
import subprocess
import sys
import uuid
from typing import List

from clp_py_utils.clp_config import CLPConfig, StorageEngine
from clp_py_utils.s3_utils import parse_aws_credentials_file
from job_orchestration.scheduler.job_config import InputType

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
CONTAINER_INPUT_LOGS_ROOT_DIR,
dump_container_config,
generate_container_config,
generate_container_name,
Expand All @@ -21,28 +25,165 @@
logger = logging.getLogger(__file__)


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH
def _generate_targets_list(
container_targets_list_path: pathlib.Path,
parsed_args: argparse.Namespace,
) -> None:
input_type = parsed_args.input_type

if InputType.FS == input_type:
compression_targets_list_file = parsed_args.path_list
with open(container_targets_list_path, "w") as container_targets_list_file:
if compression_targets_list_file is not None:
with open(compression_targets_list_file, "r") as targets_list_file:
for line in targets_list_file:
resolved_path = pathlib.Path(line.rstrip()).resolve()
container_targets_list_file.write(f"{resolved_path}\n")

for path in parsed_args.paths:
resolved_path = pathlib.Path(path).resolve()
container_targets_list_file.write(f"{resolved_path}\n")

elif InputType.S3 == input_type:
with open(container_targets_list_path, "w") as container_targets_list_file:
container_targets_list_file.write(f"{parsed_args.url}\n")

else:
raise ValueError(f"Unsupported input type: {input_type}.")


args_parser = argparse.ArgumentParser(description="Compresses files/directories")
def _generate_compress_cmd(
parsed_args: argparse.Namespace, config_path: pathlib.Path, target_list_path: pathlib.Path
) -> List[str]:
input_type = parsed_args.input_type

# fmt: off
compress_cmd = [
"python3",
"-m", "clp_package_utils.scripts.native.compress",
input_type,
"--config", str(config_path)
]
# fmt: on
if parsed_args.timestamp_key is not None:
compress_cmd.append("--timestamp-key")
compress_cmd.append(parsed_args.timestamp_key)
if parsed_args.tags is not None:
compress_cmd.append("--tags")
compress_cmd.append(parsed_args.tags)
if parsed_args.no_progress_reporting is True:
compress_cmd.append("--no-progress-reporting")

if InputType.FS == input_type:
pass
elif InputType.S3 == input_type:
aws_access_key_id = parsed_args.aws_access_key_id
aws_secret_access_key = parsed_args.aws_secret_access_key
if parsed_args.aws_credentials_file:
aws_access_key_id, aws_secret_access_key = parse_aws_credentials_file(
pathlib.Path(parsed_args.aws_credentials_file)
)
if aws_access_key_id and aws_secret_access_key:
compress_cmd.append("--aws-access-key-id")
compress_cmd.append(aws_access_key_id)
compress_cmd.append("--aws-secret-access-key")
compress_cmd.append(aws_secret_access_key)
else:
raise ValueError(f"Unsupported input type: {input_type}.")

compress_cmd.append("--target-list")
compress_cmd.append(str(target_list_path))

return compress_cmd


def _add_common_arguments(
args_parser: argparse.ArgumentParser, default_config_file_path: pathlib.Path
) -> None:
args_parser.add_argument(
"--config",
"-c",
default=str(default_config_file_path),
help="CLP package configuration file.",
)
args_parser.add_argument("paths", metavar="PATH", nargs="*", help="Paths to compress.")
args_parser.add_argument(
"-f", "--path-list", dest="path_list", help="A file listing all paths to compress."
)
args_parser.add_argument(
"--timestamp-key",
help="The path (e.g. x.y) for the field containing the log event's timestamp.",
)
args_parser.add_argument(
"-t", "--tags", help="A comma-separated list of tags to apply to the compressed archives."
)
args_parser.add_argument(
"--no-progress-reporting", action="store_true", help="Disables progress reporting."
)


def _validate_fs_input_args(
parsed_args: argparse.Namespace,
args_parser: argparse.ArgumentParser,
) -> None:
# Validate some input paths were specified
if len(parsed_args.paths) == 0 and parsed_args.path_list is None:
args_parser.error("No paths specified.")

# Validate paths were specified using only one method
if len(parsed_args.paths) > 0 and parsed_args.path_list is not None:
args_parser.error("Paths cannot be specified on the command line AND through a file.")


def _validate_s3_input_args(
parsed_args: argparse.Namespace, args_parser: argparse.ArgumentParser, clp_config: CLPConfig
) -> None:
if StorageEngine.CLP_S != clp_config.package.storage_engine:
raise ValueError(
f"input type {InputType.S3} is only supported for the storage engine {StorageEngine.CLP_S}."
)

# Validate aws credentials were specified using only one method
aws_credential_file = parsed_args.aws_credentials_file
aws_access_key_id = parsed_args.aws_access_key_id
aws_secret_access_key = parsed_args.aws_secret_access_key
if aws_credential_file is not None:
if not pathlib.Path(aws_credential_file).exists():
raise ValueError(f"credentials file {aws_credential_file} doesn't exist.")

if aws_access_key_id is not None or aws_secret_access_key is not None:
args_parser.error(
"aws_credentials_file can not be specified together with aws_access_key_id or aws_secret_access_key."
)

elif bool(aws_access_key_id) != bool(aws_secret_access_key):
args_parser.error(
"aws_access_key_id and aws_secret_access_key must be both specified or left unspecified."
)


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

args_parser = argparse.ArgumentParser(description="Compresses files from filesystem/s3")
input_type_args_parser = args_parser.add_subparsers(dest="input_type")

fs_compressor_parser = input_type_args_parser.add_parser(InputType.FS)
_add_common_arguments(fs_compressor_parser, default_config_file_path)
fs_compressor_parser.add_argument("paths", metavar="PATH", nargs="*", help="Paths to compress.")
fs_compressor_parser.add_argument(
"-f", "--path-list", dest="path_list", help="A file listing all paths to compress."
)

s3_compressor_parser = input_type_args_parser.add_parser(InputType.S3)
_add_common_arguments(s3_compressor_parser, default_config_file_path)
s3_compressor_parser.add_argument("url", metavar="URL", help="URL of object to be compressed")
s3_compressor_parser.add_argument(
"--aws-access-key-id", type=str, default=None, help="AWS access key id."
)
s3_compressor_parser.add_argument(
"--aws-secret-access-key", type=str, default=None, help="AWS secret access key."
)
s3_compressor_parser.add_argument(
"--aws-credentials-file", type=str, default=None, help="Path to AWS credentials file."
)

parsed_args = args_parser.parse_args(argv[1:])

Expand All @@ -58,6 +199,14 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

input_type = parsed_args.input_type
if InputType.FS == input_type:
_validate_fs_input_args(parsed_args, args_parser)
elif InputType.S3 == input_type:
_validate_s3_input_args(parsed_args, args_parser, clp_config)
else:
raise ValueError(f"Unsupported input type: {input_type}.")

container_name = generate_container_name(str(JobType.COMPRESSION))

container_clp_config, mounts = generate_container_config(clp_config, clp_home)
Expand All @@ -66,49 +215,26 @@ def main(argv):
)

necessary_mounts = [mounts.clp_home, mounts.input_logs_dir, mounts.data_dir, mounts.logs_dir]
container_start_cmd = generate_container_start_cmd(
container_name, necessary_mounts, clp_config.execution_container
)

# fmt: off
compress_cmd = [
"python3",
"-m", "clp_package_utils.scripts.native.compress",
"--config", str(generated_config_path_on_container),
"--remove-path-prefix", str(CONTAINER_INPUT_LOGS_ROOT_DIR),
]
# fmt: on
if parsed_args.timestamp_key is not None:
compress_cmd.append("--timestamp-key")
compress_cmd.append(parsed_args.timestamp_key)
if parsed_args.tags is not None:
compress_cmd.append("--tags")
compress_cmd.append(parsed_args.tags)
for path in parsed_args.paths:
# Resolve path and prefix it with CONTAINER_INPUT_LOGS_ROOT_DIR
resolved_path = pathlib.Path(path).resolve()
path = str(CONTAINER_INPUT_LOGS_ROOT_DIR / resolved_path.relative_to(resolved_path.anchor))
compress_cmd.append(path)
if parsed_args.path_list is not None:
# Write targets to compress to a file
while True:
# Get unused output path
while True:
container_path_list_filename = f"{uuid.uuid4()}.txt"
container_path_list_path = clp_config.logs_directory / container_path_list_filename
if not container_path_list_path.exists():
break

with open(parsed_args.path_list, "r") as path_list_file:
with open(container_path_list_path, "w") as container_path_list_file:
for line in path_list_file:
resolved_path = pathlib.Path(line.rstrip()).resolve()
path = CONTAINER_INPUT_LOGS_ROOT_DIR / resolved_path.relative_to(
resolved_path.anchor
)
container_path_list_file.write(f"{path}\n")

compress_cmd.append("--path-list")
compress_cmd.append(container_clp_config.logs_directory / container_path_list_filename)
container_target_list_filename = f"{uuid.uuid4()}.txt"
container_target_list_path = clp_config.logs_directory / container_target_list_filename
path_list_path_on_container = (
container_clp_config.logs_directory / container_target_list_filename
)
if not container_target_list_path.exists():
break

_generate_targets_list(container_target_list_path, parsed_args)

container_start_cmd = generate_container_start_cmd(
container_name, necessary_mounts, clp_config.execution_container
)
compress_cmd = _generate_compress_cmd(
parsed_args, generated_config_path_on_container, path_list_path_on_container
)
cmd = container_start_cmd + compress_cmd
subprocess.run(cmd, check=True)

Expand Down
Loading
Loading