Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blocklist blocks scanning, not only reporting #585

Merged
merged 10 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions artemis/blocklist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import dataclasses
import datetime
import enum
import ipaddress
from typing import List, Optional, Union

import yaml

from artemis import utils
from artemis.domains import is_domain, is_subdomain
from artemis.reporting.base.report import Report
from artemis.reporting.base.report_type import ReportType

logger = utils.build_logger(__name__)


class UnsupportedBlocklistItem(Exception):
    """Raised when a blocklist item uses a field that is not supported for its mode."""


class BlocklistMode(str, enum.Enum):
    """What a blocklist item suppresses: scanning and reporting, or reporting only."""

    # The matching target is neither scanned nor reported.
    BLOCK_SCANNING_AND_REPORTING = "block_scanning_and_reporting"

    # The matching target is still scanned; only its reports are suppressed.
    BLOCK_REPORTING_ONLY = "block_reporting_only"


@dataclasses.dataclass
class BlocklistItem:
    """A single blocklist filter.

    Each BlocklistItem:
     - if `mode` is block_scanning_and_reporting, blocks scanning as well as reporting;
       if `mode` is block_reporting_only, blocks only reporting,
     - is matched against all of its non-null fields (domain, ip_range, ...),
     - if all of them match, the report/scanning is skipped.

    The same is repeated for all BlocklistItems - if at least one matches,
    the report/scanning is skipped.
    """

    # Whether the item blocks scanning and reporting, or reporting only.
    mode: BlocklistMode
    # Matched with is_subdomain() against the scanned/reported domain.
    domain: Optional[str] = None
    # Network the target IP address must belong to.
    ip_range: Optional[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]] = None
    # The item applies only to events strictly earlier than this moment.
    until: Optional[datetime.datetime] = None
    # Name of the module (karton) the item is restricted to.
    karton_name: Optional[str] = None
    # Substring that must occur in the report target (reporting only).
    report_target_should_contain: Optional[str] = None
    # Report type the item is restricted to (reporting only).
    report_type: Optional[ReportType] = None


class BlocklistError(Exception):
    """Raised when the blocklist file contains an invalid entry."""


def load_blocklist(file_path: Optional[str]) -> List[BlocklistItem]:
    """Load blocklist rules from a YAML file.

    The file is expected to contain a list of mappings. Each mapping must have
    a ``mode`` key and may have any of: ``domain``, ``ip_range``, ``until``
    (``YYYY-MM-DD``), ``karton_name``, ``report_target_should_contain``,
    ``report_type``.

    Args:
        file_path: path of the YAML file; if empty or None, no rules are loaded.

    Returns:
        The parsed blocklist items (an empty list if there is no file or the
        file is empty).

    Raises:
        BlocklistError: if an entry contains a key that is not recognized.
    """
    if not file_path:
        return []

    with open(file_path, "r") as file:
        data = yaml.safe_load(file)

    # An empty (or comment-only) YAML document is parsed as None - treat it as "no rules"
    # instead of failing on iteration.
    if not data:
        return []

    expected_keys = {
        "mode",
        "domain",
        "ip_range",
        "until",
        "karton_name",
        "report_target_should_contain",
        "report_type",
    }

    for item in data:
        # Assert there are no additional keys
        unexpected_keys = set(item.keys()) - expected_keys
        if unexpected_keys:
            raise BlocklistError(f"Unexpected keys in entry: {','.join(sorted(unexpected_keys))}")

    blocklist_items = [
        BlocklistItem(
            mode=BlocklistMode(item["mode"]),
            domain=item.get("domain", None),
            ip_range=ipaddress.ip_network(item["ip_range"], strict=False) if item.get("ip_range", None) else None,
            until=_parse_blocklist_until(item.get("until", None)),
            karton_name=item.get("karton_name", None),
            report_target_should_contain=item.get("report_target_should_contain", None),
            # The key is "report_type" (as listed in expected_keys) - reading a differently
            # cased key would silently drop the restriction and make the rule match every report type.
            report_type=item.get("report_type", None),
        )
        for item in data
    ]

    return blocklist_items


def _parse_blocklist_until(value: object) -> Optional[datetime.datetime]:
    """Convert the ``until`` value of a blocklist entry to a datetime (or None if not set)."""
    if not value:
        return None

    # YAML parses an unquoted 2023-10-18 into a datetime.date, not a string.
    if isinstance(value, datetime.date) and not isinstance(value, datetime.datetime):
        return datetime.datetime.combine(value, datetime.time.min)

    return datetime.datetime.strptime(value, "%Y-%m-%d")  # type: ignore[arg-type]


def should_block_scanning(
    domain: Optional[str], ip: Optional[str], karton_name: str, blocklist: List[BlocklistItem]
) -> bool:
    """Tell whether scanning of a given target by a given module is blocked.

    Only items with mode ``BLOCK_SCANNING_AND_REPORTING`` are considered. An item
    blocks scanning if all of its non-null conditions (domain, ip_range, until,
    karton_name) match.

    Args:
        domain: the scanned domain, if known.
        ip: the scanned IP address (IPv4 or IPv6, as a string), if known.
        karton_name: name of the module that is about to perform the scan.
        blocklist: the blocklist items to check.

    Returns:
        True if at least one item blocks the scan, False otherwise.

    Raises:
        UnsupportedBlocklistItem: if an otherwise matching item has
            ``report_target_should_contain`` or ``report_type`` set - these are
            known only during report generation, not during scanning.
        ValueError: if ``ip`` is not a valid IP address and an item has ``ip_range`` set.
    """
    logger.info("checking whether scanning of domain=%s ip=%s by %s is filtered", domain, ip, karton_name)
    for item in blocklist:
        if item.mode != BlocklistMode.BLOCK_SCANNING_AND_REPORTING:
            continue

        if item.domain:
            if not domain:
                continue
            if not is_subdomain(domain, item.domain):
                continue

        if item.ip_range:
            if not ip:
                continue
            # ip_address() accepts both IPv4 and IPv6, as ip_range may be a network of either
            # version. An address of a different version than the network is never contained in it.
            if ipaddress.ip_address(ip) not in item.ip_range:
                continue

        if item.until:
            # The rule is in force only before `until`.
            if datetime.datetime.now() >= item.until:
                continue

        if item.karton_name and karton_name != item.karton_name:
            continue

        if item.report_target_should_contain:
            raise UnsupportedBlocklistItem(
                "If a blocklist item is set to block scanning, report_target_should_contain "
                "cannot be provided, as the report targets are determined during e-mail report generation "
                "(https://artemis-scanner.readthedocs.io/en/latest/generating-emails.html) and "
                "a single scanning module can cause different targets to be generated (e.g. "
                "for files found by the bruter module, the target would be their url, such as "
                "https://example.com/wp-config.php.bak)."
            )

        if item.report_type:
            raise UnsupportedBlocklistItem(
                "If a blocklist item is set to block scanning, report type cannot be provided, as "
                "report types are determined during e-mail report generation "
                "(https://artemis-scanner.readthedocs.io/en/latest/generating-emails.html) and "
                "a single scanning module can cause different report types to be generated."
            )

        logger.info(
            "scanning of domain=%s ip=%s by %s filtered due to blocklist rule %s", domain, ip, karton_name, item
        )
        return True
    return False


def blocklist_reports(reports: List[Report], blocklist: List[BlocklistItem]) -> List[Report]:
    """Return the reports that are not matched by any blocklist item.

    A report is dropped if, for at least one item, all the non-null conditions
    of the item (domain, ip_range, until, karton_name,
    report_target_should_contain, report_type) match the report. The item mode
    is not consulted here: every item is applied to reports.

    Args:
        reports: the reports to filter.
        blocklist: the blocklist items to apply.

    Returns:
        A new list with the reports that passed, in the original order.

    Raises:
        ValueError: if ``report.target_ip`` is not a valid IP address and an item has ``ip_range`` set.
    """
    result = []
    for report in reports:
        filtered = False
        for item in blocklist:
            if item.domain:
                # Prefer the last domain seen while processing the task; fall back
                # to the top-level target if it is a domain.
                domain = report.top_level_target if is_domain(report.top_level_target) else None
                if report.last_domain:
                    domain = report.last_domain
                if not domain:
                    continue
                if not is_subdomain(domain, item.domain):
                    continue

            if item.ip_range:
                if not report.target_ip:
                    continue
                # ip_address() accepts both IPv4 and IPv6, as ip_range may be a network of either
                # version. An address of a different version than the network is never contained in it.
                if ipaddress.ip_address(report.target_ip) not in item.ip_range:
                    continue

            if item.until:
                if not report.timestamp:
                    continue
                # The rule applies only to reports created before `until`.
                if report.timestamp >= item.until:
                    continue

            if item.karton_name and report.original_karton_name != item.karton_name:
                continue

            if item.report_target_should_contain and item.report_target_should_contain not in report.target:
                continue

            if item.report_type and report.report_type != item.report_type:
                continue

            filtered = True
            logger.info(
                "report from %s (type=%s) in %s filtered due to blocklist rule %s",
                report.original_karton_name,
                report.report_type,
                report.target,
                item,
            )

        if not filtered:
            result.append(report)
    return result
5 changes: 5 additions & 0 deletions artemis/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ class Limits:
] = get_config("SECONDS_PER_REQUEST", default=0, cast=int)

class Miscellaneous:
BLOCKLIST_FILE: Annotated[
str,
"A file that determines what should not be scanned or reported",
] = get_config("BLOCKLIST_FILE", default=None)

CUSTOM_USER_AGENT: Annotated[
str,
"Custom User-Agent string used by Artemis (if not set, the library defaults will be used, different for requests, Nuclei etc.)",
Expand Down
41 changes: 40 additions & 1 deletion artemis/module_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
from redis import Redis

from artemis.binds import TaskStatus, TaskType
from artemis.blocklist import load_blocklist, should_block_scanning
from artemis.config import Config
from artemis.db import DB
from artemis.domains import is_domain
from artemis.redis_cache import RedisCache
from artemis.resolvers import ip_lookup
from artemis.resource_lock import FailedToAcquireLockException, ResourceLock
from artemis.retrying_resolver import setup_retrying_resolver
from artemis.task_utils import get_target_host
from artemis.utils import is_ip_address

REDIS = Redis.from_url(Config.Data.REDIS_CONN_STR)

Expand Down Expand Up @@ -49,6 +53,11 @@ def __init__(self, db: Optional[DB] = None, *args, **kwargs) -> None: # type: i
self.lock = ResourceLock(redis=REDIS, res_name=self.identity)
self.redis = REDIS

if Config.Miscellaneous.BLOCKLIST_FILE:
self._blocklist = load_blocklist(Config.Miscellaneous.BLOCKLIST_FILE)
else:
self._blocklist = []

if db:
self.db = db
else:
Expand Down Expand Up @@ -141,8 +150,13 @@ def loop(self) -> None:
tasks = []
for _ in range(self.task_max_batch_size):
task = self._consume_random_routed_task(self.identity)

if task:
if self.identity in task.payload_persistent.get("disabled_modules", []):
if self._is_blocklisted(task):
self.log.info("Task %s is blocklisted for module %s", task, self.identity)
self.backend.increment_metrics(KartonMetrics.TASK_CONSUMED, self.identity)
self.backend.set_task_status(task, KartonTaskState.FINISHED)
elif self.identity in task.payload_persistent.get("disabled_modules", []):
self.log.info("Module %s disabled for task %s", self.identity, task)
self.backend.increment_metrics(KartonMetrics.TASK_CONSUMED, self.identity)
self.backend.set_task_status(task, KartonTaskState.FINISHED)
Expand Down Expand Up @@ -172,6 +186,31 @@ def _consume_random_routed_task(self, identity: str) -> Optional[Task]:
return task
return None

def _is_blocklisted(self, task: Task) -> bool:
    """Tell whether scanning the target of ``task`` by this module is blocklisted.

    If the target host is a domain, it is checked together with each of the IP
    addresses it resolves to (or with no IP, if the lookup fails or returns
    nothing). If the target host is an IP address, it is checked together with
    the ``last_domain`` value from the task payload, if present.

    Raises:
        AssertionError: if the target host is neither a domain nor an IP address.
    """
    host = get_target_host(task)

    if is_domain(host):
        try:
            ip_addresses = list(ip_lookup(host))
        except Exception as e:
            # Best effort: a failed lookup must not prevent the domain-only check below.
            # The message uses %-placeholders for every argument - an extra positional
            # argument without a placeholder makes the logging call fail at format time.
            self.log.error("Exception while trying to obtain IP for host %s: %s", host, e)
            ip_addresses = []

        if not ip_addresses:
            return should_block_scanning(domain=host, ip=None, karton_name=self.identity, blocklist=self._blocklist)

        return any(
            should_block_scanning(domain=host, ip=ip, karton_name=self.identity, blocklist=self._blocklist)
            for ip in ip_addresses
        )

    if is_ip_address(host):
        domain = task.payload.get("last_domain", None)
        return should_block_scanning(domain=domain, ip=host, karton_name=self.identity, blocklist=self._blocklist)

    # Raised explicitly (instead of `assert False`) so that the check is kept under `python -O`.
    raise AssertionError(f"expected {host} to be either domain or an IP address")

def reschedule_task(self, task: Task) -> None:
"""
Puts task back into the queue.
Expand Down
102 changes: 0 additions & 102 deletions artemis/reporting/blocklist.py

This file was deleted.

4 changes: 2 additions & 2 deletions artemis/reporting/export/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from karton.core import Task
from tqdm import tqdm

from artemis.blocklist import BlocklistItem, blocklist_reports
from artemis.config import Config
from artemis.db import DB
from artemis.reporting.base.language import Language
from artemis.reporting.base.report import Report
from artemis.reporting.base.reporters import reports_from_task_result
from artemis.reporting.blocklist import BlocklistItem, filter_blocklist
from artemis.reporting.utils import get_top_level_target
from artemis.task_utils import get_target_host

Expand Down Expand Up @@ -71,7 +71,7 @@ def _initialize_data_if_needed(self) -> None:
report_to_add.original_task_target_string = task_result["target_string"]
report_to_add.last_domain = task_result["payload"].get("last_domain", None)

self._reports.extend(filter_blocklist(reports_to_add, self._blocklist))
self._reports.extend(blocklist_reports(reports_to_add, self._blocklist))
self._data_initialized = True

@property
Expand Down
Loading