Support multiple CA certificates in /etc/ipa/ca.crt in ipa-client-install.

Part of https://fedorahosted.org/freeipa/ticket/3259
Part of https://fedorahosted.org/freeipa/ticket/3520

Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
Jan Cholasta 2014-06-12 11:58:28 +02:00 committed by Petr Viktorin
parent 29f42cbec1
commit fd400588d7

View File

@ -233,15 +233,18 @@ def nickname_exists(nickname):
else:
return False
def cert_summary(msg, cert, indent=' '):
def cert_summary(msg, certs, indent=' '):
if msg:
s = '%s\n' % msg
else:
s = ''
s += '%sSubject: %s\n' % (indent, cert.subject)
s += '%sIssuer: %s\n' % (indent, cert.issuer)
s += '%sValid From: %s\n' % (indent, cert.valid_not_before_str)
s += '%sValid Until: %s\n' % (indent, cert.valid_not_after_str)
for cert in certs:
s += '%sSubject: %s\n' % (indent, cert.subject)
s += '%sIssuer: %s\n' % (indent, cert.issuer)
s += '%sValid From: %s\n' % (indent, cert.valid_not_before_str)
s += '%sValid Until: %s\n' % (indent, cert.valid_not_after_str)
s += '\n'
s = s[:-1]
return s
@ -1667,7 +1670,7 @@ def print_port_conf_info():
" TCP: 464\n"
" UDP: 464, 123 (if NTP enabled)")
def get_ca_cert_from_file(url):
def get_ca_certs_from_file(url):
'''
Get the CA cert from a user supplied file and write it into the
CACERT file.
@ -1694,13 +1697,13 @@ def get_ca_cert_from_file(url):
root_logger.debug("trying to retrieve CA cert from file %s", filename)
try:
cert = x509.load_certificate_from_file(filename)
except Exception:
certs = x509.load_certificate_list_from_file(filename)
except Exception, e:
raise errors.NoCertificateError(entry=filename)
return cert
return certs
def get_ca_cert_from_http(url, warn=True):
def get_ca_certs_from_http(url, warn=True):
'''
Use HTTP to retrieve the CA cert and write it into the CACERT file.
This is insecure and should be avoided.
@ -1720,13 +1723,13 @@ def get_ca_cert_from_http(url, warn=True):
raise errors.NoCertificateError(entry=url)
try:
cert = x509.load_certificate(stdout)
certs = x509.load_certificate_list(stdout)
except Exception:
raise errors.NoCertificateError(entry=url)
return cert
return certs
def get_ca_cert_from_ldap(server, basedn):
def get_ca_certs_from_ldap(server, basedn):
'''
Retrieve th CA cert from the LDAP server by binding to the
server with GSSAPI using the current Kerberos credentials.
@ -1755,14 +1758,14 @@ def get_ca_cert_from_ldap(server, basedn):
attrs_list=[ca_cert_attr],
time_limit=10)
except errors.NotFound, e:
root_logger.debug("get_ca_cert_from_ldap() error: %s", e)
root_logger.debug("get_ca_certs_from_ldap() error: %s", e)
raise errors.NoCertificateError(entry=server)
except errors.NetworkError, e:
root_logger.debug("get_ca_cert_from_ldap() error: %s", e)
root_logger.debug("get_ca_certs_from_ldap() error: %s", e)
raise errors.NetworkError(uri=conn.ldap_uri, error=str(e))
except Exception, e:
root_logger.debug("get_ca_cert_from_ldap() error: %s", e)
root_logger.debug("get_ca_certs_from_ldap() error: %s", e)
raise errors.LDAPError(str(e))
if len(result) != 1:
@ -1774,28 +1777,27 @@ def get_ca_cert_from_ldap(server, basedn):
except KeyError:
raise errors.NoCertificateError(entry=ca_cert_attr)
try:
cert = x509.load_certificate(der_cert, x509.DER)
except Exception, e:
raise errors.FileError(reason =
u"cannot write certificate file '%s': %s" % (ca_file, e))
cert = x509.load_certificate(der_cert, x509.DER)
return cert
return [cert]
def validate_new_ca_cert(existing_ca_cert, new_ca_cert, ask, override=False):
if existing_ca_cert is None:
def validate_new_ca_certs(existing_ca_certs, new_ca_certs, ask,
override=False):
if existing_ca_certs is None:
root_logger.info(
cert_summary("Successfully retrieved CA cert", new_ca_cert))
cert_summary("Successfully retrieved CA cert", new_ca_certs))
return
if existing_ca_cert.der_data != new_ca_cert.der_data:
existing_ca_certs = set(existing_ca_certs)
new_ca_certs = set(new_ca_certs)
if existing_ca_certs > new_ca_certs:
root_logger.warning(
"The CA cert available from the IPA server does not match the\n"
"local certificate available at %s" % CACERT)
root_logger.warning(
cert_summary("Existing CA cert:", existing_ca_cert))
cert_summary("Existing CA cert:", existing_ca_certs))
root_logger.warning(
cert_summary("Retrieved CA cert:", new_ca_cert))
cert_summary("Retrieved CA cert:", new_ca_certs))
if override:
root_logger.warning("Overriding existing CA cert\n")
elif not ask or not user_input(
@ -1806,7 +1808,7 @@ def validate_new_ca_cert(existing_ca_cert, new_ca_cert, ask, override=False):
root_logger.debug(
"Existing CA cert and Retrieved CA cert are identical")
def get_ca_cert(fstore, options, server, basedn):
def get_ca_certs(fstore, options, server, basedn):
'''
Examine the different options and determine a method for obtaining
the CA cert.
@ -1855,13 +1857,13 @@ def get_ca_cert(fstore, options, server, basedn):
interactive = not options.unattended
otp_auth = options.principal is None and options.password is not None
existing_ca_cert = None
ca_cert = None
existing_ca_certs = None
ca_certs = None
if options.ca_cert_file:
url = file_url()
try:
ca_cert = get_ca_cert_from_file(url)
ca_certs = get_ca_certs_from_file(url)
except errors.FileError, e:
root_logger.debug(e)
raise
@ -1873,7 +1875,8 @@ def get_ca_cert(fstore, options, server, basedn):
if os.path.exists(CACERT):
if os.path.isfile(CACERT):
try:
existing_ca_cert = x509.load_certificate_from_file(CACERT)
existing_ca_certs = x509.load_certificate_list_from_file(
CACERT)
except Exception, e:
raise errors.FileError(reason=u"Unable to load existing" +
" CA cert '%s': %s" % (CACERT, e))
@ -1882,7 +1885,7 @@ def get_ca_cert(fstore, options, server, basedn):
"not a plain file" % (CACERT))
if otp_auth:
if existing_ca_cert:
if existing_ca_certs:
root_logger.info("OTP case, CA cert preexisted, use it")
else:
url = http_url()
@ -1893,25 +1896,26 @@ def get_ca_cert(fstore, options, server, basedn):
raise errors.NoCertificateError(message=u"HTTP certificate"
" download declined by user")
try:
ca_cert = get_ca_cert_from_http(url, override)
ca_certs = get_ca_certs_from_http(url, override)
except Exception, e:
root_logger.debug(e)
raise errors.NoCertificateError(entry=url)
validate_new_ca_cert(existing_ca_cert, ca_cert, False, override)
validate_new_ca_certs(existing_ca_certs, ca_certs, False,
override)
else:
# Auth with user credentials
try:
url = ldap_url()
ca_cert = get_ca_cert_from_ldap(server, basedn)
validate_new_ca_cert(existing_ca_cert, ca_cert, interactive)
ca_certs = get_ca_certs_from_ldap(server, basedn)
validate_new_ca_certs(existing_ca_certs, ca_certs, interactive)
except errors.FileError, e:
root_logger.debug(e)
raise
except (errors.NoCertificateError, errors.LDAPError), e:
root_logger.debug(str(e))
url = http_url()
if existing_ca_cert:
if existing_ca_certs:
root_logger.warning(
"Unable to download CA cert from LDAP\n"
"but found preexisting cert, using it.\n")
@ -1930,22 +1934,24 @@ def get_ca_cert(fstore, options, server, basedn):
"certificate download requires --force")
else:
try:
ca_cert = get_ca_cert_from_http(url)
ca_certs = get_ca_certs_from_http(url)
except Exception, e:
root_logger.debug(e)
raise errors.NoCertificateError(entry=url)
validate_new_ca_cert(existing_ca_cert, ca_cert, interactive)
validate_new_ca_certs(existing_ca_certs, ca_certs,
interactive)
except Exception, e:
root_logger.debug(str(e))
raise errors.NoCertificateError(entry=url)
if ca_cert is None and existing_ca_cert is None:
if ca_certs is None and existing_ca_certs is None:
raise errors.InternalError(u"expected CA cert file '%s' to "
u"exist, but it's absent" % (ca_file))
if ca_cert is not None:
if ca_certs is not None:
try:
x509.write_certificate(ca_cert.der_data, ca_file)
ca_certs = [cert.der_data for cert in ca_certs]
x509.write_certificate_list(ca_certs, ca_file)
except Exception, e:
if os.path.exists(ca_file):
try:
@ -2445,7 +2451,7 @@ def install(options, env, fstore, statestore):
# Get the CA certificate
try:
os.environ['KRB5_CONFIG'] = env['KRB5_CONFIG']
get_ca_cert(fstore, options, cli_server[0], cli_basedn)
get_ca_certs(fstore, options, cli_server[0], cli_basedn)
del os.environ['KRB5_CONFIG']
except errors.FileError, e:
root_logger.error(e)