Introduce a class for remote commands

Introduce a class inspired by subprocess.Popen that handles
running a command on a remote machine and handling its output.

To separate stdout & stderr streams of a remote command,
they need to be read in parallel, so that one of them doesn't
stall the runner when its buffer fills up. Accomplish this
by using a thread for each stream.

Part of the work for: https://fedorahosted.org/freeipa/ticket/3621
This commit is contained in:
Petr Viktorin 2013-05-30 14:25:01 +02:00
parent 353f3c62c3
commit 00f133458b
2 changed files with 132 additions and 46 deletions

View File

@ -20,21 +20,103 @@
"""Host class for integration testing""" """Host class for integration testing"""
import os import os
import collections
import socket import socket
import threading
import subprocess
import paramiko import paramiko
from ipapython import ipautil from ipapython import ipautil
from ipapython.ipa_log_manager import log_mgr from ipapython.ipa_log_manager import log_mgr
RunResult = collections.namedtuple('RunResult', 'output exit_code')
class RemoteCommand(object):
"""A Popen-style object representing a remote command
Unlike subprocess.Popen, this does not run the given command; instead
it only starts a shell. The command must be written to stdin manually.
The standard error and output are handled by this class. They're not
available for file-like reading. They are logged by default.
To make sure reading doesn't stall after one buffer fills up, they are read
in parallel using threads.
After calling wait(), stdout_text and stderr_text attributes will be
strings containing the output, and returncode will contain the
exit code.
:param host: The Host on which the command is run
:param argv: The command that will be run (for logging only)
:param index: An identification number added to the logs
:param log_stdout: If false, stdout will not be logged
"""
def __init__(self, host, argv, index, log_stdout=True):
self.returncode = None
self.host = host
self.argv = argv
self._stdout_lines = []
self._stderr_lines = []
self.running_threads = set()
self.logger_name = '%s.cmd%s' % (self.host.logger_name, index)
self.log = log_mgr.get_logger(self.logger_name)
self.log.info('RUN %s', argv)
self._ssh = host.transport.open_channel('session')
self._ssh.invoke_shell()
stdin = self.stdin = self._ssh.makefile('wb')
stdout = self._ssh.makefile('rb')
stderr = self._ssh.makefile_stderr('rb')
self._start_pipe_thread(self._stdout_lines, stdout, 'out', log_stdout)
self._start_pipe_thread(self._stderr_lines, stderr, 'err', True)
self._done = False
def wait(self, raiseonerr=True):
"""Wait for the remote process to exit
Raises an excption if the exit code is not 0.
"""
if self._done:
return self.returncode
self._ssh.shutdown_write()
while self.running_threads:
self.running_threads.pop().join()
self.stdout_text = ''.join(self._stdout_lines)
self.stderr_text = ''.join(self._stderr_lines)
self.returncode = self._ssh.recv_exit_status()
self._ssh.close()
self._done = True
self.log.info('Exit code: %s', self.returncode)
if raiseonerr and self.returncode:
raise subprocess.CalledProcessError(self.returncode, self.argv)
return self.returncode
def _start_pipe_thread(self, result_list, stream, name, do_log=True):
log = log_mgr.get_logger('%s.%s' % (self.logger_name, name))
def read_stream():
for line in stream:
if do_log:
log.info(line.rstrip('\n'))
result_list.append(line)
thread = threading.Thread(target=read_stream)
self.running_threads.add(thread)
thread.start()
return thread
class Host(object): class Host(object):
"""Configuration for an IPA host""" """Representation of a remote IPA host"""
def __init__(self, domain, hostname, role, index): def __init__(self, domain, hostname, role, index):
self.log = log_mgr.get_logger(self)
self.domain = domain self.domain = domain
self.role = role self.role = role
self.index = index self.index = index
@ -43,6 +125,10 @@ class Host(object):
self.hostname = shortname + '.' + self.domain.name self.hostname = shortname + '.' + self.domain.name
self.external_hostname = hostname self.external_hostname = hostname
self.logger_name = '%s.%s.%s' % (
self.__module__, type(self).__name__, shortname)
self.log = log_mgr.get_logger(self.logger_name)
if self.config.ipv6: if self.config.ipv6:
# $(dig +short $M $rrtype|tail -1) # $(dig +short $M $rrtype|tail -1)
stdout, stderr, returncode = ipautil.run( stdout, stderr, returncode = ipautil.run(
@ -64,8 +150,7 @@ class Host(object):
self.env_sh_path = os.path.join(domain.config.test_dir, 'env.sh') self.env_sh_path = os.path.join(domain.config.test_dir, 'env.sh')
self.log = log_mgr.get_logger('%s.%s.%s' % ( self._command_index = 0
self.__module__, type(self).__name__, self.hostname))
def __repr__(self): def __repr__(self):
template = ('<{s.__module__}.{s.__class__.__name__} ' template = ('<{s.__module__}.{s.__class__.__name__} '
@ -99,47 +184,46 @@ class Host(object):
return env return env
def run_command(self, argv, set_env=True, stdin_text=None, def run_command(self, argv, set_env=True, stdin_text=None,
ignore_stdout=False): log_stdout=True, raiseonerr=True):
assert argv """Run the given command on this host
self.log.info('RUN %s', argv)
ssh = self.transport.open_channel('session')
try:
ssh.invoke_shell()
ssh.set_combine_stderr(True)
stdin = ssh.makefile('wb')
stdout = ssh.makefile('rb')
if set_env: Returns a RemoteCommand instance. The command will have already run
stdin.write('. %s\n' % self.env_sh_path) when this method returns, so its stdout_text, stderr_text, and
stdin.write('set -ex\n') returncode attributes will be available.
:param argv: Command to run, as either a Popen-style list, or a string
containing a shell script
:param set_env: If true, env.sh exporting configuration variables will
be sourced before running the command.
:param stdin_text: If given, will be written to the command's stdin
:param log_stdout: If false, standard output will not be logged
(but will still be available as cmd.stdout_text)
:param raiseonerr: If true, an exception will be raised if the command
does not exit with return code 0
"""
command = RemoteCommand(self, argv, index=self._command_index,
log_stdout=log_stdout)
self._command_index += 1
if set_env:
command.stdin.write('. %s\n' % self.env_sh_path)
command.stdin.write('set -e\n')
if isinstance(argv, basestring):
command.stdin.write('(')
command.stdin.write(argv)
command.stdin.write(')')
else:
for arg in argv: for arg in argv:
stdin.write(ipautil.shell_quote(arg)) command.stdin.write(ipautil.shell_quote(arg))
stdin.write(' ') command.stdin.write(' ')
if stdin_text: command.stdin.write(';exit\n')
stdin_filename = os.path.join(self.config.test_dir, 'stdin') if stdin_text:
with self.sftp.open(stdin_filename, 'w') as f: command.stdin.write(stdin_text)
f.write(stdin_text) command.stdin.flush()
stdin.write('<')
stdin.write(stdin_filename) command.wait(raiseonerr=raiseonerr)
else: return command
stdin.write('< /dev/null')
if ignore_stdout:
stdin.write('> /dev/null')
stdin.write('\n')
ssh.shutdown_write()
output = []
for line in stdout:
output.append(line)
self.log.info(' %s', line.strip('\n'))
exit_status = ssh.recv_exit_status()
self.log.info(' -> Exit code %s', exit_status)
if exit_status:
raise RuntimeError('Command %s exited with error code %s' % (
argv[0], exit_status))
return RunResult(''.join(output), exit_status)
finally:
ssh.close()
@property @property
def transport(self): def transport(self):
@ -173,11 +257,13 @@ class Host(object):
self.sftp.chdir(path) self.sftp.chdir(path)
def get_file_contents(self, filename): def get_file_contents(self, filename):
"""Read the named remote file and return the contents as a string"""
self.log.info('READ %s', filename) self.log.info('READ %s', filename)
with self.sftp.open(filename) as f: with self.sftp.open(filename) as f:
return f.read() return f.read()
def put_file_contents(self, filename, contents): def put_file_contents(self, filename, contents):
"""Write the given string to the named remote file"""
self.log.info('WRITE %s', filename) self.log.info('WRITE %s', filename)
with self.sftp.open(filename, 'w') as f: with self.sftp.open(filename, 'w') as f:
return f.write(contents) return f.write(contents)

View File

@ -36,7 +36,7 @@ class TestSimpleReplication(IntegrationTest):
time.sleep(5) time.sleep(5)
result = self.replicas[0].run_command(['ipa', 'user-show', login]) result = self.replicas[0].run_command(['ipa', 'user-show', login])
assert 'User login: %s' % login in result.output assert 'User login: %s' % login in result.stdout_text
def test_user_replication_to_master(self): def test_user_replication_to_master(self):
login = 'testuser2' login = 'testuser2'
@ -48,4 +48,4 @@ class TestSimpleReplication(IntegrationTest):
time.sleep(5) time.sleep(5)
result = self.master.run_command(['ipa', 'user-show', login]) result = self.master.run_command(['ipa', 'user-show', login])
assert 'User login: %s' % login in result.output assert 'User login: %s' % login in result.stdout_text