Add --out option to service, host and cert-show to save the cert to a file.

Override forward() to grab the result and if a certificate is in the entry
and the file is writable then dump the certificate in PEM format.

ticket 473
This commit is contained in:
Rob Crittenden
2010-12-10 10:53:20 -05:00
parent c9807f4b25
commit 5f8a9b9849
4 changed files with 131 additions and 1 deletions

View File

@@ -1110,6 +1110,39 @@ class ManagedPolicyError(ExecutionError):
errno = 4021
format = _('A managed group cannot have a password policy.')
class FileError(ExecutionError):
"""
**4022** Errors when dealing with files
For example:
>>> raise FileError('cannot write file \'test\'')
Traceback (most recent call last):
...
FileError: cannot write file 'test'
"""
errno = 4022
format = _('%(reason)s')
class NoCertificateError(ExecutionError):
"""
**4023** Raised when trying to retrieve a certificate that doesn't exist.
For example:
>>> raise NoCertificateError('\'ipa.example.com\' doesn't have a certificate.')
Traceback (most recent call last):
...
NoCertificateError: 'ipa.example.com' doesn't have a certificate
"""
errno = 4023
format = _('\'%(entry)s\' doesn\'t have a certificate.')
class ManagedGroupExistsError(ExecutionError):
"""
**4024** Raised when adding a user and its managed group exists

View File

@@ -71,6 +71,8 @@ from ipalib import pkcs10
from ipalib import x509
from ipalib.plugins.virtual import *
from ipalib.plugins.service import split_principal
from ipalib.plugins.service import make_pem, check_writable_file
from ipalib.plugins.service import write_certificate
import base64
import logging
import traceback
@@ -414,6 +416,12 @@ class cert_show(VirtualCommand):
),
)
takes_options = (
Str('out?',
doc=_('file to store certificate in'),
),
)
operation="retrieve certificate"
def execute(self, serial_number):
@@ -443,6 +451,20 @@ class cert_show(VirtualCommand):
return dict(result=result)
def forward(self, *keys, **options):
if 'out' in options:
check_writable_file(options['out'])
result = super(cert_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
write_certificate(result['result']['usercertificate'][0], options['out'])
result['summary'] = _('Certificate stored in file \'%(file)s\'') % dict(file=options['out'])
return result
else:
raise errors.NoCertificateError(entry=keys[-1])
else:
return super(cert_show, self).forward(*keys, **options)
api.register(cert_show)

View File

@@ -81,6 +81,8 @@ from ipalib.plugins.service import split_principal
from ipalib.plugins.service import validate_certificate
from ipalib.plugins.service import normalize_certificate
from ipalib.plugins.service import set_certificate_attrs
from ipalib.plugins.service import make_pem, check_writable_file
from ipalib.plugins.service import write_certificate
from ipalib.plugins.dns import dns_container_exists, _attribute_types
from ipalib import _, ngettext
from ipalib import x509
@@ -577,6 +579,12 @@ class host_show(LDAPRetrieve):
Display information about a host.
"""
has_output_params = LDAPRetrieve.has_output_params + host_output_params
takes_options = LDAPRetrieve.takes_options + (
Str('out?',
doc=_('file to store certificate in'),
),
)
member_attributes = ['managedby']
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
@@ -591,6 +599,19 @@ class host_show(LDAPRetrieve):
return dn
def forward(self, *keys, **options):
if 'out' in options:
check_writable_file(options['out'])
result = super(host_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
write_certificate(result['result']['usercertificate'][0], options['out'])
result['summary'] = _('Certificate stored in file \'%(file)s\'') % dict(file=options['out'])
return result
else:
raise errors.NoCertificateError(entry=keys[-1])
else:
return super(host_show, self).forward(*keys, **options)
api.register(host_show)

View File

@@ -69,6 +69,7 @@ EXAMPLES:
"""
import base64
import os
from ipalib import api, errors, util
from ipalib import Str, Flag, Bytes
@@ -78,6 +79,7 @@ from ipalib import _, ngettext
from ipalib import util
import nss.nss as nss
from nss.error import NSPRError
from ipapython.ipautil import file_exists
output_params = (
@@ -219,6 +221,41 @@ def set_certificate_attrs(entry_attrs):
entry_attrs['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
entry_attrs['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
def check_writable_file(filename):
"""
Determine if the file is writable. If the file doesn't exist then
open the file to test writability.
"""
try:
if file_exists(filename):
if not os.access(filename, os.W_OK):
raise errors.FileError(reason=_('Permission denied: %(file)s') % dict(file=filename))
else:
fp = open(filename, 'w')
fp.close()
except (IOError, OSError), e:
raise errors.FileError(reason=str(e))
def make_pem(data):
"""
Convert a raw base64-encoded blob into something that looks like a PE
file with lines split to 64 characters and proper headers.
"""
cert = '\n'.join([data[x:x+64] for x in range(0, len(data), 64)])
return '-----BEGIN CERTIFICATE-----\n' + \
cert + \
'\n-----END CERTIFICATE-----'
def write_certificate(cert, filename):
"""
Check to see if the certificate should be written to a file and do so.
"""
try:
fp = open(filename, 'w')
fp.write(make_pem(base64.b64encode(cert)))
fp.close()
except (IOError, OSError), e:
raise errors.FileError(reason=str(e))
class service(LDAPObject):
"""
@@ -411,7 +448,11 @@ class service_show(LDAPRetrieve):
Display information about an IPA service.
"""
member_attributes = ['managedby']
takes_options = LDAPRetrieve.takes_options
takes_options = LDAPRetrieve.takes_options + (
Str('out?',
doc=_('file to store certificate in'),
),
)
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
if 'krblastpwdchange' in entry_attrs:
@@ -425,6 +466,19 @@ class service_show(LDAPRetrieve):
return dn
def forward(self, *keys, **options):
if 'out' in options:
check_writable_file(options['out'])
result = super(service_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
write_certificate(result['result']['usercertificate'][0], options['out'])
result['summary'] = _('Certificate stored in file \'%(file)s\'') % dict(file=options['out'])
return result
else:
raise errors.NoCertificateError(entry=keys[-1])
else:
return super(service_show, self).forward(*keys, **options)
api.register(service_show)
class service_add_host(LDAPAddMember):