Use certmonger D-Bus API instead of messing with its files.

FreeIPA certmonger module changed to use D-Bus to communicate with certmonger.
Using the D-Bus API should be more stable and supported way of using cermonger than
tampering with its files.

>=certmonger-0.75.13 is needed for this to work.

https://fedorahosted.org/freeipa/ticket/4280

Reviewed-By: Jan Cholasta <jcholast@redhat.com>
This commit is contained in:
David Kupka 2014-09-04 15:02:34 +02:00 committed by Martin Kosek
parent 9e8aed8e53
commit 78b2a7abbb
8 changed files with 289 additions and 303 deletions

View File

@ -122,7 +122,7 @@ Requires: python-dns
Requires: zip
Requires: policycoreutils >= %{POLICYCOREUTILSVER}
Requires: tar
Requires(pre): certmonger >= 0.75.6
Requires(pre): certmonger >= 0.75.13
Requires(pre): 389-ds-base >= 1.3.2.20
Requires: fontawesome-fonts
Requires: open-sans-fonts

View File

@ -692,24 +692,24 @@ def certificate_renewal_update(ca):
# State not set, lets see if we are already configured
for request in requests:
nss_dir, nickname, ca_name, pre_command, post_command, profile = request
criteria = (
('cert_storage_location', nss_dir, certmonger.NPATH),
('cert_nickname', nickname, None),
('ca_name', ca_name, None),
('template_profile', profile, None),
)
criteria = {
'cert-database': nss_dir,
'cert-nickname': nickname,
'ca-name': ca_name,
'template-profile': profile,
}
request_id = certmonger.get_request_id(criteria)
if request_id is None:
break
val = certmonger.get_request_value(request_id, 'pre_certsave_command')
val = certmonger.get_request_value(request_id, 'cert-presave-command')
if val is not None:
val = val.split(' ', 1)[0]
val = os.path.basename(val)
if pre_command != val:
break
val = certmonger.get_request_value(request_id, 'post_certsave_command')
val = certmonger.get_request_value(request_id, 'post-certsave-command')
if val is not None:
val = val.split(' ', 1)[0]
val = os.path.basename(val)

View File

@ -122,11 +122,10 @@ class CertUpdate(admintool.AdminTool):
dogtag_constants = dogtag.configured_constants()
nickname = 'caSigningCert cert-pki-ca'
criteria = (
('cert_storage_location', dogtag_constants.ALIAS_DIR,
certmonger.NPATH),
('cert_nickname', nickname, None),
)
criteria = {
'cert-database': dogtag_constants.ALIAS_DIR,
'cert-nickname': nickname,
}
request_id = certmonger.get_request_id(criteria)
if request_id is not None:
timeout = api.env.startup_timeout + 60

View File

@ -1,4 +1,5 @@
# Authors: Rob Crittenden <rcritten@redhat.com>
# David Kupka <dkupka@redhat.com>
#
# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
@ -23,137 +24,202 @@
import os
import sys
import re
import time
import dbus
from ipapython import ipautil
from ipapython import dogtag
from ipapython.ipa_log_manager import *
from ipaplatform.paths import paths
from ipaplatform import services
REQUEST_DIR=paths.CERTMONGER_REQUESTS_DIR
CA_DIR=paths.CERTMONGER_CAS_DIR
REQUEST_DIR = paths.CERTMONGER_REQUESTS_DIR
CA_DIR = paths.CERTMONGER_CAS_DIR
# Normalizer types for critera in get_request_id()
NPATH = 1
DBUS_CM_PATH = '/org/fedorahosted/certmonger'
DBUS_CM_IF = 'org.fedorahosted.certmonger'
DBUS_CM_REQUEST_IF = 'org.fedorahosted.certmonger.request'
DBUS_CM_CA_IF = 'org.fedorahosted.certmonger.ca'
DBUS_PROPERTY_IF = 'org.freedesktop.DBus.Properties'
def find_request_value(filename, directive):
class _cm_dbus_object(object):
"""
Return a value from a certmonger request file for the requested directive
It tries to do this a number of times because sometimes there is a delay
when ipa-getcert returns and the file is fully updated, particularly
when doing a request. Generating a CSR is fast but not instantaneous.
Auxiliary class for convenient DBus object handling.
"""
tries = 1
value = None
found = False
while value is None and tries <= 5:
tries=tries + 1
time.sleep(1)
fp = open(filename, 'r')
lines = fp.readlines()
fp.close()
def __init__(self, bus, object_path, object_dbus_interface,
parent_dbus_interface=None, property_interface=False):
"""
bus - DBus bus object, result of dbus.SystemBus() or dbus.SessionBus()
Object is accesible over this DBus bus instance.
object_path - path to requested object on DBus bus
object_dbus_interface
parent_dbus_interface
property_interface - create DBus property interface? True or False
"""
if bus is None or object_path is None or object_dbus_interface is None:
raise RuntimeError(
"bus, object_path and dbus_interface must not be None.")
if parent_dbus_interface is None:
parent_dbus_interface = object_dbus_interface
self.bus = bus
self.path = object_path
self.obj_dbus_if = object_dbus_interface
self.parent_dbus_if = parent_dbus_interface
self.obj = bus.get_object(parent_dbus_interface, object_path)
self.obj_if = dbus.Interface(self.obj, object_dbus_interface)
if property_interface:
self.prop_if = dbus.Interface(self.obj, DBUS_PROPERTY_IF)
for line in lines:
if found:
# A value can span multiple lines. If it does then it has a
# leading space.
if not line.startswith(' '):
# We hit the next directive, return now
return value
else:
value = value + line[1:]
def _start_certmonger():
"""
Start certmonger daemon. If it's already running systemctl just ignores
the command.
"""
if not services.knownservices.certmonger.is_running():
try:
services.knownservices.certmonger.start()
except Exception, e:
root_logger.error('Failed to start certmonger: %s' % e)
raise
def _connect_to_certmonger():
"""
Start certmonger daemon and connect to it via DBus.
"""
try:
_start_certmonger()
except (KeyboardInterrupt, OSError), e:
root_logger.error('Failed to start certmonger: %s' % e)
raise
try:
bus = dbus.SystemBus()
cm = _cm_dbus_object(bus, DBUS_CM_PATH, DBUS_CM_IF)
except dbus.DBusException, e:
root_logger.error("Failed to access certmonger over DBus: %s", e)
raise
return cm
def _get_requests(criteria=dict()):
"""
Get all requests that matches the provided criteria.
"""
if not isinstance(criteria, dict):
raise TypeError('"criteria" must be dict.')
cm = _connect_to_certmonger()
requests = []
requests_paths = []
if 'nickname' in criteria:
request_path = cm.obj_if.find_request_by_nickname(criteria['nickname'])
if request_path:
requests_paths = [request_path]
else:
requests_paths = cm.obj_if.get_requests()
for request_path in requests_paths:
request = _cm_dbus_object(cm.bus, request_path, DBUS_CM_REQUEST_IF,
DBUS_CM_IF, True)
for criterion in criteria:
if criterion == 'ca-name':
ca_path = request.obj_if.get_ca()
ca = _cm_dbus_object(cm.bus, ca_path, DBUS_CM_CA_IF,
DBUS_CM_IF)
value = ca.obj_if.get_nickname()
else:
if line.startswith(directive + '='):
found = True
value = line[len(directive)+1:]
value = request.prop_if.Get(DBUS_CM_REQUEST_IF, criterion)
if value != criteria[criterion]:
break
else:
requests.append(request)
return requests
def _get_request(criteria):
"""
Find request that matches criteria.
If 'nickname' is specified other criteria are ignored because 'nickname'
uniquely identify single request.
When multiple or none request matches specified criteria RuntimeError is
raised.
"""
requests = _get_requests(criteria)
if len(requests) == 0:
return None
elif len(requests) == 1:
return requests[0]
else:
raise RuntimeError("Criteria expected to be met by 1 request, got %s."
% len(requests))
return value
def get_request_value(request_id, directive):
"""
There is no guarantee that the request_id will match the filename
in the certmonger requests directory, so open each one to find the
request_id.
Get property of request.
"""
fileList=os.listdir(REQUEST_DIR)
for file in fileList:
value = find_request_value('%s/%s' % (REQUEST_DIR, file), 'id')
if value is not None and value.rstrip() == request_id:
return find_request_value('%s/%s' % (REQUEST_DIR, file), directive)
try:
request = _get_request(dict(nickname=request_id))
except RuntimeError, e:
root_logger.error('Failed to get request: %s' % e)
raise
if request:
return request.prop_if.Get(DBUS_CM_REQUEST_IF, directive)
else:
return None
return None
def get_request_id(criteria):
"""
If you don't know the certmonger request_id then try to find it by looking
through all the request files. An alternative would be to parse the
ipa-getcert list output but this seems cleaner.
through all the requests.
criteria is a tuple of key/value/type to search for. The more specific
criteria is a tuple of key/value to search for. The more specific
the better. An error is raised if multiple request_ids are returned for
the same criteria.
None is returned if none of the criteria match.
"""
assert type(criteria) is tuple
try:
request = _get_request(criteria)
except RuntimeError, e:
root_logger.error('Failed to get request: %s' % e)
raise
if request:
return request.prop_if.Get(DBUS_CM_REQUEST_IF, 'nickname')
else:
return None
reqid=None
fileList=os.listdir(REQUEST_DIR)
for file in fileList:
match = True
for (key, value, valtype) in criteria:
rv = find_request_value('%s/%s' % (REQUEST_DIR, file), key)
if rv and valtype == NPATH:
rv = os.path.abspath(rv)
if rv is None or rv.rstrip() != value:
match = False
break
if match and reqid is not None:
raise RuntimeError('multiple certmonger requests match the criteria')
if match:
reqid = find_request_value('%s/%s' % (REQUEST_DIR, file), 'id').rstrip()
return reqid
def get_requests_for_dir(dir):
"""
Return a list containing the request ids for a given NSS database
directory.
"""
reqid=[]
fileList=os.listdir(REQUEST_DIR)
for file in fileList:
rv = find_request_value(os.path.join(REQUEST_DIR, file),
'cert_storage_location')
if rv is None:
continue
rv = os.path.abspath(rv).rstrip()
if rv != dir:
continue
id = find_request_value(os.path.join(REQUEST_DIR, file), 'id')
if id is not None:
reqid.append(id.rstrip())
reqid = []
criteria = {'cert-storage': 'NSSDB', 'key-storage': 'NSSDB',
'cert-database': dir, 'key-database': dir, }
requests = _get_requests(criteria)
for request in requests:
reqid.append(request.prop_if.Get(DBUS_CM_REQUEST_IF, 'nickname'))
return reqid
def add_request_value(request_id, directive, value):
"""
Add a new directive to a certmonger request file.
The certmonger service MUST be stopped in order for this to work.
"""
fileList=os.listdir(REQUEST_DIR)
for file in fileList:
id = find_request_value('%s/%s' % (REQUEST_DIR, file), 'id')
if id is not None and id.rstrip() == request_id:
current_value = find_request_value('%s/%s' % (REQUEST_DIR, file), directive)
if not current_value:
fp = open('%s/%s' % (REQUEST_DIR, file), 'a')
fp.write('%s=%s\n' % (directive, value))
fp.close()
try:
request = _get_request({'nickname': request_id})
except RuntimeError, e:
root_logger.error('Failed to get request: %s' % e)
raise
if request:
request.obj_if.modify({directive: value})
return
def add_principal(request_id, principal):
"""
@ -162,7 +228,8 @@ def add_principal(request_id, principal):
When an existing certificate is added via start-tracking it won't have
a principal.
"""
return add_request_value(request_id, 'template_principal', principal)
add_request_value(request_id, 'template-principal', [principal])
def add_subject(request_id, subject):
"""
@ -172,47 +239,29 @@ def add_subject(request_id, subject):
When an existing certificate is added via start-tracking it won't have
a subject_template set.
"""
return add_request_value(request_id, 'template_subject', subject)
add_request_value(request_id, 'template-subject', subject)
def request_cert(nssdb, nickname, subject, principal, passwd_fname=None):
"""
Execute certmonger to request a server certificate
Execute certmonger to request a server certificate.
"""
args = [paths.IPA_GETCERT,
'request',
'-d', nssdb,
'-n', nickname,
'-N', subject,
'-K', principal,
]
cm = _connect_to_certmonger()
request_parameters = dict(KEY_STORAGE='NSSDB', CERT_STORAGE='NSSDB',
CERT_LOCATION=nssdb, CERT_NICKNAME=nickname,
SUBJECT=subject, PRINCIPAL=principal,)
if passwd_fname:
args.append('-p')
args.append(os.path.abspath(passwd_fname))
(stdout, stderr, returncode) = ipautil.run(args)
# FIXME: should be some error handling around this
m = re.match('New signing request "(\d+)" added', stdout)
request_id = m.group(1)
return request_id
request_parameters['KEY_PIN_FILE'] = passwd_fname
result = cm.obj_if.add_request(request_parameters)
try:
if result[0]:
request = _cm_dbus_object(cm.bus, result[1], DBUS_CM_REQUEST_IF,
DBUS_CM_IF, True)
except TypeError:
root_logger.error('Failed to get create new request.')
raise
return request.obj_if.get_nickname()
def cert_exists(nickname, secdir):
"""
See if a nickname exists in an NSS database.
Returns True/False
This isn't very sophisticated in that it doesn't differentiate between
a database that doesn't exist and a nickname that doesn't exist within
the database.
"""
args = [paths.CERTUTIL, "-L",
"-d", os.path.abspath(secdir),
"-n", nickname
]
(stdout, stderr, rc) = ipautil.run(args, raiseonerr=False)
if rc == 0:
return True
else:
return False
def start_tracking(nickname, secdir, password_file=None, command=None):
"""
@ -223,75 +272,69 @@ def start_tracking(nickname, secdir, password_file=None, command=None):
certmonger to run when it renews a certificate. This command must
reside in /usr/lib/ipa/certmonger to work with SELinux.
Returns the stdout, stderr and returncode from running ipa-getcert
This assumes that certmonger is already running.
Returns True or False
"""
if not cert_exists(nickname, os.path.abspath(secdir)):
raise RuntimeError('Nickname "%s" doesn\'t exist in NSS database "%s"' % (nickname, secdir))
args = [paths.IPA_GETCERT, "start-tracking",
"-d", os.path.abspath(secdir),
"-n", nickname]
if password_file:
args.append("-p")
args.append(os.path.abspath(password_file))
cm = _connect_to_certmonger()
params = {'TRACK': True}
params['cert-nickname'] = nickname
params['cert-database'] = os.path.abspath(secdir)
params['cert-storage'] = 'NSSDB'
params['key-nickname'] = nickname
params['key-database'] = os.path.abspath(secdir)
params['key-storage'] = 'NSSDB'
if command:
args.append("-C")
args.append(command)
params['cert-postsave-command'] = command
if password_file:
params['KEY_PIN_FILE'] = os.path.abspath(password_file)
result = cm.obj_if.add_request(params)
try:
if result[0]:
request = _cm_dbus_object(cm.bus, result[1], DBUS_CM_REQUEST_IF,
DBUS_CM_IF, True)
except TypeError, e:
root_logger.error('Failed to add new request.')
raise
return request.prop_if.Get(DBUS_CM_REQUEST_IF, 'nickname')
(stdout, stderr, returncode) = ipautil.run(args)
return (stdout, stderr, returncode)
def stop_tracking(secdir, request_id=None, nickname=None):
"""
Stop tracking the current request using either the request_id or nickname.
This assumes that the certmonger service is running.
Returns True or False
"""
if request_id is None and nickname is None:
raise RuntimeError('Both request_id and nickname are missing.')
if nickname:
# Using the nickname find the certmonger request_id
criteria = (('cert_storage_location', os.path.abspath(secdir), NPATH),('cert_nickname', nickname, None))
try:
request_id = get_request_id(criteria)
if request_id is None:
return ('', '', 0)
except RuntimeError:
# This means that multiple requests matched, skip it for now
# Fall back to trying to stop tracking using nickname
pass
args = [paths.GETCERT,
'stop-tracking',
]
criteria = {'cert-database': secdir}
if request_id:
args.append('-i')
args.append(request_id)
else:
args.append('-n')
args.append(nickname)
args.append('-d')
args.append(os.path.abspath(secdir))
criteria['nickname'] = request_id
if nickname:
criteria['cert-nickname'] = nickname
try:
request = _get_request(criteria)
except RuntimeError, e:
root_logger.error('Failed to get request: %s' % e)
raise
if request:
cm = _connect_to_certmonger()
cm.obj_if.remove_request(request.path)
(stdout, stderr, returncode) = ipautil.run(args)
return (stdout, stderr, returncode)
def modify(request_id, profile=None):
args = [paths.GETCERT, 'start-tracking',
'-i', request_id]
if profile:
args += ['-T', profile]
return ipautil.run(args)
request = _get_request({'nickname': request_id})
if request:
request.obj_if.modify({'template-profile': profile})
def resubmit_request(request_id, profile=None):
args = [paths.IPA_GETCERT, 'resubmit',
'-i', request_id]
if profile:
args += ['-T', profile]
return ipautil.run(args)
request = _get_request({'nickname': request_id})
if request:
if profile:
request.obj_if.modify({'template-profile': profile})
request.obj_if.resubmit()
def _find_IPA_ca():
"""
@ -301,13 +344,10 @@ def _find_IPA_ca():
We can use find_request_value because the ca files have the
same file format.
"""
fileList=os.listdir(CA_DIR)
for file in fileList:
value = find_request_value('%s/%s' % (CA_DIR, file), 'id')
if value is not None and value.strip() == 'IPA':
return '%s/%s' % (CA_DIR, file)
cm = _connect_to_certmonger()
ca_path = cm.obj_if.find_ca_by_nickname('IPA')
return _cm_dbus_object(cm.bus, ca_path, DBUS_CM_CA_IF, DBUS_CM_IF, True)
return None
def add_principal_to_cas(principal):
"""
@ -317,58 +357,27 @@ def add_principal_to_cas(principal):
/usr/libexec/certmonger/ipa-submit.
We also need to restore this on uninstall.
The certmonger service MUST be stopped in order for this to work.
"""
cafile = _find_IPA_ca()
if cafile is None:
return
ca = _find_IPA_ca()
if ca:
ext_helper = ca.prop_if.Get(DBUS_CM_CA_IF, 'external-helper')
if ext_helper and ext_helper.find('-k') == -1:
ext_helper = '%s -k %s' % (ext_helper.strip(), principal)
ca.prop_if.Set(DBUS_CM_CA_IF, 'external-helper', ext_helper)
update = False
fp = open(cafile, 'r')
lines = fp.readlines()
fp.close()
for i in xrange(len(lines)):
if lines[i].startswith('ca_external_helper') and \
lines[i].find('-k') == -1:
lines[i] = '%s -k %s\n' % (lines[i].strip(), principal)
update = True
if update:
fp = open(cafile, 'w')
for line in lines:
fp.write(line)
fp.close()
def remove_principal_from_cas():
"""
Remove any -k principal options from the ipa_submit helper.
The certmonger service MUST be stopped in order for this to work.
"""
cafile = _find_IPA_ca()
if cafile is None:
return
ca = _find_IPA_ca()
if ca:
ext_helper = ca.prop_if.Get(DBUS_CM_CA_IF, 'external-helper')
if ext_helper and ext_helper.find('-k'):
ext_helper = ext_helper.strip()[0]
ca.prop_if.Set(DBUS_CM_CA_IF, 'external-helper', ext_helper)
update = False
fp = open(cafile, 'r')
lines = fp.readlines()
fp.close()
for i in xrange(len(lines)):
if lines[i].startswith('ca_external_helper') and \
lines[i].find('-k') > 0:
lines[i] = lines[i].strip().split(' ')[0] + '\n'
update = True
if update:
fp = open(cafile, 'w')
for line in lines:
fp.write(line)
fp.close()
# Routines specific to renewing dogtag CA certificates
def get_pin(token, dogtag_constants=None):
"""
Dogtag stores its NSS pin in a file formatted as token:PIN.
@ -384,6 +393,7 @@ def get_pin(token, dogtag_constants=None):
return pin.strip()
return None
def dogtag_start_tracking(ca, nickname, pin, pinfile, secdir, pre_command,
post_command, profile=None):
"""
@ -398,52 +408,46 @@ def dogtag_start_tracking(ca, nickname, pin, pinfile, secdir, pre_command,
post_command is the script to execute after a renewal is done.
Both commands can be None.
Returns the stdout, stderr and returncode from running ipa-getcert
This assumes that certmonger is already running.
"""
if not cert_exists(nickname, os.path.abspath(secdir)):
raise RuntimeError('Nickname "%s" doesn\'t exist in NSS database "%s"' % (nickname, secdir))
args = [paths.GETCERT, "start-tracking",
"-d", os.path.abspath(secdir),
"-n", nickname,
"-c", ca,
]
cm = _connect_to_certmonger()
certmonger_cmd_template = paths.CERTMONGER_COMMAND_TEMPLATE
if pre_command is not None:
params = {'TRACK': True}
params['cert-nickname'] = nickname
params['cert-database'] = os.path.abspath(secdir)
params['cert-storage'] = 'NSSDB'
params['key-nickname'] = nickname
params['key-database'] = os.path.abspath(secdir)
params['key-storage'] = 'NSSDB'
ca_path = cm.obj_if.find_ca_by_nickname(ca)
if ca_path:
params['ca'] = ca_path
if pin:
params['KEY_PIN'] = pin
if pinfile:
params['KEY_PIN_FILE'] = os.path.abspath(pinfile)
if pre_command:
if not os.path.isabs(pre_command):
if sys.maxsize > 2**32L:
libpath = 'lib64'
else:
libpath = 'lib'
pre_command = paths.CERTMONGER_COMMAND_TEMPLATE % (libpath, pre_command)
args.append("-B")
args.append(pre_command)
if post_command is not None:
pre_command = certmonger_cmd_template % (libpath, pre_command)
params['cert-presave-command'] = pre_command
if post_command:
if not os.path.isabs(post_command):
if sys.maxsize > 2**32L:
libpath = 'lib64'
else:
libpath = 'lib'
post_command = paths.CERTMONGER_COMMAND_TEMPLATE % (libpath, post_command)
args.append("-C")
args.append(post_command)
if pinfile:
args.append("-p")
args.append(pinfile)
else:
args.append("-P")
args.append(pin)
post_command = certmonger_cmd_template % (libpath, post_command)
params['cert-postsave-command'] = post_command
if profile:
args.append("-T")
args.append(profile)
params['ca-profile'] = profile
cm.obj_if.add_request(params)
(stdout, stderr, returncode) = ipautil.run(args, nolog=[pin])
def check_state(dirs):
"""
@ -461,6 +465,7 @@ def check_state(dirs):
return reqids
def wait_for_request(request_id, timeout=120):
for i in range(0, timeout, 5):
state = get_request_value(request_id, 'state').strip()
@ -475,7 +480,9 @@ def wait_for_request(request_id, timeout=120):
return state
if __name__ == '__main__':
request_id = request_cert(paths.HTTPD_ALIAS_DIR, "Test", "cn=tiger.example.com,O=IPA", "HTTP/tiger.example.com@EXAMPLE.COM")
request_id = request_cert(paths.HTTPD_ALIAS_DIR, "Test",
"cn=tiger.example.com,O=IPA",
"HTTP/tiger.example.com@EXAMPLE.COM")
csr = get_request_value(request_id, 'csr')
print csr
stop_tracking(request_id)

View File

@ -319,13 +319,13 @@ def stop_tracking_certificates(dogtag_constants):
try:
certmonger.stop_tracking(
dogtag_constants.ALIAS_DIR, nickname=nickname)
except (ipautil.CalledProcessError, RuntimeError), e:
except RuntimeError, e:
root_logger.error(
"certmonger failed to stop tracking certificate: %s" % str(e))
try:
certmonger.stop_tracking(paths.HTTPD_ALIAS_DIR, nickname='ipaCert')
except (ipautil.CalledProcessError, RuntimeError), e:
except RuntimeError, e:
root_logger.error(
"certmonger failed to stop tracking certificate: %s" % str(e))
cmonger.stop()
@ -1459,7 +1459,7 @@ class CAInstance(service.Service):
secdir=paths.HTTPD_ALIAS_DIR,
pre_command=None,
post_command='renew_ra_cert')
except (ipautil.CalledProcessError, RuntimeError), e:
except RuntimeError, e:
root_logger.error(
"certmonger failed to start tracking certificate: %s" % e)
@ -1492,7 +1492,7 @@ class CAInstance(service.Service):
pre_command='stop_pkicad',
post_command='renew_ca_cert "%s"' % nickname,
profile=profile)
except (ipautil.CalledProcessError, RuntimeError), e:
except RuntimeError, e:
root_logger.error(
"certmonger failed to start tracking certificate: %s" % e)
@ -1512,7 +1512,7 @@ class CAInstance(service.Service):
secdir=self.dogtag_constants.ALIAS_DIR,
pre_command=None,
post_command=None)
except (ipautil.CalledProcessError, RuntimeError), e:
except RuntimeError, e:
root_logger.error(
"certmonger failed to start tracking certificate: %s" % e)

View File

@ -547,46 +547,26 @@ class CertDB(object):
else:
libpath = 'lib'
command = paths.CERTMONGER_COMMAND_TEMPLATE % (libpath, command)
cmonger = services.knownservices.certmonger
cmonger.enable()
services.knownservices.messagebus.start()
cmonger.start()
try:
(stdout, stderr, rc) = certmonger.start_tracking(nickname, self.secdir, password_file, command)
except (ipautil.CalledProcessError, RuntimeError), e:
request_id = certmonger.start_tracking(nickname, self.secdir, password_file, command)
except RuntimeError, e:
root_logger.error("certmonger failed starting to track certificate: %s" % str(e))
return
cmonger.stop()
cert = self.get_cert_from_db(nickname)
nsscert = x509.load_certificate(cert, dbdir=self.secdir)
subject = str(nsscert.subject)
m = re.match('New tracking request "(\d+)" added', stdout)
if not m:
root_logger.error('Didn\'t get new %s request, got %s' % (cmonger.service_name, stdout))
raise RuntimeError('%s did not issue new tracking request for \'%s\' in \'%s\'. Use \'ipa-getcert list\' to list existing certificates.' % (cmonger.service_name, nickname, self.secdir))
request_id = m.group(1)
certmonger.add_principal(request_id, principal)
certmonger.add_subject(request_id, subject)
cmonger.start()
def untrack_server_cert(self, nickname):
"""
Tell certmonger to stop tracking the given certificate nickname.
"""
# Always start certmonger. We can't untrack something if it isn't
# running
cmonger = services.knownservices.certmonger
services.knownservices.messagebus.start()
cmonger.start()
try:
certmonger.stop_tracking(self.secdir, nickname=nickname)
except (ipautil.CalledProcessError, RuntimeError), e:
except RuntimeError, e:
root_logger.error("certmonger failed to stop tracking certificate: %s" % str(e))
cmonger.stop()
def create_server_cert(self, nickname, hostname, other_certdb=None, subject=None):
"""

View File

@ -153,8 +153,8 @@ class CACertManage(admintool.AdminTool):
raise admintool.ScriptError("CA is not configured on this system")
nss_dir = ca.dogtag_constants.ALIAS_DIR
criteria = (('cert_storage_location', nss_dir, certmonger.NPATH),
('cert_nickname', self.cert_nickname, None))
criteria = {'cert-database': nss_dir,
'cert-nickname': self.cert_nickname}
self.request_id = certmonger.get_request_id(criteria)
if self.request_id is None:
raise admintool.ScriptError(

View File

@ -52,10 +52,10 @@ class update_ca_renewal_master(PostUpdate):
self.debug("found CA renewal master %s", entries[0].dn[1].value)
return (False, False, [])
criteria = (
('cert_storage_location', paths.HTTPD_ALIAS_DIR, certmonger.NPATH),
('cert_nickname', 'ipaCert', None),
)
criteria = {
'cert-database': paths.HTTPD_ALIAS_DIR,
'cert-nickname': 'ipaCert',
}
request_id = certmonger.get_request_id(criteria)
if request_id is not None:
self.debug("found certmonger request for ipaCert")