Refactor certmonger for OpenSSL certificates

Currently, it was only possible to request an NSS certificate
via certmonger. Merged start_tracking methods and refactored them
to allow for OpenSSL certificates tracking.

https://fedorahosted.org/freeipa/ticket/5695

Reviewed-By: Jan Cholasta <jcholast@redhat.com>
This commit is contained in:
Stanislav Laznicka 2017-01-27 09:34:23 +01:00 committed by Jan Cholasta
parent 595f9b64e3
commit 51a2b13729
6 changed files with 93 additions and 97 deletions

View File

@ -300,7 +300,7 @@ def add_subject(request_id, subject):
def request_and_wait_for_cert(
certpath, nickname, subject, principal, passwd_fname=None,
certpath, subject, principal, nickname=None, passwd_fname=None,
dns=None, ca='IPA', profile=None,
pre_command=None, post_command=None, storage='NSSDB'):
"""
@ -308,7 +308,7 @@ def request_and_wait_for_cert(
The method also waits for the certificate to be available.
"""
reqId = request_cert(certpath, nickname, subject, principal,
reqId = request_cert(certpath, subject, principal, nickname,
passwd_fname, dns, ca, profile,
pre_command, post_command, storage)
state = wait_for_request(reqId, api.env.startup_timeout)
@ -319,7 +319,7 @@ def request_and_wait_for_cert(
def request_cert(
certpath, nickname, subject, principal, passwd_fname=None,
certpath, subject, principal, nickname=None, passwd_fname=None,
dns=None, ca='IPA', profile=None,
pre_command=None, post_command=None, storage='NSSDB'):
"""
@ -343,9 +343,11 @@ def request_cert(
if not ca_path:
raise RuntimeError('{} CA not found'.format(ca))
request_parameters = dict(KEY_STORAGE=storage, CERT_STORAGE=storage,
CERT_LOCATION=certfile, CERT_NICKNAME=nickname,
KEY_LOCATION=keyfile, KEY_NICKNAME=nickname,
CERT_LOCATION=certfile, KEY_LOCATION=keyfile,
SUBJECT=subject, CA=ca_path)
if nickname:
request_parameters["CERT_NICKNAME"] = nickname
request_parameters["KEY_NICKNAME"] = nickname
if principal:
request_parameters['PRINCIPAL'] = [principal]
if dns is not None and len(dns) > 0:
@ -379,33 +381,82 @@ def request_cert(
return request.obj_if.get_nickname()
def start_tracking(nickname, secdir, password_file=None, command=None):
def start_tracking(
certpath, ca='IPA', nickname=None, pin=None, pinfile=None,
pre_command=None, post_command=None, profile=None, storage="NSSDB"):
"""
Tell certmonger to track the given certificate nickname in NSS
database in secdir protected by optional password file password_file.
Tell certmonger to track the given certificate in either a file or an NSS
database. The certificate access can be protected by a password_file.
command is an optional parameter which specifies a command for
certmonger to run when it renews a certificate. This command must
reside in /usr/lib/ipa/certmonger to work with SELinux.
This uses the generic certmonger command getcert so we can specify
a different helper.
Returns certificate nickname.
:param certpath:
The path to an NSS database or a tuple (PEM certificate, private key).
:param ca:
Nickanme of the CA for which the given certificate should be tracked.
:param nickname:
Nickname of the NSS certificate in ``certpath`` to be tracked.
:param pin:
The passphrase for either NSS database containing ``nickname`` or
for the encrypted key in the ``certpath`` tuple.
:param pinfile:
Similar to ``pin`` parameter except this is a path to a file containing
the required passphrase.
:param pre_command:
Specifies a command for certmonger to run before it renews a
certificate. This command must reside in /usr/lib/ipa/certmonger
to work with SELinux.
:param post_command:
Specifies a command for certmonger to run after it has renewed a
certificate. This command must reside in /usr/lib/ipa/certmonger
to work with SELinux.
:param storage:
One of "NSSDB" or "FILE", describes whether certmonger should use
NSS or OpenSSL backend to track the certificate in ``certpath``
:param profile:
Which certificate profile should be used.
:returns: certificate tracking nickname.
"""
if storage == 'FILE':
certfile, keyfile = certpath
else:
certfile = certpath
keyfile = certpath
cm = _certmonger()
params = {'TRACK': True}
params['cert-nickname'] = nickname
params['cert-database'] = os.path.abspath(secdir)
params['cert-storage'] = 'NSSDB'
params['key-nickname'] = nickname
params['key-database'] = os.path.abspath(secdir)
params['key-storage'] = 'NSSDB'
ca_path = cm.obj_if.find_ca_by_nickname('IPA')
certmonger_cmd_template = paths.CERTMONGER_COMMAND_TEMPLATE
ca_path = cm.obj_if.find_ca_by_nickname(ca)
if not ca_path:
raise RuntimeError('IPA CA not found')
params['ca'] = ca_path
if command:
params['cert-postsave-command'] = command
if password_file:
params['KEY_PIN_FILE'] = os.path.abspath(password_file)
raise RuntimeError('{} CA not found'.format(ca))
params = {
'TRACK': True,
'CERT_STORAGE': storage,
'KEY_STORAGE': storage,
'CERT_LOCATION': certfile,
'KEY_LOCATION': keyfile,
'CA': ca_path
}
if nickname:
params['CERT_NICKNAME'] = nickname
params['KEY_NICKNAME'] = nickname
if pin:
params['KEY_PIN'] = pin
if pinfile:
params['KEY_PIN_FILE'] = os.path.abspath(pinfile)
if pre_command:
if not os.path.isabs(pre_command):
pre_command = certmonger_cmd_template % (pre_command)
params['cert-presave-command'] = pre_command
if post_command:
if not os.path.isabs(post_command):
post_command = certmonger_cmd_template % (post_command)
params['cert-postsave-command'] = post_command
if profile:
params['ca-profile'] = profile
result = cm.obj_if.add_request(params)
try:
if result[0]:
@ -545,53 +596,6 @@ def get_pin(token):
return None
def dogtag_start_tracking(ca, nickname, pin, pinfile, secdir, pre_command,
post_command, profile=None):
"""
Tell certmonger to start tracking a dogtag CA certificate. These
are handled differently because their renewal must be done directly
and not through IPA.
This uses the generic certmonger command getcert so we can specify
a different helper.
pre_command is the script to execute before a renewal is done.
post_command is the script to execute after a renewal is done.
Both commands can be None.
"""
cm = _certmonger()
certmonger_cmd_template = paths.CERTMONGER_COMMAND_TEMPLATE
params = {'TRACK': True}
params['cert-nickname'] = nickname
params['cert-database'] = os.path.abspath(secdir)
params['cert-storage'] = 'NSSDB'
params['key-nickname'] = nickname
params['key-database'] = os.path.abspath(secdir)
params['key-storage'] = 'NSSDB'
ca_path = cm.obj_if.find_ca_by_nickname(ca)
if ca_path:
params['ca'] = ca_path
if pin:
params['KEY_PIN'] = pin
if pinfile:
params['KEY_PIN_FILE'] = os.path.abspath(pinfile)
if pre_command:
if not os.path.isabs(pre_command):
pre_command = certmonger_cmd_template % (pre_command)
params['cert-presave-command'] = pre_command
if post_command:
if not os.path.isabs(post_command):
post_command = certmonger_cmd_template % (post_command)
params['cert-postsave-command'] = post_command
if profile:
params['ca-profile'] = profile
cm.obj_if.add_request(params)
def check_state(dirs):
"""
Given a set of directories and nicknames verify that we are no longer
@ -623,9 +627,9 @@ def wait_for_request(request_id, timeout=120):
return state
if __name__ == '__main__':
request_id = request_cert(paths.HTTPD_ALIAS_DIR, "Test",
request_id = request_cert(paths.HTTPD_ALIAS_DIR,
"cn=tiger.example.com,O=IPA",
"HTTP/tiger.example.com@EXAMPLE.COM")
"HTTP/tiger.example.com@EXAMPLE.COM", "Test")
csr = get_request_value(request_id, 'csr')
print(csr)
stop_tracking(request_id)

View File

@ -1012,12 +1012,11 @@ class CAInstance(DogtagInstance):
def configure_agent_renewal(self):
try:
certmonger.dogtag_start_tracking(
certmonger.start_tracking(
certpath=self.ra_agent_db,
ca='dogtag-ipa-ca-renew-agent',
nickname='ipaCert',
pin=None,
pinfile=self.ra_agent_pwd,
secdir=self.ra_agent_db,
pre_command='renew_ra_cert_pre',
post_command='renew_ra_cert')
except RuntimeError as e:
@ -1813,10 +1812,9 @@ def add_lightweight_ca_tracking_requests(logger, lwcas):
request_id = certmonger.get_request_id(criteria)
if request_id is None:
try:
certmonger.dogtag_start_tracking(
secdir=paths.PKI_TOMCAT_ALIAS_DIR,
certmonger.start_tracking(
certpath=paths.PKI_TOMCAT_ALIAS_DIR,
pin=certmonger.get_pin('internal'),
pinfile=None,
nickname=nickname,
ca=ipalib.constants.RENEWAL_CA_NAME,
pre_command='stop_pkicad',

View File

@ -324,14 +324,11 @@ class CertDB(object):
def track_server_cert(self, nickname, principal, password_file=None, command=None):
"""
Tell certmonger to track the given certificate nickname.
If command is not a full path then it is prefixed with
/usr/lib[64]/ipa/certmonger.
"""
if command is not None and not os.path.isabs(command):
command = paths.CERTMONGER_COMMAND_TEMPLATE % (command)
try:
request_id = certmonger.start_tracking(nickname, self.secdir, password_file, command)
request_id = certmonger.start_tracking(
self.secdir, nickname=nickname, pinfile=password_file,
post_command=command)
except RuntimeError as e:
root_logger.error("certmonger failed starting to track certificate: %s" % str(e))
return

View File

@ -307,12 +307,11 @@ class DogtagInstance(service.Service):
for nickname, profile in self.tracking_reqs:
try:
certmonger.dogtag_start_tracking(
certmonger.start_tracking(
certpath=self.nss_db,
ca='dogtag-ipa-ca-renew-agent',
nickname=nickname,
pin=pin,
pinfile=None,
secdir=self.nss_db,
pre_command='stop_pkicad',
post_command='renew_ca_cert "%s"' % nickname,
profile=profile)
@ -328,12 +327,11 @@ class DogtagInstance(service.Service):
"""
pin = self.__get_pin()
try:
certmonger.dogtag_start_tracking(
certmonger.start_tracking(
certpath=self.nss_db,
ca='dogtag-ipa-ca-renew-agent',
nickname=self.server_cert_name,
pin=pin,
pinfile=None,
secdir=self.nss_db,
pre_command='stop_pkicad',
post_command='renew_ca_cert "%s"' % self.server_cert_name)
except RuntimeError as e:

View File

@ -358,8 +358,7 @@ class KrbInstance(service.Service):
krbtgt = "krbtgt/" + self.realm + "@" + self.realm
certpath = (paths.KDC_CERT, paths.KDC_KEY)
try:
reqid = certmonger.request_cert(certpath, u'KDC-Cert',
subject, krbtgt,
reqid = certmonger.request_cert(certpath, subject, krbtgt,
dns=self.fqdn, storage='FILE',
profile='KDCs_PKINIT_Certs')
except dbus.DBusException as e:

View File

@ -66,9 +66,9 @@ class update_ra_cert_store(Updater):
certmonger.stop_tracking(secdir=olddb.secdir,
nickname='ipaCert')
certmonger.start_tracking(secdir=newdb.secdir,
certmonger.start_tracking(certpath=newdb.secdir,
nickname='ipaCert',
password_file=newdb.pwd_file)
pinfile=newdb.pwd_file)
olddb.delete_cert('ipaCert')