mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
x509: use python-cryptography to process certs
Update x509.load_certificate and related functions to return python-cryptography ``Certificate`` objects. Update the call sites accordingly, including removal of NSS initialisation code. Also update GeneralName parsing code to return python-cryptography GeneralName values, for consistency with other code that processes GeneralNames. The new function, `get_san_general_names`, and associated helper functions, can be removed when python-cryptography provides a way to deal with unrecognised critical extensions. Part of: https://fedorahosted.org/freeipa/ticket/6398 Reviewed-By: Jan Cholasta <jcholast@redhat.com> Reviewed-By: Florence Blanc-Renaud <frenaud@redhat.com>
This commit is contained in:
committed by
David Kupka
parent
c57dc890b2
commit
db116f73fe
329
ipalib/x509.py
329
ipalib/x509.py
@@ -28,31 +28,27 @@
|
||||
#
|
||||
# cert: the certificate is a PEM-encoded certificate
|
||||
# dercert: the certificate is DER-encoded
|
||||
# nsscert: the certificate is an NSS Certificate object
|
||||
# rawcert: the cert is in an unknown format
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import binascii
|
||||
import collections
|
||||
import os
|
||||
import datetime
|
||||
import ipaddress
|
||||
import sys
|
||||
import base64
|
||||
import re
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
import cryptography.x509
|
||||
import nss.nss as nss
|
||||
from nss.error import NSPRError
|
||||
from pyasn1.type import univ, char, namedtype, tag
|
||||
from pyasn1.codec.der import decoder, encoder
|
||||
from pyasn1_modules import rfc2459
|
||||
import six
|
||||
|
||||
from ipalib import api
|
||||
from ipalib import _
|
||||
from ipalib import util
|
||||
from ipalib import errors
|
||||
from ipaplatform.paths import paths
|
||||
from ipapython.dn import DN
|
||||
|
||||
if six.PY3:
|
||||
@@ -95,32 +91,16 @@ def strip_header(pem):
|
||||
|
||||
return pem
|
||||
|
||||
def initialize_nss_database(dbdir=None):
|
||||
"""
|
||||
Initializes NSS database, if not initialized yet. Uses a proper database
|
||||
directory (.ipa/alias or HTTPD_ALIAS_DIR), depending on the value of
|
||||
api.env.in_tree.
|
||||
"""
|
||||
|
||||
if not nss.nss_is_initialized():
|
||||
if dbdir is None:
|
||||
if 'in_tree' in api.env:
|
||||
if api.env.in_tree:
|
||||
dbdir = api.env.dot_ipa + os.sep + 'alias'
|
||||
else:
|
||||
dbdir = paths.HTTPD_ALIAS_DIR
|
||||
nss.nss_init(dbdir)
|
||||
else:
|
||||
nss.nss_init_nodb()
|
||||
else:
|
||||
nss.nss_init(dbdir)
|
||||
|
||||
def load_certificate(data, datatype=PEM, dbdir=None):
|
||||
def load_certificate(data, datatype=PEM):
|
||||
"""
|
||||
Given a base64-encoded certificate, with or without the
|
||||
header/footer, return a request object.
|
||||
Load an X.509 certificate.
|
||||
|
||||
:param datatype: PEM for base64-encoded data (with or without header),
|
||||
or DER
|
||||
:return: a python-cryptography ``CertificateSigningRequest`` object.
|
||||
:raises: ``ValueError`` if unable to load the certificate.
|
||||
|
||||
Returns a nss.Certificate type
|
||||
"""
|
||||
if type(data) in (tuple, list):
|
||||
data = data[0]
|
||||
@@ -129,82 +109,50 @@ def load_certificate(data, datatype=PEM, dbdir=None):
|
||||
data = strip_header(data)
|
||||
data = base64.b64decode(data)
|
||||
|
||||
initialize_nss_database(dbdir=dbdir)
|
||||
return cryptography.x509.load_der_x509_certificate(data, default_backend())
|
||||
|
||||
if six.PY2:
|
||||
return nss.Certificate(buffer(data)) # pylint: disable=buffer-builtin
|
||||
else:
|
||||
# In python 3 , `bytes` has the buffer interface
|
||||
return nss.Certificate(data)
|
||||
|
||||
def load_certificate_from_file(filename, dbdir=None):
|
||||
"""
|
||||
Load a certificate from a PEM file.
|
||||
|
||||
Returns a nss.Certificate type
|
||||
Returns a python-cryptography ``Certificate`` object.
|
||||
|
||||
"""
|
||||
fd = open(filename, 'r')
|
||||
data = fd.read()
|
||||
fd.close()
|
||||
with open(filename, mode='rb') as f:
|
||||
return load_certificate(f.read(), PEM)
|
||||
|
||||
return load_certificate(data, PEM, dbdir)
|
||||
|
||||
def load_certificate_list(data, dbdir=None):
|
||||
def load_certificate_list(data):
|
||||
"""
|
||||
Load a certificate list from a sequence of concatenated PEMs.
|
||||
|
||||
Return a list of python-cryptography ``Certificate`` objects.
|
||||
|
||||
"""
|
||||
certs = PEM_REGEX.findall(data)
|
||||
certs = [load_certificate(cert, PEM, dbdir) for cert in certs]
|
||||
certs = [load_certificate(cert, PEM) for cert in certs]
|
||||
return certs
|
||||
|
||||
def load_certificate_list_from_file(filename, dbdir=None):
|
||||
|
||||
def load_certificate_list_from_file(filename):
|
||||
"""
|
||||
Load a certificate list from a PEM file.
|
||||
|
||||
Returns a list of nss.Certificate objects.
|
||||
"""
|
||||
fd = open(filename, 'r')
|
||||
data = fd.read()
|
||||
fd.close()
|
||||
Return a list of python-cryptography ``Certificate`` objects.
|
||||
|
||||
return load_certificate_list(data, dbdir)
|
||||
|
||||
def get_subject(certificate, datatype=PEM, dbdir=None):
|
||||
"""
|
||||
Load an X509.3 certificate and get the subject.
|
||||
"""
|
||||
with open(filename) as f:
|
||||
return load_certificate_list(f.read())
|
||||
|
||||
nsscert = load_certificate(certificate, datatype, dbdir)
|
||||
subject = nsscert.subject
|
||||
del(nsscert)
|
||||
return subject
|
||||
|
||||
def get_issuer(certificate, datatype=PEM, dbdir=None):
|
||||
"""
|
||||
Load an X509.3 certificate and get the issuer.
|
||||
"""
|
||||
|
||||
nsscert = load_certificate(certificate, datatype, dbdir)
|
||||
issuer = nsscert.issuer
|
||||
del(nsscert)
|
||||
return issuer
|
||||
|
||||
def get_serial_number(certificate, datatype=PEM, dbdir=None):
|
||||
"""
|
||||
Return the decimal value of the serial number.
|
||||
"""
|
||||
nsscert = load_certificate(certificate, datatype, dbdir)
|
||||
serial_number = nsscert.serial_number
|
||||
del(nsscert)
|
||||
return serial_number
|
||||
|
||||
def is_self_signed(certificate, datatype=PEM, dbdir=None):
|
||||
nsscert = load_certificate(certificate, datatype, dbdir)
|
||||
self_signed = (nsscert.issuer == nsscert.subject)
|
||||
del nsscert
|
||||
return self_signed
|
||||
def is_self_signed(certificate, datatype=PEM):
|
||||
cert = load_certificate(certificate, datatype)
|
||||
return cert.issuer == cert.subject
|
||||
|
||||
|
||||
def _get_der_field(cert, datatype, dbdir, field):
|
||||
cert = load_certificate(cert, datatype, dbdir)
|
||||
cert = cert.der_data
|
||||
cert = normalize_certificate(cert)
|
||||
cert = decoder.decode(cert, rfc2459.Certificate())[0]
|
||||
field = cert['tbsCertificate'][field]
|
||||
field = encoder.encode(field)
|
||||
@@ -222,20 +170,17 @@ def get_der_serial_number(cert, datatype=PEM, dbdir=None):
|
||||
def get_der_public_key_info(cert, datatype=PEM, dbdir=None):
|
||||
return _get_der_field(cert, datatype, dbdir, 'subjectPublicKeyInfo')
|
||||
|
||||
def get_ext_key_usage(certificate, datatype=PEM, dbdir=None):
|
||||
nsscert = load_certificate(certificate, datatype, dbdir)
|
||||
if not nsscert.extensions:
|
||||
|
||||
def get_ext_key_usage(certificate, datatype=PEM):
|
||||
cert = load_certificate(certificate, datatype)
|
||||
try:
|
||||
eku = cert.extensions.get_extension_for_oid(
|
||||
cryptography.x509.oid.ExtensionOID.EXTENDED_KEY_USAGE).value
|
||||
except cryptography.x509.ExtensionNotFound:
|
||||
return None
|
||||
|
||||
for ext in nsscert.extensions:
|
||||
if ext.oid_tag == nss.SEC_OID_X509_EXT_KEY_USAGE:
|
||||
break
|
||||
else:
|
||||
return None
|
||||
return set(oid.dotted_string for oid in eku)
|
||||
|
||||
eku = nss.x509_ext_key_usage(ext.value, nss.AsDottedDecimal)
|
||||
eku = set(o[4:] for o in eku)
|
||||
return eku
|
||||
|
||||
def make_pem(data):
|
||||
"""
|
||||
@@ -270,27 +215,21 @@ def normalize_certificate(rawcert):
|
||||
else:
|
||||
dercert = rawcert
|
||||
|
||||
# At this point we should have a certificate, either because the data
|
||||
# was base64-encoded and now its not or it came in as DER format.
|
||||
# Let's decode it and see. Fetching the serial number will pass the
|
||||
# certificate through the NSS DER parser.
|
||||
# At this point we should have a DER certificate.
|
||||
# Attempt to decode it.
|
||||
validate_certificate(dercert, datatype=DER)
|
||||
|
||||
return dercert
|
||||
|
||||
|
||||
def validate_certificate(cert, datatype=PEM, dbdir=None):
|
||||
def validate_certificate(cert, datatype=PEM):
|
||||
"""
|
||||
Perform certificate validation by trying to load it into NSS database
|
||||
Perform cert validation by trying to load it via python-cryptography.
|
||||
"""
|
||||
try:
|
||||
load_certificate(cert, datatype=datatype, dbdir=dbdir)
|
||||
except NSPRError as nsprerr:
|
||||
if nsprerr.errno == -8183: # SEC_ERROR_BAD_DER
|
||||
raise errors.CertificateFormatError(
|
||||
error=_('improperly formatted DER-encoded certificate'))
|
||||
else:
|
||||
raise errors.CertificateFormatError(error=str(nsprerr))
|
||||
load_certificate(cert, datatype=datatype)
|
||||
except ValueError as e:
|
||||
raise errors.CertificateFormatError(error=str(e))
|
||||
|
||||
|
||||
def write_certificate(rawcert, filename):
|
||||
@@ -379,56 +318,6 @@ def _decode_krb5principalname(data):
|
||||
return name
|
||||
|
||||
|
||||
GeneralNameInfo = collections.namedtuple(
|
||||
'GeneralNameInfo', ('type', 'desc', 'value', 'der_value'))
|
||||
|
||||
|
||||
def decode_generalnames(secitem):
|
||||
"""
|
||||
Decode a GeneralNames object (this the data for the Subject
|
||||
Alt Name and Issuer Alt Name extensions, among others).
|
||||
|
||||
``secitem``
|
||||
The input is the DER-encoded extension data, without the
|
||||
OCTET STRING header, as an nss SecItem object.
|
||||
|
||||
Return a list of ``GeneralNameInfo`` namedtuples. The
|
||||
``der_value`` field is set for otherNames, otherwise it is
|
||||
``None``.
|
||||
|
||||
"""
|
||||
nss_names = nss.x509_alt_name(secitem, repr_kind=nss.AsObject)
|
||||
asn1_names = decoder.decode(
|
||||
secitem.data, asn1Spec=rfc2459.SubjectAltName())[0]
|
||||
names = []
|
||||
for nss_name, asn1_name in zip(nss_names, asn1_names):
|
||||
# NOTE: we use the NSS enum to identify the name type.
|
||||
# (For otherName we also tuple it up with the type-id OID).
|
||||
# The enum does not correspond exactly to the ASN.1 tags.
|
||||
# If we ever want to switch to using the true tag numbers,
|
||||
# the expression to get the tag is:
|
||||
#
|
||||
# asn1_name.getComponent().getTagSet()[0].asTuple()[2]
|
||||
#
|
||||
if nss_name.type_enum == nss.certOtherName:
|
||||
oid = str(asn1_name['otherName']['type-id'])
|
||||
nametype = (nss_name.type_enum, oid)
|
||||
der_value = asn1_name['otherName']['value'].asOctets()
|
||||
else:
|
||||
nametype = nss_name.type_enum
|
||||
der_value = None
|
||||
|
||||
if nametype == (nss.certOtherName, SAN_KRB5PRINCIPALNAME):
|
||||
name = _decode_krb5principalname(asn1_name['otherName']['value'])
|
||||
else:
|
||||
name = nss_name.name
|
||||
|
||||
gni = GeneralNameInfo(nametype, nss_name.type_string, name, der_value)
|
||||
names.append(gni)
|
||||
|
||||
return names
|
||||
|
||||
|
||||
class KRB5PrincipalName(cryptography.x509.general_name.OtherName):
|
||||
def __init__(self, type_id, value):
|
||||
super(KRB5PrincipalName, self).__init__(type_id, value)
|
||||
@@ -464,6 +353,100 @@ def process_othernames(gns):
|
||||
yield gn
|
||||
|
||||
|
||||
def get_san_general_names(cert):
|
||||
"""
|
||||
Return SAN general names from a python-cryptography
|
||||
certificate object. If the SAN extension is not present,
|
||||
return an empty sequence.
|
||||
|
||||
Because python-cryptography does not yet provide a way to
|
||||
handle unrecognised critical extensions (which may occur),
|
||||
we must parse the certificate and extract the General Names.
|
||||
For uniformity with other code, we manually construct values
|
||||
of python-crytography GeneralName subtypes.
|
||||
|
||||
python-cryptography does not yet provide types for
|
||||
ediPartyName or x400Address, so we drop these name types.
|
||||
|
||||
otherNames are NOT instantiated to more specific types where
|
||||
the type is known. Use ``process_othernames`` to do that.
|
||||
|
||||
When python-cryptography can handle certs with unrecognised
|
||||
critical extensions and implements ediPartyName and
|
||||
x400Address, this function (and helpers) will be redundant
|
||||
and should go away.
|
||||
|
||||
"""
|
||||
tbs = decoder.decode(
|
||||
cert.tbs_certificate_bytes,
|
||||
asn1Spec=rfc2459.TBSCertificate()
|
||||
)[0]
|
||||
OID_SAN = univ.ObjectIdentifier('2.5.29.17')
|
||||
gns = []
|
||||
for ext in tbs['extensions']:
|
||||
if ext['extnID'] == OID_SAN:
|
||||
der = decoder.decode(
|
||||
ext['extnValue'], asn1Spec=univ.OctetString())[0]
|
||||
gns = decoder.decode(der, asn1Spec=rfc2459.SubjectAltName())[0]
|
||||
break
|
||||
|
||||
GENERAL_NAME_CONSTRUCTORS = {
|
||||
'rfc822Name': lambda x: cryptography.x509.RFC822Name(unicode(x)),
|
||||
'dNSName': lambda x: cryptography.x509.DNSName(unicode(x)),
|
||||
'directoryName': _pyasn1_to_cryptography_directoryname,
|
||||
'registeredID': _pyasn1_to_cryptography_registeredid,
|
||||
'iPAddress': _pyasn1_to_cryptography_ipaddress,
|
||||
'uniformResourceIdentifier':
|
||||
lambda x: cryptography.x509.UniformResourceIdentifier(unicode(x)),
|
||||
'otherName': _pyasn1_to_cryptography_othername,
|
||||
}
|
||||
|
||||
result = []
|
||||
|
||||
for gn in gns:
|
||||
gn_type = gn.getName()
|
||||
if gn_type in GENERAL_NAME_CONSTRUCTORS:
|
||||
result.append(
|
||||
GENERAL_NAME_CONSTRUCTORS[gn_type](gn.getComponent()))
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _pyasn1_to_cryptography_directoryname(dn):
|
||||
attrs = []
|
||||
|
||||
# Name is CHOICE { RDNSequence } (only one possibility)
|
||||
for rdn in dn.getComponent():
|
||||
for ava in rdn:
|
||||
attr = cryptography.x509.NameAttribute(
|
||||
_pyasn1_to_cryptography_oid(ava['type']),
|
||||
unicode(decoder.decode(ava['value'])[0])
|
||||
)
|
||||
attrs.append(attr)
|
||||
|
||||
return cryptography.x509.DirectoryName(cryptography.x509.Name(attrs))
|
||||
|
||||
|
||||
def _pyasn1_to_cryptography_registeredid(oid):
|
||||
return cryptography.x509.RegisteredID(_pyasn1_to_cryptography_oid(oid))
|
||||
|
||||
|
||||
def _pyasn1_to_cryptography_ipaddress(octet_string):
|
||||
return cryptography.x509.IPAddress(
|
||||
ipaddress.ip_address(bytes(octet_string)))
|
||||
|
||||
|
||||
def _pyasn1_to_cryptography_othername(on):
|
||||
return cryptography.x509.OtherName(
|
||||
_pyasn1_to_cryptography_oid(on['type-id']),
|
||||
bytes(on['value'])
|
||||
)
|
||||
|
||||
|
||||
def _pyasn1_to_cryptography_oid(oid):
|
||||
return cryptography.x509.ObjectIdentifier(str(oid))
|
||||
|
||||
|
||||
def chunk(size, s):
|
||||
"""Yield chunks of the specified size from the given string.
|
||||
|
||||
@@ -486,20 +469,34 @@ def to_hex_with_colons(bs):
|
||||
return add_colons(binascii.hexlify(bs).decode('utf-8'))
|
||||
|
||||
|
||||
class UTC(datetime.tzinfo):
|
||||
ZERO = datetime.timedelta(0)
|
||||
|
||||
def tzname(self, dt):
|
||||
return "UTC"
|
||||
|
||||
def utcoffset(self, dt):
|
||||
return self.ZERO
|
||||
|
||||
def dst(self, dt):
|
||||
return self.ZERO
|
||||
|
||||
|
||||
def format_datetime(t):
|
||||
if t.tzinfo is None:
|
||||
t = t.replace(tzinfo=UTC())
|
||||
return unicode(t.strftime("%a %b %d %H:%M:%S %Y %Z"))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# this can be run with:
|
||||
# python ipalib/x509.py < /etc/ipa/ca.crt
|
||||
|
||||
api.bootstrap()
|
||||
api.finalize()
|
||||
|
||||
nss.nss_init_nodb()
|
||||
|
||||
# Read PEM certs from stdin and print out its components
|
||||
# Read PEM cert from stdin and print out its components
|
||||
|
||||
certlines = sys.stdin.readlines()
|
||||
cert = ''.join(certlines)
|
||||
|
||||
nsscert = load_certificate(cert)
|
||||
cert = load_certificate(cert)
|
||||
|
||||
print(nsscert)
|
||||
print(cert)
|
||||
|
||||
Reference in New Issue
Block a user