From 007174492908db3e3e7f45f768df1cebb79738a6 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Wed, 8 Nov 2017 12:10:54 +0100 Subject: [PATCH] Support sqlite NSSDB Prepare CertDB and NSSDatabase to support sqlite DB format. NSSDatabase will automatically detect and use either old DBM or new SQL format. Old databases are not migrated yet. https://pagure.io/freeipa/issue/7049 Signed-off-by: Christian Heimes Reviewed-By: Rob Crittenden --- freeipa.spec.in | 5 + ipaclient/install/client.py | 10 +- ipaplatform/base/constants.py | 3 +- ipapython/certdb.py | 157 ++++++++++++++++--- ipaserver/install/certs.py | 50 +++--- ipaserver/install/ipa_backup.py | 19 ++- ipaserver/install/ipa_replica_prepare.py | 31 ++-- ipaserver/install/ipa_restore.py | 19 +-- ipaserver/install/ipa_server_certinstall.py | 19 ++- ipaserver/secrets/store.py | 34 ++-- ipatests/pytest_plugins/integration/tasks.py | 10 +- ipatests/test_ipapython/test_certdb.py | 133 ++++++++++++++++ ipatests/test_xmlrpc/test_cert_plugin.py | 62 +++----- ipatests/test_xmlrpc/testcert.py | 56 ++----- 14 files changed, 411 insertions(+), 197 deletions(-) create mode 100644 ipatests/test_ipapython/test_certdb.py diff --git a/freeipa.spec.in b/freeipa.spec.in index f65bf09a5..7c39d343d 100644 --- a/freeipa.spec.in +++ b/freeipa.spec.in @@ -1606,9 +1606,14 @@ fi %ghost %attr(0644,root,apache) %config(noreplace) %{_sysconfdir}/ipa/default.conf %ghost %attr(0644,root,apache) %config(noreplace) %{_sysconfdir}/ipa/ca.crt %dir %attr(0755,root,root) %{_sysconfdir}/ipa/nssdb +# old dbm format %ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/cert8.db %ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/key3.db %ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/secmod.db +# new sql format +%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/cert9.db +%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/key4.db +%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/pkcs11.txt %ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/pwdfile.txt %ghost %config(noreplace) %{_sysconfdir}/pki/ca-trust/source/ipa.p11-kit %dir %{_localstatedir}/lib/ipa-client diff --git a/ipaclient/install/client.py b/ipaclient/install/client.py index 2f89e7eae..6fa42e366 100644 --- a/ipaclient/install/client.py +++ b/ipaclient/install/client.py @@ -2304,9 +2304,6 @@ def create_ipa_nssdb(): db = certdb.NSSDatabase(paths.IPA_NSSDB_DIR) db.create_db(mode=0o755, backup=True) os.chmod(db.pwd_file, 0o600) - os.chmod(os.path.join(db.secdir, 'cert8.db'), 0o644) - os.chmod(os.path.join(db.secdir, 'key3.db'), 0o644) - os.chmod(os.path.join(db.secdir, 'secmod.db'), 0o644) def update_ipa_nssdb(): @@ -3067,11 +3064,8 @@ def uninstall(options): logger.error("%s failed to stop tracking certificate: %s", cmonger.service_name, e) - for filename in (os.path.join(ipa_db.secdir, 'cert8.db'), - os.path.join(ipa_db.secdir, 'key3.db'), - os.path.join(ipa_db.secdir, 'secmod.db'), - os.path.join(ipa_db.secdir, 'pwdfile.txt')): - remove_file(filename) + for filename in certdb.NSS_FILES: + remove_file(os.path.join(ipa_db.secdir, filename)) # Remove any special principal names we added to the IPA CA helper certmonger.remove_principal_from_cas() diff --git a/ipaplatform/base/constants.py b/ipaplatform/base/constants.py index 5f52b94f6..94bd0f8a1 100644 --- a/ipaplatform/base/constants.py +++ b/ipaplatform/base/constants.py @@ -37,6 +37,7 @@ class BaseConstantsNamespace(object): 'httpd_dbus_sssd': 'on', } SSSD_USER = "sssd" - + # sql (new format), dbm (old format) + NSS_DEFAULT_DBTYPE = 'dbm' constants = BaseConstantsNamespace() diff --git a/ipapython/certdb.py b/ipapython/certdb.py index 14e6adf69..4b3560fab 100644 --- a/ipapython/certdb.py +++ b/ipapython/certdb.py @@ -25,12 +25,14 @@ import io import pwd import grp import re +import stat import tempfile from tempfile import NamedTemporaryFile import shutil import cryptography.x509 +from ipaplatform.constants import constants from ipaplatform.paths import paths from ipapython.dn import DN from ipapython.kerberos import Principal @@ -42,7 +44,9 @@ logger = logging.getLogger(__name__) CA_NICKNAME_FMT = "%s IPA CA" -NSS_FILES = ("cert8.db", "key3.db", "secmod.db", "pwdfile.txt") +NSS_DBM_FILES = ("cert8.db", "key3.db", "secmod.db") +NSS_SQL_FILES = ("cert9.db", "key4.db", "pkcs11.txt") +NSS_FILES = NSS_DBM_FILES + NSS_SQL_FILES + ("pwdfile.txt",) TrustFlags = collections.namedtuple('TrustFlags', 'has_key trusted ca usages') @@ -214,14 +218,50 @@ class NSSDatabase(object): # got too tied to IPA server details, killing reusability. # BaseCertDB is a class that knows nothing about IPA. # Generic NSS DB code should be moved here. - def __init__(self, nssdir=None): + + def __init__(self, nssdir=None, dbtype='auto'): if nssdir is None: self.secdir = tempfile.mkdtemp() self._is_temporary = True + if dbtype == 'auto': + dbtype = constants.NSS_DEFAULT_DBTYPE + else: + dbtype = dbtype else: self.secdir = nssdir self._is_temporary = False + if dbtype == 'auto': + if os.path.isfile(os.path.join(self.secdir, "cert9.db")): + dbtype = 'sql' + elif os.path.isfile(os.path.join(self.secdir, "cert8.db")): + dbtype = 'dbm' + else: + dbtype = constants.NSS_DEFAULT_DBTYPE + self.pwd_file = os.path.join(self.secdir, 'pwdfile.txt') + self.dbtype = None + self.certdb = self.keydb = self.secmod = None + self.filenames = () + self._set_filenames(dbtype) + + def _set_filenames(self, dbtype): + self.dbtype = dbtype + if dbtype == 'dbm': + self.certdb = os.path.join(self.secdir, "cert8.db") + self.keydb = os.path.join(self.secdir, "key3.db") + self.secmod = os.path.join(self.secdir, "secmod.db") + elif dbtype == 'sql': + self.certdb = os.path.join(self.secdir, "cert9.db") + self.keydb = os.path.join(self.secdir, "key4.db") + self.secmod = os.path.join(self.secdir, "pkcs11.txt") + else: + raise ValueError(dbtype) + self.filenames = ( + self.certdb, + self.keydb, + self.secmod, + self.pwd_file, + ) def close(self): if self._is_temporary: @@ -234,11 +274,22 @@ class NSSDatabase(object): self.close() def run_certutil(self, args, stdin=None, **kwargs): - new_args = [paths.CERTUTIL, "-d", self.secdir] - new_args = new_args + args + new_args = [ + paths.CERTUTIL, + "-d", '{}:{}'.format(self.dbtype, self.secdir) + ] + new_args.extend(args) new_args.extend(['-f', self.pwd_file]) return ipautil.run(new_args, stdin, **kwargs) + def run_pk12util(self, args, stdin=None, **kwargs): + new_args = [ + paths.PK12UTIL, + "-d", '{}:{}'.format(self.dbtype, self.secdir) + ] + new_args.extend(args) + return ipautil.run(new_args, stdin, **kwargs) + def create_db(self, user=None, group=None, mode=None, backup=False): """Create cert DB @@ -247,13 +298,14 @@ class NSSDatabase(object): :param mode: Mode of the secdir :param backup: Backup the sedir files """ - dirmode = 0o750 - filemode = 0o640 - pwdfilemode = 0o640 if mode is not None: dirmode = mode filemode = mode & 0o666 pwdfilemode = mode & 0o660 + else: + dirmode = 0o750 + filemode = 0o640 + pwdfilemode = 0o640 uid = -1 gid = -1 @@ -263,7 +315,7 @@ class NSSDatabase(object): gid = grp.getgrnam(group).gr_gid if backup: - for filename in NSS_FILES: + for filename in self.filenames: path = os.path.join(self.secdir, filename) ipautil.backup_file(path) @@ -283,7 +335,7 @@ class NSSDatabase(object): # Finally fix up perms os.chown(self.secdir, uid, gid) os.chmod(self.secdir, dirmode) - for filename in NSS_FILES: + for filename in self.filenames: path = os.path.join(self.secdir, filename) if os.path.exists(path): os.chown(path, uid, gid) @@ -293,8 +345,54 @@ class NSSDatabase(object): new_mode = filemode os.chmod(path, new_mode) + def convert_db(self, rename_old=True): + """Convert DBM database format to SQL database format + """ + if (self.dbtype == 'sql' or + os.path.isfile(os.path.join(self.secdir, "cert9.db"))): + raise ValueError( + 'NSS DB {} has been migrated already.'.format(self.secdir) + ) + + # use certutil to migrate db to new format + # see https://bugzilla.mozilla.org/show_bug.cgi?id=1415912 + args = [ + paths.CERTUTIL, + '-d', 'sql:{}'.format(self.secdir), + '-f', self.pwd_file, + ] + if self.list_keys(): + # has keys, use 'list keys' in read-write mode + args.extend(['-K', '-X']) + else: + # no keys, create new DB with auto-migrate + args.extend(['-N', '-@', self.pwd_file]) + ipautil.run(args) + + # retain file ownership and permission, backup old files + migration = ( + ('cert8.db', 'cert9.db'), + ('key3.db', 'key4.db'), + ('secmod.db', 'pkcs11.txt'), + ) + for oldname, newname in migration: + oldname = os.path.join(self.secdir, oldname) + newname = os.path.join(self.secdir, newname) + oldstat = os.stat(oldname) + os.chmod(newname, stat.S_IMODE(oldstat.st_mode)) + os.chown(newname, oldstat.st_uid, oldstat.st_gid) + # XXX also retain SELinux context? + + self._set_filenames('sql') + self.list_certs() # self-test + + if rename_old: + for oldname, _ in migration: # pylint: disable=unused-variable + oldname = os.path.join(self.secdir, oldname) + os.rename(oldname, oldname + '.migrated') + def restore(self): - for filename in NSS_FILES: + for filename in self.filenames: path = os.path.join(self.secdir, filename) backup_path = path + '.orig' save_path = path + '.ipasave' @@ -325,6 +423,20 @@ class NSSDatabase(object): return tuple(certlist) + def list_keys(self): + result = self.run_certutil( + ["-K"], raiseonerr=False, capture_output=True + ) + if result.returncode == 255: + return () + keylist = [] + for line in result.output.splitlines(): + mo = re.match(r'^<\s*(\d+)>\s+(\w+)\s+([0-9a-z]+)\s+(.*)$', line) + if mo is not None: + slot, algo, keyid, nick = mo.groups() + keylist.append((int(slot), algo, keyid, nick.strip())) + return tuple(keylist) + def find_server_certs(self): """Return nicknames and cert flags for server certs in the database @@ -357,16 +469,17 @@ class NSSDatabase(object): return root_nicknames def export_pkcs12(self, nickname, pkcs12_filename, pkcs12_passwd=None): - args = [paths.PK12UTIL, "-d", self.secdir, - "-o", pkcs12_filename, - "-n", nickname, - "-k", self.pwd_file] + args = [ + "-o", pkcs12_filename, + "-n", nickname, + "-k", self.pwd_file + ] pkcs12_password_file = None if pkcs12_passwd is not None: pkcs12_password_file = ipautil.write_tmp_file(pkcs12_passwd + '\n') - args = args + ["-w", pkcs12_password_file.name] + args.extend(["-w", pkcs12_password_file.name]) try: - ipautil.run(args) + self.run_pk12util(args) except ipautil.CalledProcessError as e: if e.returncode == 17: raise RuntimeError("incorrect password for pkcs#12 file %s" % @@ -381,15 +494,17 @@ class NSSDatabase(object): pkcs12_password_file.close() def import_pkcs12(self, pkcs12_filename, pkcs12_passwd=None): - args = [paths.PK12UTIL, "-d", self.secdir, - "-i", pkcs12_filename, - "-k", self.pwd_file, '-v'] + args = [ + "-i", pkcs12_filename, + "-k", self.pwd_file, + "-v" + ] pkcs12_password_file = None if pkcs12_passwd is not None: pkcs12_password_file = ipautil.write_tmp_file(pkcs12_passwd + '\n') - args = args + ["-w", pkcs12_password_file.name] + args.extend(["-w", pkcs12_password_file.name]) try: - ipautil.run(args) + self.run_pk12util(args) except ipautil.CalledProcessError as e: if e.returncode == 17: raise RuntimeError("incorrect password for pkcs#12 file %s" % diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py index a40aff5e9..1579d5c09 100644 --- a/ipaserver/install/certs.py +++ b/ipaserver/install/certs.py @@ -103,18 +103,20 @@ class CertDB(object): # TODO: Remove all selfsign code def __init__(self, realm, nssdir, fstore=None, host_name=None, subject_base=None, ca_subject=None, - user=None, group=None, mode=None, create=False): - self.nssdb = NSSDatabase(nssdir) + user=None, group=None, mode=None, create=False, + dbtype='auto'): + self.nssdb = NSSDatabase(nssdir, dbtype=dbtype) self.secdir = nssdir self.realm = realm - self.noise_fname = self.secdir + "/noise.txt" - self.certdb_fname = self.secdir + "/cert8.db" - self.keydb_fname = self.secdir + "/key3.db" - self.secmod_fname = self.secdir + "/secmod.db" - self.pk12_fname = self.secdir + "/cacert.p12" - self.pin_fname = self.secdir + "/pin.txt" + self.noise_fname = os.path.join(self.secdir, "noise.txt") + + self.certdb_fname = self.nssdb.certdb + self.keydb_fname = self.nssdb.keydb + self.secmod_fname = self.nssdb.secmod + self.pk12_fname = os.path.join(self.secdir, "cacert.p12") + self.pin_fname = os.path.join(self.secdir + "pin.txt") self.reqdir = None self.certreq_fname = None self.certder_fname = None @@ -171,15 +173,7 @@ class CertDB(object): """ Checks whether all NSS database files + our pwd_file exist """ - db_files = ( - self.secdir, - self.certdb_fname, - self.keydb_fname, - self.secmod_fname, - self.nssdb.pwd_file, - ) - - for f in db_files: + for f in self.nssdb.filenames: if not os.path.exists(f): return False return True @@ -291,11 +285,12 @@ class CertDB(object): if create_pkcs12: ipautil.backup_file(self.pk12_fname) - ipautil.run([paths.PK12UTIL, "-d", self.secdir, - "-o", self.pk12_fname, - "-n", self.cacert_name, - "-w", self.passwd_fname, - "-k", self.passwd_fname]) + self.nssdb.run_pk12util([ + "-o", self.pk12_fname, + "-n", self.cacert_name, + "-k", self.passwd_fname, + "-w", self.passwd_fname, + ]) self.set_perms(self.pk12_fname) def load_cacert(self, cacert_fname, trust_flags): @@ -514,11 +509,12 @@ class CertDB(object): if nickname is None: nickname = get_ca_nickname(api.env.realm) - ipautil.run([paths.PK12UTIL, "-d", self.secdir, - "-o", pkcs12_fname, - "-n", nickname, - "-k", self.passwd_fname, - "-w", pkcs12_pwd_fname]) + self.nssdb.run_pk12util([ + "-o", pkcs12_fname, + "-n", nickname, + "-k", self.passwd_fname, + "-w", pkcs12_pwd_fname + ]) def create_from_cacert(self): cacert_fname = paths.IPA_CA_CRT diff --git a/ipaserver/install/ipa_backup.py b/ipaserver/install/ipa_backup.py index d8ff395fd..475d846e6 100644 --- a/ipaserver/install/ipa_backup.py +++ b/ipaserver/install/ipa_backup.py @@ -25,20 +25,13 @@ import time import pwd import six -# pylint: disable=import-error -if six.PY3: - # The SafeConfigParser class has been renamed to ConfigParser in Py3 - from configparser import ConfigParser as SafeConfigParser -else: - from ConfigParser import SafeConfigParser -# pylint: enable=import-error from ipaplatform.paths import paths from ipaplatform import services from ipalib import api, errors from ipapython import version from ipapython.ipautil import run, write_tmp_file -from ipapython import admintool +from ipapython import admintool, certdb from ipapython.dn import DN from ipaserver.install.replication import wait_for_task from ipaserver.install import installutils @@ -46,7 +39,13 @@ from ipapython import ipaldap from ipaplatform.constants import constants from ipaplatform.tasks import tasks - +# pylint: disable=import-error +if six.PY3: + # The SafeConfigParser class has been renamed to ConfigParser in Py3 + from configparser import ConfigParser as SafeConfigParser +else: + from ConfigParser import SafeConfigParser +# pylint: enable=import-error ISO8601_DATETIME_FMT = '%Y-%m-%dT%H:%M:%S' logger = logging.getLogger(__name__) @@ -194,7 +193,7 @@ class Backup(admintool.AdminTool): paths.HOSTS, ) + tuple( os.path.join(paths.IPA_NSSDB_DIR, file) - for file in ('cert8.db', 'key3.db', 'secmod.db') + for file in (certdb.NSS_DBM_FILES + certdb.NSS_SQL_FILES) ) logs=( diff --git a/ipaserver/install/ipa_replica_prepare.py b/ipaserver/install/ipa_replica_prepare.py index 1b8221885..0be0302dd 100644 --- a/ipaserver/install/ipa_replica_prepare.py +++ b/ipaserver/install/ipa_replica_prepare.py @@ -32,6 +32,20 @@ from optparse import OptionGroup, SUPPRESS_HELP import dns.resolver import six + +from ipaserver.install import certs, installutils, bindinstance, dsinstance, ca +from ipaserver.install.replication import enable_replication_version_checking +from ipaserver.install.server.replicainstall import install_ca_cert +from ipaserver.install.bindinstance import ( + add_zone, add_fwd_rr, add_ptr_rr, dns_container_exists) +from ipapython import ipautil, admintool, certdb +from ipapython.dn import DN +from ipapython import version +from ipalib import api +from ipalib import errors +from ipaplatform.paths import paths +from ipalib.constants import DOMAIN_LEVEL_0 + # pylint: disable=import-error if six.PY3: # The SafeConfigParser class has been renamed to ConfigParser in Py3 @@ -40,18 +54,6 @@ else: from ConfigParser import SafeConfigParser # pylint: enable=import-error -from ipaserver.install import certs, installutils, bindinstance, dsinstance, ca -from ipaserver.install.replication import enable_replication_version_checking -from ipaserver.install.server.replicainstall import install_ca_cert -from ipaserver.install.bindinstance import ( - add_zone, add_fwd_rr, add_ptr_rr, dns_container_exists) -from ipapython import ipautil, admintool -from ipapython.dn import DN -from ipapython import version -from ipalib import api -from ipalib import errors -from ipaplatform.paths import paths -from ipalib.constants import DOMAIN_LEVEL_0 logger = logging.getLogger(__name__) @@ -565,9 +567,8 @@ class ReplicaPrepare(admintool.AdminTool): installutils.remove_file(pkcs12_fname) installutils.remove_file(passwd_fname) - self.remove_info_file("cert8.db") - self.remove_info_file("key3.db") - self.remove_info_file("secmod.db") + for fname in (certdb.NSS_SQL_FILES + certdb.NSS_SQL_FILES): + self.remove_info_file(fname) self.remove_info_file("noise.txt") orig_filename = passwd_fname + ".orig" diff --git a/ipaserver/install/ipa_restore.py b/ipaserver/install/ipa_restore.py index 1dd7b0f8a..99e6297b6 100644 --- a/ipaserver/install/ipa_restore.py +++ b/ipaserver/install/ipa_restore.py @@ -27,20 +27,13 @@ import ldif import itertools import six -# pylint: disable=import-error -if six.PY3: - # The SafeConfigParser class has been renamed to ConfigParser in Py3 - from configparser import ConfigParser as SafeConfigParser -else: - from ConfigParser import SafeConfigParser -# pylint: enable=import-error from ipaclient.install.client import update_ipa_nssdb from ipalib import api, errors from ipalib.constants import FQDN from ipapython import version, ipautil from ipapython.ipautil import run, user_input -from ipapython import admintool +from ipapython import admintool, certdb from ipapython.dn import DN from ipaserver.install.replication import (wait_for_task, ReplicationManager, get_cs_replication_manager) @@ -58,6 +51,14 @@ try: except ImportError: adtrustinstance = None +# pylint: disable=import-error +if six.PY3: + # The SafeConfigParser class has been renamed to ConfigParser in Py3 + from configparser import ConfigParser as SafeConfigParser +else: + from ConfigParser import SafeConfigParser +# pylint: enable=import-error + logger = logging.getLogger(__name__) @@ -847,7 +848,7 @@ class Restore(admintool.AdminTool): krbinstance.KrbInstance().stop_tracking_certs() - for basename in ('cert8.db', 'key3.db', 'secmod.db', 'pwdfile.txt'): + for basename in certdb.NSS_FILES: filename = os.path.join(paths.IPA_NSSDB_DIR, basename) try: ipautil.backup_file(filename) diff --git a/ipaserver/install/ipa_server_certinstall.py b/ipaserver/install/ipa_server_certinstall.py index 36c0a586d..dd2c590e6 100644 --- a/ipaserver/install/ipa_server_certinstall.py +++ b/ipaserver/install/ipa_server_certinstall.py @@ -30,9 +30,10 @@ from ipalib.install import certmonger from ipaplatform.constants import constants from ipaplatform.paths import paths from ipapython import admintool -from ipapython.certdb import (get_ca_nickname, - NSSDatabase, - verify_kdc_cert_validity) +from ipapython.certdb import ( + NSS_DBM_FILES, NSS_SQL_FILES, NSSDatabase, get_ca_nickname, + verify_kdc_cert_validity +) from ipapython.dn import DN from ipalib import api, errors from ipaserver.install import certs, dsinstance, installutils, krbinstance @@ -170,14 +171,12 @@ class ServerCertInstall(admintool.AdminTool): quotes=False) # Fix the database permissions - os.chmod(os.path.join(dirname, 'cert8.db'), 0o640) - os.chmod(os.path.join(dirname, 'key3.db'), 0o640) - os.chmod(os.path.join(dirname, 'secmod.db'), 0o640) - pent = pwd.getpwnam(constants.HTTPD_USER) - os.chown(os.path.join(dirname, 'cert8.db'), 0, pent.pw_gid) - os.chown(os.path.join(dirname, 'key3.db'), 0, pent.pw_gid) - os.chown(os.path.join(dirname, 'secmod.db'), 0, pent.pw_gid) + for filename in (NSS_DBM_FILES + NSS_SQL_FILES): + absname = os.path.join(dirname, filename) + if os.path.isfile(absname): + os.chmod(absname, 0o640) + os.chown(absname, 0, pent.pw_gid) def install_kdc_cert(self): ca_cert_file = paths.CA_BUNDLE_PEM diff --git a/ipaserver/secrets/store.py b/ipaserver/secrets/store.py index eecf83f37..f23a0c396 100644 --- a/ipaserver/secrets/store.py +++ b/ipaserver/secrets/store.py @@ -6,6 +6,7 @@ from custodia.store.interface import CSStore # pylint: disable=relative-import from jwcrypto.common import json_decode, json_encode from ipaplatform.paths import paths from ipapython import ipautil +from ipapython.certdb import NSSDatabase from ipaserver.secrets.common import iSecLdap import ldap import os @@ -65,10 +66,11 @@ class NSSWrappedCertDB(DBMAPHandler): '--wrap-nickname', self.wrap_nick, '--target-nickname', self.target_nick, '-o', wrapped_key_file]) - ipautil.run([ - paths.CERTUTIL, '-d', self.nssdb_path, + nssdb = NSSDatabase(self.nssdb_path) + nssdb.run_certutil([ '-L', '-n', self.target_nick, - '-a', '-o', certificate_file]) + '-a', '-o', certificate_file, + ]) with open(wrapped_key_file, 'rb') as f: wrapped_key = f.read() with open(certificate_file, 'r') as f: @@ -102,12 +104,13 @@ class NSSCertDB(DBMAPHandler): with open(pk12pwfile, 'w') as f: f.write(password) pk12file = os.path.join(tdir, 'pk12file') - ipautil.run([paths.PK12UTIL, - "-d", self.nssdb_path, - "-o", pk12file, - "-n", self.nickname, - "-k", self.nssdb_pwdfile, - "-w", pk12pwfile]) + nssdb = NSSDatabase(self.nssdb_path) + nssdb.run_pk12util([ + "-o", pk12file, + "-n", self.nickname, + "-k", self.nssdb_pwdfile, + "-w", pk12pwfile, + ]) with open(pk12file, 'rb') as f: data = f.read() finally: @@ -125,12 +128,13 @@ class NSSCertDB(DBMAPHandler): pk12file = os.path.join(tdir, 'pk12file') with open(pk12file, 'wb') as f: f.write(b64decode(v['pkcs12 data'])) - ipautil.run([paths.PK12UTIL, - "-d", self.nssdb_path, - "-i", pk12file, - "-n", self.nickname, - "-k", self.nssdb_pwdfile, - "-w", pk12pwfile]) + nssdb = NSSDatabase(self.nssdb_path) + nssdb.run_pk12util([ + "-i", pk12file, + "-n", self.nickname, + "-k", self.nssdb_pwdfile, + "-w", pk12pwfile, + ]) finally: shutil.rmtree(tdir) diff --git a/ipatests/pytest_plugins/integration/tasks.py b/ipatests/pytest_plugins/integration/tasks.py index a33710a45..b407145ac 100644 --- a/ipatests/pytest_plugins/integration/tasks.py +++ b/ipatests/pytest_plugins/integration/tasks.py @@ -35,6 +35,7 @@ from six import StringIO from ipapython import ipautil from ipaplatform.paths import paths +from ipaplatform.constants import constants from ipapython.dn import DN from ipalib import errors from ipalib.util import get_reverse_zone_default, verify_host_resolvable @@ -1262,9 +1263,12 @@ def run_server_del(host, server_to_delete, force=False, return host.run_command(args, raiseonerr=False) -def run_certutil(host, args, reqdir, stdin=None, raiseonerr=True): - new_args = [paths.CERTUTIL, "-d", reqdir] - new_args = " ".join(new_args + args) +def run_certutil(host, args, reqdir, dbtype=None, + stdin=None, raiseonerr=True): + if dbtype is None: + dbtype = constants.NSS_DEFAULT_DBTYPE + new_args = [paths.CERTUTIL, '-d', '{}:{}'.format(dbtype, reqdir)] + new_args.extend(args) return host.run_command(new_args, raiseonerr=raiseonerr, stdin_text=stdin) diff --git a/ipatests/test_ipapython/test_certdb.py b/ipatests/test_ipapython/test_certdb.py new file mode 100644 index 000000000..0dc450555 --- /dev/null +++ b/ipatests/test_ipapython/test_certdb.py @@ -0,0 +1,133 @@ +import os + +from ipapython.certdb import NSSDatabase, TRUSTED_PEER_TRUST_FLAGS + +CERTNICK = 'testcert' + + +def create_selfsigned(nssdb): + # create self-signed cert + key + noisefile = os.path.join(nssdb.secdir, 'noise') + with open(noisefile, 'wb') as f: + f.write(os.urandom(64)) + try: + nssdb.run_certutil([ + '-S', '-x', + '-z', noisefile, + '-k', 'rsa', '-g', '2048', '-Z', 'SHA256', + '-t', 'CTu,Cu,Cu', + '-s', 'CN=testcert', + '-n', CERTNICK, + '-m', '365', + ]) + finally: + os.unlink(noisefile) + + +def test_dbm_tmp(): + with NSSDatabase(dbtype='dbm') as nssdb: + assert nssdb.dbtype == 'dbm' + + for filename in nssdb.filenames: + assert not os.path.isfile(filename) + + nssdb.create_db() + for filename in nssdb.filenames: + assert os.path.isfile(filename) + assert os.path.dirname(filename) == nssdb.secdir + + assert os.path.basename(nssdb.certdb) == 'cert8.db' + assert nssdb.certdb in nssdb.filenames + assert os.path.basename(nssdb.keydb) == 'key3.db' + assert os.path.basename(nssdb.secmod) == 'secmod.db' + + +def test_sql_tmp(): + with NSSDatabase(dbtype='sql') as nssdb: + assert nssdb.dbtype == 'sql' + + for filename in nssdb.filenames: + assert not os.path.isfile(filename) + + nssdb.create_db() + for filename in nssdb.filenames: + assert os.path.isfile(filename) + assert os.path.dirname(filename) == nssdb.secdir + + assert os.path.basename(nssdb.certdb) == 'cert9.db' + assert nssdb.certdb in nssdb.filenames + assert os.path.basename(nssdb.keydb) == 'key4.db' + assert os.path.basename(nssdb.secmod) == 'pkcs11.txt' + + +def test_convert_db(): + with NSSDatabase(dbtype='dbm') as nssdb: + assert nssdb.dbtype == 'dbm' + + nssdb.create_db() + + create_selfsigned(nssdb) + + oldcerts = nssdb.list_certs() + assert len(oldcerts) == 1 + oldkeys = nssdb.list_keys() + assert len(oldkeys) == 1 + + nssdb.convert_db() + + assert nssdb.dbtype == 'sql' + newcerts = nssdb.list_certs() + assert len(newcerts) == 1 + assert newcerts == oldcerts + newkeys = nssdb.list_keys() + assert len(newkeys) == 1 + assert newkeys == oldkeys + + for filename in nssdb.filenames: + assert os.path.isfile(filename) + assert os.path.dirname(filename) == nssdb.secdir + + assert os.path.basename(nssdb.certdb) == 'cert9.db' + assert nssdb.certdb in nssdb.filenames + assert os.path.basename(nssdb.keydb) == 'key4.db' + assert os.path.basename(nssdb.secmod) == 'pkcs11.txt' + + +def test_convert_db_nokey(): + with NSSDatabase(dbtype='dbm') as nssdb: + assert nssdb.dbtype == 'dbm' + nssdb.create_db() + + create_selfsigned(nssdb) + + assert len(nssdb.list_certs()) == 1 + assert len(nssdb.list_keys()) == 1 + # remove key, readd cert + cert = nssdb.get_cert(CERTNICK) + nssdb.run_certutil(['-F', '-n', CERTNICK]) + nssdb.add_cert(cert, CERTNICK, TRUSTED_PEER_TRUST_FLAGS) + assert len(nssdb.list_keys()) == 0 + oldcerts = nssdb.list_certs() + assert len(oldcerts) == 1 + + nssdb.convert_db() + assert nssdb.dbtype == 'sql' + newcerts = nssdb.list_certs() + assert len(newcerts) == 1 + assert newcerts == oldcerts + assert nssdb.get_cert(CERTNICK) == cert + newkeys = nssdb.list_keys() + assert newkeys == () + + for filename in nssdb.filenames: + assert os.path.isfile(filename) + assert os.path.dirname(filename) == nssdb.secdir + + old = os.path.join(nssdb.secdir, 'cert8.db') + assert not os.path.isfile(old) + assert os.path.isfile(old + '.migrated') + + assert os.path.basename(nssdb.certdb) == 'cert9.db' + assert nssdb.certdb in nssdb.filenames + assert os.path.basename(nssdb.keydb) == 'key4.db' + assert os.path.basename(nssdb.secmod) == 'pkcs11.txt' diff --git a/ipatests/test_xmlrpc/test_cert_plugin.py b/ipatests/test_xmlrpc/test_cert_plugin.py index 840830340..34c169b4c 100644 --- a/ipatests/test_xmlrpc/test_cert_plugin.py +++ b/ipatests/test_xmlrpc/test_cert_plugin.py @@ -25,13 +25,11 @@ import base64 import nose import os import pytest -import shutil import six -import tempfile from ipalib import api from ipalib import errors from ipaplatform.paths import paths -from ipapython import ipautil +from ipapython.certdb import NSSDatabase from ipapython.dn import DN from ipapython.ipautil import run from ipatests.test_xmlrpc.testcert import subject_base @@ -77,8 +75,9 @@ def is_db_configured(): # The API tested depends on the value of ~/.ipa/default/ra_plugin when # running as the lite-server. - class BaseCert(XMLRPC_test): + host_fqdn = u'ipatestcert.%s' % api.env.domain + service_princ = u'test/%s@%s' % (host_fqdn, api.env.realm) @classmethod def setup_class(cls): @@ -91,42 +90,27 @@ class BaseCert(XMLRPC_test): is_db_configured() - def run_certutil(self, args, stdin=None): - new_args = [paths.CERTUTIL, "-d", self.reqdir] - new_args = new_args + args - return ipautil.run(new_args, stdin) - def setup(self): - self.reqdir = tempfile.mkdtemp(prefix = "tmp-") - self.reqfile = self.reqdir + "/test.csr" - self.pwname = self.reqdir + "/pwd" - self.certfile = self.reqdir + "/cert.crt" - - # Create an empty password file - with open(self.pwname, "w") as fp: - fp.write("\n") - + self.nssdb = NSSDatabase() + secdir = self.nssdb.secdir + self.reqfile = os.path.join(secdir, "test.csr") + self.certfile = os.path.join(secdir, "cert.crt") # Create our temporary NSS database - self.run_certutil(["-N", "-f", self.pwname]) - + self.nssdb.create_db() self.subject = DN(('CN', self.host_fqdn), subject_base()) def teardown(self): - shutil.rmtree(self.reqdir, ignore_errors=True) + self.nssdb.close() # remove tempdir def generateCSR(self, subject): - self.run_certutil(["-R", "-s", subject, - "-o", self.reqfile, - "-z", paths.GROUP, - "-f", self.pwname, - "-a", - ]) - with open(self.reqfile, "r") as fp: - data = fp.read() - return data - - host_fqdn = u'ipatestcert.%s' % api.env.domain - service_princ = u'test/%s@%s' % (host_fqdn, api.env.realm) + self.nssdb.run_certutil([ + "-R", "-s", subject, + "-o", self.reqfile, + "-z", paths.GROUP, + "-a", + ]) + with open(self.reqfile, "rb") as f: + return f.read().decode('ascii') @pytest.mark.tier1 @@ -149,7 +133,7 @@ class test_cert(BaseCert): # First create the host that will use this policy assert 'result' in api.Command['host_add'](self.host_fqdn, force=True) - csr = unicode(self.generateCSR(str(self.subject))) + csr = self.generateCSR(str(self.subject)) with assert_raises(errors.NotFound): api.Command['cert_request'](csr, principal=self.service_princ) @@ -160,7 +144,7 @@ class test_cert(BaseCert): # Our host should exist from previous test global cert, sn - csr = unicode(self.generateCSR(str(self.subject))) + csr = self.generateCSR(str(self.subject)) res = api.Command['cert_request'](csr, principal=self.service_princ, add=True)['result'] assert DN(res['subject']) == self.subject assert 'cacn' in res @@ -190,8 +174,8 @@ class test_cert(BaseCert): See https://fedorahosted.org/freeipa/ticket/5881 """ result = api.Command.cert_show(sn, out=unicode(self.certfile)) - with open(self.certfile, "r") as f: - pem_cert = unicode(f.read()) + with open(self.certfile, "rb") as f: + pem_cert = f.read().decode('ascii') result = run(['openssl', 'x509', '-text'], stdin=pem_cert, capture_output=True) assert _EXP_CRL_URI in result.output @@ -203,7 +187,7 @@ class test_cert(BaseCert): """ global newcert - csr = unicode(self.generateCSR(str(self.subject))) + csr = self.generateCSR(str(self.subject)) res = api.Command['cert_request'](csr, principal=self.service_princ)['result'] assert DN(res['subject']) == self.subject # save the cert for the service_show/find tests @@ -473,7 +457,7 @@ class test_cert_revocation(BaseCert): assert 'result' in api.Command['host_add'](self.host_fqdn, force=True) # generate CSR, request certificate, obtain serial number - self.csr = unicode(self.generateCSR(str(self.subject))) + self.csr = self.generateCSR(str(self.subject)) res = api.Command['cert_request'](self.csr, principal=self.service_princ, add=True, all=True)['result'] diff --git a/ipatests/test_xmlrpc/testcert.py b/ipatests/test_xmlrpc/testcert.py index 3874d75f2..e6609b7f1 100644 --- a/ipatests/test_xmlrpc/testcert.py +++ b/ipatests/test_xmlrpc/testcert.py @@ -28,20 +28,15 @@ once per test run. import os import tempfile import shutil -import six import base64 import re from ipalib import api, x509 from ipaserver.plugins import rabase -from ipapython import ipautil +from ipapython import certdb from ipapython.dn import DN from ipaplatform.paths import paths -if six.PY3: - unicode = str - - _subject_base = None @@ -80,29 +75,6 @@ def get_testcert(subject, principal): return strip_cert_header(_testcert.decode('utf-8')) -def run_certutil(reqdir, args, stdin=None): - """ - Run an NSS certutil command - """ - new_args = [paths.CERTUTIL, "-d", reqdir] - new_args = new_args + args - return ipautil.run(new_args, stdin) - - -def generate_csr(reqdir, pwname, subject): - """ - Create a CSR for the given subject. - """ - req_path = os.path.join(reqdir, 'req') - run_certutil(reqdir, ["-R", "-s", subject, - "-o", req_path, - "-z", paths.GROUP, - "-f", pwname, - "-a"]) - with open(req_path, "r") as fp: - return fp.read() - - def makecert(reqdir, subject, principal): """ Generate a certificate that can be used during unit testing. @@ -114,16 +86,22 @@ def makecert(reqdir, subject, principal): raise AssertionError('The self-signed CA is not configured, ' 'see ipatests/test_xmlrpc/test_cert.py') - pwname = os.path.join(reqdir, "pwd") - - # Create an empty password file - with open(pwname, "w") as fp: - fp.write("\n") - - # Generate NSS cert database to store the private key for our CSR - run_certutil(reqdir, ["-N", "-f", pwname]) - - csr = unicode(generate_csr(reqdir, pwname, str(subject))) + nssdb = certdb.NSSDatabase(nssdir=reqdir) + with open(nssdb.pwd_file, "w") as f: + # Create an empty password file + f.write("\n") + # create db + nssdb.create_db() + # create CSR + csr_file = os.path.join(reqdir, 'req') + nssdb.run_certutil([ + "-R", "-s", str(subject), + "-o", csr_file, + "-z", paths.GROUP, + "-a" + ]) + with open(csr_file, "rb") as f: + csr = f.read().decode('ascii') res = api.Command['cert_request'](csr, principal=principal, add=True) cert = x509.load_der_x509_certificate(