Create a Certificate parameter

Up until now, Bytes parameter was used for certificate parameters
throughout the framework. However, the Bytes parameter does nothing
special for certificates, like validation, so this had to be done
for each of the parameters which were supposed to represent a
certificate.

This commit introduces a special Certificate parameter which takes
care of certificate validation so this does not have to be done
separately. It also makes sure that the certificates represented by
this parameter are always converted to DER format so that we can work
with them in a unified manner throughout the framework.

This commit also makes it possible to pass bytes directly during
instantiation of the Certificate parameter and they are still
represented correctly after their conversion in the _convert_scalar()
method.

https://pagure.io/freeipa/issue/4985

Reviewed-By: Fraser Tweedale <ftweedal@redhat.com>
Reviewed-By: Rob Crittenden <rcritten@redhat.com>
Reviewed-By: Martin Basti <mbasti@redhat.com>
This commit is contained in:
Stanislav Laznicka
2017-07-03 17:10:34 +02:00
committed by Pavel Vomacka
parent 5ff1de8490
commit 5a44ca6383
19 changed files with 178 additions and 188 deletions

View File

@@ -39,7 +39,9 @@ from ipalib import ngettext
from ipalib.constants import IPA_CA_CN
from ipalib.crud import Create, PKQuery, Retrieve, Search
from ipalib.frontend import Method, Object
from ipalib.parameters import Bytes, DateTime, DNParam, DNSNameParam, Principal
from ipalib.parameters import (
Bytes, Certificate, DateTime, DNParam, DNSNameParam, Principal
)
from ipalib.plugable import Registry
from .virtual import VirtualCommand
from .baseldap import pkey_to_value
@@ -324,10 +326,6 @@ def ca_kdc_check(api_instance, hostname):
% dict(hostname=hostname))
def validate_certificate(value):
return x509.validate_der_x509_certificate(value)
def bind_principal_can_manage_cert(cert):
"""Check that the bind principal can manage the given cert.
@@ -362,11 +360,10 @@ class BaseCertObject(Object):
doc=_('Name of issuing CA'),
flags={'no_create', 'no_update', 'no_search'},
),
Bytes(
'certificate', validate_certificate,
Certificate(
'certificate',
label=_("Certificate"),
doc=_("Base-64 encoded certificate."),
normalizer=x509.ensure_der_format,
flags={'no_create', 'no_update', 'no_search'},
),
Bytes(
@@ -1438,17 +1435,7 @@ class cert_find(Search, CertMethod):
)
def _get_cert_key(self, cert):
try:
cert_obj = x509.load_der_x509_certificate(cert)
except ValueError as e:
message = messages.SearchResultTruncated(
reason=_("failed to load certificate: %s") % e,
)
self.add_message(message)
raise
return (DN(cert_obj.issuer), cert_obj.serial_number)
return (DN(cert.issuer), cert.serial_number)
def _cert_search(self, pkey_only, **options):
result = collections.OrderedDict()
@@ -1458,16 +1445,12 @@ class cert_find(Search, CertMethod):
except KeyError:
return result, False, False
try:
issuer, serial_number = self._get_cert_key(cert)
except ValueError:
return result, True, True
obj = {'serial_number': serial_number}
obj = {'serial_number': cert.serial_number}
if not pkey_only:
obj['certificate'] = base64.b64encode(cert).decode('ascii')
obj['certificate'] = base64.b64encode(
cert.public_bytes(x509.Encoding.DER)).decode('ascii')
result[issuer, serial_number] = obj
result[self._get_cert_key(cert)] = obj
return result, False, True
@@ -1570,7 +1553,8 @@ class cert_find(Search, CertMethod):
cert = options.get('certificate')
if cert is not None:
filter = ldap.make_filter_from_attr('usercertificate', cert)
filter = ldap.make_filter_from_attr(
'usercertificate', cert.public_bytes(x509.Encoding.DER))
else:
filter = '(usercertificate=*)'
filters.append(filter)
@@ -1598,20 +1582,18 @@ class cert_find(Search, CertMethod):
for entry in entries:
for attr in ('usercertificate', 'usercertificate;binary'):
for cert in entry.get(attr, []):
cert_key = self._get_cert_key(cert)
try:
issuer, serial_number = self._get_cert_key(cert)
except ValueError:
truncated = True
continue
try:
obj = result[issuer, serial_number]
obj = result[cert_key]
except KeyError:
obj = {'serial_number': serial_number}
obj = {'serial_number': cert.serial_number}
if not pkey_only and all:
obj['certificate'] = (
base64.b64encode(cert).decode('ascii'))
result[issuer, serial_number] = obj
base64.b64encode(
cert.public_bytes(x509.Encoding.DER))
.decode('ascii'))
result[cert_key] = obj
if not pkey_only and (all or not no_members):
owners = obj.setdefault('owner', [])
@@ -1695,10 +1677,7 @@ class cert_find(Search, CertMethod):
obj['certificate'].replace('\r\n', ''))
if 'certificate_chain' in ca_obj:
cert = x509.load_der_x509_certificate(
obj['certificate'])
cert_der = (
cert.public_bytes(serialization.Encoding.DER))
cert_der = base64.b64decode(obj['certificate'])
obj['certificate_chain'] = (
[cert_der] + ca_obj['certificate_chain'])