Skip to content

Commit

Permalink
Merge pull request #419 from DataDog/ikretz/fix/local-checking
Browse files Browse the repository at this point in the history
Simplify local target checks
  • Loading branch information
ikretz authored Jul 18, 2024
2 parents 404f3e6 + 854d8f5 commit 104e883
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 114 deletions.
51 changes: 11 additions & 40 deletions guarddog/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Includes rules based on package registry metadata and source code analysis.
"""

from functools import reduce
import json as js
import logging
import os
Expand All @@ -20,7 +21,6 @@
from guarddog.reporters.sarif import report_verify_sarif
from guarddog.scanners import get_scanner
from guarddog.scanners.scanner import PackageScanner
from functools import reduce

EXIT_CODE_ISSUES_FOUND = 1

Expand Down Expand Up @@ -190,32 +190,6 @@ def display_result(result: dict) -> None:
return return_value # this is mostly for testing


def is_local_target(identifier: str) -> bool:
    """
    Decide whether a scan target refers to the local filesystem.

    @param identifier: The name/path of the package as passed to "guarddog ecosystem scan"
    @return: Whether the identifier should be considered a local path
    """
    # Explicit path syntax is always treated as local, whether or not
    # the path actually exists.
    if identifier == "." or identifier.startswith(("/", "./", "../")):
        return True

    # A bare name that looks like an archive is ambiguous, so it only counts
    # as a local target when it exists on the local filesystem. ".tgz" is
    # listed because local scanning extracts it like ".tar.gz".
    if identifier.endswith((".tar.gz", ".tgz", ".zip", ".whl")):
        return os.path.exists(identifier)

    return False


def _scan(
identifier,
version,
Expand All @@ -240,20 +214,17 @@ def _scan(
sys.exit(1)

results = []
if is_local_target(identifier):
log.debug(
f"Considering that '{identifier}' is a local target, scanning filesystem"
)
if os.path.isdir(identifier):
log.debug(f"Considering that '{identifier}' as a local directory")
for package in os.listdir(identifier):
result = scanner.scan_local(f"{identifier}/{package}", rule_param)
result["package"] = package
results.append(result)
else:
result = scanner.scan_local(identifier, rule_param)
result["package"] = identifier
if os.path.isdir(identifier):
log.debug(f"Considering that '{identifier}' is a local directory")
for package in os.listdir(identifier):
result = scanner.scan_local(f"{identifier}/{package}", rule_param)
result["package"] = package
results.append(result)
elif os.path.isfile(identifier):
log.debug(f"Considering that '{identifier}' is a local file")
result = scanner.scan_local(identifier, rule_param)
result["package"] = identifier
results.append(result)
else:
log.debug(f"Considering that '{identifier}' is a remote target")
try:
Expand Down
20 changes: 8 additions & 12 deletions guarddog/scanners/pypi_package_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from guarddog.analyzer.analyzer import Analyzer
from guarddog.ecosystems import ECOSYSTEM
from guarddog.scanners.scanner import PackageScanner
from guarddog.utils.archives import is_supported_archive
from guarddog.utils.package_info import get_package_info


Expand Down Expand Up @@ -42,25 +43,20 @@ def download_package(self, package_name, directory, version=None) -> str:
raise Exception(f"Version {version} for package {package_name} doesn't exist.")

files = releases[version]
url = None
file_extension = None
url, file_extension = None, None

for file in files:
# Store url to compressed package and appropriate file extension
if file["filename"].endswith(".tar.gz"):
if is_supported_archive(file["filename"]):
url = file["url"]
file_extension = ".tar.gz"
_, file_extension = os.path.splitext(file["filename"])
break

if any(file["filename"].endswith(ext) for ext in (".egg", ".whl", ".zip")):
url = file["url"]
file_extension = ".zip"

if not (url or file_extension):
if not (url and file_extension):
raise Exception(f"Compressed file for {package_name} does not exist on PyPI.")

# Path to compressed package
zippath = os.path.join(directory, package_name + file_extension)
unzippedpath = zippath.removesuffix(file_extension)

unzippedpath = os.path.join(directory, package_name)
self.download_compressed(url, zippath, unzippedpath)

return unzippedpath
28 changes: 12 additions & 16 deletions guarddog/scanners/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,35 +233,31 @@ def scan_local(
Args:
path (str): path to package
rules (set, optional): Set of rule names to use. Defaults to all rules.
callback (typing.Callable[[dict], None], optional): Callback to apply to Analyzer output
Raises:
Exception: Analyzer exception
Returns:
dict: Analyzer output with rules to results mapping
rules: rules to apply
callback: callback to call for each result
"""

if rules is not None:
rules = set(rules)

if not os.path.exists(path):
raise Exception(f"Path {path} does not exist.")

if any(path.endswith(ext) for ext in (".tar.gz", ".tgz", ".zip", ".whl")):
with tempfile.TemporaryDirectory() as tmpdirname:
safe_extract(path, tmpdirname)
return self.analyzer.analyze_sourcecode(
tmpdirname, rules=rules
)

results = None
if os.path.isdir(path):
return self.analyzer.analyze_sourcecode(path, rules=rules)
results = self.analyzer.analyze_sourcecode(path, rules=rules)
elif os.path.isfile(path):
with tempfile.TemporaryDirectory() as tempdir:
safe_extract(path, tempdir)
results = self.analyzer.analyze_sourcecode(tempdir, rules=rules)
else:
raise Exception(f"Local scan target {path} is neither a directory nor a file.")

raise Exception(
f"Path {path} is not a directory nor an archive type supported by GuardDog."
)
callback(results)

return results

@abstractmethod
def download_and_get_package_info(
Expand Down
56 changes: 50 additions & 6 deletions guarddog/utils/archives.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,68 @@
log = logging.getLogger("guarddog")


def is_supported_archive(path: str) -> bool:
    """
    Decide whether a path names an archive of a supported type.

    A path is supported when either the tar check or the zip check
    accepts it; the tar check is tried first.

    Args:
        path (str): The local filesystem path to examine

    Returns:
        bool: Represents the decision reached for the file
    """
    recognizers = (is_tar_archive, is_zip_archive)
    return any(recognize(path) for recognize in recognizers)


def is_tar_archive(path: str) -> bool:
    """
    Decide whether a path names a gzipped tar archive.

    The decision is made from the file name suffix alone: the file is
    not opened and does not need to exist.

    Args:
        path (str): The local filesystem path to examine

    Returns:
        bool: True if the path ends with ".tar.gz" or ".tgz"
    """
    # str.endswith accepts a tuple of suffixes and matches any of them.
    return path.endswith((".tar.gz", ".tgz"))


def is_zip_archive(path: str) -> bool:
    """
    Decide whether a path names a zip, whl or egg archive.

    The decision is made from the file name suffix alone: the file is
    not opened and does not need to exist.

    Args:
        path (str): The local filesystem path to examine

    Returns:
        bool: True if the path ends with ".zip", ".whl" or ".egg"
    """
    # str.endswith accepts a tuple of suffixes and matches any of them.
    return path.endswith((".zip", ".whl", ".egg"))


def safe_extract(source_archive: str, target_directory: str) -> None:
    """
    safe_extract safely extracts archives to a target directory.

    This function does not clean up the original archive and does not
    create the target directory if it does not exist. It also assumes
    the source archive argument is a path to a regular file on the
    local filesystem.

    @param source_archive: The archive to extract
    @param target_directory: The directory where to extract the archive to
    @raise ValueError If the archive type is unsupported
    """
    log.debug(f"Extracting archive {source_archive} to directory {target_directory}")
    if is_tar_archive(source_archive):
        # Use a context manager so the tar file handle is closed even if
        # extraction raises.
        with tarsafe.open(source_archive) as tar_file:
            tar_file.extractall(target_directory)
    elif is_zip_archive(source_archive):
        with zipfile.ZipFile(source_archive, 'r') as zip_file:
            for member in zip_file.namelist():
                # Note: ZipFile.extract cleans up any malicious file name
                # such as directory traversal attempts. This is not the
                # case of ZipFile.extractall.
                # NOTE(review): the destination passed here is the target
                # directory joined with the member name, so each member lands
                # beneath a directory named after itself. Kept as-is to
                # preserve the existing on-disk layout.
                zip_file.extract(member, path=os.path.join(target_directory, member))
    else:
        raise ValueError(f"unsupported archive extension: {source_archive}")
Loading

0 comments on commit 104e883

Please sign in to comment.