Skip to content

Commit

Permalink
Feat: Add query-endpoints command. (#43)
Browse files Browse the repository at this point in the history
Signed-off-by: Caroline Russell <[email protected]>
  • Loading branch information
cerrussell authored Apr 1, 2024
1 parent f2e061d commit 1f5e957
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 129 deletions.
219 changes: 135 additions & 84 deletions README.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions atom_tools/cli/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def _load() -> Command:
COMMANDS = [
'convert',
'filter',
'query-endpoints',
'validate-lines',
]

Expand Down
4 changes: 2 additions & 2 deletions atom_tools/cli/commands/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from atom_tools.cli.commands.command import Command
from atom_tools.lib.filtering import Filter, parse_filters
from atom_tools.lib.utils import add_outfile_to_cmd, export_json
from atom_tools.lib.utils import add_params_to_cmd, export_json


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -78,7 +78,7 @@ def handle(self):
outfile = str(slice_file.parent / f'{slice_file.stem}_filtered{slice_file.suffix}')
cmd, args = 'export', ''
if self.option('execute') != 'export':
cmd, args = add_outfile_to_cmd(self.option('execute'), outfile)
cmd, args = add_params_to_cmd(self.option('execute'), outfile)
filter_runner = Filter(self.option('input-slice'), outfile, self.option('fuzz'))
if criteria:
filters = parse_filters(criteria)
Expand Down
80 changes: 80 additions & 0 deletions atom_tools/cli/commands/query_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# pylint: disable=R0801
"""Query Command for the atom-tools CLI."""
import logging

from cleo.helpers import option

from atom_tools.cli.commands.command import Command
from atom_tools.lib.converter import OpenAPI
from atom_tools.lib.filtering import get_ln_range
from atom_tools.lib.utils import output_endpoints


logger = logging.getLogger(__name__)


class QueryEndpointsCommand(Command):
"""
This command handles the conversion of an atom slice to a specified
destination format.
Attributes:
name (str): The name of the command.
description (str): The description of the command.
options (list): The list of options for the command.
help (str): The help message for the command.
Methods:
handle: Executes the command and performs the conversion.
"""

name = 'query-endpoints'
description = 'List elements to display in the console.'
options = [
option(
'input-slice',
'i',
'Slice file',
flag=False,
value_required=True,
),
option(
'type',
't',
'Origin type of source on which the atom slice was generated.',
flag=False,
default='java',
),
option(
'filter-lines',
'f',
'Filter endpoints by line number or range.',
flag=False,
),
option(
'sparse',
's',
'Only display names; do not include path and line numbers.',
)
]
help = """The query command can be used to return results directly to the console. """
loggers = ['atom_tools.lib.converter', 'atom_tools.lib.regex_utils', 'atom_tools.lib.slices']

def handle(self):
"""
Executes the query command and performs the conversion.
"""
supported_types = {'java', 'jar', 'python', 'py', 'javascript', 'js', 'typescript', 'ts'}
if self.option('type') not in supported_types:
raise ValueError(f'Unknown origin type: {self.option("type")}')
converter = OpenAPI(
'openapi3.1.0',
self.option('type'),
self.option('input-slice'),
)
if not (result := converter.endpoints_to_openapi('')):
logging.warning('No results produced!')
line_filter = ()
if self.option('filter-lines'):
line_filter = get_ln_range(self.option('filter-lines'))
output_endpoints(result, self.option('sparse'), line_filter)
21 changes: 8 additions & 13 deletions atom_tools/lib/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ def add_filters(self, filters: Generator) -> None:
f'{a_filter.value}')
self.attribute_filters.append(a_filter)

def filter_lns(self, data, attribute_filter):
"""Filter line numbers"""
raise NotImplementedError

def filter_reachables(self):
"""Filter reachables"""
raise NotImplementedError
Expand All @@ -75,7 +71,7 @@ def filter_usages(self) -> Dict:
if self.fuzz:
self._search_values_fuzzy(f)
else:
self._search_values(f.attribute, f.value, f.condition)
self._search_values(f)
if self.results:
return self._process_slice_indexes()
return {'objectSlices': [], 'userDefinedTypes': []}
Expand All @@ -89,9 +85,7 @@ def _exclude_indexes(self, include: Set, exclude: Set) -> Dict:
filtered_slice[i.group('type')].append(
self.slc.content[i.group('type')][int(i.group('index'))])
return filtered_slice
if exclude:
return self._handle_exclude_only(exclude)
return self.slc.content
return self._handle_exclude_only(exclude) if exclude else self.slc.content

def _handle_exclude_only(self, exclude: Set[re.Match]) -> Dict:
filtered_slice = deepcopy(self.slc.content)
Expand Down Expand Up @@ -125,13 +119,14 @@ def _process_slice_indexes(self) -> Dict:
exclude_indexes.add(matched)
return self._exclude_indexes(include_indexes, exclude_indexes)

def _search_values(self, attrib: str, value: re.Pattern, condition: str) -> None:
def _search_values(self, f: AttributeFilter) -> None:
include = []
exclude = []
for k, v in self.slc.attrib_dicts.get(attrib, {}).items():
if value.search(k):
if condition == '==':
for k, v in self.slc.attrib_dicts.get(f.attribute, {}).items():
if f.value.search(k):
if f.condition == '==':
include.extend(v)
# self.found_keys.append(k)
else:
exclude.extend(v)
self.results.extend(list(set(include)))
Expand All @@ -153,7 +148,7 @@ def create_attribute_filter(value: str, fuzz_pct: int | None) -> Tuple:
"""Create an attribute filter"""
lns = ()
if ':' in value and (match := filtering.attribute_and_line.search(value)):
value = match.group('filename')
value = match.group('attrib')
lns = get_ln_range(match.group('line_nums'))
new_value = value if fuzz_pct else re.compile(value, re.IGNORECASE)
return new_value, lns
Expand Down
58 changes: 46 additions & 12 deletions atom_tools/lib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,71 @@
import json
import logging
import re
from typing import Dict
from pathlib import Path
from typing import Dict, List, Tuple


logger = logging.getLogger(__name__)


def export_json(data: Dict, outfile: str, indent: int | None = None) -> None:
"""Exports data to json"""
with open(outfile, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=indent, sort_keys=True)


def add_outfile_to_cmd(cmd: str, outfile: str):
def add_params_to_cmd(cmd: str, outfile: str, origin_type: str = '') -> Tuple[str, str]:
"""
Adds the outfile to the command.
"""
# Check that the input slice has not already been specified
args = ''
if origin_type and '-t ' not in cmd and '--type' not in cmd:
cmd += f' -t {origin_type}'
if '-i ' in cmd or '--input-slice' in cmd:
cmd = cmd.replace('--input-slice ', '-i ')
logging.warning(
'Input slice specified in command to be filtered. Replacing with filtered slice.')
if match := re.search(r'((?:-i|--input-slice)\s\S+)', cmd):
cmd = cmd.replace(match[1], f'-i {outfile}')
cmd = cmd.replace(match[1], f'-i {Path(outfile)}')
else:
cmd += f' -i {outfile}'
cmd += f' -i {Path(outfile)}'
if not args:
cmd, args = cmd.split(' ', 1)
return cmd, args


def export_json(data: Dict, outfile: str, indent: int | None = None) -> None:
"""Exports data to json"""
with open(outfile, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=indent, sort_keys=True)


def output_endpoints(data: Dict, names_only: bool, line_range: Tuple[int, int] | Tuple) -> None:
"""Outputs endpoints"""
to_print = ''
for endpoint, values in data.get('paths', {}).items():
to_print += f'{endpoint}'
usages = values.get("x-atom-usages", {}).get('call', {})
if names_only:
to_print += '\n'
continue
for k, v in usages.items():
for i in v:
if line_range[0] <= i <= line_range[1]:
to_print += f':{k}:{i}'
break
to_print += '\n'
print(to_print)


def remove_duplicates_list(obj: List[Dict]) -> List[Dict]:
"""Removes duplicates from a list of dictionaries."""
if not obj:
return obj
unique_objs = []
seen = set()
for o in obj:
key = tuple(o.get(k) for k, v in o.items())
if key not in seen:
unique_objs.append(o)
seen.add(key)
return unique_objs


def sort_dict(result: Dict) -> Dict:
"""Sorts a dictionary"""
for k, v in result.items():
Expand All @@ -43,7 +77,7 @@ def sort_dict(result: Dict) -> Dict:
return result


def sort_list(lst):
def sort_list(lst: List) -> List:
"""Sorts a list"""
if not lst:
return lst
Expand Down
19 changes: 1 addition & 18 deletions atom_tools/lib/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from atom_tools.lib.slices import AtomSlice
from atom_tools.lib.regex_utils import ValidationRegexCollection
from atom_tools.lib.utils import export_json
from atom_tools.lib.utils import export_json, remove_duplicates_list

logger = logging.getLogger(__name__)
regex: ValidationRegexCollection = ValidationRegexCollection()
Expand Down Expand Up @@ -281,20 +281,6 @@ def py_validation_helper(function_name: str, code: str, line: str) -> bool:
return found


def remove_duplicates_list(obj: List[Dict]) -> List[Dict]:
"""Removes duplicates from a list of dictionaries."""
if not obj:
return obj
unique_objs = []
seen = set()
for o in obj:
key = tuple(o.get(k) for k, v in o.items())
if key not in seen:
unique_objs.append(o)
seen.add(key)
return unique_objs


@dataclass(init=True)
class LineStats:
"""
Expand Down Expand Up @@ -585,9 +571,6 @@ def _find_line(self, code: str, function_name: str, line: str) -> bool:
def _get_verbose_results(self):
"""
Add the verbose results of the line number validation.
Args:
f: The file object to write the results to.
"""
verbose_results = '\n*** INVALID ENTRIES ***\n'
for i in self.matches['unmatched']:
Expand Down
20 changes: 20 additions & 0 deletions test/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from atom_tools.lib.utils import add_params_to_cmd, remove_duplicates_list


def test_add_params_to_cmd():
assert add_params_to_cmd('convert -i usages.json -f openapi3.1.0 -t java', 'test.json') == ('convert', '-i test.json -f openapi3.1.0 -t java')
assert add_params_to_cmd('convert -f openapi3.1.0', 'usages.json', 'java') == ('convert', '-f openapi3.1.0 -t java -i usages.json')


def test_remove_duplicates_list():
data = [
{'function_name': '', 'code': '', 'line_number': 1},
{'function_name': ':program', 'line_number': 1, 'code': None},
{'function_name': ':program', 'line_number': 1, 'code': None},
{'function_name': 'require', 'line_number': 6, 'code': 'app.ts::program:require'}
]
assert remove_duplicates_list(data) == [
{'code': '', 'function_name': '', 'line_number': 1},
{'code': None, 'function_name': ':program', 'line_number': 1},
{'code': 'app.ts::program:require', 'function_name': 'require', 'line_number': 6}
]

0 comments on commit 1f5e957

Please sign in to comment.