freeipa/ipapython/certmonger.py
Jan Cholasta 2109d6611b Provide additional functions to ipapython.certmonger.
Reviewed-By: Rob Crittenden <rcritten@redhat.com>
2014-07-30 16:04:21 +02:00

482 lines
15 KiB
Python

# Authors: Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Some certmonger functions, mostly around updating the request file.
# This is used so we can add tracking to the Apache and 389-ds
# server certificates created during the IPA server installation.
import os
import sys
import re
import time
from ipapython import ipautil
from ipapython import dogtag
from ipapython.ipa_log_manager import *
from ipaplatform.paths import paths
REQUEST_DIR=paths.CERTMONGER_REQUESTS_DIR
CA_DIR=paths.CERTMONGER_CAS_DIR
# Normalizer types for critera in get_request_id()
NPATH = 1
def find_request_value(filename, directive):
"""
Return a value from a certmonger request file for the requested directive
It tries to do this a number of times because sometimes there is a delay
when ipa-getcert returns and the file is fully updated, particularly
when doing a request. Generating a CSR is fast but not instantaneous.
"""
tries = 1
value = None
found = False
while value is None and tries <= 5:
tries=tries + 1
time.sleep(1)
fp = open(filename, 'r')
lines = fp.readlines()
fp.close()
for line in lines:
if found:
# A value can span multiple lines. If it does then it has a
# leading space.
if not line.startswith(' '):
# We hit the next directive, return now
return value
else:
value = value + line[1:]
else:
if line.startswith(directive + '='):
found = True
value = line[len(directive)+1:]
return value
def get_request_value(request_id, directive):
"""
There is no guarantee that the request_id will match the filename
in the certmonger requests directory, so open each one to find the
request_id.
"""
fileList=os.listdir(REQUEST_DIR)
for file in fileList:
value = find_request_value('%s/%s' % (REQUEST_DIR, file), 'id')
if value is not None and value.rstrip() == request_id:
return find_request_value('%s/%s' % (REQUEST_DIR, file), directive)
return None
def get_request_id(criteria):
"""
If you don't know the certmonger request_id then try to find it by looking
through all the request files. An alternative would be to parse the
ipa-getcert list output but this seems cleaner.
criteria is a tuple of key/value/type to search for. The more specific
the better. An error is raised if multiple request_ids are returned for
the same criteria.
None is returned if none of the criteria match.
"""
assert type(criteria) is tuple
reqid=None
fileList=os.listdir(REQUEST_DIR)
for file in fileList:
match = True
for (key, value, valtype) in criteria:
rv = find_request_value('%s/%s' % (REQUEST_DIR, file), key)
if rv and valtype == NPATH:
rv = os.path.abspath(rv)
if rv is None or rv.rstrip() != value:
match = False
break
if match and reqid is not None:
raise RuntimeError('multiple certmonger requests match the criteria')
if match:
reqid = find_request_value('%s/%s' % (REQUEST_DIR, file), 'id').rstrip()
return reqid
def get_requests_for_dir(dir):
"""
Return a list containing the request ids for a given NSS database
directory.
"""
reqid=[]
fileList=os.listdir(REQUEST_DIR)
for file in fileList:
rv = find_request_value(os.path.join(REQUEST_DIR, file),
'cert_storage_location')
if rv is None:
continue
rv = os.path.abspath(rv).rstrip()
if rv != dir:
continue
id = find_request_value(os.path.join(REQUEST_DIR, file), 'id')
if id is not None:
reqid.append(id.rstrip())
return reqid
def add_request_value(request_id, directive, value):
"""
Add a new directive to a certmonger request file.
The certmonger service MUST be stopped in order for this to work.
"""
fileList=os.listdir(REQUEST_DIR)
for file in fileList:
id = find_request_value('%s/%s' % (REQUEST_DIR, file), 'id')
if id is not None and id.rstrip() == request_id:
current_value = find_request_value('%s/%s' % (REQUEST_DIR, file), directive)
if not current_value:
fp = open('%s/%s' % (REQUEST_DIR, file), 'a')
fp.write('%s=%s\n' % (directive, value))
fp.close()
return
def add_principal(request_id, principal):
"""
In order for a certmonger request to be renewable it needs a principal.
When an existing certificate is added via start-tracking it won't have
a principal.
"""
return add_request_value(request_id, 'template_principal', principal)
def add_subject(request_id, subject):
"""
In order for a certmonger request to be renwable it needs the subject
set in the request file.
When an existing certificate is added via start-tracking it won't have
a subject_template set.
"""
return add_request_value(request_id, 'template_subject', subject)
def request_cert(nssdb, nickname, subject, principal, passwd_fname=None):
"""
Execute certmonger to request a server certificate
"""
args = [paths.IPA_GETCERT,
'request',
'-d', nssdb,
'-n', nickname,
'-N', subject,
'-K', principal,
]
if passwd_fname:
args.append('-p')
args.append(os.path.abspath(passwd_fname))
(stdout, stderr, returncode) = ipautil.run(args)
# FIXME: should be some error handling around this
m = re.match('New signing request "(\d+)" added', stdout)
request_id = m.group(1)
return request_id
def cert_exists(nickname, secdir):
"""
See if a nickname exists in an NSS database.
Returns True/False
This isn't very sophisticated in that it doesn't differentiate between
a database that doesn't exist and a nickname that doesn't exist within
the database.
"""
args = [paths.CERTUTIL, "-L",
"-d", os.path.abspath(secdir),
"-n", nickname
]
(stdout, stderr, rc) = ipautil.run(args, raiseonerr=False)
if rc == 0:
return True
else:
return False
def start_tracking(nickname, secdir, password_file=None, command=None):
"""
Tell certmonger to track the given certificate nickname in NSS
database in secdir protected by optional password file password_file.
command is an optional parameter which specifies a command for
certmonger to run when it renews a certificate. This command must
reside in /usr/lib/ipa/certmonger to work with SELinux.
Returns the stdout, stderr and returncode from running ipa-getcert
This assumes that certmonger is already running.
"""
if not cert_exists(nickname, os.path.abspath(secdir)):
raise RuntimeError('Nickname "%s" doesn\'t exist in NSS database "%s"' % (nickname, secdir))
args = [paths.IPA_GETCERT, "start-tracking",
"-d", os.path.abspath(secdir),
"-n", nickname]
if password_file:
args.append("-p")
args.append(os.path.abspath(password_file))
if command:
args.append("-C")
args.append(command)
(stdout, stderr, returncode) = ipautil.run(args)
return (stdout, stderr, returncode)
def stop_tracking(secdir, request_id=None, nickname=None):
"""
Stop tracking the current request using either the request_id or nickname.
This assumes that the certmonger service is running.
"""
if request_id is None and nickname is None:
raise RuntimeError('Both request_id and nickname are missing.')
if nickname:
# Using the nickname find the certmonger request_id
criteria = (('cert_storage_location', os.path.abspath(secdir), NPATH),('cert_nickname', nickname, None))
try:
request_id = get_request_id(criteria)
if request_id is None:
return ('', '', 0)
except RuntimeError:
# This means that multiple requests matched, skip it for now
# Fall back to trying to stop tracking using nickname
pass
args = [paths.GETCERT,
'stop-tracking',
]
if request_id:
args.append('-i')
args.append(request_id)
else:
args.append('-n')
args.append(nickname)
args.append('-d')
args.append(os.path.abspath(secdir))
(stdout, stderr, returncode) = ipautil.run(args)
return (stdout, stderr, returncode)
def modify(request_id, profile=None):
args = [paths.GETCERT, 'start-tracking',
'-i', request_id]
if profile:
args += ['-T', profile]
return ipautil.run(args)
def resubmit_request(request_id, profile=None):
args = [paths.IPA_GETCERT, 'resubmit',
'-i', request_id]
if profile:
args += ['-T', profile]
return ipautil.run(args)
def _find_IPA_ca():
"""
Look through all the certmonger CA files to find the one that
has id=IPA
We can use find_request_value because the ca files have the
same file format.
"""
fileList=os.listdir(CA_DIR)
for file in fileList:
value = find_request_value('%s/%s' % (CA_DIR, file), 'id')
if value is not None and value.strip() == 'IPA':
return '%s/%s' % (CA_DIR, file)
return None
def add_principal_to_cas(principal):
"""
If the hostname we were passed to use in ipa-client-install doesn't
match the value of gethostname() then we need to append
-k host/HOSTNAME@REALM to the ca helper defined for
/usr/libexec/certmonger/ipa-submit.
We also need to restore this on uninstall.
The certmonger service MUST be stopped in order for this to work.
"""
cafile = _find_IPA_ca()
if cafile is None:
return
update = False
fp = open(cafile, 'r')
lines = fp.readlines()
fp.close()
for i in xrange(len(lines)):
if lines[i].startswith('ca_external_helper') and \
lines[i].find('-k') == -1:
lines[i] = '%s -k %s\n' % (lines[i].strip(), principal)
update = True
if update:
fp = open(cafile, 'w')
for line in lines:
fp.write(line)
fp.close()
def remove_principal_from_cas():
"""
Remove any -k principal options from the ipa_submit helper.
The certmonger service MUST be stopped in order for this to work.
"""
cafile = _find_IPA_ca()
if cafile is None:
return
update = False
fp = open(cafile, 'r')
lines = fp.readlines()
fp.close()
for i in xrange(len(lines)):
if lines[i].startswith('ca_external_helper') and \
lines[i].find('-k') > 0:
lines[i] = lines[i].strip().split(' ')[0] + '\n'
update = True
if update:
fp = open(cafile, 'w')
for line in lines:
fp.write(line)
fp.close()
# Routines specific to renewing dogtag CA certificates
def get_pin(token, dogtag_constants=None):
"""
Dogtag stores its NSS pin in a file formatted as token:PIN.
The caller is expected to handle any exceptions raised.
"""
if dogtag_constants is None:
dogtag_constants = dogtag.configured_constants()
with open(dogtag_constants.PASSWORD_CONF_PATH, 'r') as f:
for line in f:
(tok, pin) = line.split('=', 1)
if token == tok:
return pin.strip()
return None
def dogtag_start_tracking(ca, nickname, pin, pinfile, secdir, pre_command,
post_command, profile=None):
"""
Tell certmonger to start tracking a dogtag CA certificate. These
are handled differently because their renewal must be done directly
and not through IPA.
This uses the generic certmonger command getcert so we can specify
a different helper.
pre_command is the script to execute before a renewal is done.
post_command is the script to execute after a renewal is done.
Both commands can be None.
Returns the stdout, stderr and returncode from running ipa-getcert
This assumes that certmonger is already running.
"""
if not cert_exists(nickname, os.path.abspath(secdir)):
raise RuntimeError('Nickname "%s" doesn\'t exist in NSS database "%s"' % (nickname, secdir))
args = [paths.GETCERT, "start-tracking",
"-d", os.path.abspath(secdir),
"-n", nickname,
"-c", ca,
]
if pre_command is not None:
if not os.path.isabs(pre_command):
if sys.maxsize > 2**32L:
libpath = 'lib64'
else:
libpath = 'lib'
pre_command = paths.CERTMONGER_COMMAND_TEMPLATE % (libpath, pre_command)
args.append("-B")
args.append(pre_command)
if post_command is not None:
if not os.path.isabs(post_command):
if sys.maxsize > 2**32L:
libpath = 'lib64'
else:
libpath = 'lib'
post_command = paths.CERTMONGER_COMMAND_TEMPLATE % (libpath, post_command)
args.append("-C")
args.append(post_command)
if pinfile:
args.append("-p")
args.append(pinfile)
else:
args.append("-P")
args.append(pin)
if profile:
args.append("-T")
args.append(profile)
(stdout, stderr, returncode) = ipautil.run(args, nolog=[pin])
def check_state(dirs):
"""
Given a set of directories and nicknames verify that we are no longer
tracking certificates.
dirs is a list of directories to test for. We will return a tuple
of nicknames for any tracked certificates found.
This can only check for NSS-based certificates.
"""
reqids = []
for dir in dirs:
reqids.extend(get_requests_for_dir(dir))
return reqids
def wait_for_request(request_id, timeout=120):
for i in range(0, timeout, 5):
state = get_request_value(request_id, 'state').strip()
root_logger.debug("certmonger request is in state %r", state)
if state in ('CA_REJECTED', 'CA_UNREACHABLE', 'CA_UNCONFIGURED',
'NEED_GUIDANCE', 'NEED_CA', 'MONITORING'):
break
time.sleep(5)
else:
raise RuntimeError("request timed out")
return state
if __name__ == '__main__':
request_id = request_cert(paths.HTTPD_ALIAS_DIR, "Test", "cn=tiger.example.com,O=IPA", "HTTP/tiger.example.com@EXAMPLE.COM")
csr = get_request_value(request_id, 'csr')
print csr
stop_tracking(request_id)