diff --git a/cartography/intel/semgrep/findings.py b/cartography/intel/semgrep/findings.py
index 22ca18919..7d93ec157 100644
--- a/cartography/intel/semgrep/findings.py
+++ b/cartography/intel/semgrep/findings.py
@@ -143,64 +143,61 @@ def transform_sca_vulns(raw_vulns: List[Dict[str, Any]]) -> Tuple[List[Dict[str,
 vulns = []
 usages = []
 for vuln in raw_vulns:
- try:
- sca_vuln: Dict[str, Any] = {}
- # Mandatory fields
- repository_name = vuln["repository"]["name"]
- rule_id = vuln["rule"]["name"]
- vulnerability_class = _get_vuln_class(vuln)
- package = vuln['found_dependency']['package']
- sca_vuln["id"] = vuln["id"]
- sca_vuln["repositoryName"] = repository_name
- sca_vuln["branch"] = vuln["ref"]
- sca_vuln["ruleId"] = rule_id
- sca_vuln["title"] = package + ":" + vulnerability_class
- sca_vuln["description"] = vuln["rule"]["message"]
- sca_vuln["ecosystem"] = vuln["found_dependency"]["ecosystem"]
- sca_vuln["severity"] = vuln["severity"].upper()
- sca_vuln["reachability"] = vuln["reachability"].upper() # Check done to determine rechabilitity
- sca_vuln["reachableIf"] = vuln["reachable_condition"].upper() if vuln["reachable_condition"] else None
- sca_vuln["exposureType"] = _determine_exposure(vuln) # Determintes if reachable or unreachable
- dependency = f"{package}|{vuln['found_dependency']['version']}"
- sca_vuln["matchedDependency"] = dependency
- dep_url = vuln["found_dependency"]["lockfile_line_url"]
- if dep_url: # Lock file can be null, need to set
- dep_file = dep_url.split("/")[-1].split("#")[0]
- sca_vuln["dependencyFileLocation_path"] = dep_file
- sca_vuln["dependencyFileLocation_url"] = dep_url
- else:
- if sca_vuln.get("location"):
- sca_vuln["dependencyFileLocation_path"] = sca_vuln["location"]["file_path"]
- sca_vuln["transitivity"] = vuln["found_dependency"]["transitivity"].upper()
- if vuln.get("vulnerability_identifier"):
- vuln_id = vuln["vulnerability_identifier"].upper()
- sca_vuln["cveId"] = vuln_id
- sca_vuln["ref_urls"] = [_build_vuln_url(vuln_id)]
- if vuln.get('fix_recommendations') and len(vuln['fix_recommendations']) > 0:
- fix = vuln['fix_recommendations'][0]
- dep_fix = f"{fix['package']}|{fix['version']}"
- sca_vuln["closestSafeDependency"] = dep_fix
- sca_vuln["openedAt"] = vuln["created_at"]
- sca_vuln["fixStatus"] = vuln["status"]
- sca_vuln["triageStatus"] = vuln["triage_state"]
- sca_vuln["confidence"] = vuln["confidence"]
- usage = vuln.get("usage")
- if usage:
- usage_dict = {}
- url = usage["location"]["url"]
- usage_dict["SCA_ID"] = sca_vuln["id"]
- usage_dict["findingId"] = hash(url.split("github.com/")[-1])
- usage_dict["path"] = usage["location"]["path"]
- usage_dict["startLine"] = usage["location"]["start_line"]
- usage_dict["startCol"] = usage["location"]["start_col"]
- usage_dict["endLine"] = usage["location"]["end_line"]
- usage_dict["endCol"] = usage["location"]["end_col"]
- usage_dict["url"] = url
- usages.append(usage_dict)
- vulns.append(sca_vuln)
- except KeyError as e:
- logger.warning(f"Error transforming Semgrep SCA vuln {vuln}: {e}")
- continue
+ sca_vuln: Dict[str, Any] = {}
+ # Mandatory fields
+ repository_name = vuln["repository"]["name"]
+ rule_id = vuln["rule"]["name"]
+ vulnerability_class = _get_vuln_class(vuln)
+ package = vuln['found_dependency']['package']
+ sca_vuln["id"] = vuln["id"]
+ sca_vuln["repositoryName"] = repository_name
+ sca_vuln["branch"] = vuln["ref"]
+ sca_vuln["ruleId"] = rule_id
+ sca_vuln["title"] = package + ":" + vulnerability_class
+ sca_vuln["description"] = vuln["rule"]["message"]
+ sca_vuln["ecosystem"] = vuln["found_dependency"]["ecosystem"]
+ sca_vuln["severity"] = vuln["severity"].upper()
+ sca_vuln["reachability"] = vuln["reachability"].upper() # Check done to determine rechabilitity
+ sca_vuln["reachableIf"] = vuln["reachable_condition"].upper() if vuln["reachable_condition"] else None
+ sca_vuln["exposureType"] = _determine_exposure(vuln) # Determintes if reachable or unreachable
+ dependency = f"{package}|{vuln['found_dependency']['version']}"
+ sca_vuln["matchedDependency"] = dependency
+ dep_url = vuln["found_dependency"]["lockfile_line_url"]
+ if dep_url: # Lock file can be null, need to set
+ dep_file = dep_url.split("/")[-1].split("#")[0]
+ sca_vuln["dependencyFileLocation_path"] = dep_file
+ sca_vuln["dependencyFileLocation_url"] = dep_url
+ else:
+ if sca_vuln.get("location"):
+ sca_vuln["dependencyFileLocation_path"] = sca_vuln["location"]["file_path"]
+ sca_vuln["transitivity"] = vuln["found_dependency"]["transitivity"].upper()
+ if vuln.get("vulnerability_identifier"):
+ vuln_id = vuln["vulnerability_identifier"].upper()
+ sca_vuln["cveId"] = vuln_id
+ sca_vuln["ref_urls"] = [_build_vuln_url(vuln_id)]
+ if vuln.get('fix_recommendations') and len(vuln['fix_recommendations']) > 0:
+ fix = vuln['fix_recommendations'][0]
+ dep_fix = f"{fix['package']}|{fix['version']}"
+ sca_vuln["closestSafeDependency"] = dep_fix
+ sca_vuln["openedAt"] = vuln["created_at"]
+ sca_vuln["fixStatus"] = vuln["status"]
+ sca_vuln["triageStatus"] = vuln["triage_state"]
+ sca_vuln["confidence"] = vuln["confidence"]
+ usage = vuln.get("usage")
+ if usage:
+ usage_dict = {}
+ url = usage["location"]["url"]
+ usage_dict["SCA_ID"] = sca_vuln["id"]
+ usage_dict["findingId"] = hash(url.split("github.com/")[-1])
+ usage_dict["path"] = usage["location"]["path"]
+ usage_dict["startLine"] = usage["location"]["start_line"]
+ usage_dict["startCol"] = usage["location"]["start_col"]
+ usage_dict["endLine"] = usage["location"]["end_line"]
+ usage_dict["endCol"] = usage["location"]["end_col"]
+ usage_dict["url"] = url
+ usages.append(usage_dict)
+ vulns.append(sca_vuln)
+ return vulns, usages
diff --git a/setup.py b/setup.py
index 2cba1129a..dcc963b04 100644
--- a/setup.py
+++ b/setup.py
@@ -1,7 +1,7 @@
 from setuptools import find_packages
 from setuptools import setup
-__version__ = '0.94.0rc2'
+__version__ = '0.94.0rc3'
 setup(
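Review bot: `usage_dict["findingId"] = hash(url.split("github.com/")[-1])` on the new side derives an identifier from the built-in `hash()`, which is randomized per interpreter process for `str` (PYTHONHASHSEED), so the same usage gets a different findingId on every sync run; presumably that defeats matching on re-ingestion and leaves stale or duplicate usage nodes — confirm against the loader that consumes this field. A stable digest fixes it, e.g. `usage_dict["findingId"] = hashlib.sha256(url.split("github.com/")[-1].encode("utf-8")).hexdigest()` with `import hashlib` added at the top of the file, adjusting the consumer if it expects an int rather than a hex string. Separately, in the `else` branch `if sca_vuln.get("location"):` tests the dict being built, which never receives a "location" key, so that fallback is dead code and presumably should read `vuln.get("location")` with the same `["file_path"]` lookup on `vuln`. Now that the per-record `try/except KeyError` is gone, a single record missing any of the fields read above aborts the whole transform instead of being skipped; that looks intentional, but a one-line comment stating that malformed API records are meant to fail the sync would save the next reader from re-adding the guard. The enclosing function's signature is outside the hunk.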