Replace old pwpolicy plugin with new one using baseldap, fix tests.

Fix deletion of policy when a group is removed.
This commit is contained in:
Rob Crittenden 2010-05-14 15:58:34 -04:00
parent 58fed69768
commit 542768bec7
5 changed files with 267 additions and 932 deletions

View File

@ -129,7 +129,7 @@ class group_del(LDAPDelete):
def post_callback(self, ldap, dn, *keys, **options):
try:
api.Command['pwpolicy_del'](group=keys[-1])
api.Command['pwpolicy_del'](keys[-1])
except errors.NotFound:
pass

View File

@ -1,8 +1,7 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
# Pavel Zuna <pzuna@redhat.com>
#
# Copyright (C) 2008 Red Hat
# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
@ -17,472 +16,344 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Password policy
"""
from ipalib import api, crud, errors
from ipalib import Method, Object
from ipalib import api
from ipalib import Int, Str
from ipalib import output
from ipalib import _, ngettext
from ldap.functions import explode_dn
from ipalib.plugins.baseldap import *
from ipalib import _
_fields = {
'group': 'Group policy',
'krbminpwdlife': 'Minimum lifetime (in hours)',
'krbmaxpwdlife': 'Maximum lifetime (in days)',
'krbpwdmindiffchars': 'Minimum number of characters classes',
'krbpwdminlength': 'Minimum length',
'krbpwdhistorylength': 'History size',
}
_global=u'global'
def _convert_time_for_output(entry_attrs):
# Convert seconds to hours and days for displaying to user
if 'krbmaxpwdlife' in entry_attrs:
entry_attrs['krbmaxpwdlife'][0] = unicode(
int(entry_attrs['krbmaxpwdlife'][0]) / 86400
)
if 'krbminpwdlife' in entry_attrs:
entry_attrs['krbminpwdlife'][0] = unicode(
int(entry_attrs['krbminpwdlife'][0]) / 3600
)
def _convert_time_on_input(entry_attrs):
# Convert hours and days to seconds for writing to LDAP
if 'krbmaxpwdlife' in entry_attrs and entry_attrs['krbmaxpwdlife']:
entry_attrs['krbmaxpwdlife'] = entry_attrs['krbmaxpwdlife'] * 86400
if 'krbminpwdlife' in entry_attrs and entry_attrs['krbminpwdlife']:
entry_attrs['krbminpwdlife'] = entry_attrs['krbminpwdlife'] * 3600
def find_group_dn(group):
class cosentry(LDAPObject):
"""
Given a group name find the DN of that group
Class of Service object used for linking policies with groups
"""
try:
entry = api.Command['group_show'](group)['result']
except errors.NotFound:
raise errors.NotFound(reason="group '%s' does not exist" % group)
return entry['dn']
INTERNAL = True
def make_cos_entry(group, cospriority=None):
"""
Make the CoS dn and entry for this group.
container_dn = 'cn=costemplates,%s' % api.env.container_accounts
object_class = ['top', 'costemplate', 'extensibleobject', 'krbcontainer']
default_attributes = ['cn', 'cospriority', 'krbpwdpolicyreference']
Returns (cos_dn, cos_entry) where:
cos_dn = DN of the new CoS entry
cos_entry = entry representing this new object
"""
ldap = api.Backend.ldap2
groupdn = find_group_dn(group)
cos_entry = {}
if cospriority:
cos_entry['cospriority'] = cospriority
cos_entry['objectclass'] = ['top', 'costemplate', 'extensibleobject', 'krbcontainer']
cos_dn = ldap.make_dn_from_attr(
'cn', groupdn, 'cn=cosTemplates,%s' % api.env.container_accounts
takes_params = (
Str('cn', primary_key=True),
Str('krbpwdpolicyreference'),
Int('cospriority', minvalue=0),
)
return (cos_dn, cos_entry)
def make_policy_entry(group_cn, policy_entry):
"""
Make the krbpwdpolicy dn and entry for this group.
Returns (policy_dn, policy_entry) where:
policy_dn = DN of the new password policy entry
policy_entry = entry representing this new object
"""
ldap = api.Backend.ldap2
# This DN must *NOT* have spaces between elements
policy_dn = ldap.make_dn_from_attr(
'cn', api.env.realm, 'cn=kerberos,%s' % api.env.basedn
priority_not_unique_msg = _(
'priority must be a unique value (%(prio)d already used by %(gname)s)'
)
policy_dn = ldap.make_dn_from_attr('cn', group_cn, policy_dn)
# Create the krb password policy entry. This MUST be located
# in the same container as the REALM or the kldap plugin won't
# recognize it. The usual CoS trick of putting the whole DN into
# the dn won't work either because the kldap plugin doesn't like
# quotes in the DN.
policy_entry['objectclass'] = ['top', 'nscontainer', 'krbpwdpolicy']
policy_entry['cn'] = group_cn
return (policy_dn, policy_entry)
def find_group_policy(ldap):
"""
Return all group policy entries.
"""
attrs = ('cn','krbminpwdlife', 'krbmaxpwdlife', 'krbpwdmindiffchars', 'krbpwdminlength', 'krbpwdhistorylength',)
attr_filter = ldap.make_filter({'objectclass':'krbpwdpolicy'}, rules=ldap.MATCH_ALL)
try:
(entries, truncated) = ldap.find_entries(
attr_filter, attrs, 'cn=%s,cn=kerberos,%s' % (api.env.realm, api.env.basedn), scope=ldap.SCOPE_ONELEVEL
def get_dn(self, *keys, **options):
group_dn = self.api.Object.group.get_dn(keys[-1])
return self.backend.make_dn_from_attr(
'cn', group_dn, self.container_dn
)
except errors.NotFound:
(entries, truncated) = (tuple(), False)
return (entries, truncated)
def check_priority_uniqueness(self, *keys, **options):
if options.get('cospriority') is not None:
entries = self.methods.find(
cospriority=options['cospriority']
)['result']
if len(entries) > 0:
group_name = self.api.Object.group.get_primary_key_from_dn(
entries[0]['cn'][0]
)
raise errors.ValidationError(
name='priority',
error=self.priority_not_unique_msg % {
'prio': options['cospriority'],
'gname': group_name,
}
)
def unique_priority(ldap, priority):
api.register(cosentry)
class cosentry_add(LDAPCreate):
INTERNAL = True
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
# check for existence of the group
self.api.Command.group_show(keys[-1])
self.obj.check_priority_uniqueness(*keys, **options)
del entry_attrs['cn']
return dn
api.register(cosentry_add)
class cosentry_del(LDAPDelete):
INTERNAL = True
api.register(cosentry_del)
class cosentry_mod(LDAPUpdate):
INTERNAL = True
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
self.obj.check_priority_uniqueness(*keys, **options)
return dn
api.register(cosentry_mod)
class cosentry_show(LDAPRetrieve):
INTERNAL = True
api.register(cosentry_show)
class cosentry_find(LDAPSearch):
INTERNAL = True
api.register(cosentry_find)
GLOBAL_POLICY_NAME = u'GLOBAL'
class pwpolicy(LDAPObject):
"""
Return True if the given priority is unique, False otherwise
Having two cosPriority with the same value is undefined in the DS.
This isn't done as a validation on the attribute since we want it done
only on the server side.
"""
attrs = ('cospriority',)
attr_filter = ldap.make_filter({'objectclass':'krbcontainer', 'cospriority':priority }, rules=ldap.MATCH_ALL)
try:
(entries, truncated) = ldap.find_entries(
attr_filter, attrs, 'cn=cosTemplates,%s' % api.env.container_accounts, scope=ldap.SCOPE_ONELEVEL
)
return False
except errors.NotFound:
return True
return True
class pwpolicy(Object):
"""
Password Policy object.
Password Policy object
"""
container_dn = 'cn=%s,cn=kerberos' % api.env.realm
object_name = 'password policy'
object_name_plural = 'password policies'
object_class = ['top', 'nscontainer', 'krbpwdpolicy']
default_attributes = [
'cn', 'cospriority', 'krbmaxpwdlife', 'krbminpwdlife',
'krbpwdhistorylength', 'krbpwdmindiffchars', 'krbpwdminlength',
]
takes_params = (
Str('cn?',
cli_name='group',
label=_('Group'),
flags=['no_create', 'no_update', 'no_search'],
doc=_('Manage password policy for specific group'),
primary_key=True,
),
Int('krbmaxpwdlife?',
cli_name='maxlife',
label=_('Max lifetime (days)'),
doc=_('Maximum password lifetime (in days)'),
minvalue=0,
attribute=True,
),
Int('krbminpwdlife?',
cli_name='minlife',
label=_('Min lifetime (hours)'),
doc=_('Minimum password lifetime (in hours)'),
minvalue=0,
attribute=True,
),
Int('krbpwdhistorylength?',
cli_name='history',
label=_('History size'),
doc=_('Password history size'),
minvalue=0,
attribute=True,
),
Int('krbpwdmindiffchars?',
cli_name='minclasses',
label=_('Character classes'),
doc=_('Minimum number of character classes'),
minvalue=0,
attribute=True,
maxvalue=5,
),
Int('krbpwdminlength?',
cli_name='minlength',
label=_('Min length'),
doc=_('Minimum length of password'),
minvalue=0,
attribute=True,
),
)
api.register(pwpolicy)
class pwpolicy_add(crud.Create):
"""
Create a new password policy associated with a group.
"""
msg_summary = _('Added policy for group "%(value)s"')
takes_options = (
Str('group',
label=_('Group'),
doc=_('Group to set policy for'),
attribute=False,
),
Int('cospriority',
cli_name='priority',
label=_('Priority'),
doc=_('Priority of the policy (higher number equals lower priority)'),
doc=_('Priority of the policy (higher number means lower priority'),
minvalue=0,
attribute=True,
),
)
def execute(self, *args, **options):
ldap = self.api.Backend.ldap2
def get_dn(self, *keys, **options):
if keys[-1] is not None:
return self.backend.make_dn_from_attr(
self.primary_key.name, keys[-1], self.container_dn
)
return self.api.env.container_accounts
group_cn = options['group']
def convert_time_for_output(self, entry_attrs, **options):
# Convert seconds to hours and days for displaying to user
if not options.get('raw', False):
if 'krbmaxpwdlife' in entry_attrs:
entry_attrs['krbmaxpwdlife'][0] = unicode(
int(entry_attrs['krbmaxpwdlife'][0]) / 86400
)
if 'krbminpwdlife' in entry_attrs:
entry_attrs['krbminpwdlife'][0] = unicode(
int(entry_attrs['krbminpwdlife'][0]) / 3600
)
if 'cospriority' in options:
if not unique_priority(ldap, options['cospriority']):
raise errors.ValidationError(name='priority', error=_('Priority must be a unique value.'))
def convert_time_on_input(self, entry_attrs):
# Convert hours and days to seconds for writing to LDAP
if 'krbmaxpwdlife' in entry_attrs and entry_attrs['krbmaxpwdlife']:
entry_attrs['krbmaxpwdlife'] = entry_attrs['krbmaxpwdlife'] * 86400
if 'krbminpwdlife' in entry_attrs and entry_attrs['krbminpwdlife']:
entry_attrs['krbminpwdlife'] = entry_attrs['krbminpwdlife'] * 3600
# Create the CoS template
(cos_dn, cos_entry) = make_cos_entry(group_cn, options.get('cospriority', None))
if 'cospriority' in options:
del options['cospriority']
api.register(pwpolicy)
# Create the new password policy
policy_entry = self.args_options_2_entry(*args, **options)
(policy_dn, policy_entry) = make_policy_entry(group_cn, policy_entry)
_convert_time_on_input(policy_entry)
# Link the two entries together
cos_entry['krbpwdpolicyreference'] = policy_dn
class pwpolicy_add(LDAPCreate):
"""
Create new group password policy.
"""
def get_args(self):
yield self.obj.primary_key.clone(attribute=True, required=True)
ldap.add_entry(policy_dn, policy_entry)
ldap.add_entry(cos_dn, cos_entry)
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
self.api.Command.cosentry_add(
keys[-1], krbpwdpolicyreference=dn,
cospriority=options.get('cospriority')
)
if 'cospriority' in entry_attrs:
del entry_attrs['cospriority']
self.obj.convert_time_on_input(entry_attrs)
return dn
# The policy is what is interesting, return that
(dn, entry_attrs) = ldap.get_entry(policy_dn, policy_entry.keys())
_convert_time_for_output(entry_attrs)
entry_attrs['dn'] = dn
return dict(result=entry_attrs, value=group_cn)
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
self.log.info('%r' % entry_attrs)
if not options.get('raw', False):
if options.get('cospriority') is not None:
entry_attrs['cospriority'] = [unicode(options['cospriority'])]
self.obj.convert_time_for_output(entry_attrs, **options)
return dn
api.register(pwpolicy_add)
class pwpolicy_mod(crud.Update):
class pwpolicy_del(LDAPDelete):
"""
Modify password policy.
Delete group password policy.
"""
msg_summary = _('Modified policy for group "%(value)s"')
takes_options = (
Str('group?',
label=_('Group'),
doc=_('Group to set policy for'),
attribute=False,
),
Int('cospriority?',
cli_name='priority',
label=_('Priority'),
doc=_('Priority of the policy (higher number equals lower priority)'),
minvalue=0,
attribute=True,
),
)
def get_args(self):
yield self.obj.primary_key.clone(attribute=True, required=True)
def execute(self, *args, **options):
assert 'dn' not in options
ldap = self.api.Backend.ldap2
cospriority = None
if 'group' in options:
group_cn = options['group']
del options['group']
else:
group_cn = None
if len(options) == 2: # 'all' and 'raw' are always sent
raise errors.EmptyModlist()
if not group_cn:
group_cn = _global
if 'cospriority' in options:
raise errors.ValidationError(name='priority', error=_('priority cannot be set on global policy'))
dn = self.api.env.container_accounts
entry_attrs = self.args_options_2_entry(*args, **options)
else:
if 'cospriority' in options:
if options['cospriority'] is None:
raise errors.RequirementError(name='priority')
if not unique_priority(ldap, options['cospriority']):
raise errors.ValidationError(name='priority', error=_('Priority must be a unique value.'))
groupdn = find_group_dn(group_cn)
cos_dn = ldap.make_dn_from_attr(
'cn', groupdn,
'cn=cosTemplates,%s' % self.api.env.container_accounts
)
ldap.update_entry(cos_dn, dict(cospriority = options['cospriority']))
cospriority = options['cospriority']
del options['cospriority']
entry_attrs = self.args_options_2_entry(*args, **options)
(dn, entry_attrs) = make_policy_entry(group_cn, entry_attrs)
_convert_time_on_input(entry_attrs)
def post_callback(self, ldap, dn, *keys, **options):
try:
ldap.update_entry(dn, entry_attrs)
except errors.EmptyModlist:
pass
(dn, entry_attrs) = ldap.get_entry(dn, entry_attrs.keys())
if cospriority:
entry_attrs['cospriority'] = cospriority
_convert_time_for_output(entry_attrs)
return dict(result=entry_attrs, value=group_cn)
api.register(pwpolicy_mod)
class pwpolicy_del(crud.Delete):
"""
Delete a group password policy.
"""
msg_summary = _('Deleted policy for group "%(value)s"')
takes_options = (
Str('group',
doc=_('Group to remove policy from'),
),
)
def execute(self, *args, **options):
assert 'dn' not in options
ldap = self.api.Backend.ldap2
group_cn = options['group']
# Get the DN of the CoS template to delete
try:
(cos_dn, cos_entry) = make_cos_entry(group_cn, None)
self.api.Command.cosentry_del(keys[-1])
except errors.NotFound:
# Ok, perhaps the group was deleted, try to make the group DN
rdn = ldap.make_rdn_from_attr('cn', group_cn)
group_dn = ldap.make_dn_from_rdn(rdn, api.env.container_group)
cos_dn = ldap.make_dn_from_attr(
'cn', group_dn,
'cn=cosTemplates,%s' % self.api.env.container_accounts
)
policy_entry = self.args_options_2_entry(*args, **options)
(policy_dn, policy_entry) = make_policy_entry(group_cn, policy_entry)
ldap.delete_entry(policy_dn)
ldap.delete_entry(cos_dn)
return dict(
result=True,
value=group_cn,
)
pass
return True
api.register(pwpolicy_del)
class pwpolicy_show(Method):
class pwpolicy_mod(LDAPUpdate):
"""
Display password policy.
Modify group password policy.
"""
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
if options.get('cospriority') is not None:
if keys[-1] is None:
raise errors.ValidationError(
name='priority',
error=_('priority cannot be set on global policy')
)
try:
self.api.Command.cosentry_mod(
keys[-1], cospriority=options['cospriority']
)
except errors.NotFound:
self.api.Command.cosentry_add(
keys[-1], krbpwdpolicyreference=dn,
cospriority=options['cospriority']
)
del entry_attrs['cospriority']
self.obj.convert_time_on_input(entry_attrs)
return dn
has_output = (
output.Entry('result'),
)
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
if not options.get('raw', False):
if options.get('cospriority') is not None:
entry_attrs['cospriority'] = [unicode(options['cospriority'])]
if keys[-1] is None:
entry_attrs['cn'] = GLOBAL_POLICY_NAME
self.obj.convert_time_for_output(entry_attrs, **options)
return dn
def exc_callback(self, keys, options, exc, call_func, *call_args, **call_kwargs):
if isinstance(exc, errors.EmptyModlist):
entry_attrs = call_args[1]
if not entry_attrs and 'cospriority' in options:
return
raise exc
api.register(pwpolicy_mod)
class pwpolicy_show(LDAPRetrieve):
"""
Display group password policy.
"""
takes_options = (
Str('group?',
label=_('Group'),
doc=_('Group to display policy'),
),
Str('user?',
label=_('User'),
doc=_('Display policy applied to a given user'),
),
Int('cospriority?',
cli_name='priority',
label=_('Priority'),
flags=['no_create', 'no_update', 'no_search'],
doc=_('Display effective policy for a specific user'),
),
)
def execute(self, *args, **options):
ldap = self.api.Backend.ldap2
def pre_callback(self, ldap, dn, attrs_list, *keys, **options):
if options.get('user') is not None:
user_entry = self.api.Command.user_show(
options['user'], all=True
)['result']
if 'krbpwdpolicyreference' in user_entry:
return user_entry.get('krbpwdpolicyreference', [dn])[0]
return dn
dn = None
group = None
if 'user' in options:
rdn = ldap.make_rdn_from_attr('uid', options['user'])
user_dn = ldap.make_dn_from_rdn(rdn, api.env.container_user)
try:
(user_dn, user_attrs) = ldap.get_entry(user_dn, ['krbpwdpolicyreference'])
if 'krbpwdpolicyreference' in user_attrs:
dn = user_attrs['krbpwdpolicyreference'][0]
rdn = explode_dn(dn)
group = rdn[0].replace('cn=','')
except errors.NotFound:
raise errors.NotFound(reason="user '%s' not found" % options['user'])
if dn is None:
if not 'group' in options:
dn = self.api.env.container_accounts
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
if not options.get('raw', False):
if keys[-1] is not None:
try:
cos_entry = self.api.Command.cosentry_show(
keys[-1]
)['result']
if cos_entry.get('cospriority') is not None:
entry_attrs['cospriority'] = cos_entry['cospriority']
except errors.NotFound:
pass
else:
policy_entry = self.args_options_2_entry(*args, **options)
(dn, policy_entry) = make_policy_entry(options['group'], policy_entry)
(dn, entry_attrs) = ldap.get_entry(dn)
if 'group' in options:
groupdn = find_group_dn(options['group'])
cos_dn = ldap.make_dn_from_attr(
'cn', groupdn,
'cn=cosTemplates,%s' % self.api.env.container_accounts
)
(dn, cos_attrs) = ldap.get_entry(cos_dn)
entry_attrs['cospriority'] = cos_attrs['cospriority']
else:
entry_attrs['cn'] = _global
if 'user' in options:
if group:
entry_attrs['cn'] = unicode(group)
_convert_time_for_output(entry_attrs)
return dict(result=entry_attrs)
entry_attrs['cn'] = GLOBAL_POLICY_NAME
self.obj.convert_time_for_output(entry_attrs, **options)
return dn
api.register(pwpolicy_show)
class pwpolicy_find(Method):
class pwpolicy_find(LDAPSearch):
"""
Display all groups with a password policy.
Search for group password policies.
"""
has_output = output.standard_list_of_entries
takes_options = (
Int('cospriority?',
cli_name='priority',
label=_('Priority'),
flags=['no_create', 'no_update', 'no_search'],
),
)
def execute(self, *args, **options):
ldap = self.api.Backend.ldap2
(entries, truncated) = find_group_policy(ldap)
for e in entries:
_convert_time_for_output(e[1])
e[1]['dn'] = e[0]
groupdn = find_group_dn(e[1]['cn'][0])
cos_dn = ldap.make_dn_from_attr(
'cn', groupdn,
'cn=cosTemplates,%s' % self.api.env.container_accounts
)
(dn, cos_attrs) = ldap.get_entry(cos_dn)
e[1]['cospriority'] = cos_attrs['cospriority']
entries = tuple(e for (dn, e) in entries)
return dict(result=entries,
count=len(entries),
truncated=truncated,
)
def post_callback(self, ldap, entries, truncated, *args, **options):
if not options.get('raw', False):
for e in entries:
try:
cos_entry = self.api.Command.cosentry_show(
e[1]['cn'][0]
)['result']
if cos_entry.get('cospriority') is not None:
e[1]['cospriority'] = cos_entry['cospriority']
except errors.NotFound:
pass
self.obj.convert_time_for_output(e[1], **options)
if not args[-1]:
global_entry = self.api.Command.pwpolicy_show(
all=options.get('all', False), raw=options.get('raw', False)
)['result']
dn = global_entry['dn']
del global_entry['dn']
entries.insert(0, (dn, global_entry))
api.register(pwpolicy_find)

View File

@ -1,359 +0,0 @@
# Authors:
# Pavel Zuna <pzuna@redhat.com>
#
# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Password policy
"""
from ipalib import api
from ipalib import Int, Str
from ipalib.plugins.baseldap import *
from ipalib import _
class cosentry(LDAPObject):
"""
Class of Service object used for linking policies with groups
"""
INTERNAL = True
container_dn = 'cn=costemplates,%s' % api.env.container_accounts
object_class = ['top', 'costemplate', 'extensibleobject', 'krbcontainer']
default_attributes = ['cn', 'cospriority', 'krbpwdpolicyreference']
takes_params = (
Str('cn', primary_key=True),
Str('krbpwdpolicyreference'),
Int('cospriority', minvalue=0),
)
priority_not_unique_msg = _(
'priority must be a unique value (%(prio)d already used by %(gname)s)'
)
def get_dn(self, *keys, **options):
group_dn = self.api.Object.group.get_dn(keys[-1])
return self.backend.make_dn_from_attr(
'cn', group_dn, self.container_dn
)
def check_priority_uniqueness(self, *keys, **options):
if options.get('cospriority') is not None:
entries = self.methods.find(
cospriority=options['cospriority']
)['result']
if len(entries) > 0:
group_name = self.api.Object.group.get_primary_key_from_dn(
entries[0]['cn'][0]
)
raise errors.ValidationError(
name='priority',
error=self.priority_not_unique_msg % {
'prio': options['cospriority'],
'gname': group_name,
}
)
api.register(cosentry)
class cosentry_add(LDAPCreate):
INTERNAL = True
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
# check for existence of the group
self.api.Command.group_show(keys[-1])
self.obj.check_priority_uniqueness(*keys, **options)
del entry_attrs['cn']
return dn
api.register(cosentry_add)
class cosentry_del(LDAPDelete):
INTERNAL = True
api.register(cosentry_del)
class cosentry_mod(LDAPUpdate):
INTERNAL = True
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
self.obj.check_priority_uniqueness(*keys, **options)
return dn
api.register(cosentry_mod)
class cosentry_show(LDAPRetrieve):
INTERNAL = True
api.register(cosentry_show)
class cosentry_find(LDAPSearch):
INTERNAL = True
api.register(cosentry_find)
GLOBAL_POLICY_NAME = u'GLOBAL'
class pwpolicy2(LDAPObject):
"""
Password Policy object
"""
container_dn = 'cn=%s,cn=kerberos' % api.env.realm
object_name = 'password policy'
object_name_plural = 'password policies'
object_class = ['top', 'nscontainer', 'krbpwdpolicy']
default_attributes = [
'cn', 'cospriority', 'krbmaxpwdlife', 'krbminpwdlife',
'krbpwdhistorylength', 'krbpwdmindiffchars', 'krbpwdminlength',
]
takes_params = (
Str('cn?',
cli_name='group',
label=_('Group'),
doc=_('Manage password policy for specific group'),
primary_key=True,
),
Int('krbmaxpwdlife?',
cli_name='maxlife',
label=_('Max lifetime (days)'),
doc=_('Maximum password lifetime (in days)'),
minvalue=0,
),
Int('krbminpwdlife?',
cli_name='minlife',
label=_('Min lifetime (hours)'),
doc=_('Minimum password lifetime (in hours)'),
minvalue=0,
),
Int('krbpwdhistorylength?',
cli_name='history',
label=_('History size'),
doc=_('Password history size'),
minvalue=0,
),
Int('krbpwdmindiffchars?',
cli_name='minclasses',
label=_('Character classes'),
doc=_('Minimum number of character classes'),
minvalue=0,
maxvalue=5,
),
Int('krbpwdminlength?',
cli_name='minlength',
label=_('Min length'),
doc=_('Minimum length of password'),
minvalue=0,
),
Int('cospriority',
cli_name='priority',
label=_('Priority'),
doc=_('Priority of the policy (higher number means lower priority'),
minvalue=0,
),
)
def get_dn(self, *keys, **options):
if keys[-1] is not None:
return self.backend.make_dn_from_attr(
self.primary_key.name, keys[-1], self.container_dn
)
return self.api.env.container_accounts
def convert_time_for_output(self, entry_attrs, **options):
# Convert seconds to hours and days for displaying to user
if not options.get('raw', False):
if 'krbmaxpwdlife' in entry_attrs:
entry_attrs['krbmaxpwdlife'][0] = unicode(
int(entry_attrs['krbmaxpwdlife'][0]) / 86400
)
if 'krbminpwdlife' in entry_attrs:
entry_attrs['krbminpwdlife'][0] = unicode(
int(entry_attrs['krbminpwdlife'][0]) / 3600
)
def convert_time_on_input(self, entry_attrs):
# Convert hours and days to seconds for writing to LDAP
if 'krbmaxpwdlife' in entry_attrs and entry_attrs['krbmaxpwdlife']:
entry_attrs['krbmaxpwdlife'] = entry_attrs['krbmaxpwdlife'] * 86400
if 'krbminpwdlife' in entry_attrs and entry_attrs['krbminpwdlife']:
entry_attrs['krbminpwdlife'] = entry_attrs['krbminpwdlife'] * 3600
api.register(pwpolicy2)
class pwpolicy2_add(LDAPCreate):
"""
Create new group password policy.
"""
def get_args(self):
yield self.obj.primary_key.clone(attribute=True, required=True)
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
self.api.Command.cosentry_add(
keys[-1], krbpwdpolicyreference=dn,
cospriority=options.get('cospriority')
)
if 'cospriority' in entry_attrs:
del entry_attrs['cospriority']
self.obj.convert_time_on_input(entry_attrs)
return dn
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
self.log.info('%r' % entry_attrs)
if not options.get('raw', False):
if options.get('cospriority') is not None:
entry_attrs['cospriority'] = [unicode(options['cospriority'])]
self.obj.convert_time_for_output(entry_attrs, **options)
return dn
api.register(pwpolicy2_add)
class pwpolicy2_del(LDAPDelete):
"""
Delete group password policy.
"""
def get_args(self):
yield self.obj.primary_key.clone(attribute=True, required=True)
def post_callback(self, ldap, dn, *keys, **options):
try:
self.api.Command.cosentry_del(keys[-1])
except errors.NotFound:
pass
return True
api.register(pwpolicy2_del)
class pwpolicy2_mod(LDAPUpdate):
"""
Modify group password policy.
"""
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
if options.get('cospriority') is not None:
if keys[-1] is None:
raise errors.ValidationError(
name='priority',
error=_('priority cannot be set on global policy')
)
try:
self.api.Command.cosentry_mod(
keys[-1], cospriority=options['cospriority']
)
except errors.NotFound:
self.api.Command.cosentry_add(
keys[-1], krbpwdpolicyreference=dn,
cospriority=options['cospriority']
)
del entry_attrs['cospriority']
self.obj.convert_time_on_input(entry_attrs)
return dn
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
if not options.get('raw', False):
if options.get('cospriority') is not None:
entry_attrs['cospriority'] = [unicode(options['cospriority'])]
if keys[-1] is None:
entry_attrs['cn'] = GLOBAL_POLICY_NAME
self.obj.convert_time_for_output(entry_attrs, **options)
return dn
def exc_callback(self, keys, options, exc, call_func, *call_args, **call_kwargs):
if isinstance(exc, errors.EmptyModlist):
entry_attrs = call_args[1]
if not entry_attrs and 'cospriority' in options:
return
raise exc
api.register(pwpolicy2_mod)
class pwpolicy2_show(LDAPRetrieve):
"""
Display group password policy.
"""
takes_options = (
Str('user?',
label=_('User'),
doc=_('Display effective policy for a specific user'),
),
)
def pre_callback(self, ldap, dn, attrs_list, *keys, **options):
if options.get('user') is not None:
user_entry = self.api.Command.user_show(
options['user'], all=True
)['result']
if 'krbpwdpolicyreference' in user_entry:
return user_entry.get('krbpwdpolicyreference', [dn])[0]
return dn
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
if not options.get('raw', False):
if keys[-1] is not None:
try:
cos_entry = self.api.Command.cosentry_show(
keys[-1]
)['result']
if cos_entry.get('cospriority') is not None:
entry_attrs['cospriority'] = cos_entry['cospriority']
except errors.NotFound:
pass
else:
entry_attrs['cn'] = GLOBAL_POLICY_NAME
self.obj.convert_time_for_output(entry_attrs, **options)
return dn
api.register(pwpolicy2_show)
class pwpolicy2_find(LDAPSearch):
"""
Search for group password policies.
"""
def post_callback(self, ldap, entries, truncated, *args, **options):
if not options.get('raw', False):
for e in entries:
try:
cos_entry = self.api.Command.cosentry_show(
e[1]['cn'][0]
)['result']
if cos_entry.get('cospriority') is not None:
e[1]['cospriority'] = cos_entry['cospriority']
except errors.NotFound:
pass
self.obj.convert_time_for_output(e[1], **options)
if not args[-1]:
global_entry = self.api.Command.pwpolicy2_show(
all=options.get('all', False), raw=options.get('raw', False)
)['result']
dn = global_entry['dn']
del global_entry['dn']
entries.insert(0, (dn, global_entry))
api.register(pwpolicy2_find)

View File

@ -1,7 +1,8 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
# Pavel Zuna <pzuna@redhat.com>
#
# Copyright (C) 2009 Red Hat
# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
@ -30,11 +31,11 @@ class test_pwpolicy(XMLRPC_test):
"""
Test the `pwpolicy` plugin.
"""
group = u'testgroup1'
group2 = u'testgroup2'
user = u'testuser1'
kw = {'group': group, 'cospriority': 1, 'krbminpwdlife': 30, 'krbmaxpwdlife': 40, 'krbpwdhistorylength': 5, 'krbpwdminlength': 6 }
kw2 = {'group': group2, 'cospriority': 2, 'krbminpwdlife': 40, 'krbmaxpwdlife': 60, 'krbpwdhistorylength': 8, 'krbpwdminlength': 9 }
group = u'testgroup12'
group2 = u'testgroup22'
user = u'testuser12'
kw = {'cospriority': 1, 'krbminpwdlife': 30, 'krbmaxpwdlife': 40, 'krbpwdhistorylength': 5, 'krbpwdminlength': 6 }
kw2 = {'cospriority': 2, 'krbminpwdlife': 40, 'krbmaxpwdlife': 60, 'krbpwdhistorylength': 8, 'krbpwdminlength': 9 }
def test_1_pwpolicy_add(self):
"""
@ -49,21 +50,22 @@ class test_pwpolicy(XMLRPC_test):
)
api.Command.group_add_member(self.group, users=self.user)
entry = api.Command['pwpolicy_add'](**self.kw)['result']
entry = api.Command['pwpolicy_add'](self.group, **self.kw)['result']
assert_attr_equal(entry, 'krbminpwdlife', '30')
assert_attr_equal(entry, 'krbmaxpwdlife', '40')
assert_attr_equal(entry, 'krbpwdhistorylength', '5')
assert_attr_equal(entry, 'krbpwdminlength', '6')
assert_attr_equal(entry, 'cospriority', '1')
def test_2_pwpolicy_add(self):
"""
Add a policy with a duplicate priority
Add a policy with a already used priority.
The priority validation is done first so it's ok that the group
The priority validation is done first, so it's OK that the group
is the same here.
"""
try:
api.Command['pwpolicy_add'](**self.kw)
api.Command['pwpolicy_add'](self.group, **self.kw)
except errors.ValidationError:
pass
else:
@ -71,12 +73,12 @@ class test_pwpolicy(XMLRPC_test):
def test_3_pwpolicy_add(self):
"""
Add a policy that already exists
Add a policy that already exists.
"""
try:
# cospriority needs to be unique
self.kw['cospriority'] = 3
api.Command['pwpolicy_add'](**self.kw)
api.Command['pwpolicy_add'](self.group, **self.kw)
except errors.DuplicateEntry:
pass
else:
@ -89,18 +91,19 @@ class test_pwpolicy(XMLRPC_test):
self.failsafe_add(
api.Object.group, self.group2, description=u'pwpolicy test group 2'
)
entry = api.Command['pwpolicy_add'](**self.kw2)['result']
entry = api.Command['pwpolicy_add'](self.group2, **self.kw2)['result']
assert_attr_equal(entry, 'krbminpwdlife', '40')
assert_attr_equal(entry, 'krbmaxpwdlife', '60')
assert_attr_equal(entry, 'krbpwdhistorylength', '8')
assert_attr_equal(entry, 'krbpwdminlength', '9')
assert_attr_equal(entry, 'cospriority', '2')
def test_5_pwpolicy_add(self):
"""
Add a pwpolicy for a non-existent group
"""
try:
api.Command['pwpolicy_add'](group=u'nopwpolicy',cospriority=4,krbminpwdlife=1)
api.Command['pwpolicy_add'](u'nopwpolicy', cospriority=1, krbminpwdlife=1)
except errors.NotFound:
pass
else:
@ -121,11 +124,12 @@ class test_pwpolicy(XMLRPC_test):
"""
Test the `xmlrpc.pwpolicy_show` method.
"""
entry = api.Command['pwpolicy_show'](group=self.group)['result']
entry = api.Command['pwpolicy_show'](self.group)['result']
assert_attr_equal(entry, 'krbminpwdlife', '30')
assert_attr_equal(entry, 'krbmaxpwdlife', '40')
assert_attr_equal(entry, 'krbpwdhistorylength', '5')
assert_attr_equal(entry, 'krbpwdminlength', '6')
assert_attr_equal(entry, 'cospriority', '1')
def test_8_pwpolicy_mod(self):
"""
@ -142,28 +146,17 @@ class test_pwpolicy(XMLRPC_test):
"""
Test the `xmlrpc.pwpolicy_mod` method.
"""
entry = api.Command['pwpolicy_mod'](group=self.group, krbminpwdlife=50)['result']
entry = api.Command['pwpolicy_mod'](self.group, krbminpwdlife=50)['result']
assert_attr_equal(entry, 'krbminpwdlife', '50')
def test_a_pwpolicy_mod(self):
"""
Test `xmlrpc.pwpolicy_mod` with a duplicate priority
"""
try:
api.Command['pwpolicy_mod'](group=self.group, cospriority=1)
except errors.ValidationError:
pass
else:
assert False
def test_b_pwpolicy_del(self):
def test_a_pwpolicy_del(self):
"""
Test the `xmlrpc.pwpolicy_del` method.
"""
assert api.Command['pwpolicy_del'](group=self.group)['result'] is True
assert api.Command['pwpolicy_del'](self.group)['result'] is True
# Verify that it is gone
try:
api.Command['pwpolicy_show'](group=self.group)
api.Command['pwpolicy_show'](self.group)
except errors.NotFound:
pass
else:
@ -175,3 +168,4 @@ class test_pwpolicy(XMLRPC_test):
# Remove the user we created
api.Command['user_del'](self.user)

View File

@ -1,171 +0,0 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
# Pavel Zuna <pzuna@redhat.com>
#
# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Test the `ipalib/plugins/pwpolicy2.py` module.
"""
import sys
from xmlrpc_test import XMLRPC_test, assert_attr_equal
from ipalib import api
from ipalib import errors
class test_pwpolicy2(XMLRPC_test):
"""
Test the `pwpolicy2` plugin.
"""
group = u'testgroup12'
group2 = u'testgroup22'
user = u'testuser12'
kw = {'cospriority': 1, 'krbminpwdlife': 30, 'krbmaxpwdlife': 40, 'krbpwdhistorylength': 5, 'krbpwdminlength': 6 }
kw2 = {'cospriority': 2, 'krbminpwdlife': 40, 'krbmaxpwdlife': 60, 'krbpwdhistorylength': 8, 'krbpwdminlength': 9 }
def test_1_pwpolicy2_add(self):
"""
Test adding a per-group policy using the `xmlrpc.pwpolicy2_add` method.
"""
# First set up a group and user that will use this policy
self.failsafe_add(
api.Object.group, self.group, description=u'pwpolicy test group',
)
self.failsafe_add(
api.Object.user, self.user, givenname=u'Test', sn=u'User'
)
api.Command.group_add_member(self.group, users=self.user)
entry = api.Command['pwpolicy2_add'](self.group, **self.kw)['result']
assert_attr_equal(entry, 'krbminpwdlife', '30')
assert_attr_equal(entry, 'krbmaxpwdlife', '40')
assert_attr_equal(entry, 'krbpwdhistorylength', '5')
assert_attr_equal(entry, 'krbpwdminlength', '6')
assert_attr_equal(entry, 'cospriority', '1')
def test_2_pwpolicy2_add(self):
"""
Add a policy with a already used priority.
The priority validation is done first, so it's OK that the group
is the same here.
"""
try:
api.Command['pwpolicy2_add'](self.group, **self.kw)
except errors.ValidationError:
pass
else:
assert False
def test_3_pwpolicy2_add(self):
"""
Add a policy that already exists.
"""
try:
# cospriority needs to be unique
self.kw['cospriority'] = 3
api.Command['pwpolicy2_add'](self.group, **self.kw)
except errors.DuplicateEntry:
pass
else:
assert False
def test_4_pwpolicy2_add(self):
"""
Test adding another per-group policy using the `xmlrpc.pwpolicy2_add` method.
"""
self.failsafe_add(
api.Object.group, self.group2, description=u'pwpolicy test group 2'
)
entry = api.Command['pwpolicy2_add'](self.group2, **self.kw2)['result']
assert_attr_equal(entry, 'krbminpwdlife', '40')
assert_attr_equal(entry, 'krbmaxpwdlife', '60')
assert_attr_equal(entry, 'krbpwdhistorylength', '8')
assert_attr_equal(entry, 'krbpwdminlength', '9')
assert_attr_equal(entry, 'cospriority', '2')
def test_5_pwpolicy2_add(self):
"""
Add a pwpolicy for a non-existent group
"""
try:
api.Command['pwpolicy2_add'](u'nopwpolicy', cospriority=1, krbminpwdlife=1)
except errors.NotFound:
pass
else:
assert False
def test_6_pwpolicy2_show(self):
"""
Test the `xmlrpc.pwpolicy2_show` method with global policy.
"""
entry = api.Command['pwpolicy2_show']()['result']
# Note that this assumes an unchanged global policy
assert_attr_equal(entry, 'krbminpwdlife', '1')
assert_attr_equal(entry, 'krbmaxpwdlife', '90')
assert_attr_equal(entry, 'krbpwdhistorylength', '0')
assert_attr_equal(entry, 'krbpwdminlength', '8')
def test_7_pwpolicy2_show(self):
"""
Test the `xmlrpc.pwpolicy2_show` method.
"""
entry = api.Command['pwpolicy2_show'](self.group)['result']
assert_attr_equal(entry, 'krbminpwdlife', '30')
assert_attr_equal(entry, 'krbmaxpwdlife', '40')
assert_attr_equal(entry, 'krbpwdhistorylength', '5')
assert_attr_equal(entry, 'krbpwdminlength', '6')
assert_attr_equal(entry, 'cospriority', '1')
def test_8_pwpolicy2_mod(self):
"""
Test the `xmlrpc.pwpolicy2_mod` method for global policy.
"""
entry = api.Command['pwpolicy2_mod'](krbminpwdlife=50)['result']
assert_attr_equal(entry, 'krbminpwdlife', '50')
# Great, now change it back
entry = api.Command['pwpolicy2_mod'](krbminpwdlife=1)['result']
assert_attr_equal(entry, 'krbminpwdlife', '1')
def test_9_pwpolicy2_mod(self):
"""
Test the `xmlrpc.pwpolicy2_mod` method.
"""
entry = api.Command['pwpolicy2_mod'](self.group, krbminpwdlife=50)['result']
assert_attr_equal(entry, 'krbminpwdlife', '50')
def test_a_pwpolicy_del(self):
"""
Test the `xmlrpc.pwpolicy2_del` method.
"""
assert api.Command['pwpolicy2_del'](self.group)['result'] is True
# Verify that it is gone
try:
api.Command['pwpolicy2_show'](self.group)
except errors.NotFound:
pass
else:
assert False
# Remove the groups we created
api.Command['group_del'](self.group)
api.Command['group_del'](self.group2)
# Remove the user we created
api.Command['user_del'](self.user)