freeipa/ipatests/test_integration/test_smb.py

487 lines
20 KiB
Python
Raw Normal View History

#
# Copyright (C) 2019 FreeIPA Contributors see COPYING for license
#
"""This module provides tests for SMB-related features like
configuring Samba file server and mounting SMB file system
"""
from __future__ import absolute_import
from functools import partial
import textwrap
import re
import os
import pytest
from ipatests.test_integration.base import IntegrationTest
from ipatests.pytest_ipa.integration import tasks
from ipaplatform.osinfo import osinfo
from ipaplatform.paths import paths
from ipatests.pytest_ipa.integration import skip_if_fips
def wait_smbd_functional(host):
"""Wait smbd is functional after (re)start
After start of smbd there is a 2-3 seconds delay before daemon is
fully functional and clients can successfuly mount a share.
The ping command effectively blocks until the daemon is ready.
"""
host.run_command(['smbcontrol', 'smbd', 'ping'])
class TestSMB(IntegrationTest):
topology = 'star'
num_clients = 2
num_ad_domains = 1
ipa_user1 = 'user1'
ipa_user1_password = 'SecretUser1'
ipa_user2 = 'user2'
ipa_user2_password = 'SecretUser2'
ad_user_login = 'testuser'
ad_user_password = 'Secret123'
ipa_test_group = 'ipa_testgroup'
ad_test_group = 'testgroup'
@classmethod
def install(cls, mh):
if cls.domain_level is not None:
domain_level = cls.domain_level
else:
domain_level = cls.master.config.domain_level
tasks.install_topo(cls.topology,
cls.master, cls.replicas,
cls.clients, domain_level,
clients_extra_args=('--mkhomedir',))
cls.ad = cls.ads[0]
cls.smbserver = cls.clients[0]
cls.smbclient = cls.clients[1]
cls.ad_user = '{}@{}'.format(cls.ad_user_login, cls.ad.domain.name)
tasks.install_adtrust(cls.master)
tasks.configure_dns_for_trust(cls.master, cls.ad)
tasks.configure_windows_dns_for_trust(cls.ad, cls.master)
tasks.establish_trust_with_ad(cls.master, cls.ad.domain.name,
extra_args=['--two-way=true'])
tasks.create_active_user(cls.master, cls.ipa_user1,
password=cls.ipa_user1_password)
tasks.create_active_user(cls.master, cls.ipa_user2,
password=cls.ipa_user2_password)
# Trigger creation of home directories on the SMB server
for user in [cls.ipa_user1, cls.ipa_user2, cls.ad_user]:
tasks.run_command_as_user(cls.smbserver, user, ['stat', '.'])
@pytest.fixture
def samba_share_public(self):
"""Setup share outside /home on samba server."""
share_name = 'shared'
share_path = '/srv/samba_shared'
smbserver = self.smbserver
smbserver.run_command(['mkdir', share_path])
smbserver.run_command(['chmod', '777', share_path])
# apply selinux context only if selinux is enabled
if tasks.is_selinux_enabled(smbserver):
smbserver.run_command(['chcon', '-t', 'samba_share_t', share_path])
with tasks.FileBackup(smbserver, paths.SMB_CONF):
smb_conf = smbserver.get_file_contents(
paths.SMB_CONF, encoding='utf-8')
smb_conf += textwrap.dedent('''
[{name}]
path = {path}
writable = yes
browsable=yes
'''.format(name=share_name, path=share_path))
smbserver.put_file_contents(paths.SMB_CONF, smb_conf)
smbserver.run_command(['systemctl', 'restart', 'smb'])
wait_smbd_functional(smbserver)
yield {
'name': share_name,
'server_path': share_path,
'unc': '//{}/{}'.format(smbserver.hostname, share_name)
}
smbserver.run_command(['systemctl', 'restart', 'smb'])
wait_smbd_functional(smbserver)
smbserver.run_command(['rmdir', share_path])
def mount_smb_share(self, user, password, share, mountpoint):
tasks.kdestroy_all(self.smbclient)
tasks.kinit_as_user(self.smbclient, user, password)
self.smbclient.run_command(['mkdir', '-p', mountpoint])
self.smbclient.run_command([
'mount', '-t', 'cifs', share['unc'], mountpoint,
'-o', 'sec=krb5i,multiuser'
])
tasks.kdestroy_all(self.smbclient)
def smb_sanity_check(self, user, client_mountpoint, share):
test_dir = 'testdir_{}'.format(user)
test_file = 'testfile_{}'.format(user)
test_file_path = '{}/{}'.format(test_dir, test_file)
test_string = 'Hello, world!'
run_smb_client = partial(tasks.run_command_as_user, self.smbclient,
user, cwd=client_mountpoint)
run_smb_server = partial(self.smbserver.run_command,
cwd=share['server_path'])
try:
# check creation of directory from client side
run_smb_client(['mkdir', test_dir])
# check dir properties at client side
res = run_smb_client(['stat', '-c', '%n %U %G', test_dir])
assert res.stdout_text == '{0} {1} {1}\n'.format(test_dir, user)
# check dir properties at server side
res = run_smb_server(['stat', '-c', '%n %U %G', test_dir])
assert res.stdout_text == '{0} {1} {1}\n'.format(test_dir, user)
# check creation of file from client side
run_smb_client('printf "{}" > {}'.format(
test_string, test_file_path))
# check file is listed at client side
res = run_smb_client(['ls', test_dir])
assert res.stdout_text == test_file + '\n'
# check file is listed at server side
res = run_smb_server(['ls', test_dir])
assert res.stdout_text == test_file + '\n'
# check file properties at server side
res = run_smb_server(['stat', '-c', '%n %s %U %G', test_file_path])
assert res.stdout_text == '{0} {1} {2} {2}\n'.format(
test_file_path, len(test_string), user)
# check file properties at client side
res = run_smb_client(['stat', '-c', '%n %s %U %G', test_file_path])
assert res.stdout_text == '{0} {1} {2} {2}\n'.format(
test_file_path, len(test_string), user)
# check file contents at client side
res = run_smb_client(['cat', test_file_path])
assert res.stdout_text == test_string
# check file contents at server side
file_contents_at_server = self.smbserver.get_file_contents(
'{}/{}'.format(share['server_path'], test_file_path),
encoding='utf-8')
assert file_contents_at_server == test_string
# Detect whether smbclient uses -k or --use-kerberos=required
# https://pagure.io/freeipa/issue/8926
# then check access using smbclient.
res = run_smb_client(
[
"smbclient",
"-h",
], raiseonerr=False
)
if "[-k|--kerberos]" in res.stderr_text:
smbclient_krb5_knob = "-k"
else:
smbclient_krb5_knob = "--use-kerberos=desired"
res = run_smb_client(
[
"smbclient",
smbclient_krb5_knob,
share["unc"],
"-c",
"dir",
]
)
assert test_dir in res.stdout_text
# check file and dir removal from client side
run_smb_client(['rm', test_file_path])
run_smb_client(['rmdir', test_dir])
# check dir does not exist at client side
res = run_smb_client(['stat', test_dir], raiseonerr=False)
assert res.returncode == 1
assert 'No such file or directory' in res.stderr_text
# check dir does not exist at server side
res = run_smb_server(['stat', test_dir], raiseonerr=False)
assert res.returncode == 1
assert 'No such file or directory' in res.stderr_text
finally:
run_smb_server(['rm', '-rf', test_dir], raiseonerr=False)
def smb_installation_check(self, result):
domain_regexp_tpl = r'''
Domain\ name:\s*{domain}\n
\s*NetBIOS\ name:\s*{netbios}\n
\s*SID:\s*S-1-5-21-\d+-\d+-\d+\n
\s+ID\ range:\s*\d+\s*-\s*\d+
'''
# pylint: disable=no-member
ipa_regexp = domain_regexp_tpl.format(
domain=re.escape(self.master.domain.name),
netbios=self.master.netbios)
ad_regexp = domain_regexp_tpl.format(
domain=re.escape(self.ad.domain.name), netbios=self.ad.netbios)
# pylint: enable=no-member
output_regexp = r'''
Discovered\ domains.*
{}
.*
{}
.*
Samba.+configured.+check.+/etc/samba/smb\.conf
'''.format(ipa_regexp, ad_regexp)
assert re.search(output_regexp, result.stdout_text,
re.VERBOSE | re.DOTALL)
def cleanup_mount(self, mountpoint):
self.smbclient.run_command(['umount', mountpoint], raiseonerr=False)
self.smbclient.run_command(['rmdir', mountpoint], raiseonerr=False)
def test_samba_uninstallation_without_installation(self):
res = self.smbserver.run_command(
['ipa-client-samba', '--uninstall', '-U'])
assert res.stdout_text == 'Samba domain member is not configured yet\n'
def test_install_samba(self):
samba_install_result = self.smbserver.run_command(
['ipa-client-samba', '-U'])
# smb and winbind are expected to be not running
for service in ['smb', 'winbind']:
result = self.smbserver.run_command(
['systemctl', 'status', service], raiseonerr=False)
assert result.returncode == 3
self.smbserver.run_command([
'systemctl', 'enable', '--now', 'smb', 'winbind'
])
wait_smbd_functional(self.smbserver)
# check that smb and winbind started successfully
for service in ['smb', 'winbind']:
self.smbserver.run_command(['systemctl', 'status', service])
# print status for debugging purposes
self.smbserver.run_command(['smbstatus'])
# checks postponed till the end of method to be sure services are
# started - this way we prevent other tests from failing
self.smb_installation_check(samba_install_result)
def test_authentication_with_smb_cifs_principal_alias(self):
"""Test that we can auth as NetBIOS alias
cifs/... principal on SMB server side has NetBIOS name of the SMB
server as its alias. Test that we can actually initialize credentials
using this alias. We don't need to use it anywhere in Samba, just
verify that alias works.
Test for https://pagure.io/freeipa/issue/8291"""
netbiosname = self.smbserver.hostname.split('.')[0].upper() + '$'
copier = tasks.KerberosKeyCopier(self.smbserver)
principal = 'cifs/{hostname}@{realm}'.format(
hostname=self.smbserver.hostname, realm=copier.realm)
alias = '{netbiosname}@{realm}'.format(
netbiosname=netbiosname, realm=copier.realm)
replacement = {principal: alias}
tmpname = tasks.create_temp_file(self.smbserver, create_file=False)
try:
copier.copy_keys(paths.SAMBA_KEYTAB, tmpname, principal=principal,
replacement=replacement)
self.smbserver.run_command(['kinit', '-kt', tmpname, netbiosname])
finally:
self.smbserver.run_command(['rm', '-f', tmpname])
def test_samba_service_listed(self):
"""Check samba service is listed.
Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=1731433
"""
service_name = 'cifs/{}@{}'.format(
self.smbserver.hostname, self.smbserver.domain.name.upper())
tasks.kinit_admin(self.master)
res = self.master.run_command(
['ipa', 'service-show', '--raw', service_name])
expected_output = 'krbprincipalname: {}\n'.format(service_name)
assert expected_output in res.stdout_text
res = self.master.run_command(
['ipa', 'service-find', '--raw', service_name])
assert expected_output in res.stdout_text
def check_smb_access_at_ipa_client(self, user, password, samba_share):
mount_point = '/mnt/smb'
self.mount_smb_share(user, password, samba_share, mount_point)
try:
tasks.run_command_as_user(self.smbclient, user, ['kdestroy', '-A'])
tasks.run_command_as_user(self.smbclient, user, ['kinit', user],
stdin_text=password + '\n')
self.smb_sanity_check(user, mount_point, samba_share)
finally:
self.cleanup_mount(mount_point)
def test_smb_access_for_ipa_user_at_ipa_client(self):
samba_share = {
'name': 'homes',
'server_path': '/home/{}'.format(self.ipa_user1),
'unc': '//{}/homes'.format(self.smbserver.hostname)
}
self.check_smb_access_at_ipa_client(
self.ipa_user1, self.ipa_user1_password, samba_share)
def test_smb_access_for_ad_user_at_ipa_client(self):
samba_share = {
'name': 'homes',
'server_path': '/home/{}/{}'.format(self.ad.domain.name,
self.ad_user_login),
'unc': '//{}/homes'.format(self.smbserver.hostname)
}
self.check_smb_access_at_ipa_client(
self.ad_user, self.ad_user_password, samba_share)
def test_smb_mount_and_access_by_different_users(self, samba_share_public):
user1 = self.ipa_user1
password1 = self.ipa_user1_password
user2 = self.ipa_user2
password2 = self.ipa_user2_password
mount_point = '/mnt/smb'
try:
self.mount_smb_share(user1, password1, samba_share_public,
mount_point)
tasks.run_command_as_user(self.smbclient, user2,
['kdestroy', '-A'])
tasks.run_command_as_user(self.smbclient, user2, ['kinit', user2],
stdin_text=password2 + '\n')
self.smb_sanity_check(user2, mount_point, samba_share_public)
finally:
self.cleanup_mount(mount_point)
@pytest.mark.skipif(
osinfo.id == 'fedora' and osinfo.version_number <= (31,),
reason='Test requires krb 1.18')
def test_smb_service_s4u2self(self):
"""Test S4U2Self operation by IPA service
against both AD and IPA users
"""
script = textwrap.dedent("""export KRB5_TRACE=/dev/stderr
kdestroy -A
kinit -kt /etc/samba/samba.keytab {principal}
klist -f
{print_pac} -k /etc/samba/samba.keytab -E impersonate {user_princ}
klist -f
""")
principal = 'cifs/{hostname}'.format(
hostname=self.smbserver.hostname)
# Copy ipa-print-pac to SMB server
# We can do so because Samba and GSSAPI libraries
# are present there
print_pac = self.master.get_file_contents(
os.path.join(paths.LIBEXEC_IPA_DIR, "ipa-print-pac"))
result = self.smbserver.run_command(['mktemp'])
tmpname = result.stdout_text.strip()
self.smbserver.put_file_contents(tmpname, print_pac)
self.smbserver.run_command(['chmod', 'a+x', tmpname])
for user in (self.ad_user, self.ipa_user1,):
shell_script = script.format(principal=principal,
user_princ=user,
print_pac=tmpname)
self.smbserver.run_command(['/bin/bash', '-s', '-e'],
stdin_text=shell_script)
self.smbserver.run_command(['rm', '-f', tmpname])
tasks.kdestroy_all(self.smbserver)
def test_smb_mount_fails_without_kerberos_ticket(self, samba_share_public):
mountpoint = '/mnt/smb'
try:
tasks.kdestroy_all(self.smbclient)
self.smbclient.run_command(['mkdir', '-p', mountpoint])
res = self.smbclient.run_command([
'mount', '-t', 'cifs', samba_share_public['unc'], mountpoint,
'-o', 'sec=krb5i,multiuser'
], raiseonerr=False)
assert res.returncode == 32
finally:
self.cleanup_mount(mountpoint)
def check_repeated_smb_mount(self, options):
mountpoint = '/mnt/smb'
unc = '//{}/homes'.format(self.smbserver.hostname)
test_file = 'ntlm_test'
test_file_server_path = '/home/{}/{}'.format(self.ipa_user1, test_file)
test_file_client_path = '{}/{}'.format(mountpoint, test_file)
self.smbclient.run_command(['mkdir', '-p', mountpoint])
self.smbserver.put_file_contents(test_file_server_path, '')
try:
for i in [1, 2]:
res = self.smbclient.run_command([
'mount', '-t', 'cifs', unc, mountpoint, '-o', options],
raiseonerr=False)
assert res.returncode == 0, (
'Mount failed at iteration {}. Output: {}'
.format(i, res.stdout_text + res.stderr_text))
assert self.smbclient.transport.file_exists(
test_file_client_path)
self.smbclient.run_command(['umount', mountpoint])
finally:
self.cleanup_mount(mountpoint)
self.smbserver.run_command(['rm', '-f', test_file_server_path])
@skip_if_fips()
def test_ntlm_authentication_with_auto_domain(self):
"""Repeatedly try to authenticate with username and password with
automatic domain discovery.
This is a regression test for https://pagure.io/freeipa/issue/8636
"""
tasks.kdestroy_all(self.smbclient)
mount_options = 'user={user},pass={password},domainauto'.format(
user=self.ipa_user1,
password=self.ipa_user1_password
)
self.check_repeated_smb_mount(mount_options)
@skip_if_fips()
def test_ntlm_authentication_with_upn_with_lowercase_domain(self):
tasks.kdestroy_all(self.smbclient)
mount_options = 'user={user}@{domain},pass={password}'.format(
user=self.ipa_user1,
password=self.ipa_user1_password,
domain=self.master.domain.name.lower()
)
self.check_repeated_smb_mount(mount_options)
@skip_if_fips()
def test_ntlm_authentication_with_upn_with_uppercase_domain(self):
tasks.kdestroy_all(self.smbclient)
mount_options = 'user={user}@{domain},pass={password}'.format(
user=self.ipa_user1,
password=self.ipa_user1_password,
domain=self.master.domain.name.upper()
)
self.check_repeated_smb_mount(mount_options)
def test_uninstall_samba(self):
self.smbserver.run_command(['ipa-client-samba', '--uninstall', '-U'])
res = self.smbserver.run_command(
['systemctl', 'status', 'winbind'], raiseonerr=False)
assert res.returncode == 3
res = self.smbserver.run_command(
['systemctl', 'status', 'smb'], raiseonerr=False)
assert res.returncode == 3
def test_repeated_uninstall_samba(self):
"""Test samba uninstallation after successful uninstallation.
Test for bug https://pagure.io/freeipa/issue/8019.
"""
self.smbserver.run_command(['ipa-client-samba', '--uninstall', '-U'])
def test_samba_reinstall(self):
"""Test samba can be reinstalled.
Test installation after uninstallation and do some sanity checks.
Test for bug https://pagure.io/freeipa/issue/8021
"""
self.test_install_samba()
self.test_smb_access_for_ipa_user_at_ipa_client()
def test_cleanup(self):
tasks.unconfigure_windows_dns_for_trust(self.ad, self.master)