mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2024-12-25 08:21:05 -06:00
DNS Locations: extend tests with server-* commands
https://fedorahosted.org/freeipa/ticket/2008 Reviewed-By: Petr Spacek <pspacek@redhat.com> Reviewed-By: Jan Cholasta <jcholast@redhat.com>
This commit is contained in:
parent
fd2bd60383
commit
42719acdce
@ -5,12 +5,14 @@ from __future__ import absolute_import
|
||||
|
||||
import pytest
|
||||
|
||||
from ipalib import errors
|
||||
from ipalib import errors, api
|
||||
from ipatests.test_xmlrpc.tracker.location_plugin import LocationTracker
|
||||
from ipatests.test_xmlrpc.tracker.server_plugin import ServerTracker
|
||||
from ipatests.test_xmlrpc.xmlrpc_test import (
|
||||
XMLRPC_test,
|
||||
raises_exact,
|
||||
)
|
||||
from ipapython.dnsutil import DNSName
|
||||
|
||||
|
||||
@pytest.fixture(scope='class', params=[u'location1', u'sk\xfa\u0161ka.idna'])
|
||||
@ -31,6 +33,12 @@ def location_absolute(request):
|
||||
return tracker.make_fixture(request)
|
||||
|
||||
|
||||
@pytest.fixture(scope='class')
|
||||
def server(request):
|
||||
tracker = ServerTracker(api.env.host)
|
||||
return tracker
|
||||
|
||||
|
||||
@pytest.mark.tier1
|
||||
class TestNonexistentIPALocation(XMLRPC_test):
|
||||
def test_retrieve_nonexistent(self, location):
|
||||
@ -75,7 +83,7 @@ class TestInvalidIPALocations(XMLRPC_test):
|
||||
class TestCRUD(XMLRPC_test):
|
||||
def test_create_duplicate(self, location):
|
||||
location.ensure_exists()
|
||||
command = location.make_create_command(force=True)
|
||||
command = location.make_create_command()
|
||||
with raises_exact(errors.DuplicateEntry(
|
||||
message=u'location with name "%s" already exists' %
|
||||
location.idnsname)):
|
||||
@ -111,3 +119,82 @@ class TestCRUD(XMLRPC_test):
|
||||
|
||||
def test_delete_location(self, location):
|
||||
location.delete()
|
||||
|
||||
|
||||
@pytest.mark.tier1
|
||||
class TestLocationsServer(XMLRPC_test):
|
||||
|
||||
def test_add_nonexistent_location_to_server(self, server):
|
||||
nonexistent_loc = DNSName(u'nonexistent-location')
|
||||
command = server.make_update_command(
|
||||
updates=dict(
|
||||
ipalocation_location=nonexistent_loc,
|
||||
)
|
||||
)
|
||||
with raises_exact(errors.NotFound(
|
||||
reason=u"{location}: location not found".format(
|
||||
location=nonexistent_loc
|
||||
))):
|
||||
command()
|
||||
|
||||
def test_add_location_to_server(self, location, server):
|
||||
location.ensure_exists()
|
||||
server.update(
|
||||
dict(ipalocation_location=location.idnsname_obj),
|
||||
expected_updates=dict(
|
||||
ipalocation_location=[location.idnsname_obj],
|
||||
)
|
||||
)
|
||||
location.add_server_to_location(server.server_name)
|
||||
location.retrieve()
|
||||
|
||||
def test_retrieve(self, server):
|
||||
server.retrieve()
|
||||
|
||||
def test_retrieve_all(self, server):
|
||||
server.retrieve(all=True)
|
||||
|
||||
def test_search_server_with_location(self, location, server):
|
||||
command = server.make_find_command(
|
||||
server.server_name, in_location=location.idnsname_obj)
|
||||
result = command()
|
||||
server.check_find(result)
|
||||
|
||||
def test_search_server_with_location_with_all(self, location, server):
|
||||
command = server.make_find_command(
|
||||
server.server_name, in_location=location.idnsname_obj, all=True)
|
||||
result = command()
|
||||
server.check_find(result, all=True)
|
||||
|
||||
def test_search_server_without_location(self, location, server):
|
||||
command = server.make_find_command(
|
||||
server.server_name, not_in_location=location.idnsname_obj)
|
||||
result = command()
|
||||
server.check_find_nomatch(result)
|
||||
|
||||
def test_add_location_to_server_custom_weight(self, location, server):
|
||||
location.ensure_exists()
|
||||
server.update(
|
||||
dict(
|
||||
ipalocation_location=location.idnsname_obj,
|
||||
ipalocationweight=200,
|
||||
),
|
||||
expected_updates=dict(
|
||||
ipalocation_location=[location.idnsname_obj],
|
||||
ipalocationweight=[u'200'],
|
||||
)
|
||||
)
|
||||
# remove invalid data from the previous test
|
||||
location.remove_server_from_location(server.server_name)
|
||||
|
||||
location.add_server_to_location(server.server_name, weight=200)
|
||||
location.retrieve()
|
||||
|
||||
def test_remove_location_from_server(self, location, server):
|
||||
server.update(dict(ipalocation_location=None))
|
||||
location.remove_server_from_location(server.server_name)
|
||||
location.retrieve()
|
||||
|
||||
def test_remove_location_weight_from_server(self, location, server):
|
||||
server.update(dict(ipalocationweight=None))
|
||||
location.retrieve()
|
||||
|
@ -281,6 +281,10 @@ class Tracker(object):
|
||||
result = command()
|
||||
self.attrs.update(updates)
|
||||
self.attrs.update(expected_updates)
|
||||
for key, value in self.attrs.items():
|
||||
if value is None:
|
||||
del self.attrs[key]
|
||||
|
||||
self.check_update(result, extra_keys=set(updates.keys()) |
|
||||
set(expected_updates.keys()))
|
||||
|
||||
|
@ -3,19 +3,25 @@
|
||||
#
|
||||
from __future__ import absolute_import
|
||||
|
||||
import six
|
||||
|
||||
from ipapython.dn import DN
|
||||
from ipapython.dnsutil import DNSName
|
||||
from ipatests.util import assert_deepequal
|
||||
from ipatests.test_xmlrpc.tracker.base import Tracker
|
||||
|
||||
|
||||
if six.PY3:
|
||||
unicode = str
|
||||
|
||||
|
||||
class LocationTracker(Tracker):
|
||||
"""Tracker for IPA Location tests"""
|
||||
retrieve_keys = {'idnsname', 'description', 'dn'}
|
||||
retrieve_keys = {'idnsname', 'description', 'dn', 'servers_server'}
|
||||
retrieve_all_keys = retrieve_keys | {'objectclass'}
|
||||
create_keys = retrieve_keys | {'objectclass'}
|
||||
find_keys = retrieve_keys
|
||||
find_all_keys = retrieve_all_keys
|
||||
create_keys = {'idnsname', 'description', 'dn', 'objectclass'}
|
||||
find_keys = {'idnsname', 'description', 'dn',}
|
||||
find_all_keys = find_keys | {'objectclass'}
|
||||
update_keys = {'idnsname', 'description'}
|
||||
|
||||
def __init__(self, name, description=u"Location description"):
|
||||
@ -34,13 +40,15 @@ class LocationTracker(Tracker):
|
||||
'cn=etc', self.api.env.basedn
|
||||
)
|
||||
|
||||
self.servers = {}
|
||||
|
||||
def make_create_command(self, force=None):
|
||||
"""Make function that creates this location using location-add"""
|
||||
return self.make_command(
|
||||
'location_add', self.idnsname, description=self.description,
|
||||
)
|
||||
|
||||
def make_delete_command(self):
|
||||
def make_delete_command(self, force=None):
|
||||
"""Make function that removes this location using location-del"""
|
||||
return self.make_command('location_del', self.idnsname)
|
||||
|
||||
@ -95,6 +103,7 @@ class LocationTracker(Tracker):
|
||||
value=self.idnsname_obj,
|
||||
summary=None,
|
||||
result=expected,
|
||||
servers=self.servers,
|
||||
), result)
|
||||
|
||||
def check_find(self, result, all=False, raw=False):
|
||||
@ -117,3 +126,26 @@ class LocationTracker(Tracker):
|
||||
summary=u'Modified IPA location "{loc}"'.format(loc=self.idnsname),
|
||||
result=self.filter_attrs(self.update_keys | set(extra_keys))
|
||||
), result)
|
||||
|
||||
def add_server_to_location(
|
||||
self, server_name, weight=100, relative_weight=u"100.0%"):
|
||||
self.attrs.setdefault('servers_server', []).append(server_name)
|
||||
self.servers[server_name] = {
|
||||
'cn': [server_name],
|
||||
'ipalocationweight': [unicode(weight)],
|
||||
'location_relative_weight': [relative_weight]
|
||||
}
|
||||
|
||||
def remove_server_from_location(self, server_name):
|
||||
if 'servers_server' in self.attrs:
|
||||
try:
|
||||
self.attrs['servers_server'].remove(server_name)
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
if not self.attrs['servers_server']:
|
||||
del self.attrs['servers_server']
|
||||
try:
|
||||
del self.servers[server_name]
|
||||
except KeyError:
|
||||
pass
|
||||
|
110
ipatests/test_xmlrpc/tracker/server_plugin.py
Normal file
110
ipatests/test_xmlrpc/tracker/server_plugin.py
Normal file
@ -0,0 +1,110 @@
|
||||
#
|
||||
# Copyright (C) 2016 FreeIPA Contributors see COPYING for license
|
||||
#
|
||||
from __future__ import absolute_import
|
||||
|
||||
from ipapython.dn import DN
|
||||
from ipatests.util import assert_deepequal
|
||||
from ipatests.test_xmlrpc.tracker.base import Tracker
|
||||
|
||||
|
||||
class ServerTracker(Tracker):
|
||||
"""Tracker for IPA Location tests"""
|
||||
retrieve_keys = {
|
||||
'cn', 'dn', 'ipamaxdomainlevel', 'ipamindomainlevel',
|
||||
'iparepltopomanagedsuffix_topologysuffix', 'ipalocation_location',
|
||||
'ipalocationweight',
|
||||
}
|
||||
retrieve_all_keys = retrieve_keys | {'objectclass'}
|
||||
create_keys = retrieve_keys | {'objectclass'}
|
||||
find_keys = {
|
||||
'cn', 'dn', 'ipamaxdomainlevel', 'ipamindomainlevel',
|
||||
'ipalocationweight',
|
||||
}
|
||||
find_all_keys = retrieve_all_keys
|
||||
update_keys = {
|
||||
'cn', 'ipamaxdomainlevel', 'ipamindomainlevel',
|
||||
'ipalocation_location', 'ipalocationweight',
|
||||
}
|
||||
|
||||
def __init__(self, name):
|
||||
super(ServerTracker, self).__init__(default_version=None)
|
||||
self.server_name = name
|
||||
self.dn = DN(
|
||||
('cn', self.server_name),
|
||||
'cn=masters,cn=ipa,cn=etc',
|
||||
self.api.env.basedn
|
||||
)
|
||||
self.exists = True # we cannot add server manually using server-add
|
||||
self.attrs = dict(
|
||||
dn=self.dn,
|
||||
cn=[self.server_name],
|
||||
iparepltopomanagedsuffix_topologysuffix=[u'domain', u'ca'],
|
||||
objectclass=[
|
||||
u"ipalocationmember",
|
||||
u"ipaReplTopoManagedServer",
|
||||
u"top",
|
||||
u"ipaConfigObject",
|
||||
u"nsContainer",
|
||||
u"ipaSupportedDomainLevelConfig"
|
||||
],
|
||||
ipamaxdomainlevel=[u"1"],
|
||||
ipamindomainlevel=[u"0"],
|
||||
)
|
||||
self.exists = True
|
||||
|
||||
def make_retrieve_command(self, all=False, raw=False):
|
||||
"""Make function that retrieves this server using server-show"""
|
||||
return self.make_command(
|
||||
'server_show', self.name, all=all, raw=raw
|
||||
)
|
||||
|
||||
def make_find_command(self, *args, **kwargs):
|
||||
"""Make function that finds servers using server-find"""
|
||||
return self.make_command('server_find', *args, **kwargs)
|
||||
|
||||
def make_update_command(self, updates):
|
||||
"""Make function that modifies the server using server-mod"""
|
||||
return self.make_command('server_mod', self.name, **updates)
|
||||
|
||||
def check_retrieve(self, result, all=False, raw=False):
|
||||
"""Check `server-show` command result"""
|
||||
if all:
|
||||
expected = self.filter_attrs(self.retrieve_all_keys)
|
||||
else:
|
||||
expected = self.filter_attrs(self.retrieve_keys)
|
||||
assert_deepequal(dict(
|
||||
value=self.server_name,
|
||||
summary=None,
|
||||
result=expected,
|
||||
), result)
|
||||
|
||||
def check_find(self, result, all=False, raw=False):
|
||||
"""Check `server-find` command result"""
|
||||
if all:
|
||||
expected = self.filter_attrs(self.find_all_keys)
|
||||
else:
|
||||
expected = self.filter_attrs(self.find_keys)
|
||||
assert_deepequal(dict(
|
||||
count=1,
|
||||
truncated=False,
|
||||
summary=u'1 IPA server matched',
|
||||
result=[expected],
|
||||
), result)
|
||||
|
||||
def check_find_nomatch(self, result):
|
||||
""" Check 'server-find' command result when no match is expected """
|
||||
assert_deepequal(dict(
|
||||
count=0,
|
||||
truncated=False,
|
||||
summary=u'0 IPA servers matched',
|
||||
result=[],
|
||||
), result)
|
||||
|
||||
def check_update(self, result, extra_keys=()):
|
||||
"""Check `server-update` command result"""
|
||||
assert_deepequal(dict(
|
||||
value=self.server_name,
|
||||
summary=u'Modified IPA server "{server}"'.format(server=self.name),
|
||||
result=self.filter_attrs(self.update_keys | set(extra_keys))
|
||||
), result)
|
Loading…
Reference in New Issue
Block a user