From 264c1d6866a0dce6dfc29523e82b2885c0f3b051 Mon Sep 17 00:00:00 2001 From: Caroline Russell Date: Fri, 28 Jun 2024 00:11:27 -0400 Subject: [PATCH] Feat: Add check-reachable command. (#51) Signed-off-by: Caroline Russell --- README.md | 69 +++++++++++++---- atom_tools/__init__.py | 2 +- atom_tools/cli/application.py | 1 + atom_tools/cli/commands/check_reachable.py | 64 ++++++++++++++++ atom_tools/cli/commands/filter.py | 7 ++ atom_tools/cli/commands/query_endpoints.py | 2 +- atom_tools/lib/filtering.py | 87 ++++++++++++++++++++-- atom_tools/lib/regex_utils.py | 6 ++ atom_tools/lib/slices.py | 11 ++- atom_tools/lib/utils.py | 11 +++ pyproject.toml | 2 +- test/test_filtering.py | 48 +++++++++--- 12 files changed, 271 insertions(+), 39 deletions(-) create mode 100644 atom_tools/cli/commands/check_reachable.py diff --git a/README.md b/README.md index 7927f0c..1dc1eff 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ e.g. `atom-tools help convert`). ``` -Atom Tools (version 0.5.0) +Atom Tools (version 0.6.0) Usage: command [options] [arguments] @@ -44,11 +44,13 @@ Options: -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug. Available commands: - convert Convert an atom slice to a different format. - filter Filter an atom slice based on specified criteria. - help Displays help for a command. - list Lists commands. - validate-lines Check the accuracy of the line numbers in an atom slice. + check-reachable Find out if there are hits for a given package:version or file:linenumber in an atom slice. + convert Convert an atom slice to a different format. + filter Filter an atom slice based on specified criteria. + help Displays help for a command. + list Lists commands. + query-endpoints List elements to display in the console. + validate-lines Check the accuracy of the line numbers in an atom slice. 
``` ## Features @@ -134,6 +136,7 @@ This would be equivalent to ##### Available attributes (not case-sensitive): +*For usages slices* - callName - fileName - fullName @@ -141,14 +144,21 @@ This would be equivalent to - resolvedMethod - signature -| attribute | locations | -|----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------| -| callName | objectSlices.usages.argToCalls, objectSlices.usages.invokedCalls, userDefinedTypes.procedures, | -| fileName | objectSlices, userDefinedTypes | | -| fullName | objectSlices | -| name | objectSlices.usages.targetObj, objectSlices.usages.definedBy, userDefinedTypes.fields | -| resolvedMethod | objectSlices.usages.targetObj, objectSlices.usages.definedBy, objectSlices.usages.argToCalls, objectSlices.usages.invokedCalls, userDefinedTypes.procedures | -| signature | objectSlices | +| attribute | locations searched | reachables locations | +|----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------| +| callName | objectSlices.usages.argToCalls
objectSlices.usages.invokedCalls
userDefinedTypes.procedures, | | +| fileName | objectSlices
userDefinedTypes | | | +| fullName | objectSlices | | +| name | objectSlices.usages.targetObj
objectSlices.usages.definedBy
userDefinedTypes.fields | | +| purl | | reachables.purls
reachables.flows.tags | +| resolvedMethod | objectSlices.usages.targetObj
objectSlices.usages.definedBy
objectSlices.usages.argToCalls
objectSlices.usages.invokedCalls
userDefinedTypes.procedures | | +| signature | objectSlices | | | | + +#### Searching reachables for package name/version + +This option filters reachables to the given package name and version in the format of name:version + +`--package-version mypackage:1.0.0` #### Criteria syntax @@ -237,6 +247,37 @@ Query using filter command to target by both filename and line number range `filter -i usages.slices -t js -c filename=server.ts -e "query-endpoints -f 50-70"` +### Check Reachable + +The check-reachable command takes either a package:version or filename:line_number/line_number_range + +`check-reachable -i reachable_slice.json -p colors:1.0.0` +`check-reachable -i reachable_slice.json -p @colors/colors:1.0.0` +`check-reachable -i reachable_slice.json -l file:20` +`check-reachable -i reachable_slice.json -l file:20-40` + +``` +Description: + Find out if there are hits for a given package:version or file:linenumber in an atom slice. + +Usage: + check-reachable [options] + +Options: + -i, --input-slice=INPUT-SLICE Slice file + -p, --pkg=PKG Package to search for in the format of package:version + -l, --location=LOCATION Filename with line number to search for in the format of file:linenumber + -h, --help Display help for the given command. When no command is given display help for the list command. + -q, --quiet Do not output any message. + -V, --version Display this application version. + --ansi Force ANSI output. + --no-ansi Disable ANSI output. + -n, --no-interaction Do not ask any interactive question. + -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug. + +Help: + The check-reachable command checks for reachable flows for a package:version or file:linenumber in an atom slice. 
+``` ### Validate Lines diff --git a/atom_tools/__init__.py b/atom_tools/__init__.py index 5ad6ea9..3924feb 100644 --- a/atom_tools/__init__.py +++ b/atom_tools/__init__.py @@ -1,4 +1,4 @@ """ A cli, classes and functions for converting an atom slice to a different format """ -__version__ = '0.5.5' +__version__ = '0.6.0' diff --git a/atom_tools/cli/application.py b/atom_tools/cli/application.py index e15951a..b46184a 100644 --- a/atom_tools/cli/application.py +++ b/atom_tools/cli/application.py @@ -55,6 +55,7 @@ def _load() -> Command: 'convert', 'filter', 'query-endpoints', + 'check-reachable', 'validate-lines', ] diff --git a/atom_tools/cli/commands/check_reachable.py b/atom_tools/cli/commands/check_reachable.py new file mode 100644 index 0000000..0c5a96b --- /dev/null +++ b/atom_tools/cli/commands/check_reachable.py @@ -0,0 +1,64 @@ +# pylint: disable=R0801 +"""Check Reachable Command for the atom-tools CLI.""" +import logging + +from cleo.helpers import option + +from atom_tools.cli.commands.command import Command +from atom_tools.lib.slices import AtomSlice +from atom_tools.lib.utils import check_reachable + + +logger = logging.getLogger(__name__) + + +class CheckReachableCommand(Command): + """ + This command checks whether a package:version or file:linenumber has + reachable hits in an atom slice. + + Attributes: + name (str): The name of the command. + description (str): The description of the command. + options (list): The list of options for the command. + help (str): The help message for the command. + + Methods: + handle: Executes the command and prints the result of the check. 
+ """ + + name = 'check-reachable' + description = ('Find out if there are hits for a given package:version or file:linenumber in ' + 'an atom slice.') + options = [ + option( + 'input-slice', + 'i', + 'Slice file', + flag=False, + value_required=True, + ), + option( + 'pkg', + 'p', + 'Package to search for in the format of package:version', + flag=False, + ), + option( + 'location', + 'l', + 'Filename with line number to search for in the format of file:linenumber', + flag=False, + ), + ] + help = """Checks for reachable flows for a pkg:version or file:linenumber in an atom slice.""" + + loggers = ['atom_tools.lib.filtering', 'atom_tools.lib.regex_utils', 'atom_tools.lib.slices', + 'atom_tools.lib.utils'] + + def handle(self): + """ + Executes the query command and performs the search. + """ + atom_slice = AtomSlice(self.option('input-slice')) + print(check_reachable(atom_slice.content, self.option('pkg'), self.option('location'))) diff --git a/atom_tools/cli/commands/filter.py b/atom_tools/cli/commands/filter.py index d4fe605..79a62db 100644 --- a/atom_tools/cli/commands/filter.py +++ b/atom_tools/cli/commands/filter.py @@ -40,6 +40,13 @@ class FilterCommand(Command): 'expression. Please see documentation for syntax.', flag=False, ), + option( + 'package-version', + 'p', + description='Filter a reachables slice based on a package name and version in format ' + 'package:version. 
May include multiple separated by a comma.', + flag=False, + ), option( 'outfile', 'o', diff --git a/atom_tools/cli/commands/query_endpoints.py b/atom_tools/cli/commands/query_endpoints.py index 11768d3..2176710 100644 --- a/atom_tools/cli/commands/query_endpoints.py +++ b/atom_tools/cli/commands/query_endpoints.py @@ -1,5 +1,5 @@ # pylint: disable=R0801 -"""Query Command for the atom-tools CLI.""" +"""Query Endpoints Command for the atom-tools CLI.""" import logging from cleo.helpers import option diff --git a/atom_tools/lib/filtering.py b/atom_tools/lib/filtering.py index c257c5f..1343369 100644 --- a/atom_tools/lib/filtering.py +++ b/atom_tools/lib/filtering.py @@ -13,7 +13,7 @@ logger = logging.getLogger(__name__) -filtering = FilteringPatternCollection() +patterns = FilteringPatternCollection() @dataclass @@ -62,7 +62,7 @@ def filter_slice(self) -> Dict: if self.slc.slice_type == 'usages': return self.filter_usages() if self.slc.slice_type == 'reachables': - return self.filter_reachables() + return self.filter_usages() raise ValueError(f'Unknown slice type: {self.slc.slice_type}') def filter_usages(self) -> Dict: @@ -114,10 +114,10 @@ def _process_slice_indexes(self) -> Dict: include_indexes = set() exclude_indexes = set() for k in self.results: - if matched := filtering.top_level_flat_loc_index.search(k): + if matched := patterns.top_level_flat_loc_index.search(k): include_indexes.add(matched) for k in self.negative_results: - if matched := filtering.top_level_flat_loc_index.search(k): + if matched := patterns.top_level_flat_loc_index.search(k): exclude_indexes.add(matched) return self._exclude_indexes(include_indexes, exclude_indexes) @@ -152,13 +152,19 @@ def _search_values_fuzzy(self, f: AttributeFilter) -> None: self._process_fuzzy_results(f, result) +def check_reachable_purl(data: Dict, purl: str) -> bool: + """Checks if purl is reachable""" + purls = enumerate_reachable_purls(data) + return purl.lower() in purls + + def create_attribute_filter(key: str, 
value: str, fuzz_pct: int | None) -> Tuple: """Create an attribute filter""" lns = () fn_only = False - if key.lower() == 'filename' and '/' not in value and '\\' not in value: + if (key.lower() in {'filename', 'parentfilename'}) and '/' not in value and '\\' not in value: fn_only = True - if ':' in value and (match := filtering.attribute_and_line.search(value)): + if ':' in value and (match := patterns.attribute_and_line.search(value)): value = match.group('attrib') lns = get_ln_range(match.group('line_nums')) if fuzz_pct: @@ -170,6 +176,40 @@ def create_attribute_filter(key: str, value: str, fuzz_pct: int | None) -> Tuple return new_value, lns, fn_only +def create_purl_map(data: Dict) -> Dict: + """Map purls to package:version strings""" + purls = set(patterns.jmespath_purls.search(data)) + purl_dict = {} + for purl in purls: + formatted_purls = parse_purl(purl) + for p in formatted_purls: + purl_dict[p] = purl + return purl_dict + + +def enumerate_reachable_purls(data: Dict) -> Set[str]: + """Enumerate reachable purls""" + all_purls = set(patterns.jmespath_purls.search(data)) + purls = [] + for purl in all_purls: + purls.extend(parse_purl(purl)) + return set(purls) + + +def filter_flows(reachables: List[Dict], filename: str, ln: Tuple[int, int]) -> bool: + """Filters flows""" + if not reachables: + return False + for flows in reachables: + for f in flows.get('flows', []): + num = f.get('lineNumber') + if num and num not in ln: + continue + if f.get('parentFileName').endswith(filename): + return True + return False + + def get_ln_range(value: str) -> Tuple[int, int] | Tuple: """ Extracts line numbers from arguments and returns a tuple of (start, end) @@ -195,3 +235,38 @@ def parse_filters(filter_options: str) -> Generator[Tuple[str, str, str], None, if condition == '=': condition = '==' yield target, value, condition + + +def parse_purl_pkgs(match: re.Match) -> List[str]: + """Extract package and version variations from purl""" + pkgs = [match.group('p1')] + 
pkgs.append(match.group('p2')) + pkgs = list(set(pkgs)) + for i, p in enumerate(pkgs): + pkgs[i] = p.replace('pypi/', '').replace('npm/', '').replace('%40', '@') # type: ignore + return pkgs + + +def parse_purl_versions(match: re.Match) -> List[str]: + """Returns a list of version variations from a purl""" + versions = {match.group('v1')} + versions.add(match.group('v2')) + if match.group('ext'): + versions.add(f"{match.group('v1')}{match.group('ext')}") + versions.add(f"{match.group('v2')}{match.group('ext')}") + return list(versions) + + +def parse_purl(purl: str) -> List[str]: + """Returns a list of permutations of pkg:version from a purl""" + purl = patterns.purl_trailing_version.sub('', purl) + result: List[str] = [] + pkgs: List[str] = [] + versions: List[str] = [] + if match := patterns.purl_version.search(purl): + versions = parse_purl_versions(match) + if match := patterns.purl_pkg.search(purl): + pkgs = parse_purl_pkgs(match) + for i in pkgs: + result.extend(f"{i}:{j}" for j in versions) + return list(set(result)) diff --git a/atom_tools/lib/regex_utils.py b/atom_tools/lib/regex_utils.py index e496a1e..5c4b285 100644 --- a/atom_tools/lib/regex_utils.py +++ b/atom_tools/lib/regex_utils.py @@ -6,6 +6,7 @@ from dataclasses import dataclass from typing import Tuple, List, Dict, Any +import jmespath logger: logging.Logger = logging.getLogger(__name__) @@ -71,6 +72,11 @@ class FilteringPatternCollection: '{objectSlices: objectSlices[?ATTRIBUTECONDITION`TARGET_VALUE`], ' 'userDefinedTypes: userDefinedTypes[?ATTRIBUTECONDITION`TARGET_VALUE`]}' ) + jmespath_purls = jmespath.compile('reachables[].purls[]') + purl_pkg = re.compile(r'(?P<p1>[^/:]+/(?P<p2>[^/]+))(?:(?:.|/)v\d+)?(?=@)') + purl_trailing_version = re.compile(r'(?:.|/)v\d+(?=@)') + purl_version = re.compile(r'(?<=@)(?P<v1>v?(?P<v2>[\d.]+){1,3})(?P<ext>[^?\s]+)?') + filename = re.compile(r'[^/]+(?!/)') def py_helper(endpoint: str, regex: OpenAPIRegexCollection) -> Tuple[str, List[Dict]]: diff --git a/atom_tools/lib/slices.py 
b/atom_tools/lib/slices.py index 5ef321d..4eb2b78 100644 --- a/atom_tools/lib/slices.py +++ b/atom_tools/lib/slices.py @@ -11,11 +11,14 @@ import json_flatten # type: ignore +from atom_tools.lib.regex_utils import FilteringPatternCollection + logger = logging.getLogger(__name__) +patterns = FilteringPatternCollection() -def create_flattened_dicts(data: Dict) -> Dict[str, Dict]: +def create_attrib_dicts(data: Dict) -> Dict[str, Dict]: """Creates a flattened slice and individual attribute dictionaries.""" attributes: Dict[str, Dict] = { 'filename': {}, @@ -27,7 +30,7 @@ def create_flattened_dicts(data: Dict) -> Dict[str, Dict]: } for k, v in data.items(): - if 'fileName' in k: + if 'fileName' in k or 'parentFileName' in k: attributes['filename'] = process_attrib_dict(attributes['filename'], k, v) elif 'fullName' in k: attributes['fullname'] = process_attrib_dict(attributes['fullname'], k, v) @@ -48,7 +51,7 @@ def import_flat_slice(content: Dict) -> Dict[str, Dict]: Import a slice from a JSON file. Args: - filename (str): The path to the JSON file. + content (dict): The contents of the JSON file Returns: tuple[dict, str]: The contents of the JSON file and the type of slice @@ -62,7 +65,7 @@ def import_flat_slice(content: Dict) -> Dict[str, Dict]: If the JSON file is not a valid slice, a warning is logged. 
""" content = json_flatten.flatten(content) - return create_flattened_dicts(content) + return create_attrib_dicts(content) def import_slice(filename: str | Path) -> Tuple[Dict, str, str]: diff --git a/atom_tools/lib/utils.py b/atom_tools/lib/utils.py index 021d129..d306d21 100644 --- a/atom_tools/lib/utils.py +++ b/atom_tools/lib/utils.py @@ -5,6 +5,8 @@ from pathlib import Path from typing import Dict, List, Tuple +from atom_tools.lib.filtering import check_reachable_purl, filter_flows, get_ln_range + logger = logging.getLogger(__name__) @@ -29,6 +31,15 @@ def add_params_to_cmd(cmd: str, outfile: str, origin_type: str = '') -> Tuple[st return cmd, args +def check_reachable(data: Dict, pkg: str, loc: str) -> bool: + """Checks if package is reachable""" + if pkg: + return check_reachable_purl(data, pkg) + if match := re.search(r'(?P<file>[^/]+(?<!/)):(?P<line>[\d-]+)', loc): + return filter_flows(data.get('reachables', []), match['file'], get_ln_range(match['line'])) + raise ValueError(f'Invalid location: {loc}') + + def export_json(data: Dict, outfile: str, indent: int | None = None) -> None: """Exports data to json""" with open(outfile, 'w', encoding='utf-8') as f: diff --git a/pyproject.toml b/pyproject.toml index bdbe77a..3314f2f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "atom-tools" -version = "0.5.5" +version = "0.6.0" description = "Collection of tools for use with AppThreat/atom." 
authors = [ { name = "Caroline Russell", email = "caroline@appthreat.dev" }, diff --git a/test/test_filtering.py b/test/test_filtering.py index 111998b..09fe49f 100644 --- a/test/test_filtering.py +++ b/test/test_filtering.py @@ -1,26 +1,33 @@ import pytest -from atom_tools.lib.filtering import Filter, parse_filters -from atom_tools.lib.utils import sort_dict +from atom_tools.lib.filtering import check_reachable_purl, Filter, filter_flows, parse_filters +from atom_tools.lib.slices import AtomSlice +from atom_tools.lib.utils import check_reachable, sort_dict @pytest.fixture def java_usages_1(): - return Filter('test/data/java-piggymetrics-usages.json', 'outfile.json', '90') + filter_obj = Filter('test/data/java-piggymetrics-usages.json', 'outfile.json', '90') + filter_obj.add_filters(parse_filters('callName=testFilterQuery')) + return filter_obj + @pytest.fixture def java_usages_2(): - return Filter('test/data/java-sec-code-usages.json', 'outfile.json', '90') + filter_obj = Filter('test/data/java-sec-code-usages.json', 'outfile.json', '90') + filter_obj.add_filters(parse_filters('fileName=test/file/name.java')) + return filter_obj @pytest.fixture def js_usages_1(): - return Filter('test/data/js-juiceshop-usages.json', 'outfile.json', '90') + filter_obj = Filter('test/data/js-juiceshop-usages.json', 'outfile.json', '90') + filter_obj.add_filters(parse_filters('signature=@Pipe')) + return filter_obj def test_attribute_filter_class(java_usages_1, js_usages_1, java_usages_2): - java_usages_1.add_filters(parse_filters('callName=testFilterQuery')) - assert java_usages_1.filter_slice() == {'objectSlices': [{'code': '', + assert sort_dict(java_usages_1.filter_slice()) == {'objectSlices': [{'code': '', 'columnNumber': 20, 'fileName': 'account-service/src/main/java/com/piggymetrics/account/AccountApplication.java', 'fullName': 'com.piggymetrics.account.AccountApplication.:void()', @@ -52,10 +59,7 @@ def test_attribute_filter_class(java_usages_1, js_usages_1, java_usages_2): 
'name': '', 'typeFullName': 'ANY'}}]}], 'userDefinedTypes': []} - js_usages_1.add_filters(parse_filters('signature=@Pipe')) - result = js_usages_1.filter_slice() - result = sort_dict(result) - assert result == { + assert sort_dict(js_usages_1.filter_slice()) == { 'objectSlices': [{'code': "@Pipe({name:'challengeHint',pure:false})", 'columnNumber': 0, 'fileName': 'frontend/src/app/score-board/pipes/challenge-hint.pipe.ts', @@ -71,7 +75,7 @@ def test_attribute_filter_class(java_usages_1, js_usages_1, java_usages_2): 'signature': '@Pipe', 'usages': []}], 'userDefinedTypes': []} - java_usages_2.add_filters(parse_filters('fileName=test/file/name.java')) + result = java_usages_2.filter_slice() result = sort_dict(result) assert result == {'objectSlices': [{'code': '', @@ -93,3 +97,23 @@ def test_attribute_filter_class(java_usages_1, js_usages_1, java_usages_2): 'name': '', 'typeFullName': 'ANY'}}]}], 'userDefinedTypes': []} + + +def test_check_reachable(): + atom_slice = AtomSlice('test/data/js-juiceshop-reachables.json') + + # Test package:version + assert check_reachable(atom_slice.content, 'colors:1.6.0', '') == True + assert check_reachable(atom_slice.content, 'colors:1.9.0', '') == False + assert check_reachable(atom_slice.content, '@colors/colors:1.6.0', '') == True + assert check_reachable(atom_slice.content, '@colors/colors:1.9.0', '') == False + + # Test filename:linenumber + assert check_reachable(atom_slice.content, '', 'routes/updateUserProfile.ts:29') == True + assert check_reachable(atom_slice.content, '', 'updateUserProfile.ts:29') == True + assert check_reachable(atom_slice.content, '', 'routes/updateUserProfile.ts:25-30') == True + assert check_reachable(atom_slice.content, '', 'updateUserProfile.ts:25-30') == True + assert check_reachable(atom_slice.content, '', 'routes/updateUserProfile.ts:400') == False + assert check_reachable(atom_slice.content, '', 'updateUserProfile.ts:400') == False + assert check_reachable(atom_slice.content, '', 
'routes/updateUserProfile.ts:400-600') == False + assert check_reachable(atom_slice.content, '', 'updateUserProfile.ts:400-600') == False