diff --git a/bbot/core/helpers/misc.py b/bbot/core/helpers/misc.py index 8e357378a..002303f30 100644 --- a/bbot/core/helpers/misc.py +++ b/bbot/core/helpers/misc.py @@ -1200,3 +1200,39 @@ def weighted_shuffle(items, weights): shuffled_items.append(chosen_item) return shuffled_items + + +def parse_port_string(port_string): + elements = port_string.split(",") + ports = [] + + for element in elements: + if element.isdigit(): + port = int(element) + if 1 <= port <= 65535: + ports.append(port) + else: + raise ValueError(f"Invalid port: {element}") + elif "-" in element: + range_parts = element.split("-") + if len(range_parts) != 2 or not all(part.isdigit() for part in range_parts): + raise ValueError(f"Invalid port or port range: {element}") + start, end = map(int, range_parts) + if not (1 <= start < end <= 65535): + raise ValueError(f"Invalid port range: {element}") + ports.extend(range(start, end + 1)) + else: + raise ValueError(f"Invalid port or port range: {element}") + + return ports + + +def parse_list_string(list_string): + elements = list_string.split(",") + result = [] + + for element in elements: + if any((c in '<>:"/\\|?*') or (ord(c) < 32 and c != " ") for c in element): + raise ValueError(f"Invalid character in string: {element}") + result.append(element) + return result diff --git a/bbot/modules/deadly/ffuf.py b/bbot/modules/deadly/ffuf.py index 709b1d1be..2ef1938f9 100644 --- a/bbot/modules/deadly/ffuf.py +++ b/bbot/modules/deadly/ffuf.py @@ -1,4 +1,5 @@ from bbot.modules.base import BaseModule +from bbot.core.helpers.misc import parse_list_string import random import string @@ -54,7 +55,12 @@ async def setup(self): self.wordlist_lines = list(self.helpers.read_file(self.wordlist)) self.tempfile, tempfile_len = self.generate_templist() self.verbose(f"Generated dynamic wordlist with length [{str(tempfile_len)}]") - self.extensions = self.config.get("extensions", "") + try: + self.extensions = parse_list_string(self.config.get("extensions", "")) + self.critical(f"Using custom extensions: [{','.join(self.extensions)}]") + except ValueError as e: + self.warning(f"Error parsing extensions: {e}") + return False return True async def handle_event(self, event): @@ -72,7 +78,7 @@ async def handle_event(self, event): exts = ["", "/"] if self.extensions: - for ext in self.extensions.split(","): + for ext in self.extensions: exts.append(f".{ext}") filters = await self.baseline_ffuf(fixed_url, exts=exts) diff --git a/bbot/modules/ffuf_shortnames.py b/bbot/modules/ffuf_shortnames.py index d181319f0..3ac991967 100644 --- a/bbot/modules/ffuf_shortnames.py +++ b/bbot/modules/ffuf_shortnames.py @@ -3,6 +3,7 @@ import string from bbot.modules.deadly.ffuf import ffuf +from bbot.core.helpers.misc import parse_list_string def find_common_prefixes(strings, minimum_set_length=4): @@ -86,7 +87,14 @@ async def setup(self): wordlist_extensions = f"{self.helpers.wordlist_dir}/raft-small-extensions-lowercase_CLEANED.txt" self.debug(f"Using [{wordlist_extensions}] for shortname candidate extension list") self.wordlist_extensions = await self.helpers.wordlist(wordlist_extensions) - self.extensions = self.config.get("extensions") + + try: + self.extensions = parse_list_string(self.config.get("extensions", "")) + self.critical(f"Using custom extensions: [{','.join(self.extensions)}]") + except ValueError as e: + self.warning(f"Error parsing extensions: {e}") + return False + self.ignore_redirects = self.config.get("ignore_redirects") self.per_host_collection = {} diff --git a/bbot/modules/internal/speculate.py b/bbot/modules/internal/speculate.py index 1e00d1bdb..0e2b6b9ce 100644 --- a/bbot/modules/internal/speculate.py +++ b/bbot/modules/internal/speculate.py @@ -1,6 +1,7 @@ import random import ipaddress +from bbot.core.helpers.misc import parse_port_string from bbot.modules.internal.base import BaseInternalModule @@ -24,7 +25,7 @@ class speculate(BaseInternalModule): flags = ["passive"] meta = {"description": "Derive certain event types from others by common sense"} - options = {"max_hosts": 65536, "ports": [80, 443]} + options = {"max_hosts": 65536, "ports": "80,443"} options_desc = { "max_hosts": "Max number of IP_RANGE hosts to convert into IP_ADDRESS events", "ports": "The set of ports to speculate on", @@ -40,9 +41,14 @@ async def setup(self): self.range_to_ip = True self.dns_resolution = self.scan.config.get("dns_resolution", True) - self.ports = self.config.get("ports", [80, 443]) - if isinstance(self.ports, int): - self.ports = [self.ports] + port_string = self.config.get("ports", "80,443") + + try: + self.ports = parse_port_string(port_string) + except ValueError as e: + self.warning(f"Error parsing ports: {e}") + return False + if not self.portscanner_enabled: self.info(f"No portscanner enabled. Assuming open ports: {', '.join(str(x) for x in self.ports)}") diff --git a/bbot/test/test_step_1/test_helpers.py b/bbot/test/test_step_1/test_helpers.py index f7dd72c57..87bf518a1 100644 --- a/bbot/test/test_step_1/test_helpers.py +++ b/bbot/test/test_step_1/test_helpers.py @@ -580,3 +580,67 @@ async def async_gen(): except StopIteration: break assert l == [0, 1, 2, 3, 4] + + +# test parse_port_string helper + + +def test_portparse_singleports(helpers): + assert helpers.parse_port_string("80,443,22") == [80, 443, 22] + + +def test_portparse_range_valid(helpers): + assert helpers.parse_port_string("80,443,22,1000-1002") == [80, 443, 22, 1000, 1001, 1002] + + +def test_portparse_invalidport(helpers): + with pytest.raises(ValueError) as e: + helpers.parse_port_string("80,443,22,70000") + assert str(e.value) == "Invalid port: 70000" + + +def test_portparse_range_invalid(helpers): + with pytest.raises(ValueError) as e: + helpers.parse_port_string("80,443,22,1000-70000") + assert str(e.value) == "Invalid port range: 1000-70000" + + +def test_portparse_range_morethantwoparts(helpers): + with pytest.raises(ValueError) as e: + helpers.parse_port_string("80,443,22,1000-1001-1002") + assert str(e.value) == "Invalid port or port range: 1000-1001-1002" + + +def test_portparse_range_startgreaterthanend(helpers): + with pytest.raises(ValueError) as e: + helpers.parse_port_string("80,443,22,1002-1000") + assert str(e.value) == "Invalid port range: 1002-1000" + + +def test_portparse_nonnumericinput(helpers): + with pytest.raises(ValueError) as e: + helpers.parse_port_string("80,443,22,foo") + assert str(e.value) == "Invalid port or port range: foo" + + +# test parse_list_string helper + + +def test_liststring_valid_strings(helpers): + assert helpers.parse_list_string("hello,world,bbot") == ["hello", "world", "bbot"] + + +def test_liststring_invalid_string(helpers): + with pytest.raises(ValueError) as e: + helpers.parse_list_string("hello,world,\x01") + assert str(e.value) == "Invalid character in string: \x01" + + +def test_liststring_singleitem(helpers): + assert helpers.parse_list_string("hello") == ["hello"] + + +def test_liststring_invalidfnchars(helpers): + with pytest.raises(ValueError) as e: + helpers.parse_list_string("hello,world,bbot|test") + assert str(e.value) == "Invalid character in string: bbot|test"