mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Add HBAC plugin and introduce GeneralizedTime parameter type.
This commit is contained in:
parent
dac224c25a
commit
a6eb928f98
@ -34,6 +34,12 @@ objectClass: top
|
||||
objectClass: nsContainer
|
||||
cn: computers
|
||||
|
||||
dn: cn=hbac,$SUFFIX
|
||||
changetype: add
|
||||
objectClass: top
|
||||
objectClass: nsContainer
|
||||
cn: hbac
|
||||
|
||||
dn: cn=etc,$SUFFIX
|
||||
changetype: add
|
||||
objectClass: nsContainer
|
||||
|
@ -870,7 +870,7 @@ from frontend import Command, LocalOrRemote, Application
|
||||
from frontend import Object, Method, Property
|
||||
from crud import Create, Retrieve, Update, Delete, Search
|
||||
from parameters import DefaultFrom, Bool, Flag, Int, Float, Bytes, Str, Password,List
|
||||
from parameters import BytesEnum, StrEnum
|
||||
from parameters import BytesEnum, StrEnum, GeneralizedTime
|
||||
from errors import SkipPluginModule
|
||||
|
||||
# We can't import the python uuid since it includes ctypes which makes
|
||||
|
@ -1201,6 +1201,172 @@ class List(Param):
|
||||
def _validate_scalar(self, value, index=None):
|
||||
return
|
||||
|
||||
|
||||
class GeneralizedTime(Str):
|
||||
"""
|
||||
Generalized time parameter type.
|
||||
|
||||
Accepts values conforming to generalizedTime as defined in RFC 4517
|
||||
section 3.3.13 without time zone information.
|
||||
"""
|
||||
def _check_HHMM(self, t):
|
||||
if len(t) != 4:
|
||||
raise ValueError('HHMM must be exactly 4 characters long')
|
||||
if not t.isnumeric():
|
||||
raise ValueError('HHMM non-numeric')
|
||||
hh = int(t[0:2])
|
||||
if hh < 0 or hh > 23:
|
||||
raise ValueError('HH out of range')
|
||||
mm = int(t[2:4])
|
||||
if mm < 0 or mm > 59:
|
||||
raise ValueError('MM out of range')
|
||||
|
||||
def _check_dotw(self, t):
|
||||
if t.isnumeric():
|
||||
value = int(t)
|
||||
if value < 1 or value > 7:
|
||||
raise ValueError('day of the week out of range')
|
||||
elif t not in ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'):
|
||||
raise ValueError('invalid day of the week')
|
||||
|
||||
def _check_dotm(self, t, month_num=1, year=4):
|
||||
if not t.isnumeric():
|
||||
raise ValueError('day of the month non-numeric')
|
||||
value = int(t)
|
||||
if month_num in (1, 3, 5, 7, 8, 10, 12):
|
||||
if value < 1 or value > 31:
|
||||
raise ValueError('day of the month out of range')
|
||||
elif month_num in (4, 6, 9, 11):
|
||||
if value < 1 or value > 30:
|
||||
raise ValueError('day of the month out of range')
|
||||
elif month_num == 2:
|
||||
if year % 4 == 0 and (year % 100 != 0 or year % 400 == 0):
|
||||
if value < 1 or value > 29:
|
||||
raise ValueError('day of the month out of range')
|
||||
else:
|
||||
if value < 1 or value > 28:
|
||||
raise ValueError('day of the month out of range')
|
||||
|
||||
def _check_wotm(self, t):
|
||||
if not t.isnumeric():
|
||||
raise ValueError('week of the month non-numeric')
|
||||
value = int(t)
|
||||
if value < 1 or value > 4:
|
||||
raise ValueError('week of the month out of range')
|
||||
|
||||
def _check_woty(self, t):
|
||||
if not t.isnumeric():
|
||||
raise ValueError('week of the year non-numeric')
|
||||
value = int(t)
|
||||
if value < 1 or value > 52:
|
||||
raise ValueError('week of the year out of range')
|
||||
|
||||
def _check_month_num(self, t):
|
||||
if not t.isnumeric():
|
||||
raise ValueError('month number non-numeric')
|
||||
value = int(t)
|
||||
if value < 1 or value > 12:
|
||||
raise ValueError('month number out of range')
|
||||
|
||||
def _check_interval(self, t, check_func):
|
||||
intervals = t.split(',')
|
||||
for i in intervals:
|
||||
if not i:
|
||||
raise ValueError('invalid time range')
|
||||
values = i.split('-')
|
||||
if len(values) > 2:
|
||||
raise ValueError('invalid time range')
|
||||
for v in values:
|
||||
check_func(v)
|
||||
if len(values) == 2:
|
||||
if int(v[0]) > int(v[1]):
|
||||
raise ValueError('invalid time range')
|
||||
|
||||
def _check_W_spec(self, ts, index):
|
||||
if ts[index] != 'day':
|
||||
raise ValueError('invalid week specifier')
|
||||
index += 1
|
||||
self._check_interval(ts[index], self._check_dotw)
|
||||
return index
|
||||
|
||||
def _check_M_spec(self, ts, index):
|
||||
if ts[index] == 'week':
|
||||
self._check_interval(ts[index + 1], self._check_wotm)
|
||||
index = self._check_W_spec(ts, index + 2)
|
||||
elif ts[index] == 'day':
|
||||
index += 1
|
||||
self._check_interval(ts[index], self._check_dotm)
|
||||
else:
|
||||
raise ValueError('invalid month specifier')
|
||||
return index
|
||||
|
||||
def _check_Y_spec(self, ts, index):
|
||||
if ts[index] == 'month':
|
||||
index += 1
|
||||
self._check_interval(ts[index], self._check_month_num)
|
||||
month_num = int(ts[index])
|
||||
index = self._check_M_spec(ts, index + 1, month_num)
|
||||
elif ts[index] == 'week':
|
||||
self._check_interval(ts[index + 1], self._check_woty)
|
||||
index = self._check_W_spec(ts, index + 2)
|
||||
elif ts[index] == 'day':
|
||||
index += 1
|
||||
self._check_interval(ts[index], self._check_doty)
|
||||
else:
|
||||
raise ValueError('invalid year specifier')
|
||||
return index
|
||||
|
||||
def _check_generalized(self, t):
|
||||
if len(t) not in (10, 12, 14):
|
||||
raise ValueError('incomplete generalized time')
|
||||
if not t.isnumeric():
|
||||
raise ValueError('generalized time non-numeric')
|
||||
# don't check year value, with time travel and all :)
|
||||
self._check_month_num(t[4:6])
|
||||
year_num = int(t[0:4])
|
||||
month_num = int(t[4:6])
|
||||
self._check_dotm(t[6:8], month_num, year_num)
|
||||
if len(t) >= 12:
|
||||
self._check_HHMM(t[8:12])
|
||||
else:
|
||||
self._check_HHMM('%s00' % t[8:10])
|
||||
if len(t) == 14:
|
||||
s = int(t[12:14])
|
||||
if s < 0 or s > 60:
|
||||
raise ValueError('seconds out of range')
|
||||
|
||||
def _check(self, time):
|
||||
ts = time.split()
|
||||
if ts[0] == 'absolute':
|
||||
self._check_generalized(ts[1])
|
||||
if ts[2] != '~':
|
||||
raise ValueError('invalid time range separator')
|
||||
self._check_generalized(ts[3])
|
||||
if int(ts[1]) >= int(ts[3]):
|
||||
raise ValueError('invalid generalized time range')
|
||||
elif ts[0] == 'periodic':
|
||||
if ts[1] == 'yearly':
|
||||
index = self._check_Y_spec(ts, 2)
|
||||
elif ts[1] == 'monthly':
|
||||
index = self._check_M_spec(ts, 2)
|
||||
elif ts[1] == 'daily':
|
||||
index = 1
|
||||
self._check_interval(ts[index + 1], self._check_HHMM)
|
||||
else:
|
||||
raise ValueError('time neither absolute or periodic')
|
||||
|
||||
def _rule_required(self, _, value):
|
||||
try:
|
||||
self._check(value)
|
||||
except ValueError, e:
|
||||
raise ValidationError(name=self.cli_name, error=e.message)
|
||||
except IndexError:
|
||||
raise ValidationError(
|
||||
name=self.cli_name, errors='incomplete time value'
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def create_param(spec):
|
||||
"""
|
||||
Create an `Str` instance from the shorthand ``spec``.
|
||||
|
260
ipalib/plugins/hbac.py
Normal file
260
ipalib/plugins/hbac.py
Normal file
@ -0,0 +1,260 @@
|
||||
# Authors:
|
||||
# Pavel Zuna <pzuna@redhat.com>
|
||||
#
|
||||
# Copyright (C) 2009 Red Hat
|
||||
# see file 'COPYING' for use and warranty information
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License as
|
||||
# published by the Free Software Foundation; version 2 only
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
"""
|
||||
Host based access control
|
||||
"""
|
||||
|
||||
from ipalib import api, errors
|
||||
from ipalib import GeneralizedTime, Password, Str, StrEnum
|
||||
from ipalib.plugins.baseldap import *
|
||||
|
||||
class hbac(LDAPObject):
|
||||
"""
|
||||
HBAC object.
|
||||
"""
|
||||
container_dn = api.env.container_hbac
|
||||
object_name = 'HBAC rule'
|
||||
object_name_plural = 'HBAC rules'
|
||||
object_class = ['ipaassociation', 'ipahbacrule']
|
||||
default_attributes = [
|
||||
'cn', 'accessruletype', 'ipaenabledflag', 'servicename',
|
||||
'accesstime', 'description',
|
||||
|
||||
]
|
||||
uuid_attribute = 'ipauniqueid'
|
||||
attribute_names = {
|
||||
'cn': 'name',
|
||||
'accessruletype': 'type',
|
||||
'ipaenabledflag': 'status',
|
||||
'servicename': 'service',
|
||||
'ipauniqueid': 'unique id',
|
||||
'memberuser user': 'affected users',
|
||||
'memberuser group': 'affected groups',
|
||||
'memberhost host': 'affected hosts',
|
||||
'memberhost hostgroup': 'affected hostgroups',
|
||||
'sourcehost host': 'affected source hosts',
|
||||
'sourcehost hostgroup': 'affected source hostgroups',
|
||||
}
|
||||
attribute_order = ['cn', 'accessruletype', 'ipaenabledflag', 'servicename']
|
||||
attribute_members = {
|
||||
'memberuser': ['user', 'group'],
|
||||
'memberhost': ['host', 'hostgroup'],
|
||||
'sourcehost': ['host', 'hostgroup'],
|
||||
}
|
||||
|
||||
takes_params = (
|
||||
Str('cn',
|
||||
cli_name='name',
|
||||
doc='rule name',
|
||||
primary_key=True,
|
||||
),
|
||||
StrEnum('accessruletype',
|
||||
cli_name='type',
|
||||
doc='rule type (allow or deny)',
|
||||
values=(u'allow', u'deny'),
|
||||
),
|
||||
Str('servicename?',
|
||||
cli_name='service',
|
||||
doc='name of service the rule applies to (e.g. ssh)',
|
||||
),
|
||||
GeneralizedTime('accesstime?',
|
||||
cli_name='time',
|
||||
doc='access time in generalizedTime format (RFC 4517)',
|
||||
),
|
||||
Str('description?',
|
||||
cli_name='desc',
|
||||
doc='description',
|
||||
),
|
||||
)
|
||||
|
||||
def get_dn(self, *keys, **kwargs):
|
||||
try:
|
||||
(dn, entry_attrs) = self.backend.find_entry_by_attr(
|
||||
self.primary_key.name, keys[-1], self.object_class, [''],
|
||||
self.container_dn
|
||||
)
|
||||
except errors.NotFound:
|
||||
dn = super(hbac, self).get_dn(*keys, **kwargs)
|
||||
return dn
|
||||
|
||||
def get_primary_key_from_dn(self, dn):
|
||||
pkey = self.primary_key.name
|
||||
(dn, entry_attrs) = self.backend.get_entry(dn, [pkey])
|
||||
return entry_attrs.get(pkey, '')
|
||||
|
||||
api.register(hbac)
|
||||
|
||||
|
||||
class hbac_add(LDAPCreate):
|
||||
"""
|
||||
Create new HBAC rule.
|
||||
"""
|
||||
def pre_callback(self, ldap, dn, entry_attrs, *keys, **options):
|
||||
if not dn.startswith('cn='):
|
||||
msg = 'HBAC rule with name "%s" already exists' % keys[-1]
|
||||
raise errors.DuplicateEntry(message=msg)
|
||||
# HBAC rules are enabled by default
|
||||
entry_attrs['ipaenabledflag'] = 'enabled'
|
||||
return ldap.make_dn(
|
||||
entry_attrs, self.obj.uuid_attribute, self.obj.container_dn
|
||||
)
|
||||
|
||||
api.register(hbac_add)
|
||||
|
||||
|
||||
class hbac_del(LDAPDelete):
|
||||
"""
|
||||
Delete HBAC rule.
|
||||
"""
|
||||
|
||||
api.register(hbac_del)
|
||||
|
||||
|
||||
class hbac_mod(LDAPUpdate):
|
||||
"""
|
||||
Modify HBAC rule.
|
||||
"""
|
||||
|
||||
api.register(hbac_mod)
|
||||
|
||||
|
||||
class hbac_find(LDAPSearch):
|
||||
"""
|
||||
Search for HBAC rules.
|
||||
"""
|
||||
|
||||
api.register(hbac_find)
|
||||
|
||||
|
||||
class hbac_show(LDAPRetrieve):
|
||||
"""
|
||||
Dispaly HBAC rule.
|
||||
"""
|
||||
|
||||
api.register(hbac_show)
|
||||
|
||||
|
||||
class hbac_enable(LDAPQuery):
|
||||
"""
|
||||
Enable HBAC rule.
|
||||
"""
|
||||
def execute(self, cn):
|
||||
ldap = self.obj.backend
|
||||
|
||||
dn = self.obj.get_dn(cn)
|
||||
entry_attrs = {'ipaenabledflag': 'enabled'}
|
||||
|
||||
try:
|
||||
ldap.update_entry(dn, entry_attrs)
|
||||
except errors.EmptyModlist:
|
||||
pass
|
||||
|
||||
return True
|
||||
|
||||
def output_for_cli(self, textui, result, cn):
|
||||
textui.print_name(self.name)
|
||||
textui.print_dashed('Enabled HBAC rule "%s".' % cn)
|
||||
|
||||
api.register(hbac_enable)
|
||||
|
||||
|
||||
class hbac_disable(LDAPQuery):
|
||||
"""
|
||||
Disable HBAC rule.
|
||||
"""
|
||||
def execute(self, cn):
|
||||
ldap = self.obj.backend
|
||||
|
||||
dn = self.obj.get_dn(cn)
|
||||
entry_attrs = {'ipaenabledflag': 'disabled'}
|
||||
|
||||
try:
|
||||
ldap.update_entry(dn, entry_attrs)
|
||||
except errors.EmptyModlist:
|
||||
pass
|
||||
|
||||
return True
|
||||
|
||||
def output_for_cli(self, textui, result, cn):
|
||||
textui.print_name(self.name)
|
||||
textui.print_dashed('Disabled HBAC rule "%s".' % cn)
|
||||
|
||||
api.register(hbac_disable)
|
||||
|
||||
|
||||
class hbac_add_user(LDAPAddMember):
|
||||
"""
|
||||
Add users and groups affected by HBAC rule.
|
||||
"""
|
||||
member_attributes = ['memberuser']
|
||||
member_count_out = ('%i object added.', '%i objects added.')
|
||||
|
||||
api.register(hbac_add_user)
|
||||
|
||||
|
||||
class hbac_remove_user(LDAPRemoveMember):
|
||||
"""
|
||||
Remove users and groups affected by HBAC rule.
|
||||
"""
|
||||
member_attributes = ['memberuser']
|
||||
member_count_out = ('%i object removed.', '%i objects removed.')
|
||||
|
||||
api.register(hbac_remove_user)
|
||||
|
||||
|
||||
class hbac_add_host(LDAPAddMember):
|
||||
"""
|
||||
Add hosts and hostgroups affected by HBAC rule.
|
||||
"""
|
||||
member_attributes = ['memberhost']
|
||||
member_count_out = ('%i object added.', '%i objects added.')
|
||||
|
||||
api.register(hbac_add_host)
|
||||
|
||||
|
||||
class hbac_remove_host(LDAPRemoveMember):
|
||||
"""
|
||||
Remove hosts and hostgroups affected by HBAC rule.
|
||||
"""
|
||||
member_attributes = ['memberhost']
|
||||
member_count_out = ('%i object removed.', '%i objects removed.')
|
||||
|
||||
api.register(hbac_remove_host)
|
||||
|
||||
|
||||
class hbac_add_sourcehost(LDAPAddMember):
|
||||
"""
|
||||
Add source hosts and hostgroups affected by HBAC rule.
|
||||
"""
|
||||
member_attributes = ['sourcehost']
|
||||
member_count_out = ('%i object added.', '%i objects added.')
|
||||
|
||||
api.register(hbac_add_sourcehost)
|
||||
|
||||
|
||||
class hbac_remove_sourcehost(LDAPRemoveMember):
|
||||
"""
|
||||
Remove source hosts and hostgroups affected by HBAC rule.
|
||||
"""
|
||||
member_attributes = ['sourcehost']
|
||||
member_count_out = ('%i object removed.', '%i objects removed.')
|
||||
|
||||
api.register(hbac_remove_sourcehost)
|
||||
|
||||
|
305
tests/test_xmlrpc/test_hbac_plugin.py
Normal file
305
tests/test_xmlrpc/test_hbac_plugin.py
Normal file
@ -0,0 +1,305 @@
|
||||
# Authors:
|
||||
# Pavel Zuna <pzuna@redhat.com>
|
||||
#
|
||||
# Copyright (C) 2009 Red Hat
|
||||
# see file 'COPYING' for use and warranty information
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License as
|
||||
# published by the Free Software Foundation; version 2 only
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
"""
|
||||
Test the `ipalib/plugins/hbac.py` module.
|
||||
"""
|
||||
|
||||
from xmlrpc_test import XMLRPC_test, assert_attr_equal
|
||||
from ipalib import api
|
||||
from ipalib import errors
|
||||
|
||||
|
||||
class test_hbac(XMLRPC_test):
|
||||
"""
|
||||
Test the `hbac` plugin.
|
||||
"""
|
||||
rule_name = u'testing_rule1234'
|
||||
rule_type = u'allow'
|
||||
rule_type_fail = u'value not allowed'
|
||||
rule_service = u'ssh'
|
||||
rule_time = u'absolute 20081010000000 ~ 20081015120000'
|
||||
# wrong time, has 30th day in February in first date
|
||||
rule_time_fail = u'absolute 20080230000000 ~ 20081015120000'
|
||||
rule_desc = u'description'
|
||||
rule_desc_mod = u'description modified'
|
||||
|
||||
test_user = u'hbac_test_user'
|
||||
test_group = u'hbac_test_group'
|
||||
test_host = u'hbac._test_netgroup'
|
||||
test_hostgroup = u'hbac_test_hostgroup'
|
||||
test_sourcehost = u'hbac._test_src_host'
|
||||
test_sourcehostgroup = u'hbac_test_src_hostgroup'
|
||||
|
||||
def test_0_hbac_add(self):
|
||||
"""
|
||||
Test adding a new HBAC rule using `xmlrpc.hbac_add`.
|
||||
"""
|
||||
(dn, res) = api.Command['hbac_add'](
|
||||
self.rule_name, accessruletype=self.rule_type,
|
||||
servicename=self.rule_service, accesstime=self.rule_time,
|
||||
description=self.rule_desc
|
||||
)
|
||||
assert res
|
||||
assert_attr_equal(res, 'cn', self.rule_name)
|
||||
assert_attr_equal(res, 'accessruletype', self.rule_type)
|
||||
assert_attr_equal(res, 'servicename', self.rule_service)
|
||||
assert_attr_equal(res, 'ipaenabledflag', 'enabled')
|
||||
assert_attr_equal(res, 'accesstime', self.rule_time)
|
||||
assert_attr_equal(res, 'description', self.rule_desc)
|
||||
|
||||
def test_1_hbac_add(self):
|
||||
"""
|
||||
Test adding an existing HBAC rule using `xmlrpc.hbac_add'.
|
||||
"""
|
||||
try:
|
||||
(dn, res) = api.Command['hbac_add'](
|
||||
self.rule_name, accessruletype=self.rule_type
|
||||
)
|
||||
except errors.DuplicateEntry:
|
||||
pass
|
||||
else:
|
||||
assert False
|
||||
|
||||
def test_2_hbac_show(self):
|
||||
"""
|
||||
Test displaying a HBAC rule using `xmlrpc.hbac_show`.
|
||||
"""
|
||||
(dn, res) = api.Command['hbac_show'](self.rule_name)
|
||||
assert res
|
||||
assert_attr_equal(res, 'cn', self.rule_name)
|
||||
assert_attr_equal(res, 'accessruletype', self.rule_type)
|
||||
assert_attr_equal(res, 'servicename', self.rule_service)
|
||||
assert_attr_equal(res, 'ipaenabledflag', 'enabled')
|
||||
assert_attr_equal(res, 'accesstime', self.rule_time)
|
||||
assert_attr_equal(res, 'description', self.rule_desc)
|
||||
|
||||
def test_3_hbac_mod(self):
|
||||
"""
|
||||
Test modifying a HBAC rule using `xmlrpc.hbac_mod`.
|
||||
"""
|
||||
(dn, res) = api.Command['hbac_mod'](
|
||||
self.rule_name, description=self.rule_desc_mod
|
||||
)
|
||||
assert res
|
||||
assert_attr_equal(res, 'description', self.rule_desc_mod)
|
||||
|
||||
def test_4_hbac_mod(self):
|
||||
"""
|
||||
Test setting invalid type of HBAC rule using `xmlrpc.hbac_mod`.
|
||||
"""
|
||||
try:
|
||||
(dn, res) = api.Command['hbac_mod'](
|
||||
self.rule_name, accessruletype=self.rule_type_fail
|
||||
)
|
||||
except errors.ValidationError:
|
||||
pass
|
||||
else:
|
||||
assert False
|
||||
|
||||
def test_5_hbac_mod(self):
|
||||
"""
|
||||
Test setting invalid time in HBAC rule using `xmlrpc.hbac_mod`.
|
||||
"""
|
||||
try:
|
||||
(dn, res) = api.Command['hbac_mod'](
|
||||
self.rule_name, accesstime=self.rule_time_fail
|
||||
)
|
||||
except errors.ValidationError:
|
||||
pass
|
||||
else:
|
||||
assert False
|
||||
|
||||
def test_6_hbac_find(self):
|
||||
"""
|
||||
Test searching for HBAC rules using `xmlrpc.hbac_find`.
|
||||
"""
|
||||
(res, truncated) = api.Command['hbac_find'](
|
||||
name=self.rule_name, accessruletype=self.rule_type,
|
||||
description=self.rule_desc_mod
|
||||
)
|
||||
assert res
|
||||
assert res[0]
|
||||
assert_attr_equal(res[0][1], 'cn', self.rule_name)
|
||||
assert_attr_equal(res[0][1], 'accessruletype', self.rule_type)
|
||||
assert_attr_equal(res[0][1], 'description', self.rule_desc_mod)
|
||||
|
||||
def test_7_hbac_init_testing_data(self):
|
||||
"""
|
||||
Initialize data for more HBAC plugin testing.
|
||||
"""
|
||||
api.Command['user_add'](self.test_user, givenname=u'first', sn=u'last')
|
||||
api.Command['group_add'](self.test_group, description=u'description')
|
||||
api.Command['host_add'](self.test_host)
|
||||
api.Command['hostgroup_add'](
|
||||
self.test_hostgroup, description=u'description'
|
||||
)
|
||||
api.Command['host_add'](self.test_sourcehost)
|
||||
api.Command['hostgroup_add'](
|
||||
self.test_sourcehostgroup, description=u'desc'
|
||||
)
|
||||
|
||||
def test_8_hbac_add_user(self):
|
||||
"""
|
||||
Test adding user and group to HBAC rule using `xmlrpc.hbac_add_user`.
|
||||
"""
|
||||
(completed, failed, res) = api.Command['hbac_add_user'](
|
||||
self.rule_name, user=self.test_user, group=self.test_group
|
||||
)
|
||||
assert completed == 2
|
||||
assert 'memberuser' in failed
|
||||
assert 'user' in failed['memberuser']
|
||||
assert not failed['memberuser']['user']
|
||||
assert 'group' in failed['memberuser']
|
||||
assert not failed['memberuser']['group']
|
||||
assert res
|
||||
assert_attr_equal(res[1], 'memberuser user', self.test_user)
|
||||
assert_attr_equal(res[1], 'memberuser group', self.test_group)
|
||||
|
||||
def test_9_hbac_remove_user(self):
|
||||
"""
|
||||
Test removing user and group from HBAC rule using `xmlrpc.hbac_remove_user'.
|
||||
"""
|
||||
(completed, failed, res) = api.Command['hbac_remove_user'](
|
||||
self.rule_name, user=self.test_user, group=self.test_group
|
||||
)
|
||||
assert completed == 2
|
||||
assert 'memberuser' in failed
|
||||
assert 'user' in failed['memberuser']
|
||||
assert not failed['memberuser']['user']
|
||||
assert 'group' in failed['memberuser']
|
||||
assert not failed['memberuser']['group']
|
||||
assert res
|
||||
assert 'memberuser user' not in res[1]
|
||||
assert 'memberuser group' not in res[1]
|
||||
|
||||
def test_a_hbac_add_host(self):
|
||||
"""
|
||||
Test adding host and hostgroup to HBAC rule using `xmlrpc.hbac_add_host`.
|
||||
"""
|
||||
(completed, failed, res) = api.Command['hbac_add_host'](
|
||||
self.rule_name, host=self.test_host, hostgroup=self.test_hostgroup
|
||||
)
|
||||
assert completed == 2
|
||||
assert 'memberhost' in failed
|
||||
assert 'host' in failed['memberhost']
|
||||
assert not failed['memberhost']['host']
|
||||
assert 'hostgroup' in failed['memberhost']
|
||||
assert not failed['memberhost']['hostgroup']
|
||||
assert res
|
||||
assert_attr_equal(res[1], 'memberhost host', self.test_host)
|
||||
assert_attr_equal(res[1], 'memberhost hostgroup', self.test_hostgroup)
|
||||
|
||||
def test_b_hbac_remove_host(self):
|
||||
"""
|
||||
Test removing host and hostgroup from HBAC rule using `xmlrpc.hbac_remove_host`.
|
||||
"""
|
||||
(completed, failed, res) = api.Command['hbac_remove_host'](
|
||||
self.rule_name, host=self.test_host, hostgroup=self.test_hostgroup
|
||||
)
|
||||
assert completed == 2
|
||||
assert 'memberhost' in failed
|
||||
assert 'host' in failed['memberhost']
|
||||
assert not failed['memberhost']['host']
|
||||
assert 'hostgroup' in failed['memberhost']
|
||||
assert not failed['memberhost']['hostgroup']
|
||||
assert res
|
||||
assert 'memberhost host' not in res[1]
|
||||
assert 'memberhost hostgroup' not in res[1]
|
||||
|
||||
def test_a_hbac_add_sourcehost(self):
|
||||
"""
|
||||
Test adding source host and hostgroup to HBAC rule using `xmlrpc.hbac_add_host`.
|
||||
"""
|
||||
(completed, failed, res) = api.Command['hbac_add_sourcehost'](
|
||||
self.rule_name, host=self.test_host, hostgroup=self.test_hostgroup
|
||||
)
|
||||
assert completed == 2
|
||||
assert 'sourcehost' in failed
|
||||
assert 'host' in failed['sourcehost']
|
||||
assert not failed['sourcehost']['host']
|
||||
assert 'hostgroup' in failed['sourcehost']
|
||||
assert not failed['sourcehost']['hostgroup']
|
||||
assert res
|
||||
assert_attr_equal(res[1], 'sourcehost host', self.test_host)
|
||||
assert_attr_equal(res[1], 'sourcehost hostgroup', self.test_hostgroup)
|
||||
|
||||
def test_b_hbac_remove_host(self):
|
||||
"""
|
||||
Test removing source host and hostgroup from HBAC rule using `xmlrpc.hbac_remove_host`.
|
||||
"""
|
||||
(completed, failed, res) = api.Command['hbac_remove_sourcehost'](
|
||||
self.rule_name, host=self.test_host, hostgroup=self.test_hostgroup
|
||||
)
|
||||
assert completed == 2
|
||||
assert 'sourcehost' in failed
|
||||
assert 'host' in failed['sourcehost']
|
||||
assert not failed['sourcehost']['host']
|
||||
assert 'hostgroup' in failed['sourcehost']
|
||||
assert not failed['sourcehost']['hostgroup']
|
||||
assert res
|
||||
assert 'sourcehost host' not in res[1]
|
||||
assert 'sourcehost hostgroup' not in res[1]
|
||||
|
||||
def test_c_hbac_clear_testing_data(self):
|
||||
"""
|
||||
Clear data for HBAC plugin testing.
|
||||
"""
|
||||
api.Command['user_del'](self.test_user)
|
||||
api.Command['group_del'](self.test_group)
|
||||
api.Command['host_del'](self.test_host)
|
||||
api.Command['hostgroup_del'](self.test_hostgroup)
|
||||
api.Command['host_del'](self.test_sourcehost)
|
||||
api.Command['hostgroup_del'](self.test_sourcehostgroup)
|
||||
|
||||
def test_d_hbac_disable(self):
|
||||
"""
|
||||
Test disabling HBAC rule using `xmlrpc.hbac_disable`.
|
||||
"""
|
||||
res = api.Command['hbac_disable'](self.rule_name)
|
||||
assert res == True
|
||||
# check it's really disabled
|
||||
(dn, res) = api.Command['hbac_show'](self.rule_name)
|
||||
assert res
|
||||
assert_attr_equal(res, 'ipaenabledflag', 'disabled')
|
||||
|
||||
def test_e_hbac_enabled(self):
|
||||
"""
|
||||
Test enabling HBAC rule using `xmlrpc.hbac_enable`.
|
||||
"""
|
||||
res = api.Command['hbac_enable'](self.rule_name)
|
||||
assert res == True
|
||||
# check it's really enabled
|
||||
(dn, res) = api.Command['hbac_show'](self.rule_name)
|
||||
assert res
|
||||
assert_attr_equal(res, 'ipaenabledflag', 'enabled')
|
||||
|
||||
def test_f_hbac_del(self):
|
||||
"""
|
||||
Test deleting a HBAC rule using `xmlrpc.hbac_remove_sourcehost`.
|
||||
"""
|
||||
res = api.Command['hbac_del'](self.rule_name)
|
||||
assert res == True
|
||||
# verify that it's gone
|
||||
try:
|
||||
api.Command['hbac_show'](self.rule_name)
|
||||
except errors.NotFound:
|
||||
pass
|
||||
else:
|
||||
assert False
|
||||
|
Loading…
Reference in New Issue
Block a user