Skip to content

Commit

Permalink
Issue #37 - Correct overwriting of line number data. (#39)
Browse files Browse the repository at this point in the history
Signed-off-by: Caroline Russell <[email protected]>
  • Loading branch information
cerrussell authored Mar 20, 2024
1 parent b646d1f commit bbdfa59
Show file tree
Hide file tree
Showing 6 changed files with 10,897 additions and 298 deletions.
2 changes: 1 addition & 1 deletion atom_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
A cli, classes and functions for converting an atom slice to a different format
"""
__version__ = '0.4.3'
__version__ = '0.4.4'
232 changes: 162 additions & 70 deletions atom_tools/lib/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,36 @@ def __init__(
self.regex_param_count = 0
self.target_line_nums: Dict[str, Dict] = {}

def convert_usages(self) -> Dict[str, Any]:
def convert_usages(self) -> Dict[str, Dict]:
"""
Converts usages to OpenAPI.
"""
methods = self._process_methods()
methods = self.methods_to_endpoints(methods)
self.create_file_to_method_dict(methods)
self._identify_target_line_nums(methods)
self.target_line_nums = self._identify_target_line_nums(methods)
self.file_endpoint_map = self.create_file_to_method_dict(methods)
methods = self._process_calls(methods)
return self.populate_endpoints(methods)

def create_file_to_method_dict(self, method_map):
def create_file_to_method_dict(self, method_map: Dict[str, Any]) -> Dict[str, List]:
"""
Creates a dictionary of endpoints and methods.
"""
file_names = list(method_map.get('file_names').keys())
file_endpoint_map = {i: [] for i in file_names}
if not method_map:
return {}
file_names = list(method_map.get('file_names', {}).keys())
file_endpoint_map: Dict = {i: [] for i in file_names}
for full_name in file_names:
for values in method_map['file_names'][full_name]['resolved_methods'].values():
file_endpoint_map[full_name].extend(values.get('endpoints'))
for k, v in file_endpoint_map.items():
# filename = k.split(':')[0]
endpoints = set(v)
for i in endpoints:
if self.file_endpoint_map.get(i):
self.file_endpoint_map[i].add(k)
else:
self.file_endpoint_map[i] = {k}
self.file_endpoint_map = {k: list(v) for k, v in self.file_endpoint_map.items()}
return {k: list(v) for k, v in self.file_endpoint_map.items()}

def create_paths_item(self, filename: str, paths_dict: Dict) -> Dict:
"""
Expand All @@ -98,11 +99,11 @@ def create_paths_item(self, filename: str, paths_dict: Dict) -> Dict:
calls, ep, filename, call_line_numbers, target_line_number
)
if paths_object.get(ep):
paths_object[ep] |= paths_item_object
paths_object[ep] = merge_path_objects(paths_object[ep], paths_item_object)
else:
paths_object |= {ep: paths_item_object}

return _remove_nested_parameters(paths_object)
return remove_nested_parameters(paths_object)

def endpoints_to_openapi(self, server: str = '') -> Any:
"""
Expand Down Expand Up @@ -133,7 +134,6 @@ def methods_to_endpoints(self, method_map: Dict[str, Any]) -> Dict[str, Any]:
for file_name, resolved_methods in method_map.items():
if new_resolved := self._process_resolved_methods(resolved_methods):
new_method_map['file_names'][file_name] = {'resolved_methods': new_resolved}

return new_method_map

def populate_endpoints(self, method_map: Dict) -> Dict[str, Any]:
Expand All @@ -155,7 +155,6 @@ def populate_endpoints(self, method_map: Dict) -> Dict[str, Any]:
paths_object = merge_path_objects(paths_object, new_path_item)
else:
paths_object = new_path_item

return paths_object

def _calls_to_params(self, ep: str, orig_ep: str, call: Dict | None) -> Dict[str, Any]:
Expand Down Expand Up @@ -308,24 +307,24 @@ def _generic_params_helper(self, endpoint: str, orig_endpoint: str) -> List[Dict
)
return params

def _identify_target_line_nums(self, methods):
def _identify_target_line_nums(self, methods: Dict[str, Any]) -> Dict:
file_names = list(methods['file_names'].keys())
if not file_names:
return
conditional = [f'fileName==`{json.dumps(i)}`' for i in file_names]
conditional = '*[?' + ' || '.join(conditional) + (
return {}
conditional = [f'fileName==`{i}`' for i in file_names]
conditional = '*[?' + ' || '.join(conditional) + ( # type: ignore
'][].{file_name: fileName, methods: usages[].targetObj[].{resolved_method: '
'resolvedMethod || callName || code || name, line_number: lineNumber}}')
pattern = jmespath.compile(conditional)
pattern = jmespath.compile(conditional) # type: ignore
result = pattern.search(self.usages.content)
result = {i['file_name']: i['methods'] for i in result if i['methods']}
targets = {i: {} for i in result}
targets: Dict = {i: {} for i in result}

for k, v in result.items():
for i in v:
targets[k] |= {i['resolved_method']: i['line_number']}
targets[k] = merge_targets(targets[k], {i['resolved_method']: i['line_number']})

self.target_line_nums = targets
return targets

def _paths_object_helper(
self,
Expand All @@ -351,11 +350,13 @@ def _paths_object_helper(
if calls:
for call in calls:
paths_item_object |= self._calls_to_params(ep, orig_ep, call)
if (call_line_numbers or line_number) and (line_nos := _create_ln_entries(
if (call_line_numbers or line_number) and (line_nos := create_ln_entries(
filename, list(set(call_line_numbers)), line_number)):
paths_item_object |= line_nos
# if line_number:
# paths_item_object['x-atom-usages-target'] = {filename: line_number}
if 'x-atom-usages' in paths_item_object:
paths_item_object['x-atom-usages'] = merge_x_atom(
paths_item_object['x-atom-usages'], line_nos)
else:
paths_item_object |= line_nos
return ep, paths_item_object

def _parse_path_regexes(self, endpoint: str) -> str:
Expand Down Expand Up @@ -535,28 +536,51 @@ def _query_calls_helper(self, file_name: str) -> List[Dict]:
Returns:
list: The result of searching for the calls pattern in the usages.
"""
pattern = f'objectSlices[?fileName==`{json.dumps(file_name)}`].usages[].*[?callName][][]'
pattern = (f'objectSlices[?fileName==`{json.dumps(file_name.encode().decode())}`].usages[]'
f'.*[?callName][][]')
compiled_pattern = jmespath.compile(pattern)
return compiled_pattern.search(self.usages.content)


def merge_path_objects(p1: Dict, p2: Dict) -> Dict:
def create_ln_entries(filename: str, call_line_numbers: List, line_number: int | None) -> Dict:
"""
Merge two dictionaries representing path objects.
Creates line number entries for a given filename and line numbers.
Args:
p1 (dict): The first dictionary representing a path object.
p2 (dict): The second dictionary representing a path object.
filename (str): The name of the file.
call_line_numbers (list): A list of call line numbers.
line_number (int): Target line number.
Returns:
dict: The merged dictionary representing the path object.
dict: A dictionary containing line number entries.
"""
for key, value in p2.items():
if p1.get(key):
p1[key].update(value)
else:
p1[key] = value
return p1
fn = filename.split(':')[0]
x_atom: Dict = {'x-atom-usages': {}}
if call_line_numbers:
x_atom['x-atom-usages']['call'] = {fn: call_line_numbers}
if line_number:
x_atom['x-atom-usages']['target'] = {fn: line_number}
return x_atom


def determine_operations(call: Dict, params: List) -> Dict[str, Any]:
"""
Determine the supported operations based on the call and parameters.
Args:
call (dict): The call information.
params (list): The parameters for the call.
Returns:
dict: A dictionary containing the supported operations and their
parameters and responses.
"""
ops = {'get', 'put', 'post', 'delete', 'options', 'head', 'patch'}
if found := [op for op in ops if op in call.get('resolvedMethod', '').lower()]:
if params:
return {op: {'parameters': params, 'responses': {}} for op in found}
return {op: {'responses': {}} for op in found}
return {'parameters': params} if params else {}


def filter_calls(
Expand Down Expand Up @@ -584,59 +608,127 @@ def filter_calls(
return resolved_methods


def determine_operations(call: Dict, params: List) -> Dict[str, Any]:
def merge_operations(op1: Dict, op2: Dict) -> Dict:
"""
Determine the supported operations based on the call and parameters.
Merge two dictionaries of operations.
Args:
call (dict): The call information.
params (list): The parameters for the call.
op1 (dict): The first dictionary of operations.
op2 (dict): The second dictionary of operations.
Returns:
dict: A dictionary containing the supported operations and their
parameters and responses.
dict: The merged dictionary of operations.
"""
ops = {'get', 'put', 'post', 'delete', 'options', 'head', 'patch'}
if found := [op for op in ops if op in call.get('resolvedMethod', '').lower()]:
if params:
return {op: {'parameters': params, 'responses': {}} for op in found}
return {op: {'responses': {}} for op in found}
return {'parameters': params} if params else {}
for k, v in op2.items():
if v and not op1.get(k) or op1[k] == {}:
op1[k] = v
elif k == 'parameters' and v:
op1[k] = merge_params(op1[k], v)
return op1


def _remove_nested_parameters(data: Dict) -> Dict[str, Dict | List]:
def merge_params(p1: List, p2: List) -> List:
"""
Removes nested path parameters from the given data.
Merge two lists of parameters.
Args:
data (dict): The data containing nested path parameters.
p1 (list): The first list of parameters.
p2 (list): The second list of parameters.
Returns:
dict: The modified data with the nested path parameters removed.
list: The merged list of parameters.
"""
for value in data.values():
for v in value.values():
if isinstance(v, dict) and "parameters" in v and isinstance(v["parameters"], list):
v["parameters"] = [param for param in v["parameters"] if
param.get("in") != "path"]
return data
names = [i.get('name') for i in p1]
for i in p2:
if i.get('name', '') not in names:
p1.append(i)
return p1


def _create_ln_entries(filename, call_line_numbers, line_numbers):
def merge_path_objects(p1: Dict, p2: Dict) -> Dict:
"""
Creates line number entries for a given filename and line numbers.
Merge two dictionaries representing path objects.
Args:
filename (str): The name of the file.
call_line_numbers (list): A list of line numbers.
p1 (dict): The first dictionary representing a path object.
p2 (dict): The second dictionary representing a path object.
Returns:
dict: A dictionary containing line number entries.
dict: The merged dictionary representing the path object.
"""
fn = filename.split(':')[0]
x_atom = {'x-atom-usages': {}}
if call_line_numbers:
x_atom['x-atom-usages']['call'] = {fn: call_line_numbers}
if line_numbers:
x_atom['x-atom-usages']['target'] = {fn: line_numbers}
return x_atom
for key, value in p2.items():
if key not in p1:
p1[key] = value
continue
for k, v in value.items():
if p1[key].get(k):
if k == 'x-atom-usages':
p1[key][k] = merge_x_atom(p1[key][k], v)
elif k == 'parameters':
p1[key][k] = merge_params(p1[key][k], v)
elif k in {'get', 'put', 'post', 'delete', 'options', 'head', 'patch'}:
p1[key][k] = merge_operations(p1[key][k], v)
continue
p1[key][k] = v

return p1


def merge_targets(t1: Dict, t2: Dict) -> Dict:
"""
Merge two dictionaries of targets.
Args:
t1 (dict): The first dictionary of targets.
t2 (dict): The second dictionary of targets.
Returns:
dict: The merged dictionary of targets.
"""
for k, v in t2.items():
if k in t1:
t1[k].append(v)
else:
t1[k] = [v]
return t1


def merge_x_atom(x1: Dict, x2: Dict) -> Dict:
"""
Merge two dictionaries of x-atom-usages.
Args:
x1 (dict): The first dictionary of x atoms.
x2 (dict): The second dictionary of x atoms.
Returns:
dict: The merged dictionary of x atoms.
"""
for key, value in x2.items():
if key not in x1:
x1[key] = value
continue
for k, v in value.items():
if x1[key].get(k):
x1[key][k].extend(v)
else:
x1[key][k] = v
return x1


def remove_nested_parameters(data: Dict) -> Dict[str, Dict | List]:
"""
Removes nested path parameters from the given data.
Args:
data (dict): The data containing nested path parameters.
Returns:
dict: The modified data with the nested path parameters removed.
"""
for value in data.values():
for v in value.values():
if isinstance(v, dict) and 'parameters' in v and isinstance(v['parameters'], list):
v['parameters'] = [param for param in v['parameters'] if
param.get('in') != 'path']
return data
5 changes: 2 additions & 3 deletions atom_tools/lib/regex_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
from dataclasses import dataclass
from typing import Tuple, List, Dict, Any


logger: logging.Logger = logging.getLogger(__name__)
py_type_mapping = {'int': 'integer', 'string': 'string', 'float': 'number', 'path': 'string'}


@dataclass
Expand Down Expand Up @@ -151,6 +153,3 @@ def create_tmp_regex_name(element: str, m: Tuple | str, count: int) -> Tuple[str
def fwd_slash_repl(match: re.Match) -> str:
"""For substituting forward slashes."""
return str(match['paren'].replace('/', '$L@$H'))


py_type_mapping = {'int': 'integer', 'string': 'string', 'float': 'number', 'path': 'string'}
4 changes: 3 additions & 1 deletion atom_tools/lib/slices.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ def import_slice(filename: str | Path) -> Tuple[Dict, str]:
return {}, 'unknown'
try:
with open(filename, 'r', encoding='utf-8') as f:
content = json.load(f)
raw_content = f.read()
raw_content = raw_content.replace(r'\\', '/')
content = json.loads(raw_content)
if content.get('objectSlices'):
return content, 'usages'
if content.get('reachables'):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "atom-tools"
version = "0.4.3"
version = "0.4.4"
description = "Collection of tools for use with AppThreat/atom."
authors = [
{ name = "Caroline Russell", email = "[email protected]" },
Expand Down
Loading

0 comments on commit bbdfa59

Please sign in to comment.