mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Support sqlite NSSDB
Prepare CertDB and NSSDatabase to support sqlite DB format. NSSDatabase will automatically detect and use either old DBM or new SQL format. Old databases are not migrated yet. https://pagure.io/freeipa/issue/7049 Signed-off-by: Christian Heimes <cheimes@redhat.com> Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
parent
64a88d597c
commit
0071744929
@ -1606,9 +1606,14 @@ fi
|
|||||||
%ghost %attr(0644,root,apache) %config(noreplace) %{_sysconfdir}/ipa/default.conf
|
%ghost %attr(0644,root,apache) %config(noreplace) %{_sysconfdir}/ipa/default.conf
|
||||||
%ghost %attr(0644,root,apache) %config(noreplace) %{_sysconfdir}/ipa/ca.crt
|
%ghost %attr(0644,root,apache) %config(noreplace) %{_sysconfdir}/ipa/ca.crt
|
||||||
%dir %attr(0755,root,root) %{_sysconfdir}/ipa/nssdb
|
%dir %attr(0755,root,root) %{_sysconfdir}/ipa/nssdb
|
||||||
|
# old dbm format
|
||||||
%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/cert8.db
|
%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/cert8.db
|
||||||
%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/key3.db
|
%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/key3.db
|
||||||
%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/secmod.db
|
%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/secmod.db
|
||||||
|
# new sql format
|
||||||
|
%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/cert9.db
|
||||||
|
%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/key4.db
|
||||||
|
%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/pkcs11.txt
|
||||||
%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/pwdfile.txt
|
%ghost %config(noreplace) %{_sysconfdir}/ipa/nssdb/pwdfile.txt
|
||||||
%ghost %config(noreplace) %{_sysconfdir}/pki/ca-trust/source/ipa.p11-kit
|
%ghost %config(noreplace) %{_sysconfdir}/pki/ca-trust/source/ipa.p11-kit
|
||||||
%dir %{_localstatedir}/lib/ipa-client
|
%dir %{_localstatedir}/lib/ipa-client
|
||||||
|
@ -2304,9 +2304,6 @@ def create_ipa_nssdb():
|
|||||||
db = certdb.NSSDatabase(paths.IPA_NSSDB_DIR)
|
db = certdb.NSSDatabase(paths.IPA_NSSDB_DIR)
|
||||||
db.create_db(mode=0o755, backup=True)
|
db.create_db(mode=0o755, backup=True)
|
||||||
os.chmod(db.pwd_file, 0o600)
|
os.chmod(db.pwd_file, 0o600)
|
||||||
os.chmod(os.path.join(db.secdir, 'cert8.db'), 0o644)
|
|
||||||
os.chmod(os.path.join(db.secdir, 'key3.db'), 0o644)
|
|
||||||
os.chmod(os.path.join(db.secdir, 'secmod.db'), 0o644)
|
|
||||||
|
|
||||||
|
|
||||||
def update_ipa_nssdb():
|
def update_ipa_nssdb():
|
||||||
@ -3067,11 +3064,8 @@ def uninstall(options):
|
|||||||
logger.error("%s failed to stop tracking certificate: %s",
|
logger.error("%s failed to stop tracking certificate: %s",
|
||||||
cmonger.service_name, e)
|
cmonger.service_name, e)
|
||||||
|
|
||||||
for filename in (os.path.join(ipa_db.secdir, 'cert8.db'),
|
for filename in certdb.NSS_FILES:
|
||||||
os.path.join(ipa_db.secdir, 'key3.db'),
|
remove_file(os.path.join(ipa_db.secdir, filename))
|
||||||
os.path.join(ipa_db.secdir, 'secmod.db'),
|
|
||||||
os.path.join(ipa_db.secdir, 'pwdfile.txt')):
|
|
||||||
remove_file(filename)
|
|
||||||
|
|
||||||
# Remove any special principal names we added to the IPA CA helper
|
# Remove any special principal names we added to the IPA CA helper
|
||||||
certmonger.remove_principal_from_cas()
|
certmonger.remove_principal_from_cas()
|
||||||
|
@ -37,6 +37,7 @@ class BaseConstantsNamespace(object):
|
|||||||
'httpd_dbus_sssd': 'on',
|
'httpd_dbus_sssd': 'on',
|
||||||
}
|
}
|
||||||
SSSD_USER = "sssd"
|
SSSD_USER = "sssd"
|
||||||
|
# sql (new format), dbm (old format)
|
||||||
|
NSS_DEFAULT_DBTYPE = 'dbm'
|
||||||
|
|
||||||
constants = BaseConstantsNamespace()
|
constants = BaseConstantsNamespace()
|
||||||
|
@ -25,12 +25,14 @@ import io
|
|||||||
import pwd
|
import pwd
|
||||||
import grp
|
import grp
|
||||||
import re
|
import re
|
||||||
|
import stat
|
||||||
import tempfile
|
import tempfile
|
||||||
from tempfile import NamedTemporaryFile
|
from tempfile import NamedTemporaryFile
|
||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
import cryptography.x509
|
import cryptography.x509
|
||||||
|
|
||||||
|
from ipaplatform.constants import constants
|
||||||
from ipaplatform.paths import paths
|
from ipaplatform.paths import paths
|
||||||
from ipapython.dn import DN
|
from ipapython.dn import DN
|
||||||
from ipapython.kerberos import Principal
|
from ipapython.kerberos import Principal
|
||||||
@ -42,7 +44,9 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
CA_NICKNAME_FMT = "%s IPA CA"
|
CA_NICKNAME_FMT = "%s IPA CA"
|
||||||
|
|
||||||
NSS_FILES = ("cert8.db", "key3.db", "secmod.db", "pwdfile.txt")
|
NSS_DBM_FILES = ("cert8.db", "key3.db", "secmod.db")
|
||||||
|
NSS_SQL_FILES = ("cert9.db", "key4.db", "pkcs11.txt")
|
||||||
|
NSS_FILES = NSS_DBM_FILES + NSS_SQL_FILES + ("pwdfile.txt",)
|
||||||
|
|
||||||
TrustFlags = collections.namedtuple('TrustFlags', 'has_key trusted ca usages')
|
TrustFlags = collections.namedtuple('TrustFlags', 'has_key trusted ca usages')
|
||||||
|
|
||||||
@ -214,14 +218,50 @@ class NSSDatabase(object):
|
|||||||
# got too tied to IPA server details, killing reusability.
|
# got too tied to IPA server details, killing reusability.
|
||||||
# BaseCertDB is a class that knows nothing about IPA.
|
# BaseCertDB is a class that knows nothing about IPA.
|
||||||
# Generic NSS DB code should be moved here.
|
# Generic NSS DB code should be moved here.
|
||||||
def __init__(self, nssdir=None):
|
|
||||||
|
def __init__(self, nssdir=None, dbtype='auto'):
|
||||||
if nssdir is None:
|
if nssdir is None:
|
||||||
self.secdir = tempfile.mkdtemp()
|
self.secdir = tempfile.mkdtemp()
|
||||||
self._is_temporary = True
|
self._is_temporary = True
|
||||||
|
if dbtype == 'auto':
|
||||||
|
dbtype = constants.NSS_DEFAULT_DBTYPE
|
||||||
|
else:
|
||||||
|
dbtype = dbtype
|
||||||
else:
|
else:
|
||||||
self.secdir = nssdir
|
self.secdir = nssdir
|
||||||
self._is_temporary = False
|
self._is_temporary = False
|
||||||
|
if dbtype == 'auto':
|
||||||
|
if os.path.isfile(os.path.join(self.secdir, "cert9.db")):
|
||||||
|
dbtype = 'sql'
|
||||||
|
elif os.path.isfile(os.path.join(self.secdir, "cert8.db")):
|
||||||
|
dbtype = 'dbm'
|
||||||
|
else:
|
||||||
|
dbtype = constants.NSS_DEFAULT_DBTYPE
|
||||||
|
|
||||||
self.pwd_file = os.path.join(self.secdir, 'pwdfile.txt')
|
self.pwd_file = os.path.join(self.secdir, 'pwdfile.txt')
|
||||||
|
self.dbtype = None
|
||||||
|
self.certdb = self.keydb = self.secmod = None
|
||||||
|
self.filenames = ()
|
||||||
|
self._set_filenames(dbtype)
|
||||||
|
|
||||||
|
def _set_filenames(self, dbtype):
|
||||||
|
self.dbtype = dbtype
|
||||||
|
if dbtype == 'dbm':
|
||||||
|
self.certdb = os.path.join(self.secdir, "cert8.db")
|
||||||
|
self.keydb = os.path.join(self.secdir, "key3.db")
|
||||||
|
self.secmod = os.path.join(self.secdir, "secmod.db")
|
||||||
|
elif dbtype == 'sql':
|
||||||
|
self.certdb = os.path.join(self.secdir, "cert9.db")
|
||||||
|
self.keydb = os.path.join(self.secdir, "key4.db")
|
||||||
|
self.secmod = os.path.join(self.secdir, "pkcs11.txt")
|
||||||
|
else:
|
||||||
|
raise ValueError(dbtype)
|
||||||
|
self.filenames = (
|
||||||
|
self.certdb,
|
||||||
|
self.keydb,
|
||||||
|
self.secmod,
|
||||||
|
self.pwd_file,
|
||||||
|
)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
if self._is_temporary:
|
if self._is_temporary:
|
||||||
@ -234,11 +274,22 @@ class NSSDatabase(object):
|
|||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
def run_certutil(self, args, stdin=None, **kwargs):
|
def run_certutil(self, args, stdin=None, **kwargs):
|
||||||
new_args = [paths.CERTUTIL, "-d", self.secdir]
|
new_args = [
|
||||||
new_args = new_args + args
|
paths.CERTUTIL,
|
||||||
|
"-d", '{}:{}'.format(self.dbtype, self.secdir)
|
||||||
|
]
|
||||||
|
new_args.extend(args)
|
||||||
new_args.extend(['-f', self.pwd_file])
|
new_args.extend(['-f', self.pwd_file])
|
||||||
return ipautil.run(new_args, stdin, **kwargs)
|
return ipautil.run(new_args, stdin, **kwargs)
|
||||||
|
|
||||||
|
def run_pk12util(self, args, stdin=None, **kwargs):
|
||||||
|
new_args = [
|
||||||
|
paths.PK12UTIL,
|
||||||
|
"-d", '{}:{}'.format(self.dbtype, self.secdir)
|
||||||
|
]
|
||||||
|
new_args.extend(args)
|
||||||
|
return ipautil.run(new_args, stdin, **kwargs)
|
||||||
|
|
||||||
def create_db(self, user=None, group=None, mode=None, backup=False):
|
def create_db(self, user=None, group=None, mode=None, backup=False):
|
||||||
"""Create cert DB
|
"""Create cert DB
|
||||||
|
|
||||||
@ -247,13 +298,14 @@ class NSSDatabase(object):
|
|||||||
:param mode: Mode of the secdir
|
:param mode: Mode of the secdir
|
||||||
:param backup: Backup the sedir files
|
:param backup: Backup the sedir files
|
||||||
"""
|
"""
|
||||||
dirmode = 0o750
|
|
||||||
filemode = 0o640
|
|
||||||
pwdfilemode = 0o640
|
|
||||||
if mode is not None:
|
if mode is not None:
|
||||||
dirmode = mode
|
dirmode = mode
|
||||||
filemode = mode & 0o666
|
filemode = mode & 0o666
|
||||||
pwdfilemode = mode & 0o660
|
pwdfilemode = mode & 0o660
|
||||||
|
else:
|
||||||
|
dirmode = 0o750
|
||||||
|
filemode = 0o640
|
||||||
|
pwdfilemode = 0o640
|
||||||
|
|
||||||
uid = -1
|
uid = -1
|
||||||
gid = -1
|
gid = -1
|
||||||
@ -263,7 +315,7 @@ class NSSDatabase(object):
|
|||||||
gid = grp.getgrnam(group).gr_gid
|
gid = grp.getgrnam(group).gr_gid
|
||||||
|
|
||||||
if backup:
|
if backup:
|
||||||
for filename in NSS_FILES:
|
for filename in self.filenames:
|
||||||
path = os.path.join(self.secdir, filename)
|
path = os.path.join(self.secdir, filename)
|
||||||
ipautil.backup_file(path)
|
ipautil.backup_file(path)
|
||||||
|
|
||||||
@ -283,7 +335,7 @@ class NSSDatabase(object):
|
|||||||
# Finally fix up perms
|
# Finally fix up perms
|
||||||
os.chown(self.secdir, uid, gid)
|
os.chown(self.secdir, uid, gid)
|
||||||
os.chmod(self.secdir, dirmode)
|
os.chmod(self.secdir, dirmode)
|
||||||
for filename in NSS_FILES:
|
for filename in self.filenames:
|
||||||
path = os.path.join(self.secdir, filename)
|
path = os.path.join(self.secdir, filename)
|
||||||
if os.path.exists(path):
|
if os.path.exists(path):
|
||||||
os.chown(path, uid, gid)
|
os.chown(path, uid, gid)
|
||||||
@ -293,8 +345,54 @@ class NSSDatabase(object):
|
|||||||
new_mode = filemode
|
new_mode = filemode
|
||||||
os.chmod(path, new_mode)
|
os.chmod(path, new_mode)
|
||||||
|
|
||||||
|
def convert_db(self, rename_old=True):
|
||||||
|
"""Convert DBM database format to SQL database format
|
||||||
|
"""
|
||||||
|
if (self.dbtype == 'sql' or
|
||||||
|
os.path.isfile(os.path.join(self.secdir, "cert9.db"))):
|
||||||
|
raise ValueError(
|
||||||
|
'NSS DB {} has been migrated already.'.format(self.secdir)
|
||||||
|
)
|
||||||
|
|
||||||
|
# use certutil to migrate db to new format
|
||||||
|
# see https://bugzilla.mozilla.org/show_bug.cgi?id=1415912
|
||||||
|
args = [
|
||||||
|
paths.CERTUTIL,
|
||||||
|
'-d', 'sql:{}'.format(self.secdir),
|
||||||
|
'-f', self.pwd_file,
|
||||||
|
]
|
||||||
|
if self.list_keys():
|
||||||
|
# has keys, use 'list keys' in read-write mode
|
||||||
|
args.extend(['-K', '-X'])
|
||||||
|
else:
|
||||||
|
# no keys, create new DB with auto-migrate
|
||||||
|
args.extend(['-N', '-@', self.pwd_file])
|
||||||
|
ipautil.run(args)
|
||||||
|
|
||||||
|
# retain file ownership and permission, backup old files
|
||||||
|
migration = (
|
||||||
|
('cert8.db', 'cert9.db'),
|
||||||
|
('key3.db', 'key4.db'),
|
||||||
|
('secmod.db', 'pkcs11.txt'),
|
||||||
|
)
|
||||||
|
for oldname, newname in migration:
|
||||||
|
oldname = os.path.join(self.secdir, oldname)
|
||||||
|
newname = os.path.join(self.secdir, newname)
|
||||||
|
oldstat = os.stat(oldname)
|
||||||
|
os.chmod(newname, stat.S_IMODE(oldstat.st_mode))
|
||||||
|
os.chown(newname, oldstat.st_uid, oldstat.st_gid)
|
||||||
|
# XXX also retain SELinux context?
|
||||||
|
|
||||||
|
self._set_filenames('sql')
|
||||||
|
self.list_certs() # self-test
|
||||||
|
|
||||||
|
if rename_old:
|
||||||
|
for oldname, _ in migration: # pylint: disable=unused-variable
|
||||||
|
oldname = os.path.join(self.secdir, oldname)
|
||||||
|
os.rename(oldname, oldname + '.migrated')
|
||||||
|
|
||||||
def restore(self):
|
def restore(self):
|
||||||
for filename in NSS_FILES:
|
for filename in self.filenames:
|
||||||
path = os.path.join(self.secdir, filename)
|
path = os.path.join(self.secdir, filename)
|
||||||
backup_path = path + '.orig'
|
backup_path = path + '.orig'
|
||||||
save_path = path + '.ipasave'
|
save_path = path + '.ipasave'
|
||||||
@ -325,6 +423,20 @@ class NSSDatabase(object):
|
|||||||
|
|
||||||
return tuple(certlist)
|
return tuple(certlist)
|
||||||
|
|
||||||
|
def list_keys(self):
|
||||||
|
result = self.run_certutil(
|
||||||
|
["-K"], raiseonerr=False, capture_output=True
|
||||||
|
)
|
||||||
|
if result.returncode == 255:
|
||||||
|
return ()
|
||||||
|
keylist = []
|
||||||
|
for line in result.output.splitlines():
|
||||||
|
mo = re.match(r'^<\s*(\d+)>\s+(\w+)\s+([0-9a-z]+)\s+(.*)$', line)
|
||||||
|
if mo is not None:
|
||||||
|
slot, algo, keyid, nick = mo.groups()
|
||||||
|
keylist.append((int(slot), algo, keyid, nick.strip()))
|
||||||
|
return tuple(keylist)
|
||||||
|
|
||||||
def find_server_certs(self):
|
def find_server_certs(self):
|
||||||
"""Return nicknames and cert flags for server certs in the database
|
"""Return nicknames and cert flags for server certs in the database
|
||||||
|
|
||||||
@ -357,16 +469,17 @@ class NSSDatabase(object):
|
|||||||
return root_nicknames
|
return root_nicknames
|
||||||
|
|
||||||
def export_pkcs12(self, nickname, pkcs12_filename, pkcs12_passwd=None):
|
def export_pkcs12(self, nickname, pkcs12_filename, pkcs12_passwd=None):
|
||||||
args = [paths.PK12UTIL, "-d", self.secdir,
|
args = [
|
||||||
"-o", pkcs12_filename,
|
"-o", pkcs12_filename,
|
||||||
"-n", nickname,
|
"-n", nickname,
|
||||||
"-k", self.pwd_file]
|
"-k", self.pwd_file
|
||||||
|
]
|
||||||
pkcs12_password_file = None
|
pkcs12_password_file = None
|
||||||
if pkcs12_passwd is not None:
|
if pkcs12_passwd is not None:
|
||||||
pkcs12_password_file = ipautil.write_tmp_file(pkcs12_passwd + '\n')
|
pkcs12_password_file = ipautil.write_tmp_file(pkcs12_passwd + '\n')
|
||||||
args = args + ["-w", pkcs12_password_file.name]
|
args.extend(["-w", pkcs12_password_file.name])
|
||||||
try:
|
try:
|
||||||
ipautil.run(args)
|
self.run_pk12util(args)
|
||||||
except ipautil.CalledProcessError as e:
|
except ipautil.CalledProcessError as e:
|
||||||
if e.returncode == 17:
|
if e.returncode == 17:
|
||||||
raise RuntimeError("incorrect password for pkcs#12 file %s" %
|
raise RuntimeError("incorrect password for pkcs#12 file %s" %
|
||||||
@ -381,15 +494,17 @@ class NSSDatabase(object):
|
|||||||
pkcs12_password_file.close()
|
pkcs12_password_file.close()
|
||||||
|
|
||||||
def import_pkcs12(self, pkcs12_filename, pkcs12_passwd=None):
|
def import_pkcs12(self, pkcs12_filename, pkcs12_passwd=None):
|
||||||
args = [paths.PK12UTIL, "-d", self.secdir,
|
args = [
|
||||||
"-i", pkcs12_filename,
|
"-i", pkcs12_filename,
|
||||||
"-k", self.pwd_file, '-v']
|
"-k", self.pwd_file,
|
||||||
|
"-v"
|
||||||
|
]
|
||||||
pkcs12_password_file = None
|
pkcs12_password_file = None
|
||||||
if pkcs12_passwd is not None:
|
if pkcs12_passwd is not None:
|
||||||
pkcs12_password_file = ipautil.write_tmp_file(pkcs12_passwd + '\n')
|
pkcs12_password_file = ipautil.write_tmp_file(pkcs12_passwd + '\n')
|
||||||
args = args + ["-w", pkcs12_password_file.name]
|
args.extend(["-w", pkcs12_password_file.name])
|
||||||
try:
|
try:
|
||||||
ipautil.run(args)
|
self.run_pk12util(args)
|
||||||
except ipautil.CalledProcessError as e:
|
except ipautil.CalledProcessError as e:
|
||||||
if e.returncode == 17:
|
if e.returncode == 17:
|
||||||
raise RuntimeError("incorrect password for pkcs#12 file %s" %
|
raise RuntimeError("incorrect password for pkcs#12 file %s" %
|
||||||
|
@ -103,18 +103,20 @@ class CertDB(object):
|
|||||||
# TODO: Remove all selfsign code
|
# TODO: Remove all selfsign code
|
||||||
def __init__(self, realm, nssdir, fstore=None,
|
def __init__(self, realm, nssdir, fstore=None,
|
||||||
host_name=None, subject_base=None, ca_subject=None,
|
host_name=None, subject_base=None, ca_subject=None,
|
||||||
user=None, group=None, mode=None, create=False):
|
user=None, group=None, mode=None, create=False,
|
||||||
self.nssdb = NSSDatabase(nssdir)
|
dbtype='auto'):
|
||||||
|
self.nssdb = NSSDatabase(nssdir, dbtype=dbtype)
|
||||||
|
|
||||||
self.secdir = nssdir
|
self.secdir = nssdir
|
||||||
self.realm = realm
|
self.realm = realm
|
||||||
|
|
||||||
self.noise_fname = self.secdir + "/noise.txt"
|
self.noise_fname = os.path.join(self.secdir, "noise.txt")
|
||||||
self.certdb_fname = self.secdir + "/cert8.db"
|
|
||||||
self.keydb_fname = self.secdir + "/key3.db"
|
self.certdb_fname = self.nssdb.certdb
|
||||||
self.secmod_fname = self.secdir + "/secmod.db"
|
self.keydb_fname = self.nssdb.keydb
|
||||||
self.pk12_fname = self.secdir + "/cacert.p12"
|
self.secmod_fname = self.nssdb.secmod
|
||||||
self.pin_fname = self.secdir + "/pin.txt"
|
self.pk12_fname = os.path.join(self.secdir, "cacert.p12")
|
||||||
|
self.pin_fname = os.path.join(self.secdir + "pin.txt")
|
||||||
self.reqdir = None
|
self.reqdir = None
|
||||||
self.certreq_fname = None
|
self.certreq_fname = None
|
||||||
self.certder_fname = None
|
self.certder_fname = None
|
||||||
@ -171,15 +173,7 @@ class CertDB(object):
|
|||||||
"""
|
"""
|
||||||
Checks whether all NSS database files + our pwd_file exist
|
Checks whether all NSS database files + our pwd_file exist
|
||||||
"""
|
"""
|
||||||
db_files = (
|
for f in self.nssdb.filenames:
|
||||||
self.secdir,
|
|
||||||
self.certdb_fname,
|
|
||||||
self.keydb_fname,
|
|
||||||
self.secmod_fname,
|
|
||||||
self.nssdb.pwd_file,
|
|
||||||
)
|
|
||||||
|
|
||||||
for f in db_files:
|
|
||||||
if not os.path.exists(f):
|
if not os.path.exists(f):
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
@ -291,11 +285,12 @@ class CertDB(object):
|
|||||||
|
|
||||||
if create_pkcs12:
|
if create_pkcs12:
|
||||||
ipautil.backup_file(self.pk12_fname)
|
ipautil.backup_file(self.pk12_fname)
|
||||||
ipautil.run([paths.PK12UTIL, "-d", self.secdir,
|
self.nssdb.run_pk12util([
|
||||||
"-o", self.pk12_fname,
|
"-o", self.pk12_fname,
|
||||||
"-n", self.cacert_name,
|
"-n", self.cacert_name,
|
||||||
"-w", self.passwd_fname,
|
"-k", self.passwd_fname,
|
||||||
"-k", self.passwd_fname])
|
"-w", self.passwd_fname,
|
||||||
|
])
|
||||||
self.set_perms(self.pk12_fname)
|
self.set_perms(self.pk12_fname)
|
||||||
|
|
||||||
def load_cacert(self, cacert_fname, trust_flags):
|
def load_cacert(self, cacert_fname, trust_flags):
|
||||||
@ -514,11 +509,12 @@ class CertDB(object):
|
|||||||
if nickname is None:
|
if nickname is None:
|
||||||
nickname = get_ca_nickname(api.env.realm)
|
nickname = get_ca_nickname(api.env.realm)
|
||||||
|
|
||||||
ipautil.run([paths.PK12UTIL, "-d", self.secdir,
|
self.nssdb.run_pk12util([
|
||||||
"-o", pkcs12_fname,
|
"-o", pkcs12_fname,
|
||||||
"-n", nickname,
|
"-n", nickname,
|
||||||
"-k", self.passwd_fname,
|
"-k", self.passwd_fname,
|
||||||
"-w", pkcs12_pwd_fname])
|
"-w", pkcs12_pwd_fname
|
||||||
|
])
|
||||||
|
|
||||||
def create_from_cacert(self):
|
def create_from_cacert(self):
|
||||||
cacert_fname = paths.IPA_CA_CRT
|
cacert_fname = paths.IPA_CA_CRT
|
||||||
|
@ -25,20 +25,13 @@ import time
|
|||||||
import pwd
|
import pwd
|
||||||
|
|
||||||
import six
|
import six
|
||||||
# pylint: disable=import-error
|
|
||||||
if six.PY3:
|
|
||||||
# The SafeConfigParser class has been renamed to ConfigParser in Py3
|
|
||||||
from configparser import ConfigParser as SafeConfigParser
|
|
||||||
else:
|
|
||||||
from ConfigParser import SafeConfigParser
|
|
||||||
# pylint: enable=import-error
|
|
||||||
|
|
||||||
from ipaplatform.paths import paths
|
from ipaplatform.paths import paths
|
||||||
from ipaplatform import services
|
from ipaplatform import services
|
||||||
from ipalib import api, errors
|
from ipalib import api, errors
|
||||||
from ipapython import version
|
from ipapython import version
|
||||||
from ipapython.ipautil import run, write_tmp_file
|
from ipapython.ipautil import run, write_tmp_file
|
||||||
from ipapython import admintool
|
from ipapython import admintool, certdb
|
||||||
from ipapython.dn import DN
|
from ipapython.dn import DN
|
||||||
from ipaserver.install.replication import wait_for_task
|
from ipaserver.install.replication import wait_for_task
|
||||||
from ipaserver.install import installutils
|
from ipaserver.install import installutils
|
||||||
@ -46,7 +39,13 @@ from ipapython import ipaldap
|
|||||||
from ipaplatform.constants import constants
|
from ipaplatform.constants import constants
|
||||||
from ipaplatform.tasks import tasks
|
from ipaplatform.tasks import tasks
|
||||||
|
|
||||||
|
# pylint: disable=import-error
|
||||||
|
if six.PY3:
|
||||||
|
# The SafeConfigParser class has been renamed to ConfigParser in Py3
|
||||||
|
from configparser import ConfigParser as SafeConfigParser
|
||||||
|
else:
|
||||||
|
from ConfigParser import SafeConfigParser
|
||||||
|
# pylint: enable=import-error
|
||||||
ISO8601_DATETIME_FMT = '%Y-%m-%dT%H:%M:%S'
|
ISO8601_DATETIME_FMT = '%Y-%m-%dT%H:%M:%S'
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -194,7 +193,7 @@ class Backup(admintool.AdminTool):
|
|||||||
paths.HOSTS,
|
paths.HOSTS,
|
||||||
) + tuple(
|
) + tuple(
|
||||||
os.path.join(paths.IPA_NSSDB_DIR, file)
|
os.path.join(paths.IPA_NSSDB_DIR, file)
|
||||||
for file in ('cert8.db', 'key3.db', 'secmod.db')
|
for file in (certdb.NSS_DBM_FILES + certdb.NSS_SQL_FILES)
|
||||||
)
|
)
|
||||||
|
|
||||||
logs=(
|
logs=(
|
||||||
|
@ -32,6 +32,20 @@ from optparse import OptionGroup, SUPPRESS_HELP
|
|||||||
|
|
||||||
import dns.resolver
|
import dns.resolver
|
||||||
import six
|
import six
|
||||||
|
|
||||||
|
from ipaserver.install import certs, installutils, bindinstance, dsinstance, ca
|
||||||
|
from ipaserver.install.replication import enable_replication_version_checking
|
||||||
|
from ipaserver.install.server.replicainstall import install_ca_cert
|
||||||
|
from ipaserver.install.bindinstance import (
|
||||||
|
add_zone, add_fwd_rr, add_ptr_rr, dns_container_exists)
|
||||||
|
from ipapython import ipautil, admintool, certdb
|
||||||
|
from ipapython.dn import DN
|
||||||
|
from ipapython import version
|
||||||
|
from ipalib import api
|
||||||
|
from ipalib import errors
|
||||||
|
from ipaplatform.paths import paths
|
||||||
|
from ipalib.constants import DOMAIN_LEVEL_0
|
||||||
|
|
||||||
# pylint: disable=import-error
|
# pylint: disable=import-error
|
||||||
if six.PY3:
|
if six.PY3:
|
||||||
# The SafeConfigParser class has been renamed to ConfigParser in Py3
|
# The SafeConfigParser class has been renamed to ConfigParser in Py3
|
||||||
@ -40,18 +54,6 @@ else:
|
|||||||
from ConfigParser import SafeConfigParser
|
from ConfigParser import SafeConfigParser
|
||||||
# pylint: enable=import-error
|
# pylint: enable=import-error
|
||||||
|
|
||||||
from ipaserver.install import certs, installutils, bindinstance, dsinstance, ca
|
|
||||||
from ipaserver.install.replication import enable_replication_version_checking
|
|
||||||
from ipaserver.install.server.replicainstall import install_ca_cert
|
|
||||||
from ipaserver.install.bindinstance import (
|
|
||||||
add_zone, add_fwd_rr, add_ptr_rr, dns_container_exists)
|
|
||||||
from ipapython import ipautil, admintool
|
|
||||||
from ipapython.dn import DN
|
|
||||||
from ipapython import version
|
|
||||||
from ipalib import api
|
|
||||||
from ipalib import errors
|
|
||||||
from ipaplatform.paths import paths
|
|
||||||
from ipalib.constants import DOMAIN_LEVEL_0
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -565,9 +567,8 @@ class ReplicaPrepare(admintool.AdminTool):
|
|||||||
installutils.remove_file(pkcs12_fname)
|
installutils.remove_file(pkcs12_fname)
|
||||||
installutils.remove_file(passwd_fname)
|
installutils.remove_file(passwd_fname)
|
||||||
|
|
||||||
self.remove_info_file("cert8.db")
|
for fname in (certdb.NSS_SQL_FILES + certdb.NSS_SQL_FILES):
|
||||||
self.remove_info_file("key3.db")
|
self.remove_info_file(fname)
|
||||||
self.remove_info_file("secmod.db")
|
|
||||||
self.remove_info_file("noise.txt")
|
self.remove_info_file("noise.txt")
|
||||||
|
|
||||||
orig_filename = passwd_fname + ".orig"
|
orig_filename = passwd_fname + ".orig"
|
||||||
|
@ -27,20 +27,13 @@ import ldif
|
|||||||
import itertools
|
import itertools
|
||||||
|
|
||||||
import six
|
import six
|
||||||
# pylint: disable=import-error
|
|
||||||
if six.PY3:
|
|
||||||
# The SafeConfigParser class has been renamed to ConfigParser in Py3
|
|
||||||
from configparser import ConfigParser as SafeConfigParser
|
|
||||||
else:
|
|
||||||
from ConfigParser import SafeConfigParser
|
|
||||||
# pylint: enable=import-error
|
|
||||||
|
|
||||||
from ipaclient.install.client import update_ipa_nssdb
|
from ipaclient.install.client import update_ipa_nssdb
|
||||||
from ipalib import api, errors
|
from ipalib import api, errors
|
||||||
from ipalib.constants import FQDN
|
from ipalib.constants import FQDN
|
||||||
from ipapython import version, ipautil
|
from ipapython import version, ipautil
|
||||||
from ipapython.ipautil import run, user_input
|
from ipapython.ipautil import run, user_input
|
||||||
from ipapython import admintool
|
from ipapython import admintool, certdb
|
||||||
from ipapython.dn import DN
|
from ipapython.dn import DN
|
||||||
from ipaserver.install.replication import (wait_for_task, ReplicationManager,
|
from ipaserver.install.replication import (wait_for_task, ReplicationManager,
|
||||||
get_cs_replication_manager)
|
get_cs_replication_manager)
|
||||||
@ -58,6 +51,14 @@ try:
|
|||||||
except ImportError:
|
except ImportError:
|
||||||
adtrustinstance = None
|
adtrustinstance = None
|
||||||
|
|
||||||
|
# pylint: disable=import-error
|
||||||
|
if six.PY3:
|
||||||
|
# The SafeConfigParser class has been renamed to ConfigParser in Py3
|
||||||
|
from configparser import ConfigParser as SafeConfigParser
|
||||||
|
else:
|
||||||
|
from ConfigParser import SafeConfigParser
|
||||||
|
# pylint: enable=import-error
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@ -847,7 +848,7 @@ class Restore(admintool.AdminTool):
|
|||||||
|
|
||||||
krbinstance.KrbInstance().stop_tracking_certs()
|
krbinstance.KrbInstance().stop_tracking_certs()
|
||||||
|
|
||||||
for basename in ('cert8.db', 'key3.db', 'secmod.db', 'pwdfile.txt'):
|
for basename in certdb.NSS_FILES:
|
||||||
filename = os.path.join(paths.IPA_NSSDB_DIR, basename)
|
filename = os.path.join(paths.IPA_NSSDB_DIR, basename)
|
||||||
try:
|
try:
|
||||||
ipautil.backup_file(filename)
|
ipautil.backup_file(filename)
|
||||||
|
@ -30,9 +30,10 @@ from ipalib.install import certmonger
|
|||||||
from ipaplatform.constants import constants
|
from ipaplatform.constants import constants
|
||||||
from ipaplatform.paths import paths
|
from ipaplatform.paths import paths
|
||||||
from ipapython import admintool
|
from ipapython import admintool
|
||||||
from ipapython.certdb import (get_ca_nickname,
|
from ipapython.certdb import (
|
||||||
NSSDatabase,
|
NSS_DBM_FILES, NSS_SQL_FILES, NSSDatabase, get_ca_nickname,
|
||||||
verify_kdc_cert_validity)
|
verify_kdc_cert_validity
|
||||||
|
)
|
||||||
from ipapython.dn import DN
|
from ipapython.dn import DN
|
||||||
from ipalib import api, errors
|
from ipalib import api, errors
|
||||||
from ipaserver.install import certs, dsinstance, installutils, krbinstance
|
from ipaserver.install import certs, dsinstance, installutils, krbinstance
|
||||||
@ -170,14 +171,12 @@ class ServerCertInstall(admintool.AdminTool):
|
|||||||
quotes=False)
|
quotes=False)
|
||||||
|
|
||||||
# Fix the database permissions
|
# Fix the database permissions
|
||||||
os.chmod(os.path.join(dirname, 'cert8.db'), 0o640)
|
|
||||||
os.chmod(os.path.join(dirname, 'key3.db'), 0o640)
|
|
||||||
os.chmod(os.path.join(dirname, 'secmod.db'), 0o640)
|
|
||||||
|
|
||||||
pent = pwd.getpwnam(constants.HTTPD_USER)
|
pent = pwd.getpwnam(constants.HTTPD_USER)
|
||||||
os.chown(os.path.join(dirname, 'cert8.db'), 0, pent.pw_gid)
|
for filename in (NSS_DBM_FILES + NSS_SQL_FILES):
|
||||||
os.chown(os.path.join(dirname, 'key3.db'), 0, pent.pw_gid)
|
absname = os.path.join(dirname, filename)
|
||||||
os.chown(os.path.join(dirname, 'secmod.db'), 0, pent.pw_gid)
|
if os.path.isfile(absname):
|
||||||
|
os.chmod(absname, 0o640)
|
||||||
|
os.chown(absname, 0, pent.pw_gid)
|
||||||
|
|
||||||
def install_kdc_cert(self):
|
def install_kdc_cert(self):
|
||||||
ca_cert_file = paths.CA_BUNDLE_PEM
|
ca_cert_file = paths.CA_BUNDLE_PEM
|
||||||
|
@ -6,6 +6,7 @@ from custodia.store.interface import CSStore # pylint: disable=relative-import
|
|||||||
from jwcrypto.common import json_decode, json_encode
|
from jwcrypto.common import json_decode, json_encode
|
||||||
from ipaplatform.paths import paths
|
from ipaplatform.paths import paths
|
||||||
from ipapython import ipautil
|
from ipapython import ipautil
|
||||||
|
from ipapython.certdb import NSSDatabase
|
||||||
from ipaserver.secrets.common import iSecLdap
|
from ipaserver.secrets.common import iSecLdap
|
||||||
import ldap
|
import ldap
|
||||||
import os
|
import os
|
||||||
@ -65,10 +66,11 @@ class NSSWrappedCertDB(DBMAPHandler):
|
|||||||
'--wrap-nickname', self.wrap_nick,
|
'--wrap-nickname', self.wrap_nick,
|
||||||
'--target-nickname', self.target_nick,
|
'--target-nickname', self.target_nick,
|
||||||
'-o', wrapped_key_file])
|
'-o', wrapped_key_file])
|
||||||
ipautil.run([
|
nssdb = NSSDatabase(self.nssdb_path)
|
||||||
paths.CERTUTIL, '-d', self.nssdb_path,
|
nssdb.run_certutil([
|
||||||
'-L', '-n', self.target_nick,
|
'-L', '-n', self.target_nick,
|
||||||
'-a', '-o', certificate_file])
|
'-a', '-o', certificate_file,
|
||||||
|
])
|
||||||
with open(wrapped_key_file, 'rb') as f:
|
with open(wrapped_key_file, 'rb') as f:
|
||||||
wrapped_key = f.read()
|
wrapped_key = f.read()
|
||||||
with open(certificate_file, 'r') as f:
|
with open(certificate_file, 'r') as f:
|
||||||
@ -102,12 +104,13 @@ class NSSCertDB(DBMAPHandler):
|
|||||||
with open(pk12pwfile, 'w') as f:
|
with open(pk12pwfile, 'w') as f:
|
||||||
f.write(password)
|
f.write(password)
|
||||||
pk12file = os.path.join(tdir, 'pk12file')
|
pk12file = os.path.join(tdir, 'pk12file')
|
||||||
ipautil.run([paths.PK12UTIL,
|
nssdb = NSSDatabase(self.nssdb_path)
|
||||||
"-d", self.nssdb_path,
|
nssdb.run_pk12util([
|
||||||
"-o", pk12file,
|
"-o", pk12file,
|
||||||
"-n", self.nickname,
|
"-n", self.nickname,
|
||||||
"-k", self.nssdb_pwdfile,
|
"-k", self.nssdb_pwdfile,
|
||||||
"-w", pk12pwfile])
|
"-w", pk12pwfile,
|
||||||
|
])
|
||||||
with open(pk12file, 'rb') as f:
|
with open(pk12file, 'rb') as f:
|
||||||
data = f.read()
|
data = f.read()
|
||||||
finally:
|
finally:
|
||||||
@ -125,12 +128,13 @@ class NSSCertDB(DBMAPHandler):
|
|||||||
pk12file = os.path.join(tdir, 'pk12file')
|
pk12file = os.path.join(tdir, 'pk12file')
|
||||||
with open(pk12file, 'wb') as f:
|
with open(pk12file, 'wb') as f:
|
||||||
f.write(b64decode(v['pkcs12 data']))
|
f.write(b64decode(v['pkcs12 data']))
|
||||||
ipautil.run([paths.PK12UTIL,
|
nssdb = NSSDatabase(self.nssdb_path)
|
||||||
"-d", self.nssdb_path,
|
nssdb.run_pk12util([
|
||||||
"-i", pk12file,
|
"-i", pk12file,
|
||||||
"-n", self.nickname,
|
"-n", self.nickname,
|
||||||
"-k", self.nssdb_pwdfile,
|
"-k", self.nssdb_pwdfile,
|
||||||
"-w", pk12pwfile])
|
"-w", pk12pwfile,
|
||||||
|
])
|
||||||
finally:
|
finally:
|
||||||
shutil.rmtree(tdir)
|
shutil.rmtree(tdir)
|
||||||
|
|
||||||
|
@ -35,6 +35,7 @@ from six import StringIO
|
|||||||
|
|
||||||
from ipapython import ipautil
|
from ipapython import ipautil
|
||||||
from ipaplatform.paths import paths
|
from ipaplatform.paths import paths
|
||||||
|
from ipaplatform.constants import constants
|
||||||
from ipapython.dn import DN
|
from ipapython.dn import DN
|
||||||
from ipalib import errors
|
from ipalib import errors
|
||||||
from ipalib.util import get_reverse_zone_default, verify_host_resolvable
|
from ipalib.util import get_reverse_zone_default, verify_host_resolvable
|
||||||
@ -1262,9 +1263,12 @@ def run_server_del(host, server_to_delete, force=False,
|
|||||||
return host.run_command(args, raiseonerr=False)
|
return host.run_command(args, raiseonerr=False)
|
||||||
|
|
||||||
|
|
||||||
def run_certutil(host, args, reqdir, stdin=None, raiseonerr=True):
|
def run_certutil(host, args, reqdir, dbtype=None,
|
||||||
new_args = [paths.CERTUTIL, "-d", reqdir]
|
stdin=None, raiseonerr=True):
|
||||||
new_args = " ".join(new_args + args)
|
if dbtype is None:
|
||||||
|
dbtype = constants.NSS_DEFAULT_DBTYPE
|
||||||
|
new_args = [paths.CERTUTIL, '-d', '{}:{}'.format(dbtype, reqdir)]
|
||||||
|
new_args.extend(args)
|
||||||
return host.run_command(new_args, raiseonerr=raiseonerr,
|
return host.run_command(new_args, raiseonerr=raiseonerr,
|
||||||
stdin_text=stdin)
|
stdin_text=stdin)
|
||||||
|
|
||||||
|
133
ipatests/test_ipapython/test_certdb.py
Normal file
133
ipatests/test_ipapython/test_certdb.py
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
from ipapython.certdb import NSSDatabase, TRUSTED_PEER_TRUST_FLAGS
|
||||||
|
|
||||||
|
CERTNICK = 'testcert'
|
||||||
|
|
||||||
|
|
||||||
|
def create_selfsigned(nssdb):
|
||||||
|
# create self-signed cert + key
|
||||||
|
noisefile = os.path.join(nssdb.secdir, 'noise')
|
||||||
|
with open(noisefile, 'wb') as f:
|
||||||
|
f.write(os.urandom(64))
|
||||||
|
try:
|
||||||
|
nssdb.run_certutil([
|
||||||
|
'-S', '-x',
|
||||||
|
'-z', noisefile,
|
||||||
|
'-k', 'rsa', '-g', '2048', '-Z', 'SHA256',
|
||||||
|
'-t', 'CTu,Cu,Cu',
|
||||||
|
'-s', 'CN=testcert',
|
||||||
|
'-n', CERTNICK,
|
||||||
|
'-m', '365',
|
||||||
|
])
|
||||||
|
finally:
|
||||||
|
os.unlink(noisefile)
|
||||||
|
|
||||||
|
|
||||||
|
def test_dbm_tmp():
|
||||||
|
with NSSDatabase(dbtype='dbm') as nssdb:
|
||||||
|
assert nssdb.dbtype == 'dbm'
|
||||||
|
|
||||||
|
for filename in nssdb.filenames:
|
||||||
|
assert not os.path.isfile(filename)
|
||||||
|
|
||||||
|
nssdb.create_db()
|
||||||
|
for filename in nssdb.filenames:
|
||||||
|
assert os.path.isfile(filename)
|
||||||
|
assert os.path.dirname(filename) == nssdb.secdir
|
||||||
|
|
||||||
|
assert os.path.basename(nssdb.certdb) == 'cert8.db'
|
||||||
|
assert nssdb.certdb in nssdb.filenames
|
||||||
|
assert os.path.basename(nssdb.keydb) == 'key3.db'
|
||||||
|
assert os.path.basename(nssdb.secmod) == 'secmod.db'
|
||||||
|
|
||||||
|
|
||||||
|
def test_sql_tmp():
|
||||||
|
with NSSDatabase(dbtype='sql') as nssdb:
|
||||||
|
assert nssdb.dbtype == 'sql'
|
||||||
|
|
||||||
|
for filename in nssdb.filenames:
|
||||||
|
assert not os.path.isfile(filename)
|
||||||
|
|
||||||
|
nssdb.create_db()
|
||||||
|
for filename in nssdb.filenames:
|
||||||
|
assert os.path.isfile(filename)
|
||||||
|
assert os.path.dirname(filename) == nssdb.secdir
|
||||||
|
|
||||||
|
assert os.path.basename(nssdb.certdb) == 'cert9.db'
|
||||||
|
assert nssdb.certdb in nssdb.filenames
|
||||||
|
assert os.path.basename(nssdb.keydb) == 'key4.db'
|
||||||
|
assert os.path.basename(nssdb.secmod) == 'pkcs11.txt'
|
||||||
|
|
||||||
|
|
||||||
|
def test_convert_db():
|
||||||
|
with NSSDatabase(dbtype='dbm') as nssdb:
|
||||||
|
assert nssdb.dbtype == 'dbm'
|
||||||
|
|
||||||
|
nssdb.create_db()
|
||||||
|
|
||||||
|
create_selfsigned(nssdb)
|
||||||
|
|
||||||
|
oldcerts = nssdb.list_certs()
|
||||||
|
assert len(oldcerts) == 1
|
||||||
|
oldkeys = nssdb.list_keys()
|
||||||
|
assert len(oldkeys) == 1
|
||||||
|
|
||||||
|
nssdb.convert_db()
|
||||||
|
|
||||||
|
assert nssdb.dbtype == 'sql'
|
||||||
|
newcerts = nssdb.list_certs()
|
||||||
|
assert len(newcerts) == 1
|
||||||
|
assert newcerts == oldcerts
|
||||||
|
newkeys = nssdb.list_keys()
|
||||||
|
assert len(newkeys) == 1
|
||||||
|
assert newkeys == oldkeys
|
||||||
|
|
||||||
|
for filename in nssdb.filenames:
|
||||||
|
assert os.path.isfile(filename)
|
||||||
|
assert os.path.dirname(filename) == nssdb.secdir
|
||||||
|
|
||||||
|
assert os.path.basename(nssdb.certdb) == 'cert9.db'
|
||||||
|
assert nssdb.certdb in nssdb.filenames
|
||||||
|
assert os.path.basename(nssdb.keydb) == 'key4.db'
|
||||||
|
assert os.path.basename(nssdb.secmod) == 'pkcs11.txt'
|
||||||
|
|
||||||
|
|
||||||
|
def test_convert_db_nokey():
|
||||||
|
with NSSDatabase(dbtype='dbm') as nssdb:
|
||||||
|
assert nssdb.dbtype == 'dbm'
|
||||||
|
nssdb.create_db()
|
||||||
|
|
||||||
|
create_selfsigned(nssdb)
|
||||||
|
|
||||||
|
assert len(nssdb.list_certs()) == 1
|
||||||
|
assert len(nssdb.list_keys()) == 1
|
||||||
|
# remove key, readd cert
|
||||||
|
cert = nssdb.get_cert(CERTNICK)
|
||||||
|
nssdb.run_certutil(['-F', '-n', CERTNICK])
|
||||||
|
nssdb.add_cert(cert, CERTNICK, TRUSTED_PEER_TRUST_FLAGS)
|
||||||
|
assert len(nssdb.list_keys()) == 0
|
||||||
|
oldcerts = nssdb.list_certs()
|
||||||
|
assert len(oldcerts) == 1
|
||||||
|
|
||||||
|
nssdb.convert_db()
|
||||||
|
assert nssdb.dbtype == 'sql'
|
||||||
|
newcerts = nssdb.list_certs()
|
||||||
|
assert len(newcerts) == 1
|
||||||
|
assert newcerts == oldcerts
|
||||||
|
assert nssdb.get_cert(CERTNICK) == cert
|
||||||
|
newkeys = nssdb.list_keys()
|
||||||
|
assert newkeys == ()
|
||||||
|
|
||||||
|
for filename in nssdb.filenames:
|
||||||
|
assert os.path.isfile(filename)
|
||||||
|
assert os.path.dirname(filename) == nssdb.secdir
|
||||||
|
|
||||||
|
old = os.path.join(nssdb.secdir, 'cert8.db')
|
||||||
|
assert not os.path.isfile(old)
|
||||||
|
assert os.path.isfile(old + '.migrated')
|
||||||
|
|
||||||
|
assert os.path.basename(nssdb.certdb) == 'cert9.db'
|
||||||
|
assert nssdb.certdb in nssdb.filenames
|
||||||
|
assert os.path.basename(nssdb.keydb) == 'key4.db'
|
||||||
|
assert os.path.basename(nssdb.secmod) == 'pkcs11.txt'
|
@ -25,13 +25,11 @@ import base64
|
|||||||
import nose
|
import nose
|
||||||
import os
|
import os
|
||||||
import pytest
|
import pytest
|
||||||
import shutil
|
|
||||||
import six
|
import six
|
||||||
import tempfile
|
|
||||||
from ipalib import api
|
from ipalib import api
|
||||||
from ipalib import errors
|
from ipalib import errors
|
||||||
from ipaplatform.paths import paths
|
from ipaplatform.paths import paths
|
||||||
from ipapython import ipautil
|
from ipapython.certdb import NSSDatabase
|
||||||
from ipapython.dn import DN
|
from ipapython.dn import DN
|
||||||
from ipapython.ipautil import run
|
from ipapython.ipautil import run
|
||||||
from ipatests.test_xmlrpc.testcert import subject_base
|
from ipatests.test_xmlrpc.testcert import subject_base
|
||||||
@ -77,8 +75,9 @@ def is_db_configured():
|
|||||||
# The API tested depends on the value of ~/.ipa/default/ra_plugin when
|
# The API tested depends on the value of ~/.ipa/default/ra_plugin when
|
||||||
# running as the lite-server.
|
# running as the lite-server.
|
||||||
|
|
||||||
|
|
||||||
class BaseCert(XMLRPC_test):
|
class BaseCert(XMLRPC_test):
|
||||||
|
host_fqdn = u'ipatestcert.%s' % api.env.domain
|
||||||
|
service_princ = u'test/%s@%s' % (host_fqdn, api.env.realm)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def setup_class(cls):
|
def setup_class(cls):
|
||||||
@ -91,42 +90,27 @@ class BaseCert(XMLRPC_test):
|
|||||||
|
|
||||||
is_db_configured()
|
is_db_configured()
|
||||||
|
|
||||||
def run_certutil(self, args, stdin=None):
|
|
||||||
new_args = [paths.CERTUTIL, "-d", self.reqdir]
|
|
||||||
new_args = new_args + args
|
|
||||||
return ipautil.run(new_args, stdin)
|
|
||||||
|
|
||||||
def setup(self):
|
def setup(self):
|
||||||
self.reqdir = tempfile.mkdtemp(prefix = "tmp-")
|
self.nssdb = NSSDatabase()
|
||||||
self.reqfile = self.reqdir + "/test.csr"
|
secdir = self.nssdb.secdir
|
||||||
self.pwname = self.reqdir + "/pwd"
|
self.reqfile = os.path.join(secdir, "test.csr")
|
||||||
self.certfile = self.reqdir + "/cert.crt"
|
self.certfile = os.path.join(secdir, "cert.crt")
|
||||||
|
|
||||||
# Create an empty password file
|
|
||||||
with open(self.pwname, "w") as fp:
|
|
||||||
fp.write("\n")
|
|
||||||
|
|
||||||
# Create our temporary NSS database
|
# Create our temporary NSS database
|
||||||
self.run_certutil(["-N", "-f", self.pwname])
|
self.nssdb.create_db()
|
||||||
|
|
||||||
self.subject = DN(('CN', self.host_fqdn), subject_base())
|
self.subject = DN(('CN', self.host_fqdn), subject_base())
|
||||||
|
|
||||||
def teardown(self):
|
def teardown(self):
|
||||||
shutil.rmtree(self.reqdir, ignore_errors=True)
|
self.nssdb.close() # remove tempdir
|
||||||
|
|
||||||
def generateCSR(self, subject):
|
def generateCSR(self, subject):
|
||||||
self.run_certutil(["-R", "-s", subject,
|
self.nssdb.run_certutil([
|
||||||
"-o", self.reqfile,
|
"-R", "-s", subject,
|
||||||
"-z", paths.GROUP,
|
"-o", self.reqfile,
|
||||||
"-f", self.pwname,
|
"-z", paths.GROUP,
|
||||||
"-a",
|
"-a",
|
||||||
])
|
])
|
||||||
with open(self.reqfile, "r") as fp:
|
with open(self.reqfile, "rb") as f:
|
||||||
data = fp.read()
|
return f.read().decode('ascii')
|
||||||
return data
|
|
||||||
|
|
||||||
host_fqdn = u'ipatestcert.%s' % api.env.domain
|
|
||||||
service_princ = u'test/%s@%s' % (host_fqdn, api.env.realm)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.tier1
|
@pytest.mark.tier1
|
||||||
@ -149,7 +133,7 @@ class test_cert(BaseCert):
|
|||||||
# First create the host that will use this policy
|
# First create the host that will use this policy
|
||||||
assert 'result' in api.Command['host_add'](self.host_fqdn, force=True)
|
assert 'result' in api.Command['host_add'](self.host_fqdn, force=True)
|
||||||
|
|
||||||
csr = unicode(self.generateCSR(str(self.subject)))
|
csr = self.generateCSR(str(self.subject))
|
||||||
with assert_raises(errors.NotFound):
|
with assert_raises(errors.NotFound):
|
||||||
api.Command['cert_request'](csr, principal=self.service_princ)
|
api.Command['cert_request'](csr, principal=self.service_princ)
|
||||||
|
|
||||||
@ -160,7 +144,7 @@ class test_cert(BaseCert):
|
|||||||
# Our host should exist from previous test
|
# Our host should exist from previous test
|
||||||
global cert, sn
|
global cert, sn
|
||||||
|
|
||||||
csr = unicode(self.generateCSR(str(self.subject)))
|
csr = self.generateCSR(str(self.subject))
|
||||||
res = api.Command['cert_request'](csr, principal=self.service_princ, add=True)['result']
|
res = api.Command['cert_request'](csr, principal=self.service_princ, add=True)['result']
|
||||||
assert DN(res['subject']) == self.subject
|
assert DN(res['subject']) == self.subject
|
||||||
assert 'cacn' in res
|
assert 'cacn' in res
|
||||||
@ -190,8 +174,8 @@ class test_cert(BaseCert):
|
|||||||
See https://fedorahosted.org/freeipa/ticket/5881
|
See https://fedorahosted.org/freeipa/ticket/5881
|
||||||
"""
|
"""
|
||||||
result = api.Command.cert_show(sn, out=unicode(self.certfile))
|
result = api.Command.cert_show(sn, out=unicode(self.certfile))
|
||||||
with open(self.certfile, "r") as f:
|
with open(self.certfile, "rb") as f:
|
||||||
pem_cert = unicode(f.read())
|
pem_cert = f.read().decode('ascii')
|
||||||
result = run(['openssl', 'x509', '-text'],
|
result = run(['openssl', 'x509', '-text'],
|
||||||
stdin=pem_cert, capture_output=True)
|
stdin=pem_cert, capture_output=True)
|
||||||
assert _EXP_CRL_URI in result.output
|
assert _EXP_CRL_URI in result.output
|
||||||
@ -203,7 +187,7 @@ class test_cert(BaseCert):
|
|||||||
"""
|
"""
|
||||||
global newcert
|
global newcert
|
||||||
|
|
||||||
csr = unicode(self.generateCSR(str(self.subject)))
|
csr = self.generateCSR(str(self.subject))
|
||||||
res = api.Command['cert_request'](csr, principal=self.service_princ)['result']
|
res = api.Command['cert_request'](csr, principal=self.service_princ)['result']
|
||||||
assert DN(res['subject']) == self.subject
|
assert DN(res['subject']) == self.subject
|
||||||
# save the cert for the service_show/find tests
|
# save the cert for the service_show/find tests
|
||||||
@ -473,7 +457,7 @@ class test_cert_revocation(BaseCert):
|
|||||||
assert 'result' in api.Command['host_add'](self.host_fqdn, force=True)
|
assert 'result' in api.Command['host_add'](self.host_fqdn, force=True)
|
||||||
|
|
||||||
# generate CSR, request certificate, obtain serial number
|
# generate CSR, request certificate, obtain serial number
|
||||||
self.csr = unicode(self.generateCSR(str(self.subject)))
|
self.csr = self.generateCSR(str(self.subject))
|
||||||
res = api.Command['cert_request'](self.csr,
|
res = api.Command['cert_request'](self.csr,
|
||||||
principal=self.service_princ,
|
principal=self.service_princ,
|
||||||
add=True, all=True)['result']
|
add=True, all=True)['result']
|
||||||
|
@ -28,20 +28,15 @@ once per test run.
|
|||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
import shutil
|
import shutil
|
||||||
import six
|
|
||||||
import base64
|
import base64
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from ipalib import api, x509
|
from ipalib import api, x509
|
||||||
from ipaserver.plugins import rabase
|
from ipaserver.plugins import rabase
|
||||||
from ipapython import ipautil
|
from ipapython import certdb
|
||||||
from ipapython.dn import DN
|
from ipapython.dn import DN
|
||||||
from ipaplatform.paths import paths
|
from ipaplatform.paths import paths
|
||||||
|
|
||||||
if six.PY3:
|
|
||||||
unicode = str
|
|
||||||
|
|
||||||
|
|
||||||
_subject_base = None
|
_subject_base = None
|
||||||
|
|
||||||
|
|
||||||
@ -80,29 +75,6 @@ def get_testcert(subject, principal):
|
|||||||
return strip_cert_header(_testcert.decode('utf-8'))
|
return strip_cert_header(_testcert.decode('utf-8'))
|
||||||
|
|
||||||
|
|
||||||
def run_certutil(reqdir, args, stdin=None):
|
|
||||||
"""
|
|
||||||
Run an NSS certutil command
|
|
||||||
"""
|
|
||||||
new_args = [paths.CERTUTIL, "-d", reqdir]
|
|
||||||
new_args = new_args + args
|
|
||||||
return ipautil.run(new_args, stdin)
|
|
||||||
|
|
||||||
|
|
||||||
def generate_csr(reqdir, pwname, subject):
|
|
||||||
"""
|
|
||||||
Create a CSR for the given subject.
|
|
||||||
"""
|
|
||||||
req_path = os.path.join(reqdir, 'req')
|
|
||||||
run_certutil(reqdir, ["-R", "-s", subject,
|
|
||||||
"-o", req_path,
|
|
||||||
"-z", paths.GROUP,
|
|
||||||
"-f", pwname,
|
|
||||||
"-a"])
|
|
||||||
with open(req_path, "r") as fp:
|
|
||||||
return fp.read()
|
|
||||||
|
|
||||||
|
|
||||||
def makecert(reqdir, subject, principal):
|
def makecert(reqdir, subject, principal):
|
||||||
"""
|
"""
|
||||||
Generate a certificate that can be used during unit testing.
|
Generate a certificate that can be used during unit testing.
|
||||||
@ -114,16 +86,22 @@ def makecert(reqdir, subject, principal):
|
|||||||
raise AssertionError('The self-signed CA is not configured, '
|
raise AssertionError('The self-signed CA is not configured, '
|
||||||
'see ipatests/test_xmlrpc/test_cert.py')
|
'see ipatests/test_xmlrpc/test_cert.py')
|
||||||
|
|
||||||
pwname = os.path.join(reqdir, "pwd")
|
nssdb = certdb.NSSDatabase(nssdir=reqdir)
|
||||||
|
with open(nssdb.pwd_file, "w") as f:
|
||||||
# Create an empty password file
|
# Create an empty password file
|
||||||
with open(pwname, "w") as fp:
|
f.write("\n")
|
||||||
fp.write("\n")
|
# create db
|
||||||
|
nssdb.create_db()
|
||||||
# Generate NSS cert database to store the private key for our CSR
|
# create CSR
|
||||||
run_certutil(reqdir, ["-N", "-f", pwname])
|
csr_file = os.path.join(reqdir, 'req')
|
||||||
|
nssdb.run_certutil([
|
||||||
csr = unicode(generate_csr(reqdir, pwname, str(subject)))
|
"-R", "-s", str(subject),
|
||||||
|
"-o", csr_file,
|
||||||
|
"-z", paths.GROUP,
|
||||||
|
"-a"
|
||||||
|
])
|
||||||
|
with open(csr_file, "rb") as f:
|
||||||
|
csr = f.read().decode('ascii')
|
||||||
|
|
||||||
res = api.Command['cert_request'](csr, principal=principal, add=True)
|
res = api.Command['cert_request'](csr, principal=principal, add=True)
|
||||||
cert = x509.load_der_x509_certificate(
|
cert = x509.load_der_x509_certificate(
|
||||||
|
Loading…
Reference in New Issue
Block a user