freeipa/ipalib/plugins/cert.py
Rob Crittenden ab1667f3c1 Use pyasn1-based PKCS#10 and X509v3 parsers instead of pyOpenSSL.
The pyOpenSSL PKCS#10 parser doesn't support attributes so we can't identify
requests with subject alt names.

Subject alt names are only allowed if:
  - the host for the alt name exists in IPA
  - if binding as host principal, the host is in the services managedBy attr
2009-11-30 18:10:09 -07:00

315 lines
11 KiB
Python

# Authors:
# Andrew Wnuk <awnuk@redhat.com>
# Jason Gerard DeRose <jderose@redhat.com>
#
# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Command plugins for IPA-RA certificate operations.
"""
from ipalib import api, SkipPluginModule
if api.env.enable_ra is not True:
# In this case, abort loading this plugin module...
raise SkipPluginModule(reason='env.enable_ra is not True')
from ipalib import Command, Str, Int, Bytes, Flag, File
from ipalib import errors
from ipalib import pkcs10
from ipalib import x509
from ipalib.plugins.virtual import *
from ipalib.plugins.service import split_principal
import base64
from ipalib.request import context
from ipapython import dnsclient
from pyasn1.error import PyAsn1Error
import logging
import traceback
def get_serial(certificate):
"""
Given a certificate, return the serial number in that cert
In theory there should be only one cert per object so even if we get
passed in a list/tuple only return the first one.
"""
if type(certificate) in (list, tuple):
certificate = certificate[0]
try:
serial = str(x509.get_serial_number(certificate))
except PyAsn1Error:
raise errors.GenericError(format='Unable to decode certificate in entry')
return serial
def get_csr_hostname(csr):
"""
Return the value of CN in the subject of the request
"""
try:
request = pkcs10.load_certificate_request(csr)
sub = request.get_subject().get_components()
for s in sub:
if s[0].lower() == "cn":
return s[1]
except PyAsn1Error:
# The ASN.1 decoding errors tend to be long and involved and the
# last bit is generally not interesting. We need the whole traceback.
logging.error('Unable to decode CSR\n%s', traceback.format_exc())
raise errors.GenericError(format='Failure decoding Certificate Signing Request')
return None
def get_subjectaltname(csr):
"""
Return the value of the subject alt name, if any
"""
try:
request = pkcs10.load_certificate_request(csr)
except PyAsn1Error:
# The ASN.1 decoding errors tend to be long and involved and the
# last bit is generally not interesting. We need the whole traceback.
logging.error('Unable to decode CSR\n%s', traceback.format_exc())
raise errors.GenericError(format='Failure decoding Certificate Signing Request')
return request.get_subjectaltname()
def validate_csr(ugettext, csr):
"""
Ensure the CSR is base64-encoded and can be decoded by our PKCS#10
parser.
"""
try:
request = pkcs10.load_certificate_request(csr)
# Explicitly request the attributes. This fires off additional
# decoding to get things like the subjectAltName.
attrs = request.get_attributes()
except TypeError, e:
raise errors.Base64DecodeError(reason=str(e))
except PyAsn1Error:
raise errors.GenericError(format='Failure decoding Certificate Signing Request')
except Exception, e:
raise errors.GenericError(format='Failure decoding Certificate Signing Request: %s' % str(e))
class cert_request(VirtualCommand):
"""
Submit a certificate signing request.
"""
takes_args = (
File('csr', validate_csr,
cli_name='csr_file',
),
)
operation="request certificate"
takes_options = (
Str('principal',
doc="service principal for this certificate (e.g. HTTP/test.example.com)",
),
Str('request_type',
default=u'pkcs10',
autofill=True,
),
Flag('add',
doc="automatically add the principal if it doesn't exist",
default=False,
autofill=True
),
)
def execute(self, csr, **kw):
ldap = self.api.Backend.ldap2
principal = kw.get('principal')
add = kw.get('add')
del kw['principal']
del kw['add']
service = None
"""
Access control is partially handled by the ACI titled
'Hosts can modify service userCertificate'. This is for the case
where a machine binds using a host/ prinicpal. It can only do the
request if the target hostname is in the managedBy attribute which
is managed using the add/del member commands.
Binding with a user principal one needs to be in the request_certs
taskgroup (directly or indirectly via role membership).
"""
# Can this user request certs?
self.check_access()
# FIXME: add support for subject alt name
# Ensure that the hostname in the CSR matches the principal
subject_host = get_csr_hostname(csr)
(servicename, hostname, realm) = split_principal(principal)
if subject_host.lower() != hostname.lower():
raise errors.ACIError(info="hostname in subject of request '%s' does not match principal hostname '%s'" % (subject_host, hostname))
dn = None
service = None
# See if the service exists and punt if it doesn't and we aren't
# going to add it
try:
(dn, service) = api.Command['service_show'](principal, all=True, raw=True)
if 'usercertificate' in service:
# FIXME, what to do here? Do we revoke the old cert?
raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(base64.b64encode(service['usercertificate'][0])))
except errors.NotFound, e:
if not add:
raise errors.NotFound(reason="The service principal for this request doesn't exist.")
try:
(dn, service) = api.Command['service_add'](principal, **{})
except errors.ACIError:
raise errors.ACIError(info='You need to be a member of the serviceadmin role to add services')
# We got this far so the service entry exists, can we write it?
if not ldap.can_write(dn, "usercertificate"):
raise errors.ACIError(info="Insufficient 'write' privilege to the 'userCertificate' attribute of entry '%s'." % dn)
# Validate the subject alt name, if any
subjectaltname = get_subjectaltname(csr)
if subjectaltname is not None:
for name in subjectaltname:
try:
(hostdn, hostentry) = api.Command['host_show'](name, all=True, raw=True)
except errors.NotFound:
# We don't want to issue any certificates referencing
# machines we don't know about. Nothing is stored in this
# host record related to this certificate.
raise errors.NotFound(reason='no host record for subject alt name %s in certificate request' % name)
authprincipal = getattr(context, 'principal')
if authprincipal.startswith("host/"):
if not hostdn in service.get('managedby', []):
raise errors.ACIError(info="Insufficient privilege to create a certificate with subject alt name '%s'." % name)
# Request the certificate
result = self.Backend.ra.request_certificate(csr, **kw)
# Success? Then add it to the service entry.
if result.get('status') == 0:
skw = {"usercertificate": str(result.get('certificate'))}
api.Command['service_mod'](principal, **skw)
return result
def output_for_cli(self, textui, result, *args, **kw):
if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0)
else:
textui.print_plain('Failed to submit a certificate request.')
api.register(cert_request)
class cert_status(VirtualCommand):
"""
Check status of a certificate signing request.
"""
takes_args = ('request_id')
operation = "certificate status"
def execute(self, request_id, **kw):
self.check_access()
return self.Backend.ra.check_request_status(request_id)
def output_for_cli(self, textui, result, *args, **kw):
if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0)
else:
textui.print_plain('Failed to retrieve a request status.')
api.register(cert_status)
class cert_get(VirtualCommand):
"""
Retrieve an existing certificate.
"""
takes_args = ('serial_number')
operation="retrieve certificate"
def execute(self, serial_number):
self.check_access()
return self.Backend.ra.get_certificate(serial_number)
def output_for_cli(self, textui, result, *args, **kw):
if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0)
else:
textui.print_plain('Failed to obtain a certificate.')
api.register(cert_get)
class cert_revoke(VirtualCommand):
"""
Revoke a certificate.
"""
takes_args = ('serial_number')
operation = "revoke certificate"
# FIXME: The default is 0. Is this really an Int param?
takes_options = (
Int('revocation_reason?',
doc='Reason for revoking the certificate (0-10)',
minvalue=0,
maxvalue=10,
default=0,
),
)
def execute(self, serial_number, **kw):
self.check_access()
return self.Backend.ra.revoke_certificate(serial_number, **kw)
def output_for_cli(self, textui, result, *args, **kw):
if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0)
else:
textui.print_plain('Failed to revoke a certificate.')
api.register(cert_revoke)
class cert_remove_hold(VirtualCommand):
"""
Take a revoked certificate off hold.
"""
takes_args = ('serial_number')
operation = "certificate remove hold"
def execute(self, serial_number, **kw):
self.check_access()
return self.Backend.ra.take_certificate_off_hold(serial_number)
def output_for_cli(self, textui, result, *args, **kw):
if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0)
else:
textui.print_plain('Failed to take a revoked certificate off hold.')
api.register(cert_remove_hold)