-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathcsrgen.py
executable file
·397 lines (335 loc) · 14 KB
/
csrgen.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
#!/usr/bin/env python
#
# Generate a key, self-signed certificate, and certificate request.
# Usage: csrgen -n <fqdn>
#
# When more than one hostname is provided, a SAN (Subject Alternative Name)
# certificate and request are generated. This can be achieved by adding -s.
# Usage: csrgen -n <hostname> -s <san0> <san1>
#
# If you want to generate multiple CSRs, you can use the -f option to
# feed in a .yaml file via the CLI. See samples/sample-file.yaml in this
# repository for an example.
#
# If you want to predefine some of your CSR attributes, you can use the -u option
# to feed in a .yaml file via the CLI. See samples/csr-sample-unattended.yaml in
# this repository for an example.
#
# Author: Cotton Beckfield <[email protected]> 06-25-2014, Updated 01-20-2020
# Author: Ben Mz <[email protected]> Updated 06-15-2018
# Libraries/Modules
import sys, platform, yaml
import argparse, logging, logging.handlers
from OpenSSL import crypto, SSL
__version__ = '1.1.0'
class Certificate(object):
    """Generate an RSA private key and a certificate signing request (CSR).

    Options are supplied as a dict.  The keys ``verbose``, ``level``,
    ``size`` and ``usage`` configure the instance and are consumed by
    ``__init__``; every remaining key (``hostname``, ``sans``, ``C``, ``ST``,
    ``L``, ``O``, ``OU``, ``emailAddress``, ...) is kept in ``self.opts`` and
    used when the request is built.

    The class derives from ``object`` so that subclasses can call ``super()``
    under Python 2 as well as Python 3.
    """

    # Key-usage names accepted in opts['usage']: the long and short spelling
    # of each bit understood by OpenSSL's keyUsage extension.
    KEY_USAGES = (
        "Digital Signature", "digitalSignature",
        "Non Repudiation", "nonRepudiation",
        "Key Encipherment", "keyEncipherment",
        "Data Encipherment", "dataEncipherment",
        "Key Agreement", "keyAgreement",
        "Certificate Sign", "keyCertSign",
        "CRL Sign", "cRLSign",
        "Encipher Only", "encipherOnly",
        "Decipher Only", "decipherOnly",
    )

    def __init__(self, logger, opts=None):
        """Store the logger and split ``opts`` into settings and CSR fields.

        Args:
            logger: object exposing the ``logging.Logger`` methods used by
                ``output`` (``debug``/``info``/``warning``/``error``/``critical``).
            opts: optional dict of options; it is copied, never modified.

        Raises:
            Exception: when ``opts['usage']`` contains an unknown key usage.
        """
        self._logger = logger
        # Default key usage of a server certificate request.
        self.allowed = ["Digital Signature", "Non Repudiation", "Key Encipherment"]
        self._level = logging.WARNING
        self._key_size = 2048
        self._ca = False
        self._verbose = True
        self.usage = ','.join(self.allowed)

        # Work on a private copy: the caller's dict (and the default value)
        # must not be altered by the pops below.
        opts = dict(opts) if opts else {}

        self._verbose = opts.pop('verbose', self._verbose)
        # CLI threshold: messages below this level are only sent to the log.
        self._level = opts.pop('level', self._level)

        # Key size: only the listed RSA sizes are accepted, anything else
        # keeps the default.
        size = opts.pop('size', None)
        if size is not None:
            try:
                size = int(size)
            except (TypeError, ValueError):
                size = None
            if size in (1024, 2048, 4096):
                self._key_size = size
            else:
                self.output('[!] Unsupported key size, keeping {s} bits'.format(s=self._key_size))

        # Key usage: accept a list/tuple or a comma separated string and
        # always store the comma-joined form the extension value requires.
        usage = opts.pop('usage', None)
        if usage:
            if not isinstance(usage, (list, tuple)):
                usage = [item.strip() for item in str(usage).split(',')]
            for item in usage:
                if item not in self.KEY_USAGES:
                    raise Exception('Invalid key usage: {u}'.format(u=item))
            self.usage = ','.join(usage)

        self.opts = opts
        self.output('[*] We have already set options:', level=logging.DEBUG)
        self.output('{o}'.format(o=self.opts), level=logging.DEBUG)

    @staticmethod
    def _toBytes(value):
        """Return ``value`` as bytes (pyOpenSSL extension arguments must be bytes)."""
        return value if isinstance(value, bytes) else value.encode('utf-8')

    def _header(self):
        """Print the program banner."""
        self.output('\t\t..:: Certificate Signing Request (CSR) Generator ::..\n')

    def _isCA(self):
        """Return the basicConstraints CA flag as ``"TRUE"`` or ``"FALSE"``."""
        return "TRUE" if self._ca else "FALSE"

    def _ask(self, msg, country=False, default=None):
        """Prompt on the terminal until an acceptable answer is given.

        Args:
            msg: prompt text.
            country: when true, a non-empty answer must be exactly two
                characters long.
            default: value returned for an empty answer; when ``None`` the
                question is mandatory and is asked again.

        Returns:
            The answer typed by the user, or ``default``.
        """
        try:
            read_line = raw_input  # Python 2
        except NameError:
            read_line = input  # Python 3
        while True:
            rep = read_line(msg)
            if country and rep and len(rep) != 2:
                self.output('[!] Sorry this value is invalid (should be two letters only).')
                continue
            if not rep:
                if default is None:
                    self.output('[!] Sorry this value is mandatory.')
                    continue
                rep = default
            return rep

    # Generate Certificate Signing Request (CSR)
    def generateCSR(self):
        """Create ``<hostname>.csr`` and ``<hostname>.key`` from ``self.opts``.

        Returns:
            The signed ``crypto.X509Req`` object.

        Raises:
            Exception: when the hostname or one of C/ST/L/O/OU is missing.
        """
        nodename = self.opts.get('hostname')
        if not nodename:
            raise Exception('Could not generate certificate with empty hostname')

        # These variables will be used to create the host.csr and host.key files.
        csrfile = nodename + '.csr'
        keyfile = nodename + '.key'

        # Prefix every alternative name with 'DNS:'.  A single name given as
        # a plain string is treated as a one-element list, not as characters.
        sans = self.opts.get('sans') or []
        if not isinstance(sans, (list, tuple)):
            sans = [sans]
        ss = ", ".join("DNS: {e}".format(e=entry) for entry in sans)

        req = crypto.X509Req()
        subject = req.get_subject()
        subject.CN = nodename
        try:
            subject.countryName = self.opts['C']
            subject.stateOrProvinceName = self.opts['ST']
            subject.localityName = self.opts['L']
            subject.organizationName = self.opts['O']
            subject.organizationalUnitName = self.opts['OU']
        except KeyError:
            raise Exception('Missing mandatory certificate value!')
        # Email Address is not mandatory
        if 'emailAddress' in self.opts:
            subject.emailAddress = self.opts['emailAddress']

        # Add in extensions (names and values are passed as bytes).
        x509_extensions = [
            crypto.X509Extension(b"keyUsage", False, self._toBytes(self.usage)),
            crypto.X509Extension(
                b"basicConstraints", False,
                self._toBytes("CA:{c}".format(c=self._isCA()))),
        ]
        # If there are SAN entries, add the subjectAltName extension as well.
        if ss:
            x509_extensions.append(
                crypto.X509Extension(b"subjectAltName", False, self._toBytes(ss)))
        req.add_extensions(x509_extensions)

        # Utilizes generateKey function to kick off key generation.
        key = self.generateKey(crypto.TYPE_RSA, self._key_size)
        req.set_pubkey(key)
        req.sign(key, "sha256")

        self.output('[+] Generate CSR file: {f}'.format(f=csrfile))
        self.generateFiles(csrfile, req)
        self.output('[+] Generate Key file: {f}'.format(f=keyfile))
        self.generateFiles(keyfile, key)

        self.output("\n[+] Your CSR and certificate ({s} bits) are now generated with:".format(s=self._key_size))
        for k, v in self.opts.items():
            # The hostname is reported under its subject field name.
            label = 'CN' if k == 'hostname' else k
            self.output("\t[{k}]\t\t-> {v}".format(k=label, v=v))
        return req

    def getCSRSubjects(self):
        """Interactively ask for every subject field that is not set yet.

        Only C, ST, L, O and OU are prompted for; ``hostname`` is merely
        reported in the debug output.
        """
        # field -> (prompt, default answer, two-letter country check)
        prompts = {
            'C': ("Enter your Country Name (2 letter code) [US]: ", 'US', True),
            'ST': ("Enter your State or Province <full name> [California]: ", 'California', False),
            'L': ("Enter your (Locality Name (eg, city) [San Francisco]: ", 'San Francisco', False),
            'O': ("Enter your Organization Name (eg, company) [FTW Enterprise]: ", 'FTW Enterprise', False),
            'OU': ("Enter your Organizational Unit (eg, section) [IT]: ", 'IT', False),
        }
        for field in ('C', 'ST', 'L', 'O', 'OU', 'hostname'):
            # Check if field is already setup
            if self.opts.get(field):
                self.output('[*] Field {n} is set'.format(n=field), level=logging.DEBUG)
                continue
            self.output('[*] Field {n} is NOT set'.format(n=field), level=logging.DEBUG)
            if field in prompts:
                msg, default, country = prompts[field]
                self.opts[field] = self._ask(msg, default=default, country=country)

    # Parse the contents of the YAML file and then
    # auto setup values.
    def loadDefaults(self, csr_file):
        """Merge predefined values from the YAML file ``csr_file`` into ``self.opts``.

        Empty values and a country code that is not two characters long are
        ignored.  Scalars are stored as strings, lists as lists of strings.
        Errors raised while opening or parsing the file propagate unchanged.
        """
        self.output("[+] Reading default values file: {f}".format(f=csr_file), level=logging.DEBUG)
        # An empty YAML document loads as None.
        cfg = self._parseYAML(csr_file) or {}
        for k, v in cfg.items():
            if v is None:
                continue
            try:
                if isinstance(v, (list, tuple)):
                    v = [str(item) for item in v]
                else:
                    v = str(v)
            except UnicodeError:
                # Python 2 only: value cannot be represented as str, skip it.
                self.output('[*] Field {n} ignored (not convertible)'.format(n=k), level=logging.DEBUG)
                continue
            if not v:
                continue
            if k == 'C' and len(v) != 2:
                continue
            self.opts[k] = v

    # Parse the contents of the YAML file and then
    # generate a CSR for each of them.
    def loadNodes(self, nodes_file):
        """Generate one key/CSR pair per entry of the YAML file ``nodes_file``.

        Each entry must provide ``hostname`` and may provide ``sans``.
        Errors raised while opening or parsing the file propagate unchanged.
        """
        self.output("[+] Reading nodes file: {f}".format(f=nodes_file), level=logging.DEBUG)
        cfg = self._parseYAML(nodes_file) or {}
        self.output('[+] Generate certificates for:')
        for node in cfg.values():
            self.opts['hostname'] = node['hostname']
            # 'sans' is optional: a missing or empty entry means no SAN.
            self.opts['sans'] = node.get('sans') or ''
            self.output("[+] host: {h}, alternate names: {s}".format(h=self.opts['hostname'], s=self.opts['sans']))
            self.generateCSR()

    def _parseYAML(self, yaml_file):
        """Parse YAML file and return object generated
        """
        with open(yaml_file, 'r') as stream:
            return yaml.safe_load(stream)

    def generateKey(self, type, bits):
        """Generate a private key of the given pyOpenSSL key ``type`` and size ``bits``.
        """
        self.output('[+] Generate certificate seed Key...')
        key = crypto.PKey()
        key.generate_key(type, bits)
        return key

    def generateFiles(self, mkFile, request):
        """Write ``request`` as PEM to ``mkFile``.

        The file extension selects the dump routine: ``.csr`` expects a
        certificate request, ``.key`` a private key.  Any other extension is
        reported as an error and nothing is written.
        """
        import os  # local import: the module header does not import os

        # Decide on the suffix only; a hostname may itself contain '.csr'.
        if mkFile.endswith(".csr"):
            pem = crypto.dump_certificate_request(crypto.FILETYPE_PEM, request)
            mode = 0o644
        elif mkFile.endswith(".key"):
            pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, request)
            # A newly created private key file is readable by its owner only.
            mode = 0o600
        else:
            self.output("[!] Failed to create CSR/Key files", level=logging.ERROR)
            return

        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
        # PEM dumps are bytes, so the file is opened in binary mode.
        with os.fdopen(os.open(mkFile, flags, mode), "wb") as f:
            f.write(pem)

    def output(self, msg, level=logging.WARNING):
        """Generate output to CLI and log file
        """
        # Output to log
        if level in (logging.DEBUG, logging.INFO, logging.WARNING,
                     logging.ERROR, logging.CRITICAL):
            self._logger.log(level, msg)
        # Misconfigured level are high notifications
        else:
            self._logger.error("[!] Invalid level for message: {m}".format(m=msg))
        # Output to CLI if needed
        if self._verbose and (level >= self._level):
            sys.stdout.write("{m}\n".format(m=msg))
class Authority(Certificate):
    """Certificate request flagged as a certificate authority (CA:TRUE)."""

    def __init__(self, logger, opts):
        """Initialise the underlying certificate, then mark it as a CA.

        Raises:
            Exception: when the base initialisation fails; the message names
                this class and carries the original error text.
        """
        # Init certificate.  The base initialiser is called directly so the
        # call does not depend on the base being a new-style class.
        try:
            Certificate.__init__(self, logger, opts)
        except Exception as err:
            raise Exception("Error at {n} initialization: {e}".format(n=type(self).__name__, e=err))
        self._ca = True

    def initialize(self):
        """Generate the key and CSR for this authority."""
        self.generateCSR()
def main(argv):
    """Command-line entry point: parse the options, then generate key(s) and CSR(s).

    Args:
        argv: full argument vector with the program name first, as
            ``sys.argv``.  ``None`` lets argparse read ``sys.argv`` itself.

    Exits with a non-zero status when the log file cannot be opened or when
    incompatible options are combined.
    """
    # Define default values
    VERBOSE = False
    LOG_FILE = "./certGen.log"
    LOG_LEVEL = logging.WARNING
    opts = {}

    # This section will parse the flags available via command line.
    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", help="Output more infos", action="store_true")
    parser.add_argument("-d", "--debug", help="Enable debug mode", action="store_true")
    parser.add_argument("-l", "--log", help="Define log file (default: {f})".format(f=LOG_FILE))
    parser.add_argument("-n", "--name", help="Provide the FQDN", action="store", default="")
    parser.add_argument("-s", "--san", help="SANS, define alternative names", action="store", nargs='*', default="")
    parser.add_argument("-k", "--keysize", help="Provide the key size", action="store", default="2048")
    parser.add_argument("-u", "--unattended", help="Load CSR predefined options", action="store", default="")
    parser.add_argument("-f", "--file", help="Load hosts file (CN and optional Alternate Names) list", action="store", default="")
    parser.add_argument("-a", "--authority", help="Generate Authority certificate (Default is server)", action="store_true")
    parser.add_argument("-c", "--client", help="Generate client certificate (Default is server)", action="store_true")
    # Parse the vector that was handed in (argv[0] is the program name).
    args = parser.parse_args(argv[1:] if argv is not None else None)

    if args.log:
        LOG_FILE = args.log
    if args.verbose:
        VERBOSE = True
    opts['verbose'] = VERBOSE
    if args.debug:
        opts['level'] = logging.DEBUG
        # Let debug records reach the log file as well.
        LOG_LEVEL = logging.DEBUG

    # Define logger
    try:
        logger = logging.getLogger('certgen')
        hdlr = logging.handlers.TimedRotatingFileHandler(LOG_FILE, when="midnight", backupCount=3)
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        hdlr.setFormatter(formatter)
        logger.addHandler(hdlr)
        logger.setLevel(LOG_LEVEL)
    except (AttributeError, IOError) as err:
        sys.stdout.write("[!] Unable to open log file {f}: {e}\n".format(f=LOG_FILE, e=err))
        sys.exit(1)

    if args.keysize:
        opts['size'] = args.keysize

    if args.authority:
        if args.client:
            sys.stdout.write('[!] You can not generate multiple certificate types at one time.\n')
            sys.exit(2)
        if args.san:
            sys.stdout.write('[!] You can not specify alternative names with authority certificates\n')
            sys.exit(1)
        # OpenSSL keyUsage names for certificate and CRL signing.
        opts['usage'] = ['keyCertSign', 'cRLSign']
    if args.client:
        if args.san:
            sys.stdout.write('[!] You can not specify alternative names with client certificates\n')
            sys.exit(1)
        opts['usage'] = ["digitalSignature"]

    # Store infos if set
    if args.name:
        opts['hostname'] = args.name
    if args.san:
        opts['sans'] = args.san

    try:
        # Initialize certificate object; an authority request must carry CA:TRUE.
        cert_class = Authority if args.authority else Certificate
        cert = cert_class(logger, opts)
        if args.unattended:
            cert.loadDefaults(args.unattended)
        # Run interactively if needed for C, ST, L, O, OU values
        cert.getCSRSubjects()
        # With -f one CSR is generated per host of the YAML file, otherwise
        # a single CSR is generated from the command line values.
        if args.file:
            cert.loadNodes(args.file)
        else:
            cert.generateCSR()
    except KeyboardInterrupt:
        sys.stdout.write('\n[!] Exit requested.')
    except SystemExit:
        sys.stdout.write('\n[!] Software aborted.')
    sys.stdout.write('\nBye! ;)\n')


if __name__ == '__main__':
    main(sys.argv)