diff --git a/API.txt b/API.txt index 4b06a9692..0526d5a90 100644 --- a/API.txt +++ b/API.txt @@ -783,7 +783,7 @@ option: Str('version?') output: Output('result') command: cert_request/1 args: 1,9,3 -arg: Str('csr', cli_name='csr_file') +arg: CertificateSigningRequest('csr', cli_name='csr_file') option: Flag('add', autofill=True, default=False) option: Flag('all', autofill=True, cli_name='all', default=False) option: Str('cacn?', autofill=True, cli_name='ca', default=u'ipa') diff --git a/ipaclient/plugins/cert.py b/ipaclient/plugins/cert.py index d7011e67a..0377c511c 100644 --- a/ipaclient/plugins/cert.py +++ b/ipaclient/plugins/cert.py @@ -143,9 +143,6 @@ class cert_request(CertRetrieveOverride): raise errors.CertificateOperationError( error=(_('Generated CSR was empty'))) - # cert_request requires the CSR to be base64-encoded (but PEM - # header and footer are not required) - csr = unicode(base64.b64encode(csr)) else: if database is not None or private_key is not None: raise errors.MutuallyExclusiveError(reason=_( diff --git a/ipalib/parameters.py b/ipalib/parameters.py index fa0c813a4..d647b6b60 100644 --- a/ipalib/parameters.py +++ b/ipalib/parameters.py @@ -115,12 +115,15 @@ from ipalib.base import check_name from ipalib.plugable import ReadOnly, lock from ipalib.errors import ConversionError, RequirementError, ValidationError from ipalib.errors import ( - PasswordMismatch, Base64DecodeError, CertificateFormatError + PasswordMismatch, Base64DecodeError, CertificateFormatError, + CertificateOperationError ) from ipalib.constants import TYPE_ERROR, CALLABLE_ERROR, LDAP_GENERALIZED_TIME_FORMAT from ipalib.text import Gettext, FixMe from ipalib.util import json_serialize, validate_idna_domain -from ipalib.x509 import load_der_x509_certificate, IPACertificate +from ipalib.x509 import ( + load_der_x509_certificate, IPACertificate, default_backend) +from ipalib.pkcs10 import strip_header as strip_csr_header from ipapython import kerberos from ipapython.dn import DN from ipapython.dnsutil import DNSName @@ -1452,6 +1455,60 @@ class Certificate(Param): return super(Certificate, self)._convert_scalar(value) +class CertificateSigningRequest(Param): + type = crypto_x509.CertificateSigningRequest + type_error = _('must be a certificate signing request') + allowed_types = (crypto_x509.CertificateSigningRequest, bytes, unicode) + + def __extract_der_from_input(self, value): + """ + Tries to get the DER representation of whatever we receive as an input + + :param value: + bytes instance containing something we hope is a certificate + signing request + :returns: + base64-decoded representation of whatever we found in case input + had been something else than DER or something which resembles + DER, in which case we would just return input + """ + try: + value.decode('utf-8') + except UnicodeDecodeError: + # possibly DER-encoded CSR or something similar + return value + + value = strip_csr_header(value) + return base64.b64decode(value) + + def _convert_scalar(self, value, index=None): + """ + :param value: + either DER csr, base64-encoded csr or an object implementing the + cryptography.CertificateSigningRequest interface + :returns: + an object with the cryptography.CertificateSigningRequest interface + """ + if isinstance(value, unicode): + try: + value = value.encode('ascii') + except UnicodeDecodeError: + raise CertificateOperationError('not a valid CSR') + + if isinstance(value, bytes): + # try to extract DER from whatever we got + value = self.__extract_der_from_input(value) + try: + value = crypto_x509.load_der_x509_csr( + value, backend=default_backend()) + except ValueError as e: + raise CertificateOperationError( + error=_("Failure decoding Certificate Signing Request:" + " %s") % e) + + return super(CertificateSigningRequest, self)._convert_scalar(value) + + class Str(Data): """ A parameter for Unicode text (stored in the ``unicode`` type). diff --git a/ipalib/pkcs10.py b/ipalib/pkcs10.py index 39ec95c68..03d2cb367 100644 --- a/ipalib/pkcs10.py +++ b/ipalib/pkcs10.py @@ -29,13 +29,13 @@ def strip_header(csr): Remove the header and footer (and surrounding material) from a CSR. """ headerlen = 40 - s = csr.find("-----BEGIN NEW CERTIFICATE REQUEST-----") + s = csr.find(b"-----BEGIN NEW CERTIFICATE REQUEST-----") if s == -1: headerlen = 36 - s = csr.find("-----BEGIN CERTIFICATE REQUEST-----") + s = csr.find(b"-----BEGIN CERTIFICATE REQUEST-----") if s >= 0: - e = csr.find("-----END") - csr = csr[s+headerlen:e] + e = csr.find(b"-----END") + csr = csr[s + headerlen:e] return csr diff --git a/ipalib/rpc.py b/ipalib/rpc.py index 0c2f98176..de3dd9d16 100644 --- a/ipalib/rpc.py +++ b/ipalib/rpc.py @@ -197,6 +197,10 @@ def xml_wrap(value, version): return base64.b64encode( value.public_bytes(x509_Encoding.DER)).decode('ascii') + if isinstance(value, crypto_x509.CertificateSigningRequest): + return base64.b64encode( + value.public_bytes(x509_Encoding.DER)).decode('ascii') + assert type(value) in (unicode, float, bool, type(None)) + six.integer_types return value @@ -325,6 +329,7 @@ class _JSONPrimer(dict): tuple: self._enc_list, dict: self._enc_dict, crypto_x509.Certificate: self._enc_certificate, + crypto_x509.CertificateSigningRequest: self._enc_certificate, }) # int, long for t in six.integer_types: diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py index 533972455..946c9ef94 100644 --- a/ipaserver/install/certs.py +++ b/ipaserver/install/certs.py @@ -409,11 +409,11 @@ class CertDB(object): if self.host_name is None: raise RuntimeError("CA Host is not set.") - with open(certreq_fname, "r") as f: + with open(certreq_fname, "rb") as f: csr = f.read() - # We just want the CSR bits, make sure there is nothing else - csr = pkcs10.strip_header(csr) + # We just want the CSR bits, make sure there is no thing else + csr = pkcs10.strip_header(csr).decode('utf8') params = {'profileId': dogtag.DEFAULT_PROFILE, 'cert_request_type': 'pkcs10', @@ -461,11 +461,12 @@ class CertDB(object): if self.host_name is None: raise RuntimeError("CA Host is not set.") - with open(certreq_fname, "r") as f: + with open(certreq_fname, "rb") as f: csr = f.read() # We just want the CSR bits, make sure there is no thing else - csr = pkcs10.strip_header(csr) + csr = pkcs10.strip_header(csr).decode('utf8') + params = {'profileId': 'caJarSigningCert', 'cert_request_type': 'pkcs10', diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py index 202d92dfd..38314cd0c 100644 --- a/ipaserver/plugins/cert.py +++ b/ipaserver/plugins/cert.py @@ -24,7 +24,6 @@ import collections import datetime import logging from operator import attrgetter -import os import cryptography.x509 from cryptography.hazmat.primitives import hashes, serialization @@ -33,14 +32,14 @@ import six from ipalib import Command, Str, Int, Flag from ipalib import api from ipalib import errors, messages -from ipalib import pkcs10 from ipalib import x509 from ipalib import ngettext from ipalib.constants import IPA_CA_CN from ipalib.crud import Create, PKQuery, Retrieve, Search from ipalib.frontend import Method, Object from ipalib.parameters import ( - Bytes, Certificate, DateTime, DNParam, DNSNameParam, Principal + Bytes, Certificate, CertificateSigningRequest, DateTime, DNParam, + DNSNameParam, Principal ) from ipalib.plugable import Registry from .virtual import VirtualCommand @@ -254,22 +253,6 @@ def convert_pkidatetime(value): return x509.format_datetime(value) -def validate_csr(ugettext, csr): - """ - Ensure the CSR is base64-encoded and can be decoded by our PKCS#10 - parser. - """ - if api.env.context == 'cli': - # If we are passed in a pointer to a valid file on the client side - # escape and let the load_files() handle things - if csr and os.path.exists(csr): - return - try: - pkcs10.load_certificate_request(csr) - except (TypeError, ValueError) as e: - raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % e) - - def normalize_serial_number(num): """ Convert a SN given in decimal or hexadecimal. @@ -616,11 +599,10 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): attr_name = 'request' takes_args = ( - Str( - 'csr', validate_csr, + CertificateSigningRequest( + 'csr', label=_('CSR'), cli_name='csr_file', - noextrawhitespace=False, ), ) operation="request certificate" @@ -725,13 +707,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): caacl_check(principal, ca, profile_id) try: - csr_obj = pkcs10.load_certificate_request(csr) - except ValueError as e: - raise errors.CertificateOperationError( - error=_("Failure decoding Certificate Signing Request: %s") % e) - - try: - ext_san = csr_obj.extensions.get_extension_for_oid( + ext_san = csr.extensions.get_extension_for_oid( cryptography.x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME) except cryptography.x509.extensions.ExtensionNotFound: ext_san = None @@ -739,7 +715,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): # Ensure that the DN in the CSR matches the principal # # We only look at the "most specific" CN value - cns = csr_obj.subject.get_attributes_for_oid( + cns = csr.subject.get_attributes_for_oid( cryptography.x509.oid.NameOID.COMMON_NAME) if len(cns) == 0: raise errors.ValidationError(name='csr', @@ -772,7 +748,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): # check email address # # fail if any email addr from DN does not appear in ldap entry - email_addrs = csr_obj.subject.get_attributes_for_oid( + email_addrs = csr.subject.get_attributes_for_oid( cryptography.x509.oid.NameOID.EMAIL_ADDRESS) csr_emails = [attr.value for attr in email_addrs] if not _emails_are_valid(csr_emails, @@ -888,7 +864,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): # re-serialise to PEM, in case the user-supplied data has # extraneous material that will cause Dogtag to freak out # keep it as string not bytes, it is required later - csr_pem = csr_obj.public_bytes( + csr_pem = csr.public_bytes( serialization.Encoding.PEM).decode('utf-8') result = self.Backend.ra.request_certificate( csr_pem, profile_id, ca_id, request_type=request_type)