Skip to content

Commit

Permalink
ms_defender: Improve reliability and error handling (#11898)
Browse files Browse the repository at this point in the history
* ms_defender: Improve reliability and error handling

* ms_defender: Improve reliability and error handling
  • Loading branch information
valentijnscholten authored Feb 27, 2025
1 parent 2c9a32f commit 96ae5ed
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 41 deletions.
92 changes: 51 additions & 41 deletions dojo/tools/ms_defender/parser.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import json
import logging
import zipfile

from dojo.models import Endpoint, Finding

logger = logging.getLogger(__name__)


class MSDefenderParser:

Expand Down Expand Up @@ -53,22 +56,29 @@ def get_findings(self, file, test):
machines[data.get("id")] = data
for vulnerability in vulnerabilities:
try:
self.process_zip(vulnerability, machines[vulnerability["machineId"]])
machine = machines.get(vulnerability["machineId"], None)
if machine is not None:
self.process_zip(vulnerability, machine)
else:
logger.debug("fallback to process without machine: no machine id")
self.process_json(vulnerability)
except (IndexError, KeyError):
logger.exception("fallback to process without machine: exception")
self.process_json(vulnerability)
else:
return []
return self.findings

def process_json(self, vulnerability):
description = ""
description += "cveId: " + str(vulnerability["cveId"]) + "\n"
description += "machineId: " + str(vulnerability["machineId"]) + "\n"
description += "fixingKbId: " + str(vulnerability["fixingKbId"]) + "\n"
description += "productName: " + str(vulnerability["productName"]) + "\n"
description += "productVendor: " + str(vulnerability["productVendor"]) + "\n"
description += "productVersion: " + str(vulnerability["productVersion"]) + "\n"
title = str(vulnerability["cveId"])
description += "cveId: " + str(vulnerability.get("cveId", "")) + "\n"
description += "machineId: " + str(vulnerability.get("machineId", "")) + "\n"
description += "fixingKbId: " + str(vulnerability.get("fixingKbId", "")) + "\n"
description += "productName: " + str(vulnerability.get("productName", "")) + "\n"
description += "productVendor: " + str(vulnerability.get("productVendor", "")) + "\n"
description += "productVersion: " + str(vulnerability.get("productVersion", "")) + "\n"
description += "machine Info: " + "Unable to find or parse machine data, check logs for more information" + "\n"
title = str(vulnerability.get("cveId", ""))
finding = Finding(
title=title + "_" + vulnerability["machineId"],
severity=self.severity_check(vulnerability["severity"]),
Expand All @@ -86,35 +96,35 @@ def process_json(self, vulnerability):

def process_zip(self, vulnerability, machine):
description = ""
description += "cveId: " + str(vulnerability["cveId"]) + "\n"
description += "machineId: " + str(vulnerability["machineId"]) + "\n"
description += "fixingKbId: " + str(vulnerability["fixingKbId"]) + "\n"
description += "productName: " + str(vulnerability["productName"]) + "\n"
description += "productVendor: " + str(vulnerability["productVendor"]) + "\n"
description += "productVersion: " + str(vulnerability["productVersion"]) + "\n"
description += "machine Info: id: " + str(machine["id"]) + "\n"
description += "machine Info: osPlatform: " + str(machine["osPlatform"]) + "\n"
description += "machine Info: osVersion: " + str(machine["osVersion"]) + "\n"
description += "machine Info: osProcessor: " + str(machine["osProcessor"]) + "\n"
description += "machine Info: version: " + str(machine["version"]) + "\n"
description += "machine Info: agentVersion: " + str(machine["agentVersion"]) + "\n"
description += "machine Info: osBuild: " + str(machine["osBuild"]) + "\n"
description += "machine Info: healthStatus: " + str(machine["healthStatus"]) + "\n"
description += "machine Info: deviceValue: " + str(machine["deviceValue"]) + "\n"
description += "machine Info: rbacGroupId: " + str(machine["rbacGroupId"]) + "\n"
description += "machine Info: rbacGroupName: " + str(machine["rbacGroupName"]) + "\n"
description += "machine Info: riskScore: " + str(machine["riskScore"]) + "\n"
description += "machine Info: exposureLevel: " + str(machine["exposureLevel"]) + "\n"
description += "machine Info: isAadJoined: " + str(machine["isAadJoined"]) + "\n"
description += "machine Info: aadDeviceId: " + str(machine["aadDeviceId"]) + "\n"
description += "machine Info: defenderAvStatus: " + str(machine["defenderAvStatus"]) + "\n"
description += "machine Info: onboardingStatus: " + str(machine["onboardingStatus"]) + "\n"
description += "machine Info: osArchitecture: " + str(machine["osArchitecture"]) + "\n"
description += "machine Info: managedBy: " + str(machine["managedBy"]) + "\n"
title = str(vulnerability["cveId"])
if str(machine["computerDnsName"]) != "null":
description += "cveId: " + str(vulnerability.get("cveId", "")) + "\n"
description += "machineId: " + str(vulnerability.get("machineId", "")) + "\n"
description += "fixingKbId: " + str(vulnerability.get("fixingKbId", "")) + "\n"
description += "productName: " + str(vulnerability.get("productName", "")) + "\n"
description += "productVendor: " + str(vulnerability.get("productVendor", "")) + "\n"
description += "productVersion: " + str(vulnerability.get("productVersion", "")) + "\n"
description += "machine Info: id: " + str(machine.get("id", "")) + "\n"
description += "machine Info: osPlatform: " + str(machine.get("osPlatform", "")) + "\n"
description += "machine Info: osVersion: " + str(machine.get("osVersion", "")) + "\n"
description += "machine Info: osProcessor: " + str(machine.get("osProcessor", "")) + "\n"
description += "machine Info: version: " + str(machine.get("version", "")) + "\n"
description += "machine Info: agentVersion: " + str(machine.get("agentVersion", "")) + "\n"
description += "machine Info: osBuild: " + str(machine.get("osBuild", "")) + "\n"
description += "machine Info: healthStatus: " + str(machine.get("healthStatus", "")) + "\n"
description += "machine Info: deviceValue: " + str(machine.get("deviceValue", "")) + "\n"
description += "machine Info: rbacGroupId: " + str(machine.get("rbacGroupId", "")) + "\n"
description += "machine Info: rbacGroupName: " + str(machine.get("rbacGroupName", "")) + "\n"
description += "machine Info: riskScore: " + str(machine.get("riskScore", "")) + "\n"
description += "machine Info: exposureLevel: " + str(machine.get("exposureLevel", "")) + "\n"
description += "machine Info: isAadJoined: " + str(machine.get("isAadJoined", "")) + "\n"
description += "machine Info: aadDeviceId: " + str(machine.get("aadDeviceId", "")) + "\n"
description += "machine Info: defenderAvStatus: " + str(machine.get("defenderAvStatus", "")) + "\n"
description += "machine Info: onboardingStatus: " + str(machine.get("onboardingStatus", "")) + "\n"
description += "machine Info: osArchitecture: " + str(machine.get("osArchitecture", "")) + "\n"
description += "machine Info: managedBy: " + str(machine.get("managedBy", "")) + "\n"
title = str(vulnerability.get("cveId", ""))
if "computerDnsName" in machine and str(machine["computerDnsName"]) != "null":
title = title + "_" + str(machine["computerDnsName"])
if str(machine["osPlatform"]) != "null":
if "osPlatform" in machine and str(machine["osPlatform"]) != "null":
title = title + "_" + str(machine["osPlatform"])
finding = Finding(
title=title + "_" + vulnerability["machineId"],
Expand All @@ -123,18 +133,18 @@ def process_zip(self, vulnerability, machine):
static_finding=False,
dynamic_finding=True,
)
if vulnerability["fixingKbId"] is not None:
if "fixingKbId" in vulnerability and vulnerability["fixingKbId"] is not None:
finding.mitigation = vulnerability["fixingKbId"]
if vulnerability["cveId"] is not None:
if "cveId" in vulnerability:
finding.unsaved_vulnerability_ids = []
finding.unsaved_vulnerability_ids.append(vulnerability["cveId"])
self.findings.append(finding)
finding.unsaved_endpoints = []
if machine["computerDnsName"] is not None:
if "computerDnsName" in machine and machine["computerDnsName"] is not None:
finding.unsaved_endpoints.append(Endpoint(host=str(machine["computerDnsName"]).replace(" ", "").replace("(", "_").replace(")", "_")))
if machine["lastIpAddress"] is not None:
if "lastIpAddress" in machine and machine["lastIpAddress"] is not None:
finding.unsaved_endpoints.append(Endpoint(host=str(machine["lastIpAddress"])))
if machine["lastExternalIpAddress"] is not None:
if "lastExternalIpAddress" in machine and machine["lastExternalIpAddress"] is not None:
finding.unsaved_endpoints.append(Endpoint(host=str(machine["lastExternalIpAddress"])))

def severity_check(self, input):
Expand Down
Binary file not shown.
10 changes: 10 additions & 0 deletions unittests/tools/test_ms_defender_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ def test_parser_defender_issue_11217(self):
endpoint.clean()
self.assertEqual("Max_Mustermann_iPadAir_17zoll__2ndgeneration_", finding.unsaved_endpoints[0].host)

def test_parser_defender_error_handling(self):
"""https://github.com/DefectDojo/django-DefectDojo/issues/11896 handle missing values properly, i.e. defenderAvStatus"""
testfile = open(get_unit_tests_scans_path("ms_defender") / "defender_error_handling.zip", encoding="utf-8")
parser = MSDefenderParser()
findings = parser.get_findings(testfile, Test())
testfile.close()
self.assertEqual(421, len(findings))
finding = findings[0]
self.assertEqual(3, len(finding.unsaved_endpoints))

def test_parser_defender_empty_machines(self):
testfile = open(get_unit_tests_scans_path("ms_defender") / "empty_machines.zip", encoding="utf-8")
parser = MSDefenderParser()
Expand Down

0 comments on commit 96ae5ed

Please sign in to comment.