-
Notifications
You must be signed in to change notification settings - Fork 297
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DomainTools Expert Bot (initial version) #1004
base: develop
Are you sure you want to change the base?
Changes from 3 commits
4536da5
5c61f0a
5348b39
9e9fa96
3a400cf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
# Domaintools expert | ||
|
||
This expert bot is an example on how to query domaintools. | ||
|
||
It does require an API from domaintools. | ||
|
||
Documentation on domaintools: https://www.domaintools.com/resources/api-documentation/ | ||
Specifically, this bot can query a domain for the reputation score in domaintools: https://www.domaintools.com/resources/api-documentation/reputation/ | ||
|
||
It will add the score in the extra field: ``extra.domaintools_score``. | ||
|
||
|
||
Authors: Juan Ortega Valiente, Aaron Kaplan |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
domaintools_api==0.1.7 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
domaintools expert: query domtaintools.com to get a reputation score for a domain name | ||
|
||
""" | ||
from intelmq.lib.bot import Bot | ||
try: | ||
from domaintools import API, exceptions | ||
except ImportError: | ||
API = None | ||
|
||
|
||
class DomaintoolsExpertBot(Bot): | ||
|
||
def init(self): | ||
self.logger.info("Loading Domaintools expert.") | ||
if (not API): | ||
self.logger.exception("need to have the domaintools API installed. See https://github.com/domaintools/python_api") | ||
self.stop() | ||
if (not self.parameters.user): | ||
self.logger.exception("need to specify user for domaintools expert in runtime.conf. Exiting") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing dot at end of line. And you can remove the "Exiting", that's done by the bot class. |
||
self.stop() | ||
if (not self.parameters.password): | ||
self.logger.exception("need to specify password for the user for domaintools expert in runtime.conf. Exiting") | ||
self.stop() | ||
self.api = API(self.parameters.user, self.parameters.password) | ||
|
||
def domaintools_get_score(self, fqdn): | ||
score = None | ||
if fqdn: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's already checked in the |
||
resp = self.api.reputation(fqdn, include_reasons=False) # don't include a reason in the JSON response | ||
|
||
try: | ||
score = resp['risk_score'] | ||
except exceptions.NotFoundException: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can use |
||
score = None | ||
except exceptions.BadRequestException: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is a bad request really the same as no result? Just raise the error, so the bot can retry. |
||
score = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If nothing is returned explicitly, the return value is |
||
return score | ||
|
||
def process(self): | ||
event = self.receive_message() | ||
extra = {} | ||
|
||
for key in ["source.", "destination."]: | ||
key_fqdn = key + "fqdn" | ||
if key_fqdn not in event: | ||
continue # can't query if we don't have a domain name | ||
score = self.domaintools_get_score(event.get(key_fqdn)) | ||
if score is not None: | ||
extra["domaintools_score"] = score | ||
event.add("extra", extra) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This fails if extra is already present, with force it would overwrite existing data |
||
|
||
self.send_message(event) | ||
self.acknowledge_message() | ||
|
||
|
||
BOT = DomaintoolsExpertBot |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
Testing GethostbynameExpertBot. | ||
""" | ||
|
||
import unittest | ||
|
||
import intelmq.lib.test as test | ||
from intelmq.bots.experts.domaintools.expert import DomaintoolsExpertBot | ||
|
||
EXAMPLE_INPUT = {"__type": "Event", | ||
"source.fqdn": "google.com", | ||
"time.observation": "2015-01-01T00:00:00+00:00" | ||
} | ||
EXAMPLE_OUTPUT = {"__type": "Event", | ||
"source.fqdn": "google.com", | ||
"extra": '{"domaintools_score": 0}', | ||
"time.observation": "2015-01-01T00:00:00+00:00" | ||
} | ||
NONEXISTING_INPUT = {"__type": "Event", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually the lib raises the |
||
"source.fqdn": "example.invalid", | ||
"destination.fqdn": "example.invalid", | ||
"time.observation": "2015-01-01T00:00:00+00:00" | ||
} | ||
|
||
|
||
@test.skip_internet() | ||
class TestDomaintoolsExpertBot(test.BotTestCase, unittest.TestCase): | ||
""" | ||
A TestCase for DomaintoolsExpertBot. | ||
""" | ||
|
||
@classmethod | ||
def set_bot(self): | ||
self.bot_reference = DomaintoolsExpertBot | ||
self.sysconfig = {'user': 'mkendrick_first2017', 'password': 'c0e4e-e2527-dc6af-824a4-229d5'} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this here intentionally? |
||
|
||
def test_existing(self): | ||
self.input_message = EXAMPLE_INPUT | ||
self.run_bot() | ||
self.assertMessageEqual(0, EXAMPLE_OUTPUT) | ||
|
||
def test_non_existing(self): | ||
self.input_message = NONEXISTING_INPUT | ||
self.run_bot() | ||
self.assertMessageEqual(0, NONEXISTING_INPUT) | ||
|
||
if __name__ == '__main__': # pragma: no cover | ||
unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrong usage of
logger.exception
. It prints the traceback of a fromerly catched exception.Replace both lines with
raise ValueError("your error message")
. The bot class performs the stop itself.