From 2f86436083ef14bed11334d7698158377b6028d9 Mon Sep 17 00:00:00 2001
From: Nikita Kryukov
Date: Mon, 31 Aug 2020 13:26:11 +0700
Subject: [PATCH 1/2] Check email in breaches module (#56)

* Add unfinished email_breach module
* Add email breach module
* Remove debug code
* Skip tests on 429. Close session on error
* Make function for handling errors. Prettify get_random_string function.
* Fix response messages a little

Co-authored-by: manmolecular
---
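A minimal usage sketch, not part of the diff (the "email" argument and the
"message"/"result" response keys follow the module and tests below; it assumes
the snippet runs from the repository root):

    from src.scripts.osint.email_breach.module import Runner

    runner = Runner()
    response = runner.run(email="johndoe@gmail.com")

    # "message" is a human-readable summary; "result" is a list of parsed
    # breach cards, e.g. {"title": "Adobe", "date": "December 4, 2013",
    # "compromised": "Passwords, Email addresses"}
    print(response.get("message"))
    for breach in response.get("result") or []:
        print(breach["title"], breach["date"], breach["compromised"])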
 src/scripts/osint/email_breach/__init__.py    |  5 ++
 src/scripts/osint/email_breach/__main__.py    | 12 +++
 src/scripts/osint/email_breach/module.py      | 83 +++++++++++++++++++
 .../osint/email_breach/requirements.txt       | 12 +++
 src/scripts/osint/email_breach/test_module.py | 74 +++++++++++++++++
 5 files changed, 186 insertions(+)
 create mode 100644 src/scripts/osint/email_breach/__init__.py
 create mode 100644 src/scripts/osint/email_breach/__main__.py
 create mode 100644 src/scripts/osint/email_breach/module.py
 create mode 100644 src/scripts/osint/email_breach/requirements.txt
 create mode 100644 src/scripts/osint/email_breach/test_module.py

diff --git a/src/scripts/osint/email_breach/__init__.py b/src/scripts/osint/email_breach/__init__.py
new file mode 100644
index 0000000..acdd053
--- /dev/null
+++ b/src/scripts/osint/email_breach/__init__.py
@@ -0,0 +1,5 @@
+import sys
+from pathlib import Path
+
+__root_dir = Path(__file__).parents[4]
+sys.path.append(str(__root_dir))
diff --git a/src/scripts/osint/email_breach/__main__.py b/src/scripts/osint/email_breach/__main__.py
new file mode 100644
index 0000000..b58e333
--- /dev/null
+++ b/src/scripts/osint/email_breach/__main__.py
@@ -0,0 +1,12 @@
+#!/usr/bin/env python3
+
+from pprint import pprint
+from sys import argv
+
+from src.core.utils.module import run_module
+from .module import Runner
+
+result = run_module(
+    Runner, args=argv, arg_name="email", arg_default="johndoe@gmail.com"
+)
+pprint(result)
diff --git a/src/scripts/osint/email_breach/module.py b/src/scripts/osint/email_breach/module.py
new file mode 100644
index 0000000..f6afd96
--- /dev/null
+++ b/src/scripts/osint/email_breach/module.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python3
+
+from hashlib import sha1
+from re import findall
+
+from bs4 import BeautifulSoup
+from requests import Session
+
+from src.core.base.osint import OsintRunner, PossibleKeys
+from src.core.utils.response import ScriptResponse
+from src.core.utils.validators import validate_kwargs
+
+
+class Defaults:
+    headers = {
+        "Content-Type": "application/x-www-form-urlencoded",
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
+        "Chrome/84.0.4147.105 Safari/537.36",
+    }
+
+
+class Runner(OsintRunner):
+    """
+    Check email in public breaches.
+    """
+
+    required = ["email"]
+
+    def __init__(self, logger: str = __name__):
+        super(Runner, self).__init__(logger)
+
+    @validate_kwargs(PossibleKeys.KEYS)
+    def run(self, *args, **kwargs) -> ScriptResponse.success or ScriptResponse.error:
+        """
+        Take an email and look it up in breaches.
+        Breach info is provided by monitor.firefox.com (haveibeenpwned.com).
+        """
+        email = kwargs.get("email")
+        if not isinstance(email, str):
+            return ScriptResponse.error(
+                result=None,
+                message=f"Can't make query. Incorrect input type (got {type(email)}, need {type('')}).",
+            )
+        email_hash = sha1(email.encode()).hexdigest()
+
+        with Session() as session:
+            session.headers.update(Defaults.headers)
+
+            resp = session.get(r"https://monitor.firefox.com")
+            if resp.status_code != 200:
+                return ScriptResponse.error(
+                    message=f"Can't look up email in breaches. Server response: {resp.status_code}.",
+                )
+
+            csrf_re = findall(r'(?<="_csrf" value=").*(?=">)', resp.text)
+            if not csrf_re:
+                return ScriptResponse.error(message="Can't find CSRF token.")
+
+            csrf = csrf_re[0]
+            resp = session.post(
+                r"https://monitor.firefox.com/scan",
+                data={"_csrf": csrf, "emailHash": email_hash},
+            )
+            if resp.status_code != 200:
+                return ScriptResponse.error(
+                    message=f"Can't look up email in breaches. Server response: {resp.status_code}.",
+                )
+
+            breaches = []
+            soup = BeautifulSoup(resp.text, "html.parser")
+            for breach in soup.find_all("a", class_="breach-card"):
+                title = breach.find("span", class_="breach-title").text
+                info = breach.find_all("span", class_="breach-value")
+                if len(info) < 2:
+                    continue
+                breaches.append(
+                    {"title": title, "date": info[0].text, "compromised": info[1].text}
+                )
+
+        return ScriptResponse.success(
+            result=breaches,
+            message=f"Email {email} was found in {len(breaches)} breaches.",
+        )
diff --git a/src/scripts/osint/email_breach/requirements.txt b/src/scripts/osint/email_breach/requirements.txt
new file mode 100644
index 0000000..92c3ac7
--- /dev/null
+++ b/src/scripts/osint/email_breach/requirements.txt
@@ -0,0 +1,12 @@
+beautifulsoup4==4.9.1
+certifi==2020.6.20
+chardet==3.0.4
+colorama==0.4.3
+commonmark==0.9.1
+idna==2.10
+Pygments==2.6.1
+requests==2.24.0
+rich==5.2.1
+soupsieve==2.0.1
+typing-extensions==3.7.4.2
+urllib3==1.25.10
diff --git a/src/scripts/osint/email_breach/test_module.py b/src/scripts/osint/email_breach/test_module.py
new file mode 100644
index 0000000..20e8477
--- /dev/null
+++ b/src/scripts/osint/email_breach/test_module.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python3
+
+from unittest import TestCase, SkipTest
+from random import choices
+from string import ascii_uppercase
+
+from .module import Runner
+
+
+def get_random_string(length: int = 30) -> str:
+    """
+    Generate a random string.
+    :param length: string length
+    :return: random string
+    """
+    return "".join(choices(ascii_uppercase, k=length))
+
+
+def check_response_msg(message: str = "") -> None:
+    """
+    Check that the response message is applicable; skip the test on HTTP 429.
+    :param message: message of the response
+    :return: None
+    """
+    if "429" in message:
+        raise SkipTest("Server responded with 429 (Too Many Requests)")
+
+
+class EmailBreachTest(TestCase):
+    """
+    Defines basic tests for the email breach script.
+    """
+
+    def setUp(self):
+        """
+        Set up the runner before each test function.
+        :return: None
+        """
+        self.runner = Runner()
+
+    def test_request(self) -> None:
+        """
+        Test email breach on johndoe@gmail.com.
+        :return: None
+        """
+        response = self.runner.run(email="johndoe@gmail.com")
+        check_response_msg(response.get("message", ""))
+
+        self.assertIn("found in", response.get("message"))
+        self.assertGreaterEqual(len(response.get("result")), 10)
+        self.assertIn(
+            {
+                "compromised": "Passwords, Email addresses",
+                "date": "December 4, 2013",
+                "title": "Adobe",
+            },
+            response.get("result"),
+        )
+
+    def test_random_string(self) -> None:
+        """
+        Test email breach on a random string request.
+        """
+        response = self.runner.run(email=get_random_string())
+        check_response_msg(response.get("message", ""))
+        self.assertIn("found in", response.get("message"))
+
+    def test_unexpected_input(self) -> None:
+        """
+        Test email breach on an unexpected input type.
+        """
+        response = self.runner.run(email=None)
+        check_response_msg(response.get("message", ""))
+        self.assertIn("Can't make query", response.get("message"))

From 6767d76c96dcc138d7994e9e378ea1104fe31359 Mon Sep 17 00:00:00 2001
From: Anton Nikolaev
Date: Mon, 31 Aug 2020 13:47:25 +0700
Subject: [PATCH 2/2] Add Redis cache. Publisher+Consumer refactoring. (#57)

* Add redis cache
* Update redis handling
* Pika/RabbitMQ/Publisher/Consumer global refactoring
---
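A short sketch of the intended cache flow, not part of the diff (it mirrors
what the results handler in server.py does with the new RedisCache class; the
task id and payload below are made-up examples):

    from json import dumps
    from src.cache.redis import RedisCache

    redis = RedisCache()  # host from REDIS_HOST, default TTL of 86400 seconds

    task_id = "00000000-0000-0000-0000-000000000000"  # hypothetical task UUID
    cached = redis.get(task_id)  # bytes on a hit, None on a miss
    if not cached:
        # On a miss, serialize the database results and cache them
        payload = dumps({"task": {"status": "success"}, "results": []}, default=str)
        redis.set(key=task_id, value=payload)  # expires after the TTL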
 docker-compose.yml     | 15 +++++++++
 requirements.txt       |  1 +
 server.py              | 25 +++++++++++++--
 src/cache/__init__.py  |  0
 src/cache/redis.py     | 60 ++++++++++++++++++++++++++++++++++++
 src/db/crud.py         | 17 ++++++-----
 src/queue/consumer.py  | 60 +++++++++++++++++++++---------------
 src/queue/defaults.py  |  3 +-
 src/queue/publisher.py | 69 ++++++++++++++++++++++++++++++------------
 9 files changed, 196 insertions(+), 54 deletions(-)
 create mode 100644 src/cache/__init__.py
 create mode 100644 src/cache/redis.py

diff --git a/docker-compose.yml b/docker-compose.yml
index 90ae458..40217dd 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -21,6 +21,17 @@ services:
       - postgres:/data/postgres
     networks:
       - postgres
+  redis:
+    container_name: osint-framework-redis
+    image: redis:alpine
+    healthcheck:
+      test: redis-cli ping
+      interval: 30s
+      timeout: 5s
+      retries: 5
+    networks:
+      - redis
+    restart: unless-stopped
   rabbitmq:
     container_name: osint-framework-rabbitmq
     image: rabbitmq:alpine
@@ -76,6 +87,7 @@
       POSTGRES_PORT: ${POSTGRES_PORT:-5432}
       RABBITMQ_HOST: ${RABBITMQ_HOST:-osint-framework-rabbitmq}
       RABBITMQ_PORT: ${RABBITMQ_PORT:-5672}
+      REDIS_HOST: ${REDIS_HOST:-osint-framework-redis}
       LOG_HANDLER: ${LOG_HANDLER:-stream}
     build:
       context: .
@@ -91,10 +103,13 @@
     networks:
       - postgres
       - rabbitmq
+      - redis
 networks:
   postgres:
     driver: bridge
   rabbitmq:
     driver: bridge
+  redis:
+    driver: bridge
 volumes:
   postgres:
diff --git a/requirements.txt b/requirements.txt
index 4621bad..65e9d05 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,6 +24,7 @@ pycares==3.1.1
 pycparser==2.20
 Pygments==2.6.1
 PyYAML==5.3.1
+redis==3.5.3
 requests==2.24.0
 rich==5.1.2
 selenium==3.141.0
diff --git a/server.py b/server.py
index fecfb72..8f6dcea 100644
--- a/server.py
+++ b/server.py
@@ -22,6 +22,8 @@
 from src.server.handlers.task_spawner import TaskSpawner
 from src.server.structures.response import ServerResponse
 from src.server.structures.task import TaskItem
+from src.server.structures.task import TaskStatus
+from src.cache.redis import RedisCache
 
 # Set logging level for Tornado Server
 tornado.log.access_log.setLevel(DEBUG)
@@ -32,6 +34,9 @@
 # Initialize publisher
 publisher = Publisher()
 
+# Initialize redis
+redis = RedisCache()
+
 
 class BaseHandler(RequestHandler, ABC):
     """
@@ -170,12 +175,26 @@ def get(self) -> None:
         """
         try:
             task_id = self.get_argument("task_id", default=None)
-            results = json_encode(TaskCrud.get_results(task_id))
+            redis_cache = redis.get(task_id)
+            # If the cache is available, write it as the response
+            if redis_cache:
+                logger.info(msg=f"Redis cache is available, task '{task_id}'")
+                return self.write(redis_cache)
+            # If the cache is not available, get the results from the database
+            db_results = TaskCrud.get_results(task_id)
+            json_results = dumps(db_results, default=str)
+            # If the status is 'pending' (in progress), skip cache saving and write the database results
+            if db_results.get("task", {}).get("status", "") == TaskStatus.PENDING:
+                logger.info(msg=f"Status of the task '{task_id}' is '{TaskStatus.PENDING}', skip Redis cache saving")
+                return self.write(json_results)
+            # If the status is 'error' or 'success' (finished either way), save the cache and write the database results
+            redis.set(key=task_id, value=json_results)
+            logger.info(msg=f"Save results to Redis cache, task '{task_id}'")
+            self.write(json_results)
         except Exception as get_results_error:
             return self.error(
                 msg=f"Unexpected error at getting results: {str(get_results_error)}"
             )
-        self.write(results)
 
 
 class HealthCheckHandler(BaseHandler, ABC):
@@ -219,7 +238,7 @@
 
     # Init rabbitmq queue polling
     polling = tornado.ioloop.PeriodicCallback(
-        lambda: publisher.process_data_events(), 1000
+        lambda: publisher.process_data_events(), callback_time=1000
     )
     polling.start()
diff --git a/src/cache/__init__.py b/src/cache/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/cache/redis.py b/src/cache/redis.py
new file mode 100644
index 0000000..1b5293c
--- /dev/null
+++ b/src/cache/redis.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python3
+
+from redis import Redis
+from os import environ
+
+
+class DefaultValues:
+    # 24 hours
+    REDIS_TIMEOUT = 86400
+    REDIS_HOST = environ.get("REDIS_HOST", default="localhost")
+
+
+class RedisCache:
+    def __init__(
+        self,
+        host: str = DefaultValues.REDIS_HOST,
+        timeout: int = DefaultValues.REDIS_TIMEOUT,
+    ):
+        self.options = dict(timeout=timeout)
+        self.redis = Redis(host=host)
+
+    def get(self, key) -> bytes or None:
+        """
+        Return redis cache value
+        :param key: key to get
+        :return: cached value, or None if the key is missing
+        """
+        if self.exists(key):
+            return self.redis.get(key)
+        return None
+
+    def set(self, key, value, timeout=None) -> None:
+        """
+        Set redis cache value
+        :param key: key to set
+        :param value: value to set
+        :param timeout: time to live, in seconds
+        :return: None
+        """
+        self.redis.set(key, value)
+        if timeout:
+            self.redis.expire(key, timeout)
+        else:
+            self.redis.expire(key, self.options["timeout"])
+
+    def delitem(self, key) -> None:
+        """
+        Delete cache value
+        :param key: key to delete
+        :return: None
+        """
+        self.redis.delete(key)
+
+    def exists(self, key) -> bool:
+        """
+        Check if a value exists
+        :param key: key to check
+        :return: bool
+        """
+        return bool(self.redis.exists(key))
diff --git a/src/db/crud.py b/src/db/crud.py
index 53a06a0..1c492b1 100644
--- a/src/db/crud.py
+++ b/src/db/crud.py
@@ -118,25 +118,28 @@ def update_task(task: TaskItem, db: Session = SessionLocal()) -> None:
     @staticmethod
     @retry()
-    def get_results(task_id: str, db: Session = SessionLocal()) -> list:
+    def get_results(task_id: str, db: Session = SessionLocal()) -> dict:
         """
         Return results
         :param task_id: task id to use
         :param db: database to use
         :return: dict
         """
+        # fmt: off
         try:
-            results = (
-                db.query(models.Result).filter(models.Result.owner_id == task_id).all()
-            )
+            db_results = db.query(models.Result).filter(models.Result.owner_id == task_id).all()
+            db_task_status = db.query(models.Task).filter_by(task_id=task_id).first()
         except exc.DBAPIError as api_err:
             raise api_err from api_err
         except:
-            return []
+            return {}
         else:
-            return [loads(str(data.result)) for data in results]
+            results = [loads(str(data.result)) for data in db_results]
+            task_status = object_as_dict(db_task_status)
+            return {"task": task_status, "results": results}
         finally:
             db.close()
+        # fmt: on
 
     @staticmethod
     @retry()
     def get_results_count(task_id: str, db: Session = SessionLocal()) -> int:
@@ -164,7 +167,7 @@
     @retry()
     def get_task(task_id: str, db: Session = SessionLocal()) -> dict:
         """
-        Return task results by UUID
+        Return task status by UUID
         :param task_id: task id to use
         :param db: database to use
         :return: dict
diff --git a/src/queue/consumer.py b/src/queue/consumer.py
index 70fd137..3ff316d 100644
--- a/src/queue/consumer.py
+++ b/src/queue/consumer.py
@@ -2,7 +2,9 @@
 
 from json import loads, dumps
 
-import pika
+from pika import BlockingConnection, ConnectionParameters
+from pika.adapters.blocking_connection import BlockingChannel
+from pika.spec import Basic, BasicProperties
 
 from src.core.runner.manager import CaseManager
 from src.core.utils.log import Logger
@@ -15,62 +17,72 @@
 
 class Consumer:
     def __init__(
-        self, host: str = Default.RABBITMQ_HOST, port: int = Default.RABBITMQ_PORT
+        self,
+        host: str = Default.RABBITMQ_HOST,
+        port: int = Default.RABBITMQ_PORT,
+        task_queue: str = Default.TASK_QUEUE,
     ):
         """
         Init rabbitmq consumer
         :param host: rabbitmq host
         :param port: rabbitmq port
+        :param task_queue: task queue name
         """
-        self.queue = Default.QUEUE
-        self.connection = pika.BlockingConnection(
-            pika.ConnectionParameters(host=host, port=port,)
+        self.connection = BlockingConnection(
+            ConnectionParameters(host=host, port=port,)
         )
         self.channel = self.connection.channel()
-        self.channel.queue_declare(queue=self.queue)
+        self.channel.queue_declare(queue=task_queue)
+        self.channel.basic_consume(
+            queue=task_queue,
+            on_message_callback=self.task_process,
+        )
+
         self.manager = CaseManager()
 
-    def callback(self, ch, method, properties, body) -> None:
+    def task_process(
+        self,
+        channel: BlockingChannel,
+        method: Basic.Deliver,
+        properties: BasicProperties,
+        body: bytes,
+    ) -> None:
         """
         Process the received task
-        :param ch: channel
+        :param channel: channel
         :param method: method
         :param properties: task properties
         :param body: task body
         :return: None
         """
-        raw_body = loads(body)
+        raw_body = loads(body.decode(encoding="utf-8"))
         cases = raw_body.get("cases", {})
         task = TaskItem(**raw_body.get("task", {}))
 
-        done_tasks = 0
-        cases_len = len(cases)
-        for result in self.manager.multi_case_runner(cases=cases):
-            done_tasks += 1
-            TaskCrud.create_task_result(task, result or {})
-            message = f"Done {done_tasks} out of {cases_len} cases"
-            task.set_pending(message)
-            logger.info(message)
-            TaskCrud.update_task(task)
+        try:
+            results = list(self.manager.multi_case_runner(cases=cases))
+            for result in results:
+                TaskCrud.create_task_result(task, result or {})
+            task.set_success(msg=f"Task done: {len(results)} out of {len(cases)} cases")
+        except Exception as cases_err:
+            task.set_error(msg=f"Task error: {str(cases_err)}")
 
-        task.set_success(msg=f"All cases done ({done_tasks} out of {cases_len})")
         TaskCrud.update_task(task)
         logger.info(msg=f"Done task {task.task_id}")
 
-        ch.basic_publish(
+        channel.basic_publish(
             exchange="",
             routing_key=properties.reply_to,
-            properties=pika.BasicProperties(correlation_id=properties.correlation_id),
-            body=dumps(task.as_json()),
+            properties=BasicProperties(correlation_id=properties.correlation_id),
+            body=dumps(task.as_json()).encode(encoding="utf-8"),
         )
-        ch.basic_ack(delivery_tag=method.delivery_tag)
+        channel.basic_ack(delivery_tag=method.delivery_tag)
 
     def start_consuming(self) -> None:
         """
         Run consumer
         :return: None
         """
-        self.channel.basic_consume(queue=self.queue, on_message_callback=self.callback)
         self.channel.start_consuming()
 
     def __del__(self):
diff --git a/src/queue/defaults.py b/src/queue/defaults.py
index 83a153d..1b9afd2 100644
--- a/src/queue/defaults.py
+++ b/src/queue/defaults.py
@@ -7,4 +7,5 @@
 
     RABBITMQ_HOST = str(environ.get("RABBITMQ_HOST", default="localhost"))
     RABBITMQ_PORT = int(environ.get("RABBITMQ_PORT", default=5672))
-    QUEUE = "case_queue"
+    TASK_QUEUE = "task_queue"
+    RESPONSE_QUEUE = "response_queue"
diff --git a/src/queue/publisher.py b/src/queue/publisher.py
index e2ab1e5..d741575 100644
--- a/src/queue/publisher.py
+++ b/src/queue/publisher.py
@@ -2,7 +2,9 @@
 
 from json import dumps
 
-import pika
+from pika import BlockingConnection, ConnectionParameters
+from pika.adapters.blocking_connection import BlockingChannel
+from pika.spec import Basic, BasicProperties
 
 from src.core.utils.log import Logger
 from src.queue.defaults import DefaultValues as Default
@@ -13,36 +15,52 @@
 
 class Publisher:
     def __init__(
-        self, host: str = Default.RABBITMQ_HOST, port: int = Default.RABBITMQ_PORT
+        self,
+        host: str = Default.RABBITMQ_HOST,
+        port: int = Default.RABBITMQ_PORT,
+        task_queue: str = Default.TASK_QUEUE,
+        response_queue: str = Default.RESPONSE_QUEUE,
     ):
         """
         Init rabbitmq publisher
         :param host: rabbitmq host
         :param port: rabbitmq port
+        :param task_queue: task queue name
+        :param response_queue: response queue name
         """
-        self.queue = Default.QUEUE
-        self.connection = pika.BlockingConnection(
-            pika.ConnectionParameters(host=host, port=port,)
+        self.task_queue_name = task_queue
+        self.response_queue_name = response_queue
+
+        self.connection = BlockingConnection(
+            ConnectionParameters(host=host, port=port,)
         )
         self.channel = self.connection.channel()
-        result = self.channel.queue_declare(queue="", exclusive=True)
-        self.callback_queue = result.method.queue
+
+        self.task_queue = self.channel.queue_declare(queue=self.task_queue_name)
+        self.response_queue = self.channel.queue_declare(queue=self.response_queue_name, exclusive=True)
+
         self.channel.basic_consume(
-            queue=self.callback_queue,
-            on_message_callback=self.on_response,
+            queue=self.response_queue_name,
+            on_message_callback=self.task_response,
             auto_ack=True,
         )
 
-    def on_response(self, ch, method, props, body) -> None:
+    @staticmethod
+    def task_response(
+        channel: BlockingChannel,
+        method: Basic.Deliver,
+        properties: BasicProperties,
+        body: bytes,
+    ) -> None:
         """
         Process tasks response
-        :param ch: channel
+        :param channel: channel
         :param method: method
-        :param props: task properties
+        :param properties: task properties
         :param body: task body
         :return: None
         """
-        logger.info(msg=f"Done task {props.correlation_id}")
+        logger.info(msg=f"Done task {properties.correlation_id}")
 
     def publish_task(self, task: TaskItem, cases: list) -> None:
         """
@@ -51,21 +69,34 @@
         :param cases: list of cases
         :return: None
         """
+        task_body = dumps(
+            {
+                "task": task.as_json(),
+                "cases": cases
+            }
+        ).encode(encoding="utf-8")
+
         self.channel.basic_publish(
             exchange="",
-            routing_key=self.queue,
-            properties=pika.BasicProperties(
-                reply_to=self.callback_queue, correlation_id=task.task_id,
+            routing_key=self.task_queue_name,
+            properties=BasicProperties(
+                reply_to=self.response_queue_name, correlation_id=task.task_id,
             ),
-            body=dumps({"task": task.as_json(), "cases": cases}),
+            body=task_body,
         )
 
-    def process_data_events(self) -> None:
+    def process_data_events(self, time_limit: int = 1) -> None:
         """
         Process data events
+        :param time_limit: time limit for processing, in seconds
         :return: None
         """
-        self.connection.process_data_events(time_limit=1)
+        logger.info(
+            msg=f"Check for new events: "
+            f"{self.task_queue.method.message_count} tasks in queue, "
+            f"{self.response_queue.method.message_count} responses in queue"
+        )
+        self.connection.process_data_events(time_limit=time_limit)
 
     def __del__(self):
         """