diff --git a/artemis/config.py b/artemis/config.py index 5ec800c81..20c7f1c45 100644 --- a/artemis/config.py +++ b/artemis/config.py @@ -437,6 +437,11 @@ class WordPressBruter: "www123, when testing www.projectname.example.com.", ] = get_config("WORDPRESS_BRUTER_STRIPPED_PREFIXES", default="www", cast=decouple.Csv(str)) + class DomainExpirationScanner: + DOMAIN_EXPIRATION_TIMEFRAME_DAYS: Annotated[ + int, "The scanner warns if the domain's expiration date falls within this time frame from now." + ] = get_config("DOMAIN_EXPIRATION_TIMEFRAME_DAYS", default=5, cast=int) + @staticmethod def verify_each_variable_is_annotated() -> None: def verify_class(cls: type) -> None: diff --git a/artemis/modules/domain_expiration_scanner.py b/artemis/modules/domain_expiration_scanner.py new file mode 100644 index 000000000..08363ab9c --- /dev/null +++ b/artemis/modules/domain_expiration_scanner.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +import datetime +import time +from typing import Any, Dict, Optional + +from karton.core import Task +from whois import Domain, WhoisQuotaExceeded, query # type: ignore + +from artemis.binds import TaskStatus, TaskType +from artemis.config import Config +from artemis.domains import is_main_domain +from artemis.module_base import ArtemisBase + + +class DomainExpirationScanner(ArtemisBase): + """ + Alerts if domain expiration date is coming. + """ + + identity = "domain_expiration_scanner" + filters = [{"type": TaskType.DOMAIN.value}] + + def run(self, current_task: Task) -> None: + domain = current_task.get_payload(TaskType.DOMAIN) + result: Dict[str, Any] = {} + status = TaskStatus.OK + status_reason = None + if is_main_domain(domain): + try: + domain_data = self._query_whois(domain=domain) + except WhoisQuotaExceeded: + time.sleep(24 * 60 * 60) + domain_data = self._query_whois(domain=domain) + + expiry_date = domain_data.expiration_date + result = self._prepare_expiration_data(expiration_date=expiry_date, result=result) + + if "close_expiration_date" in result: + status = TaskStatus.INTERESTING + status_reason = self._prepare_expiration_status_reason( + days_to_expire=result["days_to_expire"], expiration_date=result["expiration_date"] + ) + + self.db.save_task_result(task=current_task, status=status, status_reason=status_reason, data=result) + + @staticmethod + def _query_whois(domain: str) -> Domain: + return query(domain) + + @staticmethod + def _prepare_expiration_data( + expiration_date: Optional[datetime.datetime], result: Dict[str, Any] + ) -> Dict[str, Any]: + days_to_expire = None + now = datetime.datetime.now() + if expiration_date: + days_to_expire = (expiration_date - now).days + result["expiration_date"] = expiration_date + if days_to_expire and days_to_expire <= Config.Modules.DomainExpirationScanner.DOMAIN_EXPIRATION_TIMEFRAME_DAYS: + result["close_expiration_date"] = True + result["days_to_expire"] = days_to_expire + return result + + @staticmethod + def _prepare_expiration_status_reason(days_to_expire: int, expiration_date: datetime.datetime) -> str: + return ( + f"Scanned domain will expire in {days_to_expire} days - (on {expiration_date})." + if days_to_expire != 1 + else f"Scanned domain will expire in {days_to_expire} day - (on {expiration_date})." + ) + + +if __name__ == "__main__": + DomainExpirationScanner().loop() diff --git a/artemis/reporting/base/reporter.py b/artemis/reporting/base/reporter.py index 81b0a953b..9fab46419 100644 --- a/artemis/reporting/base/reporter.py +++ b/artemis/reporting/base/reporter.py @@ -2,7 +2,13 @@ from typing import Any, Callable, Dict, List, Tuple from .language import Language -from .normal_form import NormalForm, get_url_normal_form, get_url_score +from .normal_form import ( + NormalForm, + get_domain_normal_form, + get_domain_score, + get_url_normal_form, + get_url_score, +) from .report import Report from .report_type import ReportType from .templating import ReportEmailTemplateFragment @@ -64,13 +70,17 @@ def dict_to_tuple(d: Dict[str, str]) -> Tuple[Tuple[str, str], ...]: @staticmethod def default_scoring_rule(report: Report) -> List[int]: - return [get_url_score(report.target)] + assert report.target_is_url() or report.target_is_domain() + return [get_url_score(report.target) if report.target_is_url() else get_domain_score(report.target)] @staticmethod def default_normal_form_rule(report: Report) -> NormalForm: + assert report.target_is_url() or report.target_is_domain() return Reporter.dict_to_tuple( { "type": report.report_type, - "target": get_url_normal_form(report.target), + "target": get_url_normal_form(report.target) + if report.target_is_url() + else get_domain_normal_form(report.target), } ) diff --git a/artemis/reporting/modules/domain_expiration_scanner/reporter.py b/artemis/reporting/modules/domain_expiration_scanner/reporter.py new file mode 100644 index 000000000..cc81efa7d --- /dev/null +++ b/artemis/reporting/modules/domain_expiration_scanner/reporter.py @@ -0,0 +1,47 @@ +import os +from typing import Any, Dict, List + +from artemis.reporting.base.language import Language +from artemis.reporting.base.report import Report +from artemis.reporting.base.report_type import ReportType +from artemis.reporting.base.reporter import Reporter +from artemis.reporting.base.templating import ReportEmailTemplateFragment +from artemis.reporting.utils import get_top_level_target + + +class DomainExpirationScannerReporter(Reporter): + CLOSE_DOMAIN_EXPIRATION_DATE = ReportType("close_domain_expiration_date") + + @staticmethod + def create_reports(task_result: Dict[str, Any], language: Language) -> List[Report]: + if task_result["headers"]["receiver"] != "domain_expiration_scanner": + return [] + + if not task_result["status"] == "INTERESTING": + return [] + + if not isinstance(task_result["result"], dict): + return [] + + data = task_result["result"] + expiration_date_from_result = data["expiration_date"] + expiration_date = expiration_date_from_result.strftime("%d-%m-%Y") + + return [ + Report( + top_level_target=get_top_level_target(task_result), + target=task_result["payload"]["domain"], + report_type=DomainExpirationScannerReporter.CLOSE_DOMAIN_EXPIRATION_DATE, + additional_data={"expiration_date": expiration_date}, + timestamp=task_result["created_at"], + ) + ] + + @staticmethod + def get_email_template_fragments() -> List[ReportEmailTemplateFragment]: + return [ + ReportEmailTemplateFragment.from_file( + os.path.join(os.path.dirname(__file__), "template_close_domain_expiration_scanner.jinja2"), + priority=5, + ), + ] diff --git a/artemis/reporting/modules/domain_expiration_scanner/template_close_domain_expiration_scanner.jinja2 b/artemis/reporting/modules/domain_expiration_scanner/template_close_domain_expiration_scanner.jinja2 new file mode 100644 index 000000000..ac5ed0a38 --- /dev/null +++ b/artemis/reporting/modules/domain_expiration_scanner/template_close_domain_expiration_scanner.jinja2 @@ -0,0 +1,13 @@ +{% if "close_domain_expiration_date" in data.contains_type %} +
{{ report.target }} - {% trans %}will expire on{% endtrans %} {{ report.additional_data["expiration_date"] }}
+