-
Notifications
You must be signed in to change notification settings - Fork 52
/
Copy pathdmarc.py
201 lines (176 loc) · 6.74 KB
/
dmarc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import argparse
import collections
import king_phisher.color as color
import king_phisher.constants as constants
import king_phisher.spf as spf
import king_phisher.client.mailer as mailer
import king_phisher.client.plugins as plugins
import king_phisher.client.gui_utilities as gui_utilities
import dns.rdtypes.ANY.TXT
import dns.resolver
TAGS = {
'adkim': 'r',
'aspf': 'r',
'fo': '0',
'p': None,
'pct': '100',
'rf': 'afrf',
'ri': '86400',
'rua': None,
'ruf': None,
'sp': None,
'v': None
}
"""
A dictionary of all tags defined in RFC-7489 with their default value as a
string, or None if no default value is specified.
"""
class DMARCError(Exception):
def __init__(self, message):
self.message = message
def __repr__(self):
return "<{0} message='{1}' >".format(self.__class__.__name__, self.message)
class DMARCNoRecordError(DMARCError):
pass
class DMARCParseError(DMARCError):
def __init__(self, message, tag=None):
self.message = message
self.tag = tag
class DMARCPolicy(object):
def __init__(self, record):
record = record.strip()
self.record = record
self.tags = collections.OrderedDict()
record = record.split(';')
# the tag specification is defined in the DKIM spec (RFC 6376) https://tools.ietf.org/html/rfc6376#section-3.2
for token in record:
token = token.strip()
if not token:
continue
if not '=' in token:
raise DMARCParseError('can not separate record token: ' + token)
tag, value = token.split('=', 1)
if tag not in TAGS:
# ignore unknown tags per https://tools.ietf.org/html/rfc7489#section-6.3
continue
self.tags[tag.strip()] = value.strip()
if 'p' in self.tags and self.tags['p'] not in ('none', 'quarantine', 'reject'):
raise DMARCParseError("invalid dmarc record (invalid policy: {0})".format(self.tags['p']), tag='p')
if self.version is None:
raise DMARCParseError('invalid dmarc record (missing version tag)', tag='v')
if self.version != 'DMARC1':
raise DMARCParseError("invalid dmarc record (invalid version value: {0})".format(self.version), tag='v')
def __repr__(self):
return "<{0} v={1} >".format(self.__class__.__name__, self.version)
def __str__(self):
return self.record
@classmethod
def from_domain(cls, domain):
if not domain.startswith('_dmarc.'):
domain = '_dmarc.' + domain
try:
answers = dns.resolver.query(domain, 'TXT')
except dns.exception.DNSException:
raise DMARCNoRecordError("DNS resolution error for: {0} TXT".format(domain)) from None
answers = list(answer for answer in answers if isinstance(answer, dns.rdtypes.ANY.TXT.TXT))
answers = [answer for answer in answers if answer.strings[0].decode('utf-8').startswith('v=DMARC')]
if len(answers) == 0:
raise DMARCParseError('failed to parse dmarc record for domain: ' + domain)
record = ''.join([part.decode('utf-8') for part in answers[0].strings])
return cls(record)
def get(self, tag):
if not tag in TAGS:
raise KeyError(tag)
return self.tags.get(tag, TAGS[tag])
@property
def policy(self):
return self.tags.get('p')
@property
def version(self):
return self.tags.get('v')
class Plugin(plugins.ClientPlugin):
authors = ['Spencer McIntyre']
classifiers = [
'Plugin :: Client :: Email :: Spam Evasion',
'Script :: CLI'
]
title = 'DMARC Check'
description = """
This plugin adds another safety check to the message precheck routines to
verify that if DMARC exists the message will not be quarentined or rejected.
If no DMARC policy is present, the policy is set to none or the percentage
is set to 0, the message sending operation will proceed.
"""
homepage = 'https://github.com/securestate/king-phisher-plugins'
reference_urls = ['https://dmarc.org/overview/']
req_min_version = '1.5.0'
version = '1.2'
def initialize(self):
self.signal_connect('send-precheck', self.signal_send_precheck, gobject=self.application.main_tabs['mailer'])
return True
def signal_send_precheck(self, mailer_tab):
test_ip = mailer.guess_smtp_server_address(
self.application.config['smtp_server'],
(self.application.config['ssh_server'] if self.application.config['smtp_ssh_enable'] else None)
)
if not test_ip:
self.logger.info('skipping dmarc policy check because the smtp server address could not be resolved')
return True
test_sender, test_domain = self.application.config['mailer.source_email_smtp'].split('@')
self.logger.debug('checking the dmarc policy for domain: ' + test_domain)
text_insert = mailer_tab.tabs['send_messages'].text_insert
text_insert("Checking the DMARC policy of target domain '{0}'... ".format(test_domain))
try:
spf_result = spf.check_host(test_ip, test_domain, sender=test_sender)
except spf.SPFError as error:
text_insert("done, encountered exception: {0}.\n".format(error.__class__.__name__))
return True
try:
dmarc_policy = DMARCPolicy.from_domain(test_domain)
except DMARCNoRecordError:
self.logger.debug('no dmarc policy found for domain: ' + test_domain)
text_insert('done, no policy found.\n')
return True
except DMARCError as error:
self.logger.warning('dmarc error: ' + error.message)
text_insert("done, encountered exception: {0}.\n".format(error.__class__.__name__))
return False
text_insert('done.\n')
self.logger.debug("dmarc policy set to {0!r} for domain: {1}".format(dmarc_policy.policy, test_domain))
text_insert('Found DMARC policy:\n')
text_insert(' Policy: ' + dmarc_policy.policy + '\n')
text_insert(' Percent: ' + dmarc_policy.get('pct') + '\n')
if dmarc_policy.get('rua'):
text_insert(' RUA URI: ' + dmarc_policy.get('rua') + '\n')
if dmarc_policy.get('ruf'):
text_insert(' RUF URI: ' + dmarc_policy.get('ruf') + '\n')
if spf_result == constants.SPFResult.PASS:
return True
if dmarc_policy.policy == 'none' or dmarc_policy.get('pct') == '0':
return True
if dmarc_policy.policy == 'quarantine':
message = 'The DMARC policy results in these messages being quarantined.'
elif dmarc_policy.policy == 'reject':
message = 'The DMARC policy results in these messages being rejected.'
text_insert('WARNING: ' + message + '\n')
ignore = gui_utilities.show_dialog_yes_no(
'DMARC Policy Failure',
self.application.get_active_window(),
message + '\nContinue sending messages anyways?'
)
return ignore
def main():
parser = argparse.ArgumentParser(description='DMARC Check Utility', conflict_handler='resolve')
parser.add_argument('domain', help='the name of the domain to check')
arguments = parser.parse_args()
try:
policy = DMARCPolicy.from_domain(arguments.domain)
except DMARCNoRecordError:
color.print_error('dmarc policy not found')
return
color.print_status('dmarc policy found')
color.print_status('record: ' + policy.record)
color.print_status('version: ' + policy.version)
color.print_status('policy: ' + policy.policy)
if __name__ == '__main__':
main()