from __future__ import absolute_import import time import pytest import re import textwrap from ipaplatform.paths import paths from ipatests.test_integration.base import IntegrationTest from ipatests.pytest_ipa.integration import tasks, create_keycloak user_code_script = textwrap.dedent(""" from selenium import webdriver from datetime import datetime from selenium.webdriver.firefox.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC options = Options() options.headless = True driver = webdriver.Firefox(executable_path="/opt/geckodriver", options=options) verification_uri = "{uri}" driver.get(verification_uri) try: element = WebDriverWait(driver, 90).until( EC.presence_of_element_located((By.ID, "username"))) driver.find_element(By.ID, "username").send_keys("testuser1") driver.find_element(By.ID, "password").send_keys("{passwd}") driver.find_element(By.ID, "kc-login").click() element = WebDriverWait(driver, 90).until( EC.presence_of_element_located((By.ID, "kc-login"))) driver.find_element(By.ID, "kc-login").click() assert "Device Login Successful" in driver.page_source finally: now = datetime.now().strftime("%M-%S") driver.get_screenshot_as_file("/var/log/httpd/screenshot-%s.png" % now) driver.quit() """) def add_user_code(host, verification_uri): contents = user_code_script.format(uri=verification_uri, passwd=host.config.admin_password) try: host.put_file_contents("/tmp/add_user_code.py", contents) tasks.run_repeatedly( host, ['python3', '/tmp/add_user_code.py']) finally: host.run_command(["rm", "-f", "/tmp/add_user_code.py"]) def kinit_idp(host, user, keycloak_server): ARMOR = "/tmp/armor" tasks.kdestroy_all(host) # create armor for FAST host.run_command(["kinit", "-n", "-c", ARMOR]) cmd = ["kinit", "-T", ARMOR, user] with host.spawn_expect(cmd, default_timeout=100) as e: e.expect('Authenticate at (.+) and press ENTER.:') prompt = e.get_last_output() uri = re.search(r'Authenticate at (.*?) and press ENTER.:', prompt ).group(1) time.sleep(15) if uri: add_user_code(keycloak_server, uri) e.sendline('\n') e.expect_exit() test_idp = host.run_command(["klist", "-C"]) assert "152" in test_idp.stdout_text class TestIDPKeycloak(IntegrationTest): num_replicas = 2 topology = 'line' @classmethod def install(cls, mh): cls.client = cls.replicas[0] cls.replica = cls.replicas[1] tasks.install_master(cls.master, extra_args=['--no-dnssec-validation']) tasks.install_client(cls.master, cls.replicas[0], extra_args=["--mkhomedir"]) tasks.install_replica(cls.master, cls.replicas[1]) for host in [cls.master, cls.replicas[0], cls.replicas[1]]: content = host.get_file_contents(paths.IPA_DEFAULT_CONF, encoding='utf-8') new_content = content + "\noidc_child_debug_level = 10" host.put_file_contents(paths.IPA_DEFAULT_CONF, new_content) with tasks.remote_sssd_config(cls.master) as sssd_config: sssd_config.edit_domain( cls.master.domain, 'krb5_auth_timeout', 1100) tasks.clear_sssd_cache(cls.master) tasks.clear_sssd_cache(cls.replicas[0]) tasks.kinit_admin(cls.master) cls.master.run_command(["ipa", "config-mod", "--user-auth-type=idp", "--user-auth-type=password"]) xvfb = ("nohup /usr/bin/Xvfb :99 -ac -noreset -screen 0 1400x1200x8 " "/dev/null &") cls.replicas[0].run_command(xvfb) def test_auth_keycloak_idp(self): """ Test case to check that OAuth 2.0 Device Authorization Grant is working as expected for user configured with external idp. """ create_keycloak.setup_keycloakserver(self.client) time.sleep(60) create_keycloak.setup_keycloak_client(self.client) tasks.kinit_admin(self.master) cmd = ["ipa", "idp-add", "keycloakidp", "--provider=keycloak", "--client-id=ipa_oidc_client", "--org=master", "--base-url={0}:8443/auth".format(self.client.hostname)] self.master.run_command(cmd, stdin_text="{0}\n{0}".format( self.client.config.admin_password)) tasks.user_add(self.master, 'keycloakuser', extra_args=["--user-auth-type=idp", "--idp-user-id=testuser1@ipa.test", "--idp=keycloakidp"] ) list_user = self.master.run_command( ["ipa", "user-find", "--idp-user-id=testuser1@ipa.test"] ) assert "keycloakuser" in list_user.stdout_text list_by_idp = self.master.run_command(["ipa", "user-find", "--idp=keycloakidp"] ) assert "keycloakuser" in list_by_idp.stdout_text list_by_user = self.master.run_command( ["ipa", "user-find", "--idp-user-id=testuser1@ipa.test", "--all"] ) assert "keycloakidp" in list_by_user.stdout_text tasks.clear_sssd_cache(self.master) kinit_idp(self.master, 'keycloakuser', keycloak_server=self.client) @pytest.fixture def hbac_setup_teardown(self): # allow sshd only on given host tasks.kinit_admin(self.master) self.master.run_command(["ipa", "hbacrule-disable", "allow_all"]) self.master.run_command(["ipa", "hbacrule-add", "rule1"]) self.master.run_command(["ipa", "hbacrule-add-user", "rule1", "--users=keycloakuser"] ) self.master.run_command(["ipa", "hbacrule-add-host", "rule1", "--hosts", self.replica.hostname]) self.master.run_command(["ipa", "hbacrule-add-service", "rule1", "--hbacsvcs=sshd"] ) tasks.clear_sssd_cache(self.master) tasks.clear_sssd_cache(self.replica) yield # cleanup tasks.kinit_admin(self.master) self.master.run_command(["ipa", "hbacrule-enable", "allow_all"]) self.master.run_command(["ipa", "hbacrule-del", "rule1"]) def test_auth_hbac(self, hbac_setup_teardown): """ Test case to check that hbacrule is working as expected for user configured with external idp. """ kinit_idp(self.master, 'keycloakuser', keycloak_server=self.client) ssh_cmd = "ssh -q -K -l keycloakuser {0} whoami" valid_ssh = self.master.run_command( ssh_cmd.format(self.replica.hostname)) assert "keycloakuser" in valid_ssh.stdout_text negative_ssh = self.master.run_command( ssh_cmd.format(self.master.hostname), raiseonerr=False ) assert negative_ssh.returncode == 255 def test_auth_sudo_idp(self): """ Test case to check that sudorule is working as expected for user configured with external idp. """ tasks.kdestroy_all(self.master) tasks.kinit_admin(self.master) # rule: keycloakuser are allowed to execute yum on # the client machine as root. cmdlist = [ ["ipa", "sudocmd-add", "/usr/bin/yum"], ["ipa", "sudorule-add", "sudorule"], ['ipa', 'sudorule-add-user', '--users=keycloakuser', 'sudorule'], ['ipa', 'sudorule-add-host', '--hosts', self.client.hostname, 'sudorule'], ['ipa', 'sudorule-add-runasuser', '--users=root', 'sudorule'], ['ipa', 'sudorule-add-allow-command', '--sudocmds=/usr/bin/yum', 'sudorule'], ['ipa', 'sudorule-show', 'sudorule', '--all'], ['ipa', 'sudorule-add-option', 'sudorule', '--sudooption', "!authenticate"] ] for cmd in cmdlist: self.master.run_command(cmd) tasks.clear_sssd_cache(self.master) tasks.clear_sssd_cache(self.client) try: cmd = 'sudo -ll -U keycloakuser' test = self.client.run_command(cmd).stdout_text assert "User keycloakuser may run the following commands" in test assert "/usr/bin/yum" in test kinit_idp(self.client, 'keycloakuser', self.client) test_sudo = 'su -c "sudo yum list wget" keycloakuser' self.client.run_command(test_sudo) list_fail = self.master.run_command(cmd).stdout_text assert "User keycloakuser is not allowed to run sudo" in list_fail finally: tasks.kinit_admin(self.master) self.master.run_command(['ipa', 'sudorule-del', 'sudorule']) self.master.run_command(["ipa", "sudocmd-del", "/usr/bin/yum"]) def test_auth_replica(self): """ Test case to check that OAuth 2.0 Device Authorization is working as expected on replica. """ tasks.clear_sssd_cache(self.master) tasks.clear_sssd_cache(self.replica) tasks.kinit_admin(self.replica) list_user = self.master.run_command( ["ipa", "user-find", "--idp-user-id=testuser1@ipa.test"] ) assert "keycloakuser" in list_user.stdout_text list_by_idp = self.replica.run_command(["ipa", "user-find", "--idp=keycloakidp"] ) assert "keycloakuser" in list_by_idp.stdout_text list_by_user = self.replica.run_command( ["ipa", "user-find", "--idp-user-id=testuser1@ipa.test", "--all"] ) assert "keycloakidp" in list_by_user.stdout_text kinit_idp(self.replica, 'keycloakuser', keycloak_server=self.client) def test_idp_with_services(self): """ Test case to check that services can be configured auth indicator as idp. """ tasks.clear_sssd_cache(self.master) tasks.kinit_admin(self.master) domain = self.master.domain.name.upper() services = [ "DNS/{0}@{1}".format(self.master.hostname, domain), "HTTP/{0}@{1}".format(self.client.hostname, domain), "dogtag/{0}@{1}".format(self.master.hostname, domain), "ipa-dnskeysyncd/{0}@{1}".format(self.master.hostname, domain) ] try: for service in services: test = self.master.run_command(["ipa", "service-mod", service, "--auth-ind=idp"] ) assert "Authentication Indicators: idp" in test.stdout_text finally: for service in services: self.master.run_command(["ipa", "service-mod", service, "--auth-ind="]) def test_idp_backup_restore(self): """ Test case to check that after restore data is retrieved with related idp configuration. """ tasks.kinit_admin(self.master) user = "backupuser" cmd = ["ipa", "idp-add", "testidp", "--provider=keycloak", "--client-id=ipa_oidc_client", "--org=master", "--base-url={0}:8443/auth".format(self.client.hostname)] self.master.run_command(cmd, stdin_text="{0}\n{0}".format( self.client.config.admin_password)) tasks.user_add(self.master, user, extra_args=["--user-auth-type=idp", "--idp-user-id=testuser1@ipa.test", "--idp=testidp"] ) backup_path = tasks.get_backup_dir(self.master) # change data after backup self.master.run_command(['ipa', 'user-del', user]) self.master.run_command(['ipa', 'idp-del', 'testidp']) dirman_password = self.master.config.dirman_password self.master.run_command(['ipa-restore', backup_path], stdin_text=dirman_password + '\nyes') try: list_user = self.master.run_command( ['ipa', 'user-show', 'backupuser', '--all'] ).stdout_text assert "External IdP configuration: testidp" in list_user assert "User authentication types: idp" in list_user assert ("External IdP user identifier: " "testuser1@ipa.test") in list_user list_idp = self.master.run_command(['ipa', 'idp-find', 'testidp']) assert "testidp" in list_idp.stdout_text kinit_idp(self.master, user, self.client) finally: tasks.kdestroy_all(self.master) tasks.kinit_admin(self.master) self.master.run_command(["rm", "-rf", backup_path]) self.master.run_command(["ipa", "idp-del", "testidp"])