mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
x509: Make certificates represented as objects
https://pagure.io/freeipa/issue/4985 Reviewed-By: Fraser Tweedale <ftweedal@redhat.com> Reviewed-By: Rob Crittenden <rcritten@redhat.com> Reviewed-By: Martin Basti <mbasti@redhat.com>
This commit is contained in:
committed by
Pavel Vomacka
parent
4375ef860f
commit
b5732efda6
532
ipalib/x509.py
532
ipalib/x509.py
@@ -39,10 +39,15 @@ import ssl
|
||||
import base64
|
||||
import re
|
||||
|
||||
from cryptography import x509 as crypto_x509
|
||||
from cryptography import utils as crypto_utils
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
import cryptography.x509
|
||||
from cryptography.hazmat.primitives.serialization import (
|
||||
Encoding, PublicFormat
|
||||
)
|
||||
from pyasn1.type import univ, char, namedtype, tag
|
||||
from pyasn1.codec.der import decoder, encoder
|
||||
# from pyasn1.codec.native import decoder, encoder
|
||||
from pyasn1_modules import rfc2315, rfc2459
|
||||
import six
|
||||
|
||||
@@ -101,26 +106,319 @@ def strip_header(pem):
|
||||
return pem
|
||||
|
||||
|
||||
@crypto_utils.register_interface(crypto_x509.Certificate)
|
||||
class IPACertificate(object):
|
||||
"""
|
||||
A proxy class wrapping a python-cryptography certificate representation for
|
||||
FreeIPA purposes
|
||||
"""
|
||||
def __init__(self, cert, backend=None):
|
||||
"""
|
||||
:param cert: A python-cryptography Certificate object
|
||||
:param backend: A python-cryptography Backend object
|
||||
"""
|
||||
self._cert = cert
|
||||
self.backend = default_backend() if backend is None else backend()
|
||||
|
||||
# initialize the certificate fields
|
||||
# we have to do it this way so that some systems don't explode since
|
||||
# some field types encode-decoding is not strongly defined
|
||||
self._subject = self.__get_der_field('subject')
|
||||
self._issuer = self.__get_der_field('issuer')
|
||||
|
||||
def __getstate__(self):
|
||||
state = {
|
||||
'_cert': self.public_bytes(Encoding.DER),
|
||||
'_subject': self.subject_bytes,
|
||||
'_issuer': self.issuer_bytes,
|
||||
}
|
||||
return state
|
||||
|
||||
def __setstate__(self, state):
|
||||
self._subject = state['_subject']
|
||||
self._issuer = state['_issuer']
|
||||
self._cert = crypto_x509.load_der_x509_certificate(
|
||||
state['_cert'], backend=default_backend())
|
||||
|
||||
def __eq__(self, other):
|
||||
"""
|
||||
Checks equality.
|
||||
|
||||
:param other: either cryptography.Certificate or IPACertificate or
|
||||
bytes representing a DER-formatted certificate
|
||||
"""
|
||||
if (isinstance(other, (crypto_x509.Certificate, IPACertificate))):
|
||||
return (self.public_bytes(Encoding.DER) ==
|
||||
other.public_bytes(Encoding.DER))
|
||||
elif isinstance(other, bytes):
|
||||
return self.public_bytes(Encoding.DER) == other
|
||||
else:
|
||||
return False
|
||||
|
||||
def __ne__(self, other):
|
||||
"""
|
||||
Checks not equal.
|
||||
"""
|
||||
return not self.__eq__(other)
|
||||
|
||||
def __hash__(self):
|
||||
"""
|
||||
Computes a hash of the wrapped cryptography.Certificate.
|
||||
"""
|
||||
return hash(self._cert)
|
||||
|
||||
def __encode_extension(self, oid, critical, value):
|
||||
# TODO: have another proxy for crypto_x509.Extension which would
|
||||
# provide public_bytes on the top of what python-cryptography has
|
||||
ext = rfc2459.Extension()
|
||||
# TODO: this does not have to be so weird, pyasn1 now has codecs
|
||||
# which are capable of providing python-native types
|
||||
ext['extnID'] = univ.ObjectIdentifier(oid)
|
||||
ext['critical'] = univ.Boolean(critical)
|
||||
ext['extnValue'] = univ.Any(encoder.encode(univ.OctetString(value)))
|
||||
ext = encoder.encode(ext)
|
||||
return ext
|
||||
|
||||
def __get_pyasn1_field(self, field):
|
||||
"""
|
||||
:returns: a field of the certificate in pyasn1 representation
|
||||
"""
|
||||
cert_bytes = self.tbs_certificate_bytes
|
||||
cert = decoder.decode(cert_bytes, rfc2459.TBSCertificate())[0]
|
||||
field = cert[field]
|
||||
return field
|
||||
|
||||
def __get_der_field(self, field):
|
||||
"""
|
||||
:field: the name of the field of the certificate
|
||||
:returns: bytes representing the value of a certificate field
|
||||
"""
|
||||
return encoder.encode(self.__get_pyasn1_field(field))
|
||||
|
||||
def public_bytes(self, encoding):
|
||||
"""
|
||||
Serializes the certificate to PEM or DER format.
|
||||
"""
|
||||
return self._cert.public_bytes(encoding)
|
||||
|
||||
def is_self_signed(self):
|
||||
"""
|
||||
:returns: True if this certificate is self-signed, False otherwise
|
||||
"""
|
||||
return self._cert.issuer == self._cert.subject
|
||||
|
||||
def fingerprint(self, algorithm):
|
||||
"""
|
||||
Counts fingerprint of the wrapped cryptography.Certificate
|
||||
"""
|
||||
return self._cert.fingerprint(algorithm)
|
||||
|
||||
@property
|
||||
def serial_number(self):
|
||||
return self._cert.serial_number
|
||||
|
||||
@property
|
||||
def version(self):
|
||||
return self._cert.version
|
||||
|
||||
@property
|
||||
def subject(self):
|
||||
return self._cert.subject
|
||||
|
||||
@property
|
||||
def subject_bytes(self):
|
||||
return self._subject
|
||||
|
||||
@property
|
||||
def signature_hash_algorithm(self):
|
||||
"""
|
||||
Returns a HashAlgorithm corresponding to the type of the digest signed
|
||||
in the certificate.
|
||||
"""
|
||||
return self._cert.signature_hash_algorithm
|
||||
|
||||
@property
|
||||
def signature_algorithm_oid(self):
|
||||
"""
|
||||
Returns the ObjectIdentifier of the signature algorithm.
|
||||
"""
|
||||
return self._cert.signature_algorithm_oid
|
||||
|
||||
@property
|
||||
def signature(self):
|
||||
"""
|
||||
Returns the signature bytes.
|
||||
"""
|
||||
return self._cert.signature
|
||||
|
||||
@property
|
||||
def issuer(self):
|
||||
return self._cert.issuer
|
||||
|
||||
@property
|
||||
def issuer_bytes(self):
|
||||
return self._issuer
|
||||
|
||||
@property
|
||||
def not_valid_before(self):
|
||||
return self._cert.not_valid_before
|
||||
|
||||
@property
|
||||
def not_valid_after(self):
|
||||
return self._cert.not_valid_after
|
||||
|
||||
@property
|
||||
def tbs_certificate_bytes(self):
|
||||
return self._cert.tbs_certificate_bytes
|
||||
|
||||
@property
|
||||
def extensions(self):
|
||||
# TODO: own Extension and Extensions classes proxying
|
||||
# python-cryptography
|
||||
return self._cert.extensions
|
||||
|
||||
def public_key(self):
|
||||
return self._cert.public_key()
|
||||
|
||||
@property
|
||||
def public_key_info_bytes(self):
|
||||
return self._cert.public_key().public_bytes(
|
||||
encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo)
|
||||
|
||||
@property
|
||||
def extended_key_usage(self):
|
||||
try:
|
||||
ext_key_usage = self._cert.extensions.get_extension_for_oid(
|
||||
crypto_x509.oid.ExtensionOID.EXTENDED_KEY_USAGE).value
|
||||
except crypto_x509.ExtensionNotFound:
|
||||
return None
|
||||
|
||||
return set(oid.dotted_string for oid in ext_key_usage)
|
||||
|
||||
@property
|
||||
def extended_key_usage_bytes(self):
|
||||
ekurfc = rfc2459.ExtKeyUsageSyntax()
|
||||
eku = self.extended_key_usage or {EKU_PLACEHOLDER}
|
||||
for i, oid in enumerate(eku):
|
||||
ekurfc[i] = univ.ObjectIdentifier(oid)
|
||||
ekurfc = encoder.encode(ekurfc)
|
||||
return self.__encode_extension('2.5.29.37', EKU_ANY not in eku, ekurfc)
|
||||
|
||||
@property
|
||||
def san_general_names(self):
|
||||
"""
|
||||
Return SAN general names from a python-cryptography
|
||||
certificate object. If the SAN extension is not present,
|
||||
return an empty sequence.
|
||||
|
||||
Because python-cryptography does not yet provide a way to
|
||||
handle unrecognised critical extensions (which may occur),
|
||||
we must parse the certificate and extract the General Names.
|
||||
For uniformity with other code, we manually construct values
|
||||
of python-crytography GeneralName subtypes.
|
||||
|
||||
python-cryptography does not yet provide types for
|
||||
ediPartyName or x400Address, so we drop these name types.
|
||||
|
||||
otherNames are NOT instantiated to more specific types where
|
||||
the type is known. Use ``process_othernames`` to do that.
|
||||
|
||||
When python-cryptography can handle certs with unrecognised
|
||||
critical extensions and implements ediPartyName and
|
||||
x400Address, this function (and helpers) will be redundant
|
||||
and should go away.
|
||||
|
||||
"""
|
||||
gns = self.__pyasn1_get_san_general_names()
|
||||
|
||||
GENERAL_NAME_CONSTRUCTORS = {
|
||||
'rfc822Name': lambda x: crypto_x509.RFC822Name(unicode(x)),
|
||||
'dNSName': lambda x: crypto_x509.DNSName(unicode(x)),
|
||||
'directoryName': _pyasn1_to_cryptography_directoryname,
|
||||
'registeredID': _pyasn1_to_cryptography_registeredid,
|
||||
'iPAddress': _pyasn1_to_cryptography_ipaddress,
|
||||
'uniformResourceIdentifier':
|
||||
lambda x: crypto_x509.UniformResourceIdentifier(unicode(x)),
|
||||
'otherName': _pyasn1_to_cryptography_othername,
|
||||
}
|
||||
|
||||
result = []
|
||||
|
||||
for gn in gns:
|
||||
gn_type = gn.getName()
|
||||
if gn_type in GENERAL_NAME_CONSTRUCTORS:
|
||||
result.append(
|
||||
GENERAL_NAME_CONSTRUCTORS[gn_type](gn.getComponent()))
|
||||
|
||||
return result
|
||||
|
||||
def __pyasn1_get_san_general_names(self):
|
||||
# pyasn1 returns None when the key is not present in the certificate
|
||||
# but we need an iterable
|
||||
extensions = self.__get_pyasn1_field('extensions') or []
|
||||
OID_SAN = univ.ObjectIdentifier('2.5.29.17')
|
||||
gns = []
|
||||
for ext in extensions:
|
||||
if ext['extnID'] == OID_SAN:
|
||||
der = decoder.decode(
|
||||
ext['extnValue'], asn1Spec=univ.OctetString())[0]
|
||||
gns = decoder.decode(der, asn1Spec=rfc2459.SubjectAltName())[0]
|
||||
break
|
||||
return gns
|
||||
|
||||
@property
|
||||
def san_a_label_dns_names(self):
|
||||
gns = self.__pyasn1_get_san_general_names()
|
||||
result = []
|
||||
|
||||
for gn in gns:
|
||||
if gn.getName() == 'dNSName':
|
||||
result.append(unicode(gn.getComponent()))
|
||||
|
||||
return result
|
||||
|
||||
def match_hostname(self, hostname):
|
||||
match_cert = {}
|
||||
|
||||
match_cert['subject'] = match_subject = []
|
||||
for rdn in self._cert.subject.rdns:
|
||||
match_rdn = []
|
||||
for ava in rdn:
|
||||
if ava.oid == crypto_x509.oid.NameOID.COMMON_NAME:
|
||||
match_rdn.append(('commonName', ava.value))
|
||||
match_subject.append(match_rdn)
|
||||
|
||||
values = self.san_a_label_dns_names
|
||||
if values:
|
||||
match_cert['subjectAltName'] = match_san = []
|
||||
for value in values:
|
||||
match_san.append(('DNS', value))
|
||||
|
||||
ssl.match_hostname(match_cert, DNSName(hostname).ToASCII())
|
||||
|
||||
|
||||
def load_pem_x509_certificate(data):
|
||||
"""
|
||||
Load an X.509 certificate in PEM format.
|
||||
|
||||
:returns: a python-cryptography ``Certificate`` object.
|
||||
:returns: a ``IPACertificate`` object.
|
||||
:raises: ``ValueError`` if unable to load the certificate.
|
||||
"""
|
||||
return crypto_x509.load_pem_x509_certificate(data,
|
||||
backend=default_backend())
|
||||
return IPACertificate(
|
||||
crypto_x509.load_pem_x509_certificate(data, backend=default_backend())
|
||||
)
|
||||
|
||||
|
||||
def load_der_x509_certificate(data):
|
||||
"""
|
||||
Load an X.509 certificate in DER format.
|
||||
|
||||
:returns: a python-cryptography ``Certificate`` object.
|
||||
:returns: a ``IPACertificate`` object.
|
||||
:raises: ``ValueError`` if unable to load the certificate.
|
||||
"""
|
||||
return crypto_x509.load_der_x509_certificate(data,
|
||||
backend=default_backend())
|
||||
return IPACertificate(
|
||||
crypto_x509.load_der_x509_certificate(data, backend=default_backend())
|
||||
)
|
||||
|
||||
|
||||
def load_certificate_from_file(filename, dbdir=None):
|
||||
@@ -138,7 +436,6 @@ def load_certificate_list(data):
|
||||
Load a certificate list from a sequence of concatenated PEMs.
|
||||
|
||||
Return a list of python-cryptography ``Certificate`` objects.
|
||||
|
||||
"""
|
||||
certs = PEM_REGEX.findall(data)
|
||||
return [load_pem_x509_certificate(cert) for cert in certs]
|
||||
@@ -155,11 +452,11 @@ def load_certificate_list_from_file(filename):
|
||||
return load_certificate_list(f.read())
|
||||
|
||||
|
||||
def pkcs7_to_pems(data, datatype=PEM):
|
||||
def pkcs7_to_certs(data, datatype=PEM):
|
||||
"""
|
||||
Extract certificates from a PKCS #7 object.
|
||||
|
||||
Return a ``list`` of X.509 PEM strings.
|
||||
:returns: a ``list`` of ``IPACertificate`` objects.
|
||||
"""
|
||||
if datatype == PEM:
|
||||
match = re.match(
|
||||
@@ -187,62 +484,12 @@ def pkcs7_to_pems(data, datatype=PEM):
|
||||
|
||||
for certificate in signed_data['certificates']:
|
||||
certificate = encoder.encode(certificate)
|
||||
certificate = base64.b64encode(certificate)
|
||||
certificate = make_pem(certificate)
|
||||
certificate = load_der_x509_certificate(certificate)
|
||||
result.append(certificate)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def is_self_signed(certificate, datatype=PEM):
|
||||
cert = load_certificate(certificate, datatype)
|
||||
return cert.issuer == cert.subject
|
||||
|
||||
|
||||
def _get_der_field(cert, datatype, dbdir, field):
|
||||
cert = normalize_certificate(cert)
|
||||
cert = decoder.decode(cert, rfc2459.Certificate())[0]
|
||||
field = cert['tbsCertificate'][field]
|
||||
field = encoder.encode(field)
|
||||
return field
|
||||
|
||||
def get_der_subject(cert, datatype=PEM, dbdir=None):
|
||||
return _get_der_field(cert, datatype, dbdir, 'subject')
|
||||
|
||||
def get_der_issuer(cert, datatype=PEM, dbdir=None):
|
||||
return _get_der_field(cert, datatype, dbdir, 'issuer')
|
||||
|
||||
def get_der_serial_number(cert, datatype=PEM, dbdir=None):
|
||||
return _get_der_field(cert, datatype, dbdir, 'serialNumber')
|
||||
|
||||
def get_der_public_key_info(cert, datatype=PEM, dbdir=None):
|
||||
return _get_der_field(cert, datatype, dbdir, 'subjectPublicKeyInfo')
|
||||
|
||||
|
||||
def get_ext_key_usage(certificate, datatype=PEM):
|
||||
cert = load_certificate(certificate, datatype)
|
||||
try:
|
||||
eku = cert.extensions.get_extension_for_oid(
|
||||
cryptography.x509.oid.ExtensionOID.EXTENDED_KEY_USAGE).value
|
||||
except cryptography.x509.ExtensionNotFound:
|
||||
return None
|
||||
|
||||
return set(oid.dotted_string for oid in eku)
|
||||
|
||||
|
||||
def make_pem(data):
|
||||
"""
|
||||
Convert a raw base64-encoded blob into something that looks like a PE
|
||||
file with lines split to 64 characters and proper headers.
|
||||
"""
|
||||
if isinstance(data, bytes):
|
||||
data = data.decode('ascii')
|
||||
pemcert = '\r\n'.join([data[x:x+64] for x in range(0, len(data), 64)])
|
||||
return '-----BEGIN CERTIFICATE-----\n' + \
|
||||
pemcert + \
|
||||
'\n-----END CERTIFICATE-----'
|
||||
|
||||
|
||||
def ensure_der_format(rawcert):
|
||||
"""
|
||||
Incoming certificates should be DER-encoded. If not it is converted to
|
||||
@@ -254,8 +501,6 @@ def ensure_der_format(rawcert):
|
||||
if not rawcert:
|
||||
return None
|
||||
|
||||
rawcert = strip_header(rawcert)
|
||||
|
||||
try:
|
||||
if isinstance(rawcert, bytes):
|
||||
# base64 must work with utf-8, otherwise it is raw bin certificate
|
||||
@@ -299,58 +544,37 @@ def validate_der_x509_certificate(cert):
|
||||
raise errors.CertificateFormatError(error=str(e))
|
||||
|
||||
|
||||
def write_certificate(rawcert, filename):
|
||||
def write_certificate(cert, filename):
|
||||
"""
|
||||
Write the certificate to a file in PEM format.
|
||||
|
||||
The cert value can be either DER or PEM-encoded, it will be normalized
|
||||
to DER regardless, then back out to PEM.
|
||||
"""
|
||||
dercert = normalize_certificate(rawcert)
|
||||
|
||||
try:
|
||||
fp = open(filename, 'w')
|
||||
fp.write(make_pem(base64.b64encode(dercert)))
|
||||
fp.close()
|
||||
with open(filename, 'wb') as fp:
|
||||
fp.write(cert.public_bytes(Encoding.PEM))
|
||||
except (IOError, OSError) as e:
|
||||
raise errors.FileError(reason=str(e))
|
||||
|
||||
def write_certificate_list(rawcerts, filename):
|
||||
|
||||
def write_certificate_list(certs, filename):
|
||||
"""
|
||||
Write a list of certificates to a file in PEM format.
|
||||
|
||||
The cert values can be either DER or PEM-encoded, they will be normalized
|
||||
to DER regardless, then back out to PEM.
|
||||
:param certs: a list of IPACertificate objects to be written to a file
|
||||
:param filename: a path to the file the certificates should be written into
|
||||
"""
|
||||
dercerts = [normalize_certificate(rawcert) for rawcert in rawcerts]
|
||||
|
||||
try:
|
||||
with open(filename, 'w') as f:
|
||||
for cert in dercerts:
|
||||
cert = base64.b64encode(cert)
|
||||
cert = make_pem(cert)
|
||||
f.write(cert + '\n')
|
||||
with open(filename, 'wb') as f:
|
||||
for cert in certs:
|
||||
f.write(cert.public_bytes(Encoding.PEM))
|
||||
except (IOError, OSError) as e:
|
||||
raise errors.FileError(reason=str(e))
|
||||
|
||||
|
||||
def _encode_extension(oid, critical, value):
|
||||
ext = rfc2459.Extension()
|
||||
ext['extnID'] = univ.ObjectIdentifier(oid)
|
||||
ext['critical'] = univ.Boolean(critical)
|
||||
ext['extnValue'] = univ.Any(encoder.encode(univ.OctetString(value)))
|
||||
ext = encoder.encode(ext)
|
||||
return ext
|
||||
|
||||
|
||||
def encode_ext_key_usage(ext_key_usage):
|
||||
eku = rfc2459.ExtKeyUsageSyntax()
|
||||
for i, oid in enumerate(ext_key_usage):
|
||||
eku[i] = univ.ObjectIdentifier(oid)
|
||||
eku = encoder.encode(eku)
|
||||
return _encode_extension('2.5.29.37', EKU_ANY not in ext_key_usage, eku)
|
||||
|
||||
|
||||
class _PrincipalName(univ.Sequence):
|
||||
componentType = namedtype.NamedTypes(
|
||||
namedtype.NamedType('name-type', univ.Integer().subtype(
|
||||
@@ -385,13 +609,13 @@ def _decode_krb5principalname(data):
|
||||
return name
|
||||
|
||||
|
||||
class KRB5PrincipalName(cryptography.x509.general_name.OtherName):
|
||||
class KRB5PrincipalName(crypto_x509.general_name.OtherName):
|
||||
def __init__(self, type_id, value):
|
||||
super(KRB5PrincipalName, self).__init__(type_id, value)
|
||||
self.name = _decode_krb5principalname(value)
|
||||
|
||||
|
||||
class UPN(cryptography.x509.general_name.OtherName):
|
||||
class UPN(crypto_x509.general_name.OtherName):
|
||||
def __init__(self, type_id, value):
|
||||
super(UPN, self).__init__(type_id, value)
|
||||
self.name = unicode(
|
||||
@@ -411,128 +635,48 @@ def process_othernames(gns):
|
||||
|
||||
"""
|
||||
for gn in gns:
|
||||
if isinstance(gn, cryptography.x509.general_name.OtherName):
|
||||
if isinstance(gn, crypto_x509.general_name.OtherName):
|
||||
cls = OTHERNAME_CLASS_MAP.get(
|
||||
gn.type_id.dotted_string,
|
||||
cryptography.x509.general_name.OtherName)
|
||||
crypto_x509.general_name.OtherName)
|
||||
yield cls(gn.type_id, gn.value)
|
||||
else:
|
||||
yield gn
|
||||
|
||||
|
||||
def _pyasn1_get_san_general_names(cert):
|
||||
tbs = decoder.decode(
|
||||
cert.tbs_certificate_bytes,
|
||||
asn1Spec=rfc2459.TBSCertificate()
|
||||
)[0]
|
||||
OID_SAN = univ.ObjectIdentifier('2.5.29.17')
|
||||
# One would expect KeyError or empty iterable when the key ('extensions'
|
||||
# in this particular case) is not pressent in the certificate but pyasn1
|
||||
# returns None here
|
||||
extensions = tbs['extensions'] or []
|
||||
gns = []
|
||||
for ext in extensions:
|
||||
if ext['extnID'] == OID_SAN:
|
||||
der = decoder.decode(
|
||||
ext['extnValue'], asn1Spec=univ.OctetString())[0]
|
||||
gns = decoder.decode(der, asn1Spec=rfc2459.SubjectAltName())[0]
|
||||
break
|
||||
|
||||
return gns
|
||||
|
||||
|
||||
def get_san_general_names(cert):
|
||||
"""
|
||||
Return SAN general names from a python-cryptography
|
||||
certificate object. If the SAN extension is not present,
|
||||
return an empty sequence.
|
||||
|
||||
Because python-cryptography does not yet provide a way to
|
||||
handle unrecognised critical extensions (which may occur),
|
||||
we must parse the certificate and extract the General Names.
|
||||
For uniformity with other code, we manually construct values
|
||||
of python-crytography GeneralName subtypes.
|
||||
|
||||
python-cryptography does not yet provide types for
|
||||
ediPartyName or x400Address, so we drop these name types.
|
||||
|
||||
otherNames are NOT instantiated to more specific types where
|
||||
the type is known. Use ``process_othernames`` to do that.
|
||||
|
||||
When python-cryptography can handle certs with unrecognised
|
||||
critical extensions and implements ediPartyName and
|
||||
x400Address, this function (and helpers) will be redundant
|
||||
and should go away.
|
||||
|
||||
"""
|
||||
gns = _pyasn1_get_san_general_names(cert)
|
||||
|
||||
GENERAL_NAME_CONSTRUCTORS = {
|
||||
'rfc822Name': lambda x: cryptography.x509.RFC822Name(unicode(x)),
|
||||
'dNSName': lambda x: cryptography.x509.DNSName(unicode(x)),
|
||||
'directoryName': _pyasn1_to_cryptography_directoryname,
|
||||
'registeredID': _pyasn1_to_cryptography_registeredid,
|
||||
'iPAddress': _pyasn1_to_cryptography_ipaddress,
|
||||
'uniformResourceIdentifier':
|
||||
lambda x: cryptography.x509.UniformResourceIdentifier(unicode(x)),
|
||||
'otherName': _pyasn1_to_cryptography_othername,
|
||||
}
|
||||
|
||||
result = []
|
||||
|
||||
for gn in gns:
|
||||
gn_type = gn.getName()
|
||||
if gn_type in GENERAL_NAME_CONSTRUCTORS:
|
||||
result.append(
|
||||
GENERAL_NAME_CONSTRUCTORS[gn_type](gn.getComponent()))
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _pyasn1_to_cryptography_directoryname(dn):
|
||||
attrs = []
|
||||
|
||||
# Name is CHOICE { RDNSequence } (only one possibility)
|
||||
for rdn in dn.getComponent():
|
||||
for ava in rdn:
|
||||
attr = cryptography.x509.NameAttribute(
|
||||
attr = crypto_x509.NameAttribute(
|
||||
_pyasn1_to_cryptography_oid(ava['type']),
|
||||
unicode(decoder.decode(ava['value'])[0])
|
||||
)
|
||||
attrs.append(attr)
|
||||
|
||||
return cryptography.x509.DirectoryName(cryptography.x509.Name(attrs))
|
||||
return crypto_x509.DirectoryName(crypto_x509.Name(attrs))
|
||||
|
||||
|
||||
def _pyasn1_to_cryptography_registeredid(oid):
|
||||
return cryptography.x509.RegisteredID(_pyasn1_to_cryptography_oid(oid))
|
||||
return crypto_x509.RegisteredID(_pyasn1_to_cryptography_oid(oid))
|
||||
|
||||
|
||||
def _pyasn1_to_cryptography_ipaddress(octet_string):
|
||||
return cryptography.x509.IPAddress(
|
||||
return crypto_x509.IPAddress(
|
||||
ipaddress.ip_address(bytes(octet_string)))
|
||||
|
||||
|
||||
def _pyasn1_to_cryptography_othername(on):
|
||||
return cryptography.x509.OtherName(
|
||||
return crypto_x509.OtherName(
|
||||
_pyasn1_to_cryptography_oid(on['type-id']),
|
||||
bytes(on['value'])
|
||||
)
|
||||
|
||||
|
||||
def _pyasn1_to_cryptography_oid(oid):
|
||||
return cryptography.x509.ObjectIdentifier(str(oid))
|
||||
|
||||
|
||||
def get_san_a_label_dns_names(cert):
|
||||
gns = _pyasn1_get_san_general_names(cert)
|
||||
result = []
|
||||
|
||||
for gn in gns:
|
||||
if gn.getName() == 'dNSName':
|
||||
result.append(unicode(gn.getComponent()))
|
||||
|
||||
return result
|
||||
return crypto_x509.ObjectIdentifier(str(oid))
|
||||
|
||||
|
||||
def chunk(size, s):
|
||||
@@ -574,23 +718,3 @@ def format_datetime(t):
|
||||
if t.tzinfo is None:
|
||||
t = t.replace(tzinfo=UTC())
|
||||
return unicode(t.strftime("%a %b %d %H:%M:%S %Y %Z"))
|
||||
|
||||
|
||||
def match_hostname(cert, hostname):
|
||||
match_cert = {}
|
||||
|
||||
match_cert['subject'] = match_subject = []
|
||||
for rdn in cert.subject.rdns:
|
||||
match_rdn = []
|
||||
for ava in rdn:
|
||||
if ava.oid == cryptography.x509.oid.NameOID.COMMON_NAME:
|
||||
match_rdn.append(('commonName', ava.value))
|
||||
match_subject.append(match_rdn)
|
||||
|
||||
values = get_san_a_label_dns_names(cert)
|
||||
if values:
|
||||
match_cert['subjectAltName'] = match_san = []
|
||||
for value in values:
|
||||
match_san.append(('DNS', value))
|
||||
|
||||
ssl.match_hostname(match_cert, DNSName(hostname).ToASCII())
|
||||
|
||||
Reference in New Issue
Block a user