Cleaned up ra.revoke_certificate() and ra.take_certificate_off_hold(); added more tests in integration.py

This commit is contained in:
Jason Gerard DeRose 2009-02-10 22:49:21 -07:00 committed by Rob Crittenden
parent 80df8f6e15
commit 0e6e11d2e3
2 changed files with 199 additions and 142 deletions

78
integration.py Executable file
View File

@ -0,0 +1,78 @@
#!/usr/bin/python
from base64 import b64encode, b64decode
from ipalib import api
# certificate with serial number 17
cert = b64decode("""
MIIC3zCCAcegAwIBAgIBETANBgkqhkiG9w0BAQUFADA7MRkwFwYDVQQKExBTamNSZWRoYXQgRG9tYW
luMR4wHAYDVQQDExVDZXJ0aWZpY2F0ZSBBdXRob3JpdHkwHhcNMDkwMTIyMjMzODA2WhcNMDkwNzIx
MjMzODA2WjAUMRIwEAYKCZImiZPyLGQBARMCbGwwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAM
id6i9ri9ldyAXaH4MJSPdUDjdc9+E10hwxw7crFE1K0uvr8YT2e1YotNqv7Q+Bk7KVRrLH6Y5UPlWY
uSAP8G9t8yjn5Uo3iXU5AqsrRek+pxerD/WocwedF6yjJ/zlQyYyg93h0njJr1lStyVLTyp+VVqtk3
FSDIwLCWQHOTejAgMBAAGjgZgwgZUwHwYDVR0jBBgwFoAUlz9JZxqVabh4QQOEkxyWt80pIQkwQwYI
KwYBBQUHAQEENzA1MDMGCCsGAQUFBzABhidodHRwOi8vYS1mOC5zamMucmVkaGF0LmNvbTo5MTgwL2
NhL29jc3AwDgYDVR0PAQH/BAQDAgXgMB0GA1UdJQQWMBQGCCsGAQUFBwMCBggrBgEFBQcDBDANBgkq
hkiG9w0BAQUFAAOCAQEAhU+oqPh+rlYFPm0D8HAJ0RIWw9gkNctHUfVGi+NeYTaUAEGWUOpXjLSQgP
gq1fNBHd+IRLhycwp4uUsFCPE1n3eStmn/D6o9u1eNnTFPj74MLZVQQTXPE8+LBYeHgTUwFuKp2WyW
9J/BDZ3pDWKYWWMawhD7ext7UhZkpIJODFEaDxiXCfB8GsAEbmfoYFk21znuGQQu3Wu1s6licyunLh
/W3sxCFGIT9DHxS0GZKimm7M02IPGxK/0TZr0kVcLQx6XGKqiK1464rvl4u60mQjwJwfhawshs84YT
xFnXZKkvsT3GjfIe/k687TMG3paTFtKkis+u7z0v6355uJzLpQ==
""")
csr = 'MIIBlDCB/gIBADBVMR0wGwYDVQQKExRVc2Vyc3lzUmVkaGF0LURvbWFpbjEQMA4GA1UECxMHcGtpLWlwYTEiMCAGA1UEAxMZSVBBLVN1YnN5c3RlbS1DZXJ0aWZpY2F0ZTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA3Qmpr81WxbnISmyyhc2ShiPzUvWIrCg5FgJ1QrBl7CRe62Wl/YYiV/DbuMoex1ec7zKfgfSFVFU9/2iwj7Du0sZdXYJNQPdj9yLdPk2tyxdgJuHLdxI0SNgaEFyvmIMP/X9vQN9H5w0/PyrJQscOxc6tbTcYL0ZSSylLQ+diaQECAwEAA'
api.bootstrap(in_server=True, debug=True)
api.finalize()
ra = api.Backend.ra
def assert_equal(*vals):
val0 = vals[0]
for val in vals[1:]:
assert val == val0, '%r != %r' % (val, val0)
api.log.info('******** Testing ra.check_request_status() ********')
assert_equal(
ra.check_request_status('35'),
dict(
status='0',
serial_number='17',
request_status='complete',
request_id='35',
)
)
api.log.info('******** Testing ra.get_certificate() ********')
assert_equal(
ra.get_certificate('17'),
dict(
status='0',
certificate=b64encode(cert),
)
)
api.log.info('******** Testing ra.request_certificate() ********')
assert_equal(
ra.request_certificate(csr),
dict(
status='1',
)
)
api.log.info('******** Testing ra.revoke_certificate() ********')
assert_equal(
ra.revoke_certificate('17', revocation_reason=6), # Put on hold
dict(
status='0',
revoked=True,
)
)
api.log.info('******** Testing ra.take_certificate_off_hold() ********')
assert_equal(
ra.take_certificate_off_hold('17'),
dict(
taken_off_hold=True,
)
)

View File

@ -21,13 +21,14 @@
"""
Backend plugin for IPA-RA.
IPA-RA provides an access to CA to issue, retrieve, and revoke certificates.
IPA-RA plugin provides CA interface via the following methods:
check_request_status to check certificate request status
get_certificate to retrieve an existing certificate
request_certificate to request certificate
revoke_certificate to revoke certificate
take_certificate_off_hold to take certificate off hold
The `ra` plugin provides access to the CA to issue, retrieve, and revoke
certificates via the following methods:
* `ra.check_request_status()` - check certificate request status.
* `ra.get_certificate()` - retrieve an existing certificate.
* `ra.request_certificate()` - request a new certificate.
* `ra.revoke_certificate()` - revoke a certificate.
* `ra.take_certificate_off_hold()` - take a certificate off hold.
"""
import os, stat, subprocess
@ -43,6 +44,7 @@ from ipalib import api, Backend
from ipalib.errors2 import NetworkError
from ipaserver import servercore
from ipaserver import ipaldap
from ipalib.constants import TYPE_ERROR
class ra(Backend):
@ -66,42 +68,68 @@ class ra(Backend):
self.__import_ca_chain()
self.__request_ipa_certificate(self.__generate_ipa_request())
def _request(self, method, **kw):
def _request(self, url, **kw):
"""
Perform an HTTP request to CA server.
Perform an HTTP request.
:param url: The URL to post to.
:param kw: Keyword arguments to encode into POST body.
"""
# FIXME: should '/ca/ee/ca/%s' be hardcoded, or should it be in Env?
url = '/ca/ee/ca/%s' % method
self.info('CA request: %s:%s%s',
self.env.ca_host, self.env.ca_port, url)
uri = 'http://%s:%s%s' % (self.env.ca_host, self.env.ca_port, url)
post = urlencode(kw)
self.info('request %r', uri)
self.debug('request post %r', post)
conn = HTTPConnection(self.env.ca_host, self.env.ca_port)
try:
conn.request('POST', url,
body=urlencode(kw),
body=post,
headers={'Content-type': 'application/x-www-form-urlencoded'},
)
except socket.error, e:
raise NetworkError(
uri='http://%s:%d' % (self.env.ca_host, self.env.ca_port),
error=e.args[1],
)
raise NetworkError(uri=uri, error=e.args[1])
response = conn.getresponse()
(status, reason) = (response.status, response.reason)
data = response.read()
conn.close()
self.debug('response status: %r', status)
self.debug('response reason: %r', reason)
#self.debug('response data: %r', data)
self.debug('request status %r', status)
self.debug('request reason %s', reason)
self.debug('request data %s', data)
return (status, reason, data)
def _sslget(self, url, **kw):
"""
Perform an HTTPS request using the ``sslget`` command.
:param url: The URL to post to.
:param kw: Keyword arguments to encode into POST body.
"""
uri = 'https://%s:%d%s' % (self.env.ca_host, self.env.ca_ssl_port, url)
post = urlencode(kw)
self.info('sslget %r', uri)
self.debug('sslget post %r', post)
argv = [
'/usr/bin/sslget',
'-n', self.ipa_certificate_nickname, # nickname
'-w', self.pwd_file, # pwfile
'-d', self.sec_dir, # dbdir
'-e', post, # post
'-r', url, # url
'%s:%d' % (self.env.ca_host, self.env.ca_ssl_port),
]
(returncode, stdout, stderr) = self.__run(argv)
self.debug('sslget returncode %r', returncode)
self.debug('sslget stderr %s', stderr)
self.debug('sslget stdout %s', stdout)
return (returncode, stdout, stderr)
def check_request_status(self, request_id):
"""
Check status of a certificate signing request.
:param request_id: request ID
"""
self.debug('IPA-RA: check_request_status')
(s, r, data) = self._request('checkRequest',
self.debug('%s.check_request_status()', self.fullname)
(s, r, data) = self._request('/ca/ee/ca/checkRequest',
requestId=request_id,
xmlOutput='true',
)
@ -118,7 +146,7 @@ class ra(Backend):
)
if serial_number is not None:
# This was "0x"+serial_number, but we should return it in
# the same form used in get_certificate()
# the same form used as arg to get_certificate(), etc.
response['serial_number'] = serial_number
request_id = self.__find_substring(
data, 'header.requestId = "', '"'
@ -132,46 +160,19 @@ class ra(Backend):
response['error'] = error
return response
def __run_sslget(self, args, stdin=None):
new_args = ["/usr/bin/sslget", "-d", self.sec_dir, "-w", self.pwd_file, "-n", self.ipa_certificate_nickname]
new_args = new_args + args
return self.__run(new_args, stdin)
def _sslget(self, url, **kw):
"""
Perform HTTPS request using ``sslget`` command.
This is only a stop-gap till it is replaced with python-nss.
"""
post = urlencode(kw)
self.debug('sslget %s %s', url, post)
argv = [
'/usr/bin/sslget',
'-n', self.ipa_certificate_nickname, # nickname
'-w', self.pwd_file, # pwfile
'-d', self.sec_dir, # dbdir
'-e', post, # post
'-r', url, # url
'%s:%d' % (self.env.ca_host, self.env.ca_ssl_port),
]
(returncode, stdout, stderr) = self.__run(argv)
self.debug('sslget response %s', stdout)
return (returncode, stdout, stderr)
def get_certificate(self, serial_number=None):
"""
Retrieve an existing certificate.
:param serial_number: certificate serial number
"""
self.debug('IPA-RA: get_certificate')
self.debug('%s.get_certificate()', self.fullname)
issued_certificate = None
(returncode, stdout, stderr) = self._sslget(
'/ca/agent/ca/displayBySerial',
serialNumber=serial_number,
xmlOutput='true',
)
self.debug("IPA-RA: returncode: %d" % returncode)
response = {}
if (returncode == 0):
issued_certificate = self.__find_substring(
@ -201,124 +202,102 @@ class ra(Backend):
:param csr: The certificate signing request.
:param request_type: The request type (defaults to ``'pkcs10'``).
"""
self.debug("IPA-RA: request_certificate")
self.debug('%s.request_certificate()', self.fullname)
certificate = None
(returncode, stdout, stderr) = self._sslget(
'/ca/ee/ca/profileSubmit',
(returncode, stdout, stderr) = self._sslget('/ca/ee/ca/profileSubmit',
profileId='caRAserverCert',
cert_request_type=request_type,
cert_request=csr,
xmlOutput='true',
)
return_values = {}
self.debug("IPA-RA: returncode: %d" % returncode)
response = {}
if (returncode == 0):
status = self.__find_substring(stdout, "<Status>", "</Status>")
if status is not None:
self.debug ("status=%s" % status)
return_values["status"] = status
response["status"] = status
request_id = self.__find_substring(stdout, "<Id>", "</Id>")
if request_id is not None:
self.debug ("request_id=%s" % request_id)
return_values["request_id"] = request_id
response["request_id"] = request_id
serial_number = self.__find_substring(stdout, "<serialno>", "</serialno>")
if serial_number is not None:
self.debug ("serial_number=%s" % serial_number)
return_values["serial_number"] = ("0x%s" % serial_number)
response["serial_number"] = ("0x%s" % serial_number)
subject = self.__find_substring(stdout, "<SubjectDN>", "</SubjectDN>")
if subject is not None:
self.debug ("subject=%s" % subject)
return_values["subject"] = subject
response["subject"] = subject
certificate = self.__find_substring(stdout, "<b64>", "</b64>")
if certificate is not None:
self.debug ("certificate=%s" % certificate)
return_values["certificate"] = certificate
if return_values.has_key("status") is False:
return_values["status"] = "2"
response["certificate"] = certificate
if response.has_key("status") is False:
response["status"] = "2"
else:
return_values["status"] = str(-returncode)
return return_values
response["status"] = str(-returncode)
return response
def revoke_certificate(self, serial_number, revocation_reason=0):
"""
Revoke a certificate.
def revoke_certificate(self, serial_number=None, revocation_reason=0):
The integer ``revocation_reason`` code must have one of these values:
* ``0`` - unspecified
* ``1`` - keyCompromise
* ``2`` - cACompromise
* ``3`` - affiliationChanged
* ``4`` - superseded
* ``5`` - cessationOfOperation
* ``6`` - certificateHold
* ``8`` - removeFromCRL
* ``9`` - privilegeWithdrawn
* ``10`` - aACompromise
Note that reason code ``7`` is not used. See RFC 5280 for more details:
http://www.ietf.org/rfc/rfc5280.txt
:param serial_number: Certificate serial number.
:param revocation_reason: Integer code of revocation reason.
"""
Revoke a certificate
:param serial_number: certificate serial number
:param revocation_reason: revocation reason
revocationr reasons: 0 - unspecified
1 - key compromise
2 - ca compromise
3 - affiliation changed
4 - superseded
5 - cessation of operation
6 - certificate hold
7 - value 7 is not used
8 - remove from CRL
9 - privilege withdrawn
10 - aa compromise
see RFC 5280 for more details
"""
return_values = {}
self.debug("IPA-RA: revoke_certificate")
if revocation_reason is None:
revocation_reason = 0
if serial_number is not None:
if isinstance(serial_number, int):
serial_number = str(serial_number)
if isinstance(revocation_reason, int):
revocation_reason = str(revocation_reason)
request_info = "op=revoke&revocationReason="+revocation_reason+"&revokeAll=(certRecordId%3D"+serial_number+")&totalRecordCount=1"
(returncode, stdout, stderr) = self.__run_sslget([
'-e',
request_info,
'-r',
'/ca/agent/ca/doRevoke',
'%s:%d' % (self.env.ca_host, self.env.ca_ssl_port),
])
self.debug("IPA-RA: returncode: %d" % returncode)
if (returncode == 0):
return_values["status"] = "0"
if (stdout.find('revoked = "yes"') > -1):
return_values["revoked"] = True
else:
return_values["revoked"] = False
self.debug('%s.revoke_certificate()', self.fullname)
if type(revocation_reason) is not int:
raise TYPE_ERROR('revocation_reason', int, revocation_reason,
type(revocation_reason)
)
response = {}
(returncode, stdout, stderr) = self._sslget('/ca/agent/ca/doRevoke',
op='revoke',
revocationReason=revocation_reason,
revokeAll='(certRecordId=%s)' % serial_number,
totalRecordCount=1,
)
if returncode == 0:
response['status'] = '0'
if (stdout.find('revoked = "yes"') > -1):
response['revoked'] = True
else:
return_values["status"] = str(-returncode)
response['revoked'] = False
else:
return_values["status"] = "1"
return return_values
response['status'] = str(-returncode)
return response
def take_certificate_off_hold(self, serial_number):
"""
Take revoked certificate off hold.
def take_certificate_off_hold(self, serial_number=None):
:param serial_number: Certificate serial number.
"""
Take revoked certificate off hold
:param serial_number: certificate serial number
"""
return_values = {}
self.debug("IPA-RA: revoke_certificate")
if serial_number is not None:
if isinstance(serial_number, int):
serial_number = str(serial_number)
request_info = "serialNumber="+serial_number
(returncode, stdout, stderr) = self.__run_sslget([
'-e',
request_info,
'-r',
'/ca/agent/ca/doUnrevoke',
'%s:%d' % (self.env.ca_host, self.env.ca_ssl_port),
])
self.debug("IPA-RA: returncode: %d" % returncode)
if (returncode == 0):
if (stdout.find('unrevoked = "yes"') > -1):
return_values["taken_off_hold"] = True
else:
return_values["taken_off_hold"] = False
response = {}
self.debug('%s.take_certificate_off_hold()', self.fullname)
(returncode, stdout, stderr) = self._sslget('/ca/agent/ca/doUnrevoke',
serialNumber=serial_number,
)
if (returncode == 0):
if (stdout.find('unrevoked = "yes"') > -1):
response['taken_off_hold'] = True
else:
return_values["status"] = str(-returncode)
response['taken_off_hold'] = False
else:
return_values["status"] = "1"
return return_values
response['status'] = str(-returncode)
return response
def __find_substring(self, str, str1, str2):
sub_str = None