Fail on unknown Command options

When unknown keyword arguments are passed to a Command, raise an
error instead of ignoring them.

Options used when IPA calls its commands internally are listed
in a new Command attribute called internal_options, and allowed.

Previous patches (0b01751c, c45174d6, c5689e7f) made IPA not use
unknown keyword arguments in its own commands and tests, but since
that some violations were reintroduced in permission_find and tests.
Fix those.

Tests included; both a frontend unittest and a XML-RPC test via the
ping plugin (which was untested previously).

https://fedorahosted.org/freeipa/ticket/2509
This commit is contained in:
Petr Viktorin 2012-04-17 12:42:35 -04:00 committed by Martin Kosek
parent 1484ccc404
commit 1235dfa7bf
10 changed files with 123 additions and 28 deletions

View File

@ -29,7 +29,8 @@ from parameters import create_param, parse_param_spec, Param, Str, Flag, Passwor
from output import Output, Entry, ListOfEntries from output import Output, Entry, ListOfEntries
from text import _, ngettext from text import _, ngettext
from errors import ZeroArgumentError, MaxArgumentError, OverlapError, RequiresRoot, VersionError, RequirementError from errors import (ZeroArgumentError, MaxArgumentError, OverlapError,
RequiresRoot, VersionError, RequirementError, OptionError)
from errors import InvocationError from errors import InvocationError
from constants import TYPE_ERROR from constants import TYPE_ERROR
from ipapython.version import API_VERSION from ipapython.version import API_VERSION
@ -404,6 +405,8 @@ class Command(HasParam):
output_params = Plugin.finalize_attr('output_params') output_params = Plugin.finalize_attr('output_params')
has_output_params = tuple() has_output_params = tuple()
internal_options = tuple()
msg_summary = None msg_summary = None
msg_truncated = _('Results are truncated, try a more specific search') msg_truncated = _('Results are truncated, try a more specific search')
@ -520,7 +523,13 @@ class Command(HasParam):
def __options_2_params(self, options): def __options_2_params(self, options):
for name in self.params: for name in self.params:
if name in options: if name in options:
yield (name, options[name]) yield (name, options.pop(name))
# If any options remain, they are either internal or unknown
unused_keys = set(options).difference(self.internal_options)
unused_keys.discard('version')
if unused_keys:
raise OptionError(_('Unknown option: %(option)s'),
option=unused_keys.pop())
def args_options_2_entry(self, *args, **options): def args_options_2_entry(self, *args, **options):
""" """

View File

@ -610,6 +610,8 @@ class aci_mod(crud.Update):
takes_options = (_prefix_option,) takes_options = (_prefix_option,)
internal_options = ['rename']
msg_summary = _('Modified ACI "%(value)s"') msg_summary = _('Modified ACI "%(value)s"')
def execute(self, aciname, **kw): def execute(self, aciname, **kw):

View File

@ -787,6 +787,8 @@ class automountkey_add(LDAPCreate):
msg_summary = _('Added automount key "%(value)s"') msg_summary = _('Added automount key "%(value)s"')
internal_options = ['description', 'add_operation']
def pre_callback(self, ldap, dn, entry_attrs, *keys, **options): def pre_callback(self, ldap, dn, entry_attrs, *keys, **options):
options.pop('add_operation', None) options.pop('add_operation', None)
options.pop('description', None) options.pop('description', None)
@ -846,7 +848,7 @@ class automountmap_add_indirect(LDAPCreate):
self.api.Command['automountmap_show'](location, parentmap) self.api.Command['automountmap_show'](location, parentmap)
# Add a submount key # Add a submount key
self.api.Command['automountkey_add']( self.api.Command['automountkey_add'](
location, parentmap, automountkey=key, key=key, location, parentmap, automountkey=key,
automountinformation='-fstype=autofs ldap:%s' % map) automountinformation='-fstype=autofs ldap:%s' % map)
else: # adding to auto.master else: # adding to auto.master
# Ensure auto.master exists # Ensure auto.master exists
@ -910,6 +912,8 @@ class automountkey_mod(LDAPUpdate):
msg_summary = _('Modified automount key "%(value)s"') msg_summary = _('Modified automount key "%(value)s"')
internal_options = ['newautomountkey']
takes_options = LDAPUpdate.takes_options + ( takes_options = LDAPUpdate.takes_options + (
IA5Str('newautomountinformation?', IA5Str('newautomountinformation?',
cli_name='newinfo', cli_name='newinfo',

View File

@ -90,6 +90,16 @@ output_params = (
), ),
) )
def filter_options(options, keys):
"""Return a dict that includes entries from `options` that are in `keys`
example:
>>> filtered = filter_options({'a': 1, 'b': 2, 'c': 3}, ['a', 'c'])
>>> filtered == {'a': 1, 'c': 3}
True
"""
return dict((k, options[k]) for k in keys if k in options)
class permission(LDAPObject): class permission(LDAPObject):
""" """
Permission object. Permission object.
@ -331,7 +341,7 @@ class permission_mod(LDAPUpdate):
cn = options['rename'] # rename finished cn = options['rename'] # rename finished
common_options = dict((k, options[k]) for k in ('all', 'raw') if k in options) common_options = filter_options(options, ['all', 'raw'])
result = self.api.Command.permission_show(cn, **common_options)['result'] result = self.api.Command.permission_show(cn, **common_options)['result']
for r in result: for r in result:
if not r.startswith('member_'): if not r.startswith('member_'):
@ -350,12 +360,19 @@ class permission_find(LDAPSearch):
has_output_params = LDAPSearch.has_output_params + output_params has_output_params = LDAPSearch.has_output_params + output_params
def post_callback(self, ldap, entries, truncated, *args, **options): def post_callback(self, ldap, entries, truncated, *args, **options):
# There is an option/param overlap: "cn" must be passed as "aciname"
# to aci-find. Besides that we don't need cn anymore so pop it
aciname = options.pop('cn', None)
pkey_only = options.pop('pkey_only', False) pkey_only = options.pop('pkey_only', False)
if not pkey_only: if not pkey_only:
for entry in entries: for entry in entries:
(dn, attrs) = entry (dn, attrs) = entry
try: try:
aci = self.api.Command.aci_show(attrs['cn'][0], aciprefix=ACI_PREFIX, **options)['result'] common_options = filter_options(options, ['all', 'raw'])
aci = self.api.Command.aci_show(attrs['cn'][0],
aciprefix=ACI_PREFIX, **common_options)['result']
# copy information from respective ACI to permission entry # copy information from respective ACI to permission entry
for attr in self.obj.aci_attributes: for attr in self.obj.aci_attributes:
@ -377,23 +394,16 @@ class permission_find(LDAPSearch):
# aren't already in the list along with their permission info. # aren't already in the list along with their permission info.
opts = copy.copy(options) opts = copy.copy(options)
if aciname:
opts['aciname'] = aciname
opts['aciprefix'] = ACI_PREFIX opts['aciprefix'] = ACI_PREFIX
try: # permission ACI attribute is needed
# permission ACI attribute is needed opts.pop('raw', None)
del opts['raw'] opts.pop('sizelimit', None)
except:
pass
if 'cn' in options:
# the attribute for name is difference in acis
opts['aciname'] = options['cn']
aciresults = self.api.Command.aci_find(*args, **opts) aciresults = self.api.Command.aci_find(*args, **opts)
truncated = truncated or aciresults['truncated'] truncated = truncated or aciresults['truncated']
results = aciresults['result'] results = aciresults['result']
if 'cn' in options:
# there is an option/param overlap if --name is in the
# search list, we don't need cn anymore so drop it.
options.pop('cn')
for aci in results: for aci in results:
found = False found = False
if 'permission' in aci: if 'permission' in aci:
@ -403,7 +413,9 @@ class permission_find(LDAPSearch):
found = True found = True
break break
if not found: if not found:
permission = self.api.Command.permission_show(aci['permission'], **options)['result'] common_options = filter_options(options, ['all', 'raw'])
permission = self.api.Command.permission_show(
aci['permission'], **common_options)['result']
dn = permission['dn'] dn = permission['dn']
del permission['dn'] del permission['dn']
if pkey_only: if pkey_only:
@ -429,8 +441,9 @@ class permission_show(LDAPRetrieve):
has_output_params = LDAPRetrieve.has_output_params + output_params has_output_params = LDAPRetrieve.has_output_params + output_params
def post_callback(self, ldap, dn, entry_attrs, *keys, **options): def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
try: try:
common_options = dict((k, options[k]) for k in ('all', 'raw') if k in options) common_options = filter_options(options, ['all', 'raw'])
aci = self.api.Command.aci_show(keys[-1], aciprefix=ACI_PREFIX, **common_options)['result'] aci = self.api.Command.aci_show(keys[-1], aciprefix=ACI_PREFIX,
**common_options)['result']
for attr in self.obj.aci_attributes: for attr in self.obj.aci_attributes:
if attr in aci: if attr in aci:
entry_attrs[attr] = aci[attr] entry_attrs[attr] = aci[attr]

View File

@ -128,8 +128,7 @@ class TestCLIParsing(object):
def test_dnsrecord_del_all(self): def test_dnsrecord_del_all(self):
try: try:
self.run_command('dnszone_add', idnsname=u'test-example.com', self.run_command('dnszone_add', idnsname=u'test-example.com',
idnssoamname=u'ns.test-example.com', idnssoamname=u'ns.test-example.com', force=True)
admin_email=u'devnull@test-example.com', force=True)
except errors.NotFound: except errors.NotFound:
raise nose.SkipTest('DNS is not configured') raise nose.SkipTest('DNS is not configured')
try: try:
@ -162,8 +161,7 @@ class TestCLIParsing(object):
def test_dnsrecord_del_one_by_one(self): def test_dnsrecord_del_one_by_one(self):
try: try:
self.run_command('dnszone_add', idnsname=u'test-example.com', self.run_command('dnszone_add', idnsname=u'test-example.com',
idnssoamname=u'ns.test-example.com', idnssoamname=u'ns.test-example.com', force=True)
admin_email=u'devnull@test-example.com', force=True)
except errors.NotFound: except errors.NotFound:
raise nose.SkipTest('DNS is not configured') raise nose.SkipTest('DNS is not configured')
try: try:

View File

@ -511,6 +511,11 @@ class test_Command(ClassChecker):
assert e.count == 2 assert e.count == 2
assert str(e) == "command 'example' takes at most 2 arguments" assert str(e) == "command 'example' takes at most 2 arguments"
# Test that OptionError is raised when an extra option is given:
o = self.get_instance()
e = raises(errors.OptionError, o.args_options_2_params, bad_option=True)
assert e.option == 'bad_option'
# Test that OverlapError is raised: # Test that OverlapError is raised:
o = self.get_instance(args=('one', 'two'), options=('three', 'four')) o = self.get_instance(args=('one', 'two'), options=('three', 'four'))
e = raises(errors.OverlapError, o.args_options_2_params, e = raises(errors.OverlapError, o.args_options_2_params,

View File

@ -126,7 +126,7 @@ class test_host(Declarative):
command=('host_add', [fqdn1], command=('host_add', [fqdn1],
dict( dict(
description=u'Test host 1', description=u'Test host 1',
locality=u'Undisclosed location 1', l=u'Undisclosed location 1',
force=True, force=True,
), ),
), ),

View File

@ -726,7 +726,7 @@ class test_netgroup(Declarative):
dict( dict(
desc='Add duplicatehost %r to netgroup %r' % (host1, netgroup1), desc='Add duplicate host %r to netgroup %r' % (host1, netgroup1),
command=( command=(
'netgroup_add_member', [netgroup1], dict(host=host1) 'netgroup_add_member', [netgroup1], dict(host=host1)
), ),
@ -960,8 +960,8 @@ class test_netgroup(Declarative):
), ),
dict( dict(
desc='Search for all netgroups using empty memberuser', desc='Search for all netgroups using empty member user',
command=('netgroup_find', [], dict(memberuser=None)), command=('netgroup_find', [], dict(user=None)),
expected=dict( expected=dict(
count=2, count=2,
truncated=False, truncated=False,

View File

@ -643,6 +643,18 @@ class test_permission(Declarative):
), ),
dict(
desc='Search using nonexistent --subtree',
command=('permission_find', [], {'subtree': u'foo'}),
expected=dict(
count=0,
truncated=False,
summary=u'0 permissions matched',
result=[],
),
),
dict( dict(
desc='Delete %r' % permission1_renamed_ucase, desc='Delete %r' % permission1_renamed_ucase,
command=('permission_del', [permission1_renamed_ucase], {}), command=('permission_del', [permission1_renamed_ucase], {}),

View File

@ -0,0 +1,52 @@
# Authors:
# Petr Viktorin <pviktori@redhat.com>
#
# Copyright (C) 2012 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Test the `ipalib/plugins/ping.py` module, and XML-RPC in general.
"""
from ipalib import api, errors, _
from tests.util import assert_equal, Fuzzy
from xmlrpc_test import Declarative
class test_ping(Declarative):
tests = [
dict(
desc='Ping the server',
command=('ping', [], {}),
expected=dict(
summary=Fuzzy('IPA server version .*. API version .*')),
),
dict(
desc='Try to ping with an argument',
command=('ping', ['bad_arg'], {}),
expected=errors.ZeroArgumentError(name='ping'),
),
dict(
desc='Try to ping with an option',
command=('ping', [], dict(bad_arg=True)),
expected=errors.OptionError(_('Unknown option: %(option)s'),
option='bad_arg'),
),
]