Refactor CA certificate fetching code in ipa-client-install.

Part of https://fedorahosted.org/freeipa/ticket/3259
Part of https://fedorahosted.org/freeipa/ticket/3520

Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
Jan Cholasta 2014-06-12 11:52:38 +02:00 committed by Petr Viktorin
parent 9e223e6fd4
commit 29f42cbec1

View File

@ -1698,16 +1698,9 @@ def get_ca_cert_from_file(url):
except Exception:
raise errors.NoCertificateError(entry=filename)
try:
x509.write_certificate(cert.der_data, CACERT)
except Exception, e:
raise errors.FileError(
reason=u"cannot write certificate file '%s': %s" % (CACERT, e)
)
else:
del cert
return cert
def get_ca_cert_from_http(url, ca_file, warn=True):
def get_ca_cert_from_http(url, warn=True):
'''
Use HTTP to retrieve the CA cert and write it into the CACERT file.
This is insecure and should be avoided.
@ -1722,11 +1715,18 @@ def get_ca_cert_from_http(url, ca_file, warn=True):
root_logger.debug("trying to retrieve CA cert via HTTP from %s", url)
try:
run([paths.BIN_WGET, "-O", ca_file, url])
stdout, stderr, rc = run([paths.BIN_WGET, "-O", "-", url])
except CalledProcessError, e:
raise errors.NoCertificateError(entry=url)
def get_ca_cert_from_ldap(server, basedn, ca_file):
try:
cert = x509.load_certificate(stdout)
except Exception:
raise errors.NoCertificateError(entry=url)
return cert
def get_ca_cert_from_ldap(server, basedn):
'''
Retrieve th CA cert from the LDAP server by binding to the
server with GSSAPI using the current Kerberos credentials.
@ -1775,20 +1775,14 @@ def get_ca_cert_from_ldap(server, basedn, ca_file):
raise errors.NoCertificateError(entry=ca_cert_attr)
try:
x509.write_certificate(der_cert, ca_file)
cert = x509.load_certificate(der_cert, x509.DER)
except Exception, e:
raise errors.FileError(reason =
u"cannot write certificate file '%s': %s" % (ca_file, e))
def validate_new_ca_cert(existing_ca_cert, ca_file, ask, override=False):
try:
new_ca_cert = x509.load_certificate_from_file(ca_file)
except Exception, e:
raise errors.FileError(
reason="Unable to read new ca cert '%s': %s" % (ca_file, e)
)
return cert
def validate_new_ca_cert(existing_ca_cert, new_ca_cert, ask, override=False):
if existing_ca_cert is None:
root_logger.info(
cert_summary("Successfully retrieved CA cert", new_ca_cert))
@ -1811,10 +1805,6 @@ def validate_new_ca_cert(existing_ca_cert, ca_file, ask, override=False):
else:
root_logger.debug(
"Existing CA cert and Retrieved CA cert are identical")
os.remove(ca_file)
del(existing_ca_cert)
del(new_ca_cert)
def get_ca_cert(fstore, options, server, basedn):
'''
@ -1866,11 +1856,12 @@ def get_ca_cert(fstore, options, server, basedn):
interactive = not options.unattended
otp_auth = options.principal is None and options.password is not None
existing_ca_cert = None
ca_cert = None
if options.ca_cert_file:
url = file_url()
try:
get_ca_cert_from_file(url)
ca_cert = get_ca_cert_from_file(url)
except errors.FileError, e:
root_logger.debug(e)
raise
@ -1902,28 +1893,18 @@ def get_ca_cert(fstore, options, server, basedn):
raise errors.NoCertificateError(message=u"HTTP certificate"
" download declined by user")
try:
get_ca_cert_from_http(url, ca_file, override)
ca_cert = get_ca_cert_from_http(url, override)
except Exception, e:
root_logger.debug(e)
raise errors.NoCertificateError(entry=url)
try:
validate_new_ca_cert(existing_ca_cert, ca_file,
False, override)
except Exception, e:
os.unlink(ca_file)
raise
validate_new_ca_cert(existing_ca_cert, ca_cert, False, override)
else:
# Auth with user credentials
try:
url = ldap_url()
get_ca_cert_from_ldap(server, basedn, ca_file)
try:
validate_new_ca_cert(existing_ca_cert,
ca_file, interactive)
except Exception, e:
os.unlink(ca_file)
raise
ca_cert = get_ca_cert_from_ldap(server, basedn)
validate_new_ca_cert(existing_ca_cert, ca_cert, interactive)
except errors.FileError, e:
root_logger.debug(e)
raise
@ -1949,28 +1930,33 @@ def get_ca_cert(fstore, options, server, basedn):
"certificate download requires --force")
else:
try:
get_ca_cert_from_http(url, ca_file)
ca_cert = get_ca_cert_from_http(url)
except Exception, e:
root_logger.debug(e)
raise errors.NoCertificateError(entry=url)
try:
validate_new_ca_cert(existing_ca_cert,
ca_file, interactive)
except Exception, e:
os.unlink(ca_file)
raise
validate_new_ca_cert(existing_ca_cert, ca_cert, interactive)
except Exception, e:
root_logger.debug(str(e))
raise errors.NoCertificateError(entry=url)
# We should have a cert now, move it to the canonical place
if os.path.exists(ca_file):
os.rename(ca_file, CACERT)
elif existing_ca_cert is None:
if ca_cert is None and existing_ca_cert is None:
raise errors.InternalError(u"expected CA cert file '%s' to "
u"exist, but it's absent" % (ca_file))
if ca_cert is not None:
try:
x509.write_certificate(ca_cert.der_data, ca_file)
except Exception, e:
if os.path.exists(ca_file):
try:
os.unlink(ca_file)
except OSError, e:
root_logger.error(
"Failed to remove '%s': %s", ca_file, e)
raise errors.FileError(reason =
u"cannot write certificate file '%s': %s" % (ca_file, e))
os.rename(ca_file, CACERT)
# Make sure the file permissions are correct
try: