mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
certstore: Make certificate retrieval more robust
https://fedorahosted.org/freeipa/ticket/4565 Reviewed-By: David Kupka <dkupka@redhat.com>
This commit is contained in:
parent
179be3c222
commit
4154c8893f
@ -239,6 +239,31 @@ def put_ca_cert(ldap, base_dn, dercert, nickname, trusted=None,
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def make_compat_ca_certs(certs, realm, ipa_ca_subject):
|
||||||
|
"""
|
||||||
|
Make CA certificates and associated key policy from DER certificates.
|
||||||
|
"""
|
||||||
|
result = []
|
||||||
|
|
||||||
|
for cert in certs:
|
||||||
|
subject, issuer_serial, public_key_info = _parse_cert(cert)
|
||||||
|
subject = DN(subject)
|
||||||
|
|
||||||
|
if ipa_ca_subject is not None and subject == DN(ipa_ca_subject):
|
||||||
|
nickname = get_ca_nickname(realm)
|
||||||
|
ext_key_usage = {x509.EKU_SERVER_AUTH,
|
||||||
|
x509.EKU_CLIENT_AUTH,
|
||||||
|
x509.EKU_EMAIL_PROTECTION,
|
||||||
|
x509.EKU_CODE_SIGNING}
|
||||||
|
else:
|
||||||
|
nickname = str(subject)
|
||||||
|
ext_key_usage = {x509.EKU_SERVER_AUTH}
|
||||||
|
|
||||||
|
result.append((cert, nickname, True, ext_key_usage))
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca,
|
def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca,
|
||||||
filter_subject=None):
|
filter_subject=None):
|
||||||
"""
|
"""
|
||||||
@ -250,6 +275,7 @@ def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca,
|
|||||||
filter_subject = [str(subj).replace('\\;', '\\3b')
|
filter_subject = [str(subj).replace('\\;', '\\3b')
|
||||||
for subj in filter_subject]
|
for subj in filter_subject]
|
||||||
|
|
||||||
|
certs = []
|
||||||
config_dn = DN(('cn', 'ipa'), ('cn', 'etc'), base_dn)
|
config_dn = DN(('cn', 'ipa'), ('cn', 'etc'), base_dn)
|
||||||
container_dn = DN(('cn', 'certificates'), config_dn)
|
container_dn = DN(('cn', 'certificates'), config_dn)
|
||||||
try:
|
try:
|
||||||
@ -265,7 +291,6 @@ def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca,
|
|||||||
'ipaPublicKey', 'ipaKeyTrust', 'ipaKeyExtUsage',
|
'ipaPublicKey', 'ipaKeyTrust', 'ipaKeyExtUsage',
|
||||||
'cACertificate;binary'])
|
'cACertificate;binary'])
|
||||||
|
|
||||||
certs = []
|
|
||||||
for entry in result:
|
for entry in result:
|
||||||
nickname = entry.single_value['cn']
|
nickname = entry.single_value['cn']
|
||||||
trusted = entry.single_value.get('ipaKeyTrust', 'unknown').lower()
|
trusted = entry.single_value.get('ipaKeyTrust', 'unknown').lower()
|
||||||
@ -281,34 +306,39 @@ def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca,
|
|||||||
ext_key_usage.discard(x509.EKU_PLACEHOLDER)
|
ext_key_usage.discard(x509.EKU_PLACEHOLDER)
|
||||||
|
|
||||||
for cert in entry.get('cACertificate;binary', []):
|
for cert in entry.get('cACertificate;binary', []):
|
||||||
|
try:
|
||||||
|
_parse_cert(cert)
|
||||||
|
except ValueError:
|
||||||
|
certs = []
|
||||||
|
break
|
||||||
certs.append((cert, nickname, trusted, ext_key_usage))
|
certs.append((cert, nickname, trusted, ext_key_usage))
|
||||||
|
|
||||||
return certs
|
|
||||||
except errors.NotFound:
|
except errors.NotFound:
|
||||||
try:
|
try:
|
||||||
ldap.get_entry(container_dn, [''])
|
ldap.get_entry(container_dn, [''])
|
||||||
except errors.NotFound:
|
except errors.NotFound:
|
||||||
pass
|
# Fallback to cn=CAcert,cn=ipa,cn=etc,SUFFIX
|
||||||
else:
|
dn = DN(('cn', 'CAcert'), config_dn)
|
||||||
return []
|
entry = ldap.get_entry(dn, ['cACertificate;binary'])
|
||||||
|
|
||||||
# Fallback to cn=CAcert,cn=ipa,cn=etc,SUFFIX
|
cert = entry.single_value['cACertificate;binary']
|
||||||
dn = DN(('cn', 'CAcert'), config_dn)
|
try:
|
||||||
entry = ldap.get_entry(dn, ['cACertificate;binary'])
|
subject, issuer_serial, public_key_info = _parse_cert(cert)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
if filter_subject is not None and subject not in filter_subject:
|
||||||
|
raise errors.NotFound(reason="no matching entry found")
|
||||||
|
|
||||||
cert = entry.single_value['cACertificate;binary']
|
if compat_ipa_ca:
|
||||||
subject, issuer_serial, public_key_info = _parse_cert(cert)
|
ca_subject = subject
|
||||||
if filter_subject is not None and subject not in filter_subject:
|
else:
|
||||||
return []
|
ca_subject = None
|
||||||
|
certs = make_compat_ca_certs([cert], compat_realm, ca_subject)
|
||||||
|
|
||||||
nickname = get_ca_nickname(compat_realm)
|
if certs:
|
||||||
ext_key_usage = {x509.EKU_SERVER_AUTH}
|
return certs
|
||||||
if compat_ipa_ca:
|
else:
|
||||||
ext_key_usage |= {x509.EKU_CLIENT_AUTH,
|
raise errors.NotFound(reason="no such entry")
|
||||||
x509.EKU_EMAIL_PROTECTION,
|
|
||||||
x509.EKU_CODE_SIGNING}
|
|
||||||
|
|
||||||
return [(cert, nickname, True, ext_key_usage)]
|
|
||||||
|
|
||||||
|
|
||||||
def trust_flags_to_key_policy(trust_flags):
|
def trust_flags_to_key_policy(trust_flags):
|
||||||
|
Loading…
Reference in New Issue
Block a user