mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
ra.get_certificate: use REST API
Update ra.get_certificate to use the Dogtag REST API. This change is being done as part of the Dogtag GSS-API authentication effort because the servlet-based method expects an internal Dogtag user. It is less intrusive to just change FreeIPA to call the REST API instead (which is also part of an existing ticket). Depends on https://pagure.io/dogtagpki/issue/2601 (which was merged and released long ago). Part of: https://pagure.io/freeipa/issue/3473 Part of: https://pagure.io/freeipa/issue/5011 Reviewed-By: Christian Heimes <cheimes@redhat.com>
This commit is contained in:
parent
0c0061babd
commit
d7f3a0b2d3
@ -63,9 +63,13 @@ if six.PY3:
|
||||
PEM = 0
|
||||
DER = 1
|
||||
|
||||
# The first group is the whole PEM datum and the second group is
|
||||
# the base64 content (with newlines). For findall() the result is
|
||||
# a list of 2-tuples of the PEM and base64 data.
|
||||
PEM_CERT_REGEX = re.compile(
|
||||
b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----',
|
||||
b'(-----BEGIN CERTIFICATE-----(.*?)-----END CERTIFICATE-----)',
|
||||
re.DOTALL)
|
||||
|
||||
PEM_PRIV_REGEX = re.compile(
|
||||
b'-----BEGIN(?: ENCRYPTED)?(?: (?:RSA|DSA|DH|EC))? PRIVATE KEY-----.*?'
|
||||
b'-----END(?: ENCRYPTED)?(?: (?:RSA|DSA|DH|EC))? PRIVATE KEY-----',
|
||||
@ -447,7 +451,7 @@ def load_certificate_list(data):
|
||||
Return a list of python-cryptography ``Certificate`` objects.
|
||||
"""
|
||||
certs = PEM_CERT_REGEX.findall(data)
|
||||
return [load_pem_x509_certificate(cert) for cert in certs]
|
||||
return [load_pem_x509_certificate(cert[0]) for cert in certs]
|
||||
|
||||
|
||||
def load_certificate_list_from_file(filename):
|
||||
|
@ -250,7 +250,7 @@ import contextlib
|
||||
|
||||
import six
|
||||
|
||||
from ipalib import Backend, api
|
||||
from ipalib import Backend, api, x509
|
||||
from ipapython.dn import DN
|
||||
import ipapython.cookie
|
||||
from ipapython import dogtag, ipautil, certdb
|
||||
@ -686,118 +686,6 @@ def parse_check_request_result_xml(doc):
|
||||
|
||||
return response
|
||||
|
||||
def parse_display_cert_xml(doc):
|
||||
'''
|
||||
:param doc: The root node of the xml document to parse
|
||||
:returns: result dict
|
||||
:except ValueError:
|
||||
|
||||
After parsing the results are returned in a result dict. The following
|
||||
table illustrates the mapping from the CMS data item to what may be found in
|
||||
the result dict. If a CMS data item is absent it will also be absent in the
|
||||
result dict.
|
||||
|
||||
If the requestStatus is not SUCCESS then the response dict will have the
|
||||
contents described in `parse_error_template_xml`.
|
||||
|
||||
+----------------+---------------+-----------------+---------------+
|
||||
|cms name |cms type |result name |result type |
|
||||
+================+===============+=================+===============+
|
||||
|emailCert |Boolean |email_cert |bool |
|
||||
+----------------+---------------+-----------------+---------------+
|
||||
|noCertImport |Boolean |no_cert_import |bool |
|
||||
+----------------+---------------+-----------------+---------------+
|
||||
|revocationReason|int |revocation_reason|int [1]_ |
|
||||
+----------------+---------------+-----------------+---------------+
|
||||
|certPrettyPrint |string |cert_pretty |unicode |
|
||||
+----------------+---------------+-----------------+---------------+
|
||||
|authorityid |string |authority |unicode |
|
||||
+----------------+---------------+-----------------+---------------+
|
||||
|certFingerprint |string |fingerprint |unicode |
|
||||
+----------------+---------------+-----------------+---------------+
|
||||
|certChainBase64 |string |certificate |unicode [2]_ |
|
||||
+----------------+---------------+-----------------+---------------+
|
||||
|serialNumber |string |serial_number |int|long |
|
||||
+----------------+---------------+-----------------+---------------+
|
||||
|pkcs7ChainBase64|string |pkcs7_chain |unicode [2]_ |
|
||||
+----------------+---------------+-----------------+---------------+
|
||||
|
||||
.. [1] revocation reason may be one of:
|
||||
|
||||
- 0 = UNSPECIFIED
|
||||
- 1 = KEY_COMPROMISE
|
||||
- 2 = CA_COMPROMISE
|
||||
- 3 = AFFILIATION_CHANGED
|
||||
- 4 = SUPERSEDED
|
||||
- 5 = CESSATION_OF_OPERATION
|
||||
- 6 = CERTIFICATE_HOLD
|
||||
- 8 = REMOVE_FROM_CRL
|
||||
- 9 = PRIVILEGE_WITHDRAWN
|
||||
- 10 = AA_COMPROMISE
|
||||
|
||||
.. [2] Base64 encoded
|
||||
|
||||
'''
|
||||
|
||||
request_status = get_request_status_xml(doc)
|
||||
|
||||
if request_status != CMS_STATUS_SUCCESS:
|
||||
response = parse_error_template_xml(doc)
|
||||
return response
|
||||
|
||||
response = {}
|
||||
response['request_status'] = request_status
|
||||
|
||||
email_cert = doc.xpath('//xml/header/emailCert[1]')
|
||||
if len(email_cert) == 1:
|
||||
parse_and_set_boolean_xml(email_cert[0], response, 'email_cert')
|
||||
|
||||
no_cert_import = doc.xpath('//xml/header/noCertImport[1]')
|
||||
if len(no_cert_import) == 1:
|
||||
parse_and_set_boolean_xml(no_cert_import[0], response, 'no_cert_import')
|
||||
|
||||
revocation_reason = doc.xpath('//xml/header/revocationReason[1]')
|
||||
if len(revocation_reason) == 1:
|
||||
revocation_reason = int(revocation_reason[0].text)
|
||||
response['revocation_reason'] = revocation_reason
|
||||
|
||||
cert_pretty = doc.xpath('//xml/header/certPrettyPrint[1]')
|
||||
if len(cert_pretty) == 1:
|
||||
cert_pretty = etree.tostring(cert_pretty[0], method='text',
|
||||
encoding=unicode).strip()
|
||||
response['cert_pretty'] = cert_pretty
|
||||
|
||||
authority = doc.xpath('//xml/header/authorityid[1]')
|
||||
if len(authority) == 1:
|
||||
authority = etree.tostring(authority[0], method='text',
|
||||
encoding=unicode).strip()
|
||||
response['authority'] = authority
|
||||
|
||||
fingerprint = doc.xpath('//xml/header/certFingerprint[1]')
|
||||
if len(fingerprint) == 1:
|
||||
fingerprint = etree.tostring(fingerprint[0], method='text',
|
||||
encoding=unicode).strip()
|
||||
response['fingerprint'] = fingerprint
|
||||
|
||||
certificate = doc.xpath('//xml/header/certChainBase64[1]')
|
||||
if len(certificate) == 1:
|
||||
certificate = etree.tostring(certificate[0], method='text',
|
||||
encoding=unicode).strip()
|
||||
response['certificate'] = certificate
|
||||
|
||||
serial_number = doc.xpath('//xml/header/serialNumber[1]')
|
||||
if len(serial_number) == 1:
|
||||
serial_number = int(serial_number[0].text, 16) # parse as hex
|
||||
response['serial_number'] = serial_number
|
||||
response['serial_number_hex'] = u'0x%X' % serial_number
|
||||
|
||||
pkcs7_chain = doc.xpath('//xml/header/pkcs7ChainBase64[1]')
|
||||
if len(pkcs7_chain) == 1:
|
||||
pkcs7_chain = etree.tostring(pkcs7_chain[0], method='text',
|
||||
encoding=unicode).strip()
|
||||
response['pkcs7_chain'] = pkcs7_chain
|
||||
|
||||
return response
|
||||
|
||||
def parse_revoke_cert_xml(doc):
|
||||
'''
|
||||
@ -1532,13 +1420,10 @@ class ra(rabase.rabase, RestClient):
|
||||
"""
|
||||
Retrieve an existing certificate.
|
||||
|
||||
:param serial_number: Certificate serial number. Must be a string value
|
||||
because serial numbers may be of any magnitude and
|
||||
XMLRPC cannot handle integers larger than 64-bit.
|
||||
The string value should be decimal, but may optionally
|
||||
be prefixed with a hex radix prefix if the integral value
|
||||
is represented as hexadecimal. If no radix prefix is
|
||||
supplied the string will be interpreted as decimal.
|
||||
:param serial_number: Certificate serial number. May be int,
|
||||
decimal string, or hex string with "0x"
|
||||
prefix.
|
||||
|
||||
|
||||
The command returns a dict with these possible key/value pairs.
|
||||
Some key/value pairs may be absent.
|
||||
@ -1575,49 +1460,44 @@ class ra(rabase.rabase, RestClient):
|
||||
"""
|
||||
logger.debug('%s.get_certificate()', type(self).__name__)
|
||||
|
||||
# Convert serial number to integral type from string to properly handle
|
||||
# radix issues. Note: the int object constructor will properly handle large
|
||||
# magnitude integral values by returning a Python long type when necessary.
|
||||
serial_number = int(serial_number, 0)
|
||||
|
||||
# Call CMS
|
||||
http_status, _http_headers, http_body = (
|
||||
self._sslget('/ca/agent/ca/displayBySerial',
|
||||
self.env.ca_agent_port,
|
||||
serialNumber=str(serial_number),
|
||||
xml='true')
|
||||
path = 'certs/{}'.format(serial_number)
|
||||
_http_status, _http_headers, http_body = self._ssldo(
|
||||
'GET', path, use_session=False,
|
||||
headers={
|
||||
'Accept': 'application/json',
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
# Parse and handle errors
|
||||
if http_status != 200:
|
||||
self.raise_certificate_operation_error('get_certificate',
|
||||
detail=http_status)
|
||||
|
||||
parse_result = self.get_parse_result_xml(http_body, parse_display_cert_xml)
|
||||
request_status = parse_result['request_status']
|
||||
if request_status != CMS_STATUS_SUCCESS:
|
||||
self.raise_certificate_operation_error('get_certificate',
|
||||
cms_request_status_to_string(request_status),
|
||||
parse_result.get('error_string'))
|
||||
try:
|
||||
resp = json.loads(ipautil.decode_json(http_body))
|
||||
except ValueError:
|
||||
raise errors.RemoteRetrieveError(
|
||||
reason=_("Response from CA was not valid JSON"))
|
||||
|
||||
# Return command result
|
||||
cmd_result = {}
|
||||
|
||||
if 'certificate' in parse_result:
|
||||
cmd_result['certificate'] = parse_result['certificate']
|
||||
if 'Encoded' in resp:
|
||||
s = resp['Encoded']
|
||||
# The 'cert' plugin expects the result to be base64-encoded
|
||||
# X.509 DER. We expect the result to be PEM. We have to
|
||||
# strip the PEM headers and we use PEM_CERT_REGEX to do it.
|
||||
match = x509.PEM_CERT_REGEX.search(s.encode('utf-8'))
|
||||
if match:
|
||||
s = match.group(2).decode('utf-8')
|
||||
cmd_result['certificate'] = s.strip()
|
||||
|
||||
if 'serial_number' in parse_result:
|
||||
# see module documentation concerning serial numbers and XMLRPC
|
||||
cmd_result['serial_number'] = unicode(parse_result['serial_number'])
|
||||
cmd_result['serial_number_hex'] = u'0x%X' % int(cmd_result['serial_number'])
|
||||
if 'id' in resp:
|
||||
serial = int(resp['id'], 0)
|
||||
cmd_result['serial_number'] = unicode(serial)
|
||||
cmd_result['serial_number_hex'] = u'0x%X' % serial
|
||||
|
||||
if 'revocation_reason' in parse_result:
|
||||
cmd_result['revocation_reason'] = parse_result['revocation_reason']
|
||||
if 'RevocationReason' in resp:
|
||||
cmd_result['revocation_reason'] = resp['RevocationReason']
|
||||
|
||||
return cmd_result
|
||||
|
||||
|
||||
def request_certificate(
|
||||
self, csr, profile_id, ca_id, request_type='pkcs10'):
|
||||
"""
|
||||
|
Loading…
Reference in New Issue
Block a user