diff --git a/ipatests/test_integration/create_caless_pki.py b/ipatests/test_integration/create_caless_pki.py new file mode 100644 index 000000000..ddad3f96b --- /dev/null +++ b/ipatests/test_integration/create_caless_pki.py @@ -0,0 +1,548 @@ +# Copyright (c) 2015-2017, Jan Cholasta +# +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + + +import collections +import datetime +import itertools +import os +import os.path +import six + +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import NameOID +from pyasn1.type import univ, char, namedtype, tag +from pyasn1.codec.der import encoder as der_encoder +from pyasn1.codec.native import decoder as native_decoder + +if six.PY3: + unicode = str + +DAY = datetime.timedelta(days=1) +YEAR = 365 * DAY + +# we get the variables from ca_less test +domain = None +realm = None +server1 = None +server2 = None +client = None +password = None +cert_dir = None + +CertInfo = collections.namedtuple('CertInfo', 'nick key cert counter') + + +class PrincipalName(univ.Sequence): + '''See RFC 4120 for details''' + componentType = namedtype.NamedTypes( + namedtype.NamedType( + 'name-type', + univ.Integer().subtype( + explicitTag=tag.Tag( + tag.tagClassContext, + tag.tagFormatSimple, + 0, + ), + ), + ), + namedtype.NamedType( + 'name-string', + univ.SequenceOf(char.GeneralString()).subtype( + explicitTag=tag.Tag( + tag.tagClassContext, + tag.tagFormatSimple, + 1, + ), + ), + ), + ) + + +class KRB5PrincipalName(univ.Sequence): + '''See RFC 4556 for details''' + componentType = namedtype.NamedTypes( + namedtype.NamedType( + 'realm', + char.GeneralString().subtype( + explicitTag=tag.Tag( + tag.tagClassContext, + tag.tagFormatSimple, + 0, + ), + ), + ), + namedtype.NamedType( + 'principalName', + PrincipalName().subtype( + explicitTag=tag.Tag( + tag.tagClassContext, + tag.tagFormatSimple, + 1, + ), + ), + ), + ) + + +def profile_ca(builder, ca_nick, ca): + now = datetime.datetime.utcnow() + + builder = builder.not_valid_before(now) + builder = builder.not_valid_after(now + 10 * YEAR) + + crl_uri = u'file://{}.crl'.format(os.path.join(cert_dir, ca_nick)) + + builder = builder.add_extension( + x509.KeyUsage( + digital_signature=True, + content_commitment=True, + key_encipherment=False, + data_encipherment=False, + key_agreement=False, + key_cert_sign=True, + crl_sign=True, + encipher_only=False, + decipher_only=False, + ), + critical=True, + ) + builder = builder.add_extension( + x509.BasicConstraints(ca=True, path_length=None), + critical=True, + ) + builder = builder.add_extension( + x509.CRLDistributionPoints([ + x509.DistributionPoint( + full_name=[x509.UniformResourceIdentifier(crl_uri)], + relative_name=None, + crl_issuer=None, + reasons=None, + ), + ]), + critical=False, + ) + + public_key = builder._public_key + + builder = builder.add_extension( + x509.SubjectKeyIdentifier.from_public_key(public_key), + critical=False, + ) + # here we get "ca" only for "ca1/subca" CA + if not ca: + builder = builder.add_extension( + x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key), + critical=False, + ) + else: + ski = ca.cert.extensions.get_extension_for_class( + x509.SubjectKeyIdentifier) + builder = builder.add_extension( + x509.AuthorityKeyIdentifier + .from_issuer_subject_key_identifier(ski), + critical=False, + ) + return builder + + +def profile_server(builder, ca_nick, ca, + warp=datetime.timedelta(days=0), dns_name=None, + badusage=False): + now = datetime.datetime.utcnow() + warp + + builder = builder.not_valid_before(now) + builder = builder.not_valid_after(now + YEAR) + + crl_uri = u'file://{}.crl'.format(os.path.join(cert_dir, ca_nick)) + + builder = builder.add_extension( + x509.CRLDistributionPoints([ + x509.DistributionPoint( + full_name=[x509.UniformResourceIdentifier(crl_uri)], + relative_name=None, + crl_issuer=None, + reasons=None, + ), + ]), + critical=False, + ) + + if dns_name is not None: + builder = builder.add_extension( + x509.SubjectAlternativeName([x509.DNSName(dns_name)]), + critical=False, + ) + + if badusage: + builder = builder.add_extension( + x509.KeyUsage( + digital_signature=False, + content_commitment=False, + key_encipherment=False, + data_encipherment=True, + key_agreement=True, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False + ), + critical=False + ) + + return builder + + +def profile_kdc(builder, ca_nick, ca, + warp=datetime.timedelta(days=0), dns_name=None, + badusage=False): + now = datetime.datetime.utcnow() + warp + + builder = builder.not_valid_before(now) + builder = builder.not_valid_after(now + YEAR) + + crl_uri = u'file://{}.crl'.format(os.path.join(cert_dir, ca_nick)) + + builder = builder.add_extension( + x509.ExtendedKeyUsage([x509.ObjectIdentifier('1.3.6.1.5.2.3.5')]), + critical=False, + ) + + name = { + 'realm': realm, + 'principalName': { + 'name-type': 2, + 'name-string': ['krbtgt', realm], + }, + } + name = native_decoder.decode(name, asn1Spec=KRB5PrincipalName()) + name = der_encoder.encode(name) + + names = [x509.OtherName(x509.ObjectIdentifier('1.3.6.1.5.2.2'), name)] + if dns_name is not None: + names += [x509.DNSName(dns_name)] + + builder = builder.add_extension( + x509.SubjectAlternativeName(names), + critical=False, + ) + + builder = builder.add_extension( + x509.CRLDistributionPoints([ + x509.DistributionPoint( + full_name=[x509.UniformResourceIdentifier(crl_uri)], + relative_name=None, + crl_issuer=None, + reasons=None, + ), + ]), + critical=False, + ) + + if badusage: + builder = builder.add_extension( + x509.KeyUsage( + digital_signature=False, + content_commitment=False, + key_encipherment=False, + data_encipherment=True, + key_agreement=True, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False + ), + critical=False + ) + + return builder + + +def gen_cert(profile, nick_base, subject, ca=None, **kwargs): + key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend(), + ) + public_key = key.public_key() + + counter = itertools.count(1) + + if ca is not None: + ca_nick, ca_key, ca_cert, ca_counter = ca + nick = os.path.join(ca_nick, nick_base) + issuer = ca_cert.subject + else: + nick = ca_nick = nick_base + ca_key = key + ca_counter = counter + issuer = subject + + serial = next(ca_counter) + + builder = x509.CertificateBuilder() + builder = builder.serial_number(serial) + builder = builder.issuer_name(issuer) + builder = builder.subject_name(subject) + builder = builder.public_key(public_key) + builder = profile(builder, ca_nick, ca, **kwargs) + + cert = builder.sign( + private_key=ca_key, + algorithm=hashes.SHA256(), + backend=default_backend(), + ) + + key_pem = key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.BestAvailableEncryption(password.encode()), + ) + cert_pem = cert.public_bytes(serialization.Encoding.PEM) + try: + os.makedirs(os.path.dirname(os.path.join(cert_dir, nick))) + except OSError: + pass + with open(os.path.join(cert_dir, nick + '.key'), 'wb') as f: + f.write(key_pem) + with open(os.path.join(cert_dir, nick + '.crt'), 'wb') as f: + f.write(cert_pem) + + return CertInfo(nick, key, cert, counter) + + +def revoke_cert(ca, serial): + now = datetime.datetime.utcnow() + + crl_builder = x509.CertificateRevocationListBuilder() + crl_builder = crl_builder.issuer_name(ca.cert.subject) + crl_builder = crl_builder.last_update(now) + crl_builder = crl_builder.next_update(now + DAY) + + crl_filename = os.path.join(cert_dir, ca.nick + '.crl') + + try: + f = open(crl_filename, 'rb') + except IOError: + pass + else: + with f: + crl_pem = f.read() + + crl = x509.load_pem_x509_crl(crl_pem, default_backend()) + + for revoked_cert in crl: + crl_builder = crl_builder.add_revoked_certificate(revoked_cert) + + builder = x509.RevokedCertificateBuilder() + builder = builder.serial_number(serial) + builder = builder.revocation_date(now) + + revoked_cert = builder.build(default_backend()) + + crl_builder = crl_builder.add_revoked_certificate(revoked_cert) + + crl = crl_builder.sign( + private_key=ca.key, + algorithm=hashes.SHA256(), + backend=default_backend(), + ) + + crl_pem = crl.public_bytes(serialization.Encoding.PEM) + + with open(crl_filename, 'wb') as f: + f.write(crl_pem) + + +def gen_server_certs(nick_base, hostname, org, ca=None): + gen_cert(profile_server, nick_base, + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.COMMON_NAME, hostname) + ]), + ca + ) + gen_cert(profile_server, nick_base + u'-badname', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.COMMON_NAME, u'not-' + hostname) + ]), + ca + ) + gen_cert(profile_server, nick_base + u'-altname', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.COMMON_NAME, u'alt-' + hostname) + ]), + ca, dns_name=hostname + ) + gen_cert(profile_server, nick_base + u'-expired', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, + u'Expired'), + x509.NameAttribute(NameOID.COMMON_NAME, hostname) + ]), + ca, warp=-2 * YEAR + ) + gen_cert(profile_server, nick_base + u'-badusage', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, + u'Bad Usage'), + x509.NameAttribute(NameOID.COMMON_NAME, hostname) + ]), + ca, badusage=True + ) + revoked = gen_cert(profile_server, nick_base + u'-revoked', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, + u'Revoked'), + x509.NameAttribute(NameOID.COMMON_NAME, hostname) + ]), + ca + ) + revoke_cert(ca, revoked.cert.serial_number) + + +def gen_kdc_certs(nick_base, hostname, org, ca=None): + gen_cert(profile_kdc, nick_base + u'-kdc', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'KDC'), + x509.NameAttribute(NameOID.COMMON_NAME, hostname) + ]), + ca + ) + gen_cert(profile_kdc, nick_base + u'-kdc-badname', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'KDC'), + x509.NameAttribute(NameOID.COMMON_NAME, u'not-' + hostname) + ]), + ca + ) + gen_cert(profile_kdc, nick_base + u'-kdc-altname', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'KDC'), + x509.NameAttribute(NameOID.COMMON_NAME, u'alt-' + hostname) + ]), + ca, dns_name=hostname + ) + gen_cert(profile_kdc, nick_base + u'-kdc-expired', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, + u'Expired KDC'), + x509.NameAttribute(NameOID.COMMON_NAME, hostname) + ]), + ca, warp=-2 * YEAR + ) + gen_cert(profile_kdc, nick_base + u'-kdc-badusage', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, + u'Bad Usage KDC'), + x509.NameAttribute(NameOID.COMMON_NAME, hostname) + ]), + ca, badusage=True + ) + revoked = gen_cert(profile_kdc, nick_base + u'-kdc-revoked', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, + u'Revoked KDC'), + x509.NameAttribute(NameOID.COMMON_NAME, hostname) + ]), + ca + ) + revoke_cert(ca, revoked.cert.serial_number) + + +def gen_subtree(nick_base, org, ca=None): + subca = gen_cert(profile_ca, nick_base, + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.COMMON_NAME, u'CA') + ]), + ca + ) + gen_cert(profile_server, u'wildcard', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.COMMON_NAME, u'*.' + domain) + ]), + subca + ) + gen_server_certs(u'server', server1, org, subca) + gen_server_certs(u'replica', server2, org, subca) + gen_server_certs(u'client', client, org, subca) + gen_cert(profile_kdc, u'kdcwildcard', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, org), + x509.NameAttribute(NameOID.COMMON_NAME, u'*.' + domain) + ]), + subca + ) + gen_kdc_certs(u'server', server1, org, subca) + gen_kdc_certs(u'replica', server2, org, subca) + gen_kdc_certs(u'client', client, org, subca) + return subca + + +def create_pki(): + + gen_cert(profile_server, u'server-selfsign', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Self-signed'), + x509.NameAttribute(NameOID.COMMON_NAME, server1) + ]) + ) + gen_cert(profile_server, u'replica-selfsign', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Self-signed'), + x509.NameAttribute(NameOID.COMMON_NAME, server2) + ]) + ) + gen_cert(profile_server, u'noca', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'No-CA'), + x509.NameAttribute(NameOID.COMMON_NAME, server1) + ]) + ) + gen_cert(profile_kdc, u'server-kdc-selfsign', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Self-signed'), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'KDC'), + x509.NameAttribute(NameOID.COMMON_NAME, server1) + ]) + ) + gen_cert(profile_kdc, u'replica-kdc-selfsign', + x509.Name([ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Self-signed'), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'KDC'), + x509.NameAttribute(NameOID.COMMON_NAME, server2) + ]) + ) + ca1 = gen_subtree(u'ca1', u'Example Organization') + gen_subtree(u'subca', u'Subsidiary Example Organization', ca1) + gen_subtree(u'ca2', u'Other Example Organization') + ca3 = gen_subtree(u'ca3', u'Unknown Organization') + os.unlink(os.path.join(cert_dir, ca3.nick + '.key')) + os.unlink(os.path.join(cert_dir, ca3.nick + '.crt')) diff --git a/ipatests/test_integration/scripts/caless-create-pki b/ipatests/test_integration/scripts/caless-create-pki deleted file mode 100644 index dbcdb3e60..000000000 --- a/ipatests/test_integration/scripts/caless-create-pki +++ /dev/null @@ -1,188 +0,0 @@ -#!/bin/bash -e -# -# Copyright (C) 2017 FreeIPA Contributors see COPYING for license -# - -profile_ca_request_options=(-1 -2 -4) -profile_ca_request_input="\$'0\n1\n5\n6\n9\ny\ny\n\ny\n1\n7\nfile://'\$(readlink -f \$dbdir)/\$ca.crl\$'\n-1\n-1\n-1\nn\nn\n'" -profile_ca_create_options=(-v 120) -profile_ca_add_options=(-t CT,C,C) -profile_server_request_options=(-4) -profile_server_request_input="\$'1\n7\nfile://'\$(readlink -f \$dbdir)/\$ca.crl\$'\n-1\n-1\n-1\nn\nn\n'" -profile_server_create_options=(-v 12) -profile_server_add_options=(-t ,,) - -write_chain() { - local nick="$1" - - chain=`certutil -O -d $dbdir -n "$nick" | - sed -e '/^\s*$/d' -e "s/\s*\"\(.*\)\" \[.*/\1/g"` - - while read -r name; do - # OpenSSL requires a reverse order to what we get from NSS - echo -e "`certutil -L -d "$dbdir" -n "$name" -a`\n`cat $dbdir/$nick.pem` - " > "$dbdir/$nick.pem" - done <<< "$chain" -} - -gen_cert() { - local profile="$1" nick="$2" subject="$3" ca request_options request_input create_options serial add_options pwfile noise csr crt - shift 3 - - echo "gen_cert(profile=$profile nick=$nick subject=$subject)" - - ca="$(dirname $nick)" - if [ "$ca" = "." ]; then - ca="$nick" - fi - - eval "request_options=(\"\${profile_${profile}_request_options[@]}\")" - eval "eval request_input=\"\${profile_${profile}_request_input}\"" - - eval "create_options=(\"\${profile_${profile}_create_options[@]}\")" - if [ "$ca" = "$nick" ]; then - create_options=("${create_options[@]}" -x -m 1) - else - eval "serial_${ca//\//_}=\$((\${serial_${ca//\//_}:-1}+1))" - eval "serial=\$serial_${ca//\//_}" - create_options=("${create_options[@]}" -c "$ca" -m "$serial") - fi - - eval "add_options=(\"\${profile_${profile}_add_options[@]}\")" - - pwfile="$(mktemp)" - echo "$dbpassword" >"$pwfile" - - noise="$(mktemp)" - head -c 20 /dev/urandom >"$noise" - - if [ ! -d "$dbdir" ]; then - mkdir "$dbdir" - certutil -N -d "$dbdir" -f "$pwfile" - fi - - csr="$(mktemp)" - crt="$(mktemp)" - - certutil -R -d "$dbdir" -s "$subject" -f "$pwfile" -z "$noise" -o "$csr" "${request_options[@]}" >/dev/null <<<"$request_input" - certutil -C -d "$dbdir" -f "$pwfile" -i "$csr" -o "$crt" "${create_options[@]}" "$@" - certutil -A -d "$dbdir" -n "$nick" -f "$pwfile" -i "$crt" "${add_options[@]}" - - mkdir -p "$(dirname $dbdir/$nick.pem)" - write_chain "$nick" - pk12util -o "$dbdir/$nick.p12" -n "$nick" -d "$dbdir" -k "$pwfile" -w "$pwfile" - - rm -f "$pwfile" "$noise" "$csr" "$crt" -} - -revoke_cert() { - local nick="$1" ca pwfile serial - shift 1 - - echo "revoke_cert(nick=$nick)" - - ca="$(dirname $nick)" - if [ "$ca" = "." ]; then - ca="$nick" - fi - - pwfile="$(mktemp)" - echo "$dbpassword" >"$pwfile" - - if ! crlutil -L -d "$dbdir" -n "$ca" &>/dev/null; then - crlutil -G -d "$dbdir" -n "$ca" -c /dev/null -f "$pwfile" - fi - - sleep 1 - - mkdir -p "$(dirname $dbdir/$ca.crl)" - serial=$(certutil -L -d "$dbdir" -n "$nick" | awk '/^\s+Serial Number: / { print $3 }') - crlutil -M -d "$dbdir" -n "$ca" -c /dev/stdin -f "$pwfile" -o "$dbdir/$ca.crl" < "$dbdir/ext.kdc" -} - -gen_pkinit_cert() { - local nick="$1" subj="$2" outname="$3" - shift 3 - - openssl genrsa -out "$dbdir/$nick/kdc.key" 2048 > /dev/null - openssl req -new -out "$dbdir/$nick/kdc.req" -key "$dbdir/$nick/kdc.key" \ - -subj "$subj" - - openssl pkcs12 -in "$dbdir/$nick.p12" -passin "pass:$dbpassword" \ - -nodes -nocerts -out "$dbdir/$nick.key" > /dev/null - - openssl x509 -req -in "$dbdir/$nick/kdc.req" \ - -CAkey "$dbdir/$nick.key" -CA "$dbdir/$nick.pem" \ - -out "$dbdir/$nick/kdc.crt" -days 365 \ - -extfile "$dbdir/ext.kdc" -extensions kdc_cert -CAcreateserial > /dev/null - - rm "$dbdir/$nick/kdc.req" - - openssl pkcs12 -export -in "$dbdir/$nick/kdc.crt" \ - -inkey "$dbdir/$nick/kdc.key" -password "pass:$dbpassword" \ - -out "$dbdir/$nick/$outname.p12" -chain -CAfile "$dbdir/$nick.pem" -} - -gen_subtree() { - local nick="$1" org="$2" - shift 2 - - echo "gen_subtree(nick=$nick org=$org)" - - gen_cert ca "$nick" "CN=CA,O=$org" "$@" - gen_cert server "$nick/wildcard" "CN=*.$domain,O=$org" - gen_pkinit_cert "$nick" "/O=$realm/CN=$server1" "pkinit-server" - gen_pkinit_cert "$nick" "/O=$realm/CN=$server2" "pkinit-replica" - gen_server_certs "$nick/server" "$server1" "$org" - gen_server_certs "$nick/replica" "$server2" "$org" - gen_server_certs "$nick/client" "$client" "$org" -} - -gen_cert server server-selfsign "CN=$server1,O=Self-signed" -gen_cert server replica-selfsign "CN=$server2,O=Self-signed" -gen_pkinit_extensions -gen_cert server noca "CN=$server1,O=No-CA" -gen_subtree ca1 'Example Organization' -gen_subtree ca1/subca 'Subsidiary Example Organization' -gen_subtree ca2 'Other Example Organization' -gen_subtree ca3 'Unknown Organization' -certutil -D -d "$dbdir" -n ca3 diff --git a/ipatests/test_integration/test_caless.py b/ipatests/test_integration/test_caless.py index 8ce20cb9a..8e26d1a39 100644 --- a/ipatests/test_integration/test_caless.py +++ b/ipatests/test_integration/test_caless.py @@ -26,15 +26,20 @@ import glob import contextlib import nose import pytest +import six from ipalib import x509 from ipapython import ipautil from ipaplatform.paths import paths from ipapython.dn import DN from ipatests.test_integration.base import IntegrationTest +from ipatests.test_integration import create_caless_pki from ipatests.pytest_plugins.integration import tasks from ipalib.constants import DOMAIN_LEVEL_0 +if six.PY3: + unicode = str + logger = logging.getLogger(__name__) _DEFAULT = object() @@ -114,11 +119,7 @@ class CALessBase(IntegrationTest): def install(cls, mh): cls.cert_dir = tempfile.mkdtemp(prefix="ipatest-") cls.pem_filename = os.path.join(cls.cert_dir, 'root.pem') - scriptfile = os.path.join(os.path.dirname(__file__), - 'scripts', - 'caless-create-pki') cls.cert_password = cls.master.config.admin_password - cls.crl_path = os.path.join(cls.master.config.test_dir, 'crl') if cls.replicas: @@ -129,26 +130,25 @@ class CALessBase(IntegrationTest): client_hostname = cls.clients[0].hostname else: client_hostname = 'unused-client.test' - cls.env = { - 'domain': cls.master.domain.name, - 'realm': cls.master.domain.name.upper(), - 'server1': cls.master.hostname, - 'server2': replica_hostname, - 'client': client_hostname, - 'dbdir': 'nssdb', - 'dbpassword': cls.cert_password, - 'crl_path': cls.crl_path, - 'dirman_password': cls.master.config.dirman_password, - } - ipautil.run(['bash', '-ex', scriptfile], cwd=cls.cert_dir, env=cls.env) + + create_caless_pki.domain = unicode(cls.master.domain.name) + create_caless_pki.realm = unicode(cls.master.domain.name.upper()) + create_caless_pki.server1 = unicode(cls.master.hostname) + create_caless_pki.server2 = unicode(replica_hostname) + create_caless_pki.client = unicode(client_hostname) + create_caless_pki.password = unicode(cls.master.config.dirman_password) + create_caless_pki.cert_dir = unicode(cls.cert_dir) + + # here we generate our certificates (not yet converted to .p12) + logger.info('Generating certificates to %s', cls.cert_dir) + create_caless_pki.create_pki() for host in cls.get_all_hosts(): tasks.apply_common_fixes(host) # Copy CRLs over - base = os.path.join(cls.cert_dir, 'nssdb') host.transport.mkdir_recursive(cls.crl_path) - for source in glob.glob(os.path.join(base, '*.crl')): + for source in glob.glob(os.path.join(cls.cert_dir, '*.crl')): dest = os.path.join(cls.crl_path, os.path.basename(source)) host.transport.put_file(source, dest) @@ -166,7 +166,7 @@ class CALessBase(IntegrationTest): http_pkcs12_exists=True, dirsrv_pkcs12_exists=True, http_pin=_DEFAULT, dirsrv_pin=_DEFAULT, pkinit_pin=None, root_ca_file='root.pem', pkinit_pkcs12_exists=False, - pkinit_pkcs12='pkinit-server.p12', unattended=True, + pkinit_pkcs12='server-kdc.p12', unattended=True, stdin_text=None): """Install a CA-less server @@ -192,8 +192,8 @@ class CALessBase(IntegrationTest): files_to_copy.append(http_pkcs12) if dirsrv_pkcs12_exists: files_to_copy.append(dirsrv_pkcs12) - if pkinit_pkcs12_exists: + files_to_copy.append(pkinit_pkcs12) extra_args.extend(['--pkinit-cert-file', pkinit_pkcs12]) else: extra_args.append('--no-pkinit') @@ -220,19 +220,14 @@ class CALessBase(IntegrationTest): def copy_cert(cls, host, filename): host.transport.put_file(os.path.join(cls.cert_dir, filename), os.path.join(host.config.test_dir, filename)) - @classmethod - def copy_pkinit_cert(cls, host, pkinit_nick): - filename = pkinit_nick.split('/')[-1] - host.transport.put_file(os.path.join(cls.cert_dir, 'nssdb', pkinit_nick), - os.path.join(host.config.test_dir, filename)) - def prepare_replica(self, _replica_number=0, replica=None, master=None, http_pkcs12='replica.p12', dirsrv_pkcs12='replica.p12', http_pkcs12_exists=True, dirsrv_pkcs12_exists=True, - http_pin=_DEFAULT, dirsrv_pin=_DEFAULT, pkinit_pin=None, - root_ca_file='root.pem', pkinit_pkcs12_exists=False, - pkinit_pkcs12='pkinit-replica.p12', unattended=True, + http_pin=_DEFAULT, dirsrv_pin=_DEFAULT, + pkinit_pin=None, root_ca_file='root.pem', + pkinit_pkcs12_exists=False, + pkinit_pkcs12='replica-kdc.p12', unattended=True, stdin_text=None, domain_level=None): """Prepare a CA-less replica @@ -259,6 +254,8 @@ class CALessBase(IntegrationTest): files_to_copy.append(http_pkcs12) if dirsrv_pkcs12_exists: files_to_copy.append(dirsrv_pkcs12) + if pkinit_pkcs12_exists: + files_to_copy.append(pkinit_pkcs12) if domain_level == DOMAIN_LEVEL_0: destination_host = master else: @@ -309,23 +306,49 @@ class CALessBase(IntegrationTest): 'replica-info.gpg') @classmethod - def export_pkcs12(cls, nickname, filename='server.p12', password=None): - """Export a cert as PKCS#12 to the given file""" + def create_pkcs12(cls, nickname, filename='server.p12', password=None): + """Create a cert chain and generate pkcs12 cert""" if password is None: password = cls.cert_password - ipautil.run(['pk12util', - '-o', filename, - '-n', nickname, - '-d', 'nssdb', - '-K', cls.cert_password, - '-W', password], cwd=cls.cert_dir) + + fname_chain = [] + + key_fname = '{}.key'.format(os.path.join(cls.cert_dir, nickname)) + certchain_fname = '{}.pem'.format(os.path.join(cls.cert_dir, nickname)) + + nick_chain = nickname.split('/') + + # to construct whole chain e.g "ca1 - ca1/sub - ca1/sub/server" + for index, _value in enumerate(nick_chain): + cert_nick = '/'.join(nick_chain[:index+1]) + cert_path = '{}.crt'.format(os.path.join(cls.cert_dir, cert_nick)) + if os.path.isfile(cert_path): + fname_chain.append(cert_path) + + # create the chain file + with open(certchain_fname, 'w') as chain: + for cert_fname in fname_chain: + with open(cert_fname) as cert: + chain.write(cert.read()) + + ipautil.run(["openssl", "pkcs12", "-export", "-out", filename, + "-inkey", key_fname, "-in", certchain_fname, "-passin", + "pass:"+cls.cert_password, "-passout", "pass:"+password, + "-name", nickname], cwd=cls.cert_dir) + + @classmethod + def prepare_cacert(cls, nickname): + """ Prepare pem file for root_ca_file/ca-cert-file option """ + # create_caless_pki saves certificates with ".crt" extension by default + fname_from_nick = '{}.crt'.format(os.path.join(cls.cert_dir, nickname)) + shutil.copy(fname_from_nick, cls.pem_filename) @classmethod def get_pem(cls, nickname): - result = ipautil.run( - [paths.CERTUTIL, '-L', '-d', 'nssdb', '-n', nickname, '-a'], - cwd=cls.cert_dir, capture_output=True) - return result.output + """ Return PEM cert as base64 encoded ascii for TestIPACommands """ + cacert_fname = '{}.crt'.format(os.path.join(cls.cert_dir, nickname)) + with open(cacert_fname, 'r') as f: + return f.read() def verify_installation(self): """Verify CA cert PEM file and LDAP entry created by install @@ -343,6 +366,7 @@ class CALessBase(IntegrationTest): for host in [self.master] + self.replicas: # Check the LDAP entry ldap = host.ldap_connect() + entry = ldap.get_entry(DN(('cn', 'CACert'), ('cn', 'ipa'), ('cn', 'etc'), host.domain.basedn)) cert_from_ldap = entry.single_value['cACertificate'] @@ -372,9 +396,8 @@ class TestServerInstall(CALessBase): def test_nonexistent_ca_pem_file(self): "IPA server install with non-existent CA PEM file " - self.export_pkcs12('ca1/server') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca2')) + self.create_pkcs12('ca1/server') + self.prepare_cacert('ca2') result = self.install_server(root_ca_file='does_not_exist') assert_error(result, @@ -385,9 +408,8 @@ class TestServerInstall(CALessBase): def test_unknown_ca(self): "IPA server install with CA PEM file with unknown CA certificate" - self.export_pkcs12('ca3/server') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca2')) + self.create_pkcs12('ca3/server') + self.prepare_cacert('ca2') result = self.install_server() assert_error(result, @@ -397,9 +419,8 @@ class TestServerInstall(CALessBase): def test_ca_server_cert(self): "IPA server install with CA PEM file with server certificate" - self.export_pkcs12('noca') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('noca')) + self.create_pkcs12('noca') + self.prepare_cacert('noca') result = self.install_server() assert_error(result, @@ -410,10 +431,9 @@ class TestServerInstall(CALessBase): def test_ca_2_certs(self): "IPA server install with CA PEM file with 2 certificates" - self.export_pkcs12('ca1/server') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) - f.write(self.get_pem('ca2')) + self.create_pkcs12('ca1/server') + self.prepare_cacert('ca1') + self.prepare_cacert('ca2') result = self.install_server() assert_error(result, 'root.pem contains more than one certificate') @@ -422,9 +442,8 @@ class TestServerInstall(CALessBase): def test_nonexistent_http_pkcs12_file(self): "IPA server install with non-existent HTTP PKCS#12 file" - self.export_pkcs12('ca1/server') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='does_not_exist', http_pkcs12_exists=False) @@ -434,9 +453,8 @@ class TestServerInstall(CALessBase): def test_nonexistent_ds_pkcs12_file(self): "IPA server install with non-existent DS PKCS#12 file" - self.export_pkcs12('ca1/server') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server') + self.prepare_cacert('ca2') result = self.install_server(dirsrv_pkcs12='does_not_exist', dirsrv_pkcs12_exists=False) @@ -446,9 +464,8 @@ class TestServerInstall(CALessBase): def test_missing_http_password(self): "IPA server install with missing HTTP PKCS#12 password (unattended)" - self.export_pkcs12('ca1/server') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server') + self.prepare_cacert('ca1') result = self.install_server(http_pin=None) assert_error(result, @@ -459,9 +476,8 @@ class TestServerInstall(CALessBase): def test_missing_ds_password(self): "IPA server install with missing DS PKCS#12 password (unattended)" - self.export_pkcs12('ca1/server') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server') + self.prepare_cacert('ca1') result = self.install_server(dirsrv_pin=None) assert_error(result, @@ -473,9 +489,8 @@ class TestServerInstall(CALessBase): def test_incorect_http_pin(self): "IPA server install with incorrect HTTP PKCS#12 password" - self.export_pkcs12('ca1/server') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server') + self.prepare_cacert('ca1') result = self.install_server(http_pin='bad') assert_error(result, 'incorrect password for pkcs#12 file server.p12') @@ -485,9 +500,8 @@ class TestServerInstall(CALessBase): def test_incorect_ds_pin(self): "IPA server install with incorrect DS PKCS#12 password" - self.export_pkcs12('ca1/server') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server') + self.prepare_cacert('ca1') result = self.install_server(dirsrv_pin='bad') assert_error(result, 'incorrect password for pkcs#12 file server.p12') @@ -496,10 +510,9 @@ class TestServerInstall(CALessBase): def test_invalid_http_cn(self): "IPA server install with HTTP certificate with invalid CN" - self.export_pkcs12('ca1/server-badname', filename='http.p12') - self.export_pkcs12('ca1/server', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server-badname', filename='http.p12') + self.create_pkcs12('ca1/server', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -511,9 +524,8 @@ class TestServerInstall(CALessBase): def test_invalid_ds_cn(self): "IPA server install with DS certificate with invalid CN" - self.export_pkcs12('ca1/replica', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/replica', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -525,10 +537,9 @@ class TestServerInstall(CALessBase): def test_expired_http(self): "IPA server install with expired HTTP certificate" - self.export_pkcs12('ca1/server-expired', filename='http.p12') - self.export_pkcs12('ca1/server', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server-expired', filename='http.p12') + self.create_pkcs12('ca1/server', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -540,10 +551,9 @@ class TestServerInstall(CALessBase): def test_expired_ds(self): "IPA server install with expired DS certificate" - self.export_pkcs12('ca1/server', filename='http.p12') - self.export_pkcs12('ca1/server-expired', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server', filename='http.p12') + self.create_pkcs12('ca1/server-expired', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -555,10 +565,9 @@ class TestServerInstall(CALessBase): def test_http_bad_usage(self): "IPA server install with HTTP certificate with invalid key usage" - self.export_pkcs12('ca1/server-badusage', filename='http.p12') - self.export_pkcs12('ca1/server', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server-badusage', filename='http.p12') + self.create_pkcs12('ca1/server', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -570,10 +579,9 @@ class TestServerInstall(CALessBase): def test_ds_bad_usage(self): "IPA server install with DS certificate with invalid key usage" - self.export_pkcs12('ca1/server', filename='http.p12') - self.export_pkcs12('ca1/server-badusage', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server', filename='http.p12') + self.create_pkcs12('ca1/server-badusage', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -585,10 +593,9 @@ class TestServerInstall(CALessBase): def test_revoked_http(self): "IPA server install with revoked HTTP certificate" - self.export_pkcs12('ca1/server-revoked', filename='http.p12') - self.export_pkcs12('ca1/server', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server-revoked', filename='http.p12') + self.create_pkcs12('ca1/server', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -604,10 +611,9 @@ class TestServerInstall(CALessBase): def test_revoked_ds(self): "IPA server install with revoked DS certificate" - self.export_pkcs12('ca1/server', filename='http.p12') - self.export_pkcs12('ca1/server-revoked', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server', filename='http.p12') + self.create_pkcs12('ca1/server-revoked', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -623,10 +629,9 @@ class TestServerInstall(CALessBase): def test_http_intermediate_ca(self): "IPA server install with HTTP certificate issued by intermediate CA" - self.export_pkcs12('ca1/subca/server', filename='http.p12') - self.export_pkcs12('ca1/server', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/subca/server', filename='http.p12') + self.create_pkcs12('ca1/server', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -638,10 +643,9 @@ class TestServerInstall(CALessBase): def test_ds_intermediate_ca(self): "IPA server install with DS certificate issued by intermediate CA" - self.export_pkcs12('ca1/server', filename='http.p12') - self.export_pkcs12('ca1/subca/server', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server', filename='http.p12') + self.create_pkcs12('ca1/subca/server', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -653,9 +657,8 @@ class TestServerInstall(CALessBase): def test_ca_self_signed(self): "IPA server install with self-signed certificate" - self.export_pkcs12('server-selfsign') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('server-selfsign')) + self.create_pkcs12('server-selfsign') + self.prepare_cacert('server-selfsign') result = self.install_server() assert result.returncode > 0 @@ -664,9 +667,8 @@ class TestServerInstall(CALessBase): def test_valid_certs(self): "IPA server install with valid certificates" - self.export_pkcs12('ca1/server') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server') + self.prepare_cacert('ca1') result = self.install_server() assert result.returncode == 0 @@ -677,10 +679,9 @@ class TestServerInstall(CALessBase): def test_wildcard_http(self): "IPA server install with wildcard HTTP certificate" - self.export_pkcs12('ca1/wildcard', filename='http.p12') - self.export_pkcs12('ca1/server', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/wildcard', filename='http.p12') + self.create_pkcs12('ca1/server', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -692,10 +693,9 @@ class TestServerInstall(CALessBase): def test_wildcard_ds(self): "IPA server install with wildcard DS certificate" - self.export_pkcs12('ca1/server', filename='http.p12') - self.export_pkcs12('ca1/wildcard', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server', filename='http.p12') + self.create_pkcs12('ca1/wildcard', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -706,10 +706,9 @@ class TestServerInstall(CALessBase): def test_http_san(self): "IPA server install with HTTP certificate with SAN" - self.export_pkcs12('ca1/server-altname', filename='http.p12') - self.export_pkcs12('ca1/server', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server-altname', filename='http.p12') + self.create_pkcs12('ca1/server', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -720,10 +719,9 @@ class TestServerInstall(CALessBase): def test_ds_san(self): "IPA server install with DS certificate with SAN" - self.export_pkcs12('ca1/server', filename='http.p12') - self.export_pkcs12('ca1/server-altname', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server', filename='http.p12') + self.create_pkcs12('ca1/server-altname', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -734,9 +732,8 @@ class TestServerInstall(CALessBase): def test_interactive_missing_http_pkcs_password(self): "IPA server install with prompt for HTTP PKCS#12 password" - self.export_pkcs12('ca1/server') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server') + self.prepare_cacert('ca1') stdin_text = get_install_stdin(cert_passwords=[self.cert_password]) @@ -751,9 +748,8 @@ class TestServerInstall(CALessBase): def test_interactive_missing_ds_pkcs_password(self): "IPA server install with prompt for DS PKCS#12 password" - self.export_pkcs12('ca1/server') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server') + self.prepare_cacert('ca1') stdin_text = get_install_stdin(cert_passwords=[self.cert_password]) @@ -768,10 +764,9 @@ class TestServerInstall(CALessBase): def test_no_http_password(self): "IPA server install with empty HTTP password" - self.export_pkcs12('ca1/server', filename='http.p12', password='') - self.export_pkcs12('ca1/server', filename='dirsrv.p12') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server', filename='http.p12', password='') + self.create_pkcs12('ca1/server', filename='dirsrv.p12') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12', @@ -783,10 +778,9 @@ class TestServerInstall(CALessBase): def test_no_ds_password(self): "IPA server install with empty DS password" - self.export_pkcs12('ca1/server', filename='http.p12') - self.export_pkcs12('ca1/server', filename='dirsrv.p12', password='') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server', filename='http.p12') + self.create_pkcs12('ca1/server', filename='dirsrv.p12', password='') + self.prepare_cacert('ca1') result = self.install_server(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12', @@ -801,9 +795,8 @@ class TestReplicaInstall(CALessBase): @classmethod def install(cls, mh): super(TestReplicaInstall, cls).install(mh) - cls.export_pkcs12('ca1/server') - with open(cls.pem_filename, 'w') as f: - f.write(cls.get_pem('ca1')) + cls.create_pkcs12('ca1/server') + cls.prepare_cacert('ca1') result = cls.install_server() assert result.returncode == 0 @@ -821,7 +814,7 @@ class TestReplicaInstall(CALessBase): def test_nonexistent_http_pkcs12_file(self): "IPA replica install with non-existent DS PKCS#12 file" - self.export_pkcs12('ca1/replica', filename='http.p12') + self.create_pkcs12('ca1/replica', filename='http.p12') result = self.prepare_replica(dirsrv_pkcs12='does_not_exist', http_pkcs12='http.p12') @@ -831,7 +824,7 @@ class TestReplicaInstall(CALessBase): def test_nonexistent_ds_pkcs12_file(self): "IPA replica install with non-existent HTTP PKCS#12 file" - self.export_pkcs12('ca1/replica', filename='dirsrv.p12') + self.create_pkcs12('ca1/replica', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='does_not_exist', dirsrv_pkcs12='dirsrv.p12') @@ -842,7 +835,7 @@ class TestReplicaInstall(CALessBase): def test_incorect_http_pin(self): "IPA replica install with incorrect HTTP PKCS#12 password" - self.export_pkcs12('ca1/replica', filename='replica.p12') + self.create_pkcs12('ca1/replica', filename='replica.p12') result = self.prepare_replica(http_pin='bad') assert result.returncode > 0 @@ -853,7 +846,7 @@ class TestReplicaInstall(CALessBase): def test_incorect_ds_pin(self): "IPA replica install with incorrect DS PKCS#12 password" - self.export_pkcs12('ca1/replica', filename='replica.p12') + self.create_pkcs12('ca1/replica', filename='replica.p12') result = self.prepare_replica(dirsrv_pin='bad') assert_error(result, 'incorrect password for pkcs#12 file replica.p12') @@ -862,8 +855,8 @@ class TestReplicaInstall(CALessBase): def test_http_unknown_ca(self): "IPA replica install with HTTP certificate issued by unknown CA" - self.export_pkcs12('ca2/replica', filename='http.p12') - self.export_pkcs12('ca1/replica', filename='dirsrv.p12') + self.create_pkcs12('ca2/replica', filename='http.p12') + self.create_pkcs12('ca1/replica', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -875,8 +868,8 @@ class TestReplicaInstall(CALessBase): def test_ds_unknown_ca(self): "IPA replica install with DS certificate issued by unknown CA" - self.export_pkcs12('ca1/replica', filename='http.p12') - self.export_pkcs12('ca2/replica', filename='dirsrv.p12') + self.create_pkcs12('ca1/replica', filename='http.p12') + self.create_pkcs12('ca2/replica', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -888,8 +881,8 @@ class TestReplicaInstall(CALessBase): def test_invalid_http_cn(self): "IPA replica install with HTTP certificate with invalid CN" - self.export_pkcs12('ca1/replica-badname', filename='http.p12') - self.export_pkcs12('ca1/replica', filename='dirsrv.p12') + self.create_pkcs12('ca1/replica-badname', filename='http.p12') + self.create_pkcs12('ca1/replica', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -901,8 +894,8 @@ class TestReplicaInstall(CALessBase): def test_invalid_ds_cn(self): "IPA replica install with DS certificate with invalid CN" - self.export_pkcs12('ca1/replica', filename='http.p12') - self.export_pkcs12('ca1/replica-badname', filename='dirsrv.p12') + self.create_pkcs12('ca1/replica', filename='http.p12') + self.create_pkcs12('ca1/replica-badname', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -914,8 +907,8 @@ class TestReplicaInstall(CALessBase): def test_expired_http(self): "IPA replica install with expired HTTP certificate" - self.export_pkcs12('ca1/replica-expired', filename='http.p12') - self.export_pkcs12('ca1/replica', filename='dirsrv.p12') + self.create_pkcs12('ca1/replica-expired', filename='http.p12') + self.create_pkcs12('ca1/replica', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -927,8 +920,8 @@ class TestReplicaInstall(CALessBase): def test_expired_ds(self): "IPA replica install with expired DS certificate" - self.export_pkcs12('ca1/replica-expired', filename='http.p12') - self.export_pkcs12('ca1/replica', filename='dirsrv.p12') + self.create_pkcs12('ca1/replica-expired', filename='http.p12') + self.create_pkcs12('ca1/replica', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -940,8 +933,8 @@ class TestReplicaInstall(CALessBase): def test_http_bad_usage(self): "IPA replica install with HTTP certificate with invalid key usage" - self.export_pkcs12('ca1/replica-badusage', filename='http.p12') - self.export_pkcs12('ca1/replica', filename='dirsrv.p12') + self.create_pkcs12('ca1/replica-badusage', filename='http.p12') + self.create_pkcs12('ca1/replica', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -953,8 +946,8 @@ class TestReplicaInstall(CALessBase): def test_ds_bad_usage(self): "IPA replica install with DS certificate with invalid key usage" - self.export_pkcs12('ca1/replica', filename='http.p12') - self.export_pkcs12('ca1/replica-badusage', filename='dirsrv.p12') + self.create_pkcs12('ca1/replica', filename='http.p12') + self.create_pkcs12('ca1/replica-badusage', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -966,8 +959,8 @@ class TestReplicaInstall(CALessBase): def test_revoked_http(self): "IPA replica install with revoked HTTP certificate" - self.export_pkcs12('ca1/replica-revoked', filename='http.p12') - self.export_pkcs12('ca1/replica', filename='dirsrv.p12') + self.create_pkcs12('ca1/replica-revoked', filename='http.p12') + self.create_pkcs12('ca1/replica', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -983,8 +976,8 @@ class TestReplicaInstall(CALessBase): def test_revoked_ds(self): "IPA replica install with revoked DS certificate" - self.export_pkcs12('ca1/replica', filename='http.p12') - self.export_pkcs12('ca1/replica-revoked', filename='dirsrv.p12') + self.create_pkcs12('ca1/replica', filename='http.p12') + self.create_pkcs12('ca1/replica-revoked', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -1000,8 +993,8 @@ class TestReplicaInstall(CALessBase): def test_http_intermediate_ca(self): "IPA replica install with HTTP certificate issued by intermediate CA" - self.export_pkcs12('ca1/subca/replica', filename='http.p12') - self.export_pkcs12('ca1/replica', filename='dirsrv.p12') + self.create_pkcs12('ca1/subca/replica', filename='http.p12') + self.create_pkcs12('ca1/replica', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -1013,8 +1006,8 @@ class TestReplicaInstall(CALessBase): def test_ds_intermediate_ca(self): "IPA replica install with DS certificate issued by intermediate CA" - self.export_pkcs12('ca1/replica', filename='http.p12') - self.export_pkcs12('ca1/subca/replica', filename='dirsrv.p12') + self.create_pkcs12('ca1/replica', filename='http.p12') + self.create_pkcs12('ca1/subca/replica', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -1026,7 +1019,7 @@ class TestReplicaInstall(CALessBase): def test_valid_certs(self): "IPA replica install with valid certificates" - self.export_pkcs12('ca1/replica', filename='server.p12') + self.create_pkcs12('ca1/replica', filename='server.p12') result = self.prepare_replica(http_pkcs12='server.p12', dirsrv_pkcs12='server.p12') @@ -1039,8 +1032,8 @@ class TestReplicaInstall(CALessBase): def test_wildcard_http(self): "IPA replica install with wildcard HTTP certificate" - self.export_pkcs12('ca1/wildcard', filename='http.p12') - self.export_pkcs12('ca1/replica', filename='dirsrv.p12') + self.create_pkcs12('ca1/wildcard', filename='http.p12') + self.create_pkcs12('ca1/replica', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -1053,8 +1046,8 @@ class TestReplicaInstall(CALessBase): def test_wildcard_ds(self): "IPA replica install with wildcard DS certificate" - self.export_pkcs12('ca1/wildcard', filename='http.p12') - self.export_pkcs12('ca1/replica', filename='dirsrv.p12') + self.create_pkcs12('ca1/wildcard', filename='http.p12') + self.create_pkcs12('ca1/replica', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -1066,8 +1059,8 @@ class TestReplicaInstall(CALessBase): def test_http_san(self): "IPA replica install with HTTP certificate with SAN" - self.export_pkcs12('ca1/replica-altname', filename='http.p12') - self.export_pkcs12('ca1/replica', filename='dirsrv.p12') + self.create_pkcs12('ca1/replica-altname', filename='http.p12') + self.create_pkcs12('ca1/replica', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -1079,8 +1072,8 @@ class TestReplicaInstall(CALessBase): def test_ds_san(self): "IPA replica install with DS certificate with SAN" - self.export_pkcs12('ca1/replica', filename='http.p12') - self.export_pkcs12('ca1/replica-altname', filename='dirsrv.p12') + self.create_pkcs12('ca1/replica', filename='http.p12') + self.create_pkcs12('ca1/replica-altname', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12') @@ -1092,7 +1085,7 @@ class TestReplicaInstall(CALessBase): def test_interactive_missing_http_pkcs_password(self): "IPA replica install with missing HTTP PKCS#12 password" - self.export_pkcs12('ca1/replica', filename='replica.p12') + self.create_pkcs12('ca1/replica', filename='replica.p12') stdin_text = get_replica_prepare_stdin( cert_passwords=[self.cert_password]) @@ -1107,7 +1100,7 @@ class TestReplicaInstall(CALessBase): def test_interactive_missing_ds_pkcs_password(self): "IPA replica install with missing DS PKCS#12 password" - self.export_pkcs12('ca1/replica', filename='replica.p12') + self.create_pkcs12('ca1/replica', filename='replica.p12') stdin_text = get_replica_prepare_stdin( cert_passwords=[self.cert_password]) @@ -1122,8 +1115,8 @@ class TestReplicaInstall(CALessBase): def test_no_http_password(self): "IPA replica install with empty HTTP password" - self.export_pkcs12('ca1/replica', filename='http.p12', password='') - self.export_pkcs12('ca1/replica', filename='dirsrv.p12') + self.create_pkcs12('ca1/replica', filename='http.p12', password='') + self.create_pkcs12('ca1/replica', filename='dirsrv.p12') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12', @@ -1136,8 +1129,8 @@ class TestReplicaInstall(CALessBase): def test_no_ds_password(self): "IPA replica install with empty DS password" - self.export_pkcs12('ca1/replica', filename='http.p12') - self.export_pkcs12('ca1/replica', filename='dirsrv.p12', password='') + self.create_pkcs12('ca1/replica', filename='http.p12') + self.create_pkcs12('ca1/replica', filename='dirsrv.p12', password='') result = self.prepare_replica(http_pkcs12='http.p12', dirsrv_pkcs12='dirsrv.p12', @@ -1153,9 +1146,8 @@ class TestClientInstall(CALessBase): def test_client_install(self): "IPA client install" - self.export_pkcs12('ca1/server') - with open(self.pem_filename, 'w') as f: - f.write(self.get_pem('ca1')) + self.create_pkcs12('ca1/server') + self.prepare_cacert('ca1') result = self.install_server() assert result.returncode == 0 @@ -1175,9 +1167,8 @@ class TestIPACommands(CALessBase): def install(cls, mh): super(TestIPACommands, cls).install(mh) - cls.export_pkcs12('ca1/server') - with open(cls.pem_filename, 'w') as f: - f.write(cls.get_pem('ca1')) + cls.create_pkcs12('ca1/server') + cls.prepare_cacert('ca1') result = cls.install_server() assert result.returncode == 0 @@ -1261,14 +1252,13 @@ class TestIPACommands(CALessBase): self.master.run_command(['ipa', 'host-del', self.test_hostname]) -class TestCertinstall(CALessBase): +class TestCertInstall(CALessBase): @classmethod def install(cls, mh): - super(TestCertinstall, cls).install(mh) + super(TestCertInstall, cls).install(mh) - cls.export_pkcs12('ca1/server') - with open(cls.pem_filename, 'w') as f: - f.write(cls.get_pem('ca1')) + cls.create_pkcs12('ca1/server') + cls.prepare_cacert('ca1') result = cls.install_server() assert result.returncode == 0 @@ -1279,7 +1269,7 @@ class TestCertinstall(CALessBase): filename='server.p12', pin=_DEFAULT, stdin_text=None, p12_pin=None, args=None): if cert_nick: - self.export_pkcs12(cert_nick, password=p12_pin) + self.create_pkcs12(cert_nick, password=p12_pin) if pin is _DEFAULT: pin = self.cert_password if cert_exists: @@ -1514,18 +1504,17 @@ class TestPKINIT(CALessBase): @classmethod def install(cls, mh): super(TestPKINIT, cls).install(mh) - cls.export_pkcs12('ca1/server') - cls.copy_pkinit_cert(cls.master, 'ca1/pkinit-server.p12') - with open(cls.pem_filename, 'w') as f: - f.write(cls.get_pem('ca1')) + cls.create_pkcs12('ca1/server') + cls.create_pkcs12('ca1/server-kdc', filename='server-kdc.p12') + cls.prepare_cacert('ca1') result = cls.install_server(pkinit_pkcs12_exists=True, pkinit_pin=_DEFAULT) assert result.returncode == 0 @replica_install_teardown def test_server_replica_install_pkinit(self): - self.export_pkcs12('ca1/replica', filename='replica.p12') - self.copy_pkinit_cert(self.replicas[0], 'ca1/pkinit-replica.p12') + self.create_pkcs12('ca1/replica', filename='replica.p12') + self.create_pkcs12('ca1/replica-kdc', filename='replica-kdc.p12') result = self.prepare_replica(pkinit_pkcs12_exists=True, pkinit_pin=_DEFAULT) assert result.returncode == 0