freeipa/ipalib/install/certstore.py
Stanislav Laznicka 31142ead83 Unify storing certificates in LDAP
Recent certificate refactoring left the system in a state where
the certificates are somewhere converted to DER format, somewhere
directly sent to ipaldap as IPACertificate objects. The latter
is the desirable way, make sure it's the one commonly used.

https://pagure.io/freeipa/issue/4985

Reviewed-By: Fraser Tweedale <ftweedal@redhat.com>
2017-08-25 09:40:15 +02:00

388 lines
14 KiB
Python

# Authors:
# Jan Cholasta <jcholast@redhat.com>
#
# Copyright (C) 2014 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
LDAP shared certificate store.
"""
from pyasn1.error import PyAsn1Error
from ipapython.dn import DN
from ipapython.certdb import get_ca_nickname, TrustFlags
from ipalib import errors, x509
def _parse_cert(cert):
try:
subject = DN(cert.subject)
issuer = DN(cert.issuer)
serial_number = cert.serial_number
public_key_info = cert.public_key_info_bytes
except (ValueError, PyAsn1Error) as e:
raise ValueError("failed to decode certificate: %s" % e)
subject = str(subject).replace('\\;', '\\3b')
issuer = str(issuer).replace('\\;', '\\3b')
issuer_serial = '%s;%s' % (issuer, serial_number)
return subject, issuer_serial, public_key_info
def init_ca_entry(entry, cert, nickname, trusted, ext_key_usage):
"""
Initialize certificate store entry for a CA certificate.
"""
subject, issuer_serial, public_key = _parse_cert(cert)
if ext_key_usage is not None:
try:
cert_eku = cert.extended_key_usage
except ValueError as e:
raise ValueError("failed to decode certificate: %s" % e)
if cert_eku is not None:
cert_eku -= {x509.EKU_SERVER_AUTH, x509.EKU_CLIENT_AUTH,
x509.EKU_EMAIL_PROTECTION, x509.EKU_CODE_SIGNING,
x509.EKU_ANY, x509.EKU_PLACEHOLDER}
ext_key_usage = ext_key_usage | cert_eku
entry['objectClass'] = ['ipaCertificate', 'pkiCA', 'ipaKeyPolicy']
entry['cn'] = [nickname]
entry['ipaCertSubject'] = [subject]
entry['ipaCertIssuerSerial'] = [issuer_serial]
entry['ipaPublicKey'] = [public_key]
entry['cACertificate;binary'] = [cert]
if trusted is not None:
entry['ipaKeyTrust'] = ['trusted' if trusted else 'distrusted']
if ext_key_usage is not None:
ext_key_usage = list(ext_key_usage)
if not ext_key_usage:
ext_key_usage.append(x509.EKU_PLACEHOLDER)
entry['ipaKeyExtUsage'] = ext_key_usage
def update_compat_ca(ldap, base_dn, cert):
"""
Update the CA certificate in cn=CAcert,cn=ipa,cn=etc,SUFFIX.
"""
dn = DN(('cn', 'CAcert'), ('cn', 'ipa'), ('cn', 'etc'), base_dn)
try:
entry = ldap.get_entry(dn, attrs_list=['cACertificate;binary'])
entry.single_value['cACertificate;binary'] = cert
ldap.update_entry(entry)
except errors.NotFound:
entry = ldap.make_entry(dn)
entry['objectClass'] = ['nsContainer', 'pkiCA']
entry.single_value['cn'] = 'CAcert'
entry.single_value['cACertificate;binary'] = cert
ldap.add_entry(entry)
except errors.EmptyModlist:
pass
def clean_old_config(ldap, base_dn, dn, config_ipa, config_compat):
"""
Remove ipaCA and compatCA flags from their previous carriers.
"""
if not config_ipa and not config_compat:
return
try:
result, _truncated = ldap.find_entries(
base_dn=DN(('cn', 'certificates'), ('cn', 'ipa'), ('cn', 'etc'),
base_dn),
filter='(|(ipaConfigString=ipaCA)(ipaConfigString=compatCA))',
attrs_list=['ipaConfigString'])
except errors.NotFound:
return
for entry in result:
if entry.dn == dn:
continue
for config in list(entry['ipaConfigString']):
if config.lower() == 'ipaca' and config_ipa:
entry['ipaConfigString'].remove(config)
elif config.lower() == 'compatca' and config_compat:
entry['ipaConfigString'].remove(config)
try:
ldap.update_entry(entry)
except errors.EmptyModlist:
pass
def add_ca_cert(ldap, base_dn, cert, nickname, trusted=None,
ext_key_usage=None, config_ipa=False, config_compat=False):
"""
Add new entry for a CA certificate to the certificate store.
"""
container_dn = DN(('cn', 'certificates'), ('cn', 'ipa'), ('cn', 'etc'),
base_dn)
dn = DN(('cn', nickname), container_dn)
entry = ldap.make_entry(dn)
init_ca_entry(entry, cert, nickname, trusted, ext_key_usage)
if config_ipa:
entry.setdefault('ipaConfigString', []).append('ipaCA')
if config_compat:
entry.setdefault('ipaConfigString', []).append('compatCA')
if config_compat:
update_compat_ca(ldap, base_dn, cert)
ldap.add_entry(entry)
clean_old_config(ldap, base_dn, dn, config_ipa, config_compat)
def update_ca_cert(ldap, base_dn, cert, trusted=None, ext_key_usage=None,
config_ipa=False, config_compat=False):
"""
Update existing entry for a CA certificate in the certificate store.
"""
subject, issuer_serial, public_key = _parse_cert(cert)
filter = ldap.make_filter({'ipaCertSubject': subject})
result, _truncated = ldap.find_entries(
base_dn=DN(('cn', 'certificates'), ('cn', 'ipa'), ('cn', 'etc'),
base_dn),
filter=filter,
attrs_list=['cn', 'ipaCertSubject', 'ipaCertIssuerSerial',
'ipaPublicKey', 'ipaKeyTrust', 'ipaKeyExtUsage',
'ipaConfigString', 'cACertificate;binary'])
entry = result[0]
dn = entry.dn
for old_cert in entry['cACertificate;binary']:
# Check if we are adding a new cert
if old_cert == cert:
break
else:
# We are adding a new cert, validate it
if entry.single_value['ipaCertSubject'].lower() != subject.lower():
raise ValueError("subject name mismatch")
if entry.single_value['ipaPublicKey'] != public_key:
raise ValueError("subject public key info mismatch")
entry['ipaCertIssuerSerial'].append(issuer_serial)
entry['cACertificate;binary'].append(cert)
# Update key trust
if trusted is not None:
old_trust = entry.single_value.get('ipaKeyTrust')
new_trust = 'trusted' if trusted else 'distrusted'
if old_trust is not None and old_trust.lower() != new_trust:
raise ValueError("inconsistent trust")
entry.single_value['ipaKeyTrust'] = new_trust
# Update extended key usage
if trusted is not False:
if ext_key_usage is not None:
old_eku = set(entry.get('ipaKeyExtUsage', []))
old_eku.discard(x509.EKU_PLACEHOLDER)
new_eku = old_eku | ext_key_usage
if not new_eku:
new_eku.add(x509.EKU_PLACEHOLDER)
entry['ipaKeyExtUsage'] = list(new_eku)
else:
entry.pop('ipaKeyExtUsage', None)
# Update configuration flags
is_ipa = False
is_compat = False
for config in entry.get('ipaConfigString', []):
if config.lower() == 'ipaca':
is_ipa = True
elif config.lower() == 'compatca':
is_compat = True
if config_ipa and not is_ipa:
entry.setdefault('ipaConfigString', []).append('ipaCA')
if config_compat and not is_compat:
entry.setdefault('ipaConfigString', []).append('compatCA')
if is_compat or config_compat:
update_compat_ca(ldap, base_dn, cert)
ldap.update_entry(entry)
clean_old_config(ldap, base_dn, dn, config_ipa, config_compat)
def put_ca_cert(ldap, base_dn, cert, nickname, trusted=None,
ext_key_usage=None, config_ipa=False, config_compat=False):
"""
Add or update entry for a CA certificate in the certificate store.
:param cert: IPACertificate
"""
try:
update_ca_cert(ldap, base_dn, cert, trusted, ext_key_usage,
config_ipa=config_ipa, config_compat=config_compat)
except errors.NotFound:
add_ca_cert(ldap, base_dn, cert, nickname, trusted, ext_key_usage,
config_ipa=config_ipa, config_compat=config_compat)
except errors.EmptyModlist:
pass
def make_compat_ca_certs(certs, realm, ipa_ca_subject):
"""
Make CA certificates and associated key policy from DER certificates.
"""
result = []
for cert in certs:
subject, _issuer_serial, _public_key_info = _parse_cert(cert)
subject = DN(subject)
if ipa_ca_subject is not None and subject == DN(ipa_ca_subject):
nickname = get_ca_nickname(realm)
ext_key_usage = {x509.EKU_SERVER_AUTH,
x509.EKU_CLIENT_AUTH,
x509.EKU_EMAIL_PROTECTION,
x509.EKU_CODE_SIGNING}
else:
nickname = str(subject)
ext_key_usage = {x509.EKU_SERVER_AUTH}
result.append((cert, nickname, True, ext_key_usage))
return result
def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca,
filter_subject=None):
"""
Get CA certificates and associated key policy from the certificate store.
"""
if filter_subject is not None:
if not isinstance(filter_subject, list):
filter_subject = [filter_subject]
filter_subject = [str(subj).replace('\\;', '\\3b')
for subj in filter_subject]
certs = []
config_dn = DN(('cn', 'ipa'), ('cn', 'etc'), base_dn)
container_dn = DN(('cn', 'certificates'), config_dn)
try:
# Search the certificate store for CA certificate entries
filters = ['(objectClass=ipaCertificate)', '(objectClass=pkiCA)']
if filter_subject:
filter = ldap.make_filter({'ipaCertSubject': filter_subject})
filters.append(filter)
result, _truncated = ldap.find_entries(
base_dn=container_dn,
filter=ldap.combine_filters(filters, ldap.MATCH_ALL),
attrs_list=['cn', 'ipaCertSubject', 'ipaCertIssuerSerial',
'ipaPublicKey', 'ipaKeyTrust', 'ipaKeyExtUsage',
'cACertificate;binary'])
for entry in result:
nickname = entry.single_value['cn']
trusted = entry.single_value.get('ipaKeyTrust', 'unknown').lower()
if trusted == 'trusted':
trusted = True
elif trusted == 'distrusted':
trusted = False
else:
trusted = None
ext_key_usage = entry.get('ipaKeyExtUsage')
if ext_key_usage is not None:
ext_key_usage = set(str(p) for p in ext_key_usage)
ext_key_usage.discard(x509.EKU_PLACEHOLDER)
for cert in entry.get('cACertificate;binary', []):
try:
_parse_cert(cert)
except ValueError:
certs = []
break
certs.append((cert, nickname, trusted, ext_key_usage))
except errors.NotFound:
try:
ldap.get_entry(container_dn, [''])
except errors.NotFound:
# Fallback to cn=CAcert,cn=ipa,cn=etc,SUFFIX
dn = DN(('cn', 'CAcert'), config_dn)
entry = ldap.get_entry(dn, ['cACertificate;binary'])
cert = entry.single_value['cACertificate;binary']
try:
subject, _issuer_serial, _public_key_info = _parse_cert(cert)
except ValueError:
pass
else:
if filter_subject is not None and subject not in filter_subject:
raise errors.NotFound(reason="no matching entry found")
if compat_ipa_ca:
ca_subject = subject
else:
ca_subject = None
certs = make_compat_ca_certs([cert], compat_realm, ca_subject)
if certs:
return certs
else:
raise errors.NotFound(reason="no such entry")
def trust_flags_to_key_policy(trust_flags):
"""
Convert certutil trust flags to certificate store key policy.
"""
return trust_flags[1:]
def key_policy_to_trust_flags(trusted, ca, ext_key_usage):
"""
Convert certificate store key policy to certutil trust flags.
"""
return TrustFlags(False, trusted, ca, ext_key_usage)
def put_ca_cert_nss(ldap, base_dn, cert, nickname, trust_flags,
config_ipa=False, config_compat=False):
"""
Add or update entry for a CA certificate in the certificate store.
:param cert: IPACertificate
"""
trusted, ca, ext_key_usage = trust_flags_to_key_policy(trust_flags)
if ca is False:
raise ValueError("must be CA certificate")
put_ca_cert(ldap, base_dn, cert, nickname, trusted, ext_key_usage,
config_ipa, config_compat)
def get_ca_certs_nss(ldap, base_dn, compat_realm, compat_ipa_ca,
filter_subject=None):
"""
Get CA certificates and associated trust flags from the certificate store.
"""
nss_certs = []
certs = get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca,
filter_subject=filter_subject)
for cert, nickname, trusted, ext_key_usage in certs:
trust_flags = key_policy_to_trust_flags(trusted, True, ext_key_usage)
nss_certs.append((cert, nickname, trust_flags))
return nss_certs