-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrecordbuilder.py
executable file
·335 lines (281 loc) · 9.6 KB
/
recordbuilder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
#!/usr/bin/env python3
import functools
import inspect
import ipaddress
import sys
_cli_commands = {}
def cli(*cli_args):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
str_args = [arg[0] if isinstance(arg, tuple) else arg for arg in cli_args]
tupled_args = [arg if isinstance(arg, tuple) else (arg, None) for arg in cli_args]
_cli_commands[func.__name__] = {
'func': func,
'args': tupled_args,
'help': '{} {}'.format(func.__name__.upper(), ' '.join(str_args)),
'doc': inspect.getdoc(func)
}
return wrapper
return decorator
def run_cli():
if len(sys.argv) > 1:
command = sys.argv[1].lower()
else:
command = None
if command in ('help', '--help', '-h') or command is None:
print("You may create the following records:\n")
for arg_data in _cli_commands.values():
print(arg_data['help'])
print(
"\nRun recordbuilder.py with only the record type as argument"
" to see more info,\ne.g. ./recordbuilder.py aaaa"
)
return
cmd = _cli_commands.get(command)
if not cmd:
print("Did not understand argument '{}'?".format(command), file=sys.stderr)
return 1
if len(sys.argv[2:]) < len(cmd['args']):
exit_code = 0
if sys.argv[2:]:
print("Too few arguments.\n", file=sys.stderr)
exit_code = 1
print("Usage:", cmd['help'])
print(cmd['doc'])
return exit_code
parsed_args = []
for arg_val, arg_desc in zip(sys.argv[2:], cmd['args']):
if arg_desc[1]:
try:
arg_parsed = arg_desc[1](arg_val)
except ValueError:
print(
"{arg} must be of type {type} and '{val}' can't be parsed as such"
.format(arg=arg_desc[0], type=arg_desc[1].__name__, val=arg_val),
file=sys.stderr
)
return 1
parsed_args.append(arg_parsed)
else:
parsed_args.append(arg_val)
res = _cli_commands[command]['func'](*parsed_args)
print(res)
def to_oct(n):
return '\\{:03o}'.format(n)
def oct_len(s):
return to_oct(len(s))
def escape_text(s):
escapees = '\r\n\t: \\/'
escaped = []
for c in s:
if c in escapees:
escaped.append(to_oct(ord(c)))
else:
escaped.append(c)
escaped = ''.join(escaped)
return escaped
def escape_number(n):
# n == high * 256 + low
high = n // 256
low = n % 256
return to_oct(high) + to_oct(low)
@cli('domain', 'address', ('ttl', int))
def aaaa(domain, address, ttl):
"""Construct a generic AAAA record.
Arguments:
domain -- the hostname to map an IPv6 address to
address -- the IPv6 address, zero compression is supported
ttl -- time to live (int)
Returns a generic AAAA record and a PTR record.
"""
try:
ip = ipaddress.IPv6Address(address)
except ipaddress.AddressValueError:
print(
"Could not parse '{}' as an IPv6 address, see traceback:\n".format(address),
file=sys.stderr
)
raise
ip_oct = ''.join([to_oct(n) for n in ip.packed])
aaaa = ':{domain}:28:{address}:{ttl}'.format(
domain=escape_text(domain),
address=ip_oct,
ttl=ttl
)
reverse_ptr = '^{reverse}:{domain}:{ttl}'.format(
reverse=ip.reverse_pointer,
domain=escape_text(domain),
ttl=ttl
)
result = '\n'.join([aaaa, reverse_ptr])
return result
@cli('domain', 'flag', 'tag', 'value', ('ttl', int))
def caa(domain, flag, tag, value, ttl):
"""Construct a generic CAA record.
Arguments:
domain -- the hostname for this record
flag -- one of 0, 1
tag -- one of 'issue', 'issuewild', 'iodef'
value -- the value of the record
ttl -- time to live (int)
"""
if flag in (1, '1'):
flag_oct = '\\200'
elif flag in (0, '0'):
flag_oct = '\\000'
else:
raise ValueError("flag must be one of 0, 1")
if tag not in ('issue', 'issuewild', 'iodef'):
raise ValueError("tag must be one of 'issue', 'issuewild', 'iodef'")
result = ':{domain}:257:{flag}{tag}{value}:{ttl}'.format(
domain=escape_text(domain),
flag=flag_oct,
tag=oct_len(tag) + tag,
value=value,
ttl=ttl
)
return result
@cli('domain', 'keytype', 'key', ('ttl', int))
def domainkeys(domain, keytype, key, ttl):
"""Construct a generic DKIM record.
Arguments:
domain -- the hostname for this record
keytype -- should be 'rsa'
key -- the public key
ttl -- time to live (int)
Newlines (\\n) and carriage returns (\\r) are removed automatically from
the key. If the data is > 255 octets, it will be split into multiple strings
of at most 255 octets each.
"""
key = key.replace('\n', '')
key = key.replace('\r', '')
line = 'v=DKIM1; k={keytype}; p={key}'.format(
keytype=keytype,
key=key
)
if len(line) > 255:
print(
"Warning: data is > 255 octets and will be split into multiple strings.",
file=sys.stderr
)
# split line into chunks of at most 255 octets each
chunks = [line[i:i+255] for i in range(0, len(line), 255)]
strings = []
for chunk in chunks:
strings.append(oct_len(chunk))
strings.append(escape_text(chunk))
line = "".join(strings)
else:
line = oct_len(line) + escape_text(line)
result = ':{domain}:16:{line}:{ttl}'.format(
domain=escape_text(domain),
line=line,
ttl=ttl
)
return result
@cli('domain', ('order', int), ('preference', int), 'flag', 'services', 'regexp', 'replacement', ('ttl', int))
def naptr(domain, order, preference, flag, services, regexp, replacement, ttl):
"""Construct a generic NAPTR record.
Arguments:
domain -- the hostname for this record
order -- lower has precedence (int in range [0, 65535])
preference -- lower has precedence (int in range [0, 65535])
flag -- one of 'S', 'A', 'U', 'P'
services -- the services offered
regexp -- a regular expression rule, can be empty
replacement -- a replacement pattern
ttl -- time to live (int)
"""
if not 0 <= order <= 65535:
raise ValueError("order must be 0 <= order <= 65535")
if not 0 <= preference <= 65535:
raise ValueError("preference must be 0 <= preference <= 65535")
if replacement:
chunks = replacement.split('.')
chunks_oct = []
for chunk in chunks:
chunks_oct.append(oct_len(chunk) + chunk)
replacement_oct = ''.join(chunks_oct)
else:
replacement_oct = ''
result = (
':{domain}:35:{order}{preference}{flag}{services}{regexp}{replacement}\\000:{ttl}'
.format(
domain=escape_text(domain),
order=escape_number(order),
preference=escape_number(preference),
flag=oct_len(flag) + flag,
services=oct_len(services) + escape_text(services),
regexp=oct_len(regexp) + escape_text(regexp),
replacement=replacement_oct,
ttl=ttl
)
)
return result
@cli('domain', 'text', ('ttl', int))
def spf(domain, text, ttl):
"""Construct a generic SPF record.
Arguments:
domain -- the hostname for this record
text -- the SPF data
ttl -- time to live (int)
"""
result = ':{domain}:16:{text_len}{text}:{ttl}'.format(
domain=escape_text(domain),
text_len=oct_len(text),
text=escape_text(text),
ttl=ttl
)
return result
@cli('service', ('priority', int), ('weight', int), ('port', int), 'target', ('ttl', int))
def srv(service, priority, weight, port, target, ttl):
"""Construct a generic SRV record.
Arguments:
service -- on the form _service._proto.name.
priority -- lower has precedence (int in range [0, 65535])
weight -- higher has higher chance of getting picked (int in range [0, 65535])
port -- the service's port (int in range [0, 65535])
target -- hostname of the machine providing the service
ttl -- time to live (int)
"""
if not 0 <= priority <= 65535:
raise ValueError("priority must be 0 <= priority <= 65535")
if not 0 <= weight <= 65535:
raise ValueError("weight must be 0 <= weight <= 65535")
if not 0 <= port <= 65535:
raise ValueError("port must be 0 <= port <= 65535")
chunks = target.split('.')
chunks_oct = []
for chunk in chunks:
chunks_oct.append(oct_len(chunk) + chunk)
target_oct = ''.join(chunks_oct)
result = (
':{service}:33:{priority}{weight}{port}{target}\\000:{ttl}'
.format(
service=escape_text(service),
priority=escape_number(priority),
weight=escape_number(weight),
port=escape_number(port),
target=target_oct,
ttl=ttl
)
)
return result
@cli('domain', 'text', ('ttl', int))
def txt(domain, text, ttl):
"""Construct a TXT record.
Arguments:
domain -- the hostname for the record
text -- the value of the record
ttl -- time to live (int)
"""
result = "'{domain}:{text}:{ttl}".format(
domain=escape_text(domain),
text=escape_text(text),
ttl=ttl
)
return result
if __name__ == '__main__':
sys.exit(run_cli())