mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-01-16 03:11:57 -06:00
321 lines
11 KiB
Python
321 lines
11 KiB
Python
# Authors:
|
|
# Andrew Wnuk <awnuk@redhat.com>
|
|
# Jason Gerard DeRose <jderose@redhat.com>
|
|
# John Dennis <jdennis@redhat.com>
|
|
#
|
|
# Copyright (C) 2009 Red Hat
|
|
# see file 'COPYING' for use and warranty information
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU General Public License as
|
|
# published by the Free Software Foundation; version 2 only
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
|
|
"""
|
|
Command plugins for IPA-RA certificate operations.
|
|
"""
|
|
|
|
from ipalib import api, SkipPluginModule
|
|
if api.env.enable_ra is not True:
|
|
# In this case, abort loading this plugin module...
|
|
raise SkipPluginModule(reason='env.enable_ra is not True')
|
|
from ipalib import Command, Str, Int, Bytes, Flag, File
|
|
from ipalib import errors
|
|
from ipalib import pkcs10
|
|
from ipalib import x509
|
|
from ipalib.plugins.virtual import *
|
|
from ipalib.plugins.service import split_principal
|
|
import base64
|
|
from ipalib.request import context
|
|
from ipapython import dnsclient
|
|
from pyasn1.error import PyAsn1Error
|
|
import logging
|
|
import traceback
|
|
from ipalib.request import ugettext as _
|
|
|
|
def get_serial(certificate):
|
|
"""
|
|
Given a certificate, return the serial number in that cert
|
|
as a Python long object.
|
|
|
|
In theory there should be only one cert per object so even if we get
|
|
passed in a list/tuple only return the first one.
|
|
"""
|
|
if type(certificate) in (list, tuple):
|
|
certificate = certificate[0]
|
|
try:
|
|
serial = x509.get_serial_number(certificate)
|
|
except PyAsn1Error:
|
|
raise errors.CertificateOperationError(error=_('Unable to decode certificate in entry'))
|
|
|
|
return serial
|
|
|
|
def get_csr_hostname(csr):
|
|
"""
|
|
Return the value of CN in the subject of the request
|
|
"""
|
|
try:
|
|
request = pkcs10.load_certificate_request(csr)
|
|
sub = request.get_subject().get_components()
|
|
for s in sub:
|
|
if s[0].lower() == "cn":
|
|
return s[1]
|
|
except PyAsn1Error:
|
|
# The ASN.1 decoding errors tend to be long and involved and the
|
|
# last bit is generally not interesting. We need the whole traceback.
|
|
logging.error('Unable to decode CSR\n%s', traceback.format_exc())
|
|
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
|
|
|
|
return None
|
|
|
|
def get_subjectaltname(csr):
|
|
"""
|
|
Return the value of the subject alt name, if any
|
|
"""
|
|
try:
|
|
request = pkcs10.load_certificate_request(csr)
|
|
except PyAsn1Error:
|
|
# The ASN.1 decoding errors tend to be long and involved and the
|
|
# last bit is generally not interesting. We need the whole traceback.
|
|
logging.error('Unable to decode CSR\n%s', traceback.format_exc())
|
|
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
|
|
return request.get_subjectaltname()
|
|
|
|
def validate_csr(ugettext, csr):
|
|
"""
|
|
Ensure the CSR is base64-encoded and can be decoded by our PKCS#10
|
|
parser.
|
|
"""
|
|
try:
|
|
request = pkcs10.load_certificate_request(csr)
|
|
|
|
# Explicitly request the attributes. This fires off additional
|
|
# decoding to get things like the subjectAltName.
|
|
attrs = request.get_attributes()
|
|
except TypeError, e:
|
|
raise errors.Base64DecodeError(reason=str(e))
|
|
except PyAsn1Error:
|
|
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
|
|
except Exception, e:
|
|
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % str(e))
|
|
|
|
|
|
class cert_request(VirtualCommand):
|
|
"""
|
|
Submit a certificate signing request.
|
|
"""
|
|
|
|
takes_args = (
|
|
File('csr', validate_csr,
|
|
cli_name='csr_file',
|
|
),
|
|
)
|
|
operation="request certificate"
|
|
|
|
takes_options = (
|
|
Str('principal',
|
|
doc="service principal for this certificate (e.g. HTTP/test.example.com)",
|
|
),
|
|
Str('request_type',
|
|
default=u'pkcs10',
|
|
autofill=True,
|
|
),
|
|
Flag('add',
|
|
doc="automatically add the principal if it doesn't exist",
|
|
default=False,
|
|
autofill=True
|
|
),
|
|
)
|
|
|
|
def execute(self, csr, **kw):
|
|
ldap = self.api.Backend.ldap2
|
|
principal = kw.get('principal')
|
|
add = kw.get('add')
|
|
del kw['principal']
|
|
del kw['add']
|
|
service = None
|
|
|
|
"""
|
|
Access control is partially handled by the ACI titled
|
|
'Hosts can modify service userCertificate'. This is for the case
|
|
where a machine binds using a host/ prinicpal. It can only do the
|
|
request if the target hostname is in the managedBy attribute which
|
|
is managed using the add/del member commands.
|
|
|
|
Binding with a user principal one needs to be in the request_certs
|
|
taskgroup (directly or indirectly via role membership).
|
|
"""
|
|
|
|
# Can this user request certs?
|
|
self.check_access()
|
|
|
|
# FIXME: add support for subject alt name
|
|
|
|
# Ensure that the hostname in the CSR matches the principal
|
|
subject_host = get_csr_hostname(csr)
|
|
(servicename, hostname, realm) = split_principal(principal)
|
|
if subject_host.lower() != hostname.lower():
|
|
raise errors.ACIError(info="hostname in subject of request '%s' does not match principal hostname '%s'" % (subject_host, hostname))
|
|
|
|
dn = None
|
|
service = None
|
|
# See if the service exists and punt if it doesn't and we aren't
|
|
# going to add it
|
|
try:
|
|
(dn, service) = api.Command['service_show'](principal, all=True, raw=True)
|
|
if 'usercertificate' in service:
|
|
# FIXME, what to do here? Do we revoke the old cert?
|
|
raise errors.CertificateOperationError(error=_('entry already has a certificate, serial number %s') % get_serial(base64.b64encode(service['usercertificate'][0])))
|
|
except errors.NotFound, e:
|
|
if not add:
|
|
raise errors.NotFound(reason="The service principal for this request doesn't exist.")
|
|
try:
|
|
(dn, service) = api.Command['service_add'](principal, **{})
|
|
except errors.ACIError:
|
|
raise errors.ACIError(info='You need to be a member of the serviceadmin role to add services')
|
|
|
|
# We got this far so the service entry exists, can we write it?
|
|
if not ldap.can_write(dn, "usercertificate"):
|
|
raise errors.ACIError(info="Insufficient 'write' privilege to the 'userCertificate' attribute of entry '%s'." % dn)
|
|
|
|
# Validate the subject alt name, if any
|
|
subjectaltname = get_subjectaltname(csr)
|
|
if subjectaltname is not None:
|
|
for name in subjectaltname:
|
|
try:
|
|
(hostdn, hostentry) = api.Command['host_show'](name, all=True, raw=True)
|
|
except errors.NotFound:
|
|
# We don't want to issue any certificates referencing
|
|
# machines we don't know about. Nothing is stored in this
|
|
# host record related to this certificate.
|
|
raise errors.NotFound(reason='no host record for subject alt name %s in certificate request' % name)
|
|
authprincipal = getattr(context, 'principal')
|
|
if authprincipal.startswith("host/"):
|
|
if not hostdn in service.get('managedby', []):
|
|
raise errors.ACIError(info="Insufficient privilege to create a certificate with subject alt name '%s'." % name)
|
|
|
|
# Request the certificate
|
|
result = self.Backend.ra.request_certificate(csr, **kw)
|
|
|
|
# Success? Then add it to the service entry.
|
|
if result.get('status') == 0:
|
|
skw = {"usercertificate": str(result.get('certificate'))}
|
|
api.Command['service_mod'](principal, **skw)
|
|
|
|
return result
|
|
|
|
def output_for_cli(self, textui, result, *args, **kw):
|
|
if isinstance(result, dict) and len(result) > 0:
|
|
textui.print_entry(result, 0)
|
|
else:
|
|
textui.print_plain(_('Failed to submit a certificate request.'))
|
|
|
|
api.register(cert_request)
|
|
|
|
|
|
class cert_status(VirtualCommand):
|
|
"""
|
|
Check status of a certificate signing request.
|
|
"""
|
|
|
|
takes_args = ('request_id')
|
|
operation = "certificate status"
|
|
|
|
|
|
def execute(self, request_id, **kw):
|
|
self.check_access()
|
|
return self.Backend.ra.check_request_status(request_id)
|
|
|
|
def output_for_cli(self, textui, result, *args, **kw):
|
|
if isinstance(result, dict) and len(result) > 0:
|
|
textui.print_entry(result, 0)
|
|
else:
|
|
textui.print_plain(_('Failed to retrieve a request status.'))
|
|
|
|
api.register(cert_status)
|
|
|
|
|
|
class cert_get(VirtualCommand):
|
|
"""
|
|
Retrieve an existing certificate.
|
|
"""
|
|
|
|
takes_args = (Str('serial_number',
|
|
doc='serial number in decimal or if prefixed with 0x in hexadecimal'))
|
|
operation="retrieve certificate"
|
|
|
|
def execute(self, serial_number):
|
|
self.check_access()
|
|
return self.Backend.ra.get_certificate(serial_number)
|
|
|
|
def output_for_cli(self, textui, result, *args, **kw):
|
|
if isinstance(result, dict) and len(result) > 0:
|
|
textui.print_entry(result, 0)
|
|
else:
|
|
textui.print_plain(_('Failed to obtain a certificate.'))
|
|
|
|
api.register(cert_get)
|
|
|
|
|
|
class cert_revoke(VirtualCommand):
|
|
"""
|
|
Revoke a certificate.
|
|
"""
|
|
|
|
takes_args = (Str('serial_number',
|
|
doc='serial number in decimal or if prefixed with 0x in hexadecimal'))
|
|
operation = "revoke certificate"
|
|
|
|
# FIXME: The default is 0. Is this really an Int param?
|
|
takes_options = (
|
|
Int('revocation_reason?',
|
|
doc='Reason for revoking the certificate (0-10)',
|
|
minvalue=0,
|
|
maxvalue=10,
|
|
default=0,
|
|
),
|
|
)
|
|
|
|
|
|
def execute(self, serial_number, **kw):
|
|
self.check_access()
|
|
return self.Backend.ra.revoke_certificate(serial_number, **kw)
|
|
|
|
def output_for_cli(self, textui, result, *args, **kw):
|
|
if isinstance(result, dict) and len(result) > 0:
|
|
textui.print_entry(result, 0)
|
|
else:
|
|
textui.print_plain(_('Failed to revoke a certificate.'))
|
|
|
|
api.register(cert_revoke)
|
|
|
|
|
|
class cert_remove_hold(VirtualCommand):
|
|
"""
|
|
Take a revoked certificate off hold.
|
|
"""
|
|
|
|
takes_args = (Str('serial_number',
|
|
doc='serial number in decimal or if prefixed with 0x in hexadecimal'))
|
|
operation = "certificate remove hold"
|
|
|
|
def execute(self, serial_number, **kw):
|
|
self.check_access()
|
|
return self.Backend.ra.take_certificate_off_hold(serial_number)
|
|
|
|
def output_for_cli(self, textui, result, *args, **kw):
|
|
if isinstance(result, dict) and len(result) > 0:
|
|
textui.print_entry(result, 0)
|
|
else:
|
|
textui.print_plain(_('Failed to take a revoked certificate off hold.'))
|
|
|
|
api.register(cert_remove_hold)
|