freeipa/ipatests/test_ipaserver/test_install/test_installutils.py
Alexander Bokovoy 46b0746fe9 test_installutils: run gpg-agent under a specific SELinux context
system_u:system_r:init_t:s0 cannot execute gpg=agent when SELinux is in
enforcing mode. Use SELinux context that allows this execution:
system_u:system_r:initrc_t:s0 and wrap the whole execution into a bash
run to make sure init_t -> initrc_t transition.

Fixes: https://pagure.io/freeipa/issue/8699
Signed-off-by: Alexander Bokovoy <abokovoy@redhat.com>
Reviewed-By: Florence Blanc-Renaud <flo@redhat.com>
2021-02-11 09:09:46 +01:00

336 lines
10 KiB
Python

#
# Copyright (C) 2017 FreeIPA Contributors. See COPYING for license
#
from __future__ import absolute_import
import binascii
import os
import psutil
import re
import subprocess
import textwrap
import pytest
from unittest.mock import patch, mock_open
from ipaplatform.paths import paths
from ipapython import ipautil
from ipapython.admintool import ScriptError
from ipaserver.install import installutils
from ipaserver.install import ipa_backup
from ipaserver.install import ipa_restore
GPG_GENKEY = textwrap.dedent("""
%echo Generating a standard key
Key-Type: RSA
Key-Length: 2048
Name-Real: IPA Backup
Name-Comment: IPA Backup
Name-Email: root@example.com
Expire-Date: 0
Passphrase: {passphrase}
%commit
%echo done
""")
@pytest.fixture
def gpgkey(request, tempdir):
passphrase = "Secret123"
gnupghome = os.path.join(tempdir, "gnupg")
os.makedirs(gnupghome, 0o700)
# provide clean env for gpg test
env = os.environ.copy()
orig_gnupghome = env.get('GNUPGHOME')
env['GNUPGHOME'] = gnupghome
env['LC_ALL'] = 'C.UTF-8'
env['LANGUAGE'] = 'C'
devnull = open(os.devnull, 'w')
# allow passing passphrases to agent
with open(os.path.join(gnupghome, "gpg-agent.conf"), 'w') as f:
f.write("verbose\n")
f.write("allow-preset-passphrase\n")
# daemonize agent (detach from the console and run in the background)
subprocess.run(
[paths.SYSTEMD_RUN, '--service-type=forking',
'--property', 'SELinuxContext=system_u:system_r:initrc_t:s0',
'--setenv=GNUPGHOME={}'.format(gnupghome),
'--setenv=LC_ALL=C.UTF-8',
'--setenv=LANGUAGE=C',
'--unit=gpg-agent', '/bin/bash',
'-c', ' '.join([paths.GPG_AGENT, '--daemon', '--batch'])],
check=True,
env=env,
)
def fin():
subprocess.run(
[paths.SYSTEMCTL, 'stop', 'gpg-agent'],
check=True,
env=env,
)
if orig_gnupghome is not None:
os.environ['GNUPGHOME'] = orig_gnupghome
else:
os.environ.pop('GNUPGHOME', None)
request.addfinalizer(fin)
# create public / private key pair
keygen = os.path.join(gnupghome, 'keygen')
with open(keygen, 'w') as f:
f.write(GPG_GENKEY.format(passphrase=passphrase))
subprocess.check_call(
[paths.GPG2, '--batch', '--gen-key', keygen],
env=env, stdout=devnull, stderr=devnull
)
# get keygrip of private key
out = subprocess.check_output(
[paths.GPG2, "--list-secret-keys", "--with-keygrip"],
env=env, stderr=subprocess.STDOUT
)
mo = re.search("Keygrip = ([A-Z0-9]{32,})", out.decode('utf-8'))
if mo is None:
raise ValueError(out.decode('utf-8'))
keygrip = mo.group(1)
# unlock private key
cmd = "PRESET_PASSPHRASE {} -1 {}".format(
keygrip,
binascii.hexlify(passphrase.encode('utf-8')).decode('utf-8')
)
subprocess.check_call(
[paths.GPG_CONNECT_AGENT, cmd, "/bye"],
env=env, stdout=devnull, stderr=devnull
)
# set env for the rest of the progress
os.environ['GNUPGHOME'] = gnupghome
def test_gpg_encrypt(tempdir):
src = os.path.join(tempdir, "data.txt")
encrypted = os.path.join(tempdir, "data.gpg")
decrypted = os.path.join(tempdir, "data.out")
passwd = 'Secret123'
payload = 'Dummy text\n'
with open(src, 'w') as f:
f.write(payload)
installutils.encrypt_file(src, encrypted, password=passwd)
assert os.path.isfile(encrypted)
installutils.decrypt_file(encrypted, decrypted, password=passwd)
assert os.path.isfile(decrypted)
with open(decrypted) as f:
assert f.read() == payload
with pytest.raises(ipautil.CalledProcessError):
installutils.decrypt_file(encrypted, decrypted, password='invalid')
def test_gpg_asymmetric(tempdir, gpgkey):
src = os.path.join(tempdir, "asymmetric.txt")
encrypted = src + ".gpg"
payload = 'Dummy text\n'
with open(src, 'w') as f:
f.write(payload)
ipa_backup.encrypt_file(src, remove_original=True)
assert os.path.isfile(encrypted)
assert not os.path.exists(src)
ipa_restore.decrypt_file(tempdir, encrypted)
assert os.path.isfile(src)
with open(src) as f:
assert f.read() == payload
@pytest.mark.parametrize(
"platform, expected",
[
("fedora", "fedora"),
("fedora_container", "fedora"),
("fedora_containers", "fedora_containers"),
("fedoracontainer", "fedoracontainer"),
("rhel", "rhel"),
("rhel_container", "rhel"),
]
)
def test_get_current_platform(monkeypatch, platform, expected):
monkeypatch.setattr(installutils.ipaplatform, "NAME", platform)
assert installutils.get_current_platform() == expected
# The mock_exists in the following tests mocks that the cgroups
# files exist even in non-containers. The values are provided by
# mock_open_multi.
@patch('ipaserver.install.installutils.in_container')
@patch('os.path.exists')
def test_in_container_no_cgroup(mock_exists, mock_in_container):
"""
In a container in a container without cgroups, can't detect RAM
"""
mock_in_container.return_value = True
mock_exists.side_effect = [False, False]
with pytest.raises(ScriptError):
installutils.check_available_memory(False)
def mock_open_multi(*contents):
"""Mock opening multiple files.
For our purposes the first read is limit, second is usage.
Note: this overrides *all* opens so if you use pdb then you will
need to extend the list by 2.
"""
mock_files = [
mock_open(read_data=content).return_value for content in contents
]
mock_multi = mock_open()
mock_multi.side_effect = mock_files
return mock_multi
RAM_OK = str(1800 * 1000 * 1000)
RAM_CA_USED = str(150 * 1000 * 1000)
RAM_MOSTLY_USED = str(1500 * 1000 * 1000)
RAM_NOT_OK = str(10 * 1000 * 1000)
@patch('ipaserver.install.installutils.in_container')
@patch('builtins.open', mock_open_multi(RAM_NOT_OK, "0"))
@patch('os.path.exists')
def test_cgroup_v1_insufficient_ram(mock_exists, mock_in_container):
"""In a container with insufficient RAM and zero used"""
mock_in_container.return_value = True
mock_exists.side_effect = [True, True]
with pytest.raises(ScriptError):
installutils.check_available_memory(True)
@patch('ipaserver.install.installutils.in_container')
@patch('builtins.open', mock_open_multi(RAM_OK, RAM_CA_USED))
@patch('os.path.exists')
def test_cgroup_v1_ram_ok_no_ca(mock_exists, mock_in_container):
"""In a container with just enough RAM to install w/o a CA"""
mock_in_container.return_value = True
mock_exists.side_effect = [True, True]
installutils.check_available_memory(False)
@patch('ipaserver.install.installutils.in_container')
@patch('builtins.open', mock_open_multi(RAM_OK, RAM_MOSTLY_USED))
@patch('os.path.exists')
def test_cgroup_v1_insufficient_ram_with_ca(mock_exists, mock_in_container):
"""In a container and just miss the minimum RAM required"""
mock_in_container.return_value = True
mock_exists.side_effect = [True, True]
with pytest.raises(ScriptError):
installutils.check_available_memory(True)
@patch('ipaserver.install.installutils.in_container')
@patch('builtins.open', mock_open_multi(RAM_NOT_OK, "0"))
@patch('os.path.exists')
def test_cgroup_v2_insufficient_ram(mock_exists, mock_in_container):
"""In a container with insufficient RAM and zero used"""
mock_in_container.return_value = True
mock_exists.side_effect = [False, True, True]
with pytest.raises(ScriptError):
installutils.check_available_memory(True)
@patch('ipaserver.install.installutils.in_container')
@patch('builtins.open', mock_open_multi(RAM_OK, RAM_CA_USED))
@patch('os.path.exists')
def test_cgroup_v2_ram_ok_no_ca(mock_exists, mock_in_container):
"""In a container with just enough RAM to install w/o a CA"""
mock_in_container.return_value = True
mock_exists.side_effect = [False, True, True]
installutils.check_available_memory(False)
@patch('ipaserver.install.installutils.in_container')
@patch('builtins.open', mock_open_multi(RAM_OK, RAM_MOSTLY_USED))
@patch('os.path.exists')
def test_cgroup_v2_insufficient_ram_with_ca(mock_exists, mock_in_container):
"""In a container and just miss the minimum RAM required"""
mock_in_container.return_value = True
mock_exists.side_effect = [False, True, True]
with pytest.raises(ScriptError):
installutils.check_available_memory(True)
@patch('ipaserver.install.installutils.in_container')
@patch('builtins.open', mock_open_multi('max', RAM_MOSTLY_USED))
@patch('os.path.exists')
@patch('psutil.virtual_memory')
def test_cgroup_v2_no_limit_ok(mock_psutil, mock_exists, mock_in_container):
"""In a container and just miss the minimum RAM required"""
mock_in_container.return_value = True
fake_memory = psutil._pslinux.svmem
fake_memory.available = int(RAM_OK)
mock_psutil.return_value = fake_memory
mock_exists.side_effect = [False, True, True]
installutils.check_available_memory(True)
@patch('ipaserver.install.installutils.in_container')
@patch('builtins.open', mock_open_multi('max', RAM_MOSTLY_USED))
@patch('os.path.exists')
@patch('psutil.virtual_memory')
def test_cgroup_v2_no_limit_not_ok(mock_psutil, mock_exists, mock_in_container):
"""In a container and just miss the minimum RAM required"""
mock_in_container.return_value = True
fake_memory = psutil._pslinux.svmem
fake_memory.available = int(RAM_NOT_OK)
mock_psutil.return_value = fake_memory
mock_exists.side_effect = [False, True, True]
with pytest.raises(ScriptError):
installutils.check_available_memory(True)
@patch('ipaserver.install.installutils.in_container')
@patch('psutil.virtual_memory')
def test_bare_insufficient_ram_with_ca(mock_psutil, mock_in_container):
"""Not a container and insufficient RAM"""
mock_in_container.return_value = False
fake_memory = psutil._pslinux.svmem
fake_memory.available = int(RAM_NOT_OK)
mock_psutil.return_value = fake_memory
with pytest.raises(ScriptError):
installutils.check_available_memory(True)
@patch('ipaserver.install.installutils.in_container')
@patch('psutil.virtual_memory')
def test_bare_ram_ok(mock_psutil, mock_in_container):
"""Not a container and sufficient RAM"""
mock_in_container.return_value = False
fake_memory = psutil._pslinux.svmem
fake_memory.available = int(RAM_OK)
mock_psutil.return_value = fake_memory
installutils.check_available_memory(True)