tasks: add run_ssh_cmd

Paramiko is not compatible with FIPS.
A replacement is needed, and since what clients use is "ssh",
create a shim over it so that tests can leverage it.

Fixes: https://pagure.io/freeipa/issue/8129
Signed-off-by: François Cami <fcami@redhat.com>
Reviewed-By: Mohammad Rizwan <myusuf@redhat.com>
Reviewed-By: Michal Polovka <mpolovka@redhat.com>
This commit is contained in:
François Cami
2020-07-22 09:59:12 +02:00
parent 178d80969c
commit d5148c6541

View File

@@ -30,6 +30,7 @@ import collections
import itertools
import shutil
import copy
import subprocess
import tempfile
import time
from pipes import quote
@@ -2297,3 +2298,135 @@ def get_sssd_version(host):
"""Get sssd version on remote host."""
version = host.run_command('sssd --version').stdout_text.strip()
return parse_version(version)
def run_ssh_cmd(
from_host=None, to_host=None, username=None, cmd=None,
auth_method=None, password=None, private_key_path=None,
expect_auth_success=True, expect_auth_failure=None,
verbose=True, connect_timeout=2, strict_host_key_checking=False
):
"""Runs an ssh connection from the controller to the host.
- auth_method can be either "password" or "key".
- In the first case, set password to the user's password ; in the
second case, set private_key_path to the path of the private key.
- If expect_auth_success or expect_auth_failure, analyze the ssh
client's log and check whether the selected authentication method
worked. expect_auth_failure takes precedence over expect_auth_success.
- If verbose, display the ssh client verbose log.
- Both expect_auth_success and verbose are True by default. Debugging
ssh client failures is next to impossible without the associated
debug log.
Possible enhancements:
- select which host to run from (currently: controller only)
"""
if from_host is not None:
raise NotImplementedError(
"from_host must be None ; running from anywhere but the "
"controller is not implemented yet."
)
if expect_auth_failure:
expect_auth_success = False
if to_host is None or username is None or auth_method is None:
raise ValueError("host, username and auth_method are mandatory")
if cmd is None:
# cmd must run properly on all supported platforms.
# true(1) ("do nothing, successfully") is the obvious candidate.
cmd = "true"
if auth_method == "password":
if password is None:
raise ValueError(
"password is mandatory if auth_method == password"
)
ssh_cmd = (
"ssh",
"-v",
"-o", "PubkeyAuthentication=no",
"-o", "GSSAPIAuthentication=no",
"-o", "ConnectTimeout={connect_timeout}".format(
connect_timeout=connect_timeout
),
)
elif auth_method == "key":
if private_key_path is None:
raise ValueError(
"private_key_path is mandatory if auth_method == key"
)
ssh_cmd = (
"ssh",
"-v",
"-o", "BatchMode=yes",
"-o", "PubkeyAuthentication=yes",
"-o", "GSSAPIAuthentication=no",
"-o", "ConnectTimeout={connect_timeout}".format(
connect_timeout=connect_timeout
),
)
else:
raise ValueError(
"auth_method must either be password or key"
)
ssh_cmd_1 = list(ssh_cmd)
if strict_host_key_checking is True:
ssh_cmd_1.extend(("-o", "StrictHostKeyChecking=yes"))
else:
ssh_cmd_1.extend(("-o", "StrictHostKeyChecking=no"))
if auth_method == "password":
ssh_cmd_1 = list(("sshpass", "-p", password)) + ssh_cmd_1
elif auth_method == "key":
ssh_cmd_1.extend(("-i", private_key_path))
ssh_cmd_1.extend(("-l", username, to_host, cmd))
try:
if verbose:
output = "OpenSSH command: {sshcmd}".format(sshcmd=ssh_cmd_1)
logger.info(output)
remote_cmd = subprocess.Popen(
ssh_cmd_1,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
while remote_cmd.poll() is None:
time.sleep(0.1)
return_code = remote_cmd.returncode
stderr = os.linesep.join(
str(line) for line in remote_cmd.stderr.readlines()
)
stdout = os.linesep.join(
str(line) for line in remote_cmd.stderr.readlines()
)
if verbose:
print_stdout = "Standard output: {stdout}".format(stdout=stdout)
print_stderr = "Standard error: {stderr}".format(stderr=stderr)
logger.info(print_stdout)
logger.info(print_stderr)
except Exception as e:
pytest.fail("Unable to run ssh command.", e)
if auth_method == "password":
if expect_auth_success is True:
assert "Authentication succeeded (keyboard-interactive)" in \
stderr
# do not assert the return code:
# it can be >0 if the command failed.
elif expect_auth_failure is True:
# sshpass return code: 5 for failed auth
assert return_code == 5
assert "Authentication succeeded" not in stderr
elif auth_method == "key":
if expect_auth_success is True:
assert "Authentication succeeded (publickey)" in stderr
# do not assert the return code:
# it can be >0 if the command failed.
elif expect_auth_failure is True:
# ssh return code: 255 for failed auth
assert return_code == 255
assert "Authentication succeeded" not in stderr
assert "No more authentication methods to try." in stderr
return (return_code, stdout, stderr)