From 4c823e6995eed13c4d790c5f5d70dc2c6f417500 Mon Sep 17 00:00:00 2001 From: Mikhail Sandakov Date: Mon, 23 Oct 2023 15:35:18 +0300 Subject: [PATCH] Initial commit --- .github/ISSUE_TEMPLATE/bug_report.md | 16 + .github/ISSUE_TEMPLATE/feature_request.md | 20 + .github/workflows/tests.yml | 16 + BUCK | 17 + LICENSE | 201 ++++++++ README.md | 4 + __init__.py | 16 + action.py | 355 +++++++++++++++ dist.py | 78 ++++ dpkg.py | 76 ++++ feedback.py | 52 +++ files.py | 183 ++++++++ leapp_configs.py | 273 +++++++++++ log.py | 94 ++++ messages.py | 0 motd.py | 64 +++ packages.py | 78 ++++ plesk.py | 80 ++++ rpm.py | 163 +++++++ systemd.py | 87 ++++ tests/actionstests.py | 401 ++++++++++++++++ tests/distrotests.py | 31 ++ tests/feedbacktests.py | 67 +++ tests/filestests.py | 509 +++++++++++++++++++++ tests/leapp_configs_tests.py | 532 ++++++++++++++++++++++ tests/motdtests.py | 119 +++++ tests/rpmtests.py | 269 +++++++++++ tests/utiltests.py | 60 +++ tests/versiontests.py | 175 +++++++ util.py | 38 ++ version.py | 118 +++++ writers.py | 41 ++ 32 files changed, 4233 insertions(+) create mode 100644 .github/ISSUE_TEMPLATE/bug_report.md create mode 100644 .github/ISSUE_TEMPLATE/feature_request.md create mode 100644 .github/workflows/tests.yml create mode 100644 BUCK create mode 100644 LICENSE create mode 100644 README.md create mode 100644 __init__.py create mode 100644 action.py create mode 100644 dist.py create mode 100644 dpkg.py create mode 100644 feedback.py create mode 100644 files.py create mode 100644 leapp_configs.py create mode 100644 log.py create mode 100644 messages.py create mode 100644 motd.py create mode 100644 packages.py create mode 100644 plesk.py create mode 100644 rpm.py create mode 100644 systemd.py create mode 100644 tests/actionstests.py create mode 100644 tests/distrotests.py create mode 100644 tests/feedbacktests.py create mode 100644 tests/filestests.py create mode 100644 tests/leapp_configs_tests.py create mode 100644 tests/motdtests.py create mode 100644 tests/rpmtests.py create mode 100644 tests/utiltests.py create mode 100644 tests/versiontests.py create mode 100644 util.py create mode 100644 version.py create mode 100644 writers.py diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 0000000..c5f29cd --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,16 @@ +--- +name: Bug report +about: Create a report to help us improve +title: '' +labels: bug +assignees: '' + +--- + +**Describe the bug** +A clear and concise description of what the bug is. + +**Feedback archive** +*Please attach a feedback archive to the bug report.* +You could create it by calling `centos2alma --prepare-feedback`. +The archive will help us investigate the problem better and faster. diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 0000000..11fc491 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,20 @@ +--- +name: Feature request +about: Suggest an idea for this project +title: '' +labels: enhancement +assignees: '' + +--- + +**Is your feature request related to a problem? Please describe.** +A clear and concise description of what the problem is. Ex. I'm always frustrated when [...] + +**Describe the solution you'd like** +A clear and concise description of what you want to happen. + +**Describe alternatives you've considered** +A clear and concise description of any alternative solutions or features you've considered. 
+ +**Additional context** +Add any other context or screenshots about the feature request here. diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..61941bd --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,16 @@ +name: Test with Buck + +on: [push] + +jobs: + test: + runs-on: ubuntu-20.04 # Not latest, because python3.6 is not available on latest + # https://github.com/actions/setup-python/issues/544 + steps: + - uses: actions/checkout@v2 + - name: Perform tests + id: test + uses: SandakovMM/build-with-buck@v2 + with: + command: test + target: :libs.tests diff --git a/BUCK b/BUCK new file mode 100644 index 0000000..a53829e --- /dev/null +++ b/BUCK @@ -0,0 +1,17 @@ +# Copyright 1999-2023. Plesk International GmbH. All rights reserved. +# vim:ft=python: + +python_library( + name = 'common.lib', + srcs = glob(['*.py']), + visibility = ['PUBLIC'], +) + +python_test( + name = 'libs.tests', + srcs = glob(['./tests/*.py']), + deps = [ + ':common.lib', + ], + platform = 'py3', +) diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..e7f3409 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2020 Plesk International GmbH + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..a4f5d89 --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +# Base library to create distupgrade/distconvert tools for instances with Plesk + +## Description +Contains common functions and classes, used to create distupgrade/distconvert tools for instances with Plesk. \ No newline at end of file diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..93ba099 --- /dev/null +++ b/__init__.py @@ -0,0 +1,16 @@ +# Copyright 1999 - 2023. Plesk International GmbH. All rights reserved. 
+from .action import *
+from .dist import *
+from .dpkg import *
+from .log import *
+from .leapp_configs import *
+from .messages import *
+from .motd import *
+from .packages import *
+from .plesk import *
+from .files import *
+from .rpm import *
+from .systemd import *
+from .util import *
+from .version import *
+from .writers import *
\ No newline at end of file
diff --git a/action.py b/action.py
new file mode 100644
index 0000000..bd81acf
--- /dev/null
+++ b/action.py
@@ -0,0 +1,355 @@
+# Copyright 1999 - 2023. Plesk International GmbH. All rights reserved.
+import os
+import json
+import math
+import time
+import typing
+import shutil
+
+from enum import Enum
+
+from . import files, log, writers
+
+# The original code referenced common.TIME_EXCEEDED_MESSAGE, which is undefined in
+# this package; the constant is defined here instead, with placeholder wording.
+TIME_EXCEEDED_MESSAGE = "\nThe conversion is taking longer than expected. Check the log file {} for details.\n"
+
+
+class Action():
+
+    def __init__(self):
+        self.name = ""
+        self.description = ""
+
+    def __str__(self):
+        return "{name}!".format(name=self.name)
+
+    def __repr__(self):
+        return "{classname}".format(classname=self.__class__.__name__)
+
+    # For all estimates we assume an action takes no more than 1 second by default.
+    # We try to avoid estimating small actions like "change one line in a file"
+    # or "remove one file".
+    def estimate_prepare_time(self):
+        return 1
+
+    def estimate_post_time(self):
+        return 1
+
+    def estimate_revert_time(self):
+        return 1
+
+
+class ActiveAction(Action):
+    def invoke_prepare(self):
+        self._prepare_action()
+
+    def invoke_post(self):
+        self._post_action()
+
+    def invoke_revert(self):
+        self._revert_action()
+
+    def is_required(self) -> bool:
+        return self._is_required()
+
+    def _is_required(self) -> bool:
+        # All actions are required by default - just to simplify things
+        return True
+
+    def _prepare_action(self):
+        raise NotImplementedError("Not implemented prepare action is called")
+
+    def _post_action(self):
+        raise NotImplementedError("Not implemented post action is called")
+
+    def _revert_action(self):
+        raise NotImplementedError("Not implemented revert action is called")
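+
+# Illustrative sketch only (not shipped code): a conversion tool is expected to
+# subclass ActiveAction and fill in the three stage callbacks. The class name
+# and repository path below are hypothetical.
+#
+#     class DisableCustomRepo(ActiveAction):
+#         def __init__(self):
+#             super().__init__()
+#             self.name = "disable custom repository"
+#
+#         def _prepare_action(self):
+#             files.backup_file("/etc/yum.repos.d/custom.repo")
+#
+#         def _post_action(self):
+#             files.remove_backup("/etc/yum.repos.d/custom.repo")
+#
+#         def _revert_action(self):
+#             files.restore_file_from_backup("/etc/yum.repos.d/custom.repo")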
+
+
+class ActionState(str, Enum):
+    success = 'success'
+    skipped = 'skip'
+    failed = 'failed'
+
+
+class ActionsFlow():
+
+    def __init__(self, stages: typing.Dict[str, typing.List[Action]]):
+        self.stages = stages
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        pass
+
+
+class ActiveFlow(ActionsFlow):
+
+    PATH_TO_ACTIONS_DATA = "/tmp/centos2alma_actions.json"
+
+    def __init__(self, stages: typing.Dict[str, typing.List[ActiveAction]]):
+        super().__init__(stages)
+        self._finished = False
+        self.current_stage = "initializing"
+        self.current_action = "initializing"
+        self.total_time = 0
+        self.error = None
+
+    def validate_actions(self):
+        # Note: this check is for development purposes only
+        for _, actions in self.stages.items():
+            for action in actions:
+                if not isinstance(action, ActiveAction):
+                    raise TypeError("Not an ActiveAction passed into the action flow. The name of the action is {name!s}".format(name=action.name))
+
+    def pass_actions(self) -> bool:
+        stages = self._get_flow()
+        self._finished = False
+
+        for stage_id, actions in stages.items():
+            self._pre_stage(stage_id, actions)
+            for action in actions:
+                try:
+                    if not self._is_action_required(action):
+                        log.info("Skipped: {description!s}".format(description=action))
+                        self._save_action_state(action.name, ActionState.skipped)
+                        continue
+
+                    self._invoke_action(action)
+
+                    self._save_action_state(action.name, ActionState.success)
+                    log.info("Success: {description!s}".format(description=action))
+                except Exception as ex:
+                    self._save_action_state(action.name, ActionState.failed)
+                    self.error = Exception("Failed: {description!s}. The reason: {error}".format(description=action, error=ex))
+                    log.err("Failed: {description!s}. The reason: {error}".format(description=action, error=ex))
+                    return False
+
+            self._post_stage(stage_id, actions)
+
+        self._finished = True
+        return True
+
+    def _get_flow(self) -> typing.Dict[str, typing.List[ActiveAction]]:
+        return {}
+
+    def _pre_stage(self, stage_id: str, actions: typing.List[ActiveAction]):
+        log.info("Start stage {stage}.".format(stage=stage_id))
+        self.current_stage = stage_id
+
+    def _post_stage(self, stage_id: str, actions: typing.List[ActiveAction]):
+        pass
+
+    def _is_action_required(self, action: ActiveAction) -> bool:
+        return action.is_required()
+
+    def _invoke_action(self, action: ActiveAction) -> None:
+        log.info("Do: {description!s}".format(description=action))
+        self.current_action = action.name
+
+    def _save_action_state(self, name: str, state: ActionState) -> None:
+        pass
+
+    def _load_actions_state(self):
+        if os.path.exists(self.PATH_TO_ACTIONS_DATA):
+            with open(self.PATH_TO_ACTIONS_DATA, "r") as actions_data_file:
+                return json.load(actions_data_file)
+
+        return {"actions": []}
+
+    def is_finished(self) -> bool:
+        return self._finished or self.error is not None
+
+    def is_failed(self) -> bool:
+        return self.error is not None
+
+    def get_error(self) -> Exception:
+        return self.error
+
+    def get_current_stage(self) -> str:
+        return self.current_stage
+
+    def get_current_action(self) -> str:
+        return self.current_action
+
+    def _get_action_estimate(self, action: ActiveAction) -> int:
+        return action.estimate_prepare_time()
+
+    def get_total_time(self) -> int:
+        if self.total_time != 0:
+            return self.total_time
+
+        for _, actions in self.stages.items():
+            for action in actions:
+                self.total_time += self._get_action_estimate(action)
+
+        return self.total_time
+
+
+class PrepareActionsFlow(ActiveFlow):
+
+    def __init__(self, stages: typing.Dict[str, typing.List[ActiveAction]]):
+        super().__init__(stages)
+        self.actions_data = {}
+
+    def __enter__(self):
+        self.actions_data = self._load_actions_state()
+        return self
+
+    def __exit__(self, *args):
+        files.rewrite_json_file(self.PATH_TO_ACTIONS_DATA, self.actions_data)
+
+    def _save_action_state(self, name: str, state: ActionState) -> None:
+        for action in self.actions_data["actions"]:
+            if action["name"] == name:
+                action["state"] = state
+                return
+
+        self.actions_data["actions"].append({"name": name, "state": state})
+
+    def _get_flow(self) -> typing.Dict[str, typing.List[ActiveAction]]:
+        return self.stages
+
+    def _invoke_action(self, action: ActiveAction) -> None:
+        super()._invoke_action(action)
+        action.invoke_prepare()
+
+    def _get_action_estimate(self, action: ActiveAction) -> int:
+        return action.estimate_prepare_time()
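+
+# A minimal usage sketch (hypothetical stage and action names): stages map
+# stage identifiers to lists of actions, and the flow runs inside a context
+# manager so that per-action states are persisted between invocations.
+#
+#     stages = {"prepare": [DisableCustomRepo()]}
+#     with PrepareActionsFlow(stages) as flow:
+#         flow.validate_actions()
+#         if not flow.pass_actions():
+#             raise flow.get_error()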
+
+
+class ReverseActionFlow(ActiveFlow):
+
+    def __enter__(self):
+        self.actions_data = self._load_actions_state()
+        return self
+
+    def __exit__(self, *args):
+        if os.path.exists(self.PATH_TO_ACTIONS_DATA):
+            os.remove(self.PATH_TO_ACTIONS_DATA)
+
+    def _get_flow(self) -> typing.Dict[str, typing.List[ActiveAction]]:
+        return dict(reversed(list(self.stages.items())))
+
+    def _is_action_required(self, action: ActiveAction) -> bool:
+        # The finish stage could contain an action that was not performed during
+        # the conversion stage, so we ignore the case when there is no record of
+        # the action in the persistent store
+        for stored_action in self.actions_data["actions"]:
+            if stored_action["name"] == action.name:
+                if stored_action["state"] == ActionState.failed or stored_action["state"] == ActionState.skipped:
+                    return False
+                elif stored_action["state"] == ActionState.success:
+                    return True
+
+        return action.is_required()
+
+
+class FinishActionsFlow(ReverseActionFlow):
+    def _invoke_action(self, action: ActiveAction) -> None:
+        super()._invoke_action(action)
+        action.invoke_post()
+
+    def _get_action_estimate(self, action: ActiveAction) -> int:
+        if not self._is_action_required(action):
+            return 0
+        return action.estimate_post_time()
+
+
+class RevertActionsFlow(ReverseActionFlow):
+    def _invoke_action(self, action: ActiveAction) -> None:
+        super()._invoke_action(action)
+        action.invoke_revert()
+
+    def _get_action_estimate(self, action: ActiveAction) -> int:
+        if not self._is_action_required(action):
+            return 0
+        return action.estimate_revert_time()
+
+
+class CheckAction(Action):
+    def do_check(self) -> bool:
+        return self._do_check()
+
+    def _do_check(self) -> bool:
+        raise NotImplementedError("Not implemented check call")
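+
+# Illustrative sketch (hypothetical pre-conversion check): conditions subclass
+# CheckAction and return False when the requirement is not met.
+#
+#     class EnoughDiskSpace(CheckAction):
+#         def __init__(self):
+#             super().__init__()
+#             self.name = "check free disk space"
+#             self.description = "at least 5 GB should be free on the root partition"
+#
+#         def _do_check(self) -> bool:
+#             return shutil.disk_usage("/").free > 5 * 1024 ** 3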
+
+
+class CheckFlow(ActionsFlow):
+
+    def validate_actions(self):
+        # Note: this check is for development purposes only
+        for check in self.stages:
+            if not isinstance(check, CheckAction):
+                raise TypeError("Not a CheckAction passed into the check flow. The name of the action is {name!s}".format(name=check.name))
+
+    def make_checks(self) -> typing.List[str]:
+        failed_checks_msgs = []
+        log.debug("Start checks")
+        for check in self.stages:
+            log.debug("Make check {name}".format(name=check.name))
+            if not check.do_check():
+                failed_checks_msgs.append(f"Required pre-conversion condition {check.name!s} not met:\n\t{check.description!s}\n")
+
+        return failed_checks_msgs
+
+
+class FlowProgressbar():
+    # The parameter is named flow_writers because the original name shadowed the
+    # imported writers module, which made the default value below unreachable.
+    def __init__(self, flow: ActionsFlow, flow_writers: typing.List[writers.Writer] = None):
+        self.flow = flow
+        self.total_time = flow.get_total_time()
+
+        if flow_writers is None:
+            flow_writers = [writers.StdoutWriter()]
+        self.writers = flow_writers
+
+    def _seconds_to_minutes(self, seconds: float) -> str:
+        minutes = int(seconds / 60)
+        seconds = int(seconds % 60)
+        return f"{minutes:02d}:{seconds:02d}"
+
+    def get_action_description(self) -> str:
+        description = f" stage {self.flow.get_current_stage()} / action {self.flow.get_current_action()} "
+        description_length = len(description)
+        return "(" + " " * math.floor((50 - description_length) / 2) + description + " " * math.ceil((50 - description_length) / 2) + ")"
+
+    def write(self, msg: str) -> None:
+        for writer in self.writers:
+            writer.write(msg)
+
+    def display(self) -> None:
+        start_time = time.time()
+        passed_time = 0
+
+        while passed_time < self.total_time and not self.flow.is_finished():
+            percent = int(passed_time / self.total_time * 100)
+
+            description = self.get_action_description()
+
+            progress = "=" * int(percent / 2) + ">" + " " * (50 - int(percent / 2))
+            progress = "[" + progress[:25] + description + progress[25:] + "]"
+
+            terminal_size, _ = shutil.get_terminal_size()
+            output = ""
+            if terminal_size > 118:
+                output = progress + " " + self._seconds_to_minutes(passed_time) + " / " + self._seconds_to_minutes(self.total_time)
+            elif 65 < terminal_size <= 118:
+                output = description + " " + self._seconds_to_minutes(passed_time) + " / " + self._seconds_to_minutes(self.total_time)
+            else:
+                output = self._seconds_to_minutes(passed_time) + " / " + self._seconds_to_minutes(self.total_time)
+
+            clean = " " * (terminal_size - len(output))
+
+            if percent < 80:
+                color = "\033[92m"  # green
+            else:
+                color = "\033[93m"  # yellow
+            drop_color = "\033[0m"
+
+            self.write(f"\r{color}{output}{clean}{drop_color}")
+            time.sleep(1)
+            passed_time = time.time() - start_time
+
+        if passed_time > self.total_time:
+            self.write("\r\033[91m[" + "X" * 25 + self.get_action_description() + "X" * 25 + "] exceed\033[0m")
+            # The original referenced the undefined "common" module here
+            self.write(TIME_EXCEEDED_MESSAGE.format(log.DEFAULT_LOG_FILE))
+
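+
+# Hypothetical wiring of the progress bar to a flow; the display loop blocks,
+# so a real tool would run pass_actions() in a worker thread while the main
+# thread calls display().
+#
+#     with FinishActionsFlow(stages) as flow:
+#         progressbar = FlowProgressbar(flow, [writers.StdoutWriter()])
+#         ...start flow.pass_actions() in a worker thread...
+#         progressbar.display()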
diff --git a/dist.py b/dist.py
new file mode 100644
index 0000000..f0d3be2
--- /dev/null
+++ b/dist.py
@@ -0,0 +1,78 @@
+# Copyright 1999 - 2023. Plesk International GmbH. All rights reserved.
+from enum import Enum, auto
+import sys
+if sys.version_info < (3, 8):
+    import platform
+
+
+class Distro(Enum):
+    unknown = auto()
+    unsupported = auto()
+    centos7 = auto()
+    almalinux8 = auto()
+    ubuntu18 = auto()
+    ubuntu20 = auto()
+
+
+DISTRO_MAPPING = {
+    "CentOS Linux 7": Distro.centos7,
+    "AlmaLinux 8": Distro.almalinux8,
+    "Ubuntu 18": Distro.ubuntu18,
+    "Ubuntu 20": Distro.ubuntu20,
+}
+
+
+def _parse_os_release():
+    name = ""
+    version = ""
+    with open("/etc/os-release") as f:
+        lines = f.readlines()
+        for line in lines:
+            if line.startswith("NAME="):
+                name = line.split("=")[1].strip().strip('"')
+            elif line.startswith("VERSION_ID="):
+                version = line.split("=")[1].strip().strip('"')
+
+    return name, version
+
+
+def get_distro() -> Distro:
+    if hasattr(get_distro, "cache"):
+        return get_distro.cache
+
+    if sys.version_info < (3, 8):
+        distro = platform.linux_distribution()
+    else:
+        distro = _parse_os_release()
+
+    name = distro[0]
+    major_version = distro[1].split(".")[0]
+
+    get_distro.cache = DISTRO_MAPPING.get(f"{name} {major_version}", Distro.unknown)
+
+    return get_distro.cache
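+
+# Behavior sketch (illustrative): on a CentOS 7.9 host /etc/os-release has
+# NAME="CentOS Linux" and VERSION_ID="7", so the lookup key becomes
+# "CentOS Linux 7" and get_distro() returns Distro.centos7. The result is
+# cached on the function, so the file is read at most once.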
+
+
+def get_distro_description(distro: Distro) -> str:
+    # DISTRO_MAPPING covers every supported distribution, so anything
+    # not found there is reported as unknown
+    for key, value in DISTRO_MAPPING.items():
+        if value == distro:
+            return key
+
+    return "Unknown"
+
+
+def _is_deb_based(distro: Distro) -> bool:
+    return distro in [Distro.ubuntu18, Distro.ubuntu20]
+
+
+def _is_rhel_based(distro: Distro) -> bool:
+    return distro in [Distro.centos7, Distro.almalinux8]
diff --git a/dpkg.py b/dpkg.py
new file mode 100644
index 0000000..22418fa
--- /dev/null
+++ b/dpkg.py
@@ -0,0 +1,76 @@
+# Copyright 1999 - 2023. Plesk International GmbH. All rights reserved.
+import os
+import subprocess
+import typing
+
+from . import files, util
+
+APT_CHOOSE_OLD_FILES_OPTIONS = ['-o', 'Dpkg::Options::=--force-confdef',
+                                '-o', 'Dpkg::Options::=--force-confold']
+
+
+def is_package_installed(pkg: str) -> bool:
+    res = subprocess.run(["/usr/bin/dpkg", "-s", pkg], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+    return res.returncode == 0
+
+
+def install_packages(pkgs: typing.List[str], repository: str = None, force_package_config: bool = False) -> None:
+    if len(pkgs) == 0:
+        return
+
+    # specifying a repository is not supported for now
+    cmd = ['/usr/bin/apt-get', 'install', '-y']
+    if force_package_config is True:
+        cmd += APT_CHOOSE_OLD_FILES_OPTIONS
+    cmd += pkgs
+
+    util.logged_check_call(cmd, env={"PATH": os.environ["PATH"], "DEBIAN_FRONTEND": "noninteractive"})
+
+
+def remove_packages(pkgs: typing.List[str]) -> None:
+    if len(pkgs) == 0:
+        return
+
+    cmd = ["/usr/bin/apt-get", "remove", "-y"] + pkgs
+    util.logged_check_call(cmd)
+
+
+def find_related_repofiles(repository_file: str) -> typing.List[str]:
+    return files.find_files_case_insensitive("/etc/apt/sources.list.d", repository_file)
+
+
+def update_package_list() -> None:
+    util.logged_check_call(["/usr/bin/apt-get", "update", "-y"])
+
+
+def upgrade_packages(pkgs: typing.List[str] = None) -> None:
+    if pkgs is None:
+        pkgs = []
+
+    cmd = ["/usr/bin/apt-get", "upgrade", "-y"] + APT_CHOOSE_OLD_FILES_OPTIONS + pkgs
+    util.logged_check_call(cmd, env={"PATH": os.environ["PATH"], "DEBIAN_FRONTEND": "noninteractive"})
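+
+# Usage sketch (hypothetical package name): force_package_config=True keeps the
+# locally modified configuration files by passing the dpkg --force-confdef and
+# --force-confold options defined above.
+#
+#     install_packages(["mariadb-server"], force_package_config=True)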
+
+
+def autoremove_outdated_packages() -> None:
+    util.logged_check_call(["/usr/bin/apt-get", "autoremove", "-y"],
+                           env={"PATH": os.environ["PATH"], "DEBIAN_FRONTEND": "noninteractive"})
+
+
+def depconfig_parameter_set(parameter: str, value: str) -> None:
+    subprocess.run(["/usr/bin/debconf-communicate"], input=f"SET {parameter} {value}\n",
+                   stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True, universal_newlines=True)
+
+
+def depconfig_parameter_get(parameter: str) -> str:
+    process = subprocess.run(["/usr/bin/debconf-communicate"], input=f"GET {parameter}\n",
+                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True, universal_newlines=True)
+    return process.stdout.split(" ")[1].strip()
+
+
+def restore_installation() -> None:
+    util.logged_check_call(["/usr/bin/apt-get", "-f", "install", "-y"])
+
+
+def do_distupgrade() -> None:
+    util.logged_check_call(["/usr/bin/apt-get", "dist-upgrade", "-y"] + APT_CHOOSE_OLD_FILES_OPTIONS,
+                           env={"PATH": os.environ["PATH"], "DEBIAN_FRONTEND": "noninteractive"})
\ No newline at end of file
diff --git a/feedback.py b/feedback.py
new file mode 100644
index 0000000..2d50f3d
--- /dev/null
+++ b/feedback.py
@@ -0,0 +1,52 @@
+# Copyright 1999 - 2023. Plesk International GmbH. All rights reserved.
+
+import os
+import subprocess
+import typing
+import zipfile
+
+from . import dist
+
+
+class Feedback():
+    VERSIONS_FILE_PATH = "versions.txt"
+
+    def __init__(self, util_name: str, util_version: str,
+                 filelist: typing.List[str] = None, collect_actions: typing.List[typing.Callable] = None) -> None:
+        self.util_name = util_name
+        self.util_version = util_version
+
+        if filelist is None:
+            filelist = []
+        self.keep_files = filelist
+
+        if collect_actions is None:
+            collect_actions = []
+
+        self.created_files = [self.VERSIONS_FILE_PATH]
+        for action in collect_actions:
+            self.created_files.append(action())
+
+        self._prepare_versions_file()
+
+    def _prepare_versions_file(self):
+        with open(self.VERSIONS_FILE_PATH, "w") as versions:
+            try:
+                versions.write("The {utility} utility version: {ver}\n".format(utility=self.util_name, ver=self.util_version))
+                versions.write("Distribution information: {}\n".format(dist.get_distro_description(dist.get_distro())))
+
+                kernel_info = subprocess.check_output(["/usr/bin/uname", "-a"], universal_newlines=True).splitlines()[0]
+                versions.write("Kernel information: {}\n".format(kernel_info))
+            except subprocess.CalledProcessError:
+                versions.write("System information is not available\n")
+
+    def save_archive(self, archive_name: str):
+        with zipfile.ZipFile(archive_name, "w") as zip_file:
+            files_to_store = self.keep_files + self.created_files
+            for file in (file for file in files_to_store if os.path.exists(file)):
+                zip_file.write(file)
+
+    def __del__(self):
+        for file in self.created_files:
+            if os.path.exists(file):
+                os.unlink(file)
diff --git a/files.py b/files.py
new file mode 100644
index 0000000..d09ee61
--- /dev/null
+++ b/files.py
@@ -0,0 +1,183 @@
+# Copyright 1999 - 2023. Plesk International GmbH. All rights reserved.
+import fnmatch
+import json
+import os
+import re
+import shutil
+import typing
+
+from . 
import log + + +def replace_string(filename: str, original_substring: str, new_substring: str) -> None: + with open(filename, "r") as original, open(filename + ".next", "w") as dst: + for line in original.readlines(): + line = line.replace(original_substring, new_substring) + dst.write(line) + + shutil.move(filename + ".next", filename) + + +def append_strings(filename: str, strings: typing.List[str]) -> None: + next_file = filename + ".next" + shutil.copy(filename, next_file) + + with open(next_file, "a") as dst: + for string in strings: + dst.write(string) + + shutil.move(next_file, filename) + + +def push_front_strings(filename: str, strings: typing.List[str]) -> None: + next_file = filename + ".next" + + with open(filename, "r") as original, open(next_file, "w") as dst: + for string in strings: + dst.write(string) + + for line in original.readlines(): + dst.write(line) + + shutil.move(next_file, filename) + + +def rewrite_json_file(filename: str, jobj: typing.Union[dict, typing.List]) -> None: + if filename is None or jobj is None: + return + + log.debug("Going to write json '{file}' with new data".format(file=filename)) + + with open(filename + ".next", "w") as dst: + dst.write(json.dumps(jobj, indent=4)) + + shutil.move(filename + ".next", filename) + + +def get_last_lines(filename: str, n: int) -> typing.List[str]: + with open(filename) as f: + return f.readlines()[-n:] + + +def backup_file(filename: str) -> None: + if os.path.exists(filename): + shutil.copy(filename, filename + ".bak") + + +def restore_file_from_backup(filename: str, remove_if_no_backup: bool = False) -> None: + if os.path.exists(filename + ".bak"): + shutil.move(filename + ".bak", filename) + elif remove_if_no_backup and os.path.exists(filename): + os.remove(filename) + + +def remove_backup(filename: str) -> None: + if os.path.exists(filename + ".bak"): + os.remove(filename + ".bak") + + +def __get_files_recursive(path: str) -> typing.Iterator[str]: + for root, _, files in os.walk(path): + for file in files: + yield os.path.relpath(os.path.join(root, file), path) + + +def find_files_case_insensitive(path: str, regexps_strings: typing.Union[typing.List, str], recursive: bool = False): + # Todo. 
We should add precise type annotations for our functions so misuse fails earlier
+    if not isinstance(regexps_strings, list) and not isinstance(regexps_strings, str):
+        raise TypeError("find_files_case_insensitive argument regexps_strings must be a list")
+    # A plain string is a common mistake, and we can handle it simply
+    if isinstance(regexps_strings, str):
+        regexps_strings = [regexps_strings]
+
+    if not os.path.exists(path) or not os.path.isdir(path):
+        return []
+
+    result = []
+    regexps = [re.compile(fnmatch.translate(r), re.IGNORECASE) for r in regexps_strings]
+    files_list = __get_files_recursive(path) if recursive else os.listdir(path)
+
+    for file in files_list:
+        for regexp in regexps:
+            if regexp.match(os.path.basename(file)):
+                result.append(os.path.join(path, file))
+
+    return result
+
+
+def is_directory_empty(path: str) -> bool:
+    return not os.path.exists(path) or len(os.listdir(path)) == 0
+
+
+def find_subdirectory_by(directory: str, functor: typing.Callable[[str], bool]) -> typing.Optional[str]:
+    for root, directories, _ in os.walk(directory):
+        for subdir in directories:
+            fullpath = os.path.join(root, subdir)
+            if functor(fullpath):
+                return fullpath
+    return None
+
+
+def find_file_substrings(filename: str, substring: str) -> typing.List[str]:
+    if not os.path.exists(filename):
+        return []
+
+    res = []
+    with open(filename, "r") as f:
+        for line in f.readlines():
+            if substring in line:
+                res.append(line)
+    return res
+
+
+def cnf_set_section_variable(filename: str, section: str, variable: str, value: str) -> None:
+    if not os.path.exists(filename):
+        return
+
+    with open(filename, "r") as original, open(filename + ".next", "w") as dst:
+        section_found = in_section = False
+        variable_found = False
+        for line in original.readlines():
+            if line.startswith("["):
+                if in_section:
+                    in_section = False
+                    if not variable_found:
+                        dst.write(f"{variable}={value}\n")
+
+                else:
+                    in_section = line[1:-2] == section
+                    # remember that the target section was seen at all; a plain
+                    # assignment here would reset the flag on every later section
+                    if in_section:
+                        section_found = True
+
+            if in_section and line.startswith(variable + "="):
+                line = variable + "=" + value + "\n"
+                variable_found = True
+
+            dst.write(line)
+
+        if not section_found:
+            dst.write(f"\n[{section}]\n{variable}={value}\n")
+        elif in_section and not variable_found:
+            dst.write(f"{variable}={value}\n")
+
+    shutil.move(filename + ".next", filename)
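+
+# Illustrative before/after for a hypothetical my.cnf: calling
+#
+#     cnf_set_section_variable("/etc/my.cnf", "mysqld", "innodb_strict_mode", "OFF")
+#
+# on a file that contains
+#
+#     [mysqld]
+#     skip-networking
+#
+# appends "innodb_strict_mode=OFF" to that section, while a missing section
+# would be created at the end of the file.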
+
+
+def cnf_unset_section_variable(filename: str, section: str, variable: str) -> None:
+    if not os.path.exists(filename):
+        return
+
+    with open(filename, "r") as original, open(filename + ".next", "w") as dst:
+        section_found = False
+        for line in original.readlines():
+            if line.startswith("["):
+                if section_found:
+                    section_found = False
+                else:
+                    section_found = line[1:-2] == section
+
+            if section_found and line.startswith(variable + "="):
+                continue
+
+            dst.write(line)
+
+    shutil.move(filename + ".next", filename)
diff --git a/leapp_configs.py b/leapp_configs.py
new file mode 100644
index 0000000..876e40c
--- /dev/null
+++ b/leapp_configs.py
@@ -0,0 +1,273 @@
+# Copyright 1999 - 2023. Plesk International GmbH. All rights reserved.
+import os
+import json
+import shutil
+import typing
+
+from enum import IntEnum
+
+from . import files, log, rpm
+
+
+PATH_TO_CONFIGFILES = "/etc/leapp/files"
+LEAPP_REPOS_FILE_PATH = os.path.join(PATH_TO_CONFIGFILES, "leapp_upgrade_repositories.repo")
+LEAPP_MAP_FILE_PATH = os.path.join(PATH_TO_CONFIGFILES, "repomap.csv")
+LEAPP_PKGS_CONF_PATH = os.path.join(PATH_TO_CONFIGFILES, "pes-events.json")
+
+REPO_HEAD_WITH_URL = """
+[{id}]
+name={name}
+baseurl={url}
+"""
+
+REPO_HEAD_WITH_METALINK = """
+[{id}]
+name={name}
+metalink={url}
+"""
+
+
+def _do_replacement(to_change: str, replacement_list: typing.List[typing.Callable[[str], str]]) -> str:
+    if to_change is None:
+        return None
+
+    for replace in replacement_list:
+        to_change = replace(to_change)
+    return to_change
+
+
+def _do_id_replacement(id: str) -> str:
+    return _do_replacement(id, [
+        lambda to_change: "alma-" + to_change,
+    ])
+
+
+def _do_name_replacement(name: str) -> str:
+    return _do_replacement(name, [
+        lambda to_change: "Alma " + to_change,
+        lambda to_change: to_change.replace("Enterprise Linux 7", "Enterprise Linux 8"),
+        lambda to_change: to_change.replace("EPEL-7", "EPEL-8"),
+        lambda to_change: to_change.replace("$releasever", "8"),
+    ])
+
+
+def _fixup_old_php_urls(to_change: str) -> str:
+    supported_old_versions = ["7.1", "7.2", "7.3"]
+    for version in supported_old_versions:
+        if "PHP_" + version in to_change:
+            return to_change.replace("rpm-CentOS-7", "rpm-CentOS-8")
+
+    return to_change
+
+
+def _fix_rackspace_repository(to_change: str) -> str:
+    if "mirror.rackspace.com" in to_change:
+        return to_change.replace("centos7-amd64", "rhel8-amd64")
+
+    return to_change
+
+
+def _fix_mariadb_repository(to_change: str) -> str:
+    # The official MariaDB repository doesn't provide short URLs for centos 8
+    # since 10.11. Because short rhel8 URLs exist for all versions, we can use
+    # those instead
+    if "yum.mariadb.org" in to_change:
+        return to_change.replace("centos7", "rhel8")
+
+    return to_change
+
+
+def _fix_postgresql_official_repository(to_change: str) -> str:
+    # The default PostgreSQL official repository list includes a testing repository
+    # intended for CentOS 7, which does not have an equivalent for RHEL-based 8.
+    # This behavior is specific to the testing repository; the srpms, common and
+    # debug repositories are fine.
+    # The issue applies to all PostgreSQL versions before 16.
+    # Therefore, we need to create a mapping to the non-testing repository
+    # to prevent errors during the conversion process.
+    if "download.postgresql.org" in to_change:
+        split_parts = to_change.split("/")
+        for index, item in enumerate(split_parts):
+            if item == "testing":
+                # An exclusion for the srpms repository: there is no rhel 8
+                # repository when the version is 14, which looks like an
+                # upstream inconsistency
+                if split_parts[index - 1] == "srpms" and split_parts[index + 1].isdigit() and int(split_parts[index + 1]) != 14:
+                    return to_change
+                if split_parts[index + 1] == "common" or split_parts[index + 1] == "debug":
+                    return to_change
+                if split_parts[index + 1].isdigit() and int(split_parts[index + 1]) >= 16:
+                    return to_change
+
+        return to_change.replace("/testing/", "/")
+
+    return to_change
+
+
+def _do_url_replacement(url: str) -> str:
+    return _do_replacement(url, [
+        _fixup_old_php_urls,
+        _fix_rackspace_repository,
+        _fix_mariadb_repository,
+        _fix_postgresql_official_repository,
+        lambda to_change: to_change.replace("rpm-CentOS-7", "rpm-RedHat-el8"),
+        lambda to_change: to_change.replace("epel-7", "epel-8"),
+        lambda to_change: to_change.replace("epel-debug-7", "epel-debug-8"),
+        lambda to_change: to_change.replace("epel-source-7", "epel-source-8"),
+        lambda to_change: to_change.replace("centos7", "centos8"),
+        lambda to_change: to_change.replace("centos/7", "centos/8"),
+        lambda to_change: to_change.replace("rhel/7", "rhel/8"),
+        lambda to_change: to_change.replace("CentOS_7", "CentOS_8"),
+        lambda to_change: to_change.replace("rhel-$releasever", "rhel-8"),
+        lambda to_change: to_change.replace("$releasever", "8"),
+        lambda to_change: to_change.replace("autoinstall.plesk.com/PMM_0.1.10", "autoinstall.plesk.com/PMM_0.1.11"),
+        lambda to_change: to_change.replace("autoinstall.plesk.com/PMM0", "autoinstall.plesk.com/PMM_0.1.11"),
+    ])
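+
+# Example transformation (illustrative URL): a CentOS 7 EPEL mirror such as
+#
+#     http://mirror.example.com/epel-7/$releasever/x86_64/
+#
+# comes out of _do_url_replacement() as
+#
+#     http://mirror.example.com/epel-8/8/x86_64/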
+
+
+def _do_common_replacement(line: str) -> str:
+    return _do_replacement(line, [
+        lambda to_change: to_change.replace("EPEL-7", "EPEL-8"),
+        # We can't check the repository gpg key because it is not stored in the
+        # temporary file system
+        # ToDo: Maybe we could find a way to put the key into the file system
+        lambda to_change: to_change.replace("repo_gpgcheck = 1", "repo_gpgcheck = 0"),
+    ])
+
+
+def is_repo_ok(id: str, name: str, url: str, metalink: str) -> bool:
+    if name is None:
+        log.warn("Repository info for '[{id}]' has no name".format(id=id))
+        return False
+
+    if url is None and metalink is None:
+        log.warn("Repository info for '{id}' has no baseurl or metalink".format(id=id))
+        return False
+
+    return True
+
+
+def adopt_repositories(repofile: str, ignore: typing.List = None) -> None:
+    if ignore is None:
+        ignore = []
+
+    log.debug("Adopt repofile '{filename}' for AlmaLinux 8".format(filename=repofile))
+
+    if not os.path.exists(repofile):
+        log.warn("The repository adapter has tried to open a nonexistent file: {filename}".format(filename=repofile))
+        return
+
+    with open(repofile + ".next", "a") as dst:
+        for id, name, url, metalink, additional_lines in rpm.extract_repodata(repofile):
+            if not is_repo_ok(id, name, url, metalink):
+                continue
+
+            if id in ignore:
+                log.debug("Skip repository '{id}' adaptation since it is in the ignore list.".format(id=id))
+                continue
+
+            log.debug("Repository with id '{id}' is going to be adopted.".format(id=id))
+
+            id = _do_id_replacement(id)
+            name = _do_name_replacement(name)
+            if url is not None:
+                url = _do_url_replacement(url)
+                repo_format = REPO_HEAD_WITH_URL
+            else:
+                url = _do_url_replacement(metalink)
+                repo_format = REPO_HEAD_WITH_METALINK
+
+            dst.write(repo_format.format(id=id, name=name, url=url))
+
+            for line in (_do_common_replacement(add_line) for add_line in additional_lines):
+                dst.write(line)
+
+    shutil.move(repofile + ".next", repofile)
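+
+# Illustrative effect (hypothetical repository): a stanza like
+#
+#     [myrepo]
+#     name=My repo for Enterprise Linux 7
+#     baseurl=http://repo.example.com/centos7/
+#
+# is rewritten in place to id "alma-myrepo", name "Alma My repo for
+# Enterprise Linux 8" and baseurl http://repo.example.com/centos8/.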
+
+
+def add_repositories_mapping(repofiles: typing.List[str], ignore: typing.List = None,
+                             leapp_repos_file_path: str = LEAPP_REPOS_FILE_PATH,
+                             mapfile_path: str = LEAPP_MAP_FILE_PATH) -> None:
+    if ignore is None:
+        ignore = []
+
+    with open(leapp_repos_file_path, "a") as leapp_repos_file, open(mapfile_path, "a") as map_file:
+        for file in repofiles:
+            log.debug("Processing repofile '{filename}' into leapp configuration".format(filename=file))
+
+            if not os.path.exists(file):
+                log.warn("The repository mapper has tried to open a nonexistent file: {filename}".format(filename=file))
+                continue
+
+            for id, name, url, metalink, additional_lines in rpm.extract_repodata(file):
+                if not is_repo_ok(id, name, url, metalink):
+                    continue
+
+                if id in ignore:
+                    log.debug("Skip repository '{id}' since it is in the ignore list.".format(id=id))
+                    continue
+
+                log.debug("Repository entry with id '{id}' is extracted.".format(id=id))
+
+                new_id = _do_id_replacement(id)
+                name = _do_name_replacement(name)
+                if url is not None:
+                    url = _do_url_replacement(url)
+                    repo_format = REPO_HEAD_WITH_URL
+                else:
+                    url = _do_url_replacement(metalink)
+                    repo_format = REPO_HEAD_WITH_METALINK
+
+                leapp_repos_file.write(repo_format.format(id=new_id, name=name, url=url))
+
+                for line in (_do_common_replacement(add_line) for add_line in additional_lines):
+                    leapp_repos_file.write(line)
+
+                # Special case for the plesk repository: we need to add the dist
+                # repository to be able to install some of the plesk packages.
+                # Metalink is supported for the plesk repository as well, even
+                # though we don't use it right now
+                if id.startswith("PLESK_18_0") and "extras" in id and url is not None:
+                    leapp_repos_file.write(repo_format.format(id=new_id.replace("-extras", ""),
+                                                              name=name.replace("extras", ""),
+                                                              url=url.replace("extras", "dist")))
+                    leapp_repos_file.write("enabled=1\ngpgcheck=1\n")
+
+                    map_file.write("{oldrepo},{newrepo},{newrepo},all,all,x86_64,rpm,ga,ga\n".format(oldrepo=id, newrepo=new_id.replace("-extras", "")))
+
+                leapp_repos_file.write("\n")
+
+                map_file.write("{oldrepo},{newrepo},{newrepo},all,all,x86_64,rpm,ga,ga\n".format(oldrepo=id, newrepo=new_id))
+
+        map_file.write("\n")
+
+
+def set_package_repository(package: str, repository: str, leapp_pkgs_conf_path: str = LEAPP_PKGS_CONF_PATH) -> None:
+    pkg_mapping = None
+    with open(leapp_pkgs_conf_path, "r") as pkg_mapping_file:
+        pkg_mapping = json.load(pkg_mapping_file)
+        for info in pkg_mapping["packageinfo"]:
+            for outpkg in info["out_packageset"]["package"]:
+                if outpkg["name"] == package:
+                    outpkg["repository"] = repository
+
+    files.rewrite_json_file(leapp_pkgs_conf_path, pkg_mapping)
+
+
+# The following types are defined in the leapp-repository project and can be used
+# to define the action type of a package in the pes-events.json file.
+class LeappActionType(IntEnum):
+    PRESENT = 0
+    REMOVED = 1
+    DEPRECATED = 2
+    REPLACED = 3
+    SPLIT = 4
+    MERGED = 5
+    MOVED = 6
+    RENAMED = 7
+
+
+def set_package_action(package: str, type: LeappActionType, leapp_pkgs_conf_path: str = LEAPP_PKGS_CONF_PATH):
+    pkg_mapping = None
+    with open(leapp_pkgs_conf_path, "r") as pkg_mapping_file:
+        pkg_mapping = json.load(pkg_mapping_file)
+        for info in pkg_mapping["packageinfo"]:
+            for inpackage in info["in_packageset"]["package"]:
+                if inpackage["name"] == package:
+                    info["action"] = type
+
+    files.rewrite_json_file(leapp_pkgs_conf_path, pkg_mapping)
diff --git a/log.py b/log.py
new file mode 100644
index 0000000..3595e43
--- /dev/null
+++ b/log.py
@@ -0,0 +1,94 @@
+# Copyright 1999 - 2023. Plesk International GmbH. All rights reserved.
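+
+# Usage sketch (hypothetical paths): one persistent log file plus mirrored
+# output to stdout.
+#
+#     import sys
+#     init_logger(["/var/log/plesk/centos2alma.log"], [sys.stdout], loglevel=logging.DEBUG)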
+import logging + +import typing + + +DEFAULT_LOG_FILE = "/var/log/plesk/centos2alma.log" + + +class logger(): + files_logger = logging.getLogger("centos2alma_files") + + is_streams_enabled = False + streams_logger = logging.getLogger("centos2alma_streams") + + @staticmethod + def init_logger(logfiles: typing.List[str], streams: typing.List[typing.Any], + console: bool = False, loglevel: int = logging.INFO) -> None: + logger.files_logger.setLevel(loglevel) + logger.streams_logger.setLevel(loglevel) + + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + + file_handlers = [] + for logfile in logfiles: + file_handlers.append(logging.FileHandler(logfile)) + + stream_handlers = [logging.FileHandler('/dev/console', mode='w')] if console else [] + for stream in streams: + stream_handlers.append(logging.StreamHandler(stream)) + if len(stream_handlers): + logger.is_streams_enabled = True + + for handler in file_handlers + stream_handlers: + handler.setFormatter(formatter) + + for handler in file_handlers: + logger.files_logger.addHandler(handler) + + for handler in stream_handlers: + logger.streams_logger.addHandler(handler) + + @staticmethod + def debug(msg: str, to_file: bool = True, to_stream: bool = True) -> None: + if to_file: + logger.files_logger.debug(msg) + + if to_stream and logger.is_streams_enabled: + logger.streams_logger.debug(msg) + + @staticmethod + def info(msg: str, to_file: bool = True, to_stream: bool = True) -> None: + if to_file: + logger.files_logger.info(msg) + + if to_stream and logger.is_streams_enabled: + logger.streams_logger.info(msg) + + @staticmethod + def warn(msg: str, to_file: bool = True, to_stream: bool = True) -> None: + if to_file: + logger.files_logger.warn(msg) + + if to_stream and logger.is_streams_enabled: + logger.streams_logger.warn(msg) + + @staticmethod + def err(msg: str, to_file: bool = True, to_stream: bool = True) -> None: + if to_file: + logger.files_logger.error(msg) + + if to_stream and logger.is_streams_enabled: + logger.streams_logger.error(msg) + + +def init_logger(logfiles: typing.List[str], streams: typing.List[typing.Any], + console: bool = False, loglevel: int = logging.INFO) -> None: + logger.init_logger(logfiles, streams, console, loglevel) + + +def debug(msg: str, to_file: bool = True, to_stream: bool = True) -> None: + logger.debug(msg, to_file, to_stream) + + +def info(msg: str, to_file: bool = True, to_stream: bool = True) -> None: + logger.info(msg, to_file, to_stream) + + +def warn(msg: str, to_file: bool = True, to_stream: bool = True) -> None: + logger.warn(msg, to_file, to_stream) + + +def err(msg: str, to_file: bool = True, to_stream: bool = True) -> None: + logger.err(msg, to_file, to_stream) \ No newline at end of file diff --git a/messages.py b/messages.py new file mode 100644 index 0000000..e69de29 diff --git a/motd.py b/motd.py new file mode 100644 index 0000000..ae0d5cb --- /dev/null +++ b/motd.py @@ -0,0 +1,64 @@ +# Copyright 1999-2023. Plesk International GmbH. All rights reserved. +import os +import shutil + +from . 
import files, log + +MOTD_PATH = "/etc/motd" + + +def restore_ssh_login_message(motd_path: str = MOTD_PATH) -> None: + files.restore_file_from_backup(motd_path, remove_if_no_backup=True) + + +def add_inprogress_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -> None: + try: + if not os.path.exists(motd_path + ".bak"): + if os.path.exists(motd_path): + files.backup_file(motd_path) + else: + with open(motd_path + ".bak", "a") as motd: + pass + + with open(motd_path, "a") as motd: + motd.write(message) + except FileNotFoundError: + log.warn("The /etc/motd file cannot be changed or created. The script may be lacking the permissions to do so.") + + +FINISH_INTRODUCE_MESSAGE = """ +=============================================================================== +Message from the Plesk centos2alma tool: +""" + +FINISH_END_MESSAGE = """You can remove this message from the {} file. +=============================================================================== +""".format(MOTD_PATH) + + +def add_finish_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -> None: + try: + if not os.path.exists(motd_path + ".next"): + if os.path.exists(motd_path + ".bak"): + shutil.copy(motd_path + ".bak", motd_path + ".next") + + with open(motd_path + ".next", "a") as motd: + motd.write(FINISH_INTRODUCE_MESSAGE) + + with open(motd_path + ".next", "a") as motd: + motd.write(message) + except FileNotFoundError: + log.warn("The /etc/motd file cannot be changed or created. The script may be lacking the permissions to do so.") + + +def publish_finish_ssh_login_message(motd_path: str = MOTD_PATH) -> None: + try: + if os.path.exists(motd_path + ".next"): + with open(motd_path + ".next", "a") as motd: + motd.write(FINISH_END_MESSAGE) + + shutil.move(motd_path + ".next", motd_path) + else: + files.restore_file_from_backup(motd_path, remove_if_no_backup=True) + except FileNotFoundError: + log.warn("The /etc/motd file cannot be changed or created. The script may be lacking the permissions to do so.") diff --git a/packages.py b/packages.py new file mode 100644 index 0000000..bf2b71f --- /dev/null +++ b/packages.py @@ -0,0 +1,78 @@ +# Copyright 1999 - 2023. Plesk International GmbH. All rights reserved. +import typing + +from . 
import dist, dpkg, rpm
+
+
+def filter_installed_packages(lookup_pkgs: typing.List[str]) -> typing.List[str]:
+    return [pkg for pkg in lookup_pkgs if is_package_installed(pkg)]
+
+
+def is_package_installed(pkg: str) -> bool:
+    started_on = dist.get_distro()
+    if dist._is_deb_based(started_on):
+        return dpkg.is_package_installed(pkg)
+    elif dist._is_rhel_based(started_on):
+        return rpm.is_package_installed(pkg)
+    else:
+        raise NotImplementedError(f"Unsupported distro {started_on}")
+
+
+def install_packages(pkgs: typing.List[str], repository: str = None, force_package_config: bool = False) -> None:
+    started_on = dist.get_distro()
+    if dist._is_deb_based(started_on):
+        return dpkg.install_packages(pkgs, repository, force_package_config)
+    elif dist._is_rhel_based(started_on):
+        return rpm.install_packages(pkgs, repository, force_package_config)
+    else:
+        raise NotImplementedError(f"Unsupported distro {started_on}")
+
+
+def remove_packages(pkgs: typing.List[str]) -> None:
+    started_on = dist.get_distro()
+    if dist._is_deb_based(started_on):
+        return dpkg.remove_packages(pkgs)
+    elif dist._is_rhel_based(started_on):
+        return rpm.remove_packages(pkgs)
+    else:
+        raise NotImplementedError(f"Unsupported distro {started_on}")
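+
+# Usage sketch (hypothetical package name): callers stay distro-agnostic and
+# this module dispatches to the dpkg or rpm helpers based on the detected
+# distribution.
+#
+#     if not is_package_installed("tree"):
+#         install_packages(["tree"])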
diff --git a/plesk.py b/plesk.py
new file mode 100644
index 0000000..78d08f2
--- /dev/null
+++ b/plesk.py
@@ -0,0 +1,80 @@
+# Copyright 1999 - 2023. Plesk International GmbH. All rights reserved.
+
+import os
+import subprocess
+import typing
+
+from . import log
+
+
+def send_error_report(error_message: str) -> None:
+    # TODO. For now we work only on RHEL-based distros, so the path
+    # to the send-error-report utility is always the same.
+    # But if we ever support Debian-based distros, we should choose the path carefully.
+    send_error_path = "/usr/local/psa/admin/bin/send-error-report"
+    try:
+        if os.path.exists(send_error_path):
+            subprocess.run([send_error_path, "backend"], input=error_message.encode(),
+                           stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+    except Exception:
+        # We ignore errors here to avoid misleading the user
+        pass
+
+
+def get_plesk_version() -> typing.List[str]:
+    version_info = subprocess.check_output(["/usr/sbin/plesk", "version"], universal_newlines=True).splitlines()
+    for line in version_info:
+        if line.startswith("Product version"):
+            version = line.split()[-1]
+            return version.split(".")
+
+    raise Exception("Unable to parse plesk version output.")
+
+
+def get_plesk_full_version() -> typing.List[str]:
+    return subprocess.check_output(["/usr/sbin/plesk", "version"], universal_newlines=True).splitlines()
+
+
+_CONVERSION_STATUS_FLAG_FILE = "/tmp/centos2alma-conversion.flag"
+
+
+def prepare_conversion_flag() -> None:
+    with open(_CONVERSION_STATUS_FLAG_FILE, "w"):
+        pass
+
+
+def send_conversion_status(succeed: bool) -> None:
+    results_sender_path = None
+    for path in ["/var/cache/parallels_installer/report-update", "/root/parallels/report-update"]:
+        if os.path.exists(path):
+            results_sender_path = path
+            break
+
+    # For now we are not going to install the sender in the scope of the conversion.
+    # So if we have one, use it. If not, just skip sending the results.
+    if results_sender_path is None:
+        log.warn("Unable to find the report-update utility. Skip sending the conversion status")
+        return
+
+    if not os.path.exists(_CONVERSION_STATUS_FLAG_FILE):
+        log.warn("The conversion status flag file does not exist. Skip sending the conversion status")
+        return
+
+    plesk_version = ".".join(get_plesk_version())
+
+    try:
+        log.debug("Trying to send the conversion status with the report-update utility")
+        subprocess.run(["/usr/bin/python3", results_sender_path, "--op", "dist-upgrade", "--rc", "0" if succeed else "1",
+                        "--start-flag", _CONVERSION_STATUS_FLAG_FILE, "--from", plesk_version, "--to", plesk_version],
+                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+    except Exception as ex:
+        log.warn("Unable to send the conversion status: {}".format(ex))
+
+    # Usually the flag file is removed by the report-update utility itself,
+    # but if it fails, we should remove the file manually.
+    remove_conversion_flag()
+
+
+def remove_conversion_flag() -> None:
+    if os.path.exists(_CONVERSION_STATUS_FLAG_FILE):
+        os.unlink(_CONVERSION_STATUS_FLAG_FILE)
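A sketch of the intended conversion-status lifecycle, assuming the module is importable as `common.plesk`; run_conversion() is a hypothetical stand-in for the actual conversion steps:

    from common import plesk

    plesk.prepare_conversion_flag()
    try:
        run_conversion()  # hypothetical placeholder for the conversion itself
        plesk.send_conversion_status(True)
    except Exception as ex:
        plesk.send_error_report(str(ex))
        plesk.send_conversion_status(False)  # also removes the flag file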
diff --git a/rpm.py b/rpm.py
new file mode 100644
index 0000000..368b8b1
--- /dev/null
+++ b/rpm.py
@@ -0,0 +1,163 @@
+# Copyright 1999 - 2023. Plesk International GmbH. All rights reserved.
+import itertools
+import os
+import shutil
+import subprocess
+import typing
+
+from . import files, util, log
+
+REPO_HEAD_WITH_URL = """[{id}]
+name={name}
+baseurl={url}
+"""
+
+REPO_HEAD_WITH_METALINK = """[{id}]
+name={name}
+metalink={url}
+"""
+
+
+def extract_repodata(repofile: str) -> typing.Iterable[typing.Tuple[str, str, str, str, typing.List[str]]]:
+    id = None
+    name = None
+    url = None
+    metalink = None
+    additional = []
+
+    with open(repofile, "r") as repo:
+        for line in repo.readlines():
+            log.debug("Repository file line: {line}".format(line=line.rstrip()))
+            if line.startswith("["):
+                if id is not None:
+                    # A new section begins, so flush the previous one
+                    yield (id, name, url, metalink, additional)
+
+                    name = None
+                    url = None
+                    metalink = None
+                    additional = []
+
+                # Strip the newline first, so the trailing "]" is removed even
+                # when the last line of the file has no newline at all
+                id = line.rstrip()[1:-1]
+                continue
+
+            if "=" not in line:
+                additional.append(line)
+                continue
+
+            field, val = line.split("=", 1)
+            field = field.strip()
+            val = val.strip()
+            if field == "name":
+                name = val
+            elif field == "baseurl":
+                url = val
+            elif field == "metalink":
+                metalink = val
+            else:
+                additional.append(line)
+
+    if id is not None:
+        yield (id, name, url, metalink, additional)
+
+
+def write_repodata(repofile: str, id: str, name: str, url: str, metalink: str, additional: typing.List[str]) -> None:
+    repo_format = REPO_HEAD_WITH_URL
+    if url is None:
+        url = metalink
+        repo_format = REPO_HEAD_WITH_METALINK
+
+    with open(repofile, "a") as dst:
+        dst.write(repo_format.format(id=id, name=name, url=url))
+        for line in additional:
+            dst.write(line)
+
+
+def remove_repositories(repofile: str, conditions: typing.List[typing.Callable[[str, str, str, str], bool]]) -> None:
+    for id, name, url, metalink, additional_lines in extract_repodata(repofile):
+        remove = False
+        for condition in conditions:
+            if condition(id, name, url, metalink):
+                remove = True
+                break
+
+        if not remove:
+            write_repodata(repofile + ".next", id, name, url, metalink, additional_lines)
+
+    if os.path.exists(repofile + ".next"):
+        shutil.move(repofile + ".next", repofile)
+    else:
+        os.remove(repofile)
+
+
+def filter_installed_packages(lookup_pkgs: typing.List[str]) -> typing.List[str]:
+    return [pkg for pkg in lookup_pkgs if is_package_installed(pkg)]
+
+
+def is_package_installed(pkg: str) -> bool:
+    res = subprocess.run(["/usr/bin/rpm", "--quiet", "--query", pkg])
+    return res.returncode == 0
+
+
+def install_packages(pkgs: typing.List[str], repository: typing.Optional[str] = None, force_package_config: bool = False) -> None:
+    # force_package_config is not supported for yum yet
+    if len(pkgs) == 0:
+        return
+
+    command = ["/usr/bin/yum", "install"]
+    if repository is not None:
+        command += ["--repo", repository]
+    command += ["-y"] + pkgs
+
+    util.logged_check_call(command)
+
+
+def remove_packages(pkgs: typing.List[str]) -> None:
+    if len(pkgs) == 0:
+        return
+
+    if os.path.exists("/usr/bin/package-cleanup"):
+        duplicates = subprocess.check_output(["/usr/bin/package-cleanup", "--dupes"], universal_newlines=True).splitlines()
+        # itertools.product snapshots pkgs, so removing items from the list inside the loop is safe
+        for duplicate, pkg in itertools.product(duplicates, pkgs):
+            if pkg in duplicate:
+                util.logged_check_call(["/usr/bin/rpm", "-e", "--nodeps", duplicate])
+                # Since we removed every duplicate, we don't need to remove the package at the end
+                if pkg in pkgs:
+                    pkgs.remove(pkg)
+
+    util.logged_check_call(["/usr/bin/rpm", "-e", "--nodeps"] + pkgs)
+
+
+def handle_rpmnew(original_path: str) -> bool:
+    if not os.path.exists(original_path + ".rpmnew"):
+        return False
+
+    if os.path.exists(original_path):
+        log.debug("The '{path}' file has a '.rpmnew' counterpart. Going to replace the file with the rpmnew one. "
                  "The original file will be saved with the '.rpmsave' suffix".format(path=original_path))
+        shutil.move(original_path, original_path + ".rpmsave")
+    else:
+        log.debug("The '{path}' file is missing, but it has a '.rpmnew' counterpart. Going to use it".format(path=original_path))
+
+    shutil.move(original_path + ".rpmnew", original_path)
+
+    return True
+
+
+def find_related_repofiles(repository_file: str) -> typing.List[str]:
+    return files.find_files_case_insensitive("/etc/yum.repos.d", repository_file)
+
+
+def update_package_list() -> None:
+    util.logged_check_call(["/usr/bin/yum", "update", "-y"])
+
+
+def upgrade_packages(pkgs: typing.Optional[typing.List[str]] = None) -> None:
+    if pkgs is None:
+        pkgs = []
+
+    util.logged_check_call(["/usr/bin/yum", "upgrade", "-y"] + pkgs)
+
+
+def autoremove_outdated_packages() -> None:
+    util.logged_check_call(["/usr/bin/yum", "autoremove", "-y"])
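A short sketch of how remove_repositories is meant to be driven; the same predicate shape is exercised in tests/rpmtests.py below, and the file path here is illustrative:

    from common import rpm

    # Each predicate receives (id, name, baseurl, metalink) for one repository
    # section and returns True if that section should be dropped.
    rpm.remove_repositories(
        "/etc/yum.repos.d/obsolete.repo",
        [lambda id, name, url, metalink: id is not None and id.startswith("epel")],
    )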
" + "The file itself will be saved as .rpmsave".format(path=original_path)) + shutil.move(original_path, original_path + ".rpmsave") + else: + log.debug("The '{path}' file is missing, but has '.rpmnew' analogue file. Going to use it".format(path=original_path)) + + shutil.move(original_path + ".rpmnew", original_path) + + return True + + +def find_related_repofiles(repository_file: str) -> typing.List[str]: + return files.find_files_case_insensitive("/etc/yum.repos.d", repository_file) + + +def update_package_list() -> None: + util.logged_check_call(["/usr/bin/yum", "update", "-y"]) + + +def upgrade_packages(pkgs: typing.List[str] = None) -> None: + if pkgs is None: + pkgs = [] + + util.logged_check_call(["/usr/bin/yum", "upgrade", "-y"] + pkgs) + + +def autoremove_outdated_packages() -> None: + util.logged_check_call(["/usr/bin/yum", "autoremove", "-y"]) diff --git a/systemd.py b/systemd.py new file mode 100644 index 0000000..aaebba4 --- /dev/null +++ b/systemd.py @@ -0,0 +1,87 @@ +# Copyright 1999 - 2023. Plesk International GmbH. All rights reserved. +import os +import typing +import subprocess + +from . import dist, util + +SYSTEMCTL_BIN_PATH = "/usr/bin/systemctl" +if dist._is_deb_based(dist.get_distro()): + SYSTEMCTL_BIN_PATH = "/bin/systemctl" + +SYSTEMCTL_SERVICES_PATH = "/etc/systemd/system" +if dist._is_deb_based(dist.get_distro()): + SYSTEMCTL_SERVICES_PATH = "/lib/systemd/system" + + +def is_service_exists(service: str): + res = subprocess.run([SYSTEMCTL_BIN_PATH, 'cat', service], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + return res.returncode == 0 + + +def is_service_active(service: str): + res = subprocess.run([SYSTEMCTL_BIN_PATH, 'is-active', service]) + return res.returncode == 0 + + +def reload_systemd_daemon(): + util.logged_check_call([SYSTEMCTL_BIN_PATH, "daemon-reload"]) + + +def start_services(services: typing.List[str]): + existed_services = [service for service in services if is_service_exists(service)] + if not existed_services: + return + + util.logged_check_call([SYSTEMCTL_BIN_PATH, "start"] + existed_services) + + +def stop_services(services: typing.List[str]): + existed_services = [service for service in services if is_service_exists(service)] + if not existed_services: + return + + util.logged_check_call([SYSTEMCTL_BIN_PATH, "stop"] + existed_services) + + +def enable_services(services: typing.List[str]): + existed_services = [service for service in services if is_service_exists(service)] + if not existed_services: + return + + util.logged_check_call([SYSTEMCTL_BIN_PATH, "enable"] + existed_services) + + +def disable_services(services: typing.List[str]): + existed_services = [service for service in services if is_service_exists(service)] + if not existed_services: + return + + util.logged_check_call([SYSTEMCTL_BIN_PATH, "disable"] + existed_services) + + +def restart_services(services: typing.List[str]): + existed_services = [service for service in services if is_service_exists(service)] + if not existed_services: + return + + util.logged_check_call([SYSTEMCTL_BIN_PATH, "restart"] + existed_services) + + +def do_reboot(): + subprocess.call([SYSTEMCTL_BIN_PATH, "reboot"]) + + +def add_systemd_service(service: str, content: str): + with open(f"{SYSTEMCTL_SERVICES_PATH}/{service}", "w") as dst: + dst.write(content) + + enable_services([service]) + + +def remove_systemd_service(service: str): + service_config = f"{SYSTEMCTL_SERVICES_PATH}/{service}" + + if os.path.exists(service_config): + disable_services([service]) + os.remove(service_config) diff 
--git a/tests/actionstests.py b/tests/actionstests.py new file mode 100644 index 0000000..ab103a4 --- /dev/null +++ b/tests/actionstests.py @@ -0,0 +1,401 @@ +# Copyright 1999-2023. Plesk International GmbH. All rights reserved. +import unittest +from unittest import mock +import os + +from common import action + + +class SimpleAction(action.ActiveAction): + def __init__(self): + self.name = "Simple action" + self.description = "Simple action description" + + def _prepare_action(self): + pass + + def _post_action(self): + pass + + def _revert_action(self): + pass + + +class SkipAction(action.ActiveAction): + def __init__(self): + self.name = "Skip action" + self.description = "Skip action description" + + def _is_required(self): + return False + + def _prepare_action(self): + pass + + def _post_action(self): + pass + + def _revert_action(self): + pass + + +class PrepareActionsFlowForTests(action.PrepareActionsFlow): + PATH_TO_ACTIONS_DATA = "./actions.json" + + +class TestPrepareActionsFlow(unittest.TestCase): + + def setUp(self): + with open("actions.json", "w") as actions_data_file: + actions_data_file.write("{ \"actions\": [] }") + + def tearDown(self): + os.remove("actions.json") + + def test_one_simple_action(self): + simple_action = SimpleAction() + simple_action._prepare_action = mock.Mock() + with PrepareActionsFlowForTests({1: [simple_action]}) as flow: + flow.validate_actions() + flow.pass_actions() + + simple_action._prepare_action.assert_called_once() + + def test_several_simple_actions(self): + actions = [] + for _ in range(5): + simple_action = SimpleAction() + simple_action._prepare_action = mock.Mock() + actions.append(simple_action) + + with PrepareActionsFlowForTests({1: actions}) as flow: + flow.validate_actions() + flow.pass_actions() + + for act in actions: + act._prepare_action.assert_called_once() + + def test_several_steps(self): + simple_action_step_1 = SimpleAction() + simple_action_step_1._prepare_action = mock.Mock() + simple_action_step_2 = SimpleAction() + simple_action_step_2._prepare_action = mock.Mock() + + with PrepareActionsFlowForTests({1: [simple_action_step_1], 2: [simple_action_step_2]}) as flow: + flow.validate_actions() + flow.pass_actions() + + simple_action_step_1._prepare_action.assert_called_once() + simple_action_step_2._prepare_action.assert_called_once() + + def test_skip_action(self): + simple_action = SimpleAction() + simple_action._prepare_action = mock.Mock() + skip_action = SkipAction() + skip_action._prepare_action = mock.Mock() + + with PrepareActionsFlowForTests({1: [simple_action, skip_action]}) as flow: + flow.validate_actions() + flow.pass_actions() + + simple_action._prepare_action.assert_called_once() + skip_action._prepare_action.assert_not_called() + + +class SavedAction(action.ActiveAction): + def __init__(self): + self.name = "saved" + self.description = "Saved action description" + + def _prepare_action(self): + pass + + def _post_action(self): + pass + + def _revert_action(self): + pass + + +class FinishActionsFlowForTests(action.FinishActionsFlow): + PATH_TO_ACTIONS_DATA = "./actions.json" + + +class TestFinishActionsFlow(unittest.TestCase): + + def setUp(self): + with open("actions.json", "w") as actions_data_file: + actions_data_file.write("{ \"actions\": [] }") + + def tearDown(self): + # Flow removes the file by itself + pass + + def test_one_simple_action(self): + simple_action = SimpleAction() + simple_action._post_action = mock.Mock() + with FinishActionsFlowForTests({1: [simple_action]}) as flow: + 
flow.validate_actions() + flow.pass_actions() + + simple_action._post_action.assert_called_once() + + def test_several_simple_actions(self): + actions = [] + for _ in range(5): + simple_action = SimpleAction() + simple_action._post_action = mock.Mock() + actions.append(simple_action) + + with FinishActionsFlowForTests({1: actions}) as flow: + flow.validate_actions() + flow.pass_actions() + + for act in actions: + act._post_action.assert_called_once() + + def test_several_steps(self): + simple_action_step_1 = SimpleAction() + simple_action_step_1._post_action = mock.Mock() + simple_action_step_2 = SimpleAction() + simple_action_step_2._post_action = mock.Mock() + + with FinishActionsFlowForTests({1: [simple_action_step_1], 2: [simple_action_step_2]}) as flow: + flow.validate_actions() + flow.pass_actions() + + simple_action_step_1._post_action.assert_called_once() + simple_action_step_2._post_action.assert_called_once() + + def test_skip_action(self): + simple_action = SimpleAction() + simple_action._post_action = mock.Mock() + skip_action = SkipAction() + skip_action._post_action = mock.Mock() + + with FinishActionsFlowForTests({1: [simple_action, skip_action]}) as flow: + flow.validate_actions() + flow.pass_actions() + + simple_action._post_action.assert_called_once() + skip_action._post_action.assert_not_called() + + def test_pass_based_on_saved_state(self): + simple_action = SimpleAction() + simple_action._post_action = mock.Mock() + saved_action = SavedAction() + saved_action._post_action = mock.Mock() + + with open("actions.json", "w") as actions_data_file: + actions_data_file.write("{ \"actions\": [ { \"name\" : \"saved\", \"state\" : \"success\"}] }") + + with FinishActionsFlowForTests({1: [simple_action, saved_action]}) as flow: + flow.validate_actions() + flow.pass_actions() + + simple_action._post_action.assert_called_once() + saved_action._post_action.assert_called_once() + + def test_skip_based_on_saved_state(self): + simple_action = SimpleAction() + simple_action._post_action = mock.Mock() + saved_action = SavedAction() + saved_action._post_action = mock.Mock() + + with open("actions.json", "w") as actions_data_file: + actions_data_file.write("{ \"actions\": [ { \"name\" : \"saved\", \"state\" : \"skip\"}] }") + + with FinishActionsFlowForTests({1: [simple_action, saved_action]}) as flow: + flow.validate_actions() + flow.pass_actions() + + simple_action._post_action.assert_called_once() + saved_action._post_action.assert_not_called() + + def test_skip_failed_saved_state(self): + simple_action = SimpleAction() + simple_action._post_action = mock.Mock() + saved_action = SavedAction() + saved_action._post_action = mock.Mock() + + with open("actions.json", "w") as actions_data_file: + actions_data_file.write("{ \"actions\": [ { \"name\" : \"saved\", \"state\" : \"failed\"}] }") + + with FinishActionsFlowForTests({1: [simple_action, saved_action]}) as flow: + flow.validate_actions() + flow.pass_actions() + + simple_action._post_action.assert_called_once() + saved_action._post_action.assert_not_called() + + +class RevertActionsFlowForTests(action.RevertActionsFlow): + PATH_TO_ACTIONS_DATA = "./actions.json" + + +class TestRevertActionsFlow(unittest.TestCase): + + def setUp(self): + with open("actions.json", "w") as actions_data_file: + actions_data_file.write("{ \"actions\": [] }") + + def tearDown(self): + # Flow removes the file by itself + pass + + def test_one_simple_action(self): + simple_action = SimpleAction() + simple_action._revert_action = mock.Mock() + with 
RevertActionsFlowForTests({1: [simple_action]}) as flow: + flow.validate_actions() + flow.pass_actions() + + simple_action._revert_action.assert_called_once() + + def test_several_simple_actions(self): + actions = [] + for _ in range(5): + simple_action = SimpleAction() + simple_action._revert_action = mock.Mock() + actions.append(simple_action) + + with RevertActionsFlowForTests({1: actions}) as flow: + flow.validate_actions() + flow.pass_actions() + + for act in actions: + act._revert_action.assert_called_once() + + def test_several_steps(self): + simple_action_step_1 = SimpleAction() + simple_action_step_1._revert_action = mock.Mock() + simple_action_step_2 = SimpleAction() + simple_action_step_2._revert_action = mock.Mock() + + with RevertActionsFlowForTests({1: [simple_action_step_1], 2: [simple_action_step_2]}) as flow: + flow.validate_actions() + flow.pass_actions() + + simple_action_step_1._revert_action.assert_called_once() + simple_action_step_2._revert_action.assert_called_once() + + def test_skip_action(self): + simple_action = SimpleAction() + simple_action._revert_action = mock.Mock() + skip_action = SkipAction() + skip_action._revert_action = mock.Mock() + + with RevertActionsFlowForTests({1: [simple_action, skip_action]}) as flow: + flow.validate_actions() + flow.pass_actions() + + simple_action._revert_action.assert_called_once() + skip_action._revert_action.assert_not_called() + + def test_pass_based_on_saved_state(self): + simple_action = SimpleAction() + simple_action._revert_action = mock.Mock() + saved_action = SavedAction() + saved_action._revert_action = mock.Mock() + + with open("actions.json", "w") as actions_data_file: + actions_data_file.write("{ \"actions\": [ { \"name\" : \"saved\", \"state\" : \"success\"}] }") + + with RevertActionsFlowForTests({1: [simple_action, saved_action]}) as flow: + flow.validate_actions() + flow.pass_actions() + + simple_action._revert_action.assert_called_once() + saved_action._revert_action.assert_called_once() + + def test_skip_based_on_saved_state(self): + simple_action = SimpleAction() + simple_action._revert_action = mock.Mock() + saved_action = SavedAction() + saved_action._revert_action = mock.Mock() + + with open("actions.json", "w") as actions_data_file: + actions_data_file.write("{ \"actions\": [ { \"name\" : \"saved\", \"state\" : \"skip\"}] }") + + with RevertActionsFlowForTests({1: [simple_action, saved_action]}) as flow: + flow.validate_actions() + flow.pass_actions() + + simple_action._revert_action.assert_called_once() + saved_action._revert_action.assert_not_called() + + def test_skip_failed_saved_state(self): + simple_action = SimpleAction() + simple_action._revert_action = mock.Mock() + saved_action = SavedAction() + saved_action._revert_action = mock.Mock() + + with open("actions.json", "w") as actions_data_file: + actions_data_file.write("{ \"actions\": [ { \"name\" : \"saved\", \"state\" : \"failed\"}] }") + + with RevertActionsFlowForTests({1: [simple_action, saved_action]}) as flow: + flow.validate_actions() + flow.pass_actions() + + simple_action._revert_action.assert_called_once() + saved_action._revert_action.assert_not_called() + + +class TrueCheckAction(action.CheckAction): + def __init__(self): + self.name = "true" + self.description = "Always returns true" + + def _do_check(self): + return True + + +class FalseCheckAction(action.CheckAction): + def __init__(self): + self.name = "false" + self.description = "Always returns false" + + def _do_check(self): + return False + + +class 
TestCheckFlow(unittest.TestCase): + def test_true_check(self): + check_action = TrueCheckAction() + with action.CheckFlow([check_action]) as flow: + flow.validate_actions() + res = flow.make_checks() + self.assertEqual(len(res), 0) + + def test_several_true(self): + checks = [] + for _ in range(5): + checks.append(TrueCheckAction()) + + with action.CheckFlow(checks) as flow: + flow.validate_actions() + res = flow.make_checks() + self.assertEqual(len(res), 0) + + def test_several_checks_with_one_false(self): + checks = [] + checks.append(FalseCheckAction()) + for _ in range(5): + checks.append(TrueCheckAction()) + + with action.CheckFlow(checks) as flow: + flow.validate_actions() + res = flow.make_checks() + self.assertEqual(len(res), 1) + + def test_several_checks_with_several_false(self): + checks = [] + for _ in range(5): + checks.append(FalseCheckAction()) + for _ in range(5): + checks.append(TrueCheckAction()) + + with action.CheckFlow(checks) as flow: + flow.validate_actions() + res = flow.make_checks() + self.assertEqual(len(res), 5) diff --git a/tests/distrotests.py b/tests/distrotests.py new file mode 100644 index 0000000..3712940 --- /dev/null +++ b/tests/distrotests.py @@ -0,0 +1,31 @@ +# Copyright 1999-2023. Plesk International GmbH. All rights reserved. +import unittest + +from common import dist + + +class TestDistro(unittest.TestCase): + + def test_is_ubuntu_18_deb_based(self): + self.assertTrue(dist._is_deb_based(dist.Distro.ubuntu18)) + + def test_is_ubuntu_20_deb_based(self): + self.assertTrue(dist._is_deb_based(dist.Distro.ubuntu20)) + + def test_is_centos_7_rhel_based(self): + self.assertTrue(dist._is_rhel_based(dist.Distro.centos7)) + + def test_is_alma_8_rhel_based(self): + self.assertTrue(dist._is_rhel_based(dist.Distro.almalinux8)) + + def test_is_ubuntu_18_not_rhel_based(self): + self.assertFalse(dist._is_rhel_based(dist.Distro.ubuntu18)) + + def test_is_ubuntu_20_not_rhel_based(self): + self.assertFalse(dist._is_rhel_based(dist.Distro.ubuntu20)) + + def test_is_centos_7_not_deb_based(self): + self.assertFalse(dist._is_deb_based(dist.Distro.centos7)) + + def test_is_alma_8_not_deb_based(self): + self.assertFalse(dist._is_deb_based(dist.Distro.almalinux8)) diff --git a/tests/feedbacktests.py b/tests/feedbacktests.py new file mode 100644 index 0000000..4cde436 --- /dev/null +++ b/tests/feedbacktests.py @@ -0,0 +1,67 @@ +# Copyright 1999-2023. Plesk International GmbH. All rights reserved. 
+import os +import unittest +import zipfile + +from common import feedback + + +class TestFeedback(unittest.TestCase): + + TARGET_FEEDBACK = "test_feedback.zip" + + def tearDown(self) -> None: + if os.path.exists(self.TARGET_FEEDBACK): + os.unlink(self.TARGET_FEEDBACK) + + def test_version_file_contains_required_data(self): + _ = feedback.Feedback("tests", "1.0.0-rev1") + + required_data = { + "The tests utility version: 1.0.0-rev1": False, + "Distribution information: ": False, + "Kernel information: ": False, + } + + with open("versions.txt", "r") as versions_file: + for line in versions_file: + for key in required_data.keys(): + if key in line: + required_data[key] = True + + for key, value in required_data.items(): + self.assertTrue(value, f"Required data '{key}' is not found in versions.txt") + + def test_create_simple_feedback(self): + test_feedback = feedback.Feedback("tests", "1.0.0-rev1") + test_feedback.save_archive(self.TARGET_FEEDBACK) + self.assertTrue(os.path.exists(self.TARGET_FEEDBACK)) + + with zipfile.ZipFile(self.TARGET_FEEDBACK, "r") as zip_file: + self.assertEqual(zip_file.namelist(), ["versions.txt"]) + + def test_create_feedback_with_filelist(self): + with open("testfile", "w") as testfile: + testfile.write("test") + + test_feedback = feedback.Feedback("tests", "1.0.0-rev1", filelist=["testfile"]) + test_feedback.save_archive(self.TARGET_FEEDBACK) + self.assertTrue(os.path.exists(self.TARGET_FEEDBACK)) + + with zipfile.ZipFile(self.TARGET_FEEDBACK, "r") as zip_file: + self.assertTrue("testfile" in zip_file.namelist()) + self.assertTrue("versions.txt" in zip_file.namelist()) + + def test_create_feedback_with_collected_data(self): + def collect_data(): + with open("testfile", "w") as testfile: + testfile.write("test") + return "testfile" + + test_feedback = feedback.Feedback("tests", "1.0.0-rev1", collect_actions=[collect_data]) + test_feedback.save_archive(self.TARGET_FEEDBACK) + self.assertTrue(os.path.exists(self.TARGET_FEEDBACK)) + + with zipfile.ZipFile(self.TARGET_FEEDBACK, "r") as zip_file: + self.assertTrue("testfile" in zip_file.namelist()) + self.assertTrue("versions.txt" in zip_file.namelist()) diff --git a/tests/filestests.py b/tests/filestests.py new file mode 100644 index 0000000..0fbeea9 --- /dev/null +++ b/tests/filestests.py @@ -0,0 +1,509 @@ +# Copyright 1999-2023. Plesk International GmbH. All rights reserved. +import unittest +import os +import json +import tempfile +import shutil + +from common import files + + +class ReplaceFileStringTests(unittest.TestCase): + REPLACE_FILE_CONTENT = """---> cccc <--- +This is the file where we want to replace some string. This is the string to replace ---> aaaa <---. +---> eeee <--- +---> gggg <--- +""" + + DATA_FILE_NAME = "datafile.txt" + + def setUp(self): + with open(self.DATA_FILE_NAME, "w") as f: + f.write(self.REPLACE_FILE_CONTENT) + + def tearDown(self): + if os.path.exists(self.DATA_FILE_NAME): + os.remove(self.DATA_FILE_NAME) + + def test_simple_string_replace(self): + files.replace_string(self.DATA_FILE_NAME, "aaaa", "bbbb") + with open(self.DATA_FILE_NAME) as file: + for line in file.readlines(): + if line.startswith("This is the string to replace"): + self.assertEqual(line, "This is the file where we want to replace some string. 
This is the string to replace ---> bbbb <---.") + break + + def test_replace_first_string(self): + files.replace_string(self.DATA_FILE_NAME, "---> cccc <---", "<--- dddd --->") + with open(self.DATA_FILE_NAME) as file: + self.assertEqual(file.readline().rstrip(), "<--- dddd --->") + + def test_replace_whole_line(self): + files.replace_string(self.DATA_FILE_NAME, "---> eeee <---", "<--- ffff --->") + with open(self.DATA_FILE_NAME) as file: + line = file.readlines()[-2].rstrip() + self.assertEqual(line, "<--- ffff --->") + + def test_replase_last_string(self): + files.replace_string(self.DATA_FILE_NAME, "---> gggg <---", "<--- hhhh --->") + with open(self.DATA_FILE_NAME) as file: + line = file.readlines()[-1].rstrip() + self.assertEqual(line, "<--- hhhh --->") + + +class AppendStringsTests(unittest.TestCase): + ORIGINAL_FILE_NAME = "original.txt" + + def setUp(self): + with open(self.ORIGINAL_FILE_NAME, "w") as f: + f.write("") + + def tearDown(self): + if os.path.exists(self.ORIGINAL_FILE_NAME): + os.remove(self.ORIGINAL_FILE_NAME) + + def test_add_to_empty(self): + files.append_strings(self.ORIGINAL_FILE_NAME, ['aaaa\n', 'bbbb\n']) + with open(self.ORIGINAL_FILE_NAME) as f: + self.assertEqual([line.rstrip() for line in f.readlines()], ['aaaa', 'bbbb']) + + def test_add_to_non_empty(self): + with open(self.ORIGINAL_FILE_NAME, "w") as f: + f.write("aaaa\n") + files.append_strings(self.ORIGINAL_FILE_NAME, ["bbbb\n", "cccc\n"]) + with open(self.ORIGINAL_FILE_NAME) as f: + self.assertEqual([line.rstrip() for line in f.readlines()], ["aaaa", "bbbb", "cccc"]) + + def test_add_nothing(self): + with open(self.ORIGINAL_FILE_NAME, "w") as f: + f.write("aaaa\n") + files.append_strings(self.ORIGINAL_FILE_NAME, []) + with open(self.ORIGINAL_FILE_NAME) as f: + self.assertEqual([line.rstrip() for line in f.readlines()], ["aaaa"]) + + +class PushFrontStringsTests(unittest.TestCase): + ORIGINAL_FILE_NAME = "original.txt" + + def setUp(self): + with open(self.ORIGINAL_FILE_NAME, "w") as f: + f.write("") + + def tearDown(self): + if os.path.exists(self.ORIGINAL_FILE_NAME): + os.remove(self.ORIGINAL_FILE_NAME) + + def test_add_to_empty(self): + files.push_front_strings(self.ORIGINAL_FILE_NAME, ["aaaa\n", "bbbb\n"]) + with open(self.ORIGINAL_FILE_NAME) as f: + self.assertEqual([line.rstrip() for line in f.readlines()], ["aaaa", "bbbb"]) + + def test_add_to_non_empty(self): + with open(self.ORIGINAL_FILE_NAME, "w") as f: + f.write("aaaa\n") + files.push_front_strings(self.ORIGINAL_FILE_NAME, ["bbbb\n", "cccc\n"]) + with open(self.ORIGINAL_FILE_NAME) as f: + self.assertEqual([line.rstrip() for line in f.readlines()], ["bbbb", "cccc", "aaaa"]) + + def test_add_nothing(self): + with open(self.ORIGINAL_FILE_NAME, "w") as f: + f.write("aaaa\n") + files.push_front_strings(self.ORIGINAL_FILE_NAME, []) + with open(self.ORIGINAL_FILE_NAME) as f: + self.assertEqual([line.rstrip() for line in f.readlines()], ["aaaa"]) + + +class RewriteJsonTests(unittest.TestCase): + OriginalJson = { + "key1": "value1", + "obj": { + "key2": "value2", + }, + "array": [ + "value3", + "value4", + "value5", + ], + "objs": [ + { + "sharedkey": "value6", + }, + { + "sharedkey": "value7", + } + ], + } + INITIAL_JSON_FILE_NAME = "test.json" + + def setUp(self): + with open(self.INITIAL_JSON_FILE_NAME, "w") as f: + f.write(json.dumps(self.OriginalJson)) + + def tearDown(self): + if os.path.exists(self.INITIAL_JSON_FILE_NAME): + os.remove(self.INITIAL_JSON_FILE_NAME) + + def test_simple_json_rewrite(self): + new_json = { + "key1": "newvalue", + 
"obj": { + "key2": "newvalue2", + }, + "array": [ + "newvalue3", + "newvalue4", + "newvalue5", + ], + "objs": [ + { + "sharedkey": "newvalue6", + }, + { + "sharedkey": "newvalue7", + } + ], + } + new_json["key1"] = "newvalue" + files.rewrite_json_file(self.INITIAL_JSON_FILE_NAME, new_json) + with open(self.INITIAL_JSON_FILE_NAME) as file: + self.assertEqual(json.load(file), new_json) + + +class FindFilesCaseInsensativeTests(unittest.TestCase): + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.temp_dir) + + def test_find_file(self): + with open(os.path.join(self.temp_dir, "file.txt"), "w") as f: + f.write("") + + result = sorted(files.find_files_case_insensitive(self.temp_dir, ["file.txt"])) + self.assertEqual([os.path.basename(file) for file in result], ["file.txt"]) + + def test_find_file_with_different_case(self): + with open(os.path.join(self.temp_dir, "file.txt"), "w") as f: + f.write("") + + result = sorted(files.find_files_case_insensitive(self.temp_dir, ["FILE.txt"])) + self.assertEqual([os.path.basename(file) for file in result], ["file.txt"]) + + def test_find_several_files_by_extension(self): + with open(os.path.join(self.temp_dir, "file.txt"), "w") as f: + f.write("") + with open(os.path.join(self.temp_dir, "file2.txt"), "w") as f: + f.write("") + with open(os.path.join(self.temp_dir, "file.md"), "w") as f: + f.write("") + + result = sorted(files.find_files_case_insensitive(self.temp_dir, ["*.txt"])) + self.assertEqual([os.path.basename(file) for file in result], ["file.txt", "file2.txt"]) + + def test_find_different_case_files(self): + with open(os.path.join(self.temp_dir, "file.txt"), "w") as f: + f.write("") + with open(os.path.join(self.temp_dir, "FILE.txt"), "w") as f: + f.write("") + + result = sorted(files.find_files_case_insensitive(self.temp_dir, ["file.txt"])) + self.assertEqual([os.path.basename(file) for file in result], ["FILE.txt", "file.txt"]) + + def test_find_different_case_files_by_extension(self): + with open(os.path.join(self.temp_dir, "file.txt"), "w") as f: + f.write("") + with open(os.path.join(self.temp_dir, "FILE.txt"), "w") as f: + f.write("") + with open(os.path.join(self.temp_dir, "file.md"), "w") as f: + f.write("") + + result = sorted(files.find_files_case_insensitive(self.temp_dir, ["f*.txt"])) + self.assertEqual([os.path.basename(file) for file in result], ["FILE.txt", "file.txt"]) + + def test_empty_directory(self): + self.assertEqual(files.find_files_case_insensitive(self.temp_dir, ["file.txt"]), []) + + def test_find_no_files_by_extension(self): + self.assertEqual(files.find_files_case_insensitive(self.temp_dir, ["*.txt"]), []) + + def test_find_no_files(self): + with open(os.path.join(self.temp_dir, "file.md"), "w") as f: + f.write("") + + self.assertEqual(files.find_files_case_insensitive(self.temp_dir, ["file.txt"]), []) + + def test_no_such_directory(self): + self.assertEqual(files.find_files_case_insensitive(os.path.join(self.temp_dir, "no_such_dir"), ["file.txt"]), []) + + def test_several_regexps(self): + with open(os.path.join(self.temp_dir, "file.txt"), "w") as f: + f.write("") + with open(os.path.join(self.temp_dir, "file2.txt"), "w") as f: + f.write("") + with open(os.path.join(self.temp_dir, "file.md"), "w") as f: + f.write("") + + result = sorted(files.find_files_case_insensitive(self.temp_dir, ["file.txt", "*.md"])) + self.assertEqual([os.path.basename(file) for file in result], ["file.md", "file.txt"]) + + def test_repo_example(self): + file_names = ["almalinux-ha.repo", 
"almalinux-powertools.repo", "almalinux-rt.repo", + "ELevate.repo", "epel-testing-modular.repo", "imunify360-testing.repo", + "kolab-16-testing-candidate.repo", "plesk-ext-ruby.repo", "almalinux-nfv.repo", + "almalinux.repo", "almalinux-saphana.repo", "epel-modular.repo", + "epel-testing.repo", "imunify-rollout.repo", "kolab-16-testing.repo", + "plesk.repo", "almalinux-plus.repo", "almalinux-resilientstorage.repo", + "almalinux-sap.repo", "epel.repo", "imunify360.repo", + "kolab-16.repo", "plesk-ext-panel-migrator.repo", + ] + + for file_name in file_names: + with open(os.path.join(self.temp_dir, file_name), "w") as f: + f.write("") + + result = sorted(files.find_files_case_insensitive(self.temp_dir, ["plesk*.repo"])) + self.assertEqual([os.path.basename(file) for file in result], ["plesk-ext-panel-migrator.repo", "plesk-ext-ruby.repo", "plesk.repo"]) + + def test_recursive_simple(self): + os.mkdir(os.path.join(self.temp_dir, "subdir")) + with open(os.path.join(self.temp_dir, "subdir", "file.txt"), "w") as f: + f.write("") + + result = sorted(files.find_files_case_insensitive(self.temp_dir, ["file.txt"], recursive=True)) + self.assertEqual([os.path.relpath(file, self.temp_dir) for file in result], ["subdir/file.txt"]) + + def test_recursive_in_dir_and_subdir(self): + with open(os.path.join(self.temp_dir, "file1.txt"), "w") as f: + f.write("") + os.mkdir(os.path.join(self.temp_dir, "subdir")) + with open(os.path.join(self.temp_dir, "subdir", "file2.txt"), "w") as f: + f.write("") + + result = sorted(files.find_files_case_insensitive(self.temp_dir, ["*.txt"], recursive=True)) + self.assertEqual([os.path.relpath(file, self.temp_dir) for file in result], ["file1.txt", "subdir/file2.txt"]) + + def test_subdir_search_is_not_supported(self): + # Just to show that we don't support seraching with subdir included + # in regexp. We can search only by filenames for now. 
+ os.mkdir(os.path.join(self.temp_dir, "subdir")) + with open(os.path.join(self.temp_dir, "subdir", "file.txt"), "w") as f: + f.write("") + + result = sorted(files.find_files_case_insensitive(self.temp_dir, ["subdir/file.txt"], recursive=True)) + self.assertEqual([os.path.relpath(file, self.temp_dir) for file in result], []) + + +class CheckDirectoryIsEmpty(unittest.TestCase): + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.temp_dir) + + def test_empty_directory(self): + self.assertTrue(files.is_directory_empty(self.temp_dir)) + + def test_directory_with_file(self): + with open(os.path.join(self.temp_dir, "file.txt"), "w") as f: + f.write("") + self.assertFalse(files.is_directory_empty(self.temp_dir)) + + def test_no_such_directory(self): + self.assertTrue(files.is_directory_empty(os.path.join(self.temp_dir, "no_such_dir"))) + + +class FindSubdirectory(unittest.TestCase): + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.temp_dir) + + def test_find_by_name(self): + os.mkdir(os.path.join(self.temp_dir, "subdir")) + os.mkdir(os.path.join(self.temp_dir, "subdir2")) + + self.assertEqual(files.find_subdirectory_by(self.temp_dir, lambda subdir: os.path.basename(subdir) == "subdir2"), + os.path.join(self.temp_dir, "subdir2")) + + def test_find_by_name_not_found(self): + os.mkdir(os.path.join(self.temp_dir, "subdir")) + os.mkdir(os.path.join(self.temp_dir, "subdir2")) + + self.assertIsNone(files.find_subdirectory_by(self.temp_dir, lambda subdir: os.path.basename(subdir) == "subdir3")) + + def test_find_by_name_in_subdir(self): + os.mkdir(os.path.join(self.temp_dir, "subdir")) + os.mkdir(os.path.join(self.temp_dir, "subdir2")) + os.mkdir(os.path.join(self.temp_dir, "subdir2", "subdir3")) + + self.assertEqual(files.find_subdirectory_by(self.temp_dir, lambda subdir: os.path.basename(subdir) == "subdir3"), + os.path.join(self.temp_dir, "subdir2", "subdir3")) + + def test_find_by_file_inside(self): + os.mkdir(os.path.join(self.temp_dir, "subdir")) + os.mkdir(os.path.join(self.temp_dir, "subdir2")) + with open(os.path.join(self.temp_dir, "subdir2", "file.txt"), "w") as f: + f.write("") + + self.assertEqual(files.find_subdirectory_by(self.temp_dir, lambda subdir: os.path.exists(os.path.join(subdir, "file.txt"))), os.path.join(self.temp_dir, "subdir2")) + + +class FindFileSubstring(unittest.TestCase): + + def setUp(self): + self.temp_file = tempfile.mkstemp()[1] + + def tearDown(self) -> None: + os.remove(self.temp_file) + + def test_one_line(self): + with open(self.temp_file, "w") as f: + f.write("aaaa: bbbbbb\n") + f.write("cccc: bbbbbb\n") + f.write("dddd: kkkkkk\n") + + self.assertEqual(files.find_file_substrings(self.temp_file, "cccc"), ["cccc: bbbbbb\n"]) + + def test_several_lines(self): + with open(self.temp_file, "w") as f: + f.write("aaaa: bbbbbb\n") + f.write("cccc: bbbbbb\n") + f.write("dddd: kkkkkk\n") + + self.assertEqual(files.find_file_substrings(self.temp_file, "bbbbb"), ["aaaa: bbbbbb\n", "cccc: bbbbbb\n"]) + + def test_no_such_file(self): + self.assertEqual(files.find_file_substrings("no_such_file.txt", "bbbbb"), []) + + def test_no_such_substring(self): + with open(self.temp_file, "w") as f: + f.write("aaaa: bbbbbb\n") + f.write("cccc: bbbbbb\n") + f.write("dddd: kkkkkk\n") + + self.assertEqual(files.find_file_substrings(self.temp_file, "no_such_substring"), []) + + +class CNFSetVariable(unittest.TestCase): + + TEST_FILE_CONTENT = """ +[test] +variable1=value1 +""" + + def 
setUp(self): + self.temp_file = tempfile.mkstemp()[1] + with open(self.temp_file, "w") as f: + f.write(self.TEST_FILE_CONTENT) + + def tearDown(self) -> None: + os.remove(self.temp_file) + + def test_change_variable(self): + EXPECTED_FILE_CONTENT = """ +[test] +variable1=value2 +""" + + files.cnf_set_section_variable(self.temp_file, "test", "variable1", "value2") + with open(self.temp_file) as f: + self.assertEqual(f.read(), EXPECTED_FILE_CONTENT) + + def test_add_variable(self): + EXPECTED_FILE_CONTENT = """ +[test] +variable1=value1 +variable2=value2 +""" + + files.cnf_set_section_variable(self.temp_file, "test", "variable2", "value2") + with open(self.temp_file) as f: + self.assertEqual(f.read(), EXPECTED_FILE_CONTENT) + + def test_insert_section(self): + EXPECTED_FILE_CONTENT = """ +[test] +variable1=value1 + +[test2] +variable2=value2 +""" + + files.cnf_set_section_variable(self.temp_file, "test2", "variable2", "value2") + with open(self.temp_file) as f: + self.assertEqual(f.read(), EXPECTED_FILE_CONTENT) + + def test_add_section_similar_variable(self): + EXPECTED_FILE_CONTENT = """ +[test] +variable1=value1 + +[test2] +variable1=value2 +""" + + files.cnf_set_section_variable(self.temp_file, "test2", "variable1", "value2") + with open(self.temp_file) as f: + self.assertEqual(f.read(), EXPECTED_FILE_CONTENT) + + +class CNFUnsetVariable(unittest.TestCase): + + TEST_FILE_CONTENT = """ +[test1] +variable1=value1 +variable2=value1 + +[test2] +variable1=value2 +""" + + def setUp(self): + self.temp_file = tempfile.mkstemp()[1] + with open(self.temp_file, "w") as f: + f.write(self.TEST_FILE_CONTENT) + + def tearDown(self) -> None: + os.remove(self.temp_file) + + def test_remove_variable(self): + EXPECTED_FILE_CONTENT = """ +[test1] +variable1=value1 + +[test2] +variable1=value2 +""" + + files.cnf_unset_section_variable(self.temp_file, "test1", "variable2") + with open(self.temp_file) as f: + self.assertEqual(f.read(), EXPECTED_FILE_CONTENT) + + def test_remove_similar_variable(self): + EXPECTED_FILE_CONTENT = """ +[test1] +variable2=value1 + +[test2] +variable1=value2 +""" + files.cnf_unset_section_variable(self.temp_file, "test1", "variable1") + with open(self.temp_file) as f: + self.assertEqual(f.read(), EXPECTED_FILE_CONTENT) + + def test_remove_last_section_variable(self): + EXPECTED_FILE_CONTENT = """ +[test1] +variable1=value1 +variable2=value1 + +[test2] +""" + files.cnf_unset_section_variable(self.temp_file, "test2", "variable1") + with open(self.temp_file) as f: + self.assertEqual(f.read(), EXPECTED_FILE_CONTENT) diff --git a/tests/leapp_configs_tests.py b/tests/leapp_configs_tests.py new file mode 100644 index 0000000..69c5fa5 --- /dev/null +++ b/tests/leapp_configs_tests.py @@ -0,0 +1,532 @@ +# Copyright 1999-2023. Plesk International GmbH. All rights reserved. 
+import unittest +import os +import json +import typing + +from common import leapp_configs + + +class AddMappingTests(unittest.TestCase): + + LEAPP_REPO_FILE = "leapp_repos.repo" + LEAPP_MAP_FILE = "map.repo" + + def tearDown(self): + for files in (self.LEAPP_REPO_FILE, self.LEAPP_MAP_FILE): + if os.path.exists(files): + os.remove(files) + + def _perform_test(self, repos: typing.Dict[str, str], expected_repos: str, expected_mapping: str, ignore: bool = None) -> None: + for filename, content in repos.items(): + with open(filename, "w") as f: + f.write(content) + + leapp_configs.add_repositories_mapping(repos, ignore=ignore, + leapp_repos_file_path=self.LEAPP_REPO_FILE, + mapfile_path=self.LEAPP_MAP_FILE) + + with open(self.LEAPP_REPO_FILE) as f: + lines = [line.rstrip() for line in f.readlines() if not line.rstrip() == ""] + print(lines) + self.assertEqual(lines, expected_repos.splitlines()) + + with open(self.LEAPP_MAP_FILE) as f: + lines = [line.rstrip() for line in f.readlines() if not line.rstrip() == ""] + self.assertEqual(lines, expected_mapping.splitlines()) + + for files in repos.keys(): + if os.path.exists(files): + os.remove(files) + + def test_simple_mapping(self): + simple_repos = """[repo1] +name=repo1 +baseurl=http://repo1 +enabled=1 +gpgcheck=0 +#no comment removed + +[repo2] +name=repo2 +baseurl=http://repo2/rpm-CentOS-7 +enabled=1 +gpgcheck=0 + +[repo3] +name=repo3 +baseurl=http://repo3/centos7 +enabled=1 +gpgcheck=0 +""" + + expected_leapp_repos = """[alma-repo1] +name=Alma repo1 +baseurl=http://repo1 +enabled=1 +gpgcheck=0 +#no comment removed +[alma-repo2] +name=Alma repo2 +baseurl=http://repo2/rpm-RedHat-el8 +enabled=1 +gpgcheck=0 +[alma-repo3] +name=Alma repo3 +baseurl=http://repo3/centos8 +enabled=1 +gpgcheck=0 +""" + expected_leapp_mapping = """repo1,alma-repo1,alma-repo1,all,all,x86_64,rpm,ga,ga +repo2,alma-repo2,alma-repo2,all,all,x86_64,rpm,ga,ga +repo3,alma-repo3,alma-repo3,all,all,x86_64,rpm,ga,ga +""" + + self._perform_test({"simple_repos.repo": simple_repos}, + expected_leapp_repos, expected_leapp_mapping) + + def test_kolab_related_mapping(self): + kolab_repos = """[kolab-repo] +name=Kolab repo +baseurl=https://mirror.apheleia-it.ch/repos/Kolab:/16/CentOS_7_Plesk_17/src +enabled=0 +priority=60 +skip_if_unavailable=1 +gpgcheck=1 +""" + + expected_kolab_leapp_repos = """[alma-kolab-repo] +name=Alma Kolab repo +baseurl=https://mirror.apheleia-it.ch/repos/Kolab:/16/CentOS_8_Plesk_17/src +enabled=0 +priority=60 +skip_if_unavailable=1 +gpgcheck=1 +""" + + expected_kolab_leapp_mapping = """kolab-repo,alma-kolab-repo,alma-kolab-repo,all,all,x86_64,rpm,ga,ga +""" + + self._perform_test({"kolab.repo": kolab_repos}, + expected_kolab_leapp_repos, expected_kolab_leapp_mapping) + + def test_epel_mapping(self): + epel_like_repos = """[epel-repo] +name=EPEL-7 repo +metalink=http://epel-repo/epel-7 +enabled=1 +gpgcheck=0 + +[epel-debug-repo] +name=EPEL-7 debug repo +metalink=http://epel-repo/epel-debug-7 +enabled=1 +gpgcheck=0 + +[epel-source-repo] +name=EPEL-7 source repo +metalink=http://epel-repo/epel-source-7 +enabled=1 +gpgcheck=0 +""" + expected_leapp_repos = """[alma-epel-repo] +name=Alma EPEL-8 repo +metalink=http://epel-repo/epel-8 +enabled=1 +gpgcheck=0 +[alma-epel-debug-repo] +name=Alma EPEL-8 debug repo +metalink=http://epel-repo/epel-debug-8 +enabled=1 +gpgcheck=0 +[alma-epel-source-repo] +name=Alma EPEL-8 source repo +metalink=http://epel-repo/epel-source-8 +enabled=1 +gpgcheck=0 +""" + expected_leapp_mapping = 
"""epel-repo,alma-epel-repo,alma-epel-repo,all,all,x86_64,rpm,ga,ga +epel-debug-repo,alma-epel-debug-repo,alma-epel-debug-repo,all,all,x86_64,rpm,ga,ga +epel-source-repo,alma-epel-source-repo,alma-epel-source-repo,all,all,x86_64,rpm,ga,ga +""" + self._perform_test({"epel_repos.repo": epel_like_repos}, + expected_leapp_repos, expected_leapp_mapping) + + def test_plesk_mapping(self): + plesk_like_repos = """[PLESK_18_0_XX-extras] +name=plesk extras repo +baseurl=http://plesk/rpm-CentOS-7/extras +enabled=1 +gpgcheck=0 + +[PLESK_18_0_XX-PHP-5.5] +name=plesk php 5.5 repo +baseurl=http://plesk/rpm-CentOS-7/php-5.5 +enabled=1 +gpgcheck=0 + +[PLESK_18_0_XX-PHP72] +name=plesk php 7.2 repo +baseurl=http://plesk/rpm-CentOS-7/PHP_7.2 +enabled=1 +gpgcheck=0 + +[PLESK_18_0_XX-PHP80] +name=plesk php 8.0 repo +baseurl=http://plesk/rpm-CentOS-7/PHP_8.0 +enabled=1 +gpgcheck=0 +""" + expected_leapp_repos = """[alma-PLESK_18_0_XX-extras] +name=Alma plesk extras repo +baseurl=http://plesk/rpm-RedHat-el8/extras +enabled=1 +gpgcheck=0 +[alma-PLESK_18_0_XX] +name=Alma plesk repo +baseurl=http://plesk/rpm-RedHat-el8/dist +enabled=1 +gpgcheck=1 +[alma-PLESK_18_0_XX-PHP72] +name=Alma plesk php 7.2 repo +baseurl=http://plesk/rpm-CentOS-8/PHP_7.2 +enabled=1 +gpgcheck=0 +[alma-PLESK_18_0_XX-PHP80] +name=Alma plesk php 8.0 repo +baseurl=http://plesk/rpm-RedHat-el8/PHP_8.0 +enabled=1 +gpgcheck=0 +""" + expected_leapp_mapping = """PLESK_18_0_XX-extras,alma-PLESK_18_0_XX,alma-PLESK_18_0_XX,all,all,x86_64,rpm,ga,ga +PLESK_18_0_XX-extras,alma-PLESK_18_0_XX-extras,alma-PLESK_18_0_XX-extras,all,all,x86_64,rpm,ga,ga +PLESK_18_0_XX-PHP72,alma-PLESK_18_0_XX-PHP72,alma-PLESK_18_0_XX-PHP72,all,all,x86_64,rpm,ga,ga +PLESK_18_0_XX-PHP80,alma-PLESK_18_0_XX-PHP80,alma-PLESK_18_0_XX-PHP80,all,all,x86_64,rpm,ga,ga +""" + + self._perform_test({"plesk_repos.repo": plesk_like_repos}, + expected_leapp_repos, expected_leapp_mapping, + ignore=["PLESK_18_0_XX-PHP-5.5"]) + + def test_mariadb_mapping(self): + mariadb_like_repos = """[mariadb] +name = MariaDB +baseurl = http://yum.mariadb.org/10.11/centos7-amd64 +module_hotfixes=1 +gpgkey=https://yum.mariadb.org/RPM-GPG-KEY-MariaDB +gpgcheck=1 +""" + + expected_mariadb_repos = """[alma-mariadb] +name=Alma MariaDB +baseurl=http://yum.mariadb.org/10.11/rhel8-amd64 +module_hotfixes=1 +gpgkey=https://yum.mariadb.org/RPM-GPG-KEY-MariaDB +gpgcheck=1 +""" + + expected_mariadb_mapping = """mariadb,alma-mariadb,alma-mariadb,all,all,x86_64,rpm,ga,ga +""" + + self._perform_test({"mariadb.repo": mariadb_like_repos}, + expected_mariadb_repos, expected_mariadb_mapping) + + def test_official_postgresql_mapping(self): + # Not full, but representative enough + postgresql_like_repos = """[pgdg-common] +name=PostgreSQL common RPMs for RHEL / CentOS $releasever - $basearch +baseurl=https://download.postgresql.org/pub/repos/yum/common/redhat/rhel-$releasever-$basearch +enabled=1 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 1 + +[pgdg15] +name=PostgreSQL 15 for RHEL / CentOS $releasever - $basearch +baseurl=https://download.postgresql.org/pub/repos/yum/15/redhat/rhel-$releasever-$basearch +enabled=1 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 1 + +[pgdg-common-testing] +name=PostgreSQL common testing RPMs for RHEL / CentOS $releasever - $basearch +baseurl=https://download.postgresql.org/pub/repos/yum/testing/common/redhat/rhel-$releasever-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 1 + 
+[pgdg16-updates-testing] +name=PostgreSQL 16 for RHEL / CentOS $releasever - $basearch - Updates testing +baseurl=https://download.postgresql.org/pub/repos/yum/testing/16/redhat/rhel-$releasever-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 1 + +[pgdg15-updates-testing] +name=PostgreSQL 15 for RHEL / CentOS $releasever - $basearch - Updates testing +baseurl=https://download.postgresql.org/pub/repos/yum/testing/15/redhat/rhel-$releasever-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 1 + +[pgdg-source-common] +name=PostgreSQL 12 for RHEL / CentOS $releasever - $basearch - Source +baseurl=https://download.postgresql.org/pub/repos/yum/srpms/common/redhat/rhel-$releasever-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 1 + +[pgdg15-updates-testing-debuginfo] +name=PostgreSQL 15 for RHEL / CentOS $releasever - $basearch - Debuginfo +baseurl=https://download.postgresql.org/pub/repos/yum/testing/debug/15/redhat/rhel-$releasever-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 1 + +[pgdg15-source-updates-testing] +name=PostgreSQL 15 for RHEL / CentOS $releasever - $basearch - Source updates testing +baseurl=https://download.postgresql.org/pub/repos/yum/srpms/testing/15/redhat/rhel-$releasever-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 1 + +[pgdg14-source] +name=PostgreSQL 14 for RHEL / CentOS $releasever - $basearch - Source +baseurl=https://download.postgresql.org/pub/repos/yum/srpms/14/redhat/rhel-$releasever-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 1 + +[pgdg14-source-updates-testing] +name=PostgreSQL 14 for RHEL / CentOS $releasever - $basearch - Source updates testing +baseurl=https://download.postgresql.org/pub/repos/yum/srpms/testing/14/redhat/rhel-$releasever-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 1 +""" + + expected_postgresql_repos = """[alma-pgdg-common] +name=Alma PostgreSQL common RPMs for RHEL / CentOS 8 - $basearch +baseurl=https://download.postgresql.org/pub/repos/yum/common/redhat/rhel-8-$basearch +enabled=1 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 0 +[alma-pgdg15] +name=Alma PostgreSQL 15 for RHEL / CentOS 8 - $basearch +baseurl=https://download.postgresql.org/pub/repos/yum/15/redhat/rhel-8-$basearch +enabled=1 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 0 +[alma-pgdg-common-testing] +name=Alma PostgreSQL common testing RPMs for RHEL / CentOS 8 - $basearch +baseurl=https://download.postgresql.org/pub/repos/yum/testing/common/redhat/rhel-8-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 0 +[alma-pgdg16-updates-testing] +name=Alma PostgreSQL 16 for RHEL / CentOS 8 - $basearch - Updates testing +baseurl=https://download.postgresql.org/pub/repos/yum/testing/16/redhat/rhel-8-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 0 +[alma-pgdg15-updates-testing] +name=Alma PostgreSQL 15 for RHEL / CentOS 8 - $basearch - Updates testing +baseurl=https://download.postgresql.org/pub/repos/yum/15/redhat/rhel-8-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 0 +[alma-pgdg-source-common] +name=Alma 
PostgreSQL 12 for RHEL / CentOS 8 - $basearch - Source +baseurl=https://download.postgresql.org/pub/repos/yum/srpms/common/redhat/rhel-8-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 0 +[alma-pgdg15-updates-testing-debuginfo] +name=Alma PostgreSQL 15 for RHEL / CentOS 8 - $basearch - Debuginfo +baseurl=https://download.postgresql.org/pub/repos/yum/testing/debug/15/redhat/rhel-8-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 0 +[alma-pgdg15-source-updates-testing] +name=Alma PostgreSQL 15 for RHEL / CentOS 8 - $basearch - Source updates testing +baseurl=https://download.postgresql.org/pub/repos/yum/srpms/testing/15/redhat/rhel-8-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 0 +[alma-pgdg14-source] +name=Alma PostgreSQL 14 for RHEL / CentOS 8 - $basearch - Source +baseurl=https://download.postgresql.org/pub/repos/yum/srpms/14/redhat/rhel-8-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 0 +[alma-pgdg14-source-updates-testing] +name=Alma PostgreSQL 14 for RHEL / CentOS 8 - $basearch - Source updates testing +baseurl=https://download.postgresql.org/pub/repos/yum/srpms/14/redhat/rhel-8-$basearch +enabled=0 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-PGDG +repo_gpgcheck = 0 +""" + + expected_postgresql_mapping = """pgdg-common,alma-pgdg-common,alma-pgdg-common,all,all,x86_64,rpm,ga,ga +pgdg15,alma-pgdg15,alma-pgdg15,all,all,x86_64,rpm,ga,ga +pgdg-common-testing,alma-pgdg-common-testing,alma-pgdg-common-testing,all,all,x86_64,rpm,ga,ga +pgdg16-updates-testing,alma-pgdg16-updates-testing,alma-pgdg16-updates-testing,all,all,x86_64,rpm,ga,ga +pgdg15-updates-testing,alma-pgdg15-updates-testing,alma-pgdg15-updates-testing,all,all,x86_64,rpm,ga,ga +pgdg-source-common,alma-pgdg-source-common,alma-pgdg-source-common,all,all,x86_64,rpm,ga,ga +pgdg15-updates-testing-debuginfo,alma-pgdg15-updates-testing-debuginfo,alma-pgdg15-updates-testing-debuginfo,all,all,x86_64,rpm,ga,ga +pgdg15-source-updates-testing,alma-pgdg15-source-updates-testing,alma-pgdg15-source-updates-testing,all,all,x86_64,rpm,ga,ga +pgdg14-source,alma-pgdg14-source,alma-pgdg14-source,all,all,x86_64,rpm,ga,ga +pgdg14-source-updates-testing,alma-pgdg14-source-updates-testing,alma-pgdg14-source-updates-testing,all,all,x86_64,rpm,ga,ga +""" + + self._perform_test({"pgdg-redhat-all.repo": postgresql_like_repos}, + expected_postgresql_repos, expected_postgresql_mapping) + + +class SetPackageRepositoryTests(unittest.TestCase): + INITIAL_JSON = { + "packageinfo": [ + { + "in_packageset": { + "package": [ + { + "name": "some", + "repository": "some-repo", + }, + ], + }, + "out_packageset": { + "package": [ + { + "name": "some", + "repository": "other-repo", + }, + ], + }, + }, + { + "in_packageset": { + "package": [ + { + "name": "other", + "repository": "some-repo", + }, + ], + }, + "out_packageset": { + "package": [ + { + "name": "other", + "repository": "other-repo", + }, + ], + }, + } + ] + } + + JSON_FILE_PATH = "leapp_upgrade_repositories.json" + # Since json could take pretty much symbols remove the restriction + maxDiff = None + + def setUp(self): + with open(self.JSON_FILE_PATH, "w") as f: + f.write(json.dumps(self.INITIAL_JSON, indent=4)) + + def tearDown(self): + if os.path.exists(self.JSON_FILE_PATH): + os.remove(self.JSON_FILE_PATH) + pass + + def test_set_package_repository(self): + 
leapp_configs.set_package_repository("some", "alma-repo", leapp_pkgs_conf_path=self.JSON_FILE_PATH) + + with open(self.JSON_FILE_PATH) as f: + json_data = json.load(f) + self.assertEqual(json_data["packageinfo"][0]["out_packageset"]["package"][0]["repository"], "alma-repo") + self.assertEqual(json_data["packageinfo"][1]["out_packageset"]["package"][0]["repository"], "other-repo") + + def test_set_unexcited_package(self): + leapp_configs.set_package_repository("unexsisted", "alma-repo", leapp_pkgs_conf_path=self.JSON_FILE_PATH) + + with open(self.JSON_FILE_PATH, "r") as f: + json_data = json.load(f) + print(json_data) + print(self.INITIAL_JSON) + self.assertEqual(json_data, self.INITIAL_JSON) + + +class SetPackageActionTests(unittest.TestCase): + INITIAL_JSON = { + "packageinfo": [ + { + "action": 1, + "in_packageset": { + "package": [ + { + "name": "some", + "repository": "some-repo", + }, + ], + }, + }, + { + "action": 4, + "in_packageset": { + "package": [ + { + "name": "other", + "repository": "some-repo", + }, + ], + }, + } + ] + } + + JSON_FILE_PATH = "leapp_upgrade_repositories.json" + # Since json could take pretty much symbols remove the restriction + maxDiff = None + + def setUp(self): + with open(self.JSON_FILE_PATH, "w") as f: + f.write(json.dumps(self.INITIAL_JSON, indent=4)) + + def tearDown(self): + if os.path.exists(self.JSON_FILE_PATH): + os.remove(self.JSON_FILE_PATH) + pass + + def test_set_package_action(self): + leapp_configs.set_package_action("some", 3, leapp_pkgs_conf_path=self.JSON_FILE_PATH) + + with open(self.JSON_FILE_PATH) as f: + json_data = json.load(f) + self.assertEqual(json_data["packageinfo"][0]["action"], 3) + self.assertEqual(json_data["packageinfo"][1]["action"], 4) + + def test_set_unexcited_package_action(self): + leapp_configs.set_package_action("unexsisted", 3, leapp_pkgs_conf_path=self.JSON_FILE_PATH) + + with open(self.JSON_FILE_PATH, "r") as f: + json_data = json.load(f) + self.assertEqual(json_data, self.INITIAL_JSON) diff --git a/tests/motdtests.py b/tests/motdtests.py new file mode 100644 index 0000000..8e0a2e0 --- /dev/null +++ b/tests/motdtests.py @@ -0,0 +1,119 @@ +# Copyright 1999-2023. Plesk International GmbH. All rights reserved. 
+import unittest
+import os
+import tempfile
+
+from common import motd
+
+
+class InprogressSshLoginMessageTests(unittest.TestCase):
+    def setUp(self):
+        self.motd_path = tempfile.mktemp()
+
+    def tearDown(self):
+        for path in [self.motd_path, self.motd_path + ".bak"]:
+            if os.path.exists(path):
+                os.remove(path)
+
+    def test_add_simple_message(self):
+        expected_message = "one"
+        motd.add_inprogress_ssh_login_message(expected_message, self.motd_path)
+        with open(self.motd_path) as motd_file:
+            self.assertEqual(motd_file.read(), expected_message)
+
+    def test_add_two_messages(self):
+        expected_message = "one\ntwo\n"
+        motd.add_inprogress_ssh_login_message("one\n", self.motd_path)
+        motd.add_inprogress_ssh_login_message("two\n", self.motd_path)
+        with open(self.motd_path) as motd_file:
+            self.assertEqual(motd_file.read(), expected_message)
+
+    def test_old_message_backed_up(self):
+        with open(self.motd_path, "w") as motd_file:
+            motd_file.write("old\n")
+
+        motd.add_inprogress_ssh_login_message("new\n", self.motd_path)
+
+        with open(self.motd_path) as motd_file:
+            self.assertEqual(motd_file.read(), "old\nnew\n")
+
+        with open(self.motd_path + ".bak") as motd_file:
+            self.assertEqual(motd_file.read(), "old\n")
+
+    def test_restore(self):
+        with open(self.motd_path, "w") as motd_file:
+            motd_file.write("old")
+
+        motd.add_inprogress_ssh_login_message("new", self.motd_path)
+
+        motd.restore_ssh_login_message(self.motd_path)
+
+        with open(self.motd_path) as motd_file:
+            self.assertEqual(motd_file.read(), "old")
+
+
+class FinishSshLoginMessageTests(unittest.TestCase):
+    def setUp(self):
+        self.motd_path = tempfile.mktemp()
+
+    def tearDown(self):
+        for path in [self.motd_path, self.motd_path + ".bak", self.motd_path + ".next"]:
+            if os.path.exists(path):
+                os.remove(path)
+
+    def test_publish_simple_message(self):
+        expected_message = """
+===============================================================================
+Message from the Plesk ubuntu18to20 tool:
+one
+You can remove this message from the {} file.
+===============================================================================
+""".format(motd.MOTD_PATH)
+
+        motd.add_finish_ssh_login_message("one\n", self.motd_path)
+        motd.publish_finish_ssh_login_message(self.motd_path)
+        with open(self.motd_path) as motd_file:
+            self.assertEqual(motd_file.read(), expected_message)
+
+    def test_publish_several_messages(self):
+        expected_message = """
+===============================================================================
+Message from the Plesk ubuntu18to20 tool:
+one
+two
+You can remove this message from the {} file.
+===============================================================================
+""".format(motd.MOTD_PATH)
+        motd.add_finish_ssh_login_message("one\n", self.motd_path)
+        motd.add_finish_ssh_login_message("two\n", self.motd_path)
+        motd.publish_finish_ssh_login_message(self.motd_path)
+        with open(self.motd_path) as motd_file:
+            self.assertEqual(motd_file.read(), expected_message)
+
+    def test_file_next_is_removed(self):
+        motd.add_finish_ssh_login_message("one", self.motd_path)
+        motd.publish_finish_ssh_login_message(self.motd_path)
+        self.assertFalse(os.path.exists(self.motd_path + ".next"))
+
+    def test_backed_up_message_saved(self):
+        expected_message = """old
+
+===============================================================================
+Message from the Plesk ubuntu18to20 tool:
+one
+two
+You can remove this message from the {} file.
+=============================================================================== +""".format(motd.MOTD_PATH) + + with open(self.motd_path + ".bak", "w") as motd_file: + motd_file.write("old\n") + + motd.add_inprogress_ssh_login_message("new\n", self.motd_path) + + motd.add_finish_ssh_login_message("one\n", self.motd_path) + motd.add_finish_ssh_login_message("two\n", self.motd_path) + motd.publish_finish_ssh_login_message(self.motd_path) + + with open(self.motd_path) as motd_file: + self.assertEqual(motd_file.read(), expected_message) diff --git a/tests/rpmtests.py b/tests/rpmtests.py new file mode 100644 index 0000000..9fa6bf8 --- /dev/null +++ b/tests/rpmtests.py @@ -0,0 +1,269 @@ +# Copyright 1999-2023. Plesk International GmbH. All rights reserved. +import unittest +import os + +from common import rpm + + +class RemoveRepositoriesTests(unittest.TestCase): + REPO_FILE_CONTENT = """[repo1] +name=repo1 +baseurl=http://repo1 +enabled=1 +gpgcheck=0 + +[repo2] +name=repo2 +baseurl=http://repo2 +enabled=1 +gpgcheck=0 + +[repo3] +name=repo3 +baseurl=http://repo3 +enabled=1 +gpgcheck=0 +""" + + REPO_FILE_NAME = "repo_file.txt" + + def setUp(self): + with open(self.REPO_FILE_NAME, "w") as f: + f.write(self.REPO_FILE_CONTENT) + + def tearDown(self): + if os.path.exists(self.REPO_FILE_NAME): + os.remove(self.REPO_FILE_NAME) + + def test_remove_first_repo(self): + expected_content = """[repo2] +name=repo2 +baseurl=http://repo2 +enabled=1 +gpgcheck=0 + +[repo3] +name=repo3 +baseurl=http://repo3 +enabled=1 +gpgcheck=0 +""" + + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3: id == "repo1"]) + + with open(self.REPO_FILE_NAME) as file: + self.assertEqual(file.read(), expected_content) + + def test_remove_multiple_repos(self): + expected_content = """[repo3] +name=repo3 +baseurl=http://repo3 +enabled=1 +gpgcheck=0 +""" + + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3: id == "repo1", + lambda id, _1, _2, _3: id == "repo2"]) + with open(self.REPO_FILE_NAME) as file: + self.assertEqual(file.read(), expected_content) + + def test_remove_all_repos(self): + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3: id == "repo1", + lambda id, _1, _2, _3: id == "repo2", + lambda id, _1, _2, _3: id == "repo3"]) + self.assertEqual(os.path.exists(self.REPO_FILE_NAME), False) + + def test_remove_non_existing_repo(self): + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3: id == "repo4"]) + with open(self.REPO_FILE_NAME) as file: + self.assertEqual(file.read(), self.REPO_FILE_CONTENT) + + def test_remove_last_repo(self): + expected_content = """[repo1] +name=repo1 +baseurl=http://repo1 +enabled=1 +gpgcheck=0 + +[repo2] +name=repo2 +baseurl=http://repo2 +enabled=1 +gpgcheck=0 + +""" + + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3: id == "repo3"]) + with open(self.REPO_FILE_NAME) as file: + self.assertEqual(file.read(), expected_content) + + def test_remove_repo_with_metalink(self): + expected_content = """[repo1] +name=repo1 +baseurl=http://repo1 +enabled=1 +gpgcheck=0 + +[repo2] +name=repo2 +baseurl=http://repo2 +enabled=1 +gpgcheck=0 + +[repo3] +name=repo3 +baseurl=http://repo3 +enabled=1 +gpgcheck=0 +""" + additional_repo = """[metarepo] +name=metarepo +metalink=http://metarepo +enabled=1 +gpgcheck=0 +""" + with open(self.REPO_FILE_NAME, "a") as f: + f.write(additional_repo) + + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda _1, _2, _3, metalink: metalink == "http://metarepo"]) + with open(self.REPO_FILE_NAME) as 
file:
+            self.assertEqual(file.read(), expected_content)
+
+    def test_remove_repo_with_specific_name(self):
+        expected_content = """[repo1]
+name=repo1
+baseurl=http://repo1
+enabled=1
+gpgcheck=0
+
+[repo3]
+name=repo3
+baseurl=http://repo3
+enabled=1
+gpgcheck=0
+"""
+        rpm.remove_repositories(self.REPO_FILE_NAME, [lambda _1, name, _2, _3: name == "repo2"])
+        with open(self.REPO_FILE_NAME) as file:
+            self.assertEqual(file.read(), expected_content)
+
+    def test_remove_repo_with_specific_baseurl(self):
+        expected_content = """[repo1]
+name=repo1
+baseurl=http://repo1
+enabled=1
+gpgcheck=0
+
+[repo3]
+name=repo3
+baseurl=http://repo3
+enabled=1
+gpgcheck=0
+"""
+        rpm.remove_repositories(self.REPO_FILE_NAME, [lambda _1, _2, baseurl, _3: baseurl == "http://repo2"])
+        with open(self.REPO_FILE_NAME) as file:
+            self.assertEqual(file.read(), expected_content)
+
+    def test_remove_repo_by_id_or_url(self):
+        expected_content = """[repo1]
+name=repo1
+baseurl=http://repo1
+enabled=1
+gpgcheck=0
+
+"""
+        rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, baseurl, _3: id == "repo2" or baseurl == "http://repo3"])
+        with open(self.REPO_FILE_NAME) as file:
+            self.assertEqual(file.read(), expected_content)
+
+
+class WriteRepodataTests(unittest.TestCase):
+    REPO_FILE_CONTENT = """[repo1]
+name=repo1
+baseurl=http://repo1
+enabled=1
+gpgcheck=0
+
+"""
+
+    REPO_FILE_NAME = "repo_file.txt"
+
+    def setUp(self):
+        with open(self.REPO_FILE_NAME, "w") as f:
+            f.write(self.REPO_FILE_CONTENT)
+
+    def tearDown(self):
+        if os.path.exists(self.REPO_FILE_NAME):
+            os.remove(self.REPO_FILE_NAME)
+
+    def test_write_repodata(self):
+        expected_content = """[repo1]
+name=repo1
+baseurl=http://repo1
+enabled=1
+gpgcheck=0
+
+[repo2]
+name=repo2
+baseurl=http://repo2
+enabled=1
+gpgcheck=0
+"""
+        rpm.write_repodata(self.REPO_FILE_NAME, "repo2", "repo2", "http://repo2", None, ["enabled=1\n", "gpgcheck=0\n"])
+        with open(self.REPO_FILE_NAME) as file:
+            self.assertEqual(file.read(), expected_content)
+
+    # Note that write_repodata does not check whether the repository is
+    # already present in the file. Such a check may be added in the future.
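+    # A guard against duplicate sections could look roughly like the
+    # hypothetical sketch below (repo_file and repo_id are illustrative
+    # names, not part of rpm.write_repodata today):
+    #
+    #     with open(repo_file) as f:
+    #         if "[{}]".format(repo_id) in f.read():
+    #             return  # the section is already present, nothing to write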
+    def test_write_existing_repodata(self):
+        expected_content = """[repo1]
+name=repo1
+baseurl=http://repo1
+enabled=1
+gpgcheck=0
+
+[repo1]
+name=repo1
+baseurl=http://repo1
+enabled=1
+gpgcheck=0
+"""
+        rpm.write_repodata(self.REPO_FILE_NAME, "repo1", "repo1", "http://repo1", None, ["enabled=1\n", "gpgcheck=0\n"])
+        with open(self.REPO_FILE_NAME) as file:
+            self.assertEqual(file.read(), expected_content)
+
+
+class HandleRpmnewFilesTests(unittest.TestCase):
+    def tearDown(self):
+        tests_related_files = ["test.txt", "test.txt.rpmnew", "test.txt.rpmsave"]
+        for file in tests_related_files:
+            if os.path.exists(file):
+                os.remove(file)
+
+    def test_no_rpmnew(self):
+        with open("test.txt", "w") as f:
+            f.write("test")
+
+        self.assertFalse(rpm.handle_rpmnew("test.txt"))
+
+    def test_has_rpmnew(self):
+        with open("test.txt", "w") as f:
+            f.write("1")
+
+        with open("test.txt.rpmnew", "w") as f:
+            f.write("2")
+
+        self.assertTrue(rpm.handle_rpmnew("test.txt"))
+        self.assertTrue(os.path.exists("test.txt"))
+        self.assertEqual(open("test.txt").read(), "2")
+
+        self.assertTrue(os.path.exists("test.txt.rpmsave"))
+        self.assertEqual(open("test.txt.rpmsave").read(), "1")
+
+    def test_missing_original(self):
+        with open("test.txt.rpmnew", "w") as f:
+            f.write("2")
+
+        self.assertTrue(rpm.handle_rpmnew("test.txt"))
+        self.assertTrue(os.path.exists("test.txt"))
+        self.assertEqual(open("test.txt").read(), "2")
+
+        self.assertFalse(os.path.exists("test.txt.rpmsave"))
diff --git a/tests/utiltests.py b/tests/utiltests.py
new file mode 100644
index 0000000..f0a5ebf
--- /dev/null
+++ b/tests/utiltests.py
@@ -0,0 +1,60 @@
+# Copyright 1999-2023. Plesk International GmbH. All rights reserved.
+import unittest
+
+from common import util
+
+
+class TestDictOfListsMerge(unittest.TestCase):
+    def test_same_field(self):
+        self.assertEqual(
+            util.merge_dicts_of_lists(
+                {"a": [1, 2, 3]},
+                {"a": [4, 5, 6]}
+            ),
+            {"a": [1, 2, 3, 4, 5, 6]}
+        )
+
+    def test_different_fields(self):
+        self.assertEqual(
+            util.merge_dicts_of_lists(
+                {"a": [1, 2, 3]},
+                {"b": [4, 5, 6]}
+            ),
+            {"a": [1, 2, 3], "b": [4, 5, 6]}
+        )
+
+    def test_first_empty(self):
+        self.assertEqual(
+            util.merge_dicts_of_lists(
+                {},
+                {"a": [4, 5, 6]}
+            ),
+            {"a": [4, 5, 6]}
+        )
+
+    def test_second_empty(self):
+        self.assertEqual(
+            util.merge_dicts_of_lists(
+                {"a": [1, 2, 3]},
+                {}
+            ),
+            {"a": [1, 2, 3]}
+        )
+
+    def test_both_empty(self):
+        self.assertEqual(
+            util.merge_dicts_of_lists(
+                {},
+                {}
+            ),
+            {}
+        )
+
+    def test_complex(self):
+        self.assertEqual(
+            util.merge_dicts_of_lists(
+                {"a": [1, 2, 3], "b": [4, 5, 6]},
+                {"a": [7, 8, 9], "c": [10, 11, 12]}
+            ),
+            {"a": [1, 2, 3, 7, 8, 9], "b": [4, 5, 6], "c": [10, 11, 12]}
+        )
diff --git a/tests/versiontests.py b/tests/versiontests.py
new file mode 100644
index 0000000..a25acb6
--- /dev/null
+++ b/tests/versiontests.py
@@ -0,0 +1,175 @@
+# Copyright 1999-2023. Plesk International GmbH. All rights reserved.
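+#
+# The tests below pin down the intended ordering of the common.version
+# classes. A sketch of the expected semantics, taken from the comparison
+# cases in this file:
+#
+#   KernelVersion("3.10.0-1160.el7.x86_64") < KernelVersion("3.10.0-1160.95.1.el7.x86_64")
+#   PHPVersion("PHP 5.2") < PHPVersion("PHP 6.1")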
+import unittest + +from common import version + + +class KernelVersionTests(unittest.TestCase): + + def _check_parse(self, version_string, expected): + kernel = version.KernelVersion(version_string) + self.assertEqual(str(kernel), expected) + + def test_kernel_parse_simple(self): + self._check_parse("3.10.0-1160.95.1.el7.x86_64", "3.10.0-1160.95.1.el7.x86_64") + + def test_kernel_parse_small_build(self): + self._check_parse("3.10.0-1160.el7.x86_64", "3.10.0-1160.el7.x86_64") + + def test_kernel_parse_large_build(self): + self._check_parse("2.25.16-1.2.3.4.5.el7.x86_64", "2.25.16-1.2.3.4.5.el7.x86_64") + + def test_kernel_parse_no_build(self): + self._check_parse("3.10.0.el7.x86_64", "3.10.0.el7.x86_64") + + def test_compare_simple_equal(self): + kernel1 = version.KernelVersion("3.10.0-1160.95.1.el7.x86_64") + kernel2 = version.KernelVersion("3.10.0-1160.95.1.el7.x86_64") + self.assertEqual(kernel1, kernel2) + + def test_compare_simple_less_build(self): + kernel1 = version.KernelVersion("3.10.0-1160.95.1.el7.x86_64") + kernel2 = version.KernelVersion("3.10.0-1160.95.2.el7.x86_64") + self.assertLess(kernel1, kernel2) + + def test_compare_simple_less_patch(self): + kernel1 = version.KernelVersion("3.10.0-1160.95.1.el7.x86_64") + kernel2 = version.KernelVersion("3.10.2-1160.95.1.el7.x86_64") + self.assertLess(kernel1, kernel2) + + def test_compare_simple_less_patch_exponent(self): + kernel1 = version.KernelVersion("3.10.1-1160.95.1.el7.x86_64") + kernel2 = version.KernelVersion("3.10.10-1160.95.1.el7.x86_64") + self.assertLess(kernel1, kernel2) + + def test_compare_simple_less_minor(self): + kernel1 = version.KernelVersion("3.10.0-1160.95.1.el7.x86_64") + kernel2 = version.KernelVersion("3.11.0-1160.95.1.el7.x86_64") + self.assertLess(kernel1, kernel2) + + def test_compare_simple_less_minor_exponent(self): + kernel1 = version.KernelVersion("3.10.0-1160.95.1.el7.x86_64") + kernel2 = version.KernelVersion("3.101.0-1160.95.1.el7.x86_64") + self.assertLess(kernel1, kernel2) + + def test_compare_simple_less_major(self): + kernel1 = version.KernelVersion("3.10.0-1160.95.1.el7.x86_64") + kernel2 = version.KernelVersion("4.10.0-1160.95.1.el7.x86_64") + self.assertLess(kernel1, kernel2) + + def test_compare_simple_less_major_exponent(self): + kernel1 = version.KernelVersion("3.10.0-1160.95.1.el7.x86_64") + kernel2 = version.KernelVersion("30.10.0-1160.95.1.el7.x86_64") + self.assertLess(kernel1, kernel2) + + def test_compare_different_length_build(self): + kernel1 = version.KernelVersion("3.10.0-957.5.1.el7.x86_64") + kernel2 = version.KernelVersion("3.10.0-1160.95.1.el7.x86_64") + self.assertLess(kernel1, kernel2) + + def test_compare_different_build_subversion(self): + kernel1 = version.KernelVersion("3.10.0-957.el7.x86_64") + kernel2 = version.KernelVersion("3.10.0-1160.99.1.el7.x86_64") + self.assertLess(kernel1, kernel2) + + def test_compare_different_length_build_after_dot(self): + kernel1 = version.KernelVersion("3.10.0-1160.95.1.23.el7.x86_64") + kernel2 = version.KernelVersion("3.10.0-1160.95.11.23.el7.x86_64") + self.assertLess(kernel1, kernel2) + + def test_compare_simple_build_vs_short(self): + kernel1 = version.KernelVersion("3.10.0-1160.95.1.el7.x86_64") + kernel2 = version.KernelVersion("3.10.0-1160.el7.x86_64") + self.assertGreater(kernel1, kernel2) + + def test_find_last_kernel(self): + kernels_strings = [ + "3.10.0-1160.76.1.el7.x86_64", + "3.10.0-1160.95.1.el7.x86_64", + "3.10.0-1160.el7.x86_64", + "3.10.0-1160.45.1.el7.x86_64", + ] + kernels = [version.KernelVersion(s) for s in 
kernels_strings]
+
+        self.assertEqual(str(max(kernels)), "3.10.0-1160.95.1.el7.x86_64")
+
+    def test_sort_kernels(self):
+        kernels_strings = [
+            "3.10.0-1160.76.1.el7.x86_64",
+            "3.10.0-1160.95.1.el7.x86_64",
+            "3.10.0-1160.el7.x86_64",
+            "3.10.0-1160.45.1.el7.x86_64",
+        ]
+        kernels = [version.KernelVersion(s) for s in kernels_strings]
+        kernels.sort(reverse=True)
+
+        expected = [
+            "3.10.0-1160.95.1.el7.x86_64",
+            "3.10.0-1160.76.1.el7.x86_64",
+            "3.10.0-1160.45.1.el7.x86_64",
+            "3.10.0-1160.el7.x86_64",
+        ]
+
+        self.assertEqual([str(k) for k in kernels], expected)
+
+
+class PHPVersionTests(unittest.TestCase):
+
+    def test_php_parse_simple(self):
+        php = version.PHPVersion("PHP 5.2")
+        self.assertEqual(php.major, 5)
+        self.assertEqual(php.minor, 2)
+
+    def test_php_parse_plesk_package(self):
+        php = version.PHPVersion("plesk-php52")
+        self.assertEqual(php.major, 5)
+        self.assertEqual(php.minor, 2)
+
+    def test_php_parse_plesk_package_7(self):
+        php = version.PHPVersion("plesk-php70")
+        self.assertEqual(php.major, 7)
+        self.assertEqual(php.minor, 0)
+
+    def test_php_parse_version_small_string(self):
+        php = version.PHPVersion("5.2")
+        self.assertEqual(php.major, 5)
+        self.assertEqual(php.minor, 2)
+
+    def test_php_parse_version_large_string(self):
+        php = version.PHPVersion("5.2.24")
+        self.assertEqual(php.major, 5)
+        self.assertEqual(php.minor, 2)
+
+    def test_php_parse_wrong_string(self):
+        with self.assertRaises(ValueError):
+            version.PHPVersion("nothing")
+
+    def test_compare_equal(self):
+        php1 = version.PHPVersion("PHP 5.2")
+        php2 = version.PHPVersion("PHP 5.2")
+        self.assertEqual(php1, php2)
+
+    def test_compare_less_minor(self):
+        php1 = version.PHPVersion("PHP 5.2")
+        php2 = version.PHPVersion("PHP 5.3")
+        self.assertLess(php1, php2)
+
+    def test_compare_less_major(self):
+        php1 = version.PHPVersion("PHP 5.2")
+        php2 = version.PHPVersion("PHP 6.2")
+        self.assertLess(php1, php2)
+
+    def test_compare_less_major_exponent(self):
+        php1 = version.PHPVersion("PHP 5.2")
+        php2 = version.PHPVersion("PHP 15.2")
+        self.assertLess(php1, php2)
+
+    def test_compare_less_major_and_minor(self):
+        php1 = version.PHPVersion("PHP 5.2")
+        php2 = version.PHPVersion("PHP 6.3")
+        self.assertLess(php1, php2)
+
+    def test_compare_less_major_greater_minor(self):
+        php1 = version.PHPVersion("PHP 5.2")
+        php2 = version.PHPVersion("PHP 6.1")
+        self.assertLess(php1, php2)
diff --git a/util.py b/util.py
new file mode 100644
index 0000000..f65b489
--- /dev/null
+++ b/util.py
@@ -0,0 +1,38 @@
+# Copyright 1999-2023. Plesk International GmbH. All rights reserved.
+import subprocess
+import typing
+
+from . import log
+
+
+def logged_check_call(cmd: str, **kwargs) -> None:
+    log.info("Running: {cmd!s}. Output:".format(cmd=cmd))
+
+    # I believe we should be able to pass arguments to the subprocess call
+    # from the caller, so we only inject stdout/stderr/universal_newlines
+    kwargs["stdout"] = subprocess.PIPE
+    kwargs["stderr"] = subprocess.STDOUT
+    kwargs["universal_newlines"] = True
+
+    process = subprocess.Popen(cmd, **kwargs)
+    # Iterate over stdout until EOF so no trailing output is lost, then wait
+    # for the process to set its return code
+    for line in process.stdout:
+        if line.strip():
+            log.info(line.strip(), to_stream=False)
+    process.wait()
+
+    if process.returncode != 0:
+        log.err(f"Command '{cmd}' failed with return code {process.returncode}")
+        raise subprocess.CalledProcessError(process.returncode, cmd)
+
+    log.info("Command '{cmd}' finished successfully".format(cmd=cmd))
+
+
+def merge_dicts_of_lists(dict1: typing.Dict[typing.Any, typing.Any],
+                         dict2: typing.Dict[typing.Any, typing.Any]) -> typing.Dict[typing.Any, typing.Any]:
+    for key, value in dict2.items():
+        if key in dict1:
+            for item in value:
+                dict1[key].append(item)
+        else:
+            dict1[key] = value
+    return dict1
diff --git a/version.py b/version.py
new file mode 100644
index 0000000..7cbdf22
--- /dev/null
+++ b/version.py
@@ -0,0 +1,118 @@
+# Copyright 1999-2023. Plesk International GmbH. All rights reserved.
+
+class KernelVersion():
+    """Linux kernel version representation class."""
+
+    major: str
+    minor: str
+    patch: str
+    build: str
+    distro: str
+    arch: str
+
+    def _extract_with_build(self, version: str):
+        main_part, secondary_part = version.split("-")
+
+        self.major, self.minor, self.patch = main_part.split(".")
+
+        # The build ends right before the first alphabetic component,
+        # e.g. "1160.95.1" in "1160.95.1.el7.x86_64"
+        for position in range(len(secondary_part)):
+            if secondary_part[position].isalpha():
+                self.build = secondary_part[:position - 1]
+                suffix = secondary_part[position:]
+                self.distro, self.arch = suffix.split(".")
+                break
+
+    def _extract_no_build(self, version: str):
+        self.build = ""
+        self.major, self.minor, self.patch, self.distro, self.arch = version.split(".")
+
+    def __init__(self, version: str):
+        """Initialize a KernelVersion object."""
+        self.major = "0"
+        self.minor = "0"
+        self.patch = "0"
+        self.build = ""
+        self.distro = ""
+        self.arch = ""
+
+        if "-" in version:
+            self._extract_with_build(version)
+        else:
+            self._extract_no_build(version)
+
+    def __str__(self):
+        """Return a string representation of a KernelVersion object."""
+        if self.build == "":
+            return f"{self.major}.{self.minor}.{self.patch}.{self.distro}.{self.arch}"
+
+        return f"{self.major}.{self.minor}.{self.patch}-{self.build}.{self.distro}.{self.arch}"
+
+    def __lt__(self, other):
+        # Compare major.minor.patch as numeric tuples, so that 3.10 sorts
+        # above 3.9 and 2.5.0 is not considered newer than 3.1.0
+        left = (int(self.major), int(self.minor), int(self.patch))
+        right = (int(other.major), int(other.minor), int(other.patch))
+        if left != right:
+            return left < right
+
+        # For equal versions compare builds part by part; a build that is a
+        # prefix of a longer one (e.g. "1160" vs "1160.95.1") is older
+        left_build = [int(part) for part in self.build.split(".") if part]
+        right_build = [int(part) for part in other.build.split(".") if part]
+        return left_build < right_build
+
+    def __eq__(self, other):
+        return self.major == other.major and self.minor == other.minor and self.patch == other.patch and self.build == other.build
+
+    def __ge__(self, other):
+        return not self.__lt__(other)
+
+
+class PHPVersion():
+    """PHP version representation class."""
+
+    major: int
+    minor: int
+
+    def _extract_from_version(self, version: str):
+        # Version string example is "7.2" or "7.2.24"
+        major_part, minor_part = version.split(".")[:2]
+        self.major = int(major_part)
+        self.minor = int(minor_part)
+
+    def _extract_from_desc(self, description: str):
+        # Description string example is "PHP 5.2"
+        major_part, minor_part = description.split(" ")[1].split(".")
+        self.major = int(major_part)
+        self.minor = int(minor_part)
+
+    def _extract_from_plesk_package(self, packagename: str):
+        # Related package name example is "plesk-php52"
+        version_part = packagename.split("php")[1]
+        self.major = int(version_part[0])
+        self.minor = int(version_part[1])
+
+    def __init__(self, to_extract: str):
+        """Initialize a PHPVersion object."""
+        self.major = 0
+        self.minor = 0
+
+        if to_extract.startswith("plesk-php"):
+            self._extract_from_plesk_package(to_extract)
+        elif to_extract.startswith("PHP "):
+            self._extract_from_desc(to_extract)
+        elif to_extract[0].isdigit():
+            self._extract_from_version(to_extract)
+        else:
+            raise ValueError(f"Cannot extract PHP version from '{to_extract}'")
+
+    def __str__(self):
+        """Return a string representation of a PHPVersion object."""
+        return f"PHP {self.major}.{self.minor}"
+
+    def __lt__(self, other):
+        # Compare as (major, minor) tuples so that, e.g., PHP 6.1 is never
+        # reported as less than PHP 5.2
+        return (self.major, self.minor) < (other.major, other.minor)
+
+    def __eq__(self, other):
+        return self.major == other.major and self.minor == other.minor
+
+    def __ge__(self, other):
+        return not self.__lt__(other)
diff --git a/writers.py b/writers.py
new file mode 100644
index 0000000..5974d0c
--- /dev/null
+++ b/writers.py
@@ -0,0 +1,41 @@
+# Copyright 1999-2023. Plesk International GmbH. All rights reserved.
+import os
+import sys
+
+
+class Writer():
+    def __init__(self):
+        pass
+
+    def __enter__(self):
+        return self
+
+    def write(self, message: str):
+        raise NotImplementedError("write() must be implemented by Writer subclasses")
+
+    def __exit__(self, *args):
+        pass
+
+
+class StdoutWriter(Writer):
+    def write(self, message: str) -> None:
+        sys.stdout.write(message)
+        sys.stdout.flush()
+
+
+class FileWriter(Writer):
+    def __init__(self, filename):
+        super().__init__()
+        self.filename = filename
+
+    def __enter__(self):
+        self.file = open(self.filename, "w")
+        return self
+
+    def write(self, message: str) -> None:
+        self.file.write(message)
+        self.file.flush()
+
+    def __exit__(self, *args):
+        # The file only lives as long as the writer: close it and remove it
+        self.file.close()
+        os.unlink(self.filename)
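+
+
+# Usage sketch (a hypothetical caller, not part of this module): FileWriter
+# suits a transient status file, since it removes the file once it is done.
+#
+#   with FileWriter("status.txt") as writer:
+#       writer.write("stage: converting packages\n")
+#   # status.txt has been deleted at this point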