mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
reworked certificate normalization and revocation
Validation of certificate is now handled by `x509.validate_certificate'. Revocation of the host and service certificates was factored out to a separate function. Part of http://www.freeipa.org/page/V4/User_Certificates Reviewed-By: Jan Cholasta <jcholast@redhat.com>
This commit is contained in:
committed by
Jan Cholasta
parent
93dab56ebf
commit
53b11b6117
@@ -786,30 +786,8 @@ class host_del(LDAPDelete):
|
||||
entry_attrs = ldap.get_entry(dn, ['usercertificate'])
|
||||
except errors.NotFound:
|
||||
self.obj.handle_not_found(*keys)
|
||||
for cert in entry_attrs.get('usercertificate', []):
|
||||
cert = x509.normalize_certificate(cert)
|
||||
try:
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
try:
|
||||
result = api.Command['cert_show'](serial)['result']
|
||||
if 'revocation_reason' not in result:
|
||||
try:
|
||||
api.Command['cert_revoke'](serial,
|
||||
revocation_reason=4)
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement revoke
|
||||
pass
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement revoke
|
||||
pass
|
||||
except NSPRError, nsprerr:
|
||||
if nsprerr.errno == -8183:
|
||||
# If we can't decode the cert them proceed with
|
||||
# removing the host.
|
||||
self.log.info("Problem decoding certificate %s" %
|
||||
nsprerr.args[1])
|
||||
else:
|
||||
raise nsprerr
|
||||
|
||||
revoke_certs(entry_attrs.get('usercertificate', []), self.log)
|
||||
|
||||
return dn
|
||||
|
||||
@@ -879,29 +857,8 @@ class host_mod(LDAPUpdate):
|
||||
old_certs = entry_attrs_old.get('usercertificate', [])
|
||||
old_certs_der = map(x509.normalize_certificate, old_certs)
|
||||
removed_certs_der = set(old_certs_der) - set(certs_der)
|
||||
for cert in removed_certs_der:
|
||||
try:
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
try:
|
||||
result = api.Command['cert_show'](serial)['result']
|
||||
if 'revocation_reason' not in result:
|
||||
try:
|
||||
api.Command['cert_revoke'](
|
||||
serial, revocation_reason=4)
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement revoke
|
||||
pass
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement revoke
|
||||
pass
|
||||
except NSPRError, nsprerr:
|
||||
if nsprerr.errno == -8183:
|
||||
# If we can't decode the cert them proceed with
|
||||
# modifying the host.
|
||||
self.log.info("Problem decoding certificate %s" %
|
||||
nsprerr.args[1])
|
||||
else:
|
||||
raise nsprerr
|
||||
revoke_certs(removed_certs_der, self.log)
|
||||
|
||||
if certs:
|
||||
entry_attrs['usercertificate'] = certs_der
|
||||
|
||||
@@ -1163,31 +1120,9 @@ class host_disable(LDAPQuery):
|
||||
self.obj.handle_not_found(*keys)
|
||||
if self.api.Command.ca_is_enabled()['result']:
|
||||
certs = entry_attrs.get('usercertificate', [])
|
||||
for cert in map(x509.normalize_certificate, certs):
|
||||
try:
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
try:
|
||||
result = api.Command['cert_show'](serial)['result']
|
||||
if 'revocation_reason' not in result:
|
||||
try:
|
||||
api.Command['cert_revoke'](serial,
|
||||
revocation_reason=4)
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement revoke
|
||||
pass
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement revoke
|
||||
pass
|
||||
except NSPRError, nsprerr:
|
||||
if nsprerr.errno == -8183:
|
||||
# If we can't decode the cert them proceed with
|
||||
# disabling the host.
|
||||
self.log.info("Problem decoding certificate %s" %
|
||||
nsprerr.args[1])
|
||||
else:
|
||||
raise nsprerr
|
||||
|
||||
if certs:
|
||||
revoke_certs(certs, self.log)
|
||||
# Remove the usercertificate altogether
|
||||
entry_attrs['usercertificate'] = None
|
||||
ldap.update_entry(entry_attrs)
|
||||
|
||||
@@ -238,16 +238,42 @@ def normalize_principal(principal):
|
||||
|
||||
def validate_certificate(ugettext, cert):
|
||||
"""
|
||||
For now just verify that it is properly base64-encoded.
|
||||
Check whether the certificate is properly encoded to DER
|
||||
"""
|
||||
if cert and util.isvalid_base64(cert):
|
||||
if api.env.in_server:
|
||||
x509.validate_certificate(cert, datatype=x509.DER)
|
||||
|
||||
|
||||
def revoke_certs(certs, logger=None):
|
||||
"""
|
||||
revoke the certificates removed from host/service entry
|
||||
"""
|
||||
for cert in certs:
|
||||
try:
|
||||
base64.b64decode(cert)
|
||||
except Exception, e:
|
||||
raise errors.Base64DecodeError(reason=str(e))
|
||||
else:
|
||||
# We'll assume this is DER data
|
||||
pass
|
||||
cert = x509.normalize_certificate(cert)
|
||||
except errors.CertificateFormatError as e:
|
||||
if logger is not None:
|
||||
logger.info("Problem decoding certificate: %s" % e)
|
||||
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
|
||||
try:
|
||||
result = api.Command['cert_show'](unicode(serial))['result']
|
||||
except errors.CertificateOperationError:
|
||||
continue
|
||||
if 'revocation_reason' in result:
|
||||
continue
|
||||
if x509.normalize_certificate(result['certificate']) != cert:
|
||||
continue
|
||||
|
||||
try:
|
||||
api.Command['cert_revoke'](unicode(serial),
|
||||
revocation_reason=4)
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement revoke
|
||||
pass
|
||||
|
||||
|
||||
|
||||
def set_certificate_attrs(entry_attrs):
|
||||
"""
|
||||
@@ -566,27 +592,8 @@ class service_del(LDAPDelete):
|
||||
entry_attrs = ldap.get_entry(dn, ['usercertificate'])
|
||||
except errors.NotFound:
|
||||
self.obj.handle_not_found(*keys)
|
||||
for cert in entry_attrs.get('usercertificate', []):
|
||||
try:
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
try:
|
||||
result = api.Command['cert_show'](unicode(serial))['result']
|
||||
if 'revocation_reason' not in result:
|
||||
try:
|
||||
api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement revoke
|
||||
pass
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement revoke
|
||||
pass
|
||||
except NSPRError, nsprerr:
|
||||
if nsprerr.errno == -8183:
|
||||
# If we can't decode the cert them proceed with
|
||||
# removing the service.
|
||||
self.log.info("Problem decoding certificate %s" % nsprerr.args[1])
|
||||
else:
|
||||
raise nsprerr
|
||||
revoke_certs(entry_attrs.get('usercertificate', []), self.log)
|
||||
|
||||
return dn
|
||||
|
||||
|
||||
@@ -622,29 +629,8 @@ class service_mod(LDAPUpdate):
|
||||
old_certs = entry_attrs_old.get('usercertificate', [])
|
||||
old_certs_der = map(x509.normalize_certificate, old_certs)
|
||||
removed_certs_der = set(old_certs_der) - set(certs_der)
|
||||
for cert in removed_certs_der:
|
||||
try:
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
try:
|
||||
result = api.Command['cert_show'](serial)['result']
|
||||
if 'revocation_reason' not in result:
|
||||
try:
|
||||
api.Command['cert_revoke'](
|
||||
serial, revocation_reason=4)
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement revoke
|
||||
pass
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement revoke
|
||||
pass
|
||||
except NSPRError, nsprerr:
|
||||
if nsprerr.errno == -8183:
|
||||
# If we can't decode the cert them proceed with
|
||||
# modifying the host.
|
||||
self.log.info("Problem decoding certificate %s" %
|
||||
nsprerr.args[1])
|
||||
else:
|
||||
raise nsprerr
|
||||
revoke_certs(removed_certs_der, self.log)
|
||||
|
||||
if certs:
|
||||
entry_attrs['usercertificate'] = certs_der
|
||||
|
||||
@@ -854,29 +840,9 @@ class service_disable(LDAPQuery):
|
||||
|
||||
if self.api.Command.ca_is_enabled()['result']:
|
||||
certs = entry_attrs.get('usercertificate', [])
|
||||
for cert in map(x509.normalize_certificate, certs):
|
||||
try:
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
try:
|
||||
result = api.Command['cert_show'](unicode(serial))['result']
|
||||
if 'revocation_reason' not in result:
|
||||
try:
|
||||
api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement revoke
|
||||
pass
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement revoke
|
||||
pass
|
||||
except NSPRError, nsprerr:
|
||||
if nsprerr.errno == -8183:
|
||||
# If we can't decode the cert them proceed with
|
||||
# disabling the service
|
||||
self.log.info("Problem decoding certificate %s" % nsprerr.args[1])
|
||||
else:
|
||||
raise nsprerr
|
||||
|
||||
if len(certs) > 0:
|
||||
revoke_certs(certs, self.log)
|
||||
# Remove the usercertificate altogether
|
||||
entry_attrs['usercertificate'] = None
|
||||
ldap.update_entry(entry_attrs)
|
||||
|
||||
@@ -294,16 +294,24 @@ def normalize_certificate(rawcert):
|
||||
# was base64-encoded and now its not or it came in as DER format.
|
||||
# Let's decode it and see. Fetching the serial number will pass the
|
||||
# certificate through the NSS DER parser.
|
||||
validate_certificate(dercert, datatype=DER)
|
||||
|
||||
return dercert
|
||||
|
||||
|
||||
def validate_certificate(cert, datatype=PEM, dbdir=None):
|
||||
"""
|
||||
Perform certificate validation by trying to load it into NSS database
|
||||
"""
|
||||
try:
|
||||
serial = unicode(get_serial_number(dercert, DER))
|
||||
except NSPRError, nsprerr:
|
||||
load_certificate(cert, datatype=datatype, dbdir=dbdir)
|
||||
except NSPRError as nsprerr:
|
||||
if nsprerr.errno == -8183: # SEC_ERROR_BAD_DER
|
||||
raise errors.CertificateFormatError(
|
||||
error=_('improperly formatted DER-encoded certificate'))
|
||||
else:
|
||||
raise errors.CertificateFormatError(error=str(nsprerr))
|
||||
|
||||
return dercert
|
||||
|
||||
def write_certificate(rawcert, filename):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user