From 73bc11a20b904a1203201f5525f147fe1737e960 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Thu, 3 Jan 2019 20:56:39 +0100 Subject: [PATCH] Add ldapmodify/search helper functions Move common LDAP commands to ldapmodify_dm() and ldapsearch_dm() helper functions. Signed-off-by: Christian Heimes Reviewed-By: Rob Crittenden --- ipatests/pytest_ipa/integration/tasks.py | 51 +++++++++++-- .../test_backup_and_restore.py | 14 +--- ipatests/test_integration/test_commands.py | 76 ++++++------------- ipatests/test_integration/test_external_ca.py | 17 ++--- .../test_replica_promotion.py | 13 +--- 5 files changed, 75 insertions(+), 96 deletions(-) diff --git a/ipatests/pytest_ipa/integration/tasks.py b/ipatests/pytest_ipa/integration/tasks.py index f103f19ea..2189353f8 100644 --- a/ipatests/pytest_ipa/integration/tasks.py +++ b/ipatests/pytest_ipa/integration/tasks.py @@ -303,12 +303,7 @@ def enable_replication_debugging(host, log_level=0): replace: nsslapd-errorlog-level nsslapd-errorlog-level: {log_level} """.format(log_level=log_level)) - host.run_command(['ldapmodify', '-x', '-ZZ', - '-D', str(host.config.dirman_dn), - '-w', host.config.dirman_password, - ], - stdin_text=logging_ldif) - + ldapmodify_dm(host, logging_ldif) def set_default_ttl_for_ipa_dns_zone(host, raiseonerr=True): args = [ @@ -1613,3 +1608,47 @@ def group_add(host, groupname, extra_args=()): ] cmd.extend(extra_args) return host.run_command(cmd) + + +def ldapmodify_dm(host, ldif_text, **kwargs): + """Run ldapmodify as Directory Manager + + :param host: host object + :param ldif_text: ldif string + :param kwargs: additional keyword arguments to run_command() + :return: result object + """ + # no hard-coded hostname, let ldapmodify pick up the host from ldap.conf. + args = [ + 'ldapmodify', + '-x', + '-D', str(host.config.dirman_dn), # pylint: disable=no-member + '-w', host.config.dirman_password + ] + return host.run_command(args, stdin_text=ldif_text, **kwargs) + + +def ldapsearch_dm(host, base, ldap_args, scope='sub', **kwargs): + """Run ldapsearch as Directory Manager + + :param host: host object + :param base: Base DN + :param ldap_args: additional arguments to ldapsearch (filter, attributes) + :param scope: search scope (base, sub, one) + :param kwargs: additional keyword arguments to run_command() + :return: result object + """ + args = [ + 'ldapsearch', + '-x', '-ZZ', + '-h', host.hostname, + '-p', '389', + '-D', str(host.config.dirman_dn), # pylint: disable=no-member + '-w', host.config.dirman_password, + '-s', scope, + '-b', base, + '-o', 'ldif-wrap=no', + '-LLL', + ] + args.extend(ldap_args) + return host.run_command(args, **kwargs) diff --git a/ipatests/test_integration/test_backup_and_restore.py b/ipatests/test_integration/test_backup_and_restore.py index cb045a22b..7f04d7abb 100644 --- a/ipatests/test_integration/test_backup_and_restore.py +++ b/ipatests/test_integration/test_backup_and_restore.py @@ -23,7 +23,6 @@ import logging import os import re import contextlib -from tempfile import NamedTemporaryFile import pytest from ipaplatform.constants import constants @@ -794,8 +793,6 @@ class TestReplicaInstallAfterRestore(IntegrationTest): suffix = ipautil.realm_to_suffix(master.domain.realm) suffix = escape_dn_chars(str(suffix)) - tf = NamedTemporaryFile() - ldif_file = tf.name entry_ldif = ( "dn: cn=meTo{hostname},cn=replica," "cn={suffix}," @@ -811,17 +808,8 @@ class TestReplicaInstallAfterRestore(IntegrationTest): "nsds5ReplicaEnabled: off").format( hostname=replica1.hostname, suffix=suffix) - master.put_file_contents(ldif_file, entry_ldif) - # disable replication agreement - arg = ['ldapmodify', - '-ZZ', - '-h', master.hostname, - '-p', '389', '-D', - str(master.config.dirman_dn), # pylint: disable=no-member - '-w', master.config.dirman_password, - '-f', ldif_file] - master.run_command(arg) + tasks.ldapmodify_dm(master, entry_ldif) # uninstall master. tasks.uninstall_master(master, clean=False) diff --git a/ipatests/test_integration/test_commands.py b/ipatests/test_integration/test_commands.py index 209cd4ef7..70f14d319 100644 --- a/ipatests/test_integration/test_commands.py +++ b/ipatests/test_integration/test_commands.py @@ -10,7 +10,6 @@ import re import os import logging import ssl -from tempfile import NamedTemporaryFile from itertools import chain, repeat import textwrap import time @@ -124,8 +123,6 @@ class TestIPACommand(IntegrationTest): master = self.master base_dn = str(master.domain.basedn) # pylint: disable=no-member - tf = NamedTemporaryFile() - ldif_file = tf.name entry_ldif = textwrap.dedent(""" dn: uid=system,cn=sysaccounts,cn=etc,{base_dn} changetype: add @@ -138,19 +135,29 @@ class TestIPACommand(IntegrationTest): """).format( base_dn=base_dn, original_passwd=original_passwd) - master.put_file_contents(ldif_file, entry_ldif) - arg = ['ldapmodify', - '-ZZ', - '-h', master.hostname, - '-p', '389', '-D', - str(master.config.dirman_dn), # pylint: disable=no-member - '-w', master.config.dirman_password, - '-f', ldif_file] - master.run_command(arg) + tasks.ldapmodify_dm(master, entry_ldif) tasks.ldappasswd_sysaccount_change(sysuser, original_passwd, new_passwd, master) + def get_krbinfo(self, user): + base_dn = str(self.master.domain.basedn) # pylint: disable=no-member + result = tasks.ldapsearch_dm( + self.master, + 'uid={user},cn=users,cn=accounts,{base_dn}'.format( + user=user, base_dn=base_dn), + ['krblastpwdchange', 'krbpasswordexpiration'], + scope='base' + ) + output = result.stdout_text.lower() + + # extract krblastpwdchange and krbpasswordexpiration + krbchg_pattern = 'krblastpwdchange: (.+)\n' + krbexp_pattern = 'krbpasswordexpiration: (.+)\n' + krblastpwdchange = re.findall(krbchg_pattern, output)[0] + krbexp = re.findall(krbexp_pattern, output)[0] + return krblastpwdchange, krbexp + def test_ldapmodify_password_issue7601(self): user = 'ipauser' original_passwd = 'Secret123' @@ -173,33 +180,12 @@ class TestIPACommand(IntegrationTest): new=original_passwd) master.run_command(['kinit', user], stdin_text=user_kinit_stdin_text) # Retrieve krblastpwdchange and krbpasswordexpiration - search_cmd = [ - 'ldapsearch', '-x', '-ZZ', - '-h', master.hostname, - '-p', '389', - '-D', 'cn=directory manager', - '-w', master.config.dirman_password, - '-s', 'base', - '-b', 'uid={user},cn=users,cn=accounts,{base_dn}'.format( - user=user, base_dn=base_dn), - '-o', 'ldif-wrap=no', - '-LLL', - 'krblastpwdchange', - 'krbpasswordexpiration'] - output = master.run_command(search_cmd).stdout_text.lower() - - # extract krblastpwdchange and krbpasswordexpiration - krbchg_pattern = 'krblastpwdchange: (.+)\n' - krbexp_pattern = 'krbpasswordexpiration: (.+)\n' - krblastpwdchange = re.findall(krbchg_pattern, output)[0] - krbexp = re.findall(krbexp_pattern, output)[0] + krblastpwdchange, krbexp = self.get_krbinfo(user) # sleep 1 sec (krblastpwdchange and krbpasswordexpiration have at most # a 1s precision) time.sleep(1) # perform ldapmodify on userpassword as dir mgr - mod = NamedTemporaryFile() - ldif_file = mod.name entry_ldif = textwrap.dedent(""" dn: uid={user},cn=users,cn=accounts,{base_dn} changetype: modify @@ -209,25 +195,13 @@ class TestIPACommand(IntegrationTest): user=user, base_dn=base_dn, new_passwd=new_passwd) - master.put_file_contents(ldif_file, entry_ldif) - arg = ['ldapmodify', - '-ZZ', - '-h', master.hostname, - '-p', '389', '-D', - str(master.config.dirman_dn), # pylint: disable=no-member - '-w', master.config.dirman_password, - '-f', ldif_file] - master.run_command(arg) + tasks.ldapmodify_dm(master, entry_ldif) # Test new password with kinit master.run_command(['kinit', user], stdin_text=new_passwd) - # Retrieve krblastpwdchange and krbpasswordexpiration - output = master.run_command(search_cmd).stdout_text.lower() - # extract krblastpwdchange and krbpasswordexpiration - newkrblastpwdchange = re.findall(krbchg_pattern, output)[0] - newkrbexp = re.findall(krbexp_pattern, output)[0] # both should have changed + newkrblastpwdchange, newkrbexp = self.get_krbinfo(user) assert newkrblastpwdchange != krblastpwdchange assert newkrbexp != krbexp @@ -246,13 +220,9 @@ class TestIPACommand(IntegrationTest): ) # Test new password with kinit master.run_command(['kinit', user], stdin_text=new_passwd2) - # Retrieve krblastpwdchange and krbpasswordexpiration - output = master.run_command(search_cmd).stdout_text.lower() - # extract krblastpwdchange and krbpasswordexpiration - newkrblastpwdchange2 = re.findall(krbchg_pattern, output)[0] - newkrbexp2 = re.findall(krbexp_pattern, output)[0] # both should have changed + newkrblastpwdchange2, newkrbexp2 = self.get_krbinfo(user) assert newkrblastpwdchange != newkrblastpwdchange2 assert newkrbexp != newkrbexp2 diff --git a/ipatests/test_integration/test_external_ca.py b/ipatests/test_integration/test_external_ca.py index d2d081535..192491cde 100644 --- a/ipatests/test_integration/test_external_ca.py +++ b/ipatests/test_integration/test_external_ca.py @@ -130,18 +130,11 @@ class TestExternalCA(IntegrationTest): tasks.install_replica(self.master, self.replicas[0]) # check that nsds5ReplicaReleaseTimeout option was set - result = self.master.run_command([ - 'ldapsearch', - '-x', - '-ZZ', - '-h', self.master.hostname, - '-D', 'cn=directory manager', - '-w', self.master.config.dirman_password, - '-b', 'cn=mapping tree,cn=config', - '(cn=replica)', - '-LLL', - '-o', - 'ldif-wrap=no']) + result = tasks.ldapsearch_dm( + self.master, + 'cn=mapping tree,cn=config', + ['(cn=replica)'], + ) # case insensitive match text = result.stdout_text.lower() # see ipaserver.install.replication.REPLICA_FINAL_SETTINGS diff --git a/ipatests/test_integration/test_replica_promotion.py b/ipatests/test_integration/test_replica_promotion.py index 3b0993ab4..c6e3550bf 100644 --- a/ipatests/test_integration/test_replica_promotion.py +++ b/ipatests/test_integration/test_replica_promotion.py @@ -6,7 +6,6 @@ from __future__ import absolute_import import time import re -from tempfile import NamedTemporaryFile import textwrap from ipatests.test_integration.base import IntegrationTest from ipatests.pytest_ipa.integration import tasks @@ -384,8 +383,6 @@ class TestReplicaInstallWithExistingEntry(IntegrationTest): master = self.master tasks.install_master(master) replica = self.replicas[0] - tf = NamedTemporaryFile() - ldif_file = tf.name base_dn = "dc=%s" % (",dc=".join(replica.domain.name.split("."))) # adding entry for replica on master so that master will have it before # replica installtion begins and creates a situation for pagure-7174 @@ -401,15 +398,7 @@ class TestReplicaInstallWithExistingEntry(IntegrationTest): memberPrincipal: ldap/{hostname}@{realm}""").format( base_dn=base_dn, hostname=replica.hostname, realm=replica.domain.name.upper()) - master.put_file_contents(ldif_file, entry_ldif) - arg = ['ldapmodify', - '-ZZ', - '-h', master.hostname, - '-p', '389', '-D', - str(master.config.dirman_dn), # pylint: disable=no-member - '-w', master.config.dirman_password, - '-f', ldif_file] - master.run_command(arg) + tasks.ldapmodify_dm(master, entry_ldif) tasks.install_replica(master, replica)