From 6fc213d10deacf107f5ca225a0095bdf215dc73b Mon Sep 17 00:00:00 2001 From: Alexander Bokovoy Date: Wed, 29 Apr 2020 11:34:45 +0300 Subject: [PATCH] test_smb: test that we can auth as NetBIOS alias cifs/... principal on SMB server side has NetBIOS name of the SMB server as its alias. Test that we can actually initialize credentials using this alias. We don't need to use it anywhere in Samba, just verify that alias works. Related: https://pagure.io/freeipa/issue/8291 Signed-off-by: Alexander Bokovoy Reviewed-By: Christian Heimes Reviewed-By: Simo Sorce --- ipatests/pytest_ipa/integration/tasks.py | 113 +++++++++++++++++++++++ ipatests/test_integration/test_smb.py | 24 +++++ 2 files changed, 137 insertions(+) diff --git a/ipatests/pytest_ipa/integration/tasks.py b/ipatests/pytest_ipa/integration/tasks.py index 27e132731..c54e1f5a3 100755 --- a/ipatests/pytest_ipa/integration/tasks.py +++ b/ipatests/pytest_ipa/integration/tasks.py @@ -29,6 +29,7 @@ import re import collections import itertools import shutil +import copy import tempfile import time from pipes import quote @@ -1963,6 +1964,118 @@ def kinit_as_user(host, user, password): host.run_command(['kinit', user], stdin_text=password + '\n') +KeyEntry = collections.namedtuple('KeyEntry', + ['kvno', 'principal', 'etype', 'key']) + + +class KerberosKeyCopier: + """Copy Kerberos keys from a keytab to a keytab on a target host + + Example: + Copy host/master1.ipa.test principal as MASTER$ in a temporary keytab + + # host - master1.ipa.test + copier = KerberosKeyCopier(host) + realm = host.domain.realm + principal = copier.host_princ_template.format( + master=host.hostname, realm=realm) + replacement = {principal: f'MASTER$@{realm}'} + + result = host.run_command(['mktemp']) + tmpname = result.stdout_text.strip() + + copier.copy_keys('/etc/krb5.keytab', tmpname, replacement=replacement) + """ + host_princ_template = "host/{master}@{realm}" + valid_etypes = ['aes256-cts-hmac-sha1-96', 'aes128-cts-hmac-sha1-96'] + + def __init__(self, host): + self.host = host + self.realm = host.domain.realm + + def extract_key_refs(self, keytab, princ=None): + if princ is None: + princ = self.host_princ_template.format(master=self.host.hostname, + realm=self.realm) + result = self.host.run_command( + [paths.KLIST, "-eK", "-k", keytab], + log_stdout=False, raiseonerr=False) + if result.returncode != 0: + return None + + keys_to_sync = [] + for l in result.stdout_text.splitlines(): + if (princ in l and any(e in l for e in self.valid_etypes)): + + els = l.split() + els[-2] = els[-2].strip('()') + els[-1] = els[-1].strip('()') + keys_to_sync.append(KeyEntry._make(els)) + + return keys_to_sync + + def copy_key(self, keytab, keyentry): + # keyentry.key is a hex value of the actual key + # prefixed with 0x, as produced by klist -K -k. + # However, ktutil accepts hex value without 0x, so + # we should strip first two characters. + stdin = textwrap.dedent("""\ + rkt {keytab} + addent -key -p {principal} -k {kvno} -e {etype} + {key} + wkt {keytab} + """).format(keytab=keytab, principal=keyentry.principal, + kvno=keyentry.kvno, etype=keyentry.etype, + key=keyentry.key[2:]) + + result = self.host.run_command( + [paths.KTUTIL], stdin_text=stdin, + raiseonerr=False, log_stdout=False) + + return result.returncode == 0 + + def copy_keys(self, origin, destination, principal=None, replacement=None): + def sync_keys(origkeys, destkeys): + for origkey in origkeys: + copied = False + uptodate = False + if origkey.principal in replacement: + origkey = copy.deepcopy(origkey) + origkey.principal = replacement.get(origkey.principal) + for destkey in destkeys: + if all([destkey.principal == origkey.principal, + destkey.etype == origkey.etype]): + if any([destkey.key != origkey.key, + destkey.kvno != origkey.kvno]): + copied = self.copy_key(destination, origkey) + break + uptodate = True + if not (copied or uptodate): + copied = self.copy_key(destination, origkey) + return copied or uptodate + + if not self.host.transport.file_exists(origin): + return False + origkeys = self.extract_key_refs(origin, princ=principal) + if self.host.transport.file_exists(destination): + destkeys = self.extract_key_refs(destination) + if any([origkeys is None, destkeys is None]): + logger.warning('Either %s or %s are missing or unreadable', + origin, destination) + return False + return sync_keys(origkeys, destkeys) + else: + for origkey in origkeys: + if origkey.principal in replacement: + newkey = KeyEntry._make( + [origkey.kvno, replacement.get(origkey.principal), + origkey.etype, origkey.key]) + origkey = newkey + if not self.copy_key(destination, origkey): + return False + return True + + class FileBackup: """Create file backup and do restore on remote host diff --git a/ipatests/test_integration/test_smb.py b/ipatests/test_integration/test_smb.py index d3d6272c0..d73678051 100644 --- a/ipatests/test_integration/test_smb.py +++ b/ipatests/test_integration/test_smb.py @@ -224,6 +224,28 @@ class TestSMB(IntegrationTest): self.smbclient.run_command(['umount', mountpoint], raiseonerr=False) self.smbclient.run_command(['rmdir', mountpoint], raiseonerr=False) + def smb_cifs_principal_alias_check(self): + netbiosname = self.smbserver.hostname.split('.')[0].upper() + '$' + copier = tasks.KerberosKeyCopier(self.smbserver) + + principal = 'cifs/{hostname}@{realm}'.format( + hostname=self.smbserver.hostname, realm=copier.realm) + alias = '{netbiosname}@{realm}'.format( + netbiosname=netbiosname, realm=copier.realm) + replacement = {principal: alias} + + result = self.smbserver.run_command(['mktemp']) + # klist/ktutil will fail with 0-sized file + # so we just use the temporary file as a prefix + tmpname = result.stdout_text.strip() + '.keytab' + + copier.copy_keys('/etc/samba/samba.keytab', + tmpname, principal=principal, replacement=replacement) + self.smbserver.run_command(['kinit', '-kt', tmpname, netbiosname], + raiseonerr=True) + self.smbserver.run_command(['rm', '-f', tmpname]) + self.smbserver.run_command(['rm', '-f', tmpname[:-7]]) + def test_samba_uninstallation_without_installation(self): res = self.smbserver.run_command( ['ipa-client-samba', '--uninstall', '-U']) @@ -237,6 +259,8 @@ class TestSMB(IntegrationTest): result = self.smbserver.run_command( ['systemctl', 'status', service], raiseonerr=False) assert result.returncode == 3 + # Validate that we can authenticate with the service alias principal + self.smb_cifs_principal_alias_check() self.smbserver.run_command([ 'systemctl', 'enable', '--now', 'smb', 'winbind' ])