mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
There are two reasons for the plugin framework: 1. To provide a way of doing manual/complex LDAP changes without having to keep extending ldapupdate.py (like we did with managed entries). 2. Allows for better control of restarts. There are two types of plugins, preop and postop. A preop plugin runs before any file-based updates are loaded. A postop plugin runs after all file-based updates are applied. A preop plugin may update LDAP directly or craft update entries to be applied with the file-based updates. Either a preop or postop plugin may attempt to restart the dirsrv instance. The instance is only restartable if ipa-ldap-updater is being executed as root. A warning is printed if a restart is requested for a non-root user. Plugins are not executed by default. This is so we can use ldapupdate to apply simple updates in commands like ipa-nis-manage. https://fedorahosted.org/freeipa/ticket/1789 https://fedorahosted.org/freeipa/ticket/1790 https://fedorahosted.org/freeipa/ticket/2032
141 lines
5.3 KiB
Python
141 lines
5.3 KiB
Python
# Authors:
|
|
# Rob Crittenden <rcritten@redhat.com>
|
|
#
|
|
# Copyright (C) 2010 Red Hat
|
|
# see file 'COPYING' for use and warranty information
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
# Test some simple LDAP requests using the ldap2 backend
|
|
|
|
# This fetches a certificate from a host principal so we can ensure that the
|
|
# schema is working properly. We know this because the schema will tell the
|
|
# encoder not to utf-8 encode binary attributes.
|
|
|
|
# The DM password needs to be set in ~/.ipa/.dmpw
|
|
|
|
import nose
|
|
import os
|
|
from ipaserver.plugins.ldap2 import ldap2
|
|
from ipalib.plugins.service import service, service_show
|
|
from ipalib.plugins.host import host
|
|
import nss.nss as nss
|
|
from ipalib import api, x509, create_api, errors
|
|
from ipapython import ipautil
|
|
from ipalib.dn import *
|
|
|
|
class test_ldap(object):
|
|
"""
|
|
Test various LDAP client bind methods.
|
|
"""
|
|
|
|
def setUp(self):
|
|
self.conn = None
|
|
self.ldapuri = 'ldap://%s' % ipautil.format_netloc(api.env.host)
|
|
self.ccache = '/tmp/krb5cc_%d' % os.getuid()
|
|
nss.nss_init_nodb()
|
|
self.dn = str(DN(('krbprincipalname','ldap/%s@%s' % (api.env.host, api.env.realm)),
|
|
('cn','services'),('cn','accounts'),api.env.basedn))
|
|
|
|
def tearDown(self):
|
|
if self.conn and self.conn.isconnected():
|
|
self.conn.disconnect()
|
|
|
|
def test_anonymous(self):
|
|
"""
|
|
Test an anonymous LDAP bind using ldap2
|
|
"""
|
|
self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
|
|
self.conn.connect()
|
|
(dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate'])
|
|
cert = entry_attrs.get('usercertificate')
|
|
cert = cert[0]
|
|
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
|
assert serial is not None
|
|
|
|
def test_GSSAPI(self):
|
|
"""
|
|
Test a GSSAPI LDAP bind using ldap2
|
|
"""
|
|
if not ipautil.file_exists(self.ccache):
|
|
raise nose.SkipTest('Missing ccache %s' % self.ccache)
|
|
self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
|
|
self.conn.connect(ccache='FILE:%s' % self.ccache)
|
|
(dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate'])
|
|
cert = entry_attrs.get('usercertificate')
|
|
cert = cert[0]
|
|
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
|
assert serial is not None
|
|
|
|
def test_simple(self):
|
|
"""
|
|
Test a simple LDAP bind using ldap2
|
|
"""
|
|
pwfile = api.env.dot_ipa + os.sep + ".dmpw"
|
|
if ipautil.file_exists(pwfile):
|
|
fp = open(pwfile, "r")
|
|
dm_password = fp.read().rstrip()
|
|
fp.close()
|
|
else:
|
|
raise nose.SkipTest("No directory manager password in %s" % pwfile)
|
|
self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
|
|
self.conn.connect(bind_dn='cn=directory manager', bind_pw=dm_password)
|
|
(dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate'])
|
|
cert = entry_attrs.get('usercertificate')
|
|
cert = cert[0]
|
|
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
|
assert serial is not None
|
|
|
|
def test_Backend(self):
|
|
"""
|
|
Test using the ldap2 Backend directly (ala ipa-server-install)
|
|
"""
|
|
|
|
# Create our own api because the one generated for the tests is
|
|
# a client-only api. Then we register in the commands and objects
|
|
# we need for the test.
|
|
myapi = create_api(mode=None)
|
|
myapi.bootstrap(context='cli', in_server=True, in_tree=True)
|
|
myapi.register(ldap2)
|
|
myapi.register(host)
|
|
myapi.register(service)
|
|
myapi.register(service_show)
|
|
myapi.finalize()
|
|
myapi.Backend.ldap2.connect(bind_dn="cn=Directory Manager", bind_pw='password')
|
|
|
|
result = myapi.Command['service_show']('ldap/%s@%s' % (api.env.host, api.env.realm,))
|
|
entry_attrs = result['result']
|
|
cert = entry_attrs.get('usercertificate')
|
|
cert = cert[0]
|
|
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
|
assert serial is not None
|
|
|
|
def test_autobind(self):
|
|
"""
|
|
Test an autobind LDAP bind using ldap2
|
|
"""
|
|
ldapuri = 'ldapi://%%2fvar%%2frun%%2fslapd-%s.socket' % api.env.realm.replace('.','-')
|
|
self.conn = ldap2(shared_instance=False, ldap_uri=ldapuri)
|
|
try:
|
|
self.conn.connect(autobind=True)
|
|
except errors.DatabaseError, e:
|
|
if e.desc == 'Inappropriate authentication':
|
|
raise nose.SkipTest("Only executed as root")
|
|
(dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate'])
|
|
cert = entry_attrs.get('usercertificate')
|
|
cert = cert[0]
|
|
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
|
assert serial is not None
|
|
|