Support requests with SAN in cert-request.

For each SAN in a request there must be a matching service entry writable by
the requestor. Users can request certificates with SAN only if they have
"Request Certificate With SubjectAltName" permission.

https://fedorahosted.org/freeipa/ticket/3977

Reviewed-By: Martin Kosek <mkosek@redhat.com>
This commit is contained in:
Jan Cholasta 2014-06-18 09:02:03 +02:00 committed by Martin Kosek
parent e675e427c7
commit d6fb110b77
4 changed files with 193 additions and 43 deletions

View File

@ -299,7 +299,7 @@ Requires: gnupg
Requires: iproute Requires: iproute
Requires: keyutils Requires: keyutils
Requires: pyOpenSSL Requires: pyOpenSSL
Requires: python-nss Requires: python-nss >= 0.15
Requires: python-lxml Requires: python-lxml
Requires: python-netaddr Requires: python-netaddr
Requires: libipa_hbac-python Requires: libipa_hbac-python

View File

@ -308,6 +308,21 @@ default:objectClass: top
default:objectClass: nsContainer default:objectClass: nsContainer
default:cn: certificate remove hold default:cn: certificate remove hold
dn: cn=request certificate with subjectaltname,cn=virtual operations,cn=etc,$SUFFIX
default:objectClass: top
default:objectClass: nsContainer
default:cn: request certificate with subjectaltname
dn: cn=Request Certificate with SubjectAltName,cn=permissions,cn=pbac,$SUFFIX
default:objectClass: top
default:objectClass: groupofnames
default:objectClass: ipapermission
default:cn: Request Certificate with SubjectAltName
default:member: cn=Certificate Administrators,cn=privileges,cn=pbac,$SUFFIX
dn: $SUFFIX
add:aci:'(targetattr = "objectclass")(target = "ldap:///cn=request certificate with subjectaltname,cn=virtual operations,cn=etc,$SUFFIX" )(version 3.0; acl "permission:Request Certificate with SubjectAltName"; allow (write) groupdn = "ldap:///cn=Request Certificate with SubjectAltName,cn=permissions,cn=pbac,$SUFFIX";)'
# Read privileges # Read privileges
dn: cn=RBAC Readers,cn=privileges,cn=pbac,$SUFFIX dn: cn=RBAC Readers,cn=privileges,cn=pbac,$SUFFIX

View File

@ -21,7 +21,7 @@ import os
import sys import sys
import base64 import base64
import nss.nss as nss import nss.nss as nss
from pyasn1.type import univ, namedtype, tag from pyasn1.type import univ, char, namedtype, tag
from pyasn1.codec.der import decoder from pyasn1.codec.der import decoder
from ipapython import ipautil from ipapython import ipautil
from ipalib import api from ipalib import api
@ -29,6 +29,10 @@ from ipalib import api
PEM = 0 PEM = 0
DER = 1 DER = 1
SAN_DNSNAME = 'DNS name'
SAN_OTHERNAME_UPN = 'Other Name (OID.1.3.6.1.4.1.311.20.2.3)'
SAN_OTHERNAME_KRB5PRINCIPALNAME = 'Other Name (OID.1.3.6.1.5.2.2)'
def get_subject(csr, datatype=PEM): def get_subject(csr, datatype=PEM):
""" """
Given a CSR return the subject value. Given a CSR return the subject value.
@ -41,6 +45,89 @@ def get_subject(csr, datatype=PEM):
finally: finally:
del request del request
def get_extensions(csr, datatype=PEM):
"""
Given a CSR return OIDs of certificate extensions.
The return value is a tuple of strings
"""
request = load_certificate_request(csr, datatype)
return tuple(nss.oid_dotted_decimal(ext.oid_tag)[4:]
for ext in request.extensions)
class _PrincipalName(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('name-type', univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))
),
namedtype.NamedType('name-string', univ.SequenceOf(char.GeneralString()).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))
),
)
class _KRB5PrincipalName(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('realm', char.GeneralString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))
),
namedtype.NamedType('principalName', _PrincipalName().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))
),
)
def _decode_krb5principalname(data):
principal = decoder.decode(data, asn1Spec=_KRB5PrincipalName())[0]
realm = (str(principal['realm']).replace('\\', '\\\\')
.replace('@', '\\@'))
name = principal['principalName']['name-string']
name = '/'.join(str(n).replace('\\', '\\\\')
.replace('/', '\\/')
.replace('@', '\\@') for n in name)
name = '%s@%s' % (name, realm)
return name
class _AnotherName(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('type-id', univ.ObjectIdentifier()),
namedtype.NamedType('value', univ.Any().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))
),
)
class _GeneralName(univ.Choice):
componentType = namedtype.NamedTypes(
namedtype.NamedType('otherName', _AnotherName().subtype(
implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))
),
namedtype.NamedType('rfc822Name', char.IA5String().subtype(
implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))
),
namedtype.NamedType('dNSName', char.IA5String().subtype(
implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))
),
namedtype.NamedType('x400Address', univ.Sequence().subtype(
implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3))
),
namedtype.NamedType('directoryName', univ.Choice().subtype(
implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4))
),
namedtype.NamedType('ediPartyName', univ.Sequence().subtype(
implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 5))
),
namedtype.NamedType('uniformResourceIdentifier', char.IA5String().subtype(
implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 6))
),
namedtype.NamedType('iPAddress', univ.OctetString().subtype(
implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 7))
),
namedtype.NamedType('registeredID', univ.ObjectIdentifier().subtype(
implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 8))
),
)
class _SubjectAltName(univ.SequenceOf):
componentType = _GeneralName()
def get_subjectaltname(csr, datatype=PEM): def get_subjectaltname(csr, datatype=PEM):
""" """
Given a CSR return the subjectaltname value, if any. Given a CSR return the subjectaltname value, if any.
@ -48,13 +135,26 @@ def get_subjectaltname(csr, datatype=PEM):
The return value is a tuple of strings or None The return value is a tuple of strings or None
""" """
request = load_certificate_request(csr, datatype) request = load_certificate_request(csr, datatype)
try:
for extension in request.extensions: for extension in request.extensions:
if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME: if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
return nss.x509_alt_name(extension.value) break
finally: else:
del request
return None return None
del request
nss_names = nss.x509_alt_name(extension.value, nss.AsObject)
asn1_names = decoder.decode(extension.value.data,
asn1Spec=_SubjectAltName())[0]
names = []
for nss_name, asn1_name in zip(nss_names, asn1_names):
name_type = nss_name.type_string
if name_type == SAN_OTHERNAME_KRB5PRINCIPALNAME:
name = _decode_krb5principalname(asn1_name['otherName']['value'])
else:
name = nss_name.name
names.append((name_type, name))
return tuple(names)
# Unfortunately, NSS can only parse the extension request attribute, so # Unfortunately, NSS can only parse the extension request attribute, so
# we have to parse friendly name ourselves (see RFC 2986) # we have to parse friendly name ourselves (see RFC 2986)

View File

@ -42,6 +42,7 @@ from ipalib import output
from ipalib.plugins.service import validate_principal from ipalib.plugins.service import validate_principal
import nss.nss as nss import nss.nss as nss
from nss.error import NSPRError from nss.error import NSPRError
from pyasn1.error import PyAsn1Error
__doc__ = _(""" __doc__ = _("""
IPA certificate operations IPA certificate operations
@ -136,17 +137,6 @@ def validate_pkidate(ugettext, value):
return None return None
def get_csr_hostname(csr):
"""
Return the value of CN in the subject of the request or None
"""
try:
subject = pkcs10.get_subject(csr)
return subject.common_name #pylint: disable=E1101
except NSPRError, nsprerr:
raise errors.CertificateOperationError(
error=_('Failure decoding Certificate Signing Request: %s') % nsprerr)
def validate_csr(ugettext, csr): def validate_csr(ugettext, csr):
""" """
Ensure the CSR is base64-encoded and can be decoded by our PKCS#10 Ensure the CSR is base64-encoded and can be decoded by our PKCS#10
@ -290,6 +280,14 @@ class cert_request(VirtualCommand):
), ),
) )
_allowed_extensions = {
'2.5.29.14': None, # Subject Key Identifier
'2.5.29.15': None, # Key Usage
'2.5.29.17': 'request certificate with subjectaltname',
'2.5.29.19': None, # Basic Constraints
'2.5.29.37': None, # Extended Key Usage
}
def execute(self, csr, **kw): def execute(self, csr, **kw):
ldap = self.api.Backend.ldap2 ldap = self.api.Backend.ldap2
principal = kw.get('principal') principal = kw.get('principal')
@ -313,10 +311,22 @@ class cert_request(VirtualCommand):
if not bind_principal.startswith('host/'): if not bind_principal.startswith('host/'):
self.check_access() self.check_access()
# FIXME: add support for subject alt name try:
subject = pkcs10.get_subject(csr)
extensions = pkcs10.get_extensions(csr)
subjectaltname = pkcs10.get_subjectaltname(csr) or ()
except (NSPRError, PyAsn1Error), e:
raise errors.CertificateOperationError(
error=_("Failure decoding Certificate Signing Request: %s") % e)
if not bind_principal.startswith('host/'):
for ext in extensions:
operation = self._allowed_extensions.get(ext)
if operation:
self.check_access(operation)
# Ensure that the hostname in the CSR matches the principal # Ensure that the hostname in the CSR matches the principal
subject_host = get_csr_hostname(csr) subject_host = subject.common_name #pylint: disable=E1101
if not subject_host: if not subject_host:
raise errors.ValidationError(name='csr', raise errors.ValidationError(name='csr',
error=_("No hostname was found in subject of request.")) error=_("No hostname was found in subject of request."))
@ -328,28 +338,40 @@ class cert_request(VirtualCommand):
"does not match principal hostname '%(hostname)s'") % dict( "does not match principal hostname '%(hostname)s'") % dict(
subject_host=subject_host, hostname=hostname)) subject_host=subject_host, hostname=hostname))
for ext in extensions:
if ext not in self._allowed_extensions:
raise errors.ValidationError(
name='csr', error=_("extension %s is forbidden") % ext)
for name_type, name in subjectaltname:
if name_type not in (pkcs10.SAN_DNSNAME,
pkcs10.SAN_OTHERNAME_KRB5PRINCIPALNAME,
pkcs10.SAN_OTHERNAME_UPN):
raise errors.ValidationError(
name='csr',
error=_("subject alt name type %s is forbidden") %
name_type)
dn = None dn = None
service = None service = None
# See if the service exists and punt if it doesn't and we aren't # See if the service exists and punt if it doesn't and we aren't
# going to add it # going to add it
try: try:
if not principal.startswith('host/'): if servicename != 'host':
service = api.Command['service_show'](principal, all=True)['result'] service = api.Command['service_show'](principal, all=True)
dn = service['dn']
else: else:
hostname = get_host_from_principal(principal) service = api.Command['host_show'](hostname, all=True)
service = api.Command['host_show'](hostname, all=True)['result']
dn = service['dn']
except errors.NotFound, e: except errors.NotFound, e:
if not add: if not add:
raise errors.NotFound(reason=_("The service principal for " raise errors.NotFound(reason=_("The service principal for "
"this request doesn't exist.")) "this request doesn't exist."))
try: try:
service = api.Command['service_add'](principal, **{'force': True})['result'] service = api.Command['service_add'](principal, force=True)
dn = service['dn']
except errors.ACIError: except errors.ACIError:
raise errors.ACIError(info=_('You need to be a member of ' raise errors.ACIError(info=_('You need to be a member of '
'the serviceadmin role to add services')) 'the serviceadmin role to add services'))
service = service['result']
dn = service['dn']
# We got this far so the service entry exists, can we write it? # We got this far so the service entry exists, can we write it?
if not ldap.can_write(dn, "usercertificate"): if not ldap.can_write(dn, "usercertificate"):
@ -357,25 +379,38 @@ class cert_request(VirtualCommand):
"to the 'userCertificate' attribute of entry '%s'.") % dn) "to the 'userCertificate' attribute of entry '%s'.") % dn)
# Validate the subject alt name, if any # Validate the subject alt name, if any
subjectaltname = pkcs10.get_subjectaltname(csr) for name_type, name in subjectaltname:
if subjectaltname is not None: if name_type == pkcs10.SAN_DNSNAME:
for name in subjectaltname:
name = unicode(name) name = unicode(name)
try: try:
hostentry = api.Command['host_show'](name, all=True)['result'] if servicename == 'host':
hostdn = hostentry['dn'] altservice = api.Command['host_show'](name, all=True)
else:
altprincipal = '%s/%s@%s' % (servicename, name, realm)
altservice = api.Command['service_show'](
altprincipal, all=True)
except errors.NotFound: except errors.NotFound:
# We don't want to issue any certificates referencing # We don't want to issue any certificates referencing
# machines we don't know about. Nothing is stored in this # machines we don't know about. Nothing is stored in this
# host record related to this certificate. # host record related to this certificate.
raise errors.NotFound(reason=_('no host record for ' raise errors.NotFound(reason=_('The service principal for '
'subject alt name %s in certificate request') % name) 'subject alt name %s in certificate request does not '
authprincipal = getattr(context, 'principal') 'exist') % name)
if authprincipal.startswith("host/"): altdn = altservice['result']['dn']
if not hostdn in service.get('managedby_host', []): if not ldap.can_write(altdn, "usercertificate"):
raise errors.ACIError(info=_( raise errors.ACIError(info=_(
"Insufficient privilege to create a certificate " "Insufficient privilege to create a certificate with "
"with subject alt name '%s'.") % name) "subject alt name '%s'.") % name)
elif name_type in (pkcs10.SAN_OTHERNAME_KRB5PRINCIPALNAME,
pkcs10.SAN_OTHERNAME_UPN):
if name != principal:
raise errors.ACIError(
info=_("Principal '%s' in subject alt name does not "
"match requested service principal") % name)
else:
raise errors.ACIError(
info=_("Subject alt name type %s is forbidden") %
name_type)
if 'usercertificate' in service: if 'usercertificate' in service:
serial = x509.get_serial_number(service['usercertificate'][0], datatype=x509.DER) serial = x509.get_serial_number(service['usercertificate'][0], datatype=x509.DER)