test_integration: Use python-pytest-multihost

The core integration testing functionality was split into a separate
project. Use this project, and configure it for FreeIPA.

The "mh" (multihost) fixture is made available for integration tests.

Configuration based on environment variables is moved into a separate
module, to ease eventual deprecation.

Reviewed-By: Tomas Babej <tbabej@redhat.com>
This commit is contained in:
Petr Viktorin 2014-11-13 16:23:56 +01:00 committed by Tomas Babej
parent 8822be36d3
commit 74f7d67fd5
15 changed files with 524 additions and 1129 deletions

View File

@ -311,7 +311,7 @@ Requires: pytest >= 2.6
Requires: python-paste
Requires: python-coverage
Requires: python-polib
Requires: python-paramiko >= 1.7.7
Requires: python-pytest-multihost >= 0.2
Conflicts: %{alt_name}-tests
Obsoletes: %{alt_name}-tests < %{version}

View File

@ -25,7 +25,7 @@ import argparse
import json
from ipalib.constants import FQDN
from ipatests.test_integration import config
from ipatests.test_integration import config, env_config
def main(argv):
@ -92,7 +92,8 @@ def main(argv):
import yaml
return yaml.safe_dump(conf.to_dict(), default_flow_style=False)
else:
return config.env_to_script(get_object(conf, args).to_env(**kwargs))
env = get_object(conf, args).to_env(**kwargs)
return env_config.env_to_script(env)
def get_object(conf, args):

View File

@ -248,6 +248,8 @@ class TaskRunner(object):
args = self.get_parser().parse_args(argv)
self.config = config.Config.from_env(os.environ)
if not self.config:
raise EnvironmentError('Multihost environment not configured')
logs_to_collect = {}

View File

@ -24,10 +24,13 @@ import tempfile
import shutil
import pytest
from pytest_multihost import make_multihost_fixture
from ipapython import ipautil
from ipapython.ipa_log_manager import log_mgr
from ipatests.test_integration.config import get_global_config
from ipatests.test_integration import tasks
from ipatests.test_integration.config import Config
from ipatests.test_integration.env_config import get_global_config
log = log_mgr.get_logger(__name__)
@ -147,74 +150,86 @@ def integration_logs(class_integration_logs, request):
@pytest.yield_fixture(scope='class')
def integration_config(request, class_integration_logs):
"""Integration test Config object
def mh(request, class_integration_logs):
"""IPA's multihost fixture object
"""
cls = request.cls
def get_resources(resource_container, resource_str, num_needed):
if len(resource_container) < num_needed:
raise pytest.skip(
'Not enough %s available (have %s, need %s)' %
(resource_str, len(resource_container), num_needed))
return resource_container[:num_needed]
domain_description = {
'type': 'IPA',
'hosts': {
'master': 1,
'replica': cls.num_replicas,
'client': cls.num_replicas,
},
}
domain_description['hosts'].update(
{role: 1 for role in cls.required_extra_roles})
config = get_global_config()
if not config.domains:
raise pytest.skip('Integration testing not configured')
domain_descriptions = [domain_description]
for i in range(cls.num_ad_domains):
domain_descriptions.append({
'type': 'AD',
'hosts': {'ad': 1, 'ad_subdomain': 1},
})
mh = make_multihost_fixture(
request,
domain_descriptions,
config_class=Config,
_config=get_global_config(),
)
config = mh.config
mh.domain = mh.config.domains[0]
[mh.master] = mh.domain.hosts_by_role('master')
mh.replicas = mh.domain.hosts_by_role('replica')
mh.clients = mh.domain.hosts_by_role('client')
cls.logs_to_collect = class_integration_logs
cls.domain = config.domains[0]
# Check that we have enough resources available
cls.master = cls.domain.master
cls.replicas = get_resources(cls.domain.replicas, 'replicas',
cls.num_replicas)
cls.clients = get_resources(cls.domain.clients, 'clients',
cls.num_clients)
cls.ad_domains = get_resources(config.ad_domains, 'AD domains',
cls.num_ad_domains)
# Check that we have all required extra hosts at our disposal
available_extra_roles = [role for domain in cls.get_domains()
for role in domain.extra_roles]
missing_extra_roles = list(set(cls.required_extra_roles) -
set(available_extra_roles))
if missing_extra_roles:
raise pytest.skip("Not all required extra hosts available, "
"missing: %s, available: %s"
% (missing_extra_roles,
available_extra_roles))
def collect_log(host, filename):
log.info('Adding %s:%s to list of logs to collect' %
(host.external_hostname, filename))
class_integration_logs.setdefault(host, []).append(filename)
for host in cls.get_all_hosts():
print config
for host in config.get_all_hosts():
host.add_log_collector(collect_log)
cls.prepare_host(host)
cls.log.info('Preparing host %s', host.hostname)
tasks.prepare_host(host)
try:
cls.install()
except:
cls.uninstall()
raise
setup_class(cls, config)
mh._pytestmh_request.addfinalizer(lambda: teardown_class(cls))
yield config
yield mh.install()
for host in cls.get_all_hosts():
host.remove_log_collector(collect_log)
collect_test_logs(request.node, class_integration_logs, request.config)
try:
cls.uninstall()
finally:
del cls.master
del cls.replicas
del cls.clients
del cls.ad_domains
del cls.domain
def setup_class(cls, config):
"""Add convenience addributes to the test class
This is deprecated in favor of the mh fixture.
To be removed when no more tests using this.
"""
cls.domain = config.domains[0]
cls.master = cls.domain.master
cls.replicas = cls.domain.replicas
cls.clients = cls.domain.clients
cls.ad_domains = config.ad_domains
def teardown_class(cls):
"""Add convenience addributes to the test class
This is deprecated in favor of the mh fixture.
To be removed when no more tests using this.
"""
del cls.master
del cls.replicas
del cls.clients
del cls.ad_domains
del cls.domain

View File

@ -29,7 +29,7 @@ log = log_mgr.get_logger(__name__)
@ordered
@pytest.mark.usefixtures('integration_config')
@pytest.mark.usefixtures('mh')
@pytest.mark.usefixtures('integration_logs')
class IntegrationTest(object):
num_replicas = 0
@ -61,12 +61,7 @@ class IntegrationTest(object):
return [cls.domain] + cls.ad_domains
@classmethod
def prepare_host(cls, host):
cls.log.info('Preparing host %s', host.hostname)
tasks.prepare_host(host)
@classmethod
def install(cls):
def install(cls, mh):
if cls.topology is None:
return
else:
@ -77,7 +72,7 @@ class IntegrationTest(object):
pass
@classmethod
def uninstall(cls):
def uninstall(cls, mh):
tasks.uninstall_master(cls.master)
for replica in cls.replicas:
tasks.uninstall_master(replica)

View File

@ -20,412 +20,110 @@
"""Utilities for configuration of multi-master tests"""
import os
import collections
import random
import json
from ipapython import ipautil
import pytest_multihost.config
from ipapython.dn import DN
from ipapython.ipa_log_manager import log_mgr
from ipatests.test_integration.util import check_config_dict_empty
from ipatests.test_integration.util import TESTHOST_PREFIX
_SettingInfo = collections.namedtuple('Setting', 'name var_name default')
_setting_infos = (
# Directory on which test-specific files will be stored,
_SettingInfo('test_dir', 'IPATEST_DIR', '/root/ipatests'),
class Config(pytest_multihost.config.Config):
extra_init_args = {
'admin_name',
'admin_password',
'dirman_dn',
'dirman_password',
'nis_domain',
'ntp_server',
'ad_admin_name',
'ad_admin_password',
'dns_forwarder',
}
# File with root's private RSA key for SSH (default: ~/.ssh/id_rsa)
_SettingInfo('root_ssh_key_filename', 'IPA_ROOT_SSH_KEY', None),
# SSH password for root (used if root_ssh_key_filename is not set)
_SettingInfo('root_password', 'IPA_ROOT_SSH_PASSWORD', None),
_SettingInfo('admin_name', 'ADMINID', 'admin'),
_SettingInfo('admin_password', 'ADMINPW', 'Secret123'),
_SettingInfo('dirman_dn', 'ROOTDN', 'cn=Directory Manager'),
_SettingInfo('dirman_password', 'ROOTDNPWD', None),
# 8.8.8.8 is probably the best-known public DNS
_SettingInfo('dns_forwarder', 'DNSFORWARD', '8.8.8.8'),
_SettingInfo('nis_domain', 'NISDOMAIN', 'ipatest'),
_SettingInfo('ntp_server', 'NTPSERVER', None),
_SettingInfo('ad_admin_name', 'ADADMINID', 'Administrator'),
_SettingInfo('ad_admin_password', 'ADADMINPW', 'Secret123'),
_SettingInfo('ipv6', 'IPv6SETUP', False),
_SettingInfo('debug', 'IPADEBUG', False),
)
class Config(object):
def __init__(self, **kwargs):
self.log = log_mgr.get_logger(self)
kwargs.setdefault('test_dir', '/root/ipatests')
super(Config, self).__init__(**kwargs)
admin_password = kwargs.get('admin_password') or 'Secret123'
# This unfortunately duplicates information in _setting_infos,
# but is left here for the sake of static analysis.
self.test_dir = kwargs.get('test_dir', '/root/ipatests')
self.root_ssh_key_filename = kwargs.get('root_ssh_key_filename')
self.root_password = kwargs.get('root_password')
self.admin_name = kwargs.get('admin_name') or 'admin'
self.admin_password = admin_password
self.dirman_dn = DN(kwargs.get('dirman_dn') or 'cn=Directory Manager')
self.dirman_password = kwargs.get('dirman_password') or admin_password
self.dns_forwarder = kwargs.get('dns_forwarder') or '8.8.8.8'
self.nis_domain = kwargs.get('nis_domain') or 'ipatest'
self.ntp_server = str(kwargs.get('ntp_server') or (
'%s.pool.ntp.org' % random.randint(0, 3)))
self.ad_admin_name = kwargs.get('ad_admin_name') or 'Administrator'
self.ad_admin_password = kwargs.get('ad_admin_password') or 'Secret123'
self.ipv6 = bool(kwargs.get('ipv6', False))
self.debug = bool(kwargs.get('debug', False))
if not self.root_password and not self.root_ssh_key_filename:
self.root_ssh_key_filename = '~/.ssh/id_rsa'
# 8.8.8.8 is probably the best-known public DNS
self.dns_forwarder = kwargs.get('dns_forwarder') or '8.8.8.8'
self.debug = False
self.domains = []
def get_domain_class(self):
return Domain
def get_logger(self, name):
return log_mgr.get_logger(name)
@property
def ad_domains(self):
return filter(lambda d: d.type == 'AD', self.domains)
@classmethod
def from_dict(cls, dct):
kwargs = {s.name: dct.pop(s.name, s.default) for s in _setting_infos}
self = cls(**kwargs)
for domain_dict in dct.pop('domains'):
self.domains.append(Domain.from_dict(domain_dict, self))
check_config_dict_empty(dct, 'config')
return self
def get_all_hosts(self):
for domain in self.domains:
for host in domain.hosts:
yield host
def to_dict(self):
dct = {'domains': [d.to_dict() for d in self.domains]}
for setting in _setting_infos:
value = getattr(self, setting.name)
if isinstance(value, DN):
value = str(value)
dct[setting.name] = value
return dct
extra_args = self.extra_init_args - {'dirman_dn'}
result = super(Config, self).to_dict(extra_args)
result['dirman_dn'] = str(self.dirman_dn)
return result
@classmethod
def from_env(cls, env):
"""Create a test config from environment variables
from ipatests.test_integration.env_config import config_from_env
return config_from_env(env)
If IPATEST_YAML_CONFIG or IPATEST_JSON_CONFIG is set,
configuration is read from the named file.
For YAML, the PyYAML (python-yaml) library needs to be installed.
Otherwise, configuration is read from various curiously
named environment variables:
See _setting_infos for test-wide settings
MASTER_env1: FQDN of the master
REPLICA_env1: space-separated FQDNs of the replicas
CLIENT_env1: space-separated FQDNs of the clients
AD_env1: space-separated FQDNs of the Active Directories
OTHER_env1: space-separated FQDNs of other hosts
(same for _env2, _env3, etc)
BEAKERREPLICA1_IP_env1: IP address of replica 1 in env 1
(same for MASTER, CLIENT, or any extra defined ROLE)
For each machine that should be accessible to tests via extra roles,
the following environment variable is necessary:
TESTHOST_<role>_env1: FQDN of the machine with the extra role <role>
You can also optionally specify the IP address of the host:
BEAKER<role>_IP_env1: IP address of the machine of the extra role
The framework will try to resolve the hostname to its IP address
if not passed via this environment variable.
Also see env_normalize() for alternate variable names
"""
if 'IPATEST_YAML_CONFIG' in env:
import yaml
with open(env['IPATEST_YAML_CONFIG']) as file:
data = yaml.safe_load(file)
return cls.from_dict(data)
if 'IPATEST_JSON_CONFIG' in env:
with open(env['IPATEST_JSON_CONFIG']) as file:
data = json.load(file)
return cls.from_dict(data)
env_normalize(env)
kwargs = {s.name: env.get(s.var_name, s.default)
for s in _setting_infos}
# $IPv6SETUP needs to be 'TRUE' to enable ipv6
if isinstance(kwargs['ipv6'], basestring):
kwargs['ipv6'] = (kwargs['ipv6'].upper() == 'TRUE')
self = cls(**kwargs)
# Either IPA master or AD can define a domain
domain_index = 1
while (env.get('MASTER_env%s' % domain_index) or
env.get('AD_env%s' % domain_index)):
if env.get('MASTER_env%s' % domain_index):
# IPA domain takes precedence to AD domain in case of conflict
self.domains.append(Domain.from_env(env, self, domain_index,
domain_type='IPA'))
else:
self.domains.append(Domain.from_env(env, self, domain_index,
domain_type='AD'))
domain_index += 1
return self
def to_env(self, simple=True):
"""Convert this test config into environment variables"""
try:
env = collections.OrderedDict()
except AttributeError:
# Older Python versions
env = {}
for setting in _setting_infos:
value = getattr(self, setting.name)
if value in (None, False):
env[setting.var_name] = ''
elif value is True:
env[setting.var_name] = 'TRUE'
else:
env[setting.var_name] = str(value)
for domain in self.domains:
env_suffix = '_env%s' % (self.domains.index(domain) + 1)
env['DOMAIN%s' % env_suffix] = domain.name
env['RELM%s' % env_suffix] = domain.realm
env['BASEDN%s' % env_suffix] = str(domain.basedn)
for role in domain.roles:
hosts = domain.hosts_by_role(role)
prefix = ('' if role in domain.static_roles
else TESTHOST_PREFIX)
hostnames = ' '.join(h.hostname for h in hosts)
env['%s%s%s' % (prefix, role.upper(), env_suffix)] = hostnames
ext_hostnames = ' '.join(h.external_hostname for h in hosts)
env['BEAKER%s%s' % (role.upper(), env_suffix)] = ext_hostnames
ips = ' '.join(h.ip for h in hosts)
env['BEAKER%s_IP%s' % (role.upper(), env_suffix)] = ips
for i, host in enumerate(hosts, start=1):
suffix = '%s%s' % (role.upper(), i)
prefix = ('' if role in domain.static_roles
else TESTHOST_PREFIX)
ext_hostname = host.external_hostname
env['%s%s%s' % (prefix, suffix,
env_suffix)] = host.hostname
env['BEAKER%s%s' % (suffix, env_suffix)] = ext_hostname
env['BEAKER%s_IP%s' % (suffix, env_suffix)] = host.ip
if simple:
# Simple Vars for simplicity and backwards compatibility with older
# tests. This means no _env<NUM> suffix.
if self.domains:
default_domain = self.domains[0]
if default_domain.master:
env['MASTER'] = default_domain.master.hostname
env['BEAKERMASTER'] = default_domain.master.external_hostname
env['MASTERIP'] = default_domain.master.ip
if default_domain.replicas:
env['SLAVE'] = env['REPLICA'] = env['REPLICA_env1']
env['BEAKERSLAVE'] = env['BEAKERREPLICA_env1']
env['SLAVEIP'] = env['BEAKERREPLICA_IP_env1']
if default_domain.clients:
client = default_domain.clients[0]
env['CLIENT'] = client.hostname
env['BEAKERCLIENT'] = client.external_hostname
if len(default_domain.clients) >= 2:
client = default_domain.clients[1]
env['CLIENT2'] = client.hostname
env['BEAKERCLIENT2'] = client.external_hostname
return env
def host_by_name(self, name):
for domain in self.domains:
try:
return domain.host_by_name(name)
except LookupError:
pass
raise LookupError(name)
def to_env(self, **kwargs):
from ipatests.test_integration.env_config import config_to_env
return config_to_env(self, **kwargs)
def env_normalize(env):
"""Fill env variables from alternate variable names
MASTER_env1 <- MASTER
REPLICA_env1 <- REPLICA, SLAVE
CLIENT_env1 <- CLIENT
similarly for BEAKER* variants: BEAKERMASTER1_env1 <- BEAKERMASTER, etc.
CLIENT_env1 gets extended with CLIENT2 or CLIENT2_env1
"""
def coalesce(name, *other_names):
"""If name is not set, set it to first existing env[other_name]"""
if name not in env:
for other_name in other_names:
try:
env[name] = env[other_name]
except KeyError:
pass
else:
return
else:
env[name] = ''
coalesce('MASTER_env1', 'MASTER')
coalesce('REPLICA_env1', 'REPLICA', 'SLAVE')
coalesce('CLIENT_env1', 'CLIENT')
coalesce('BEAKERMASTER1_env1', 'BEAKERMASTER')
coalesce('BEAKERREPLICA1_env1', 'BEAKERREPLICA', 'BEAKERSLAVE')
coalesce('BEAKERCLIENT1_env1', 'BEAKERCLIENT')
def extend(name, name2):
value = env.get(name2)
if value and value not in env[name].split(' '):
env[name] += ' ' + value
extend('CLIENT_env1', 'CLIENT2')
extend('CLIENT_env1', 'CLIENT2_env1')
class Domain(object):
class Domain(pytest_multihost.config.Domain):
"""Configuration for an IPA / AD domain"""
def __init__(self, config, name, domain_type):
self.log = log_mgr.get_logger(self)
self.type = str(domain_type)
self.config = config
self.name = str(name)
self.hosts = []
assert domain_type in ('IPA', 'AD')
self.realm = self.name.upper()
self.basedn = DN(*(('dc', p) for p in name.split('.')))
@property
def roles(self):
return sorted(set(host.role for host in self.hosts))
@property
def static_roles(self):
# Specific roles for each domain type are hardcoded
if self.type == 'IPA':
return ('master', 'replica', 'client', 'other')
else:
elif self.type == 'AD':
return ('ad',)
@property
def extra_roles(self):
return [role for role in self.roles if role not in self.static_roles]
def _roles_from_env(self, env, env_suffix):
for role in self.static_roles:
yield role
# Extra roles are defined via env variables of form TESTHOST_key_envX
roles = set()
for var in sorted(env):
if var.startswith(TESTHOST_PREFIX) and var.endswith(env_suffix):
variable_split = var.split('_')
role_name = '_'.join(variable_split[1:-1])
if (role_name and not role_name[-1].isdigit()):
roles.add(role_name.lower())
for role in sorted(roles):
yield role
@classmethod
def from_dict(cls, dct, config):
from ipatests.test_integration.host import BaseHost
domain_type = dct.pop('type')
assert domain_type in ('IPA', 'AD')
domain_name = dct.pop('name')
self = cls(config, domain_name, domain_type)
for host_dict in dct.pop('hosts'):
host = BaseHost.from_dict(host_dict, self)
self.hosts.append(host)
check_config_dict_empty(dct, 'domain %s' % domain_name)
return self
def to_dict(self):
return {
'type': self.type,
'name': self.name,
'hosts': [h.to_dict() for h in self.hosts],
}
@classmethod
def from_env(cls, env, config, index, domain_type):
from ipatests.test_integration.host import BaseHost
# Roles available in the domain depend on the type of the domain
# Unix machines are added only to the IPA domains, Windows machines
# only to the AD domains
if domain_type == 'IPA':
master_role = 'MASTER'
else:
master_role = 'AD'
raise LookupError(self.type)
env_suffix = '_env%s' % index
def get_host_class(self, host_dict):
from ipatests.test_integration.host import Host, WinHost
master_env = '%s%s' % (master_role, env_suffix)
hostname, dot, domain_name = env[master_env].partition('.')
self = cls(config, domain_name, domain_type)
for role in self._roles_from_env(env, env_suffix):
prefix = '' if role in self.static_roles else TESTHOST_PREFIX
value = env.get('%s%s%s' % (prefix, role.upper(), env_suffix), '')
for host_index, hostname in enumerate(value.split(), start=1):
host = BaseHost.from_env(env, self, hostname, role,
host_index, index)
self.hosts.append(host)
if not self.hosts:
raise ValueError('No hosts defined for %s' % env_suffix)
return self
def to_env(self, **kwargs):
"""Return environment variables specific to this domain"""
env = self.config.to_env(**kwargs)
env['DOMAIN'] = self.name
env['RELM'] = self.realm
env['BASEDN'] = str(self.basedn)
return env
def host_by_role(self, role):
if self.hosts_by_role(role):
return self.hosts_by_role(role)[0]
if self.type == 'IPA':
return Host
elif self.type == 'AD':
return WinHost
else:
raise LookupError(role)
def hosts_by_role(self, role):
return [h for h in self.hosts if h.role == role]
raise LookupError(self.type)
@property
def master(self):
@ -451,17 +149,11 @@ class Domain(object):
def other_hosts(self):
return self.hosts_by_role('other')
def host_by_name(self, name):
for host in self.hosts:
if name in (host.hostname, host.external_hostname, host.shortname):
return host
raise LookupError(name)
@classmethod
def from_env(cls, env, config, index, domain_type):
from ipatests.test_integration.env_config import domain_from_env
return domain_from_env(env, config, index, domain_type)
def env_to_script(env):
return ''.join(['export %s=%s\n' % (key, ipautil.shell_quote(value))
for key, value in env.items()])
def get_global_config():
return Config.from_env(os.environ)
def to_env(self, **kwargs):
from ipatests.test_integration.env_config import domain_to_env
return domain_to_env(self, **kwargs)

View File

@ -0,0 +1,356 @@
# Authors:
# Petr Viktorin <pviktori@redhat.com>
# Tomas Babej <tbabej@redhat.com>
#
# Copyright (C) 2013 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Support for configuring multihost testing via environment variables
This is here to support tests configured for Beaker,
such as the ones at https://github.com/freeipa/tests/
"""
import os
import json
import collections
from ipapython import ipautil
from ipatests.test_integration.config import Config, Domain
TESTHOST_PREFIX = 'TESTHOST_'
_SettingInfo = collections.namedtuple('Setting', 'name var_name default')
_setting_infos = (
# Directory on which test-specific files will be stored,
_SettingInfo('test_dir', 'IPATEST_DIR', '/root/ipatests'),
# File with root's private RSA key for SSH (default: ~/.ssh/id_rsa)
_SettingInfo('ssh_key_filename', 'IPA_ROOT_SSH_KEY', None),
# SSH password for root (used if root_ssh_key_filename is not set)
_SettingInfo('ssh_password', 'IPA_ROOT_SSH_PASSWORD', None),
_SettingInfo('admin_name', 'ADMINID', 'admin'),
_SettingInfo('admin_password', 'ADMINPW', 'Secret123'),
_SettingInfo('dirman_dn', 'ROOTDN', 'cn=Directory Manager'),
_SettingInfo('dirman_password', 'ROOTDNPWD', None),
# 8.8.8.8 is probably the best-known public DNS
_SettingInfo('dns_forwarder', 'DNSFORWARD', '8.8.8.8'),
_SettingInfo('nis_domain', 'NISDOMAIN', 'ipatest'),
_SettingInfo('ntp_server', 'NTPSERVER', None),
_SettingInfo('ad_admin_name', 'ADADMINID', 'Administrator'),
_SettingInfo('ad_admin_password', 'ADADMINPW', 'Secret123'),
_SettingInfo('ipv6', 'IPv6SETUP', False),
_SettingInfo('debug', 'IPADEBUG', False),
)
def get_global_config(env=None):
"""Create a test config from environment variables
If env is None, uses os.environ; otherwise env is an environment dict.
If IPATEST_YAML_CONFIG or IPATEST_JSON_CONFIG is set,
configuration is read from the named file.
For YAML, the PyYAML (python-yaml) library needs to be installed.
Otherwise, configuration is read from various curiously
named environment variables:
See _setting_infos for test-wide settings
MASTER_env1: FQDN of the master
REPLICA_env1: space-separated FQDNs of the replicas
CLIENT_env1: space-separated FQDNs of the clients
AD_env1: space-separated FQDNs of the Active Directories
OTHER_env1: space-separated FQDNs of other hosts
(same for _env2, _env3, etc)
BEAKERREPLICA1_IP_env1: IP address of replica 1 in env 1
(same for MASTER, CLIENT, or any extra defined ROLE)
For each machine that should be accessible to tests via extra roles,
the following environment variable is necessary:
TESTHOST_<role>_env1: FQDN of the machine with the extra role <role>
You can also optionally specify the IP address of the host:
BEAKER<role>_IP_env1: IP address of the machine of the extra role
The framework will try to resolve the hostname to its IP address
if not passed via this environment variable.
Also see env_normalize() for alternate variable names
"""
if env is None:
env = os.environ
env = dict(env)
return config_from_env(env)
def config_from_env(env):
if 'IPATEST_YAML_CONFIG' in env:
import yaml
with open(env['IPATEST_YAML_CONFIG']) as file:
confdict = yaml.safe_load(file)
return Config.from_dict(confdict)
if 'IPATEST_JSON_CONFIG' in env:
with open(env['IPATEST_JSON_CONFIG']) as file:
confdict = json.load(file)
return Config.from_dict(confdict)
env_normalize(env)
kwargs = {s.name: env.get(s.var_name, s.default)
for s in _setting_infos}
kwargs['domains'] = []
# $IPv6SETUP needs to be 'TRUE' to enable ipv6
if isinstance(kwargs['ipv6'], basestring):
kwargs['ipv6'] = (kwargs['ipv6'].upper() == 'TRUE')
config = Config(**kwargs)
# Either IPA master or AD can define a domain
domain_index = 1
while (env.get('MASTER_env%s' % domain_index) or
env.get('AD_env%s' % domain_index)):
if env.get('MASTER_env%s' % domain_index):
# IPA domain takes precedence to AD domain in case of conflict
config.domains.append(domain_from_env(env, config, domain_index,
domain_type='IPA'))
else:
config.domains.append(domain_from_env(env, config, domain_index,
domain_type='AD'))
domain_index += 1
return config
def config_to_env(config, simple=True):
"""Convert this test config into environment variables"""
try:
env = collections.OrderedDict()
except AttributeError:
# Older Python versions
env = {}
for setting in _setting_infos:
value = getattr(config, setting.name)
if value in (None, False):
env[setting.var_name] = ''
elif value is True:
env[setting.var_name] = 'TRUE'
else:
env[setting.var_name] = str(value)
for domain in config.domains:
env_suffix = '_env%s' % (config.domains.index(domain) + 1)
env['DOMAIN%s' % env_suffix] = domain.name
env['RELM%s' % env_suffix] = domain.realm
env['BASEDN%s' % env_suffix] = str(domain.basedn)
for role in domain.roles:
hosts = domain.hosts_by_role(role)
prefix = ('' if role in domain.static_roles
else TESTHOST_PREFIX)
hostnames = ' '.join(h.hostname for h in hosts)
env['%s%s%s' % (prefix, role.upper(), env_suffix)] = hostnames
ext_hostnames = ' '.join(h.external_hostname for h in hosts)
env['BEAKER%s%s' % (role.upper(), env_suffix)] = ext_hostnames
ips = ' '.join(h.ip for h in hosts)
env['BEAKER%s_IP%s' % (role.upper(), env_suffix)] = ips
for i, host in enumerate(hosts, start=1):
suffix = '%s%s' % (role.upper(), i)
prefix = ('' if role in domain.static_roles
else TESTHOST_PREFIX)
ext_hostname = host.external_hostname
env['%s%s%s' % (prefix, suffix,
env_suffix)] = host.hostname
env['BEAKER%s%s' % (suffix, env_suffix)] = ext_hostname
env['BEAKER%s_IP%s' % (suffix, env_suffix)] = host.ip
if simple:
# Simple Vars for simplicity and backwards compatibility with older
# tests. This means no _env<NUM> suffix.
if config.domains:
default_domain = config.domains[0]
if default_domain.master:
env['MASTER'] = default_domain.master.hostname
env['BEAKERMASTER'] = default_domain.master.external_hostname
env['MASTERIP'] = default_domain.master.ip
if default_domain.replicas:
env['SLAVE'] = env['REPLICA'] = env['REPLICA_env1']
env['BEAKERSLAVE'] = env['BEAKERREPLICA_env1']
env['SLAVEIP'] = env['BEAKERREPLICA_IP_env1']
if default_domain.clients:
client = default_domain.clients[0]
env['CLIENT'] = client.hostname
env['BEAKERCLIENT'] = client.external_hostname
if len(default_domain.clients) >= 2:
client = default_domain.clients[1]
env['CLIENT2'] = client.hostname
env['BEAKERCLIENT2'] = client.external_hostname
return env
def env_normalize(env):
"""Fill env variables from alternate variable names
MASTER_env1 <- MASTER
REPLICA_env1 <- REPLICA, SLAVE
CLIENT_env1 <- CLIENT
similarly for BEAKER* variants: BEAKERMASTER1_env1 <- BEAKERMASTER, etc.
CLIENT_env1 gets extended with CLIENT2 or CLIENT2_env1
"""
def coalesce(name, *other_names):
"""If name is not set, set it to first existing env[other_name]"""
if name not in env:
for other_name in other_names:
try:
env[name] = env[other_name]
except KeyError:
pass
else:
return
else:
env[name] = ''
coalesce('MASTER_env1', 'MASTER')
coalesce('REPLICA_env1', 'REPLICA', 'SLAVE')
coalesce('CLIENT_env1', 'CLIENT')
coalesce('BEAKERMASTER1_env1', 'BEAKERMASTER')
coalesce('BEAKERREPLICA1_env1', 'BEAKERREPLICA', 'BEAKERSLAVE')
coalesce('BEAKERCLIENT1_env1', 'BEAKERCLIENT')
def extend(name, name2):
value = env.get(name2)
if value and value not in env[name].split(' '):
env[name] += ' ' + value
extend('CLIENT_env1', 'CLIENT2')
extend('CLIENT_env1', 'CLIENT2_env1')
def domain_from_env(env, config, index, domain_type):
# Roles available in the domain depend on the type of the domain
# Unix machines are added only to the IPA domains, Windows machines
# only to the AD domains
if domain_type == 'IPA':
master_role = 'MASTER'
else:
master_role = 'AD'
env_suffix = '_env%s' % index
master_env = '%s%s' % (master_role, env_suffix)
hostname, dot, domain_name = env[master_env].partition('.')
domain = Domain(config, domain_name, domain_type)
for role in _roles_from_env(domain, env, env_suffix):
prefix = '' if role in domain.static_roles else TESTHOST_PREFIX
value = env.get('%s%s%s' % (prefix, role.upper(), env_suffix), '')
for host_index, hostname in enumerate(value.split(), start=1):
host = host_from_env(env, domain, hostname, role,
host_index, index)
domain.hosts.append(host)
if not domain.hosts:
raise ValueError('No hosts defined for %s' % env_suffix)
return domain
def _roles_from_env(domain, env, env_suffix):
for role in domain.static_roles:
yield role
# Extra roles are defined via env variables of form TESTHOST_key_envX
roles = set()
for var in sorted(env):
if var.startswith(TESTHOST_PREFIX) and var.endswith(env_suffix):
variable_split = var.split('_')
role_name = '_'.join(variable_split[1:-1])
if (role_name and not role_name[-1].isdigit()):
roles.add(role_name.lower())
for role in sorted(roles):
yield role
def domain_to_env(domain, **kwargs):
"""Return environment variables specific to this domain"""
env = domain.config.to_env(**kwargs)
env['DOMAIN'] = domain.name
env['RELM'] = domain.realm
env['BASEDN'] = str(domain.basedn)
return env
def host_from_env(env, domain, hostname, role, index, domain_index):
ip = env.get('BEAKER%s%s_IP_env%s' %
(role.upper(), index, domain_index), None)
external_hostname = env.get(
'BEAKER%s%s_env%s' % (role.upper(), index, domain_index), None)
cls = domain.get_host_class({})
return cls(domain, hostname, role, ip, external_hostname)
def host_to_env(host, **kwargs):
"""Return environment variables specific to this host"""
env = host.domain.to_env(**kwargs)
index = host.domain.hosts.index(host) + 1
domain_index = host.config.domains.index(host.domain) + 1
role = host.role.upper()
if host.role != 'master':
role += str(index)
env['MYHOSTNAME'] = host.hostname
env['MYBEAKERHOSTNAME'] = host.external_hostname
env['MYIP'] = host.ip
prefix = ('' if host.role in host.domain.static_roles
else TESTHOST_PREFIX)
env_suffix = '_env%s' % domain_index
env['MYROLE'] = '%s%s%s' % (prefix, role, env_suffix)
env['MYENV'] = str(domain_index)
return env
def env_to_script(env):
return ''.join(['export %s=%s\n' % (key, ipautil.shell_quote(value))
for key, value in env.items()])

View File

@ -19,93 +19,13 @@
"""Host class for integration testing"""
import os
import socket
import pytest_multihost.host
from ipapython.ipaldap import IPAdmin
from ipapython import ipautil
from ipapython.ipa_log_manager import log_mgr
from ipatests.test_integration import transport
from ipatests.test_integration.util import check_config_dict_empty
from ipatests.test_integration.util import TESTHOST_PREFIX
class BaseHost(object):
class Host(pytest_multihost.host.Host):
"""Representation of a remote IPA host"""
transport_class = None
def __init__(self, domain, hostname, role, ip=None,
external_hostname=None):
self.domain = domain
self.role = str(role)
shortname, dot, ext_domain = hostname.partition('.')
self.shortname = shortname
self.hostname = (hostname[:-1]
if hostname.endswith('.')
else shortname + '.' + self.domain.name)
self.external_hostname = str(external_hostname or hostname)
self.netbios = self.domain.name.split('.')[0].upper()
self.logger_name = '%s.%s.%s' % (
self.__module__, type(self).__name__, shortname)
self.log = log_mgr.get_logger(self.logger_name)
if ip:
self.ip = str(ip)
else:
if self.config.ipv6:
# $(dig +short $M $rrtype|tail -1)
stdout, stderr, returncode = ipautil.run(
['dig', '+short', self.external_hostname, 'AAAA'])
self.ip = stdout.splitlines()[-1].strip()
else:
try:
self.ip = socket.gethostbyname(self.external_hostname)
except socket.gaierror:
self.ip = None
if not self.ip:
raise RuntimeError('Could not determine IP address of %s' %
self.external_hostname)
self.root_password = self.config.root_password
self.root_ssh_key_filename = self.config.root_ssh_key_filename
self.host_key = None
self.ssh_port = 22
self.env_sh_path = os.path.join(domain.config.test_dir, 'env.sh')
self.log_collectors = []
def __str__(self):
template = ('<{s.__class__.__name__} {s.hostname} ({s.role})>')
return template.format(s=self)
def __repr__(self):
template = ('<{s.__module__}.{s.__class__.__name__} '
'{s.hostname} ({s.role})>')
return template.format(s=self)
def add_log_collector(self, collector):
"""Register a log collector for this host"""
self.log_collectors.append(collector)
def remove_log_collector(self, collector):
"""Unregister a log collector"""
self.log_collectors.remove(collector)
@classmethod
def from_env(cls, env, domain, hostname, role, index, domain_index):
ip = env.get('BEAKER%s%s_IP_env%s' %
(role.upper(), index, domain_index), None)
external_hostname = env.get(
'BEAKER%s%s_env%s' % (role.upper(), index, domain_index), None)
return cls._make_host(domain, hostname, role, ip, external_hostname)
@staticmethod
def _make_host(domain, hostname, role, ip, external_hostname):
@ -120,84 +40,6 @@ class BaseHost(object):
return cls(domain, hostname, role, ip, external_hostname)
@classmethod
def from_dict(cls, dct, domain):
if isinstance(dct, basestring):
dct = {'name': dct}
try:
role = dct.pop('role').lower()
except KeyError:
role = domain.static_roles[0]
hostname = dct.pop('name')
if '.' not in hostname:
hostname = '.'.join((hostname, domain.name))
ip = dct.pop('ip', None)
external_hostname = dct.pop('external_hostname', None)
check_config_dict_empty(dct, 'host %s' % hostname)
return cls._make_host(domain, hostname, role, ip, external_hostname)
def to_dict(self):
return {
'name': str(self.hostname),
'ip': self.ip,
'role': self.role,
'external_hostname': self.external_hostname,
}
@property
def config(self):
return self.domain.config
def to_env(self, **kwargs):
"""Return environment variables specific to this host"""
env = self.domain.to_env(**kwargs)
index = self.domain.hosts.index(self) + 1
domain_index = self.config.domains.index(self.domain) + 1
role = self.role.upper()
if self.role != 'master':
role += str(index)
env['MYHOSTNAME'] = self.hostname
env['MYBEAKERHOSTNAME'] = self.external_hostname
env['MYIP'] = self.ip
prefix = ('' if self.role in self.domain.static_roles
else TESTHOST_PREFIX)
env_suffix = '_env%s' % domain_index
env['MYROLE'] = '%s%s%s' % (prefix, role, env_suffix)
env['MYENV'] = str(domain_index)
return env
@property
def transport(self):
try:
return self._transport
except AttributeError:
cls = self.transport_class
if cls:
# transport_class is None in the base class and must be
# set in subclasses.
# Pylint reports that calling None will fail
self._transport = cls(self) # pylint: disable=E1102
else:
raise NotImplementedError('transport class not available')
return self._transport
def get_file_contents(self, filename):
"""Shortcut for transport.get_file_contents"""
return self.transport.get_file_contents(filename)
def put_file_contents(self, filename, contents):
"""Shortcut for transport.put_file_contents"""
self.transport.put_file_contents(filename, contents)
def ldap_connect(self):
"""Return an LDAPClient authenticated to this host as directory manager
"""
@ -208,80 +50,20 @@ class BaseHost(object):
ldap.do_simple_bind(binddn, self.config.dirman_password)
return ldap
def collect_log(self, filename):
for collector in self.log_collectors:
collector(self, filename)
@classmethod
def from_env(cls, env, domain, hostname, role, index, domain_index):
from ipatests.test_integration.env_config import host_from_env
return host_from_env(env, domain, hostname, role, index, domain_index)
def run_command(self, argv, set_env=True, stdin_text=None,
log_stdout=True, raiseonerr=True,
cwd=None):
"""Run the given command on this host
Returns a Shell instance. The command will have already run in the
shell when this method returns, so its stdout_text, stderr_text, and
returncode attributes will be available.
:param argv: Command to run, as either a Popen-style list, or a string
containing a shell script
:param set_env: If true, env.sh exporting configuration variables will
be sourced before running the command.
:param stdin_text: If given, will be written to the command's stdin
:param log_stdout: If false, standard output will not be logged
(but will still be available as cmd.stdout_text)
:param raiseonerr: If true, an exception will be raised if the command
does not exit with return code 0
:param cwd: The working directory for the command
"""
raise NotImplementedError()
def to_env(self, **kwargs):
from ipatests.test_integration.env_config import host_to_env
return host_to_env(self, **kwargs)
class Host(BaseHost):
"""A Unix host"""
transport_class = transport.SSHTransport
def run_command(self, argv, set_env=True, stdin_text=None,
log_stdout=True, raiseonerr=True,
cwd=None):
# This will give us a Bash shell
command = self.transport.start_shell(argv, log_stdout=log_stdout)
# Set working directory
if cwd is None:
cwd = self.config.test_dir
command.stdin.write('cd %s\n' % ipautil.shell_quote(cwd))
# Set the environment
if set_env:
command.stdin.write('. %s\n' %
ipautil.shell_quote(self.env_sh_path))
command.stdin.write('set -e\n')
if isinstance(argv, basestring):
# Run a shell command given as a string
command.stdin.write('(')
command.stdin.write(argv)
command.stdin.write(')')
else:
# Run a command given as a popen-style list (no shell expansion)
for arg in argv:
command.stdin.write(ipautil.shell_quote(arg))
command.stdin.write(' ')
command.stdin.write(';exit\n')
if stdin_text:
command.stdin.write(stdin_text)
command.stdin.flush()
command.wait(raiseonerr=raiseonerr)
return command
class WinHost(BaseHost):
class WinHost(pytest_multihost.host.WinHost):
"""
Representation of a remote Windows host.
This serves as a sketch class once we move from manual preparation of
Active Directory to the automated setup.
"""
pass

View File

@ -34,7 +34,7 @@ from ipaplatform.paths import paths
from ipapython.dn import DN
from ipapython.ipa_log_manager import log_mgr
from ipatests.test_integration import util
from ipatests.test_integration.config import env_to_script
from ipatests.test_integration.env_config import env_to_script
from ipatests.test_integration.host import Host
log = log_mgr.get_logger(__name__)

View File

@ -24,7 +24,6 @@ import base64
import glob
import contextlib
import nose
import pytest
from ipalib import x509
from ipapython import ipautil
@ -67,7 +66,7 @@ def assert_error(result, stderr_text, returncode=None):
class CALessBase(IntegrationTest):
@classmethod
def install(cls):
def install(cls, mh):
super(CALessBase, cls).install()
cls.cert_dir = tempfile.mkdtemp(prefix="ipatest-")
cls.pem_filename = os.path.join(cls.cert_dir, 'root.pem')
@ -108,7 +107,7 @@ class CALessBase(IntegrationTest):
host.transport.put_file(source, dest)
@classmethod
def uninstall(cls):
def uninstall(cls, mh):
# Remove the NSS database
shutil.rmtree(cls.cert_dir)
@ -340,7 +339,7 @@ class CALessBase(IntegrationTest):
class TestServerInstall(CALessBase):
num_replicas = 0
def teardown(self):
def tearDown(self):
self.uninstall_server()
# Remove CA cert in /etc/pki/nssdb, in case of failed (un)install
@ -750,7 +749,7 @@ class TestServerInstall(CALessBase):
class TestReplicaInstall(CALessBase):
num_replicas = 1
def setup(self):
def setUp(self):
# Install the master for every test
self.export_pkcs12('ca1/server')
with open(self.pem_filename, 'w') as f:
@ -759,7 +758,7 @@ class TestReplicaInstall(CALessBase):
result = self.install_server()
assert result.returncode == 0
def teardown(self):
def tearDown(self):
# Uninstall both master and replica
replica = self.replicas[0]
tasks.kinit_admin(self.master)
@ -1162,19 +1161,25 @@ class TestIPACommands(CALessBase):
cls.test_hostname = 'testhost.%s' % cls.master.domain.name
cls.test_service = 'test/%s' % cls.test_hostname
@pytest.mark.parametrize('cmd', (
'cert-status',
'cert-show',
'cert-find',
'cert-revoke',
'cert-remove-hold',
'cert-status'))
def test_cert_commands_unavailable(self, cmd):
def check_ipa_command_not_available(self, command):
"Verify that the given IPA subcommand is not available"
result = self.master.run_command(['ipa', command], raiseonerr=False)
assert_error(result, "ipa: ERROR: unknown command '%s'" % command)
def test_cert_commands_unavailable(self):
for cmd in (
'cert-status',
'cert-show',
'cert-find',
'cert-revoke',
'cert-remove-hold',
'cert-status'):
func = lambda: self.check_ipa_command_not_available(cmd)
func.description = 'Verify that %s command is not available' % cmd
func.test_argument = cmd
yield (func, )
def test_cert_help_unavailable(self):
"Verify that cert plugin help is not available"
result = self.master.run_command(['ipa', 'help', 'cert'],
@ -1241,7 +1246,7 @@ class TestIPACommands(CALessBase):
class TestCertinstall(CALessBase):
@classmethod
def install(cls):
def install(cls, mh):
super(TestCertinstall, cls).install()
cls.export_pkcs12('ca1/server')

View File

@ -35,7 +35,7 @@ class TestForcedClientReenrollment(IntegrationTest):
num_clients = 1
@classmethod
def install(cls):
def install(cls, mh):
super(TestForcedClientReenrollment, cls).install()
tasks.install_master(cls.master)
tasks.install_replica(cls.master, cls.replicas[0], setup_ca=False)
@ -44,11 +44,11 @@ class TestForcedClientReenrollment(IntegrationTest):
'krb5.keytab'
)
def setup(self):
def setUp(self):
tasks.prepare_host(self.clients[0])
tasks.install_client(self.master, self.clients[0])
def teardown(self):
def tearDown(self):
tasks.uninstall_client(self.clients[0])
self.delete_client_host_entry()

View File

@ -27,13 +27,13 @@ from ipatests.util import assert_deepequal
DEFAULT_OUTPUT_DICT = {
"nis_domain": "ipatest",
"test_dir": "/root/ipatests",
"debug": False,
"ad_admin_name": "Administrator",
"ipv6": False,
"root_ssh_key_filename": "~/.ssh/id_rsa",
"ssh_key_filename": "~/.ssh/id_rsa",
"ssh_username": "root",
"admin_name": "admin",
"ad_admin_password": "Secret123",
"root_password": None,
"ssh_password": None,
"dns_forwarder": "8.8.8.8",
"domains": [],
"dirman_dn": "cn=Directory Manager",

View File

@ -33,7 +33,7 @@ class ADTrustBase(IntegrationTest):
optional_extra_roles = ['ad_subdomain']
@classmethod
def install(cls):
def install(cls, mh):
super(ADTrustBase, cls).install()
cls.ad = cls.ad_domains[0].ads[0]
cls.install_adtrust()

View File

@ -1,443 +0,0 @@
# Authors:
# Petr Viktorin <pviktori@redhat.com>
#
# Copyright (C) 2013 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Objects for communicating with remote hosts
This class defines "SSHTransport" as ParamikoTransport (by default), or as
OpenSSHTransport (if Paramiko is not importable, or the IPA_TEST_SSH_TRANSPORT
environment variable is set to "openssh").
"""
import os
import socket
import threading
import subprocess
from contextlib import contextmanager
import errno
from ipapython.ipa_log_manager import log_mgr
from ipatests import util
try:
import paramiko
have_paramiko = True
except ImportError:
have_paramiko = False
class Transport(object):
"""Mechanism for communicating with remote hosts
The Transport can manipulate files on a remote host, and open a Command.
The base class defines an interface that specific subclasses implement.
"""
def __init__(self, host):
self.host = host
self.logger_name = '%s.%s' % (host.logger_name, type(self).__name__)
self.log = log_mgr.get_logger(self.logger_name)
self._command_index = 0
def get_file_contents(self, filename):
"""Read the named remote file and return the contents as a string"""
raise NotImplementedError('Transport.get_file_contents')
def put_file_contents(self, filename, contents):
"""Write the given string to the named remote file"""
raise NotImplementedError('Transport.put_file_contents')
def file_exists(self, filename):
"""Return true if the named remote file exists"""
raise NotImplementedError('Transport.file_exists')
def mkdir(self, path):
"""Make the named directory"""
raise NotImplementedError('Transport.mkdir')
def start_shell(self, argv, log_stdout=True):
"""Start a Shell
:param argv: The command this shell is intended to run (used for
logging only)
:param log_stdout: If false, the stdout will not be logged (useful when
binary output is expected)
Given a `shell` from this method, the caller can then use
``shell.stdin.write()`` to input any command(s), call ``shell.wait()``
to let the command run, and then inspect ``returncode``,
``stdout_text`` or ``stderr_text``.
"""
raise NotImplementedError('Transport.start_shell')
def mkdir_recursive(self, path):
"""`mkdir -p` on the remote host"""
if not self.file_exists(path):
parent_path = os.path.dirname(path)
if path != parent_path:
self.mkdir_recursive(parent_path)
self.mkdir(path)
def get_file(self, remotepath, localpath):
"""Copy a file from the remote host to a local file"""
contents = self.get_file_contents(remotepath)
with open(localpath, 'wb') as local_file:
local_file.write(contents)
def put_file(self, localpath, remotepath):
"""Copy a local file to the remote host"""
with open(localpath, 'rb') as local_file:
contents = local_file.read()
self.put_file_contents(remotepath, contents)
def get_next_command_logger_name(self):
self._command_index += 1
return '%s.cmd%s' % (self.host.logger_name, self._command_index)
class Command(object):
"""A Popen-style object representing a remote command
Instances of this class should only be created via method of a concrete
Transport, such as start_shell.
The standard error and output are handled by this class. They're not
available for file-like reading, and are logged by default.
To make sure reading doesn't stall after one buffer fills up, they are read
in parallel using threads.
After calling wait(), ``stdout_text`` and ``stderr_text`` attributes will
be strings containing the output, and ``returncode`` will contain the
exit code.
"""
def __init__(self, argv, logger_name=None, log_stdout=True):
self.returncode = None
self.argv = argv
self._done = False
if logger_name:
self.logger_name = logger_name
else:
self.logger_name = '%s.%s' % (self.__module__, type(self).__name__)
self.log = log_mgr.get_logger(self.logger_name)
def wait(self, raiseonerr=True):
"""Wait for the remote process to exit
Raises an excption if the exit code is not 0, unless raiseonerr is
true.
"""
if self._done:
return self.returncode
self._end_process()
self._done = True
if raiseonerr and self.returncode:
self.log.error('Exit code: %s', self.returncode)
raise subprocess.CalledProcessError(self.returncode, self.argv)
else:
self.log.debug('Exit code: %s', self.returncode)
return self.returncode
def _end_process(self):
"""Wait until the process exits and output is received, close channel
Called from wait()
"""
raise NotImplementedError()
class ParamikoTransport(Transport):
"""Transport that uses the Paramiko SSH2 library"""
def __init__(self, host):
super(ParamikoTransport, self).__init__(host)
sock = socket.create_connection((host.external_hostname,
host.ssh_port))
self._transport = transport = paramiko.Transport(sock)
transport.connect(hostkey=host.host_key)
if host.root_ssh_key_filename:
self.log.debug('Authenticating with private RSA key')
filename = os.path.expanduser(host.root_ssh_key_filename)
key = paramiko.RSAKey.from_private_key_file(filename)
transport.auth_publickey(username='root', key=key)
elif host.root_password:
self.log.debug('Authenticating with password')
transport.auth_password(username='root',
password=host.root_password)
else:
self.log.critical('No SSH credentials configured')
raise RuntimeError('No SSH credentials configured')
@contextmanager
def sftp_open(self, filename, mode='r'):
"""Context manager that provides a file-like object over a SFTP channel
This provides compatibility with older Paramiko versions.
(In Paramiko 1.10+, file objects from `sftp.open` are directly usable
as context managers).
"""
file = self.sftp.open(filename, mode)
try:
yield file
finally:
file.close()
@property
def sftp(self):
"""Paramiko SFTPClient connected to this host"""
try:
return self._sftp
except AttributeError:
transport = self._transport
self._sftp = paramiko.SFTPClient.from_transport(transport)
return self._sftp
def get_file_contents(self, filename):
"""Read the named remote file and return the contents as a string"""
self.log.debug('READ %s', filename)
with self.sftp_open(filename) as f:
return f.read()
def put_file_contents(self, filename, contents):
"""Write the given string to the named remote file"""
self.log.info('WRITE %s', filename)
with self.sftp_open(filename, 'w') as f:
f.write(contents)
def file_exists(self, filename):
"""Return true if the named remote file exists"""
self.log.debug('STAT %s', filename)
try:
self.sftp.stat(filename)
except IOError, e:
if e.errno == errno.ENOENT:
return False
else:
raise
return True
def mkdir(self, path):
self.log.info('MKDIR %s', path)
self.sftp.mkdir(path)
def start_shell(self, argv, log_stdout=True):
logger_name = self.get_next_command_logger_name()
ssh = self._transport.open_channel('session')
self.log.info('RUN %s', argv)
return SSHCommand(ssh, argv, logger_name=logger_name,
log_stdout=log_stdout)
def get_file(self, remotepath, localpath):
self.log.debug('GET %s', remotepath)
self.sftp.get(remotepath, localpath)
def put_file(self, localpath, remotepath):
self.log.info('PUT %s', remotepath)
self.sftp.put(localpath, remotepath)
class OpenSSHTransport(Transport):
"""Transport that uses the `ssh` binary"""
def __init__(self, host):
super(OpenSSHTransport, self).__init__(host)
self.control_dir = util.TempDir()
self.ssh_argv = self._get_ssh_argv()
# Run a "control master" process. This serves two purposes:
# - Establishes a control socket; other SSHs will connect to it
# and reuse the same connection. This way the slow handshake
# only needs to be done once
# - Writes the host to known_hosts so stderr of "real" connections
# doesn't contain the "unknown host" warning
# Popen closes the stdin pipe when it's garbage-collected, so
# this process will exit when it's no longer needed
command = ['-o', 'ControlMaster=yes', '/usr/bin/cat']
self.control_master = self._run(command, collect_output=False)
def _get_ssh_argv(self):
"""Return the path to SSH and options needed for every call"""
control_file = os.path.join(self.control_dir.path, 'control')
known_hosts_file = os.path.join(self.control_dir.path, 'known_hosts')
argv = ['ssh',
'-l', 'root',
'-o', 'ControlPath=%s' % control_file,
'-o', 'StrictHostKeyChecking=no',
'-o', 'UserKnownHostsFile=%s' % known_hosts_file]
if self.host.root_ssh_key_filename:
key_filename = os.path.expanduser(self.host.root_ssh_key_filename)
argv.extend(['-i', key_filename])
elif self.host.root_password:
self.log.critical('Password authentication not supported')
raise RuntimeError('Password authentication not supported')
else:
self.log.critical('No SSH credentials configured')
raise RuntimeError('No SSH credentials configured')
argv.append(self.host.external_hostname)
self.log.debug('SSH invocation: %s', argv)
return argv
def start_shell(self, argv, log_stdout=True):
self.log.info('RUN %s', argv)
command = self._run(['bash'], argv=argv, log_stdout=log_stdout)
return command
def _run(self, command, log_stdout=True, argv=None, collect_output=True):
"""Run the given command on the remote host
:param command: Command to run (appended to the common SSH invocation)
:param log_stdout: If false, stdout will not be logged
:param argv: Command to log (if different from ``command``
:param collect_output: If false, no output will be collected
"""
if argv is None:
argv = command
logger_name = self.get_next_command_logger_name()
ssh = SSHCallWrapper(self.ssh_argv + list(command))
return SSHCommand(ssh, argv, logger_name, log_stdout=log_stdout,
collect_output=collect_output)
def file_exists(self, path):
self.log.info('STAT %s', path)
cmd = self._run(['ls', path], log_stdout=False)
cmd.wait(raiseonerr=False)
return cmd.returncode == 0
def mkdir(self, path):
self.log.info('MKDIR %s', path)
cmd = self._run(['mkdir', path])
cmd.wait()
def put_file_contents(self, filename, contents):
self.log.info('PUT %s', filename)
cmd = self._run(['tee', filename], log_stdout=False)
cmd.stdin.write(contents)
cmd.wait()
assert cmd.stdout_text == contents
def get_file_contents(self, filename):
self.log.info('GET %s', filename)
cmd = self._run(['cat', filename], log_stdout=False)
cmd.wait(raiseonerr=False)
if cmd.returncode == 0:
return cmd.stdout_text
else:
raise IOError('File %r could not be read' % filename)
class SSHCallWrapper(object):
"""Adapts a /usr/bin/ssh call to the paramiko.Channel interface
This only wraps what SSHCommand needs.
"""
def __init__(self, command):
self.command = command
def invoke_shell(self):
self.command = subprocess.Popen(
self.command,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def makefile(self, mode):
return {
'wb': self.command.stdin,
'rb': self.command.stdout,
}[mode]
def makefile_stderr(self, mode):
assert mode == 'rb'
return self.command.stderr
def shutdown_write(self):
self.command.stdin.close()
def recv_exit_status(self):
return self.command.wait()
def close(self):
return self.command.wait()
class SSHCommand(Command):
"""Command implementation for ParamikoTransport and OpenSSHTranspport"""
def __init__(self, ssh, argv, logger_name, log_stdout=True,
collect_output=True):
super(SSHCommand, self).__init__(argv, logger_name,
log_stdout=log_stdout)
self._stdout_lines = []
self._stderr_lines = []
self.running_threads = set()
self._ssh = ssh
self.log.debug('RUN %s', argv)
self._ssh.invoke_shell()
stdin = self.stdin = self._ssh.makefile('wb')
stdout = self._ssh.makefile('rb')
stderr = self._ssh.makefile_stderr('rb')
if collect_output:
self._start_pipe_thread(self._stdout_lines, stdout, 'out',
log_stdout)
self._start_pipe_thread(self._stderr_lines, stderr, 'err', True)
def _end_process(self, raiseonerr=True):
self._ssh.shutdown_write()
while self.running_threads:
self.running_threads.pop().join()
self.stdout_text = ''.join(self._stdout_lines)
self.stderr_text = ''.join(self._stderr_lines)
self.returncode = self._ssh.recv_exit_status()
self._ssh.close()
def _start_pipe_thread(self, result_list, stream, name, do_log=True):
"""Start a thread that copies lines from ``stream`` to ``result_list``
If do_log is true, also logs the lines under ``name``
The thread is added to ``self.running_threads``.
"""
log = log_mgr.get_logger('%s.%s' % (self.logger_name, name))
def read_stream():
for line in stream:
if do_log:
log.debug(line.rstrip('\n'))
result_list.append(line)
thread = threading.Thread(target=read_stream)
self.running_threads.add(thread)
thread.start()
return thread
if not have_paramiko or os.environ.get('IPA_TEST_SSH_TRANSPORT') == 'openssh':
SSHTransport = OpenSSHTransport
else:
SSHTransport = ParamikoTransport

View File

@ -20,16 +20,6 @@
import time
TESTHOST_PREFIX = 'TESTHOST_'
def check_config_dict_empty(dct, name):
"""Ensure that no keys are left in a configuration dict"""
if dct:
raise ValueError('Extra keys in confuguration for %s: %s' %
(name, ', '.join(dct)))
def run_repeatedly(host, command, assert_zero_rc=True, test=None,
timeout=30, **kwargs):
"""