mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
x509: remove the strip_header() function
We don't need the strip_header() function, to load an unknown x509 certificate, load_unknown_x509_certificate() should be used. Reviewed-By: Tibor Dudlak <tdudlak@redhat.com> Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
parent
d65297311d
commit
c9265a7b05
@ -206,6 +206,7 @@ class cert_find(MethodOverride):
|
|||||||
raise errors.MutuallyExclusiveError(
|
raise errors.MutuallyExclusiveError(
|
||||||
reason=_("cannot specify both raw certificate and file"))
|
reason=_("cannot specify both raw certificate and file"))
|
||||||
if 'certificate' not in options and 'file' in options:
|
if 'certificate' not in options and 'file' in options:
|
||||||
options['certificate'] = x509.strip_header(options.pop('file'))
|
options['certificate'] = x509.load_unknown_x509_certificate(
|
||||||
|
options.pop('file'))
|
||||||
|
|
||||||
return super(cert_find, self).forward(*args, **options)
|
return super(cert_find, self).forward(*args, **options)
|
||||||
|
@ -40,7 +40,7 @@ class certmap_match(MethodOverride):
|
|||||||
raise errors.MutuallyExclusiveError(
|
raise errors.MutuallyExclusiveError(
|
||||||
reason=_("cannot specify both raw certificate and file"))
|
reason=_("cannot specify both raw certificate and file"))
|
||||||
if args:
|
if args:
|
||||||
args = [x509.strip_header(args[0])]
|
args = [x509.load_unknown_x509_certificate(args[0])]
|
||||||
elif 'certificate' in options:
|
elif 'certificate' in options:
|
||||||
args = [options.pop('certificate')]
|
args = [options.pop('certificate')]
|
||||||
else:
|
else:
|
||||||
|
@ -88,21 +88,6 @@ def subject_base():
|
|||||||
|
|
||||||
return _subject_base
|
return _subject_base
|
||||||
|
|
||||||
def strip_header(pem):
|
|
||||||
"""
|
|
||||||
Remove the header and footer from a certificate.
|
|
||||||
"""
|
|
||||||
regexp = (
|
|
||||||
u"^-----BEGIN CERTIFICATE-----(.*?)-----END CERTIFICATE-----"
|
|
||||||
)
|
|
||||||
if isinstance(pem, bytes):
|
|
||||||
regexp = regexp.encode('ascii')
|
|
||||||
s = re.search(regexp, pem, re.MULTILINE | re.DOTALL)
|
|
||||||
if s is not None:
|
|
||||||
return s.group(1)
|
|
||||||
else:
|
|
||||||
return pem
|
|
||||||
|
|
||||||
|
|
||||||
@crypto_utils.register_interface(crypto_x509.Certificate)
|
@crypto_utils.register_interface(crypto_x509.Certificate)
|
||||||
class IPACertificate(object):
|
class IPACertificate(object):
|
||||||
|
@ -21,7 +21,6 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
import shutil
|
import shutil
|
||||||
import base64
|
|
||||||
import glob
|
import glob
|
||||||
import contextlib
|
import contextlib
|
||||||
import nose
|
import nose
|
||||||
@ -360,10 +359,9 @@ class CALessBase(IntegrationTest):
|
|||||||
expected_cacrt = f.read()
|
expected_cacrt = f.read()
|
||||||
logger.debug('Expected /etc/ipa/ca.crt contents:\n%s',
|
logger.debug('Expected /etc/ipa/ca.crt contents:\n%s',
|
||||||
expected_cacrt)
|
expected_cacrt)
|
||||||
expected_binary_cacrt = base64.b64decode(x509.strip_header(
|
expected_cacrt = x509.load_unknown_x509_certificate(expected_cacrt)
|
||||||
expected_cacrt))
|
|
||||||
logger.debug('Expected binary CA cert:\n%r',
|
logger.debug('Expected binary CA cert:\n%r',
|
||||||
expected_binary_cacrt)
|
expected_cacrt)
|
||||||
for host in [self.master] + self.replicas:
|
for host in [self.master] + self.replicas:
|
||||||
# Check the LDAP entry
|
# Check the LDAP entry
|
||||||
ldap = host.ldap_connect()
|
ldap = host.ldap_connect()
|
||||||
@ -373,7 +371,7 @@ class CALessBase(IntegrationTest):
|
|||||||
cert_from_ldap = entry.single_value['cACertificate']
|
cert_from_ldap = entry.single_value['cACertificate']
|
||||||
logger.debug('CA cert from LDAP on %s:\n%r',
|
logger.debug('CA cert from LDAP on %s:\n%r',
|
||||||
host, cert_from_ldap)
|
host, cert_from_ldap)
|
||||||
assert cert_from_ldap == expected_binary_cacrt
|
assert cert_from_ldap == expected_cacrt
|
||||||
|
|
||||||
# Verify certmonger was not started
|
# Verify certmonger was not started
|
||||||
result = host.run_command(['getcert', 'list'], raiseonerr=False)
|
result = host.run_command(['getcert', 'list'], raiseonerr=False)
|
||||||
@ -384,10 +382,10 @@ class CALessBase(IntegrationTest):
|
|||||||
remote_cacrt = host.get_file_contents(paths.IPA_CA_CRT)
|
remote_cacrt = host.get_file_contents(paths.IPA_CA_CRT)
|
||||||
logger.debug('%s:/etc/ipa/ca.crt contents:\n%s',
|
logger.debug('%s:/etc/ipa/ca.crt contents:\n%s',
|
||||||
host, remote_cacrt)
|
host, remote_cacrt)
|
||||||
binary_cacrt = base64.b64decode(x509.strip_header(remote_cacrt))
|
cacrt = x509.load_unknown_x509_certificate(remote_cacrt)
|
||||||
logger.debug('%s: Decoded /etc/ipa/ca.crt:\n%r',
|
logger.debug('%s: Decoded /etc/ipa/ca.crt:\n%r',
|
||||||
host, binary_cacrt)
|
host, cacrt)
|
||||||
assert expected_binary_cacrt == binary_cacrt
|
assert expected_cacrt == cacrt
|
||||||
|
|
||||||
|
|
||||||
class TestServerInstall(CALessBase):
|
class TestServerInstall(CALessBase):
|
||||||
|
@ -30,6 +30,7 @@ import tempfile
|
|||||||
import shutil
|
import shutil
|
||||||
import six
|
import six
|
||||||
import base64
|
import base64
|
||||||
|
import re
|
||||||
|
|
||||||
from ipalib import api, x509
|
from ipalib import api, x509
|
||||||
from ipaserver.plugins import rabase
|
from ipaserver.plugins import rabase
|
||||||
@ -40,6 +41,20 @@ if six.PY3:
|
|||||||
unicode = str
|
unicode = str
|
||||||
|
|
||||||
|
|
||||||
|
def strip_cert_header(pem):
|
||||||
|
"""
|
||||||
|
Remove the header and footer from a certificate.
|
||||||
|
"""
|
||||||
|
regexp = (
|
||||||
|
r"^-----BEGIN CERTIFICATE-----(.*?)-----END CERTIFICATE-----"
|
||||||
|
)
|
||||||
|
s = re.search(regexp, pem, re.MULTILINE | re.DOTALL)
|
||||||
|
if s is not None:
|
||||||
|
return s.group(1)
|
||||||
|
else:
|
||||||
|
return pem
|
||||||
|
|
||||||
|
|
||||||
def get_testcert(subject, principal):
|
def get_testcert(subject, principal):
|
||||||
"""Get the certificate, creating it if it doesn't exist"""
|
"""Get the certificate, creating it if it doesn't exist"""
|
||||||
reqdir = tempfile.mkdtemp(prefix="tmp-")
|
reqdir = tempfile.mkdtemp(prefix="tmp-")
|
||||||
@ -48,7 +63,7 @@ def get_testcert(subject, principal):
|
|||||||
principal)
|
principal)
|
||||||
finally:
|
finally:
|
||||||
shutil.rmtree(reqdir)
|
shutil.rmtree(reqdir)
|
||||||
return x509.strip_header(_testcert)
|
return strip_cert_header(_testcert.decode('utf-8'))
|
||||||
|
|
||||||
|
|
||||||
def run_certutil(reqdir, args, stdin=None):
|
def run_certutil(reqdir, args, stdin=None):
|
||||||
@ -99,4 +114,4 @@ def makecert(reqdir, subject, principal):
|
|||||||
res = api.Command['cert_request'](csr, principal=principal, add=True)
|
res = api.Command['cert_request'](csr, principal=principal, add=True)
|
||||||
cert = x509.load_der_x509_certificate(
|
cert = x509.load_der_x509_certificate(
|
||||||
base64.b64decode(res['result']['certificate']))
|
base64.b64decode(res['result']['certificate']))
|
||||||
return cert.public_bytes(x509.Encoding.PEM).decode('utf-8')
|
return cert.public_bytes(x509.Encoding.PEM)
|
||||||
|
Loading…
Reference in New Issue
Block a user