diff --git a/freeipa.spec.in b/freeipa.spec.in index f65bf09a5..7c39d343d 100644 --- a/freeipa.spec.in +++ b/freeipa.spec.in @@ -1606,9 +1606,14 @@ fi %ghost %attr(0644,root,apache) %config(noreplace) %{_sysconfdir}/ipa/default.conf %ghost %attr(0644,root,apache) %config(noreplace) %{_sysconfdir}/ipa/ca.crt %dir %attr(0755,root,root) %{_sysconfdir}/ipa/nssdb +# old dbm format %ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/cert8.db %ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/key3.db %ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/secmod.db +# new sql format +%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/cert9.db +%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/key4.db +%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/pkcs11.txt %ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/pwdfile.txt %ghost %config(noreplace) %{_sysconfdir}/pki/ca-trust/source/ipa.p11-kit %dir %{_localstatedir}/lib/ipa-client diff --git a/ipaclient/install/client.py b/ipaclient/install/client.py index 2f89e7eae..6fa42e366 100644 --- a/ipaclient/install/client.py +++ b/ipaclient/install/client.py @@ -2304,9 +2304,6 @@ def create_ipa_nssdb(): db = certdb.NSSDatabase(paths.IPA_NSSDB_DIR) db.create_db(mode=0o755, backup=True) os.chmod(db.pwd_file, 0o600) - os.chmod(os.path.join(db.secdir, 'cert8.db'), 0o644) - os.chmod(os.path.join(db.secdir, 'key3.db'), 0o644) - os.chmod(os.path.join(db.secdir, 'secmod.db'), 0o644) def update_ipa_nssdb(): @@ -3067,11 +3064,8 @@ def uninstall(options): logger.error("%s failed to stop tracking certificate: %s", cmonger.service_name, e) - for filename in (os.path.join(ipa_db.secdir, 'cert8.db'), - os.path.join(ipa_db.secdir, 'key3.db'), - os.path.join(ipa_db.secdir, 'secmod.db'), - os.path.join(ipa_db.secdir, 'pwdfile.txt')): - remove_file(filename) + for filename in certdb.NSS_FILES: + remove_file(os.path.join(ipa_db.secdir, filename)) # Remove any special principal names we added to the IPA CA helper certmonger.remove_principal_from_cas() diff --git a/ipaplatform/base/constants.py b/ipaplatform/base/constants.py index 5f52b94f6..94bd0f8a1 100644 --- a/ipaplatform/base/constants.py +++ b/ipaplatform/base/constants.py @@ -37,6 +37,7 @@ class BaseConstantsNamespace(object): 'httpd_dbus_sssd': 'on', } SSSD_USER = "sssd" - + # sql (new format), dbm (old format) + NSS_DEFAULT_DBTYPE = 'dbm' constants = BaseConstantsNamespace() diff --git a/ipapython/certdb.py b/ipapython/certdb.py index 14e6adf69..4b3560fab 100644 --- a/ipapython/certdb.py +++ b/ipapython/certdb.py @@ -25,12 +25,14 @@ import io import pwd import grp import re +import stat import tempfile from tempfile import NamedTemporaryFile import shutil import cryptography.x509 +from ipaplatform.constants import constants from ipaplatform.paths import paths from ipapython.dn import DN from ipapython.kerberos import Principal @@ -42,7 +44,9 @@ logger = logging.getLogger(__name__) CA_NICKNAME_FMT = "%s IPA CA" -NSS_FILES = ("cert8.db", "key3.db", "secmod.db", "pwdfile.txt") +NSS_DBM_FILES = ("cert8.db", "key3.db", "secmod.db") +NSS_SQL_FILES = ("cert9.db", "key4.db", "pkcs11.txt") +NSS_FILES = NSS_DBM_FILES + NSS_SQL_FILES + ("pwdfile.txt",) TrustFlags = collections.namedtuple('TrustFlags', 'has_key trusted ca usages') @@ -214,14 +218,50 @@ class NSSDatabase(object): # got too tied to IPA server details, killing reusability. # BaseCertDB is a class that knows nothing about IPA. # Generic NSS DB code should be moved here. - def __init__(self, nssdir=None): + + def __init__(self, nssdir=None, dbtype='auto'): if nssdir is None: self.secdir = tempfile.mkdtemp() self._is_temporary = True + if dbtype == 'auto': + dbtype = constants.NSS_DEFAULT_DBTYPE + else: + dbtype = dbtype else: self.secdir = nssdir self._is_temporary = False + if dbtype == 'auto': + if os.path.isfile(os.path.join(self.secdir, "cert9.db")): + dbtype = 'sql' + elif os.path.isfile(os.path.join(self.secdir, "cert8.db")): + dbtype = 'dbm' + else: + dbtype = constants.NSS_DEFAULT_DBTYPE + self.pwd_file = os.path.join(self.secdir, 'pwdfile.txt') + self.dbtype = None + self.certdb = self.keydb = self.secmod = None + self.filenames = () + self._set_filenames(dbtype) + + def _set_filenames(self, dbtype): + self.dbtype = dbtype + if dbtype == 'dbm': + self.certdb = os.path.join(self.secdir, "cert8.db") + self.keydb = os.path.join(self.secdir, "key3.db") + self.secmod = os.path.join(self.secdir, "secmod.db") + elif dbtype == 'sql': + self.certdb = os.path.join(self.secdir, "cert9.db") + self.keydb = os.path.join(self.secdir, "key4.db") + self.secmod = os.path.join(self.secdir, "pkcs11.txt") + else: + raise ValueError(dbtype) + self.filenames = ( + self.certdb, + self.keydb, + self.secmod, + self.pwd_file, + ) def close(self): if self._is_temporary: @@ -234,11 +274,22 @@ class NSSDatabase(object): self.close() def run_certutil(self, args, stdin=None, **kwargs): - new_args = [paths.CERTUTIL, "-d", self.secdir] - new_args = new_args + args + new_args = [ + paths.CERTUTIL, + "-d", '{}:{}'.format(self.dbtype, self.secdir) + ] + new_args.extend(args) new_args.extend(['-f', self.pwd_file]) return ipautil.run(new_args, stdin, **kwargs) + def run_pk12util(self, args, stdin=None, **kwargs): + new_args = [ + paths.PK12UTIL, + "-d", '{}:{}'.format(self.dbtype, self.secdir) + ] + new_args.extend(args) + return ipautil.run(new_args, stdin, **kwargs) + def create_db(self, user=None, group=None, mode=None, backup=False): """Create cert DB @@ -247,13 +298,14 @@ class NSSDatabase(object): :param mode: Mode of the secdir :param backup: Backup the sedir files """ - dirmode = 0o750 - filemode = 0o640 - pwdfilemode = 0o640 if mode is not None: dirmode = mode filemode = mode & 0o666 pwdfilemode = mode & 0o660 + else: + dirmode = 0o750 + filemode = 0o640 + pwdfilemode = 0o640 uid = -1 gid = -1 @@ -263,7 +315,7 @@ class NSSDatabase(object): gid = grp.getgrnam(group).gr_gid if backup: - for filename in NSS_FILES: + for filename in self.filenames: path = os.path.join(self.secdir, filename) ipautil.backup_file(path) @@ -283,7 +335,7 @@ class NSSDatabase(object): # Finally fix up perms os.chown(self.secdir, uid, gid) os.chmod(self.secdir, dirmode) - for filename in NSS_FILES: + for filename in self.filenames: path = os.path.join(self.secdir, filename) if os.path.exists(path): os.chown(path, uid, gid) @@ -293,8 +345,54 @@ class NSSDatabase(object): new_mode = filemode os.chmod(path, new_mode) + def convert_db(self, rename_old=True): + """Convert DBM database format to SQL database format + """ + if (self.dbtype == 'sql' or + os.path.isfile(os.path.join(self.secdir, "cert9.db"))): + raise ValueError( + 'NSS DB {} has been migrated already.'.format(self.secdir) + ) + + # use certutil to migrate db to new format + # see https://bugzilla.mozilla.org/show_bug.cgi?id=1415912 + args = [ + paths.CERTUTIL, + '-d', 'sql:{}'.format(self.secdir), + '-f', self.pwd_file, + ] + if self.list_keys(): + # has keys, use 'list keys' in read-write mode + args.extend(['-K', '-X']) + else: + # no keys, create new DB with auto-migrate + args.extend(['-N', '-@', self.pwd_file]) + ipautil.run(args) + + # retain file ownership and permission, backup old files + migration = ( + ('cert8.db', 'cert9.db'), + ('key3.db', 'key4.db'), + ('secmod.db', 'pkcs11.txt'), + ) + for oldname, newname in migration: + oldname = os.path.join(self.secdir, oldname) + newname = os.path.join(self.secdir, newname) + oldstat = os.stat(oldname) + os.chmod(newname, stat.S_IMODE(oldstat.st_mode)) + os.chown(newname, oldstat.st_uid, oldstat.st_gid) + # XXX also retain SELinux context? + + self._set_filenames('sql') + self.list_certs() # self-test + + if rename_old: + for oldname, _ in migration: # pylint: disable=unused-variable + oldname = os.path.join(self.secdir, oldname) + os.rename(oldname, oldname + '.migrated') + def restore(self): - for filename in NSS_FILES: + for filename in self.filenames: path = os.path.join(self.secdir, filename) backup_path = path + '.orig' save_path = path + '.ipasave' @@ -325,6 +423,20 @@ class NSSDatabase(object): return tuple(certlist) + def list_keys(self): + result = self.run_certutil( + ["-K"], raiseonerr=False, capture_output=True + ) + if result.returncode == 255: + return () + keylist = [] + for line in result.output.splitlines(): + mo = re.match(r'^<\s*(\d+)>\s+(\w+)\s+([0-9a-z]+)\s+(.*)$', line) + if mo is not None: + slot, algo, keyid, nick = mo.groups() + keylist.append((int(slot), algo, keyid, nick.strip())) + return tuple(keylist) + def find_server_certs(self): """Return nicknames and cert flags for server certs in the database @@ -357,16 +469,17 @@ class NSSDatabase(object): return root_nicknames def export_pkcs12(self, nickname, pkcs12_filename, pkcs12_passwd=None): - args = [paths.PK12UTIL, "-d", self.secdir, - "-o", pkcs12_filename, - "-n", nickname, - "-k", self.pwd_file] + args = [ + "-o", pkcs12_filename, + "-n", nickname, + "-k", self.pwd_file + ] pkcs12_password_file = None if pkcs12_passwd is not None: pkcs12_password_file = ipautil.write_tmp_file(pkcs12_passwd + '\n') - args = args + ["-w", pkcs12_password_file.name] + args.extend(["-w", pkcs12_password_file.name]) try: - ipautil.run(args) + self.run_pk12util(args) except ipautil.CalledProcessError as e: if e.returncode == 17: raise RuntimeError("incorrect password for pkcs#12 file %s" % @@ -381,15 +494,17 @@ class NSSDatabase(object): pkcs12_password_file.close() def import_pkcs12(self, pkcs12_filename, pkcs12_passwd=None): - args = [paths.PK12UTIL, "-d", self.secdir, - "-i", pkcs12_filename, - "-k", self.pwd_file, '-v'] + args = [ + "-i", pkcs12_filename, + "-k", self.pwd_file, + "-v" + ] pkcs12_password_file = None if pkcs12_passwd is not None: pkcs12_password_file = ipautil.write_tmp_file(pkcs12_passwd + '\n') - args = args + ["-w", pkcs12_password_file.name] + args.extend(["-w", pkcs12_password_file.name]) try: - ipautil.run(args) + self.run_pk12util(args) except ipautil.CalledProcessError as e: if e.returncode == 17: raise RuntimeError("incorrect password for pkcs#12 file %s" % diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py index a40aff5e9..1579d5c09 100644 --- a/ipaserver/install/certs.py +++ b/ipaserver/install/certs.py @@ -103,18 +103,20 @@ class CertDB(object): # TODO: Remove all selfsign code def __init__(self, realm, nssdir, fstore=None, host_name=None, subject_base=None, ca_subject=None, - user=None, group=None, mode=None, create=False): - self.nssdb = NSSDatabase(nssdir) + user=None, group=None, mode=None, create=False, + dbtype='auto'): + self.nssdb = NSSDatabase(nssdir, dbtype=dbtype) self.secdir = nssdir self.realm = realm - self.noise_fname = self.secdir + "/noise.txt" - self.certdb_fname = self.secdir + "/cert8.db" - self.keydb_fname = self.secdir + "/key3.db" - self.secmod_fname = self.secdir + "/secmod.db" - self.pk12_fname = self.secdir + "/cacert.p12" - self.pin_fname = self.secdir + "/pin.txt" + self.noise_fname = os.path.join(self.secdir, "noise.txt") + + self.certdb_fname = self.nssdb.certdb + self.keydb_fname = self.nssdb.keydb + self.secmod_fname = self.nssdb.secmod + self.pk12_fname = os.path.join(self.secdir, "cacert.p12") + self.pin_fname = os.path.join(self.secdir + "pin.txt") self.reqdir = None self.certreq_fname = None self.certder_fname = None @@ -171,15 +173,7 @@ class CertDB(object): """ Checks whether all NSS database files + our pwd_file exist """ - db_files = ( - self.secdir, - self.certdb_fname, - self.keydb_fname, - self.secmod_fname, - self.nssdb.pwd_file, - ) - - for f in db_files: + for f in self.nssdb.filenames: if not os.path.exists(f): return False return True @@ -291,11 +285,12 @@ class CertDB(object): if create_pkcs12: ipautil.backup_file(self.pk12_fname) - ipautil.run([paths.PK12UTIL, "-d", self.secdir, - "-o", self.pk12_fname, - "-n", self.cacert_name, - "-w", self.passwd_fname, - "-k", self.passwd_fname]) + self.nssdb.run_pk12util([ + "-o", self.pk12_fname, + "-n", self.cacert_name, + "-k", self.passwd_fname, + "-w", self.passwd_fname, + ]) self.set_perms(self.pk12_fname) def load_cacert(self, cacert_fname, trust_flags): @@ -514,11 +509,12 @@ class CertDB(object): if nickname is None: nickname = get_ca_nickname(api.env.realm) - ipautil.run([paths.PK12UTIL, "-d", self.secdir, - "-o", pkcs12_fname, - "-n", nickname, - "-k", self.passwd_fname, - "-w", pkcs12_pwd_fname]) + self.nssdb.run_pk12util([ + "-o", pkcs12_fname, + "-n", nickname, + "-k", self.passwd_fname, + "-w", pkcs12_pwd_fname + ]) def create_from_cacert(self): cacert_fname = paths.IPA_CA_CRT diff --git a/ipaserver/install/ipa_backup.py b/ipaserver/install/ipa_backup.py index d8ff395fd..475d846e6 100644 --- a/ipaserver/install/ipa_backup.py +++ b/ipaserver/install/ipa_backup.py @@ -25,20 +25,13 @@ import time import pwd import six -# pylint: disable=import-error -if six.PY3: - # The SafeConfigParser class has been renamed to ConfigParser in Py3 - from configparser import ConfigParser as SafeConfigParser -else: - from ConfigParser import SafeConfigParser -# pylint: enable=import-error from ipaplatform.paths import paths from ipaplatform import services from ipalib import api, errors from ipapython import version from ipapython.ipautil import run, write_tmp_file -from ipapython import admintool +from ipapython import admintool, certdb from ipapython.dn import DN from ipaserver.install.replication import wait_for_task from ipaserver.install import installutils @@ -46,7 +39,13 @@ from ipapython import ipaldap from ipaplatform.constants import constants from ipaplatform.tasks import tasks - +# pylint: disable=import-error +if six.PY3: + # The SafeConfigParser class has been renamed to ConfigParser in Py3 + from configparser import ConfigParser as SafeConfigParser +else: + from ConfigParser import SafeConfigParser +# pylint: enable=import-error ISO8601_DATETIME_FMT = '%Y-%m-%dT%H:%M:%S' logger = logging.getLogger(__name__) @@ -194,7 +193,7 @@ class Backup(admintool.AdminTool): paths.HOSTS, ) + tuple( os.path.join(paths.IPA_NSSDB_DIR, file) - for file in ('cert8.db', 'key3.db', 'secmod.db') + for file in (certdb.NSS_DBM_FILES + certdb.NSS_SQL_FILES) ) logs=( diff --git a/ipaserver/install/ipa_replica_prepare.py b/ipaserver/install/ipa_replica_prepare.py index 1b8221885..0be0302dd 100644 --- a/ipaserver/install/ipa_replica_prepare.py +++ b/ipaserver/install/ipa_replica_prepare.py @@ -32,6 +32,20 @@ from optparse import OptionGroup, SUPPRESS_HELP import dns.resolver import six + +from ipaserver.install import certs, installutils, bindinstance, dsinstance, ca +from ipaserver.install.replication import enable_replication_version_checking +from ipaserver.install.server.replicainstall import install_ca_cert +from ipaserver.install.bindinstance import ( + add_zone, add_fwd_rr, add_ptr_rr, dns_container_exists) +from ipapython import ipautil, admintool, certdb +from ipapython.dn import DN +from ipapython import version +from ipalib import api +from ipalib import errors +from ipaplatform.paths import paths +from ipalib.constants import DOMAIN_LEVEL_0 + # pylint: disable=import-error if six.PY3: # The SafeConfigParser class has been renamed to ConfigParser in Py3 @@ -40,18 +54,6 @@ else: from ConfigParser import SafeConfigParser # pylint: enable=import-error -from ipaserver.install import certs, installutils, bindinstance, dsinstance, ca -from ipaserver.install.replication import enable_replication_version_checking -from ipaserver.install.server.replicainstall import install_ca_cert -from ipaserver.install.bindinstance import ( - add_zone, add_fwd_rr, add_ptr_rr, dns_container_exists) -from ipapython import ipautil, admintool -from ipapython.dn import DN -from ipapython import version -from ipalib import api -from ipalib import errors -from ipaplatform.paths import paths -from ipalib.constants import DOMAIN_LEVEL_0 logger = logging.getLogger(__name__) @@ -565,9 +567,8 @@ class ReplicaPrepare(admintool.AdminTool): installutils.remove_file(pkcs12_fname) installutils.remove_file(passwd_fname) - self.remove_info_file("cert8.db") - self.remove_info_file("key3.db") - self.remove_info_file("secmod.db") + for fname in (certdb.NSS_SQL_FILES + certdb.NSS_SQL_FILES): + self.remove_info_file(fname) self.remove_info_file("noise.txt") orig_filename = passwd_fname + ".orig" diff --git a/ipaserver/install/ipa_restore.py b/ipaserver/install/ipa_restore.py index 1dd7b0f8a..99e6297b6 100644 --- a/ipaserver/install/ipa_restore.py +++ b/ipaserver/install/ipa_restore.py @@ -27,20 +27,13 @@ import ldif import itertools import six -# pylint: disable=import-error -if six.PY3: - # The SafeConfigParser class has been renamed to ConfigParser in Py3 - from configparser import ConfigParser as SafeConfigParser -else: - from ConfigParser import SafeConfigParser -# pylint: enable=import-error from ipaclient.install.client import update_ipa_nssdb from ipalib import api, errors from ipalib.constants import FQDN from ipapython import version, ipautil from ipapython.ipautil import run, user_input -from ipapython import admintool +from ipapython import admintool, certdb from ipapython.dn import DN from ipaserver.install.replication import (wait_for_task, ReplicationManager, get_cs_replication_manager) @@ -58,6 +51,14 @@ try: except ImportError: adtrustinstance = None +# pylint: disable=import-error +if six.PY3: + # The SafeConfigParser class has been renamed to ConfigParser in Py3 + from configparser import ConfigParser as SafeConfigParser +else: + from ConfigParser import SafeConfigParser +# pylint: enable=import-error + logger = logging.getLogger(__name__) @@ -847,7 +848,7 @@ class Restore(admintool.AdminTool): krbinstance.KrbInstance().stop_tracking_certs() - for basename in ('cert8.db', 'key3.db', 'secmod.db', 'pwdfile.txt'): + for basename in certdb.NSS_FILES: filename = os.path.join(paths.IPA_NSSDB_DIR, basename) try: ipautil.backup_file(filename) diff --git a/ipaserver/install/ipa_server_certinstall.py b/ipaserver/install/ipa_server_certinstall.py index 36c0a586d..dd2c590e6 100644 --- a/ipaserver/install/ipa_server_certinstall.py +++ b/ipaserver/install/ipa_server_certinstall.py @@ -30,9 +30,10 @@ from ipalib.install import certmonger from ipaplatform.constants import constants from ipaplatform.paths import paths from ipapython import admintool -from ipapython.certdb import (get_ca_nickname, - NSSDatabase, - verify_kdc_cert_validity) +from ipapython.certdb import ( + NSS_DBM_FILES, NSS_SQL_FILES, NSSDatabase, get_ca_nickname, + verify_kdc_cert_validity +) from ipapython.dn import DN from ipalib import api, errors from ipaserver.install import certs, dsinstance, installutils, krbinstance @@ -170,14 +171,12 @@ class ServerCertInstall(admintool.AdminTool): quotes=False) # Fix the database permissions - os.chmod(os.path.join(dirname, 'cert8.db'), 0o640) - os.chmod(os.path.join(dirname, 'key3.db'), 0o640) - os.chmod(os.path.join(dirname, 'secmod.db'), 0o640) - pent = pwd.getpwnam(constants.HTTPD_USER) - os.chown(os.path.join(dirname, 'cert8.db'), 0, pent.pw_gid) - os.chown(os.path.join(dirname, 'key3.db'), 0, pent.pw_gid) - os.chown(os.path.join(dirname, 'secmod.db'), 0, pent.pw_gid) + for filename in (NSS_DBM_FILES + NSS_SQL_FILES): + absname = os.path.join(dirname, filename) + if os.path.isfile(absname): + os.chmod(absname, 0o640) + os.chown(absname, 0, pent.pw_gid) def install_kdc_cert(self): ca_cert_file = paths.CA_BUNDLE_PEM diff --git a/ipaserver/secrets/store.py b/ipaserver/secrets/store.py index eecf83f37..f23a0c396 100644 --- a/ipaserver/secrets/store.py +++ b/ipaserver/secrets/store.py @@ -6,6 +6,7 @@ from custodia.store.interface import CSStore # pylint: disable=relative-import from jwcrypto.common import json_decode, json_encode from ipaplatform.paths import paths from ipapython import ipautil +from ipapython.certdb import NSSDatabase from ipaserver.secrets.common import iSecLdap import ldap import os @@ -65,10 +66,11 @@ class NSSWrappedCertDB(DBMAPHandler): '--wrap-nickname', self.wrap_nick, '--target-nickname', self.target_nick, '-o', wrapped_key_file]) - ipautil.run([ - paths.CERTUTIL, '-d', self.nssdb_path, + nssdb = NSSDatabase(self.nssdb_path) + nssdb.run_certutil([ '-L', '-n', self.target_nick, - '-a', '-o', certificate_file]) + '-a', '-o', certificate_file, + ]) with open(wrapped_key_file, 'rb') as f: wrapped_key = f.read() with open(certificate_file, 'r') as f: @@ -102,12 +104,13 @@ class NSSCertDB(DBMAPHandler): with open(pk12pwfile, 'w') as f: f.write(password) pk12file = os.path.join(tdir, 'pk12file') - ipautil.run([paths.PK12UTIL, - "-d", self.nssdb_path, - "-o", pk12file, - "-n", self.nickname, - "-k", self.nssdb_pwdfile, - "-w", pk12pwfile]) + nssdb = NSSDatabase(self.nssdb_path) + nssdb.run_pk12util([ + "-o", pk12file, + "-n", self.nickname, + "-k", self.nssdb_pwdfile, + "-w", pk12pwfile, + ]) with open(pk12file, 'rb') as f: data = f.read() finally: @@ -125,12 +128,13 @@ class NSSCertDB(DBMAPHandler): pk12file = os.path.join(tdir, 'pk12file') with open(pk12file, 'wb') as f: f.write(b64decode(v['pkcs12 data'])) - ipautil.run([paths.PK12UTIL, - "-d", self.nssdb_path, - "-i", pk12file, - "-n", self.nickname, - "-k", self.nssdb_pwdfile, - "-w", pk12pwfile]) + nssdb = NSSDatabase(self.nssdb_path) + nssdb.run_pk12util([ + "-i", pk12file, + "-n", self.nickname, + "-k", self.nssdb_pwdfile, + "-w", pk12pwfile, + ]) finally: shutil.rmtree(tdir) diff --git a/ipatests/pytest_plugins/integration/tasks.py b/ipatests/pytest_plugins/integration/tasks.py index a33710a45..b407145ac 100644 --- a/ipatests/pytest_plugins/integration/tasks.py +++ b/ipatests/pytest_plugins/integration/tasks.py @@ -35,6 +35,7 @@ from six import StringIO from ipapython import ipautil from ipaplatform.paths import paths +from ipaplatform.constants import constants from ipapython.dn import DN from ipalib import errors from ipalib.util import get_reverse_zone_default, verify_host_resolvable @@ -1262,9 +1263,12 @@ def run_server_del(host, server_to_delete, force=False, return host.run_command(args, raiseonerr=False) -def run_certutil(host, args, reqdir, stdin=None, raiseonerr=True): - new_args = [paths.CERTUTIL, "-d", reqdir] - new_args = " ".join(new_args + args) +def run_certutil(host, args, reqdir, dbtype=None, + stdin=None, raiseonerr=True): + if dbtype is None: + dbtype = constants.NSS_DEFAULT_DBTYPE + new_args = [paths.CERTUTIL, '-d', '{}:{}'.format(dbtype, reqdir)] + new_args.extend(args) return host.run_command(new_args, raiseonerr=raiseonerr, stdin_text=stdin) diff --git a/ipatests/test_ipapython/test_certdb.py b/ipatests/test_ipapython/test_certdb.py new file mode 100644 index 000000000..0dc450555 --- /dev/null +++ b/ipatests/test_ipapython/test_certdb.py @@ -0,0 +1,133 @@ +import os + +from ipapython.certdb import NSSDatabase, TRUSTED_PEER_TRUST_FLAGS + +CERTNICK = 'testcert' + + +def create_selfsigned(nssdb): + # create self-signed cert + key + noisefile = os.path.join(nssdb.secdir, 'noise') + with open(noisefile, 'wb') as f: + f.write(os.urandom(64)) + try: + nssdb.run_certutil([ + '-S', '-x', + '-z', noisefile, + '-k', 'rsa', '-g', '2048', '-Z', 'SHA256', + '-t', 'CTu,Cu,Cu', + '-s', 'CN=testcert', + '-n', CERTNICK, + '-m', '365', + ]) + finally: + os.unlink(noisefile) + + +def test_dbm_tmp(): + with NSSDatabase(dbtype='dbm') as nssdb: + assert nssdb.dbtype == 'dbm' + + for filename in nssdb.filenames: + assert not os.path.isfile(filename) + + nssdb.create_db() + for filename in nssdb.filenames: + assert os.path.isfile(filename) + assert os.path.dirname(filename) == nssdb.secdir + + assert os.path.basename(nssdb.certdb) == 'cert8.db' + assert nssdb.certdb in nssdb.filenames + assert os.path.basename(nssdb.keydb) == 'key3.db' + assert os.path.basename(nssdb.secmod) == 'secmod.db' + + +def test_sql_tmp(): + with NSSDatabase(dbtype='sql') as nssdb: + assert nssdb.dbtype == 'sql' + + for filename in nssdb.filenames: + assert not os.path.isfile(filename) + + nssdb.create_db() + for filename in nssdb.filenames: + assert os.path.isfile(filename) + assert os.path.dirname(filename) == nssdb.secdir + + assert os.path.basename(nssdb.certdb) == 'cert9.db' + assert nssdb.certdb in nssdb.filenames + assert os.path.basename(nssdb.keydb) == 'key4.db' + assert os.path.basename(nssdb.secmod) == 'pkcs11.txt' + + +def test_convert_db(): + with NSSDatabase(dbtype='dbm') as nssdb: + assert nssdb.dbtype == 'dbm' + + nssdb.create_db() + + create_selfsigned(nssdb) + + oldcerts = nssdb.list_certs() + assert len(oldcerts) == 1 + oldkeys = nssdb.list_keys() + assert len(oldkeys) == 1 + + nssdb.convert_db() + + assert nssdb.dbtype == 'sql' + newcerts = nssdb.list_certs() + assert len(newcerts) == 1 + assert newcerts == oldcerts + newkeys = nssdb.list_keys() + assert len(newkeys) == 1 + assert newkeys == oldkeys + + for filename in nssdb.filenames: + assert os.path.isfile(filename) + assert os.path.dirname(filename) == nssdb.secdir + + assert os.path.basename(nssdb.certdb) == 'cert9.db' + assert nssdb.certdb in nssdb.filenames + assert os.path.basename(nssdb.keydb) == 'key4.db' + assert os.path.basename(nssdb.secmod) == 'pkcs11.txt' + + +def test_convert_db_nokey(): + with NSSDatabase(dbtype='dbm') as nssdb: + assert nssdb.dbtype == 'dbm' + nssdb.create_db() + + create_selfsigned(nssdb) + + assert len(nssdb.list_certs()) == 1 + assert len(nssdb.list_keys()) == 1 + # remove key, readd cert + cert = nssdb.get_cert(CERTNICK) + nssdb.run_certutil(['-F', '-n', CERTNICK]) + nssdb.add_cert(cert, CERTNICK, TRUSTED_PEER_TRUST_FLAGS) + assert len(nssdb.list_keys()) == 0 + oldcerts = nssdb.list_certs() + assert len(oldcerts) == 1 + + nssdb.convert_db() + assert nssdb.dbtype == 'sql' + newcerts = nssdb.list_certs() + assert len(newcerts) == 1 + assert newcerts == oldcerts + assert nssdb.get_cert(CERTNICK) == cert + newkeys = nssdb.list_keys() + assert newkeys == () + + for filename in nssdb.filenames: + assert os.path.isfile(filename) + assert os.path.dirname(filename) == nssdb.secdir + + old = os.path.join(nssdb.secdir, 'cert8.db') + assert not os.path.isfile(old) + assert os.path.isfile(old + '.migrated') + + assert os.path.basename(nssdb.certdb) == 'cert9.db' + assert nssdb.certdb in nssdb.filenames + assert os.path.basename(nssdb.keydb) == 'key4.db' + assert os.path.basename(nssdb.secmod) == 'pkcs11.txt' diff --git a/ipatests/test_xmlrpc/test_cert_plugin.py b/ipatests/test_xmlrpc/test_cert_plugin.py index 840830340..34c169b4c 100644 --- a/ipatests/test_xmlrpc/test_cert_plugin.py +++ b/ipatests/test_xmlrpc/test_cert_plugin.py @@ -25,13 +25,11 @@ import base64 import nose import os import pytest -import shutil import six -import tempfile from ipalib import api from ipalib import errors from ipaplatform.paths import paths -from ipapython import ipautil +from ipapython.certdb import NSSDatabase from ipapython.dn import DN from ipapython.ipautil import run from ipatests.test_xmlrpc.testcert import subject_base @@ -77,8 +75,9 @@ def is_db_configured(): # The API tested depends on the value of ~/.ipa/default/ra_plugin when # running as the lite-server. - class BaseCert(XMLRPC_test): + host_fqdn = u'ipatestcert.%s' % api.env.domain + service_princ = u'test/%s@%s' % (host_fqdn, api.env.realm) @classmethod def setup_class(cls): @@ -91,42 +90,27 @@ class BaseCert(XMLRPC_test): is_db_configured() - def run_certutil(self, args, stdin=None): - new_args = [paths.CERTUTIL, "-d", self.reqdir] - new_args = new_args + args - return ipautil.run(new_args, stdin) - def setup(self): - self.reqdir = tempfile.mkdtemp(prefix = "tmp-") - self.reqfile = self.reqdir + "/test.csr" - self.pwname = self.reqdir + "/pwd" - self.certfile = self.reqdir + "/cert.crt" - - # Create an empty password file - with open(self.pwname, "w") as fp: - fp.write("\n") - + self.nssdb = NSSDatabase() + secdir = self.nssdb.secdir + self.reqfile = os.path.join(secdir, "test.csr") + self.certfile = os.path.join(secdir, "cert.crt") # Create our temporary NSS database - self.run_certutil(["-N", "-f", self.pwname]) - + self.nssdb.create_db() self.subject = DN(('CN', self.host_fqdn), subject_base()) def teardown(self): - shutil.rmtree(self.reqdir, ignore_errors=True) + self.nssdb.close() # remove tempdir def generateCSR(self, subject): - self.run_certutil(["-R", "-s", subject, - "-o", self.reqfile, - "-z", paths.GROUP, - "-f", self.pwname, - "-a", - ]) - with open(self.reqfile, "r") as fp: - data = fp.read() - return data - - host_fqdn = u'ipatestcert.%s' % api.env.domain - service_princ = u'test/%s@%s' % (host_fqdn, api.env.realm) + self.nssdb.run_certutil([ + "-R", "-s", subject, + "-o", self.reqfile, + "-z", paths.GROUP, + "-a", + ]) + with open(self.reqfile, "rb") as f: + return f.read().decode('ascii') @pytest.mark.tier1 @@ -149,7 +133,7 @@ class test_cert(BaseCert): # First create the host that will use this policy assert 'result' in api.Command['host_add'](self.host_fqdn, force=True) - csr = unicode(self.generateCSR(str(self.subject))) + csr = self.generateCSR(str(self.subject)) with assert_raises(errors.NotFound): api.Command['cert_request'](csr, principal=self.service_princ) @@ -160,7 +144,7 @@ class test_cert(BaseCert): # Our host should exist from previous test global cert, sn - csr = unicode(self.generateCSR(str(self.subject))) + csr = self.generateCSR(str(self.subject)) res = api.Command['cert_request'](csr, principal=self.service_princ, add=True)['result'] assert DN(res['subject']) == self.subject assert 'cacn' in res @@ -190,8 +174,8 @@ class test_cert(BaseCert): See https://fedorahosted.org/freeipa/ticket/5881 """ result = api.Command.cert_show(sn, out=unicode(self.certfile)) - with open(self.certfile, "r") as f: - pem_cert = unicode(f.read()) + with open(self.certfile, "rb") as f: + pem_cert = f.read().decode('ascii') result = run(['openssl', 'x509', '-text'], stdin=pem_cert, capture_output=True) assert _EXP_CRL_URI in result.output @@ -203,7 +187,7 @@ class test_cert(BaseCert): """ global newcert - csr = unicode(self.generateCSR(str(self.subject))) + csr = self.generateCSR(str(self.subject)) res = api.Command['cert_request'](csr, principal=self.service_princ)['result'] assert DN(res['subject']) == self.subject # save the cert for the service_show/find tests @@ -473,7 +457,7 @@ class test_cert_revocation(BaseCert): assert 'result' in api.Command['host_add'](self.host_fqdn, force=True) # generate CSR, request certificate, obtain serial number - self.csr = unicode(self.generateCSR(str(self.subject))) + self.csr = self.generateCSR(str(self.subject)) res = api.Command['cert_request'](self.csr, principal=self.service_princ, add=True, all=True)['result'] diff --git a/ipatests/test_xmlrpc/testcert.py b/ipatests/test_xmlrpc/testcert.py index 3874d75f2..e6609b7f1 100644 --- a/ipatests/test_xmlrpc/testcert.py +++ b/ipatests/test_xmlrpc/testcert.py @@ -28,20 +28,15 @@ once per test run. import os import tempfile import shutil -import six import base64 import re from ipalib import api, x509 from ipaserver.plugins import rabase -from ipapython import ipautil +from ipapython import certdb from ipapython.dn import DN from ipaplatform.paths import paths -if six.PY3: - unicode = str - - _subject_base = None @@ -80,29 +75,6 @@ def get_testcert(subject, principal): return strip_cert_header(_testcert.decode('utf-8')) -def run_certutil(reqdir, args, stdin=None): - """ - Run an NSS certutil command - """ - new_args = [paths.CERTUTIL, "-d", reqdir] - new_args = new_args + args - return ipautil.run(new_args, stdin) - - -def generate_csr(reqdir, pwname, subject): - """ - Create a CSR for the given subject. - """ - req_path = os.path.join(reqdir, 'req') - run_certutil(reqdir, ["-R", "-s", subject, - "-o", req_path, - "-z", paths.GROUP, - "-f", pwname, - "-a"]) - with open(req_path, "r") as fp: - return fp.read() - - def makecert(reqdir, subject, principal): """ Generate a certificate that can be used during unit testing. @@ -114,16 +86,22 @@ def makecert(reqdir, subject, principal): raise AssertionError('The self-signed CA is not configured, ' 'see ipatests/test_xmlrpc/test_cert.py') - pwname = os.path.join(reqdir, "pwd") - - # Create an empty password file - with open(pwname, "w") as fp: - fp.write("\n") - - # Generate NSS cert database to store the private key for our CSR - run_certutil(reqdir, ["-N", "-f", pwname]) - - csr = unicode(generate_csr(reqdir, pwname, str(subject))) + nssdb = certdb.NSSDatabase(nssdir=reqdir) + with open(nssdb.pwd_file, "w") as f: + # Create an empty password file + f.write("\n") + # create db + nssdb.create_db() + # create CSR + csr_file = os.path.join(reqdir, 'req') + nssdb.run_certutil([ + "-R", "-s", str(subject), + "-o", csr_file, + "-z", paths.GROUP, + "-a" + ]) + with open(csr_file, "rb") as f: + csr = f.read().decode('ascii') res = api.Command['cert_request'](csr, principal=principal, add=True) cert = x509.load_der_x509_certificate(