From 3d6278c7651faae96725df9b551aeb2f68544ab2 Mon Sep 17 00:00:00 2001
From: Yuxiang Zhu
Date: Fri, 24 Feb 2023 16:25:46 +0800
Subject: [PATCH 1/2] New signing mechanism

This PR adds a new signing mechanism that lets the promote-assembly job send
signing requests directly through UMB. This eliminates the intermediate step
of pulling artifacts from the internet.

---
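[Reviewer note, not part of the patch: the sketch below shows how the new AsyncSignatory introduced by this PR is meant to be driven, adapted from the usage example in pyartcd/pyartcd/signatory.py. The broker URI, certificate paths, and release values are illustrative placeholders.]

```python
import asyncio

from pyartcd.signatory import AsyncSignatory


async def main():
    uri = "stomp+ssl://umb.stage.api.redhat.com:61612"  # stage broker; see UMB_BROKERS in constants.py
    async with AsyncSignatory(uri, "client.crt", "client.key", sig_keyname="beta2") as signatory:
        pullspec = "quay.io/openshift-release-dev/ocp-release:4.11.31-x86_64"
        digest = "sha256:cc10900ad98b44ba432bc0d99e7d4fffb5498fd6844fc3b6a0a3552ee6d64059"
        # Sign a release payload claim (written as signature-1)
        with open("signature-1", "wb") as sig_file:
            await signatory.sign_json_digest("openshift", "4.11.31", pullspec, digest, sig_file)
        # Sign a sha256sum.txt message digest
        with open("sha256sum.txt", "rb") as in_file, open("sha256sum.txt.gpg", "wb") as sig_file:
            await signatory.sign_message_digest("openshift", "4.11.31", in_file, sig_file)


asyncio.run(main())
```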
 jobs/build/promote-assembly/Jenkinsfile |  39 +--
 pipeline-scripts/buildlib.groovy        |   3 +-
 pyartcd/pyartcd/constants.py            |   8 +-
 pyartcd/pyartcd/exceptions.py           |   6 +
 pyartcd/pyartcd/locks.py                |   6 +
 pyartcd/pyartcd/pipelines/promote.py    | 265 ++++++++++----
 pyartcd/pyartcd/runtime.py              |   6 +-
 pyartcd/pyartcd/s3.py                   |   3 +-
 pyartcd/pyartcd/signatory.py            | 222 ++++++++++++
 pyartcd/pyartcd/umb_client.py           | 438 ++++++++++++++++++++++++
 pyartcd/pyartcd/util.py                 |  38 +-
 pyartcd/requirements.txt                |  12 +-
 pyartcd/tests/pipelines/test_promote.py |  45 ++-
 pyartcd/tests/test_signatory.py         | 204 +++++++++++
 pyartcd/tests/test_umb_client.py        | 130 +++++++
 15 files changed, 1310 insertions(+), 115 deletions(-)
 create mode 100644 pyartcd/pyartcd/signatory.py
 create mode 100644 pyartcd/pyartcd/umb_client.py
 create mode 100644 pyartcd/tests/test_signatory.py
 create mode 100644 pyartcd/tests/test_umb_client.py

diff --git a/jobs/build/promote-assembly/Jenkinsfile b/jobs/build/promote-assembly/Jenkinsfile
index d6103ff9f7..01d1d75c45 100644
--- a/jobs/build/promote-assembly/Jenkinsfile
+++ b/jobs/build/promote-assembly/Jenkinsfile
@@ -155,6 +155,9 @@ node {
         if (params.SKIP_MIRROR_BINARIES) {
             cmd << "--skip-mirror-binaries"
         }
+        if (params.SKIP_SIGNING) {
+            cmd << "--skip-signing"
+        }
         if (params.NO_MULTI) {
             cmd << "--no-multi"
         }
@@ -167,7 +170,10 @@ node {
             // because 4.11 heterogeneous is tech preview.
             cmd << "--use-multi-hack"
         }
+        def signing_env = params.DRY_RUN ? "stage" : "prod"
+        cmd << "--signing-env=${signing_env}"
         echo "Will run ${cmd}"
+        def signing_cert_id = signing_env == "prod" ? "0xffe138e-openshift-art-bot" : "0xffe138d-nonprod-openshift-art-bot"
         buildlib.withAppCiAsArtPublish() {
             withCredentials([[$class: 'UsernamePasswordMultiBinding', credentialsId: 'creds_dev_registry.quay.io', usernameVariable: 'QUAY_USERNAME', passwordVariable: 'QUAY_PASSWORD'],
                              aws(credentialsId: 's3-art-srv-enterprise', accessKeyVariable: 'AWS_ACCESS_KEY_ID', secretKeyVariable: 'AWS_SECRET_ACCESS_KEY'),
@@ -175,7 +181,13 @@ node {
                              string(credentialsId: 'jboss-jira-token', variable: 'JIRA_TOKEN'),
                              string(credentialsId: 'jenkins-service-account', variable: 'JENKINS_SERVICE_ACCOUNT'),
                              string(credentialsId: 'jenkins-service-account-token', variable: 'JENKINS_SERVICE_ACCOUNT_TOKEN'),
-                             string(credentialsId: 'openshift-bot-token', variable: 'GITHUB_TOKEN')]) {
+                             string(credentialsId: 'openshift-bot-token', variable: 'GITHUB_TOKEN'),
+                             file(credentialsId: "${signing_cert_id}.crt", variable: 'SIGNING_CERT'),
+                             file(credentialsId: "${signing_cert_id}.key", variable: 'SIGNING_KEY'),
+                             string(credentialsId: 'redis-server-password', variable: 'REDIS_SERVER_PASSWORD'),
+                             string(credentialsId: 'redis-host', variable: 'REDIS_HOST'),
+                             string(credentialsId: 'redis-port', variable: 'REDIS_PORT'),
+                             ]) {
                 def out = sh(script: cmd.join(' '), returnStdout: true).trim()
                 echo "artcd returns:\n$out"
                 release_info = readJSON(text: out)
@@ -233,31 +245,6 @@ node {
          */
     }

-    stage("sign artifacts") {
-        if (params.SKIP_SIGNING) {
-            echo "Signing artifacts is skipped."
-            return
-        }
-        release_info.content.each { arch, info ->
-            def payloadDigest = release_info.content[arch].digest
-            // Currently a multi/heterogeneous release payload has a modified release name to workaround a Cincinnati issue.
-            // Using the real per-arch release name in $info instead of the one defined by release artists.
-            def release_name = info.metadata.version
-            echo "Signing $arch"
-            release.signArtifacts(
-                name: release_name,
-                signature_name: "signature-1",
-                dry_run: params.DRY_RUN,
-                env: "prod",
-                key_name: release_info.client_type=='ocp'?"redhatrelease2":"beta2",
-                arch: arch,
-                digest: payloadDigest,
-                client_type: release_info.client_type,
-                product: "openshift",
-            )
-        }
-    }
-
     stage("send release message") {
         if (release_info.type == "custom") {
             echo "Don't send release messages for a custom release."

diff --git a/pipeline-scripts/buildlib.groovy b/pipeline-scripts/buildlib.groovy
index 4fd7948155..3100b236e6 100644
--- a/pipeline-scripts/buildlib.groovy
+++ b/pipeline-scripts/buildlib.groovy
@@ -61,7 +61,8 @@ def proxy_setup() {
         'cdn.quay.io',
         'cdn01.quay.io',
         'cdn02.quay.io',
-        'cdn03.quay.io'
+        'cdn03.quay.io',
+        "api.redhat.com",
     ]

     env.https_proxy = proxy

diff --git a/pyartcd/pyartcd/constants.py b/pyartcd/pyartcd/constants.py
index 1b1b74ad01..a73a45947e 100644
--- a/pyartcd/pyartcd/constants.py
+++ b/pyartcd/pyartcd/constants.py
@@ -27,7 +27,6 @@
 }

 OCP_BUILD_DATA_URL = 'https://github.com/openshift-eng/ocp-build-data'
-QUAY_RELEASE_REPO_URL = "quay.io/openshift-release-dev/ocp-release"

 # This is the URL that buildvm itself uses to resolve Jenkins
 # It shall be used by jenkinsapi to start new builds
@@ -38,3 +37,10 @@
 JENKINS_UI_URL = 'https://saml.buildvm.hosts.prod.psi.bos.redhat.com:8888'

 MIRROR_BASE_URL = 'https://mirror.openshift.com'
+
+UMB_BROKERS = {
+    "prod": "stomp+ssl://umb.api.redhat.com:61612",
+    "stage": "stomp+ssl://umb.stage.api.redhat.com:61612",
+    "qa": "stomp+ssl://umb.qa.api.redhat.com:61612",
+    "dev": "stomp+ssl://umb.dev.api.redhat.com:61612",
+}

diff --git a/pyartcd/pyartcd/exceptions.py b/pyartcd/pyartcd/exceptions.py
index 0b03d98d37..ab0f79ccfc 100644
--- a/pyartcd/pyartcd/exceptions.py
+++ b/pyartcd/pyartcd/exceptions.py
@@ -1,2 +1,8 @@
 class VerificationError(ValueError):
     pass
+
+
+class SignatoryServerError(IOError):
+    """ Represents an error generated by the remote Signatory server
+    """
+    pass

diff --git a/pyartcd/pyartcd/locks.py b/pyartcd/pyartcd/locks.py
index baf48e9d84..e3464ea02c 100644
--- a/pyartcd/pyartcd/locks.py
+++ b/pyartcd/pyartcd/locks.py
@@ -9,6 +9,12 @@
 # - the sleep interval between two consecutive retries, in seconds
 # - a timeout, after which the lock will expire and clear itself
 LOCK_POLICY = {
+    # default policy: give up after 1 hour
+    'default': {
+        'retry_count': 36000,
+        'retry_delay_min': 0.1,
+        'lock_timeout': 60 * 60 * 1,  # 1 hour
+    },
     # olm-bundle: give up after 1 hour
     'olm_bundle': {
         'retry_count': 36000,
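[Reviewer note, not part of the patch: the new 'default' policy above is consumed by sign_artifacts() in promote.py below. A minimal sketch of how the policy values feed the lock manager, using the same calls as the patch:]

```python
import asyncio

from pyartcd import locks


async def with_signing_lock():
    lock_policy = locks.LOCK_POLICY['default']
    lock_manager = locks.new_lock_manager(
        internal_lock_timeout=lock_policy['lock_timeout'],
        retry_count=lock_policy['retry_count'],
        retry_delay_min=lock_policy['retry_delay_min'],
    )
    # Only one promote job may talk to the signing server per environment.
    async with await lock_manager.lock('signing-lock-stage'):
        ...  # send signing requests here
```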
diff --git a/pyartcd/pyartcd/pipelines/promote.py b/pyartcd/pyartcd/pipelines/promote.py
index da6b0459be..39178172db 100644
--- a/pyartcd/pyartcd/pipelines/promote.py
+++ b/pyartcd/pyartcd/pipelines/promote.py
@@ -7,7 +7,7 @@
 import traceback
 from collections import OrderedDict
 from pathlib import Path
-from typing import Dict, Iterable, List, Optional
+from typing import Dict, Iterable, List, Optional, Union
 from urllib.parse import quote

 import aiohttp
@@ -16,12 +16,13 @@
 import hashlib
 import shutil
 import urllib.parse
+from pyartcd.signatory import AsyncSignatory
 import requests
 # from pyartcd.cincinnati import CincinnatiAPI
 from doozerlib import assembly
 from doozerlib.util import (brew_arch_for_go_arch, brew_suffix_for_arch,
                             go_arch_for_brew_arch, go_suffix_for_arch)
-from pyartcd import constants, exectools, util, jenkins
+from pyartcd import constants, exectools, locks, util, jenkins
 from pyartcd.cli import cli, click_coroutine, pass_runtime
 from pyartcd.exceptions import VerificationError
 from pyartcd.jira import JIRAClient
@@ -47,10 +48,13 @@ def __init__(self, runtime: Runtime, group: str, assembly: str,
                  skip_attached_bug_check: bool = False,
                  skip_image_list: bool = False,
                  skip_build_microshift: bool = False,
+                 skip_signing: bool = False,
                  permit_overwrite: bool = False,
                  no_multi: bool = False, multi_only: bool = False,
                  skip_mirror_binaries: bool = False,
-                 use_multi_hack: bool = False) -> None:
+                 use_multi_hack: bool = False,
+                 signing_env: Optional[str] = None,
+                 ) -> None:
         self.runtime = runtime
         self.group = group
         self.assembly = assembly
@@ -59,6 +63,7 @@ def __init__(self, runtime: Runtime, group: str, assembly: str,
         self.skip_image_list = skip_image_list
         self.skip_build_microshift = skip_build_microshift
         self.skip_mirror_binaries = skip_mirror_binaries
+        self.skip_signing = skip_signing
         self.permit_overwrite = permit_overwrite

         if multi_only and no_multi:
@@ -67,6 +72,9 @@ def __init__(self, runtime: Runtime, group: str, assembly: str,
         self.multi_only = multi_only
         self.use_multi_hack = use_multi_hack
         self._multi_enabled = False
+        if not self.skip_signing and not signing_env:
+            raise ValueError("--signing-env is required unless --skip-signing is set")
+        self.signing_env = signing_env

         self._logger = self.runtime.logger
         self._slack_client = self.runtime.new_slack_client()
@@ -87,8 +95,29 @@ def __init__(self, runtime: Runtime, group: str, assembly: str,
             self._elliott_env_vars["ELLIOTT_DATA_PATH"] = self._ocp_build_data_url
             self._doozer_env_vars["DOOZER_DATA_PATH"] = self._ocp_build_data_url

+    def check_environment_variables(self):
+        logger = self.runtime.logger
+
+        required_vars = ["GITHUB_TOKEN", "JIRA_TOKEN", "QUAY_PASSWORD"]
+        if not self.skip_mirror_binaries and not self.skip_signing:
+            required_vars += ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"]
+        if not self.skip_signing:
+            required_vars += ["SIGNING_CERT", "SIGNING_KEY", "REDIS_SERVER_PASSWORD", "REDIS_HOST", "REDIS_PORT"]
+        if not self.skip_build_microshift:
+            required_vars += ["JENKINS_SERVICE_ACCOUNT", "JENKINS_SERVICE_ACCOUNT_TOKEN"]
+
+        for env_var in required_vars:
+            if not os.environ.get(env_var):
+                msg = f"Environment variable {env_var} is not set."
+                if not self.runtime.dry_run:
+                    raise ValueError(msg)
+                else:
+                    logger.warning(msg)
+
     async def run(self):
         logger = self.runtime.logger
+        # Check if all required environment variables are set
+        self.check_environment_variables()

         # Load group config and releases.yml
         logger.info("Loading build data...")
@@ -98,7 +127,7 @@ async def run(self):
             data_path=self._doozer_env_vars.get("DOOZER_DATA_PATH", None) or constants.OCP_BUILD_DATA_URL
         )
         if releases_config.get("releases", {}).get(self.assembly) is None:
-            raise ValueError(f"To promote this release, assembly {self.assembly} must be explictly defined in releases.yml.")
+            raise ValueError(f"To promote this release, assembly {self.assembly} must be explicitly defined in releases.yml.")
         permits = util.get_assembly_promotion_permits(releases_config, self.assembly)

         # Get release name
@@ -224,21 +253,18 @@ async def run(self):
             reference_releases = util.get_assembly_basis(releases_config, self.assembly).get("reference_releases", {})
             tag_stable = assembly_type in [assembly.AssemblyTypes.STANDARD, assembly.AssemblyTypes.CANDIDATE, assembly.AssemblyTypes.PREVIEW]
             release_infos = await self.promote(assembly_type, release_name, arches, previous_list, metadata, reference_releases, tag_stable)
-            self._logger.info("All release images for %s have been promoted.", release_name)
+            pullspecs = {arch: release_info["image"] for arch, release_info in release_infos.items()}
+            pullspecs_repr = ", ".join(f"{arch}: {pullspecs[arch]}" for arch in sorted(pullspecs.keys()))
+            self._logger.info("All release images for %s have been promoted. Pullspecs: %s", release_name, pullspecs_repr)

             # Before waiting for release images to be accepted by release controllers,
             # we can start microshift build
             await self._build_microshift(releases_config)

-            # Wait for payloads to be accepted by release controllers
-            pullspecs = {arch: release_info["image"] for arch, release_info in release_infos.items()}
-            pullspecs_repr = ", ".join(f"{arch}: {pullspecs[arch]}" for arch in sorted(pullspecs.keys()))
             if not tag_stable:
                 self._logger.warning("Release %s will not appear on release controllers. Pullspecs: %s", release_name, pullspecs_repr)
                 await self._slack_client.say(f"Release {release_name} is ready. It will not appear on the release controllers. Please tell the user to manually pull the release images: {pullspecs_repr}", slack_thread)
-            else:  # Wait for release images to be accepted by the release controllers
-                self._logger.info("All release images for %s have been successfully promoted. Pullspecs: %s", release_name, pullspecs_repr)
-
+            else:
                 # check if release is already accepted (in case we timeout and run the job again)
                 tasks = []
                 for arch, release_info in release_infos.items():
@@ -250,6 +276,7 @@ async def run(self):
                 accepted = await asyncio.gather(*tasks)

                 if not all(accepted):
+                    # Wait for release images to be accepted by the release controllers
                     self._logger.info("Waiting for release images for %s to be accepted by the release controller...", release_name)
                     await self._slack_client.say(f"Release {release_name} has been tagged on release controller, but is not accepted yet. Waiting.", slack_thread)
                     tasks = []
@@ -298,6 +325,16 @@ async def run(self):
                     self._jira_client.assign_to_me(subtask)
                     self._jira_client.close_task(subtask)

+            # extract client binaries
+            client_type = "ocp"
+            if (assembly_type == assembly.AssemblyTypes.CANDIDATE and not self.assembly.startswith('rc.')) or assembly_type in [assembly.AssemblyTypes.CUSTOM, assembly.AssemblyTypes.PREVIEW]:
+                client_type = "ocp-dev-preview"
+            message_digests = []
+            if not self.skip_mirror_binaries:
+                message_digests = await self.extract_and_publish_clients(client_type, release_infos)
+            if not self.skip_signing:
+                await self.sign_artifacts(release_name, client_type, release_infos, message_digests)
+
         except Exception as err:
             self._logger.exception(err)
             error_message = f"Error promoting release {release_name}: {err}\n {traceback.format_exc()}"
@@ -344,25 +381,6 @@ async def run(self):
             if rhcos:
                 rhcos_version = rhcos["annotations"]["io.openshift.build.versions"].split("=")[1]  # machine-os=48.84.202112162302-0 => 48.84.202112162302-0
                 data["content"][arch]["rhcos_version"] = rhcos_version
-
-        client_type = "ocp"
-        if (assembly_type == assembly.AssemblyTypes.CANDIDATE and not self.assembly.startswith('rc.')) or assembly_type in [assembly.AssemblyTypes.CUSTOM, assembly.AssemblyTypes.PREVIEW]:
-            client_type = "ocp-dev-preview"
-        data['client_type'] = client_type
-        # mirror binaries
-        if not self.skip_mirror_binaries:
-            # make sure login to quay
-            cmd = ["docker", "login", "-u", "openshift-release-dev+art_quay_dev", "-p", f"{os.environ['QUAY_PASSWORD']}", "quay.io"]
-            await exectools.cmd_assert_async(cmd, env=os.environ.copy(), stdout=sys.stderr)
-            for arch in data['content']:
-                logger.info(f"Mirroring client binaries for {arch}")
-                if self.runtime.dry_run:
-                    logger.info(f"[DRY RUN] Would have sync'd client binaries for {constants.QUAY_RELEASE_REPO_URL}:{release_name}-{arch} to mirror {arch}/clients/{client_type}/{release_name}.")
-                else:
-                    if arch != "multi":
-                        await self.publish_client(self._working_dir, f"{release_name}-{arch}", data["content"][arch]['metadata']['version'], arch, client_type)
-                    else:
-                        await self.publish_multi_client(self._working_dir, f"{release_name}-{arch}", data["content"][arch]['metadata']['version'], data['content'], client_type)
         # sync rhcos
         await self.sync_rhcos_srpms(assembly_type, data)
         json.dump(data, sys.stdout)
@@ -381,7 +399,8 @@ async def sync_rhcos_srpms(self, assembly_type, data):
         # Sync potential pre-release source on which RHCOS depends. See ART-6419 for details.
         major, minor = util.isolate_major_minor_in_group(self.group)
         if assembly_type in [assembly.AssemblyTypes.CANDIDATE, assembly.AssemblyTypes.PREVIEW]:
-            src_output_dir = f"{self._working_dir}/rhcos_src_staging"
+            src_output_dir = self._working_dir / "rhcos_src_staging"
+            src_output_dir.mkdir(parents=True, exist_ok=True)
             for arch in data['content']:
                 if arch != "multi":
                     cmd = [
@@ -391,11 +410,11 @@ async def sync_rhcos_srpms(self, assembly_type, data):
                         "config:rhcos-srpms",
                         "--version", data["content"][arch]['rhcos_version'],
                         "--arch", arch,
-                        "-o", src_output_dir,
+                        "-o", str(src_output_dir),
                     ]
                     await exectools.cmd_assert_async(cmd, env=self._doozer_env_vars)

            # Publish the clients to our S3 bucket.
-            await sync_dir_to_s3_mirror(src_output_dir, "/pub/openshift-v4/sources/packages/", "", "", False, False)
+            await sync_dir_to_s3_mirror(str(src_output_dir), "/pub/openshift-v4/sources/packages/", "", "", dry_run=self.runtime.dry_run, remove_old=False)
         else:
             self._logger.info("Skipping sync srpms of rhcos")
@@ -409,11 +428,133 @@ def _reraise_if_not_permitted(self, err: VerificationError, code: str, permits:
             self._logger.warn("Issue %s is permitted with justification: %s", err, justification)
         return justification

-    async def publish_client(self, working_dir, from_release_tag, release_name, build_arch, client_type):
-        _, minor = util.isolate_major_minor_in_group(self.group)
-        quay_url = constants.QUAY_RELEASE_REPO_URL
+    async def extract_and_publish_clients(self, client_type: str, release_infos: Dict):
+        logger = self._logger
+        # make sure login to quay
+        if "QUAY_PASSWORD" in os.environ:
+            cmd = ["docker", "login", "-u", "openshift-release-dev+art_quay_dev", "-p", f"{os.environ['QUAY_PASSWORD']}", "quay.io"]
+            await exectools.cmd_assert_async(cmd, env=os.environ.copy(), stdout=sys.stderr)
+        base_to_mirror_dir = f"{self._working_dir}/to_mirror/openshift-v4"
+        message_digests = []
+        for arch, release_info in release_infos.items():
+            logger.info("Extracting client binaries for %s", arch)
+            pullspec = release_info["image"]
+            release_name = release_info["metadata"]["version"]
+            if arch == "multi":
+                manifest_arches = [brew_arch_for_go_arch(manifest["platform"]["architecture"]) for manifest in release_info.get("manifests", [])]
+                message_digest = await self.publish_multi_client(base_to_mirror_dir, pullspec, release_name, manifest_arches, client_type)
+            else:
+                message_digest = await self.publish_client(base_to_mirror_dir, pullspec, release_name, arch, client_type)
+            message_digests.append(message_digest)
+        return message_digests
+
+    async def sign_artifacts(self, release_name: str, client_type: str, release_infos: Dict, message_digests: List[str]):
+        """ Signs artifacts and publishes signature files to mirror
+        """
+        if not self.signing_env:
+            raise ValueError("--signing-env is missing")
+        cert_file = os.environ["SIGNING_CERT"]
+        key_file = os.environ["SIGNING_KEY"]
+        uri = constants.UMB_BROKERS[self.signing_env]
+        sig_keyname = "redhatrelease2" if client_type == 'ocp' else "beta2"
+        self._logger.info("About to sign artifacts with key %s", sig_keyname)
+        json_digest_sig_dir = self._working_dir / "json_digests"
+        message_digest_sig_dir = self._working_dir / "message_digests"
+        base_to_mirror_dir = self._working_dir / "to_mirror/openshift-v4"
+
+        lock_name = f'signing-lock-{self.signing_env}'
+        lock_policy = locks.LOCK_POLICY['default']
+        lock_manager = locks.new_lock_manager(
+            internal_lock_timeout=lock_policy['lock_timeout'],
+            retry_count=lock_policy['retry_count'],
+            retry_delay_min=lock_policy['retry_delay_min']
+        )
+        async with await lock_manager.lock(lock_name):
+            async with AsyncSignatory(uri, cert_file, key_file, sig_keyname=sig_keyname) as signatory:
+                tasks = []
+                for release_info in release_infos.values():
+                    pullspec = release_info["image"]
+                    digest = release_info["digest"]
+                    sig_file = json_digest_sig_dir / f"{digest.replace(':', '=')}" / "signature-1"
+                    tasks.append(self._sign_json_digest(signatory, release_info["metadata"]["version"], pullspec, digest, sig_file))
+                for message_digest in message_digests:
+                    input_path = base_to_mirror_dir / message_digest
+                    if not input_path.is_file():
+                        raise IOError(f"Message digest file {input_path} doesn't exist or is not a regular file")
+                    sig_file = message_digest_sig_dir / f"{message_digest}.gpg"
+                    tasks.append(self._sign_message_digest(signatory, release_name, input_path, sig_file))
+                await asyncio.gather(*tasks)

+        self._logger.info("All artifacts have been successfully signed.")
+        self._logger.info("Publishing signatures...")
+        tasks = [
+            self._publish_json_digest_signatures(json_digest_sig_dir),
+            self._publish_message_digest_signatures(message_digest_sig_dir),
+        ]
+        await asyncio.gather(*tasks)
+        self._logger.info("All signatures have been published.")
+
+    async def _sign_json_digest(self, signatory: AsyncSignatory, release_name: str, pullspec: str, digest: str, sig_path: Path):
+        """ Sign a JSON digest claim
+        :param signatory: Signatory
+        :param pullspec: Pullspec of the payload
+        :param digest: SHA256 digest of the payload
+        :param sig_path: Where to save the signature file
+        """
+        self._logger.info("Signing json digest for payload %s with digest %s...", pullspec, digest)
+        if self.runtime.dry_run:
+            self._logger.warning("[DRY RUN] Would have signed the requested artifact.")
+            return
+        sig_path.parent.mkdir(parents=True, exist_ok=True)
+        with open(sig_path, "wb") as sig_file:
+            await signatory.sign_json_digest(
+                product="openshift",
+                release_name=release_name,
+                pullspec=pullspec,
+                digest=digest,
+                sig_file=sig_file)
+
+    async def _sign_message_digest(self, signatory: AsyncSignatory, release_name, input_path: Path, sig_path: Path):
+        """ Sign a message digest
+        :param signatory: Signatory
+        :param input_path: Path to the message digest file
+        :param sig_path: Where to save the signature file
+        """
+        self._logger.info("Signing message digest file %s...", input_path.absolute())
+        if self.runtime.dry_run:
+            self._logger.warning("[DRY RUN] Would have signed the requested artifact.")
+            return
+        sig_path.parent.mkdir(parents=True, exist_ok=True)
+        with open(input_path, "rb") as in_file, open(sig_path, "wb") as sig_file:
+            await signatory.sign_message_digest(
+                product="openshift",
+                release_name=release_name,
+                artifact=in_file,
+                sig_file=sig_file)
+
+    async def _publish_json_digest_signatures(self, local_dir: Union[str, Path], env: str = "prod"):
+        tasks = []
+        # mirror to S3
+        mirror_release_path = "release" if env == "prod" else "test"
+        tasks.append(util.mirror_to_s3(local_dir, f"s3://art-srv-enterprise/pub/openshift-v4/signatures/openshift/{mirror_release_path}/", exclude="*", include="sha256=*", dry_run=self.runtime.dry_run))
+        if mirror_release_path == "release":
+            tasks.append(util.mirror_to_s3(local_dir, "s3://art-srv-enterprise/pub/openshift-v4/signatures/openshift-release-dev/ocp-release/", exclude="*", include="sha256=*", dry_run=self.runtime.dry_run))
+            tasks.append(util.mirror_to_s3(local_dir, "s3://art-srv-enterprise/pub/openshift-v4/signatures/openshift-release-dev/ocp-release-nightly/", exclude="*", include="sha256=*", dry_run=self.runtime.dry_run))
+
+        # mirror to google storage
+        google_storage_path = "official" if env == "prod" else "test-1"
+        tasks.append(util.mirror_to_google_cloud(f"{local_dir}/*", f"gs://openshift-release/{google_storage_path}/signatures/openshift/release", dry_run=self.runtime.dry_run))
+        tasks.append(util.mirror_to_google_cloud(f"{local_dir}/*", f"gs://openshift-release/{google_storage_path}/signatures/openshift-release-dev/ocp-release", dry_run=self.runtime.dry_run))
+        tasks.append(util.mirror_to_google_cloud(f"{local_dir}/*", f"gs://openshift-release/{google_storage_path}/signatures/openshift-release-dev/ocp-release-nightly", dry_run=self.runtime.dry_run))
+
+        await asyncio.gather(*tasks)
+
+    async def _publish_message_digest_signatures(self, local_dir: Union[str, Path]):
+        # mirror to S3
+        await util.mirror_to_s3(local_dir, "s3://art-srv-enterprise/pub/openshift-v4/", exclude="*", include="*/sha256sum.txt.gpg", dry_run=self.runtime.dry_run)
+
+    async def publish_client(self, base_to_mirror_dir: str, pullspec, release_name, build_arch, client_type):
         # Anything under this directory will be sync'd to the mirror
-        base_to_mirror_dir = f"{working_dir}/to_mirror/openshift-v4"
         shutil.rmtree(f"{base_to_mirror_dir}/{build_arch}", ignore_errors=True)

         # From the newly built release, extract the client tools into the workspace following the directory structure
@@ -422,11 +563,11 @@ async def publish_client(self, working_dir, from_release_tag, release_name, buil
         os.makedirs(client_mirror_dir)

         # extract release clients tools
-        extract_release_client_tools(f"{quay_url}:{from_release_tag}", f"--to={client_mirror_dir}", None)
+        extract_release_client_tools(pullspec, f"--to={client_mirror_dir}", None)

         # Get cli installer operator-registry pull-spec from the release
         for release_component_tag_name, source_name in constants.MIRROR_CLIENTS.items():
-            image_stat, cli_pull_spec = get_release_image_pullspec(f"{quay_url}:{from_release_tag}", release_component_tag_name)
+            image_stat, cli_pull_spec = get_release_image_pullspec(pullspec, release_component_tag_name)
             if image_stat == 0:  # image exists
                 _, image_info = get_release_image_info_from_pullspec(cli_pull_spec)
                 # Retrieve the commit from image info
@@ -435,14 +576,14 @@ async def publish_client(self, working_dir, from_release_tag, release_name, buil
                 # URL to download the tarball a specific commit
                 response = requests.get(f"{source_url}/archive/{commit}.tar.gz", stream=True)
                 if response.ok:
-                    with open(f"{client_mirror_dir}/{source_name}-src-{from_release_tag}.tar.gz", "wb") as f:
+                    with open(f"{client_mirror_dir}/{source_name}-src-{release_name}-{build_arch}.tar.gz", "wb") as f:
                         f.write(response.raw.read())
                     # calc shasum
-                    with open(f"{client_mirror_dir}/{source_name}-src-{from_release_tag}.tar.gz", 'rb') as f:
+                    with open(f"{client_mirror_dir}/{source_name}-src-{release_name}-{build_arch}.tar.gz", 'rb') as f:
                         shasum = hashlib.sha256(f.read()).hexdigest()
                     # write shasum to sha256sum.txt
                     with open(f"{client_mirror_dir}/sha256sum.txt", 'a') as f:
-                        f.write(f"{shasum}  {source_name}-src-{from_release_tag}.tar.gz\n")
+                        f.write(f"{shasum}  {source_name}-src-{release_name}-{build_arch}.tar.gz\n")
                 else:
                     response.raise_for_status()
             else:
@@ -450,7 +591,7 @@ async def publish_client(self, working_dir, from_release_tag, release_name, buil

         # ART-7207 - upload baremetal installer binary to mirror
         if build_arch == 'x86_64':
-            self.publish_baremetal_installer_binary(from_release_tag, client_mirror_dir)
+            self.publish_baremetal_installer_binary(pullspec, client_mirror_dir)

         # Starting from 4.14, oc-mirror will be synced for all arches. See ART-6820 and ART-6863
         # oc-mirror was introduced in 4.10, so skip for <= 4.9.
@@ -458,7 +599,7 @@ async def publish_client(self, working_dir, from_release_tag, release_name, buil
         if (major > 4 or minor >= 14) or (major == 4 and minor >= 10 and build_arch == 'x86_64'):
             # oc image extract requires an empty destination directory. So do this before extracting tools.
             # oc adm release extract --tools does not require an empty directory.
-            image_stat, oc_mirror_pullspec = get_release_image_pullspec(f"{quay_url}:{from_release_tag}", "oc-mirror")
+            image_stat, oc_mirror_pullspec = get_release_image_pullspec(pullspec, "oc-mirror")
             if image_stat == 0:  # image exist
                 # extract image to workdir, if failed it will raise error in function
                 extract_release_binary(oc_mirror_pullspec, [f"--path=/usr/bin/oc-mirror:{client_mirror_dir}"])
@@ -480,18 +621,17 @@ async def publish_client(self, working_dir, from_release_tag, release_name, buil
         self.create_symlink(client_mirror_dir, False, False)

         # extract opm binaries
-        _, operator_registry = get_release_image_pullspec(f"{quay_url}:{from_release_tag}", "operator-registry")
+        _, operator_registry = get_release_image_pullspec(pullspec, "operator-registry")
         self.extract_opm(client_mirror_dir, release_name, operator_registry, build_arch)

         util.log_dir_tree(client_mirror_dir)  # print dir tree
         util.log_file_content(f"{client_mirror_dir}/sha256sum.txt")  # print sha256sum.txt

         # Publish the clients to our S3 bucket.
-        await exectools.cmd_assert_async(f"aws s3 sync --no-progress --exact-timestamps {base_to_mirror_dir}/{build_arch} s3://art-srv-enterprise/pub/openshift-v4/{build_arch}", stdout=sys.stderr)
+        await util.mirror_to_s3(f"{base_to_mirror_dir}/{build_arch}", f"s3://art-srv-enterprise/pub/openshift-v4/{build_arch}", dry_run=self.runtime.dry_run)
+        return f"{build_arch}/clients/{client_type}/{release_name}/sha256sum.txt"

-    def publish_baremetal_installer_binary(self, from_release_tag: str, client_mirror_dir: str):
-        # Get baremetal image pullspec
-        release_pullspec = f'{constants.QUAY_RELEASE_REPO_URL}:{from_release_tag}'
+    def publish_baremetal_installer_binary(self, release_pullspec: str, client_mirror_dir: str):
         _, baremetal_installer_pullspec = get_release_image_pullspec(release_pullspec, 'baremetal-installer')
         self._logger.info('baremetal-installer pullspec: %s', baremetal_installer_pullspec)

@@ -542,21 +682,18 @@ def extract_opm(self, client_mirror_dir, release_name, operator_registry, arch):
         with open(f"{client_mirror_dir}/sha256sum.txt", 'a') as f:  # write shasum to sha256sum.txt
             f.write(f"{shasum}  opm-{platform}-{release_name}.tar.gz\n")

-    async def publish_multi_client(self, working_dir, from_release_tag, release_name, arch_list, client_type):
+    async def publish_multi_client(self, base_to_mirror_dir: str, pullspec: str, release_name, arch_list, client_type):
         # Anything under this directory will be sync'd to the mirror
-        base_to_mirror_dir = f"{working_dir}/to_mirror/openshift-v4"
         shutil.rmtree(f"{base_to_mirror_dir}/multi", ignore_errors=True)
         release_mirror_dir = f"{base_to_mirror_dir}/multi/clients/{client_type}/{release_name}"
-
-        for go_arch in [go_arch_for_brew_arch(arch) for arch in arch_list]:
-            if go_arch == "multi":
-                continue
+        for arch in arch_list:
+            go_arch = go_arch_for_brew_arch(arch)
             # From the newly built release, extract the client tools into the workspace following the directory structure
             # we expect to publish to mirror
             client_mirror_dir = f"{release_mirror_dir}/{go_arch}"
             os.makedirs(client_mirror_dir)
             # extract release clients tools
-            extract_release_client_tools(f"{constants.QUAY_RELEASE_REPO_URL}:{from_release_tag}", f"--to={client_mirror_dir}", go_arch)
+            extract_release_client_tools(pullspec, f"--to={client_mirror_dir}", go_arch)
             # create symlink for clients
             self.create_symlink(path_to_dir=client_mirror_dir, log_tree=True, log_shasum=True)

@@ -575,7 +712,8 @@ async def publish_multi_client(self, working_dir, from_release_tag, release_name
         util.log_dir_tree(release_mirror_dir)

         # Publish the clients to our S3 bucket.
-        await exectools.cmd_assert_async(f"aws s3 sync --no-progress --exact-timestamps {base_to_mirror_dir}/multi s3://art-srv-enterprise/pub/openshift-v4/multi", stdout=sys.stderr)
+        await util.mirror_to_s3(f"{base_to_mirror_dir}/multi", "s3://art-srv-enterprise/pub/openshift-v4/multi", dry_run=self.runtime.dry_run)
+        return f"multi/clients/{client_type}/{release_name}/sha256sum.txt"

     def create_symlink(self, path_to_dir, log_tree, log_shasum):
         # External consumers want a link they can rely on.. e.g. .../latest/openshift-client-linux.tgz .
@@ -947,7 +1085,6 @@ async def _promote_heterogeneous_payload(self, assembly_type: assembly.AssemblyT
             self._logger.info("Pushing manifest list...")
             await self.push_manifest_list(release_name, dest_manifest_list)
             self._logger.info("Heterogeneous release payload for %s has been built. Manifest list pullspec is %s", release_name, dest_image_pullspec)
-            self._logger.info("Getting release image information for %s...", dest_image_pullspec)

             # Get info of the pushed manifest list
             self._logger.info("Getting release image information for %s...", dest_image_pullspec)
@@ -1230,26 +1367,34 @@ async def send_image_list_email(self, release_name: str, advisory: int, archive_
               help="Do not gather an advisory image list for docs.")
 @click.option("--skip-build-microshift", is_flag=True,
               help="Do not build microshift rpm")
+@click.option("--skip-signing", is_flag=True,
+              help="Do not sign artifacts")
 @click.option("--permit-overwrite", is_flag=True,
               help="DANGER! Allows the pipeline to overwrite an existing payload.")
 @click.option("--no-multi", is_flag=True, help="Do not promote a multi-arch/heterogeneous payload.")
 @click.option("--multi-only", is_flag=True, help="Do not promote arch-specific homogenous payloads.")
 @click.option("--skip-mirror-binaries", is_flag=True, help="Do not mirror client binaries to mirror")
 @click.option("--use-multi-hack", is_flag=True, help="Add '-multi' to heterogeneous payload name to workaround a Cincinnati issue")
+@click.option("--signing-env", type=click.Choice(("prod", "stage")),
+              help="Signing server environment: prod or stage")
 @pass_runtime
 @click_coroutine
 async def promote(runtime: Runtime, group: str, assembly: str,
                   skip_blocker_bug_check: bool, skip_attached_bug_check: bool,
                   skip_image_list: bool,
                   skip_build_microshift: bool,
+                  skip_signing: bool,
                   permit_overwrite: bool, no_multi: bool, multi_only: bool,
                   skip_mirror_binaries: bool,
-                  use_multi_hack: bool):
+                  use_multi_hack: bool,
+                  signing_env: Optional[str]):
     pipeline = PromotePipeline(runtime, group, assembly,
                                skip_blocker_bug_check, skip_attached_bug_check,
                                skip_image_list,
                                skip_build_microshift,
+                               skip_signing,
                                permit_overwrite, no_multi, multi_only,
                                skip_mirror_binaries,
-                               use_multi_hack)
+                               use_multi_hack,
+                               signing_env)
     await pipeline.run()
diff --git a/pyartcd/pyartcd/runtime.py b/pyartcd/pyartcd/runtime.py
index bc589a6c0a..1a8ba25e3b 100644
--- a/pyartcd/pyartcd/runtime.py
+++ b/pyartcd/pyartcd/runtime.py
@@ -3,7 +3,7 @@
 from pathlib import Path
 from typing import Any, Dict, Optional

-import toml
+import tomli

 from pyartcd.jira import JIRAClient
 from pyartcd.mail import MailService
@@ -33,8 +33,8 @@ def init_logger():

     @classmethod
     def from_config_file(cls, config_filename: Path, working_dir: Path, dry_run: bool):
-        with open(config_filename, "r") as config_file:
-            config_dict = toml.load(config_file)
+        with open(config_filename, "rb") as config_file:
+            config_dict = tomli.load(config_file)
         return Runtime(config=config_dict, working_dir=working_dir, dry_run=dry_run)

     def new_jira_client(self, jira_token: Optional[str] = None):
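[Reviewer note, not part of the patch: unlike toml, tomli only parses binary file objects, which is why the open mode changes from "r" to "rb" above. A minimal sketch; the config file name is illustrative:]

```python
import tomli

# tomli.load() requires a file opened in binary mode; use tomli.loads() for str input.
with open("artcd.toml", "rb") as f:
    config = tomli.load(f)
print(config.get("jira", {}).get("url"))
```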
diff --git a/pyartcd/pyartcd/s3.py b/pyartcd/pyartcd/s3.py
index 4323d2a3dc..a1a6972701 100644
--- a/pyartcd/pyartcd/s3.py
+++ b/pyartcd/pyartcd/s3.py
@@ -1,4 +1,5 @@
 import os
+from typing import Optional

 from pyartcd import exectools
 from tenacity import retry, stop_after_attempt, wait_fixed
@@ -19,7 +20,7 @@ async def sync_repo_to_s3_mirror(local_dir: str, s3_path: str, dry_run: bool = F
     await sync_dir_to_s3_mirror(local_dir, s3_path, exclude='', include_only='', dry_run=dry_run, remove_old=True)


-async def sync_dir_to_s3_mirror(local_dir: str, s3_path: str, exclude: str, include_only: str,
+async def sync_dir_to_s3_mirror(local_dir: str, s3_path: str, exclude: Optional[str] = None, include_only: Optional[str] = None,
                                 dry_run: bool = False, remove_old: bool = True):
     """
     Sync a directory to an s3 bucket.
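[Reviewer note, not part of the patch: the signatory below derives the UMB service-account name from the client certificate's Common Name. A standalone synchronous sketch of that step, using the same cryptography calls as the patch:]

```python
from cryptography import x509
from cryptography.x509.oid import NameOID


def common_name(cert_path: str) -> str:
    """ Return the CN of a PEM certificate, e.g. "nonprod-openshift-art-bot". """
    with open(cert_path, "rb") as f:
        cert = x509.load_pem_x509_certificate(f.read())
    return cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
```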
diff --git a/pyartcd/pyartcd/signatory.py b/pyartcd/pyartcd/signatory.py
new file mode 100644
index 0000000000..b714e7358c
--- /dev/null
+++ b/pyartcd/pyartcd/signatory.py
@@ -0,0 +1,222 @@
+
+import asyncio
+import base64
+import io
+import json
+import logging
+import uuid
+from datetime import datetime, timedelta
+from typing import BinaryIO, Dict, cast
+
+import aiofiles
+from cryptography import x509
+from cryptography.x509.oid import NameOID
+
+from pyartcd.exceptions import SignatoryServerError
+from pyartcd.umb_client import AsyncUMBClient
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class AsyncSignatory:
+    """
+    AsyncSignatory can sign OCP artifacts by sending a signing request to RADAS over UMB.
+
+    Example usage:
+    ```
+    uri = "stomp+ssl://umb.stage.api.redhat.com:61612"
+    cert_file = "ssl/nonprod-openshift-art-bot.crt"
+    key_file = "ssl/nonprod-openshift-art-bot.key"
+    async with AsyncSignatory(uri, cert_file, key_file, sig_keyname="beta2", requestor="yuxzhu") as signatory:
+        pullspec = "quay.io/openshift-release-dev/ocp-release:4.11.31-x86_64"
+        digest = "sha256:cc10900ad98b44ba432bc0d99e7d4fffb5498fd6844fc3b6a0a3552ee6d64059"
+        # sign a release payload
+        with open("signature-1", "wb") as sig_file:
+            await signatory.sign_json_digest("openshift", "4.11.31", pullspec, digest, sig_file)
+        # sign a message digest
+        with open("sha256sum.txt", "rb") as in_file, open("sha256sum.txt.gpg", "wb") as sig_file:
+            await signatory.sign_message_digest("openshift", "4.11.31", in_file, sig_file)
+    ```
+    """
+
+    SEND_DESTINATION = '/topic/VirtualTopic.eng.art.artifact.sign'
+    CONSUMER_QUEUE_TEMPLATE = "/queue/Consumer.{service_account}.{subscription}.VirtualTopic.eng.robosignatory.art.sign"
+
+    def __init__(
+        self,
+        uri: str,
+        cert_file: str,
+        key_file: str,
+        sig_keyname="test",
+        requestor="timer",
+        subscription_name="artcd",
+    ):
+        self.cert_file = cert_file
+        self.sig_keyname = sig_keyname
+        self.requestor = requestor
+        self.subscription_name = subscription_name
+        self._umb = AsyncUMBClient(uri, cert_file, key_file)
+        self._receiver = None
+        self._receiver_task = None
+        self._requests: Dict[str, asyncio.Future] = {}
+        self._loop = asyncio.get_event_loop()
+
+    async def __aenter__(self):
+        await self.start()
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        await self.close()
+
+    @staticmethod
+    async def _get_certificate_common_name(cert_file: str):
+        """ Get common name for the specified certificate file
+        """
+        async with aiofiles.open(cert_file, "rb") as f:
+            cert = x509.load_pem_x509_certificate(await f.read())
+        return cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
+
+    async def start(self):
+        # Get service account name embedded in the client certificate
+        service_account = await self._get_certificate_common_name(self.cert_file)
+        _LOGGER.info("Using UMB service account: %s", service_account)
+        # Connect to UMB
+        await self._umb.connect()
+        # Subscribe to the consumer queue
+        # e.g. /queue/Consumer.openshift-art-bot.artcd.VirtualTopic.eng.robosignatory.art.sign
+        consumer_queue = self.CONSUMER_QUEUE_TEMPLATE.format_map({
+            "service_account": service_account,
+            "subscription": self.subscription_name
+        })
+        self._receiver = await self._umb.subscribe(consumer_queue, self.subscription_name)
+        # Start a task to handle messages received from the consumer queue
+        self._receiver_task = asyncio.create_task(self._handle_messages())
+
+    async def close(self):
+        """ Closes connection to UMB
+        """
+        await self._umb.close()
+        # self._receiver_task will stop once the receiver reaches EOF or it is garbage collected
+        self._receiver_task = None
+        self._receiver = None
+
+    async def _handle_messages(self):
+        """ Handles messages received from the consumer queue
+        """
+        receiver = self._receiver
+        assert receiver, "start() was not called"
+        async for message in receiver.iter_messages():
+            message_id = str(message.headers["message-id"])
+            timestamp = int(message.headers["timestamp"]) / 1000
+            try:
+                age = datetime.utcnow() - datetime.utcfromtimestamp(timestamp)
+                if age >= timedelta(hours=1):
+                    _LOGGER.warning("Discarding stale message {}".format(message_id))
+                    await self._umb.ack(message_id, receiver.id)  # discard the message
+                    continue
+                body = json.loads(str(message.body))
+                request_id = body["msg"]["request_id"]
+                fut = self._requests.get(request_id)
+                if not fut:
+                    _LOGGER.warning("Unknown request_id %s in message %s", request_id, message_id)
+                    continue
+                fut.set_result((message.headers, body))
+                await self._umb.ack(message_id, receiver.id)  # consume the message
+            except Exception:
+                _LOGGER.exception("Error handling message %s", message_id)
+        _LOGGER.info("_handle_message: exited")
+
+    async def _sign_artifact(
+        self,
+        typ: str,
+        product: str,
+        release_name: str,
+        name: str,
+        artifact: BinaryIO,
+        sig_file: BinaryIO,
+    ):
+        """ Signs an artifact
+        """
+        # Create a signing request
+        # Example request: https://datagrepper.stage.engineering.redhat.com/id?id=ID:umb-stage-3.umb-001.preprod.us-east-1.aws.redhat.com-38533-1689629292398-10:23520:-1:1:1&is_raw=true&size=extra-large
+        artifact_base64 = io.BytesIO()
+        base64.encode(artifact, artifact_base64)
+        request_id = (
+            f'{product}-{typ}-{datetime.utcnow().strftime("%Y%m%d%H%M%S")}-{uuid.uuid4()}'
+        )
+        message = {
+            "artifact": artifact_base64.getvalue().decode(),
+            "artifact_meta": {
+                "product": product,
+                "release_name": release_name,
+                "name": name,
+                "type": typ,
+            },
+            "request_id": request_id,
+            "requestor": self.requestor,
+            "sig_keyname": self.sig_keyname,
+        }
+        request_body = json.dumps(message)
+
+        # Send the signing request via UMB
+        fut = self._loop.create_future()
+        self._requests[request_id] = fut
+        try:
+            await self._umb.send(self.SEND_DESTINATION, request_body)
+            _, response_body = await fut
+        finally:
+            del self._requests[request_id]
+
+        # example response: https://datagrepper.stage.engineering.redhat.com/id?id=2019-0304004b-d1e6-4e03-b28d-cfa1e5f59948&is_raw=true&size=extra-large
+        if response_body["msg"]["signing_status"] != "success":
+            err = ", ".join(response_body["msg"]["errors"])
+            raise SignatoryServerError(f"Robo Signatory declined: {err}")
+        input = io.BytesIO(response_body["msg"]["signed_artifact"].encode())
+        base64.decode(input, sig_file)
+        artifact_meta = cast(Dict[str, str], response_body["msg"]["artifact_meta"])
+        return artifact_meta
+
+    async def sign_json_digest(
+        self, product: str, release_name: str, pullspec: str, digest: str, sig_file: BinaryIO
+    ):
+        """ Sign a JSON digest claim
+        """
+        json_claim = {
+            "critical": {
+                "image": {"docker-manifest-digest": digest},
+                "type": "atomic container signature",
+                "identity": {
+                    "docker-reference": pullspec,
+                },
+            },
+            "optional": {
+                "creator": "Red Hat OpenShift Signing Authority 0.0.1",
+            },
+        }
+        artifact = io.BytesIO(json.dumps(json_claim).encode())
+        name = digest.replace(":", "=")
+        signature_meta = await self._sign_artifact(
+            typ="json-digest",
+            product=product,
+            release_name=release_name,
+            name=name,
+            artifact=artifact,
+            sig_file=sig_file,
+        )
+        return signature_meta
+
+    async def sign_message_digest(
+        self, product: str, release_name: str, artifact: BinaryIO, sig_file: BinaryIO
+    ):
+        """ Sign a message digest
+        """
+        name = "sha256sum.txt.gpg"
+        signature_meta = await self._sign_artifact(
+            typ="message-digest",
+            product=product,
+            release_name=release_name,
+            name=name,
+            artifact=artifact,
+            sig_file=sig_file,
+        )
+        return signature_meta
diff --git a/pyartcd/pyartcd/umb_client.py b/pyartcd/pyartcd/umb_client.py
new file mode 100644
index 0000000000..9cb7ed706f
--- /dev/null
+++ b/pyartcd/pyartcd/umb_client.py
@@ -0,0 +1,438 @@
+import asyncio
+import logging
+import threading
+import uuid
+from typing import Any, Callable, Dict, Optional
+from urllib.parse import urlparse
+import warnings
+import weakref
+
+import stomp
+import stomp.utils
+
+_LOGGER = logging.getLogger(__name__)
+STOMP_DEFAULT_PORT = 61613
+
+
+def parse_stomp_uri(uri: str):
+    """ Parses a stomp URI.
+    Examples:
+        stomp://umb.stage.api.redhat.com:61612
+        stomp+tcp://umb.stage.api.redhat.com:61612
+        stomp+ssl://umb.stage.api.redhat.com:61612
+    :param uri: the URI to parse
+    :return: (scheme, hostname, port)
+    """
+    parsed = urlparse(uri)
+    if parsed.scheme not in {"stomp", "stomp+tcp", "stomp+ssl"}:
+        raise ValueError(f"Unsupported scheme {parsed.scheme}")
+    if not parsed.hostname:
+        raise ValueError("Hostname is required")
+    return parsed.scheme, parsed.hostname, parsed.port or STOMP_DEFAULT_PORT
+
+
+def parse_broker_uri(uri: str):
+    """ Parses a broker URI or a failover URI.
+    Examples:
+        stomp+ssl://umb.stage.api.redhat.com:61612
+        failover:(stomp+ssl://umb-broker03.api.redhat.com:61612,stomp+ssl://umb-broker04.api.redhat.com:61612)
+    :param uri: the URI to parse
+    :return: a list of tuples in format of (scheme, hostname, port)
+    """
+    parsed = urlparse(uri)
+    if parsed.scheme == "failover":
+        # URI uses Failover transport https://activemq.apache.org/failover-transport-reference.html
+        if not parsed.path:
+            raise ValueError(f"Incomplete URI: {uri}")
+        if parsed.path[0] == "(" and parsed.path[-1] == ")":  # URI looks like "failover:(<uri>,<uri>,...)"
+            uris = parsed.path[1:-1].split(",")
+        else:  # URI looks like "failover:<uri>,<uri>,..."
+            uris = parsed.path.split(",")
+        return [parse_stomp_uri(uri) for uri in uris]
+    return [parse_stomp_uri(uri)]
+
+
+class AsyncUMBReceiver:
+    """ Provides an interface to iterate received messages from the subscription.
+
+    Example:
+    ```
+    receiver = await umb_client.subscribe(consumer_queue, subscription_name)
+    async for message in receiver.iter_messages():
+        print(message.headers["message-id"])
+    ```
+    """
+
+    def __init__(self, id: str) -> None:
+        self.id = id
+        self._queue: asyncio.Queue[Optional[stomp.utils.Frame]] = asyncio.Queue()
+        self._loop = asyncio.get_event_loop()
+        self.closed = False
+
+    async def iter_messages(self):
+        while True:
+            r = await self._queue.get()
+            if r is None:  # None represents "EOF"
+                return
+            yield r
+
+    def put_frame(self, frame: stomp.utils.Frame):
+        if self.closed:
+            raise ValueError("AsyncUMBReceiver is closed")
+        self._queue.put_nowait(frame)
+
+    def close(self):
+        if self.closed:
+            return
+        self.closed = True
+        self._queue.put_nowait(None)  # None represents "EOF"
+
+    def close_threadsafe(self):
+        """ close() is not thread-safe.
+        This method can be called from another thread
+        to schedule the event loop in the main thread to close it soon.
+        """
+        self._loop.call_soon_threadsafe(self.close)
+
+
+class UMBClientConnectionListener(stomp.ConnectionListener):
+    """ This class is used internally by AsyncUMBClient to handle stomp.py events.
+
+    stomp.py calls "on_*" methods from a separate thread when an event occurs.
+    Be careful to avoid race conditions!
+    """
+
+    def __init__(self, client: "AsyncUMBClient", print_to_log=True):
+        self._client = client
+        self._futures: Dict[str, asyncio.Future] = {}
+        self._receivers: Dict[str, AsyncUMBReceiver] = {}
+        self.print_to_log = print_to_log
+
+    def add_future(self, id: str, fut: asyncio.Future):
+        if id in self._futures:
+            raise KeyError(f"Future ID {id} already exists")
+        self._futures[id] = fut
+
+    def remove_future(self, id: str):
+        old = self._futures.pop(id, None)
+        if old and not old.done():
+            old.cancel()
+
+    def add_receiver(self, id: str, receiver: AsyncUMBReceiver):
+        if id in self._receivers:
+            raise KeyError(f"Receiver ID {id} already exists")
+        self._receivers[id] = receiver
+
+    def remove_receiver(self, id: str, close=True):
+        old = self._receivers.pop(id, None)
+        if old and close:
+            old.close()
+
+    def __print(self, msg, *args):
+        if self.print_to_log:
+            logging.debug(msg, *args)
+        else:
+            print(msg % args)
+
+    def on_connecting(self, host_and_port):
+        """
+        :param (str,int) host_and_port:
+        """
+        self.__print("on_connecting %s %s", *host_and_port)
+
+    def on_connected(self, frame):
+        """
+        :param Frame frame: the stomp frame
+        """
+        self.__print("on_connected %s %s", frame.headers, frame.body)
+        self._complete_future("on_connected", None)
+
+    def _complete_future(self, id: str, result: Any):
+        fut = self._futures.get(id)
+        if not fut:
+            return
+        fut.get_loop().call_soon_threadsafe(fut.set_result, result)
+
+    def _err_future(self, id: str, err: Exception):
+        fut = self._futures.get(id)
+        if not fut:
+            return
+        fut.get_loop().call_soon_threadsafe(fut.set_exception, err)
+
+    def on_disconnected(self):
+        self.__print("on_disconnected")
+        # close all receivers
+        for _, r in self._receivers.items():
+            r.close_threadsafe()
+        # notify UMB client
+        self._client.on_disconnected()
+        # notify all pending futures of the disconnection
+        for id in self._futures.keys():
+            if id == "on_disconnected":
+                self._complete_future("on_disconnected", None)
+            else:
+                self._err_future(id, IOError("Connection lost"))
+
+    def on_heartbeat_timeout(self):
+        self.__print("on_heartbeat_timeout")
+
+    def on_before_message(self, frame):
+        """
+        :param Frame frame: the stomp frame
+        """
+        self.__print("on_before_message %s %s", frame.headers, frame.body)
+
+    def on_message(self, frame):
+        """
+        :param Frame frame: the stomp frame
+        """
+        self.__print("on_message %s %s", frame.headers, frame.body)
+        subscription = frame.headers.get("subscription")
+        if subscription:
+            receiver = self._receivers.get(subscription)
+            if receiver:
+                receiver._loop.call_soon_threadsafe(receiver.put_frame, frame)
+
+    def on_receipt(self, frame):
+        """
+        :param Frame frame: the stomp frame
+        """
+        self.__print("on_receipt %s %s", frame.headers, frame.body)
+        receipt = frame.headers.get("receipt-id")
+        if receipt:
+            self._complete_future(receipt, frame)
+
+    def on_error(self, frame):
+        """
+        :param Frame frame: the stomp frame
+        """
+        self.__print("on_error %s %s", frame.headers, frame.body)
+
+    def on_send(self, frame):
+        """
+        :param Frame frame: the stomp frame
+        """
+        self.__print("on_send %s %s %s", frame.cmd, frame.headers, frame.body)
+
+    def on_heartbeat(self):
+        self.__print("on_heartbeat")
+
+
+class AsyncUMBClient:
+    """
+    AsyncUMBClient provides a simpler interface to send and receive messages
+    through UMB (Universal Message Bus).
+    For more information about UMB, see https://source.redhat.com/groups/public/enterprise-services-platform/it_platform_wiki/umb_appendix#queues-topics-and-virtualtopics.
+
+    Example:
+    ```
+    uri = "stomp+ssl://umb.stage.api.redhat.com:61612"
+    cert_file = "ssl/nonprod-openshift-art-bot.crt"
+    key_file = "ssl/nonprod-openshift-art-bot.key"
+    umb_client = AsyncUMBClient(uri, cert_file, key_file)
+    await umb_client.connect()
+    # send a message
+    await umb_client.send(topic, request_body)
+    # receive messages
+    receiver = await umb_client.subscribe(consumer_queue, subscription_name)
+    async for message in receiver.iter_messages():
+        print(message.headers["message-id"])
+    # disconnect
+    await umb_client.disconnect()
+
+    # or use context manager
+    async with AsyncUMBClient(uri, cert_file, key_file) as umb_client:
+        await umb_client.send(topic, request_body)
+        receiver = await umb_client.subscribe(consumer_queue, subscription_name)
+        async for message in receiver.iter_messages():
+            print(message.headers["message-id"])
+    ```
+    """
+
+    def __init__(
+        self,
+        uri: str,
+        cert_file: Optional[str] = None,
+        key_file: Optional[str] = None
+    ):
+        conn = self._create_connection(uri, cert_file, key_file)
+        self._listener = UMBClientConnectionListener(self)
+        conn.set_listener("", self._listener)
+        self._conn = conn
+        self._main_loop = asyncio.get_event_loop()
+        self._sender_loop = None
+        self._sender_thread = None
+
+    async def close(self):
+        await self.disconnect()
+
+    async def __aenter__(self):
+        await self.connect()
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        await self.close()
+
+    def __del__(self):
+        if self.connected:
+            warnings.warn(
+                f"Unclosed UMB connection {self!r}", ResourceWarning)
+
+    @property
+    def connected(self):
+        """ Returns True if already connected
+        """
+        return self._conn.is_connected()
+
+    def on_disconnected(self):
+        """ Handles disconnection.
+        This method is called by the listener from another thread.
+        """
+        def _handle_disconnected():
+            if self._sender_loop:
+                sender_loop = self._sender_loop
+                sender_loop.call_soon_threadsafe(sender_loop.stop)  # This will cause the sender thread to exit
+                self._sender_loop = None
+                self._sender_thread = None
+        self._main_loop.call_soon_threadsafe(_handle_disconnected)
+
+    @staticmethod
+    def _sender_thread_func(loop: asyncio.AbstractEventLoop):
+        """ Thread function of the sender thread
+        :param loop: a loop to be associated with the thread
+        """
+        # Start an event loop and run until loop.stop() is called
+        asyncio.set_event_loop(loop)
+        loop.run_forever()
+
+    @staticmethod
+    def _create_connection(
+        uri: str,
+        cert_file: Optional[str] = None,
+        key_file: Optional[str] = None
+    ):
+        """ Creates and configures a stomp connection
+        """
+        parsed_uris = parse_broker_uri(uri)
+        host_and_ports = [(hostname, port) for _, hostname, port in parsed_uris]
+        conn = stomp.StompConnection11(host_and_ports=host_and_ports)  # UMB supports Stomp v1.1 protocol
+        ssl_host_and_ports = [(hostname, port) for scheme, hostname, port in parsed_uris if scheme == "stomp+ssl"]
+        if ssl_host_and_ports:
+            conn.set_ssl(for_hosts=ssl_host_and_ports, cert_file=cert_file, key_file=key_file)
+        return conn
+
+    async def _call_in_sender_thread(self, func: Callable):
+        """ Calls a function in the sender thread (thread-safe)
+        :param func: the function to call
+        :return: return value of the function call
+        """
+        if not self._sender_loop:
+            raise IOError("Not connected")
+        fut = self._main_loop.create_future()
+
+        def callback():
+            try:
+                result = func()
+                fut.get_loop().call_soon_threadsafe(fut.set_result, result)
+            except Exception as ex:
+                fut.get_loop().call_soon_threadsafe(fut.set_exception, ex)
+        self._sender_loop.call_soon_threadsafe(callback)
+        return await fut
+
+    async def connect(self):
+        if self.connected:
+            raise IOError("Already connected")
+        _LOGGER.info("Connecting to message bus...")
+        if not self._sender_loop:
+            self._sender_loop = asyncio.new_event_loop()
+            self._sender_thread = threading.Thread(target=self._sender_thread_func, args=(self._sender_loop, ), daemon=True)
+            self._sender_thread.start()
+        receipt = str(uuid.uuid4())
+        fut = self._main_loop.create_future()
+        self._listener.add_future("on_connected", fut)
+        try:
+            await self._call_in_sender_thread(lambda: self._conn.connect(wait=False, headers={"receipt": receipt}))
+            await fut
+        finally:
+            self._listener.remove_future("on_connected")
+        _LOGGER.info("Connected")
+
+    async def disconnect(self):
+        """ Disconnects from the message broker and waits for the receipt
+        """
+        if not self.connected:
+            return
+        _LOGGER.info("Disconnecting from message bus...")
+        receipt = str(uuid.uuid4())
+        fut = self._main_loop.create_future()
+        self._listener.add_future("on_disconnected", fut)
+        try:
+            await self._call_in_sender_thread(lambda: self._conn.disconnect(receipt=receipt))
+            await fut
+        finally:
+            self._listener.remove_future("on_disconnected")
+        _LOGGER.info("Disconnected")
+
+    async def subscribe(self, destination: str, id: str):
+        """ Subscribe to a destination
+        :param destination: a queue or topic
+        :param id: subscription ID
+        :return: an instance of AsyncUMBReceiver for receiving messages for the subscription
+        """
+        subscription_id = id or str(uuid.uuid4())
+        receiver = AsyncUMBReceiver(id=subscription_id)
+        self._listener.add_receiver(subscription_id, receiver)
+        await self._call_in_sender_thread(lambda: self._conn.subscribe(destination=destination, id=subscription_id, ack="client-individual"))
+        return receiver
+
+    async def unsubscribe(self, id: str):
+        """ Unsubscribes from the queue or topic
+        :param id: subscription ID
+        """
+        receiver = self._listener._receivers.get(id)
+        if not receiver:
+            raise ValueError(f"Subscription '{id}' doesn't exist")
+        await self._call_in_sender_thread(lambda: self._conn.unsubscribe(id=id))
+        self._listener.remove_receiver(id, close=True)
+
+    async def send(self, destination: str, body: str):
+        """ Sends a message to the broker and waits for the receipt
+        """
+        receipt = str(uuid.uuid4())
+        _LOGGER.debug("Sending message %s to %s...", body, destination)
+        fut = self._main_loop.create_future()
+        self._listener.add_future(receipt, fut)
+        try:
+            await self._call_in_sender_thread(lambda: self._conn.send(
+                body=body,
+                destination=destination,
+                headers={"receipt": receipt},
+            ))
+            await fut
+        finally:
+            self._listener.remove_future(receipt)
+
+    async def ack(self, message_id: str, subscription: str):
+        """ Acknowledges 'consumption' of a message by id.
+        """
+        receipt = str(uuid.uuid4())
+        fut = self._main_loop.create_future()
+        self._listener.add_future(receipt, fut)
+        try:
+            await self._call_in_sender_thread(lambda: self._conn.ack(message_id, subscription, receipt=receipt))
+            await fut
+        finally:
+            self._listener.remove_future(receipt)
+
+    async def nack(self, message_id: str, subscription: str):
+        """ Notifies the message broker that a message was not consumed.
+        """
+        receipt = str(uuid.uuid4())
+        fut = self._main_loop.create_future()
+        self._listener.add_future(receipt, fut)
+        try:
+            await self._call_in_sender_thread(lambda: self._conn.nack(message_id, subscription, receipt=receipt))
+            await fut
+        finally:
+            self._listener.remove_future(receipt)
diff --git a/pyartcd/pyartcd/util.py b/pyartcd/pyartcd/util.py
index d443c23bc7..6a9770ecc9 100644
--- a/pyartcd/pyartcd/util.py
+++ b/pyartcd/pyartcd/util.py
@@ -2,10 +2,11 @@
 import os
 import re
 import shutil
+import sys
 import tempfile
 from datetime import datetime
 from pathlib import Path
-from typing import Dict, Optional, Tuple
+from typing import Dict, Optional, Tuple, Union

 import aiofiles
 import yaml
@@ -297,7 +298,7 @@ async def is_build_permitted(version: str, data_path: str = constants.OCP_BUILD_
     # Check for frozen scheduled automation
     if freeze_automation == "scheduled" and not is_manual_build():
         logger.info('Only manual runs are permitted according to freeze_automation in group.yml '
-                     'and this run appears to be non-manual.')
+                    'and this run appears to be non-manual.')
         return False

     # Check if group can run on weekends
@@ -567,9 +568,9 @@ def mail_build_failure_owners(failed_builds: dict, doozer_working: str, mail_cli
             container_log = "Unfortunately there were no container build logs; " \
                             "something else about the build failed."
             logger.warning('No container build log for failed %s build\n'
-                            '(task url %s)\n'
-                            'at path %s',
-                            failure['distgit'], failure['task_url'], container_log)
+                           '(task url %s)\n'
+                           'at path %s',
+                           failure['distgit'], failure['task_url'], container_log)

         explanation_body = f"ART's brew/OSBS build of OCP image {failure['image']}:{failure['version']} has failed.\n\n"
         if failure['owners']:
@@ -590,3 +591,30 @@ def mail_build_failure_owners(failed_builds: dict, doozer_working: str, mail_cli
             subject=f'Failed OCP build of {failure["image"]}:{failure["version"]}',
             content=explanation_body
         )
+
+
+async def mirror_to_s3(source: Union[str, Path], dest: str, exclude: Optional[str] = None, include: Optional[str] = None, dry_run=False):
+    """
+    Copy to AWS S3
+    """
+    cmd = ["aws", "s3", "sync", "--no-progress", "--exact-timestamps"]
+    if exclude is not None:
+        cmd.append(f"--exclude={exclude}")
+    if include is not None:
+        cmd.append(f"--include={include}")
+    if dry_run:
+        cmd.append("--dryrun")
+    cmd.extend(["--", f"{source}", f"{dest}"])
+    await exectools.cmd_assert_async(cmd, env=os.environ.copy(), stdout=sys.stderr)
+
+
+async def mirror_to_google_cloud(source: Union[str, Path], dest: str, dry_run=False):
+    """
+    Copy to Google Cloud
+    """
+    # -n - no clobber/overwrite; -v - print url of item; -L - write to log for auto re-processing; -r - recursive
+    cmd = ["gsutil", "cp", "-n", "-v", "-r", "--", f"{source}", f"{dest}"]
+    if dry_run:
+        logger.warning("[DRY RUN] Would have run %s", cmd)
+        return
+    await exectools.cmd_assert_async(cmd, env=os.environ.copy(), stdout=sys.stderr)
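[Reviewer note, not part of the patch: aws s3 sync evaluates --exclude/--include filters in order, so exclude="*" plus include="sha256=*" syncs only the signature directories, which is the combination _publish_json_digest_signatures uses. A hedged usage sketch; the local path is illustrative:]

```python
import asyncio

from pyartcd import util

asyncio.run(util.mirror_to_s3(
    "/tmp/working/json_digests",
    "s3://art-srv-enterprise/pub/openshift-v4/signatures/openshift/release/",
    exclude="*",
    include="sha256=*",
    dry_run=True,  # pass --dryrun to aws s3 sync
))
```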
logger.warning('No container build log for failed %s build\n' - '(task url %s)\n' - 'at path %s', - failure['distgit'], failure['task_url'], container_log) + '(task url %s)\n' + 'at path %s', + failure['distgit'], failure['task_url'], container_log) explanation_body = f"ART's brew/OSBS build of OCP image {failure['image']}:{failure['version']} has failed.\n\n" if failure['owners']: @@ -590,3 +591,30 @@ def mail_build_failure_owners(failed_builds: dict, doozer_working: str, mail_cli subject=f'Failed OCP build of {failure["image"]}:{failure["version"]}', content=explanation_body ) + + +async def mirror_to_s3(source: Union[str, Path], dest: str, exclude: Optional[str] = None, include: Optional[str] = None, dry_run=False): + """ + Copy to AWS S3 + """ + cmd = ["aws", "s3", "sync", "--no-progress", "--exact-timestamps"] + if exclude is not None: + cmd.append(f"--exclude={exclude}") + if include is not None: + cmd.append(f"--include={include}") + if dry_run: + cmd.append("--dryrun") + cmd.extend(["--", f"{source}", f"{dest}"]) + await exectools.cmd_assert_async(cmd, env=os.environ.copy(), stdout=sys.stderr) + + +async def mirror_to_google_cloud(source: Union[str, Path], dest: str, dry_run=False): + """ + Copy to Google Cloud + """ + # -n - no clobber/overwrite; -v - print url of item; -L - write to log for auto re-processing; -r - recursive + cmd = ["gsutil", "cp", "-n", "-v", "-r", "--", f"{source}", f"{dest}"] + if dry_run: + logger.warning("[DRY RUN] Would have run %s", cmd) + return + await exectools.cmd_assert_async(cmd, env=os.environ.copy(), stdout=sys.stderr) diff --git a/pyartcd/requirements.txt b/pyartcd/requirements.txt index 41e29a4519..93d198c8bd 100644 --- a/pyartcd/requirements.txt +++ b/pyartcd/requirements.txt @@ -1,18 +1,22 @@ -aiohttp[speedups] >= 3.6 +Jinja2 aiofiles +aiohttp[speedups] >= 3.6 +aiohttp_retry aioredlock >= 0.7.3 click contextvars errata-tool ~= 1.31.0 +cryptography ghapi jenkinsapi >= 0.3.13 # https://github.com/pycontribs/jenkinsapi/issues/833 -Jinja2 jira >= 3.4.1 # https://github.com/pycontribs/jira/issues/1486 openshift-client pygit2 == 1.10.1 # https://github.com/libgit2/pygit2/issues/1176 +rh-doozer +rh-elliott ruamel.yaml semver >= 2.13.0 slack_sdk >= 3.13.0 +stomp.py ~= 8.1.0 tenacity -aiohttp_retry -toml +tomli ~= 2.0.1 diff --git a/pyartcd/tests/pipelines/test_promote.py b/pyartcd/tests/pipelines/test_promote.py index fd0848fa18..638abb76c1 100644 --- a/pyartcd/tests/pipelines/test_promote.py +++ b/pyartcd/tests/pipelines/test_promote.py @@ -95,6 +95,22 @@ class TestPromotePipeline(IsolatedAsyncioTestCase): ] } + def setUp(self) -> None: + os.environ.update({ + "GITHUB_TOKEN": "fake-github-token", + "JIRA_TOKEN": "fake-jira-token", + "QUAY_PASSWORD": "fake-quay-password", + "SIGNING_CERT": "/path/to/signing.crt", + "SIGNING_KEY": "/path/to/signing.key", + "REDIS_SERVER_PASSWORD": "fake-redis-server-password", + "REDIS_HOST": "fake-redis-host", + "REDIS_PORT": "12345", + "JENKINS_SERVICE_ACCOUNT": "fake-jenkins-service-account", + "JENKINS_SERVICE_ACCOUNT_TOKEN": "fake-jenkins-service-account-token", + "AWS_ACCESS_KEY_ID": "fake-aws-access-key-id", + "AWS_SECRET_ACCESS_KEY": "fake-aws-crecret-access-key", + }) + @patch("pyartcd.jira.JIRAClient.from_url", return_value=None) @patch("pyartcd.pipelines.promote.util.load_releases_config", return_value={}) @patch("pyartcd.pipelines.promote.util.load_group_config", return_value=dict(arches=["x86_64", "s390x"])) @@ -112,8 +128,8 @@ async def test_run_without_explicit_assembly_definition( 
             working_dir=Path("/path/to/working"),
             dry_run=False
         )
-        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99")
-        with self.assertRaisesRegex(ValueError, "must be explictly defined"):
+        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99", signing_env="prod")
+        with self.assertRaisesRegex(ValueError, "must be explicitly defined"):
             await pipeline.run()
         load_group_config.assert_awaited_once_with("openshift-4.10", "4.10.99", env=ANY)
         load_releases_config.assert_awaited_once_with(
@@ -137,7 +153,7 @@ async def test_run_with_stream_assembly(self, load_group_config: AsyncMock, load
             working_dir=Path("/path/to/working"),
             dry_run=False
         )
-        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="stream")
+        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="stream", signing_env="prod")
         with self.assertRaisesRegex(ValueError, "not supported"):
             await pipeline.run()
         load_group_config.assert_awaited_once_with("openshift-4.10", "stream", env=ANY)
@@ -164,7 +180,7 @@ async def test_run_with_custom_assembly_and_missing_release_offset(
             dry_run=False,
             new_slack_client=MagicMock(return_value=AsyncMock())
         )
-        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="art0001")
+        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="art0001", signing_env="prod")
         with self.assertRaisesRegex(ValueError, "patch_version is not set"):
             await pipeline.run()
         load_group_config.assert_awaited_once_with("openshift-4.10", "art0001", env=ANY)
@@ -216,7 +232,7 @@ async def test_run_with_custom_assembly(self, get_image_stream: AsyncMock, load_
         runtime.new_slack_client.return_value.bind_channel = MagicMock()
 
         pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="art0001",
-                                   skip_attached_bug_check=True, skip_mirror_binaries=True)
+                                   skip_attached_bug_check=True, skip_mirror_binaries=True, signing_env="prod")
         await pipeline.run()
 
         load_group_config.assert_awaited_once_with("openshift-4.10", "art0001", env=ANY)
@@ -258,13 +274,14 @@ async def test_run_with_standard_assembly_without_upgrade_edges(self, load_group
         runtime.new_slack_client.return_value = AsyncMock()
         runtime.new_slack_client.return_value.say.return_value = {'message': {'ts': ''}}
         runtime.new_slack_client.return_value.bind_channel = MagicMock()
-        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99")
+        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99", signing_env="prod")
         with self.assertRaisesRegex(ValueError, "missing the required `upgrades` field"):
             await pipeline.run()
         load_group_config.assert_awaited_once_with("openshift-4.10", "4.10.99", env=ANY)
         load_releases_config.assert_awaited_once_with(group='openshift-4.10',
                                                       data_path='https://example.com/ocp-build-data.git')
 
+    @patch("pyartcd.pipelines.promote.PromotePipeline.sign_artifacts")
     @patch("pyartcd.jira.JIRAClient.from_url", return_value=None)
     @patch("pyartcd.pipelines.promote.PromotePipeline.build_release_image", return_value=None)
     @patch("pyartcd.pipelines.promote.get_release_image_info", side_effect=lambda pullspec, raise_if_not_found=False: {
@@ -301,7 +318,7 @@
     @patch("pyartcd.pipelines.promote.PromotePipeline.get_image_stream")
     async def test_run_with_standard_assembly(self, get_image_stream: AsyncMock, load_group_config: AsyncMock,
                                               load_releases_config: AsyncMock, get_release_image_info: AsyncMock,
-                                              build_release_image: AsyncMock, _):
+                                              build_release_image: AsyncMock, _, __):
         runtime = MagicMock(
             config={
                 "build_config": {
@@ -318,7 +335,7 @@ async def test_run_with_standard_assembly(self, get_image_stream: AsyncMock, loa
         runtime.new_slack_client.return_value.say.return_value = {'message': {'ts': ''}}
         runtime.new_slack_client.return_value.bind_channel = MagicMock()
         pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99",
-                                   skip_mirror_binaries=True)
+                                   skip_mirror_binaries=True, signing_env="prod")
         pipeline.check_blocker_bugs = AsyncMock()
         pipeline.change_advisory_state = AsyncMock()
         pipeline.get_advisory_info = AsyncMock(return_value={
@@ -396,7 +413,7 @@ async def test_promote_arch(self, get_image_stream: AsyncMock, get_release_image
             working_dir=Path("/path/to/working"),
             dry_run=False
         )
-        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99")
+        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99", signing_env="prod")
 
         previous_list = ["4.10.98", "4.10.97", "4.9.99"]
         metadata = {"description": "whatever", "url": "https://access.redhat.com/errata/RHBA-2099:2222"}
@@ -479,7 +496,7 @@ async def test_build_release_image_from_reference_release(self, cmd_assert_async
             working_dir=Path("/path/to/working"),
             dry_run=False
         )
-        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99")
+        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99", signing_env="prod")
 
         previous_list = ["4.10.98", "4.10.97", "4.9.99"]
         metadata = {"description": "whatever", "url": "https://access.redhat.com/errata/RHBA-2099:2222"}
@@ -512,7 +529,7 @@ async def test_build_release_image_from_image_stream(self, cmd_assert_async: Asy
         runtime = MagicMock(config={"build_config": {"ocp_build_data_url": "https://example.com/ocp-build-data.git"},
                                     "jira": {"url": "https://issues.redhat.com/"}},
                             working_dir=Path("/path/to/working"), dry_run=False)
-        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99")
+        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99", signing_env="prod")
 
         previous_list = ["4.10.98", "4.10.97", "4.9.99"]
         metadata = {"description": "whatever", "url": "https://access.redhat.com/errata/RHBA-2099:2222"}
@@ -572,7 +589,7 @@ async def test_promote_heterogeneous_payload(self, get_image_digest: AsyncMock,
             working_dir=Path("/path/to/working"),
             dry_run=False
         )
-        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99")
+        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99", signing_env="prod")
 
         previous_list = ["4.10.98", "4.10.97", "4.9.99"]
         metadata = {"description": "whatever", "url": "https://access.redhat.com/errata/RHBA-2099:2222"}
@@ -757,7 +774,7 @@ async def test_build_release_image_from_heterogeneous_image_stream(
             working_dir=Path("/path/to/working"),
             dry_run=False
         )
-        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99", use_multi_hack=True)
+        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99", use_multi_hack=True, signing_env="prod")
 
         previous_list = ["4.10.98", "4.10.97", "4.9.99"]
         metadata = {"description": "whatever", "url": "https://access.redhat.com/errata/RHBA-2099:2222"}
@@ -870,7 +887,7 @@ def test_build_create_symlink(self, _):
             working_dir=Path("/path/to/working"),
             dry_run=False
         )
-        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99")
+        pipeline = PromotePipeline(runtime, group="openshift-4.10", assembly="4.10.99", signing_env="prod")
         temp_dir = tempfile.mkdtemp()
         os.chdir(temp_dir)
         open("openshift-client-linux-4.3.0-0.nightly-2019-12-06-161135.tar.gz", "w").close()
diff --git a/pyartcd/tests/test_signatory.py b/pyartcd/tests/test_signatory.py
new file mode 100644
index 0000000000..3ab1f4e5af
--- /dev/null
+++ b/pyartcd/tests/test_signatory.py
@@ -0,0 +1,204 @@
+import asyncio
+import base64
+from datetime import datetime, timezone
+from io import BytesIO
+import json
+from unittest import IsolatedAsyncioTestCase, TestCase
+from unittest.mock import ANY, AsyncMock, MagicMock, patch
+from pyartcd.signatory import AsyncSignatory
+
+
+class TestAsyncSignatory(IsolatedAsyncioTestCase):
+    @patch("aiofiles.open", autospec=True)
+    async def test_get_certificate_common_name(self, open: AsyncMock):
+        # Well, this is the content of "Red Hat IT Root CA"
+        open.return_value.__aenter__.return_value.read.return_value = b"""
+-----BEGIN CERTIFICATE-----
+MIIENDCCAxygAwIBAgIJANunI0D662cnMA0GCSqGSIb3DQEBCwUAMIGlMQswCQYD
+VQQGEwJVUzEXMBUGA1UECAwOTm9ydGggQ2Fyb2xpbmExEDAOBgNVBAcMB1JhbGVp
+Z2gxFjAUBgNVBAoMDVJlZCBIYXQsIEluYy4xEzARBgNVBAsMClJlZCBIYXQgSVQx
+GzAZBgNVBAMMElJlZCBIYXQgSVQgUm9vdCBDQTEhMB8GCSqGSIb3DQEJARYSaW5m
+b3NlY0ByZWRoYXQuY29tMCAXDTE1MDcwNjE3MzgxMVoYDzIwNTUwNjI2MTczODEx
+WjCBpTELMAkGA1UEBhMCVVMxFzAVBgNVBAgMDk5vcnRoIENhcm9saW5hMRAwDgYD
+VQQHDAdSYWxlaWdoMRYwFAYDVQQKDA1SZWQgSGF0LCBJbmMuMRMwEQYDVQQLDApS
+ZWQgSGF0IElUMRswGQYDVQQDDBJSZWQgSGF0IElUIFJvb3QgQ0ExITAfBgkqhkiG
+9w0BCQEWEmluZm9zZWNAcmVkaGF0LmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEP
+ADCCAQoCggEBALQt9OJQh6GC5LT1g80qNh0u50BQ4sZ/yZ8aETxt+5lnPVX6MHKz
+bfwI6nO1aMG6j9bSw+6UUyPBHP796+FT/pTS+K0wsDV7c9XvHoxJBJJU38cdLkI2
+c/i7lDqTfTcfLL2nyUBd2fQDk1B0fxrskhGIIZ3ifP1Ps4ltTkv8hRSob3VtNqSo
+GxkKfvD2PKjTPxDPWYyruy9irLZioMffi3i/gCut0ZWtAyO3MVH5qWF/enKwgPES
+X9po+TdCvRB/RUObBaM761EcrLSM1GqHNueSfqnho3AjLQ6dBnPWlo638Zm1VebK
+BELyhkLWMSFkKwDmne0jQ02Y4g075vCKvCsCAwEAAaNjMGEwHQYDVR0OBBYEFH7R
+4yC+UehIIPeuL8Zqw3PzbgcZMB8GA1UdIwQYMBaAFH7R4yC+UehIIPeuL8Zqw3Pz
+bgcZMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgGGMA0GCSqGSIb3DQEB
+CwUAA4IBAQBDNvD2Vm9sA5A9AlOJR8+en5Xz9hXcxJB5phxcZQ8jFoG04Vshvd0e
+LEnUrMcfFgIZ4njMKTQCM4ZFUPAieyLx4f52HuDopp3e5JyIMfW+KFcNIpKwCsak
+oSoKtIUOsUJK7qBVZxcrIyeQV2qcYOeZhtS5wBqIwOAhFwlCET7Ze58QHmS48slj
+S9K0JAcps2xdnGu0fkzhSQxY8GPQNFTlr6rYld5+ID/hHeS76gq0YG3q6RLWRkHf
+4eTkRjivAlExrFzKcljC4axKQlnOvVAzz+Gm32U0xPBF4ByePVxCJUHw1TsyTmel
+RxNEp7yHoXcwn+fXna+t5JWh1gxUZty3
+-----END CERTIFICATE-----
+"""
+        actual = await AsyncSignatory._get_certificate_common_name("/path/to/client.crt")
+        self.assertEqual(actual, "Red Hat IT Root CA")
+
+    @patch("pyartcd.signatory.AsyncSignatory._get_certificate_common_name", autospec=True)
+    @patch("pyartcd.signatory.AsyncUMBClient", autospec=True)
+    async def test_start(self, AsyncUMBClient: AsyncMock, _get_certificate_common_name: AsyncMock):
+        uri = "failover:(stomp+ssl://stomp1.example.com:12345,stomp://stomp2.example.com:23456)"
+        cert_file = "/path/to/client.crt"
+        key_file = "/path/to/client.key"
+        _get_certificate_common_name.return_value = "fake-service-account"
+        umb = AsyncUMBClient.return_value
+        receiver = umb.subscribe.return_value
+
+        async def iter_messages():
+            for item in range(3):
+                yield f"message-{item}"
+            return
+        receiver.iter_messages.side_effect = iter_messages
+        signatory = AsyncSignatory(uri, cert_file, key_file, sig_keyname="test", requestor="fake-requestor", subscription_name="fake-subscription")
+        await signatory.start()
+        umb.subscribe.assert_awaited_once_with("/queue/Consumer.fake-service-account.fake-subscription.VirtualTopic.eng.robosignatory.art.sign", "fake-subscription")
+
+    @patch("pyartcd.signatory.datetime", wraps=datetime)
+    @patch("pyartcd.signatory.AsyncUMBClient", autospec=True)
+    async def test_handle_messages_with_stale_message(self, AsyncUMBClient: AsyncMock, datetime: MagicMock):
+        uri = "failover:(stomp+ssl://stomp1.example.com:12345,stomp://stomp2.example.com:23456)"
+        cert_file = "/path/to/client.crt"
+        key_file = "/path/to/client.key"
+        signatory = AsyncSignatory(uri, cert_file, key_file, sig_keyname="test", requestor="fake-requestor", subscription_name="fake-subscription")
+        receiver = signatory._receiver = MagicMock(id="fake-subscription")
+        datetime.utcnow.return_value = datetime(2023, 1, 2, 0, 0, 0)
+
+        async def iter_messages():
+            messages = [
+                MagicMock(
+                    headers={"message-id": "fake-message-id", "timestamp": datetime(2023, 1, 1, 0, 0, 0, tzinfo=timezone.utc).timestamp() * 1000},
+                    body="")
+            ]
+            for item in messages:
+                yield item
+        receiver.iter_messages.side_effect = iter_messages
+        umb = AsyncUMBClient.return_value
+
+        await signatory._handle_messages()
+
+        umb.ack.assert_awaited_once_with("fake-message-id", "fake-subscription")
+
+    @patch("pyartcd.signatory.datetime", wraps=datetime)
+    @patch("pyartcd.signatory.AsyncUMBClient", autospec=True)
+    async def test_handle_messages_with_invalid_message(self, AsyncUMBClient: AsyncMock, datetime: MagicMock):
+        uri = "failover:(stomp+ssl://stomp1.example.com:12345,stomp://stomp2.example.com:23456)"
+        cert_file = "/path/to/client.crt"
+        key_file = "/path/to/client.key"
+        signatory = AsyncSignatory(uri, cert_file, key_file, sig_keyname="test", requestor="fake-requestor", subscription_name="fake-subscription")
+        receiver = signatory._receiver = MagicMock(id="fake-subscription")
+        datetime.utcnow.return_value = datetime(2023, 1, 1, 0, 1, 0)
+
+        async def iter_messages():
+            messages = [
+                MagicMock(
+                    headers={"message-id": "fake-message-id", "timestamp": datetime(2023, 1, 1, 0, 0, 0, tzinfo=timezone.utc).timestamp() * 1000},
+                    body="")
+            ]
+            for item in messages:
+                yield item
+        receiver.iter_messages.side_effect = iter_messages
+        umb = AsyncUMBClient.return_value
+
+        await signatory._handle_messages()
+
+        umb.ack.assert_not_called()
+
+    @patch("pyartcd.signatory.datetime", wraps=datetime)
+    @patch("pyartcd.signatory.AsyncUMBClient", autospec=True)
+    async def test_handle_messages_with_valid_message(self, AsyncUMBClient: AsyncMock, datetime: MagicMock):
+        uri = "failover:(stomp+ssl://stomp1.example.com:12345,stomp://stomp2.example.com:23456)"
+        cert_file = "/path/to/client.crt"
+        key_file = "/path/to/client.key"
+        signatory = AsyncSignatory(uri, cert_file, key_file, sig_keyname="test", requestor="fake-requestor", subscription_name="fake-subscription")
+        receiver = signatory._receiver = MagicMock(id="fake-subscription")
+        datetime.utcnow.return_value = datetime(2023, 1, 1, 0, 1, 0)
+        signatory._requests["fake-request-id"] = asyncio.get_event_loop().create_future()
+
+        async def iter_messages():
+            messages = [
+                MagicMock(
+                    headers={"message-id": "fake-message-id", "timestamp": datetime(2023, 1, 1, 0, 0, 0, tzinfo=timezone.utc).timestamp() * 1000},
+                    body=json.dumps({"msg": {"request_id": "fake-request-id"}}))
+            ]
+            for item in messages:
+                yield item
+        receiver.iter_messages.side_effect = iter_messages
+        umb = AsyncUMBClient.return_value
+
+        await signatory._handle_messages()
+
+        umb.ack.assert_awaited_once_with("fake-message-id", "fake-subscription")
+        message_headers, message_body = await signatory._requests["fake-request-id"]
+        self.assertEqual(message_headers["message-id"], "fake-message-id")
+        self.assertEqual(message_body["msg"]["request_id"], "fake-request-id")
+
+    @patch("pyartcd.signatory.datetime", wraps=datetime)
+    @patch("uuid.uuid4", autospec=True)
+    @patch("pyartcd.signatory.AsyncUMBClient", autospec=True)
+    async def test_sign_artifact(self, AsyncUMBClient: AsyncMock, uuid4: MagicMock, datetime: MagicMock):
+        uri = "failover:(stomp+ssl://stomp1.example.com:12345,stomp://stomp2.example.com:23456)"
+        cert_file = "/path/to/client.crt"
+        key_file = "/path/to/client.key"
+        signatory = AsyncSignatory(uri, cert_file, key_file, sig_keyname="test", requestor="fake-requestor", subscription_name="fake-subscription")
+        artifact = BytesIO(b"fake_artifact")
+        sig_file = BytesIO()
+        uuid4.return_value = "fake-uuid"
+        datetime.utcnow.return_value = datetime(2023, 1, 2, 12, 30, 40)
+        umb = AsyncUMBClient.return_value
+        response_headers = {}
+        response_body = {
+            "msg": {
+                "artifact_meta": {
+                    "name": "sha256sum.txt.gpg",
+                    "product": "openshift",
+                    "release_name": "4.0.1",
+                    "type": "message-digest"
+                },
+                "signing_status": "success",
+                "errors": [],
+                "signed_artifact": base64.b64encode(b'fake-signature').decode()}
+        }
+        expected_request_id = 'openshift-message-digest-20230102123040-fake-uuid'
+        asyncio.get_event_loop().call_soon(lambda: signatory._requests[expected_request_id].set_result((response_headers, response_body)))
+
+        await signatory._sign_artifact("message-digest", "openshift", "4.0.1", "sha256sum.txt.gpg", artifact, sig_file)
+        umb.send.assert_awaited_once_with(signatory.SEND_DESTINATION, ANY)
+        self.assertEqual(sig_file.getvalue(), b'fake-signature')
+
+    @patch("pyartcd.signatory.AsyncSignatory._sign_artifact")
+    @patch("pyartcd.signatory.AsyncUMBClient", autospec=True)
+    async def test_sign_message_digest(self, AsyncUMBClient: AsyncMock, _sign_artifact: AsyncMock):
+        uri = "failover:(stomp+ssl://stomp1.example.com:12345,stomp://stomp2.example.com:23456)"
+        cert_file = "/path/to/client.crt"
+        key_file = "/path/to/client.key"
+        signatory = AsyncSignatory(uri, cert_file, key_file, sig_keyname="test", requestor="fake-requestor", subscription_name="fake-subscription")
+        artifact = BytesIO(b"fake_artifact")
+        sig_file = BytesIO()
+        _sign_artifact.side_effect = lambda *args, **kwargs: sig_file.write(b"fake-signature")
+
+        await signatory.sign_message_digest("openshift", "4.0.1", artifact, sig_file)
+        _sign_artifact.assert_awaited_once_with(typ='message-digest', product='openshift', release_name='4.0.1', name='sha256sum.txt.gpg', artifact=artifact, sig_file=sig_file)
+        self.assertEqual(sig_file.getvalue(), b'fake-signature')
+
+    @patch("pyartcd.signatory.AsyncSignatory._sign_artifact")
+    @patch("pyartcd.signatory.AsyncUMBClient", autospec=True)
+    async def test_sign_json_digest(self, AsyncUMBClient: AsyncMock, _sign_artifact: AsyncMock):
+        uri = "failover:(stomp+ssl://stomp1.example.com:12345,stomp://stomp2.example.com:23456)"
+        cert_file = "/path/to/client.crt"
+        key_file = "/path/to/client.key"
+        signatory = AsyncSignatory(uri, cert_file, key_file, sig_keyname="test", requestor="fake-requestor", subscription_name="fake-subscription")
+        sig_file = BytesIO()
+        _sign_artifact.side_effect = lambda *args, **kwargs: sig_file.write(b"fake-signature")
+        pullspec = "example.com/fake/repo@sha256:dead-beef"
+
+        await signatory.sign_json_digest("openshift", "4.0.1", pullspec, "sha256:dead-beef", sig_file)
+        _sign_artifact.assert_awaited_once_with(typ='json-digest', product='openshift', release_name='4.0.1', name='sha256=dead-beef', artifact=ANY, sig_file=sig_file)
+        self.assertEqual(sig_file.getvalue(), b'fake-signature')
diff --git a/pyartcd/tests/test_umb_client.py b/pyartcd/tests/test_umb_client.py
new file mode 100644
index 0000000000..f5f0360286
--- /dev/null
+++ b/pyartcd/tests/test_umb_client.py
@@ -0,0 +1,130 @@
+import asyncio
+from unittest import IsolatedAsyncioTestCase, TestCase
+from unittest.mock import ANY, MagicMock, patch
+
+from pyartcd.umb_client import AsyncUMBClient, parse_stomp_uri, parse_broker_uri
+
+
+class TestModuleFunctions(TestCase):
+    def test_parse_stomp_uri_with_default_port(self):
+        uri = "stomp://broker.example.com"
+        expected = ("stomp", "broker.example.com", 61613)
+        actual = parse_stomp_uri(uri)
+        self.assertEqual(actual, expected)
+
+    def test_parse_stomp_uri_with_port(self):
+        uri = "stomp+ssl://broker.example.com:12345"
+        expected = ("stomp+ssl", "broker.example.com", 12345)
+        actual = parse_stomp_uri(uri)
+        self.assertEqual(actual, expected)
+
+    def test_parse_stomp_uri_with_invalid_scheme(self):
+        uri = "unsupported://broker.example.com:12345"
+        with self.assertRaises(ValueError):
+            parse_stomp_uri(uri)
+
+    def test_parse_broker_uri_with_stomp_scheme(self):
+        uri = "stomp+ssl://broker.example.com:12345"
+        expected = [("stomp+ssl", "broker.example.com", 12345)]
+        actual = parse_broker_uri(uri)
+        self.assertEqual(actual, expected)
+
+    def test_parse_broker_uri_with_failover_scheme(self):
+        uri = "failover:stomp+ssl://broker1.example.com:12345,stomp+ssl://broker2.example.com:12345"
+        expected = [("stomp+ssl", "broker1.example.com", 12345), ("stomp+ssl", "broker2.example.com", 12345)]
+        actual = parse_broker_uri(uri)
+        self.assertEqual(actual, expected)
+
+
+class TestAsyncUMBClient(IsolatedAsyncioTestCase):
+
+    @patch("stomp.StompConnection11", autospec=True)
+    def test_create_connection(self, StompConnection11: MagicMock):
+        uri = "failover:(stomp+ssl://stomp1.example.com:12345,stomp://stomp2.example.com:23456)"
+        cert_file = "/path/to/client.crt"
+        key_file = "/path/to/client.key"
+        actual = AsyncUMBClient._create_connection(uri, cert_file, key_file)
+        conn = StompConnection11.return_value
+        self.assertEqual(actual, conn)
+        StompConnection11.assert_called_once_with(host_and_ports=[("stomp1.example.com", 12345), ("stomp2.example.com", 23456)])
+        conn.set_ssl.assert_called_once_with(for_hosts=[("stomp1.example.com", 12345)], cert_file=cert_file, key_file=key_file)
+
+    async def test_call_in_sender_thread(self):
+        client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        actual = await client._call_in_sender_thread(lambda: "foo")
+        self.assertEqual(actual, "foo")
+
+    async def test_call_in_sender_thread_with_exception(self):
+        def func():
+            raise ValueError("Test error")
+        client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        with self.assertRaises(ValueError):
+            await client._call_in_sender_thread(func)
+
+    @patch("pyartcd.umb_client.AsyncUMBClient._create_connection", autospec=True)
+    async def test_connect(self, _create_connection: MagicMock):
+        client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        loop = asyncio.get_event_loop()
+        loop.call_soon(lambda: client._listener._complete_future("on_connected", None))
+        await client.connect()
+        conn = _create_connection.return_value
+        conn.connect.assert_called_once_with(wait=False, headers={"receipt": ANY})
+
+    @patch("pyartcd.umb_client.AsyncUMBClient._create_connection", autospec=True)
+    async def test_disconnect(self, _create_connection: MagicMock):
+        client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        client.connected = True
+        loop = asyncio.get_event_loop()
+        loop.call_soon(lambda: client._listener._complete_future("on_disconnected", None))
+        await client.disconnect()
+        conn = _create_connection.return_value
+        conn.disconnect.assert_called_once_with(receipt=ANY)
+
+    @patch("pyartcd.umb_client.AsyncUMBClient._create_connection", autospec=True)
+    async def test_subscribe(self, _create_connection: MagicMock):
+        client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        receiver = await client.subscribe(destination="/topic/foo.bar", id="fake-subscription")
+        conn = _create_connection.return_value
+        conn.subscribe.assert_called_once_with(destination="/topic/foo.bar", id="fake-subscription", ack="client-individual")
+        self.assertEqual(receiver.id, "fake-subscription")
+
+    @patch("pyartcd.umb_client.AsyncUMBClient._create_connection", autospec=True)
+    async def test_unsubscribe(self, _create_connection: MagicMock):
+        client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        client._listener._receivers["fake-subscription"] = MagicMock()
+        await client.unsubscribe(id="fake-subscription")
+        conn = _create_connection.return_value
+        conn.unsubscribe.assert_called_once_with(id="fake-subscription")
+
+    @patch("uuid.uuid4", autospec=True)
+    @patch("pyartcd.umb_client.AsyncUMBClient._create_connection", autospec=True)
+    async def test_send(self, _create_connection: MagicMock, uuid4: MagicMock):
+        client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        uuid4.return_value = "fake-uuid"
+        loop = asyncio.get_event_loop()
+        loop.call_soon(lambda: client._listener._complete_future(uuid4.return_value, None))
+        await client.send(destination="/topic/foo.bar", body="fake-content")
+        conn = _create_connection.return_value
+        conn.send.assert_called_once_with(body="fake-content", destination="/topic/foo.bar", headers={"receipt": uuid4.return_value})
+
+    @patch("uuid.uuid4", autospec=True)
+    @patch("pyartcd.umb_client.AsyncUMBClient._create_connection", autospec=True)
+    async def test_ack(self, _create_connection: MagicMock, uuid4: MagicMock):
+        client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        uuid4.return_value = "fake-uuid"
+        loop = asyncio.get_event_loop()
+        loop.call_soon(lambda: client._listener._complete_future(uuid4.return_value, None))
+        await client.ack(message_id="fake-message-id", subscription="fake-subscription")
+        conn = _create_connection.return_value
+        conn.ack.assert_called_once_with("fake-message-id", "fake-subscription", receipt=uuid4.return_value)
+
+    @patch("uuid.uuid4", autospec=True)
+    @patch("pyartcd.umb_client.AsyncUMBClient._create_connection", autospec=True)
+    async def test_nack(self, _create_connection: MagicMock, uuid4: MagicMock):
+        client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        uuid4.return_value = "fake-uuid"
+        loop = asyncio.get_event_loop()
+        loop.call_soon(lambda: client._listener._complete_future(uuid4.return_value, None))
+        await client.nack(message_id="fake-message-id", subscription="fake-subscription")
+        conn = _create_connection.return_value
+        conn.nack.assert_called_once_with("fake-message-id", "fake-subscription", receipt=uuid4.return_value)

From 481d1a77bbce867e3c219633d24ddcc01268dedb Mon Sep 17 00:00:00 2001
From: Yuxiang Zhu
Date: Wed, 2 Aug 2023 15:30:37 +0800
Subject: [PATCH 2/2] Address review suggestions and fix unit tests

---
 pyartcd/pyartcd/umb_client.py    |  6 +++---
 pyartcd/pyartcd/util.py          |  8 ++++----
 pyartcd/requirements.txt         |  2 +-
 pyartcd/tests/test_umb_client.py | 23 ++++++++++++++++++++++-
 4 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/pyartcd/pyartcd/umb_client.py b/pyartcd/pyartcd/umb_client.py
index 9cb7ed706f..ec7034f370 100644
--- a/pyartcd/pyartcd/umb_client.py
+++ b/pyartcd/pyartcd/umb_client.py
@@ -168,11 +168,11 @@ def on_disconnected(self):
         # notify UMB client
         self._client.on_disconnected()
         # notify all pending futures of the disconnection
-        for id in self._futures.keys():
-            if id == "on_disconnected":
+        for future_id in self._futures.keys():
+            if future_id == "on_disconnected":
                 self._complete_future("on_disconnected", None)
             else:
-                self._err_future(id, IOError("Connection lost"))
+                self._err_future(future_id, IOError("Connection lost"))
 
     def on_heartbeat_timeout(self):
         self.__print("on_heartbeat_timeout")
diff --git a/pyartcd/pyartcd/util.py b/pyartcd/pyartcd/util.py
index 6a9770ecc9..c23e1ccd20 100644
--- a/pyartcd/pyartcd/util.py
+++ b/pyartcd/pyartcd/util.py
@@ -298,7 +298,7 @@ async def is_build_permitted(version: str, data_path: str = constants.OCP_BUILD_
     # Check for frozen scheduled automation
     if freeze_automation == "scheduled" and not is_manual_build():
         logger.info('Only manual runs are permitted according to freeze_automation in group.yml '
-                     'and this run appears to be non-manual.')
+                    'and this run appears to be non-manual.')
         return False
 
     # Check if group can run on weekends
@@ -568,9 +568,9 @@ def mail_build_failure_owners(failed_builds: dict, doozer_working: str, mail_cli
             container_log = "Unfortunately there were no container build logs; " \
                             "something else about the build failed."
logger.warning('No container build log for failed %s build\n'
-                       '(task url %s)\n'
-                       'at path %s',
-                       failure['distgit'], failure['task_url'], container_log)
+                        '(task url %s)\n'
+                        'at path %s',
+                        failure['distgit'], failure['task_url'], container_log)
 
         explanation_body = f"ART's brew/OSBS build of OCP image {failure['image']}:{failure['version']} has failed.\n\n"
         if failure['owners']:
diff --git a/pyartcd/requirements.txt b/pyartcd/requirements.txt
index 93d198c8bd..884b7fc5f9 100644
--- a/pyartcd/requirements.txt
+++ b/pyartcd/requirements.txt
@@ -15,7 +15,7 @@ pygit2 == 1.10.1  # https://github.com/libgit2/pygit2/issues/1176
 rh-doozer
 rh-elliott
 ruamel.yaml
-semver >= 2.13.0
+semver ~= 3.0.1
 slack_sdk >= 3.13.0
 stomp.py ~= 8.1.0
 tenacity
diff --git a/pyartcd/tests/test_umb_client.py b/pyartcd/tests/test_umb_client.py
index f5f0360286..b12c94d89c 100644
--- a/pyartcd/tests/test_umb_client.py
+++ b/pyartcd/tests/test_umb_client.py
@@ -51,6 +51,7 @@ def test_create_connection(self, StompConnection11: MagicMock):
 
     async def test_call_in_sender_thread(self):
         client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        client._sender_loop = asyncio.get_event_loop()
         actual = await client._call_in_sender_thread(lambda: "foo")
         self.assertEqual(actual, "foo")
 
@@ -58,11 +59,14 @@ async def test_call_in_sender_thread_with_exception(self):
         def func():
             raise ValueError("Test error")
         client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        client._sender_loop = asyncio.get_event_loop()
         with self.assertRaises(ValueError):
             await client._call_in_sender_thread(func)
 
     @patch("pyartcd.umb_client.AsyncUMBClient._create_connection", autospec=True)
     async def test_connect(self, _create_connection: MagicMock):
+        stomp_conn = _create_connection.return_value
+        stomp_conn.is_connected.return_value = False
         client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
         loop = asyncio.get_event_loop()
         loop.call_soon(lambda: client._listener._complete_future("on_connected", None))
@@ -73,7 +77,9 @@ async def test_connect(self, _create_connection: MagicMock):
     @patch("pyartcd.umb_client.AsyncUMBClient._create_connection", autospec=True)
     async def test_disconnect(self, _create_connection: MagicMock):
         client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
-        client.connected = True
+        stomp_conn = _create_connection.return_value
+        stomp_conn.is_connected.return_value = True
+        client._sender_loop = asyncio.get_event_loop()
         loop = asyncio.get_event_loop()
         loop.call_soon(lambda: client._listener._complete_future("on_disconnected", None))
         await client.disconnect()
@@ -83,6 +89,9 @@ async def test_disconnect(self, _create_connection: MagicMock):
     @patch("pyartcd.umb_client.AsyncUMBClient._create_connection", autospec=True)
     async def test_subscribe(self, _create_connection: MagicMock):
         client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        stomp_conn = _create_connection.return_value
+        stomp_conn.is_connected.return_value = True
+        client._sender_loop = asyncio.get_event_loop()
         receiver = await client.subscribe(destination="/topic/foo.bar", id="fake-subscription")
         conn = _create_connection.return_value
         conn.subscribe.assert_called_once_with(destination="/topic/foo.bar", id="fake-subscription", ack="client-individual")
@@ -91,6 +100,9 @@ async def test_subscribe(self, _create_connection: MagicMock):
     @patch("pyartcd.umb_client.AsyncUMBClient._create_connection", autospec=True)
     async def test_unsubscribe(self, _create_connection: MagicMock):
         client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        stomp_conn = _create_connection.return_value
+        stomp_conn.is_connected.return_value = True
+        client._sender_loop = asyncio.get_event_loop()
         client._listener._receivers["fake-subscription"] = MagicMock()
         await client.unsubscribe(id="fake-subscription")
         conn = _create_connection.return_value
@@ -100,6 +112,9 @@ async def test_unsubscribe(self, _create_connection: MagicMock):
     @patch("uuid.uuid4", autospec=True)
     @patch("pyartcd.umb_client.AsyncUMBClient._create_connection", autospec=True)
     async def test_send(self, _create_connection: MagicMock, uuid4: MagicMock):
         client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        stomp_conn = _create_connection.return_value
+        stomp_conn.is_connected.return_value = True
+        client._sender_loop = asyncio.get_event_loop()
         uuid4.return_value = "fake-uuid"
         loop = asyncio.get_event_loop()
         loop.call_soon(lambda: client._listener._complete_future(uuid4.return_value, None))
@@ -111,6 +126,9 @@ async def test_send(self, _create_connection: MagicMock, uuid4: MagicMock):
     @patch("uuid.uuid4", autospec=True)
     @patch("pyartcd.umb_client.AsyncUMBClient._create_connection", autospec=True)
     async def test_ack(self, _create_connection: MagicMock, uuid4: MagicMock):
         client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        stomp_conn = _create_connection.return_value
+        stomp_conn.is_connected.return_value = True
+        client._sender_loop = asyncio.get_event_loop()
         uuid4.return_value = "fake-uuid"
         loop = asyncio.get_event_loop()
         loop.call_soon(lambda: client._listener._complete_future(uuid4.return_value, None))
@@ -122,6 +140,9 @@ async def test_ack(self, _create_connection: MagicMock, uuid4: MagicMock):
     @patch("uuid.uuid4", autospec=True)
     @patch("pyartcd.umb_client.AsyncUMBClient._create_connection", autospec=True)
     async def test_nack(self, _create_connection: MagicMock, uuid4: MagicMock):
         client = AsyncUMBClient("stomp+ssl://stomp1.example.com:12345", cert_file="/path/to/client.crt", key_file="/path/to/client.key")
+        stomp_conn = _create_connection.return_value
+        stomp_conn.is_connected.return_value = True
+        client._sender_loop = asyncio.get_event_loop()
         uuid4.return_value = "fake-uuid"
         loop = asyncio.get_event_loop()
         loop.call_soon(lambda: client._listener._complete_future(uuid4.return_value, None))