Skip to content

Commit

Permalink
Feat: OpenAPI endpoint line numbers (#22)
Browse files Browse the repository at this point in the history
* Add line numbers to x-atom-usages for now.

Signed-off-by: Caroline Russell <[email protected]>

* Bump version, move flake8 config, update docstrings and types.

Signed-off-by: Caroline Russell <[email protected]>

---------

Signed-off-by: Caroline Russell <[email protected]>
  • Loading branch information
cerrussell authored Feb 16, 2024
1 parent a0e015f commit 0861e5a
Show file tree
Hide file tree
Showing 5 changed files with 366 additions and 240 deletions.
4 changes: 4 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[flake8]
max-line-length = 99
max-complexity = 18
exclude = ["atom_tools/cli/"]
20 changes: 5 additions & 15 deletions atom_tools/cli/commands/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,33 +80,23 @@ def handle(self):
"""
Executes the convert command and performs the conversion.
"""
supported_types = [
'java', 'jar', 'python', 'py', 'javascript', 'js',
'typescript', 'ts'
]
supported_types = ['java', 'jar', 'python', 'py', 'javascript', 'js', 'typescript', 'ts']
if self.option('type') not in supported_types:
raise ValueError(
f'Unknown origin type: {self.option("type")}'
)
raise ValueError(f'Unknown origin type: {self.option("type")}')
match self.option('format'):
case 'openapi3.1.0' | 'openapi3.0.1':
converter = OpenAPI(
self.option('format'),
self.option('type'),
self.option('usages-slice'),
# self.option('server'),
# self.option('reachables-slice'),
)

if not (result := converter.endpoints_to_openapi(
self.option('server'))):
if not (result := converter.endpoints_to_openapi(self.option('server'))):
logging.error('No results produced!')
return 1
with open(self.option('output-file'), 'w',
encoding='utf-8') as f:
with open(self.option('output-file'), 'w', encoding='utf-8') as f:
json.dump(result, f, indent=4, sort_keys=True)
logging.info(f'OpenAPI document written to '
f'{self.option("output-file")}.')
logging.info(f'OpenAPI document written to {self.option("output-file")}.')
case _:
raise ValueError(
f'Unknown destination format: {self.option("format")}'
Expand Down
157 changes: 88 additions & 69 deletions atom_tools/lib/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,12 @@ class OpenAPI:
regex (RegexCollection): collection of regular expressions
Methods:
_create_ln_entries: Creates an x-atom-usages entry.
_filter_matches: Filters a list of matches based on certain criteria.
_js_helper: Formats path sections which are parameters correctly.
_process_methods_helper: Utility for process_methods.
_query_calls_helper: A helper function to query calls.
_remove_nested_parameters: Removes nested path parameters from the get/post/etc.
calls_to_params: Transforms a call and endpoint into parameter object.
collect_methods: Collects and combines methods that may be endpoints.
convert_usages: Converts usages to OpenAPI.
Expand All @@ -93,7 +96,6 @@ class OpenAPI:
endpoints_to_openapi: Generates an OpenAPI document.
extract_endpoints: Extracts endpoints from the given code.
filter_calls: Filters invokedCalls and argToCalls.
filter_matches: Filters a list of matches based on certain criteria.
methods_to_endpoints: Converts a method map to a map of endpoints.
populate_endpoints: Populates the endpoints based on the method_map.
process_calls: Processes calls and returns a new method map.
Expand All @@ -112,6 +114,7 @@ def __init__(
self.openapi_version = dest_format.replace('openapi', '')
self.title = f'OpenAPI Specification for {Path(usages).parent.stem}'
self.regex = RegexCollection()
self.file_endpoint_map: Dict = {}

def endpoints_to_openapi(self, server: str = '') -> Any:
"""
Expand All @@ -128,12 +131,32 @@ def endpoints_to_openapi(self, server: str = '') -> Any:

return output

def create_file_to_method_dict(self, method_map):
"""
Creates a dictionary of endpoints and methods.
"""
full_names = list(method_map.get('full_names').keys())
file_endpoint_map = {i: [] for i in full_names}
for full_name in full_names:
for values in method_map['full_names'][full_name]['resolved_methods'].values():
file_endpoint_map[full_name].extend(values.get('endpoints'))
for k, v in file_endpoint_map.items():
filename = k.split(':')[0]
endpoints = set(v)
for i in endpoints:
if self.file_endpoint_map.get(i):
self.file_endpoint_map[i].add(filename)
else:
self.file_endpoint_map[i] = {filename}
self.file_endpoint_map = {k: list(v) for k, v in self.file_endpoint_map.items()}

def convert_usages(self) -> Dict[str, Any]:
"""
Converts usages to OpenAPI.
"""
methods = self.process_methods()
methods = self.methods_to_endpoints(methods)
self.create_file_to_method_dict(methods)
methods = self.process_calls(methods)
return self.populate_endpoints(methods)

Expand Down Expand Up @@ -177,32 +200,26 @@ def process_methods(self) -> Dict[str, List[str]]:
Create a dictionary of full names and their corresponding methods.
"""
method_map = self._process_methods_helper(
'objectSlices[].{fullName: fullName, resolvedMethods: usages['
'].*.resolvedMethod[]}')
'objectSlices[].{fullName: fullName, resolvedMethods: usages[].*.resolvedMethod[]}')

calls = self._process_methods_helper(
'objectSlices[].{fullName: fullName, resolvedMethods: usages[].*['
'][?resolvedMethod].resolvedMethod[]}')
'objectSlices[].{fullName: fullName, resolvedMethods: usages[].*[][?resolvedMethod].'
'resolvedMethod[]}')

user_defined_types = self._process_methods_helper(
'userDefinedTypes[].{fullName: name, resolvedMethods: fields['
'].name}')
'userDefinedTypes[].{fullName: name, resolvedMethods: fields[].name}')

for key, value in calls.items():
if method_map.get(key):
method_map[key]['resolved_methods'].extend(
value.get('resolved_methods'))
method_map[key]['resolved_methods'].extend(value.get('resolved_methods'))
else:
method_map[key] = {
'resolved_methods': value.get('resolved_methods')}
method_map[key] = {'resolved_methods': value.get('resolved_methods')}

for key, value in user_defined_types.items():
if method_map.get(key):
method_map[key]['resolved_methods'].extend(
value.get('resolved_methods'))
method_map[key]['resolved_methods'].extend(value.get('resolved_methods'))
else:
method_map[key] = {
'resolved_methods': value.get('resolved_methods')}
method_map[key] = {'resolved_methods': value.get('resolved_methods')}

for k, v in method_map.items():
method_map[k] = list(set(v.get('resolved_methods')))
Expand All @@ -228,8 +245,7 @@ def query_calls(self, full_name: str, resolved_methods: List[str]) -> List:
calls.append(call)
return calls

def _query_calls_helper(
self, full_name: str, call_type_str: str) -> List[Dict]:
def _query_calls_helper(self, full_name: str, call_type_str: str) -> List[Dict]:
"""
A function to help query calls.
Expand All @@ -240,8 +256,7 @@ def _query_calls_helper(
Returns:
list: The result of searching for the calls pattern in the usages.
"""
pattern = (f'objectSlices[].usages[?fullName=='
f'{json.dumps(full_name)}{call_type_str}')
pattern = f'objectSlices[].usages[?fullName=={json.dumps(full_name)}{call_type_str}'
compiled_pattern = jmespath.compile(pattern)
return compiled_pattern.search(self.usages.content)

Expand All @@ -254,20 +269,18 @@ def process_calls(self, method_map: Dict) -> Dict[str, Any]:
dict: A new method map containing calls.
"""
for full_name, resolved_methods in method_map['full_names'].items():
if res := self.query_calls(
full_name, resolved_methods['resolved_methods'].keys()):
if res := self.query_calls(full_name, resolved_methods['resolved_methods'].keys()):
mmap = self.filter_calls(res, resolved_methods)
else:
mmap = self.filter_calls([], resolved_methods)
method_map['full_names'][full_name]['resolved_methods'] = (
mmap.get('resolved_methods'))

method_map['full_names'][full_name]['resolved_methods'] = mmap.get('resolved_methods')

return method_map

@staticmethod
def filter_calls(
queried_calls: List[Dict[str, Any]], resolved_methods: Dict
) -> Dict[str, List]:
queried_calls: List[Dict[str, Any]], resolved_methods: Dict) -> Dict[str, List]:
"""
Iterate through the invokedCalls and argToCalls and create a relevant
dictionary of endpoints and calls.
Expand All @@ -282,12 +295,15 @@ def filter_calls(
i for i in queried_calls
if i.get('resolvedMethod', '') == method
]
resolved_methods['resolved_methods'][method].update(
{'calls': calls})
lns = [
i.get('lineNumber')
for i in calls
if i.get('lineNumber') and i.get('resolvedMethod', '') == method
]
resolved_methods['resolved_methods'][method].update({'calls': calls, 'line_nos': lns})
return resolved_methods

def methods_to_endpoints(
self, method_map: Dict[str, Any]) -> Dict[str, Any]:
def methods_to_endpoints(self, method_map: Dict[str, Any]) -> Dict[str, Any]:
"""
Convert a method map to a map of endpoints.
Expand Down Expand Up @@ -340,8 +356,10 @@ def _process_methods_helper(self, pattern: str) -> Dict[str, Any]:
"""
dict_resolved_pattern = jmespath.compile(pattern)
result = [i for i in dict_resolved_pattern.search(self.usages.content)
if i.get('resolvedMethods')]
result = [
i for i in dict_resolved_pattern.search(self.usages.content)
if i.get('resolvedMethods')
]

resolved: Dict = {}
for r in result:
Expand All @@ -364,9 +382,9 @@ def populate_endpoints(self, method_map: Dict) -> Dict[str, Any]:
"""
paths_object: Dict = {}
for resolved_methods in method_map.values():
for value in resolved_methods.values():
for key, value in resolved_methods.items():
for m in value['resolved_methods'].items():
new_path_item = self.create_paths_item(m)
new_path_item = self.create_paths_item(key, m)
if paths_object:
paths_object = self.merge_path_objects(
paths_object, new_path_item)
Expand Down Expand Up @@ -394,17 +412,19 @@ def merge_path_objects(p1: Dict, p2: Dict) -> Dict:
p1[key] = value
return p1

def create_paths_item(self, paths_dict: Dict) -> Dict:
def create_paths_item(self, filename: str, paths_dict: Dict) -> Dict:
"""
Create paths item object based on provided endpoints and calls.
Args:
filename (str): The name of the file
paths_dict (dict): The object containing endpoints and calls
Returns:
dict: The paths item object
"""
endpoints = paths_dict[1].get('endpoints')
calls = paths_dict[1].get('calls')
line_numbers = paths_dict[1].get('line_nos')
paths_object: Dict = {}

for ep in endpoints:
Expand All @@ -424,6 +444,9 @@ def create_paths_item(self, paths_dict: Dict) -> Dict:
paths_item_object |= self.calls_to_params(ep, call)
else:
paths_item_object |= self.calls_to_params(ep, None)
if line_numbers and (line_nos := self._create_ln_entries(
filename, list(set(line_numbers)))):
paths_item_object |= line_nos
if paths_item_object:
if paths_object.get(ep):
paths_object[ep] |= paths_item_object
Expand All @@ -432,10 +455,25 @@ def create_paths_item(self, paths_dict: Dict) -> Dict:
else:
paths_object[ep] = {}

return self.remove_nested_parameters(paths_object)
return self._remove_nested_parameters(paths_object)

@staticmethod
def _create_ln_entries(filename, line_numbers):
"""
Creates line number entries for a given filename and line numbers.
Args:
filename (str): The name of the file.
line_numbers (list): A list of line numbers.
Returns:
dict: A dictionary containing line number entries.
"""
fn = filename.split(':')[0]
return {'x-atom-usages': {fn: line_numbers}}

@staticmethod
def remove_nested_parameters(data: Dict) -> Dict[str, Dict | List]:
def _remove_nested_parameters(data: Dict) -> Dict[str, Dict | List]:
"""
Removes nested path parameters from the given data.
Expand All @@ -447,8 +485,7 @@ def remove_nested_parameters(data: Dict) -> Dict[str, Dict | List]:
"""
for value in data.values():
for v in value.values():
if isinstance(v, dict) and "parameters" in v and isinstance(
v["parameters"], list):
if isinstance(v, dict) and "parameters" in v and isinstance(v["parameters"], list):
v["parameters"] = [param for param in v["parameters"] if
param.get("in") != "path"]
return data
Expand All @@ -467,15 +504,10 @@ def determine_operations(call: Dict, params: List) -> Dict[str, Any]:
parameters and responses.
"""
ops = {'get', 'put', 'post', 'delete', 'options', 'head', 'patch'}
if found := [
op for op in ops if op in call.get('resolvedMethod', '').lower()
]:
if found := [op for op in ops if op in call.get('resolvedMethod', '').lower()]:
if params:
return {op: {
'parameters': params, 'responses': {}} for op in found
}
return {
op: {'responses': {}} for op in found}
return {op: {'parameters': params, 'responses': {}} for op in found}
return {op: {'responses': {}} for op in found}
return {'parameters': params} if params else {}

def calls_to_params(self, ep: str, call: Dict | None) -> Dict[str, Any]:
Expand Down Expand Up @@ -517,15 +549,9 @@ def create_param_object(self, ep: str, call: Dict | None) -> List[Dict]:
if not params and call:
ptypes = set(call.get('paramTypes', []))
if len(ptypes) > 1:
params = [
{'name': param, 'in': 'header'}
for param in ptypes if param != 'ANY'
]
params = [{'name': param, 'in': 'header'} for param in ptypes if param != 'ANY']
else:
params = [
{'name': param, 'in': 'header'}
for param in ptypes
]
params = [{'name': param, 'in': 'header'} for param in ptypes]
return params

def collect_methods(self) -> List:
Expand All @@ -537,14 +563,11 @@ def collect_methods(self) -> List:
list: A list of unique methods.
"""
# Surely there is a way to combine these...
target_obj_pattern = jmespath.compile(
'objectSlices[].usages[].targetObj.resolvedMethod')
defined_by_pattern = jmespath.compile(
'objectSlices[].usages[].definedBy.resolvedMethod')
invoked_calls_pattern = jmespath.compile(
'objectSlices[].usages[].invokedCalls[].resolvedMethod')
udt_jmespath_query = jmespath.compile(
'userDefinedTypes[].fields[].name')
target_obj_pattern = jmespath.compile('objectSlices[].usages[].targetObj.resolvedMethod')
defined_by_pattern = jmespath.compile('objectSlices[].usages[].definedBy.resolvedMethod')
invoked_calls_pattern = jmespath.compile('objectSlices[].usages[].invokedCalls[].resolved'
'Method')
udt_jmespath_query = jmespath.compile('userDefinedTypes[].fields[].name')
methods = target_obj_pattern.search(self.usages.content) or []
methods.extend(defined_by_pattern.search(self.usages.content) or [])
methods.extend(invoked_calls_pattern.search(self.usages.content) or [])
Expand All @@ -567,10 +590,10 @@ def extract_endpoints(self, method: str) -> List[str]:
return endpoints
if not (matches := re.findall(self.regex.endpoints, method)):
return endpoints
matches = self.filter_matches(matches, method)
matches = self._filter_matches(matches, method)
return [v for v in matches if v]

def filter_matches(self, matches: List[str], code: str) -> List[str]:
def _filter_matches(self, matches: List[str], code: str) -> List[str]:
"""
Filters a list of matches based on certain criteria.
Expand All @@ -592,11 +615,7 @@ def filter_matches(self, matches: List[str], code: str) -> List[str]:
):
return filtered_matches
case 'js' | 'ts' | 'javascript' | 'typescript':
if (
'app.' not in code and
'route' not in code and
'ftp' not in code
):
if 'app.' not in code and 'route' not in code and 'ftp' not in code:
return filtered_matches

for m in matches:
Expand Down
Loading

0 comments on commit 0861e5a

Please sign in to comment.