Add function for extracting PEM certs from PKCS #7

Add a single function for extracting X.509 certs in PEM format from
a PKCS #7 object.  Refactor sites that execute ``openssl pkcs7`` to
use the new function.

Part of: https://fedorahosted.org/freeipa/ticket/6178

Reviewed-By: Jan Cholasta <jcholast@redhat.com>
Reviewed-By: Tomas Krizek <tkrizek@redhat.com>
This commit is contained in:
Fraser Tweedale 2016-08-16 13:16:58 +10:00 committed by Jan Cholasta
parent 95e602598a
commit c7ea56c049
3 changed files with 49 additions and 41 deletions

View File

@ -49,6 +49,14 @@ from ipalib import api
from ipalib import util
from ipalib import errors
from ipapython.dn import DN
from ipapython import ipautil
try:
from ipaplatform.paths import paths
except ImportError:
OPENSSL = '/usr/bin/openssl'
else:
OPENSSL = paths.OPENSSL
if six.PY3:
unicode = str
@ -56,7 +64,9 @@ if six.PY3:
PEM = 0
DER = 1
PEM_REGEX = re.compile(r'(?<=-----BEGIN CERTIFICATE-----).*?(?=-----END CERTIFICATE-----)', re.DOTALL)
PEM_REGEX = re.compile(
r'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----',
re.DOTALL)
EKU_SERVER_AUTH = '1.3.6.1.5.5.7.3.1'
EKU_CLIENT_AUTH = '1.3.6.1.5.5.7.3.2'
@ -145,6 +155,23 @@ def load_certificate_list_from_file(filename):
return load_certificate_list(f.read())
def pkcs7_to_pems(data, datatype=PEM):
"""
Extract certificates from a PKCS #7 object.
Return a ``list`` of X.509 PEM strings.
May throw ``ipautil.CalledProcessError`` on invalid data.
"""
cmd = [
OPENSSL, "pkcs7", "-print_certs",
"-inform", "PEM" if datatype == PEM else "DER",
]
result = ipautil.run(cmd, stdin=data, capture_output=True)
return PEM_REGEX.findall(result.output)
def is_self_signed(certificate, datatype=PEM):
cert = load_certificate(certificate, datatype)
return cert.issuer == cert.subject

View File

@ -239,13 +239,8 @@ class NSSDatabase(object):
continue
if label in ('PKCS7', 'PKCS #7 SIGNED DATA', 'CERTIFICATE'):
args = [
OPENSSL, 'pkcs7',
'-print_certs',
]
try:
result = ipautil.run(
args, stdin=body, capture_output=True)
certs = x509.pkcs7_to_pems(body)
except ipautil.CalledProcessError as e:
if label == 'CERTIFICATE':
root_logger.warning(
@ -257,7 +252,7 @@ class NSSDatabase(object):
filename, line, e)
continue
else:
extracted_certs += result.output + '\n'
extracted_certs += '\n'.join(certs) + '\n'
loaded = True
continue

View File

@ -749,44 +749,30 @@ class CAInstance(DogtagInstance):
# makes openssl throw up.
data = base64.b64decode(chain)
result = ipautil.run(
[paths.OPENSSL,
"pkcs7",
"-inform",
"DER",
"-print_certs",
], stdin=data, capture_output=True)
certlist = result.output
certlist = x509.pkcs7_to_pems(data, x509.DER)
# Ok, now we have all the certificates in certs, walk through it
# and pull out each certificate and add it to our database
st = 1
en = 0
subid = 0
ca_dn = DN(('CN','Certificate Authority'), self.subject_base)
while st > 0:
st = certlist.find('-----BEGIN', en)
en = certlist.find('-----END', en+1)
if st > 0:
try:
(chain_fd, chain_name) = tempfile.mkstemp()
os.write(chain_fd, certlist[st:en+25])
os.close(chain_fd)
(_rdn, subject_dn) = certs.get_cert_nickname(certlist[st:en+25])
if subject_dn == ca_dn:
nick = get_ca_nickname(self.realm)
trust_flags = 'CT,C,C'
else:
nick = str(subject_dn)
trust_flags = ',,'
self.__run_certutil(
['-A', '-t', trust_flags, '-n', nick, '-a',
'-i', chain_name]
)
finally:
os.remove(chain_name)
subid += 1
for cert in certlist:
try:
chain_fd, chain_name = tempfile.mkstemp()
os.write(chain_fd, cert)
os.close(chain_fd)
(_rdn, subject_dn) = certs.get_cert_nickname(cert)
if subject_dn == ca_dn:
nick = get_ca_nickname(self.realm)
trust_flags = 'CT,C,C'
else:
nick = str(subject_dn)
trust_flags = ',,'
self.__run_certutil(
['-A', '-t', trust_flags, '-n', nick, '-a',
'-i', chain_name]
)
finally:
os.remove(chain_name)
# Restore NSS trust flags of all previously existing certificates
for nick, trust_flags in cert_backup_list: