#!/usr/bin/python3 # Authors: # Petr Viktorin # # Copyright (C) 2013 Red Hat # see file 'COPYING' for use and warranty information # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . from __future__ import print_function import logging import sys import os import argparse from ipapython.ipa_log_manager import standard_logging_setup from ipatests.pytest_ipa.integration import config from ipatests.pytest_ipa.integration import tasks from ipatests.pytest_ipa.integration.host import Host from ipatests.pytest_ipa.integration import collect_logs try: from pytest_beakerlib import BeakerLibProcess except ImportError: BeakerLibProcess = None logger = logging.getLogger(os.path.basename(__file__)) class TaskRunner(object): def __init__(self): self._prepared_hosts = set() def get_parser(self): parser = argparse.ArgumentParser( description="Perform an operation for integration testing. " "All operations are performed on configured hosts, see " "http://www.freeipa.org/page/V3/Integration_testing " "for configuration details") parser.add_argument('--with-beakerlib', action='store_true', dest='with_beakerlib', help="""Issue BeakerLib commands for logging and log collection""") parser.add_argument('--logfile-dir', dest='logfile_dir', help="""Directory to collect logs in""") subparsers = parser.add_subparsers( metavar='SUBCOMMAND', help='The action to perform (* indicates an idempotent operation)') subparser = subparsers.add_parser( 'install-topo', help='Install IPA in a given topology') subparser.add_argument('topo', metavar='TOPO', help='Desired topology ' '(see `ipa-test-task list-topos` for details)', choices=tasks.topologies) subparser.add_argument('--skip-master', action='store_true', help='Skip installing master') subparser.add_argument('--skip-clients', action='store_true', help='Skip installing clients') subparser.add_argument('--master', type=str, help='Master to use (Default: from config)') subparser.add_argument('--replicas', type=str, nargs='*', help='Replicas to install (Default: from config)') subparser.add_argument('--clients', type=str, nargs='*', help='Clients to install (Default: from config)') subparser.set_defaults(func=self.install_topo) subparser = subparsers.add_parser( 'list-topos', help='List the available topologies') subparser.set_defaults(func=self.list_topos) subparser = subparsers.add_parser( 'install-master', help='Install IPA on the master') subparser.add_argument('--host', type=str, help='Host to use (Default: from config)') subparser.set_defaults(func=self.install_master) subparser = subparsers.add_parser( 'install-replica', help='Install an IPA replica') subparser.add_argument('replica', type=str, help='Replica to install') subparser.add_argument('--master', type=str, help="""Master to replicate from (Default: from config)""") subparser.set_defaults(func=self.install_replica) subparser = subparsers.add_parser( 'install-client', help='Install an IPA client') subparser.add_argument('client', type=str, help='Client to install') subparser.add_argument('--master', type=str, help="""Master to replicate from (Default: from config)""") subparser.set_defaults(func=self.install_client) subparser = subparsers.add_parser( 'connect-replica', help='Connect two IPA masters') subparser.add_argument('host1', type=str, help='First replica to connect') subparser.add_argument('host2', type=str, help='Second replica to connect') subparser.set_defaults(func=self.connect_replica) subparser = subparsers.add_parser( 'disconnect-replica', help='Disconnect two IPA masters') subparser.add_argument('host1', type=str, help='First replica to disconnect') subparser.add_argument('host2', type=str, help='Second replica to disconnect') subparser.set_defaults(func=self.disconnect_replica) subparser = subparsers.add_parser( 'uninstall-server', help='Uninstall IPA server *') subparser.add_argument('host', type=str, nargs='*', help="""Host to use (Default: master and all replicas from config)""") subparser.set_defaults(func=self.uninstall_master) subparser = subparsers.add_parser( 'uninstall-client', help='Uninstall IPA client *') subparser.add_argument('host', type=str, nargs='*', help="""Host to use (Default: all clients from config)""") subparser.set_defaults(func=self.uninstall_client) subparser = subparsers.add_parser( 'uninstall-all', help='Uninstall all hosts, according to config *') subparser.set_defaults(func=self.uninstall_all, host=None) subparser = subparsers.add_parser( 'cleanup', help='Clean up a host *') subparser.add_argument('host', type=str, nargs='*', help="""Host to clean up (Default: all hosts from config)""") subparser.set_defaults(func=self.cleanup) subparser = subparsers.add_parser( 'install-adtrust', help='Runs ipa-adtrust-install on the host') subparser.add_argument('host', type=str, help='Host to run ipa-adtrust-install on') subparser.set_defaults(func=self.install_adtrust) subparser = subparsers.add_parser( 'configure-dns-for-trust', help='Sets DNS on the given host for trust with the given AD') subparser.add_argument('host', type=str, help='Host to change DNS configuration on') subparser.add_argument('ad', type=str, help='AD that trust will be established with') subparser.set_defaults(func=self.configure_dns_for_trust) subparser = subparsers.add_parser( 'establish-trust-with-ad', help='Establishes trust between IPA host and AD') subparser.add_argument('host', type=str, help='IPA Host to establish AD trust on') subparser.add_argument('ad', type=str, help='AD to establish trust with') subparser.set_defaults(func=self.establish_trust_with_ad) subparser = subparsers.add_parser( 'remove-trust-with-ad', help='Removes trust between IPA host and AD') subparser.add_argument('host', type=str, help='IPA Host to remove AD trust on') subparser.add_argument('ad', type=str, help='AD to remove trust with') subparser.set_defaults(func=self.remove_trust_with_ad) subparser = subparsers.add_parser( 'configure-auth-to-local-rule', help='Configures auth_to_local rule on IPA host with respect to AD') subparser.add_argument('host', type=str, help='IPA Host to configure auth_to_local rule on') subparser.add_argument('ad', type=str, help='AD to configure the rule with') subparser.set_defaults(func=self.configure_auth_to_local_rule) subparser = subparsers.add_parser( 'clear-sssd-cache', help='Clears SSSD cache on the IPA host.') subparser.add_argument('host', type=str, help='IPA Host to clear SSSD cache on') subparser.set_defaults(func=self.clear_sssd_cache) subparser = subparsers.add_parser( 'setup-sssd-debugging', help='Turns on SSSD debugging levels.') subparser.add_argument('host', type=str, help='IPA Host to turn SSSD debugging on') subparser.set_defaults(func=self.setup_sssd_debugging) subparser = subparsers.add_parser( 'sync-time', help='Synchronize time on host with respect to server') subparser.add_argument('host', type=str, help='IPA Host to set the time on') subparser.add_argument('server', type=str, help='Server that serves as a time source') subparser.set_defaults(func=self.sync_time) subparser = subparsers.add_parser( 'add-a-records-in-master-domain', help='Adds A records to the IPA master for all the hosts in the ' 'master domain.') subparser.add_argument('master', type=str, help='IPA master to add records on') subparser.set_defaults( func=self.add_a_records_for_hosts_in_master_domain) subparser = subparsers.add_parser( 'add-a-record', help='Adds A record for the host to the IPA master') subparser.add_argument('master', type=str, help='IPA master to add record on') subparser.add_argument('host', type=str, help='Host whose record should be added') subparser.set_defaults(func=self.add_a_record) return parser def main(self, argv): parser = self.get_parser() args = parser.parse_args(argv) self.config = config.Config.from_env(os.environ) if not self.config: raise EnvironmentError('Multihost environment not configured') logs_to_collect = {} def collect_log(host, filename): logs_to_collect.setdefault(host, []).append(filename) self.collect_log = collect_log if args.with_beakerlib: if BeakerLibProcess is None: parser.error( 'pytest_beakerlib not installed, cannot use BeakerLib') beakerlib_process = BeakerLibProcess() args.verbose = True standard_logging_setup( console_format='%(name)s: %(levelname)s: %(message)s', debug=True) if not self.config.domains: raise SystemExit('No configuration available') args.domain = self.config.domains[0] if self.config.ad_domains: args.ad_domain = self.config.ad_domains[0] else: args.ad_domain = None import logging; logging.basicConfig() try: return args.func(args) except Exception as e: if args.with_beakerlib: beakerlib_process.log_exception() beakerlib_process.run_beakerlib_command( ['rlFail', 'Unhandled exception']) raise finally: if args.with_beakerlib: collect_logs('ipa-test-task', logs_to_collect, logfile_dir=args.logfile_dir, beakerlib_plugin=beakerlib_process) beakerlib_process.end() for host in self._prepared_hosts: host.remove_log_collector(self.collect_log) def get_host(self, host_name, default=None): if host_name is None: host = default else: host = self.config.host_by_name(host_name) return self.prepare_host(host) def get_hosts(self, host_names, default=()): if host_names is None: host_names = () hosts = [self.get_host(host_name) for host_name in host_names] if hosts: return hosts else: return [self.prepare_host(h) for h in default] def prepare_host(self, host): # Prepare only UNIX hosts if host not in self._prepared_hosts and isinstance(host, Host): host.add_log_collector(self.collect_log) tasks.prepare_host(host) self._prepared_hosts.add(host) return host def require_ad_domain(self, args): if not args.ad_domain: SystemExit("At least one AD domain is required for this task") def install_master(self, args): master = self.get_host(args.host, default=args.domain.master) logger.info('Installing master %s', master.hostname) tasks.install_master(master) def install_replica(self, args): replica = self.get_host(args.replica) master = self.get_host(args.master, default=args.domain.master) logger.info('Installing replica %s from %s', replica.hostname, master.hostname) tasks.install_replica(master, replica) def install_client(self, args): client = self.get_host(args.client) master = self.get_host(args.master, default=args.domain.master) logger.info('Installing client %s on %s', client.hostname, master.hostname) tasks.install_client(master, client) def uninstall_master(self, args): default_hosts = [args.domain.master] + args.domain.replicas hosts = self.get_hosts(args.host, default=default_hosts) logger.info('Uninstalling masters: %s', [h.hostname for h in hosts]) for master in hosts: logger.info('Uninstalling %s', master.hostname) tasks.uninstall_master(master) def uninstall_client(self, args): default_hosts = args.domain.clients hosts = self.get_hosts(args.host, default=default_hosts) logger.info('Uninstalling clients: %s', [h.hostname for h in hosts]) for client in hosts: logger.info('Uninstalling %s', client.hostname) tasks.uninstall_client(client) def uninstall_all(self, args): self.uninstall_master(args) self.uninstall_client(args) def cleanup(self, args): default_hosts = args.domain.hosts hosts = self.get_hosts(args.host, default=default_hosts) logger.info('Cleaning up hosts: %s', [h.hostname for h in hosts]) for host in hosts: logger.info('Cleaning up %s', host.hostname) tasks.unapply_fixes(host) def connect_replica(self, args): host1 = self.get_host(args.host1) host2 = self.get_host(args.host2) tasks.connect_replica(host1, host2) def disconnect_replica(self, args): host1 = self.get_host(args.host1) host2 = self.get_host(args.host2) tasks.disconnect_replica(host1, host2) def list_topos(self, args): for name, topo in tasks.topologies.items(): print('%s: %s' % (name, topo.__doc__)) def install_topo(self, args): master = self.get_host(args.master, default=args.domain.master) replicas = self.get_hosts(args.replicas, default=args.domain.replicas) clients = self.get_hosts(args.clients, default=args.domain.clients) if args.skip_clients: clients = [] tasks.install_topo(args.topo, master, replicas, clients, skip_master=args.skip_master) def install_adtrust(self, args): master = self.get_host(args.host, default=args.domain.master) logger.info('Configuring AD trust support on %s', master.hostname) tasks.install_adtrust(master) def configure_dns_for_trust(self, args): self.require_ad_domain(args) host = self.get_host(args.host, default=args.domain.master) ad = self.get_host(args.ad, default=args.ad_domain.ads[0]) tasks.configure_dns_for_trust(host, ad) def establish_trust_with_ad(self, args): self.require_ad_domain(args) host = self.get_host(args.host, default=args.domain.master) ad = self.get_host(args.ad, default=args.ad_domain.ads[0]) tasks.establish_trust_with_ad(host, ad) def remove_trust_with_ad(self, args): self.require_ad_domain(args) host = self.get_host(args.host, default=args.domain.master) ad = self.get_host(args.ad, default=args.ad_domain.ads[0]) tasks.remove_trust_with_ad(host, ad) def configure_auth_to_local_rule(self, args): self.require_ad_domain(args) host = self.get_host(args.host, default=args.domain.master) ad = self.get_host(args.ad, default=args.ad_domain.ads[0]) tasks.configure_auth_to_local_rule(host, ad) def clear_sssd_cache(self, args): host = self.get_host(args.host, default=args.domain.master) tasks.clear_sssd_cache(host) def setup_sssd_debugging(self, args): host = self.get_host(args.host, default=args.domain.master) tasks.setup_sssd_debugging(host) def sync_time(self, args): host = self.get_host(args.host, default=args.domain.master) server = self.get_host(args.server) tasks.sync_time(host, server) def add_a_records_for_hosts_in_master_domain(self, args): master = self.get_host(args.master, default=args.domain.master) tasks.add_a_records_for_hosts_in_master_domain(master) def add_a_record(self, args): master = self.get_host(args.master, default=args.domain.master) host = self.get_host(args.host) tasks.add_a_record(master, host) if __name__ == '__main__': sys.exit(TaskRunner().main(sys.argv[1:]))