Let the updater delete entries and add small test harness

In order to run the tests you must put your DM password into
~/.ipa/.dmpw

Some tests are expected to generate errors. Don't let any ERROR
messages from the updater fool you, watch the pass/fail of the nosetests.
This commit is contained in:
Rob Crittenden 2009-09-14 16:12:58 -04:00 committed by Jason Gerard DeRose
parent aa7792a000
commit e62bbab37a
13 changed files with 322 additions and 7 deletions

View File

@ -122,7 +122,7 @@ class LDAPUpdate:
else:
return ""
def __template_str(self, s):
def _template_str(self, s):
try:
return ipautil.template_str(s, self.sub_dict)
except KeyError, e:
@ -199,7 +199,7 @@ class LDAPUpdate:
def parse_update_file(self, data, all_updates, dn_list):
"""Parse the update file into a dictonary of lists and apply the update
for each DN in the file."""
valid_keywords = ["default", "add", "remove", "only"]
valid_keywords = ["default", "add", "remove", "only", "deleteentry"]
update = {}
d = ""
index = ""
@ -219,12 +219,12 @@ class LDAPUpdate:
update = {}
dn = line[3:].strip()
update['dn'] = self.__template_str(dn)
update['dn'] = self._template_str(dn)
else:
if dn is None:
raise BadSyntax, "dn is not defined in the update"
line = self.__template_str(line)
line = self._template_str(line)
if line.startswith(' '):
v = d[len(d) - 1]
v = v + line[1:]
@ -271,7 +271,7 @@ class LDAPUpdate:
# randomness for good measure.
self.sub_dict['TIME'] = int(time.time()) + r.randint(0,10000)
cn = self.__template_str("indextask_$TIME")
cn = self._template_str("indextask_$TIME")
dn = "cn=%s, cn=index, cn=tasks, cn=config" % cn
e = ipaldap.Entry(dn)
@ -368,7 +368,7 @@ class LDAPUpdate:
"""updates is a list of changes to apply
entry is the thing to apply them to
returns the modified entry
Returns the modified entry
"""
if not updates:
return entry
@ -416,6 +416,9 @@ class LDAPUpdate:
only[k] = True
entry.setValues(k, e)
logging.debug('only: updated value %s', e)
elif utype == 'deleteentry':
# skip this update type, it occurs in __delete_entries()
return None
self.print_entity(entry)
@ -436,6 +439,7 @@ class LDAPUpdate:
logging.debug(a + ": ")
for l in value:
logging.debug("\t" + l)
def is_schema_updated(self, s):
"""Compare the schema in 's' with the current schema in the DS to
see if anything has changed. This should account for syntax
@ -489,6 +493,9 @@ class LDAPUpdate:
# Bring this entry up to date
entry = self.__apply_updates(update.get('updates'), entry)
if entry is None:
# It might be None if it is just deleting an entry
return
self.print_entity(entry, "Final value")
@ -500,6 +507,7 @@ class LDAPUpdate:
try:
if self.live_run:
self.conn.addEntry(entry.dn, entry.toTupleList())
self.modified = True
except Exception, e:
logging.error("Add failure %s", e)
else:
@ -533,6 +541,34 @@ class LDAPUpdate:
self.modified = True
return
def __delete_record(self, updates):
"""
Run through all the updates again looking for any that should be
deleted.
This must use a reversed list so that the longest entries are
considered first so we don't end up trying to delete a parent
and child in the wrong order.
"""
dn = updates['dn']
updates = updates['updates']
for u in updates:
# We already do syntax-parsing so this is safe
(utype, k, values) = u.split(':',2)
if utype == 'deleteentry':
try:
if self.live_run:
self.conn.deleteEntry(dn)
self.modified = True
except errors.NotFound, e:
logging.info("Deleting non-existant entry %s", e)
self.modified = True
except errors.DatabaseError, e:
logging.error("Delete failed: %s", e)
return
def get_all_files(self, root, recursive=False):
"""Get all update files"""
f = []
@ -566,11 +602,18 @@ class LDAPUpdate:
(all_updates, dn_list) = self.parse_update_file(data, all_updates, dn_list)
# For adds and updates we want to apply updates from shortest
# to greatest length of the DN. For deletes we want the reverse.
sortedkeys = dn_list.keys()
sortedkeys.sort()
for k in sortedkeys:
for dn in dn_list[k]:
self.__update_record(all_updates[dn])
sortedkeys.reverse()
for k in sortedkeys:
for dn in dn_list[k]:
self.__delete_record(all_updates[dn])
finally:
if self.conn: self.conn.unbind()

View File

@ -11,7 +11,7 @@ do
if [[ -f $executable ]]; then
echo "[ $name: Starting tests... ]"
((runs += 1))
if $executable /usr/bin/nosetests -v --with-doctest --exclude="plugins"
if $executable /usr/bin/nosetests --debug-log=/dev/null -v --with-doctest --exclude="plugins"
then
echo "[ $name: Tests OK ]"
else

View File

@ -0,0 +1,5 @@
dn: uid=tuser, cn=test, cn=accounts, $SUFFIX
deleteentry: reset: nada
dn: cn=test, cn=accounts, $SUFFIX
deleteentry: reset: nada

View File

@ -0,0 +1,23 @@
# Add in a new place in the DIT for our test cases
dn: cn=test, cn=accounts, $SUFFIX
add:objectClass: top
add:objectClass: ipaContainer
add:cn: test
add:description: Test container
# Add a test user
dn: uid=tuser, cn=test, cn=accounts, $SUFFIX
add:objectclass: top
add:objectclass: person
add:objectclass: posixaccount
add:objectclass: krbprincipalaux
add:objectclass: inetuser
add:homedirectory: /home/tuser
add:loginshell: /bin/bash
add:sn: User
add:uid: tuser
add:uidnumber: 999
add:gidnumber: 999
add:cn: Test User

View File

@ -0,0 +1,3 @@
dn: uid=tuser, cn=test, cn=accounts, $SUFFIX
add:gecos: Test User

View File

@ -0,0 +1,3 @@
dn: uid=tuser, cn=test, cn=accounts, $SUFFIX
only:gecos: Test User New

View File

@ -0,0 +1,3 @@
dn: uid=tuser, cn=test, cn=accounts, $SUFFIX
add:gecos: Test User New2

View File

@ -0,0 +1,3 @@
dn: uid=tuser, cn=test, cn=accounts, $SUFFIX
add:cn: Test User New

View File

@ -0,0 +1,3 @@
dn: uid=tuser, cn=test, cn=accounts, $SUFFIX
remove:cn: Test User New

View File

@ -0,0 +1,3 @@
dn: uid=tuser, cn=test, cn=accounts, $SUFFIX
bogus:cn: Test User New

View File

@ -0,0 +1,3 @@
dn: uid=tuser, cn=test, cn=accounts, $SUFFIX
add:cn

View File

@ -0,0 +1,22 @@
# Authors:
# Jason Gerard DeRose <jderose@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Package containing LDAP updates unit tests.
"""

View File

@ -0,0 +1,201 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Test the `ipaserver/install/ldapupdate.py` module.
"""
import os
import sys
import ldap
import nose
from tests.util import raises, PluginTester
from tests.data import unicode_str
from ipalib import api
from ipalib import errors
from ipaserver.install.ldapupdate import LDAPUpdate, BadSyntax, UPDATES_DIR
from ipaserver.install import installutils
from ipaserver import ipaldap
from ipapython import ipautil
"""
The updater works through files only so this is just a thin-wrapper controlling
which file we test at any given point.
IMPORTANT NOTE: It is easy for these tests to get out of sync. Any changes
made to the update files may require changes to the test cases as well.
Some cases pull records from LDAP and do comparisons to ensure that updates
have occurred as expected.
The DM password needs to be set in ~/.ipa/.dmpw
"""
class test_update(object):
"""
Test the LDAP updater.
"""
def setUp(self):
fqdn = installutils.get_fqdn()
pwfile = api.env.dot_ipa + os.sep + ".dmpw"
if ipautil.file_exists(pwfile):
fp = open(pwfile, "r")
self.dm_password = fp.read().rstrip()
fp.close()
else:
raise nose.SkipTest("No directory manager password")
self.updater = LDAPUpdate(dm_password=self.dm_password, sub_dict={}, live_run=True)
self.ld = ipaldap.IPAdmin(fqdn)
self.ld.do_simple_bind(bindpw=self.dm_password)
if ipautil.file_exists("0_reset.update"):
self.testdir="./"
elif ipautil.file_exists("tests/test_install/0_reset.update"):
self.testdir= "./tests/test_install/"
else:
raise nose.SkipTest("Unable to find test update files")
def tearDown(self):
if self.ld:
self.ld.unbind()
def test_0_reset(self):
"""
Reset the updater test data to a known initial state
"""
try:
modified = self.updater.update([self.testdir + "0_reset.update"])
except errors.NotFound:
# Just means the entry doesn't exist yet
modified = True
assert(modified == True)
def test_1_add(self):
"""
Test the updater with an add directive
"""
modified = self.updater.update([self.testdir + "1_add.update"])
assert(modified == True)
def test_2_update(self):
"""
Test the updater when adding an attribute to an existing entry
"""
modified = self.updater.update([self.testdir + "2_update.update"])
assert(modified == True)
# The update passed, lets look at the record and see if it is
# really updated
dn = self.updater._template_str('uid=tuser, cn=test, cn=accounts, $SUFFIX')
entry = self.ld.getList(dn, ldap.SCOPE_BASE, 'objectclass=*', ['*'])
assert (len(entry) == 1)
assert(entry[0].gecos == 'Test User')
def test_3_update(self):
"""
Test the updater forcing an attribute to a given value
"""
modified = self.updater.update([self.testdir + "3_update.update"])
assert(modified == True)
# The update passed, lets look at the record and see if it is
# really updated
dn = self.updater._template_str('uid=tuser, cn=test, cn=accounts, $SUFFIX')
entry = self.ld.getList(dn, ldap.SCOPE_BASE, 'objectclass=*', ['*'])
assert (len(entry) == 1)
assert(entry[0].gecos == 'Test User New')
def test_4_update(self):
"""
Test the updater adding a new value to a single-valued attribute
"""
modified = self.updater.update([self.testdir + "4_update.update"])
assert(modified == False)
def test_5_update(self):
"""
Test the updater adding a new value to a multi-valued attribute
"""
modified = self.updater.update([self.testdir + "5_update.update"])
assert(modified == True)
# The update passed, lets look at the record and see if it is
# really updated
dn = self.updater._template_str('uid=tuser, cn=test, cn=accounts, $SUFFIX')
entry = self.ld.getList(dn, ldap.SCOPE_BASE, 'objectclass=*', ['*'])
assert (len(entry) == 1)
assert(entry[0].getValues('cn') == ['Test User', 'Test User New'])
def test_6_update(self):
"""
Test the updater removing a value from a multi-valued attribute
"""
modified = self.updater.update([self.testdir + "6_update.update"])
assert(modified == True)
# The update passed, lets look at the record and see if it is
# really updated
dn = self.updater._template_str('uid=tuser, cn=test, cn=accounts, $SUFFIX')
entry = self.ld.getList(dn, ldap.SCOPE_BASE, 'objectclass=*', ['*'])
assert (len(entry) == 1)
assert(entry[0].cn == 'Test User')
def test_6_update_1(self):
"""
Test the updater removing a non-existant value from a multi-valued attribute
"""
modified = self.updater.update([self.testdir + "6_update.update"])
assert(modified == False)
# The update passed, lets look at the record and see if it is
# really updated
dn = self.updater._template_str('uid=tuser, cn=test, cn=accounts, $SUFFIX')
entry = self.ld.getList(dn, ldap.SCOPE_BASE, 'objectclass=*', ['*'])
assert (len(entry) == 1)
assert(entry[0].cn == 'Test User')
def test_7_cleanup(self):
"""
Reset the test data to a known initial state
"""
try:
modified = self.updater.update([self.testdir + "0_reset.update"])
except errors.NotFound:
# Just means the entry doesn't exist yet
modified = True
assert(modified == True)
def test_8_badsyntax(self):
"""
Test the updater with an unknown keyword
"""
try:
modified = self.updater.update([self.testdir + "8_badsyntax.update"])
except BadSyntax:
pass
def test_9_badsyntax(self):
"""
Test the updater with an incomplete line
"""
try:
modified = self.updater.update([self.testdir + "9_badsyntax.update"])
except BadSyntax:
pass