Use NSSDatabase instead of direct certutil calls in client code

https://fedorahosted.org/freeipa/ticket/4416

Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
Jan Cholasta 2014-09-18 12:00:15 +02:00 committed by Martin Kosek
parent b764e9d3e6
commit bbf962299d
3 changed files with 26 additions and 58 deletions

View File

@ -226,14 +226,6 @@ def logging_setup(options):
def log_service_error(name, action, error): def log_service_error(name, action, error):
root_logger.error("%s failed to %s: %s", name, action, str(error)) root_logger.error("%s failed to %s: %s", name, action, str(error))
def nickname_exists(nickname):
(sout, serr, returncode) = run([paths.CERTUTIL, "-L", "-d", paths.NSS_DB_DIR, "-n", nickname], raiseonerr=False)
if returncode == 0:
return True
else:
return False
def purge_ipa_certs(additional=[]): def purge_ipa_certs(additional=[]):
filename = paths.NSSDB_IPA_TXT filename = paths.NSSDB_IPA_TXT
if file_exists(filename): if file_exists(filename):
@ -258,12 +250,11 @@ def purge_ipa_certs(additional=[]):
if nickname: if nickname:
nicknames.add(nickname) nicknames.add(nickname)
sys_db = certdb.NSSDatabase(paths.NSS_DB_DIR)
for nickname in nicknames: for nickname in nicknames:
while nickname_exists(nickname): while sys_db.has_nickname(nickname):
try: try:
run([paths.CERTUTIL, "-D", sys_db.delete_cert(nickname)
"-d", paths.NSS_DB_DIR,
"-n", nickname])
except Exception, e: except Exception, e:
root_logger.error( root_logger.error(
"Failed to remove %s from /etc/pki/nssdb: %s", nickname, e) "Failed to remove %s from /etc/pki/nssdb: %s", nickname, e)
@ -2533,23 +2524,16 @@ def install(options, env, fstore, statestore):
except ValueError: except ValueError:
pass pass
tmp_nss_dir = tempfile.mkdtemp() with certdb.NSSDatabase() as tmp_db:
try:
# Add CA certs to a temporary NSS database # Add CA certs to a temporary NSS database
try: try:
pwd_file = ipautil.write_tmp_file(ipautil.ipa_generate_password()) pwd_file = ipautil.write_tmp_file(ipautil.ipa_generate_password())
run([paths.CERTUTIL, '-N', tmp_db.create_db(pwd_file.name)
'-d', tmp_nss_dir,
'-f', pwd_file.name])
ca_certs = x509.load_certificate_list_from_file(CACERT) ca_certs = x509.load_certificate_list_from_file(CACERT)
ca_certs = [cert.der_data for cert in ca_certs] ca_certs = [cert.der_data for cert in ca_certs]
for i, cert in enumerate(ca_certs): for i, cert in enumerate(ca_certs):
run([paths.CERTUTIL, '-A', tmp_db.add_cert(cert, 'CA certificate %d' % (i + 1), 'C,,')
'-d', tmp_nss_dir,
'-n', 'CA certificate %d' % (i + 1),
'-t', 'C,,'],
stdin=cert)
except CalledProcessError, e: except CalledProcessError, e:
root_logger.info("Failed to add CA to temporary NSS database.") root_logger.info("Failed to add CA to temporary NSS database.")
return CLIENT_INSTALL_ERROR return CLIENT_INSTALL_ERROR
@ -2557,7 +2541,7 @@ def install(options, env, fstore, statestore):
# Now, let's try to connect to the server's RPC interface # Now, let's try to connect to the server's RPC interface
connected = False connected = False
try: try:
api.Backend.rpcclient.connect(nss_dir=tmp_nss_dir) api.Backend.rpcclient.connect(nss_dir=tmp_db.secdir)
connected = True connected = True
root_logger.debug("Try RPC connection") root_logger.debug("Try RPC connection")
api.Backend.rpcclient.forward('ping') api.Backend.rpcclient.forward('ping')
@ -2569,7 +2553,7 @@ def install(options, env, fstore, statestore):
"Trying with delegate=True", e) "Trying with delegate=True", e)
try: try:
api.Backend.rpcclient.connect(delegate=True, api.Backend.rpcclient.connect(delegate=True,
nss_dir=tmp_nss_dir) nss_dir=tmp_db.secdir)
root_logger.debug("Try RPC connection") root_logger.debug("Try RPC connection")
api.Backend.rpcclient.forward('ping') api.Backend.rpcclient.forward('ping')
@ -2594,8 +2578,6 @@ def install(options, env, fstore, statestore):
root_logger.error( root_logger.error(
"Cannot connect to the server due to generic error: %s", e) "Cannot connect to the server due to generic error: %s", e)
return CLIENT_INSTALL_ERROR return CLIENT_INSTALL_ERROR
finally:
shutil.rmtree(tmp_nss_dir)
# Use the RPC directly so older servers are supported # Use the RPC directly so older servers are supported
result = api.Backend.rpcclient.forward( result = api.Backend.rpcclient.forward(
@ -2622,14 +2604,10 @@ def install(options, env, fstore, statestore):
# Add the CA certificates to the IPA NSS database # Add the CA certificates to the IPA NSS database
root_logger.debug("Adding CA certificates to the IPA NSS database.") root_logger.debug("Adding CA certificates to the IPA NSS database.")
ipa_db = certdb.NSSDatabase(paths.IPA_NSSDB_DIR)
for cert, nickname, trust_flags in ca_certs_trust: for cert, nickname, trust_flags in ca_certs_trust:
try: try:
run([paths.CERTUTIL, ipa_db.add_cert(cert, nickname, trust_flags)
"-A",
"-d", paths.IPA_NSSDB_DIR,
"-n", nickname,
"-t", trust_flags],
stdin=cert)
except CalledProcessError, e: except CalledProcessError, e:
root_logger.error( root_logger.error(
"Failed to add %s to the IPA NSS database.", nickname) "Failed to add %s to the IPA NSS database.", nickname)
@ -2653,14 +2631,10 @@ def install(options, env, fstore, statestore):
root_logger.debug( root_logger.debug(
"Attempting to add CA certificates to the default NSS database.") "Attempting to add CA certificates to the default NSS database.")
sys_db = certdb.NSSDatabase(paths.NSS_DB_DIR)
for cert, nickname, trust_flags in ca_certs_trust: for cert, nickname, trust_flags in ca_certs_trust:
try: try:
run([paths.CERTUTIL, sys_db.add_cert(cert, nickname, trust_flags)
"-A",
"-d", paths.NSS_DB_DIR,
"-n", nickname,
"-t", trust_flags],
stdin=cert)
except CalledProcessError, e: except CalledProcessError, e:
root_logger.error( root_logger.error(
"Failed to add %s to the default NSS database.", nickname) "Failed to add %s to the default NSS database.", nickname)

View File

@ -22,7 +22,7 @@ import tempfile
import shutil import shutil
from ipapython import (admintool, ipautil, ipaldap, sysrestore, dogtag, from ipapython import (admintool, ipautil, ipaldap, sysrestore, dogtag,
certmonger) certmonger, certdb)
from ipaplatform import services from ipaplatform import services
from ipaplatform.paths import paths from ipaplatform.paths import paths
from ipaplatform.tasks import tasks from ipaplatform.tasks import tasks
@ -72,11 +72,10 @@ class CertUpdate(admintool.AdminTool):
self.update_file(paths.IPA_CA_CRT, certs) self.update_file(paths.IPA_CA_CRT, certs)
self.update_db(paths.IPA_NSSDB_DIR, certs) self.update_db(paths.IPA_NSSDB_DIR, certs)
sys_db = certdb.NSSDatabase(paths.NSS_DB_DIR)
for nickname in ('IPA CA', 'External CA cert'): for nickname in ('IPA CA', 'External CA cert'):
try: try:
ipautil.run([paths.CERTUTIL, '-D', sys_db.delete_cert(nickname)
'-d', paths.NSS_DB_DIR,
'-n', nickname])
except ipautil.CalledProcessError, e: except ipautil.CalledProcessError, e:
pass pass
@ -165,15 +164,12 @@ class CertUpdate(admintool.AdminTool):
self.log.error("failed to update %s: %s", filename, e) self.log.error("failed to update %s: %s", filename, e)
def update_db(self, path, certs): def update_db(self, path, certs):
db = certdb.NSSDatabase(path)
for cert, nickname, trusted, eku in certs: for cert, nickname, trusted, eku in certs:
trust_flags = certstore.key_policy_to_trust_flags( trust_flags = certstore.key_policy_to_trust_flags(
trusted, True, eku) trusted, True, eku)
try: try:
ipautil.run([paths.CERTUTIL, '-A', db.add_cert(cert, nickname, trust_flags)
'-d', path,
'-n', nickname,
'-t', trust_flags],
stdin=cert)
except ipautil.CalledProcessError, e: except ipautil.CalledProcessError, e:
self.log.error( self.log.error(
"failed to update %s in %s: %s", nickname, path, e) "failed to update %s in %s: %s", nickname, path, e)

View File

@ -36,24 +36,22 @@ def get_ca_nickname(realm, format=CA_NICKNAME_FMT):
def create_ipa_nssdb(): def create_ipa_nssdb():
pwdfile = os.path.join(paths.IPA_NSSDB_DIR, 'pwdfile.txt') db = NSSDatabase(paths.IPA_NSSDB_DIR)
pwdfile = os.path.join(db.secdir, 'pwdfile.txt')
ipautil.backup_file(pwdfile) ipautil.backup_file(pwdfile)
ipautil.backup_file(os.path.join(paths.IPA_NSSDB_DIR, 'cert8.db')) ipautil.backup_file(os.path.join(db.secdir, 'cert8.db'))
ipautil.backup_file(os.path.join(paths.IPA_NSSDB_DIR, 'key3.db')) ipautil.backup_file(os.path.join(db.secdir, 'key3.db'))
ipautil.backup_file(os.path.join(paths.IPA_NSSDB_DIR, 'secmod.db')) ipautil.backup_file(os.path.join(db.secdir, 'secmod.db'))
with open(pwdfile, 'w') as f: with open(pwdfile, 'w') as f:
f.write(ipautil.ipa_generate_password(pwd_len=40)) f.write(ipautil.ipa_generate_password(pwd_len=40))
os.chmod(pwdfile, 0600) os.chmod(pwdfile, 0600)
ipautil.run([paths.CERTUTIL, db.create_db(pwdfile)
"-N", os.chmod(os.path.join(db.secdir, 'cert8.db'), 0644)
"-d", paths.IPA_NSSDB_DIR, os.chmod(os.path.join(db.secdir, 'key3.db'), 0644)
"-f", pwdfile]) os.chmod(os.path.join(db.secdir, 'secmod.db'), 0644)
os.chmod(os.path.join(paths.IPA_NSSDB_DIR, 'cert8.db'), 0644)
os.chmod(os.path.join(paths.IPA_NSSDB_DIR, 'key3.db'), 0644)
os.chmod(os.path.join(paths.IPA_NSSDB_DIR, 'secmod.db'), 0644)
def find_cert_from_txt(cert, start=0): def find_cert_from_txt(cert, start=0):