Delete plugins using old LDAP backend.

This commit is contained in:
Pavel Zuna
2009-06-16 13:16:03 +02:00
committed by Rob Crittenden
parent 9352d2fc10
commit 4b993782e6
15 changed files with 0 additions and 4081 deletions

View File

@@ -1,462 +0,0 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Front-end plugins for managing DS ACIs
"""
from ipalib import api, crud, errors
from ipalib import Object, Command # Plugin base classes
from ipalib import Str, Flag, Int, StrEnum # Parameter types
from ipalib.aci import ACI
type_map = {
'user': 'ldap:///uid=*,%s,%s' % (api.env.container_user, api.env.basedn),
'group': 'ldap:///cn=*,%s,%s' % (api.env.container_group, api.env.basedn),
'host': 'ldap:///cn=*,%s,%s' % (api.env.container_host, api.env.basedn)
}
def make_aci(current, aciname, kw):
try:
taskgroup = api.Command['taskgroup_show'](kw['taskgroup'])
except errors.NotFound:
# The task group doesn't exist, let's be helpful and add it
tgkw = {'description':aciname}
taskgroup = api.Command['taskgroup_add'](kw['taskgroup'], **tgkw)
a = ACI(current)
a.name = aciname
a.permissions = kw['permissions'].replace(' ','').split(',')
a.set_bindrule("groupdn = \"ldap:///%s\"" % taskgroup['dn'])
if kw.get('attrs', None):
a.set_target_attr(kw['attrs'].split(','))
if kw.get('memberof', None):
group = api.Command['group_show'](kw['memberof'])
a.set_target_filter("memberOf=%s" % group['dn'].decode('UTF-8'))
if kw.get('type', None):
target = type_map[kw.get('type')]
a.set_target(target)
if kw.get('targetgroup', None):
# Purposely no try here so we'll raise a NotFound
group = api.Command['group_show'](kw.get('targetgroup'))
target = "ldap:///%s" % group.get('dn')
a.set_target(target)
if kw.get('subtree',None):
# See if the subtree is a full URI
target = kw.get('subtree')
if not target.startswith("ldap:///"):
target = "ldap:///" + target
a.set_target(target)
return a
def search_by_name(acis, aciname):
"""
Find an aci using the name field.
Must be an exact match of the entire name.
"""
for a in acis:
try:
t = ACI(a)
if t.name == aciname:
return str(t)
except SyntaxError, e:
# FIXME: need to log syntax errors, ignore for now
pass
raise errors.NotFound(reason="Unable to find aci %s" % aciname)
def search_by_attr(acis, attrlist):
"""
Find an aci by targetattr.
Returns an ACI list of all acis the attribute appears in.
"""
results = []
for a in acis:
try:
t = ACI(a)
for attr in attrlist:
attr = attr.lower()
for v in t.target['targetattr'].get('expression'):
if attr == v.lower():
results.append(str(t))
except SyntaxError, e:
# FIXME: need to log syntax errors, ignore for now
pass
if results:
return results
raise errors.NotFound(reason="Unable to find any ACIs with attribute %s" % ",".join(attrlist))
def search_by_taskgroup(acis, tgdn):
"""
Find an aci by taskgroup. This searches the ACI bind rule.
Returns an ACI list of all acis that match.
"""
results = []
for a in acis:
try:
t = ACI(a)
if t.bindrule['expression'] == "ldap:///" + tgdn:
results.append(str(t))
except SyntaxError, e:
# FIXME: need to log syntax errors, ignore for now
pass
if results:
return results
raise errors.NotFound(reason="taskgroup %s not found" % tgdn)
def search_by_perm(acis, permlist):
"""
Find an aci by permissions
Returns an ACI list of all acis the permission appears in.
"""
results = []
for a in acis:
try:
t = ACI(a)
for perm in permlist:
if perm.lower() in t.permissions:
results.append(str(t))
except SyntaxError, e:
# FIXME: need to log syntax errors, ignore for now
pass
if results:
return results
raise errors.NotFound(reason="No ACIs with permissions %s found" % ",".join(permlist))
def search_by_memberof(acis, memberoffilter):
"""
Find an aci by memberof
Returns an ACI list of all acis that has a matching memberOf as a
targetfilter.
"""
results = []
memberoffilter = memberoffilter.lower()
for a in acis:
try:
t = ACI(a)
try:
if memberoffilter == t.target['targetfilter'].get('expression').lower():
results.append(str(t))
except KeyError:
pass
except SyntaxError, e:
# FIXME: need to log syntax errors, ignore for now
pass
if results:
return results
raise errors.NotFound(reason="Nothing found for %s" % memberoffilter)
class aci(Object):
"""
ACI object.
"""
takes_params = (
Str('aciname',
doc='Name of ACI',
primary_key=True,
),
Str('taskgroup',
doc='Name of taskgroup this ACI grants access to',
),
StrEnum('permissions',
doc='Permissions to grant: read, write, add, delete, selfwrite, all',
values=(u'read', u'write', u'add', u'delete', u'selfwrite', u'all')
),
Str('attrs?',
doc='Comma-separated list of attributes',
),
StrEnum('type?',
doc='type of IPA object: user, group, host',
values=(u'user', u'group')
),
Str('memberof?',
doc='member of a group',
),
Str('filter?',
doc='A legal LDAP filter (ou=Engineering)',
),
Str('subtree?',
doc='A subtree to apply the ACI to',
),
Str('targetgroup?',
doc='Apply the ACI to a specific group',
),
)
api.register(aci)
class aci_add(crud.Create):
"""
Add a new aci.
"""
def execute(self, aciname, **kw):
"""
Execute the aci-add operation.
Returns the entry as it will be created in LDAP.
:param aciname: The name of the ACI being added.
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'aciname' not in kw
ldap = self.api.Backend.ldap
newaci = make_aci(None, aciname, kw)
currentaci = ldap.retrieve(self.api.env.basedn, ['aci'])
acilist = currentaci.get('aci')
for a in acilist:
try:
b = ACI(a)
if newaci.isequal(b):
raise errors.DuplicateEntry()
except SyntaxError:
pass
acilist.append(str(newaci))
kwupdate = {'aci': acilist}
return ldap.update(currentaci.get('dn'), **kwupdate)
api.register(aci_add)
class aci_del(crud.Delete):
'Delete an existing aci.'
"""
Remove an aci by name.
"""
def execute(self, aciname, **kw):
"""
Execute the aci-del operation.
:param aciname: The name of the ACI being added.
:param kw: unused
"""
assert 'aciname' not in kw
ldap = self.api.Backend.ldap
currentaci = ldap.retrieve(self.api.env.basedn, ['aci'])
acilist = currentaci.get('aci')
a = search_by_name(acilist, aciname)
i = acilist.index(a)
del acilist[i]
kwupdate = {'aci': acilist}
return ldap.update(currentaci.get('dn'), **kwupdate)
def output_for_cli(self, textui, result, aciname):
"""
Output result of this command to command line interface.
"""
textui.print_plain('Deleted aci "%s"' % aciname)
api.register(aci_del)
class aci_mod(crud.Update):
'Edit an existing aci.'
def execute(self, aciname, **kw):
return "Not implemented"
def output_for_cli(self, textui, result, aciname, **options):
textui.print_plain(result)
api.register(aci_mod)
class aci_find(crud.Search):
'Search for a aci.'
takes_options = (
Str('bindrule?',
doc='The bindrule (e.g. ldap:///self)'
),
Flag('and?',
doc='Consider multiple options to be \"and\" so all are required.')
)
def execute(self, term, **kw):
ldap = self.api.Backend.ldap
currentaci = ldap.retrieve(self.api.env.basedn, ['aci'])
currentaci = currentaci.get('aci')
results = []
# aciname
if kw.get('aciname'):
try:
a = search_by_name(currentaci, kw.get('aciname'))
results = [a]
if kw.get('and'):
currentaci = results
except errors.NotFound:
if kw.get('and'):
results = []
currentaci = []
pass
# attributes
if kw.get('attrs'):
try:
attrs = kw.get('attrs')
attrs = attrs.replace(' ','').split(',')
a=search_by_attr(currentaci, attrs)
if kw.get('and'):
results = a
currentaci = results
else:
results = results + a
except errors.NotFound:
if kw.get('and'):
results = []
currentaci = []
pass
# taskgroup
if kw.get('taskgroup'):
try:
tg = api.Command['taskgroup_show'](kw.get('taskgroup'))
except errors.NotFound:
# FIXME, need more precise error
raise
try:
a=search_by_taskgroup(currentaci, tg.get('dn'))
if kw.get('and'):
results = a
currentaci = results
else:
results = results + a
except errors.NotFound:
if kw.get('and'):
results = []
currentaci = []
pass
# permissions
if kw.get('permissions'):
try:
permissions = kw.get('permissions')
permissions = permissions.replace(' ','').split(',')
a=search_by_perm(currentaci, permissions)
if kw.get('and'):
results = a
currentaci = results
else:
results = results + a
except errors.NotFound:
if kw.get('and'):
results = []
currentaci = []
pass
# memberOf
if kw.get('memberof'):
try:
group = api.Command['group_show'](kw['memberof'])
memberof = "(memberOf=%s)" % group['dn'].decode('UTF-8')
a=search_by_memberof(currentaci, memberof)
results = results + a
if kw.get('and'):
currentaci = results
except errors.NotFound:
if kw.get('and'):
results = []
currentaci = []
pass
# TODO
# --type=STR type of IPA object: user, group, host
# --filter=STR A legal LDAP filter (ou=Engineering)
# --subtree=STR A subtree to apply the ACI to
# --bindrule=STR A subtree to apply the ACI to
# Make sure we have no dupes in the list
results = list(set(results))
# the first entry contains the count
counter = len(results)
return [counter] + results
def output_for_cli(self, textui, result, term, **options):
counter = result[0]
acis = result[1:]
if counter == 0 or len(acis) == 0:
textui.print_plain("No entries found")
return
textui.print_name(self.name)
for a in acis:
textui.print_plain(a)
textui.print_count(acis, '%d acis matched')
api.register(aci_find)
class aci_show(crud.Retrieve):
'Examine an existing aci.'
def execute(self, aciname, **kw):
"""
Execute the aci-show operation.
Returns the entry
:param uid: The login name of the user to retrieve.
:param kw: unused
"""
ldap = self.api.Backend.ldap
currentaci = ldap.retrieve(self.api.env.basedn, ['aci'])
a = search_by_name(currentaci.get('aci'), aciname)
return str(a)
def output_for_cli(self, textui, result, aciname, **options):
textui.print_plain(result)
api.register(aci_show)
class aci_showall(Command):
'Examine all existing acis.'
def execute(self):
"""
Execute the aci-show operation.
Returns the entry
:param uid: The login name of the user to retrieve.
:param kw: unused
"""
ldap = self.api.Backend.ldap
return ldap.retrieve(self.api.env.basedn, ['aci'])
def output_for_cli(self, textui, result, **options):
textui.print_entry(result)
api.register(aci_showall)

View File

@@ -1,255 +0,0 @@
# Authors:
# Jakub Hrozek <jhrozek@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Frontend plugins for application policy containers.
"""
from ipalib import api, crud
from ipalib import Object, Command # Plugin base classes
from ipalib import Str, StrEnum, Flag # Parameter types
def get_base_by_type(type):
if type == 'config':
return api.env.container_applications
if type == 'role':
return api.env.container_roles
class application(Object):
'Application object'
takes_params = (
Str('cn',
cli_name='appname',
primary_key=True,
doc='Application name',
),
Str('description?',
doc='Application description',
),
)
api.register(application)
# The default attributes to query
default_attributes = ['cn','description']
class application_create(crud.Create):
'Add a new application'
takes_options = (
StrEnum('type',
values=(u'config', u'role'),
doc='The type of the application',
),
)
def execute(self, cn, **kw):
"""
Execute the application-create operation
The dn should not be passed as a keyword argument, it
should be constructed by this method.
:param cn: The name of the application being added.
:param kw: Keyword arguments for the other LDAP attributes.
"""
self.log.info("IPA: application-create '%s'" % cn)
assert 'dn' not in kw
assert 'cn' not in kw
ldap = self.api.Backend.ldap
kw['objectClass'] = ['nsContainer', 'ipaContainer']
if kw['type'] == 'config':
kw['dn'] = ldap.make_application_dn(cn)
if kw['type'] == 'role':
kw['dn'] = ldap.make_role_application_dn(cn)
kw['cn'] = cn
del kw['type']
return ldap.create(**kw)
def output_for_cli(self, textui, result, *args, **options):
"""
Output result of this command to command line interface.
"""
textui.print_name(self.name)
textui.print_entry(result)
textui.print_dashed('Added application "%s"' % result['cn'])
api.register(application_create)
class application_find(crud.Search):
'Search for applications'
takes_options = (
StrEnum('type',
values=(u'config', u'role'),
doc='The type of the application',
),
Flag('all',
doc='Retrieve all application attributes'
),
)
def execute(self, term, **kw):
"""
Execute the application-find operation
"""
ldap = self.api.Backend.ldap
search_kw = dict()
search_kw['cn'] = term
search_kw['objectclass'] = 'ipaContainer'
search_kw['base'] = get_base_by_type(kw['type'])
search_kw['scope'] = 'one'
if kw.get('all', False):
search_kw['attributes'] = ['*']
else:
search_kw['attributes'] = default_attributes
return ldap.search(**search_kw)
def output_for_cli(self, textui, result, cn, **options):
"""
Output result of this command to command line interface.
"""
counter = result[0]
apps = result[1:]
if counter == 0 or len(apps) == 0:
textui.print_plain("No applications found")
return
if len(apps) == 1:
textui.print_entry(apps[0])
return
textui.print_name(self.name)
for a in apps:
textui.print_plain('%(cn)s:' % a)
textui.print_entry(a)
textui.print_plain('')
if counter == -1:
textui.print_plain('These results are truncated.')
textui.print_plain('Please refine your search and try again.')
textui.print_count(apps, '%d applications matched')
api.register(application_find)
class application_delete(crud.Del):
'Delete an application'
takes_options = (
StrEnum('type',
values=(u'config', u'role'),
doc='The type of the application',
),
)
def execute(self, cn, **kw):
"""
Delete the application container.
:param cn: The name of the application being deleted.
:param kw: Not used.
"""
if cn == "Shell Applications":
raise SyntaxError("Cannot delete shell application")
self.log.info("IPA: application_delete '%s'" % cn)
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn",
cn,
object_type='ipaContainer',
base=get_base_by_type(kw['type']))
return ldap.delete(dn)
def output_for_cli(self, textui, result, cn):
"""
Output result of this command to command line interface.
"""
textui.print_plain('Deleted application "%s"' % cn)
api.register(application_delete)
class application_show(crud.Get):
'Examine an existing application'
takes_options = (
StrEnum('type',
values=(u'config', u'role'),
doc='The type of the application',
),
Flag('all',
doc='Retrieve all application attributes'
),
)
def execute(self, cn, **kw):
"""
Execute the application-show operation.
"""
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn",
cn,
object_type='ipaContainer',
base=get_base_by_type(kw['type']))
if kw.get('all', False):
return ldap.retrieve(dn)
else:
return ldap.retrieve(dn, default_attributes)
def output_for_cli(self, textui, result, cn, **options):
if result:
textui.print_entry(result)
api.register(application_show)
class application_edit(crud.Mod):
'Edit an existing application'
takes_options = (
StrEnum('type',
values=(u'config', u'role'),
doc='The type of the application',
),
)
def execute(self, cn, **kw):
"""
Execute the application-edit operation
:param cn: The name of the application to edit
:param kw: Keyword arguments for the other LDAP attributes.
"""
self.log.info("IPA: application_edit '%s'" % cn)
assert 'cn' not in kw
assert 'dn' not in kw
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn",
cn,
object_type='ipaContainer',
base=get_base_by_type(kw['type']))
del kw['type']
return ldap.update(dn, **kw)
def output_for_cli(self, textui, result, cn, **options):
"""
Output result of this command to command line interface.
"""
textui.print_name(self.name)
textui.print_entry(result)
textui.print_dashed('Updated application "%s"' % result['cn'])
api.register(application_edit)

View File

@@ -1,692 +0,0 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Frontend plugins for automount.
RFC 2707bis http://www.padl.com/~lukeh/rfc2307bis.txt
A few notes on automount:
- It was a design decision to not support different maps by location
- The default parent when adding an indirect map is auto.master
- This uses the short format for automount maps instead of the
URL format. Support for ldap as a map source in nsswitch.conf was added
in autofs version 4.1.3-197. Any version prior to that is not expected
to work.
As an example, the following automount files:
auto.master:
/- auto.direct
/mnt auto.mnt
auto.mnt:
stuff -ro,soft,rsize=8192,wsize=8192 nfs.example.com:/vol/archive/stuff
are equivalent to the following LDAP entries:
# auto.master, automount, example.com
dn: automountmapname=auto.master,cn=automount,dc=example,dc=com
objectClass: automountMap
objectClass: top
automountMapName: auto.master
# auto.direct, automount, example.com
dn: automountmapname=auto.direct,cn=automount,dc=example,dc=com
objectClass: automountMap
objectClass: top
automountMapName: auto.direct
# /-, auto.master, automount, example.com
dn: automountkey=/-,automountmapname=auto.master,cn=automount,dc=example,dc=co
m
objectClass: automount
objectClass: top
automountKey: /-
automountInformation: auto.direct
# auto.mnt, automount, example.com
dn: automountmapname=auto.mnt,cn=automount,dc=example,dc=com
objectClass: automountMap
objectClass: top
automountMapName: auto.mnt
# /mnt, auto.master, automount, example.com
dn: automountkey=/mnt,automountmapname=auto.master,cn=automount,dc=example,dc=
com
objectClass: automount
objectClass: top
automountKey: /mnt
automountInformation: auto.mnt
# stuff, auto.mnt, automount, example.com
dn: automountkey=stuff,automountmapname=auto.mnt,cn=automount,dc=example,dc=com
objectClass: automount
objectClass: top
automountKey: stuff
automountInformation: -ro,soft,rsize=8192,wsize=8192 nfs.example.com:/vol/arch
ive/stuff
"""
from ldap import explode_dn
from ipalib import crud, errors
from ipalib import api, Str, Flag, Object, Command
map_attributes = ['automountMapName', 'description', ]
key_attributes = ['description', 'automountKey', 'automountInformation']
def display_entry(textui, entry):
# FIXME: for now delete dn here. In the future pass in the kw to
# output_for_cli()
attr = sorted(entry.keys())
for a in attr:
if a != 'dn':
textui.print_plain("%s: %s" % (a, entry[a]))
def make_automount_dn(mapname):
"""
Construct automount dn from map name.
"""
# FIXME, should this be in b_ldap?
# Experimenting to see what a plugin looks like for a 3rd party who can't
# modify the backend.
import ldap
return 'automountmapname=%s,%s,%s' % (
ldap.dn.escape_dn_chars(mapname),
api.env.container_automount,
api.env.basedn,
)
def make_ldap_map(ldapuri, mapname):
"""
Convert a map name into an LDAP name.
Note: This is unused currently. This would return map names as a
quasi ldap URL which will work with older autofs clients. We are
not currently supporting them.
"""
if not ldapuri:
return mapname
if mapname.find('ldap:') >= 0:
return mapname
return 'ldap:%s:%s' % (api.env.host, make_automount_dn(mapname))
class automount(Object):
"""
Automount object.
"""
takes_params = (
Str('automountmapname',
cli_name='mapname',
primary_key=True,
doc='A group of related automount objects',
),
)
api.register(automount)
class automount_addmap(crud.Add):
'Add a new automount map.'
takes_options = (
Str('description?',
doc='A description of the automount map'),
)
def execute(self, mapname, **kw):
"""
Execute the automount-addmap operation.
Returns the entry as it will be created in LDAP.
:param mapname: The map name being added.
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'automountmapname' not in kw
assert 'dn' not in kw
ldap = self.api.Backend.ldap
kw['automountmapname'] = mapname
kw['dn'] = make_automount_dn(mapname)
kw['objectclass'] = ['automountMap']
return ldap.create(**kw)
def output_for_cli(self, textui, result, map, **options):
"""
Output result of this command to command line interface.
"""
textui.print_plain("Automount map %s added" % map)
api.register(automount_addmap)
class automount_addkey(crud.Add):
'Add a new automount key.'
takes_options = (
Str('automountkey',
cli_name='key',
doc='An entry in an automount map'),
Str('automountinformation',
cli_name='info',
doc='Mount information for this key'),
Str('description?',
doc='A description of the mount'),
)
def execute(self, mapname, **kw):
"""
Execute the automount-addkey operation.
Returns the entry as it will be created in LDAP.
:param mapname: The map name being added to.
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'automountmapname' not in kw
assert 'dn' not in kw
ldap = self.api.Backend.ldap
config = ldap.get_ipa_config()
# use find_entry_dn instead of make_automap_dn so we can confirm that
# the map exists
map_dn = ldap.find_entry_dn("automountmapname", mapname, "automountmap", api.env.container_automount)
kw['dn'] = "automountkey=%s,%s" % (kw['automountkey'], map_dn)
kw['automountinformation'] = make_ldap_map(config.get('automountldapuri', False), kw['automountinformation'])
kw['objectclass'] = ['automount']
return ldap.create(**kw)
def output_for_cli(self, textui, result, *args, **options):
"""
Output result of this command to command line interface.
"""
textui.print_plain("Automount key added")
api.register(automount_addkey)
class automount_delmap(crud.Del):
'Delete an automount map.'
def execute(self, mapname, **kw):
"""Delete an automount map. This will also remove all of the keys
associated with this map.
mapname is the automount map to remove
:param mapname: The map to be removed
:param kw: Not used.
"""
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("automountmapname", mapname, "automountmap", api.env.container_automount)
# First remove all the keys for this map so we don't leave orphans
keys = api.Command['automount_getkeys'](mapname)
for k in keys:
ldap.delete(k.get('dn'))
# Now remove the parental connection
try:
infodn = ldap.find_entry_dn("automountinformation", mapname, "automount", api.env.container_automount)
ldap.delete(infodn)
except errors.NotFound:
# direct maps may not have this
pass
return ldap.delete(dn)
def output_for_cli(self, textui, result, *args, **options):
"""
Output result of this command to command line interface.
"""
print "Automount map and associated keys deleted"
api.register(automount_delmap)
class automount_delkey(crud.Del):
'Delete an automount key.'
takes_options = (
Str('automountkey',
cli_name='key',
doc='The automount key to remove'),
)
def execute(self, mapname, **kw):
"""Delete an automount key.
key is the automount key to remove
:param mapname: The automount map containing the key to be removed
:param kw: "key" the key to be removed
"""
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("automountmapname", mapname, "automountmap", api.env.container_automount)
keys = api.Command['automount_getkeys'](mapname)
keydn = None
keyname = kw.get('automountkey').lower()
if keys:
for k in keys:
if k.get('automountkey').lower() == keyname:
keydn = k.get('dn')
break
if not keydn:
raise errors.NotFound(reason='Removing keys failed. key %s not found' % keyname)
return ldap.delete(keydn)
def output_for_cli(self, textui, result, *args, **options):
"""
Output result of this command to command line interface.
"""
print "Automount key deleted"
api.register(automount_delkey)
class automount_modmap(crud.Mod):
'Edit an existing automount map.'
takes_options = (
Str('description?',
doc='A description of the automount map'),
)
def execute(self, mapname, **kw):
"""
Execute the automount-modmap operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry
:param mapname: The map name to update.
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'automountmapname' not in kw
assert 'dn' not in kw
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("automountmapname", mapname, "automountmap", api.env.container_automount)
return ldap.update(dn, **kw)
def output_for_cli(self, textui, result, *args, **options):
"""
Output result of this command to command line interface.
"""
print "Automount map updated"
api.register(automount_modmap)
class automount_modkey(crud.Mod):
'Edit an existing automount key.'
takes_options = (
Str('automountkey',
cli_name='key',
doc='An entry in an automount map'),
Str('automountinformation?',
cli_name='info',
doc='Mount information for this key'),
Str('description?',
doc='A description of the automount map'),
)
def execute(self, mapname, **kw):
"""
Execute the automount-modkey operation.
Returns the entry
:param mapname: The map name to update.
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'automountmapname' not in kw
assert 'dn' not in kw
keyname = kw.get('automountkey').lower()
del kw['automountkey']
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("automountmapname", mapname, "automountmap", api.env.container_automount)
keys = api.Command['automount_getkeys'](mapname)
keydn = None
if keys:
for k in keys:
if k.get('automountkey').lower() == keyname:
keydn = k.get('dn')
break
if not keydn:
raise errors.NotFound(reason='Update failed, unable to find key %s' % keyname)
return ldap.update(keydn, **kw)
def output_for_cli(self, textui, result, *args, **options):
"""
Output result of this command to command line interface.
"""
print "Automount key updated"
api.register(automount_modkey)
class automount_findmap(crud.Find):
'Search automount maps.'
takes_options = (
Flag('all', doc='Retrieve all attributes'),
)
def execute(self, term, **kw):
ldap = self.api.Backend.ldap
search_fields = map_attributes
for s in search_fields:
kw[s] = term
kw['objectclass'] = 'automountMap'
kw['base'] = api.env.container_automount
if kw.get('all', False):
kw['attributes'] = ['*']
else:
kw['attributes'] = map_attributes
return ldap.search(**kw)
def output_for_cli(self, textui, result, *args, **options):
counter = result[0]
entries = result[1:]
if counter == 0:
textui.print_plain("No entries found")
return
elif counter == -1:
textui.print_plain("These results are truncated.")
textui.print_plain("Please refine your search and try again.")
for e in entries:
display_entry(textui, e)
textui.print_plain("")
api.register(automount_findmap)
class automount_findkey(crud.Find):
'Search automount keys.'
takes_options = (
Flag('all?', doc='Retrieve all attributes'),
)
def get_args(self):
return (Str('automountkey',
cli_name='key',
doc='An entry in an automount map'),)
def execute(self, term, **kw):
ldap = self.api.Backend.ldap
search_fields = key_attributes
for s in search_fields:
kw[s] = term
kw['objectclass'] = 'automount'
kw['base'] = api.env.container_automount
if kw.get('all', False):
kw['attributes'] = ['*']
else:
kw['attributes'] = key_attributes
return ldap.search(**kw)
def output_for_cli(self, textui, result, *args, **options):
counter = result[0]
entries = result[1:]
if counter == 0:
textui.print_plain("No entries found")
return
elif counter == -1:
textui.print_plain("These results are truncated.")
textui.print_plain("Please refine your search and try again.")
for e in entries:
display_entry(textui, e)
textui.print_plain("")
api.register(automount_findkey)
class automount_showmap(crud.Get):
'Examine an existing automount map.'
takes_options = (
Flag('all?', doc='Retrieve all attributes'),
)
def execute(self, mapname, **kw):
"""
Execute the automount-showmap operation.
Returns the entry
:param mapname: The automount map to retrieve
:param kw: "all" set to True = return all attributes
"""
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("automountmapname", mapname, "automountmap", api.env.container_automount)
# FIXME: should kw contain the list of attributes to display?
if kw.get('all', False):
return ldap.retrieve(dn)
else:
return ldap.retrieve(dn, map_attributes)
def output_for_cli(self, textui, result, *args, **options):
if result:
display_entry(textui, result)
api.register(automount_showmap)
class automount_showkey(crud.Get):
'Examine an existing automount key.'
takes_options = (
Str('automountkey',
cli_name='key',
doc='The automount key to display'),
Flag('all?', doc='Retrieve all attributes'),
)
def execute(self, mapname, **kw):
"""
Execute the automount-showkey operation.
Returns the entry
:param mapname: The mapname to examine
:param kw: "automountkey" the key to retrieve
:param kw: "all" set to True = return all attributes
"""
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("automountmapname", mapname, "automountmap", api.env.container_automount)
keys = api.Command['automount_getkeys'](mapname)
keyname = kw.get('automountkey').lower()
keydn = None
if keys:
for k in keys:
if k.get('automountkey').lower() == keyname:
keydn = k.get('dn')
break
if not keydn:
raise errors.NotFound(reason='Unable to find key %s' % keyname)
# FIXME: should kw contain the list of attributes to display?
if kw.get('all', False):
return ldap.retrieve(keydn)
else:
return ldap.retrieve(keydn, key_attributes)
def output_for_cli(self, textui, result, *args, **options):
# The automount map name associated with this key is available only
# in the dn. Add it as an attribute to display instead.
if result and not result.get('automountmapname'):
elements = explode_dn(result.get('dn').lower())
for e in elements:
(attr, value) = e.split('=',1)
if attr == 'automountmapname':
result['automountmapname'] = value
display_entry(textui, result)
api.register(automount_showkey)
class automount_getkeys(Command):
'Retrieve all keys for an automount map.'
takes_args = (
Str('automountmapname',
cli_name='mapname',
primary_key=True,
doc='A group of related automount objects',
),
)
def execute(self, mapname, **kw):
"""
Execute the automount-getkeys operation.
Return a list of all automount keys for this mapname
:param mapname: Retrieve all keys for this mapname
"""
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("automountmapname", mapname, "automountmap", api.env.container_automount)
try:
keys = ldap.get_one_entry(dn, 'objectclass=*', ['*'])
except errors.NotFound:
keys = []
return keys
def output_for_cli(self, textui, result, *args, **options):
for k in result:
textui.print_plain('%s' % k.get('automountkey'))
api.register(automount_getkeys)
class automount_getmaps(Command):
'Retrieve all automount maps'
takes_args = (
Str('automountmapname?',
cli_name='mapname',
primary_key=True,
doc='A group of related automount objects',
default=u'auto.master',
),
)
def execute(self, mapname, **kw):
"""
Execute the automount-getmaps operation.
Return a list of all automount maps.
"""
ldap = self.api.Backend.ldap
base = api.env.container_automount + "," + api.env.basedn
search_base = "automountmapname=%s,%s" % (mapname, base)
maps = ldap.get_one_entry(search_base, "objectClass=*", ["*"])
return maps
def output_for_cli(self, textui, result, *args, **options):
for k in result:
textui.print_plain('%s: %s' % (k.get('automountinformation'), k.get('automountkey')))
api.register(automount_getmaps)
class automount_addindirectmap(crud.Add):
"""
Add a new automap indirect mount point.
"""
takes_options = (
Str('parentmap?',
cli_name='parentmap',
default=u'auto.master',
autofill=True,
doc='The parent map to connect this to.',
),
Str('automountkey',
cli_name='key',
doc='An entry in an automount map',
),
Str('description?',
doc='A description of the automount map',
),
)
def execute(self, mapname, **kw):
"""
Execute the automount-addindirectmap operation.
Returns the key entry as it will be created in LDAP.
This function creates 2 LDAP entries. It creates an
automountmapname entry and an automountkey entry.
:param mapname: The map name being added.
:param kw['parentmap'] is the top-level map to add this to.
defaulting to auto.master
:param kw['automountkey'] is the mount point
:param kw['description'] is a textual description of this map
"""
mapkw = {}
if kw.get('description'):
mapkw['description'] = kw.get('description')
newmap = api.Command['automount_addmap'](mapname, **mapkw)
keykw = {'automountkey': kw['automountkey'], 'automountinformation': mapname}
if kw.get('description'):
keykw['description'] = kw.get('description')
newkey = api.Command['automount_addkey'](kw['parentmap'], **keykw)
return newkey
def output_for_cli(self, textui, result, map, **options):
"""
Output result of this command to command line interface.
"""
textui.print_plain("Indirect automount map %s added" % map)
api.register(automount_addindirectmap)
class automount_tofiles(Command):
'Generate the automount maps as they would be in the filesystem'
def execute(self, **kw):
"""
Execute the automount-getmaps operation.
Return a list of all automount maps.
"""
ldap = self.api.Backend.ldap
base = api.env.container_automount + "," + api.env.basedn
search_base = "automountmapname=auto.master,%s" % base
maps = ldap.get_one_entry(search_base, "objectClass=autoMount", ["*"])
mapkeys = {}
for m in maps:
keys = api.Command['automount_getkeys'](m.get('automountinformation').decode('UTF-8'))
mapkeys[m.get('automountinformation')] = keys
return maps, mapkeys
def output_for_cli(self, textui, result, **options):
maps = result[0]
keys = result[1]
textui.print_plain("/etc/auto.master:")
for m in maps:
textui.print_plain('%s\t/etc/%s' % (m.get('automountkey'), m.get('automountinformation')))
for m in maps:
textui.print_plain('---------------------------')
textui.print_plain('/etc/%s:' % m.get('automountinformation'))
mapkeys = keys.get(m.get('automountinformation'))
for k in mapkeys:
textui.print_plain('%s\t%s' % (k.get('automountkey'), k.get('automountinformation')))
api.register(automount_tofiles)

View File

@@ -1,422 +0,0 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Base plugin for groups.
"""
from ipalib import api, crud, errors
from ipalib import Object, Command # Plugin base classes
from ipalib import Str, Int, Flag, List # Parameter types
from ldap.dn import escape_dn_chars
default_attributes = ('cn','description','member','memberof')
default_class = "groupofnames"
def find_members(ldap, failed, members, attribute, filter=None, base=None):
"""
Search for a list of members to operate on.
Returns a tuple of 2 lists: a list of DNs found, a list of errors
:param ldap: The ldap connection
:param failed: The current list of failed entries
:param members: list of members to find DNs for
:param attribute: The primary key attribute (cn, uid, etc)
:param filter: An LDAP filter to narrow the search
:param base: The search base DN
"""
found = []
for m in members:
if not m: continue
try:
member_dn = ldap.find_entry_dn(attribute, m, filter, base)
found.append(member_dn)
except errors.NotFound:
failed.append(m)
return found, failed
class BaseGroup(Object):
"""
Basic Group object.
"""
takes_params = (
Str('description',
doc='A description of this group',
attribute=True,
),
Str('cn',
cli_name='name',
primary_key=True,
normalizer=lambda value: value.lower(),
attribute=True,
),
)
def get_dn(self, cn):
"""
Construct group dn from cn.
"""
assert self.container
return 'cn=%s,%s,%s' % (
escape_dn_chars(cn),
self.container,
api.env.basedn,
)
class basegroup_add(crud.Add):
'Add a new group.'
base_classes = ("top", default_class)
def execute(self, cn, **kw):
"""
Execute a group add operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry as it will be created in LDAP.
:param cn: The name of the group being added.
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'cn' not in kw
assert 'dn' not in kw
ldap = self.api.Backend.ldap
entry = self.args_options_2_entry(cn, **kw)
entry['dn'] = self.obj.get_dn(cn)
if kw.get('objectclass'):
entry['objectclass'] = kw['objectclass']
else:
entry['objectclass'] = self.base_classes
return ldap.create(**entry)
def output_for_cli(self, textui, result, *args, **options):
"""
Output result of this command to command line interface.
"""
textui.print_name(self.name)
textui.print_entry(result)
textui.print_dashed('Added group "%s"' % result['cn'])
class basegroup_del(crud.Del):
'Delete an existing group.'
filter_class = default_class
container = None
def execute(self, cn, **kw):
"""
Delete a group
The memberOf plugin handles removing the group from any other
groups.
:param cn: The name of the group being removed
:param kw: Unused
"""
assert self.container
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn", cn, self.filter_class, self.container)
return ldap.delete(dn)
def output_for_cli(self, textui, result, cn):
"""
Output result of this command to command line interface.
"""
textui.print_plain("Deleted group %s" % cn)
class basegroup_mod(crud.Mod):
'Edit an existing group.'
filter_class = default_class
container = None
def execute(self, cn, **kw):
"""
Execute the group-mod operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry
:param cn: The name of the group to update.
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'cn' not in kw
assert 'dn' not in kw
assert self.container
assert self.filter_class
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn", cn, self.filter_class, self.container)
return ldap.update(dn, **kw)
def output_for_cli(self, textui, result, cn, **options):
"""
Output result of this command to command line interface.
"""
textui.print_plain("Group updated")
class basegroup_find(crud.Find):
'Search the groups.'
filter_class = default_class
searchfields = []
container = None
def execute(self, term, **kw):
ldap = self.api.Backend.ldap
if not self.searchfields:
# Pull the list of searchable attributes out of the configuration.
config = ldap.get_ipa_config()
search_fields_conf_str = config.get('ipagroupsearchfields')
search_fields = search_fields_conf_str.split(",")
else:
search_fields = self.searchfields
search_kw = {}
for s in search_fields:
search_kw[s] = term
if self.filter_class and not kw.get('objectclass'):
search_kw['objectclass'] = self.filter_class
if self.container and not kw.get('base'):
search_kw['base'] = self.container
return ldap.search(**search_kw)
def output_for_cli(self, textui, result, criteria, **options):
counter = result[0]
groups = result[1:]
if counter == 0 or len(groups) == 0:
textui.print_plain("No entries found")
return
if len(groups) == 1:
textui.print_entry(groups[0])
return
textui.print_name(self.name)
for g in groups:
textui.print_entry(g)
textui.print_plain('')
if counter == -1:
textui.print_plain("These results are truncated.")
textui.print_plain("Please refine your search and try again.")
textui.print_count(groups, '%d groups matched')
class basegroup_show(crud.Get):
'Examine an existing group.'
filter_class = default_class
default_attributes = default_attributes
container = None
takes_options = (
Flag('all', doc='Retrieve all attributes'),
)
def execute(self, cn, **kw):
"""
Execute the group-show operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry
:param cn: The group name to retrieve.
:param kw: Not used.
"""
assert self.container
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn", cn, self.filter_class, self.container)
# FIXME: should kw contain the list of attributes to display?
if kw.get('all', False):
return ldap.retrieve(dn)
else:
return ldap.retrieve(dn, self.default_attributes)
def output_for_cli(self, textui, result, *args, **options):
textui.print_entry(result)
class basegroup_add_member(Command):
'Add a member to a group.'
takes_args = (
Str('group', primary_key=True),
)
takes_options = (
List('users?', doc='comma-separated list of users to add'),
List('groups?', doc='comma-separated list of user groups to add'),
)
container = None
filter_class = default_class
def _find_members(self, ldap, failed, members, attribute, filter=None, base=None):
return find_members(ldap, failed, members, attribute, filter, base)
def _add_members(self, ldap, completed, members, add_failed, dn, memberattr):
"""
Add members to a group.
Returns a tuple of the # completed and those that weren't added
:param ldap: The ldap connection
:param completed: number of entries successfully added
:param members: list of member DNs to add
:param add_failed: members who failed to be added
:param dn: DN of group to add members to
:param membetattr: The attribute where members are stored
"""
for member_dn in members:
if not member_dn: continue
try:
ldap.add_member_to_group(member_dn, dn, memberattr)
completed+=1
except:
add_failed.append(member_dn)
return completed, add_failed
def execute(self, cn, **kw):
"""
Execute the group-add-member operation.
Returns the updated group entry
:param cn: The group name to add new members to.
:param kw: groups is a comma-separated list of groups to add
:param kw: users is a comma-separated list of users to add
"""
assert self.container
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn", cn, self.filter_class, self.container)
add_failed = []
to_add = []
completed = 0
members = kw.get('groups', [])
(to_add, add_failed) = self._find_members(ldap, add_failed, members, "cn", "ipaUserGroup", self.api.env.container_group)
(completed, add_failed) = self._add_members(ldap, completed, to_add, add_failed, dn, "member")
members = kw.get('users', [])
(to_add, add_failed) = self._find_members(ldap, add_failed, members, "uid", "posixAccount", self.api.env.container_user)
(completed, add_failed) = self._add_members(ldap, completed, to_add, add_failed, dn, "member")
return add_failed
def output_for_cli(self, textui, result, *args, **options):
"""
Output result of this command to command line interface.
"""
if result:
textui.print_plain("These entries failed to add to the group:")
for a in result:
textui.print_plain("\t'%s'" % a)
else:
textui.print_plain("members added.")
class basegroup_remove_member(Command):
'Remove a member from a group.'
container = None
filter_class = default_class
takes_args = (
Str('group', primary_key=True),
)
takes_options = (
List('users?', doc='comma-separated list of users to remove'),
List('groups?', doc='comma-separated list of user groups to remove'),
)
def _find_members(self, ldap, failed, members, attribute, filter=None, base=None):
return find_members(ldap, failed, members, attribute, filter, base)
def _remove_members(self, ldap, completed, members, remove_failed, dn, memberattr):
"""
Add members to a group.
Returns a tuple of the # completed and those that weren't added
:param ldap: The ldap connection
:param completed: number of entries successfully removed
:param members: list of member DNs to remove
:param remove_failed: members who failed to be removed
:param dn: DN of group to remove members from
:param membetattr: The attribute where members are stored
"""
for member_dn in members:
if not member_dn: continue
try:
ldap.remove_member_from_group(member_dn, dn, memberattr)
completed+=1
except:
remove_failed.append(member_dn)
return completed, remove_failed
def execute(self, cn, **kw):
"""
Execute the group-remove-member operation.
Returns the members that could not be added
:param cn: The group name to add new members to.
:param kw: groups is a comma-separated list of groups to remove
:param kw: users is a comma-separated list of users to remove
"""
assert self.container
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn", cn, self.filter_class, self.container)
to_remove = []
remove_failed = []
completed = 0
members = kw.get('groups', [])
(to_remove, remove_failed) = self._find_members(ldap, remove_failed, members, "cn", "ipaUserGroup", self.api.env.container_group)
(completed, remove_failed) = self._remove_members(ldap, completed, to_remove, remove_failed, dn, "member")
members = kw.get('users', [])
(to_remove, remove_failed) = self._find_members(ldap, remove_failed, members, "uid", "posixAccount", self.api.env.container_user)
(completed, remove_failed) = self._remove_members(ldap, completed, to_remove, remove_failed, dn, "member")
return remove_failed
def output_for_cli(self, textui, result, *args, **options):
"""
Output result of this command to command line interface.
"""
if result:
textui.print_plain("These entries failed to be removed from the group:")
for a in result:
textui.print_plain("\t'%s'" % a)
else:
textui.print_plain("members removed.")

View File

@@ -1,137 +0,0 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Frontend plugin for default options in IPA.
"""
from ipalib import api
from ipalib import Command # Plugin base classes
from ipalib import Str, Int # Parameter types
class defaultoptions_mod(Command):
"""
Options command.
"""
takes_options = (
Int('ipamaxusernamelength?',
cli_name='maxusername',
doc='Max. Username length',
minvalue=1
),
Str('ipahomesrootdir?',
cli_name='homedirectory',
doc='Default location of home directories'
),
Str('ipadefaultloginshell?',
cli_name='defaultshell',
doc='Default shell for new users'
),
Str('ipadefaultprimarygroup?',
cli_name='defaultgroup',
doc='Default group for new users'
),
Str('ipadefaultemaildomain?',
cli_name='emaildomain',
doc='Default e-mail domain new users'
),
Int('ipasearchtimelimit?',
cli_name='searchtimelimit',
doc='Max. amount of time (sec.) for a search (-1 is unlimited)',
minvalue=-1,
),
Int('ipasearchrecordslimit?',
cli_name='searchrecordslimit',
doc='Max. number of records to search (-1 is unlimited)',
minvalue=-1,
),
Str('ipausersearchfields?',
cli_name='usersearch',
doc='A comma-separated list of fields to search when searching for users'
),
Str('ipagroupsearchfields?',
cli_name='groupsearch',
doc='A comma-separated list of fields to search when searching for groups'
),
)
def execute(self, *args, **kw):
"""
Execute the defaultoptions-mod operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry
:param args: This function takes no positional arguments
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'dn' not in kw
ldap = self.api.Backend.ldap
config = ldap.get_ipa_config()
dn = config.get('dn')
# The LDAP routines want strings, not ints, so convert a few
# things. Otherwise it sees a string -> int conversion as a change.
for k in kw.iterkeys():
if k.startswith("ipa", 0, 3) and type(kw[k]) is int:
kw[k] = str(kw[k])
return ldap.update(dn, **kw)
def output_for_cli(self, textui, result, *args, **options):
textui.print_plain("Default options modified")
api.register(defaultoptions_mod)
class defaultoptions_show(Command):
'Retrieve current default options'
def execute(self, *args, **kw):
"""
Execute the defaultoptions-show operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry
:param args: Not used.
:param kw: Not used.
"""
ldap = self.api.Backend.ldap
return ldap.get_ipa_config()
def output_for_cli(self, textui, result, *args, **options):
textui.print_plain("Search Configuration")
textui.print_plain(" Search Time Limit (sec.): %s" % result.get('ipasearchtimelimit'))
textui.print_plain(" Search Records Limit: %s" % result.get('ipasearchrecordslimit'))
textui.print_plain(" User Search Fields: %s" % result.get('ipausersearchfields'))
textui.print_plain(" Group Search Fields: %s" % result.get('ipagroupsearchfields'))
textui.print_plain("")
textui.print_plain("User Settings")
textui.print_plain(" Max. Username Length: %s" % result.get('ipamaxusernamelength'))
textui.print_plain(" Root for Home Directories: %s" % result.get('ipahomesrootdir'))
textui.print_plain(" Default Shell: %s" % result.get('ipadefaultloginshell'))
textui.print_plain(" Default User Group: %s" % result.get('ipadefaultprimarygroup'))
textui.print_plain("Default E-mail Domain: %s" % result.get('ipadefaultemaildomain'))
api.register(defaultoptions_show)

View File

@@ -1,216 +0,0 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Frontend plugins for groups.
"""
from ipalib import api
from ipalib.plugins.basegroup import *
container_group = api.env.container_group
display_attributes = ['cn','description','gidnumber','member','memberof']
default_class = 'ipaUserGroup'
class group(BaseGroup):
"""
group object.
"""
container=container_group
takes_params = BaseGroup.takes_params + (
Int('gidnumber?',
cli_name='gid',
doc='The gid to use for this group. If not included one is automatically set.',
attribute=True,
),
)
api.register(group)
class group_add(basegroup_add):
'Add a new group.'
takes_options = (
Flag('posix',
doc='Create as a posix group',
attribute=False,
),
)
def execute(self, cn, **kw):
"""
Execute the group-add operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry as it will be created in LDAP.
No need to explicitly set gidNumber. The dna_plugin will do this
for us if the value isn't provided by the caller.
:param cn: The name of the group being added.
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'cn' not in kw
assert 'dn' not in kw
ldap = self.api.Backend.ldap
"""
entry = self.args_options_2_entry(cn, **kw)
entry['dn'] = ldap.make_group_dn(cn)
"""
# Get our configuration
config = ldap.get_ipa_config()
# some required objectclasses
kw['objectclass'] = config.get('ipagroupobjectclasses')
if kw.get('posix') or kw.get('gidnumber'):
kw['objectclass'].append('posixGroup')
if kw.has_key('posix'):
del kw['posix']
return super(group_add, self).execute(cn, **kw)
api.register(group_add)
class group_del(basegroup_del):
'Delete an existing group.'
container = container_group
filter_class = default_class
def execute(self, cn, **kw):
"""
Delete a group
The memberOf plugin handles removing the group from any other
groups.
:param cn: The name of the group being removed
:param kw: Unused
"""
# We have 2 special groups, don't allow them to be removed
# if "admins" == cn.lower() or "editors" == cn.lower():
# raise ipaerror.gen_exception(ipaerror.CONFIG_REQUIRED_GROUPS)
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn", cn, self.filter_class)
# Don't allow the default user group to be removed
try:
config=ldap.get_ipa_config()
default_group = ldap.find_entry_dn("cn", config.get('ipadefaultprimarygroup'), self.filter_class)
if dn == default_group:
raise errors.DefaultGroup
except errors.NotFound:
pass
return super(group_del, self).execute(cn, **kw)
api.register(group_del)
class group_mod(basegroup_mod):
'Edit an existing group.'
container = container_group
filter_class = default_class
takes_options = (
Flag('posix',
doc='Make this group a posix group',
attribute=False,
),
)
def execute(self, cn, **kw):
"""
Execute the group-mod operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry
:param cn: The name of the group to update.
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'cn' not in kw
assert 'dn' not in kw
oldgroup = None
if kw.has_key('gidnumber') or kw.get('posix'):
groupkw = {'all': True}
oldgroup = api.Command['group_show'](cn, **groupkw)
# Are we promoting a non-posix group into a posix one? We just
# need to add the posixGroup objectclass to the list and the
# DNA plugin will handle assigning a new gidNumber for us.
if kw.get('posix'):
if oldgroup.get('gidnumber'):
raise errors.AlreadyPosixGroup
else:
oldgroup['objectclass'].append('posixgroup')
kw['objectclass'] = oldgroup['objectclass']
if kw.has_key('gidnumber') and not oldgroup.has_key('gidnumber'):
oldgroup['objectclass'].append('posixgroup')
kw['objectclass'] = oldgroup['objectclass']
if kw.has_key('posix'):
# we want this gone whether it is True or False
del kw['posix']
if isinstance(kw.get('gidnumber',''), int):
# python-ldap wants this as a string
kw['gidnumber'] = str(kw['gidnumber'])
return super(group_mod, self).execute(cn, **kw)
api.register(group_mod)
class group_find(basegroup_find):
'Search the groups.'
default_attributes = display_attributes
container = container_group
filter_class = default_class
api.register(group_find)
class group_show(basegroup_show):
'Examine an existing group.'
default_attributes = display_attributes
container = container_group
api.register(group_show)
class group_add_member(basegroup_add_member):
'Add a member to a group.'
container = container_group
api.register(group_add_member)
class group_remove_member(basegroup_remove_member):
'Remove a member from a group.'
container = container_group
api.register(group_remove_member)

View File

@@ -1,319 +0,0 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Frontend plugins for host/machine Identity.
"""
from ipalib import api, crud, errors, util
from ipalib import Object # Plugin base class
from ipalib import Str, Flag # Parameter types
import sys
import os
import platform
def get_host(hostname):
"""
Try to get the hostname as fully-qualified first, then fall back to
just a host name search.
"""
ldap = api.Backend.ldap
# Strip off trailing dot
if hostname.endswith('.'):
hostname = hostname[:-1]
try:
dn = ldap.find_entry_dn("fqdn", hostname, "ipaHost")
except errors.NotFound:
dn = ldap.find_entry_dn("serverhostname", hostname, "ipaHost")
return dn
def validate_host(ugettext, fqdn):
"""
Require at least one dot in the hostname (to support localhost.localdomain)
"""
dots = len(fqdn.split('.'))
if dots < 2:
return 'Fully-qualified hostname required'
return None
default_attributes = ['fqdn','description','localityname','nshostlocation','nshardwareplatform','nsosversion']
def determine_os():
(sysname, nodename, release, version, machine) = os.uname()
if sys.platform == "linux2":
# something like 'fedora 9 Sulpher'
return unicode(" ".join(platform.dist()))
else:
# on Solaris this will be: 'SunOS 5.10'
return unicode(sysname + " " + release)
def determine_platform():
(sysname, nodename, release, version, machine) = os.uname()
return unicode(machine)
class host(Object):
"""
Host object.
"""
takes_params = (
Str('fqdn', validate_host,
cli_name='hostname',
primary_key=True,
normalizer=lambda value: value.lower(),
),
Str('description?',
doc='Description of the host',
),
Str('localityname?',
cli_name='locality',
doc='Locality of this host (Baltimore, MD)',
),
Str('nshostlocation?',
cli_name='location',
doc='Location of this host (e.g. Lab 2)',
),
Str('nshardwareplatform?',
cli_name='platform',
doc='Hardware platform of this host (e.g. Lenovo T61)',
default=determine_platform(),
autofill=True,
),
Str('nsosversion?',
cli_name='os',
doc='Operating System and version on this host (e.g. Fedora 9)',
default=determine_os(),
autofill=True,
),
Str('userpassword?',
cli_name='password',
doc='Set a password to be used in bulk enrollment',
),
)
api.register(host)
class host_add(crud.Add):
'Add a new host.'
def execute(self, hostname, **kw):
"""
Execute the host-add operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
If password is set then this is considered a 'bulk' host so we
do not create a kerberos service principal.
Returns the entry as it will be created in LDAP.
:param hostname: The name of the host being added.
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'fqdn' not in kw
assert 'cn' not in kw
assert 'dn' not in kw
assert 'krbprincipalname' not in kw
ldap = self.api.Backend.ldap
kw['fqdn'] = hostname
kw['cn'] = hostname
kw['serverhostname'] = hostname.split('.',1)[0]
kw['dn'] = ldap.make_host_dn(hostname)
# FIXME: do a DNS lookup to ensure host exists
current = util.get_current_principal()
if not current:
raise errors.NotFound(reason='Unable to determine current user')
kw['enrolledby'] = ldap.find_entry_dn("krbPrincipalName", current, "posixAccount")
# Get our configuration
config = ldap.get_ipa_config()
# some required objectclasses
# FIXME: add this attribute to cn=ipaconfig
#kw['objectclass'] = config.get('ipahostobjectclasses')
kw['objectclass'] = ['nsHost', 'ipaHost', 'pkiUser']
# Ensure the list of objectclasses is lower-case
kw['objectclass'] = map(lambda z: z.lower(), kw.get('objectclass'))
if not kw.get('userpassword', False):
kw['krbprincipalname'] = "host/%s@%s" % (hostname, self.api.env.realm)
if 'krbprincipalaux' not in kw.get('objectclass'):
kw['objectclass'].append('krbprincipalaux')
kw['objectclass'].append('krbprincipal')
else:
if 'krbprincipalaux' in kw.get('objectclass'):
kw['objectclass'].remove('krbprincipalaux')
return ldap.create(**kw)
def output_for_cli(self, textui, result, *args, **options):
"""
Output result of this command to command line interface.
"""
textui.print_plain("Host added")
api.register(host_add)
class host_del(crud.Del):
'Delete an existing host.'
def execute(self, hostname, **kw):
"""Delete a host.
hostname is the name of the host to delete
:param hostname: The name of the host being removed.
:param kw: Not used.
"""
ldap = self.api.Backend.ldap
dn = get_host(hostname)
# Remove all service records for this host
services=api.Command['service_find'](hostname, **{})
counter = services[0]
services = services[1:]
if counter > 0:
for s in services:
principal = s.get('krbprincipalname').decode('UTF-8')
api.Command['service_del'](principal, **{})
return ldap.delete(dn)
def output_for_cli(self, textui, result, *args, **options):
"""
Output result of this command to command line interface.
"""
textui.print_plain("Host deleted")
api.register(host_del)
class host_mod(crud.Mod):
'Edit an existing host.'
def execute(self, hostname, **kw):
"""
Execute the host-mod operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry
:param hostname: The name of the host to retrieve.
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'fqdn' not in kw
assert 'dn' not in kw
ldap = self.api.Backend.ldap
dn = get_host(hostname)
return ldap.update(dn, **kw)
def output_for_cli(self, textui, result, *args, **options):
"""
Output result of this command to command line interface.
"""
textui.print_plain("Host updated")
api.register(host_mod)
class host_find(crud.Find):
'Search the hosts.'
takes_options = (
Flag('all', doc='Retrieve all attributes'),
)
# FIXME: This should no longer be needed with the Param.query kwarg.
# def get_args(self):
# """
# Override Find.get_args() so we can exclude the validation rules
# """
# yield self.obj.primary_key.__clone__(rules=tuple())
def execute(self, term, **kw):
ldap = self.api.Backend.ldap
# Pull the list of searchable attributes out of the configuration.
#config = ldap.get_ipa_config()
# FIXME: add this attribute to cn=ipaconfig
#search_fields_conf_str = config.get('ipahostsearchfields')
#search_fields = search_fields_conf_str.split(",")
search_fields = ['fqdn','serverhostname','description','localityname','nshostlocation','nshardwareplatform','nsosversion']
search_kw = {}
for s in search_fields:
search_kw[s] = term
search_kw['objectclass'] = "ipaHost"
if kw.get('all', False):
search_kw['attributes'] = ['*']
else:
search_kw['attributes'] = default_attributes
return ldap.search(**search_kw)
def output_for_cli(self, textui, result, *args, **options):
counter = result[0]
hosts = result[1:]
if counter == 0:
textui.print_plain("No entries found")
return
for h in hosts:
textui.print_entry(h)
if counter == -1:
textui.print_plain("These results are truncated.")
textui.print_plain("Please refine your search and try again.")
api.register(host_find)
class host_show(crud.Get):
'Examine an existing host.'
takes_options = (
Flag('all', doc='Display all host attributes'),
)
def execute(self, hostname, **kw):
"""
Execute the host-show operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry
:param hostname: The login name of the host to retrieve.
:param kw: "all" set to True = return all attributes
"""
ldap = self.api.Backend.ldap
dn = get_host(hostname)
# FIXME: should kw contain the list of attributes to display?
if kw.get('all', False):
return ldap.retrieve(dn)
else:
value = ldap.retrieve(dn, default_attributes)
del value['dn']
return value
def output_for_cli(self, textui, result, *args, **options):
textui.print_entry(result)
api.register(host_show)

View File

@@ -1,156 +0,0 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Frontend plugins for hostgroups.
"""
from ipalib import api
from ipalib.plugins.basegroup import *
container_hostgroup = api.env.container_hostgroup
default_class="ipaHostGroup"
class hostgroup(BaseGroup):
"""
hostgroup object.
"""
container=container_hostgroup
api.register(hostgroup)
class hostgroup_add(basegroup_add):
'Add a new hostgroup.'
base_classes = ("top", "groupofnames", "ipaHostGroup")
api.register(hostgroup_add)
class hostgroup_del(basegroup_del):
'Delete an existing hostgroup.'
container = container_hostgroup
api.register(hostgroup_del)
class hostgroup_mod(basegroup_mod):
'Edit an existing hostgroup.'
container = container_hostgroup
api.register(hostgroup_mod)
class hostgroup_find(basegroup_find):
'Search the groups.'
container = container_hostgroup
api.register(hostgroup_find)
class hostgroup_show(basegroup_show):
'Examine an existing hostgroup.'
container = container_hostgroup
api.register(hostgroup_show)
class hostgroup_add_member(basegroup_add_member):
'Add a member to a hostgroup.'
container = container_hostgroup
takes_options = (
List('groups?', doc='comma-separated list of user groups to add'),
List('hosts?', doc='comma-separated list of hosts to add'),
List('hostgroups?', doc='comma-separated list of hostgroups to add'),
)
def execute(self, cn, **kw):
"""
Execute the group-add-member operation.
Returns the updated group entry
:param cn: The group name to add new members to.
:param kw: groups is a comma-separated list of groups to add
:parem kw: users is a comma-separated list of users to add
"""
assert self.container
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn", cn, self.filter_class, self.container)
add_failed = []
to_add = []
completed = 0
# Do the base class additions first. Note that the super function
# also supports users but since we never pass any in that code
# will never execute
add_failed = super(hostgroup_add_member, self).execute(cn, **kw)
members = kw.get('hosts', [])
(to_add, add_failed) = self._find_members(ldap, add_failed, members, "cn", "ipaHost", self.api.env.container_host)
(completed, add_failed) = self._add_members(ldap, completed, to_add, add_failed, dn, "member")
members = kw.get('hostgroups', [])
(to_add, add_failed) = self._find_members(ldap, add_failed, members, "cn", default_class, self.api.env.container_hostgroup)
(completed, add_failed) = self._add_members(ldap, completed, to_add, add_failed, dn, "member")
return add_failed
api.register(hostgroup_add_member)
class hostgroup_remove_member(basegroup_remove_member):
'Remove a member from a hostgroup.'
container = container_hostgroup
takes_options = (
List('groups?', doc='comma-separated list of user groups to add'),
List('hosts?', doc='comma-separated list of hosts to add'),
List('hostgroups?', doc='comma-separated list of hostgroups to add'),
)
def execute(self, cn, **kw):
"""
Execute the group-remove-member operation.
Returns the updated group entry
:param cn: The group name to remove new members from.
:parem kw: users is a comma-separated list of users to remove
"""
assert self.container
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn", cn, self.filter_class, self.container)
remove_failed = []
to_remove = []
completed = 0
# Do the base class removals first
remove_failed = super(hostgroup_remove_member, self).execute(cn, **kw)
members = kw.get('hosts', [])
(to_remove, remove_failed) = self._find_members(ldap, remove_failed, members, "cn", "ipaHost", self.api.env.container_host)
(completed, remove_failed) = self._remove_members(ldap, completed, to_remove, remove_failed, dn, "member")
members = kw.get('hostgroups', [])
(to_remove, remove_failed) = self._find_members(ldap, remove_failed, members, "cn", default_class, self.api.env.container_hostgroup)
(completed, remove_failed) = self._remove_members(ldap, completed, to_remove, remove_failed, dn, "member")
return remove_failed
api.register(hostgroup_remove_member)

View File

@@ -1,288 +0,0 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Frontend plugins for netgroups.
"""
from ipalib import api
from ipalib.plugins.basegroup import *
from ipalib import uuid
display_attributes = ['cn','description','memberhost','externalhost','memberuser','member']
container_netgroup = "cn=ng, cn=alt"
default_class = "ipaNISNetgroup"
class netgroup(BaseGroup):
"""
netgroup object.
"""
container=container_netgroup
takes_params = (
Str('description',
doc='A description of this group',
attribute=True,
),
Str('cn',
cli_name='name',
primary_key=True,
normalizer=lambda value: value.lower(),
attribute=True,
),
Str('nisdomainname',
doc='The NIS domain name',
attribute=True,
),
)
api.register(netgroup)
class netgroup_add(basegroup_add):
'Add a new netgroup.'
def execute(self, cn, **kw):
"""
Execute the netgroup-add operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry as it will be created in LDAP.
:param cn: The name of the netgroup
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'cn' not in kw
assert 'dn' not in kw
ldap = self.api.Backend.ldap
kw['cn'] = cn
kw['ipauniqueid'] = str(uuid.uuid1())
kw['dn'] = "ipauniqueid=%s,%s,%s" % (kw['ipauniqueid'], container_netgroup, api.env.basedn)
if not kw.get('nisdomainname', False):
kw['nisdomainname'] = api.env.domain
# some required objectclasses
kw['objectclass'] = ['top', 'ipaAssociation', 'ipaNISNetgroup']
return ldap.create(**kw)
api.register(netgroup_add)
class netgroup_del(basegroup_del):
'Delete an existing netgroup.'
container = container_netgroup
filter_class = default_class
api.register(netgroup_del)
class netgroup_mod(basegroup_mod):
'Edit an existing netgroup.'
container = container_netgroup
filter_class = default_class
api.register(netgroup_mod)
class netgroup_find(basegroup_find):
'Search the groups.'
container = container_netgroup
filter_class = default_class
api.register(netgroup_find)
class netgroup_show(basegroup_show):
'Examine an existing netgroup.'
default_attributes = display_attributes
container = container_netgroup
filter_class = default_class
api.register(netgroup_show)
class netgroup_add_member(basegroup_add_member):
'Add a member to a netgroup.'
default_attributes = display_attributes
container = container_netgroup
filter_class = default_class
takes_options = basegroup_add_member.takes_options + (
List('hosts?', doc='comma-separated list of hosts to add'),
List('hostgroups?', doc='comma-separated list of host groups to add'),
List('netgroups?', doc='comma-separated list of netgroups to add'),
)
def _add_external(self, ldap, completed, members, cn):
failed = []
kw = {"all": True}
netgroup = api.Command['netgroup_show'](cn, **kw)
external = netgroup.get('externalhost', [])
if not isinstance(external, list):
external = [external]
external_len = len(external)
for m in members:
if not m in external:
external.append(m)
completed+=1
else:
failed.append(m)
if len(external) > external_len:
kw = {'externalhost': external}
ldap.update(netgroup['dn'], **kw)
return completed, failed
def execute(self, cn, **kw):
"""
Execute the group-add-member operation.
Returns the updated group entry
:param cn: The group name to add new members to.
:param kw: groups is a comma-separated list of groups to add
:param kw: users is a comma-separated list of users to add
:param kw: hostgroups is a comma-separated list of hostgroups to add
"""
assert self.container
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn", cn, self.filter_class, self.container)
add_failed = []
to_add = []
completed = 0
# Hosts
members = kw.get('hosts', [])
(to_add, add_failed) = self._find_members(ldap, add_failed, members, "cn", "ipaHost")
# If a host is not found we'll consider it an externalHost. It will
# be up to the user to handle typos
if add_failed:
(completed, add_failed) = self._add_external(ldap, completed, add_failed, cn)
(completed, add_failed) = self._add_members(ldap, completed, to_add, add_failed, dn, "memberhost")
# Hostgroups
members = kw.get('hostgroups', [])
(to_add, add_failed) = self._find_members(ldap, add_failed, members, "cn", "ipaHostGroup", self.api.env.container_hostgroup)
(completed, add_failed) = self._add_members(ldap, completed, to_add, add_failed, dn, "memberhost")
# User
members = kw.get('users', [])
(to_add, add_failed) = self._find_members(ldap, add_failed, members, "uid", "posixAccount", self.api.env.container_user)
(completed, add_failed) = self._add_members(ldap, completed, to_add, add_failed, dn, "memberuser")
# Groups
members = kw.get('groups', [])
(to_add, add_failed) = self._find_members(ldap, add_failed, members, "cn", "ipaUserGroup", self.api.env.container_group)
(completed, failed) = self._add_members(ldap, completed, to_add, add_failed, dn, "memberuser")
# Netgroups
members = kw.get('netgroups', [])
(to_add, add_failed) = self._find_members(ldap, add_failed, members, "cn", self.filter_class, self.container)
(completed, add_failed) = self._add_members(ldap, completed, to_add, add_failed, dn, "member")
return add_failed
api.register(netgroup_add_member)
class netgroup_remove_member(basegroup_remove_member):
'Remove a member from a netgroup.'
default_attributes = display_attributes
container = container_netgroup
filter_class = default_class
takes_options = basegroup_remove_member.takes_options + (
List('hosts?', doc='comma-separated list of hosts to remove'),
List('hostgroups?', doc='comma-separated list of host groups to remove'),
List('netgroups?', doc='comma-separated list of netgroups to remove'),
)
def _remove_external(self, ldap, completed, members, cn):
failed = []
kw = {"all": True}
netgroup = api.Command['netgroup_show'](cn, **kw)
external = netgroup.get('externalhost', [])
if not isinstance(external, list):
external = [external]
external_len = len(external)
for m in members:
try:
external.remove(m)
completed+=1
except ValueError:
failed.append(m)
if len(external) < external_len:
kw = {'externalhost': external}
ldap.update(netgroup['dn'], **kw)
return completed, failed
def execute(self, cn, **kw):
"""
Execute the group-remove-member operation.
Returns the updated group entry
:param cn: The group name to remove new members to.
:param kw: groups is a comma-separated list of groups to remove
:param kw: users is a comma-separated list of users to remove
:param kw: hostgroups is a comma-separated list of hostgroups to remove
"""
assert self.container
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn", cn, self.filter_class, self.container)
remove_failed = []
to_remove = []
completed = 0
# Hosts
members = kw.get('hosts', [])
(to_remove, remove_failed) = self._find_members(ldap, remove_failed, members, "cn", "ipaHost")
# If a host is not found we'll consider it an externalHost. It will
# be up to the user to handle typos
if remove_failed:
(completed, remove_failed) = self._remove_external(ldap, completed, remove_failed, cn)
(completed, remove_failed) = self._remove_members(ldap, completed, to_remove, remove_failed, dn, "memberhost")
# Hostgroups
members = kw.get('hostgroups', [])
(to_remove, remove_failed) = self._find_members(ldap, remove_failed, members, "cn", "ipaHostGroup", self.api.env.container_hostgroup)
(completed, remove_failed) = self._remove_members(ldap, completed, to_remove, remove_failed, dn, "memberhost")
# User
members = kw.get('users', [])
(to_remove, remove_failed) = self._find_members(ldap, remove_failed, members, "uid", "posixAccount", self.api.env.container_user)
(completed, remove_failed) = self._remove_members(ldap, completed, to_remove, remove_failed, dn, "memberuser")
# Groups
members = kw.get('groups', [])
(to_remove, remove_failed) = self._find_members(ldap, remove_failed, members, "cn", "ipaUserGroup", self.api.env.container_group)
(completed, failed) = self._remove_members(ldap, completed, to_remove, remove_failed, dn, "memberuser")
# Netgroups
members = kw.get('netgroups', [])
(to_remove, remove_failed) = self._find_members(ldap, remove_failed, members, "cn", self.filter_class, self.container)
(completed, remove_failed) = self._remove_members(ldap, completed, to_remove, remove_failed, dn, "member")
return remove_failed
api.register(netgroup_remove_member)

View File

@@ -1,71 +0,0 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Frontend plugins for password changes.
"""
from ipalib import api, errors, util
from ipalib import Command # Plugin base classes
from ipalib import Str, Password # Parameter types
class passwd(Command):
'Change a user password.'
takes_args = (
Str('principal',
cli_name='user',
primary_key=True,
autofill=True,
create_default=lambda **kw: util.get_current_principal(),
),
Password('password'),
)
def execute(self, principal, password):
"""
Execute the passwd operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry
:param principal: The login name or principal of the user
:param password: the new password
"""
if principal.find('@') > 0:
u = principal.split('@')
if len(u) > 2:
raise errors.MalformedUserPrincipal(principal=principal)
else:
principal = principal+"@"+self.api.env.realm
dn = self.Backend.ldap.find_entry_dn(
"krbprincipalname",
principal,
"posixAccount"
)
return self.Backend.ldap.modify_password(dn, newpass=password)
def output_for_cli(self, textui, result, principal, password):
assert password is None
textui.print_plain('Changed password for "%s"' % principal)
api.register(passwd)

View File

@@ -1,127 +0,0 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Frontend plugins for password policy.
"""
from ipalib import api
from ipalib import Command # Plugin base classes
from ipalib import Int # Parameter types
class pwpolicy_mod(Command):
'Edit existing password policy.'
takes_options = (
Int('krbmaxpwdlife?',
cli_name='maxlife',
doc='Max. Password Lifetime (days)',
minvalue=0,
),
Int('krbminpwdlife?',
cli_name='minlife',
doc='Min. Password Lifetime (hours)',
minvalue=0,
),
Int('krbpwdhistorylength?',
cli_name='history',
doc='Password History Size',
minvalue=0,
),
Int('krbpwdmindiffchars?',
cli_name='minclasses',
doc='Min. Number of Character Classes',
minvalue=0,
),
Int('krbpwdminlength?',
cli_name='minlength',
doc='Min. Length of Password',
minvalue=0,
),
)
def execute(self, *args, **kw):
"""
Execute the pwpolicy-mod operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry
:param args: This function takes no positional arguments
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'dn' not in kw
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn", "accounts", "krbPwdPolicy")
# The LDAP routines want strings, not ints, so convert a few
# things. Otherwise it sees a string -> int conversion as a change.
for k in kw.iterkeys():
if k.startswith("krb", 0, 3) and type(kw[k]) is int:
kw[k] = str(kw[k])
# Convert hours and days to seconds
if kw.get('krbmaxpwdlife'):
kw['krbmaxpwdlife'] = str(int(kw.get('krbmaxpwdlife')) * 86400)
if kw.get('krbminpwdlife'):
kw['krbminpwdlife'] = str(int(kw.get('krbminpwdlife')) * 3600)
return ldap.update(dn, **kw)
def output_for_cli(self, textui, result, *args, **options):
textui.print_plain("Policy modified")
api.register(pwpolicy_mod)
class pwpolicy_show(Command):
'Retrieve current password policy'
def execute(self, *args, **kw):
"""
Execute the pwpolicy-show operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry
:param args: Not used.
:param kw: Not used.
"""
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn", "accounts", "krbPwdPolicy")
policy = ldap.retrieve(dn)
# convert some values for display purposes
policy['krbmaxpwdlife'] = str(int(policy.get('krbmaxpwdlife')) / 86400)
policy['krbminpwdlife'] = str(int(policy.get('krbminpwdlife')) / 3600)
return policy
def output_for_cli(self, textui, result, *args, **options):
textui.print_plain("Password Policy")
textui.print_plain("Min. Password Lifetime (hours): %s" % result.get('krbminpwdlife'))
textui.print_plain("Max. Password Lifetime (days): %s" % result.get('krbmaxpwdlife'))
textui.print_plain("Min. Number of Character Classes: %s" % result.get('krbpwdmindiffchars'))
textui.print_plain("Min. Length of Password: %s" % result.get('krbpwdminlength'))
textui.print_plain("Password History Size: %s" % result.get('krbpwdhistorylength'))
api.register(pwpolicy_show)

View File

@@ -1,86 +0,0 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Frontend plugins for rolegroups.
"""
from ipalib import api
from ipalib.plugins.basegroup import *
display_attributes = ['cn','description', 'member', 'memberof']
container_rolegroup = "cn=rolegroups,cn=accounts"
class rolegroup(BaseGroup):
"""
rolegroup object.
"""
container=container_rolegroup
api.register(rolegroup)
class rolegroup_add(basegroup_add):
'Add a new rolegroup.'
base_classes = ("top", "groupofnames", "nestedgroup")
api.register(rolegroup_add)
class rolegroup_del(basegroup_del):
'Delete an existing rolegroup.'
container = container_rolegroup
api.register(rolegroup_del)
class rolegroup_mod(basegroup_mod):
'Edit an existing rolegroup.'
container = container_rolegroup
api.register(rolegroup_mod)
class rolegroup_find(basegroup_find):
'Search the groups.'
container = container_rolegroup
api.register(rolegroup_find)
class rolegroup_show(basegroup_show):
'Examine an existing rolegroup.'
default_attributes = display_attributes
container = container_rolegroup
api.register(rolegroup_show)
class rolegroup_add_member(basegroup_add_member):
'Add a member to a rolegroup.'
container = container_rolegroup
api.register(rolegroup_add_member)
class rolegroup_remove_member(basegroup_remove_member):
'Remove a member from a rolegroup.'
container = container_rolegroup
api.register(rolegroup_remove_member)

View File

@@ -1,291 +0,0 @@
# Authors:
# Jason Gerard DeRose <jderose@redhat.com>
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Frontend plugins for service (Identity).
"""
from ipalib import api, crud, errors
from ipalib import Object # Plugin base classes
from ipalib import Str, Flag, Bytes # Parameter types
import base64
from OpenSSL import crypto
default_attributes = ['krbprincipalname', 'usercertificate']
def validate_principal(ugettext, principal):
(service, hostname, principal) = split_principal(principal)
def split_principal(principal):
service = hostname = realm = None
# Break down the principal into its component parts, which may or
# may not include the realm.
sp = principal.split('/')
if len(sp) != 2:
raise errors.MalformedServicePrincipal(reason="missing service")
service = sp[0]
sr = sp[1].split('@')
if len(sr) > 2:
raise errors.MalformedServicePrincipal(reason="unable to determine realm")
hostname = sr[0].lower()
if len(sr) == 2:
realm = sr[1].upper()
# At some point we'll support multiple realms
if (realm != api.env.realm):
raise errors.RealmMismatch()
else:
realm = api.env.realm
# Note that realm may be None.
return (service, hostname, realm)
def normalize_principal(principal):
# The principal is already validated when it gets here
(service, hostname, realm) = split_principal(principal)
# Put the principal back together again
principal = service + "/" + hostname + "@" + realm
return unicode(principal)
def validate_certificate(ugettext, cert):
"""
For now just verify that it is properly base64-encoded.
"""
try:
base64.b64decode(cert)
except Exception, e:
raise errors.Base64DecodeError(reason=str(e))
class service(Object):
"""
Service object.
"""
takes_params = (
Str('principal',
validate_principal,
primary_key=True,
normalizer=lambda value: normalize_principal(value),
),
Bytes('usercertificate?',
validate_certificate,
cli_name='certificate',
doc='Base-64 encoded server certificate',
),
)
api.register(service)
class service_add(crud.Add):
"""
Add a new service.
"""
takes_options = (
Flag('force',
doc='Force a service principal name even if not found in DNS',
),
)
def execute(self, principal, **kw):
"""
Execute the service-add operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry as it will be created in LDAP.
:param principal: The service to be added in the form: service/hostname
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'krbprincipalname' not in kw
ldap = self.api.Backend.ldap
force = kw.get('force', False)
try:
del kw['force']
except:
pass
(service, hostname, realm) = split_principal(principal)
if service.lower() == "host" and not force:
raise errors.HostService()
"""
FIXME once DNS client is done
if not force:
fqdn = hostname + "."
rs = dnsclient.query(fqdn, dnsclient.DNS_C_IN, dnsclient.DNS_T_A)
if len(rs) == 0:
self.log.debug("IPA: DNS A record lookup failed for '%s'" % hostname)
raise ipaerror.gen_exception(ipaerror.INPUT_NOT_DNS_A_RECORD)
else:
self.log.debug("IPA: found %d records for '%s'" % (len(rs), hostname))
"""
# FIXME, should be in a normalizer. Need to fix normalizers to work
# on non-unicode data
if kw.get('usercertificate'):
kw['usercertificate'] = base64.b64decode(kw['usercertificate'])
dn = ldap.make_service_dn(principal)
kw['dn'] = dn
kw['objectclass'] = ['krbPrincipal', 'krbPrincipalAux', 'krbTicketPolicyAux', 'ipaService', 'pkiUser']
return ldap.create(**kw)
def output_for_cli(self, textui, result, *args, **options):
textui.print_plain("Service added")
textui.print_entry(result)
api.register(service_add)
class service_del(crud.Del):
'Delete an existing service.'
def execute(self, principal, **kw):
"""
Delete a service principal.
principal is the krbprincipalname of the entry to delete.
This should be called with much care.
:param principal: The service to be added in the form: service/hostname
:param kw: not used
"""
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("krbprincipalname", principal, object_type="ipaService")
entry = ldap.retrieve(dn)
if entry.has_key('usercertificate'):
cert = entry.get('usercertificate')
x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, cert)
serial = str(x509.get_serial_number())
api.Command['cert_revoke'](unicode(serial, ), **{'revocation_reason': 5})
return ldap.delete(dn)
def output_to_cli(self, ret):
textui.print_plain("Service removed")
api.register(service_del)
class service_mod(crud.Update):
'Update an existing service.'
def execute(self, principal, **kw):
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("krbprincipalname", principal, object_type="ipaService")
entry = ldap.retrieve(dn)
if entry.get('usercertificate') and kw.get('usercertificate'):
# FIXME, what to do here? Do we revoke the old cert?
raise errors.GenericError(format='entry already has a certificate')
# FIXME, should be in a normalizer. Need to fix normalizers to work
# on non-unicode data.
if kw.get('usercertificate'):
kw['usercertificate'] = base64.b64decode(kw['usercertificate'])
return ldap.update(dn, **kw)
def output_to_cli(self, ret):
textui.print_plain("Service updated")
textui.print_entry(result)
api.register(service_mod)
class service_find(crud.Search):
'Search the existing services.'
takes_options = (
Flag('all', doc='Retrieve all attributes'),
)
def execute(self, principal, **kw):
ldap = self.api.Backend.ldap
search_kw = {}
search_kw['filter'] = "&(objectclass=ipaService)(!(objectClass=posixAccount))(!(|(krbprincipalname=kadmin/*)(krbprincipalname=K/M@*)(krbprincipalname=krbtgt/*)))"
search_kw['krbprincipalname'] = principal
object_type = ldap.get_object_type("krbprincipalname")
if object_type and not kw.get('objectclass'):
search_kw['objectclass'] = object_type
if kw.get('all', False):
search_kw['attributes'] = ['*']
else:
search_kw['attributes'] = default_attributes
return ldap.search(**search_kw)
def output_for_cli(self, textui, result, *args, **options):
counter = result[0]
services = result[1:]
if counter == 0:
textui.print_plain("No entries found")
return
for s in services:
textui.print_entry(s)
if counter == -1:
textui.print_plain("These results are truncated.")
textui.print_plain("Please refine your search and try again.")
textui.print_count(services, '%d services matched')
api.register(service_find)
class service_show(crud.Get):
'Examine an existing service.'
takes_options = (
Flag('all', doc='Display all service attributes'),
)
def execute(self, principal, **kw):
"""
Execute the service-show operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry
:param principal: The service principal to retrieve
:param kw: Not used.
"""
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("krbprincipalname", principal, object_type="ipaService")
# FIXME: should kw contain the list of attributes to display?
if kw.get('all', False):
return ldap.retrieve(dn)
else:
return ldap.retrieve(dn, default_attributes)
def output_for_cli(self, textui, result, *args, **options):
textui.print_entry(result)
api.register(service_show)

View File

@@ -1,176 +0,0 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Frontend plugins for taskgroups.
"""
from ipalib import api
from ipalib.plugins.basegroup import *
display_attributes = ('cn','description', 'member', 'memberof')
container_taskgroup = "cn=taskgroups,cn=accounts"
container_rolegroup = "cn=rolegroups,cn=accounts"
class taskgroup(BaseGroup):
"""
taskgroup object.
"""
container=container_taskgroup
api.register(taskgroup)
class taskgroup_add(basegroup_add):
'Add a new taskgroup.'
api.register(taskgroup_add)
class taskgroup_del(basegroup_del):
'Delete an existing taskgroup.'
container = container_taskgroup
api.register(taskgroup_del)
class taskgroup_mod(basegroup_mod):
'Edit an existing taskgroup.'
container = container_taskgroup
api.register(taskgroup_mod)
class taskgroup_find(basegroup_find):
'Search the groups.'
container = container_taskgroup
api.register(taskgroup_find)
class taskgroup_show(basegroup_show):
'Examine an existing taskgroup.'
default_attributes = display_attributes
container = container_taskgroup
api.register(taskgroup_show)
class taskgroup_showall(Command):
'List all taskgroups.'
default_attributes = display_attributes
container = container_taskgroup
takes_args = ()
def execute(self, **kw):
ldap = self.api.Backend.ldap
search_kw = {"cn": "*"}
search_kw['objectclass'] = "groupofnames"
search_kw['base'] = self.container
search_kw['exactonly'] = True
search_kw['attributes'] = ['cn', 'description']
return ldap.search(**search_kw)
def output_for_cli(self, textui, result, **options):
counter = result[0]
groups = result[1:]
if counter == 0 or len(groups) == 0:
textui.print_plain("No entries found")
return
for g in groups:
textui.print_entry(g)
textui.print_plain('')
if counter == -1:
textui.print_plain("These results are truncated.")
textui.print_plain("Please refine your search and try again.")
textui.print_count(groups, '%d groups matched')
api.register(taskgroup_showall)
class taskgroup_add_member(basegroup_add_member):
'Add a member to a taskgroup.'
container = container_taskgroup
takes_options = basegroup_add_member.takes_options + (List('rolegroups?', doc='comma-separated list of role groups to add'),)
def execute(self, cn, **kw):
"""
Execute the group-add-member operation.
Returns the updated group entry
:param cn: The group name to add new members to.
:param kw: groups is a comma-separated list of groups to add
:param kw: users is a comma-separated list of users to add
:param kw: rolegroups is a comma-separated list of rolegroups to add
"""
assert self.container
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn", cn, self.filter_class, self.container)
add_failed = []
to_add = []
completed = 0
# Do the base class additions first
add_failed = super(taskgroup_add_member, self).execute(cn, **kw)
members = kw.get('rolegroups', [])
(to_add, add_failed) = self._find_members(ldap, add_failed, members, "cn", self.filter_class, container_rolegroup)
(completed, add_failed) = self._add_members(ldap, completed, to_add, add_failed, dn, "member")
return add_failed
api.register(taskgroup_add_member)
class taskgroup_remove_member(basegroup_remove_member):
'Remove a member from a taskgroup.'
container = container_taskgroup
takes_options = basegroup_remove_member.takes_options + (List('rolegroups?', doc='comma-separated list of role groups to remove'),)
def execute(self, cn, **kw):
"""
Execute the group-remove-member operation.
Returns the updated group entry
:param cn: The group name to remove new members from.
:param kw: groups is a comma-separated list of groups to remove
:param kw: users is a comma-separated list of users to remove
:param kw: rolegroups is a comma-separated list of rolegroups to remove
"""
assert self.container
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("cn", cn, self.filter_class, self.container)
remove_failed = []
to_remove = []
completed = 0
# Do the base class removals first
remove_failed = super(taskgroup_remove_member, self).execute(cn, **kw)
members = kw.get('rolegroups', [])
(to_remove, remove_failed) = self._find_members(ldap, remove_failed, members, "cn", self.filter_class, container_rolegroup)
(completed, remove_failed) = self._remove_members(ldap, completed, to_remove, remove_failed, dn, "member")
return remove_failed
api.register(taskgroup_remove_member)

View File

@@ -1,383 +0,0 @@
# Authors:
# Jason Gerard DeRose <jderose@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Frontend plugins for user (Identity).
"""
from ipalib import api, crud, errors
from ipalib import Object, Command # Plugin base classes
from ipalib import Str, Password, Flag, Int # Parameter types
def display_user(user):
# FIXME: for now delete dn here. In the future pass in the kw to
# output_for_cli()
attr = sorted(user.keys())
# Always have sn following givenname
try:
l = attr.index('givenname')
attr.remove('sn')
attr.insert(l+1, 'sn')
except ValueError:
pass
for a in attr:
if a != 'dn':
print "%s: %s" % (a, user[a])
default_attributes = ['uid','givenname','sn','homeDirectory','loginshell']
class user(Object):
"""
User object.
"""
takes_params = (
Str('givenname',
cli_name='first',
doc="User's first name",
),
Str('sn',
cli_name='last',
doc="User's last name",
),
Str('uid',
cli_name='user',
doc="User's login name",
primary_key=True,
default_from=lambda givenname, sn: givenname[0] + sn,
normalizer=lambda value: value.lower(),
),
Str('gecos?',
doc='GECOS field',
default_from=lambda uid: uid,
),
Str('homedirectory?',
cli_name='home',
doc="User's home directory",
default_from=lambda uid: '/home/%s' % uid,
),
Str('loginshell?',
cli_name='shell',
default=u'/bin/sh',
doc="User's Login shell",
),
Str('krbprincipalname?',
cli_name='principal',
doc="User's Kerberos Principal name",
default_from=lambda uid: '%s@%s' % (uid, api.env.realm),
),
Str('mail?',
cli_name='email',
doc="User's e-mail address",
),
Password('userpassword?',
cli_name='password',
doc="Set user's password",
),
Str('groups?',
doc='Add account to one or more groups (comma-separated)',
),
Int('uidnumber?',
cli_name='uid',
doc='The uid to use for this user. If not included one is automatically set.',
),
Str('street?',
doc='The street address',
),
)
api.register(user)
class user_add(crud.Create):
"""
Add a new user.
"""
def execute(self, uid, **kw):
"""
Execute the user-add operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry as it will be created in LDAP.
:param uid: The login name of the user being added.
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'uid' not in kw
assert 'dn' not in kw
ldap = self.api.Backend.ldap
kw['uid'] = uid
kw['dn'] = ldap.make_user_dn(uid)
default_group = None
# FIXME: enforce this elsewhere
# if servercore.uid_too_long(kw['uid']):
# raise errors.UsernameTooLong
# Get our configuration
config = ldap.get_ipa_config()
# Let us add in some missing attributes
if kw.get('homedirectory') is None:
kw['homedirectory'] = '%s/%s' % (config.get('ipahomesrootdir'), kw.get('uid'))
kw['homedirectory'] = kw['homedirectory'].replace('//', '/')
kw['homedirectory'] = kw['homedirectory'].rstrip('/')
if kw.get('loginshell') is None:
kw['loginshell'] = config.get('ipadefaultloginshell')
if kw.get('gecos') is None:
kw['gecos'] = kw['uid']
# If uidnumber is blank the the FDS dna_plugin will automatically
# assign the next value. So we don't have to do anything with it.
if not kw.get('gidnumber'):
try:
group_dn = ldap.find_entry_dn("cn", config.get('ipadefaultprimarygroup'))
default_group = ldap.retrieve(group_dn, ['cn', 'dn','gidNumber'])
if default_group:
kw['gidnumber'] = default_group.get('gidnumber')
except errors.NotFound:
# Fake an LDAP error so we can return something useful to the kw
raise errors.NotFound(reason="The default group for new users, '%s', cannot be found." % config.get('ipadefaultprimarygroup'))
except Exception, e:
# catch everything else
raise e
if kw.get('krbprincipalname') is None:
kw['krbprincipalname'] = "%s@%s" % (kw.get('uid'), self.api.env.realm)
# FIXME. This is a hack so we can request separate First and Last
# name in the GUI.
if kw.get('cn') is None:
kw['cn'] = "%s %s" % (kw.get('givenname'),
kw.get('sn'))
# some required objectclasses
kw['objectClass'] = config.get('ipauserobjectclasses')
new_user = ldap.create(**kw)
if default_group:
groupkw = {'users':kw.get('uid')}
api.Command['group_add_member'](default_group['cn'].decode('UTF-8'), **groupkw)
return new_user
def output_for_cli(self, textui, result, *args, **options):
"""
Output result of this command to command line interface.
"""
textui.print_name(self.name)
textui.print_entry(result)
textui.print_dashed('Added user "%s"' % result['uid'])
api.register(user_add)
class user_del(crud.Delete):
'Delete an existing user.'
def execute(self, uid):
"""Delete a user. Not to be confused with inactivate_user. This
makes the entry go away completely.
uid is the uid of the user to delete
The memberOf plugin handles removing the user from any other
groups.
:param uid: The login name of the user being added.
:param kw: Not used.
"""
if uid == "admin":
# FIXME: do we still want a "special" user?
raise SyntaxError("admin required")
# raise ipaerror.gen_exception(ipaerror.INPUT_ADMIN_REQUIRED)
self.log.info("IPA: user-del '%s'" % uid)
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("uid", uid)
return ldap.delete(dn)
def output_for_cli(self, textui, result, uid):
"""
Output result of this command to command line interface.
"""
textui.print_plain('Deleted user "%s"' % uid)
api.register(user_del)
class user_mod(crud.Update):
'Edit an existing user.'
def execute(self, uid, **kw):
"""
Execute the user-mod operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry
:param uid: The login name of the user to retrieve.
:param kw: Keyword arguments for the other LDAP attributes.
"""
assert 'uid' not in kw
assert 'dn' not in kw
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("uid", uid)
return ldap.update(dn, **kw)
def output_for_cli(self, textui, result, uid, **options):
"""
Output result of this command to command line interface.
"""
textui.print_name(self.name)
textui.print_entry(result)
textui.print_dashed('Updated user "%s"' % result['uid'])
api.register(user_mod)
class user_find(crud.Search):
"""
Search for users.
"""
takes_options = (
Flag('all', doc='Retrieve all user attributes'),
)
def execute(self, term, **kw):
ldap = self.api.Backend.ldap
# Pull the list of searchable attributes out of the configuration.
config = ldap.get_ipa_config()
search_fields_conf_str = config.get('ipausersearchfields')
search_fields = search_fields_conf_str.split(",")
search_kw = {}
for s in search_fields:
search_kw[s] = term
object_type = ldap.get_object_type("uid")
if object_type and not kw.get('objectclass'):
search_kw['objectclass'] = object_type
if kw.get('all', False):
search_kw['attributes'] = ['*']
else:
search_kw['attributes'] = default_attributes
return ldap.search(**search_kw)
def output_for_cli(self, textui, result, uid, **options):
counter = result[0]
users = result[1:]
if counter == 0 or len(users) == 0:
textui.print_plain("No entries found")
return
if len(users) == 1:
textui.print_entry(users[0])
return
textui.print_name(self.name)
for u in users:
gn = u.get('givenname', '')
sn= u.get('sn', '')
textui.print_plain('%s %s:' % (gn, sn))
textui.print_entry(u)
textui.print_plain('')
if counter == -1:
textui.print_plain('These results are truncated.')
textui.print_plain('Please refine your search and try again.')
textui.print_count(users, '%d users matched')
api.register(user_find)
class user_show(crud.Retrieve):
'Examine an existing user.'
takes_options = (
Flag('all', doc='Retrieve all user attributes'),
)
def execute(self, uid, **kw):
"""
Execute the user-show operation.
The dn should not be passed as a keyword argument as it is constructed
by this method.
Returns the entry
:param uid: The login name of the user to retrieve.
:param kw: "all" set to True = return all attributes
"""
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("uid", uid)
# FIXME: should kw contain the list of attributes to display?
if kw.get('all', False):
return ldap.retrieve(dn)
else:
return ldap.retrieve(dn, default_attributes)
def output_for_cli(self, textui, result, uid, **options):
display_user(result)
api.register(user_show)
class user_lock(Command):
'Lock a user account.'
takes_args = (
Str('uid', primary_key=True),
)
def execute(self, uid, **kw):
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("uid", uid)
return ldap.mark_entry_inactive(dn)
def output_for_cli(self, textui, result, uid):
if result:
textui.print_plain('Locked user "%s"' % uid)
api.register(user_lock)
class user_unlock(Command):
'Unlock a user account.'
takes_args = (
Str('uid', primary_key=True),
)
def execute(self, uid, **kw):
ldap = self.api.Backend.ldap
dn = ldap.find_entry_dn("uid", uid)
return ldap.mark_entry_active(dn)
def output_for_cli(self, textui, result, uid):
if result:
textui.print_plain('Unlocked user "%s"' % uid)
api.register(user_unlock)