ipa_certupdate: avoid classmethod and staticmethod

Because classmethod and staticmethod are just fancy ways of calling
plain old functions, turn the classmethods and staticmethods of
CertUpdate into plain old functions.

This improves readability by making it clear that the behaviour of
the routines cannot depend on instance or class variables.

Part of: https://pagure.io/freeipa/issue/6577

Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
Fraser Tweedale
2017-12-01 12:27:26 +11:00
committed by Christian Heimes
parent 97942a7c7a
commit 39fdc2d250
2 changed files with 131 additions and 133 deletions

View File

@@ -27,7 +27,7 @@ import tempfile
from ipalib.install.kinit import kinit_keytab
from ipapython import ipautil
from ipaclient.install.ipa_certupdate import CertUpdate
from ipaclient.install import ipa_certupdate
from ipaserver.install import installutils
from ipaserver.install.installutils import create_replica_config
from ipaserver.install.installutils import check_creds, ReplicaConfig
@@ -178,7 +178,7 @@ def install_replica(safe_options, options, filename):
# Run ipa-certupdate to ensure we have the CA cert. This is
# necessary if the admin has just promoted the topology from
# CA-less to CA-ful, and ipa-certupdate has not been run yet.
CertUpdate.run_with_args(api)
ipa_certupdate.run_with_args(api)
# CertUpdate restarts DS causing broken pipe on the original
# connection, so reconnect the backend.
@@ -257,7 +257,7 @@ def install_master(safe_options, options):
# Run ipa-certupdate to add the new CA certificate to
# certificate databases on this server.
logger.info("Updating certificate databases.")
CertUpdate.run_with_args(api)
ipa_certupdate.run_with_args(api)
def install(safe_options, options, filename):
options.promote = False

View File

@@ -57,149 +57,147 @@ class CertUpdate(admintool.AdminTool):
api.finalize()
api.Backend.rpcclient.connect()
self.run_with_args(api)
run_with_args(api)
api.Backend.rpcclient.disconnect()
@classmethod
def run_with_args(cls, api):
"""
Run the certupdate procedure with the given API object.
:param api: API object with ldap2/rpcclient backend connected
(such that Commands can be invoked)
def run_with_args(api):
"""
Run the certupdate procedure with the given API object.
"""
server = urlsplit(api.env.jsonrpc_uri).hostname
ldap_uri = ipaldap.get_ldap_uri(server)
ldap = ipaldap.LDAPClient(ldap_uri)
:param api: API object with ldap2/rpcclient backend connected
(such that Commands can be invoked)
"""
server = urlsplit(api.env.jsonrpc_uri).hostname
ldap_uri = ipaldap.get_ldap_uri(server)
ldap = ipaldap.LDAPClient(ldap_uri)
tmpdir = tempfile.mkdtemp(prefix="tmp-")
ccache_name = os.path.join(tmpdir, 'ccache')
old_krb5ccname = os.environ.get('KRB5CCNAME')
try:
principal = str('host/%s@%s' % (api.env.host, api.env.realm))
kinit_keytab(principal, paths.KRB5_KEYTAB, ccache_name)
os.environ['KRB5CCNAME'] = ccache_name
tmpdir = tempfile.mkdtemp(prefix="tmp-")
ccache_name = os.path.join(tmpdir, 'ccache')
old_krb5ccname = os.environ.get('KRB5CCNAME')
try:
principal = str('host/%s@%s' % (api.env.host, api.env.realm))
kinit_keytab(principal, paths.KRB5_KEYTAB, ccache_name)
os.environ['KRB5CCNAME'] = ccache_name
result = api.Command.ca_is_enabled(version=u'2.107')
ca_enabled = result['result']
except (errors.CommandError, errors.NetworkError):
result = api.Command.env(server=True, version=u'2.0')
ca_enabled = result['result']['enable_ra']
try:
result = api.Command.ca_is_enabled(version=u'2.107')
ca_enabled = result['result']
except (errors.CommandError, errors.NetworkError):
result = api.Command.env(server=True, version=u'2.0')
ca_enabled = result['result']['enable_ra']
ldap.gssapi_bind()
ldap.gssapi_bind()
certs = certstore.get_ca_certs(
ldap, api.env.basedn, api.env.realm, ca_enabled)
certs = certstore.get_ca_certs(ldap, api.env.basedn,
api.env.realm, ca_enabled)
if ca_enabled:
lwcas = api.Command.ca_find()['result']
else:
lwcas = []
if ca_enabled:
lwcas = api.Command.ca_find()['result']
else:
lwcas = []
finally:
if old_krb5ccname is None:
del os.environ['KRB5CCNAME']
else:
os.environ['KRB5CCNAME'] = old_krb5ccname
shutil.rmtree(tmpdir)
finally:
if old_krb5ccname is None:
del os.environ['KRB5CCNAME']
else:
os.environ['KRB5CCNAME'] = old_krb5ccname
shutil.rmtree(tmpdir)
server_fstore = sysrestore.FileStore(paths.SYSRESTORE)
if server_fstore.has_files():
cls.update_server(certs)
try:
# pylint: disable=import-error,ipa-forbidden-import
from ipaserver.install import cainstance
# pylint: enable=import-error,ipa-forbidden-import
cainstance.add_lightweight_ca_tracking_requests(lwcas)
except Exception:
logger.exception(
"Failed to add lightweight CA tracking requests")
cls.update_client(certs)
@classmethod
def update_client(cls, certs):
cls.update_file(paths.IPA_CA_CRT, certs)
cls.update_file(paths.KDC_CA_BUNDLE_PEM, certs)
cls.update_file(paths.CA_BUNDLE_PEM, certs)
ipa_db = certdb.NSSDatabase(api.env.nss_dir)
# Remove old IPA certs from /etc/ipa/nssdb
for nickname in ('IPA CA', 'External CA cert'):
while ipa_db.has_nickname(nickname):
try:
ipa_db.delete_cert(nickname)
except ipautil.CalledProcessError as e:
logger.error("Failed to remove %s from %s: %s",
nickname, ipa_db.secdir, e)
break
cls.update_db(ipa_db.secdir, certs)
tasks.remove_ca_certs_from_systemwide_ca_store()
tasks.insert_ca_certs_into_systemwide_ca_store(certs)
@classmethod
def update_server(cls, certs):
instance = '-'.join(api.env.realm.split('.'))
cls.update_db(
paths.ETC_DIRSRV_SLAPD_INSTANCE_TEMPLATE % instance, certs)
if services.knownservices.dirsrv.is_running():
services.knownservices.dirsrv.restart(instance)
cls.update_db(paths.HTTPD_ALIAS_DIR, certs)
if services.knownservices.httpd.is_running():
services.knownservices.httpd.restart()
criteria = {
'cert-database': paths.PKI_TOMCAT_ALIAS_DIR,
'cert-nickname': IPA_CA_NICKNAME,
'ca-name': RENEWAL_CA_NAME
}
request_id = certmonger.get_request_id(criteria)
if request_id is not None:
timeout = api.env.startup_timeout + 60
logger.debug("resubmitting certmonger request '%s'", request_id)
certmonger.resubmit_request(
request_id, ca='dogtag-ipa-ca-renew-agent-reuse', profile='')
try:
state = certmonger.wait_for_request(request_id, timeout)
except RuntimeError:
raise admintool.ScriptError(
"Resubmitting certmonger request '%s' timed out, "
"please check the request manually" % request_id)
ca_error = certmonger.get_request_value(request_id, 'ca-error')
if state != 'MONITORING' or ca_error:
raise admintool.ScriptError(
"Error resubmitting certmonger request '%s', "
"please check the request manually" % request_id)
logger.debug("modifying certmonger request '%s'", request_id)
certmonger.modify(request_id, ca='dogtag-ipa-ca-renew-agent')
cls.update_file(paths.CA_CRT, certs)
cls.update_file(paths.CACERT_PEM, certs)
@staticmethod
def update_file(filename, certs, mode=0o444):
certs = (c[0] for c in certs if c[2] is not False)
server_fstore = sysrestore.FileStore(paths.SYSRESTORE)
if server_fstore.has_files():
update_server(certs)
try:
x509.write_certificate_list(certs, filename)
except Exception as e:
logger.error("failed to update %s: %s", filename, e)
# pylint: disable=import-error,ipa-forbidden-import
from ipaserver.install import cainstance
# pylint: enable=import-error,ipa-forbidden-import
cainstance.add_lightweight_ca_tracking_requests(lwcas)
except Exception:
logger.exception(
"Failed to add lightweight CA tracking requests")
@staticmethod
def update_db(path, certs):
db = certdb.NSSDatabase(path)
for cert, nickname, trusted, eku in certs:
trust_flags = certstore.key_policy_to_trust_flags(
trusted, True, eku)
update_client(certs)
def update_client(certs):
update_file(paths.IPA_CA_CRT, certs)
update_file(paths.KDC_CA_BUNDLE_PEM, certs)
update_file(paths.CA_BUNDLE_PEM, certs)
ipa_db = certdb.NSSDatabase(api.env.nss_dir)
# Remove old IPA certs from /etc/ipa/nssdb
for nickname in ('IPA CA', 'External CA cert'):
while ipa_db.has_nickname(nickname):
try:
db.add_cert(cert, nickname, trust_flags)
ipa_db.delete_cert(nickname)
except ipautil.CalledProcessError as e:
logger.error(
"failed to update %s in %s: %s", nickname, path, e)
"Failed to remove %s from %s: %s",
nickname, ipa_db.secdir, e)
break
update_db(ipa_db.secdir, certs)
tasks.remove_ca_certs_from_systemwide_ca_store()
tasks.insert_ca_certs_into_systemwide_ca_store(certs)
def update_server(certs):
instance = '-'.join(api.env.realm.split('.'))
update_db(paths.ETC_DIRSRV_SLAPD_INSTANCE_TEMPLATE % instance, certs)
if services.knownservices.dirsrv.is_running():
services.knownservices.dirsrv.restart(instance)
update_db(paths.HTTPD_ALIAS_DIR, certs)
if services.knownservices.httpd.is_running():
services.knownservices.httpd.restart()
criteria = {
'cert-database': paths.PKI_TOMCAT_ALIAS_DIR,
'cert-nickname': IPA_CA_NICKNAME,
'ca-name': RENEWAL_CA_NAME,
}
request_id = certmonger.get_request_id(criteria)
if request_id is not None:
timeout = api.env.startup_timeout + 60
logger.debug("resubmitting certmonger request '%s'", request_id)
certmonger.resubmit_request(
request_id, ca='dogtag-ipa-ca-renew-agent-reuse', profile='')
try:
state = certmonger.wait_for_request(request_id, timeout)
except RuntimeError:
raise admintool.ScriptError(
"Resubmitting certmonger request '%s' timed out, "
"please check the request manually" % request_id)
ca_error = certmonger.get_request_value(request_id, 'ca-error')
if state != 'MONITORING' or ca_error:
raise admintool.ScriptError(
"Error resubmitting certmonger request '%s', "
"please check the request manually" % request_id)
logger.debug("modifying certmonger request '%s'", request_id)
certmonger.modify(request_id, ca='dogtag-ipa-ca-renew-agent')
update_file(paths.CA_CRT, certs)
update_file(paths.CACERT_PEM, certs)
def update_file(filename, certs, mode=0o444):
certs = (c[0] for c in certs if c[2] is not False)
try:
x509.write_certificate_list(certs, filename)
except Exception as e:
logger.error("failed to update %s: %s", filename, e)
def update_db(path, certs):
db = certdb.NSSDatabase(path)
for cert, nickname, trusted, eku in certs:
trust_flags = certstore.key_policy_to_trust_flags(trusted, True, eku)
try:
db.add_cert(cert, nickname, trust_flags)
except ipautil.CalledProcessError as e:
logger.error("failed to update %s in %s: %s", nickname, path, e)