freeipa/ipatests/ipa-test-task
Petr Viktorin 29c28786e3 Integration tests: Port the BeakerLib plugin and log collection to pytest
Move the IPA-specific log collection out of the Beakerlib plugin.
Add the --logfile-dir option to tests and ipa-test-task, so that logs
can be collected even if BeakerLib is not used.

https://fedorahosted.org/freeipa/ticket/4610

Reviewed-By: Tomas Babej <tbabej@redhat.com>
2014-11-21 12:14:44 +01:00

446 lines
18 KiB
Python
Executable File

#! /usr/bin/python2
# Authors:
# Petr Viktorin <pviktori@redhat.com>
#
# Copyright (C) 2013 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import os
import argparse
from ipapython.ipa_log_manager import log_mgr, standard_logging_setup
from ipatests.test_integration import config
from ipatests.test_integration import tasks
from ipatests.test_integration.host import Host
from ipatests.pytest_plugins.beakerlib import BeakerLibProcess
from ipatests.pytest_plugins.integration import collect_logs
log = log_mgr.get_logger(__name__)
class TaskRunner(object):
def __init__(self):
self._prepared_hosts = set()
def get_parser(self):
parser = argparse.ArgumentParser(
description="Perform an operation for integration testing. "
"All operations are performed on configured hosts, see "
"http://www.freeipa.org/page/V3/Integration_testing "
"for configuration details")
parser.add_argument('--with-beakerlib', action='store_true',
dest='with_beakerlib',
help="""Issue BeakerLib commands for logging
and log collection""")
parser.add_argument('--logfile-dir', dest='logfile_dir',
help="""Directory to collect logs in""")
subparsers = parser.add_subparsers(
metavar='SUBCOMMAND',
help='The action to perform (* indicates an idempotent operation)')
subparser = subparsers.add_parser(
'install-topo',
help='Install IPA in a given topology')
subparser.add_argument('topo',
metavar='TOPO',
help='Desired topology '
'(see `ipa-test-task list-topos` for details)',
choices=tasks.topologies)
subparser.add_argument('--skip-master', action='store_true',
help='Skip installing master')
subparser.add_argument('--skip-clients', action='store_true',
help='Skip installing clients')
subparser.add_argument('--master', type=str,
help='Master to use (Default: from config)')
subparser.add_argument('--replicas', type=str, nargs='*',
help='Replicas to install (Default: from config)')
subparser.add_argument('--clients', type=str, nargs='*',
help='Clients to install (Default: from config)')
subparser.set_defaults(func=self.install_topo)
subparser = subparsers.add_parser(
'list-topos',
help='List the available topologies')
subparser.set_defaults(func=self.list_topos)
subparser = subparsers.add_parser(
'install-master',
help='Install IPA on the master')
subparser.add_argument('--host', type=str,
help='Host to use (Default: from config)')
subparser.set_defaults(func=self.install_master)
subparser = subparsers.add_parser(
'install-replica',
help='Install an IPA replica')
subparser.add_argument('replica', type=str,
help='Replica to install')
subparser.add_argument('--master', type=str,
help="""Master to replicate from
(Default: from config)""")
subparser.set_defaults(func=self.install_replica)
subparser = subparsers.add_parser(
'install-client',
help='Install an IPA client')
subparser.add_argument('client', type=str,
help='Client to install')
subparser.add_argument('--master', type=str,
help="""Master to replicate from
(Default: from config)""")
subparser.set_defaults(func=self.install_client)
subparser = subparsers.add_parser(
'connect-replica',
help='Connect two IPA masters')
subparser.add_argument('host1', type=str,
help='First replica to connect')
subparser.add_argument('host2', type=str,
help='Second replica to connect')
subparser.set_defaults(func=self.connect_replica)
subparser = subparsers.add_parser(
'disconnect-replica',
help='Disconnect two IPA masters')
subparser.add_argument('host1', type=str,
help='First replica to disconnect')
subparser.add_argument('host2', type=str,
help='Second replica to disconnect')
subparser.set_defaults(func=self.disconnect_replica)
subparser = subparsers.add_parser(
'uninstall-server',
help='Uninstall IPA server *')
subparser.add_argument('host', type=str, nargs='*',
help="""Host to use
(Default: master and all replicas
from config)""")
subparser.set_defaults(func=self.uninstall_master)
subparser = subparsers.add_parser(
'uninstall-client',
help='Uninstall IPA client *')
subparser.add_argument('host', type=str, nargs='*',
help="""Host to use
(Default: all clients from config)""")
subparser.set_defaults(func=self.uninstall_client)
subparser = subparsers.add_parser(
'uninstall-all',
help='Uninstall all hosts, according to config *')
subparser.set_defaults(func=self.uninstall_all, host=None)
subparser = subparsers.add_parser(
'cleanup',
help='Clean up a host *')
subparser.add_argument('host', type=str, nargs='*',
help="""Host to clean up
(Default: all hosts from config)""")
subparser.set_defaults(func=self.cleanup)
subparser = subparsers.add_parser(
'install-adtrust',
help='Runs ipa-adtrust-install on the host')
subparser.add_argument('host', type=str,
help='Host to run ipa-adtrust-install on')
subparser.set_defaults(func=self.install_adtrust)
subparser = subparsers.add_parser(
'configure-dns-for-trust',
help='Sets DNS on the given host for trust with the given AD')
subparser.add_argument('host', type=str,
help='Host to change DNS configuration on')
subparser.add_argument('ad', type=str,
help='AD that trust will be established with')
subparser.set_defaults(func=self.configure_dns_for_trust)
subparser = subparsers.add_parser(
'establish-trust-with-ad',
help='Establishes trust between IPA host and AD')
subparser.add_argument('host', type=str,
help='IPA Host to establish AD trust on')
subparser.add_argument('ad', type=str,
help='AD to establish trust with')
subparser.set_defaults(func=self.establish_trust_with_ad)
subparser = subparsers.add_parser(
'remove-trust-with-ad',
help='Removes trust between IPA host and AD')
subparser.add_argument('host', type=str,
help='IPA Host to remove AD trust on')
subparser.add_argument('ad', type=str,
help='AD to remove trust with')
subparser.set_defaults(func=self.remove_trust_with_ad)
subparser = subparsers.add_parser(
'configure-auth-to-local-rule',
help='Configures auth_to_local rule on IPA host with respect to AD')
subparser.add_argument('host', type=str,
help='IPA Host to configure auth_to_local rule on')
subparser.add_argument('ad', type=str,
help='AD to configure the rule with')
subparser.set_defaults(func=self.configure_auth_to_local_rule)
subparser = subparsers.add_parser(
'clear-sssd-cache',
help='Clears SSSD cache on the IPA host.')
subparser.add_argument('host', type=str,
help='IPA Host to clear SSSD cache on')
subparser.set_defaults(func=self.clear_sssd_cache)
subparser = subparsers.add_parser(
'setup-sssd-debugging',
help='Turns on SSSD debugging levels.')
subparser.add_argument('host', type=str,
help='IPA Host to turn SSSD debugging on')
subparser.set_defaults(func=self.setup_sssd_debugging)
subparser = subparsers.add_parser(
'sync-time',
help='Synchronize time on host with respect to server')
subparser.add_argument('host', type=str,
help='IPA Host to set the time on')
subparser.add_argument('server', type=str,
help='Server that serves as a time source')
subparser.set_defaults(func=self.sync_time)
subparser = subparsers.add_parser(
'add-a-records-in-master-domain',
help='Adds A records to the IPA master for all the hosts in the '
'master domain.')
subparser.add_argument('master', type=str,
help='IPA master to add records on')
subparser.set_defaults(
func=self.add_a_records_for_hosts_in_master_domain)
subparser = subparsers.add_parser(
'add-a-record',
help='Adds A record for the host to the IPA master')
subparser.add_argument('master', type=str,
help='IPA master to add record on')
subparser.add_argument('host', type=str,
help='Host whose record should be added')
subparser.set_defaults(func=self.add_a_record)
return parser
def main(self, argv):
args = self.get_parser().parse_args(argv)
self.config = config.Config.from_env(os.environ)
logs_to_collect = {}
def collect_log(host, filename):
logs_to_collect.setdefault(host, []).append(filename)
self.collect_log = collect_log
if args.with_beakerlib:
beakerlib_process = BeakerLibProcess()
args.verbose = True
standard_logging_setup(
console_format='%(name)s: %(levelname)s: %(message)s',
debug=True)
if not self.config.domains:
raise SystemExit('No configuration available')
args.domain = self.config.domains[0]
if self.config.ad_domains:
args.ad_domain = self.config.ad_domains[0]
else:
args.ad_domain = None
import logging; logging.basicConfig()
try:
return args.func(args)
except Exception, e:
if args.with_beakerlib:
beakerlib_process.log_exception()
beakerlib_process.run_beakerlib_command(
['rlFail', 'Unhandled exception'])
raise
finally:
if args.with_beakerlib:
collect_logs('ipa-test-task', logs_to_collect,
logfile_dir=args.logfile_dir,
beakerlib_plugin=beakerlib_process)
beakerlib_process.end()
for host in self._prepared_hosts:
host.remove_log_collector(self.collect_log)
def get_host(self, host_name, default=None):
if host_name is None:
host = default
else:
host = self.config.host_by_name(host_name)
return self.prepare_host(host)
def get_hosts(self, host_names, default=()):
if host_names is None:
host_names = ()
hosts = [self.get_host(host_name) for host_name in host_names]
if hosts:
return hosts
else:
return [self.prepare_host(h) for h in default]
def prepare_host(self, host):
# Prepare only UNIX hosts
if host not in self._prepared_hosts and isinstance(host, Host):
host.add_log_collector(self.collect_log)
tasks.prepare_host(host)
self._prepared_hosts.add(host)
return host
def require_ad_domain(self, args):
if not args.ad_domain:
SystemExit("At least one AD domain is required for this task")
def install_master(self, args):
master = self.get_host(args.host, default=args.domain.master)
log.info('Installing master %s', master.hostname)
tasks.install_master(master)
def install_replica(self, args):
replica = self.get_host(args.replica)
master = self.get_host(args.master, default=args.domain.master)
log.info('Installing replica %s from %s',
replica.hostname, master.hostname)
tasks.install_replica(master, replica)
def install_client(self, args):
client = self.get_host(args.client)
master = self.get_host(args.master, default=args.domain.master)
log.info('Installing client %s on %s', client.hostname, master.hostname)
tasks.install_client(master, client)
def uninstall_master(self, args):
default_hosts = [args.domain.master] + args.domain.replicas
hosts = self.get_hosts(args.host, default=default_hosts)
log.info('Uninstalling masters: %s', [h.hostname for h in hosts])
for master in hosts:
log.info('Uninstalling %s', master.hostname)
tasks.uninstall_master(master)
def uninstall_client(self, args):
default_hosts = args.domain.clients
hosts = self.get_hosts(args.host, default=default_hosts)
log.info('Uninstalling clients: %s', [h.hostname for h in hosts])
for client in hosts:
log.info('Uninstalling %s', client.hostname)
tasks.uninstall_client(client)
def uninstall_all(self, args):
self.uninstall_master(args)
self.uninstall_client(args)
def cleanup(self, args):
default_hosts = args.domain.hosts
hosts = self.get_hosts(args.host, default=default_hosts)
log.info('Cleaning up hosts: %s', [h.hostname for h in hosts])
for host in hosts:
log.info('Cleaning up %s', host.hostname)
tasks.unapply_fixes(host)
def connect_replica(self, args):
host1 = self.get_host(args.host1)
host2 = self.get_host(args.host2)
tasks.connect_replica(host1, host2)
def disconnect_replica(self, args):
host1 = self.get_host(args.host1)
host2 = self.get_host(args.host2)
tasks.disconnect_replica(host1, host2)
def list_topos(self, args):
for name, topo in tasks.topologies.items():
print '%s: %s' % (name, topo.__doc__)
def install_topo(self, args):
master = self.get_host(args.master, default=args.domain.master)
replicas = self.get_hosts(args.replicas, default=args.domain.replicas)
clients = self.get_hosts(args.clients, default=args.domain.clients)
if args.skip_clients:
clients = []
tasks.install_topo(args.topo, master, replicas, clients,
skip_master=args.skip_master)
def install_adtrust(self, args):
master = self.get_host(args.host, default=args.domain.master)
log.info('Configuring AD trust support on %s', master.hostname)
tasks.install_adtrust(master)
def configure_dns_for_trust(self, args):
self.require_ad_domain(args)
host = self.get_host(args.host, default=args.domain.master)
ad = self.get_host(args.ad, default=args.ad_domain.ads[0])
tasks.configure_dns_for_trust(host, ad)
def establish_trust_with_ad(self, args):
self.require_ad_domain(args)
host = self.get_host(args.host, default=args.domain.master)
ad = self.get_host(args.ad, default=args.ad_domain.ads[0])
tasks.establish_trust_with_ad(host, ad)
def remove_trust_with_ad(self, args):
self.require_ad_domain(args)
host = self.get_host(args.host, default=args.domain.master)
ad = self.get_host(args.ad, default=args.ad_domain.ads[0])
tasks.remove_trust_with_ad(host, ad)
def configure_auth_to_local_rule(self, args):
self.require_ad_domain(args)
host = self.get_host(args.host, default=args.domain.master)
ad = self.get_host(args.ad, default=args.ad_domain.ads[0])
tasks.configure_auth_to_local_rule(host, ad)
def clear_sssd_cache(self, args):
host = self.get_host(args.host, default=args.domain.master)
tasks.clear_sssd_cache(host)
def setup_sssd_debugging(self, args):
host = self.get_host(args.host, default=args.domain.master)
tasks.setup_sssd_debugging(host)
def sync_time(self, args):
host = self.get_host(args.host, default=args.domain.master)
server = self.get_host(args.server)
tasks.sync_time(host, server)
def add_a_records_for_hosts_in_master_domain(self, args):
master = self.get_host(args.master, default=args.domain.master)
tasks.add_a_records_for_hosts_in_master_domain(master)
def add_a_record(self, args):
master = self.get_host(args.master, default=args.domain.master)
host = self.get_host(args.host)
tasks.add_a_record(master, host)
if __name__ == '__main__':
exit(TaskRunner().main(sys.argv[1:]))