Move function run_repeatedly to tasks module

https://pagure.io/freeipa/issue/6798
Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Milan Kubik <mkubik@redhat.com>
This commit is contained in:
Christian Heimes 2017-03-20 11:40:17 +01:00 committed by Tomas Krizek
parent 8867412adc
commit 8aadd55c93
No known key found for this signature in database
GPG Key ID: 22A2A94B5E49415A
3 changed files with 49 additions and 50 deletions

View File

@ -36,7 +36,6 @@ from ipapython import ipautil
from ipaplatform.paths import paths
from ipapython.dn import DN
from ipapython.ipa_log_manager import log_mgr
from ipatests.test_integration import util
from ipatests.pytest_plugins.integration.env_config import env_to_script
from ipatests.pytest_plugins.integration.host import Host
from ipalib import errors
@ -450,7 +449,7 @@ def install_adtrust(host):
dig_output = '0 100 389 %s.' % host.hostname
dig_test = lambda x: re.search(re.escape(dig_output), x)
util.run_repeatedly(host, dig_command, test=dig_test)
run_repeatedly(host, dig_command, test=dig_test)
def configure_dns_for_trust(master, ad):
@ -481,12 +480,12 @@ def establish_trust_with_ad(master, ad_domain, extra_args=()):
master.run_command(['klist'])
master.run_command(['smbcontrol', 'all', 'debug', '100'])
util.run_repeatedly(master,
['ipa', 'trust-add',
'--type', 'ad', ad_domain,
'--admin', 'Administrator',
'--password'] + list(extra_args),
stdin_text=master.config.ad_admin_password)
run_repeatedly(master,
['ipa', 'trust-add',
'--type', 'ad', ad_domain,
'--admin', 'Administrator',
'--password'] + list(extra_args),
stdin_text=master.config.ad_admin_password)
master.run_command(['smbcontrol', 'all', 'debug', '1'])
clear_sssd_cache(master)
master.run_command(['systemctl', 'restart', 'krb5kdc.service'])
@ -1246,3 +1245,43 @@ def restart_named(*args):
for host in args:
host.run_command(["systemctl", "restart", "named-pkcs11.service"])
time.sleep(20) # give a time to named to be ready (zone loading)
def run_repeatedly(host, command, assert_zero_rc=True, test=None,
timeout=30, **kwargs):
"""
Runs command on host repeatedly until it's finished successfully (returns
0 exit code and its stdout passes the test function).
Returns True if the command was executed succesfully, False otherwise.
This method accepts additional kwargs and passes these arguments
to the actual run_command method.
"""
time_waited = 0
time_step = 2
# Check that the test is a function
if test:
assert callable(test)
while(time_waited <= timeout):
result = host.run_command(command, raiseonerr=False, **kwargs)
return_code_ok = not assert_zero_rc or (result.returncode == 0)
test_ok = not test or test(result.stdout_text)
if return_code_ok and test_ok:
# Command successful
return True
else:
# Command not successful
time.sleep(time_step)
time_waited += time_step
raise AssertionError("Command: {cmd} repeatedly failed {times} times, "
"exceeding the timeout of {timeout} seconds."
.format(cmd=' '.join(command),
times=timeout // time_step,
timeout=timeout))

View File

@ -80,8 +80,8 @@ class ADTrustBase(IntegrationTest):
% dict(idauth=_sid_identifier_authority)
stdout_re = re.escape(' ipaNTSecurityIdentifier: ') + sid_regex
util.run_repeatedly(cls.master, command,
test=lambda x: re.search(stdout_re, x))
tasks.run_repeatedly(cls.master, command,
test=lambda x: re.search(stdout_re, x))
@classmethod
def configure_dns_and_time(cls):

View File

@ -17,51 +17,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import re
from ipaplatform.paths import paths
from ipalib.constants import DEFAULT_CONFIG
def run_repeatedly(host, command, assert_zero_rc=True, test=None,
timeout=30, **kwargs):
"""
Runs command on host repeatedly until it's finished successfully (returns
0 exit code and its stdout passes the test function).
Returns True if the command was executed succesfully, False otherwise.
This method accepts additional kwargs and passes these arguments
to the actual run_command method.
"""
time_waited = 0
time_step = 2
# Check that the test is a function
if test:
assert callable(test)
while(time_waited <= timeout):
result = host.run_command(command, raiseonerr=False, **kwargs)
return_code_ok = not assert_zero_rc or (result.returncode == 0)
test_ok = not test or test(result.stdout_text)
if return_code_ok and test_ok:
# Command successful
return True
else:
# Command not successful
time.sleep(time_step)
time_waited += time_step
raise AssertionError("Command: {cmd} repeatedly failed {times} times, "
"exceeding the timeout of {timeout} seconds."
.format(cmd=' '.join(command),
times=timeout // time_step,
timeout=timeout))
def get_host_ip_with_hostmask(host):
"""