Add options to write lightweight CA cert or chain to file

Administrators need a way to retrieve the certificate or certificate
chain of an IPA-managed lightweight CA.  Add params to the `ca'
object for carrying the CA certificate and chain (as multiple DER
values).  Add the `--chain' flag for including the chain in the
result (chain is also included with `--all').  Add the
`--certificate-out' option for writing the certificate to a file (or
the chain, if `--chain' was given).

Fixes: https://fedorahosted.org/freeipa/ticket/6178
Reviewed-By: Jan Cholasta <jcholast@redhat.com>
Reviewed-By: Tomas Krizek <tkrizek@redhat.com>
This commit is contained in:
Fraser Tweedale 2016-08-08 14:27:20 +10:00 committed by Jan Cholasta
parent cc5b88e5d4
commit 32b1743e5f
7 changed files with 172 additions and 16 deletions

View File

@ -445,10 +445,11 @@ option: Str('version?')
output: Output('count', type=[<type 'int'>])
output: Output('results', type=[<type 'list'>, <type 'tuple'>])
command: ca_add/1
args: 1,7,3
args: 1,8,3
arg: Str('cn', cli_name='name')
option: Str('addattr*', cli_name='addattr')
option: Flag('all', autofill=True, cli_name='all', default=False)
option: Flag('chain', autofill=True, default=False)
option: Str('description?', cli_name='desc')
option: DNParam('ipacasubjectdn', cli_name='subject')
option: Flag('raw', autofill=True, cli_name='raw', default=False)
@ -519,9 +520,10 @@ output: Entry('result')
output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
output: PrimaryKey('value')
command: ca_show/1
args: 1,4,3
args: 1,5,3
arg: Str('cn', cli_name='name')
option: Flag('all', autofill=True, cli_name='all', default=False)
option: Flag('chain', autofill=True, default=False)
option: Flag('raw', autofill=True, cli_name='raw', default=False)
option: Flag('rights', autofill=True, default=False)
option: Str('version?')

View File

@ -73,8 +73,8 @@ define(IPA_DATA_VERSION, 20100614120000)
# #
########################################################
define(IPA_API_VERSION_MAJOR, 2)
define(IPA_API_VERSION_MINOR, 216)
# Last change: DNS: Support URI resource record type
define(IPA_API_VERSION_MINOR, 217)
# Last change: Add options to write lightweight CA cert or chain to file
########################################################

53
ipaclient/plugins/ca.py Normal file
View File

@ -0,0 +1,53 @@
#
# Copyright (C) 2016 FreeIPA Contributors see COPYING for license
#
import base64
from ipaclient.frontend import MethodOverride
from ipalib import util, x509, Str
from ipalib.plugable import Registry
from ipalib.text import _
register = Registry()
class WithCertOutArgs(MethodOverride):
takes_options = (
Str(
'certificate_out?',
doc=_('Write certificate (chain if --chain used) to file'),
include='cli',
cli_metavar='FILE',
),
)
def forward(self, *keys, **options):
filename = None
if 'certificate_out' in options:
filename = options.pop('certificate_out')
util.check_writable_file(filename)
result = super(WithCertOutArgs, self).forward(*keys, **options)
if filename:
def to_pem(x):
return x509.make_pem(x)
if options.get('chain', False):
ders = result['result']['certificate_chain']
data = '\n'.join(to_pem(base64.b64encode(der)) for der in ders)
else:
data = to_pem(result['result']['certificate'])
with open(filename, 'wb') as f:
f.write(data)
return result
@register(override=True, no_fail=True)
class ca_add(WithCertOutArgs):
pass
@register(override=True, no_fail=True)
class ca_show(WithCertOutArgs):
pass

View File

@ -2,14 +2,18 @@
# Copyright (C) 2016 FreeIPA Contributors see COPYING for license
#
from ipalib import api, errors, output, DNParam, Str
import base64
import six
from ipalib import api, errors, output, Bytes, DNParam, Flag, Str
from ipalib.constants import IPA_CA_CN
from ipalib.plugable import Registry
from ipaserver.plugins.baseldap import (
LDAPObject, LDAPSearch, LDAPCreate, LDAPDelete,
LDAPUpdate, LDAPRetrieve, LDAPQuery, pkey_to_value)
from ipaserver.plugins.cert import ca_enabled_check
from ipalib import _, ngettext
from ipalib import _, ngettext, x509
__doc__ = _("""
@ -100,6 +104,18 @@ class ca(LDAPObject):
doc=_('Issuer Distinguished Name'),
flags=['no_create', 'no_update'],
),
Bytes(
'certificate',
label=_("Certificate"),
doc=_("Base-64 encoded certificate."),
flags={'no_create', 'no_update', 'no_search'},
),
Bytes(
'certificate_chain*',
label=_("Certificate chain"),
doc=_("X.509 certificate chain"),
flags={'no_create', 'no_update', 'no_search'},
),
)
permission_filter_objectclasses = ['ipaca']
@ -145,6 +161,21 @@ class ca(LDAPObject):
}
def set_certificate_attrs(entry, options, always_include_cert=True):
ca_id = entry['ipacaid'][0]
full = options.get('all', False)
with api.Backend.ra_lightweight_ca as ca_api:
if always_include_cert or full:
der = ca_api.read_ca_cert(ca_id)
entry['certificate'] = six.text_type(base64.b64encode(der))
if options.get('chain', False) or full:
pkcs7_der = ca_api.read_ca_chain(ca_id)
pems = x509.pkcs7_to_pems(pkcs7_der, x509.DER)
ders = [x509.normalize_certificate(pem) for pem in pems]
entry['certificate_chain'] = ders
@register()
class ca_find(LDAPSearch):
__doc__ = _("Search for CAs.")
@ -154,16 +185,32 @@ class ca_find(LDAPSearch):
def execute(self, *keys, **options):
ca_enabled_check()
return super(ca_find, self).execute(*keys, **options)
result = super(ca_find, self).execute(*keys, **options)
for entry in result['result']:
set_certificate_attrs(entry, options, always_include_cert=False)
return result
_chain_flag = Flag(
'chain',
default=False,
doc=_('Include certificate chain in output'),
)
@register()
class ca_show(LDAPRetrieve):
__doc__ = _("Display the properties of a CA.")
def execute(self, *args, **kwargs):
takes_options = LDAPRetrieve.takes_options + (
_chain_flag,
)
def execute(self, *keys, **options):
ca_enabled_check()
return super(ca_show, self).execute(*args, **kwargs)
result = super(ca_show, self).execute(*keys, **options)
set_certificate_attrs(result['result'], options)
return result
@register()
@ -171,6 +218,10 @@ class ca_add(LDAPCreate):
__doc__ = _("Create a CA.")
msg_summary = _('Created CA "%(value)s"')
takes_options = LDAPCreate.takes_options + (
_chain_flag,
)
def pre_callback(self, ldap, dn, entry, entry_attrs, *keys, **options):
ca_enabled_check()
if not ldap.can_add(dn[1:]):
@ -203,6 +254,10 @@ class ca_add(LDAPCreate):
entry['ipacasubjectdn'] = [resp['dn']]
return dn
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
set_certificate_attrs(entry_attrs, options)
return dn
@register()
class ca_del(LDAPDelete):

View File

@ -2125,6 +2125,18 @@ class ra_lightweight_ca(RestClient):
except:
raise errors.RemoteRetrieveError(reason=_("Response from CA was not valid JSON"))
def read_ca_cert(self, ca_id):
_status, _resp_headers, resp_body = self._ssldo(
'GET', '{}/cert'.format(ca_id),
headers={'Accept': 'application/pkix-cert'})
return resp_body
def read_ca_chain(self, ca_id):
_status, _resp_headers, resp_body = self._ssldo(
'GET', '{}/chain'.format(ca_id),
headers={'Accept': 'application/pkcs7-mime'})
return resp_body
def disable_ca(self, ca_id):
self._ssldo(
'POST', ca_id + '/disable',

View File

@ -8,7 +8,13 @@ import six
from ipapython.dn import DN
from ipatests.test_xmlrpc.tracker.base import Tracker
from ipatests.util import assert_deepequal
from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_issuer, fuzzy_caid
from ipatests.test_xmlrpc.xmlrpc_test import (
fuzzy_issuer,
fuzzy_caid,
fuzzy_base64,
fuzzy_sequence_of,
fuzzy_bytes,
)
from ipatests.test_xmlrpc import objectclasses
@ -19,12 +25,21 @@ if six.PY3:
class CATracker(Tracker):
"""Implementation of a Tracker class for CA plugin."""
retrieve_keys = {
ldap_keys = {
'dn', 'cn', 'ipacaid', 'ipacasubjectdn', 'ipacaissuerdn', 'description'
}
retrieve_all_keys = {'objectclass'} | retrieve_keys
create_keys = retrieve_all_keys
update_keys = retrieve_keys - {'dn'}
cert_keys = {
'certificate',
}
cert_all_keys = {
'certificate_chain',
}
find_keys = ldap_keys
find_all_keys = {'objectclass'} | ldap_keys
retrieve_keys = ldap_keys | cert_keys
retrieve_all_keys = {'objectclass'} | retrieve_keys | cert_all_keys
create_keys = {'objectclass'} | retrieve_keys
update_keys = ldap_keys - {'dn'}
def __init__(self, name, subject, desc=u"Test generated CA",
default_version=None):
@ -59,6 +74,8 @@ class CATracker(Tracker):
ipacasubjectdn=[self.ipasubjectdn],
ipacaissuerdn=[fuzzy_issuer],
ipacaid=[fuzzy_caid],
certificate=fuzzy_base64,
certificate_chain=fuzzy_sequence_of(fuzzy_bytes),
objectclass=objectclasses.ca
)
self.exists = True
@ -102,9 +119,9 @@ class CATracker(Tracker):
def check_find(self, result, all=False, raw=False):
"""Check the plugin's `find` command result"""
if all:
expected = self.filter_attrs(self.retrieve_all_keys)
expected = self.filter_attrs(self.find_all_keys)
else:
expected = self.filter_attrs(self.retrieve_keys)
expected = self.filter_attrs(self.find_keys)
assert_deepequal(dict(
count=1,

View File

@ -22,6 +22,7 @@ Base class for all XML-RPC tests
"""
from __future__ import print_function
import collections
import datetime
import inspect
@ -49,6 +50,20 @@ fuzzy_automember_dn = Fuzzy(
'^cn=%s,cn=automember rebuild membership,cn=tasks,cn=config$' % uuid_re
)
# base64-encoded value
fuzzy_base64 = Fuzzy('^[0-9A-Za-z/+]+={0,2}$')
def fuzzy_sequence_of(fuzzy):
"""Construct a Fuzzy for a Sequence of values matching the given Fuzzy."""
def test(xs):
if not isinstance(xs, collections.Sequence):
return False
else:
return all(fuzzy == x for x in xs)
return Fuzzy(test=test)
# Matches an automember task finish message
fuzzy_automember_message = Fuzzy(
'^Automember rebuild task finished\. Processed \(\d+\) entries\.$'
@ -109,6 +124,8 @@ fuzzy_dergeneralizedtime = Fuzzy(type=datetime.datetime)
# match any string
fuzzy_string = Fuzzy(type=six.string_types)
fuzzy_bytes = Fuzzy(type=bytes)
# case insensitive match of sets
def fuzzy_set_ci(s):
return Fuzzy(test=lambda other: set(x.lower() for x in other) == set(y.lower() for y in s))