freeipa/ipatests/test_integration/test_commands.py
Christian Heimes 965181362a Fix systemd-user HBAC rule
2ef6e14c5a added an invalid HBAC rule that
encoded the service wrongly.

See: https://bugzilla.redhat.com/show_bug.cgi?id=1643928
Fixes: https://pagure.io/freeipa/issue/7831
Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Rob Crittenden <rcritten@redhat.com>
2019-01-15 14:29:22 -05:00

563 lines
20 KiB
Python

#
# Copyright (C) 2018 FreeIPA Contributors see COPYING for license
#
"""Misc test for 'ipa' CLI regressions
"""
from __future__ import absolute_import
import base64
import re
import os
import logging
import ssl
from tempfile import NamedTemporaryFile
from itertools import chain, repeat
import textwrap
import time
import paramiko
import pytest
from cryptography.hazmat.backends import default_backend
from cryptography import x509
from ipalib.constants import IPAAPI_USER
from ipaplatform.paths import paths
from ipatests.test_integration.base import IntegrationTest
from ipatests.pytest_ipa.integration import tasks
from ipatests.create_external_ca import ExternalCA
from ipatests.test_ipalib.test_x509 import good_pkcs7, badcert
logger = logging.getLogger(__name__)
class TestIPACommand(IntegrationTest):
"""
A lot of commands can be executed against a single IPA installation
so provide a generic class to execute one-off commands that need to be
tested without having to fire up a full server to run one command.
"""
topology = 'line'
def get_cert_base64(self, host, path):
"""Retrieve cert and return content as single line, base64 encoded
"""
cacrt = host.get_file_contents(path, encoding='ascii')
cader = ssl.PEM_cert_to_DER_cert(cacrt)
return base64.b64encode(cader).decode('ascii')
def test_certmap_match_issue7520(self):
# https://pagure.io/freeipa/issue/7520
tasks.kinit_admin(self.master)
result = self.master.run_command(
['ipa', 'certmap-match', paths.IPA_CA_CRT],
raiseonerr=False
)
assert result.returncode == 1
assert not result.stderr_text
assert "0 users matched" in result.stdout_text
cab64 = self.get_cert_base64(self.master, paths.IPA_CA_CRT)
result = self.master.run_command(
['ipa', 'certmap-match', '--certificate', cab64],
raiseonerr=False
)
assert result.returncode == 1
assert not result.stderr_text
assert "0 users matched" in result.stdout_text
def test_cert_find_issue7520(self):
# https://pagure.io/freeipa/issue/7520
tasks.kinit_admin(self.master)
subject = 'CN=Certificate Authority,O={}'.format(
self.master.domain.realm)
# by cert file
result = self.master.run_command(
['ipa', 'cert-find', '--file', paths.IPA_CA_CRT]
)
assert subject in result.stdout_text
assert '1 certificate matched' in result.stdout_text
# by base64 cert
cab64 = self.get_cert_base64(self.master, paths.IPA_CA_CRT)
result = self.master.run_command(
['ipa', 'cert-find', '--certificate', cab64]
)
assert subject in result.stdout_text
assert '1 certificate matched' in result.stdout_text
def test_add_permission_failure_issue5923(self):
# https://pagure.io/freeipa/issue/5923
# error response used to contain bytes instead of text
tasks.kinit_admin(self.master)
# neither privilege nor permission exists
result = self.master.run_command(
["ipa", "privilege-add-permission", "loc",
"--permission='System: Show IPA Locations"],
raiseonerr=False
)
assert result.returncode == 2
err = result.stderr_text.strip() # pylint: disable=no-member
assert err == "ipa: ERROR: loc: privilege not found"
# add privilege
result = self.master.run_command(
["ipa", "privilege-add", "loc"],
)
assert 'Added privilege "loc"' in result.stdout_text
# permission is still missing
result = self.master.run_command(
["ipa", "privilege-add-permission", "loc",
"--permission='System: Show IPA Locations"],
raiseonerr=False
)
assert result.returncode == 1
assert "Number of permissions added 0" in result.stdout_text
def test_change_sysaccount_password_issue7561(self):
sysuser = 'system'
original_passwd = 'Secret123'
new_passwd = 'userPasswd123'
master = self.master
base_dn = str(master.domain.basedn) # pylint: disable=no-member
tf = NamedTemporaryFile()
ldif_file = tf.name
entry_ldif = textwrap.dedent("""
dn: uid=system,cn=sysaccounts,cn=etc,{base_dn}
changetype: add
objectclass: account
objectclass: simplesecurityobject
uid: system
userPassword: {original_passwd}
passwordExpirationTime: 20380119031407Z
nsIdleTimeout: 0
""").format(
base_dn=base_dn,
original_passwd=original_passwd)
master.put_file_contents(ldif_file, entry_ldif)
arg = ['ldapmodify',
'-h', master.hostname,
'-p', '389', '-D',
str(master.config.dirman_dn), # pylint: disable=no-member
'-w', master.config.dirman_password,
'-f', ldif_file]
master.run_command(arg)
tasks.ldappasswd_sysaccount_change(sysuser, original_passwd,
new_passwd, master)
def test_ldapmodify_password_issue7601(self):
user = 'ipauser'
original_passwd = 'Secret123'
new_passwd = 'userPasswd123'
new_passwd2 = 'mynewPwd123'
master = self.master
base_dn = str(master.domain.basedn) # pylint: disable=no-member
# Create a user with a password
tasks.kinit_admin(master)
add_password_stdin_text = "{pwd}\n{pwd}".format(pwd=original_passwd)
master.run_command(['ipa', 'user-add', user,
'--first', user,
'--last', user,
'--password'],
stdin_text=add_password_stdin_text)
# kinit as that user in order to modify the pwd
user_kinit_stdin_text = "{old}\n%{new}\n%{new}\n".format(
old=original_passwd,
new=original_passwd)
master.run_command(['kinit', user], stdin_text=user_kinit_stdin_text)
# Retrieve krblastpwdchange and krbpasswordexpiration
search_cmd = [
'ldapsearch', '-x',
'-D', 'cn=directory manager',
'-w', master.config.dirman_password,
'-s', 'base',
'-b', 'uid={user},cn=users,cn=accounts,{base_dn}'.format(
user=user, base_dn=base_dn),
'-o', 'ldif-wrap=no',
'-LLL',
'krblastpwdchange',
'krbpasswordexpiration']
output = master.run_command(search_cmd).stdout_text.lower()
# extract krblastpwdchange and krbpasswordexpiration
krbchg_pattern = 'krblastpwdchange: (.+)\n'
krbexp_pattern = 'krbpasswordexpiration: (.+)\n'
krblastpwdchange = re.findall(krbchg_pattern, output)[0]
krbexp = re.findall(krbexp_pattern, output)[0]
# sleep 1 sec (krblastpwdchange and krbpasswordexpiration have at most
# a 1s precision)
time.sleep(1)
# perform ldapmodify on userpassword as dir mgr
mod = NamedTemporaryFile()
ldif_file = mod.name
entry_ldif = textwrap.dedent("""
dn: uid={user},cn=users,cn=accounts,{base_dn}
changetype: modify
replace: userpassword
userpassword: {new_passwd}
""").format(
user=user,
base_dn=base_dn,
new_passwd=new_passwd)
master.put_file_contents(ldif_file, entry_ldif)
arg = ['ldapmodify',
'-h', master.hostname,
'-p', '389', '-D',
str(master.config.dirman_dn), # pylint: disable=no-member
'-w', master.config.dirman_password,
'-f', ldif_file]
master.run_command(arg)
# Test new password with kinit
master.run_command(['kinit', user], stdin_text=new_passwd)
# Retrieve krblastpwdchange and krbpasswordexpiration
output = master.run_command(search_cmd).stdout_text.lower()
# extract krblastpwdchange and krbpasswordexpiration
newkrblastpwdchange = re.findall(krbchg_pattern, output)[0]
newkrbexp = re.findall(krbexp_pattern, output)[0]
# both should have changed
assert newkrblastpwdchange != krblastpwdchange
assert newkrbexp != krbexp
# Now test passwd modif with ldappasswd
time.sleep(1)
master.run_command([
paths.LDAPPASSWD,
'-D', str(master.config.dirman_dn), # pylint: disable=no-member
'-w', master.config.dirman_password,
'-a', new_passwd,
'-s', new_passwd2,
'-x', '-ZZ',
'-H', 'ldap://{hostname}'.format(hostname=master.hostname),
'uid={user},cn=users,cn=accounts,{base_dn}'.format(
user=user, base_dn=base_dn)]
)
# Test new password with kinit
master.run_command(['kinit', user], stdin_text=new_passwd2)
# Retrieve krblastpwdchange and krbpasswordexpiration
output = master.run_command(search_cmd).stdout_text.lower()
# extract krblastpwdchange and krbpasswordexpiration
newkrblastpwdchange2 = re.findall(krbchg_pattern, output)[0]
newkrbexp2 = re.findall(krbexp_pattern, output)[0]
# both should have changed
assert newkrblastpwdchange != newkrblastpwdchange2
assert newkrbexp != newkrbexp2
def test_change_selinuxusermaporder(self):
"""
An update file meant to ensure a more sane default was
overriding any customization done to the order.
"""
maporder = "unconfined_u:s0-s0:c0.c1023"
# set a new default
tasks.kinit_admin(self.master)
result = self.master.run_command(
["ipa", "config-mod",
"--ipaselinuxusermaporder={}".format(maporder)],
raiseonerr=False
)
assert result.returncode == 0
# apply the update
result = self.master.run_command(
["ipa-server-upgrade"],
raiseonerr=False
)
assert result.returncode == 0
# ensure result is the same
result = self.master.run_command(
["ipa", "config-show"],
raiseonerr=False
)
assert result.returncode == 0
assert "SELinux user map order: {}".format(
maporder) in result.stdout_text
def test_ipa_console(self):
tasks.kinit_admin(self.master)
result = self.master.run_command(
["ipa", "console"],
stdin_text="api.env"
)
assert "ipalib.config.Env" in result.stdout_text
filename = tasks.upload_temp_contents(
self.master,
"print(api.env)\n"
)
result = self.master.run_command(
["ipa", "console", filename],
)
assert "ipalib.config.Env" in result.stdout_text
def test_list_help_topics(self):
tasks.kinit_admin(self.master)
result = self.master.run_command(
["ipa", "help", "topics"],
raiseonerr=False
)
assert result.returncode == 0
def test_ssh_key_connection(self, tmpdir):
"""
Integration test for https://pagure.io/SSSD/sssd/issue/3747
"""
test_user = 'test-ssh'
master = self.master.hostname
pub_keys = []
for i in range(40):
ssh_key_pair = tasks.generate_ssh_keypair()
pub_keys.append(ssh_key_pair[1])
with open(os.path.join(
tmpdir, 'ssh_priv_{}'.format(i)), 'w') as fp:
fp.write(ssh_key_pair[0])
tasks.kinit_admin(self.master)
self.master.run_command(['ipa', 'user-add', test_user,
'--first=tester', '--last=tester'])
keys_opts = ' '.join(['--ssh "{}"'.format(k) for k in pub_keys])
cmd = 'ipa user-mod {} {}'.format(test_user, keys_opts)
self.master.run_command(cmd)
# connect with first SSH key
first_priv_key_path = os.path.join(tmpdir, 'ssh_priv_1')
# change private key permission to comply with SS rules
os.chmod(first_priv_key_path, 0o600)
sshcon = paramiko.SSHClient()
sshcon.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# first connection attempt is a workaround for
# https://pagure.io/SSSD/sssd/issue/3669
try:
sshcon.connect(master, username=test_user,
key_filename=first_priv_key_path, timeout=1)
except (paramiko.AuthenticationException, paramiko.SSHException):
pass
try:
sshcon.connect(master, username=test_user,
key_filename=first_priv_key_path, timeout=1)
except (paramiko.AuthenticationException,
paramiko.SSHException) as e:
pytest.fail('Authentication using SSH key not successful', e)
journal_cmd = ['journalctl', '--since=today', '-u', 'sshd']
result = self.master.run_command(journal_cmd)
output = result.stdout_text
assert not re.search('exited on signal 13', output)
# cleanup
self.master.run_command(['ipa', 'user-del', test_user])
def test_ssh_leak(self):
"""
Integration test for https://pagure.io/SSSD/sssd/issue/3794
"""
def count_pipes():
res = self.master.run_command(['pidof', 'sssd_ssh'])
pid = res.stdout_text.strip()
proc_path = '/proc/{}/fd'.format(pid)
res = self.master.run_command(['ls', '-la', proc_path])
fds_text = res.stdout_text.strip()
return sum((1 for _ in re.finditer(r'pipe', fds_text)))
test_user = 'test-ssh'
tasks.kinit_admin(self.master)
self.master.run_command(['ipa', 'user-add', test_user,
'--first=tester', '--last=tester'])
certs = []
# we are ok with whatever certificate for this test
external_ca = ExternalCA()
for _dummy in range(3):
cert = external_ca.create_ca()
cert = tasks.strip_cert_header(cert.decode('utf-8'))
certs.append('"{}"'.format(cert))
cert_args = list(
chain.from_iterable(list(zip(repeat('--certificate'), certs))))
cmd = 'ipa user-add-cert {} {}'.format(test_user, ' '.join(cert_args))
self.master.run_command(cmd)
tasks.clear_sssd_cache(self.master)
num_of_pipes = count_pipes()
for _dummy in range(3):
self.master.run_command([paths.SSS_SSH_AUTHORIZEDKEYS, test_user])
current_num_of_pipes = count_pipes()
assert current_num_of_pipes == num_of_pipes
# cleanup
self.master.run_command(['ipa', 'user-del', test_user])
def test_certificate_out_write_to_file(self):
# commands to test; name of temporary file will be appended
commands = [
['ipa', 'cert-show', '1', '--certificate-out'],
['ipa', 'cert-show', '1', '--chain', '--certificate-out'],
['ipa', 'ca-show', 'ipa', '--certificate-out'],
['ipa', 'ca-show', 'ipa', '--chain', '--certificate-out'],
]
for command in commands:
cmd = self.master.run_command(['mktemp'])
filename = cmd.stdout_text.strip()
self.master.run_command(command + [filename])
# Check that a PEM file was written. If --chain was
# used, load_pem_x509_certificate will return the
# first certificate, which is fine for this test.
data = self.master.get_file_contents(filename)
x509.load_pem_x509_certificate(data, backend=default_backend())
self.master.run_command(['rm', '-f', filename])
def test_sssd_ifp_access_ipaapi(self):
# check that ipaapi is allowed to access sssd-ifp for smartcard auth
# https://pagure.io/freeipa/issue/7751
username = 'admin'
# get UID for user
result = self.master.run_command(['ipa', 'user-show', username])
mo = re.search(r'UID: (\d+)', result.stdout_text)
assert mo is not None, result.stdout_text
uid = mo.group(1)
cmd = [
'dbus-send',
'--print-reply', '--system',
'--dest=org.freedesktop.sssd.infopipe',
'/org/freedesktop/sssd/infopipe/Users',
'org.freedesktop.sssd.infopipe.Users.FindByName',
'string:{}'.format(username)
]
# test IFP as root
result = self.master.run_command(cmd)
assert uid in result.stdout_text
# test IFP as ipaapi
result = self.master.run_command(
['sudo', '-u', IPAAPI_USER, '--'] + cmd
)
assert uid in result.stdout_text
def test_ipa_cacert_manage_install(self):
# Re-install the IPA CA
self.master.run_command([
paths.IPA_CACERT_MANAGE,
'install',
paths.IPA_CA_CRT])
# Test a non-existent file
result = self.master.run_command([
paths.IPA_CACERT_MANAGE,
'install',
'/var/run/cert_not_found'], raiseonerr=False)
assert result.returncode == 1
cmd = self.master.run_command(['mktemp'])
filename = cmd.stdout_text.strip()
for contents in (good_pkcs7,):
self.master.put_file_contents(filename, contents)
result = self.master.run_command([
paths.IPA_CACERT_MANAGE,
'install',
filename])
for contents in (badcert,):
self.master.put_file_contents(filename, contents)
result = self.master.run_command([
paths.IPA_CACERT_MANAGE,
'install',
filename], raiseonerr=False)
assert result.returncode == 1
self.master.run_command(['rm', '-f', filename])
def test_hbac_systemd_user(self):
# https://pagure.io/freeipa/issue/7831
tasks.kinit_admin(self.master)
# check for presence
self.master.run_command(
['ipa', 'hbacsvc-show', 'systemd-user']
)
result = self.master.run_command(
['ipa', 'hbacrule-show', 'allow_systemd-user', '--all']
)
lines = set(l.strip() for l in result.stdout_text.split('\n'))
assert 'User category: all' in lines
assert 'Host category: all' in lines
assert 'Enabled: TRUE' in lines
assert 'Services: systemd-user' in lines
assert 'accessruletype: allow' in lines
# delete both
self.master.run_command(
['ipa', 'hbacrule-del', 'allow_systemd-user']
)
self.master.run_command(
['ipa', 'hbacsvc-del', 'systemd-user']
)
# run upgrade
result = self.master.run_command(['ipa-server-upgrade'])
assert 'Created hbacsvc systemd-user' in result.stderr_text
assert 'Created hbac rule allow_systemd-user' in result.stderr_text
# check for presence
result = self.master.run_command(
['ipa', 'hbacrule-show', 'allow_systemd-user', '--all']
)
lines = set(l.strip() for l in result.stdout_text.split('\n'))
assert 'User category: all' in lines
assert 'Host category: all' in lines
assert 'Enabled: TRUE' in lines
assert 'Services: systemd-user' in lines
assert 'accessruletype: allow' in lines
self.master.run_command(
['ipa', 'hbacsvc-show', 'systemd-user']
)
# only delete rule
self.master.run_command(
['ipa', 'hbacrule-del', 'allow_systemd-user']
)
# run upgrade
result = self.master.run_command(['ipa-server-upgrade'])
assert (
'hbac service systemd-user already exists' in result.stderr_text
)
assert (
'Created hbac rule allow_systemd-user' not in result.stderr_text
)
result = self.master.run_command(
['ipa', 'hbacrule-show', 'allow_systemd-user'],
raiseonerr=False
)
assert result.returncode != 0
assert 'HBAC rule not found' in result.stderr_text