From 28c3a9a5dc8c9066d6b9ae0f23000bbd16618d1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krzysztof=20Zaj=C4=85c?= Date: Fri, 20 Dec 2024 10:42:11 +0100 Subject: [PATCH] Allowing multiple passive dns sources --- artemis/config.py | 20 ++---- artemis/modules/ip_lookup.py | 10 ++- .../modules/removed_domain_existing_vhost.py | 63 +++++++------------ artemis/reporting/export/db.py | 9 +++ artemis/reporting/export/export_data.py | 2 + 5 files changed, 46 insertions(+), 58 deletions(-) diff --git a/artemis/config.py b/artemis/config.py index d9ff189f6..c9c59ab10 100644 --- a/artemis/config.py +++ b/artemis/config.py @@ -799,29 +799,17 @@ class Postman: ] = get_config("POSTMAN_MAIL_TO", default="to@example.com") class RemovedDomainExistingVhost: - REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_URL: Annotated[ + REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_URLS: Annotated[ str, - "The passive DNS url to download old domain IPs from. Currently, the system was tested with circl.lu " - "passive DNS.", - ] = get_config("REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_URL", default=None, cast=str) + "Comma-separated list of URLs (optionally with username:password) to download old domain IPs from. " + "Currently, the system was tested with circl.lu passive DNS. **The URL should end with /pdns/query/**.", + ] = get_config("REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_URLS", default=None, cast=decouple.Csv(str)) REMOVED_DOMAIN_EXISTING_VHOST_REPORT_ONLY_SUBDOMAINS: Annotated[ str, "If set to True, 'removed domain but existing vhost' situations will be reported only for subdomains.", ] = get_config("REMOVED_DOMAIN_EXISTING_VHOST_REPORT_ONLY_SUBDOMAINS", default=False, cast=bool) - REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_USERNAME: Annotated[ - str, - "The passive DNS username to be used to download old domain IPs. Currently, the system was tested with circl.lu " - "passive DNS.", - ] = get_config("REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_USERNAME", default=None, cast=str) - - REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_PASSWORD: Annotated[ - str, - "The passive DNS password to be used to download old domain IPs. Currently, the system was tested with circl.lu " - "passive DNS.", - ] = get_config("REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_PASSWORD", default=None, cast=str) - REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_SLEEP_BETWEEN_REQUESTS_SECONDS: Annotated[ float, "How long to sleep between passivedns requests in order not to overload the provider.", diff --git a/artemis/modules/ip_lookup.py b/artemis/modules/ip_lookup.py index 68a9b2358..367cdcbff 100644 --- a/artemis/modules/ip_lookup.py +++ b/artemis/modules/ip_lookup.py @@ -1,8 +1,10 @@ #!/usr/bin/env python3 +from typing import Set + from karton.core import Task from artemis import load_risk_class -from artemis.binds import TaskType +from artemis.binds import TaskStatus, TaskType from artemis.module_base import ArtemisBase from artemis.resolvers import lookup @@ -18,15 +20,17 @@ class IPLookup(ArtemisBase): {"type": TaskType.DOMAIN.value}, ] - def _process(self, current_task: Task, domain: str) -> None: + def _process(self, current_task: Task, domain: str) -> Set[str]: found_ips = lookup(domain) for found_ip in found_ips: new_task = Task({"type": TaskType.NEW}, payload={"data": found_ip}) self.add_task(current_task, new_task) + return found_ips def run(self, current_task: Task) -> None: domain = current_task.get_payload(TaskType.DOMAIN) - self._process(current_task, domain) + ips = self._process(current_task, domain) + self.db.save_task_result(task=current_task, status=TaskStatus.OK, data={"ips": ips}) if __name__ == "__main__": diff --git a/artemis/modules/removed_domain_existing_vhost.py b/artemis/modules/removed_domain_existing_vhost.py index f924f1be2..2fec25275 100644 --- a/artemis/modules/removed_domain_existing_vhost.py +++ b/artemis/modules/removed_domain_existing_vhost.py @@ -1,4 +1,3 @@ -import base64 import binascii import json import os @@ -29,42 +28,32 @@ class RemovedDomainExistingVhost(ArtemisBase): filters = [{"type": TaskType.DOMAIN_THAT_MAY_NOT_EXIST.value}] def _obtain_past_target_ips(self, domain: str) -> Set[str]: - response = http_requests.get( - Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_URL + domain, - headers={ - "Authorization": "Basic " - + base64.b64encode( - ( - Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_USERNAME - + ":" - + Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_PASSWORD - ).encode("utf-8") - ).decode("ascii") - }, - ) - time.sleep( - Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_SLEEP_BETWEEN_REQUESTS_SECONDS - ) - if response.status_code == 404: - return set() - - self.log.info( - "Response for %s: status code=%s, first bytes: %s", domain, response.status_code, response.content[:30] - ) - data = response.content - result = set() - for line in data.split("\n"): - if not line: + result: Set[str] = set() + for url in Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_URLS: + response = http_requests.get(url + domain) + time.sleep( + Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_SLEEP_BETWEEN_REQUESTS_SECONDS + ) + if response.status_code == 404: continue - try: - item = json.loads(line) - except json.decoder.JSONDecodeError: - self.log.error("Unable to parse response: %s", line) - continue + self.log.info( + "Response for %s: status code=%s, first bytes: %s", domain, response.status_code, response.content[:30] + ) + data = response.content + result = set() + for line in data.split("\n"): + if not line: + continue + + try: + item = json.loads(line) + except json.decoder.JSONDecodeError: + self.log.error("Unable to parse response: %s", line) + continue - if item["rrtype"] in ["A", "AAAA"]: - result.add(item["rrname"]) + if item["rrtype"] in ["A", "AAAA"]: + result.add(item["rrname"]) return result @@ -136,11 +125,7 @@ def run(self, current_task: Task) -> None: if __name__ == "__main__": - if ( - Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_URL - and Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_USERNAME - and Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_PASSWORD - ): + if Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_URLS: RemovedDomainExistingVhost().loop() else: no_pdns_config_message_printed_filename = "/.no-pdns-config-message-shown" diff --git a/artemis/reporting/export/db.py b/artemis/reporting/export/db.py index 26f47d30c..9debf8084 100644 --- a/artemis/reporting/export/db.py +++ b/artemis/reporting/export/db.py @@ -48,6 +48,7 @@ def _initialize_data_if_needed(self) -> None: return self._reports = [] + self._ips = {} self._scanned_top_level_targets = set() self._scanned_targets = set() self._tag_stats: DefaultDict[str, int] = defaultdict(lambda: 0) @@ -71,6 +72,9 @@ def _initialize_data_if_needed(self) -> None: if top_level_target: self._scanned_top_level_targets.add(top_level_target) + if result["task"]["headers"]["receiver"] == "IPLookup": + self._ips[result["target_string"]] = list(result["task"].get("result", {}).get("ips", [])) + self._scanned_targets.add(DataLoader._get_target_host(result["task"])) # The underlying data format changed, let's not require the reporters to change @@ -96,6 +100,11 @@ def reports(self) -> List[Report]: self._initialize_data_if_needed() return self._reports + @property + def ips(self) -> dict[str, List[str]]: + self._initialize_data_if_needed() + return self._ips + @property def scanned_top_level_targets(self) -> Set[str]: self._initialize_data_if_needed() diff --git a/artemis/reporting/export/export_data.py b/artemis/reporting/export/export_data.py index 155a08011..725746e8b 100644 --- a/artemis/reporting/export/export_data.py +++ b/artemis/reporting/export/export_data.py @@ -27,6 +27,7 @@ class ExportData: language: str scanned_top_level_targets: List[str] scanned_targets: List[str] + ips: Dict[str, List[str]] messages: Dict[str, SingleTopLevelTargetExportData] alerts: List[str] @@ -96,6 +97,7 @@ def build_export_data( language=language.value, scanned_top_level_targets=list(db.scanned_top_level_targets), scanned_targets=list(db.scanned_targets), + ips=db.ips, messages=message_data, alerts=alerts, )