mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2024-12-24 16:10:02 -06:00
b5fd288991
The test sets 389-ds nsslapd-idletimeout to 60s, then does a kinit with an otp token (which makes ipa-otpd create a LDAP connection), then sleeps for 60s. The expectation is that ns-slapd will detect that the LDAP conn from ipa-otpd is idle and close the connection. According to 389ds doc, the idle timeout is enforced when the connection table is walked. By doing a ldapsearch, the test "wakes up" ns-slapd and forces the detection of ipa-otpd idle connection. Fixes: https://pagure.io/freeipa/issue/9044 Signed-off-by: Florence Blanc-Renaud <flo@redhat.com> Reviewed-By: Anuja More <amore@redhat.com>
376 lines
14 KiB
Python
376 lines
14 KiB
Python
#
|
|
# Copyright (C) 2019 FreeIPA Contributors see COPYING for license
|
|
#
|
|
"""OTP token tests
|
|
"""
|
|
import base64
|
|
import logging
|
|
import paramiko
|
|
import pytest
|
|
import re
|
|
import time
|
|
import textwrap
|
|
from urllib.parse import urlparse, parse_qs
|
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.twofactor.hotp import HOTP
|
|
from cryptography.hazmat.primitives.twofactor.totp import TOTP
|
|
|
|
from ipatests.test_integration.base import IntegrationTest
|
|
from ipaplatform.paths import paths
|
|
from ipatests.pytest_ipa.integration import tasks
|
|
from ipapython.dn import DN
|
|
|
|
|
|
PASSWORD = "DummyPassword123"
|
|
USER = "opttestuser"
|
|
ARMOR = "/tmp/armor"
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def add_otptoken(host, owner, *, otptype="hotp", digits=6, algo="sha1"):
|
|
args = [
|
|
"ipa",
|
|
"otptoken-add",
|
|
"--owner",
|
|
owner,
|
|
"--type",
|
|
otptype,
|
|
"--digits",
|
|
str(digits),
|
|
"--algo",
|
|
algo,
|
|
"--no-qrcode",
|
|
]
|
|
result = host.run_command(args)
|
|
otpuid = re.search(
|
|
r"Unique ID:\s*([a-z0-9-]*)\s+", result.stdout_text
|
|
).group(1)
|
|
otpuristr = re.search(r"URI:\s*(.*)\s+", result.stdout_text).group(1)
|
|
otpuri = urlparse(otpuristr)
|
|
assert otpuri.netloc == otptype
|
|
|
|
query = parse_qs(otpuri.query)
|
|
assert query["algorithm"][0] == algo.upper()
|
|
assert query["digits"][0] == str(digits)
|
|
key = base64.b32decode(query["secret"][0])
|
|
assert len(key) == 35
|
|
|
|
hashcls = getattr(hashes, algo.upper())
|
|
if otptype == "hotp":
|
|
return otpuid, HOTP(key, digits, hashcls(), default_backend())
|
|
else:
|
|
period = int(query["period"][0])
|
|
return otpuid, TOTP(key, digits, hashcls(), period, default_backend())
|
|
|
|
|
|
def del_otptoken(host, otpuid):
|
|
tasks.kinit_admin(host)
|
|
host.run_command(["ipa", "otptoken-del", otpuid])
|
|
|
|
|
|
def kinit_otp(host, user, *, password, otp, success=True):
|
|
tasks.kdestroy_all(host)
|
|
# create armor for FAST
|
|
host.run_command(["kinit", "-n", "-c", ARMOR])
|
|
host.run_command(
|
|
["kinit", "-T", ARMOR, user],
|
|
stdin_text=f"{password}{otp}\n",
|
|
ok_returncode=0 if success else 1,
|
|
)
|
|
|
|
|
|
def ssh_2f(hostname, username, answers_dict, port=22):
|
|
"""
|
|
:param hostname: hostname
|
|
:param username: username
|
|
:param answers_dict: dictionary of options with prompt_message and value.
|
|
:param port: port for ssh
|
|
"""
|
|
# Handler for server questions
|
|
def answer_handler(title, instructions, prompt_list):
|
|
resp = []
|
|
if title:
|
|
print(title.strip())
|
|
if instructions:
|
|
print(instructions.strip())
|
|
for prmpt in prompt_list:
|
|
prmpt_str = prmpt[0].strip()
|
|
resp.append(answers_dict[prmpt_str])
|
|
logger.info("Prompt is: '%s'", prmpt_str)
|
|
logger.info(
|
|
"Answer to ssh prompt is: '%s'", answers_dict[prmpt_str])
|
|
return resp
|
|
trans = paramiko.Transport((hostname, port))
|
|
trans.connect()
|
|
trans.auth_interactive(username, answer_handler)
|
|
|
|
|
|
def set_sssd_conf(host, add_contents):
|
|
contents = host.get_file_contents(paths.SSSD_CONF, encoding="utf-8")
|
|
file_contents = contents + add_contents
|
|
host.put_file_contents(paths.SSSD_CONF, file_contents)
|
|
tasks.clear_sssd_cache(host)
|
|
|
|
|
|
class TestOTPToken(IntegrationTest):
|
|
"""Tests for member manager feature for groups and hostgroups
|
|
"""
|
|
|
|
topology = "line"
|
|
|
|
@classmethod
|
|
def install(cls, mh):
|
|
super(TestOTPToken, cls).install(mh)
|
|
master = cls.master
|
|
|
|
tasks.kinit_admin(master)
|
|
# create service with OTP auth indicator
|
|
cls.service_name = f"otponly/{master.hostname}"
|
|
master.run_command(
|
|
["ipa", "service-add", cls.service_name, "--auth-ind=otp"]
|
|
)
|
|
# service needs a keytab before user can acquire a ticket for it
|
|
keytab = "/tmp/otponly.keytab"
|
|
master.run_command(
|
|
["ipa-getkeytab", "-p", cls.service_name, "-k", keytab]
|
|
)
|
|
master.run_command(["rm", "-f", keytab])
|
|
|
|
tasks.create_active_user(master, USER, PASSWORD)
|
|
tasks.kinit_admin(master)
|
|
master.run_command(["ipa", "user-mod", USER, "--user-auth-type=otp"])
|
|
|
|
@classmethod
|
|
def uninstall(cls, mh):
|
|
cls.master.run_command(["rm", "-f", ARMOR])
|
|
super(TestOTPToken, cls).uninstall(mh)
|
|
|
|
def test_otp_auth_ind(self):
|
|
tasks.kinit_admin(self.master)
|
|
result = self.master.run_command(
|
|
["kvno", self.service_name], ok_returncode=1
|
|
)
|
|
assert "KDC policy rejects request" in result.stderr_text
|
|
|
|
def test_hopt(self):
|
|
master = self.master
|
|
|
|
tasks.kinit_admin(self.master)
|
|
otpuid, hotp = add_otptoken(master, USER, otptype="hotp")
|
|
master.run_command(["ipa", "otptoken-show", otpuid])
|
|
# normal password login fails
|
|
master.run_command(
|
|
["kinit", USER], stdin_text=f"{PASSWORD}\n", ok_returncode=1
|
|
)
|
|
# OTP login works
|
|
otpvalue = hotp.generate(0).decode("ascii")
|
|
kinit_otp(master, USER, password=PASSWORD, otp=otpvalue)
|
|
# repeating OTP fails
|
|
kinit_otp(
|
|
master, USER, password=PASSWORD, otp=otpvalue, success=False
|
|
)
|
|
# skipping an OTP is ok
|
|
otpvalue = hotp.generate(2).decode("ascii")
|
|
kinit_otp(master, USER, password=PASSWORD, otp=otpvalue)
|
|
# TGT with OTP auth indicator can get a ticket for OTP-only service
|
|
master.run_command(["kvno", self.service_name])
|
|
result = master.run_command(["klist"])
|
|
assert self.service_name in result.stdout_text
|
|
|
|
del_otptoken(master, otpuid)
|
|
|
|
def test_totp(self):
|
|
master = self.master
|
|
|
|
tasks.kinit_admin(self.master)
|
|
otpuid, totp = add_otptoken(master, USER, otptype="totp")
|
|
|
|
otpvalue = totp.generate(int(time.time())).decode("ascii")
|
|
kinit_otp(master, USER, password=PASSWORD, otp=otpvalue)
|
|
# TGT with OTP auth indicator can get a ticket for OTP-only service
|
|
master.run_command(["kvno", self.service_name])
|
|
result = master.run_command(["klist"])
|
|
assert self.service_name in result.stdout_text
|
|
|
|
del_otptoken(master, otpuid)
|
|
|
|
def test_otptoken_sync(self):
|
|
master = self.master
|
|
|
|
tasks.kinit_admin(self.master)
|
|
otpuid, hotp = add_otptoken(master, USER, otptype="hotp")
|
|
|
|
otp1 = hotp.generate(10).decode("ascii")
|
|
otp2 = hotp.generate(11).decode("ascii")
|
|
|
|
master.run_command(
|
|
["ipa", "otptoken-sync", "--user", USER],
|
|
stdin_text=f"{PASSWORD}\n{otp1}\n{otp2}\n",
|
|
)
|
|
otpvalue = hotp.generate(12).decode("ascii")
|
|
kinit_otp(master, USER, password=PASSWORD, otp=otpvalue)
|
|
|
|
otp1 = hotp.generate(20).decode("ascii")
|
|
otp2 = hotp.generate(21).decode("ascii")
|
|
|
|
master.run_command(
|
|
["ipa", "otptoken-sync", otpuid, "--user", USER],
|
|
stdin_text=f"{PASSWORD}\n{otp1}\n{otp2}\n",
|
|
)
|
|
otpvalue = hotp.generate(22).decode("ascii")
|
|
kinit_otp(master, USER, password=PASSWORD, otp=otpvalue)
|
|
|
|
del_otptoken(master, otpuid)
|
|
|
|
def test_2fa_enable_single_prompt(self):
|
|
"""Test ssh with 2FA when single prompt is enabled.
|
|
|
|
Test for : https://pagure.io/SSSD/sssd/issue/3264
|
|
|
|
When [prompting/2fa/sshd] with single_prompt = True is set
|
|
then during ssh it should be prompted with given message
|
|
for first and second factor at once.
|
|
"""
|
|
|
|
master = self.master
|
|
USER1 = 'sshuser1'
|
|
sssd_conf_backup = tasks.FileBackup(master, paths.SSSD_CONF)
|
|
first_prompt = 'Please enter password + OTP token value:'
|
|
add_contents = textwrap.dedent('''
|
|
[prompting/2fa/sshd]
|
|
single_prompt = True
|
|
first_prompt = {0}
|
|
''').format(first_prompt)
|
|
set_sssd_conf(master, add_contents)
|
|
tasks.create_active_user(master, USER1, PASSWORD)
|
|
tasks.kinit_admin(master)
|
|
master.run_command(['ipa', 'user-mod', USER1, '--user-auth-type=otp'])
|
|
try:
|
|
otpuid, totp = add_otptoken(master, USER1, otptype='totp')
|
|
master.run_command(['ipa', 'otptoken-show', otpuid])
|
|
otpvalue = totp.generate(int(time.time())).decode('ascii')
|
|
password = '{0}{1}'.format(PASSWORD, otpvalue)
|
|
tasks.run_ssh_cmd(
|
|
to_host=self.master.external_hostname, username=USER1,
|
|
auth_method="password", password=password
|
|
)
|
|
# check if user listed in output
|
|
cmd = self.master.run_command(['semanage', 'login', '-l'])
|
|
assert USER1 in cmd.stdout_text
|
|
finally:
|
|
master.run_command(['ipa', 'user-del', USER1])
|
|
self.master.run_command(['semanage', 'login', '-D'])
|
|
sssd_conf_backup.restore()
|
|
|
|
def test_2fa_disable_single_prompt(self):
|
|
"""Test ssh with 2FA when single prompt is disabled.
|
|
|
|
Test for : https://pagure.io/SSSD/sssd/issue/3264
|
|
|
|
When [prompting/2fa/sshd] with single_prompt = False is set
|
|
then during ssh it should be prompted with given message
|
|
for first factor and then for second factor.
|
|
|
|
This requires paramiko until the 2-prompt sshpass RFE is
|
|
fulfilled: https://sourceforge.net/p/sshpass/feature-requests/5/
|
|
"""
|
|
if self.master.is_fips_mode: # pylint: disable=no-member
|
|
pytest.skip("paramiko is not compatible with FIPS mode")
|
|
|
|
master = self.master
|
|
USER2 = 'sshuser2'
|
|
sssd_conf_backup = tasks.FileBackup(master, paths.SSSD_CONF)
|
|
first_prompt = 'Enter first factor:'
|
|
second_prompt = 'Enter second factor:'
|
|
add_contents = textwrap.dedent('''
|
|
[prompting/2fa/sshd]
|
|
single_prompt = False
|
|
first_prompt = {0}
|
|
second_prompt = {1}
|
|
''').format(first_prompt, second_prompt)
|
|
set_sssd_conf(master, add_contents)
|
|
tasks.create_active_user(master, USER2, PASSWORD)
|
|
tasks.kinit_admin(master)
|
|
master.run_command(['ipa', 'user-mod', USER2, '--user-auth-type=otp'])
|
|
try:
|
|
otpuid, totp = add_otptoken(master, USER2, otptype='totp')
|
|
master.run_command(['ipa', 'otptoken-show', otpuid])
|
|
otpvalue = totp.generate(int(time.time())).decode('ascii')
|
|
answers = {
|
|
first_prompt: PASSWORD,
|
|
second_prompt: otpvalue
|
|
}
|
|
ssh_2f(master.hostname, USER2, answers)
|
|
# check if user listed in output
|
|
cmd = self.master.run_command(['semanage', 'login', '-l'])
|
|
assert USER2 in cmd.stdout_text
|
|
finally:
|
|
master.run_command(['ipa', 'user-del', USER2])
|
|
self.master.run_command(['semanage', 'login', '-D'])
|
|
sssd_conf_backup.restore()
|
|
|
|
@pytest.fixture
|
|
def setup_otp_nsslapd(self):
|
|
check_services = self.master.run_command(
|
|
['systemctl', 'list-units', '--state=failed']
|
|
)
|
|
assert "0 loaded units listed" in check_services.stdout_text
|
|
assert "ipa-otpd" not in check_services.stdout_text
|
|
# Be sure no services are running and failed units
|
|
self.master.run_command(['killall', 'ipa-otpd'], raiseonerr=False)
|
|
# setting nsslapd-idletimeout
|
|
new_limit = 30
|
|
conn = self.master.ldap_connect()
|
|
dn = DN(('cn', 'config'))
|
|
entry = conn.get_entry(dn) # pylint: disable=no-member
|
|
orig_limit = entry.single_value.get('nsslapd-idletimeout')
|
|
ldap_query = textwrap.dedent("""
|
|
dn: cn=config
|
|
changetype: modify
|
|
replace: nsslapd-idletimeout
|
|
nsslapd-idletimeout: {limit}
|
|
""")
|
|
tasks.ldapmodify_dm(self.master, ldap_query.format(limit=new_limit))
|
|
yield
|
|
# cleanup
|
|
tasks.ldapmodify_dm(self.master, ldap_query.format(limit=orig_limit))
|
|
|
|
def test_check_otpd_after_idle_timeout(self, setup_otp_nsslapd):
|
|
"""Test for OTP when the LDAP connection timed out.
|
|
|
|
Test for : https://pagure.io/freeipa/issue/6587
|
|
|
|
ipa-otpd was exiting with failure when LDAP connection timed out.
|
|
Test to verify that when the nsslapd-idletimeout is exceeded (30s idle,
|
|
60s sleep) then the ipa-otpd process should exit without error.
|
|
"""
|
|
since = time.strftime('%Y-%m-%d %H:%M:%S')
|
|
tasks.kinit_admin(self.master)
|
|
otpuid, totp = add_otptoken(self.master, USER, otptype="totp")
|
|
try:
|
|
# kinit with OTP auth
|
|
otpvalue = totp.generate(int(time.time())).decode("ascii")
|
|
kinit_otp(self.master, USER, password=PASSWORD, otp=otpvalue)
|
|
time.sleep(60)
|
|
# ldapsearch will wake up slapd and force walking through
|
|
# the connection list, in order to spot the idle connections
|
|
tasks.ldapsearch_dm(self.master, "", ldap_args=[], scope="base")
|
|
|
|
def test_cb(cmd_jornalctl):
|
|
# check if LDAP connection is timed out
|
|
expected_msg = "Can't contact LDAP server"
|
|
return expected_msg in cmd_jornalctl
|
|
|
|
# ipa-otpd don't flush its logs to syslog immediately
|
|
cmd = ['journalctl', '--since={}'.format(since)]
|
|
tasks.run_repeatedly(
|
|
self.master, command=cmd, test=test_cb, timeout=90)
|
|
failed_services = self.master.run_command(
|
|
['systemctl', 'list-units', '--state=failed']
|
|
)
|
|
assert "ipa-otpd" not in failed_services.stdout_text
|
|
finally:
|
|
del_otptoken(self.master, otpuid)
|