mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2024-12-26 08:51:50 -06:00
Store the new certificate in a service record. Clean up some argument names to match the current standard.
This commit is contained in:
parent
0d538b20f2
commit
8f1df0fe8a
@ -26,7 +26,18 @@ from ipalib import api, SkipPluginModule
|
|||||||
if api.env.enable_ra is not True:
|
if api.env.enable_ra is not True:
|
||||||
# In this case, abort loading this plugin module...
|
# In this case, abort loading this plugin module...
|
||||||
raise SkipPluginModule(reason='env.enable_ra is not True')
|
raise SkipPluginModule(reason='env.enable_ra is not True')
|
||||||
from ipalib import Command, Str, Int
|
from ipalib import Command, Str, Int, Bytes, Flag
|
||||||
|
from ipalib import errors
|
||||||
|
import base64
|
||||||
|
|
||||||
|
def validate_csr(ugettext, csr):
|
||||||
|
"""
|
||||||
|
For now just verify that it is properly base64-encoded.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
base64.b64decode(csr)
|
||||||
|
except Exception, e:
|
||||||
|
raise errors.Base64DecodeError(reason=str(e))
|
||||||
|
|
||||||
|
|
||||||
class cert_request(Command):
|
class cert_request(Command):
|
||||||
@ -34,16 +45,57 @@ class cert_request(Command):
|
|||||||
Submit a certificate singing request.
|
Submit a certificate singing request.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
takes_args = ('csr',)
|
takes_args = (Str('csr', validate_csr),)
|
||||||
|
|
||||||
takes_options = (
|
takes_options = (
|
||||||
Str('request_type', default=u'pkcs10', autofill=True),
|
Str('principal',
|
||||||
|
doc="service principal for this certificate (e.g. HTTP/test.example.com)",
|
||||||
|
),
|
||||||
|
Str('request_type',
|
||||||
|
default=u'pkcs10',
|
||||||
|
autofill=True,
|
||||||
|
),
|
||||||
|
Flag('add',
|
||||||
|
doc="automatically add the principal if it doesn't exist",
|
||||||
|
default=False,
|
||||||
|
autofill=True
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
def execute(self, csr, **options):
|
def execute(self, csr, **kw):
|
||||||
return self.Backend.ra.request_certificate(csr, **options)
|
skw = {"all": True}
|
||||||
|
principal = kw.get('principal')
|
||||||
|
add = kw.get('add')
|
||||||
|
del kw['principal']
|
||||||
|
del kw['add']
|
||||||
|
service = None
|
||||||
|
|
||||||
def output_for_cli(self, textui, result, *args, **options):
|
# See if the service exists and punt if it doesn't and we aren't
|
||||||
|
# going to add it
|
||||||
|
try:
|
||||||
|
service = api.Command['service_show'](principal, **skw)
|
||||||
|
if service.get('usercertificate'):
|
||||||
|
# FIXME, what to do here? Do we revoke the old cert?
|
||||||
|
raise errors.GenericError(format='entry already has a certificate')
|
||||||
|
|
||||||
|
except errors.NotFound, e:
|
||||||
|
if not add:
|
||||||
|
raise e
|
||||||
|
|
||||||
|
# Request the certificate
|
||||||
|
result = self.Backend.ra.request_certificate(csr, **kw)
|
||||||
|
|
||||||
|
# Success? Then add it to the service entry. We know that it
|
||||||
|
# either exists or we should add it.
|
||||||
|
if result.get('status') == '0':
|
||||||
|
if service is None:
|
||||||
|
service = api.Command['service_add'](principal, **{})
|
||||||
|
skw = {"usercertificate": str(result.get('certificate'))}
|
||||||
|
api.Command['service_mod'](principal, **skw)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def output_for_cli(self, textui, result, *args, **kw):
|
||||||
if isinstance(result, dict) and len(result) > 0:
|
if isinstance(result, dict) and len(result) > 0:
|
||||||
textui.print_entry(result, 0)
|
textui.print_entry(result, 0)
|
||||||
else:
|
else:
|
||||||
@ -60,10 +112,10 @@ class cert_status(Command):
|
|||||||
takes_args = ['request_id']
|
takes_args = ['request_id']
|
||||||
|
|
||||||
|
|
||||||
def execute(self, request_id, **options):
|
def execute(self, request_id, **kw):
|
||||||
return self.Backend.ra.check_request_status(request_id)
|
return self.Backend.ra.check_request_status(request_id)
|
||||||
|
|
||||||
def output_for_cli(self, textui, result, *args, **options):
|
def output_for_cli(self, textui, result, *args, **kw):
|
||||||
if isinstance(result, dict) and len(result) > 0:
|
if isinstance(result, dict) and len(result) > 0:
|
||||||
textui.print_entry(result, 0)
|
textui.print_entry(result, 0)
|
||||||
else:
|
else:
|
||||||
@ -82,7 +134,7 @@ class cert_get(Command):
|
|||||||
def execute(self, serial_number):
|
def execute(self, serial_number):
|
||||||
return self.Backend.ra.get_certificate(serial_number)
|
return self.Backend.ra.get_certificate(serial_number)
|
||||||
|
|
||||||
def output_for_cli(self, textui, result, *args, **options):
|
def output_for_cli(self, textui, result, *args, **kw):
|
||||||
if isinstance(result, dict) and len(result) > 0:
|
if isinstance(result, dict) and len(result) > 0:
|
||||||
textui.print_entry(result, 0)
|
textui.print_entry(result, 0)
|
||||||
else:
|
else:
|
||||||
@ -102,10 +154,10 @@ class cert_revoke(Command):
|
|||||||
takes_options = [Int('revocation_reason?', default=0)]
|
takes_options = [Int('revocation_reason?', default=0)]
|
||||||
|
|
||||||
|
|
||||||
def execute(self, serial_number, **options):
|
def execute(self, serial_number, **kw):
|
||||||
return self.Backend.ra.revoke_certificate(serial_number, **options)
|
return self.Backend.ra.revoke_certificate(serial_number, **kw)
|
||||||
|
|
||||||
def output_for_cli(self, textui, result, *args, **options):
|
def output_for_cli(self, textui, result, *args, **kw):
|
||||||
if isinstance(result, dict) and len(result) > 0:
|
if isinstance(result, dict) and len(result) > 0:
|
||||||
textui.print_entry(result, 0)
|
textui.print_entry(result, 0)
|
||||||
else:
|
else:
|
||||||
@ -121,10 +173,10 @@ class cert_remove_hold(Command):
|
|||||||
|
|
||||||
takes_args = ['serial_number']
|
takes_args = ['serial_number']
|
||||||
|
|
||||||
def execute(self, serial_number, **options):
|
def execute(self, serial_number, **kw):
|
||||||
return self.Backend.ra.take_certificate_off_hold(serial_number)
|
return self.Backend.ra.take_certificate_off_hold(serial_number)
|
||||||
|
|
||||||
def output_for_cli(self, textui, result, *args, **options):
|
def output_for_cli(self, textui, result, *args, **kw):
|
||||||
if isinstance(result, dict) and len(result) > 0:
|
if isinstance(result, dict) and len(result) > 0:
|
||||||
textui.print_entry(result, 0)
|
textui.print_entry(result, 0)
|
||||||
else:
|
else:
|
||||||
|
Loading…
Reference in New Issue
Block a user