Add certmonger wait_for_request that uses run_command

Add a little utility function to get the certmonger status
of a request id on a particular host and wait until it is either
failed on the CA or issued (or times out).

Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
Mohammad Rizwan Yusuf
2020-02-21 12:06:09 +05:30
committed by Rob Crittenden
parent fe21094c8e
commit 8067954229

View File

@@ -25,7 +25,6 @@ from ipaplatform.osinfo import osinfo
from ipaplatform.paths import paths from ipaplatform.paths import paths
from ipaplatform.tasks import tasks as platformtasks from ipaplatform.tasks import tasks as platformtasks
from ipapython import ipautil from ipapython import ipautil
from ipalib.install.certmonger import wait_for_request
from ipatests.pytest_ipa.integration import tasks from ipatests.pytest_ipa.integration import tasks
from ipatests.pytest_ipa.integration.env_config import get_global_config from ipatests.pytest_ipa.integration.env_config import get_global_config
from ipatests.test_integration.base import IntegrationTest from ipatests.test_integration.base import IntegrationTest
@@ -70,6 +69,25 @@ def server_install_setup(func):
return wrapped return wrapped
def wait_for_request(host, request_id, timeout=120):
for _i in range(0, timeout, 5):
result = host.run_command(
"getcert list -i %s | grep status: | awk '{ print $2 }'" %
request_id
)
state = result.stdout_text.strip()
print("certmonger request is in state %r", state)
if state in ('CA_REJECTED', 'CA_UNREACHABLE', 'CA_UNCONFIGURED',
'NEED_GUIDANCE', 'NEED_CA', 'MONITORING'):
break
time.sleep(5)
else:
raise RuntimeError("request timed out")
return state
class InstallTestBase1(IntegrationTest): class InstallTestBase1(IntegrationTest):
num_replicas = 3 num_replicas = 3
@@ -335,14 +353,14 @@ class TestInstallCA(IntegrationTest):
request_id = re.findall(r'\d+', result.stdout_text) request_id = re.findall(r'\d+', result.stdout_text)
# check if certificate is tracked by certmonger # check if certificate is tracked by certmonger
status = wait_for_request(request_id[0], 300) status = wait_for_request(self.master, request_id[0], 300)
assert status == "MONITORING" assert status == "MONITORING"
# ensure if key and token are re-usable # ensure if key and token are re-usable
cmd_args = ['getcert', 'resubmit', '-i', request_id[0]] cmd_args = ['getcert', 'resubmit', '-i', request_id[0]]
self.master.run_command(cmd_args) self.master.run_command(cmd_args)
status = wait_for_request(request_id[0], 300) status = wait_for_request(self.master, request_id[0], 300)
assert status == "MONITORING" assert status == "MONITORING"