cert-request: simplify request processing

Currently the cert-request execution is complicated and cannot
handle aliases in the --principal argument.

Implement the following simplifications:

- Search all user/host/service accounts at once, by krbPrincipalName
  (error if no account found).  Use principal canonical name to
  determine the type of the principal.

- Update subject principals userCertificate attribute uniformly,
  instead of dispatching to user/host/service-mod based on type of
  principal.

Fixes: https://fedorahosted.org/freeipa/ticket/6531
Reviewed-By: Felipe Volpone <felipevolpone@gmail.com>
This commit is contained in:
Fraser Tweedale 2016-12-02 14:47:27 +10:00 committed by Tomas Krizek
parent 69d05b86b7
commit 227cf8d4e9
No known key found for this signature in database
GPG Key ID: 22A2A94B5E49415A

View File

@ -150,13 +150,6 @@ http://www.ietf.org/rfc/rfc5280.txt
USER, HOST, KRBTGT, SERVICE = range(4)
PRINCIPAL_TYPE_STRING_MAP = {
USER: _('user'),
HOST: _('host'),
KRBTGT: _('krbtgt'),
SERVICE: _('service'),
}
register = Registry()
PKIDATE_FORMAT = '%Y-%m-%d'
@ -176,11 +169,13 @@ def _acl_make_request(principal_type, principal, ca_id, profile_id):
req.user.name = unicode(principal)
groups = []
if principal_type == 'user':
user_obj = api.Command.user_show(principal.username)['result']
user_obj = api.Command.user_show(
six.text_type(principal.username))['result']
groups = user_obj.get('memberof_group', [])
groups += user_obj.get('memberofindirect_group', [])
elif principal_type == 'host':
host_obj = api.Command.host_show(principal.hostname)['result']
host_obj = api.Command.host_show(
six.text_type(principal.hostname))['result']
groups = host_obj.get('memberof_hostgroup', [])
groups += host_obj.get('memberofindirect_hostgroup', [])
req.user.groups = sorted(set(groups))
@ -681,17 +676,33 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
Binding with a user principal one needs to be in the request_certs
taskgroup (directly or indirectly via role membership).
"""
principal_arg = kw.get('principal')
principal = kw.get('principal')
principal_string = unicode(principal)
principal_type = principal_to_principal_type(principal)
if principal_to_principal_type(principal_arg) == KRBTGT:
principal_obj = None
principal = principal_arg
if principal_type == KRBTGT:
# Allow krbtgt to use only the KDC certprofile
if profile_id != self.Backend.ra.KDC_PROFILE:
raise errors.ACIError(
info=_("krbtgt certs can use only the %s profile") % (
self.Backend.ra.KDC_PROFILE))
# Allow only our own realm krbtgt for now; no trusted realms.
if principal != kerberos.Principal((u'krbtgt', realm),
realm=realm):
raise errors.NotFound("Not our realm's krbtgt")
else:
principal_obj = self.lookup_or_add_principal(principal_arg, add)
if 'krbcanonicalname' in principal_obj:
principal = principal_obj['krbcanonicalname'][0]
else:
principal = principal_obj['krbprincipalname'][0]
principal_string = unicode(principal)
principal_type = principal_to_principal_type(principal)
bind_principal = kerberos.Principal(getattr(context, 'principal'))
bind_principal_string = unicode(bind_principal)
bind_principal_type = principal_to_principal_type(bind_principal)
@ -725,41 +736,6 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
except cryptography.x509.extensions.ExtensionNotFound:
ext_san = None
dn = None
principal_obj = None
# See if the service exists and punt if it doesn't and we aren't
# going to add it
try:
if principal_type == SERVICE:
principal_obj = api.Command['service_show'](principal_string, all=True)
elif principal_type == KRBTGT:
# Allow only our own realm krbtgt for now, no trusted realm's.
if principal != kerberos.Principal((u'krbtgt', realm),
realm=realm):
raise errors.NotFound("Not our realm's krbtgt")
elif principal_type == HOST:
principal_obj = api.Command['host_show'](
principal.hostname, all=True)
elif principal_type == USER:
principal_obj = api.Command['user_show'](
principal.username, all=True)
except errors.NotFound as e:
if add:
if principal_type == SERVICE:
principal_obj = api.Command['service_add'](
principal_string, force=True)
else:
princtype_str = PRINCIPAL_TYPE_STRING_MAP[principal_type]
raise errors.OperationNotSupportedForPrincipalType(
operation=_("'add' option"),
principal_type=princtype_str)
else:
raise errors.NotFound(
reason=_("The principal for this request doesn't exist."))
if principal_obj:
principal_obj = principal_obj['result']
dn = principal_obj['dn']
# Ensure that the DN in the CSR matches the principal
#
# We only look at the "most specific" CN value
@ -810,6 +786,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
if principal_type != KRBTGT:
# We got this far so the principal entry exists, can we write it?
dn = principal_obj.dn
if not ldap.can_write(dn, "usercertificate"):
raise errors.ACIError(
info=_("Insufficient 'write' privilege to the "
@ -934,6 +911,9 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
if store and 'certificate' in result:
cert = str(result.get('certificate'))
kwargs = dict(addattr=u'usercertificate={}'.format(cert))
# note: we call different commands for the different
# principal types because handling of 'userCertificate'
# vs. 'userCertificate;binary' varies by plugin.
if principal_type == SERVICE:
api.Command['service_mod'](principal_string, **kwargs)
elif principal_type == HOST:
@ -954,6 +934,51 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
value=pkey_to_value(int(result['request_id']), kw),
)
def lookup_principal(self, principal):
"""
Look up a principal's account. Only works for users, hosts, services.
"""
return self.api.Backend.ldap2.find_entry_by_attr(
'krbprincipalname', principal, 'krbprincipalaux',
base_dn=DN(self.api.env.container_accounts, self.api.env.basedn)
)
def lookup_or_add_principal(self, principal, add):
"""
Look up a principal or add it if it does not exist.
Only works for users, hosts, services. krbtgt must be
handled separately.
Only service principals get added, and only when ``add`` is
``True``. If ``add`` is requested for a nonexistant user or
host, raise ``OperationNotSupportedForPrincipalTypes``.
:param principal: ``kerberos.Principal`` to look up
:param add: whether to add the principal if not found; bool
:return: an ``LDAPEntry``
"""
try:
return self.lookup_principal(principal)
except errors.NotFound:
if add:
if principal.is_service and not principal.is_host:
self.api.Command.service_add(
six.text_type(principal), all=True, force=True)
return self.lookup_principal(principal) # we want an LDAPEntry
else:
if principal.is_user:
princtype_str = _('user')
else:
princtype_str = _('host')
raise errors.OperationNotSupportedForPrincipalType(
operation=_("'add' option"),
principal_type=princtype_str)
else:
raise errors.NotFound(
reason=_("The principal for this request doesn't exist."))
def _emails_are_valid(csr_emails, principal_emails):
"""