diff --git a/ipalib/x509.py b/ipalib/x509.py index 67a9af4c5..64793c896 100644 --- a/ipalib/x509.py +++ b/ipalib/x509.py @@ -31,6 +31,7 @@ from __future__ import print_function +import os import binascii import datetime import ipaddress @@ -41,8 +42,9 @@ import re from cryptography import x509 as crypto_x509 from cryptography import utils as crypto_utils from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.serialization import ( - Encoding, PublicFormat + Encoding, PublicFormat, PrivateFormat, load_pem_private_key ) from pyasn1.type import univ, char, namedtype, tag from pyasn1.codec.der import decoder, encoder @@ -59,9 +61,13 @@ if six.PY3: PEM = 0 DER = 1 -PEM_REGEX = re.compile( +PEM_CERT_REGEX = re.compile( b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', re.DOTALL) +PEM_PRIV_REGEX = re.compile( + b'-----BEGIN(?: ENCRYPTED)?(?: (?:RSA|DSA|DH|EC))? PRIVATE KEY-----.*?' + b'-----END(?: ENCRYPTED)?(?: (?:RSA|DSA|DH|EC))? PRIVATE KEY-----', + re.DOTALL) EKU_SERVER_AUTH = '1.3.6.1.5.5.7.3.1' EKU_CLIENT_AUTH = '1.3.6.1.5.5.7.3.2' @@ -431,7 +437,7 @@ def load_certificate_list(data): Return a list of python-cryptography ``Certificate`` objects. """ - certs = PEM_REGEX.findall(data) + certs = PEM_CERT_REGEX.findall(data) return [load_pem_x509_certificate(cert) for cert in certs] @@ -446,6 +452,35 @@ def load_certificate_list_from_file(filename): return load_certificate_list(f.read()) +def load_private_key_list(data, password=None): + """ + Load a private key list from a sequence of concatenated PEMs. + + :param data: bytes containing the private keys + :param password: bytes, the password to encrypted keys in the bundle + + :returns: List of python-cryptography ``PrivateKey`` objects + """ + crypto_backend = default_backend() + priv_keys = [] + + for match in re.finditer(PEM_PRIV_REGEX, data): + if re.search(b"ENCRYPTED", match.group()) is not None: + if password is None: + raise RuntimeError("Password is required for the encrypted " + "keys in the bundle.") + # Load private key as encrypted + priv_keys.append( + load_pem_private_key(match.group(), password, + backend=crypto_backend)) + else: + priv_keys.append( + load_pem_private_key(match.group(), None, + backend=crypto_backend)) + + return priv_keys + + def pkcs7_to_certs(data, datatype=PEM): """ Extract certificates from a PKCS #7 object. @@ -535,6 +570,24 @@ def write_certificate_list(certs, filename): raise errors.FileError(reason=str(e)) +def write_pem_private_key(priv_key, filename): + """ + Write a private key to a file in PEM format. Will force 0x600 permissions + on file. + + :param priv_key: cryptography ``PrivateKey`` object + """ + try: + with open(filename, 'wb') as fp: + os.fchmod(fp.fileno(), 0o600) + fp.write(priv_key.private_bytes( + Encoding.PEM, + PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption())) + except (IOError, OSError) as e: + raise errors.FileError(reason=str(e)) + + class _PrincipalName(univ.Sequence): componentType = namedtype.NamedTypes( namedtype.NamedType('name-type', univ.Integer().subtype( diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py index 2349a1c5c..f6102605e 100644 --- a/ipaserver/install/certs.py +++ b/ipaserver/install/certs.py @@ -86,6 +86,28 @@ def export_pem_p12(pkcs12_fname, pkcs12_pwd_fname, nickname, pem_fname): "-passout", "file:" + pkcs12_pwd_fname]) +def pkcs12_to_certkeys(p12_fname, p12_passwd=None): + """ + Deserializes pkcs12 file to python objects + + :param p12_fname: A PKCS#12 filename + :param p12_passwd: Optional password for the pkcs12_fname file + """ + args = [paths.OPENSSL, "pkcs12", "-in", p12_fname, "-nodes"] + if p12_passwd: + pwd = ipautil.write_tmp_file(p12_passwd) + args.extend(["-passin", "file:{fname}".format(fname=pwd.name)]) + else: + args.extend(["-passin", "pass:"]) + + pems = ipautil.run(args, capture_output=True).raw_output + + certs = x509.load_certificate_list(pems) + priv_keys = x509.load_private_key_list(pems) + + return (certs, priv_keys) + + def is_ipa_issued_cert(api, cert): """ Return True if the certificate has been issued by IPA diff --git a/ipaserver/install/httpinstance.py b/ipaserver/install/httpinstance.py index e8ddd4863..09d62a6f6 100644 --- a/ipaserver/install/httpinstance.py +++ b/ipaserver/install/httpinstance.py @@ -326,27 +326,34 @@ class HTTPInstance(service.Service): certmonger.stop() def __setup_ssl(self): - db = certs.CertDB(self.realm, nssdir=paths.HTTPD_ALIAS_DIR, - subject_base=self.subject_base, user="root", - group=constants.HTTPD_GROUP, - create=True) - if self.pkcs12_info: - # db.init_from_pkcs12(self.pkcs12_info[0], self.pkcs12_info[1], - # ca_file=self.ca_file, - # trust_flags=trust_flags) - server_certs = db.find_server_certs() - if len(server_certs) == 0: + p12_certs, p12_priv_keys = certs.pkcs12_to_certkeys( + *self.pkcs12_info) + keys_dict = { + k.public_key().public_numbers(): k + for k in p12_priv_keys + } + certs_keys = [ + (c, keys_dict.get(c.public_key().public_numbers())) + for c in p12_certs + ] + server_certs_keys = [ + (c, k) for c, k in certs_keys if k is not None + ] + + if not server_certs_keys: raise RuntimeError( "Could not find a suitable server cert in import in %s" % self.pkcs12_info[0] ) # We only handle one server cert - nickname = server_certs[0][0] - if nickname == 'ipaCert': - nickname = server_certs[1][0] - self.cert = x509.load_der_x509_certificate(paths.HTTPD_CERT_FILE) + self.cert = server_certs_keys[0][0] + x509.write_certificate(self.cert, paths.HTTPD_CERT_FILE) + x509.write_pem_private_key( + server_certs_keys[0][1], + paths.HTTPD_KEY_FILE + ) if self.ca_is_configured: self.start_tracking_certificates()