mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-01-22 22:33:16 -06:00
b75d06e189
A number of doc strings were not localized, wrap them in _(). Some messages were not localized, wrap them in _() Fix a couple of failing tests: The method name in RPC should not be unicode. The doc attribute must use the .msg attribute for comparison. Also clean up imports of _() The import should come from ipalib or ipalib.text, not ugettext from request.
455 lines
15 KiB
Python
455 lines
15 KiB
Python
# Authors:
|
|
# Andrew Wnuk <awnuk@redhat.com>
|
|
# Jason Gerard DeRose <jderose@redhat.com>
|
|
# John Dennis <jdennis@redhat.com>
|
|
#
|
|
# Copyright (C) 2009 Red Hat
|
|
# see file 'COPYING' for use and warranty information
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU General Public License as
|
|
# published by the Free Software Foundation; version 2 only
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
|
|
"""
|
|
Command plugins for IPA-RA certificate operations.
|
|
"""
|
|
|
|
from ipalib import api, SkipPluginModule
|
|
if api.env.enable_ra is not True:
    # In this case, abort loading this plugin module...
    # The identity test means any value other than the literal True
    # (including other truthy values) causes the module to be skipped.
    raise SkipPluginModule(reason='env.enable_ra is not True')
|
|
from ipalib import Command, Str, Int, Bytes, Flag, File
|
|
from ipalib import errors
|
|
from ipalib import pkcs10
|
|
from ipalib import x509
|
|
from ipalib.plugins.virtual import *
|
|
from ipalib.plugins.service import split_principal
|
|
import base64
|
|
from pyasn1.error import PyAsn1Error
|
|
import logging
|
|
import traceback
|
|
from ipalib.text import _
|
|
from ipalib.request import context
|
|
from ipalib.output import Output
|
|
|
|
def get_serial(certificate):
    """
    Given a certificate, return the serial number in that cert
    as a Python long object.

    In theory there should be only one cert per object so even if we get
    passed in a list/tuple only return the first one.

    A base64 decode of the value is attempted first; if that fails the
    value is handed to the x509 parser unchanged (presumably because it
    is already DER -- the parser is always called with x509.DER).

    Raises errors.CertificateOperationError when the x509 parser raises
    PyAsn1Error.
    """
    if isinstance(certificate, (list, tuple)):
        certificate = certificate[0]
    try:
        certificate = base64.b64decode(certificate)
    except Exception:
        # Deliberate best effort: a value that is not valid base64 is
        # passed through untouched and judged by the parser below.
        pass
    try:
        serial = x509.get_serial_number(certificate, x509.DER)
    except PyAsn1Error:
        raise errors.CertificateOperationError(error=_('Unable to decode certificate in entry'))

    return serial
|
|
|
|
def get_subject(certificate):
    """
    Given a certificate, return the subject

    In theory there should be only one cert per object so even if we get
    passed in a list/tuple only return the first one.

    The subject is returned as a single string of comma-separated
    "name=value" pairs, in the reverse of the order in which
    x509.get_subject_components() yields them. An empty component list
    gives an empty string.

    A base64 decode of the value is attempted first; if that fails the
    value is handed to the x509 parser unchanged.

    Raises errors.CertificateOperationError when the x509 parser raises
    PyAsn1Error.
    """
    if isinstance(certificate, (list, tuple)):
        certificate = certificate[0]
    try:
        certificate = base64.b64decode(certificate)
    except Exception:
        # Deliberate best effort: a value that is not valid base64 is
        # passed through untouched and judged by the parser below.
        pass
    try:
        sub = list(x509.get_subject_components(certificate, type=x509.DER))
        sub.reverse()
    except PyAsn1Error:
        raise errors.CertificateOperationError(error=_('Unable to decode certificate in entry'))

    # Join instead of appending "name=value," piecewise and trimming the
    # trailing comma afterwards.
    return ",".join(["%s=%s" % (s[0], s[1]) for s in sub])
|
|
|
|
def get_csr_hostname(csr):
    """
    Return the value of CN in the subject of the request

    The subject components are scanned in order and the value of the
    first one whose name is "cn" (compared case-insensitively) is
    returned. None is returned when no component matches.

    A PyAsn1Error is logged with its full traceback and re-raised as
    errors.CertificateOperationError.
    """
    try:
        parsed = pkcs10.load_certificate_request(csr)
        for component in parsed.get_subject().get_components():
            if component[0].lower() != "cn":
                continue
            return component[1]
    except PyAsn1Error:
        # The ASN.1 decoding errors tend to be long and involved and the
        # last bit is generally not interesting. We need the whole traceback.
        logging.error('Unable to decode CSR\n%s', traceback.format_exc())
        raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))

    return None
|
|
|
|
def get_subjectaltname(csr):
    """
    Return the value of the subject alt name, if any

    The value is whatever the loaded request's get_subjectaltname()
    returns. A PyAsn1Error raised while loading the request is logged
    with its full traceback and re-raised as
    errors.CertificateOperationError.
    """
    try:
        parsed = pkcs10.load_certificate_request(csr)
    except PyAsn1Error:
        # The ASN.1 decoding errors tend to be long and involved and the
        # last bit is generally not interesting. We need the whole traceback.
        logging.error('Unable to decode CSR\n%s', traceback.format_exc())
        raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
    else:
        # Only the load is guarded; the accessor runs outside the handler.
        return parsed.get_subjectaltname()
|
|
|
|
def validate_csr(ugettext, csr):
    """
    Ensure the CSR is base64-encoded and can be decoded by our PKCS#10
    parser.

    ugettext is accepted but not used in the body; csr is the request
    text handed to pkcs10.load_certificate_request().

    Returns None (implicitly) when the request loads and its attributes
    decode. Otherwise raises:

    * errors.Base64DecodeError when a TypeError is raised
    * errors.CertificateOperationError when a PyAsn1Error or any other
      exception is raised
    """
    try:
        request = pkcs10.load_certificate_request(csr)

        # Explicitly request the attributes. This fires off additional
        # decoding to get things like the subjectAltName.
        # The returned value itself is not used afterwards.
        attrs = request.get_attributes()
    except TypeError, e:
        raise errors.Base64DecodeError(reason=str(e))
    except PyAsn1Error:
        raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
    except Exception, e:
        # Catch-all: anything else is reported with the exception text
        # appended to the message.
        raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % str(e))
|
|
|
|
def normalize_csr(csr):
    """
    Strip any leading and trailing cruft around the BEGIN/END block

    Both the "NEW CERTIFICATE REQUEST" and the plain "CERTIFICATE REQUEST"
    marker spellings are recognised, the NEW form being tried first. The
    END marker is only looked for at or after the BEGIN marker, so stray
    END text ahead of the block cannot truncate the request.

    Returns the text from the BEGIN marker through the end of the END
    marker. If either marker is missing the input is returned unchanged;
    this only normalizes, it does not validate.
    """
    begin_markers = (
        '-----BEGIN NEW CERTIFICATE REQUEST-----',
        '-----BEGIN CERTIFICATE REQUEST-----',
    )
    end_markers = (
        '-----END NEW CERTIFICATE REQUEST-----',
        '-----END CERTIFICATE REQUEST-----',
    )

    start = -1
    for marker in begin_markers:
        start = csr.find(marker)
        if start != -1:
            break
    if start == -1:
        return csr

    for marker in end_markers:
        # Search from the BEGIN position and use the marker's own length
        # rather than a hard-coded 37/33.
        end = csr.find(marker, start)
        if end != -1:
            return csr[start:end + len(marker)]

    # We're normalizing here, not validating
    return csr
|
|
|
|
class cert_request(VirtualCommand):
    """
    Submit a certificate signing request.
    """

    # The CSR is read from a file, normalized (cruft around the BEGIN/END
    # block stripped) and validated by the PKCS#10 parser.
    takes_args = (
        File('csr', validate_csr,
            cli_name='csr_file',
            normalizer=normalize_csr,
        ),
    )
    operation="request certificate"

    takes_options = (
        Str('principal',
            label=_('Principal'),
            doc=_('Service principal for this certificate (e.g. HTTP/test.example.com)'),
        ),
        Str('request_type',
            default=u'pkcs10',
            autofill=True,
        ),
        Flag('add',
            doc=_("automatically add the principal if it doesn't exist"),
            default=False,
            autofill=True
        ),
        Str('certificate?',
            label=_('Certificate'),
            flags=['no_create', 'no_update', 'no_search'],
        ),
        Str('subject?',
            label=_('Subject'),
            flags=['no_create', 'no_update', 'no_search'],
        ),
        Str('serial_number?',
            label=_('Serial number'),
            flags=['no_create', 'no_update', 'no_search'],
        ),
    )

    has_output = (
        Output('result',
            type=dict,
            doc=_('Dictionary mapping variable name to value'),
        ),
    )

    def execute(self, csr, **kw):
        """
        Check the request, retire any existing certificate on the target
        entry, ask the RA backend for a new certificate and store it.

        'principal' and 'add' are taken out of kw; the remaining options
        are forwarded to the RA backend's request_certificate().

        Returns a dict with the backend result under the key 'result'.

        Raises errors.CertificateOperationError if the CSR subject has no
        CN, errors.ACIError on a hostname mismatch or missing privilege,
        and errors.NotFound if the principal (without 'add') or a subject
        alt name host is unknown.
        """
        ldap = self.api.Backend.ldap2
        principal = kw.pop('principal')
        add = kw.pop('add')

        # Access control is partially handled by the ACI titled
        # 'Hosts can modify service userCertificate'. This is for the case
        # where a machine binds using a host/ principal. It can only do the
        # request if the target hostname is in the managedBy attribute which
        # is managed using the add/del member commands.
        #
        # Binding with a user principal one needs to be in the request_certs
        # taskgroup (directly or indirectly via role membership).
        bind_principal = getattr(context, 'principal')
        bound_as_host = bind_principal.startswith('host/')
        # Can this user request certs?
        if not bound_as_host:
            self.check_access()

        # FIXME: add support for subject alt name

        # Ensure that the hostname in the CSR matches the principal
        subject_host = get_csr_hostname(csr)
        if not subject_host:
            # get_csr_hostname() returns None when the subject has no CN;
            # report that instead of failing on None.lower() below.
            raise errors.CertificateOperationError(error=_('No hostname (CN) was found in the subject of the request'))
        (servicename, hostname, realm) = split_principal(principal)
        if subject_host.lower() != hostname.lower():
            raise errors.ACIError(info="hostname in subject of request '%s' does not match principal hostname '%s'" % (subject_host, hostname))

        # host/ principals live in host entries, everything else in service
        # entries. For a host principal the host name is the text between
        # the 'host/' prefix and the optional '@REALM' suffix.
        target_is_host = principal.startswith('host/')
        if target_is_host:
            hostname = principal[5:].split('@', 1)[0]

        dn = None
        service = None
        # See if the service exists and punt if it doesn't and we aren't
        # going to add it
        try:
            if target_is_host:
                service = api.Command['host_show'](hostname, all=True, raw=True)['result']
            else:
                service = api.Command['service_show'](principal, all=True, raw=True)['result']
            dn = service['dn']
        except errors.NotFound:
            if not add:
                raise errors.NotFound(reason="The service principal for this request doesn't exist.")
            try:
                service = api.Command['service_add'](principal)['result']
                dn = service['dn']
            except errors.ACIError:
                raise errors.ACIError(info='You need to be a member of the serviceadmin role to add services')

        # We got this far so the service entry exists, can we write it?
        if not ldap.can_write(dn, "usercertificate"):
            raise errors.ACIError(info="Insufficient 'write' privilege to the 'userCertificate' attribute of entry '%s'." % dn)

        # Validate the subject alt name, if any
        subjectaltname = get_subjectaltname(csr)
        if subjectaltname is not None:
            for name in subjectaltname:
                try:
                    hostentry = api.Command['host_show'](name, all=True, raw=True)['result']
                    hostdn = hostentry['dn']
                except errors.NotFound:
                    # We don't want to issue any certificates referencing
                    # machines we don't know about. Nothing is stored in this
                    # host record related to this certificate.
                    raise errors.NotFound(reason='no host record for subject alt name %s in certificate request' % name)
                # A host may only name hosts that manage the target entry.
                if bound_as_host and hostdn not in service.get('managedby', []):
                    raise errors.ACIError(info="Insufficient privilege to create a certificate with subject alt name '%s'." % name)

        if 'usercertificate' in service:
            serial = get_serial(base64.b64encode(service['usercertificate'][0]))
            # revoke the certificate and remove it from the service
            # entry before proceeding. First we retrieve the certificate to
            # see if it is already revoked, if not then we revoke it.
            try:
                result = api.Command['cert_get'](unicode(serial))['result']
                if 'revocation_reason' not in result:
                    try:
                        api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
                    except errors.NotImplementedError:
                        # some CA's might not implement revoke
                        pass
            except errors.NotImplementedError:
                # some CA's might not implement get
                pass
            # Clear the old certificate through the same command family
            # that will store the new one below.
            if target_is_host:
                api.Command['host_mod'](hostname, usercertificate=None)
            else:
                api.Command['service_mod'](principal, usercertificate=None)

        # Request the certificate
        result = self.Backend.ra.request_certificate(csr, **kw)

        # Success? Then add it to the service entry.
        if 'certificate' in result:
            skw = {"usercertificate": str(result.get('certificate'))}
            if target_is_host:
                api.Command['host_mod'](hostname, **skw)
            else:
                api.Command['service_mod'](principal, **skw)

        return dict(
            result=result
        )
|
|
|
|
# Register the cert_request command class with the ipalib api object.
api.register(cert_request)
|
|
|
|
|
|
class cert_status(VirtualCommand):
    """
    Check status of a certificate signing request.
    """

    operation = "certificate status"

    # The single positional argument is the id of the request to look up.
    takes_args = (
        Str(
            'request_id',
            label=_('Request id'),
            flags=['no_create', 'no_update', 'no_search'],
        ),
    )
    takes_options = (
        Str(
            'cert_request_status?',
            label=_('Request status'),
            flags=['no_create', 'no_update', 'no_search'],
        ),
    )

    def execute(self, request_id, **kw):
        """
        Run the access check, then return the RA backend's status for
        request_id under the key 'result'. Options in kw are not used.
        """
        self.check_access()
        status = self.Backend.ra.check_request_status(request_id)
        return {'result': status}
|
|
|
|
# Register the cert_status command class with the ipalib api object.
api.register(cert_status)
|
|
|
|
|
|
# Shared positional parameter: used as takes_args by cert_get, cert_revoke
# and cert_remove_hold below.
_serial_number = Str('serial_number',
    label=_('Serial number'),
    doc=_('Serial number in decimal or if prefixed with 0x in hexadecimal'),
)
|
|
|
|
class cert_get(VirtualCommand):
    """
    Retrieve an existing certificate.
    """

    takes_args = _serial_number

    takes_options = (
        Str('certificate?',
            label=_('Certificate'),
            flags=['no_create', 'no_update', 'no_search'],
        ),
        Str('subject?',
            label=_('Subject'),
            flags=['no_create', 'no_update', 'no_search'],
        ),
        Str('revocation_reason?',
            label=_('Revocation reason'),
            flags=['no_create', 'no_update', 'no_search'],
        ),
    )

    operation="retrieve certificate"

    def execute(self, serial_number, **kw):
        """
        Run the access check, fetch the certificate with the given serial
        number from the RA backend and add its subject to the result.

        Options in kw are accepted, like the other commands in this
        module, but are not used; without **kw, supplying any of the
        declared options would raise TypeError.

        Returns a dict with the backend result, plus a 'subject' entry
        computed from its 'certificate' value, under the key 'result'.
        """
        self.check_access()
        result = self.Backend.ra.get_certificate(serial_number)
        result['subject'] = get_subject(result['certificate'])
        return dict(result=result)
|
|
|
|
# Register the cert_get command class with the ipalib api object.
api.register(cert_get)
|
|
|
|
|
|
class cert_revoke(VirtualCommand):
    """
    Revoke a certificate.
    """

    takes_args = _serial_number

    # NOTE(review): takes_options is assigned again further down in this
    # class body, so this first tuple (the 'revoked?' flag) is replaced and
    # never takes effect. Presumably it was meant to describe an output
    # value -- confirm the intent before merging or removing it.
    takes_options = (
        Flag('revoked?',
            label=_('Revoked'),
            flags=['no_create', 'no_update', 'no_search'],
        ),
    )
    operation = "revoke certificate"

    # FIXME: The default is 0. Is this really an Int param?
    # This second assignment is the one the class ends up with.
    takes_options = (
        Int('revocation_reason?',
            label=_('Reason'),
            doc=_('Reason for revoking the certificate (0-10)'),
            minvalue=0,
            maxvalue=10,
            default=0,
        ),
    )

    def execute(self, serial_number, **kw):
        # Run the access check, then pass the serial number and every
        # supplied option straight through to the RA backend; its return
        # value is wrapped under the key 'result'.
        self.check_access()
        return dict(
            result=self.Backend.ra.revoke_certificate(serial_number, **kw)
        )
|
|
|
|
# Register the cert_revoke command class with the ipalib api object.
api.register(cert_revoke)
|
|
|
|
|
|
class cert_remove_hold(VirtualCommand):
    """
    Take a revoked certificate off hold.
    """

    operation = "certificate remove hold"

    takes_args = _serial_number

    takes_options = (
        Flag(
            'unrevoked?',
            label=_('Unrevoked'),
            flags=['no_create', 'no_update', 'no_search'],
        ),
        Str(
            'error_string?',
            label=_('Error'),
            flags=['no_create', 'no_update', 'no_search'],
        ),
    )

    def execute(self, serial_number, **kw):
        """
        Run the access check, then ask the RA backend to take the
        certificate with the given serial number off hold. Options in kw
        are not forwarded. The backend's answer is returned under the key
        'result'.
        """
        self.check_access()
        outcome = self.Backend.ra.take_certificate_off_hold(serial_number)
        return {'result': outcome}
|
|
|
|
# Register the cert_remove_hold command class with the ipalib api object.
api.register(cert_remove_hold)
|