Add a framework for integration testing

Add methods to run commands and copy files to Host objects.
Adds a base class for integration tests which can currently install
and uninstall IPA in a "star" topology with per-test specified number
of hosts.
A simple test for user replication between two masters is provided.
Log files from the remote hosts can be marked for collection, but the
actual collection is left to a Nose plugin.

Part of the work for: https://fedorahosted.org/freeipa/ticket/3621
This commit is contained in:
Petr Viktorin
2013-05-28 13:31:37 +02:00
parent c577420e40
commit 353f3c62c3
6 changed files with 450 additions and 55 deletions

View File

@@ -300,6 +300,7 @@ Requires: python-nose
Requires: python-paste
Requires: python-coverage
Requires: python-polib
Requires: python-paramiko >= 1.10.1
%description tests
IPA is an integrated solution to provide centrally managed Identity (machine,

View File

@@ -0,0 +1,120 @@
# Authors:
# Petr Viktorin <pviktori@redhat.com>
#
# Copyright (C) 2013 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Base class for FreeIPA integration tests"""
import os
import nose
from ipapython.ipa_log_manager import log_mgr
from ipatests.test_integration.config import get_global_config, env_to_script
from ipatests.test_integration import tasks
from ipatests.order_plugin import ordered
log = log_mgr.get_logger(__name__)
@ordered
class IntegrationTest(object):
num_replicas = 0
num_clients = 0
topology = 'none'
@classmethod
def setup_class(cls):
config = get_global_config()
if not config.domains:
raise nose.SkipTest('Integration testing not configured')
cls.logs_to_collect = {}
domain = config.domains[0]
cls.master = domain.master
if len(domain.replicas) < cls.num_replicas:
raise nose.SkipTest(
'Not enough replicas available (have %s, need %s)' %
(len(domain.replicas), cls.num_replicas))
if len(domain.clients) < cls.num_clients:
raise nose.SkipTest(
'Not enough clients available (have %s, need %s)' %
(len(domain.clients), cls.num_clients))
cls.replicas = domain.replicas[:cls.num_replicas]
cls.clients = domain.clients[:cls.num_clients]
for host in cls.get_all_hosts():
cls.prepare_host(host)
cls.install()
cls.kinit_all()
@classmethod
def get_all_hosts(cls):
return [cls.master] + cls.replicas + cls.clients
@classmethod
def prepare_host(cls, host):
log.info('Preparing host %s', host.hostname)
env_filename = os.path.join(host.config.test_dir, 'env.sh')
cls.collect_log(host, env_filename)
host.mkdir_recursive(host.config.test_dir)
host.put_file_contents(env_filename, env_to_script(host.to_env()))
@classmethod
def install(cls):
if cls.topology == 'none':
return
elif cls.topology == 'star':
tasks.install_master(cls.master, collect_log=cls.collect_log)
for replica in cls.replicas:
tasks.install_replica(cls.master, replica,
collect_log=cls.collect_log)
else:
raise ValueError('Unknown topology %s' % cls.topology)
@classmethod
def kinit_all(cls):
for host in cls.get_all_hosts():
host.run_command(['kinit', 'admin'],
stdin_text=host.config.admin_password)
@classmethod
def teardown_class(cls):
try:
cls.uninstall()
finally:
del cls.logs_to_collect
del cls.master
del cls.replicas
del cls.clients
@classmethod
def uninstall(cls):
cls.master.run_command(['ipa-server-install', '--uninstall', '-U'])
for replica in cls.replicas:
replica.run_command(['ipa-server-install', '--uninstall', '-U'])
for client in cls.clients:
client.run_command(['ipa-client-install', '--uninstall', '-U'])
@classmethod
def collect_log(cls, host, filename):
cls.log.info('Adding %s:%s to list of logs to collect' %
(host.hostname, filename))
cls.logs_to_collect.setdefault(host, []).append(filename)
IntegrationTest.log = log_mgr.get_logger(IntegrationTest())

View File

@@ -22,11 +22,11 @@
import os
import collections
import random
import socket
from ipapython import ipautil
from ipapython.dn import DN
from ipapython.ipa_log_manager import log_mgr
from ipatests.test_integration.host import Host
class Config(object):
@@ -36,6 +36,7 @@ class Config(object):
admin_password = kwargs.get('admin_password') or 'Secret123'
self.test_dir = kwargs.get('test_dir', '/root/ipatests')
self.root_password = kwargs.get('root_password')
self.ipv6 = bool(kwargs.get('ipv6', False))
self.debug = bool(kwargs.get('debug', False))
self.admin_name = kwargs.get('admin_name') or 'admin'
@@ -62,6 +63,7 @@ class Config(object):
by default /root/ipatests
IPv6SETUP: "TRUE" if setting up with IPv6
IPADEBUG: non-empty if debugging is turned on
IPA_ROOT_SSH_PASSWORD: SSH password for root
ADMINID: Administrator username
ADMINPW: Administrator password
@@ -84,6 +86,7 @@ class Config(object):
self = cls(test_dir=env.get('IPATEST_DIR') or '/root/ipatests',
ipv6=(env.get('IPv6SETUP') == 'TRUE'),
debug=env.get('IPADEBUG'),
root_password=env.get('IPA_ROOT_SSH_PASSWORD'),
admin_name=env.get('ADMINID'),
admin_password=env.get('ADMINPW'),
dirman_dn=env.get('ROOTDN'),
@@ -111,6 +114,7 @@ class Config(object):
env['IPATEST_DIR'] = self.test_dir
env['IPv6SETUP'] = 'TRUE' if self.ipv6 else ''
env['IPADEBUG'] = 'TRUE' if self.debug else ''
env['IPA_ROOT_SSH_PASSWORD'] = self.root_password or ''
env['ADMINID'] = self.admin_name
env['ADMINPW'] = self.admin_password
@@ -292,60 +296,6 @@ class Domain(object):
raise LookupError(name)
class Host(object):
"""Configuration for an IPA host"""
def __init__(self, domain, hostname, role, index):
self.log = log_mgr.get_logger(self)
self.domain = domain
self.role = role
self.index = index
shortname, dot, ext_domain = hostname.partition('.')
self.hostname = shortname + '.' + self.domain.name
self.external_hostname = hostname
if self.config.ipv6:
# $(dig +short $M $rrtype|tail -1)
stdout, stderr, returncode = ipautil.run(
['dig', '+short', self.external_hostname, 'AAAA'])
self.ip = stdout.splitlines()[-1].strip()
else:
try:
self.ip = socket.gethostbyname(self.external_hostname)
except socket.gaierror:
self.ip = None
if not self.ip:
self.ip = ''
self.role = 'other'
@classmethod
def from_env(cls, env, domain, hostname, role, index):
self = cls(domain, hostname, role, index)
return self
@property
def config(self):
return self.domain.config
def to_env(self, **kwargs):
"""Return environment variables specific to this host"""
env = self.domain.to_env(**kwargs)
role = self.role.upper()
if self.role != 'master':
role += str(self.index)
env['MYHOSTNAME'] = self.hostname
env['MYBREAKERHOSTNAME'] = self.external_hostname
env['MYIP'] = self.ip
env['MYROLE'] = '%s%s' % (role, self.domain._env)
env['MYENV'] = str(self.domain.index)
return env
def env_to_script(env):
return ''.join(['export %s=%s\n' % (key, ipautil.shell_quote(value))
for key, value in env.items()])

View File

@@ -0,0 +1,183 @@
# Authors:
# Petr Viktorin <pviktori@redhat.com>
#
# Copyright (C) 2013 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Host class for integration testing"""
import os
import collections
import socket
import paramiko
from ipapython import ipautil
from ipapython.ipa_log_manager import log_mgr
RunResult = collections.namedtuple('RunResult', 'output exit_code')
class Host(object):
"""Configuration for an IPA host"""
def __init__(self, domain, hostname, role, index):
self.log = log_mgr.get_logger(self)
self.domain = domain
self.role = role
self.index = index
shortname, dot, ext_domain = hostname.partition('.')
self.hostname = shortname + '.' + self.domain.name
self.external_hostname = hostname
if self.config.ipv6:
# $(dig +short $M $rrtype|tail -1)
stdout, stderr, returncode = ipautil.run(
['dig', '+short', self.external_hostname, 'AAAA'])
self.ip = stdout.splitlines()[-1].strip()
else:
try:
self.ip = socket.gethostbyname(self.external_hostname)
except socket.gaierror:
self.ip = None
if not self.ip:
self.ip = ''
self.role = 'other'
self.root_password = self.config.root_password
self.host_key = None
self.ssh_port = 22
self.env_sh_path = os.path.join(domain.config.test_dir, 'env.sh')
self.log = log_mgr.get_logger('%s.%s.%s' % (
self.__module__, type(self).__name__, self.hostname))
def __repr__(self):
template = ('<{s.__module__}.{s.__class__.__name__} '
'{s.hostname} ({s.role})>')
return template.format(s=self)
@classmethod
def from_env(cls, env, domain, hostname, role, index):
self = cls(domain, hostname, role, index)
return self
@property
def config(self):
return self.domain.config
def to_env(self, **kwargs):
"""Return environment variables specific to this host"""
env = self.domain.to_env(**kwargs)
role = self.role.upper()
if self.role != 'master':
role += str(self.index)
env['MYHOSTNAME'] = self.hostname
env['MYBEAKERHOSTNAME'] = self.external_hostname
env['MYIP'] = self.ip
env['MYROLE'] = '%s%s' % (role, self.domain._env)
env['MYENV'] = str(self.domain.index)
return env
def run_command(self, argv, set_env=True, stdin_text=None,
ignore_stdout=False):
assert argv
self.log.info('RUN %s', argv)
ssh = self.transport.open_channel('session')
try:
ssh.invoke_shell()
ssh.set_combine_stderr(True)
stdin = ssh.makefile('wb')
stdout = ssh.makefile('rb')
if set_env:
stdin.write('. %s\n' % self.env_sh_path)
stdin.write('set -ex\n')
for arg in argv:
stdin.write(ipautil.shell_quote(arg))
stdin.write(' ')
if stdin_text:
stdin_filename = os.path.join(self.config.test_dir, 'stdin')
with self.sftp.open(stdin_filename, 'w') as f:
f.write(stdin_text)
stdin.write('<')
stdin.write(stdin_filename)
else:
stdin.write('< /dev/null')
if ignore_stdout:
stdin.write('> /dev/null')
stdin.write('\n')
ssh.shutdown_write()
output = []
for line in stdout:
output.append(line)
self.log.info(' %s', line.strip('\n'))
exit_status = ssh.recv_exit_status()
self.log.info(' -> Exit code %s', exit_status)
if exit_status:
raise RuntimeError('Command %s exited with error code %s' % (
argv[0], exit_status))
return RunResult(''.join(output), exit_status)
finally:
ssh.close()
@property
def transport(self):
"""Paramiko Transport connected to this host"""
try:
return self._transport
except AttributeError:
sock = socket.create_connection((self.hostname, self.ssh_port))
self._transport = transport = paramiko.Transport(sock)
transport.connect(hostkey=self.host_key, username='root',
password=self.root_password)
return transport
@property
def sftp(self):
"""Paramiko SFTPClient connected to this host"""
try:
return self._sftp
except AttributeError:
transport = self.transport
self._sftp = paramiko.SFTPClient.from_transport(transport)
return self._sftp
def mkdir_recursive(self, path):
"""`mkdir -p` on the remote host"""
try:
self.sftp.chdir(path)
except IOError:
self.mkdir_recursive(os.path.dirname(path))
self.sftp.mkdir(path)
self.sftp.chdir(path)
def get_file_contents(self, filename):
self.log.info('READ %s', filename)
with self.sftp.open(filename) as f:
return f.read()
def put_file_contents(self, filename, contents):
self.log.info('WRITE %s', filename)
with self.sftp.open(filename, 'w') as f:
return f.write(contents)

View File

@@ -0,0 +1,90 @@
# Authors:
# Petr Viktorin <pviktori@redhat.com>
#
# Copyright (C) 2013 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Common tasks for FreeIPA integration tests"""
import os
import textwrap
from ipapython.ipa_log_manager import log_mgr
log = log_mgr.get_logger(__name__)
def enable_replication_debugging(host):
log.info('Enable LDAP replication logging')
logging_ldif = textwrap.dedent("""
dn: cn=config
changetype: modify
replace: nsslapd-errorlog-level
nsslapd-errorlog-level: 8192
""")
host.run_command(['ldapmodify', '-x',
'-D', str(host.config.dirman_dn),
'-w', host.config.dirman_password],
stdin_text=logging_ldif)
def install_master(host, collect_log=None):
if collect_log:
collect_log(host, '/var/log/ipaserver-install.log')
collect_log(host, '/var/log/ipaclient-install.log')
inst = host.domain.realm.replace('.', '-')
collect_log(host, '/var/log/dirsrv/slapd-%s/errors' % inst)
collect_log(host, '/var/log/dirsrv/slapd-%s/access' % inst)
host.run_command(['ipa-server-install', '-U',
'-r', host.domain.name,
'-p', host.config.dirman_password,
'-a', host.config.admin_password,
'--setup-dns',
'--forwarder', host.config.dns_forwarder])
enable_replication_debugging(host)
def install_replica(master, replica, collect_log=None):
if collect_log:
collect_log(replica, '/var/log/ipareplica-install.log')
collect_log(replica, '/var/log/ipareplica-conncheck.log')
master.run_command(['ipa-replica-prepare',
'-p', replica.config.dirman_password,
'--ip-address', replica.ip,
replica.hostname])
replica_bundle = master.get_file_contents(
'/var/lib/ipa/replica-info-%s.gpg' % replica.hostname)
replica_filename = os.path.join(replica.config.test_dir,
'replica-info.gpg')
replica.put_file_contents(replica_filename, replica_bundle)
replica.run_command(['ipa-replica-install', '-U',
'-p', replica.config.dirman_password,
'-w', replica.config.admin_password,
'--ip-address', replica.ip,
replica_filename])
enable_replication_debugging(replica)
def connect_replica(master, replica=None):
if replica is None:
args = [replica.hostname, master.hostname]
else:
args = [master.hostname]
replica.run_command(['ipa-replica-manage', 'connect'] + args)

View File

@@ -0,0 +1,51 @@
# Authors:
# Petr Viktorin <pviktori@redhat.com>
#
# Copyright (C) 2013 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
from ipatests.test_integration.base import IntegrationTest
class TestSimpleReplication(IntegrationTest):
num_replicas = 1
topology = 'star'
def test_user_replication_to_replica(self):
login = 'testuser1'
self.master.run_command(['ipa', 'user-add', login,
'--first', 'test',
'--last', 'user'])
self.log.debug('Sleeping so replication has a chance to finish')
time.sleep(5)
result = self.replicas[0].run_command(['ipa', 'user-show', login])
assert 'User login: %s' % login in result.output
def test_user_replication_to_master(self):
login = 'testuser2'
self.replicas[0].run_command(['ipa', 'user-add', login,
'--first', 'test',
'--last', 'user'])
self.log.debug('Sleeping so replication has a chance to finish')
time.sleep(5)
result = self.master.run_command(['ipa', 'user-show', login])
assert 'User login: %s' % login in result.output