cert: add object plugin

Implement cert as an object with methods rather than a bunch of loosely
related commands.

https://fedorahosted.org/freeipa/ticket/5381

Reviewed-By: David Kupka <dkupka@redhat.com>
Reviewed-By: Pavel Vomacka <pvomacka@redhat.com>
This commit is contained in:
Jan Cholasta 2016-06-14 06:29:18 +02:00
parent b484667d15
commit d44ffdad42
4 changed files with 327 additions and 265 deletions

60
API.txt
View File

@ -723,25 +723,27 @@ output: Entry('result')
output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>]) output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
output: PrimaryKey('value') output: PrimaryKey('value')
command: cert_find command: cert_find
args: 0,19,4 args: 1,20,4
arg: Str('criteria?')
option: Flag('all', autofill=True, cli_name='all', default=False) option: Flag('all', autofill=True, cli_name='all', default=False)
option: Str('cacn?', autofill=False, cli_name='ca') option: Str('cacn?', cli_name='ca')
option: Flag('exactly?', autofill=True, default=False) option: Flag('exactly?', autofill=True, default=False)
option: Str('issuedon_from?', autofill=False) option: DateTime('issuedon_from?', autofill=False)
option: Str('issuedon_to?', autofill=False) option: DateTime('issuedon_to?', autofill=False)
option: Str('issuer?', autofill=False) option: DNParam('issuer?', autofill=False)
option: Int('max_serial_number?', autofill=False) option: Int('max_serial_number?', autofill=False)
option: Int('min_serial_number?', autofill=False) option: Int('min_serial_number?', autofill=False)
option: Flag('pkey_only?', autofill=True, default=False)
option: Flag('raw', autofill=True, cli_name='raw', default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False)
option: Int('revocation_reason?', autofill=False) option: Int('revocation_reason?', autofill=False)
option: Str('revokedon_from?', autofill=False) option: DateTime('revokedon_from?', autofill=False)
option: Str('revokedon_to?', autofill=False) option: DateTime('revokedon_to?', autofill=False)
option: Int('sizelimit?', default=100) option: Int('sizelimit?')
option: Str('subject?', autofill=False) option: Str('subject?', autofill=False)
option: Str('validnotafter_from?', autofill=False) option: DateTime('validnotafter_from?', autofill=False)
option: Str('validnotafter_to?', autofill=False) option: DateTime('validnotafter_to?', autofill=False)
option: Str('validnotbefore_from?', autofill=False) option: DateTime('validnotbefore_from?', autofill=False)
option: Str('validnotbefore_to?', autofill=False) option: DateTime('validnotbefore_to?', autofill=False)
option: Str('version?') option: Str('version?')
output: Output('count', type=[<type 'int'>]) output: Output('count', type=[<type 'int'>])
output: ListOfEntries('result') output: ListOfEntries('result')
@ -749,37 +751,49 @@ output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
output: Output('truncated', type=[<type 'bool'>]) output: Output('truncated', type=[<type 'bool'>])
command: cert_remove_hold command: cert_remove_hold
args: 1,1,1 args: 1,1,1
arg: Str('serial_number') arg: Int('serial_number')
option: Str('version?') option: Str('version?')
output: Output('result') output: Output('result')
command: cert_request command: cert_request
args: 1,6,1 args: 1,8,3
arg: Str('csr', cli_name='csr_file') arg: Str('csr', cli_name='csr_file')
option: Flag('add', autofill=True, default=False) option: Flag('add', autofill=True, default=False)
option: Flag('all', autofill=True, cli_name='all', default=False)
option: Str('cacn?', cli_name='ca') option: Str('cacn?', cli_name='ca')
option: Str('principal') option: Str('principal')
option: Str('profile_id?') option: Str('profile_id?')
option: Flag('raw', autofill=True, cli_name='raw', default=False)
option: Str('request_type', autofill=True, default=u'pkcs10') option: Str('request_type', autofill=True, default=u'pkcs10')
option: Str('version?') option: Str('version?')
output: Output('result', type=[<type 'dict'>]) output: Entry('result')
output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
output: PrimaryKey('value')
command: cert_revoke command: cert_revoke
args: 1,2,1 args: 1,2,1
arg: Str('serial_number') arg: Int('serial_number')
option: Int('revocation_reason', autofill=True, default=0) option: Int('revocation_reason', autofill=True, default=0)
option: Str('version?') option: Str('version?')
output: Output('result') output: Output('result')
command: cert_show command: cert_show
args: 1,3,1 args: 1,5,3
arg: Str('serial_number') arg: Int('serial_number')
option: Str('cacn?', autofill=False, cli_name='ca') option: Flag('all', autofill=True, cli_name='all', default=False)
option: Str('cacn?', cli_name='ca')
option: Str('out?') option: Str('out?')
option: Flag('raw', autofill=True, cli_name='raw', default=False)
option: Str('version?') option: Str('version?')
output: Output('result') output: Entry('result')
output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
output: PrimaryKey('value')
command: cert_status command: cert_status
args: 1,1,1 args: 1,3,3
arg: Str('request_id') arg: Int('request_id')
option: Flag('all', autofill=True, cli_name='all', default=False)
option: Flag('raw', autofill=True, cli_name='raw', default=False)
option: Str('version?') option: Str('version?')
output: Output('result') output: Entry('result')
output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
output: PrimaryKey('value')
command: certprofile_del command: certprofile_del
args: 1,2,3 args: 1,2,3
arg: Str('cn+', cli_name='id') arg: Str('cn+', cli_name='id')

View File

@ -90,5 +90,5 @@ IPA_DATA_VERSION=20100614120000
# # # #
######################################################## ########################################################
IPA_API_VERSION_MAJOR=2 IPA_API_VERSION_MAJOR=2
IPA_API_VERSION_MINOR=193 IPA_API_VERSION_MINOR=194
# Last change: schema: remove `no_cli` from command schema # Last change: cert: add object plugin

View File

@ -19,7 +19,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from ipaclient.frontend import CommandOverride from ipaclient.frontend import MethodOverride
from ipalib import errors from ipalib import errors
from ipalib import x509 from ipalib import x509
from ipalib import util from ipalib import util
@ -30,7 +30,7 @@ register = Registry()
@register(override=True) @register(override=True)
class cert_request(CommandOverride): class cert_request(MethodOverride):
def get_args(self): def get_args(self):
for arg in super(cert_request, self).get_args(): for arg in super(cert_request, self).get_args():
if arg.name == 'csr': if arg.name == 'csr':
@ -39,7 +39,7 @@ class cert_request(CommandOverride):
@register(override=True) @register(override=True)
class cert_show(CommandOverride): class cert_show(MethodOverride):
def forward(self, *keys, **options): def forward(self, *keys, **options):
if 'out' in options: if 'out' in options:
util.check_writable_file(options['out']) util.check_writable_file(options['out'])

View File

@ -19,9 +19,14 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import time
import binascii import binascii
import datetime
import os
from nss import nss
from nss.error import NSPRError
from pyasn1.error import PyAsn1Error
import six
from ipalib import Command, Str, Int, Flag from ipalib import Command, Str, Int, Flag
from ipalib import api from ipalib import api
@ -30,6 +35,9 @@ from ipalib import pkcs10
from ipalib import x509 from ipalib import x509
from ipalib import ngettext from ipalib import ngettext
from ipalib.constants import IPA_CA_CN from ipalib.constants import IPA_CA_CN
from ipalib.crud import Create, PKQuery, Retrieve, Search
from ipalib.frontend import Method, Object
from ipalib.parameters import Bytes, DateTime, DNParam
from ipalib.plugable import Registry from ipalib.plugable import Registry
from .virtual import VirtualCommand from .virtual import VirtualCommand
from .baseldap import pkey_to_value from .baseldap import pkey_to_value
@ -42,11 +50,6 @@ from ipalib import output
from .service import validate_principal from .service import validate_principal
from ipapython.dn import DN from ipapython.dn import DN
import six
import nss.nss as nss
from nss.error import NSPRError
from pyasn1.error import PyAsn1Error
if six.PY3: if six.PY3:
unicode = str unicode = str
@ -134,16 +137,12 @@ USER, HOST, SERVICE = range(3)
register = Registry() register = Registry()
def validate_pkidate(ugettext, value): PKIDATE_FORMAT = '%Y-%m-%d'
"""
A date in the format of %Y-%m-%d
""" def normalize_pkidate(value):
try: return datetime.datetime.strptime(value, PKIDATE_FORMAT)
ts = time.strptime(value, '%Y-%m-%d')
except ValueError as e:
return str(e)
return None
def validate_csr(ugettext, csr): def validate_csr(ugettext, csr):
""" """
@ -182,7 +181,8 @@ def normalize_csr(csr):
return csr return csr
def _convert_serial_number(num):
def normalize_serial_number(num):
""" """
Convert a SN given in decimal or hexadecimal. Convert a SN given in decimal or hexadecimal.
Returns the number or None if conversion fails. Returns the number or None if conversion fails.
@ -195,18 +195,10 @@ def _convert_serial_number(num):
# hexa without prefix # hexa without prefix
num = int(num, 16) num = int(num, 16)
except ValueError: except ValueError:
num = None pass
return num return unicode(num)
def validate_serial_number(ugettext, num):
if _convert_serial_number(num) == None:
return u"Decimal or hexadecimal number is required for serial number"
return None
def normalize_serial_number(num):
# It's been already validated
return unicode(_convert_serial_number(num))
def get_host_from_principal(principal): def get_host_from_principal(principal):
""" """
@ -242,10 +234,123 @@ def caacl_check(principal_type, principal_string, ca, profile_id):
) )
) )
class BaseCertObject(Object):
takes_params = (
Bytes(
'certificate',
label=_("Certificate"),
doc=_("Base-64 encoded certificate."),
flags={'no_create', 'no_update', 'no_search'},
),
DNParam(
'subject',
label=_('Subject'),
flags={'no_create', 'no_update', 'no_search'},
),
DNParam(
'issuer',
label=_('Issuer'),
doc=_('Issuer DN'),
flags={'no_create', 'no_update', 'no_search'},
),
DateTime(
'valid_not_before',
label=_('Not Before'),
flags={'no_create', 'no_update', 'no_search'},
),
DateTime(
'valid_not_after',
label=_('Not After'),
flags={'no_create', 'no_update', 'no_search'},
),
Str(
'md5_fingerprint',
label=_('Fingerprint (MD5)'),
flags={'no_create', 'no_update', 'no_search'},
),
Str(
'sha1_fingerprint',
label=_('Fingerprint (SHA1)'),
flags={'no_create', 'no_update', 'no_search'},
),
Int(
'serial_number',
label=_('Serial number'),
doc=_('Serial number in decimal or if prefixed with 0x in hexadecimal'),
normalizer=normalize_serial_number,
flags={'no_create', 'no_update', 'no_search'},
),
Str(
'serial_number_hex',
label=_('Serial number (hex)'),
flags={'no_create', 'no_update', 'no_search'},
),
)
def _parse(self, obj):
cert = x509.load_certificate(obj['certificate'])
obj['subject'] = DN(unicode(cert.subject))
obj['issuer'] = DN(unicode(cert.issuer))
obj['valid_not_before'] = unicode(cert.valid_not_before_str)
obj['valid_not_after'] = unicode(cert.valid_not_after_str)
obj['md5_fingerprint'] = unicode(
nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
obj['sha1_fingerprint'] = unicode(
nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
obj['serial_number'] = cert.serial_number
obj['serial_number_hex'] = u'0x%X' % cert.serial_number
class BaseCertMethod(Method):
def get_options(self):
yield Str('cacn?',
cli_name='ca',
query=True,
label=_('Issuing CA'),
doc=_('Name of issuing CA'),
)
for option in super(BaseCertMethod, self).get_options():
yield option
@register() @register()
class cert_request(VirtualCommand): class certreq(BaseCertObject):
takes_params = BaseCertObject.takes_params + (
Str(
'request_type',
default=u'pkcs10',
autofill=True,
flags={'no_update', 'no_update', 'no_search'},
),
Str(
'profile_id?', validate_profile_id,
label=_("Profile ID"),
doc=_("Certificate Profile to use"),
flags={'no_update', 'no_update', 'no_search'},
),
Str(
'cert_request_status',
label=_('Request status'),
flags={'no_create', 'no_update', 'no_search'},
),
Int(
'request_id',
label=_('Request id'),
primary_key=True,
flags={'no_create', 'no_update', 'no_search', 'no_output'},
),
)
@register()
class cert_request(Create, BaseCertMethod, VirtualCommand):
__doc__ = _('Submit a certificate signing request.') __doc__ = _('Submit a certificate signing request.')
obj_name = 'certreq'
attr_name = 'request'
takes_args = ( takes_args = (
Str( Str(
'csr', validate_csr, 'csr', validate_csr,
@ -258,69 +363,25 @@ class cert_request(VirtualCommand):
operation="request certificate" operation="request certificate"
takes_options = ( takes_options = (
Str('principal', Str(
'principal',
label=_('Principal'), label=_('Principal'),
doc=_('Principal for this certificate (e.g. HTTP/test.example.com)'), doc=_('Principal for this certificate (e.g. HTTP/test.example.com)'),
), ),
Str('request_type', Flag(
default=u'pkcs10', 'add',
autofill=True,
),
Flag('add',
doc=_("automatically add the principal if it doesn't exist"), doc=_("automatically add the principal if it doesn't exist"),
default=False,
autofill=True
),
Str('profile_id?', validate_profile_id,
label=_("Profile ID"),
doc=_("Certificate Profile to use"),
),
Str('cacn?',
cli_name='ca',
query=True,
label=_("CA"),
doc=_("CA to use"),
), ),
) )
has_output_params = ( def get_args(self):
Str('certificate', # FIXME: the 'no_create' flag is ignored for positional arguments
label=_('Certificate'), for arg in super(cert_request, self).get_args():
), if arg.name == 'request_id':
Str('subject', continue
label=_('Subject'), yield arg
),
Str('issuer',
label=_('Issuer'),
),
Str('valid_not_before',
label=_('Not Before'),
),
Str('valid_not_after',
label=_('Not After'),
),
Str('md5_fingerprint',
label=_('Fingerprint (MD5)'),
),
Str('sha1_fingerprint',
label=_('Fingerprint (SHA1)'),
),
Str('serial_number',
label=_('Serial number'),
),
Str('serial_number_hex',
label=_('Serial number (hex)'),
),
)
has_output = ( def execute(self, csr, all=False, raw=False, **kw):
output.Output('result',
type=dict,
doc=_('Dictionary mapping variable name to value'),
),
)
def execute(self, csr, **kw):
ca_enabled_check() ca_enabled_check()
ldap = self.api.Backend.ldap2 ldap = self.api.Backend.ldap2
@ -512,12 +573,9 @@ class cert_request(VirtualCommand):
# Request the certificate # Request the certificate
result = self.Backend.ra.request_certificate( result = self.Backend.ra.request_certificate(
csr, profile_id, ca_id, request_type=request_type) csr, profile_id, ca_id, request_type=request_type)
cert = x509.load_certificate(result['certificate']) if not raw:
result['issuer'] = unicode(cert.issuer) self.obj._parse(result)
result['valid_not_before'] = unicode(cert.valid_not_before_str) result['request_id'] = int(result['request_id'])
result['valid_not_after'] = unicode(cert.valid_not_after_str)
result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
# Success? Then add it to the principal's entry # Success? Then add it to the principal's entry
# (unless the profile tells us not to) # (unless the profile tells us not to)
@ -534,91 +592,72 @@ class cert_request(VirtualCommand):
api.Command['user_mod'](principal_name, **kwargs) api.Command['user_mod'](principal_name, **kwargs)
return dict( return dict(
result=result result=result,
value=pkey_to_value(int(result['request_id']), kw),
) )
@register() @register()
class cert_status(VirtualCommand): class cert_status(Retrieve, BaseCertMethod, VirtualCommand):
__doc__ = _('Check the status of a certificate signing request.') __doc__ = _('Check the status of a certificate signing request.')
takes_args = ( obj_name = 'certreq'
Str('request_id', attr_name = 'status'
label=_('Request id'),
flags=['no_create', 'no_update', 'no_search'],
),
)
has_output_params = (
Str('cert_request_status',
label=_('Request status'),
),
takes_args[0],
)
operation = "certificate status" operation = "certificate status"
def get_options(self):
for option in super(cert_status, self).get_options():
if option.name == 'cacn':
continue
yield option
def execute(self, request_id, **kw): def execute(self, request_id, **kw):
ca_enabled_check() ca_enabled_check()
self.check_access() self.check_access()
return dict( return dict(
result=self.Backend.ra.check_request_status(request_id) result=self.Backend.ra.check_request_status(str(request_id)),
value=pkey_to_value(request_id, kw),
) )
_serial_number = Str('serial_number',
validate_serial_number,
label=_('Serial number'),
doc=_('Serial number in decimal or if prefixed with 0x in hexadecimal'),
normalizer=normalize_serial_number,
)
@register() @register()
class cert_show(VirtualCommand): class cert(BaseCertObject):
__doc__ = _('Retrieve an existing certificate.') takes_params = BaseCertObject.takes_params + (
Str(
takes_args = _serial_number 'status',
label=_('Status'),
has_output_params = ( flags={'no_create', 'no_update', 'no_search'},
Str('certificate',
label=_('Certificate'),
), ),
Str('subject', Flag(
label=_('Subject'), 'revoked',
label=_('Revoked'),
flags={'no_create', 'no_update', 'no_search'},
), ),
Str('issuer', Int(
label=_('Issuer'), 'revocation_reason',
),
Str('valid_not_before',
label=_('Not Before'),
),
Str('valid_not_after',
label=_('Not After'),
),
Str('md5_fingerprint',
label=_('Fingerprint (MD5)'),
),
Str('sha1_fingerprint',
label=_('Fingerprint (SHA1)'),
),
Str('revocation_reason',
label=_('Revocation reason'), label=_('Revocation reason'),
doc=_('Reason for revoking the certificate (0-10)'),
minvalue=0,
maxvalue=10,
flags={'no_create', 'no_update'},
), ),
Str('serial_number_hex',
label=_('Serial number (hex)'),
),
_serial_number,
) )
def get_params(self):
for param in super(cert, self).get_params():
if param.name == 'serial_number':
param = param.clone(primary_key=True)
elif param.name == 'issuer':
param = param.clone(flags=param.flags - {'no_search'})
yield param
@register()
class cert_show(Retrieve, BaseCertMethod, VirtualCommand):
__doc__ = _('Retrieve an existing certificate.')
takes_options = ( takes_options = (
Str('cacn?',
cli_name='ca',
query=True,
label=_('Issuing CA'),
doc=_('Name of issuing CA'),
autofill=False,
),
Str('out?', Str('out?',
label=_('Output filename'), label=_('Output filename'),
doc=_('File to store the certificate in.'), doc=_('File to store the certificate in.'),
@ -628,7 +667,7 @@ class cert_show(VirtualCommand):
operation="retrieve certificate" operation="retrieve certificate"
def execute(self, serial_number, **options): def execute(self, serial_number, all=False, raw=False, **options):
ca_enabled_check() ca_enabled_check()
hostname = None hostname = None
try: try:
@ -648,7 +687,7 @@ class cert_show(VirtualCommand):
# Dogtag lightweight CAs have shared serial number domain, so # Dogtag lightweight CAs have shared serial number domain, so
# we don't tell Dogtag the issuer (but we check the cert after). # we don't tell Dogtag the issuer (but we check the cert after).
# #
result=self.Backend.ra.get_certificate(serial_number) result = self.Backend.ra.get_certificate(str(serial_number))
cert = x509.load_certificate(result['certificate']) cert = x509.load_certificate(result['certificate'])
if issuer_dn is not None and DN(unicode(cert.issuer)) != DN(issuer_dn): if issuer_dn is not None and DN(unicode(cert.issuer)) != DN(issuer_dn):
@ -658,49 +697,38 @@ class cert_show(VirtualCommand):
"issued by CA '%(ca)s' not found") "issued by CA '%(ca)s' not found")
% dict(serial=serial_number, ca=options['cacn'])) % dict(serial=serial_number, ca=options['cacn']))
result['subject'] = unicode(cert.subject) if not raw:
result['issuer'] = unicode(cert.issuer) result['certificate'] = result['certificate'].replace('\r\n', '')
result['valid_not_before'] = unicode(cert.valid_not_before_str) self.obj._parse(result)
result['valid_not_after'] = unicode(cert.valid_not_after_str) result['revoked'] = ('revocation_reason' in result)
result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
if hostname: if hostname:
# If we have a hostname we want to verify that the subject # If we have a hostname we want to verify that the subject
# of the certificate matches it, otherwise raise an error # of the certificate matches it, otherwise raise an error
if hostname != cert.subject.common_name: #pylint: disable=E1101 if hostname != cert.subject.common_name: #pylint: disable=E1101
raise acierr raise acierr
return dict(result=result) return dict(result=result, value=pkey_to_value(serial_number, options))
@register() @register()
class cert_revoke(VirtualCommand): class cert_revoke(PKQuery, BaseCertMethod, VirtualCommand):
__doc__ = _('Revoke a certificate.') __doc__ = _('Revoke a certificate.')
takes_args = _serial_number
has_output_params = (
Flag('revoked',
label=_('Revoked'),
),
)
operation = "revoke certificate" operation = "revoke certificate"
def get_options(self):
# FIXME: The default is 0. Is this really an Int param? # FIXME: The default is 0. Is this really an Int param?
takes_options = ( yield self.obj.params['revocation_reason'].clone(
Int('revocation_reason',
label=_('Reason'),
doc=_('Reason for revoking the certificate (0-10). Type '
'"ipa help cert" for revocation reason details. '),
minvalue=0,
maxvalue=10,
default=0, default=0,
autofill=True autofill=True,
),
) )
for option in super(cert_revoke, self).get_options():
if option.name == 'cacn':
continue
yield option
def execute(self, serial_number, **kw): def execute(self, serial_number, **kw):
ca_enabled_check() ca_enabled_check()
hostname = None hostname = None
@ -719,17 +747,15 @@ class cert_revoke(VirtualCommand):
raise errors.CertificateOperationError(error=_('7 is not a valid revocation reason')) raise errors.CertificateOperationError(error=_('7 is not a valid revocation reason'))
return dict( return dict(
result=self.Backend.ra.revoke_certificate( result=self.Backend.ra.revoke_certificate(
serial_number, revocation_reason=revocation_reason) str(serial_number), revocation_reason=revocation_reason)
) )
@register() @register()
class cert_remove_hold(VirtualCommand): class cert_remove_hold(PKQuery, BaseCertMethod, VirtualCommand):
__doc__ = _('Take a revoked certificate off hold.') __doc__ = _('Take a revoked certificate off hold.')
takes_args = _serial_number
has_output_params = ( has_output_params = (
Flag('unrevoked', Flag('unrevoked',
label=_('Unrevoked'), label=_('Unrevoked'),
@ -740,17 +766,23 @@ class cert_remove_hold(VirtualCommand):
) )
operation = "certificate remove hold" operation = "certificate remove hold"
def get_options(self):
for option in super(cert_remove_hold, self).get_options():
if option.name == 'cacn':
continue
yield option
def execute(self, serial_number, **kw): def execute(self, serial_number, **kw):
ca_enabled_check() ca_enabled_check()
self.check_access() self.check_access()
return dict( return dict(
result=self.Backend.ra.take_certificate_off_hold(serial_number) result=self.Backend.ra.take_certificate_off_hold(
str(serial_number))
) )
@register() @register()
class cert_find(Command): class cert_find(Search, BaseCertMethod):
__doc__ = _('Search for existing certificates.') __doc__ = _('Search for existing certificates.')
takes_options = ( takes_options = (
@ -759,26 +791,6 @@ class cert_find(Command):
doc=_('Subject'), doc=_('Subject'),
autofill=False, autofill=False,
), ),
Str('cacn?',
cli_name='ca',
query=True,
label=_('Issuing CA'),
doc=_('Name of issuing CA'),
autofill=False,
),
Str('issuer?',
label=_('Issuer'),
doc=_('Issuer DN'),
autofill=False,
),
Int('revocation_reason?',
label=_('Reason'),
doc=_('Reason for revoking the certificate (0-10). Type '
'"ipa help cert" for revocation reason details.'),
minvalue=0,
maxvalue=10,
autofill=False,
),
Int('min_serial_number?', Int('min_serial_number?',
doc=_("minimum serial number"), doc=_("minimum serial number"),
autofill=False, autofill=False,
@ -795,60 +807,55 @@ class cert_find(Command):
doc=_('match the common name exactly'), doc=_('match the common name exactly'),
autofill=False, autofill=False,
), ),
Str('validnotafter_from?', validate_pkidate, DateTime('validnotafter_from?',
doc=_('Valid not after from this date (YYYY-mm-dd)'), doc=_('Valid not after from this date (YYYY-mm-dd)'),
normalizer=normalize_pkidate,
autofill=False, autofill=False,
), ),
Str('validnotafter_to?', validate_pkidate, DateTime('validnotafter_to?',
doc=_('Valid not after to this date (YYYY-mm-dd)'), doc=_('Valid not after to this date (YYYY-mm-dd)'),
normalizer=normalize_pkidate,
autofill=False, autofill=False,
), ),
Str('validnotbefore_from?', validate_pkidate, DateTime('validnotbefore_from?',
doc=_('Valid not before from this date (YYYY-mm-dd)'), doc=_('Valid not before from this date (YYYY-mm-dd)'),
normalizer=normalize_pkidate,
autofill=False, autofill=False,
), ),
Str('validnotbefore_to?', validate_pkidate, DateTime('validnotbefore_to?',
doc=_('Valid not before to this date (YYYY-mm-dd)'), doc=_('Valid not before to this date (YYYY-mm-dd)'),
normalizer=normalize_pkidate,
autofill=False, autofill=False,
), ),
Str('issuedon_from?', validate_pkidate, DateTime('issuedon_from?',
doc=_('Issued on from this date (YYYY-mm-dd)'), doc=_('Issued on from this date (YYYY-mm-dd)'),
normalizer=normalize_pkidate,
autofill=False, autofill=False,
), ),
Str('issuedon_to?', validate_pkidate, DateTime('issuedon_to?',
doc=_('Issued on to this date (YYYY-mm-dd)'), doc=_('Issued on to this date (YYYY-mm-dd)'),
normalizer=normalize_pkidate,
autofill=False, autofill=False,
), ),
Str('revokedon_from?', validate_pkidate, DateTime('revokedon_from?',
doc=_('Revoked on from this date (YYYY-mm-dd)'), doc=_('Revoked on from this date (YYYY-mm-dd)'),
normalizer=normalize_pkidate,
autofill=False, autofill=False,
), ),
Str('revokedon_to?', validate_pkidate, DateTime('revokedon_to?',
doc=_('Revoked on to this date (YYYY-mm-dd)'), doc=_('Revoked on to this date (YYYY-mm-dd)'),
normalizer=normalize_pkidate,
autofill=False, autofill=False,
), ),
Flag('pkey_only?',
label=_("Primary key only"),
doc=_("Results should contain primary key attribute only "
"(\"certificate\")"),
),
Int('sizelimit?', Int('sizelimit?',
label=_('Size Limit'), label=_("Size Limit"),
doc=_('Maximum number of certs returned'), doc=_("Maximum number of entries returned (0 is unlimited)"),
flags=['no_display'],
minvalue=0, minvalue=0,
default=100,
),
)
has_output = output.standard_list_of_entries
has_output_params = (
Str('serial_number_hex',
label=_('Serial number (hex)'),
),
Str('serial_number',
label=_('Serial number'),
),
Str('status',
label=_('Status'),
),
Str('subject',
label=_('Subject'),
), ),
) )
@ -856,7 +863,8 @@ class cert_find(Command):
'%(count)d certificate matched', '%(count)d certificates matched', 0 '%(count)d certificate matched', '%(count)d certificates matched', 0
) )
def execute(self, **options): def execute(self, criteria=None, all=False, raw=False, pkey_only=False,
sizelimit=None, **options):
ca_enabled_check() ca_enabled_check()
if 'cacn' in options: if 'cacn' in options:
@ -870,8 +878,48 @@ class cert_find(Command):
else: else:
options['issuer'] = ca_sdn options['issuer'] = ca_sdn
if criteria is not None:
return dict(result=[], count=0, truncated=False)
obj_seq = []
ra_options = {}
for name, value in options.items():
if isinstance(value, datetime.datetime):
value = value.strftime(PKIDATE_FORMAT)
ra_options[name] = value
if sizelimit is not None and sizelimit != 0:
ra_options['sizelimit'] = sizelimit
for ra_obj in self.Backend.ra.find(ra_options):
obj = {}
if all:
ra_obj.update(
self.Backend.ra.get_certificate(
str(ra_obj['serial_number'])))
obj_seq.append(obj)
obj.update(ra_obj)
result = []
for obj in obj_seq:
if not pkey_only:
if not raw:
if 'certificate' in obj:
obj['certificate'] = (
obj['certificate'].replace('\r\n', ''))
self.obj._parse(obj)
obj['subject'] = DN(obj['subject'])
obj['issuer'] = DN(obj['issuer'])
obj['revoked'] = (
obj['status'] in (u'REVOKED', u'REVOKED_EXPIRED'))
else:
serial_number = obj['serial_number']
obj.clear()
obj['serial_number'] = serial_number
result.append(obj)
ret = dict( ret = dict(
result=self.Backend.ra.find(options) result=result
) )
ret['count'] = len(ret['result']) ret['count'] = len(ret['result'])
ret['truncated'] = False ret['truncated'] = False