mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
test_smb: test that we can auth as NetBIOS alias
cifs/... principal on SMB server side has NetBIOS name of the SMB server as its alias. Test that we can actually initialize credentials using this alias. We don't need to use it anywhere in Samba, just verify that alias works. Related: https://pagure.io/freeipa/issue/8291 Signed-off-by: Alexander Bokovoy <abokovoy@redhat.com> Reviewed-By: Christian Heimes <cheimes@redhat.com> Reviewed-By: Simo Sorce <ssorce@redhat.com>
This commit is contained in:
@@ -29,6 +29,7 @@ import re
|
||||
import collections
|
||||
import itertools
|
||||
import shutil
|
||||
import copy
|
||||
import tempfile
|
||||
import time
|
||||
from pipes import quote
|
||||
@@ -1963,6 +1964,118 @@ def kinit_as_user(host, user, password):
|
||||
host.run_command(['kinit', user], stdin_text=password + '\n')
|
||||
|
||||
|
||||
KeyEntry = collections.namedtuple('KeyEntry',
|
||||
['kvno', 'principal', 'etype', 'key'])
|
||||
|
||||
|
||||
class KerberosKeyCopier:
|
||||
"""Copy Kerberos keys from a keytab to a keytab on a target host
|
||||
|
||||
Example:
|
||||
Copy host/master1.ipa.test principal as MASTER$ in a temporary keytab
|
||||
|
||||
# host - master1.ipa.test
|
||||
copier = KerberosKeyCopier(host)
|
||||
realm = host.domain.realm
|
||||
principal = copier.host_princ_template.format(
|
||||
master=host.hostname, realm=realm)
|
||||
replacement = {principal: f'MASTER$@{realm}'}
|
||||
|
||||
result = host.run_command(['mktemp'])
|
||||
tmpname = result.stdout_text.strip()
|
||||
|
||||
copier.copy_keys('/etc/krb5.keytab', tmpname, replacement=replacement)
|
||||
"""
|
||||
host_princ_template = "host/{master}@{realm}"
|
||||
valid_etypes = ['aes256-cts-hmac-sha1-96', 'aes128-cts-hmac-sha1-96']
|
||||
|
||||
def __init__(self, host):
|
||||
self.host = host
|
||||
self.realm = host.domain.realm
|
||||
|
||||
def extract_key_refs(self, keytab, princ=None):
|
||||
if princ is None:
|
||||
princ = self.host_princ_template.format(master=self.host.hostname,
|
||||
realm=self.realm)
|
||||
result = self.host.run_command(
|
||||
[paths.KLIST, "-eK", "-k", keytab],
|
||||
log_stdout=False, raiseonerr=False)
|
||||
if result.returncode != 0:
|
||||
return None
|
||||
|
||||
keys_to_sync = []
|
||||
for l in result.stdout_text.splitlines():
|
||||
if (princ in l and any(e in l for e in self.valid_etypes)):
|
||||
|
||||
els = l.split()
|
||||
els[-2] = els[-2].strip('()')
|
||||
els[-1] = els[-1].strip('()')
|
||||
keys_to_sync.append(KeyEntry._make(els))
|
||||
|
||||
return keys_to_sync
|
||||
|
||||
def copy_key(self, keytab, keyentry):
|
||||
# keyentry.key is a hex value of the actual key
|
||||
# prefixed with 0x, as produced by klist -K -k.
|
||||
# However, ktutil accepts hex value without 0x, so
|
||||
# we should strip first two characters.
|
||||
stdin = textwrap.dedent("""\
|
||||
rkt {keytab}
|
||||
addent -key -p {principal} -k {kvno} -e {etype}
|
||||
{key}
|
||||
wkt {keytab}
|
||||
""").format(keytab=keytab, principal=keyentry.principal,
|
||||
kvno=keyentry.kvno, etype=keyentry.etype,
|
||||
key=keyentry.key[2:])
|
||||
|
||||
result = self.host.run_command(
|
||||
[paths.KTUTIL], stdin_text=stdin,
|
||||
raiseonerr=False, log_stdout=False)
|
||||
|
||||
return result.returncode == 0
|
||||
|
||||
def copy_keys(self, origin, destination, principal=None, replacement=None):
|
||||
def sync_keys(origkeys, destkeys):
|
||||
for origkey in origkeys:
|
||||
copied = False
|
||||
uptodate = False
|
||||
if origkey.principal in replacement:
|
||||
origkey = copy.deepcopy(origkey)
|
||||
origkey.principal = replacement.get(origkey.principal)
|
||||
for destkey in destkeys:
|
||||
if all([destkey.principal == origkey.principal,
|
||||
destkey.etype == origkey.etype]):
|
||||
if any([destkey.key != origkey.key,
|
||||
destkey.kvno != origkey.kvno]):
|
||||
copied = self.copy_key(destination, origkey)
|
||||
break
|
||||
uptodate = True
|
||||
if not (copied or uptodate):
|
||||
copied = self.copy_key(destination, origkey)
|
||||
return copied or uptodate
|
||||
|
||||
if not self.host.transport.file_exists(origin):
|
||||
return False
|
||||
origkeys = self.extract_key_refs(origin, princ=principal)
|
||||
if self.host.transport.file_exists(destination):
|
||||
destkeys = self.extract_key_refs(destination)
|
||||
if any([origkeys is None, destkeys is None]):
|
||||
logger.warning('Either %s or %s are missing or unreadable',
|
||||
origin, destination)
|
||||
return False
|
||||
return sync_keys(origkeys, destkeys)
|
||||
else:
|
||||
for origkey in origkeys:
|
||||
if origkey.principal in replacement:
|
||||
newkey = KeyEntry._make(
|
||||
[origkey.kvno, replacement.get(origkey.principal),
|
||||
origkey.etype, origkey.key])
|
||||
origkey = newkey
|
||||
if not self.copy_key(destination, origkey):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class FileBackup:
|
||||
"""Create file backup and do restore on remote host
|
||||
|
||||
|
||||
@@ -224,6 +224,28 @@ class TestSMB(IntegrationTest):
|
||||
self.smbclient.run_command(['umount', mountpoint], raiseonerr=False)
|
||||
self.smbclient.run_command(['rmdir', mountpoint], raiseonerr=False)
|
||||
|
||||
def smb_cifs_principal_alias_check(self):
|
||||
netbiosname = self.smbserver.hostname.split('.')[0].upper() + '$'
|
||||
copier = tasks.KerberosKeyCopier(self.smbserver)
|
||||
|
||||
principal = 'cifs/{hostname}@{realm}'.format(
|
||||
hostname=self.smbserver.hostname, realm=copier.realm)
|
||||
alias = '{netbiosname}@{realm}'.format(
|
||||
netbiosname=netbiosname, realm=copier.realm)
|
||||
replacement = {principal: alias}
|
||||
|
||||
result = self.smbserver.run_command(['mktemp'])
|
||||
# klist/ktutil will fail with 0-sized file
|
||||
# so we just use the temporary file as a prefix
|
||||
tmpname = result.stdout_text.strip() + '.keytab'
|
||||
|
||||
copier.copy_keys('/etc/samba/samba.keytab',
|
||||
tmpname, principal=principal, replacement=replacement)
|
||||
self.smbserver.run_command(['kinit', '-kt', tmpname, netbiosname],
|
||||
raiseonerr=True)
|
||||
self.smbserver.run_command(['rm', '-f', tmpname])
|
||||
self.smbserver.run_command(['rm', '-f', tmpname[:-7]])
|
||||
|
||||
def test_samba_uninstallation_without_installation(self):
|
||||
res = self.smbserver.run_command(
|
||||
['ipa-client-samba', '--uninstall', '-U'])
|
||||
@@ -237,6 +259,8 @@ class TestSMB(IntegrationTest):
|
||||
result = self.smbserver.run_command(
|
||||
['systemctl', 'status', service], raiseonerr=False)
|
||||
assert result.returncode == 3
|
||||
# Validate that we can authenticate with the service alias principal
|
||||
self.smb_cifs_principal_alias_check()
|
||||
self.smbserver.run_command([
|
||||
'systemctl', 'enable', '--now', 'smb', 'winbind'
|
||||
])
|
||||
|
||||
Reference in New Issue
Block a user