mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-01-26 16:16:31 -06:00
Split x509.load_certificate() into PEM/DER functions
Splitting the load_certificate() function into two separate helps us word the requirements for the input explicitly. It also makes our backend similar to the one of python-cryptography so eventually we can swap python-cryptography for IPA x509 module. https://pagure.io/freeipa/issue/4985 Reviewed-By: Fraser Tweedale <ftweedal@redhat.com> Reviewed-By: Rob Crittenden <rcritten@redhat.com> Reviewed-By: Martin Basti <mbasti@redhat.com>
This commit is contained in:
parent
284658e08e
commit
4375ef860f
@ -1647,7 +1647,7 @@ def get_ca_certs_from_ldap(server, basedn, realm):
|
||||
logger.debug("get_ca_certs_from_ldap() error: %s", e)
|
||||
raise
|
||||
|
||||
certs = [x509.load_certificate(c[0], x509.DER) for c in certs
|
||||
certs = [x509.load_der_x509_certificate(c[0]) for c in certs
|
||||
if c[2] is not False]
|
||||
|
||||
return certs
|
||||
|
@ -66,7 +66,7 @@ class CertRetrieveOverride(MethodOverride):
|
||||
certs = result['result']['certificate_chain']
|
||||
else:
|
||||
certs = [result['result']['certificate']]
|
||||
certs = (x509.normalize_certificate(cert) for cert in certs)
|
||||
certs = (x509.ensure_der_format(cert) for cert in certs)
|
||||
certs = (x509.make_pem(base64.b64encode(cert)) for cert in certs)
|
||||
with open(certificate_out, 'w') as f:
|
||||
f.write('\n'.join(certs))
|
||||
|
@ -683,8 +683,8 @@ class ModVaultData(Local):
|
||||
|
||||
# retrieve transport certificate (cached by vaultconfig_show)
|
||||
response = self.api.Command.vaultconfig_show()
|
||||
transport_cert = x509.load_certificate(
|
||||
response['result']['transport_cert'], x509.DER)
|
||||
transport_cert = x509.load_der_x509_certificate(
|
||||
response['result']['transport_cert'])
|
||||
# call with the retrieved transport certificate
|
||||
return self._do_internal(algo, transport_cert, True,
|
||||
*args, **options)
|
||||
|
@ -30,7 +30,7 @@ from ipalib import errors, x509
|
||||
|
||||
def _parse_cert(dercert):
|
||||
try:
|
||||
cert = x509.load_certificate(dercert, x509.DER)
|
||||
cert = x509.load_der_x509_certificate(dercert)
|
||||
subject = DN(cert.subject)
|
||||
issuer = DN(cert.issuer)
|
||||
serial_number = cert.serial_number
|
||||
|
@ -19,7 +19,7 @@
|
||||
|
||||
# Certificates should be stored internally DER-encoded. We can be passed
|
||||
# a certificate several ways: read if from LDAP, read it from a 3rd party
|
||||
# app (dogtag, candlepin, etc) or as user input. The normalize_certificate()
|
||||
# app (dogtag, candlepin, etc) or as user input. The ensure_der_format()
|
||||
# function will convert an incoming certificate to DER-encoding.
|
||||
|
||||
# Conventions
|
||||
@ -101,24 +101,26 @@ def strip_header(pem):
|
||||
return pem
|
||||
|
||||
|
||||
def load_certificate(data, datatype=PEM):
|
||||
def load_pem_x509_certificate(data):
|
||||
"""
|
||||
Load an X.509 certificate.
|
||||
Load an X.509 certificate in PEM format.
|
||||
|
||||
:param datatype: PEM for base64-encoded data (with or without header),
|
||||
or DER
|
||||
:return: a python-cryptography ``CertificateSigningRequest`` object.
|
||||
:returns: a python-cryptography ``Certificate`` object.
|
||||
:raises: ``ValueError`` if unable to load the certificate.
|
||||
|
||||
"""
|
||||
if type(data) in (tuple, list):
|
||||
data = data[0]
|
||||
return crypto_x509.load_pem_x509_certificate(data,
|
||||
backend=default_backend())
|
||||
|
||||
if (datatype == PEM):
|
||||
data = strip_header(data)
|
||||
data = base64.b64decode(data)
|
||||
|
||||
return cryptography.x509.load_der_x509_certificate(data, default_backend())
|
||||
def load_der_x509_certificate(data):
|
||||
"""
|
||||
Load an X.509 certificate in DER format.
|
||||
|
||||
:returns: a python-cryptography ``Certificate`` object.
|
||||
:raises: ``ValueError`` if unable to load the certificate.
|
||||
"""
|
||||
return crypto_x509.load_der_x509_certificate(data,
|
||||
backend=default_backend())
|
||||
|
||||
|
||||
def load_certificate_from_file(filename, dbdir=None):
|
||||
@ -126,10 +128,9 @@ def load_certificate_from_file(filename, dbdir=None):
|
||||
Load a certificate from a PEM file.
|
||||
|
||||
Returns a python-cryptography ``Certificate`` object.
|
||||
|
||||
"""
|
||||
with open(filename, mode='rb') as f:
|
||||
return load_certificate(f.read(), PEM)
|
||||
return load_pem_x509_certificate(f.read())
|
||||
|
||||
|
||||
def load_certificate_list(data):
|
||||
@ -140,8 +141,7 @@ def load_certificate_list(data):
|
||||
|
||||
"""
|
||||
certs = PEM_REGEX.findall(data)
|
||||
certs = [load_certificate(cert, PEM) for cert in certs]
|
||||
return certs
|
||||
return [load_pem_x509_certificate(cert) for cert in certs]
|
||||
|
||||
|
||||
def load_certificate_list_from_file(filename):
|
||||
@ -151,7 +151,7 @@ def load_certificate_list_from_file(filename):
|
||||
Return a list of python-cryptography ``Certificate`` objects.
|
||||
|
||||
"""
|
||||
with open(filename) as f:
|
||||
with open(filename, 'rb') as f:
|
||||
return load_certificate_list(f.read())
|
||||
|
||||
|
||||
@ -242,7 +242,8 @@ def make_pem(data):
|
||||
pemcert + \
|
||||
'\n-----END CERTIFICATE-----'
|
||||
|
||||
def normalize_certificate(rawcert):
|
||||
|
||||
def ensure_der_format(rawcert):
|
||||
"""
|
||||
Incoming certificates should be DER-encoded. If not it is converted to
|
||||
DER-format.
|
||||
@ -274,17 +275,26 @@ def normalize_certificate(rawcert):
|
||||
|
||||
# At this point we should have a DER certificate.
|
||||
# Attempt to decode it.
|
||||
validate_certificate(dercert, datatype=DER)
|
||||
|
||||
validate_der_x509_certificate(dercert)
|
||||
return dercert
|
||||
|
||||
|
||||
def validate_certificate(cert, datatype=PEM):
|
||||
def validate_pem_x509_certificate(cert):
|
||||
"""
|
||||
Perform cert validation by trying to load it via python-cryptography.
|
||||
"""
|
||||
try:
|
||||
load_certificate(cert, datatype=datatype)
|
||||
load_pem_x509_certificate(cert)
|
||||
except ValueError as e:
|
||||
raise errors.CertificateFormatError(error=str(e))
|
||||
|
||||
|
||||
def validate_der_x509_certificate(cert):
|
||||
"""
|
||||
Perform cert validation by trying to load it via python-cryptography.
|
||||
"""
|
||||
try:
|
||||
load_der_x509_certificate(cert)
|
||||
except ValueError as e:
|
||||
raise errors.CertificateFormatError(error=str(e))
|
||||
|
||||
|
@ -458,7 +458,7 @@ class NSSDatabase(object):
|
||||
if label in ('CERTIFICATE', 'X509 CERTIFICATE',
|
||||
'X.509 CERTIFICATE'):
|
||||
try:
|
||||
x509.load_certificate(match.group(2))
|
||||
x509.load_pem_x509_certificate(match.group(2))
|
||||
except ValueError as e:
|
||||
if label != 'CERTIFICATE':
|
||||
logger.warning(
|
||||
@ -531,7 +531,7 @@ class NSSDatabase(object):
|
||||
|
||||
# Try to load the file as DER certificate
|
||||
try:
|
||||
x509.load_certificate(data, x509.DER)
|
||||
x509.load_der_x509_certificate(data)
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
@ -577,7 +577,7 @@ class NSSDatabase(object):
|
||||
"No server certificates found in %s" % (', '.join(files)))
|
||||
|
||||
for cert_pem in extracted_certs:
|
||||
cert = x509.load_certificate(cert_pem)
|
||||
cert = x509.load_pem_x509_certificate(cert_pem)
|
||||
nickname = str(DN(cert.subject))
|
||||
data = cert.public_bytes(serialization.Encoding.DER)
|
||||
self.add_cert(data, nickname, EMPTY_TRUST_FLAGS)
|
||||
@ -688,7 +688,7 @@ class NSSDatabase(object):
|
||||
Raises a ValueError if the certificate is invalid.
|
||||
"""
|
||||
cert = self.get_cert(nickname)
|
||||
cert = x509.load_certificate(cert, x509.DER)
|
||||
cert = x509.load_der_x509_certificate(cert)
|
||||
|
||||
try:
|
||||
self.run_certutil(['-V', '-n', nickname, '-u', 'V'],
|
||||
@ -705,7 +705,7 @@ class NSSDatabase(object):
|
||||
|
||||
def verify_ca_cert_validity(self, nickname):
|
||||
cert = self.get_cert(nickname)
|
||||
cert = x509.load_certificate(cert, x509.DER)
|
||||
cert = x509.load_der_x509_certificate(cert)
|
||||
|
||||
if not cert.subject:
|
||||
raise ValueError("has empty subject")
|
||||
@ -736,6 +736,6 @@ class NSSDatabase(object):
|
||||
def verify_kdc_cert_validity(self, nickname, realm):
|
||||
nicknames = self.get_trust_chain(nickname)
|
||||
certs = [self.get_cert(nickname) for nickname in nicknames]
|
||||
certs = [x509.load_certificate(cert, x509.DER) for cert in certs]
|
||||
certs = [x509.load_der_x509_certificate(cert) for cert in certs]
|
||||
|
||||
verify_kdc_cert_validity(certs[-1], certs[:-1], realm)
|
||||
|
@ -196,7 +196,7 @@ def install_check(standalone, replica_config, options):
|
||||
cert = db.get_cert_from_db(nickname)
|
||||
if not cert:
|
||||
continue
|
||||
subject = DN(x509.load_certificate(cert).subject)
|
||||
subject = DN(x509.load_pem_x509_certificate(cert).subject)
|
||||
if subject == DN(options._ca_subject):
|
||||
raise ScriptError(
|
||||
"Certificate with subject %s is present in %s, "
|
||||
|
@ -1413,7 +1413,7 @@ def update_people_entry(dercert):
|
||||
is needed when a certificate is renewed.
|
||||
"""
|
||||
def make_filter(dercert):
|
||||
cert = x509.load_certificate(dercert, datatype=x509.DER)
|
||||
cert = x509.load_der_x509_certificate(dercert)
|
||||
subject = DN(cert.subject)
|
||||
issuer = DN(cert.issuer)
|
||||
return ldap2.ldap2.combine_filters(
|
||||
@ -1426,8 +1426,8 @@ def update_people_entry(dercert):
|
||||
ldap2.ldap2.MATCH_ALL)
|
||||
|
||||
def make_entry(dercert, entry):
|
||||
cert = x509.load_certificate(dercert, datatype=x509.DER)
|
||||
serial_number = cert.serial
|
||||
cert = x509.load_der_x509_certificate(dercert)
|
||||
serial_number = cert.serial_number
|
||||
subject = DN(cert.subject)
|
||||
issuer = DN(cert.issuer)
|
||||
entry['usercertificate'].append(dercert)
|
||||
@ -1443,7 +1443,7 @@ def update_authority_entry(dercert):
|
||||
serial number to match the given cert.
|
||||
"""
|
||||
def make_filter(dercert):
|
||||
cert = x509.load_certificate(dercert, datatype=x509.DER)
|
||||
cert = x509.load_der_x509_certificate(dercert)
|
||||
subject = str(DN(cert.subject))
|
||||
return ldap2.ldap2.make_filter(
|
||||
dict(objectclass='authority', authoritydn=subject),
|
||||
@ -1451,7 +1451,7 @@ def update_authority_entry(dercert):
|
||||
)
|
||||
|
||||
def make_entry(dercert, entry):
|
||||
cert = x509.load_certificate(dercert, datatype=x509.DER)
|
||||
cert = x509.load_der_x509_certificate(dercert)
|
||||
entry['authoritySerial'] = cert.serial_number
|
||||
return entry
|
||||
|
||||
|
@ -58,7 +58,7 @@ def get_cert_nickname(cert):
|
||||
representation of the first RDN in the subject and subject_dn
|
||||
is a DN object.
|
||||
"""
|
||||
cert_obj = x509.load_certificate(cert)
|
||||
cert_obj = x509.load_pem_x509_certificate(cert)
|
||||
dn = DN(cert_obj.subject)
|
||||
|
||||
return (str(dn[0]), dn)
|
||||
@ -362,7 +362,7 @@ class CertDB(object):
|
||||
return
|
||||
|
||||
cert = self.get_cert_from_db(nickname)
|
||||
cert_obj = x509.load_certificate(cert)
|
||||
cert_obj = x509.load_pem_x509_certificate(cert)
|
||||
subject = str(DN(cert_obj.subject))
|
||||
certmonger.add_principal(request_id, principal)
|
||||
certmonger.add_subject(request_id, subject)
|
||||
|
@ -1054,7 +1054,7 @@ def load_pkcs12(cert_files, key_password, key_nickname, ca_cert_files,
|
||||
if ca_cert is None:
|
||||
ca_cert = cert
|
||||
|
||||
cert_obj = x509.load_certificate(cert, x509.DER)
|
||||
cert_obj = x509.load_der_x509_certificate(cert)
|
||||
subject = DN(cert_obj.subject)
|
||||
issuer = DN(cert_obj.issuer)
|
||||
|
||||
@ -1185,7 +1185,7 @@ def load_external_cert(files, ca_subject):
|
||||
for nickname, _trust_flags in nssdb.list_certs():
|
||||
cert = nssdb.get_cert(nickname, pem=True)
|
||||
|
||||
cert_obj = x509.load_certificate(cert)
|
||||
cert_obj = x509.load_pem_x509_certificate(cert)
|
||||
subject = DN(cert_obj.subject)
|
||||
issuer = DN(cert_obj.issuer)
|
||||
|
||||
|
@ -799,7 +799,7 @@ class ModCertMapData(LDAPModAttribute):
|
||||
data.append(cls._build_mapdata(subject, issuer))
|
||||
|
||||
for dercert in certificates:
|
||||
cert = x509.load_certificate(dercert, x509.DER)
|
||||
cert = x509.load_der_x509_certificate(dercert)
|
||||
issuer = DN(cert.issuer)
|
||||
subject = DN(cert.subject)
|
||||
if not subject:
|
||||
|
@ -325,7 +325,7 @@ def ca_kdc_check(api_instance, hostname):
|
||||
|
||||
|
||||
def validate_certificate(value):
|
||||
return x509.validate_certificate(value, x509.DER)
|
||||
return x509.validate_der_x509_certificate(value)
|
||||
|
||||
|
||||
def bind_principal_can_manage_cert(cert):
|
||||
@ -366,7 +366,7 @@ class BaseCertObject(Object):
|
||||
'certificate', validate_certificate,
|
||||
label=_("Certificate"),
|
||||
doc=_("Base-64 encoded certificate."),
|
||||
normalizer=x509.normalize_certificate,
|
||||
normalizer=x509.ensure_der_format,
|
||||
flags={'no_create', 'no_update', 'no_search'},
|
||||
),
|
||||
Bytes(
|
||||
@ -490,7 +490,7 @@ class BaseCertObject(Object):
|
||||
|
||||
"""
|
||||
if 'certificate' in obj:
|
||||
cert = x509.load_certificate(obj['certificate'])
|
||||
cert = x509.load_pem_x509_certificate(obj['certificate'])
|
||||
obj['subject'] = DN(cert.subject)
|
||||
obj['issuer'] = DN(cert.issuer)
|
||||
obj['serial_number'] = cert.serial_number
|
||||
@ -927,7 +927,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
|
||||
"used for krbtgt certificates")
|
||||
|
||||
if 'certificate_chain' in ca_obj:
|
||||
cert = x509.load_certificate(result['certificate'])
|
||||
cert = x509.load_pem_x509_certificate(result['certificate'])
|
||||
cert = cert.public_bytes(serialization.Encoding.DER)
|
||||
result['certificate_chain'] = [cert] + ca_obj['certificate_chain']
|
||||
|
||||
@ -1191,7 +1191,7 @@ class cert_show(Retrieve, CertMethod, VirtualCommand):
|
||||
# we don't tell Dogtag the issuer (but we check the cert after).
|
||||
#
|
||||
result = self.Backend.ra.get_certificate(str(serial_number))
|
||||
cert = x509.load_certificate(result['certificate'])
|
||||
cert = x509.load_pem_x509_certificate(result['certificate'])
|
||||
|
||||
try:
|
||||
self.check_access()
|
||||
@ -1270,7 +1270,8 @@ class cert_revoke(PKQuery, CertMethod, VirtualCommand):
|
||||
logger.debug("Not granted by ACI to revoke certificate, "
|
||||
"looking at principal")
|
||||
try:
|
||||
cert = x509.load_certificate(resp['result']['certificate'])
|
||||
cert = x509.load_pem_x509_certificate(
|
||||
resp['result']['certificate'])
|
||||
if not bind_principal_can_manage_cert(cert):
|
||||
raise acierr
|
||||
except errors.NotImplementedError:
|
||||
@ -1435,7 +1436,7 @@ class cert_find(Search, CertMethod):
|
||||
|
||||
def _get_cert_key(self, cert):
|
||||
try:
|
||||
cert_obj = x509.load_certificate(cert, x509.DER)
|
||||
cert_obj = x509.load_der_x509_certificate(cert)
|
||||
except ValueError as e:
|
||||
message = messages.SearchResultTruncated(
|
||||
reason=_("failed to load certificate: %s") % e,
|
||||
@ -1691,7 +1692,8 @@ class cert_find(Search, CertMethod):
|
||||
obj['certificate'].replace('\r\n', ''))
|
||||
|
||||
if 'certificate_chain' in ca_obj:
|
||||
cert = x509.load_certificate(obj['certificate'])
|
||||
cert = x509.load_der_x509_certificate(
|
||||
obj['certificate'])
|
||||
cert_der = (
|
||||
cert.public_bytes(serialization.Encoding.DER))
|
||||
obj['certificate_chain'] = (
|
||||
|
@ -691,7 +691,7 @@ class host_add(LDAPCreate):
|
||||
# save the password so it can be displayed in post_callback
|
||||
setattr(context, 'randompassword', entry_attrs['userpassword'])
|
||||
certs = options.get('usercertificate', [])
|
||||
certs_der = [x509.normalize_certificate(c) for c in certs]
|
||||
certs_der = [x509.ensure_der_format(c) for c in certs]
|
||||
entry_attrs['usercertificate'] = certs_der
|
||||
entry_attrs['managedby'] = dn
|
||||
entry_attrs['objectclass'].append('ieee802device')
|
||||
@ -895,7 +895,7 @@ class host_mod(LDAPUpdate):
|
||||
|
||||
# verify certificates
|
||||
certs = entry_attrs.get('usercertificate') or []
|
||||
certs_der = [x509.normalize_certificate(c) for c in certs]
|
||||
certs_der = [x509.ensure_der_format(c) for c in certs]
|
||||
|
||||
# revoke removed certificates
|
||||
ca_is_enabled = self.api.Command.ca_is_enabled()['result']
|
||||
@ -905,7 +905,7 @@ class host_mod(LDAPUpdate):
|
||||
except errors.NotFound:
|
||||
self.obj.handle_not_found(*keys)
|
||||
old_certs = entry_attrs_old.get('usercertificate', [])
|
||||
old_certs_der = [x509.normalize_certificate(c) for c in old_certs]
|
||||
old_certs_der = [x509.ensure_der_format(c) for c in old_certs]
|
||||
removed_certs_der = set(old_certs_der) - set(certs_der)
|
||||
for der in removed_certs_der:
|
||||
rm_certs = api.Command.cert_find(certificate=der)['result']
|
||||
|
@ -220,7 +220,7 @@ def validate_certificate(ugettext, cert):
|
||||
Check whether the certificate is properly encoded to DER
|
||||
"""
|
||||
if api.env.in_server:
|
||||
x509.validate_certificate(cert, datatype=x509.DER)
|
||||
x509.validate_der_x509_certificate(cert)
|
||||
|
||||
|
||||
def revoke_certs(certs):
|
||||
@ -269,8 +269,8 @@ def set_certificate_attrs(entry_attrs):
|
||||
cert = entry_attrs['usercertificate'][0]
|
||||
else:
|
||||
cert = entry_attrs['usercertificate']
|
||||
cert = x509.normalize_certificate(cert)
|
||||
cert = x509.load_certificate(cert, datatype=x509.DER)
|
||||
cert = x509.ensure_der_format(cert)
|
||||
cert = x509.load_der_x509_certificate(cert)
|
||||
entry_attrs['subject'] = unicode(DN(cert.subject))
|
||||
entry_attrs['serial_number'] = unicode(cert.serial_number)
|
||||
entry_attrs['serial_number_hex'] = u'0x%X' % cert.serial_number
|
||||
@ -633,7 +633,7 @@ class service_add(LDAPCreate):
|
||||
self.obj.validate_ipakrbauthzdata(entry_attrs)
|
||||
|
||||
certs = options.get('usercertificate', [])
|
||||
certs_der = [x509.normalize_certificate(c) for c in certs]
|
||||
certs_der = [x509.ensure_der_format(c) for c in certs]
|
||||
entry_attrs['usercertificate'] = certs_der
|
||||
|
||||
if not options.get('force', False):
|
||||
@ -705,7 +705,7 @@ class service_mod(LDAPUpdate):
|
||||
|
||||
# verify certificates
|
||||
certs = entry_attrs.get('usercertificate') or []
|
||||
certs_der = [x509.normalize_certificate(c) for c in certs]
|
||||
certs_der = [x509.ensure_der_format(c) for c in certs]
|
||||
# revoke removed certificates
|
||||
ca_is_enabled = self.api.Command.ca_is_enabled()['result']
|
||||
if 'usercertificate' in options and ca_is_enabled:
|
||||
@ -714,7 +714,7 @@ class service_mod(LDAPUpdate):
|
||||
except errors.NotFound:
|
||||
self.obj.handle_not_found(*keys)
|
||||
old_certs = entry_attrs_old.get('usercertificate', [])
|
||||
old_certs_der = [x509.normalize_certificate(c) for c in old_certs]
|
||||
old_certs_der = [x509.ensure_der_format(c) for c in old_certs]
|
||||
removed_certs_der = set(old_certs_der) - set(certs_der)
|
||||
for der in removed_certs_der:
|
||||
rm_certs = api.Command.cert_find(certificate=der)['result']
|
||||
|
@ -34,10 +34,32 @@ pytestmark = pytest.mark.tier0
|
||||
# certutil -
|
||||
|
||||
# certificate for CN=ipa.example.com,O=IPA
|
||||
goodcert = 'MIICAjCCAWugAwIBAgICBEUwDQYJKoZIhvcNAQEFBQAwKTEnMCUGA1UEAxMeSVBBIFRlc3QgQ2VydGlmaWNhdGUgQXV0aG9yaXR5MB4XDTEwMDYyNTEzMDA0MloXDTE1MDYyNTEzMDA0MlowKDEMMAoGA1UEChMDSVBBMRgwFgYDVQQDEw9pcGEuZXhhbXBsZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAJcZ+H6+cQaN/BlzR8OYkVeJgaU5tCaV9FF1m7Ws/ftPtTJUaSL1ncp6603rjA4tH1aa/B8i8xdC46+ZbY2au8b9ryGcOsx2uaRpNLEQ2Fy//q1kQC8oM+iD8Nd6osF0a2wnugsgnJHPuJzhViaWxYgzk5DRdP81debokF3f3FX/AgMBAAGjOjA4MBEGCWCGSAGG+EIBAQQEAwIGQDATBgNVHSUEDDAKBggrBgEFBQcDATAOBgNVHQ8BAf8EBAMCBPAwDQYJKoZIhvcNAQEFBQADgYEALD6X9V9w381AzzQPcHsjIjiX3B/AF9RCGocKZUDXkdDhsD9NZ3PLPEf1AMjkraKG963HPB8scyiBbbSuSh6m7TCp0eDgRpo77zNuvd3U4Qpm0Qk+KEjtHQDjNNG6N4ZnCQPmjFPScElvc/GgW7XMbywJy2euF+3/Uip8cnPgSH4='
|
||||
goodcert = (
|
||||
b'MIICAjCCAWugAwIBAgICBEUwDQYJKoZIhvcNAQEFBQAwKTEnMCUGA1UEAxMeSVBB'
|
||||
b'IFRlc3QgQ2VydGlmaWNhdGUgQXV0aG9yaXR5MB4XDTEwMDYyNTEzMDA0MloXDTE1'
|
||||
b'MDYyNTEzMDA0MlowKDEMMAoGA1UEChMDSVBBMRgwFgYDVQQDEw9pcGEuZXhhbXBs'
|
||||
b'ZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAJcZ+H6+cQaN/BlzR8OY'
|
||||
b'kVeJgaU5tCaV9FF1m7Ws/ftPtTJUaSL1ncp6603rjA4tH1aa/B8i8xdC46+ZbY2a'
|
||||
b'u8b9ryGcOsx2uaRpNLEQ2Fy//q1kQC8oM+iD8Nd6osF0a2wnugsgnJHPuJzhViaW'
|
||||
b'xYgzk5DRdP81debokF3f3FX/AgMBAAGjOjA4MBEGCWCGSAGG+EIBAQQEAwIGQDAT'
|
||||
b'BgNVHSUEDDAKBggrBgEFBQcDATAOBgNVHQ8BAf8EBAMCBPAwDQYJKoZIhvcNAQEF'
|
||||
b'BQADgYEALD6X9V9w381AzzQPcHsjIjiX3B/AF9RCGocKZUDXkdDhsD9NZ3PLPEf1'
|
||||
b'AMjkraKG963HPB8scyiBbbSuSh6m7TCp0eDgRpo77zNuvd3U4Qpm0Qk+KEjtHQDj'
|
||||
b'NNG6N4ZnCQPmjFPScElvc/GgW7XMbywJy2euF+3/Uip8cnPgSH4='
|
||||
)
|
||||
|
||||
goodcert_headers = (
|
||||
b'-----BEGIN CERTIFICATE-----\n' +
|
||||
goodcert +
|
||||
b'\n-----END CERTIFICATE-----'
|
||||
)
|
||||
# The base64-encoded string 'bad cert'
|
||||
badcert = 'YmFkIGNlcnQ='
|
||||
badcert = (
|
||||
b'-----BEGIN CERTIFICATE-----\n'
|
||||
b'YmFkIGNlcnQ=\r\n'
|
||||
b'-----END CERTIFICATE-----'
|
||||
)
|
||||
|
||||
|
||||
class test_x509(object):
|
||||
"""
|
||||
@ -55,39 +77,21 @@ class test_x509(object):
|
||||
"""
|
||||
|
||||
# Load a good cert
|
||||
x509.load_certificate(goodcert)
|
||||
|
||||
# Should handle list/tuple
|
||||
x509.load_certificate((goodcert,))
|
||||
x509.load_certificate([goodcert])
|
||||
|
||||
# Load a good cert with headers
|
||||
newcert = '-----BEGIN CERTIFICATE-----' + goodcert + '-----END CERTIFICATE-----'
|
||||
x509.load_certificate(newcert)
|
||||
|
||||
# Should handle list/tuple
|
||||
x509.load_certificate((newcert,))
|
||||
x509.load_certificate([newcert])
|
||||
x509.load_pem_x509_certificate(goodcert_headers)
|
||||
|
||||
# Load a good cert with headers and leading text
|
||||
newcert = (
|
||||
'leading text\n-----BEGIN CERTIFICATE-----' +
|
||||
goodcert +
|
||||
'-----END CERTIFICATE-----')
|
||||
x509.load_certificate(newcert)
|
||||
|
||||
# Should handle list/tuple
|
||||
x509.load_certificate((newcert,))
|
||||
x509.load_certificate([newcert])
|
||||
'leading text\n' + goodcert_headers)
|
||||
x509.load_pem_x509_certificate(newcert)
|
||||
|
||||
# Load a good cert with bad headers
|
||||
newcert = '-----BEGIN CERTIFICATE-----' + goodcert
|
||||
newcert = '-----BEGIN CERTIFICATE-----' + goodcert_headers
|
||||
with pytest.raises((TypeError, ValueError)):
|
||||
x509.load_certificate(newcert)
|
||||
x509.load_pem_x509_certificate(newcert)
|
||||
|
||||
# Load a bad cert
|
||||
with pytest.raises(ValueError):
|
||||
x509.load_certificate(badcert)
|
||||
x509.load_pem_x509_certificate(badcert)
|
||||
|
||||
def test_1_load_der_cert(self):
|
||||
"""
|
||||
@ -97,11 +101,7 @@ class test_x509(object):
|
||||
der = base64.b64decode(goodcert)
|
||||
|
||||
# Load a good cert
|
||||
x509.load_certificate(der, x509.DER)
|
||||
|
||||
# Should handle list/tuple
|
||||
x509.load_certificate((der,), x509.DER)
|
||||
x509.load_certificate([der], x509.DER)
|
||||
x509.load_der_x509_certificate(der)
|
||||
|
||||
def test_3_cert_contents(self):
|
||||
"""
|
||||
@ -112,7 +112,7 @@ class test_x509(object):
|
||||
|
||||
not_before = datetime.datetime(2010, 6, 25, 13, 0, 42)
|
||||
not_after = datetime.datetime(2015, 6, 25, 13, 0, 42)
|
||||
cert = x509.load_certificate(goodcert)
|
||||
cert = x509.load_pem_x509_certificate(goodcert_headers)
|
||||
|
||||
assert DN(cert.subject) == DN(('CN', 'ipa.example.com'), ('O', 'IPA'))
|
||||
assert DN(cert.issuer) == DN(('CN', 'IPA Test Certificate Authority'))
|
||||
|
Loading…
Reference in New Issue
Block a user