parameters: introduce CertificateSigningRequest

Previously, CSRs were handled as a Str parameter which brought
trouble to Python 3 because of its more strict type requirements.
We introduce a CertificateSigningRequest parameter which allows to
use python-cryptography x509.CertificateSigningRequest to represent
CSRs in the framework.

https://pagure.io/freeipa/issue/7131
This commit is contained in:
Stanislav Laznicka 2017-09-22 14:52:36 +02:00
parent 26d721e6ea
commit 61605d28d8
7 changed files with 83 additions and 47 deletions

View File

@ -783,7 +783,7 @@ option: Str('version?')
output: Output('result')
command: cert_request/1
args: 1,9,3
arg: Str('csr', cli_name='csr_file')
arg: CertificateSigningRequest('csr', cli_name='csr_file')
option: Flag('add', autofill=True, default=False)
option: Flag('all', autofill=True, cli_name='all', default=False)
option: Str('cacn?', autofill=True, cli_name='ca', default=u'ipa')

View File

@ -143,9 +143,6 @@ class cert_request(CertRetrieveOverride):
raise errors.CertificateOperationError(
error=(_('Generated CSR was empty')))
# cert_request requires the CSR to be base64-encoded (but PEM
# header and footer are not required)
csr = unicode(base64.b64encode(csr))
else:
if database is not None or private_key is not None:
raise errors.MutuallyExclusiveError(reason=_(

View File

@ -115,12 +115,15 @@ from ipalib.base import check_name
from ipalib.plugable import ReadOnly, lock
from ipalib.errors import ConversionError, RequirementError, ValidationError
from ipalib.errors import (
PasswordMismatch, Base64DecodeError, CertificateFormatError
PasswordMismatch, Base64DecodeError, CertificateFormatError,
CertificateOperationError
)
from ipalib.constants import TYPE_ERROR, CALLABLE_ERROR, LDAP_GENERALIZED_TIME_FORMAT
from ipalib.text import Gettext, FixMe
from ipalib.util import json_serialize, validate_idna_domain
from ipalib.x509 import load_der_x509_certificate, IPACertificate
from ipalib.x509 import (
load_der_x509_certificate, IPACertificate, default_backend)
from ipalib.pkcs10 import strip_header as strip_csr_header
from ipapython import kerberos
from ipapython.dn import DN
from ipapython.dnsutil import DNSName
@ -1452,6 +1455,60 @@ class Certificate(Param):
return super(Certificate, self)._convert_scalar(value)
class CertificateSigningRequest(Param):
type = crypto_x509.CertificateSigningRequest
type_error = _('must be a certificate signing request')
allowed_types = (crypto_x509.CertificateSigningRequest, bytes, unicode)
def __extract_der_from_input(self, value):
"""
Tries to get the DER representation of whatever we receive as an input
:param value:
bytes instance containing something we hope is a certificate
signing request
:returns:
base64-decoded representation of whatever we found in case input
had been something else than DER or something which resembles
DER, in which case we would just return input
"""
try:
value.decode('utf-8')
except UnicodeDecodeError:
# possibly DER-encoded CSR or something similar
return value
value = strip_csr_header(value)
return base64.b64decode(value)
def _convert_scalar(self, value, index=None):
"""
:param value:
either DER csr, base64-encoded csr or an object implementing the
cryptography.CertificateSigningRequest interface
:returns:
an object with the cryptography.CertificateSigningRequest interface
"""
if isinstance(value, unicode):
try:
value = value.encode('ascii')
except UnicodeDecodeError:
raise CertificateOperationError('not a valid CSR')
if isinstance(value, bytes):
# try to extract DER from whatever we got
value = self.__extract_der_from_input(value)
try:
value = crypto_x509.load_der_x509_csr(
value, backend=default_backend())
except ValueError as e:
raise CertificateOperationError(
error=_("Failure decoding Certificate Signing Request:"
" %s") % e)
return super(CertificateSigningRequest, self)._convert_scalar(value)
class Str(Data):
"""
A parameter for Unicode text (stored in the ``unicode`` type).

View File

@ -29,13 +29,13 @@ def strip_header(csr):
Remove the header and footer (and surrounding material) from a CSR.
"""
headerlen = 40
s = csr.find("-----BEGIN NEW CERTIFICATE REQUEST-----")
s = csr.find(b"-----BEGIN NEW CERTIFICATE REQUEST-----")
if s == -1:
headerlen = 36
s = csr.find("-----BEGIN CERTIFICATE REQUEST-----")
s = csr.find(b"-----BEGIN CERTIFICATE REQUEST-----")
if s >= 0:
e = csr.find("-----END")
csr = csr[s+headerlen:e]
e = csr.find(b"-----END")
csr = csr[s + headerlen:e]
return csr

View File

@ -197,6 +197,10 @@ def xml_wrap(value, version):
return base64.b64encode(
value.public_bytes(x509_Encoding.DER)).decode('ascii')
if isinstance(value, crypto_x509.CertificateSigningRequest):
return base64.b64encode(
value.public_bytes(x509_Encoding.DER)).decode('ascii')
assert type(value) in (unicode, float, bool, type(None)) + six.integer_types
return value
@ -325,6 +329,7 @@ class _JSONPrimer(dict):
tuple: self._enc_list,
dict: self._enc_dict,
crypto_x509.Certificate: self._enc_certificate,
crypto_x509.CertificateSigningRequest: self._enc_certificate,
})
# int, long
for t in six.integer_types:

View File

@ -409,11 +409,11 @@ class CertDB(object):
if self.host_name is None:
raise RuntimeError("CA Host is not set.")
with open(certreq_fname, "r") as f:
with open(certreq_fname, "rb") as f:
csr = f.read()
# We just want the CSR bits, make sure there is nothing else
csr = pkcs10.strip_header(csr)
# We just want the CSR bits, make sure there is no thing else
csr = pkcs10.strip_header(csr).decode('utf8')
params = {'profileId': dogtag.DEFAULT_PROFILE,
'cert_request_type': 'pkcs10',
@ -461,11 +461,12 @@ class CertDB(object):
if self.host_name is None:
raise RuntimeError("CA Host is not set.")
with open(certreq_fname, "r") as f:
with open(certreq_fname, "rb") as f:
csr = f.read()
# We just want the CSR bits, make sure there is no thing else
csr = pkcs10.strip_header(csr)
csr = pkcs10.strip_header(csr).decode('utf8')
params = {'profileId': 'caJarSigningCert',
'cert_request_type': 'pkcs10',

View File

@ -24,7 +24,6 @@ import collections
import datetime
import logging
from operator import attrgetter
import os
import cryptography.x509
from cryptography.hazmat.primitives import hashes, serialization
@ -33,14 +32,14 @@ import six
from ipalib import Command, Str, Int, Flag
from ipalib import api
from ipalib import errors, messages
from ipalib import pkcs10
from ipalib import x509
from ipalib import ngettext
from ipalib.constants import IPA_CA_CN
from ipalib.crud import Create, PKQuery, Retrieve, Search
from ipalib.frontend import Method, Object
from ipalib.parameters import (
Bytes, Certificate, DateTime, DNParam, DNSNameParam, Principal
Bytes, Certificate, CertificateSigningRequest, DateTime, DNParam,
DNSNameParam, Principal
)
from ipalib.plugable import Registry
from .virtual import VirtualCommand
@ -254,22 +253,6 @@ def convert_pkidatetime(value):
return x509.format_datetime(value)
def validate_csr(ugettext, csr):
"""
Ensure the CSR is base64-encoded and can be decoded by our PKCS#10
parser.
"""
if api.env.context == 'cli':
# If we are passed in a pointer to a valid file on the client side
# escape and let the load_files() handle things
if csr and os.path.exists(csr):
return
try:
pkcs10.load_certificate_request(csr)
except (TypeError, ValueError) as e:
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % e)
def normalize_serial_number(num):
"""
Convert a SN given in decimal or hexadecimal.
@ -616,11 +599,10 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
attr_name = 'request'
takes_args = (
Str(
'csr', validate_csr,
CertificateSigningRequest(
'csr',
label=_('CSR'),
cli_name='csr_file',
noextrawhitespace=False,
),
)
operation="request certificate"
@ -725,13 +707,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
caacl_check(principal, ca, profile_id)
try:
csr_obj = pkcs10.load_certificate_request(csr)
except ValueError as e:
raise errors.CertificateOperationError(
error=_("Failure decoding Certificate Signing Request: %s") % e)
try:
ext_san = csr_obj.extensions.get_extension_for_oid(
ext_san = csr.extensions.get_extension_for_oid(
cryptography.x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
except cryptography.x509.extensions.ExtensionNotFound:
ext_san = None
@ -739,7 +715,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
# Ensure that the DN in the CSR matches the principal
#
# We only look at the "most specific" CN value
cns = csr_obj.subject.get_attributes_for_oid(
cns = csr.subject.get_attributes_for_oid(
cryptography.x509.oid.NameOID.COMMON_NAME)
if len(cns) == 0:
raise errors.ValidationError(name='csr',
@ -772,7 +748,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
# check email address
#
# fail if any email addr from DN does not appear in ldap entry
email_addrs = csr_obj.subject.get_attributes_for_oid(
email_addrs = csr.subject.get_attributes_for_oid(
cryptography.x509.oid.NameOID.EMAIL_ADDRESS)
csr_emails = [attr.value for attr in email_addrs]
if not _emails_are_valid(csr_emails,
@ -888,7 +864,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
# re-serialise to PEM, in case the user-supplied data has
# extraneous material that will cause Dogtag to freak out
# keep it as string not bytes, it is required later
csr_pem = csr_obj.public_bytes(
csr_pem = csr.public_bytes(
serialization.Encoding.PEM).decode('utf-8')
result = self.Backend.ra.request_certificate(
csr_pem, profile_id, ca_id, request_type=request_type)