mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Update cert-request to support user certs and profiles
Part of: https://fedorahosted.org/freeipa/ticket/57 Part of: https://fedorahosted.org/freeipa/ticket/4938 Reviewed-By: Martin Basti <mbasti@redhat.com>
This commit is contained in:
committed by
Jan Cholasta
parent
979947f7f2
commit
a931d3edc0
@@ -30,6 +30,7 @@ PEM = 0
|
||||
DER = 1
|
||||
|
||||
SAN_DNSNAME = 'DNS name'
|
||||
SAN_RFC822NAME = 'RFC822 Name'
|
||||
SAN_OTHERNAME_UPN = 'Other Name (OID.1.3.6.1.4.1.311.20.2.3)'
|
||||
SAN_OTHERNAME_KRB5PRINCIPALNAME = 'Other Name (OID.1.3.6.1.5.2.2)'
|
||||
|
||||
|
||||
@@ -31,7 +31,8 @@ from ipalib import ngettext
|
||||
from ipalib.plugable import Registry
|
||||
from ipalib.plugins.virtual import *
|
||||
from ipalib.plugins.baseldap import pkey_to_value
|
||||
from ipalib.plugins.service import split_principal
|
||||
from ipalib.plugins.service import split_any_principal
|
||||
from ipalib.plugins.certprofile import validate_profile_id
|
||||
import base64
|
||||
import traceback
|
||||
from ipalib.text import _
|
||||
@@ -122,6 +123,8 @@ http://www.ietf.org/rfc/rfc5280.txt
|
||||
|
||||
""")
|
||||
|
||||
USER, HOST, SERVICE = range(3)
|
||||
|
||||
register = Registry()
|
||||
|
||||
def validate_pkidate(ugettext, value):
|
||||
@@ -232,7 +235,7 @@ class cert_request(VirtualCommand):
|
||||
takes_options = (
|
||||
Str('principal',
|
||||
label=_('Principal'),
|
||||
doc=_('Service principal for this certificate (e.g. HTTP/test.example.com)'),
|
||||
doc=_('Principal for this certificate (e.g. HTTP/test.example.com)'),
|
||||
),
|
||||
Str('request_type',
|
||||
default=u'pkcs10',
|
||||
@@ -243,6 +246,10 @@ class cert_request(VirtualCommand):
|
||||
default=False,
|
||||
autofill=True
|
||||
),
|
||||
Str('profile_id?', validate_profile_id,
|
||||
label=_("Profile ID"),
|
||||
doc=_("Certificate Profile to use"),
|
||||
)
|
||||
)
|
||||
|
||||
has_output_params = (
|
||||
@@ -294,10 +301,9 @@ class cert_request(VirtualCommand):
|
||||
ca_enabled_check()
|
||||
|
||||
ldap = self.api.Backend.ldap2
|
||||
principal = kw.get('principal')
|
||||
add = kw.get('add')
|
||||
request_type = kw.get('request_type')
|
||||
service = None
|
||||
profile_id = kw.get('profile_id', self.Backend.ra.DEFAULT_PROFILE)
|
||||
|
||||
"""
|
||||
Access control is partially handled by the ACI titled
|
||||
@@ -310,9 +316,28 @@ class cert_request(VirtualCommand):
|
||||
taskgroup (directly or indirectly via role membership).
|
||||
"""
|
||||
|
||||
bind_principal = getattr(context, 'principal')
|
||||
# Can this user request certs?
|
||||
if not bind_principal.startswith('host/'):
|
||||
principal_string = kw.get('principal')
|
||||
principal = split_any_principal(principal_string)
|
||||
servicename, principal_name, realm = principal
|
||||
if servicename is None:
|
||||
principal_type = USER
|
||||
elif servicename == 'host':
|
||||
principal_type = HOST
|
||||
else:
|
||||
principal_type = SERVICE
|
||||
|
||||
bind_principal = split_any_principal(getattr(context, 'principal'))
|
||||
bind_service, bind_name, bind_realm = bind_principal
|
||||
|
||||
if bind_service is None:
|
||||
bind_principal_type = USER
|
||||
elif bind_service == 'host':
|
||||
bind_principal_type = HOST
|
||||
else:
|
||||
bind_principal_type = SERVICE
|
||||
|
||||
if bind_principal != principal and bind_principal_type != HOST:
|
||||
# Can the bound principal request certs for another principal?
|
||||
self.check_access()
|
||||
|
||||
try:
|
||||
@@ -323,57 +348,71 @@ class cert_request(VirtualCommand):
|
||||
raise errors.CertificateOperationError(
|
||||
error=_("Failure decoding Certificate Signing Request: %s") % e)
|
||||
|
||||
if not bind_principal.startswith('host/'):
|
||||
# host principals may bypass allowed ext check
|
||||
if bind_principal_type != HOST:
|
||||
for ext in extensions:
|
||||
operation = self._allowed_extensions.get(ext)
|
||||
if operation:
|
||||
self.check_access(operation)
|
||||
|
||||
# Ensure that the hostname in the CSR matches the principal
|
||||
subject_host = subject.common_name #pylint: disable=E1101
|
||||
if not subject_host:
|
||||
raise errors.ValidationError(name='csr',
|
||||
error=_("No hostname was found in subject of request."))
|
||||
dn = None
|
||||
principal_obj = None
|
||||
# See if the service exists and punt if it doesn't and we aren't
|
||||
# going to add it
|
||||
try:
|
||||
if principal_type == SERVICE:
|
||||
principal_obj = api.Command['service_show'](principal_string, all=True)
|
||||
elif principal_type == HOST:
|
||||
principal_obj = api.Command['host_show'](principal_name, all=True)
|
||||
elif principal_type == USER:
|
||||
principal_obj = api.Command['user_show'](principal_name, all=True)
|
||||
except errors.NotFound as e:
|
||||
if principal_type == SERVICE and add:
|
||||
principal_obj = api.Command['service_add'](principal_string, force=True)
|
||||
else:
|
||||
raise errors.NotFound(
|
||||
reason=_("The principal for this request doesn't exist."))
|
||||
principal_obj = principal_obj['result']
|
||||
dn = principal_obj['dn']
|
||||
|
||||
(servicename, hostname, realm) = split_principal(principal)
|
||||
if subject_host.lower() != hostname.lower():
|
||||
raise errors.ACIError(
|
||||
info=_("hostname in subject of request '%(subject_host)s' "
|
||||
"does not match principal hostname '%(hostname)s'") % dict(
|
||||
subject_host=subject_host, hostname=hostname))
|
||||
# Ensure that the DN in the CSR matches the principal
|
||||
cn = subject.common_name #pylint: disable=E1101
|
||||
if not cn:
|
||||
raise errors.ValidationError(name='csr',
|
||||
error=_("No Common Name was found in subject of request."))
|
||||
|
||||
if principal_type in (SERVICE, HOST):
|
||||
if cn.lower() != principal_name.lower():
|
||||
raise errors.ACIError(
|
||||
info=_("hostname in subject of request '%(cn)s' "
|
||||
"does not match principal hostname '%(hostname)s'")
|
||||
% dict(cn=cn, hostname=principal_name))
|
||||
elif principal_type == USER:
|
||||
# check user name
|
||||
if cn != principal_name:
|
||||
raise errors.ValidationError(
|
||||
name='csr',
|
||||
error=_(
|
||||
"DN commonName does not match "
|
||||
"any of user's email addresses")
|
||||
)
|
||||
|
||||
# check email address
|
||||
mail = subject.email_address #pylint: disable=E1101
|
||||
if mail is not None and mail not in principal_obj.get('mail', []):
|
||||
raise errors.ValidationError(
|
||||
name='csr',
|
||||
error=_(
|
||||
"DN emailAddress does not match "
|
||||
"any of user's email addresses")
|
||||
)
|
||||
|
||||
for ext in extensions:
|
||||
if ext not in self._allowed_extensions:
|
||||
raise errors.ValidationError(
|
||||
name='csr', error=_("extension %s is forbidden") % ext)
|
||||
|
||||
for name_type, name in subjectaltname:
|
||||
if name_type not in (pkcs10.SAN_DNSNAME,
|
||||
pkcs10.SAN_OTHERNAME_KRB5PRINCIPALNAME,
|
||||
pkcs10.SAN_OTHERNAME_UPN):
|
||||
raise errors.ValidationError(
|
||||
name='csr',
|
||||
error=_("subject alt name type %s is forbidden") %
|
||||
name_type)
|
||||
|
||||
dn = None
|
||||
service = None
|
||||
# See if the service exists and punt if it doesn't and we aren't
|
||||
# going to add it
|
||||
try:
|
||||
if servicename != 'host':
|
||||
service = api.Command['service_show'](principal, all=True)
|
||||
else:
|
||||
service = api.Command['host_show'](hostname, all=True)
|
||||
except errors.NotFound, e:
|
||||
if not add:
|
||||
raise errors.NotFound(reason=_("The service principal for "
|
||||
"this request doesn't exist."))
|
||||
service = api.Command['service_add'](principal, force=True)
|
||||
service = service['result']
|
||||
dn = service['dn']
|
||||
|
||||
# We got this far so the service entry exists, can we write it?
|
||||
# We got this far so the principal entry exists, can we write it?
|
||||
if not ldap.can_write(dn, "usercertificate"):
|
||||
raise errors.ACIError(info=_("Insufficient 'write' privilege "
|
||||
"to the 'userCertificate' attribute of entry '%s'.") % dn)
|
||||
@@ -382,13 +421,20 @@ class cert_request(VirtualCommand):
|
||||
for name_type, name in subjectaltname:
|
||||
if name_type == pkcs10.SAN_DNSNAME:
|
||||
name = unicode(name)
|
||||
alt_principal_obj = None
|
||||
try:
|
||||
if servicename == 'host':
|
||||
altservice = api.Command['host_show'](name, all=True)
|
||||
else:
|
||||
if principal_type == HOST:
|
||||
alt_principal_obj = api.Command['host_show'](name, all=True)
|
||||
elif principal_type == SERVICE:
|
||||
altprincipal = '%s/%s@%s' % (servicename, name, realm)
|
||||
altservice = api.Command['service_show'](
|
||||
alt_principal_obj = api.Command['service_show'](
|
||||
altprincipal, all=True)
|
||||
elif principal_type == USER:
|
||||
raise errors.ValidationError(
|
||||
name='csr',
|
||||
error=_("subject alt name type %s is forbidden "
|
||||
"for user principals") % name_type
|
||||
)
|
||||
except errors.NotFound:
|
||||
# We don't want to issue any certificates referencing
|
||||
# machines we don't know about. Nothing is stored in this
|
||||
@@ -396,47 +442,41 @@ class cert_request(VirtualCommand):
|
||||
raise errors.NotFound(reason=_('The service principal for '
|
||||
'subject alt name %s in certificate request does not '
|
||||
'exist') % name)
|
||||
altdn = altservice['result']['dn']
|
||||
if not ldap.can_write(altdn, "usercertificate"):
|
||||
raise errors.ACIError(info=_(
|
||||
"Insufficient privilege to create a certificate with "
|
||||
"subject alt name '%s'.") % name)
|
||||
if alt_principal_obj is not None:
|
||||
altdn = alt_principal_obj['result']['dn']
|
||||
if not ldap.can_write(altdn, "usercertificate"):
|
||||
raise errors.ACIError(info=_(
|
||||
"Insufficient privilege to create a certificate "
|
||||
"with subject alt name '%s'.") % name)
|
||||
elif name_type in (pkcs10.SAN_OTHERNAME_KRB5PRINCIPALNAME,
|
||||
pkcs10.SAN_OTHERNAME_UPN):
|
||||
if name != principal:
|
||||
if name != principal_string:
|
||||
raise errors.ACIError(
|
||||
info=_("Principal '%s' in subject alt name does not "
|
||||
"match requested service principal") % name)
|
||||
"match requested principal") % name)
|
||||
elif name_type == pkcs10.SAN_RFC822NAME:
|
||||
if principal_type == USER:
|
||||
if name not in principal_obj.get('mail', []):
|
||||
raise errors.ValidationError(
|
||||
name='csr',
|
||||
error=_(
|
||||
"RFC822Name does not match "
|
||||
"any of user's email addresses")
|
||||
)
|
||||
else:
|
||||
raise errors.ValidationError(
|
||||
name='csr',
|
||||
error=_("subject alt name type %s is forbidden "
|
||||
"for non-user principals") % name_type
|
||||
)
|
||||
else:
|
||||
raise errors.ACIError(
|
||||
info=_("Subject alt name type %s is forbidden") %
|
||||
name_type)
|
||||
|
||||
if 'usercertificate' in service:
|
||||
serial = x509.get_serial_number(service['usercertificate'][0], datatype=x509.DER)
|
||||
# revoke the certificate and remove it from the service
|
||||
# entry before proceeding. First we retrieve the certificate to
|
||||
# see if it is already revoked, if not then we revoke it.
|
||||
try:
|
||||
result = api.Command['cert_show'](unicode(serial))['result']
|
||||
if 'revocation_reason' not in result:
|
||||
try:
|
||||
api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement revoke
|
||||
pass
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement get
|
||||
pass
|
||||
if not principal.startswith('host/'):
|
||||
api.Command['service_mod'](principal, usercertificate=None)
|
||||
else:
|
||||
hostname = get_host_from_principal(principal)
|
||||
api.Command['host_mod'](hostname, usercertificate=None)
|
||||
|
||||
# Request the certificate
|
||||
result = self.Backend.ra.request_certificate(
|
||||
csr, 'caIPAserviceCert', request_type=request_type)
|
||||
csr, profile_id, request_type=request_type)
|
||||
cert = x509.load_certificate(result['certificate'])
|
||||
result['issuer'] = unicode(cert.issuer)
|
||||
result['valid_not_before'] = unicode(cert.valid_not_before_str)
|
||||
@@ -444,15 +484,19 @@ class cert_request(VirtualCommand):
|
||||
result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
|
||||
result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
|
||||
|
||||
# Success? Then add it to the service entry.
|
||||
if 'certificate' in result:
|
||||
if not principal.startswith('host/'):
|
||||
skw = {"usercertificate": str(result.get('certificate'))}
|
||||
api.Command['service_mod'](principal, **skw)
|
||||
else:
|
||||
hostname = get_host_from_principal(principal)
|
||||
skw = {"usercertificate": str(result.get('certificate'))}
|
||||
api.Command['host_mod'](hostname, **skw)
|
||||
# Success? Then add it to the principal's entry
|
||||
# (unless the profile tells us not to)
|
||||
profile = api.Command['certprofile_show'](profile_id)
|
||||
store = profile['result']['ipacertprofilestoreissued'][0] == 'TRUE'
|
||||
if store and 'certificate' in result:
|
||||
cert = str(result.get('certificate'))
|
||||
kwargs = dict(addattr=u'usercertificate={}'.format(cert))
|
||||
if principal_type == SERVICE:
|
||||
api.Command['service_mod'](principal_string, **kwargs)
|
||||
elif principal_type == HOST:
|
||||
api.Command['host_mod'](principal_name, **kwargs)
|
||||
elif principal_type == USER:
|
||||
api.Command['user_mod'](principal_name, **kwargs)
|
||||
|
||||
return dict(
|
||||
result=result
|
||||
|
||||
Reference in New Issue
Block a user