| 
									
										
										
										
											2018-04-27 12:29:17 +02:00
										 |  |  | # | 
					
						
							|  |  |  | # Copyright (C) 2018  FreeIPA Contributors see COPYING for license | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | """Misc test for 'ipa' CLI regressions
 | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | from __future__ import absolute_import | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import base64 | 
					
						
							| 
									
										
										
										
											2018-08-02 16:59:07 +02:00
										 |  |  | import re | 
					
						
							| 
									
										
										
										
											2018-07-31 13:24:01 +02:00
										 |  |  | import os | 
					
						
							|  |  |  | import logging | 
					
						
							| 
									
										
										
										
											2018-04-27 12:29:17 +02:00
										 |  |  | import ssl | 
					
						
							| 
									
										
										
										
											2018-05-25 10:16:24 -04:00
										 |  |  | from tempfile import NamedTemporaryFile | 
					
						
							| 
									
										
										
										
											2018-08-23 10:34:39 +02:00
										 |  |  | from itertools import chain, repeat | 
					
						
							| 
									
										
										
										
											2018-05-25 10:16:24 -04:00
										 |  |  | import textwrap | 
					
						
							| 
									
										
										
										
											2018-08-02 16:59:07 +02:00
										 |  |  | import time | 
					
						
							| 
									
										
										
										
											2018-07-31 13:24:01 +02:00
										 |  |  | import paramiko | 
					
						
							|  |  |  | import pytest | 
					
						
							| 
									
										
										
										
											2018-04-27 12:29:17 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-27 16:36:59 +10:00
										 |  |  | from cryptography.hazmat.backends import default_backend | 
					
						
							|  |  |  | from cryptography import x509 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-11-06 13:57:14 +01:00
										 |  |  | from ipalib.constants import IPAAPI_USER | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-04-27 12:29:17 +02:00
										 |  |  | from ipaplatform.paths import paths | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from ipatests.test_integration.base import IntegrationTest | 
					
						
							| 
									
										
										
										
											2018-08-02 13:45:19 +02:00
										 |  |  | from ipatests.pytest_ipa.integration import tasks | 
					
						
							| 
									
										
										
										
											2018-08-23 10:34:39 +02:00
										 |  |  | from ipatests.create_external_ca import ExternalCA | 
					
						
							| 
									
										
										
										
											2018-10-17 18:12:52 -04:00
										 |  |  | from ipatests.test_ipalib.test_x509 import good_pkcs7, badcert | 
					
						
							| 
									
										
										
										
											2018-04-27 12:29:17 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-07-31 13:24:01 +02:00
										 |  |  | logger = logging.getLogger(__name__) | 
					
						
							| 
									
										
										
										
											2018-04-27 12:29:17 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-11-06 13:57:14 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-04-27 12:29:17 +02:00
										 |  |  | class TestIPACommand(IntegrationTest): | 
					
						
							| 
									
										
										
										
											2018-05-25 10:16:24 -04:00
										 |  |  |     """
 | 
					
						
							|  |  |  |     A lot of commands can be executed against a single IPA installation | 
					
						
							|  |  |  |     so provide a generic class to execute one-off commands that need to be | 
					
						
							|  |  |  |     tested without having to fire up a full server to run one command. | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2018-04-27 12:29:17 +02:00
										 |  |  |     topology = 'line' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_cert_base64(self, host, path): | 
					
						
							|  |  |  |         """Retrieve cert and return content as single line, base64 encoded
 | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         cacrt = host.get_file_contents(path, encoding='ascii') | 
					
						
							|  |  |  |         cader = ssl.PEM_cert_to_DER_cert(cacrt) | 
					
						
							|  |  |  |         return base64.b64encode(cader).decode('ascii') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_certmap_match_issue7520(self): | 
					
						
							|  |  |  |         # https://pagure.io/freeipa/issue/7520 | 
					
						
							|  |  |  |         tasks.kinit_admin(self.master) | 
					
						
							|  |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ['ipa', 'certmap-match', paths.IPA_CA_CRT], | 
					
						
							|  |  |  |             raiseonerr=False | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert result.returncode == 1 | 
					
						
							|  |  |  |         assert not result.stderr_text | 
					
						
							|  |  |  |         assert "0 users matched" in result.stdout_text | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         cab64 = self.get_cert_base64(self.master, paths.IPA_CA_CRT) | 
					
						
							|  |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ['ipa', 'certmap-match', '--certificate', cab64], | 
					
						
							|  |  |  |             raiseonerr=False | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert result.returncode == 1 | 
					
						
							|  |  |  |         assert not result.stderr_text | 
					
						
							|  |  |  |         assert "0 users matched" in result.stdout_text | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_cert_find_issue7520(self): | 
					
						
							|  |  |  |         # https://pagure.io/freeipa/issue/7520 | 
					
						
							|  |  |  |         tasks.kinit_admin(self.master) | 
					
						
							|  |  |  |         subject = 'CN=Certificate Authority,O={}'.format( | 
					
						
							|  |  |  |             self.master.domain.realm) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # by cert file | 
					
						
							|  |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ['ipa', 'cert-find', '--file', paths.IPA_CA_CRT] | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert subject in result.stdout_text | 
					
						
							|  |  |  |         assert '1 certificate matched' in result.stdout_text | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # by base64 cert | 
					
						
							|  |  |  |         cab64 = self.get_cert_base64(self.master, paths.IPA_CA_CRT) | 
					
						
							|  |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ['ipa', 'cert-find', '--certificate', cab64] | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert subject in result.stdout_text | 
					
						
							|  |  |  |         assert '1 certificate matched' in result.stdout_text | 
					
						
							| 
									
										
										
										
											2018-05-24 15:38:33 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_add_permission_failure_issue5923(self): | 
					
						
							|  |  |  |         # https://pagure.io/freeipa/issue/5923 | 
					
						
							|  |  |  |         # error response used to contain bytes instead of text | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         tasks.kinit_admin(self.master) | 
					
						
							|  |  |  |         # neither privilege nor permission exists | 
					
						
							|  |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ["ipa", "privilege-add-permission", "loc", | 
					
						
							|  |  |  |              "--permission='System: Show IPA Locations"], | 
					
						
							|  |  |  |             raiseonerr=False | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert result.returncode == 2 | 
					
						
							|  |  |  |         err = result.stderr_text.strip()  # pylint: disable=no-member | 
					
						
							|  |  |  |         assert err == "ipa: ERROR: loc: privilege not found" | 
					
						
							|  |  |  |         # add privilege | 
					
						
							|  |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ["ipa", "privilege-add", "loc"], | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert 'Added privilege "loc"' in result.stdout_text | 
					
						
							|  |  |  |         # permission is still missing | 
					
						
							|  |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ["ipa", "privilege-add-permission", "loc", | 
					
						
							|  |  |  |              "--permission='System: Show IPA Locations"], | 
					
						
							|  |  |  |             raiseonerr=False | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert result.returncode == 1 | 
					
						
							|  |  |  |         assert "Number of permissions added 0" in result.stdout_text | 
					
						
							| 
									
										
										
										
											2018-05-25 10:16:24 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_change_sysaccount_password_issue7561(self): | 
					
						
							|  |  |  |         sysuser = 'system' | 
					
						
							|  |  |  |         original_passwd = 'Secret123' | 
					
						
							|  |  |  |         new_passwd = 'userPasswd123' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         master = self.master | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         base_dn = str(master.domain.basedn)  # pylint: disable=no-member | 
					
						
							|  |  |  |         tf = NamedTemporaryFile() | 
					
						
							|  |  |  |         ldif_file = tf.name | 
					
						
							|  |  |  |         entry_ldif = textwrap.dedent("""
 | 
					
						
							|  |  |  |             dn: uid=system,cn=sysaccounts,cn=etc,{base_dn} | 
					
						
							|  |  |  |             changetype: add | 
					
						
							|  |  |  |             objectclass: account | 
					
						
							|  |  |  |             objectclass: simplesecurityobject | 
					
						
							|  |  |  |             uid: system | 
					
						
							|  |  |  |             userPassword: {original_passwd} | 
					
						
							|  |  |  |             passwordExpirationTime: 20380119031407Z | 
					
						
							|  |  |  |             nsIdleTimeout: 0 | 
					
						
							|  |  |  |         """).format(
 | 
					
						
							|  |  |  |             base_dn=base_dn, | 
					
						
							|  |  |  |             original_passwd=original_passwd) | 
					
						
							|  |  |  |         master.put_file_contents(ldif_file, entry_ldif) | 
					
						
							|  |  |  |         arg = ['ldapmodify', | 
					
						
							| 
									
										
										
										
											2018-11-30 17:14:41 +01:00
										 |  |  |                '-ZZ', | 
					
						
							| 
									
										
										
										
											2018-05-25 10:16:24 -04:00
										 |  |  |                '-h', master.hostname, | 
					
						
							|  |  |  |                '-p', '389', '-D', | 
					
						
							|  |  |  |                str(master.config.dirman_dn),   # pylint: disable=no-member | 
					
						
							|  |  |  |                '-w', master.config.dirman_password, | 
					
						
							|  |  |  |                '-f', ldif_file] | 
					
						
							|  |  |  |         master.run_command(arg) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         tasks.ldappasswd_sysaccount_change(sysuser, original_passwd, | 
					
						
							|  |  |  |                                            new_passwd, master) | 
					
						
							| 
									
										
										
										
											2018-06-01 15:19:35 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-08-02 16:59:07 +02:00
										 |  |  |     def test_ldapmodify_password_issue7601(self): | 
					
						
							|  |  |  |         user = 'ipauser' | 
					
						
							|  |  |  |         original_passwd = 'Secret123' | 
					
						
							|  |  |  |         new_passwd = 'userPasswd123' | 
					
						
							|  |  |  |         new_passwd2 = 'mynewPwd123' | 
					
						
							|  |  |  |         master = self.master | 
					
						
							|  |  |  |         base_dn = str(master.domain.basedn)  # pylint: disable=no-member | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Create a user with a password | 
					
						
							|  |  |  |         tasks.kinit_admin(master) | 
					
						
							|  |  |  |         add_password_stdin_text = "{pwd}\n{pwd}".format(pwd=original_passwd) | 
					
						
							|  |  |  |         master.run_command(['ipa', 'user-add', user, | 
					
						
							|  |  |  |                             '--first', user, | 
					
						
							|  |  |  |                             '--last', user, | 
					
						
							|  |  |  |                             '--password'], | 
					
						
							|  |  |  |                            stdin_text=add_password_stdin_text) | 
					
						
							|  |  |  |         # kinit as that user in order to modify the pwd | 
					
						
							|  |  |  |         user_kinit_stdin_text = "{old}\n%{new}\n%{new}\n".format( | 
					
						
							|  |  |  |             old=original_passwd, | 
					
						
							|  |  |  |             new=original_passwd) | 
					
						
							|  |  |  |         master.run_command(['kinit', user], stdin_text=user_kinit_stdin_text) | 
					
						
							|  |  |  |         # Retrieve krblastpwdchange and krbpasswordexpiration | 
					
						
							|  |  |  |         search_cmd = [ | 
					
						
							| 
									
										
										
										
											2018-11-30 17:14:41 +01:00
										 |  |  |             'ldapsearch', '-x', '-ZZ', | 
					
						
							|  |  |  |             '-h', master.hostname, | 
					
						
							|  |  |  |             '-p', '389', | 
					
						
							| 
									
										
										
										
											2018-08-02 16:59:07 +02:00
										 |  |  |             '-D', 'cn=directory manager', | 
					
						
							|  |  |  |             '-w', master.config.dirman_password, | 
					
						
							|  |  |  |             '-s', 'base', | 
					
						
							|  |  |  |             '-b', 'uid={user},cn=users,cn=accounts,{base_dn}'.format( | 
					
						
							|  |  |  |                 user=user, base_dn=base_dn), | 
					
						
							|  |  |  |             '-o', 'ldif-wrap=no', | 
					
						
							|  |  |  |             '-LLL', | 
					
						
							|  |  |  |             'krblastpwdchange', | 
					
						
							|  |  |  |             'krbpasswordexpiration'] | 
					
						
							|  |  |  |         output = master.run_command(search_cmd).stdout_text.lower() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # extract krblastpwdchange and krbpasswordexpiration | 
					
						
							|  |  |  |         krbchg_pattern = 'krblastpwdchange: (.+)\n' | 
					
						
							|  |  |  |         krbexp_pattern = 'krbpasswordexpiration: (.+)\n' | 
					
						
							|  |  |  |         krblastpwdchange = re.findall(krbchg_pattern, output)[0] | 
					
						
							|  |  |  |         krbexp = re.findall(krbexp_pattern, output)[0] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # sleep 1 sec (krblastpwdchange and krbpasswordexpiration have at most | 
					
						
							|  |  |  |         # a 1s precision) | 
					
						
							|  |  |  |         time.sleep(1) | 
					
						
							|  |  |  |         # perform ldapmodify on userpassword as dir mgr | 
					
						
							|  |  |  |         mod = NamedTemporaryFile() | 
					
						
							|  |  |  |         ldif_file = mod.name | 
					
						
							|  |  |  |         entry_ldif = textwrap.dedent("""
 | 
					
						
							|  |  |  |             dn: uid={user},cn=users,cn=accounts,{base_dn} | 
					
						
							|  |  |  |             changetype: modify | 
					
						
							|  |  |  |             replace: userpassword | 
					
						
							|  |  |  |             userpassword: {new_passwd} | 
					
						
							|  |  |  |         """).format(
 | 
					
						
							|  |  |  |             user=user, | 
					
						
							|  |  |  |             base_dn=base_dn, | 
					
						
							|  |  |  |             new_passwd=new_passwd) | 
					
						
							|  |  |  |         master.put_file_contents(ldif_file, entry_ldif) | 
					
						
							|  |  |  |         arg = ['ldapmodify', | 
					
						
							| 
									
										
										
										
											2018-11-30 17:14:41 +01:00
										 |  |  |                '-ZZ', | 
					
						
							| 
									
										
										
										
											2018-08-02 16:59:07 +02:00
										 |  |  |                '-h', master.hostname, | 
					
						
							|  |  |  |                '-p', '389', '-D', | 
					
						
							|  |  |  |                str(master.config.dirman_dn),   # pylint: disable=no-member | 
					
						
							|  |  |  |                '-w', master.config.dirman_password, | 
					
						
							|  |  |  |                '-f', ldif_file] | 
					
						
							|  |  |  |         master.run_command(arg) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Test new password with kinit | 
					
						
							|  |  |  |         master.run_command(['kinit', user], stdin_text=new_passwd) | 
					
						
							|  |  |  |         # Retrieve krblastpwdchange and krbpasswordexpiration | 
					
						
							|  |  |  |         output = master.run_command(search_cmd).stdout_text.lower() | 
					
						
							|  |  |  |         # extract krblastpwdchange and krbpasswordexpiration | 
					
						
							|  |  |  |         newkrblastpwdchange = re.findall(krbchg_pattern, output)[0] | 
					
						
							|  |  |  |         newkrbexp = re.findall(krbexp_pattern, output)[0] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # both should have changed | 
					
						
							|  |  |  |         assert newkrblastpwdchange != krblastpwdchange | 
					
						
							|  |  |  |         assert newkrbexp != krbexp | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Now test passwd modif with ldappasswd | 
					
						
							|  |  |  |         time.sleep(1) | 
					
						
							|  |  |  |         master.run_command([ | 
					
						
							|  |  |  |             paths.LDAPPASSWD, | 
					
						
							|  |  |  |             '-D', str(master.config.dirman_dn),   # pylint: disable=no-member | 
					
						
							|  |  |  |             '-w', master.config.dirman_password, | 
					
						
							|  |  |  |             '-a', new_passwd, | 
					
						
							|  |  |  |             '-s', new_passwd2, | 
					
						
							|  |  |  |             '-x', '-ZZ', | 
					
						
							|  |  |  |             '-H', 'ldap://{hostname}'.format(hostname=master.hostname), | 
					
						
							|  |  |  |             'uid={user},cn=users,cn=accounts,{base_dn}'.format( | 
					
						
							|  |  |  |                 user=user, base_dn=base_dn)] | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         # Test new password with kinit | 
					
						
							|  |  |  |         master.run_command(['kinit', user], stdin_text=new_passwd2) | 
					
						
							|  |  |  |         # Retrieve krblastpwdchange and krbpasswordexpiration | 
					
						
							|  |  |  |         output = master.run_command(search_cmd).stdout_text.lower() | 
					
						
							|  |  |  |         # extract krblastpwdchange and krbpasswordexpiration | 
					
						
							|  |  |  |         newkrblastpwdchange2 = re.findall(krbchg_pattern, output)[0] | 
					
						
							|  |  |  |         newkrbexp2 = re.findall(krbexp_pattern, output)[0] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # both should have changed | 
					
						
							|  |  |  |         assert newkrblastpwdchange != newkrblastpwdchange2 | 
					
						
							|  |  |  |         assert newkrbexp != newkrbexp2 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-01 15:19:35 -04:00
										 |  |  |     def test_change_selinuxusermaporder(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         An update file meant to ensure a more sane default was | 
					
						
							|  |  |  |         overriding any customization done to the order. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         maporder = "unconfined_u:s0-s0:c0.c1023" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # set a new default | 
					
						
							| 
									
										
										
										
											2018-08-02 16:59:07 +02:00
										 |  |  |         tasks.kinit_admin(self.master) | 
					
						
							| 
									
										
										
										
											2018-06-01 15:19:35 -04:00
										 |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ["ipa", "config-mod", | 
					
						
							|  |  |  |              "--ipaselinuxusermaporder={}".format(maporder)], | 
					
						
							|  |  |  |             raiseonerr=False | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert result.returncode == 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # apply the update | 
					
						
							|  |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ["ipa-server-upgrade"], | 
					
						
							|  |  |  |             raiseonerr=False | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert result.returncode == 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # ensure result is the same | 
					
						
							|  |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ["ipa", "config-show"], | 
					
						
							|  |  |  |             raiseonerr=False | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert result.returncode == 0 | 
					
						
							|  |  |  |         assert "SELinux user map order: {}".format( | 
					
						
							|  |  |  |             maporder) in result.stdout_text | 
					
						
							| 
									
										
										
										
											2018-07-12 14:50:40 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_ipa_console(self): | 
					
						
							| 
									
										
										
										
											2018-08-02 16:59:07 +02:00
										 |  |  |         tasks.kinit_admin(self.master) | 
					
						
							| 
									
										
										
										
											2018-07-12 14:50:40 +02:00
										 |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ["ipa", "console"], | 
					
						
							|  |  |  |             stdin_text="api.env" | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert "ipalib.config.Env" in result.stdout_text | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         filename = tasks.upload_temp_contents( | 
					
						
							|  |  |  |             self.master, | 
					
						
							|  |  |  |             "print(api.env)\n" | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ["ipa", "console", filename], | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert "ipalib.config.Env" in result.stdout_text | 
					
						
							| 
									
										
										
										
											2018-07-17 16:08:49 -03:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_list_help_topics(self): | 
					
						
							| 
									
										
										
										
											2018-08-02 16:59:07 +02:00
										 |  |  |         tasks.kinit_admin(self.master) | 
					
						
							| 
									
										
										
										
											2018-07-17 16:08:49 -03:00
										 |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ["ipa", "help", "topics"], | 
					
						
							|  |  |  |             raiseonerr=False | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert result.returncode == 0 | 
					
						
							| 
									
										
										
										
											2018-07-31 13:24:01 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_ssh_key_connection(self, tmpdir): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Integration test for https://pagure.io/SSSD/sssd/issue/3747 | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         test_user = 'test-ssh' | 
					
						
							|  |  |  |         master = self.master.hostname | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         pub_keys = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for i in range(40): | 
					
						
							|  |  |  |             ssh_key_pair = tasks.generate_ssh_keypair() | 
					
						
							|  |  |  |             pub_keys.append(ssh_key_pair[1]) | 
					
						
							|  |  |  |             with open(os.path.join( | 
					
						
							|  |  |  |                     tmpdir, 'ssh_priv_{}'.format(i)), 'w') as fp: | 
					
						
							|  |  |  |                 fp.write(ssh_key_pair[0]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         tasks.kinit_admin(self.master) | 
					
						
							|  |  |  |         self.master.run_command(['ipa', 'user-add', test_user, | 
					
						
							|  |  |  |                                  '--first=tester', '--last=tester']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         keys_opts = ' '.join(['--ssh "{}"'.format(k) for k in pub_keys]) | 
					
						
							|  |  |  |         cmd = 'ipa user-mod {} {}'.format(test_user, keys_opts) | 
					
						
							|  |  |  |         self.master.run_command(cmd) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # connect with first SSH key | 
					
						
							|  |  |  |         first_priv_key_path = os.path.join(tmpdir, 'ssh_priv_1') | 
					
						
							|  |  |  |         # change private key permission to comply with SS rules | 
					
						
							|  |  |  |         os.chmod(first_priv_key_path, 0o600) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         sshcon = paramiko.SSHClient() | 
					
						
							|  |  |  |         sshcon.set_missing_host_key_policy(paramiko.AutoAddPolicy()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # first connection attempt is a workaround for | 
					
						
							|  |  |  |         # https://pagure.io/SSSD/sssd/issue/3669 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             sshcon.connect(master, username=test_user, | 
					
						
							|  |  |  |                            key_filename=first_priv_key_path, timeout=1) | 
					
						
							|  |  |  |         except (paramiko.AuthenticationException, paramiko.SSHException): | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             sshcon.connect(master, username=test_user, | 
					
						
							|  |  |  |                            key_filename=first_priv_key_path, timeout=1) | 
					
						
							|  |  |  |         except (paramiko.AuthenticationException, | 
					
						
							|  |  |  |                 paramiko.SSHException) as e: | 
					
						
							|  |  |  |             pytest.fail('Authentication using SSH key not successful', e) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         journal_cmd = ['journalctl', '--since=today', '-u', 'sshd'] | 
					
						
							|  |  |  |         result = self.master.run_command(journal_cmd) | 
					
						
							|  |  |  |         output = result.stdout_text | 
					
						
							|  |  |  |         assert not re.search('exited on signal 13', output) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # cleanup | 
					
						
							|  |  |  |         self.master.run_command(['ipa', 'user-del', test_user]) | 
					
						
							| 
									
										
										
										
											2018-08-23 10:34:39 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_ssh_leak(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Integration test for https://pagure.io/SSSD/sssd/issue/3794 | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def count_pipes(): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             res = self.master.run_command(['pidof', 'sssd_ssh']) | 
					
						
							|  |  |  |             pid = res.stdout_text.strip() | 
					
						
							|  |  |  |             proc_path = '/proc/{}/fd'.format(pid) | 
					
						
							|  |  |  |             res = self.master.run_command(['ls', '-la', proc_path]) | 
					
						
							|  |  |  |             fds_text = res.stdout_text.strip() | 
					
						
							|  |  |  |             return sum((1 for _ in re.finditer(r'pipe', fds_text))) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         test_user = 'test-ssh' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         tasks.kinit_admin(self.master) | 
					
						
							|  |  |  |         self.master.run_command(['ipa', 'user-add', test_user, | 
					
						
							|  |  |  |                                  '--first=tester', '--last=tester']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         certs = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # we are ok with whatever certificate for this test | 
					
						
							|  |  |  |         external_ca = ExternalCA() | 
					
						
							|  |  |  |         for _dummy in range(3): | 
					
						
							|  |  |  |             cert = external_ca.create_ca() | 
					
						
							|  |  |  |             cert = tasks.strip_cert_header(cert.decode('utf-8')) | 
					
						
							|  |  |  |             certs.append('"{}"'.format(cert)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         cert_args = list( | 
					
						
							|  |  |  |             chain.from_iterable(list(zip(repeat('--certificate'), certs)))) | 
					
						
							|  |  |  |         cmd = 'ipa user-add-cert {} {}'.format(test_user, ' '.join(cert_args)) | 
					
						
							|  |  |  |         self.master.run_command(cmd) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         tasks.clear_sssd_cache(self.master) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         num_of_pipes = count_pipes() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for _dummy in range(3): | 
					
						
							|  |  |  |             self.master.run_command([paths.SSS_SSH_AUTHORIZEDKEYS, test_user]) | 
					
						
							|  |  |  |             current_num_of_pipes = count_pipes() | 
					
						
							|  |  |  |             assert current_num_of_pipes == num_of_pipes | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # cleanup | 
					
						
							|  |  |  |         self.master.run_command(['ipa', 'user-del', test_user]) | 
					
						
							| 
									
										
										
										
											2018-09-27 16:36:59 +10:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_certificate_out_write_to_file(self): | 
					
						
							|  |  |  |         # commands to test; name of temporary file will be appended | 
					
						
							|  |  |  |         commands = [ | 
					
						
							|  |  |  |             ['ipa', 'cert-show', '1', '--certificate-out'], | 
					
						
							|  |  |  |             ['ipa', 'cert-show', '1', '--chain', '--certificate-out'], | 
					
						
							|  |  |  |             ['ipa', 'ca-show', 'ipa', '--certificate-out'], | 
					
						
							|  |  |  |             ['ipa', 'ca-show', 'ipa', '--chain', '--certificate-out'], | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for command in commands: | 
					
						
							|  |  |  |             cmd = self.master.run_command(['mktemp']) | 
					
						
							|  |  |  |             filename = cmd.stdout_text.strip() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.master.run_command(command + [filename]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Check that a PEM file was written.  If --chain was | 
					
						
							|  |  |  |             # used, load_pem_x509_certificate will return the | 
					
						
							|  |  |  |             # first certificate, which is fine for this test. | 
					
						
							|  |  |  |             data = self.master.get_file_contents(filename) | 
					
						
							|  |  |  |             x509.load_pem_x509_certificate(data, backend=default_backend()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.master.run_command(['rm', '-f', filename]) | 
					
						
							| 
									
										
										
										
											2018-11-06 13:57:14 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_sssd_ifp_access_ipaapi(self): | 
					
						
							|  |  |  |         # check that ipaapi is allowed to access sssd-ifp for smartcard auth | 
					
						
							|  |  |  |         # https://pagure.io/freeipa/issue/7751 | 
					
						
							|  |  |  |         username = 'admin' | 
					
						
							|  |  |  |         # get UID for user | 
					
						
							|  |  |  |         result = self.master.run_command(['ipa', 'user-show', username]) | 
					
						
							|  |  |  |         mo = re.search(r'UID: (\d+)', result.stdout_text) | 
					
						
							|  |  |  |         assert mo is not None, result.stdout_text | 
					
						
							|  |  |  |         uid = mo.group(1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         cmd = [ | 
					
						
							|  |  |  |             'dbus-send', | 
					
						
							|  |  |  |             '--print-reply', '--system', | 
					
						
							|  |  |  |             '--dest=org.freedesktop.sssd.infopipe', | 
					
						
							|  |  |  |             '/org/freedesktop/sssd/infopipe/Users', | 
					
						
							|  |  |  |             'org.freedesktop.sssd.infopipe.Users.FindByName', | 
					
						
							|  |  |  |             'string:{}'.format(username) | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         # test IFP as root | 
					
						
							|  |  |  |         result = self.master.run_command(cmd) | 
					
						
							|  |  |  |         assert uid in result.stdout_text | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # test IFP as ipaapi | 
					
						
							|  |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ['sudo', '-u', IPAAPI_USER, '--'] + cmd | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert uid in result.stdout_text | 
					
						
							| 
									
										
										
										
											2018-10-17 18:12:52 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_ipa_cacert_manage_install(self): | 
					
						
							|  |  |  |         # Re-install the IPA CA | 
					
						
							|  |  |  |         self.master.run_command([ | 
					
						
							|  |  |  |             paths.IPA_CACERT_MANAGE, | 
					
						
							|  |  |  |             'install', | 
					
						
							|  |  |  |             paths.IPA_CA_CRT]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Test a non-existent file | 
					
						
							|  |  |  |         result = self.master.run_command([ | 
					
						
							|  |  |  |             paths.IPA_CACERT_MANAGE, | 
					
						
							|  |  |  |             'install', | 
					
						
							|  |  |  |             '/var/run/cert_not_found'], raiseonerr=False) | 
					
						
							|  |  |  |         assert result.returncode == 1 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         cmd = self.master.run_command(['mktemp']) | 
					
						
							|  |  |  |         filename = cmd.stdout_text.strip() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for contents in (good_pkcs7,): | 
					
						
							|  |  |  |             self.master.put_file_contents(filename, contents) | 
					
						
							|  |  |  |             result = self.master.run_command([ | 
					
						
							|  |  |  |                 paths.IPA_CACERT_MANAGE, | 
					
						
							|  |  |  |                 'install', | 
					
						
							|  |  |  |                 filename]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for contents in (badcert,): | 
					
						
							|  |  |  |             self.master.put_file_contents(filename, contents) | 
					
						
							|  |  |  |             result = self.master.run_command([ | 
					
						
							|  |  |  |                 paths.IPA_CACERT_MANAGE, | 
					
						
							|  |  |  |                 'install', | 
					
						
							|  |  |  |                 filename], raiseonerr=False) | 
					
						
							|  |  |  |             assert result.returncode == 1 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.master.run_command(['rm', '-f', filename]) | 
					
						
							| 
									
										
										
										
											2019-01-11 11:18:05 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_hbac_systemd_user(self): | 
					
						
							|  |  |  |         # https://pagure.io/freeipa/issue/7831 | 
					
						
							|  |  |  |         tasks.kinit_admin(self.master) | 
					
						
							|  |  |  |         # check for presence | 
					
						
							|  |  |  |         self.master.run_command( | 
					
						
							|  |  |  |             ['ipa', 'hbacsvc-show', 'systemd-user'] | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2019-01-15 17:33:56 +01:00
										 |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ['ipa', 'hbacrule-show', 'allow_systemd-user', '--all'] | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         lines = set(l.strip() for l in result.stdout_text.split('\n')) | 
					
						
							|  |  |  |         assert 'User category: all' in lines | 
					
						
							|  |  |  |         assert 'Host category: all' in lines | 
					
						
							|  |  |  |         assert 'Enabled: TRUE' in lines | 
					
						
							|  |  |  |         assert 'Services: systemd-user' in lines | 
					
						
							|  |  |  |         assert 'accessruletype: allow' in lines | 
					
						
							| 
									
										
										
										
											2019-01-11 11:18:05 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |         # delete both | 
					
						
							|  |  |  |         self.master.run_command( | 
					
						
							|  |  |  |             ['ipa', 'hbacrule-del', 'allow_systemd-user'] | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         self.master.run_command( | 
					
						
							|  |  |  |             ['ipa', 'hbacsvc-del', 'systemd-user'] | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # run upgrade | 
					
						
							|  |  |  |         result = self.master.run_command(['ipa-server-upgrade']) | 
					
						
							|  |  |  |         assert 'Created hbacsvc systemd-user' in result.stderr_text | 
					
						
							|  |  |  |         assert 'Created hbac rule allow_systemd-user' in result.stderr_text | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # check for presence | 
					
						
							|  |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ['ipa', 'hbacrule-show', 'allow_systemd-user', '--all'] | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         lines = set(l.strip() for l in result.stdout_text.split('\n')) | 
					
						
							|  |  |  |         assert 'User category: all' in lines | 
					
						
							|  |  |  |         assert 'Host category: all' in lines | 
					
						
							|  |  |  |         assert 'Enabled: TRUE' in lines | 
					
						
							|  |  |  |         assert 'Services: systemd-user' in lines | 
					
						
							|  |  |  |         assert 'accessruletype: allow' in lines | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.master.run_command( | 
					
						
							|  |  |  |             ['ipa', 'hbacsvc-show', 'systemd-user'] | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # only delete rule | 
					
						
							|  |  |  |         self.master.run_command( | 
					
						
							|  |  |  |             ['ipa', 'hbacrule-del', 'allow_systemd-user'] | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # run upgrade | 
					
						
							|  |  |  |         result = self.master.run_command(['ipa-server-upgrade']) | 
					
						
							|  |  |  |         assert ( | 
					
						
							|  |  |  |             'hbac service systemd-user already exists' in result.stderr_text | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert ( | 
					
						
							|  |  |  |             'Created hbac rule allow_systemd-user' not in result.stderr_text | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         result = self.master.run_command( | 
					
						
							|  |  |  |             ['ipa', 'hbacrule-show', 'allow_systemd-user'], | 
					
						
							|  |  |  |             raiseonerr=False | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert result.returncode != 0 | 
					
						
							|  |  |  |         assert 'HBAC rule not found' in result.stderr_text |