diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py index 4d508cde8..82d8290a8 100644 --- a/ipaserver/install/certs.py +++ b/ipaserver/install/certs.py @@ -195,6 +195,197 @@ class NSSDatabase(object): raise RuntimeError("unknown error import pkcs#12 file %s" % pkcs12_filename) + def import_files(self, files, db_password_filename, import_keys=False, + key_password=None, key_nickname=None): + """ + Import certificates and a single private key from multiple files + + The files may be in PEM and DER certificate, PKCS#7 certificate chain, + PKCS#8 and raw private key and PKCS#12 formats. + + :param files: Names of files to import + :param db_password_filename: Name of file containing the database + password + :param import_keys: Whether to import private keys + :param key_password: Password to decrypt private keys + :param key_nickname: Nickname of the private key to import from PKCS#12 + files + """ + key_file = None + extracted_key = None + extracted_certs = '' + + for filename in files: + try: + with open(filename, 'rb') as f: + data = f.read() + except IOError as e: + raise RuntimeError( + "Failed to open %s: %s" % (filename, e.strerror)) + + # Try to parse the file as PEM file + matches = list(re.finditer( + r'-----BEGIN (.+?)-----(.*?)-----END \1-----', data, re.DOTALL)) + if matches: + loaded = False + for match in matches: + body = match.group() + label = match.group(1) + line = len(data[:match.start() + 1].splitlines()) + + if label in ('CERTIFICATE', 'X509 CERTIFICATE', + 'X.509 CERTIFICATE'): + try: + x509.load_certificate(match.group(2)) + except NSPRError as e: + if label != 'CERTIFICATE': + root_logger.warning( + "Skipping certificate in %s at line %s: %s", + filename, line, e) + continue + else: + extracted_certs += body + '\n' + loaded = True + continue + + if label in ('PKCS7', 'PKCS #7 SIGNED DATA', 'CERTIFICATE'): + args = [ + paths.OPENSSL, 'pkcs7', + '-print_certs', + ] + try: + stdout, stderr, rc = ipautil.run(args, stdin=body) + except ipautil.CalledProcessError as e: + if label == 'CERTIFICATE': + root_logger.warning( + "Skipping certificate in %s at line %s: %s", + filename, line, e) + else: + root_logger.warning( + "Skipping PKCS#7 in %s at line %s: %s", + filename, line, e) + continue + else: + extracted_certs += stdout + '\n' + loaded = True + continue + + if label in ('PRIVATE KEY', 'ENCRYPTED PRIVATE KEY', + 'RSA PRIVATE KEY', 'DSA PRIVATE KEY', + 'EC PRIVATE KEY'): + if not import_keys: + continue + + if key_file: + raise RuntimeError( + "Can't load private key from both %s and %s" % + (key_file, filename)) + + args = [ + paths.OPENSSL, 'pkcs8', + '-topk8', + '-passout', 'file:' + db_password_filename, + ] + if ((label != 'PRIVATE KEY' and key_password) or + label == 'ENCRYPTED PRIVATE KEY'): + key_pwdfile = ipautil.write_tmp_file(key_password) + args += [ + '-passin', 'file:' + key_pwdfile.name, + ] + try: + stdout, stderr, rc = ipautil.run(args, stdin=body) + except ipautil.CalledProcessError as e: + root_logger.warning( + "Skipping private key in %s at line %s: %s", + filename, line, e) + continue + else: + extracted_key = stdout + key_file = filename + loaded = True + continue + if loaded: + continue + raise RuntimeError("Failed to load %s" % filename) + + # Try to load the file as DER certificate + try: + x509.load_certificate(data, x509.DER) + except NSPRError: + pass + else: + data = x509.make_pem(base64.b64encode(data)) + extracted_certs += data + '\n' + continue + + # Try to import the file as PKCS#12 file + if import_keys: + try: + self.import_pkcs12( + filename, db_password_filename, key_password) + except RuntimeError: + pass + else: + if key_file: + raise RuntimeError( + "Can't load private key from both %s and %s" % + (key_file, filename)) + key_file = filename + + server_certs = self.find_server_certs() + if key_nickname: + for nickname, trust_flags in server_certs: + if nickname == key_nickname: + break + else: + raise RuntimeError( + "Server certificate \"%s\" not found in %s" % + (key_nickname, filename)) + else: + if len(server_certs) > 1: + raise RuntimeError( + "%s server certificates found in %s, " + "expecting only one" % + (len(server_certs), filename)) + + continue + + raise RuntimeError("Failed to load %s" % filename) + + if import_keys and not key_file: + raise RuntimeError( + "No server certificates found in %s" % (', '.join(files))) + + nss_certs = x509.load_certificate_list(extracted_certs) + nss_cert = None + for nss_cert in nss_certs: + nickname = str(nss_cert.subject) + self.add_cert(nss_cert.der_data, nickname, ',,') + del nss_certs, nss_cert + + if extracted_key: + in_file = ipautil.write_tmp_file(extracted_certs + extracted_key) + out_file = tempfile.NamedTemporaryFile() + out_password = ipautil.ipa_generate_password() + out_pwdfile = ipautil.write_tmp_file(out_password) + args = [ + paths.OPENSSL, 'pkcs12', + '-export', + '-in', in_file.name, + '-out', out_file.name, + '-passin', 'file:' + db_password_filename, + '-passout', 'file:' + out_pwdfile.name, + ] + try: + ipautil.run(args) + except ipautil.CalledProcessError as e: + raise RuntimeError( + "No matching certificate found for private key from %s" % + key_file) + + self.import_pkcs12(out_file.name, db_password_filename, + out_password) + def trust_root_cert(self, root_nickname, trust_flags=None): if root_nickname[:7] == "Builtin": root_logger.debug(