From 54745cb20493c5c3a8047642f2497c7fd3575a4c Mon Sep 17 00:00:00 2001 From: Mikhail Sandakov Date: Thu, 21 Mar 2024 11:04:11 +0200 Subject: [PATCH] Support repositories with mirrorlist --- src/leapp_configs.py | 28 +++++++++---- src/rpm.py | 30 +++++++++----- tests/rpmtests.py | 94 +++++++++++++++++++++++++++++++++++++------- 3 files changed, 121 insertions(+), 31 deletions(-) diff --git a/src/leapp_configs.py b/src/leapp_configs.py index 4225e05..d0de5c0 100644 --- a/src/leapp_configs.py +++ b/src/leapp_configs.py @@ -26,6 +26,12 @@ metalink={url} """ +REPO_HEAD_WITH_MIRRORLIST = """ +[{id}] +name={name} +mirrorlist={url} +""" + def _do_replacement(to_change: str, replacement_list: typing.List[typing.Callable[[str], str]]) -> str: if to_change is None: @@ -131,12 +137,12 @@ def _do_common_replacement(line: str) -> str: ]) -def is_repo_ok(id: str, name: str, url: str, metalink: str) -> bool: +def is_repo_ok(id: str, name: str, url: str, metalink: str, mirrorlist: str) -> bool: if name is None: log.warn("Repository info for '[{id}]' has no a name".format(id=id)) return False - if url is None and metalink is None: + if url is None and metalink is None and mirrorlist is None: log.warn("Repository info for '{id}' has no baseurl and metalink".format(id=id)) return False @@ -154,8 +160,8 @@ def adopt_repositories(repofile: str, ignore: typing.List = None) -> None: return with open(repofile + ".next", "a") as dst: - for id, name, url, metalink, additional_lines in rpm.extract_repodata(repofile): - if not is_repo_ok(id, name, url, metalink): + for id, name, url, metalink, mirrorlist, additional_lines in rpm.extract_repodata(repofile): + if not is_repo_ok(id, name, url, metalink, mirrorlist): continue if id in ignore: @@ -169,9 +175,12 @@ def adopt_repositories(repofile: str, ignore: typing.List = None) -> None: if url is not None: url = _do_url_replacement(url) repo_format = REPO_HEAD_WITH_URL - else: + elif metalink is not None: url = _do_url_replacement(metalink) repo_format = REPO_HEAD_WITH_METALINK + else: + url = _do_url_replacement(mirrorlist) + repo_format = REPO_HEAD_WITH_MIRRORLIST dst.write(repo_format.format(id=id, name=name, url=url)) @@ -195,8 +204,8 @@ def add_repositories_mapping(repofiles: typing.List[str], ignore: typing.List = log.warn("The repository mapper has tried to open an unexistent file: {filename}".format(filename=file)) continue - for id, name, url, metalink, additional_lines in rpm.extract_repodata(file): - if not is_repo_ok(id, name, url, metalink): + for id, name, url, metalink, mirrorlist, additional_lines in rpm.extract_repodata(file): + if not is_repo_ok(id, name, url, metalink, mirrorlist): continue if id in ignore: @@ -210,9 +219,12 @@ def add_repositories_mapping(repofiles: typing.List[str], ignore: typing.List = if url is not None: url = _do_url_replacement(url) repo_format = REPO_HEAD_WITH_URL - else: + elif metalink is not None: url = _do_url_replacement(metalink) repo_format = REPO_HEAD_WITH_METALINK + else: + url = _do_url_replacement(mirrorlist) + repo_format = REPO_HEAD_WITH_MIRRORLIST leapp_repos_file.write(repo_format.format(id=new_id, name=name, url=url)) diff --git a/src/rpm.py b/src/rpm.py index 1df1eaa..197a2e2 100644 --- a/src/rpm.py +++ b/src/rpm.py @@ -17,24 +17,31 @@ metalink={url} """ +REPO_HEAD_WITH_MIRRORLIST = """[{id}] +name={name} +mirrorlist={url} +""" + -def extract_repodata(repofile: str) -> typing.Iterable[typing.Tuple[str, str, str, str, typing.List[str]]]: +def extract_repodata(repofile: str) -> typing.Iterable[typing.Tuple[str, str, str, str, str, typing.List[str]]]: id = None name = None url = None metalink = None + mirrorlist = None additional = [] with open(repofile, "r") as repo: for line in repo.readlines(): if line.startswith("["): if id is not None: - yield (id, name, url, metalink, additional) + yield (id, name, url, metalink, mirrorlist, additional) id = None name = None url = None metalink = None + mirrorlist = None additional = [] log.debug("Repository file line: {line}".format(line=line.rstrip())) @@ -55,17 +62,22 @@ def extract_repodata(repofile: str) -> typing.Iterable[typing.Tuple[str, str, st url = val elif field == "metalink": metalink = val + elif field == "mirrorlist": + mirrorlist = val else: additional.append(line) - yield (id, name, url, metalink, additional) + yield (id, name, url, metalink, mirrorlist, additional) -def write_repodata(repofile: str, id: str, name: str, url: str, metalink: str, additional: typing.List[str]) -> None: +def write_repodata(repofile: str, id: str, name: str, url: str, metalink: str, mirrorlist: str, additional: typing.List[str]) -> None: repo_format = REPO_HEAD_WITH_URL - if url is None: + if url is None and metalink is not None: url = metalink repo_format = REPO_HEAD_WITH_METALINK + if url is None and mirrorlist is not None: + url = mirrorlist + repo_format = REPO_HEAD_WITH_MIRRORLIST with open(repofile, "a") as dst: dst.write(repo_format.format(id=id, name=name, url=url)) @@ -73,16 +85,16 @@ def write_repodata(repofile: str, id: str, name: str, url: str, metalink: str, a dst.write(line) -def remove_repositories(repofile: str, conditions: typing.Callable[[str, str, str, str], bool]) -> None: - for id, name, url, metalink, additional_lines in extract_repodata(repofile): +def remove_repositories(repofile: str, conditions: typing.Callable[[str, str, str, str, str], bool]) -> None: + for id, name, url, metalink, mirrorlist, additional_lines in extract_repodata(repofile): remove = False for condition in conditions: - if condition(id, name, url, metalink): + if condition(id, name, url, metalink, mirrorlist): remove = True break if not remove: - write_repodata(repofile + ".next", id, name, url, metalink, additional_lines) + write_repodata(repofile + ".next", id, name, url, metalink, mirrorlist, additional_lines) if os.path.exists(repofile + ".next"): shutil.move(repofile + ".next", repofile) diff --git a/tests/rpmtests.py b/tests/rpmtests.py index d52d019..b39da6c 100644 --- a/tests/rpmtests.py +++ b/tests/rpmtests.py @@ -50,7 +50,7 @@ def test_remove_first_repo(self): gpgcheck=0 """ - rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3: id == "repo1"]) + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3, _4: id == "repo1"]) with open(self.REPO_FILE_NAME) as file: self.assertEqual(file.read(), expected_content) @@ -63,19 +63,19 @@ def test_remove_multiple_repos(self): gpgcheck=0 """ - rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3: id == "repo1", - lambda id, _1, _2, _3: id == "repo2"]) + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3, _4: id == "repo1", + lambda id, _1, _2, _3, _4: id == "repo2"]) with open(self.REPO_FILE_NAME) as file: self.assertEqual(file.read(), expected_content) def test_remove_all_repos(self): - rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3: id == "repo1", - lambda id, _1, _2, _3: id == "repo2", - lambda id, _1, _2, _3: id == "repo3"]) + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3, _4: id == "repo1", + lambda id, _1, _2, _3, _4: id == "repo2", + lambda id, _1, _2, _3, _4: id == "repo3"]) self.assertEqual(os.path.exists(self.REPO_FILE_NAME), False) def test_remove_non_existing_repo(self): - rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3: id == "repo4"]) + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3, _4: id == "repo4"]) with open(self.REPO_FILE_NAME) as file: self.assertEqual(file.read(), self.REPO_FILE_CONTENT) @@ -94,7 +94,7 @@ def test_remove_last_repo(self): """ - rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3: id == "repo3"]) + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, _2, _3, _4: id == "repo3"]) with open(self.REPO_FILE_NAME) as file: self.assertEqual(file.read(), expected_content) @@ -126,7 +126,7 @@ def test_remove_repo_with_metalink(self): with open(self.REPO_FILE_NAME, "a") as f: f.write(additional_repo) - rpm.remove_repositories(self.REPO_FILE_NAME, [lambda _1, _2, _3, metalink: metalink == "http://metarepo"]) + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda _1, _2, _3, metalink, _4: metalink == "http://metarepo"]) with open(self.REPO_FILE_NAME) as file: self.assertEqual(file.read(), expected_content) @@ -143,7 +143,7 @@ def test_remove_repo_with_specific_name(self): enabled=1 gpgcheck=0 """ - rpm.remove_repositories(self.REPO_FILE_NAME, [lambda _1, name, _2, _3: name == "repo2"]) + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda _1, name, _2, _3, _4: name == "repo2"]) with open(self.REPO_FILE_NAME) as file: self.assertEqual(file.read(), expected_content) @@ -160,7 +160,7 @@ def test_remove_repo_with_specific_baseurl(self): enabled=1 gpgcheck=0 """ - rpm.remove_repositories(self.REPO_FILE_NAME, [lambda _1, _2, baseurl, _3: baseurl == "http://repo2"]) + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda _1, _2, baseurl, _3, _4: baseurl == "http://repo2"]) with open(self.REPO_FILE_NAME) as file: self.assertEqual(file.read(), expected_content) @@ -172,7 +172,39 @@ def test_remove_repo_by_id_or_url(self): gpgcheck=0 """ - rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, baseurl, _3: id == "repo2" or baseurl == "http://repo3"]) + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda id, _1, baseurl, _3, _4: id == "repo2" or baseurl == "http://repo3"]) + with open(self.REPO_FILE_NAME) as file: + self.assertEqual(file.read(), expected_content) + + def test_remove_repo_by_mirrorlist(self): + expected_content = """[repo1] +name=repo1 +baseurl=http://repo1 +enabled=1 +gpgcheck=0 + +[repo2] +name=repo2 +baseurl=http://repo2 +enabled=1 +gpgcheck=0 + +[repo3] +name=repo3 +baseurl=http://repo3 +enabled=1 +gpgcheck=0 +""" + additional_repo = """[mirrorrepo] +name=mirrorrepo +mirrorlist=http://mirrorrepo +enabled=1 +gpgcheck=0 +""" + with open(self.REPO_FILE_NAME, "a") as f: + f.write(additional_repo) + + rpm.remove_repositories(self.REPO_FILE_NAME, [lambda _1, _2, _3, _4, mirrorlist: mirrorlist == "http://mirrorrepo"]) with open(self.REPO_FILE_NAME) as file: self.assertEqual(file.read(), expected_content) @@ -209,7 +241,7 @@ def test_write_repodata(self): enabled=1 gpgcheck=0 """ - rpm.write_repodata(self.REPO_FILE_NAME, "repo2", "repo2", "http://repo2", None, ["enabled=1\n", "gpgcheck=0\n"]) + rpm.write_repodata(self.REPO_FILE_NAME, "repo2", "repo2", "http://repo2", None, None, ["enabled=1\n", "gpgcheck=0\n"]) with open(self.REPO_FILE_NAME) as file: self.assertEqual(file.read(), expected_content) @@ -227,7 +259,41 @@ def test_write_exsisted_repodata(self): enabled=1 gpgcheck=0 """ - rpm.write_repodata(self.REPO_FILE_NAME, "repo1", "repo1", "http://repo1", None, ["enabled=1\n", "gpgcheck=0\n"]) + rpm.write_repodata(self.REPO_FILE_NAME, "repo1", "repo1", "http://repo1", None, None, ["enabled=1\n", "gpgcheck=0\n"]) + with open(self.REPO_FILE_NAME) as file: + self.assertEqual(file.read(), expected_content) + + def test_write_repodata_with_metalink(self): + expected_content = """[repo1] +name=repo1 +baseurl=http://repo1 +enabled=1 +gpgcheck=0 + +[repo2] +name=repo2 +metalink=http://repo2 +enabled=1 +gpgcheck=0 +""" + rpm.write_repodata(self.REPO_FILE_NAME, "repo2", "repo2", None, "http://repo2", None, ["enabled=1\n", "gpgcheck=0\n"]) + with open(self.REPO_FILE_NAME) as file: + self.assertEqual(file.read(), expected_content) + + def test_write_repodata_with_mirrorlist(self): + expected_content = """[repo1] +name=repo1 +baseurl=http://repo1 +enabled=1 +gpgcheck=0 + +[repo2] +name=repo2 +mirrorlist=http://repo2 +enabled=1 +gpgcheck=0 +""" + rpm.write_repodata(self.REPO_FILE_NAME, "repo2", "repo2", None, None, "http://repo2", ["enabled=1\n", "gpgcheck=0\n"]) with open(self.REPO_FILE_NAME) as file: self.assertEqual(file.read(), expected_content)