
Convert is_login_page() to excavate rule #1844

Merged (7 commits) on Oct 16, 2024
5 changes: 0 additions & 5 deletions bbot/core/event/base.py
@@ -440,11 +440,6 @@ def always_emit(self):
no_host_information = not bool(self.host)
return self._always_emit or always_emit_tags or no_host_information

@property
def quick_emit(self):
no_host_information = not bool(self.host)
return self._quick_emit or no_host_information

@property
def id(self):
"""
48 changes: 0 additions & 48 deletions bbot/core/helpers/web/web.py
@@ -1,4 +1,3 @@
import re
import logging
import warnings
from pathlib import Path
@@ -464,53 +463,6 @@ def beautifulsoup(
log.debug(f"Error parsing beautifulsoup: {e}")
return False

user_keywords = [re.compile(r, re.I) for r in ["user", "login", "email"]]
pass_keywords = [re.compile(r, re.I) for r in ["pass"]]

def is_login_page(self, html):
"""
TODO: convert this into an excavate YARA rule

Determines if the provided HTML content contains a login page.

This function parses the HTML to search for forms with input fields typically used for
authentication. If it identifies password fields or a combination of username and password
fields, it returns True.

Args:
html (str): The HTML content to analyze.

Returns:
bool: True if the HTML contains a login page, otherwise False.

Examples:
>>> is_login_page('<form><input type="text" name="username"><input type="password" name="password"></form>')
True

>>> is_login_page('<form><input type="text" name="search"></form>')
False
"""
try:
soup = BeautifulSoup(html, "html.parser")
except Exception as e:
log.debug(f"Error parsing html: {e}")
return False

forms = soup.find_all("form")

# first, check for obvious password fields
for form in forms:
if form.find_all("input", {"type": "password"}):
return True

# next, check for forms that have both a user-like and password-like field
for form in forms:
user_fields = sum(bool(form.find_all("input", {"name": r})) for r in self.user_keywords)
pass_fields = sum(bool(form.find_all("input", {"name": r})) for r in self.pass_keywords)
if user_fields and pass_fields:
return True
return False

def response_to_json(self, response):
"""
Convert web response to JSON object, similar to the output of `httpx -irr -json`
3 changes: 1 addition & 2 deletions bbot/modules/base.py
@@ -1559,7 +1559,7 @@ def critical(self, *args, trace=True, **kwargs):
self.trace()


class InterceptModule(BaseModule):
class BaseInterceptModule(BaseModule):
"""
An Intercept Module is a special type of high-priority module that gets early access to events.

@@ -1571,7 +1571,6 @@ class InterceptModule(BaseModule):
"""

accept_dupes = True
suppress_dupes = False
_intercept = True

async def _worker(self):
3 changes: 0 additions & 3 deletions bbot/modules/httpx.py
@@ -172,9 +172,6 @@ async def handle_batch(self, *events):
httpx_ip = j.get("host", "")
if httpx_ip:
tags.append(f"ip-{httpx_ip}")
# detect login pages
if self.helpers.web.is_login_page(j.get("body", "")):
tags.append("login-page")
# grab title
title = self.helpers.tagify(j.get("title", ""), maxlen=30)
if title:
4 changes: 2 additions & 2 deletions bbot/modules/internal/cloudcheck.py
@@ -1,7 +1,7 @@
from bbot.modules.base import InterceptModule
from bbot.modules.base import BaseInterceptModule


class CloudCheck(InterceptModule):
class CloudCheck(BaseInterceptModule):
watched_events = ["*"]
meta = {"description": "Tag events by cloud provider, identify cloud resources like storage buckets"}
scope_distance_modifier = 1
4 changes: 2 additions & 2 deletions bbot/modules/internal/dnsresolve.py
@@ -3,11 +3,11 @@

from bbot.errors import ValidationError
from bbot.core.helpers.dns.engine import all_rdtypes
from bbot.modules.base import InterceptModule, BaseModule
from bbot.core.helpers.dns.helpers import extract_targets
from bbot.modules.base import BaseInterceptModule, BaseModule


class DNSResolve(InterceptModule):
class DNSResolve(BaseInterceptModule):
watched_events = ["*"]
_priority = 1
scope_distance_modifier = None
52 changes: 49 additions & 3 deletions bbot/modules/internal/excavate.py
@@ -6,6 +6,7 @@
from pathlib import Path
from bbot.errors import ExcavateError
import bbot.core.helpers.regexes as bbot_regexes
from bbot.modules.base import BaseInterceptModule
from bbot.modules.internal.base import BaseInternalModule
from urllib.parse import urlparse, urljoin, parse_qs, urlunparse

@@ -279,7 +280,7 @@ async def process(self, yara_results, event, yara_rule_settings, discovery_context):
await self.report(event_data, event, yara_rule_settings, discovery_context)


class excavate(BaseInternalModule):
class excavate(BaseInternalModule, BaseInterceptModule):
"""
Example (simple) Excavate Rules:

@@ -310,6 +311,7 @@ class excavateTestRule(ExcavateRule):
"custom_yara_rules": "Include custom Yara rules",
}
scope_distance_modifier = None
accept_dupes = False

_module_threads = 8

@@ -669,8 +671,32 @@ async def process(self, yara_results, event, yara_rule_settings, discovery_context):

class URLExtractor(ExcavateRule):
yara_rules = {
"url_full": r'rule url_full { meta: tags = "spider-danger" description = "contains full URL" strings: $url_full = /https?:\/\/([\w\.-]+)(:\d{1,5})?([\/\w\.-]*)/ condition: $url_full }',
"url_attr": r'rule url_attr { meta: tags = "spider-danger" description = "contains tag with src or href attribute" strings: $url_attr = /<[^>]+(href|src)=["\'][^"\']*["\'][^>]*>/ condition: $url_attr }',
"url_full": (
r"""
rule url_full {
meta:
tags = "spider-danger"
description = "contains full URL"
strings:
$url_full = /https?:\/\/([\w\.-]+)(:\d{1,5})?([\/\w\.-]*)/
condition:
$url_full
}
"""
),
"url_attr": (
r"""
rule url_attr {
meta:
tags = "spider-danger"
description = "contains tag with src or href attribute"
strings:
$url_attr = /<[^>]+(href|src)=["\'][^"\']*["\'][^>]*>/
condition:
$url_attr
}
"""
),
}
full_url_regex = re.compile(r"(https?)://((?:\w|\d)(?:[\d\w-]+\.?)+(?::\d{1,5})?(?:/[-\w\.\(\)]*[-\w\.]+)*/?)")
full_url_regex_strict = re.compile(r"^(https?):\/\/([\w.-]+)(?::\d{1,5})?(\/[\w\/\.-]*)?(\?[^\s]+)?$")
@@ -749,6 +775,26 @@ async def process(self, yara_results, event, yara_rule_settings, discovery_context):
for domain_str in yara_results[identifier]:
await self.report(domain_str, event, yara_rule_settings, discovery_context, event_type="DNS_NAME")

class LoginPageExtractor(ExcavateRule):
yara_rules = {
"login_page": r"""
rule login_page {
meta:
description = "Detects login pages with username and password fields"
strings:
$username_field = /<input[^>]+name=["']?(user|login|email)/ nocase
$password_field = /<input[^>]+name=["']?passw?/ nocase
condition:
$username_field and $password_field
}
"""
}

async def process(self, yara_results, event, yara_rule_settings, discovery_context):
    if yara_results:
        # tag the source event so the login page is flagged in output
        event.add_tag("login-page")
        self.excavate.debug(f"Login page detected: {event.data['url']}")

def add_yara_rule(self, rule_name, rule_content, rule_instance):
rule_instance.name = rule_name
self.yara_rules_dict[rule_name] = rule_content
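The new login_page rule can be sanity-checked outside of a scan with the yara-python package. A minimal sketch, assuming pip install yara-python (an illustration, not part of this PR):

import yara

# Same rule body as LoginPageExtractor above
LOGIN_PAGE_RULE = r"""
rule login_page {
    meta:
        description = "Detects login pages with username and password fields"
    strings:
        $username_field = /<input[^>]+name=["']?(user|login|email)/ nocase
        $password_field = /<input[^>]+name=["']?passw?/ nocase
    condition:
        $username_field and $password_field
}
"""

rules = yara.compile(source=LOGIN_PAGE_RULE)

# Mirrors the doctest examples from the removed is_login_page() helper
login_html = '<form><input type="text" name="username"><input type="password" name="password"></form>'
search_html = '<form><input type="text" name="search"></form>'

assert rules.match(data=login_html)       # username + password fields -> match
assert not rules.match(data=search_html)  # no credential fields -> no match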
4 changes: 3 additions & 1 deletion bbot/modules/internetdb.py
@@ -48,6 +48,9 @@ class internetdb(BaseModule):
"show_open_ports": "Display OPEN_TCP_PORT events in output, even if they didn't lead to an interesting discovery"
}

# we get lots of 404s, that's normal
_api_failure_abort_threshold = 9999999999

_qsize = 500

base_url = "https://internetdb.shodan.io"
@@ -113,7 +116,6 @@ async def _parse_response(self, data: dict, event, ip):
"OPEN_TCP_PORT",
parent=event,
internal=(not self.show_open_ports),
quick=True,
context=f'{{module}} queried Shodan\'s InternetDB API for "{query_host}" and found {{event.type}}: {{event.data}}',
)
vulns = data.get("vulns", [])
18 changes: 4 additions & 14 deletions bbot/scanner/manager.py
@@ -1,10 +1,10 @@
import asyncio
from contextlib import suppress

from bbot.modules.base import InterceptModule
from bbot.modules.base import BaseInterceptModule


class ScanIngress(InterceptModule):
class ScanIngress(BaseInterceptModule):
"""
This is always the first intercept module in the chain, responsible for basic scope checks

@@ -15,9 +15,7 @@ class ScanIngress(InterceptModule):
# accept all events regardless of scope distance
scope_distance_modifier = None
_name = "_scan_ingress"

# small queue size so we don't drain modules' outgoing queues
_qsize = 10
_qsize = -1

@property
def priority(self):
@@ -115,14 +113,6 @@ async def handle_event(self, event, **kwargs):
# nerf event's priority if it's not in scope
event.module_priority += event.scope_distance

async def forward_event(self, event, kwargs):
# if a module qualifies for "quick-emit", we skip all the intermediate modules like dns and cloud
# and forward it straight to the egress module
if event.quick_emit:
await self.scan.egress_module.queue_event(event, kwargs)
else:
await super().forward_event(event, kwargs)

@property
def non_intercept_modules(self):
if self._non_intercept_modules is None:
@@ -169,7 +159,7 @@ def is_incoming_duplicate(self, event, add=False):
return False


class ScanEgress(InterceptModule):
class ScanEgress(BaseInterceptModule):
"""
This is always the last intercept module in the chain, responsible for executing and acting on the
`abort_if` and `on_success_callback` functions.
4 changes: 3 additions & 1 deletion bbot/scanner/scanner.py
@@ -288,7 +288,9 @@ async def _prep(self):
self.debug(
f"Setting intercept module {intercept_module.name}._incoming_event_queue to previous intercept module {prev_intercept_module.name}.outgoing_event_queue"
)
intercept_module._incoming_event_queue = prev_intercept_module.outgoing_event_queue
interqueue = asyncio.Queue()
intercept_module._incoming_event_queue = interqueue
prev_intercept_module._outgoing_event_queue = interqueue

# abort if there are no output modules
num_output_modules = len([m for m in self.modules.values() if m._type == "output"])
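The scanner change above stops pointing each intercept module at the previous module's outgoing queue and instead allocates a dedicated asyncio.Queue per link in the intercept chain, wiring each module's outgoing queue to the next module's incoming queue. A minimal sketch of the pattern with hypothetical stand-in stages (not bbot's actual module classes):

import asyncio

async def stage(name, incoming, outgoing):
    # Each stage consumes from its own incoming queue and forwards downstream
    while True:
        event = await incoming.get()
        if event is None:  # sentinel: propagate shutdown and exit
            await outgoing.put(None)
            return
        await outgoing.put(f"{name}({event})")

async def main():
    q0, q1, q2 = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    # q1 is the dedicated "interqueue": ingress's outgoing IS egress's incoming
    a = asyncio.create_task(stage("ingress", q0, q1))
    b = asyncio.create_task(stage("egress", q1, q2))
    await q0.put("EVENT")
    await q0.put(None)
    print(await q2.get())  # -> egress(ingress(EVENT))
    await asyncio.gather(a, b)

asyncio.run(main())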
23 changes: 12 additions & 11 deletions bbot/test/test_step_1/test_modules_basic.py
@@ -380,21 +380,22 @@ async def handle_event(self, event):
scan.modules["dummy"] = dummy(scan)
events = [e async for e in scan.async_start()]

assert len(events) == 9
assert len(events) == 10
for e in events:
log.critical(e)
assert 2 == len([e for e in events if e.type == "SCAN"])
assert 3 == len([e for e in events if e.type == "DNS_NAME"])
assert 4 == len([e for e in events if e.type == "DNS_NAME"])
# one from target and one from speculate
assert 2 == len([e for e in events if e.type == "DNS_NAME" and e.data == "evilcorp.com"])
# the reason we don't have a DNS_NAME for www.evilcorp.com is because FINDING.quick_emit = True
assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "www.evilcorp.com"])
assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "www.evilcorp.com"])
assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "asdf.evilcorp.com"])
assert 1 == len([e for e in events if e.type == "ORG_STUB" and e.data == "evilcorp"])
assert 1 == len([e for e in events if e.type == "FINDING"])
assert 1 == len([e for e in events if e.type == "URL_UNVERIFIED"])

assert scan.stats.events_emitted_by_type == {
"SCAN": 1,
"DNS_NAME": 3,
"DNS_NAME": 4,
"URL": 1,
"ORG_STUB": 1,
"URL_UNVERIFIED": 1,
@@ -414,34 +415,34 @@ async def handle_event(self, event):
assert dummy_stats.produced == {"FINDING": 1, "URL": 1}
assert dummy_stats.produced_total == 2
assert dummy_stats.consumed == {
"DNS_NAME": 2,
"DNS_NAME": 3,
"FINDING": 1,
"OPEN_TCP_PORT": 1,
"ORG_STUB": 1,
"SCAN": 1,
"URL": 1,
"URL_UNVERIFIED": 1,
}
assert dummy_stats.consumed_total == 8
assert dummy_stats.consumed_total == 9

python_stats = scan.stats.module_stats["python"]
assert python_stats.produced == {}
assert python_stats.produced_total == 0
assert python_stats.consumed == {
"DNS_NAME": 3,
"DNS_NAME": 4,
"FINDING": 1,
"ORG_STUB": 1,
"SCAN": 1,
"URL": 1,
"URL_UNVERIFIED": 1,
}
assert python_stats.consumed_total == 8
assert python_stats.consumed_total == 9

speculate_stats = scan.stats.module_stats["speculate"]
assert speculate_stats.produced == {"DNS_NAME": 1, "URL_UNVERIFIED": 1, "ORG_STUB": 1}
assert speculate_stats.produced_total == 3
assert speculate_stats.consumed == {"URL": 1, "DNS_NAME": 2, "URL_UNVERIFIED": 1, "IP_ADDRESS": 2}
assert speculate_stats.consumed_total == 6
assert speculate_stats.consumed == {"URL": 1, "DNS_NAME": 3, "URL_UNVERIFIED": 1, "IP_ADDRESS": 3}
assert speculate_stats.consumed_total == 8


@pytest.mark.asyncio
4 changes: 3 additions & 1 deletion bbot/test/test_step_2/module_tests/test_module_httpx.py
@@ -1,8 +1,10 @@
from .base import ModuleTestBase


class TestHTTPX(ModuleTestBase):
class TestHTTPXBase(ModuleTestBase):
targets = ["http://127.0.0.1:8888/url", "127.0.0.1:8888"]
module_name = "httpx"
modules_overrides = ["httpx", "excavate"]
config_overrides = {"modules": {"httpx": {"store_responses": True}}}

# HTML for a page with a login form
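# (illustrative stand-in below; the PR's actual fixture HTML is truncated in this view)
login_form_html = """
<html><body>
  <form action="/login" method="post">
    <input type="text" name="username">
    <input type="password" name="password">
    <input type="submit" value="Log In">
  </form>
</body></html>
"""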
Expand Down