More work on ra.check_request_status() and ra.get_certificate()

This commit is contained in:
Jason Gerard DeRose 2009-02-10 00:15:53 -07:00 committed by Rob Crittenden
parent 97c04c491b
commit 1518772d75

View File

@ -89,9 +89,9 @@ class ra(Backend):
(status, reason) = (response.status, response.reason)
data = response.read()
conn.close()
self.debug('CA response status: %r', status)
self.debug('CA response reason: %r', reason)
self.debug('CA response data: %r', data)
self.debug('response status: %r', status)
self.debug('response reason: %r', reason)
#self.debug('response data: %r', data)
return (status, reason, data)
def check_request_status(self, request_id):
@ -101,64 +101,95 @@ class ra(Backend):
:param request_id: request ID
"""
self.debug('IPA-RA: check_request_status')
return_values = {}
if request_id is not None:
(s, r, data) = self._request('checkRequest',
requestId=request_id,
xmlOutput='true',
(s, r, data) = self._request('checkRequest',
requestId=request_id,
xmlOutput='true',
)
response = {'status': '2'}
if data is not None:
request_status = self.__find_substring(
data, 'header.status = "', '"'
)
if data is not None:
request_status = self.__find_substring(data, 'header.status = "', '"')
if request_status is not None:
return_values["status"] = "0"
return_values["request_status"] = request_status
self.debug("IPA-RA: request_status: '%s'" % request_status)
serial_number = self.__find_substring(data, 'record.serialNumber="', '"')
if serial_number is not None:
return_values["serial_number"] = "0x"+serial_number
request_id = self.__find_substring(data, 'header.requestId = "', '"')
if request_id is not None:
return_values["request_id"] = request_id
error = self.__find_substring(data, 'fixed.unexpectedError = "', '"')
if error is not None:
return_values["error"] = error
if return_values.has_key("status") is False:
return_values["status"] = "2"
else:
return_values["status"] = "1"
return return_values
if request_status is not None:
response['status'] = '0'
response['request_status'] = request_status
serial_number = self.__find_substring(
data, 'record.serialNumber="', '"'
)
if serial_number is not None:
# This was "0x"+serial_number, but we should return it in
# the same form used in get_certificate()
response['serial_number'] = serial_number
request_id = self.__find_substring(
data, 'header.requestId = "', '"'
)
if request_id is not None:
response['request_id'] = request_id
error = self.__find_substring(
data, 'fixed.unexpectedError = "', '"'
)
if error is not None:
response['error'] = error
return response
def __run_sslget(self, args, stdin=None):
new_args = ["/usr/bin/sslget", "-d", self.sec_dir, "-w", self.pwd_file, "-n", self.ipa_certificate_nickname]
new_args = new_args + args
return self.__run(new_args, stdin)
def _sslget(self, url, **kw):
"""
Perform HTTPS request using ``sslget`` command.
This is only a stop-gap till it is replaced with python-nss.
"""
post = urlencode(kw)
self.debug('sslget %s %s', url, post)
argv = [
'/usr/bin/sslget',
'-n', self.ipa_certificate_nickname, # nickname
'-w', self.pwd_file, # pwfile
'-d', self.sec_dir, # dbdir
'-e', post, # post
'-r', url, # url
'%s:%d' % (self.env.ca_host, self.env.ca_ssl_port),
]
return self.__run(argv)
def get_certificate(self, serial_number=None):
"""
Retrieve an existing certificate
Retrieve an existing certificate.
:param serial_number: certificate serial number
"""
self.debug('IPA-RA: get_certificate')
issued_certificate = None
return_values = {}
if serial_number is not None:
request_info = 'serialNumber=%s' % serial_number
self.debug('request_info: %r', request_info)
returncode, stdout, stderr = self.__run_sslget(["-e", request_info, "-r", "/ca/agent/ca/displayBySerial", self.env.ca_host+":"+str(self.env.ca_ssl_port)])
self.debug("IPA-RA: returncode: %d" % returncode)
if (returncode == 0):
issued_certificate = self.__find_substring(stdout, 'header.certChainBase64 = "', '"')
if issued_certificate is not None:
return_values["status"] = "0"
issued_certificate = issued_certificate.replace("\\r", "")
issued_certificate = issued_certificate.replace("\\n", "")
self.debug("IPA-RA: issued_certificate: '%s'" % issued_certificate)
return_values["certificate"] = issued_certificate
else:
return_values["status"] = "1"
revocation_reason = self.__find_substring(stdout, 'header.revocationReason = ', ';')
if revocation_reason is not None:
return_values["revocation_reason"] = revocation_reason
(returncode, stdout, stderr) = self._sslget(
'/ca/agent/ca/displayBySerial',
serialNumber=serial_number,
)
self.debug("IPA-RA: returncode: %d" % returncode)
response = {}
if (returncode == 0):
issued_certificate = self.__find_substring(
stdout, 'header.certChainBase64 = "', '"'
)
if issued_certificate is not None:
response['status'] = '0'
issued_certificate = issued_certificate.replace('\\r', '')
issued_certificate = issued_certificate.replace('\\n', '')
self.debug('IPA-RA: issued_certificate: %s', issued_certificate)
response['certificate'] = issued_certificate
else:
return_values["status"] = str(-returncode)
response['status'] = '1'
revocation_reason = self.__find_substring(
stdout, 'header.revocationReason = ', ';'
)
if revocation_reason is not None:
response['revocation_reason'] = revocation_reason
else:
return_values["status"] = "1"
return return_values
response['status'] = str(-returncode)
return response
def request_certificate(self, certificate_request=None, request_type="pkcs10"):
"""
@ -391,10 +422,7 @@ class ra(Backend):
"""
sslget and certutil utilities are used only till Python-NSS completion.
"""
def __run_sslget(self, args, stdin=None):
new_args = ["/usr/bin/sslget", "-d", self.sec_dir, "-w", self.pwd_file, "-n", self.ipa_certificate_nickname]
new_args = new_args + args
return self.__run(new_args, stdin)
def __run_certutil(self, args, stdin=None):
new_args = ["/usr/bin/certutil", "-d", self.sec_dir, "-f", self.pwd_file]