Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed domain, existing vhost #1455

Merged
merged 18 commits into from
Dec 18, 2024
9 changes: 6 additions & 3 deletions artemis/binds.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ class TaskType(str, Enum):
# unclassified data (goes to classifier)
NEW = "new"

# {domain: lidl.com}
# {domain: google.com}
DOMAIN = "domain"

# {domain: google.com but without existence filtering enabled}
DOMAIN_THAT_MAY_NOT_EXIST = "domain_that_may_not_exist"

# {ip: 8.8.8.8}
IP = "ip"

# {service: lidl.com:443}
# {service: google.com:443}
SERVICE = "service"

# {webapp: having a URL, e.g. https://lidl.com/new/, and a type, e.g. WebApplication.WORDPRESS}
# {webapp: having a URL, e.g. https://google.com/new/, and a type, e.g. WebApplication.WORDPRESS}
WEBAPP = "webapp"

# {URL: just a HTTP URL. Must have content attached to make further operations faster}
Expand Down
37 changes: 37 additions & 0 deletions artemis/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,43 @@ class Postman:
"Recipient e-mail address, e.g. for open relay testing.",
] = get_config("POSTMAN_MAIL_TO", default="[email protected]")

class RemovedDomainExistingVhost:
    # Settings of the "removed domain, existing vhost" module: where to fetch historical DNS data from
    # (passive DNS URL and credentials), how to throttle these requests and how to decide whether
    # a finding should be reported.
    #
    # NOTE(review): the URL/username/password options use default=None together with cast=str. The
    # module's startup check relies on these being falsy when unset - confirm that get_config doesn't
    # turn a missing value into the string "None".

    REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_URL: Annotated[
        str,
        "The passive DNS url to download old domain IPs from. Currently, the system was tested with circl.lu "
        "passive DNS.",
    ] = get_config("REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_URL", default=None, cast=str)

    # This option is read with cast=bool and defaults to False, so it is annotated as bool (not str).
    REMOVED_DOMAIN_EXISTING_VHOST_REPORT_ONLY_SUBDOMAINS: Annotated[
        bool,
        "If set to True, 'removed domain but existing vhost' situations will be reported only for subdomains.",
    ] = get_config("REMOVED_DOMAIN_EXISTING_VHOST_REPORT_ONLY_SUBDOMAINS", default=False, cast=bool)

    REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_USERNAME: Annotated[
        str,
        "The passive DNS username to be used to download old domain IPs. Currently, the system was tested with circl.lu "
        "passive DNS.",
    ] = get_config("REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_USERNAME", default=None, cast=str)

    REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_PASSWORD: Annotated[
        str,
        "The passive DNS password to be used to download old domain IPs. Currently, the system was tested with circl.lu "
        "passive DNS.",
    ] = get_config("REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_PASSWORD", default=None, cast=str)

    REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_SLEEP_BETWEEN_REQUESTS_SECONDS: Annotated[
        float,
        "How long to sleep between passivedns requests in order not to overload the provider.",
    ] = get_config(
        "REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_SLEEP_BETWEEN_REQUESTS_SECONDS", default=10, cast=float
    )

    REMOVED_DOMAIN_EXISTING_VHOST_SIMILARITY_THRESHOLD: Annotated[
        float,
        "How similar the results for correct and different domain should be to consider that the server "
        "doesn't host the given domain.",
    ] = get_config("REMOVED_DOMAIN_EXISTING_VHOST_SIMILARITY_THRESHOLD", default=0.5, cast=float)

class Shodan:
SHODAN_API_KEY: Annotated[
str,
Expand Down
25 changes: 19 additions & 6 deletions artemis/module_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,30 @@ def add_task_if_domain_exists(self, current_task: Task, new_task: Task) -> bool:
self.add_task(current_task, new_task)
return True

if self.check_domain_exists(domain):
if self.check_domain_exists_or_is_placeholder(domain):
self.add_task(current_task, new_task)
return True
else:
self.log.info("Skipping invalid domain (nonexistent/placeholder): %s", domain)
return False

def check_domain_exists_or_is_placeholder(self, domain: str) -> bool:
    """
    Tell whether a domain is worth scanning: it must exist and must not be a placeholder page.

    The placeholder check is performed only when the placeholder page detector is enabled
    in the configuration. A domain recognized as a placeholder is rejected without performing
    the existence check.

    Args:
        domain (str): The domain to check.

    Returns:
        bool: False if the domain was detected to be a placeholder page, otherwise the result
        of check_domain_exists for that domain.
    """
    detector_enabled = Config.Modules.PlaceholderPageContent.ENABLE_PLACEHOLDER_PAGE_DETECTOR
    if detector_enabled and PlaceholderPageDetector().is_placeholder(domain):
        return False

    return self.check_domain_exists(domain)

def check_domain_exists(self, domain: str) -> bool:
"""
Check if a domain exists by looking up its NS and A records.
Expand All @@ -163,10 +180,6 @@ def check_domain_exists(self, domain: str) -> bool:
bool: True if the domain exists, False otherwise.
"""
try:
if Config.Modules.PlaceholderPageContent.ENABLE_PLACEHOLDER_PAGE_DETECTOR:
placeholder_page = PlaceholderPageDetector()
if placeholder_page.is_placeholder(domain):
return False

# Check for NS records
try:
Expand Down Expand Up @@ -548,7 +561,7 @@ def _get_scan_destination(self, task: Task) -> str:
result = task.payload["data"]
elif task.headers["type"] == TaskType.IP:
result = task.payload["ip"]
elif task.headers["type"] == TaskType.DOMAIN:
elif task.headers["type"] == TaskType.DOMAIN or task.headers["type"] == TaskType.DOMAIN_THAT_MAY_NOT_EXIST:
# This is an approximation. Sometimes, when we scan domain, we actually scan the IP the domain
# resolves to (e.g. in port_scan karton), sometimes the domain itself (e.g. the DNS kartons) or
# even the MX servers. Therefore this will not map 1:1 to the actual host being scanned.
Expand Down
14 changes: 14 additions & 0 deletions artemis/modules/classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,20 @@ def run(self, current_task: Task) -> None:
else:
data = Classifier._clean_ipv6_brackets(data)

if task_type == TaskType.DOMAIN:
self.add_task(
current_task,
Task(
{"type": TaskType.DOMAIN_THAT_MAY_NOT_EXIST},
payload={
task_type.value: sanitized,
},
payload_persistent={
f"original_{task_type.value}": sanitized,
},
),
)

new_task = Task(
{"type": task_type},
payload={
Expand Down
157 changes: 157 additions & 0 deletions artemis/modules/removed_domain_existing_vhost.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import base64
import binascii
import json
import os
import time
from difflib import SequenceMatcher
from typing import Set

import requests
from karton.core import Task
from urllib3.util import connection

from artemis import http_requests, load_risk_class
from artemis.binds import TaskStatus, TaskType
from artemis.config import Config
from artemis.module_base import ArtemisBase
from artemis.utils import build_logger

# Reference to urllib3's create_connection taken at import time, so that a patched replacement
# can delegate the actual connection setup to it.
_orig_create_connection = connection.create_connection


@load_risk_class.load_risk_class(load_risk_class.LoadRiskClass.LOW)
class RemovedDomainExistingVhost(ArtemisBase):
    """
    A module that checks whether, despite a domain having been removed, the corresponding vhost
    still exists on the server the domain used to point to.
    """

    identity = "removed_domain_existing_vhost"
    filters = [{"type": TaskType.DOMAIN_THAT_MAY_NOT_EXIST.value}]

    def _obtain_past_target_ips(self, domain: str) -> Set[str]:
        """
        Ask the configured passive DNS service for addresses the domain resolved to in the past.

        The response is read as one JSON document per line. Lines that can't be parsed are logged
        and skipped.

        Args:
            domain (str): The domain to look up.

        Returns:
            Set[str]: Values of A/AAAA records seen for the domain. Empty if the service responded
            with 404 or returned no usable records.
        """
        module_config = Config.Modules.RemovedDomainExistingVhost
        credentials = (
            module_config.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_USERNAME
            + ":"
            + module_config.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_PASSWORD
        )
        response = http_requests.get(
            module_config.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_URL + domain,
            headers={"Authorization": "Basic " + base64.b64encode(credentials.encode("utf-8")).decode("ascii")},
        )
        # Sleep after every request (regardless of its outcome) in order not to overload the provider.
        time.sleep(module_config.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_SLEEP_BETWEEN_REQUESTS_SECONDS)
        if response.status_code == 404:
            return set()

        self.log.info(
            "Response for %s: status code=%s, first bytes: %s", domain, response.status_code, response.content[:30]
        )

        result: Set[str] = set()
        for line in response.content.split("\n"):
            if not line:
                continue

            try:
                item = json.loads(line)
            except json.decoder.JSONDecodeError:
                self.log.error("Unable to parse response: %s", line)
                continue

            # Skip well-formed JSON that isn't a record of the type we are interested in.
            if not isinstance(item, dict) or item.get("rrtype") not in ("A", "AAAA"):
                continue

            # "rrname" is the name that has been queried (i.e. the domain itself) - the address the
            # domain pointed to is stored in "rdata".
            rdata = item.get("rdata")
            # Depending on the provider, rdata may be a single value or a list of values.
            if isinstance(rdata, str):
                result.add(rdata)
            elif isinstance(rdata, list):
                result.update(value for value in rdata if isinstance(value, str))

        return result

    @staticmethod
    def _request_with_patched_ip(url: str, patch_ip_to: str) -> http_requests.HTTPResponse:
        """
        Download a URL, but connect to a given IP instead of the address the host name resolves to.

        The override is implemented by temporarily replacing urllib3's create_connection and is
        always undone afterwards, so that other requests made by this process are not affected.

        Args:
            url (str): The URL to download.
            patch_ip_to (str): The address to connect to.

        Returns:
            http_requests.HTTPResponse: The response obtained from the given address.
        """

        def patched_create_connection(address, *args, **kwargs):  # type: ignore
            _host, port = address
            return _orig_create_connection((patch_ip_to, port), *args, **kwargs)  # type: ignore

        previous_create_connection = connection.create_connection
        connection.create_connection = patched_create_connection
        try:
            return http_requests.get(url)
        finally:
            # Without restoring, every subsequent request (including passive DNS queries for
            # next tasks) would be sent to patch_ip_to.
            connection.create_connection = previous_create_connection

    def run(self, current_task: Task) -> None:
        """
        Check whether any of the past IPs of a nonexistent domain still serves content for it.

        For each past IP, the content served for the domain is compared with the content served
        for a random, different host name. If the domain gets a 2xx response that differs enough from
        the response for the random name, the task is saved as INTERESTING. Otherwise it is saved as OK.

        Args:
            current_task (Task): The task with the domain in the "domain" payload field.
        """
        domain = current_task.get_payload("domain")

        if self.check_domain_exists(domain):
            self.db.save_task_result(task=current_task, status=TaskStatus.OK, status_reason="Domain exists")
            return

        target_ips = self._obtain_past_target_ips(domain)
        if not target_ips:
            self.db.save_task_result(
                task=current_task, status=TaskStatus.OK, status_reason="Unable to obtain past target ips"
            )
            return

        similarity_threshold = (
            Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_SIMILARITY_THRESHOLD
        )
        # Random prefix used to build a host name the server most likely doesn't know - the response
        # for it serves as a baseline of what the server returns for an arbitrary vhost.
        prefix = binascii.hexlify(os.urandom(3)).decode("ascii")
        for ip in target_ips:
            for proto in ["http", "https"]:
                try:
                    response_for_old_domain = self._request_with_patched_ip(proto + "://" + domain, ip)
                    response_for_other_vhost = self._request_with_patched_ip(proto + "://" + prefix + domain, ip)
                except requests.exceptions.RequestException:
                    self.log.exception("Unable to download website content")
                    continue

                ratio = SequenceMatcher(
                    None, response_for_old_domain.content, response_for_other_vhost.content
                ).quick_ratio()
                self.log.info(
                    "Similarity between correct and incorrect domain is %s and status code is %s",
                    ratio,
                    response_for_old_domain.status_code,
                )

                if 200 <= response_for_old_domain.status_code < 300 and ratio < similarity_threshold:
                    self.db.save_task_result(
                        task=current_task,
                        status=TaskStatus.INTERESTING,
                        status_reason=f"Detected that {ip} hosts nonexistent domain {domain}",
                        data={
                            "ip": ip,
                            "domain": domain,
                            "response_for_old_domain": response_for_old_domain.content,
                            "response_for_other_vhost": response_for_other_vhost.content,
                            "similarity": ratio,
                        },
                    )
                    return

        self.db.save_task_result(
            task=current_task,
            status=TaskStatus.OK,
            status_reason=f"Didn't detect that any of ips: {target_ips} host {domain}",
        )


if __name__ == "__main__":
    # The module is started only when the complete passive DNS configuration (URL, username
    # and password) is provided. Otherwise an explanation is logged and the process finishes.
    if (
        Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_URL
        and Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_USERNAME
        and Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_PASSIVEDNS_PASSWORD
    ):
        RemovedDomainExistingVhost().loop()
    else:
        no_pdns_config_message_printed_filename = "/.no-pdns-config-message-shown"

        if not os.path.exists(no_pdns_config_message_printed_filename):
            # We want to display the message only once
            LOGGER = build_logger(__name__)
            LOGGER.error(
                "PassiveDNS config is required to start the module that detects cases where a server still hosts a domain that doesn't exist anymore."
            )
            LOGGER.error("Don't worry - all other modules can be used.")

            # Create an empty marker file so that the message is not logged again on the next start.
            with open(no_pdns_config_message_printed_filename, "w"):
                pass
8 changes: 8 additions & 0 deletions artemis/modules/subdomain_enumeration.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ def run(self, current_task: Task) -> None:
)
self.add_task_if_domain_exists(current_task, task)

task = Task(
{"type": TaskType.DOMAIN_THAT_MAY_NOT_EXIST},
payload={
"domain": subdomain,
},
)
self.add_task(current_task, task)

valid_subdomains.update(valid_subdomains_from_tool)

if valid_subdomains:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
from typing import Any, Dict, List

from artemis.config import Config
from artemis.reporting.base.language import Language
from artemis.reporting.base.report import Report
from artemis.reporting.base.report_type import ReportType
from artemis.reporting.base.reporter import Reporter
from artemis.reporting.base.templating import ReportEmailTemplateFragment
from artemis.reporting.utils import get_top_level_target


class RemovedDomainExistingVhostReporter(Reporter):
    """
    Reporter that turns INTERESTING results of the removed_domain_existing_vhost module into reports.
    """

    REMOVED_DOMAIN_EXISTING_VHOST = ReportType("removed_domain_existing_vhost")

    @staticmethod
    def create_reports(task_result: Dict[str, Any], language: Language) -> List[Report]:
        """
        Build at most one report from a single task result.

        Nothing is reported when the result comes from a different module, when the result is not
        a dict, when the status is not INTERESTING, or when reporting is limited to subdomains
        and the affected domain equals the "original_domain" stored in the persistent payload.
        """
        if task_result["headers"]["receiver"] != "removed_domain_existing_vhost":
            return []

        result = task_result["result"]
        if not isinstance(result, dict) or task_result["status"] != "INTERESTING":
            return []

        only_subdomains = (
            Config.Modules.RemovedDomainExistingVhost.REMOVED_DOMAIN_EXISTING_VHOST_REPORT_ONLY_SUBDOMAINS
        )
        if only_subdomains and result["domain"] == task_result["payload_persistent"].get("original_domain"):
            return []

        report = Report(
            top_level_target=get_top_level_target(task_result),
            target=result["domain"],
            report_type=RemovedDomainExistingVhostReporter.REMOVED_DOMAIN_EXISTING_VHOST,
            additional_data={"ip": result["ip"]},
            timestamp=task_result["created_at"],
        )
        return [report]

    @staticmethod
    def get_email_template_fragments() -> List[ReportEmailTemplateFragment]:
        """
        Return the e-mail template fragment, loaded from the Jinja2 file located next to this module.
        """
        template_path = os.path.join(os.path.dirname(__file__), "template_removed_domain_existing_vhost.jinja2")
        return [ReportEmailTemplateFragment.from_file(template_path, priority=3)]
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{# E-mail fragment listing servers (by IP) that still host domains that don't exist anymore, followed by a recommendation. Rendered only if a report of type "removed_domain_existing_vhost" is present. -#}
{% if "removed_domain_existing_vhost" in data.contains_type %}
<li>
{% trans trimmed %}
The following servers host domains that don't exist anymore:
{% endtrans %}
<ul>
{% for report in data.reports %}
{% if report.report_type == "removed_domain_existing_vhost" %}
<li>
{{ report.additional_data.ip }} {% trans %}hosts the following domain:{% endtrans %} {{ report.target }}
{{ report_meta(report) }}
</li>
{% endif %}
{% endfor %}
</ul>
<p>
{% trans trimmed %}
If the domain has been removed, we also recommend removing the corresponding virtual host
on the web server. Currently, an attacker could communicate with the server directly and
still interact with the site, even though it's considered removed.
{% endtrans %}
</p>
</li>
{% endif %}
Loading
Loading