CertUpdate: make it easy to invoke from other programs

The guts of ipa-certupdate are useful to execute as part of other
programs (e.g. as a first step of ipa-ca-install).  Refactor
ipa_certupdate.CertUpdate to make it easy to do that.  In
particular, make it possible to use an already-initialised API
object.

Part of: https://pagure.io/freeipa/issue/6577

Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
Fraser Tweedale 2017-10-30 17:52:07 +11:00 committed by Christian Heimes
parent feee70d7bb
commit 93d53e5cd0

View File

@ -56,30 +56,36 @@ class CertUpdate(admintool.AdminTool):
api.bootstrap(context='cli_installer', confdir=paths.ETC_IPA)
api.finalize()
api.Backend.rpcclient.connect()
self.run_with_args(api)
api.Backend.rpcclient.disconnect()
@classmethod
def run_with_args(cls, api):
"""
Run the certupdate procedure with the given API object.
:param api: API object with ldap2/rpcclient backend connected
(such that Commands can be invoked)
"""
server = urlsplit(api.env.jsonrpc_uri).hostname
ldap_uri = ipaldap.get_ldap_uri(server)
ldap = ipaldap.LDAPClient(ldap_uri)
tmpdir = tempfile.mkdtemp(prefix="tmp-")
ccache_name = os.path.join(tmpdir, 'ccache')
old_krb5ccname = os.environ.get('KRB5CCNAME')
try:
principal = str('host/%s@%s' % (api.env.host, api.env.realm))
kinit_keytab(principal, paths.KRB5_KEYTAB, ccache_name)
os.environ['KRB5CCNAME'] = ccache_name
api.Backend.rpcclient.connect()
try:
result = api.Backend.rpcclient.forward(
'ca_is_enabled',
version=u'2.107',
)
result = api.Command.ca_is_enabled(version=u'2.107')
ca_enabled = result['result']
except (errors.CommandError, errors.NetworkError):
result = api.Backend.rpcclient.forward(
'env',
server=True,
version=u'2.0',
)
result = api.Command.env(server=True, version=u'2.0')
ca_enabled = result['result']['enable_ra']
ldap.gssapi_bind()
@ -92,13 +98,16 @@ class CertUpdate(admintool.AdminTool):
else:
lwcas = []
api.Backend.rpcclient.disconnect()
finally:
if old_krb5ccname is None:
del os.environ['KRB5CCNAME']
else:
os.environ['KRB5CCNAME'] = old_krb5ccname
shutil.rmtree(tmpdir)
server_fstore = sysrestore.FileStore(paths.SYSRESTORE)
if server_fstore.has_files():
self.update_server(certs)
cls.update_server(certs)
try:
# pylint: disable=import-error,ipa-forbidden-import
from ipaserver.install import cainstance
@ -108,12 +117,13 @@ class CertUpdate(admintool.AdminTool):
logger.exception(
"Failed to add lightweight CA tracking requests")
self.update_client(certs)
cls.update_client(certs)
def update_client(self, certs):
self.update_file(paths.IPA_CA_CRT, certs)
self.update_file(paths.KDC_CA_BUNDLE_PEM, certs)
self.update_file(paths.CA_BUNDLE_PEM, certs)
@classmethod
def update_client(cls, certs):
cls.update_file(paths.IPA_CA_CRT, certs)
cls.update_file(paths.KDC_CA_BUNDLE_PEM, certs)
cls.update_file(paths.CA_BUNDLE_PEM, certs)
ipa_db = certdb.NSSDatabase(api.env.nss_dir)
@ -127,19 +137,20 @@ class CertUpdate(admintool.AdminTool):
nickname, ipa_db.secdir, e)
break
self.update_db(ipa_db.secdir, certs)
cls.update_db(ipa_db.secdir, certs)
tasks.remove_ca_certs_from_systemwide_ca_store()
tasks.insert_ca_certs_into_systemwide_ca_store(certs)
def update_server(self, certs):
@classmethod
def update_server(cls, certs):
instance = '-'.join(api.env.realm.split('.'))
self.update_db(
cls.update_db(
paths.ETC_DIRSRV_SLAPD_INSTANCE_TEMPLATE % instance, certs)
if services.knownservices.dirsrv.is_running():
services.knownservices.dirsrv.restart(instance)
self.update_db(paths.HTTPD_ALIAS_DIR, certs)
cls.update_db(paths.HTTPD_ALIAS_DIR, certs)
if services.knownservices.httpd.is_running():
services.knownservices.httpd.restart()
@ -170,17 +181,19 @@ class CertUpdate(admintool.AdminTool):
logger.debug("modifying certmonger request '%s'", request_id)
certmonger.modify(request_id, ca='dogtag-ipa-ca-renew-agent')
self.update_file(paths.CA_CRT, certs)
self.update_file(paths.CACERT_PEM, certs)
cls.update_file(paths.CA_CRT, certs)
cls.update_file(paths.CACERT_PEM, certs)
def update_file(self, filename, certs, mode=0o444):
@staticmethod
def update_file(filename, certs, mode=0o444):
certs = (c[0] for c in certs if c[2] is not False)
try:
x509.write_certificate_list(certs, filename)
except Exception as e:
logger.error("failed to update %s: %s", filename, e)
def update_db(self, path, certs):
@staticmethod
def update_db(path, certs):
db = certdb.NSSDatabase(path)
for cert, nickname, trusted, eku in certs:
trust_flags = certstore.key_policy_to_trust_flags(