-
Notifications
You must be signed in to change notification settings - Fork 181
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
RHEL-2480: Do not create /root/.gnupg/ directory by accident (#3930)
* feat(pgp): Add crypto.py with PGP signature verification * Card ID: CCT-131 * Card ID: RHEL-2480 * Card ID: RHEL-2482 This patch adds a self-contained and isolated GPG verification environment. It runs GPG in an isolated environment where only selected PGP keys are allowed to check the file signature matches its file. GPG creates a directory `$HOME/.gnupg/` every time it performs some operation. When run under root, but not manually (e.g. via subscription-manager Cockpit plugin), it tries to create and write to this directory, which pollutes user directories and/or causes SELinux denials. This patch utilizes the `--homedir` argument GPG supports in order to move the GPG home directory to a temporary directory for the time of the transaction. After the GPG action is performed, the directory is cleaned up. Signed-off-by: mhorky <[email protected]> * feat(pgp): Use crypto.py during Egg and Collection verification * Card ID: CCT-131 * Card ID: RHEL-2480 * Card ID: RHEL-2482 Signed-off-by: mhorky <[email protected]> * chore: Add .gitleaks.toml This configuration file ensures the dummy PGP public/private key pair is not flagged by security tools as a false positive. Signed-off-by: mhorky <[email protected]> --------- Signed-off-by: mhorky <[email protected]>
- Loading branch information
Showing
5 changed files
with
447 additions
and
54 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
[extend] | ||
useDefault = true | ||
|
||
|
||
[allowlist] | ||
description = "Repository-specific configuration" | ||
|
||
paths = [ | ||
# Tests for `crypto.py` contain public and private GPG keypair. They were | ||
# generated specifically for this usecase. The owner of the key is: | ||
# insights-core (Signing key for unit testing) <[email protected]> | ||
"insights/tests/client/test_crypto.py", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,206 @@ | ||
import errno | ||
import os.path | ||
import logging | ||
import shutil | ||
import tempfile | ||
import subprocess | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class GPGCommandResult(object): | ||
"""Output of a GPGCommand. | ||
Attributes: | ||
ok (bool): Result of an operation. | ||
return_code (int): Return code of the operation. | ||
stdout (str): Standard output of the command. | ||
stderr (str): Standard error of the command. | ||
_command (GPGCommand | None): | ||
An optional reference to the GPGCommand object that created the result | ||
""" | ||
def __init__(self, ok, return_code, stdout, stderr, command): | ||
self.ok = ok | ||
self.return_code = return_code | ||
self.stdout = stdout | ||
self.stderr = stderr | ||
self._command = command | ||
|
||
def __str__(self): | ||
return ( | ||
"<{cls} ok={ok} return_code={code} stdout={out} stderr={err}>" | ||
).format( | ||
cls=self.__class__.__name__, | ||
ok=self.ok, code=self.return_code, | ||
out=self.stdout, err=self.stderr, | ||
) | ||
|
||
|
||
class GPGCommand(object): | ||
"""GPG command run in a temporary environment. | ||
Attributes: | ||
command (list(str)): The command to be executed. | ||
keys (list(str)): List of paths to GPG public keys to check against. | ||
_home (str): Path to the temporary GPG home directory. | ||
_raw_command (list(str)): The last invoked command. | ||
""" | ||
|
||
def __init__(self, command, keys): | ||
self.command = command | ||
self.keys = keys | ||
|
||
self._home = None | ||
self._raw_command = None | ||
|
||
def __str__(self): | ||
return "<{cls} _home={home} _raw_command={raw}>".format( | ||
cls=self.__class__.__name__, | ||
home=self._home, | ||
raw=self._raw_command | ||
) | ||
|
||
def _setup(self): | ||
"""Prepare GPG environment. | ||
Returns (bool): | ||
`True` if public GPG keys were imported into temporary environment, | ||
`False` if there was an error. | ||
""" | ||
self._home = tempfile.mkdtemp() | ||
|
||
logger.debug("setting up gpg in the temporary environment") | ||
for key in self.keys: | ||
result = self._run(["--import", key]) | ||
if not result.ok: | ||
logger.debug("failed to import key '{key}': {result}".format( | ||
key=key, result=result | ||
)) | ||
return False | ||
|
||
return True | ||
|
||
def _cleanup(self): | ||
"""Clean up GPG environment.""" | ||
# Try to delete the temporary directory GPG used. As discovered by the | ||
# convert2rhel team, we need to handle race conditions: | ||
# https://github.com/oamg/convert2rhel/blob/23aadbf0df58c79a8910847d345fcd4092f4656f/convert2rhel/utils.py#L867-L893 | ||
|
||
# GPG writes a temporary socket file for the gpg-agent into the home | ||
# directory. Sometimes it removes the socket file after `rmtree()` has | ||
# determined it should be deleted but before the actual deletion occurs. | ||
# This will cause a FileNotFoundError/OSError. | ||
# When we encounter that, try to run `rmtree()` again. | ||
for _ in range(0, 5): | ||
try: | ||
shutil.rmtree(self._home) | ||
except OSError as exc: | ||
if exc.errno == errno.ENOENT: | ||
# We are trying to remove a file that has already been | ||
# removed by gpg-agent itself. Ignore it. | ||
continue | ||
# Some other error has happened, let it bubble up. | ||
raise | ||
else: | ||
# We have successfully removed everything. This `break` will | ||
# prevent the log statement below from being run. | ||
break | ||
else: | ||
# We called `rmtree()` five times and it failed each time. We cannot | ||
# do more without knowing more. | ||
logger.debug( | ||
"could not clean up temporary gpg directory " | ||
"'{path}'".format(path=self._home) | ||
) | ||
|
||
def _run(self, command): | ||
"""Run the actual command. | ||
Returns (CommandResult): The result of the shell command. | ||
""" | ||
self._raw_command = ["/usr/bin/gpg", "--homedir", self._home] + command | ||
process = subprocess.Popen( | ||
self._raw_command, | ||
stdout=subprocess.PIPE, stderr=subprocess.PIPE, | ||
) | ||
stdout, stderr = process.communicate() | ||
|
||
result = GPGCommandResult( | ||
ok=process.returncode == 0, | ||
return_code=process.returncode, | ||
stdout=stdout.decode("utf-8"), | ||
stderr=stderr.decode("utf-8"), | ||
command=self, | ||
) | ||
|
||
if result.ok: | ||
logger.debug("gpg command {command}: ok".format(command=command)) | ||
else: | ||
logger.debug( | ||
"gpg command {command} returned non-zero code: {result}".format( | ||
command=command, result=result, | ||
) | ||
) | ||
|
||
return result | ||
|
||
def evaluate(self): | ||
"""Run the command. | ||
Returns (CommandResult): The result of the shell command. | ||
""" | ||
try: | ||
ok = self._setup() | ||
if not ok: | ||
return | ||
|
||
logger.debug("running gpg in the temporary environment") | ||
return self._run(self.command) | ||
finally: | ||
self._cleanup() | ||
|
||
|
||
def verify_gpg_signed_file(file, signature, keys): | ||
""" | ||
Verify a file that was signed using GPG. | ||
Parameters: | ||
file (str): A path to the signed file. | ||
signature (str): A path to the detached signature. | ||
keys (list(str)): | ||
List of paths to GPG public keys on the filesystem to check against. | ||
Returns (CommandResult): Evaluated GPG command. | ||
""" | ||
if not os.path.isfile(file): | ||
logger.debug( | ||
"cannot verify signature of '{file}', file does not exist".format( | ||
file=file | ||
) | ||
) | ||
raise OSError(errno.ENOENT, "File '{file}' not found".format(file=file)) | ||
|
||
if not os.path.isfile(signature): | ||
logger.debug(( | ||
"cannot verify signature of '{file}', " | ||
"signature '{signature}' does not exist" | ||
).format(file=file, signature=signature)) | ||
raise OSError( | ||
errno.ENOENT, | ||
"Signature '{sig}' of file '{file}' not found.".format( | ||
sig=signature, file=file | ||
) | ||
) | ||
|
||
gpg = GPGCommand(["--verify", signature, file], keys) | ||
logger.debug("starting gpg verification process for '{file}'".format(file=file)) | ||
|
||
result = gpg.evaluate() | ||
|
||
if result.ok: | ||
logger.debug("signature verification of '{file}' passed".format(file=file)) | ||
else: | ||
logger.debug("signature verification of '{file}' failed".format(file=file)) | ||
|
||
return result |
Oops, something went wrong.