ipatests: replace utility for editing sssd.conf

There are three patterns for editing sssd.conf in tests now:
1. using modify_sssd_conf() which allows to modify only domain sections
2. using remote_ini_file
3. direct file editing using `sed`

This patch introduces new utility function which combines advantages of
first two approaches:
* changes are verified against schema, so that mistakes can be spotted
  early
* has convenient interface for simple options modification,
  both in domain and service sections
* allows sophisticated modifications through SSSDConfig object

Fixes: https://pagure.io/freeipa/issue/8219
Reviewed-By: Alexander Bokovoy <abokovoy@redhat.com>
This commit is contained in:
Sergey Orlov 2020-03-05 16:05:28 +01:00
parent 888c7ba938
commit 9450aef75f
No known key found for this signature in database
GPG Key ID: ADF8C90EDD04503D
5 changed files with 94 additions and 77 deletions

View File

@ -799,20 +799,74 @@ def setup_sssd_debugging(host):
clear_sssd_cache(host) clear_sssd_cache(host)
def modify_sssd_conf(host, domain, mod_dict, provider='ipa', @contextmanager
provider_subtype=None): def remote_sssd_config(host):
""" """Context manager for editing sssd config file on a remote host.
modify options in a single domain section of host's sssd.conf
:param host: multihost.Host object It provides SimpleSSSDConfig object which is automatically serialized and
:param domain: domain section name to modify uploaded to remote host upon exit from the context.
:param mod_dict: dictionary of options which will be passed to
SSSDDomain.set_option(). To remove an option specify its value as If exception is raised inside the context then the ini file is NOT updated
None on remote host.
:param provider: provider backend to set. Defaults to ipa
:param provider_subtype: backend subtype (e.g. id or sudo), will be added SimpleSSSDConfig is a SSSDConfig descendant with added helper methods
to the domain config if not present for modifying options: edit_domain and edit_service.
"""
Example:
with remote_sssd_config(master) as sssd_conf:
# use helper methods
# add/replace option
sssd_conf.edit_domain(master.domain, 'filter_users', 'root')
# add/replace provider option
sssd_conf.edit_domain(master.domain, 'sudo_provider', 'ipa')
# delete option
sssd_conf.edit_service('pam', 'pam_verbosity', None)
# use original methods of SSSDConfig
domain = sssd_conf.get_domain(master.domain.name)
domain.set_name('example.test')
self.save_domain(domain)
"""
from SSSDConfig import SSSDConfig from SSSDConfig import SSSDConfig
class SimpleSSSDConfig(SSSDConfig):
def edit_domain(self, domain_or_name, option, value):
"""Add/replace/delete option in a domain section.
:param domain_or_name: Domain object or domain name
:param option: option name
:param value: value to assign to option. If None, option will be
deleted
"""
if hasattr(domain_or_name, 'name'):
domain_name = domain_or_name.name
else:
domain_name = domain_or_name
domain = self.get_domain(domain_name)
if value is None:
domain.remove_option(option)
else:
domain.set_option(option, value)
self.save_domain(domain)
def edit_service(self, service_name, option, value):
"""Add/replace/delete option in a service section.
:param service_name: a string
:param option: option name
:param value: value to assign to option. If None, option will be
deleted
"""
service = self.get_service(service_name)
if value is None:
service.remove_option(option)
else:
service.set_option(option, value)
self.save_service(service)
fd, temp_config_file = tempfile.mkstemp() fd, temp_config_file = tempfile.mkstemp()
os.close(fd) os.close(fd)
try: try:
@ -842,19 +896,12 @@ def modify_sssd_conf(host, domain, mod_dict, provider='ipa',
os.unlink(tarname) os.unlink(tarname)
# Use the imported schema # Use the imported schema
sssd_config = SSSDConfig( sssd_config = SimpleSSSDConfig(
schemafile=os.path.join(tar_dir, "sssd.api.conf"), schemafile=os.path.join(tar_dir, "sssd.api.conf"),
schemaplugindir=os.path.join(tar_dir, "sssd.api.d")) schemaplugindir=os.path.join(tar_dir, "sssd.api.d"))
sssd_config.import_config(temp_config_file) sssd_config.import_config(temp_config_file)
sssd_domain = sssd_config.get_domain(domain)
if provider_subtype is not None: yield sssd_config
sssd_domain.add_provider(provider, provider_subtype)
for m in mod_dict:
sssd_domain.set_option(m, mod_dict[m])
sssd_config.save_domain(sssd_domain)
new_config = sssd_config.dump(sssd_config.opts).encode('utf-8') new_config = sssd_config.dump(sssd_config.opts).encode('utf-8')
host.transport.put_file_contents(paths.SSSD_CONF, new_config) host.transport.put_file_contents(paths.SSSD_CONF, new_config)

View File

@ -698,14 +698,10 @@ class TestIPACommand(IntegrationTest):
username = "testuser" + str(random.randint(200000, 9999999)) username = "testuser" + str(random.randint(200000, 9999999))
# add ldap_deref_threshold=0 to /etc/sssd/sssd.conf # add ldap_deref_threshold=0 to /etc/sssd/sssd.conf
domain = self.master.domain sssd_conf_backup = tasks.FileBackup(self.master, paths.SSSD_CONF)
tasks.modify_sssd_conf( with tasks.remote_sssd_config(self.master) as sssd_config:
self.master, sssd_config.edit_domain(
domain.name, self.master.domain, 'ldap_deref_threshold', 0)
{
'ldap_deref_threshold': 0
},
)
try: try:
self.master.run_command(['systemctl', 'restart', 'sssd.service']) self.master.run_command(['systemctl', 'restart', 'sssd.service'])
@ -731,15 +727,7 @@ class TestIPACommand(IntegrationTest):
password='Secret123') password='Secret123')
client.close() client.close()
finally: finally:
# revert back to original ldap config sssd_conf_backup.restore()
# remove ldap_deref_threshold=0
tasks.modify_sssd_conf(
self.master,
domain.name,
{
'ldap_deref_threshold': None
},
)
self.master.run_command(['systemctl', 'restart', 'sssd.service']) self.master.run_command(['systemctl', 'restart', 'sssd.service'])
def test_user_mod_change_capitalization_issue5879(self): def test_user_mod_change_capitalization_issue5879(self):

View File

@ -118,10 +118,10 @@ class TestSSSDWithAdTrust(IntegrationTest):
@contextmanager @contextmanager
def filter_user_setup(self, user): def filter_user_setup(self, user):
sssd_conf_backup = tasks.FileBackup(self.master, paths.SSSD_CONF) sssd_conf_backup = tasks.FileBackup(self.master, paths.SSSD_CONF)
filter_user = {'filter_users': self.users[user]['name']}
try: try:
tasks.modify_sssd_conf(self.master, self.master.domain.name, with tasks.remote_sssd_config(self.master) as sssd_conf:
filter_user) sssd_conf.edit_domain(self.master.domain,
'filter_users', self.users[user]['name'])
tasks.clear_sssd_cache(self.master) tasks.clear_sssd_cache(self.master)
yield yield
finally: finally:

View File

@ -19,10 +19,12 @@
import pytest import pytest
from ipaplatform.paths import paths
from ipatests.test_integration.base import IntegrationTest from ipatests.test_integration.base import IntegrationTest
from ipatests.pytest_ipa.integration.tasks import ( from ipatests.pytest_ipa.integration.tasks import (
clear_sssd_cache, get_host_ip_with_hostmask, modify_sssd_conf) clear_sssd_cache, get_host_ip_with_hostmask, remote_sssd_config,
FileBackup)
class TestSudo(IntegrationTest): class TestSudo(IntegrationTest):
""" """
@ -348,15 +350,13 @@ class TestSudo(IntegrationTest):
# SSSD >= 1.13.3-3 uses native IPA schema instead of compat entries to # SSSD >= 1.13.3-3 uses native IPA schema instead of compat entries to
# pull in sudoers. Since native schema does not (yet) support # pull in sudoers. Since native schema does not (yet) support
# hostmasks, we need to point ldap_sudo_search_base to the old schema # hostmasks, we need to point ldap_sudo_search_base to the old schema
self.__class__.client_sssd_conf_backup = FileBackup(
self.client, paths.SSSD_CONF)
domain = self.client.domain domain = self.client.domain
modify_sssd_conf( with remote_sssd_config(self.client) as sssd_conf:
self.client, sssd_conf.edit_domain(domain, 'sudo_provider', 'ipa')
domain.name, sssd_conf.edit_domain(domain, 'ldap_sudo_search_base',
{ 'ou=sudoers,{}'.format(domain.basedn))
'ldap_sudo_search_base': 'ou=sudoers,{}'.format(domain.basedn)
},
provider_subtype='sudo'
)
def test_sudo_rule_restricted_to_one_hostmask(self): def test_sudo_rule_restricted_to_one_hostmask(self):
if self.__class__.skip_hostmask_based: if self.__class__.skip_hostmask_based:
@ -403,15 +403,7 @@ class TestSudo(IntegrationTest):
# reset ldap_sudo_search_base back to the default value, the old # reset ldap_sudo_search_base back to the default value, the old
# schema is not needed for the upcoming tests # schema is not needed for the upcoming tests
domain = self.client.domain self.client_sssd_conf_backup.restore()
modify_sssd_conf(
self.client,
domain.name,
{
'ldap_sudo_search_base': None
},
provider_subtype='sudo'
)
def test_sudo_rule_restricted_to_one_command_setup(self): def test_sudo_rule_restricted_to_one_command_setup(self):
# Reset testrule configuration # Reset testrule configuration

View File

@ -243,14 +243,9 @@ class TestTrust(BaseTestTrust):
try: try:
testuser = 'testuser@%s' % self.ad_domain testuser = 'testuser@%s' % self.ad_domain
domain = self.master.domain with tasks.remote_sssd_config(self.master) as sssd_conf:
tasks.modify_sssd_conf( sssd_conf.edit_domain(self.master.domain,
self.master, 'subdomain_homedir', '%o')
domain.name,
{
'subdomain_homedir': '%o'
}
)
tasks.clear_sssd_cache(self.master) tasks.clear_sssd_cache(self.master)
# The initgroups operation now uses the LDAP connection because # The initgroups operation now uses the LDAP connection because
@ -301,14 +296,9 @@ class TestTrust(BaseTestTrust):
conn.update_entry(entry) # pylint: disable=no-member conn.update_entry(entry) # pylint: disable=no-member
self.master.run_command(['ipactl', 'restart']) self.master.run_command(['ipactl', 'restart'])
domain = self.master.domain with tasks.remote_sssd_config(self.master) as sssd_conf:
tasks.modify_sssd_conf( sssd_conf.edit_domain(self.master.domain, 'timeout', '999999')
self.master,
domain.name,
{
'timeout': '999999'
}
)
remove_cache = 'sss_cache -E' remove_cache = 'sss_cache -E'
self.master.run_command(remove_cache) self.master.run_command(remove_cache)
client.run_command(remove_cache) client.run_command(remove_cache)