mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
cert: include certificate chain in cert command output
Include the full certificate chain in the output of cert-request, cert-show and cert-find if --chain or --all is specified. If output file is specified in the CLI together with --chain, the full certificate chain is written to the file. https://pagure.io/freeipa/issue/6547 Reviewed-By: David Kupka <dkupka@redhat.com>
This commit is contained in:
committed by
David Kupka
parent
c60d9c9744
commit
8ed891cb61
@@ -267,6 +267,12 @@ class BaseCertObject(Object):
|
||||
normalizer=x509.normalize_certificate,
|
||||
flags={'no_create', 'no_update', 'no_search'},
|
||||
),
|
||||
Bytes(
|
||||
'certificate_chain*',
|
||||
label=_("Certificate chain"),
|
||||
doc=_("X.509 certificate chain"),
|
||||
flags={'no_create', 'no_update', 'no_search'},
|
||||
),
|
||||
DNParam(
|
||||
'subject',
|
||||
label=_('Subject'),
|
||||
@@ -495,6 +501,13 @@ class certreq(BaseCertObject):
|
||||
)
|
||||
|
||||
|
||||
_chain_flag = Flag(
|
||||
'chain',
|
||||
default=False,
|
||||
doc=_('Include certificate chain in output'),
|
||||
)
|
||||
|
||||
|
||||
@register()
|
||||
class cert_request(Create, BaseCertMethod, VirtualCommand):
|
||||
__doc__ = _('Submit a certificate signing request.')
|
||||
@@ -526,6 +539,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
|
||||
"automatically add the principal if it doesn't exist "
|
||||
"(service principals only)"),
|
||||
),
|
||||
_chain_flag,
|
||||
)
|
||||
|
||||
def get_args(self):
|
||||
@@ -535,7 +549,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
|
||||
continue
|
||||
yield arg
|
||||
|
||||
def execute(self, csr, all=False, raw=False, **kw):
|
||||
def execute(self, csr, all=False, raw=False, chain=False, **kw):
|
||||
ca_enabled_check(self.api)
|
||||
|
||||
ldap = self.api.Backend.ldap2
|
||||
@@ -549,7 +563,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
|
||||
# referencing nonexistant CA) and look up authority ID.
|
||||
#
|
||||
ca = kw['cacn']
|
||||
ca_obj = api.Command.ca_show(ca)['result']
|
||||
ca_obj = api.Command.ca_show(ca, all=all, chain=chain)['result']
|
||||
ca_id = ca_obj['ipacaid'][0]
|
||||
|
||||
"""
|
||||
@@ -823,6 +837,11 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
|
||||
self.log.error("Profiles used to store cert should't be "
|
||||
"used for krbtgt certificates")
|
||||
|
||||
if 'certificate_chain' in ca_obj:
|
||||
cert = x509.load_certificate(result['certificate'])
|
||||
cert = cert.public_bytes(serialization.Encoding.DER)
|
||||
result['certificate_chain'] = [cert] + ca_obj['certificate_chain']
|
||||
|
||||
return dict(
|
||||
result=result,
|
||||
value=pkey_to_value(int(result['request_id']), kw),
|
||||
@@ -999,12 +1018,13 @@ class cert_show(Retrieve, CertMethod, VirtualCommand):
|
||||
doc=_('File to store the certificate in.'),
|
||||
exclude='webui',
|
||||
),
|
||||
_chain_flag,
|
||||
)
|
||||
|
||||
operation="retrieve certificate"
|
||||
|
||||
def execute(self, serial_number, all=False, raw=False, no_members=False,
|
||||
**options):
|
||||
chain=False, **options):
|
||||
ca_enabled_check(self.api)
|
||||
|
||||
# Dogtag lightweight CAs have shared serial number domain, so
|
||||
@@ -1020,7 +1040,11 @@ class cert_show(Retrieve, CertMethod, VirtualCommand):
|
||||
if not bind_principal_can_manage_cert(cert):
|
||||
raise acierr # pylint: disable=E0702
|
||||
|
||||
ca_obj = api.Command.ca_show(options['cacn'])['result']
|
||||
ca_obj = api.Command.ca_show(
|
||||
options['cacn'],
|
||||
all=all,
|
||||
chain=chain,
|
||||
)['result']
|
||||
if DN(cert.issuer) != DN(ca_obj['ipacasubjectdn'][0]):
|
||||
# DN of cert differs from what we requested
|
||||
raise errors.NotFound(
|
||||
@@ -1028,10 +1052,11 @@ class cert_show(Retrieve, CertMethod, VirtualCommand):
|
||||
"issued by CA '%(ca)s' not found")
|
||||
% dict(serial=serial_number, ca=options['cacn']))
|
||||
|
||||
der_cert = base64.b64decode(result['certificate'])
|
||||
|
||||
if all or not no_members:
|
||||
ldap = self.api.Backend.ldap2
|
||||
filter = ldap.make_filter_from_attr(
|
||||
'usercertificate', base64.b64decode(result['certificate']))
|
||||
filter = ldap.make_filter_from_attr('usercertificate', der_cert)
|
||||
try:
|
||||
entries = ldap.get_entries(base_dn=self.api.env.basedn,
|
||||
filter=filter,
|
||||
@@ -1048,6 +1073,10 @@ class cert_show(Retrieve, CertMethod, VirtualCommand):
|
||||
self.obj._fill_owners(result)
|
||||
result['cacn'] = ca_obj['cn'][0]
|
||||
|
||||
if 'certificate_chain' in ca_obj:
|
||||
result['certificate_chain'] = (
|
||||
[der_cert] + ca_obj['certificate_chain'])
|
||||
|
||||
return dict(result=result, value=pkey_to_value(serial_number, options))
|
||||
|
||||
|
||||
@@ -1319,7 +1348,11 @@ class cert_find(Search, CertMethod):
|
||||
raise
|
||||
return result, False, complete
|
||||
|
||||
ca_objs = self.api.Command.ca_find(timelimit=0, sizelimit=0)['result']
|
||||
ca_objs = self.api.Command.ca_find(
|
||||
all=all,
|
||||
timelimit=0,
|
||||
sizelimit=0,
|
||||
)['result']
|
||||
ca_objs = {DN(ca['ipacasubjectdn'][0]): ca for ca in ca_objs}
|
||||
|
||||
ra = self.api.Backend.ra
|
||||
@@ -1354,6 +1387,12 @@ class cert_find(Search, CertMethod):
|
||||
obj['certificate'].replace('\r\n', ''))
|
||||
self.obj._parse(obj)
|
||||
|
||||
if 'certificate_chain' in ca_obj:
|
||||
cert = x509.load_certificate(obj['certificate'])
|
||||
cert_der = cert.public_bytes(serialization.Encoding.DER)
|
||||
obj['certificate_chain'] = (
|
||||
[cert_der] + ca_obj['certificate_chain'])
|
||||
|
||||
obj['cacn'] = ca_obj['cn'][0]
|
||||
|
||||
result[issuer, serial_number] = obj
|
||||
|
||||
Reference in New Issue
Block a user