Integration tests for topology plugin

Reviewed-By: Martin Basti <mbasti@redhat.com>
Reviewed-By: Tomas Babej <tbabej@redhat.com>
This commit is contained in:
Oleg Fayans 2015-08-27 19:19:28 +02:00 committed by Martin Basti
parent 0914cb663e
commit c7408f67f6
3 changed files with 241 additions and 21 deletions

View File

@ -40,6 +40,24 @@ from ipatests.test_integration.host import Host
log = log_mgr.get_logger(__name__)
def check_arguments_are(slice, instanceof):
"""
:param: slice - tuple of integers denoting the beginning and the end
of argument list to be checked
:param: instanceof - name of the class the checked arguments should be
instances of
Example: @check_arguments_are((1, 3), int) will check that the second
and third arguments are integers
"""
def wrapper(func):
def wrapped(*args):
for i in args[slice[0]:slice[1]]:
assert isinstance(i, instanceof), "Wrong type: %s: %s" % (i, type(i))
return func(*args)
return wrapped
return wrapper
def prepare_host(host):
if isinstance(host, Host):
env_filename = os.path.join(host.config.test_dir, 'env.sh')
@ -118,7 +136,8 @@ def fix_apache_semaphores(master):
if systemd_available:
master.run_command(['systemctl', 'stop', 'httpd'], raiseonerr=False)
else:
master.run_command([paths.SBIN_SERVICE, 'httpd', 'stop'], raiseonerr=False)
master.run_command([paths.SBIN_SERVICE, 'httpd', 'stop'],
raiseonerr=False)
master.run_command('for line in `ipcs -s | grep apache | cut -d " " -f 2`; '
'do ipcrm -s $line; done', raiseonerr=False)
@ -261,7 +280,7 @@ def install_client(master, client, extra_args=()):
'-p', client.config.admin_name,
'-w', client.config.admin_password,
'--server', master.hostname]
+ list(extra_args))
+ list(extra_args))
setup_sssd_debugging(client)
kinit_admin(client)
@ -298,10 +317,9 @@ def install_adtrust(host):
else:
host.run_command(['systemctl', 'restart', 'named'])
# Check that named is running and has loaded the information from LDAP
dig_command = ['dig', 'SRV', '+short', '@localhost',
'_ldap._tcp.%s' % host.domain.name]
'_ldap._tcp.%s' % host.domain.name]
dig_output = '0 100 389 %s.' % host.hostname
dig_test = lambda x: re.search(re.escape(dig_output), x)
@ -373,9 +391,9 @@ def establish_trust_with_ad(master, ad, extra_args=()):
master.run_command(['smbcontrol', 'all', 'debug', '100'])
util.run_repeatedly(master,
['ipa', 'trust-add',
'--type', 'ad', ad.domain.name,
'--admin', 'Administrator',
'--password'] + list(extra_args),
'--type', 'ad', ad.domain.name,
'--admin', 'Administrator',
'--password'] + list(extra_args),
stdin_text=master.config.ad_admin_password)
master.run_command(['smbcontrol', 'all', 'debug', '1'])
clear_sssd_cache(master)
@ -428,15 +446,14 @@ def setup_sssd_debugging(host):
# First, remove any previous occurences
host.run_command(['sed', '-i',
'/debug_level = 7/d',
paths.SSSD_CONF
], raiseonerr=False)
paths.SSSD_CONF],
raiseonerr=False)
# Add the debug directive to each section
host.run_command(['sed', '-i',
'/\[*\]/ a\debug_level = 7',
paths.SSSD_CONF
], raiseonerr=False)
paths.SSSD_CONF],
raiseonerr=False)
host.collect_log('/var/log/sssd/*')
@ -492,7 +509,7 @@ def disconnect_replica(master, replica):
def kinit_admin(host):
host.run_command(['kinit', 'admin'],
stdin_text=host.config.admin_password)
stdin_text=host.config.admin_password)
def uninstall_master(host):
@ -507,8 +524,9 @@ def uninstall_master(host):
paths.SYSCONFIG_PKI_TOMCAT,
paths.SYSCONFIG_PKI_TOMCAT_PKI_TOMCAT_DIR,
paths.VAR_LIB_PKI_TOMCAT_DIR,
paths.PKI_TOMCAT],
raiseonerr=False)
paths.PKI_TOMCAT,
paths.REPLICA_INFO_GPG_TEMPLATE % host.hostname],
raiseonerr=False)
unapply_fixes(host)
@ -520,6 +538,56 @@ def uninstall_client(host):
unapply_fixes(host)
@check_arguments_are((0, 2), Host)
def clean_replication_agreement(master, replica):
"""
Performs `ipa-replica-manage del replica_hostname --force`.
"""
master.run_command(['ipa-replica-manage',
'del',
replica.hostname,
'--force'])
@check_arguments_are((0, 3), Host)
def create_segment(master, leftnode, rightnode):
"""
creates a topology segment. The first argument is a node to run the command
:returns: a hash object containing segment's name, leftnode, rightnode
information and an error string.
"""
kinit_admin(master)
lefthost = leftnode.hostname
righthost = rightnode.hostname
segment_name = "%s-to-%s" % (lefthost, righthost)
result = master.run_command(["ipa", "topologysegment-add", "realm",
segment_name,
"--leftnode=%s" % lefthost,
"--rightnode=%s" % righthost], raiseonerr=False)
if result.returncode == 0:
return {'leftnode': lefthost,
'rightnode': righthost,
'name': segment_name}, ""
else:
return {}, result.stderr_text
def destroy_segment(master, segment_name):
"""
Destroys topology segment.
:param master: reference to master object of class Host
:param segment_name: name of the segment to be created
"""
assert isinstance(master, Host), "master should be an instance of Host"
kinit_admin(master)
command = ["ipa",
"topologysegment-del",
"realm",
segment_name]
result = master.run_command(command, raiseonerr=False)
return result.returncode, result.stderr_text
def get_topo(name_or_func):
"""Get a topology function by name

View File

@ -0,0 +1,148 @@
#
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
import re
import time
from ipatests.test_integration.base import IntegrationTest
from ipatests.test_integration import tasks
class TestTopologyOptions(IntegrationTest):
num_replicas = 2
topology = 'star'
rawsegment_re = ('Segment name: (?P<name>.*?)',
'\s+Left node: (?P<lnode>.*?)',
'\s+Right node: (?P<rnode>.*?)',
'\s+Connectivity: (?P<connectivity>\S+)')
segment_re = re.compile("\n".join(rawsegment_re))
noentries_re = re.compile("Number of entries returned (\d+)")
@classmethod
def install(cls, mh):
tasks.install_topo(cls.topology, cls.master,
cls.replicas[:-1],
cls.clients)
def tokenize_topologies(self, command_output):
"""
takes an output of `ipa topologysegment-find` and returns an array of
segment hashes
"""
segments = command_output.split("-----------------")[2]
raw_segments = segments.split('\n\n')
result = []
for i in raw_segments:
matched = self.segment_re.search(i)
if matched:
result.append({'leftnode': matched.group('lnode'),
'rightnode': matched.group('rnode'),
'name': matched.group('name'),
'connectivity': matched.group('connectivity')
}
)
return result
def test_topology_updated_on_replica_install_remove(self):
"""
Install and remove a replica and make sure topology information is
updated on all other replicas
Testcase: http://www.freeipa.org/page/V4/Manage_replication_topology/
Test_plan#Test_case:
_Replication_topology_should_be_saved_in_the_LDAP_tree
"""
tasks.kinit_admin(self.master)
result1 = self.master.run_command(['ipa', 'topologysegment-find',
'realm'])
first_segment_name = "%s-to-%s" % (self.master.hostname,
self.replicas[0].hostname)
output1 = result1.stdout_text
firstsegment = self.tokenize_topologies(output1)[0]
assert(firstsegment['name'] == first_segment_name)
assert(self.noentries_re.search(output1).group(1) == "1")
assert(firstsegment['leftnode'] == self.master.hostname)
assert(firstsegment['rightnode'] == self.replicas[0].hostname)
tasks.install_replica(self.master, self.replicas[1], setup_ca=False,
setup_dns=False)
# We need to make sure topology information is consistent across all
# replicas
result2 = self.master.run_command(['ipa', 'topologysegment-find',
'realm'])
result3 = self.replicas[0].run_command(['ipa', 'topologysegment-find',
'realm'])
result4 = self.replicas[1].run_command(['ipa', 'topologysegment-find',
'realm'])
segments = self.tokenize_topologies(result2.stdout_text)
assert(len(segments) == 2)
assert(result2.stdout_text == result3.stdout_text)
assert(result3.stdout_text == result4.stdout_text)
# Now let's check that uninstalling the replica will update the topology
# info on the rest of replicas.
tasks.uninstall_master(self.replicas[1])
tasks.clean_replication_agreement(self.master, self.replicas[1])
result5 = self.master.run_command(['ipa', 'topologysegment-find',
'realm'])
assert(self.noentries_re.search(result5.stdout_text).group(1) == "1")
def test_add_remove_segment(self):
"""
Make sure a topology segment can be manually created and deleted
with the influence on the real topology
Testcase http://www.freeipa.org/page/V4/Manage_replication_topology/
Test_plan#Test_case:_Basic_CRUD_test
"""
tasks.kinit_admin(self.master)
# Install the second replica
tasks.install_replica(self.master, self.replicas[1], setup_ca=False,
setup_dns=False)
# turn a star into a ring
segment, err = tasks.create_segment(self.master,
self.replicas[0],
self.replicas[1])
assert err == "", err
# Make sure the new segment is shown by `ipa topologysegment-find`
result1 = self.master.run_command(['ipa', 'topologysegment-find',
'realm'])
assert(result1.stdout_text.find(segment['name']) > 0)
# Remove master <-> replica2 segment and make sure that the changes get
# there through replica1
deleteme = "%s-to-%s" % (self.master.hostname,
self.replicas[1].hostname)
returncode, error = tasks.destroy_segment(self.master, deleteme)
assert returncode == 0, error
# make sure replica1 does not have segment that was deleted on master
result3 = self.replicas[0].run_command(['ipa', 'topologysegment-find',
'realm'])
assert(result3.stdout_text.find(deleteme) < 0)
# Create test data on master and make sure it gets all the way down to
# replica2 through replica1
self.master.run_command(['ipa', 'user-add', 'someuser',
'--first', 'test',
'--last', 'user'])
time.sleep(60) # replication requires some time
users_on_replica2 = self.replicas[1].run_command(['ipa',
'user-find'])
assert(users_on_replica2.find('someuser') > 0)
# We end up having a line topology: master <-> replica1 <-> replica2
def test_remove_the_only_connection(self):
"""
Testcase: http://www.freeipa.org/page/V4/Manage_replication_topology/
Test_plan#Test_case:
_Removal_of_a_topology_segment_is_allowed_only_if_there_is_at_least_one_more_segment_connecting_the_given_replica
"""
text = "Removal of Segment disconnects topology"
error1 = "The system should not have let you remove the segment"
error2 = "Wrong error message thrown during segment removal: \"%s\""
replicas = (self.replicas[0].hostname, self.replicas[1].hostname)
returncode, error = tasks.destroy_segment(self.master, "%s-to-%s" % replicas)
assert returncode != 0, error1
assert error.count(text) == 1, error2 % error
newseg, err = tasks.create_segment(self.master,
self.master,
self.replicas[1])
assert err == "", err
returncode, error = tasks.destroy_segment(self.master, "%s-to-%s" % replicas)
assert returncode == 0, error

View File

@ -2,18 +2,23 @@
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
import io
import os
from ipaserver.plugins.ldap2 import ldap2
from ipalib import api
from ipapython import ipautil
from ipapython.dn import DN
import nose
import pytest
class TestTopologyPlugin(object):
"""
Test Topology plugin from the DS point of view
Testcase: http://www.freeipa.org/page/V4/Manage_replication_topology/
Test_plan#Test_case:
_Replication_Topology_is_listed_among_directory_server_plugins
"""
pwfile = os.path.join(api.env.dot_ipa, ".dmpw")
def setup(self):
"""
@ -25,6 +30,8 @@ class TestTopologyPlugin(object):
if self.conn and self.conn.isconnected():
self.conn.disconnect()
@pytest.mark.skipif(ipautil.file_exists(pwfile) is False,
reason="You did not provide a .dmpw file with the DM password")
def test_topologyplugin(self):
pluginattrs = {
u'nsslapd-pluginPath': [u'libtopology'],
@ -56,11 +63,8 @@ class TestTopologyPlugin(object):
('cn', 'plugins'),
('cn', 'config'))
pwfile = os.path.join(api.env.dot_ipa, ".dmpw")
if ipautil.file_exists(pwfile):
with open(pwfile, "r") as f:
dm_password = f.read().rstrip()
else:
raise nose.SkipTest("No directory manager password in %s" % pwfile)
with io.open(pwfile, "r") as f:
dm_password = f.read().rstrip()
self.conn = ldap2(api)
self.conn.connect(bind_dn=DN(('cn', 'directory manager')),
bind_pw=dm_password)