diff --git a/ipatests/test_integration/test_caless.py b/ipatests/test_integration/test_caless.py index 4eaa56253..a464aca21 100644 --- a/ipatests/test_integration/test_caless.py +++ b/ipatests/test_integration/test_caless.py @@ -32,6 +32,7 @@ from ipaplatform.paths import paths from ipapython.dn import DN from ipatests.test_integration.base import IntegrationTest from ipatests.test_integration import tasks +from ipalib.constants import DOMAIN_LEVEL_0 _DEFAULT = object() @@ -120,16 +121,16 @@ class CALessBase(IntegrationTest): client_hostname = cls.clients[0].hostname else: client_hostname = 'unused-client.test' - env = { + cls.env = { 'domain': cls.master.domain.name, 'server1': cls.master.hostname, 'server2': replica_hostname, 'client': client_hostname, 'dbdir': 'nssdb', - 'dbpassword': cls.cert_password, 'crl_path': cls.crl_path, + 'dirman_password': cls.master.config.dirman_password, } - ipautil.run(['bash', '-ex', scriptfile], cwd=cls.cert_dir, env=env) + ipautil.run(['bash', '-ex', scriptfile], cwd=cls.cert_dir, env=cls.env) for host in cls.get_all_hosts(): tasks.apply_common_fixes(host) @@ -145,7 +146,8 @@ class CALessBase(IntegrationTest): def uninstall(cls, mh): # Remove the NSS database shutil.rmtree(cls.cert_dir) - + for host in cls.get_all_hosts(): + tasks.uninstall_master(host) super(CALessBase, cls).uninstall(mh) @classmethod @@ -165,7 +167,7 @@ class CALessBase(IntegrationTest): http_pin = cls.cert_password if dirsrv_pin is _DEFAULT: dirsrv_pin = cls.cert_password - + tasks.prepare_host(host) files_to_copy = ['root.pem'] if http_pkcs12_exists: files_to_copy.append(http_pkcs12) @@ -174,51 +176,36 @@ class CALessBase(IntegrationTest): for filename in set(files_to_copy): cls.copy_cert(host, filename) - host.collect_log(paths.IPASERVER_INSTALL_LOG) - host.collect_log(paths.IPACLIENT_INSTALL_LOG) - inst = host.domain.realm.replace('.', '-') - host.collect_log(paths.SLAPD_INSTANCE_ERROR_LOG_TEMPLATE % inst) - host.collect_log(paths.SLAPD_INSTANCE_ACCESS_LOG_TEMPLATE % inst) + # Remove existing ca certs from default database to avoid conflicts + args = [paths.CERTUTIL, "-D", "-d", "/etc/httpd/alias", "-n"] + host.run_command(args + ["ca1"], raiseonerr=False) + host.run_command(args + ["ca1/server"], raiseonerr=False) - args = [ - 'ipa-server-install', - '--http-cert-file', http_pkcs12, - '--dirsrv-cert-file', dirsrv_pkcs12, - '--ca-cert-file', root_ca_file, - '--ip-address', host.ip, - '-r', host.domain.name, - '-p', host.config.dirman_password, - '-a', host.config.admin_password, - '--setup-dns', - '--forwarder', host.config.dns_forwarder, - ] + extra_args = ['--http-cert-file', http_pkcs12, + '--dirsrv-cert-file', dirsrv_pkcs12, + '--ca-cert-file', root_ca_file, + '--ip-address', host.ip] if http_pin is not None: - args.extend(['--http-pin', http_pin]) + extra_args.extend(['--http-pin', http_pin]) if dirsrv_pin is not None: - args.extend(['--dirsrv-pin', dirsrv_pin]) - if unattended: - args.extend(['-U']) - - return host.run_command(args, raiseonerr=False, stdin_text=stdin_text) + extra_args.extend(['--dirsrv-pin', dirsrv_pin]) + return tasks.install_master(host, extra_args=extra_args, + unattended=unattended, + stdin_text=stdin_text, + raiseonerr=False) @classmethod def copy_cert(cls, host, filename): host.transport.put_file(os.path.join(cls.cert_dir, filename), os.path.join(host.config.test_dir, filename)) - @classmethod - def uninstall_server(self, host=None): - if host is None: - host = self.master - host.run_command(['ipa-server-install', '--uninstall', '-U']) - def prepare_replica(self, _replica_number=0, replica=None, master=None, http_pkcs12='replica.p12', dirsrv_pkcs12='replica.p12', http_pkcs12_exists=True, dirsrv_pkcs12_exists=True, http_pin=_DEFAULT, dirsrv_pin=_DEFAULT, root_ca_file='root.pem', unattended=True, - stdin_text=None): + stdin_text=None, domain_level=None): """Prepare a CA-less replica Puts the bundle file into test_dir on the replica if successful, @@ -234,78 +221,55 @@ class CALessBase(IntegrationTest): http_pin = self.cert_password if dirsrv_pin is _DEFAULT: dirsrv_pin = self.cert_password - + if domain_level is None: + domain_level = tasks.domainlevel(master) files_to_copy = ['root.pem'] if http_pkcs12_exists: files_to_copy.append(http_pkcs12) if dirsrv_pkcs12_exists: files_to_copy.append(dirsrv_pkcs12) - for filename in set(files_to_copy): - master.transport.put_file( - os.path.join(self.cert_dir, filename), - os.path.join(master.config.test_dir, filename)) - - replica.collect_log(paths.IPAREPLICA_INSTALL_LOG) - replica.collect_log(paths.IPACLIENT_INSTALL_LOG) - inst = replica.domain.realm.replace('.', '-') - replica.collect_log(paths.SLAPD_INSTANCE_ERROR_LOG_TEMPLATE % inst) - replica.collect_log(paths.SLAPD_INSTANCE_ACCESS_LOG_TEMPLATE % inst) - - args = [ - 'ipa-replica-prepare', - '--ip-address', replica.ip, - '-p', replica.config.dirman_password, - ] - - if http_pkcs12: - args.extend(['--http-cert-file', http_pkcs12]) - if dirsrv_pkcs12: - args.extend(['--dirsrv-cert-file', dirsrv_pkcs12]) - if http_pin is not None: - args.extend(['--http-pin', http_pin]) - if dirsrv_pin is not None: - args.extend(['--dirsrv-pin', dirsrv_pin]) - - args.extend([replica.hostname]) - - result = master.run_command(args, raiseonerr=False, - stdin_text=stdin_text) - - if result.returncode == 0: - replica_bundle = master.get_file_contents( - paths.REPLICA_INFO_GPG_TEMPLATE % replica.hostname) - replica.put_file_contents(self.get_replica_filename(replica), - replica_bundle) + if domain_level == DOMAIN_LEVEL_0: + destination_host = master else: - replica.run_command(['rm', self.get_replica_filename(replica)], - raiseonerr=False) + destination_host = replica + # Both master and replica lack ipatests folder by this time, so we need + # to re-create it + tasks.prepare_host(master) + tasks.prepare_host(replica) + for filename in set(files_to_copy): + try: + destination_host.transport.put_file( + os.path.join(self.cert_dir, filename), + os.path.join(destination_host.config.test_dir, filename)) + except OSError: + pass + extra_args = [] + if http_pkcs12_exists: + extra_args.extend(['--http-cert-file', http_pkcs12]) + if dirsrv_pkcs12_exists: + extra_args.extend(['--dirsrv-cert-file', dirsrv_pkcs12]) + if http_pin is not None: + extra_args.extend(['--http-pin', http_pin]) + if dirsrv_pin is not None: + extra_args.extend(['--dirsrv-pin', dirsrv_pin]) + if domain_level == DOMAIN_LEVEL_0: + result = tasks.replica_prepare(master, replica, + extra_args=extra_args, + raiseonerr=False, + stdin_text=stdin_text) + else: + result = tasks.install_replica(master, replica, setup_ca=False, + extra_args=extra_args, + unattended=unattended, + stdin_text=stdin_text, + raiseonerr=False) return result def get_replica_filename(self, replica): return os.path.join(replica.config.test_dir, 'replica-info.gpg') - def install_replica(self, _replica_number=0, replica=None, - unattended=True): - """Install a CA-less replica - - The bundle file is expected to be in the test_dir - - Return value is the remote ipa-replica-install command - """ - if replica is None: - replica = self.replicas[_replica_number] - - args = ['ipa-replica-install', '-U', - '-p', replica.config.dirman_password, - '-w', replica.config.admin_password, - '--ip-address', replica.ip, - self.get_replica_filename(replica)] - if unattended: - args.append('-U') - return replica.run_command(args) - @classmethod def export_pkcs12(cls, nickname, filename='server.p12', password=None): """Export a cert as PKCS#12 to the given file"""