Make host/service cert revocation aware of lightweight CAs

Revocation of host/service certs on host/service deletion or other
operations is broken when cert is issued by a lightweight (sub)CA,
causing the delete operation to be aborted.  Look up the issuing CA
and pass it to 'cert_revoke' to fix the issue.

Fixes: https://fedorahosted.org/freeipa/ticket/6221
Reviewed-By: Jan Cholasta <jcholast@redhat.com>
This commit is contained in:
Fraser Tweedale 2016-08-26 15:31:13 +10:00 committed by Jan Cholasta
parent 520ad7d865
commit daeaf2a823
2 changed files with 37 additions and 39 deletions

View File

@ -843,12 +843,8 @@ class host_del(LDAPDelete):
) )
if self.api.Command.ca_is_enabled()['result']: if self.api.Command.ca_is_enabled()['result']:
try: certs = self.api.Command.cert_find(host=keys)['result']
entry_attrs = ldap.get_entry(dn, ['usercertificate']) revoke_certs(certs)
except errors.NotFound:
self.obj.handle_not_found(*keys)
revoke_certs(entry_attrs.get('usercertificate', []), self.log)
return dn return dn
@ -910,7 +906,9 @@ class host_mod(LDAPUpdate):
old_certs = entry_attrs_old.get('usercertificate', []) old_certs = entry_attrs_old.get('usercertificate', [])
old_certs_der = [x509.normalize_certificate(c) for c in old_certs] old_certs_der = [x509.normalize_certificate(c) for c in old_certs]
removed_certs_der = set(old_certs_der) - set(certs_der) removed_certs_der = set(old_certs_der) - set(certs_der)
revoke_certs(removed_certs_der, self.log) for der in removed_certs_der:
rm_certs = api.Command.cert_find(certificate=der)['result']
revoke_certs(rm_certs)
if certs: if certs:
entry_attrs['usercertificate'] = certs_der entry_attrs['usercertificate'] = certs_der
@ -1196,10 +1194,10 @@ class host_disable(LDAPQuery):
except errors.NotFound: except errors.NotFound:
self.obj.handle_not_found(*keys) self.obj.handle_not_found(*keys)
if self.api.Command.ca_is_enabled()['result']: if self.api.Command.ca_is_enabled()['result']:
certs = entry_attrs.get('usercertificate', []) certs = self.api.Command.cert_find(host=keys)['result']
if certs: if certs:
revoke_certs(certs, self.log) revoke_certs(certs)
# Remove the usercertificate altogether # Remove the usercertificate altogether
entry_attrs['usercertificate'] = None entry_attrs['usercertificate'] = None
ldap.update_entry(entry_attrs) ldap.update_entry(entry_attrs)
@ -1341,8 +1339,8 @@ class host_remove_cert(LDAPRemoveAttributeViaOption):
def post_callback(self, ldap, dn, entry_attrs, *keys, **options): def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
assert isinstance(dn, DN) assert isinstance(dn, DN)
if 'usercertificate' in options: for cert in options.get('usercertificate', []):
revoke_certs(options['usercertificate'], self.log) revoke_certs(api.Command.cert_find(certificate=cert)['result'])
return dn return dn

View File

@ -220,37 +220,38 @@ def validate_certificate(ugettext, cert):
x509.validate_certificate(cert, datatype=x509.DER) x509.validate_certificate(cert, datatype=x509.DER)
def revoke_certs(certs, logger=None): def revoke_certs(certs):
""" """
revoke the certificates removed from host/service entry revoke the certificates removed from host/service entry
:param certs: Output of a 'cert_find' command.
""" """
for cert in certs: for cert in certs:
try: if 'cacn' not in cert:
cert = x509.normalize_certificate(cert) # Cert is known to IPA, but has no associated CA.
except errors.CertificateFormatError as e: # If it was issued by 3rd-party CA, we can't revoke it.
if logger is not None: # If it was issued by a Dogtag lightweight CA that was
logger.info("Problem decoding certificate: %s" % e) # subsequently deleted, we can't revoke it via IPA.
# We could go directly to Dogtag to revoke it, but the
serial = unicode(x509.get_serial_number(cert, x509.DER)) # issuer's cert should have been revoked so never mind.
try:
result = api.Command['cert_show'](unicode(serial))['result']
except errors.CertificateOperationError:
continue continue
if 'revocation_reason' in result:
continue if cert['revoked']:
if x509.normalize_certificate(result['certificate']) != cert: # cert is already revoked
continue continue
try: try:
api.Command['cert_revoke'](unicode(serial), api.Command['cert_revoke'](
revocation_reason=4) cert['serial_number'],
cacn=cert['cacn'],
revocation_reason=4,
)
except errors.NotImplementedError: except errors.NotImplementedError:
# some CA's might not implement revoke # some CA's might not implement revoke
pass pass
def set_certificate_attrs(entry_attrs): def set_certificate_attrs(entry_attrs):
""" """
Set individual attributes from some values from a certificate. Set individual attributes from some values from a certificate.
@ -674,11 +675,8 @@ class service_del(LDAPDelete):
# custom services allow them to manage them. # custom services allow them to manage them.
check_required_principal(ldap, keys[-1]) check_required_principal(ldap, keys[-1])
if self.api.Command.ca_is_enabled()['result']: if self.api.Command.ca_is_enabled()['result']:
try: certs = self.api.Command.cert_find(service=keys)['result']
entry_attrs = ldap.get_entry(dn, ['usercertificate']) revoke_certs(certs)
except errors.NotFound:
self.obj.handle_not_found(*keys)
revoke_certs(entry_attrs.get('usercertificate', []), self.log)
return dn return dn
@ -711,7 +709,9 @@ class service_mod(LDAPUpdate):
old_certs = entry_attrs_old.get('usercertificate', []) old_certs = entry_attrs_old.get('usercertificate', [])
old_certs_der = [x509.normalize_certificate(c) for c in old_certs] old_certs_der = [x509.normalize_certificate(c) for c in old_certs]
removed_certs_der = set(old_certs_der) - set(certs_der) removed_certs_der = set(old_certs_der) - set(certs_der)
revoke_certs(removed_certs_der, self.log) for der in removed_certs_der:
rm_certs = api.Command.cert_find(certificate=der)['result']
revoke_certs(rm_certs)
if certs: if certs:
entry_attrs['usercertificate'] = certs_der entry_attrs['usercertificate'] = certs_der
@ -950,10 +950,10 @@ class service_disable(LDAPQuery):
done_work = False done_work = False
if self.api.Command.ca_is_enabled()['result']: if self.api.Command.ca_is_enabled()['result']:
certs = entry_attrs.get('usercertificate', []) certs = self.api.Command.cert_find(service=keys)['result']
if len(certs) > 0: if len(certs) > 0:
revoke_certs(certs, self.log) revoke_certs(certs)
# Remove the usercertificate altogether # Remove the usercertificate altogether
entry_attrs['usercertificate'] = None entry_attrs['usercertificate'] = None
ldap.update_entry(entry_attrs) ldap.update_entry(entry_attrs)
@ -989,8 +989,8 @@ class service_remove_cert(LDAPRemoveAttributeViaOption):
def post_callback(self, ldap, dn, entry_attrs, *keys, **options): def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
assert isinstance(dn, DN) assert isinstance(dn, DN)
if 'usercertificate' in options: for cert in options.get('usercertificate', []):
revoke_certs(options['usercertificate'], self.log) revoke_certs(api.Command.cert_find(certificate=cert)['result'])
return dn return dn