Extend Sub CA replication test

Test more scenarios like replication replica -> master. Verify that master
and replica have all expected certs with correct trust flags and all keys.

See: https://pagure.io/freeipa/issue/7590
See: https://pagure.io/freeipa/issue/7589
Fixes: https://pagure.io/freeipa/issue/7611
Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Rob Crittenden <rcritten@redhat.com>
Reviewed-By: Stanislav Laznicka <slaznick@redhat.com>
This commit is contained in:
Christian Heimes 2018-06-20 11:09:35 +02:00
parent dcaa62f6a4
commit 6896c90eb2
3 changed files with 175 additions and 41 deletions

View File

@ -219,3 +219,14 @@ jobs:
timeout: 3600
topology: *master_1repl_1client
fedora-28/replica_promotion:
requires: [fedora-28/build]
priority: 50
job:
class: RunPytest
args:
build_url: '{fedora-28/build_url}'
test_suite: test_integration/test_replica_promotion.py::TestSubCAkeyReplication
template: *ci-master-f28
timeout: 3600
topology: *master_1repl

View File

@ -205,6 +205,17 @@ def verify_kdc_cert_validity(kdc_cert, ca_certs, realm):
raise ValueError("invalid for realm %s" % realm)
CERT_RE = re.compile(
r'^(?P<nick>.+?)\s+(?P<flags>\w*,\w*,\w*)\s*$'
)
KEY_RE = re.compile(
r'^<\s*(?P<slot>\d+)>'
r'\s+(?P<algo>\w+)'
r'\s+(?P<keyid>[0-9a-z]+)'
r'\s+(?P<nick>.*?)\s*$'
)
class NSSDatabase(object):
"""A general-purpose wrapper around a NSS cert database
@ -465,10 +476,10 @@ class NSSDatabase(object):
# FIXME, this relies on NSS never changing the formatting of certutil
certlist = []
for cert in certs:
match = re.match(r'^(.+?)\s+(\w*,\w*,\w*)\s*$', cert)
match = CERT_RE.match(cert)
if match:
nickname = match.group(1)
trust_flags = parse_trust_flags(match.group(2))
nickname = match.group('nick')
trust_flags = parse_trust_flags(match.group('flags'))
certlist.append((nickname, trust_flags))
return tuple(certlist)
@ -481,10 +492,14 @@ class NSSDatabase(object):
return ()
keylist = []
for line in result.output.splitlines():
mo = re.match(r'^<\s*(\d+)>\s+(\w+)\s+([0-9a-z]+)\s+(.*)$', line)
mo = KEY_RE.match(line)
if mo is not None:
slot, algo, keyid, nick = mo.groups()
keylist.append((int(slot), algo, keyid, nick.strip()))
keylist.append((
int(mo.group('slot')),
mo.group('algo'),
mo.group('keyid'),
mo.group('nick'),
))
return tuple(keylist)
def find_server_certs(self):

View File

@ -17,6 +17,7 @@ from ipatests.pytest_plugins.integration.env_config import get_global_config
from ipalib.constants import (
DOMAIN_LEVEL_0, DOMAIN_LEVEL_1, DOMAIN_SUFFIX_NAME, IPA_CA_NICKNAME)
from ipaplatform.paths import paths
from ipapython import certdb
config = get_global_config()
@ -459,10 +460,6 @@ class TestRenewalMaster(IntegrationTest):
topology = 'star'
num_replicas = 1
@classmethod
def uninstall(cls, mh):
super(TestRenewalMaster, cls).uninstall(mh)
def assertCARenewalMaster(self, host, expected):
""" Ensure there is only one CA renewal master set """
result = host.run_command(["ipa", "config-show"]).stdout_text
@ -509,7 +506,6 @@ class TestRenewalMaster(IntegrationTest):
self.assertCARenewalMaster(replica, replica.hostname)
def test_renewal_master_with_csreplica_manage(self):
master = self.master
replica = self.replicas[0]
@ -593,6 +589,12 @@ class TestReplicaInstallWithExistingEntry(IntegrationTest):
tasks.install_replica(master, replica)
AUTH_ID_RE = re.compile(
'Authority ID: '
'([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})'
)
class TestSubCAkeyReplication(IntegrationTest):
"""
Test if subca key replication is not failing.
@ -600,43 +602,145 @@ class TestSubCAkeyReplication(IntegrationTest):
topology = 'line'
num_replicas = 1
SUBCA = 'test_subca'
SUBCA_CN = 'cn=' + SUBCA
SUBCA_DESC = 'subca'
SUBCA_MASTER = 'test_subca_master'
SUBCA_MASTER_CN = 'cn=' + SUBCA_MASTER
SUBCA_REPLICA = 'test_subca_replica'
SUBCA_REPLICA_CN = 'cn=' + SUBCA_REPLICA
PKI_DEBUG_PATH = '/var/log/pki/pki-tomcat/ca/debug'
ERR_MESS = 'Caught exception during cert/key import'
def test_sub_ca_key_replication(self):
SERVER_CERT_NICK = 'Server-Cert cert-pki-ca'
SERVER_KEY_NICK = 'NSS Certificate DB:Server-Cert cert-pki-ca'
EXPECTED_CERTS = {
IPA_CA_NICKNAME: 'CTu,Cu,Cu',
'ocspSigningCert cert-pki-ca': 'u,u,u',
'subsystemCert cert-pki-ca': 'u,u,u',
'auditSigningCert cert-pki-ca': 'u,u,Pu',
SERVER_CERT_NICK: 'u,u,u',
}
def add_subca(self, host, name, subject):
result = host.run_command([
'ipa', 'ca-add', name,
'--subject', subject,
'--desc', self.SUBCA_DESC,
])
auth_id = "".join(re.findall(AUTH_ID_RE, result.stdout_text))
return '{} {}'.format(IPA_CA_NICKNAME, auth_id)
def check_subca(self, host, name, cert_nick):
host.run_command(['ipa', 'ca-show', name])
tasks.run_certutil(
host, ['-L', '-n', cert_nick], paths.PKI_TOMCAT_ALIAS_DIR
)
host.run_command([
paths.CERTUTIL, '-d', paths.PKI_TOMCAT_ALIAS_DIR,
'-f', paths.PKI_TOMCAT_ALIAS_PWDFILE_TXT,
'-K', '-n', cert_nick
])
def get_certinfo(self, host):
result = tasks.run_certutil(
host,
['-L', '-f', paths.PKI_TOMCAT_ALIAS_PWDFILE_TXT],
paths.PKI_TOMCAT_ALIAS_DIR
)
certs = {}
for line in result.stdout_text.splitlines():
mo = certdb.CERT_RE.match(line)
if mo:
certs[mo.group('nick')] = mo.group('flags')
result = tasks.run_certutil(
host,
['-K', '-f', paths.PKI_TOMCAT_ALIAS_PWDFILE_TXT],
paths.PKI_TOMCAT_ALIAS_DIR
)
keys = {}
for line in result.stdout_text.splitlines():
mo = certdb.KEY_RE.match(line)
if mo:
keys[mo.group('nick')] = mo.group('keyid')
return certs, keys
def check_certdb(self, master, replica):
expected_certs = self.EXPECTED_CERTS.copy()
# find and add sub CAs to expected certs
result = master.run_command(
['ipa', 'ca-find', '--desc', self.SUBCA_DESC]
)
for auth_id in re.findall(AUTH_ID_RE, result.stdout_text):
nick = '{} {}'.format(IPA_CA_NICKNAME, auth_id)
expected_certs[nick] = 'u,u,u'
# expected keys, server key has different name
expected_keys = set(expected_certs)
expected_keys.remove(self.SERVER_CERT_NICK)
expected_keys.add(self.SERVER_KEY_NICK)
# get certs and keys from Dogtag's NSSDB
master_certs, master_keys = self.get_certinfo(master)
replica_certs, replica_keys = self.get_certinfo(replica)
assert master_certs == expected_certs
assert replica_certs == expected_certs
assert set(master_keys) == expected_keys
assert set(replica_keys) == expected_keys
# server keys are different
master_server_key = master_keys.pop(self.SERVER_KEY_NICK)
replica_server_key = replica_keys.pop(self.SERVER_KEY_NICK)
assert master_server_key != replica_server_key
# but key ids of other keys are equal
assert master_keys == replica_keys
def check_pki_error(self, host):
pki_log_filename = "{0}.{1}.log".format(
self.PKI_DEBUG_PATH,
time.strftime("%Y-%m-%d")
)
pki_debug_log = host.get_file_contents(
pki_log_filename, encoding='utf-8'
)
# check for cert/key import error message
assert self.ERR_MESS not in pki_debug_log
def test_subca_master(self):
master = self.master
replica = self.replicas[0]
result = master.run_command(['ipa', 'ca-add', self.SUBCA, '--subject',
self.SUBCA_CN])
uuid = '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'
auth_id_re = re.compile('Authority ID: ({})'.format(uuid),
re.IGNORECASE)
auth_id = "".join(re.findall(auth_id_re, result.stdout_text))
cert_nick = '{} {}'.format(IPA_CA_NICKNAME, auth_id)
master_nick = self.add_subca(
master, self.SUBCA_MASTER, self.SUBCA_MASTER_CN
)
# give replication some time
time.sleep(30)
time.sleep(15)
replica.run_command(['ipa-certupdate'])
replica.run_command(['ipa', 'ca-show', self.SUBCA])
self.check_subca(master, self.SUBCA_MASTER, master_nick)
self.check_subca(replica, self.SUBCA_MASTER, master_nick)
self.check_pki_error(replica)
self.check_certdb(master, replica)
tasks.run_certutil(replica, ['-L', '-n', cert_nick],
paths.PKI_TOMCAT_ALIAS_DIR)
def test_subca_replica(self):
master = self.master
replica = self.replicas[0]
pki_log_filename = ("{path}.{date}.log"
.format(path=self.PKI_DEBUG_PATH,
date=time.strftime("%Y-%m-%d")))
pki_debug_log = replica.get_file_contents(pki_log_filename,
encoding='utf-8')
# check for cert/key import error message
assert self.ERR_MESS not in pki_debug_log
replica_nick = self.add_subca(
replica, self.SUBCA_REPLICA, self.SUBCA_REPLICA_CN
)
# give replication some time
time.sleep(15)
# replica.run_command(['ipa-certupdate'])
self.check_subca(replica, self.SUBCA_REPLICA, replica_nick)
self.check_subca(master, self.SUBCA_REPLICA, replica_nick)
self.check_pki_error(master)
self.check_certdb(master, replica)
def test_sign_with_subca_on_replica(self):
master = self.master
@ -645,12 +749,16 @@ class TestSubCAkeyReplication(IntegrationTest):
TEST_KEY_FILE = '/etc/pki/tls/private/test_subca.key'
TEST_CRT_FILE = '/etc/pki/tls/private/test_subca.crt'
caacl_cmd = ['ipa', 'caacl-add-ca', 'hosts_services_caIPAserviceCert',
'--cas', self.SUBCA]
caacl_cmd = [
'ipa', 'caacl-add-ca', 'hosts_services_caIPAserviceCert',
'--cas', self.SUBCA_MASTER
]
master.run_command(caacl_cmd)
request_cmd = [paths.IPA_GETCERT, 'request', '-w', '-k',
TEST_KEY_FILE, '-f', TEST_CRT_FILE, '-X', self.SUBCA]
request_cmd = [
paths.IPA_GETCERT, 'request', '-w', '-k', TEST_KEY_FILE,
'-f', TEST_CRT_FILE, '-X', self.SUBCA_MASTER
]
replica.run_command(request_cmd)
status_cmd = [paths.IPA_GETCERT, 'status', '-v', '-f', TEST_CRT_FILE]
@ -659,7 +767,7 @@ class TestSubCAkeyReplication(IntegrationTest):
ssl_cmd = ['openssl', 'x509', '-text', '-in', TEST_CRT_FILE]
ssl = replica.run_command(ssl_cmd)
assert 'Issuer: CN = {}'.format(self.SUBCA) in ssl.stdout_text
assert 'Issuer: CN = {}'.format(self.SUBCA_MASTER) in ssl.stdout_text
class TestReplicaInstallCustodia(IntegrationTest):