Fix plugin to work with new output validation, add new helpers

Add a new get_subject() helper and return the subject when retrieving
certificates.

Add a normalizer so that everything before and after the BEGIN/END
block is removed.
This commit is contained in:
Rob Crittenden 2010-01-20 10:03:42 -05:00
parent c15c1eee72
commit 3a536353fb

View File

@ -64,6 +64,31 @@ def get_serial(certificate):
return serial return serial
def get_subject(certificate):
"""
Given a certificate, return the subject
In theory there should be only one cert per object so even if we get
passed in a list/tuple only return the first one.
"""
if type(certificate) in (list, tuple):
certificate = certificate[0]
try:
certificate = base64.b64decode(certificate)
except Exception, e:
pass
try:
sub = list(x509.get_subject_components(certificate, type=x509.DER))
sub.reverse()
except PyAsn1Error:
raise errors.CertificateOperationError(error=_('Unable to decode certificate in entry'))
subject = ""
for s in sub:
subject = subject + "%s=%s," % (s[0], s[1])
return subject[:-1]
def get_csr_hostname(csr): def get_csr_hostname(csr):
""" """
Return the value of CN in the subject of the request Return the value of CN in the subject of the request
@ -113,6 +138,25 @@ def validate_csr(ugettext, csr):
except Exception, e: except Exception, e:
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % str(e)) raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % str(e))
def normalize_csr(csr):
"""
Strip any leading and trailing cruft around the BEGIN/END block
"""
end_len = 37
s = csr.find('-----BEGIN NEW CERTIFICATE REQUEST-----')
if s == -1:
s = csr.find('-----BEGIN CERTIFICATE REQUEST-----')
e = csr.find('-----END NEW CERTIFICATE REQUEST-----')
if e == -1:
e = csr.find('-----END CERTIFICATE REQUEST-----')
if e != -1:
end_len = 33
if s > -1 and e > -1:
# We're normalizing here, not validating
csr = csr[s:e+end_len]
return csr
class cert_request(VirtualCommand): class cert_request(VirtualCommand):
""" """
@ -122,6 +166,7 @@ class cert_request(VirtualCommand):
takes_args = ( takes_args = (
File('csr', validate_csr, File('csr', validate_csr,
cli_name='csr_file', cli_name='csr_file',
normalizer=normalize_csr,
), ),
) )
operation="request certificate" operation="request certificate"
@ -242,12 +287,6 @@ class cert_request(VirtualCommand):
result=result result=result
) )
def output_for_cli(self, textui, result, *args, **kw):
if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0)
else:
textui.print_plain(_('Failed to submit a certificate request.'))
api.register(cert_request) api.register(cert_request)
@ -262,13 +301,9 @@ class cert_status(VirtualCommand):
def execute(self, request_id, **kw): def execute(self, request_id, **kw):
self.check_access() self.check_access()
return self.Backend.ra.check_request_status(request_id) return dict(
result=self.Backend.ra.check_request_status(request_id)
def output_for_cli(self, textui, result, *args, **kw): )
if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0)
else:
textui.print_plain(_('Failed to retrieve a request status.'))
api.register(cert_status) api.register(cert_status)
@ -284,13 +319,9 @@ class cert_get(VirtualCommand):
def execute(self, serial_number): def execute(self, serial_number):
self.check_access() self.check_access()
return self.Backend.ra.get_certificate(serial_number) result=self.Backend.ra.get_certificate(serial_number)
result['subject'] = get_subject(result['certificate'])
def output_for_cli(self, textui, result, *args, **kw): return dict(result=result)
if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0)
else:
textui.print_plain(_('Failed to obtain a certificate.'))
api.register(cert_get) api.register(cert_get)
@ -317,13 +348,9 @@ class cert_revoke(VirtualCommand):
def execute(self, serial_number, **kw): def execute(self, serial_number, **kw):
self.check_access() self.check_access()
return self.Backend.ra.revoke_certificate(serial_number, **kw) return dict(
result=self.Backend.ra.revoke_certificate(serial_number, **kw)
def output_for_cli(self, textui, result, *args, **kw): )
if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0)
else:
textui.print_plain(_('Failed to revoke a certificate.'))
api.register(cert_revoke) api.register(cert_revoke)
@ -339,12 +366,8 @@ class cert_remove_hold(VirtualCommand):
def execute(self, serial_number, **kw): def execute(self, serial_number, **kw):
self.check_access() self.check_access()
return self.Backend.ra.take_certificate_off_hold(serial_number) return dict(
result=self.Backend.ra.take_certificate_off_hold(serial_number)
def output_for_cli(self, textui, result, *args, **kw): )
if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0)
else:
textui.print_plain(_('Failed to take a revoked certificate off hold.'))
api.register(cert_remove_hold) api.register(cert_remove_hold)