From 69fbf7785d788b4f5f07660dd51330a9a7fe2831 Mon Sep 17 00:00:00 2001 From: mhorky Date: Tue, 6 Feb 2024 01:58:58 +0100 Subject: [PATCH] RHEL-2480: Do not create /root/.gnupg/ directory by accident (#3930) * feat(pgp): Add crypto.py with PGP signature verification * Card ID: CCT-131 * Card ID: RHEL-2480 * Card ID: RHEL-2482 This patch adds a self-contained and isolated GPG verification environment. It runs GPG in an isolated environment where only selected PGP keys are allowed to check the file signature matches its file. GPG creates a directory `$HOME/.gnupg/` every time it performs some operation. When run under root, but not manually (e.g. via subscription-manager Cockpit plugin), it tries to create and write to this directory, which pollutes user directories and/or causes SELinux denials. This patch utilizes the `--homedir` argument GPG supports in order to move the GPG home directory to a temporary directory for the time of the transaction. After the GPG action is performed, the directory is cleaned up. Signed-off-by: mhorky * feat(pgp): Use crypto.py during Egg and Collection verification * Card ID: CCT-131 * Card ID: RHEL-2480 * Card ID: RHEL-2482 Signed-off-by: mhorky * chore: Add .gitleaks.toml This configuration file ensures the dummy PGP public/private key pair is not flagged by security tools as a false positive. Signed-off-by: mhorky --------- Signed-off-by: mhorky --- .gitleaks.toml | 13 ++ insights/client/__init__.py | 71 +++++---- insights/client/collection_rules.py | 24 ++-- insights/client/crypto.py | 206 +++++++++++++++++++++++++++ insights/tests/client/test_crypto.py | 187 ++++++++++++++++++++++++ 5 files changed, 447 insertions(+), 54 deletions(-) create mode 100644 .gitleaks.toml create mode 100644 insights/client/crypto.py create mode 100644 insights/tests/client/test_crypto.py diff --git a/.gitleaks.toml b/.gitleaks.toml new file mode 100644 index 0000000000..566fb938af --- /dev/null +++ b/.gitleaks.toml @@ -0,0 +1,13 @@ +[extend] +useDefault = true + + +[allowlist] +description = "Repository-specific configuration" + +paths = [ + # Tests for `crypto.py` contain public and private GPG keypair. They were + # generated specifically for this usecase. The owner of the key is: + # insights-core (Signing key for unit testing) + "insights/tests/client/test_crypto.py", +] diff --git a/insights/client/__init__.py b/insights/client/__init__.py index 8b19cb21a1..83e67c02d2 100644 --- a/insights/client/__init__.py +++ b/insights/client/__init__.py @@ -3,15 +3,14 @@ import os import logging import tempfile -import shlex import shutil import sys import atexit -from subprocess import Popen, PIPE from requests import ConnectionError from .. import package_info from . import client +from . import crypto from .constants import InsightsConstants as constants from .config import InsightsConfig from .auto_config import try_auto_configuration @@ -268,7 +267,7 @@ def update(self): else: logger.debug("Egg update disabled") - def verify(self, egg_path, gpg_key=constants.pub_gpg_path): + def verify(self, egg_path): """ Verifies the GPG signature of the egg. The signature is assumed to be in the same directory as the egg and named the same as the egg @@ -283,46 +282,42 @@ def verify(self, egg_path, gpg_key=constants.pub_gpg_path): if egg_path and not os.path.isfile(egg_path): the_message = "Provided egg path %s does not exist, cannot verify." % (egg_path) logger.debug(the_message) - return {'gpg': False, - 'stderr': the_message, - 'stdout': the_message, - 'rc': 1, - 'message': the_message} - if self.config.gpg and gpg_key and not os.path.isfile(gpg_key): - the_message = ("Running in GPG mode but cannot find " - "file %s to verify against." % (gpg_key)) - logger.debug(the_message) - return {'gpg': False, - 'stderr': the_message, - 'stdout': the_message, - 'rc': 1, - 'message': the_message} + return { + 'gpg': False, + 'stderr': the_message, + 'stdout': the_message, + 'rc': 1, + 'message': the_message, + } # if we are running in no_gpg or not gpg mode then return true if not self.config.gpg: - return {'gpg': True, - 'stderr': None, - 'stdout': None, - 'rc': 0} + return { + 'gpg': True, + 'stderr': None, + 'stdout': None, + 'rc': 0, + } # if a valid egg path and gpg were received do the verification - if egg_path and gpg_key: - cmd_template = '/usr/bin/gpg --verify --keyring %s %s %s' - cmd = cmd_template % (gpg_key, egg_path + '.asc', egg_path) - logger.debug(cmd) - process = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE) - stdout, stderr = process.communicate() - rc = process.returncode - logger.debug("GPG return code: %s" % rc) - return {'gpg': True if rc == 0 else False, - 'stderr': stderr, - 'stdout': stdout, - 'rc': rc} - else: - return {'gpg': False, - 'stderr': 'Must specify a valid core and gpg key.', - 'stdout': 'Must specify a valid core and gpg key.', - 'rc': 1} + if egg_path: + result = crypto.verify_gpg_signed_file( + file=egg_path, signature=egg_path + ".asc", + keys=[constants.pub_gpg_path], + ) + return { + 'gpg': result.ok, + 'rc': result.return_code, + 'stdout': result.stdout, + 'stderr': result.stderr, + } + + return { + 'gpg': False, + 'stderr': 'Must specify a valid core and gpg key.', + 'stdout': 'Must specify a valid core and gpg key.', + 'rc': 1, + } def install(self, new_egg, new_egg_gpg_sig): """ diff --git a/insights/client/collection_rules.py b/insights/client/collection_rules.py index 13b8875cef..17ac6f5f9d 100644 --- a/insights/client/collection_rules.py +++ b/insights/client/collection_rules.py @@ -6,15 +6,14 @@ import json import logging import six -import shlex import os import requests import yaml import stat from six.moves import configparser as ConfigParser -from subprocess import Popen, PIPE, STDOUT from tempfile import NamedTemporaryFile +from . import crypto from .constants import InsightsConstants as constants APP_NAME = constants.app_name @@ -139,22 +138,15 @@ def validate_gpg_sig(self, path, sig=None): Validate the collection rules """ logger.debug("Verifying GPG signature of Insights configuration") + if sig is None: sig = path + ".asc" - command = ("/usr/bin/gpg --no-default-keyring " - "--keyring " + constants.pub_gpg_path + - " --verify " + sig + " " + path) - if not six.PY3: - command = command.encode('utf-8', 'ignore') - args = shlex.split(command) - logger.debug("Executing: %s", args) - proc = Popen( - args, shell=False, stdout=PIPE, stderr=STDOUT, close_fds=True) - stdout, stderr = proc.communicate() - logger.debug("STDOUT: %s", stdout) - logger.debug("STDERR: %s", stderr) - logger.debug("Status: %s", proc.returncode) - if proc.returncode: + + result = crypto.verify_gpg_signed_file( + file=path, signature=sig, + keys=[constants.pub_gpg_path], + ) + if not result.ok: logger.error("ERROR: Unable to validate GPG signature: %s", path) return False else: diff --git a/insights/client/crypto.py b/insights/client/crypto.py new file mode 100644 index 0000000000..21f86acbae --- /dev/null +++ b/insights/client/crypto.py @@ -0,0 +1,206 @@ +import errno +import os.path +import logging +import shutil +import tempfile +import subprocess + +logger = logging.getLogger(__name__) + + +class GPGCommandResult(object): + """Output of a GPGCommand. + + Attributes: + ok (bool): Result of an operation. + return_code (int): Return code of the operation. + stdout (str): Standard output of the command. + stderr (str): Standard error of the command. + _command (GPGCommand | None): + An optional reference to the GPGCommand object that created the result + """ + def __init__(self, ok, return_code, stdout, stderr, command): + self.ok = ok + self.return_code = return_code + self.stdout = stdout + self.stderr = stderr + self._command = command + + def __str__(self): + return ( + "<{cls} ok={ok} return_code={code} stdout={out} stderr={err}>" + ).format( + cls=self.__class__.__name__, + ok=self.ok, code=self.return_code, + out=self.stdout, err=self.stderr, + ) + + +class GPGCommand(object): + """GPG command run in a temporary environment. + + Attributes: + command (list(str)): The command to be executed. + keys (list(str)): List of paths to GPG public keys to check against. + + _home (str): Path to the temporary GPG home directory. + _raw_command (list(str)): The last invoked command. + """ + + def __init__(self, command, keys): + self.command = command + self.keys = keys + + self._home = None + self._raw_command = None + + def __str__(self): + return "<{cls} _home={home} _raw_command={raw}>".format( + cls=self.__class__.__name__, + home=self._home, + raw=self._raw_command + ) + + def _setup(self): + """Prepare GPG environment. + + Returns (bool): + `True` if public GPG keys were imported into temporary environment, + `False` if there was an error. + """ + self._home = tempfile.mkdtemp() + + logger.debug("setting up gpg in the temporary environment") + for key in self.keys: + result = self._run(["--import", key]) + if not result.ok: + logger.debug("failed to import key '{key}': {result}".format( + key=key, result=result + )) + return False + + return True + + def _cleanup(self): + """Clean up GPG environment.""" + # Try to delete the temporary directory GPG used. As discovered by the + # convert2rhel team, we need to handle race conditions: + # https://github.com/oamg/convert2rhel/blob/23aadbf0df58c79a8910847d345fcd4092f4656f/convert2rhel/utils.py#L867-L893 + + # GPG writes a temporary socket file for the gpg-agent into the home + # directory. Sometimes it removes the socket file after `rmtree()` has + # determined it should be deleted but before the actual deletion occurs. + # This will cause a FileNotFoundError/OSError. + # When we encounter that, try to run `rmtree()` again. + for _ in range(0, 5): + try: + shutil.rmtree(self._home) + except OSError as exc: + if exc.errno == errno.ENOENT: + # We are trying to remove a file that has already been + # removed by gpg-agent itself. Ignore it. + continue + # Some other error has happened, let it bubble up. + raise + else: + # We have successfully removed everything. This `break` will + # prevent the log statement below from being run. + break + else: + # We called `rmtree()` five times and it failed each time. We cannot + # do more without knowing more. + logger.debug( + "could not clean up temporary gpg directory " + "'{path}'".format(path=self._home) + ) + + def _run(self, command): + """Run the actual command. + + Returns (CommandResult): The result of the shell command. + """ + self._raw_command = ["/usr/bin/gpg", "--homedir", self._home] + command + process = subprocess.Popen( + self._raw_command, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + ) + stdout, stderr = process.communicate() + + result = GPGCommandResult( + ok=process.returncode == 0, + return_code=process.returncode, + stdout=stdout.decode("utf-8"), + stderr=stderr.decode("utf-8"), + command=self, + ) + + if result.ok: + logger.debug("gpg command {command}: ok".format(command=command)) + else: + logger.debug( + "gpg command {command} returned non-zero code: {result}".format( + command=command, result=result, + ) + ) + + return result + + def evaluate(self): + """Run the command. + + Returns (CommandResult): The result of the shell command. + """ + try: + ok = self._setup() + if not ok: + return + + logger.debug("running gpg in the temporary environment") + return self._run(self.command) + finally: + self._cleanup() + + +def verify_gpg_signed_file(file, signature, keys): + """ + Verify a file that was signed using GPG. + + Parameters: + file (str): A path to the signed file. + signature (str): A path to the detached signature. + keys (list(str)): + List of paths to GPG public keys on the filesystem to check against. + + Returns (CommandResult): Evaluated GPG command. + """ + if not os.path.isfile(file): + logger.debug( + "cannot verify signature of '{file}', file does not exist".format( + file=file + ) + ) + raise OSError(errno.ENOENT, "File '{file}' not found".format(file=file)) + + if not os.path.isfile(signature): + logger.debug(( + "cannot verify signature of '{file}', " + "signature '{signature}' does not exist" + ).format(file=file, signature=signature)) + raise OSError( + errno.ENOENT, + "Signature '{sig}' of file '{file}' not found.".format( + sig=signature, file=file + ) + ) + + gpg = GPGCommand(["--verify", signature, file], keys) + logger.debug("starting gpg verification process for '{file}'".format(file=file)) + + result = gpg.evaluate() + + if result.ok: + logger.debug("signature verification of '{file}' passed".format(file=file)) + else: + logger.debug("signature verification of '{file}' failed".format(file=file)) + + return result diff --git a/insights/tests/client/test_crypto.py b/insights/tests/client/test_crypto.py new file mode 100644 index 0000000000..c3258a9ad4 --- /dev/null +++ b/insights/tests/client/test_crypto.py @@ -0,0 +1,187 @@ +import os.path +import shutil +import subprocess +import tempfile + +from insights.client import crypto + + +# GPG2 exports keys in a way gpg1.4 cannot read. Since gpg1.4 is present in the +# Ubuntu image that is used in CI to test Python 2.6, we have to use the old +# version, so it is recognised by all CI branches. +# +# $ gpg --gen-key # gpg1.x +# $ gpg --full-generate-key # gpg2.x +# $ gpg --list-secret-keys --keyid-format long +# $ KEYID=xxx (line `sec`, after the `/` character) +# $ gpg --armor --export $KEYID +# $ gpg --armor --export-secret-keys $KEYID + +GPG_PUBLIC_KEY = """-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1 + +mQENBGUlPEcBCACXhinLd4KiyQ9CWXM+gpgo0HMuTdESTlVVDYRjm/ebazephXq7 +0hhAhXallAQJkaXFPuLETumwFYPx60agUDWsUie8gk3TLE+ejuTxoda0Yo6rehsD +zds1ptHQLzar00SFUlfiJXnMaobXLeNjDSglkynQC4uQODouDa1jkSRfNbDJOJYd +IBbPBIaJ3PlNrdVutPcx4yIZM1siqMZ9k0g2iYPWyp0ceuP0jpCTPc5TRRP5pLuz +INRJzhfJ+oPtbFt8Y4s1xEZ0Kfy0sA8Awk3VJqzoCU3ILBU5cGquQDdNggxTRSPV +X5Z4nW2zjN9lXAkwr29NL1rA1jjLt+c5upYZABEBAAG0SGluc2lnaHRzLWNvcmUg +KFNpZ25pbmcga2V5IGZvciB1bml0IHRlc3RpbmcpIDxpbnNpZ2h0cy1jb3JlQGV4 +YW1wbGUub3JnPokBOAQTAQIAIgUCZSU8RwIbAwYLCQgHAwIGFQgCCQoLBBYCAwEC +HgECF4AACgkQn+6Vu126uXAHdgf/Q8a7Jruhyn+EDLL94gAc6kXubvVArVe9Rdpo +HwG7cj8wUa/7zd7FUcYJuz6bbebgmmlRwFf1CeodGURFpfwf1dkiKV5QqobeXhRp +srrkeLlMR8bZVFSCvU+oOETpJfMo6feDI/tQrOcIpxrd/cEu2XSQ1JBM/+8NbQiU +Ma1cWHOmQJ1OD7jWOllUq5hDs1vtUzPORUsYe1V1Dcx89gdlhfc1cc30yoLzDNqu +0Abn34CAthUkr0sWplzltXOY+Za1kOmQVlaLQ0W3tq8BDi79bdHpTtD/g9QO++Nc +r5xdP+tPNWAOUlyi7S6qscDLhMWv7o4eLq6UL9eUC/M2CWJSKg== +=9pSV +-----END PGP PUBLIC KEY BLOCK----- +""" + +GPG_PRIVATE_KEY = """-----BEGIN PGP PRIVATE KEY BLOCK----- +Version: GnuPG v1 + +lQOYBGUlPEcBCACXhinLd4KiyQ9CWXM+gpgo0HMuTdESTlVVDYRjm/ebazephXq7 +0hhAhXallAQJkaXFPuLETumwFYPx60agUDWsUie8gk3TLE+ejuTxoda0Yo6rehsD +zds1ptHQLzar00SFUlfiJXnMaobXLeNjDSglkynQC4uQODouDa1jkSRfNbDJOJYd +IBbPBIaJ3PlNrdVutPcx4yIZM1siqMZ9k0g2iYPWyp0ceuP0jpCTPc5TRRP5pLuz +INRJzhfJ+oPtbFt8Y4s1xEZ0Kfy0sA8Awk3VJqzoCU3ILBU5cGquQDdNggxTRSPV +X5Z4nW2zjN9lXAkwr29NL1rA1jjLt+c5upYZABEBAAEAB/0QIdsaTA+PCEwFGuPv +sFTF56eTsvpC8i8YnpdNSaI7nFcxR8JQ8+XcHLmMmG0znZuiG/dlwicUNb42CAAd +ely0i4yqf88MYCfb8EfEyB/FVcbtz9LHfWfM1wV4nkY6VgRyE1nC/I1yq5bOmxae +CZ0QHxJxEYGa6bmcBJ3Ev4O5VSKZRRByPI0HXmVB6GoqVtmwa7TFrlgLM5GPbgVe +P76lNY2me8jEnHqzrPpuCB/N0VSCEUf45RV+TNjWL1lpF8JPzXS8oGoxcDoo+sIQ +AdcfMFrRtf3DW8kJRZC8i4T9++/k3Sjak9GE8ocekCDJkFPRkKv3A4T8nvmMKbNx +fugZBADAGD+u4Uhwlyzvr84wmwIPcozhQhTWot8dom6LCiZ47sfyG1qVhvxqDKtc +ttPv+VfA0RyJb9PUKbOf71hA7lMm0qHyX6KTknYYl8u+s/ra8VSwdNyl2/DQDf2j +OjnIMLM9IC6TMQ/mvpvRmXLc6m6HfL6ZBAbAL0EwOMUAUelnBQQAye626NZJTZ3E +mQ9z6l6UEi3IZIw+uJfSg/HL4MgU4zRNsOqyJyizdmVzQm0h+1aHQu1SuFsllPXF +DOKR/IlkjQJCbiU1wZASYYJuwmmVdG4HqZD0bjKU40fJgzBJhOfnhi079jnZzMsL ++NltcmAtUFKteWsPZe7Blic4QmIbtwUEAIick2Ai788yKGrAry5XBwHNcSIYKNr2 +FFWwjFQWAK82hG2tiKCtF8vg58KBvGR3KNUlCLqDeP85jK7+IuaExSlWKYxIKo70 +8Vfpqpyol/+V1yZ2duEcPSXtrMTg5Yl/7PITyGPnM4mFjvwg+cuYRDqYTT0kWbrl +WxtODNwtZdIURTm0SGluc2lnaHRzLWNvcmUgKFNpZ25pbmcga2V5IGZvciB1bml0 +IHRlc3RpbmcpIDxpbnNpZ2h0cy1jb3JlQGV4YW1wbGUub3JnPokBOAQTAQIAIgUC +ZSU8RwIbAwYLCQgHAwIGFQgCCQoLBBYCAwECHgECF4AACgkQn+6Vu126uXAHdgf/ +Q8a7Jruhyn+EDLL94gAc6kXubvVArVe9RdpoHwG7cj8wUa/7zd7FUcYJuz6bbebg +mmlRwFf1CeodGURFpfwf1dkiKV5QqobeXhRpsrrkeLlMR8bZVFSCvU+oOETpJfMo +6feDI/tQrOcIpxrd/cEu2XSQ1JBM/+8NbQiUMa1cWHOmQJ1OD7jWOllUq5hDs1vt +UzPORUsYe1V1Dcx89gdlhfc1cc30yoLzDNqu0Abn34CAthUkr0sWplzltXOY+Za1 +kOmQVlaLQ0W3tq8BDi79bdHpTtD/g9QO++Ncr5xdP+tPNWAOUlyi7S6qscDLhMWv +7o4eLq6UL9eUC/M2CWJSKg== +=ZQ6H +-----END PGP PRIVATE KEY BLOCK----- +""" + +GPG_FINGERPRINT = "E884 7216 86A8 1EE3 EBF4 5771 9FEE 95BB 5DBA B970" +GPG_OWNER = ( + "insights-core (Signing key for unit testing) " + "" +) + + +def _initialize_gpg_environment(home): + """Save GPG keys and sign a file with them. + + The home directory is populated with the following files: + - key.public.gpg + - key.private.gpg + - file.txt + - file.txt.asc + """ + # Save the public key into temporary file + public_key = home + "/key.public.gpg" + with open(public_key, "w") as f: + f.write(GPG_PUBLIC_KEY) + # It is strictly not necessary to import both public and private keys, + # the private key should be enough. + # However, the Python 2.6 CI image requires that. + process = subprocess.Popen( + ["/usr/bin/gpg", "--homedir", home, "--import", public_key], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + ) + process.communicate() + assert process.returncode == 0 + + # Save the private key into temporary file and import it + private_key = home + "/key.private.gpg" + with open(private_key, "w") as f: + f.write(GPG_PRIVATE_KEY) + process = subprocess.Popen( + ["/usr/bin/gpg", "--homedir", home, "--import", private_key], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + ) + process.communicate() + assert process.returncode == 0 + + # Create a file and sign it + file = home + "/file.txt" + with open(file, "w") as f: + f.write("a signed message") + process = subprocess.Popen( + ["/usr/bin/gpg", "--homedir", home, "--detach-sign", "--armor", file], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + ) + process.communicate() + assert process.returncode == 0 + + # Ensure the signature has been created + assert os.path.exists(home + "/file.txt.asc") + + +def test_valid_signature(): + """A detached file signature can be verified.""" + home = tempfile.mkdtemp() + _initialize_gpg_environment(home) + + # Run the test + result = crypto.verify_gpg_signed_file( + file=home + "/file.txt", + signature=home + "/file.txt.asc", + keys=[home + "/key.public.gpg"], + ) + shutil.rmtree(home, ignore_errors=True) + + # Verify results + assert True is result.ok + assert "" == result.stdout + assert ( + 'gpg: Good signature from "{owner}"'.format(owner=GPG_OWNER) + ) in result.stderr + assert ( + 'Primary key fingerprint: {fp}'.format(fp=GPG_FINGERPRINT) + ) in result.stderr + assert 0 == result.return_code + + assert not os.path.isfile(result._command._home) + + +def test_invalid_signature(): + """A bad detached file signature can be detected.""" + home = tempfile.mkdtemp() + _initialize_gpg_environment(home) + + # Change the contents of the file, making the signature incorrect + with open(home + "/file.txt", "w") as f: + f.write("an unsigned message") + + # Run the test + result = crypto.verify_gpg_signed_file( + file=home + "/file.txt", + signature=home + "/file.txt.asc", + keys=[home + "/key.public.gpg"], + ) + shutil.rmtree(home, ignore_errors=True) + + # Verify results + assert False is result.ok + assert "" == result.stdout + assert ( + 'gpg: BAD signature from "{owner}"'.format(owner=GPG_OWNER) + ) in result.stderr + assert ( + 'Primary key fingerprint: {fp}'.format(fp=GPG_FINGERPRINT) + ) not in result.stderr + assert 1 == result.return_code + + assert not os.path.isfile(result._command._home)