ipaserver.install.certs: Introduce NSSDatabase as a more generic certutil wrapper

The CertDB class was meant to be a wrapper around NSS databases,
certutil, pk12util, etc. Unfortunately, over time it grew too
dependent on the particular scenarios it is used in.
Introduce a new class that has no knowledge about IPA configuration,
and move generic code to it.

In the future, generic code should be moved to NSSDatabase, code
for the self-signed CA should be removed, and IPA-specific code may
stay in CertDB (which calls NSSDatabase).
This commit is contained in:
Petr Viktorin 2013-03-14 13:54:38 +01:00 committed by Martin Kosek
parent 5fd68e3f9d
commit 1e86378d49

View File

@ -188,8 +188,184 @@ def next_replica(serial_file=CA_SERIALNO):
return str(serial)
class NSSDatabase(object):
"""A general-purpose wrapper around a NSS cert database
For permanent NSS databases, pass the cert DB directory to __init__
For temporary databases, do not pass nssdir, and call close() when done
to remove the DB. Alternatively, a NSSDatabase can be used as a
context manager that calls close() automatically.
"""
# Traditionally, we used CertDB for our NSS DB operations, but that class
# got too tied to IPA server details, killing reusability.
# BaseCertDB is a class that knows nothing about IPA.
# Generic NSS DB code should be moved here.
def __init__(self, nssdir=None):
if nssdir is None:
self.secdir = tempfile.mkdtemp()
self._is_temporary = True
else:
self.secdir = nssdir
self._is_temporary = False
def close(self):
if self._is_temporary:
shutil.rmtree(self.secdir)
def __enter__(self):
return self
def __exit__(self, type, value, tb):
self.close()
def run_certutil(self, args, stdin=None):
new_args = ["/usr/bin/certutil", "-d", self.secdir]
new_args = new_args + args
return ipautil.run(new_args, stdin)
def create_db(self, password_filename):
"""Create cert DB
:param password_filename: Name of file containing the database password
"""
self.run_certutil(["-N", "-f", password_filename])
def list_certs(self):
"""Return nicknames and cert flags for all certs in the database
:return: List of (name, trust_flags) tuples
"""
certs, stderr, returncode = self.run_certutil(["-L"])
certs = certs.splitlines()
# FIXME, this relies on NSS never changing the formatting of certutil
certlist = []
for cert in certs:
nickname = cert[0:61]
trust = cert[61:]
if re.match(r'\w*,\w*,\w*', trust):
certlist.append((nickname.strip(), trust.strip()))
return tuple(certlist)
def find_server_certs(self):
"""Return nicknames and cert flags for server certs in the database
Server certs have an "u" character in the trust flags.
:return: List of (name, trust_flags) tuples
"""
server_certs = []
for name, flags in self.list_certs():
if 'u' in flags:
server_certs.append((name, flags))
return server_certs
def get_trust_chain(self, nickname):
"""Return names of certs in a given cert's trust chain
:param nickname: Name of the cert
:return: List of certificate names
"""
root_nicknames = []
chain, stderr, returncode = self.run_certutil([
"-O", "-n", nickname])
chain = chain.splitlines()
for c in chain:
m = re.match('\s*"(.*)" \[.*', c)
if m:
root_nicknames.append(m.groups()[0])
return root_nicknames
def import_pkcs12(self, pkcs12_filename, db_password_filename,
pkcs_password_filename=None):
args = ["/usr/bin/pk12util", "-d", self.secdir,
"-i", pkcs12_filename,
"-k", db_password_filename, '-v']
if pkcs_password_filename:
args = args + ["-w", pkcs_password_filename]
try:
ipautil.run(args)
except ipautil.CalledProcessError, e:
if e.returncode == 17:
raise RuntimeError("incorrect password for pkcs#12 file")
else:
raise RuntimeError("unknown error import pkcs#12 file")
def find_root_cert_from_pkcs12(self, pkcs12_fname, passwd_fname=None):
"""Given a PKCS#12 file, try to find any certificates that do
not have a key. The assumption is that these are the root CAs.
"""
args = ["/usr/bin/pk12util", "-d", self.secdir,
"-l", pkcs12_fname,
"-k", passwd_fname]
if passwd_fname:
args = args + ["-w", passwd_fname]
try:
(stdout, stderr, returncode) = ipautil.run(args)
except ipautil.CalledProcessError, e:
if e.returncode == 17:
raise RuntimeError("incorrect password for pkcs#12 file")
else:
raise RuntimeError("unknown error using pkcs#12 file")
lines = stdout.split('\n')
# A simple state machine.
# 1 = looking for a line starting with 'Certificate'
# 2 = looking for the Friendly name (nickname)
nicknames = []
state = 1
for line in lines:
if state == 2:
m = re.match("\W+Friendly Name: (.*)", line)
if m:
nicknames.append( m.groups(0)[0])
state = 1
if line == "Certificate:":
state = 2
elif not line.startswith(' '):
# Top-level item that is not a certificate
state = 1
return nicknames
def trust_root_cert(self, root_nickname):
if root_nickname[:7] == "Builtin":
root_logger.debug(
"No need to add trust for built-in root CAs, skipping %s" %
root_nickname)
else:
try:
self.run_certutil(["-M", "-n", root_nickname,
"-t", "CT,CT,"])
except ipautil.CalledProcessError, e:
raise RuntimeError(
"Setting trust on %s failed" % root_nickname)
def export_pem_cert(self, nickname, location):
"""Export the given cert to PEM file in the given location"""
cert, err, returncode = self.run_certutil(["-L", "-n", nickname, "-a"])
with open(location, "w+") as fd:
fd.write(cert)
os.chmod(location, 0444)
class CertDB(object):
"""An IPA-server-specific wrapper around NSS
This class knows IPA-specific details such as nssdir location, or the
CA cert name.
"""
# TODO: Remove all selfsign code
def __init__(self, realm, nssdir=NSS_DIR, fstore=None, host_name=None, subject_base=None):
self.nssdb = NSSDatabase(nssdir)
self.secdir = nssdir
self.realm = realm
@ -298,9 +474,7 @@ class CertDB(object):
return sha1(ipautil.ipa_generate_password()).hexdigest()
def run_certutil(self, args, stdin=None):
new_args = ["/usr/bin/certutil", "-d", self.secdir]
new_args = new_args + args
return ipautil.run(new_args, stdin)
return self.nssdb.run_certutil(args, stdin)
def run_signtool(self, args, stdin=None):
if not self.self_signed_ca:
@ -334,29 +508,14 @@ class CertDB(object):
ipautil.backup_file(self.certdb_fname)
ipautil.backup_file(self.keydb_fname)
ipautil.backup_file(self.secmod_fname)
self.run_certutil(["-N",
"-f", self.passwd_fname])
self.nssdb.create_db(self.passwd_fname)
self.set_perms(self.passwd_fname, write=True)
def list_certs(self):
"""
Return a tuple of tuples containing (nickname, trust)
"""
p = subprocess.Popen(["/usr/bin/certutil", "-d", self.secdir,
"-L"], stdout=subprocess.PIPE)
certs = p.stdout.read()
certs = certs.split("\n")
# FIXME, this relies on NSS never changing the formatting of certutil
certlist = []
for cert in certs:
nickname = cert[0:61]
trust = cert[61:]
if re.match(r'\w+,\w+,\w+', trust):
certlist.append((nickname.strip(), trust))
return tuple(certlist)
return self.nssdb.list_certs()
def has_nickname(self, nickname):
"""
@ -810,26 +969,7 @@ class CertDB(object):
Given a nickname, return a list of the certificates that make up
the trust chain.
"""
root_nicknames = []
p = subprocess.Popen(["/usr/bin/certutil", "-d", self.secdir,
"-O", "-n", nickname], stdout=subprocess.PIPE)
chain = p.stdout.read()
chain = chain.split("\n")
for c in chain:
m = re.match('\s*"(.*)" \[.*', c)
if m:
root_nicknames.append(m.groups()[0])
if len(root_nicknames) > 1:
# If you pass in the name of a CA to get the chain it may only
# return 1 (self-signed). Return that.
try:
root_nicknames.remove(nickname)
except ValueError:
# The nickname wasn't in the list
pass
root_nicknames = self.nssdb.get_trust_chain(nickname)
# Try to work around a change in the F-11 certutil where untrusted
# CA's are not shown in the chain. This will make a default IPA
@ -876,55 +1016,20 @@ class CertDB(object):
def trust_root_cert(self, root_nickname):
if root_nickname is None:
root_logger.debug("Unable to identify root certificate to trust. Continueing but things are likely to fail.")
root_logger.debug("Unable to identify root certificate to trust. Continuing but things are likely to fail.")
return
if root_nickname[:7] == "Builtin":
root_logger.debug("No need to add trust for built-in root CA's, skipping %s" % root_nickname)
else:
try:
self.run_certutil(["-M", "-n", root_nickname,
"-t", "CT,CT,"])
except ipautil.CalledProcessError, e:
root_logger.error("Setting trust on %s failed" % root_nickname)
self.nssdb.trust_root_cert(root_nickname)
except RuntimeError:
pass
def find_server_certs(self):
p = subprocess.Popen(["/usr/bin/certutil", "-d", self.secdir,
"-L"], stdout=subprocess.PIPE)
certs = p.stdout.read()
certs = certs.split("\n")
server_certs = []
for cert in certs:
fields = cert.split()
if not len(fields):
continue
flags = fields[-1]
if 'u' in flags:
name = " ".join(fields[0:-1])
# NSS 3.12 added a header to the certutil output
if name == "Certificate Nickname Trust":
continue
server_certs.append((name, flags))
return server_certs
return self.nssdb.find_server_certs()
def import_pkcs12(self, pkcs12_fname, passwd_fname=None):
args = ["/usr/bin/pk12util", "-d", self.secdir,
"-i", pkcs12_fname,
"-k", self.passwd_fname]
if passwd_fname:
args = args + ["-w", passwd_fname]
try:
ipautil.run(args)
except ipautil.CalledProcessError, e:
if e.returncode == 17:
raise RuntimeError("incorrect password")
else:
raise RuntimeError("unknown error import pkcs#12 file")
return self.nssdb.import_pkcs12(pkcs12_fname, self.passwd_fname,
pkcs_password_filename=passwd_fname)
def export_pkcs12(self, pkcs12_fname, pkcs12_pwd_fname, nickname=None):
if nickname is None:
@ -1100,18 +1205,9 @@ class CertDB(object):
"-in", p12_fname, "-out", pem_fname,
"-passin", "file:" + p12_pwd_fname])
def backup_files(self):
self.fstore.backup_file(self.noise_fname)
self.fstore.backup_file(self.passwd_fname)
self.fstore.backup_file(self.certdb_fname)
self.fstore.backup_file(self.keydb_fname)
self.fstore.backup_file(self.secmod_fname)
self.fstore.backup_file(self.cacert_fname)
self.fstore.backup_file(self.pk12_fname)
self.fstore.backup_file(self.pin_fname)
self.fstore.backup_file(self.certreq_fname)
self.fstore.backup_file(self.certder_fname)
def publish_ca_cert(self, location):
shutil.copy(self.cacert_fname, location)
os.chmod(location, 0444)
def export_pem_cert(self, nickname, location):
return self.nssdb.export_pem_cert(nickname, location)