diff --git a/ipatests/test_integration/test_otp.py b/ipatests/test_integration/test_otp.py index ef1af74e6..cbb602cc0 100644 --- a/ipatests/test_integration/test_otp.py +++ b/ipatests/test_integration/test_otp.py @@ -4,8 +4,11 @@ """OTP token tests """ import base64 +import logging +import paramiko import re import time +import textwrap from urllib.parse import urlparse, parse_qs from cryptography.hazmat.backends import default_backend @@ -14,12 +17,14 @@ from cryptography.hazmat.primitives.twofactor.hotp import HOTP from cryptography.hazmat.primitives.twofactor.totp import TOTP from ipatests.test_integration.base import IntegrationTest +from ipaplatform.paths import paths from ipatests.pytest_ipa.integration import tasks PASSWORD = "DummyPassword123" USER = "opttestuser" ARMOR = "/tmp/armor" +logger = logging.getLogger(__name__) def add_otptoken(host, owner, *, otptype="hotp", digits=6, algo="sha1"): @@ -74,6 +79,39 @@ def kinit_otp(host, user, *, password, otp, success=True): ) +def ssh_2f(hostname, username, answers_dict, port=22): + """ + :param hostname: hostname + :param username: username + :param answers_dict: dictionary of options with prompt_message and value. + :param port: port for ssh + """ + # Handler for server questions + def answer_handler(title, instructions, prompt_list): + resp = [] + if title: + print(title.strip()) + if instructions: + print(instructions.strip()) + for prmpt in prompt_list: + prmpt_str = prmpt[0].strip() + resp.append(answers_dict[prmpt_str]) + logger.info("Prompt is: '%s'", prmpt_str) + logger.info( + "Answer to ssh prompt is: '%s'", answers_dict[prmpt_str]) + return resp + trans = paramiko.Transport((hostname, port)) + trans.connect() + trans.auth_interactive(username, answer_handler) + + +def set_sssd_conf(host, add_contents): + contents = host.get_file_contents(paths.SSSD_CONF, encoding="utf-8") + file_contents = contents + add_contents + host.put_file_contents(paths.SSSD_CONF, file_contents) + tasks.clear_sssd_cache(host) + + class TestOTPToken(IntegrationTest): """Tests for member manager feature for groups and hostgroups """ @@ -183,3 +221,82 @@ class TestOTPToken(IntegrationTest): kinit_otp(master, USER, password=PASSWORD, otp=otpvalue) del_otptoken(master, otpuid) + + def test_2fa_enable_single_prompt(self): + """Test ssh with 2FA when single prompt is enabled. + + Test for : https://pagure.io/SSSD/sssd/issue/3264 + + When [prompting/2fa/sshd] with single_prompt = True is set + then during ssh it should be prompted with given message + for first and second factor at once. + """ + master = self.master + USER1 = 'sshuser1' + sssd_conf_backup = tasks.FileBackup(master, paths.SSSD_CONF) + first_prompt = 'Please enter password + OTP token value:' + add_contents = textwrap.dedent(''' + [prompting/2fa/sshd] + single_prompt = True + first_prompt = {0} + ''').format(first_prompt) + set_sssd_conf(master, add_contents) + tasks.create_active_user(master, USER1, PASSWORD) + tasks.kinit_admin(master) + master.run_command(['ipa', 'user-mod', USER1, '--user-auth-type=otp']) + try: + otpuid, totp = add_otptoken(master, USER1, otptype='totp') + master.run_command(['ipa', 'otptoken-show', otpuid]) + otpvalue = totp.generate(int(time.time())).decode('ascii') + answers = { + first_prompt: '{0}{1}'.format(PASSWORD, otpvalue), + } + ssh_2f(master.hostname, USER1, answers) + # check if user listed in output + cmd = self.master.run_command(['semanage', 'login', '-l']) + assert USER1 in cmd.stdout_text + finally: + master.run_command(['ipa', 'user-del', USER1]) + self.master.run_command(['semanage', 'login', '-D']) + sssd_conf_backup.restore() + + def test_2fa_disable_single_prompt(self): + """Test ssh with 2FA when single prompt is disabled. + + Test for : https://pagure.io/SSSD/sssd/issue/3264 + + When [prompting/2fa/sshd] with single_prompt = False is set + then during ssh it should be prompted with given message + for first factor and then for second factor. + """ + master = self.master + USER2 = 'sshuser2' + sssd_conf_backup = tasks.FileBackup(master, paths.SSSD_CONF) + first_prompt = 'Enter first factor:' + second_prompt = 'Enter second factor:' + add_contents = textwrap.dedent(''' + [prompting/2fa/sshd] + single_prompt = False + first_prompt = {0} + second_prompt = {1} + ''').format(first_prompt, second_prompt) + set_sssd_conf(master, add_contents) + tasks.create_active_user(master, USER2, PASSWORD) + tasks.kinit_admin(master) + master.run_command(['ipa', 'user-mod', USER2, '--user-auth-type=otp']) + try: + otpuid, totp = add_otptoken(master, USER2, otptype='totp') + master.run_command(['ipa', 'otptoken-show', otpuid]) + otpvalue = totp.generate(int(time.time())).decode('ascii') + answers = { + first_prompt: PASSWORD, + second_prompt: otpvalue + } + ssh_2f(master.hostname, USER2, answers) + # check if user listed in output + cmd = self.master.run_command(['semanage', 'login', '-l']) + assert USER2 in cmd.stdout_text + finally: + master.run_command(['ipa', 'user-del', USER2]) + self.master.run_command(['semanage', 'login', '-D']) + sssd_conf_backup.restore()