diff --git a/ipatests/pytest_ipa/integration/tasks.py b/ipatests/pytest_ipa/integration/tasks.py index 1a9e46ef5..c56d623ad 100755 --- a/ipatests/pytest_ipa/integration/tasks.py +++ b/ipatests/pytest_ipa/integration/tasks.py @@ -799,20 +799,74 @@ def setup_sssd_debugging(host): clear_sssd_cache(host) -def modify_sssd_conf(host, domain, mod_dict, provider='ipa', - provider_subtype=None): - """ - modify options in a single domain section of host's sssd.conf - :param host: multihost.Host object - :param domain: domain section name to modify - :param mod_dict: dictionary of options which will be passed to - SSSDDomain.set_option(). To remove an option specify its value as - None - :param provider: provider backend to set. Defaults to ipa - :param provider_subtype: backend subtype (e.g. id or sudo), will be added - to the domain config if not present - """ +@contextmanager +def remote_sssd_config(host): + """Context manager for editing sssd config file on a remote host. + + It provides SimpleSSSDConfig object which is automatically serialized and + uploaded to remote host upon exit from the context. + + If exception is raised inside the context then the ini file is NOT updated + on remote host. + + SimpleSSSDConfig is a SSSDConfig descendant with added helper methods + for modifying options: edit_domain and edit_service. + + + Example: + + with remote_sssd_config(master) as sssd_conf: + # use helper methods + # add/replace option + sssd_conf.edit_domain(master.domain, 'filter_users', 'root') + # add/replace provider option + sssd_conf.edit_domain(master.domain, 'sudo_provider', 'ipa') + # delete option + sssd_conf.edit_service('pam', 'pam_verbosity', None) + + # use original methods of SSSDConfig + domain = sssd_conf.get_domain(master.domain.name) + domain.set_name('example.test') + self.save_domain(domain) + """ + from SSSDConfig import SSSDConfig + + class SimpleSSSDConfig(SSSDConfig): + def edit_domain(self, domain_or_name, option, value): + """Add/replace/delete option in a domain section. + + :param domain_or_name: Domain object or domain name + :param option: option name + :param value: value to assign to option. If None, option will be + deleted + """ + if hasattr(domain_or_name, 'name'): + domain_name = domain_or_name.name + else: + domain_name = domain_or_name + domain = self.get_domain(domain_name) + if value is None: + domain.remove_option(option) + else: + domain.set_option(option, value) + self.save_domain(domain) + + def edit_service(self, service_name, option, value): + """Add/replace/delete option in a service section. + + :param service_name: a string + :param option: option name + :param value: value to assign to option. If None, option will be + deleted + """ + service = self.get_service(service_name) + if value is None: + service.remove_option(option) + else: + service.set_option(option, value) + self.save_service(service) + fd, temp_config_file = tempfile.mkstemp() os.close(fd) try: @@ -842,19 +896,12 @@ def modify_sssd_conf(host, domain, mod_dict, provider='ipa', os.unlink(tarname) # Use the imported schema - sssd_config = SSSDConfig( + sssd_config = SimpleSSSDConfig( schemafile=os.path.join(tar_dir, "sssd.api.conf"), schemaplugindir=os.path.join(tar_dir, "sssd.api.d")) sssd_config.import_config(temp_config_file) - sssd_domain = sssd_config.get_domain(domain) - if provider_subtype is not None: - sssd_domain.add_provider(provider, provider_subtype) - - for m in mod_dict: - sssd_domain.set_option(m, mod_dict[m]) - - sssd_config.save_domain(sssd_domain) + yield sssd_config new_config = sssd_config.dump(sssd_config.opts).encode('utf-8') host.transport.put_file_contents(paths.SSSD_CONF, new_config) diff --git a/ipatests/test_integration/test_commands.py b/ipatests/test_integration/test_commands.py index 79c1e52d6..8a5d4c071 100644 --- a/ipatests/test_integration/test_commands.py +++ b/ipatests/test_integration/test_commands.py @@ -698,14 +698,10 @@ class TestIPACommand(IntegrationTest): username = "testuser" + str(random.randint(200000, 9999999)) # add ldap_deref_threshold=0 to /etc/sssd/sssd.conf - domain = self.master.domain - tasks.modify_sssd_conf( - self.master, - domain.name, - { - 'ldap_deref_threshold': 0 - }, - ) + sssd_conf_backup = tasks.FileBackup(self.master, paths.SSSD_CONF) + with tasks.remote_sssd_config(self.master) as sssd_config: + sssd_config.edit_domain( + self.master.domain, 'ldap_deref_threshold', 0) try: self.master.run_command(['systemctl', 'restart', 'sssd.service']) @@ -731,15 +727,7 @@ class TestIPACommand(IntegrationTest): password='Secret123') client.close() finally: - # revert back to original ldap config - # remove ldap_deref_threshold=0 - tasks.modify_sssd_conf( - self.master, - domain.name, - { - 'ldap_deref_threshold': None - }, - ) + sssd_conf_backup.restore() self.master.run_command(['systemctl', 'restart', 'sssd.service']) def test_user_mod_change_capitalization_issue5879(self): diff --git a/ipatests/test_integration/test_sssd.py b/ipatests/test_integration/test_sssd.py index bbc4edb98..1b89aeaaf 100644 --- a/ipatests/test_integration/test_sssd.py +++ b/ipatests/test_integration/test_sssd.py @@ -118,10 +118,10 @@ class TestSSSDWithAdTrust(IntegrationTest): @contextmanager def filter_user_setup(self, user): sssd_conf_backup = tasks.FileBackup(self.master, paths.SSSD_CONF) - filter_user = {'filter_users': self.users[user]['name']} try: - tasks.modify_sssd_conf(self.master, self.master.domain.name, - filter_user) + with tasks.remote_sssd_config(self.master) as sssd_conf: + sssd_conf.edit_domain(self.master.domain, + 'filter_users', self.users[user]['name']) tasks.clear_sssd_cache(self.master) yield finally: diff --git a/ipatests/test_integration/test_sudo.py b/ipatests/test_integration/test_sudo.py index faa427d1e..031635135 100644 --- a/ipatests/test_integration/test_sudo.py +++ b/ipatests/test_integration/test_sudo.py @@ -19,10 +19,12 @@ import pytest +from ipaplatform.paths import paths + from ipatests.test_integration.base import IntegrationTest from ipatests.pytest_ipa.integration.tasks import ( - clear_sssd_cache, get_host_ip_with_hostmask, modify_sssd_conf) - + clear_sssd_cache, get_host_ip_with_hostmask, remote_sssd_config, + FileBackup) class TestSudo(IntegrationTest): """ @@ -348,15 +350,13 @@ class TestSudo(IntegrationTest): # SSSD >= 1.13.3-3 uses native IPA schema instead of compat entries to # pull in sudoers. Since native schema does not (yet) support # hostmasks, we need to point ldap_sudo_search_base to the old schema + self.__class__.client_sssd_conf_backup = FileBackup( + self.client, paths.SSSD_CONF) domain = self.client.domain - modify_sssd_conf( - self.client, - domain.name, - { - 'ldap_sudo_search_base': 'ou=sudoers,{}'.format(domain.basedn) - }, - provider_subtype='sudo' - ) + with remote_sssd_config(self.client) as sssd_conf: + sssd_conf.edit_domain(domain, 'sudo_provider', 'ipa') + sssd_conf.edit_domain(domain, 'ldap_sudo_search_base', + 'ou=sudoers,{}'.format(domain.basedn)) def test_sudo_rule_restricted_to_one_hostmask(self): if self.__class__.skip_hostmask_based: @@ -403,15 +403,7 @@ class TestSudo(IntegrationTest): # reset ldap_sudo_search_base back to the default value, the old # schema is not needed for the upcoming tests - domain = self.client.domain - modify_sssd_conf( - self.client, - domain.name, - { - 'ldap_sudo_search_base': None - }, - provider_subtype='sudo' - ) + self.client_sssd_conf_backup.restore() def test_sudo_rule_restricted_to_one_command_setup(self): # Reset testrule configuration diff --git a/ipatests/test_integration/test_trust.py b/ipatests/test_integration/test_trust.py index e59522447..a38da28b6 100644 --- a/ipatests/test_integration/test_trust.py +++ b/ipatests/test_integration/test_trust.py @@ -243,14 +243,9 @@ class TestTrust(BaseTestTrust): try: testuser = 'testuser@%s' % self.ad_domain - domain = self.master.domain - tasks.modify_sssd_conf( - self.master, - domain.name, - { - 'subdomain_homedir': '%o' - } - ) + with tasks.remote_sssd_config(self.master) as sssd_conf: + sssd_conf.edit_domain(self.master.domain, + 'subdomain_homedir', '%o') tasks.clear_sssd_cache(self.master) # The initgroups operation now uses the LDAP connection because @@ -301,14 +296,9 @@ class TestTrust(BaseTestTrust): conn.update_entry(entry) # pylint: disable=no-member self.master.run_command(['ipactl', 'restart']) - domain = self.master.domain - tasks.modify_sssd_conf( - self.master, - domain.name, - { - 'timeout': '999999' - } - ) + with tasks.remote_sssd_config(self.master) as sssd_conf: + sssd_conf.edit_domain(self.master.domain, 'timeout', '999999') + remove_cache = 'sss_cache -E' self.master.run_command(remove_cache) client.run_command(remove_cache)