User life cycle: stageuser-add verb

Add a accounts plugin (accounts class) that defines
variables and methods common to 'users' and 'stageuser'.
accounts is a superclass of users/stageuser

Add the stageuser plugin, with support of stageuser-add verb.

Reviewed By: David Kupka, Martin Basti, Jan Cholasta

https://fedorahosted.org/freeipa/ticket/3813

Reviewed-By: Jan Cholasta <jcholast@redhat.com>
Reviewed-By: David Kupka <dkupka@redhat.com>
This commit is contained in:
Thierry bordaz (tbordaz)
2015-03-05 14:25:33 +01:00
committed by Martin Kosek
parent c3ede5f1e9
commit d1691eee88
7 changed files with 856 additions and 415 deletions

View File

@@ -25,6 +25,12 @@ import os
from ipalib import api, errors
from ipalib import Flag, Int, Password, Str, Bool, StrEnum, DateTime
from ipalib.plugins.baseuser import baseuser, baseuser_add, baseuser_del, \
baseuser_mod, baseuser_find, baseuser_show, \
NO_UPG_MAGIC, UPG_DEFINITION_DN, baseuser_output_params, \
status_baseuser_output_params, baseuser_pwdchars, \
validate_nsaccountlock, radius_dn2pk, convert_nsaccountlock, split_principal, validate_principal, \
normalize_principal, fix_addressbook_permission_bindrule
from ipalib.plugable import Registry
from ipalib.plugins.baseldap import *
from ipalib.plugins import baseldap
@@ -85,105 +91,10 @@ EXAMPLES:
register = Registry()
NO_UPG_MAGIC = '__no_upg__'
user_output_params = (
Flag('has_keytab',
label=_('Kerberos keys available'),
),
Str('sshpubkeyfp*',
label=_('SSH public key fingerprint'),
),
)
user_output_params = baseuser_output_params
status_output_params = (
Str('server',
label=_('Server'),
),
Str('krbloginfailedcount',
label=_('Failed logins'),
),
Str('krblastsuccessfulauth',
label=_('Last successful authentication'),
),
Str('krblastfailedauth',
label=_('Last failed authentication'),
),
Str('now',
label=_('Time now'),
),
)
UPG_DEFINITION_DN = DN(('cn', 'UPG Definition'),
('cn', 'Definitions'),
('cn', 'Managed Entries'),
('cn', 'etc'),
api.env.basedn)
# characters to be used for generating random user passwords
user_pwdchars = string.digits + string.ascii_letters + '_,.@+-='
def validate_nsaccountlock(entry_attrs):
if 'nsaccountlock' in entry_attrs:
nsaccountlock = entry_attrs['nsaccountlock']
if not isinstance(nsaccountlock, (bool, Bool)):
if not isinstance(nsaccountlock, basestring):
raise errors.OnlyOneValueAllowed(attr='nsaccountlock')
if nsaccountlock.lower() not in ('true', 'false'):
raise errors.ValidationError(name='nsaccountlock',
error=_('must be TRUE or FALSE'))
def radius_dn2pk(api, entry_attrs):
cl = entry_attrs.get('ipatokenradiusconfiglink', None)
if cl:
pk = api.Object['radiusproxy'].get_primary_key_from_dn(cl[0])
entry_attrs['ipatokenradiusconfiglink'] = [pk]
def convert_nsaccountlock(entry_attrs):
if not 'nsaccountlock' in entry_attrs:
entry_attrs['nsaccountlock'] = False
else:
nsaccountlock = Bool('temp')
entry_attrs['nsaccountlock'] = nsaccountlock.convert(entry_attrs['nsaccountlock'][0])
def split_principal(principal):
"""
Split the principal into its components and do some basic validation.
Automatically append our realm if it wasn't provided.
"""
realm = None
parts = principal.split('@')
user = parts[0].lower()
if len(parts) > 2:
raise errors.MalformedUserPrincipal(principal=principal)
if len(parts) == 2:
realm = parts[1].upper()
# At some point we'll support multiple realms
if realm != api.env.realm:
raise errors.RealmMismatch()
else:
realm = api.env.realm
return (user, realm)
def validate_principal(ugettext, principal):
"""
All the real work is done in split_principal.
"""
(user, realm) = split_principal(principal)
return None
def normalize_principal(principal):
"""
Ensure that the name in the principal is lower-case. The realm is
upper-case by convention but it isn't required.
The principal is validated at this point.
"""
(user, realm) = split_principal(principal)
return unicode('%s@%s' % (user, realm))
status_output_params = status_baseuser_output_params
def check_protected_member(user, protected_group_name=u'admins'):
@@ -204,60 +115,17 @@ def check_protected_member(user, protected_group_name=u'admins'):
raise errors.LastMemberError(key=user, label=_(u'group'),
container=protected_group_name)
def fix_addressbook_permission_bindrule(name, template, is_new,
anonymous_read_aci,
**other_options):
"""Fix bind rule type for Read User Addressbook/IPA Attributes permission
When upgrading from an old IPA that had the global read ACI,
or when installing the first replica with granular read permissions,
we need to keep allowing anonymous access to many user attributes.
This fixup_function changes the bind rule type accordingly.
"""
if is_new and anonymous_read_aci:
template['ipapermbindruletype'] = 'anonymous'
@register()
class user(LDAPObject):
class user(baseuser):
"""
User object.
"""
container_dn = api.env.container_user
object_name = _('user')
object_name_plural = _('users')
object_class = ['posixaccount']
object_class_config = 'ipauserobjectclasses'
possible_objectclasses = [
'meporiginentry', 'ipauserauthtypeclass', 'ipauser',
'ipatokenradiusproxyuser'
]
disallow_object_classes = ['krbticketpolicyaux']
permission_filter_objectclasses = ['posixaccount']
search_attributes_config = 'ipausersearchfields'
default_attributes = [
'uid', 'givenname', 'sn', 'homedirectory', 'loginshell',
'uidnumber', 'gidnumber', 'mail', 'ou',
'telephonenumber', 'title', 'memberof', 'nsaccountlock',
'memberofindirect', 'ipauserauthtype', 'userclass',
'ipatokenradiusconfiglink', 'ipatokenradiususername',
'krbprincipalexpiration'
]
search_display_attributes = [
'uid', 'givenname', 'sn', 'homedirectory', 'loginshell',
'mail', 'telephonenumber', 'title', 'nsaccountlock',
'uidnumber', 'gidnumber', 'sshpubkeyfp',
]
uuid_attribute = 'ipauniqueid'
attribute_members = {
'memberof': ['group', 'netgroup', 'role', 'hbacrule', 'sudorule'],
'memberofindirect': ['group', 'netgroup', 'role', 'hbacrule', 'sudorule'],
}
rdn_is_primary_key = True
bindable = True
password_attributes = [('userpassword', 'has_password'),
('krbprincipalkey', 'has_keytab')]
container_dn = baseuser.active_container_dn
label = _('Users')
label_singular = _('User')
object_name = _('user')
object_name_plural = _('users')
managed_permissions = {
'System: Read User Standard Attributes': {
'replaces_global_anonymous_aci': True,
@@ -460,259 +328,28 @@ class user(LDAPObject):
},
}
label = _('Users')
label_singular = _('User')
takes_params = (
Str('uid',
pattern='^[a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,252}[a-zA-Z0-9_.$-]?$',
pattern_errmsg='may only include letters, numbers, _, -, . and $',
maxlength=255,
cli_name='login',
label=_('User login'),
primary_key=True,
default_from=lambda givenname, sn: givenname[0] + sn,
normalizer=lambda value: value.lower(),
),
Str('givenname',
cli_name='first',
label=_('First name'),
),
Str('sn',
cli_name='last',
label=_('Last name'),
),
Str('cn',
label=_('Full name'),
default_from=lambda givenname, sn: '%s %s' % (givenname, sn),
autofill=True,
),
Str('displayname?',
label=_('Display name'),
default_from=lambda givenname, sn: '%s %s' % (givenname, sn),
autofill=True,
),
Str('initials?',
label=_('Initials'),
default_from=lambda givenname, sn: '%c%c' % (givenname[0], sn[0]),
autofill=True,
),
Str('homedirectory?',
cli_name='homedir',
label=_('Home directory'),
),
Str('gecos?',
label=_('GECOS'),
default_from=lambda givenname, sn: '%s %s' % (givenname, sn),
autofill=True,
),
Str('loginshell?',
cli_name='shell',
label=_('Login shell'),
),
Str('krbprincipalname?', validate_principal,
cli_name='principal',
label=_('Kerberos principal'),
default_from=lambda uid: '%s@%s' % (uid.lower(), api.env.realm),
autofill=True,
flags=['no_update'],
normalizer=lambda value: normalize_principal(value),
),
DateTime('krbprincipalexpiration?',
cli_name='principal_expiration',
label=_('Kerberos principal expiration'),
),
Str('mail*',
cli_name='email',
label=_('Email address'),
),
Password('userpassword?',
cli_name='password',
label=_('Password'),
doc=_('Prompt to set the user password'),
# FIXME: This is temporary till bug is fixed causing updates to
# bomb out via the webUI.
exclude='webui',
),
Flag('random?',
doc=_('Generate a random user password'),
flags=('no_search', 'virtual_attribute'),
default=False,
),
Str('randompassword?',
label=_('Random password'),
flags=('no_create', 'no_update', 'no_search', 'virtual_attribute'),
),
Int('uidnumber?',
cli_name='uid',
label=_('UID'),
doc=_('User ID Number (system will assign one if not provided)'),
minvalue=1,
),
Int('gidnumber?',
label=_('GID'),
doc=_('Group ID Number'),
minvalue=1,
),
Str('street?',
cli_name='street',
label=_('Street address'),
),
Str('l?',
cli_name='city',
label=_('City'),
),
Str('st?',
cli_name='state',
label=_('State/Province'),
),
Str('postalcode?',
label=_('ZIP'),
),
Str('telephonenumber*',
cli_name='phone',
label=_('Telephone Number')
),
Str('mobile*',
label=_('Mobile Telephone Number')
),
Str('pager*',
label=_('Pager Number')
),
Str('facsimiletelephonenumber*',
cli_name='fax',
label=_('Fax Number'),
),
Str('ou?',
cli_name='orgunit',
label=_('Org. Unit'),
),
Str('title?',
label=_('Job Title'),
),
Str('manager?',
label=_('Manager'),
),
Str('carlicense*',
label=_('Car License'),
),
takes_params = baseuser.takes_params + (
Bool('nsaccountlock?',
label=_('Account disabled'),
flags=['no_option'],
),
Str('ipasshpubkey*', validate_sshpubkey,
cli_name='sshpubkey',
label=_('SSH public key'),
normalizer=normalize_sshpubkey,
csv=True,
flags=['no_search'],
),
StrEnum('ipauserauthtype*',
cli_name='user_auth_type',
label=_('User authentication types'),
doc=_('Types of supported user authentication'),
values=(u'password', u'radius', u'otp'),
csv=True,
),
Str('userclass*',
cli_name='class',
label=_('Class'),
doc=_('User category (semantics placed on this attribute are for '
'local interpretation)'),
),
Str('ipatokenradiusconfiglink?',
cli_name='radius',
label=_('RADIUS proxy configuration'),
),
Str('ipatokenradiususername?',
cli_name='radius_username',
label=_('RADIUS proxy username'),
),
Str('departmentnumber*',
label=_('Department Number'),
),
Str('employeenumber?',
label=_('Employee Number'),
),
Str('employeetype?',
label=_('Employee Type'),
),
Str('preferredlanguage?',
label=_('Preferred Language'),
pattern='^(([a-zA-Z]{1,8}(-[a-zA-Z]{1,8})?(;q\=((0(\.[0-9]{0,3})?)|(1(\.0{0,3})?)))?' \
+ '(\s*,\s*[a-zA-Z]{1,8}(-[a-zA-Z]{1,8})?(;q\=((0(\.[0-9]{0,3})?)|(1(\.0{0,3})?)))?)*)|(\*))$',
pattern_errmsg='must match RFC 2068 - 14.4, e.g., "da, en-gb;q=0.8, en;q=0.7"',
),
)
def _normalize_and_validate_email(self, email, config=None):
if not config:
config = self.backend.get_ipa_config()
# check if default email domain should be added
defaultdomain = config.get('ipadefaultemaildomain', [None])[0]
if email:
norm_email = []
if not isinstance(email, (list, tuple)):
email = [email]
for m in email:
if isinstance(m, basestring):
if '@' not in m and defaultdomain:
m = m + u'@' + defaultdomain
if not Email(m):
raise errors.ValidationError(name='email', error=_('invalid e-mail format: %(email)s') % dict(email=m))
norm_email.append(m)
else:
if not Email(m):
raise errors.ValidationError(name='email', error=_('invalid e-mail format: %(email)s') % dict(email=m))
norm_email.append(m)
return norm_email
return email
def _normalize_manager(self, manager):
"""
Given a userid verify the user's existence and return the dn.
"""
if not manager:
return None
if not isinstance(manager, list):
manager = [manager]
try:
container_dn = DN(self.container_dn, api.env.basedn)
for m in xrange(len(manager)):
if isinstance(manager[m], DN) and manager[m].endswith(container_dn):
continue
entry_attrs = self.backend.find_entry_by_attr(
self.primary_key.name, manager[m], self.object_class, [''],
container_dn
)
manager[m] = entry_attrs.dn
except errors.NotFound:
raise errors.NotFound(reason=_('manager %(manager)s not found') % dict(manager=manager[m]))
return manager
def _convert_manager(self, entry_attrs, **options):
"""
Convert a manager dn into a userid
"""
if options.get('raw', False):
return
if 'manager' in entry_attrs:
for m in xrange(len(entry_attrs['manager'])):
entry_attrs['manager'][m] = self.get_primary_key_from_dn(entry_attrs['manager'][m])
return super(user, self).normalize_manager(manager, self.active_container_dn)
@register()
class user_add(LDAPCreate):
class user_add(baseuser_add):
__doc__ = _('Add a new user.')
msg_summary = _('Added user "%(value)s"')
has_output_params = LDAPCreate.has_output_params + user_output_params
has_output_params = baseuser_add.has_output_params + user_output_params
takes_options = LDAPCreate.takes_options + (
Flag('noprivate',
@@ -798,21 +435,21 @@ class user_add(LDAPCreate):
entry_attrs['gidnumber'] = group_attrs['gidnumber']
if 'userpassword' not in entry_attrs and options.get('random'):
entry_attrs['userpassword'] = ipa_generate_password(user_pwdchars)
entry_attrs['userpassword'] = ipa_generate_password(baseuser_pwdchars)
# save the password so it can be displayed in post_callback
setattr(context, 'randompassword', entry_attrs['userpassword'])
if 'mail' in entry_attrs:
entry_attrs['mail'] = self.obj._normalize_and_validate_email(entry_attrs['mail'], config)
entry_attrs['mail'] = self.obj.normalize_and_validate_email(entry_attrs['mail'], config)
else:
# No e-mail passed in. If we have a default e-mail domain set
# then we'll add it automatically.
defaultdomain = config.get('ipadefaultemaildomain', [None])[0]
if defaultdomain:
entry_attrs['mail'] = self.obj._normalize_and_validate_email(keys[-1], config)
entry_attrs['mail'] = self.obj.normalize_and_validate_email(keys[-1], config)
if 'manager' in entry_attrs:
entry_attrs['manager'] = self.obj._normalize_manager(entry_attrs['manager'])
entry_attrs['manager'] = self.obj.normalize_manager(entry_attrs['manager'], self.obj.active_container_dn)
if 'userclass' in entry_attrs and \
'ipauser' not in entry_attrs['objectclass']:
@@ -847,7 +484,7 @@ class user_add(LDAPCreate):
except errors.AlreadyGroupMember:
pass
self.obj._convert_manager(entry_attrs, **options)
self.obj.convert_manager(entry_attrs, **options)
# delete description attribute NO_UPG_MAGIC if present
if options.get('noprivate', False):
if not options.get('all', False):
@@ -880,7 +517,7 @@ class user_add(LDAPCreate):
@register()
class user_del(LDAPDelete):
class user_del(baseuser_del):
__doc__ = _('Delete a user.')
msg_summary = _('Deleted user "%(value)s"')
@@ -905,12 +542,12 @@ class user_del(LDAPDelete):
@register()
class user_mod(LDAPUpdate):
class user_mod(baseuser_mod):
__doc__ = _('Modify a user.')
msg_summary = _('Modified user "%(value)s"')
has_output_params = LDAPUpdate.has_output_params + user_output_params
has_output_params = baseuser_mod.has_output_params + user_output_params
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
assert isinstance(dn, DN)
@@ -925,12 +562,12 @@ class user_mod(LDAPUpdate):
)
)
if 'mail' in entry_attrs:
entry_attrs['mail'] = self.obj._normalize_and_validate_email(entry_attrs['mail'])
entry_attrs['mail'] = self.obj.normalize_and_validate_email(entry_attrs['mail'])
if 'manager' in entry_attrs:
entry_attrs['manager'] = self.obj._normalize_manager(entry_attrs['manager'])
entry_attrs['manager'] = self.obj.normalize_manager(entry_attrs['manager'], self.obj.active_container_dn)
validate_nsaccountlock(entry_attrs)
if 'userpassword' not in entry_attrs and options.get('random'):
entry_attrs['userpassword'] = ipa_generate_password(user_pwdchars)
entry_attrs['userpassword'] = ipa_generate_password(baseuser_pwdchars)
# save the password so it can be displayed in post_callback
setattr(context, 'randompassword', entry_attrs['userpassword'])
if ('ipasshpubkey' in entry_attrs or 'ipauserauthtype' in entry_attrs
@@ -970,7 +607,7 @@ class user_mod(LDAPUpdate):
# if both randompassword and userpassword options were used
pass
convert_nsaccountlock(entry_attrs)
self.obj._convert_manager(entry_attrs, **options)
self.obj.convert_manager(entry_attrs, **options)
self.obj.get_password_attributes(ldap, dn, entry_attrs)
convert_sshpubkey_post(ldap, dn, entry_attrs)
radius_dn2pk(self.api, entry_attrs)
@@ -978,11 +615,11 @@ class user_mod(LDAPUpdate):
@register()
class user_find(LDAPSearch):
class user_find(baseuser_find):
__doc__ = _('Search for users.')
member_attributes = ['memberof']
has_output_params = LDAPSearch.has_output_params + user_output_params
has_output_params = baseuser_find.has_output_params + user_output_params
takes_options = LDAPSearch.takes_options + (
Flag('whoami',
@@ -995,7 +632,7 @@ class user_find(LDAPSearch):
# assure the manager attr is a dn, not just a bare uid
manager = options.get('manager')
if manager is not None:
options['manager'] = self.obj._normalize_manager(manager)
options['manager'] = self.obj.normalize_manager(manager, self.obj.active_container_dn)
# Ensure that the RADIUS config link is a dn, not just the name
cl = 'ipatokenradiusconfiglink'
@@ -1016,7 +653,7 @@ class user_find(LDAPSearch):
if options.get('pkey_only', False):
return truncated
for attrs in entries:
self.obj._convert_manager(attrs, **options)
self.obj.convert_manager(attrs, **options)
self.obj.get_password_attributes(ldap, attrs.dn, attrs)
convert_nsaccountlock(attrs)
convert_sshpubkey_post(ldap, attrs.dn, attrs)
@@ -1028,15 +665,15 @@ class user_find(LDAPSearch):
@register()
class user_show(LDAPRetrieve):
class user_show(baseuser_show):
__doc__ = _('Display information about a user.')
has_output_params = LDAPRetrieve.has_output_params + user_output_params
has_output_params = baseuser_show.has_output_params + user_output_params
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
assert isinstance(dn, DN)
convert_nsaccountlock(entry_attrs)
self.obj._convert_manager(entry_attrs, **options)
self.obj.convert_manager(entry_attrs, **options)
self.obj.get_password_attributes(ldap, dn, entry_attrs)
convert_sshpubkey_post(ldap, dn, entry_attrs)
radius_dn2pk(self.api, entry_attrs)