Add support for per-group kerberos password policy.

Use a Class of Service template to do per-group password policy. The
design calls for non-overlapping groups but with cospriority we can
still make sense of things.

The password policy entries stored under the REALM are keyed only on
the group name because the MIT ldap plugin can't handle quotes in the
DN. It also can't handle spaces between elements in the DN.
This commit is contained in:
Rob Crittenden 2009-10-02 09:30:16 -04:00 committed by Jason Gerard DeRose
parent 97dfa586de
commit dac224c25a
4 changed files with 438 additions and 21 deletions

View File

@ -974,6 +974,30 @@ done:
return ret; return ret;
} }
/* Easier handling for virtual attributes. You must call pwd_values_free()
* to free memory allocated here. It must be called before
* slapi_free_search_results_internal(entries) or
* slapi_pblock_destroy(pb)
*/
static int
pwd_get_values(const Slapi_Entry *ent, const char *attrname,
Slapi_ValueSet** results, char** actual_type_name,
int *buffer_flags)
{
int flags=0;
int type_name_disposition = 0;
int ret = slapi_vattr_values_get((Slapi_Entry *)ent, (char *)attrname, results, &type_name_disposition, actual_type_name, flags, buffer_flags);
return ret;
}
static void
pwd_values_free(Slapi_ValueSet** results, char** actual_type_name, int buffer_flags)
{
slapi_vattr_values_free(results, actual_type_name, buffer_flags);
}
/* searches the directory and finds the policy closest to the DN */ /* searches the directory and finds the policy closest to the DN */
/* return 0 on success, -1 on error or if no policy is found */ /* return 0 on success, -1 on error or if no policy is found */
static int ipapwd_getPolicy(const char *dn, Slapi_Entry *target, Slapi_Entry **e) static int ipapwd_getPolicy(const char *dn, Slapi_Entry *target, Slapi_Entry **e)
@ -991,6 +1015,9 @@ static int ipapwd_getPolicy(const char *dn, Slapi_Entry *target, Slapi_Entry **e
char **edn; char **edn;
int ret, res, dist, rdnc, scope, i; int ret, res, dist, rdnc, scope, i;
Slapi_DN *sdn = NULL; Slapi_DN *sdn = NULL;
int buffer_flags=0;
Slapi_ValueSet* results = NULL;
char* actual_type_name = NULL;
slapi_log_error(SLAPI_LOG_TRACE, "ipa_pwd_extop", slapi_log_error(SLAPI_LOG_TRACE, "ipa_pwd_extop",
"ipapwd_getPolicy: Searching policy for [%s]\n", dn); "ipapwd_getPolicy: Searching policy for [%s]\n", dn);
@ -1003,10 +1030,15 @@ static int ipapwd_getPolicy(const char *dn, Slapi_Entry *target, Slapi_Entry **e
goto done; goto done;
} }
krbPwdPolicyReference = slapi_entry_attr_get_charptr(target, "krbPwdPolicyReference"); pwd_get_values(target, "krbPwdPolicyReference", &results, &actual_type_name, &buffer_flags);
if (krbPwdPolicyReference) { if (results) {
Slapi_Value *sv;
slapi_valueset_first_value(results, &sv);
krbPwdPolicyReference = slapi_value_get_string(sv);
pdn = krbPwdPolicyReference; pdn = krbPwdPolicyReference;
scope = LDAP_SCOPE_BASE; scope = LDAP_SCOPE_BASE;
slapi_log_error(SLAPI_LOG_TRACE, "ipa_pwd_extop",
"ipapwd_getPolicy: using policy reference: %s\n", pdn);
} else { } else {
/* Find ancestor base DN */ /* Find ancestor base DN */
be = slapi_be_select(sdn); be = slapi_be_select(sdn);
@ -1117,6 +1149,9 @@ static int ipapwd_getPolicy(const char *dn, Slapi_Entry *target, Slapi_Entry **e
*e = slapi_entry_dup(pe); *e = slapi_entry_dup(pe);
ret = 0; ret = 0;
done: done:
if (results) {
pwd_values_free(&results, &actual_type_name, buffer_flags);
}
if (pb) { if (pb) {
slapi_free_search_results_internal(pb); slapi_free_search_results_internal(pb);
slapi_pblock_destroy(pb); slapi_pblock_destroy(pb);
@ -1597,7 +1632,7 @@ no_policy:
if (pwdCharLen < krbPwdMinLength) { if (pwdCharLen < krbPwdMinLength) {
slapi_log_error(SLAPI_LOG_TRACE, "ipa_pwd_extop", slapi_log_error(SLAPI_LOG_TRACE, "ipa_pwd_extop",
"ipapwd_checkPassword: Password too short\n"); "ipapwd_checkPassword: Password too short (%d < %d)\n", pwdCharLen, krbPwdMinLength);
return IPAPWD_POLICY_ERROR | LDAP_PWPOLICY_PWDTOOSHORT; return IPAPWD_POLICY_ERROR | LDAP_PWPOLICY_PWDTOOSHORT;
} }

View File

@ -204,3 +204,16 @@ dn: cn=Activated,cn=Account Inactivation,cn=accounts,$SUFFIX
changetype: add changetype: add
objectclass: top objectclass: top
objectclass: groupofnames objectclass: groupofnames
# templates for this cos definition are managed by the pwpolicy plugin
dn: cn=Password Policy,cn=accounts,$SUFFIX
changetype: add
description: Password Policy based on group membership
objectClass: top
objectClass: ldapsubentry
objectClass: cosSuperDefinition
objectClass: cosClassicDefinition
cosTemplateDn: cn=cosTemplates,cn=accounts,$SUFFIX
cosAttribute: krbPwdPolicyReference
cosSpecifier: memberOf

View File

@ -22,11 +22,13 @@
Password policy Password policy
""" """
from ipalib import api, errors from ipalib import api, crud, errors
from ipalib import Command from ipalib import Command, Object
from ipalib import Int from ipalib import Int, Str
from ldap.functions import explode_dn
_fields = { _fields = {
'group': 'Group policy',
'krbminpwdlife': 'Minimum lifetime (in hours)', 'krbminpwdlife': 'Minimum lifetime (in hours)',
'krbmaxpwdlife': 'Maximum lifetime (in days)', 'krbmaxpwdlife': 'Maximum lifetime (in days)',
'krbpwdmindiffchars': 'Minimum number of characters classes', 'krbpwdmindiffchars': 'Minimum number of characters classes',
@ -35,6 +37,7 @@ _fields = {
} }
def _convert_time_for_output(entry_attrs): def _convert_time_for_output(entry_attrs):
# Convert seconds to hours and days for displaying to user
if 'krbmaxpwdlife' in entry_attrs: if 'krbmaxpwdlife' in entry_attrs:
entry_attrs['krbmaxpwdlife'][0] = str( entry_attrs['krbmaxpwdlife'][0] = str(
int(entry_attrs['krbmaxpwdlife'][0]) / 86400 int(entry_attrs['krbmaxpwdlife'][0]) / 86400
@ -44,12 +47,63 @@ def _convert_time_for_output(entry_attrs):
int(entry_attrs['krbminpwdlife'][0]) / 3600 int(entry_attrs['krbminpwdlife'][0]) / 3600
) )
def _convert_time_on_input(entry_attrs):
# Convert hours and days to seconds for writing to LDAP
if 'krbmaxpwdlife' in entry_attrs:
entry_attrs['krbmaxpwdlife'] = entry_attrs['krbmaxpwdlife'] * 86400
if 'krbminpwdlife' in entry_attrs:
entry_attrs['krbminpwdlife'] = entry_attrs['krbminpwdlife'] * 3600
class pwpolicy_mod(Command): def make_cos_entry(group, cospriority=None):
""" """
Modify password policy. Make the CoS dn and entry for this group.
Returns (cos_dn, cos_entry) where:
cos_dn = DN of the new CoS entry
cos_entry = entry representing this new object
""" """
takes_options = (
try:
(groupdn, group_attrs) = api.Command['group_show'](group)
except errors.NotFound:
raise errors.NotFound(reason="group '%s' does not exist" % group)
cos_entry = {}
if cospriority:
cos_entry['cospriority'] = cospriority
cos_entry['objectclass'] = ['top', 'costemplate', 'extensibleobject', 'krbcontainer']
cos_dn = 'cn=\"%s\", cn=cosTemplates, cn=accounts, %s' % (groupdn, api.env.basedn)
return (cos_dn, cos_entry)
def make_policy_entry(group_cn, policy_entry):
"""
Make the krbpwdpolicy dn and entry for this group.
Returns (policy_dn, policy_entry) where:
policy_dn = DN of the new password policy entry
policy_entry = entry representing this new object
"""
# This DN must *NOT* have spaces between elements
policy_dn = "cn=%s,cn=%s,cn=kerberos,%s" % (group_cn, api.env.realm, api.env.basedn)
# Create the krb password policy entry. This MUST be located
# in the same container as the REALM or the kldap plugin won't
# recognize it. The usual CoS trick of putting the whole DN into
# the dn won't work either because the kldap plugin doesn't like
# quotes in the DN.
policy_entry['objectclass'] = ['top', 'nscontainer', 'krbpwdpolicy']
policy_entry['cn'] = group_cn
return (policy_dn, policy_entry)
class pwpolicy(Object):
"""
Password Policy object.
"""
takes_params = (
Int('krbmaxpwdlife?', Int('krbmaxpwdlife?',
cli_name='maxlife', cli_name='maxlife',
doc='Max. Password Lifetime (days)', doc='Max. Password Lifetime (days)',
@ -82,21 +136,96 @@ class pwpolicy_mod(Command):
), ),
) )
api.register(pwpolicy)
class pwpolicy_add(crud.Create):
"""
Create a new password policy associated with a group.
"""
takes_options = (
Str('group',
doc='Group to set policy for',
attribute=False,
),
Int('cospriority',
cli_name='priority',
doc='Priority of the policy. Higher number equals higher priority',
minvalue=0,
attribute=True,
),
)
def execute(self, *args, **options):
ldap = self.api.Backend.ldap2
group_cn = options['group']
# Create the CoS template
(cos_dn, cos_entry) = make_cos_entry(group_cn, options.get('cospriority', None))
if 'cospriority' in options:
del options['cospriority']
# Create the new password policy
policy_entry = self.args_options_2_entry(*args, **options)
(policy_dn, policy_entry) = make_policy_entry(group_cn, policy_entry)
_convert_time_on_input(policy_entry)
# Link the two entries together
cos_entry['krbpwdpolicyreference'] = policy_dn
ldap.add_entry(policy_dn, policy_entry, normalize=False)
ldap.add_entry(cos_dn, cos_entry, normalize=False)
# The policy is what is interesting, return that
(dn, entry_attrs) = ldap.get_entry(policy_dn, policy_entry.keys())
_convert_time_for_output(entry_attrs)
return (dn, entry_attrs)
def output_for_cli(self, textui, result, *args, **options):
# textui.print_name(self.name)
# textui.print_dashed("Added policy for '%s'." % options['group'])
(dn, entry_attrs) = result
textui.print_name(self.name)
textui.print_plain('Password policy:')
for (k, v) in _fields.iteritems():
if k in entry_attrs:
textui.print_attribute(v, entry_attrs[k])
textui.print_dashed('Modified password policy.')
api.register(pwpolicy_add)
class pwpolicy_mod(crud.Update):
"""
Modify password policy.
"""
takes_options = (
Str('group?',
doc='Group to set policy for',
attribute=False,
),
Int('cospriority?',
cli_name='priority',
doc='Priority of the policy. Higher number equals higher priority',
minvalue=0,
attribute=True,
),
)
def execute(self, *args, **options): def execute(self, *args, **options):
assert 'dn' not in options assert 'dn' not in options
ldap = self.api.Backend.ldap2 ldap = self.api.Backend.ldap2
entry_attrs = self.args_options_2_entry(*args, **options) if not 'group' in options:
dn = self.api.env.container_accounts dn = self.api.env.container_accounts
entry_attrs = self.args_options_2_entry(*args, **options)
# Convert hours and days to seconds else:
if 'krbmaxpwdlife' in entry_attrs: entry_attrs = self.args_options_2_entry(*args, **options)
entry_attrs['krbmaxpwdlife'] = entry_attrs['krbmaxpwdlife'] * 86400 (dn, entry_attrs) = make_policy_entry(options['group'], entry_attrs)
del entry_attrs['krbmaxpwdlife'] _convert_time_on_input(entry_attrs)
if 'krbminpwdlife' in entry_attrs:
entry_attrs['krbminpwdlife'] = entry_attrs['krbminpwdlife'] * 3600
del entry_attrs['krbminpwdlife']
try: try:
ldap.update_entry(dn, entry_attrs) ldap.update_entry(dn, entry_attrs)
except errors.EmptyModlist: except errors.EmptyModlist:
@ -120,17 +249,88 @@ class pwpolicy_mod(Command):
api.register(pwpolicy_mod) api.register(pwpolicy_mod)
class pwpolicy_del(crud.Delete):
"""
Delete a group password policy.
"""
takes_options = (
Str('group',
doc='Group to remove policy from',
),
)
def execute(self, *args, **options):
assert 'dn' not in options
ldap = self.api.Backend.ldap2
group_cn = options['group']
# Get the DN of the CoS template to delete
try:
(cos_dn, cos_entry) = make_cos_entry(group_cn, None)
except errors.NotFound:
# Ok, perhaps the group was deleted, try to make the group DN
rdn = ldap.make_rdn_from_attr('cn', group_cn)
group_dn = ldap.make_dn_from_rdn(rdn, api.env.container_group)
cos_dn = 'cn=\"%s\", cn=cosTemplates, cn=accounts, %s' % (group_dn, api.env.basedn)
policy_entry = self.args_options_2_entry(*args, **options)
(policy_dn, policy_entry) = make_policy_entry(group_cn, policy_entry)
ldap.delete_entry(policy_dn, normalize=False)
ldap.delete_entry(cos_dn, normalize=False)
return True
def output_for_cli(self, textui, result, *args, **options):
textui.print_name(self.name)
textui.print_dashed('Deleted policy "%s".' % options['group'])
api.register(pwpolicy_del)
class pwpolicy_show(Command): class pwpolicy_show(Command):
""" """
Display password policy. Display password policy.
""" """
takes_options = (
Str('group?',
doc='Group to display policy',
),
Str('user?',
doc='Display policy applied to a given user',
),
)
def execute(self, *args, **options): def execute(self, *args, **options):
ldap = self.api.Backend.ldap2 ldap = self.api.Backend.ldap2
dn = self.api.env.container_accounts dn = None
group = None
if 'user' in options:
rdn = ldap.make_rdn_from_attr('uid', options['user'])
user_dn = ldap.make_dn_from_rdn(rdn, api.env.container_user)
try:
(user_dn, user_attrs) = ldap.get_entry(user_dn, ['krbpwdpolicyreference'])
if 'krbpwdpolicyreference' in user_attrs:
dn = user_attrs['krbpwdpolicyreference'][0]
rdn = explode_dn(dn)
group = rdn[0].replace('cn=','')
except errors.NotFound:
raise errors.NotFound(reason="user '%s' not found" % options['user'])
if dn is None:
if not 'group' in options:
dn = self.api.env.container_accounts
else:
policy_entry = self.args_options_2_entry(*args, **options)
(dn, policy_entry) = make_policy_entry(options['group'], policy_entry)
(dn, entry_attrs) = ldap.get_entry(dn) (dn, entry_attrs) = ldap.get_entry(dn)
if 'user' in options:
if group:
entry_attrs['group'] = group
else:
entry_attrs['group'] = 'global'
_convert_time_for_output(entry_attrs) _convert_time_for_output(entry_attrs)
return (dn, entry_attrs) return (dn, entry_attrs)

View File

@ -0,0 +1,169 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Test the `ipalib/plugins/pwpolicy.py` module.
"""
import sys
from xmlrpc_test import XMLRPC_test, assert_attr_equal
from ipalib import api
from ipalib import errors
class test_pwpolicy(XMLRPC_test):
"""
Test the `pwpolicy` plugin.
"""
group = u'testgroup1'
group2 = u'testgroup2'
user = u'testuser1'
kw = {'group': group, 'cospriority': 1, 'krbminpwdlife': 30, 'krbmaxpwdlife': 40, 'krbpwdhistorylength': 5, 'krbpwdminlength': 6 }
kw2 = {'group': group2, 'cospriority': 2, 'krbminpwdlife': 40, 'krbmaxpwdlife': 60, 'krbpwdhistorylength': 8, 'krbpwdminlength': 9 }
def test_1_pwpolicy_add(self):
"""
Test adding a per-group policy using the `xmlrpc.pwpolicy_add` method.
"""
# First set up a group and user that will use this policy
(groupdn, res) = api.Command['group_add'](self.group, description=u'pwpolicy test group')
(userdn, res) = api.Command['user_add'](self.user, givenname=u'Test', sn=u'User')
(total, failed, res) = api.Command['group_add_member'](self.group, users=self.user)
(dn, res) = api.Command['pwpolicy_add'](**self.kw)
assert res
assert_attr_equal(res, 'krbminpwdlife', '30')
assert_attr_equal(res, 'krbmaxpwdlife', '40')
assert_attr_equal(res, 'krbpwdhistorylength', '5')
assert_attr_equal(res, 'krbpwdminlength', '6')
def test_2_pwpolicy_add(self):
"""
Add a policy that already exists
"""
try:
api.Command['pwpolicy_add'](**self.kw)
except errors.DuplicateEntry:
pass
else:
assert False
def test_3_pwpolicy_add(self):
"""
Test adding another per-group policy using the `xmlrpc.pwpolicy_add` method.
"""
(groupdn, res) = api.Command['group_add'](self.group2, description=u'pwpolicy test group 2')
(dn, res) = api.Command['pwpolicy_add'](**self.kw2)
assert res
assert_attr_equal(res, 'krbminpwdlife', '40')
assert_attr_equal(res, 'krbmaxpwdlife', '60')
assert_attr_equal(res, 'krbpwdhistorylength', '8')
assert_attr_equal(res, 'krbpwdminlength', '9')
def test_4_pwpolicy_add(self):
"""
Add a pwpolicy for a non-existant group
"""
try:
api.Command['pwpolicy_add'](group=u'nopwpolicy',cospriority=1,krbminpwdlife=1)
except errors.NotFound:
pass
else:
assert False
def test_5_pwpolicy_show(self):
"""
Test the `xmlrpc.pwpolicy_show` method with global policy.
"""
(dn, res) = api.Command['pwpolicy_show']()
assert res
# Note that this assumes an unchanged global policy
assert_attr_equal(res, 'krbminpwdlife', '1')
assert_attr_equal(res, 'krbmaxpwdlife', '90')
assert_attr_equal(res, 'krbpwdhistorylength', '0')
assert_attr_equal(res, 'krbpwdminlength', '8')
def test_6_pwpolicy_show(self):
"""
Test the `xmlrpc.pwpolicy_show` method.
"""
(dn, res) = api.Command['pwpolicy_show'](group=self.group)
assert res
assert_attr_equal(res, 'krbminpwdlife', '30')
assert_attr_equal(res, 'krbmaxpwdlife', '40')
assert_attr_equal(res, 'krbpwdhistorylength', '5')
assert_attr_equal(res, 'krbpwdminlength', '6')
def test_7_pwpolicy_mod(self):
"""
Test the `xmlrpc.pwpolicy_mod` method for global policy.
"""
(dn, res) = api.Command['pwpolicy_mod'](krbminpwdlife=50)
assert res
assert_attr_equal(res, 'krbminpwdlife', '50')
# Great, now change it back
(dn, res) = api.Command['pwpolicy_mod'](krbminpwdlife=1)
assert res
assert_attr_equal(res, 'krbminpwdlife', '1')
def test_8_pwpolicy_mod(self):
"""
Test the `xmlrpc.pwpolicy_mod` method.
"""
(dn, res) = api.Command['pwpolicy_mod'](group=self.group, krbminpwdlife=50)
assert res
assert_attr_equal(res, 'krbminpwdlife', '50')
def test_9_pwpolicy_del(self):
"""
Test the `xmlrpc.pwpolicy_del` method.
"""
res = api.Command['pwpolicy_del'](group=self.group)
assert res == True
# Verify that it is gone
try:
api.Command['pwpolicy_show'](group=self.group)
except errors.NotFound:
pass
else:
assert False
# Remove the groups we created
res = api.Command['group_del'](self.group)
res = api.Command['group_del'](self.group2)
# Remove the user we created
res = api.Command['user_del'](self.user)
def test_a_pwpolicy_del(self):
"""
Remove the second test policy with `xmlrpc.pwpolicy_del`.
"""
res = api.Command['pwpolicy_del'](group=self.group2)
assert res == True
# Verify that it is gone
try:
api.Command['pwpolicy_show'](group=self.group2)
except errors.NotFound:
pass
else:
assert False