diff --git a/README.md b/README.md
index 7927f0c..1dc1eff 100644
--- a/README.md
+++ b/README.md
@@ -29,7 +29,7 @@ e.g. `atom-tools help
convert`).
```
-Atom Tools (version 0.5.0)
+Atom Tools (version 0.6.0)
Usage:
command [options] [arguments]
@@ -44,11 +44,13 @@ Options:
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug.
Available commands:
- convert Convert an atom slice to a different format.
- filter Filter an atom slice based on specified criteria.
- help Displays help for a command.
- list Lists commands.
- validate-lines Check the accuracy of the line numbers in an atom slice.
+ check-reachable Find out if there are hits for a given package:version or file:linenumber in an atom slice.
+ convert Convert an atom slice to a different format.
+ filter Filter an atom slice based on specified criteria.
+ help Displays help for a command.
+ list Lists commands.
+ query-endpoints List elements to display in the console.
+ validate-lines Check the accuracy of the line numbers in an atom slice.
```
## Features
@@ -134,6 +136,7 @@ This would be equivalent to
##### Available attributes (not case-sensitive):
+*For usages slices*
- callName
- fileName
- fullName
@@ -141,14 +144,21 @@ This would be equivalent to
- resolvedMethod
- signature
-| attribute | locations |
-|----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| callName | objectSlices.usages.argToCalls, objectSlices.usages.invokedCalls, userDefinedTypes.procedures, |
-| fileName | objectSlices, userDefinedTypes | |
-| fullName | objectSlices |
-| name | objectSlices.usages.targetObj, objectSlices.usages.definedBy, userDefinedTypes.fields |
-| resolvedMethod | objectSlices.usages.targetObj, objectSlices.usages.definedBy, objectSlices.usages.argToCalls, objectSlices.usages.invokedCalls, userDefinedTypes.procedures |
-| signature | objectSlices |
+| attribute | locations searched | reachables locations |
+|----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------|
+| callName | objectSlices.usages.argToCalls
objectSlices.usages.invokedCalls
userDefinedTypes.procedures, | |
+| fileName | objectSlices
userDefinedTypes | | |
+| fullName | objectSlices | |
+| name | objectSlices.usages.targetObj
objectSlices.usages.definedBy
userDefinedTypes.fields | |
+| purl | | reachables.purls
reachables.flows.tags |
+| resolvedMethod | objectSlices.usages.targetObj
objectSlices.usages.definedBy
objectSlices.usages.argToCalls
objectSlices.usages.invokedCalls
userDefinedTypes.procedures | |
+| signature | objectSlices | | | |
+
+#### Searching reachables for package name/version
+
+This option filters reachables to the given package name and version in the format of name:version
+
+`--package mypackage:1.0.0`
#### Criteria syntax
@@ -237,6 +247,37 @@ Query using filter command to target by both filename and line number range
`filter -i usages.slices -t js -c filename=server.ts -e "query-endpoints -f 50-70"`
+### Check Reachable
+
+The check-reachable command takes either a package:version or filename:line_number/line_number_range
+
+`check-reachable -i reachable_slice.json -p colors:1.0.0`
+`check-reachable -i reachable_slice.json -p @colors/colors:1.0.0`
+`check-reachable -i reachable_slice.json -l file:20`
+`check-reachable -i reachable_slice.json -l file:20-40`
+
+```
+Description:
+ Find out if there are hits for a given package:version or file:linenumber in an atom slice.
+
+Usage:
+ check-reachable [options]
+
+Options:
+ -i, --input-slice=INPUT-SLICE Slice file
+ -p, --pkg=PKG Package to search for in the format of :
+ -l, --location=LOCATION Filename with line number to search for in the format of :
+ -h, --help Display help for the given command. When no command is given display help for the list command.
+ -q, --quiet Do not output any message.
+ -V, --version Display this application version.
+ --ansi Force ANSI output.
+ --no-ansi Disable ANSI output.
+ -n, --no-interaction Do not ask any interactive question.
+ -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug.
+
+Help:
+ The check-reachables command checks for reachable flows for a package:version or file:linenumber in an atom slice.
+```
### Validate Lines
diff --git a/atom_tools/__init__.py b/atom_tools/__init__.py
index 5ad6ea9..3924feb 100644
--- a/atom_tools/__init__.py
+++ b/atom_tools/__init__.py
@@ -1,4 +1,4 @@
"""
A cli, classes and functions for converting an atom slice to a different format
"""
-__version__ = '0.5.5'
+__version__ = '0.6.0'
diff --git a/atom_tools/cli/application.py b/atom_tools/cli/application.py
index e15951a..b46184a 100644
--- a/atom_tools/cli/application.py
+++ b/atom_tools/cli/application.py
@@ -55,6 +55,7 @@ def _load() -> Command:
'convert',
'filter',
'query-endpoints',
+ 'check-reachable',
'validate-lines',
]
diff --git a/atom_tools/cli/commands/check_reachable.py b/atom_tools/cli/commands/check_reachable.py
new file mode 100644
index 0000000..0c5a96b
--- /dev/null
+++ b/atom_tools/cli/commands/check_reachable.py
@@ -0,0 +1,64 @@
+# pylint: disable=R0801
+"""Query Reachables Command for the atom-tools CLI."""
+import logging
+
+from cleo.helpers import option
+
+from atom_tools.cli.commands.command import Command
+from atom_tools.lib.slices import AtomSlice
+from atom_tools.lib.utils import check_reachable
+
+
+logger = logging.getLogger(__name__)
+
+
+class CheckReachableCommand(Command):
+ """
+ This command handles the conversion of an atom slice to a specified
+ destination format.
+
+ Attributes:
+ name (str): The name of the command.
+ description (str): The description of the command.
+ options (list): The list of options for the command.
+ help (str): The help message for the command.
+
+ Methods:
+ handle: Executes the command and performs the conversion.
+ """
+
+ name = 'check-reachable'
+ description = ('Find out if there are hits for a given package:version or file:linenumber in '
+ 'an atom slice.')
+ options = [
+ option(
+ 'input-slice',
+ 'i',
+ 'Slice file',
+ flag=False,
+ value_required=True,
+ ),
+ option(
+ 'pkg',
+ 'p',
+ 'Package to search for in the format of :',
+ flag=False,
+ ),
+ option(
+ 'location',
+ 'l',
+ 'Filename with line number to search for in the format of :',
+ flag=False,
+ ),
+ ]
+ help = """Checks for reachable flows for a pkg:version or file:linenumber in an atom slice."""
+
+ loggers = ['atom_tools.lib.filtering', 'atom_tools.lib.regex_utils', 'atom_tools.lib.slices',
+ 'atom_tools.lib.utils']
+
+ def handle(self):
+ """
+ Executes the query command and performs the search.
+ """
+ atom_slice = AtomSlice(self.option('input-slice'))
+ print(check_reachable(atom_slice.content, self.option('pkg'), self.option('location')))
diff --git a/atom_tools/cli/commands/filter.py b/atom_tools/cli/commands/filter.py
index d4fe605..79a62db 100644
--- a/atom_tools/cli/commands/filter.py
+++ b/atom_tools/cli/commands/filter.py
@@ -40,6 +40,13 @@ class FilterCommand(Command):
'expression. Please see documentation for syntax.',
flag=False,
),
+ option(
+ 'package-version',
+ 'p',
+ description='Filter a reachables slice based on a package name and version in format '
+ 'package:version. May include multiple separated by a comma.',
+ flag=False,
+ ),
option(
'outfile',
'o',
diff --git a/atom_tools/cli/commands/query_endpoints.py b/atom_tools/cli/commands/query_endpoints.py
index 11768d3..2176710 100644
--- a/atom_tools/cli/commands/query_endpoints.py
+++ b/atom_tools/cli/commands/query_endpoints.py
@@ -1,5 +1,5 @@
# pylint: disable=R0801
-"""Query Command for the atom-tools CLI."""
+"""Query Endpoints Command for the atom-tools CLI."""
import logging
from cleo.helpers import option
diff --git a/atom_tools/lib/filtering.py b/atom_tools/lib/filtering.py
index c257c5f..1343369 100644
--- a/atom_tools/lib/filtering.py
+++ b/atom_tools/lib/filtering.py
@@ -13,7 +13,7 @@
logger = logging.getLogger(__name__)
-filtering = FilteringPatternCollection()
+patterns = FilteringPatternCollection()
@dataclass
@@ -62,7 +62,7 @@ def filter_slice(self) -> Dict:
if self.slc.slice_type == 'usages':
return self.filter_usages()
if self.slc.slice_type == 'reachables':
- return self.filter_reachables()
+ return self.filter_usages()
raise ValueError(f'Unknown slice type: {self.slc.slice_type}')
def filter_usages(self) -> Dict:
@@ -114,10 +114,10 @@ def _process_slice_indexes(self) -> Dict:
include_indexes = set()
exclude_indexes = set()
for k in self.results:
- if matched := filtering.top_level_flat_loc_index.search(k):
+ if matched := patterns.top_level_flat_loc_index.search(k):
include_indexes.add(matched)
for k in self.negative_results:
- if matched := filtering.top_level_flat_loc_index.search(k):
+ if matched := patterns.top_level_flat_loc_index.search(k):
exclude_indexes.add(matched)
return self._exclude_indexes(include_indexes, exclude_indexes)
@@ -152,13 +152,19 @@ def _search_values_fuzzy(self, f: AttributeFilter) -> None:
self._process_fuzzy_results(f, result)
+def check_reachable_purl(data: Dict, purl: str) -> bool:
+ """Checks if purl is reachable"""
+ purls = enumerate_reachable_purls(data)
+ return purl.lower() in purls
+
+
def create_attribute_filter(key: str, value: str, fuzz_pct: int | None) -> Tuple:
"""Create an attribute filter"""
lns = ()
fn_only = False
- if key.lower() == 'filename' and '/' not in value and '\\' not in value:
+ if (key.lower() in {'filename', 'parentfilename'}) and '/' not in value and '\\' not in value:
fn_only = True
- if ':' in value and (match := filtering.attribute_and_line.search(value)):
+ if ':' in value and (match := patterns.attribute_and_line.search(value)):
value = match.group('attrib')
lns = get_ln_range(match.group('line_nums'))
if fuzz_pct:
@@ -170,6 +176,40 @@ def create_attribute_filter(key: str, value: str, fuzz_pct: int | None) -> Tuple
return new_value, lns, fn_only
+def create_purl_map(data: Dict) -> Dict:
+ """Map purls to package:version strings"""
+ purls = set(patterns.jmespath_purls.search(data))
+ purl_dict = {}
+ for purl in purls:
+ formatted_purls = parse_purl(purl)
+ for p in formatted_purls:
+ purl_dict[p] = purl
+ return purl_dict
+
+
+def enumerate_reachable_purls(data: Dict) -> Set[str]:
+ """Enumerate reachable purls"""
+ all_purls = set(patterns.jmespath_purls.search(data))
+ purls = []
+ for purl in all_purls:
+ purls.extend(parse_purl(purl))
+ return set(purls)
+
+
+def filter_flows(reachables: List[Dict], filename: str, ln: Tuple[int, int]) -> bool:
+ """Filters flows"""
+ if not reachables:
+ return False
+ for flows in reachables:
+ for f in flows.get('flows', []):
+ num = f.get('lineNumber')
+ if num and num not in ln:
+ continue
+ if f.get('parentFileName').endswith(filename):
+ return True
+ return False
+
+
def get_ln_range(value: str) -> Tuple[int, int] | Tuple:
"""
Extracts line numbers from arguments and returns a tuple of (start, end)
@@ -195,3 +235,38 @@ def parse_filters(filter_options: str) -> Generator[Tuple[str, str, str], None,
if condition == '=':
condition = '=='
yield target, value, condition
+
+
+def parse_purl_pkgs(match: re.Match) -> List[str]:
+ """Extract package and version variations from purl"""
+ pkgs = [match.group('p1')]
+ pkgs.append(match.group('p2'))
+ pkgs = list(set(pkgs))
+ for i, p in enumerate(pkgs):
+ pkgs[i] = p.replace('pypi/', '').replace('npm/', '').replace('%40', '@') # type: ignore
+ return pkgs
+
+
+def parse_purl_versions(match: re.Match) -> List[str]:
+ """Returns a list of version variations from a purl"""
+ versions = {match.group('v1')}
+ versions.add(match.group('v2'))
+ if match.group('ext'):
+ versions.add(f"{match.group('v1')}{match.group('ext')}")
+ versions.add(f"{match.group('v2')}{match.group('ext')}")
+ return list(versions)
+
+
+def parse_purl(purl: str) -> List[str]:
+ """Returns a list of permutations of pkg:version from a purl"""
+ purl = patterns.purl_trailing_version.sub('', purl)
+ result: List[str] = []
+ pkgs: List[str] = []
+ versions: List[str] = []
+ if match := patterns.purl_version.search(purl):
+ versions = parse_purl_versions(match)
+ if match := patterns.purl_pkg.search(purl):
+ pkgs = parse_purl_pkgs(match)
+ for i in pkgs:
+ result.extend(f"{i}:{j}" for j in versions)
+ return list(set(result))
diff --git a/atom_tools/lib/regex_utils.py b/atom_tools/lib/regex_utils.py
index e496a1e..5c4b285 100644
--- a/atom_tools/lib/regex_utils.py
+++ b/atom_tools/lib/regex_utils.py
@@ -6,6 +6,7 @@
from dataclasses import dataclass
from typing import Tuple, List, Dict, Any
+import jmespath
logger: logging.Logger = logging.getLogger(__name__)
@@ -71,6 +72,11 @@ class FilteringPatternCollection:
'{objectSlices: objectSlices[?ATTRIBUTECONDITION`TARGET_VALUE`], '
'userDefinedTypes: userDefinedTypes[?ATTRIBUTECONDITION`TARGET_VALUE`]}'
)
+ jmespath_purls = jmespath.compile('reachables[].purls[]')
+ purl_pkg = re.compile(r'(?P[^/:]+/(?P[^/]+))(?:(?:.|/)v\d+)?(?=@)')
+ purl_trailing_version = re.compile(r'(?:.|/)v\d+(?=@)')
+ purl_version = re.compile(r'(?<=@)(?Pv?(?P[\d.]+){1,3})(?P[^?\s]+)?')
+ filename = re.compile(r'[^/]+(?!/)')
def py_helper(endpoint: str, regex: OpenAPIRegexCollection) -> Tuple[str, List[Dict]]:
diff --git a/atom_tools/lib/slices.py b/atom_tools/lib/slices.py
index 5ef321d..4eb2b78 100644
--- a/atom_tools/lib/slices.py
+++ b/atom_tools/lib/slices.py
@@ -11,11 +11,14 @@
import json_flatten # type: ignore
+from atom_tools.lib.regex_utils import FilteringPatternCollection
+
logger = logging.getLogger(__name__)
+patterns = FilteringPatternCollection()
-def create_flattened_dicts(data: Dict) -> Dict[str, Dict]:
+def create_attrib_dicts(data: Dict) -> Dict[str, Dict]:
"""Creates a flattened slice and individual attribute dictionaries."""
attributes: Dict[str, Dict] = {
'filename': {},
@@ -27,7 +30,7 @@ def create_flattened_dicts(data: Dict) -> Dict[str, Dict]:
}
for k, v in data.items():
- if 'fileName' in k:
+ if 'fileName' in k or 'parentFileName' in k:
attributes['filename'] = process_attrib_dict(attributes['filename'], k, v)
elif 'fullName' in k:
attributes['fullname'] = process_attrib_dict(attributes['fullname'], k, v)
@@ -48,7 +51,7 @@ def import_flat_slice(content: Dict) -> Dict[str, Dict]:
Import a slice from a JSON file.
Args:
- filename (str): The path to the JSON file.
+ content (dict): The contents of the JSON file
Returns:
tuple[dict, str]: The contents of the JSON file and the type of slice
@@ -62,7 +65,7 @@ def import_flat_slice(content: Dict) -> Dict[str, Dict]:
If the JSON file is not a valid slice, a warning is logged.
"""
content = json_flatten.flatten(content)
- return create_flattened_dicts(content)
+ return create_attrib_dicts(content)
def import_slice(filename: str | Path) -> Tuple[Dict, str, str]:
diff --git a/atom_tools/lib/utils.py b/atom_tools/lib/utils.py
index 021d129..d306d21 100644
--- a/atom_tools/lib/utils.py
+++ b/atom_tools/lib/utils.py
@@ -5,6 +5,8 @@
from pathlib import Path
from typing import Dict, List, Tuple
+from atom_tools.lib.filtering import check_reachable_purl, filter_flows, get_ln_range
+
logger = logging.getLogger(__name__)
@@ -29,6 +31,15 @@ def add_params_to_cmd(cmd: str, outfile: str, origin_type: str = '') -> Tuple[st
return cmd, args
+def check_reachable(data: Dict, pkg: str, loc: str) -> bool:
+ """Checks if package is reachable"""
+ if pkg:
+ return check_reachable_purl(data, pkg)
+ if match := re.search(r'(?P[^/]+(?[\d-]+)', loc):
+ return filter_flows(data.get('reachables', []), match['file'], get_ln_range(match['line']))
+ raise ValueError(f'Invalid location: {loc}')
+
+
def export_json(data: Dict, outfile: str, indent: int | None = None) -> None:
"""Exports data to json"""
with open(outfile, 'w', encoding='utf-8') as f:
diff --git a/pyproject.toml b/pyproject.toml
index bdbe77a..3314f2f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "atom-tools"
-version = "0.5.5"
+version = "0.6.0"
description = "Collection of tools for use with AppThreat/atom."
authors = [
{ name = "Caroline Russell", email = "caroline@appthreat.dev" },
diff --git a/test/test_filtering.py b/test/test_filtering.py
index 111998b..09fe49f 100644
--- a/test/test_filtering.py
+++ b/test/test_filtering.py
@@ -1,26 +1,33 @@
import pytest
-from atom_tools.lib.filtering import Filter, parse_filters
-from atom_tools.lib.utils import sort_dict
+from atom_tools.lib.filtering import check_reachable_purl, Filter, filter_flows, parse_filters
+from atom_tools.lib.slices import AtomSlice
+from atom_tools.lib.utils import check_reachable, sort_dict
@pytest.fixture
def java_usages_1():
- return Filter('test/data/java-piggymetrics-usages.json', 'outfile.json', '90')
+ filter_obj = Filter('test/data/java-piggymetrics-usages.json', 'outfile.json', '90')
+ filter_obj.add_filters(parse_filters('callName=testFilterQuery'))
+ return filter_obj
+
@pytest.fixture
def java_usages_2():
- return Filter('test/data/java-sec-code-usages.json', 'outfile.json', '90')
+ filter_obj = Filter('test/data/java-sec-code-usages.json', 'outfile.json', '90')
+ filter_obj.add_filters(parse_filters('fileName=test/file/name.java'))
+ return filter_obj
@pytest.fixture
def js_usages_1():
- return Filter('test/data/js-juiceshop-usages.json', 'outfile.json', '90')
+ filter_obj = Filter('test/data/js-juiceshop-usages.json', 'outfile.json', '90')
+ filter_obj.add_filters(parse_filters('signature=@Pipe'))
+ return filter_obj
def test_attribute_filter_class(java_usages_1, js_usages_1, java_usages_2):
- java_usages_1.add_filters(parse_filters('callName=testFilterQuery'))
- assert java_usages_1.filter_slice() == {'objectSlices': [{'code': '',
+ assert sort_dict(java_usages_1.filter_slice()) == {'objectSlices': [{'code': '',
'columnNumber': 20,
'fileName': 'account-service/src/main/java/com/piggymetrics/account/AccountApplication.java',
'fullName': 'com.piggymetrics.account.AccountApplication.:void()',
@@ -52,10 +59,7 @@ def test_attribute_filter_class(java_usages_1, js_usages_1, java_usages_2):
'name': '',
'typeFullName': 'ANY'}}]}],
'userDefinedTypes': []}
- js_usages_1.add_filters(parse_filters('signature=@Pipe'))
- result = js_usages_1.filter_slice()
- result = sort_dict(result)
- assert result == {
+ assert sort_dict(js_usages_1.filter_slice()) == {
'objectSlices': [{'code': "@Pipe({name:'challengeHint',pure:false})",
'columnNumber': 0,
'fileName': 'frontend/src/app/score-board/pipes/challenge-hint.pipe.ts',
@@ -71,7 +75,7 @@ def test_attribute_filter_class(java_usages_1, js_usages_1, java_usages_2):
'signature': '@Pipe',
'usages': []}],
'userDefinedTypes': []}
- java_usages_2.add_filters(parse_filters('fileName=test/file/name.java'))
+
result = java_usages_2.filter_slice()
result = sort_dict(result)
assert result == {'objectSlices': [{'code': '',
@@ -93,3 +97,23 @@ def test_attribute_filter_class(java_usages_1, js_usages_1, java_usages_2):
'name': '',
'typeFullName': 'ANY'}}]}],
'userDefinedTypes': []}
+
+
+def test_check_reachable():
+ atom_slice = AtomSlice('test/data/js-juiceshop-reachables.json')
+
+ # Test package:version
+ assert check_reachable(atom_slice.content, 'colors:1.6.0', '') == True
+ assert check_reachable(atom_slice.content, 'colors:1.9.0', '') == False
+ assert check_reachable(atom_slice.content, '@colors/colors:1.6.0', '') == True
+ assert check_reachable(atom_slice.content, '@colors/colors:1.9.0', '') == False
+
+ # Test filename:linenumber
+ assert check_reachable(atom_slice.content, '', 'routes/updateUserProfile.ts:29') == True
+ assert check_reachable(atom_slice.content, '', 'updateUserProfile.ts:29') == True
+ assert check_reachable(atom_slice.content, '', 'routes/updateUserProfile.ts:25-30') == True
+ assert check_reachable(atom_slice.content, '', 'updateUserProfile.ts:25-30') == True
+ assert check_reachable(atom_slice.content, '', 'routes/updateUserProfile.ts:400') == False
+ assert check_reachable(atom_slice.content, '', 'updateUserProfile.ts:400') == False
+ assert check_reachable(atom_slice.content, '', 'routes/updateUserProfile.ts:400-600') == False
+ assert check_reachable(atom_slice.content, '', 'updateUserProfile.ts:400-600') == False