tests: Adapted installation methods to utilize methods from tasks

Master and replica installation methods were made to utilize corresponding
methods from tasks.py for the sake of DRY

Reviewed-By: David Kupka <dkupka@redhat.com>
This commit is contained in:
Oleg Fayans 2016-09-09 13:16:11 +02:00 committed by David Kupka
parent 725d8d0cac
commit fad6ec8256

View File

@ -32,6 +32,7 @@ from ipaplatform.paths import paths
from ipapython.dn import DN
from ipatests.test_integration.base import IntegrationTest
from ipatests.test_integration import tasks
from ipalib.constants import DOMAIN_LEVEL_0
_DEFAULT = object()
@ -120,16 +121,16 @@ class CALessBase(IntegrationTest):
client_hostname = cls.clients[0].hostname
else:
client_hostname = 'unused-client.test'
env = {
cls.env = {
'domain': cls.master.domain.name,
'server1': cls.master.hostname,
'server2': replica_hostname,
'client': client_hostname,
'dbdir': 'nssdb',
'dbpassword': cls.cert_password,
'crl_path': cls.crl_path,
'dirman_password': cls.master.config.dirman_password,
}
ipautil.run(['bash', '-ex', scriptfile], cwd=cls.cert_dir, env=env)
ipautil.run(['bash', '-ex', scriptfile], cwd=cls.cert_dir, env=cls.env)
for host in cls.get_all_hosts():
tasks.apply_common_fixes(host)
@ -145,7 +146,8 @@ class CALessBase(IntegrationTest):
def uninstall(cls, mh):
# Remove the NSS database
shutil.rmtree(cls.cert_dir)
for host in cls.get_all_hosts():
tasks.uninstall_master(host)
super(CALessBase, cls).uninstall(mh)
@classmethod
@ -165,7 +167,7 @@ class CALessBase(IntegrationTest):
http_pin = cls.cert_password
if dirsrv_pin is _DEFAULT:
dirsrv_pin = cls.cert_password
tasks.prepare_host(host)
files_to_copy = ['root.pem']
if http_pkcs12_exists:
files_to_copy.append(http_pkcs12)
@ -174,51 +176,36 @@ class CALessBase(IntegrationTest):
for filename in set(files_to_copy):
cls.copy_cert(host, filename)
host.collect_log(paths.IPASERVER_INSTALL_LOG)
host.collect_log(paths.IPACLIENT_INSTALL_LOG)
inst = host.domain.realm.replace('.', '-')
host.collect_log(paths.SLAPD_INSTANCE_ERROR_LOG_TEMPLATE % inst)
host.collect_log(paths.SLAPD_INSTANCE_ACCESS_LOG_TEMPLATE % inst)
# Remove existing ca certs from default database to avoid conflicts
args = [paths.CERTUTIL, "-D", "-d", "/etc/httpd/alias", "-n"]
host.run_command(args + ["ca1"], raiseonerr=False)
host.run_command(args + ["ca1/server"], raiseonerr=False)
args = [
'ipa-server-install',
'--http-cert-file', http_pkcs12,
extra_args = ['--http-cert-file', http_pkcs12,
'--dirsrv-cert-file', dirsrv_pkcs12,
'--ca-cert-file', root_ca_file,
'--ip-address', host.ip,
'-r', host.domain.name,
'-p', host.config.dirman_password,
'-a', host.config.admin_password,
'--setup-dns',
'--forwarder', host.config.dns_forwarder,
]
'--ip-address', host.ip]
if http_pin is not None:
args.extend(['--http-pin', http_pin])
extra_args.extend(['--http-pin', http_pin])
if dirsrv_pin is not None:
args.extend(['--dirsrv-pin', dirsrv_pin])
if unattended:
args.extend(['-U'])
return host.run_command(args, raiseonerr=False, stdin_text=stdin_text)
extra_args.extend(['--dirsrv-pin', dirsrv_pin])
return tasks.install_master(host, extra_args=extra_args,
unattended=unattended,
stdin_text=stdin_text,
raiseonerr=False)
@classmethod
def copy_cert(cls, host, filename):
host.transport.put_file(os.path.join(cls.cert_dir, filename),
os.path.join(host.config.test_dir, filename))
@classmethod
def uninstall_server(self, host=None):
if host is None:
host = self.master
host.run_command(['ipa-server-install', '--uninstall', '-U'])
def prepare_replica(self, _replica_number=0, replica=None, master=None,
http_pkcs12='replica.p12', dirsrv_pkcs12='replica.p12',
http_pkcs12_exists=True, dirsrv_pkcs12_exists=True,
http_pin=_DEFAULT, dirsrv_pin=_DEFAULT,
root_ca_file='root.pem', unattended=True,
stdin_text=None):
stdin_text=None, domain_level=None):
"""Prepare a CA-less replica
Puts the bundle file into test_dir on the replica if successful,
@ -234,78 +221,55 @@ class CALessBase(IntegrationTest):
http_pin = self.cert_password
if dirsrv_pin is _DEFAULT:
dirsrv_pin = self.cert_password
if domain_level is None:
domain_level = tasks.domainlevel(master)
files_to_copy = ['root.pem']
if http_pkcs12_exists:
files_to_copy.append(http_pkcs12)
if dirsrv_pkcs12_exists:
files_to_copy.append(dirsrv_pkcs12)
for filename in set(files_to_copy):
master.transport.put_file(
os.path.join(self.cert_dir, filename),
os.path.join(master.config.test_dir, filename))
replica.collect_log(paths.IPAREPLICA_INSTALL_LOG)
replica.collect_log(paths.IPACLIENT_INSTALL_LOG)
inst = replica.domain.realm.replace('.', '-')
replica.collect_log(paths.SLAPD_INSTANCE_ERROR_LOG_TEMPLATE % inst)
replica.collect_log(paths.SLAPD_INSTANCE_ACCESS_LOG_TEMPLATE % inst)
args = [
'ipa-replica-prepare',
'--ip-address', replica.ip,
'-p', replica.config.dirman_password,
]
if http_pkcs12:
args.extend(['--http-cert-file', http_pkcs12])
if dirsrv_pkcs12:
args.extend(['--dirsrv-cert-file', dirsrv_pkcs12])
if http_pin is not None:
args.extend(['--http-pin', http_pin])
if dirsrv_pin is not None:
args.extend(['--dirsrv-pin', dirsrv_pin])
args.extend([replica.hostname])
result = master.run_command(args, raiseonerr=False,
stdin_text=stdin_text)
if result.returncode == 0:
replica_bundle = master.get_file_contents(
paths.REPLICA_INFO_GPG_TEMPLATE % replica.hostname)
replica.put_file_contents(self.get_replica_filename(replica),
replica_bundle)
if domain_level == DOMAIN_LEVEL_0:
destination_host = master
else:
replica.run_command(['rm', self.get_replica_filename(replica)],
raiseonerr=False)
destination_host = replica
# Both master and replica lack ipatests folder by this time, so we need
# to re-create it
tasks.prepare_host(master)
tasks.prepare_host(replica)
for filename in set(files_to_copy):
try:
destination_host.transport.put_file(
os.path.join(self.cert_dir, filename),
os.path.join(destination_host.config.test_dir, filename))
except OSError:
pass
extra_args = []
if http_pkcs12_exists:
extra_args.extend(['--http-cert-file', http_pkcs12])
if dirsrv_pkcs12_exists:
extra_args.extend(['--dirsrv-cert-file', dirsrv_pkcs12])
if http_pin is not None:
extra_args.extend(['--http-pin', http_pin])
if dirsrv_pin is not None:
extra_args.extend(['--dirsrv-pin', dirsrv_pin])
if domain_level == DOMAIN_LEVEL_0:
result = tasks.replica_prepare(master, replica,
extra_args=extra_args,
raiseonerr=False,
stdin_text=stdin_text)
else:
result = tasks.install_replica(master, replica, setup_ca=False,
extra_args=extra_args,
unattended=unattended,
stdin_text=stdin_text,
raiseonerr=False)
return result
def get_replica_filename(self, replica):
return os.path.join(replica.config.test_dir,
'replica-info.gpg')
def install_replica(self, _replica_number=0, replica=None,
unattended=True):
"""Install a CA-less replica
The bundle file is expected to be in the test_dir
Return value is the remote ipa-replica-install command
"""
if replica is None:
replica = self.replicas[_replica_number]
args = ['ipa-replica-install', '-U',
'-p', replica.config.dirman_password,
'-w', replica.config.admin_password,
'--ip-address', replica.ip,
self.get_replica_filename(replica)]
if unattended:
args.append('-U')
return replica.run_command(args)
@classmethod
def export_pkcs12(cls, nickname, filename='server.p12', password=None):
"""Export a cert as PKCS#12 to the given file"""