Accept an incoming certificate as either DER or base64 in the service plugin.

The plugin required a base64-encoded certificate and always decoded it
before processing. This doesn't work with the UI because the json module
decodes binary values already.

Try to detect if the incoming value is base64-encoded and decode if
necessary. Finally, try to pull the cert apart to validate it. This will
tell us for sure that the data is a certificate, regardless of the format
it came in as.

ticket 348
This commit is contained in:
Rob Crittenden
2010-10-08 13:15:03 -04:00
parent dccb386d57
commit d2a9ccf407
4 changed files with 86 additions and 17 deletions

View File

@@ -1204,7 +1204,7 @@ class CertificateError(ExecutionError):
errno = 4300
class CertificateOperationError(ExecutionError):
class CertificateOperationError(CertificateError):
"""
**4301** Raised when a certificate operation cannot be completed
@@ -1220,6 +1220,22 @@ class CertificateOperationError(ExecutionError):
errno = 4301
format = _('Certificate operation cannot be completed: %(error)s')
class CertificateFormatError(CertificateError):
"""
**4302** Raised when a certificate is badly formatted
For example:
>>> raise CertificateFormatError(error=u'improperly formated DER-encoded certificate')
Traceback (most recent call last):
...
CertificateFormatError: improperly formated DER-encoded certificate
"""
errno = 4302
format = _('Certificate format error: %(error)s')
class MutuallyExclusiveError(ExecutionError):
"""

View File

@@ -75,6 +75,7 @@ from ipalib import Str, Flag, Bytes
from ipalib.plugins.baseldap import *
from ipalib import x509
from ipalib import _, ngettext
from ipalib import util
from nss.error import NSPRError
@@ -130,10 +131,41 @@ def validate_certificate(ugettext, cert):
"""
For now just verify that it is properly base64-encoded.
"""
if util.isvalid_base64(cert):
try:
base64.b64decode(cert)
except Exception, e:
raise errors.Base64DecodeError(reason=str(e))
else:
# We'll assume this is DER data
pass
def normalize_certificate(cert):
"""
Incoming certificates should be DER-encoded.
Note that this can't be a normalizer on the Param because only unicode
variables are normalized.
"""
if util.isvalid_base64(cert):
try:
cert = base64.b64decode(cert)
except Exception, e:
raise errors.Base64DecodeError(reason=str(e))
# At this point we should have a certificate, either because the data
# was base64-encoded and now its not or it came in as DER format.
# Let's decode it and see. Fetching the serial number will pass the
# certificate through the NSS DER parser.
try:
base64.b64decode(cert)
except Exception, e:
raise errors.Base64DecodeError(reason=str(e))
serial = unicode(x509.get_serial_number(cert, x509.DER))
except NSPRError, nsprerr:
if nsprerr.errno == -8183: # SEC_ERROR_BAD_DER
raise errors.CertificateFormatError(error='improperly formatted DER-encoded certificate')
else:
raise errors.CertificateFormatError(error=str(nsprerr))
return cert
class service(LDAPObject):
@@ -196,13 +228,9 @@ class service_add(LDAPCreate):
except errors.NotFound:
raise errors.NotFound(reason="The host '%s' does not exist to add a service to." % hostname)
cert = entry_attrs.get('usercertificate')
cert = options.get('usercertificate')
if cert:
cert = cert[0]
# FIXME: should be in a normalizer: need to fix normalizers
# to work on non-unicode data
entry_attrs['usercertificate'] = base64.b64decode(cert)
# FIXME: shouldn't we request signing at this point?
entry_attrs['usercertificate'] = normalize_certificate(cert)
if not options.get('force', False):
# We know the host exists if we've gotten this far but we
@@ -273,6 +301,7 @@ class service_mod(LDAPUpdate):
def pre_callback(self, ldap, dn, entry_attrs, *keys, **options):
if 'usercertificate' in options:
cert = options.get('usercertificate')
cert = normalize_certificate(cert)
if cert:
(dn, entry_attrs_old) = ldap.get_entry(dn, ['usercertificate'])
if 'usercertificate' in entry_attrs_old:
@@ -281,8 +310,7 @@ class service_mod(LDAPUpdate):
x509.get_serial_number(entry_attrs_old['usercertificate'][0], x509.DER)
)
raise errors.GenericError(format=fmt)
# FIXME: should be in normalizer; see service_add
entry_attrs['usercertificate'] = base64.b64decode(cert)
entry_attrs['usercertificate'] = cert
else:
entry_attrs['usercertificate'] = None
return dn

View File

@@ -26,6 +26,7 @@ import imp
import logging
import time
import socket
import re
from types import NoneType
from ipalib import errors
@@ -148,3 +149,24 @@ def validate_host_dns(log, fqdn):
log.debug(
'IPA: found %d records for %s' % (len(rs), fqdn)
)
def isvalid_base64(data):
"""
Validate the incoming data as valid base64 data or not.
The character set must only include of a-z, A-Z, 0-9, + or / and
be padded with = to be a length divisible by 4 (so only 0-2 =s are
allowed). Its length must be divisible by 4. White space is
not significant so it is removed.
This doesn't guarantee we have a base64-encoded value, just that it
fits the base64 requirements.
"""
data = ''.join(data.split())
if len(data) % 4 > 0 or \
re.match('^[a-zA-Z0-9\+\/]+\={0,2}$', data) is None:
return False
else:
return True

View File

@@ -24,6 +24,7 @@ Test the `ipalib/plugins/service.py` module.
from ipalib import api, errors
from tests.test_xmlrpc.xmlrpc_test import Declarative, fuzzy_uuid
from tests.test_xmlrpc import objectclasses
import base64
fqdn1 = u'testhost1.%s' % api.env.domain
@@ -34,6 +35,8 @@ service1dn = u'krbprincipalname=%s,cn=services,cn=accounts,%s' % (service1.lower
host1dn = u'fqdn=%s,cn=computers,cn=accounts,%s' % (fqdn1, api.env.basedn)
host2dn = u'fqdn=%s,cn=computers,cn=accounts,%s' % (fqdn2, api.env.basedn)
servercert = 'MIICbzCCAdigAwIBAgICA/4wDQYJKoZIhvcNAQEFBQAwKTEnMCUGA1UEAxMeSVBBIFRlc3QgQ2VydGlmaWNhdGUgQXV0aG9yaXR5MB4XDTEwMDgwOTE1MDIyN1oXDTIwMDgwOTE1MDIyN1owKTEMMAoGA1UEChMDSVBBMRkwFwYDVQQDExBwdW1hLmdyZXlvYWsuY29tMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAwYbfEOQPgGenPn9vt1JFKvWm/Je3y2tawGWA3LXDuqfFJyYtZ8ib3TcBUOnLk9WK5g2qCwHaNlei7bj8ggIfr5hegAVe10cun+wYErjnYo7hsHYd+57VZezeipWrXu+7NoNd4+c4A5lk4A/xJay9j3bYx2oOM8BEox4xWYoWge1ljPrc5JK46f0X7AGW4F2VhnKPnf8rwSuzI1U8VGjutyM9TWNy3m9KMWeScjyG/ggIpOjUDMV7HkJL0Di61lznR9jXubpiEC7gWGbTp84eGl/Nn9bgK1AwHfJ2lHwfoY4uiL7ge1gyP6EvuUlHoBzdb7pekiX28iePjW3iEG9IawIDAQABoyIwIDARBglghkgBhvhCAQEEBAMCBkAwCwYDVR0PBAQDAgUgMA0GCSqGSIb3DQEBBQUAA4GBACRESLemRV9BPxfEgbALuxH5oE8jQm8WZ3pm2pALbpDlAd9wQc3yVf6RtkfVthyDnM18bg7IhxKpd77/p3H8eCnS8w5MLVRda6ktUC6tGhFTS4QKAf0WyDGTcIgkXbeDw0OPAoNHivoXbIXIIRxlw/XgaSaMzJQDBG8iROsN4kCv'
class test_host(Declarative):
@@ -53,7 +56,7 @@ class test_host(Declarative):
dict(
desc='Try to update non-existent %r' % service1,
command=('service_mod', [service1], dict(usercertificate='Nope')),
command=('service_mod', [service1], dict(usercertificate=servercert)),
expected=errors.NotFound(reason='no such entry'),
),
@@ -223,12 +226,12 @@ class test_host(Declarative):
dict(
desc='Update %r' % service1,
command=('service_mod', [service1], dict(usercertificate='aGVsbG8=')),
command=('service_mod', [service1], dict(usercertificate=servercert)),
expected=dict(
value=service1,
summary=u'Modified service "%s"' % service1,
result=dict(
usercertificate=['hello'],
usercertificate=[base64.b64decode(servercert)],
krbprincipalname=[service1],
managedby_host=[fqdn1],
),
@@ -244,7 +247,7 @@ class test_host(Declarative):
summary=None,
result=dict(
dn=service1dn,
usercertificate=['hello'],
usercertificate=[base64.b64decode(servercert)],
krbprincipalname=[service1],
has_keytab=False,
managedby_host=[fqdn1],
@@ -273,7 +276,7 @@ class test_host(Declarative):
dict(
desc='Try to update non-existent %r' % service1,
command=('service_mod', [service1], dict(usercertificate='Nope')),
command=('service_mod', [service1], dict(usercertificate=servercert)),
expected=errors.NotFound(reason='no such entry'),
),