From 227cf8d4e98178212386eca0a1ac21459436e63f Mon Sep 17 00:00:00 2001 From: Fraser Tweedale Date: Fri, 2 Dec 2016 14:47:27 +1000 Subject: [PATCH] cert-request: simplify request processing Currently the cert-request execution is complicated and cannot handle aliases in the --principal argument. Implement the following simplifications: - Search all user/host/service accounts at once, by krbPrincipalName (error if no account found). Use principal canonical name to determine the type of the principal. - Update subject principals userCertificate attribute uniformly, instead of dispatching to user/host/service-mod based on type of principal. Fixes: https://fedorahosted.org/freeipa/ticket/6531 Reviewed-By: Felipe Volpone --- ipaserver/plugins/cert.py | 121 +++++++++++++++++++++++--------------- 1 file changed, 73 insertions(+), 48 deletions(-) diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py index b62f82541..0b81a826e 100644 --- a/ipaserver/plugins/cert.py +++ b/ipaserver/plugins/cert.py @@ -150,13 +150,6 @@ http://www.ietf.org/rfc/rfc5280.txt USER, HOST, KRBTGT, SERVICE = range(4) -PRINCIPAL_TYPE_STRING_MAP = { - USER: _('user'), - HOST: _('host'), - KRBTGT: _('krbtgt'), - SERVICE: _('service'), -} - register = Registry() PKIDATE_FORMAT = '%Y-%m-%d' @@ -176,11 +169,13 @@ def _acl_make_request(principal_type, principal, ca_id, profile_id): req.user.name = unicode(principal) groups = [] if principal_type == 'user': - user_obj = api.Command.user_show(principal.username)['result'] + user_obj = api.Command.user_show( + six.text_type(principal.username))['result'] groups = user_obj.get('memberof_group', []) groups += user_obj.get('memberofindirect_group', []) elif principal_type == 'host': - host_obj = api.Command.host_show(principal.hostname)['result'] + host_obj = api.Command.host_show( + six.text_type(principal.hostname))['result'] groups = host_obj.get('memberof_hostgroup', []) groups += host_obj.get('memberofindirect_hostgroup', []) req.user.groups = sorted(set(groups)) @@ -681,17 +676,33 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): Binding with a user principal one needs to be in the request_certs taskgroup (directly or indirectly via role membership). """ + principal_arg = kw.get('principal') - principal = kw.get('principal') - principal_string = unicode(principal) - principal_type = principal_to_principal_type(principal) + if principal_to_principal_type(principal_arg) == KRBTGT: + principal_obj = None + principal = principal_arg - if principal_type == KRBTGT: + # Allow krbtgt to use only the KDC certprofile if profile_id != self.Backend.ra.KDC_PROFILE: raise errors.ACIError( info=_("krbtgt certs can use only the %s profile") % ( self.Backend.ra.KDC_PROFILE)) + # Allow only our own realm krbtgt for now; no trusted realms. + if principal != kerberos.Principal((u'krbtgt', realm), + realm=realm): + raise errors.NotFound("Not our realm's krbtgt") + + else: + principal_obj = self.lookup_or_add_principal(principal_arg, add) + if 'krbcanonicalname' in principal_obj: + principal = principal_obj['krbcanonicalname'][0] + else: + principal = principal_obj['krbprincipalname'][0] + + principal_string = unicode(principal) + principal_type = principal_to_principal_type(principal) + bind_principal = kerberos.Principal(getattr(context, 'principal')) bind_principal_string = unicode(bind_principal) bind_principal_type = principal_to_principal_type(bind_principal) @@ -725,41 +736,6 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): except cryptography.x509.extensions.ExtensionNotFound: ext_san = None - dn = None - principal_obj = None - # See if the service exists and punt if it doesn't and we aren't - # going to add it - try: - if principal_type == SERVICE: - principal_obj = api.Command['service_show'](principal_string, all=True) - elif principal_type == KRBTGT: - # Allow only our own realm krbtgt for now, no trusted realm's. - if principal != kerberos.Principal((u'krbtgt', realm), - realm=realm): - raise errors.NotFound("Not our realm's krbtgt") - elif principal_type == HOST: - principal_obj = api.Command['host_show']( - principal.hostname, all=True) - elif principal_type == USER: - principal_obj = api.Command['user_show']( - principal.username, all=True) - except errors.NotFound as e: - if add: - if principal_type == SERVICE: - principal_obj = api.Command['service_add']( - principal_string, force=True) - else: - princtype_str = PRINCIPAL_TYPE_STRING_MAP[principal_type] - raise errors.OperationNotSupportedForPrincipalType( - operation=_("'add' option"), - principal_type=princtype_str) - else: - raise errors.NotFound( - reason=_("The principal for this request doesn't exist.")) - if principal_obj: - principal_obj = principal_obj['result'] - dn = principal_obj['dn'] - # Ensure that the DN in the CSR matches the principal # # We only look at the "most specific" CN value @@ -810,6 +786,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): if principal_type != KRBTGT: # We got this far so the principal entry exists, can we write it? + dn = principal_obj.dn if not ldap.can_write(dn, "usercertificate"): raise errors.ACIError( info=_("Insufficient 'write' privilege to the " @@ -934,6 +911,9 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): if store and 'certificate' in result: cert = str(result.get('certificate')) kwargs = dict(addattr=u'usercertificate={}'.format(cert)) + # note: we call different commands for the different + # principal types because handling of 'userCertificate' + # vs. 'userCertificate;binary' varies by plugin. if principal_type == SERVICE: api.Command['service_mod'](principal_string, **kwargs) elif principal_type == HOST: @@ -954,6 +934,51 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): value=pkey_to_value(int(result['request_id']), kw), ) + def lookup_principal(self, principal): + """ + Look up a principal's account. Only works for users, hosts, services. + """ + return self.api.Backend.ldap2.find_entry_by_attr( + 'krbprincipalname', principal, 'krbprincipalaux', + base_dn=DN(self.api.env.container_accounts, self.api.env.basedn) + ) + + def lookup_or_add_principal(self, principal, add): + """ + Look up a principal or add it if it does not exist. + + Only works for users, hosts, services. krbtgt must be + handled separately. + + Only service principals get added, and only when ``add`` is + ``True``. If ``add`` is requested for a nonexistant user or + host, raise ``OperationNotSupportedForPrincipalTypes``. + + :param principal: ``kerberos.Principal`` to look up + :param add: whether to add the principal if not found; bool + :return: an ``LDAPEntry`` + + """ + try: + return self.lookup_principal(principal) + except errors.NotFound: + if add: + if principal.is_service and not principal.is_host: + self.api.Command.service_add( + six.text_type(principal), all=True, force=True) + return self.lookup_principal(principal) # we want an LDAPEntry + else: + if principal.is_user: + princtype_str = _('user') + else: + princtype_str = _('host') + raise errors.OperationNotSupportedForPrincipalType( + operation=_("'add' option"), + principal_type=princtype_str) + else: + raise errors.NotFound( + reason=_("The principal for this request doesn't exist.")) + def _emails_are_valid(csr_emails, principal_emails): """