Skip to content

Commit

Permalink
[tst] unittests
Browse files Browse the repository at this point in the history
  • Loading branch information
grindsa committed May 13, 2021
1 parent a0659ee commit 9693b02
Show file tree
Hide file tree
Showing 5 changed files with 949 additions and 299 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: "CodeQL"

on:
push:
# push:
pull_request:
branches: [ devel ]
schedule:
Expand Down
46 changes: 26 additions & 20 deletions examples/ca_handler/openssl_ca_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,13 @@ def _certificate_extensions_add(self, cert_extension_dic, cert, ca_cert):
for extension in cert_extension_dic:
self.logger.debug('adding extension: {0}: {1}: {2}'.format(extension, cert_extension_dic[extension]['critical'], cert_extension_dic[extension]['value']))
if extension == 'subjectKeyIdentifier':
self.logger.info('_certificate_extensions_add(): subjectKeyIdentifier')
_tmp_list.append(crypto.X509Extension(convert_string_to_byte(extension), critical=cert_extension_dic[extension]['critical'], value=convert_string_to_byte(cert_extension_dic[extension]['value']), subject=cert))
elif 'subject' in cert_extension_dic[extension]:
self.logger.info('_certificate_extensions_add(): subject')
_tmp_list.append(crypto.X509Extension(convert_string_to_byte(extension), critical=cert_extension_dic[extension]['critical'], value=convert_string_to_byte(cert_extension_dic[extension]['value']), subject=cert))
elif 'issuer' in cert_extension_dic[extension]:
self.logger.info('_certificate_extensions_add(): issuer')
_tmp_list.append(crypto.X509Extension(convert_string_to_byte(extension), critical=cert_extension_dic[extension]['critical'], value=convert_string_to_byte(cert_extension_dic[extension]['value']), issuer=ca_cert))
else:
_tmp_list.append(crypto.X509Extension(type_name=convert_string_to_byte(extension), critical=cert_extension_dic[extension]['critical'], value=convert_string_to_byte(cert_extension_dic[extension]['value'])))
Expand All @@ -135,30 +138,31 @@ def _certificate_extensions_load(self):
file_dic = dict(config_load(self.logger, cfg_file=self.openssl_conf))

cert_extention_dic = {}
for extension in file_dic['extensions']:
if 'extensions' in file_dic:
for extension in file_dic['extensions']:

cert_extention_dic[extension] = {}
parameters = file_dic['extensions'][extension].split(',')
cert_extention_dic[extension] = {}
parameters = file_dic['extensions'][extension].split(',')

# set crititcal task if applicable
if parameters[0] == 'critical':
cert_extention_dic[extension]['critical'] = bool(parameters.pop(0))
else:
cert_extention_dic[extension]['critical'] = False
# set crititcal task if applicable
if parameters[0] == 'critical':
cert_extention_dic[extension]['critical'] = bool(parameters.pop(0))
else:
cert_extention_dic[extension]['critical'] = False

# remove leading blank from first element
parameters[0] = parameters[0].lstrip()
# remove leading blank from first element
parameters[0] = parameters[0].lstrip()

# check if we have an issuer option (if so remove it and mark it as to be set)
if 'issuer:' in parameters[-1]:
cert_extention_dic[extension]['issuer'] = bool(parameters.pop(-1))
# check if we have an issuer option (if so remove it and mark it as to be set)
if 'issuer:' in parameters[-1]:
cert_extention_dic[extension]['issuer'] = bool(parameters.pop(-1))

# check if we have an issuer option (if so remove it and mark it as to be set)
if 'subject:' in parameters[-1]:
cert_extention_dic[extension]['subject'] = bool(parameters.pop(-1))
# check if we have an issuer option (if so remove it and mark it as to be set)
if 'subject:' in parameters[-1]:
cert_extention_dic[extension]['subject'] = bool(parameters.pop(-1))

# combine the remaining items and put them in as values
cert_extention_dic[extension]['value'] = ','.join(parameters)
# combine the remaining items and put them in as values
cert_extention_dic[extension]['value'] = ','.join(parameters)

self.logger.debug('CAhandler._certificate_extensions_load() ended')
return cert_extention_dic
Expand All @@ -176,11 +180,15 @@ def _certificate_store(self, cert):

# determine filename
if self.save_cert_as_hex:
self.logger.info('convert serial to hex: {0}: {1}'.format(serial, '{:X}'.format(serial)))
cert_file = '{:X}'.format(serial)
else:
cert_file = str(serial)
with open('{0}/{1}.pem'.format(self.cert_save_path, cert_file), 'wb') as fso:
fso.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
else:
self.logger.error('CAhandler._certificate_store() handler configuration incomplete: cert_save_path is missing')

self.logger.debug('CAhandler._certificate_store() ended')

def _config_check(self):
Expand Down Expand Up @@ -232,7 +240,6 @@ def _config_load(self):
"""" load config from file """
self.logger.debug('CAhandler._config_load()')
config_dic = config_load(self.logger, cfg_file=self.cfg_file)

if 'issuing_ca_key' in config_dic['CAhandler']:
self.issuer_dict['issuing_ca_key'] = config_dic['CAhandler']['issuing_ca_key']
if 'issuing_ca_cert' in config_dic['CAhandler']:
Expand Down Expand Up @@ -479,7 +486,6 @@ def enroll(self, csr):
cert_pem = convert_byte_to_string(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
else:
error = 'urn:ietf:params:acme:badCSR'

except BaseException as err:
self.logger.error('CAhandler.enroll() error: {0}'.format(err))

Expand Down
141 changes: 74 additions & 67 deletions examples/ca_handler/xca_ca_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,79 +601,86 @@ def _validity_calculate(self, template_dic):

def enroll(self, csr):
""" enroll certificate """
# pylint: disable=R0914
# pylint: disable=R0914, R0915
self.logger.debug('CAhandler.enroll()')

error = self._config_check()
if not error:
cert = None

if not error:
request_name = self._requestname_get(csr)
# import CSR to database
_csr_info = self._csr_import(csr, request_name)

# prepare the CSR to be signed
# csr = build_pem_file(self.logger, None, b64_url_recode(self.logger, csr), None, True)
csr = build_pem_file(self.logger, None, csr, None, True)

# load ca cert and key
(ca_key, ca_cert, ca_id) = self._ca_load()

if ca_key and ca_cert:
# load request
req = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)

# copy cn of request
subject = req.get_subject()
# rewrite CN if required
if not subject.CN:
self.logger.debug('rewrite CN to {0}'.format(request_name))
subject.CN = request_name

# create certificate object
cert = crypto.X509()
cert.set_pubkey(req.get_pubkey())
cert.set_version(2)
cert.set_serial_number(uuid.uuid4().int & (1<<63)-1)
cert.set_issuer(ca_cert.get_subject())

# load template if configured
if self.template_name:
(dn_dic, template_dic) = self._template_load()
else:
dn_dic = {}
template_dic = {}

# set cert_validity
if 'validity' in template_dic:
# take validity from template
cert_validity = template_dic['validity']
if request_name:
# import CSR to database
_csr_info = self._csr_import(csr, request_name)

# prepare the CSR to be signed
# csr = build_pem_file(self.logger, None, b64_url_recode(self.logger, csr), None, True)
csr = build_pem_file(self.logger, None, csr, None, True)

# load ca cert and key
(ca_key, ca_cert, ca_id) = self._ca_load()

if ca_key and ca_cert and ca_id:
# load request
req = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)

# copy cn of request
subject = req.get_subject()
# rewrite CN if required
if not subject.CN:
self.logger.info('rewrite CN to {0}'.format(request_name))
subject.CN = request_name

# create certificate object
cert = crypto.X509()
cert.set_pubkey(req.get_pubkey())
cert.set_version(2)
cert.set_serial_number(uuid.uuid4().int & (1<<63)-1)
cert.set_issuer(ca_cert.get_subject())

# load template if configured
if self.template_name:
(dn_dic, template_dic) = self._template_load()
else:
dn_dic = {}
template_dic = {}

# set cert_validity
if 'validity' in template_dic:
self.logger.info('take validity from template: {0}'.format(template_dic['validity']))
# take validity from template
cert_validity = template_dic['validity']
else:
cert_validity = self.cert_validity_days
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(cert_validity * 86400)

# get extension list
extension_list = self._extension_list_generate(template_dic, cert, ca_cert)
# add extensions (copy from CSR and take the ones we constructed)
cert.add_extensions(req.get_extensions())
cert.add_extensions(extension_list)

if dn_dic:
self.logger.info('modify subject with template data')
subject = self._subject_modify(subject, dn_dic)
cert.set_subject(subject)

# sign csr
cert.sign(ca_key, 'sha256')
serial = cert.get_serial_number()

# get hashes
issuer_hash = ca_cert.subject_name_hash() & 0x7fffffff
name_hash = cert.subject_name_hash() & 0x7fffffff

# store certificate
self._store_cert(ca_id, request_name, '{:X}'.format(serial), convert_byte_to_string(b64_encode(self.logger, crypto.dump_certificate(crypto.FILETYPE_ASN1, cert))), name_hash, issuer_hash)
cert = convert_byte_to_string(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
else:
cert_validity = self.cert_validity_days
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(cert_validity * 86400)

# get extension list
extension_list = self._extension_list_generate(template_dic, cert, ca_cert)
# add extensions (copy from CSR and take the ones we constructed)
cert.add_extensions(req.get_extensions())
cert.add_extensions(extension_list)

if dn_dic:
subject = self._subject_modify(subject, dn_dic)
cert.set_subject(subject)

# sign csr
cert.sign(ca_key, 'sha256')
serial = cert.get_serial_number()

# get hsshes
issuer_hash = ca_cert.subject_name_hash() & 0x7fffffff
name_hash = cert.subject_name_hash() & 0x7fffffff

# store certificate
self._store_cert(ca_id, request_name, '{:X}'.format(serial), convert_byte_to_string(b64_encode(self.logger, crypto.dump_certificate(crypto.FILETYPE_ASN1, cert))), name_hash, issuer_hash)
cert = convert_byte_to_string(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))

error = 'ca lookup failed'
else:
error = 'request_name lookup failed'
self.logger.debug('Certificate.enroll() ended')
return(error, cert, None)

Expand Down
Loading

0 comments on commit 9693b02

Please sign in to comment.