Handle missing LWCA certificate or chain

If lightweight CA key replication has not completed, requests for
the certificate or chain will return 404**.  This can occur in
normal operation, and should be a temporary condition.  Detect this
case and handle it by simply omitting the 'certificate' and/or
'certificate_out' fields in the response, and add a warning message
to the response.

Also update the client-side plugin that handles the
--certificate-out option.  Because the CLI will automatically print
the warning message, if the expected field is missing from the
response, just ignore it and continue processing.

** after the Dogtag NullPointerException gets fixed!

Part of: https://pagure.io/freeipa/issue/7964

Reviewed-By: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Fraser Tweedale <ftweedal@redhat.com>
This commit is contained in:
Fraser Tweedale
2019-05-30 20:57:10 +10:00
parent 02d6fc7474
commit 854d3053e2
3 changed files with 67 additions and 16 deletions

View File

@@ -6,7 +6,7 @@ import base64
import six
from ipalib import api, errors, output, Bytes, DNParam, Flag, Str
from ipalib import api, errors, messages, output, Bytes, DNParam, Flag, Str
from ipalib.constants import IPA_CA_CN
from ipalib.plugable import Registry
from ipapython.dn import ATTR_NAME_BY_OID
@@ -163,28 +163,53 @@ class ca(LDAPObject):
def set_certificate_attrs(entry, options, want_cert=True):
"""
Set certificate attributes into the entry. Depending on
options, this may contact Dogtag to retrieve certificate or
chain. If the retrieval fails with 404 (which can occur under
normal operation due to lightweight CA key replication delay),
return a message object that should be set in the response.
"""
try:
ca_id = entry['ipacaid'][0]
except KeyError:
return
return None
full = options.get('all', False)
want_chain = options.get('chain', False)
want_data = want_cert or want_chain or full
if not want_data:
return
return None
msg = None
with api.Backend.ra_lightweight_ca as ca_api:
if want_cert or full:
der = ca_api.read_ca_cert(ca_id)
entry['certificate'] = base64.b64encode(der).decode('ascii')
try:
der = ca_api.read_ca_cert(ca_id)
entry['certificate'] = base64.b64encode(der).decode('ascii')
except errors.HTTPRequestError as e:
if e.status == 404: # pylint: disable=no-member
msg = messages.LightweightCACertificateNotAvailable(
ca=entry['cn'][0])
else:
raise e
if want_chain or full:
pkcs7_der = ca_api.read_ca_chain(ca_id)
certs = x509.pkcs7_to_certs(pkcs7_der, x509.DER)
ders = [cert.public_bytes(x509.Encoding.DER) for cert in certs]
entry['certificate_chain'] = ders
try:
pkcs7_der = ca_api.read_ca_chain(ca_id)
certs = x509.pkcs7_to_certs(pkcs7_der, x509.DER)
ders = [cert.public_bytes(x509.Encoding.DER) for cert in certs]
entry['certificate_chain'] = ders
except errors.HTTPRequestError as e:
if e.status == 404: # pylint: disable=no-member
msg = messages.LightweightCACertificateNotAvailable(
ca=entry['cn'][0])
else:
raise e
return msg
@register()
class ca_find(LDAPSearch):
@@ -198,7 +223,9 @@ class ca_find(LDAPSearch):
result = super(ca_find, self).execute(*keys, **options)
if not options.get('pkey_only', False):
for entry in result['result']:
set_certificate_attrs(entry, options, want_cert=False)
msg = set_certificate_attrs(entry, options, want_cert=False)
if msg:
self.add_message(msg)
return result
@@ -220,7 +247,9 @@ class ca_show(LDAPRetrieve):
def execute(self, *keys, **options):
ca_enabled_check(self.api)
result = super(ca_show, self).execute(*keys, **options)
set_certificate_attrs(result['result'], options)
msg = set_certificate_attrs(result['result'], options)
if msg:
self.add_message(msg)
return result
@@ -284,7 +313,9 @@ class ca_add(LDAPCreate):
return dn
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
set_certificate_attrs(entry_attrs, options)
msg = set_certificate_attrs(entry_attrs, options)
if msg:
self.add_message(msg)
return dn