diff --git a/ipaclient/plugins/cert.py b/ipaclient/plugins/cert.py index 0377c511c..50549cf59 100644 --- a/ipaclient/plugins/cert.py +++ b/ipaclient/plugins/cert.py @@ -206,6 +206,7 @@ class cert_find(MethodOverride): raise errors.MutuallyExclusiveError( reason=_("cannot specify both raw certificate and file")) if 'certificate' not in options and 'file' in options: - options['certificate'] = x509.strip_header(options.pop('file')) + options['certificate'] = x509.load_unknown_x509_certificate( + options.pop('file')) return super(cert_find, self).forward(*args, **options) diff --git a/ipaclient/plugins/certmap.py b/ipaclient/plugins/certmap.py index 50a594f39..981ba292f 100644 --- a/ipaclient/plugins/certmap.py +++ b/ipaclient/plugins/certmap.py @@ -40,7 +40,7 @@ class certmap_match(MethodOverride): raise errors.MutuallyExclusiveError( reason=_("cannot specify both raw certificate and file")) if args: - args = [x509.strip_header(args[0])] + args = [x509.load_unknown_x509_certificate(args[0])] elif 'certificate' in options: args = [options.pop('certificate')] else: diff --git a/ipalib/x509.py b/ipalib/x509.py index 205e2f82d..8efba37d3 100644 --- a/ipalib/x509.py +++ b/ipalib/x509.py @@ -88,21 +88,6 @@ def subject_base(): return _subject_base -def strip_header(pem): - """ - Remove the header and footer from a certificate. - """ - regexp = ( - u"^-----BEGIN CERTIFICATE-----(.*?)-----END CERTIFICATE-----" - ) - if isinstance(pem, bytes): - regexp = regexp.encode('ascii') - s = re.search(regexp, pem, re.MULTILINE | re.DOTALL) - if s is not None: - return s.group(1) - else: - return pem - @crypto_utils.register_interface(crypto_x509.Certificate) class IPACertificate(object): diff --git a/ipatests/test_integration/test_caless.py b/ipatests/test_integration/test_caless.py index 95c55fb8d..d32e22357 100644 --- a/ipatests/test_integration/test_caless.py +++ b/ipatests/test_integration/test_caless.py @@ -21,7 +21,6 @@ import logging import os import tempfile import shutil -import base64 import glob import contextlib import nose @@ -360,10 +359,9 @@ class CALessBase(IntegrationTest): expected_cacrt = f.read() logger.debug('Expected /etc/ipa/ca.crt contents:\n%s', expected_cacrt) - expected_binary_cacrt = base64.b64decode(x509.strip_header( - expected_cacrt)) + expected_cacrt = x509.load_unknown_x509_certificate(expected_cacrt) logger.debug('Expected binary CA cert:\n%r', - expected_binary_cacrt) + expected_cacrt) for host in [self.master] + self.replicas: # Check the LDAP entry ldap = host.ldap_connect() @@ -373,7 +371,7 @@ class CALessBase(IntegrationTest): cert_from_ldap = entry.single_value['cACertificate'] logger.debug('CA cert from LDAP on %s:\n%r', host, cert_from_ldap) - assert cert_from_ldap == expected_binary_cacrt + assert cert_from_ldap == expected_cacrt # Verify certmonger was not started result = host.run_command(['getcert', 'list'], raiseonerr=False) @@ -384,10 +382,10 @@ class CALessBase(IntegrationTest): remote_cacrt = host.get_file_contents(paths.IPA_CA_CRT) logger.debug('%s:/etc/ipa/ca.crt contents:\n%s', host, remote_cacrt) - binary_cacrt = base64.b64decode(x509.strip_header(remote_cacrt)) + cacrt = x509.load_unknown_x509_certificate(remote_cacrt) logger.debug('%s: Decoded /etc/ipa/ca.crt:\n%r', - host, binary_cacrt) - assert expected_binary_cacrt == binary_cacrt + host, cacrt) + assert expected_cacrt == cacrt class TestServerInstall(CALessBase): diff --git a/ipatests/test_xmlrpc/testcert.py b/ipatests/test_xmlrpc/testcert.py index 151919180..6ea5a50ee 100644 --- a/ipatests/test_xmlrpc/testcert.py +++ b/ipatests/test_xmlrpc/testcert.py @@ -30,6 +30,7 @@ import tempfile import shutil import six import base64 +import re from ipalib import api, x509 from ipaserver.plugins import rabase @@ -40,6 +41,20 @@ if six.PY3: unicode = str +def strip_cert_header(pem): + """ + Remove the header and footer from a certificate. + """ + regexp = ( + r"^-----BEGIN CERTIFICATE-----(.*?)-----END CERTIFICATE-----" + ) + s = re.search(regexp, pem, re.MULTILINE | re.DOTALL) + if s is not None: + return s.group(1) + else: + return pem + + def get_testcert(subject, principal): """Get the certificate, creating it if it doesn't exist""" reqdir = tempfile.mkdtemp(prefix="tmp-") @@ -48,7 +63,7 @@ def get_testcert(subject, principal): principal) finally: shutil.rmtree(reqdir) - return x509.strip_header(_testcert) + return strip_cert_header(_testcert.decode('utf-8')) def run_certutil(reqdir, args, stdin=None): @@ -99,4 +114,4 @@ def makecert(reqdir, subject, principal): res = api.Command['cert_request'](csr, principal=principal, add=True) cert = x509.load_der_x509_certificate( base64.b64decode(res['result']['certificate'])) - return cert.public_bytes(x509.Encoding.PEM).decode('utf-8') + return cert.public_bytes(x509.Encoding.PEM)