Drop our own PKCS#10 ASN.1 decoder and use the one from python-nss

This patch:
- bumps up the minimum version of python-nss
- will initialize NSS with nodb if a CSR is loaded and it isn't already
  init'd
- will shutdown NSS if initialized in the RPC subsystem so we use right db
- updated and added a few more tests

Relying more on NSS introduces a bit of a problem. For NSS to work you
need to have initialized a database (either a real one or no_db). But once
you've initialized one and want to use another you have to close down the
first one.  I've added some code to nsslib.py to do just that. This could
potentially have some bad side-effects at some point, it works ok now.
This commit is contained in:
Rob Crittenden
2010-07-20 14:00:43 -04:00
parent 563c7cde40
commit b7ca3d68c2
11 changed files with 154 additions and 477 deletions

View File

@@ -176,7 +176,7 @@ Requires: python-kerberos >= 1.1-3
Requires: authconfig
Requires: gnupg
Requires: pyOpenSSL
Requires: python-nss >= 0.9
Requires: python-nss >= 0.9-8
Requires: python-lxml
%description python
@@ -494,6 +494,9 @@ fi
%endif
%changelog
* Mon Jul 19 2010 Rob Crittenden <rcritten@redhat.com> - 1.99-25
- Bump up minimum version of python-nss to pick up nss_is_initialize() API
* Thu Jun 24 2010 Adam Young <ayoung@redhat.com> - 1.99-24
- Removed python-asset based webui

View File

@@ -1,7 +1,7 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2009 Red Hat
# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
@@ -17,356 +17,34 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# Read PKCS#10 certificate requests (see RFC 2986 and 5280)
import os
import sys
import base64
import nss.nss as nss
from ipapython import ipautil
from ipalib import api
# NOTE: Not every extension is currently handled. Known to now work:
# 2.5.29.37 - extKeyUsage
PEM = 0
DER = 1
import sys, string, base64
from pyasn1.type import base,tag,namedtype,namedval,univ,constraint,char,useful
from pyasn1.codec.der import decoder, encoder
from pyasn1 import error
import copy
def get_subjectaltname(request):
"""
Given a CSR return the subjectaltname value, if any.
# Common OIDs found in a subject
oidtable = { "2.5.4.3": "CN",
"2.5.4.6": "C",
"2.5.4.7": "L",
"2.5.4.8": "ST",
"2.5.4.10": "O",
"2.5.4.11": "OU",
"1.2.840.113549.1.9.1": "E",
"0.9.2342.19200300.100.1.25": "DC",
}
The return value is a tuple of strings or None
"""
for extension in request.extensions:
if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
return nss.x509_alt_name(extension.value)
return None
# Some useful OIDs
FRIENDLYNAME = '1.2.840.113549.1.9.20'
EXTENSIONREQUEST = '1.2.840.113549.1.9.14'
def get_subject(request):
"""
Given a CSR return the subject value.
MAX = 32 # from mozilla/security/nss/lib/util/secasn1t.h
class DirectoryString(univ.Choice):
componentType = namedtype.NamedTypes(
namedtype.NamedType('teletexString', char.TeletexString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
namedtype.NamedType('printableString', char.PrintableString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
namedtype.NamedType('universalString', char.UniversalString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
namedtype.NamedType('utf8String', char.UTF8String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
namedtype.NamedType('bmpString', char.BMPString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
)
class AttributeValue(DirectoryString): pass
class AttributeType(univ.ObjectIdentifier): pass
class AttributeTypeAndValue(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('type', AttributeType()),
namedtype.NamedType('value', AttributeValue()) # FIXME, could be any type
)
class KeyPurposeId(univ.ObjectIdentifier): pass
class ExtKeyUsageSyntax(univ.SequenceOf):
componentType = KeyPurposeId()
class UPN(char.UTF8String):
tagSet = char.UTF8String.tagSet.tagExplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
class AttributeValueSet(univ.SetOf):
componentType = univ.Any()
sizeSpec = univ.SetOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
class Attribute(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('type', AttributeType()),
namedtype.NamedType('values', AttributeValueSet()),
)
class Attributes(univ.SetOf):
componentType = Attribute()
class RelativeDistinguishedName(univ.SetOf):
componentType = AttributeTypeAndValue()
class RDNSequence(univ.SequenceOf):
componentType = RelativeDistinguishedName()
class Name(univ.Choice):
componentType = namedtype.NamedTypes(
namedtype.NamedType('', RDNSequence())
)
def get_components(self):
components = self.getComponentByPosition(0)
complist = []
for idx in range(len(components)):
attrandvalue = components[idx].getComponentByPosition(0)
oid = attrandvalue.getComponentByPosition(0)
# FIXME, should handle any string type
value = attrandvalue.getComponentByPosition(1).getComponentByType(char.PrintableString.tagSet)
if value is None:
value = attrandvalue.getComponentByPosition(1).getComponentByType(char.UTF8String.tagSet)
if value is None:
value = attrandvalue.getComponentByPosition(1).getComponentByType(char.IA5String.tagSet)
vout = value.prettyOut(value).decode('utf-8')
oidout = oid.prettyOut(oid).decode('utf-8')
c = ((oidtable.get(oidout, oidout), vout))
complist.append(c)
return tuple(complist)
class AnotherName(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('type-id', univ.ObjectIdentifier()),
namedtype.NamedType('value', univ.Any())
)
class rfc822Name(char.IA5String):
tagSet = char.IA5String.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1)
)
class dNSName(char.IA5String):
tagSet = char.IA5String.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)
)
class x400Address(univ.OctetString):
tagSet = univ.OctetString.tagSet.tagImplicitly(
tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 3)
)
class directoryName(Name):
tagSet = Name.tagSet.tagImplicitly(
tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 4)
)
class uniformResourceIdentifier(char.IA5String):
tagSet = char.IA5String.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 6)
)
# Not all general types are handled, nor are these necessarily done
# per the specification.
class GeneralName(univ.Choice):
componentType = namedtype.NamedTypes(
namedtype.NamedType('otherName', AnotherName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
namedtype.NamedType('rfc822Name', rfc822Name()), #1
namedtype.NamedType('dNSName', dNSName()), #2
namedtype.NamedType('x400Address', x400Address()), #3
namedtype.NamedType('directoryName', directoryName()), #4
# 5 FIXME
namedtype.NamedType('uniformResourceIdentifier', uniformResourceIdentifier()), #6
# namedtype.NamedType('uniformResourceIdentifier', char.IA5String(tagSet=char.IA5String.tagSet.tagImplicitly(tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 6)))),
)
class GeneralNames(univ.SequenceOf):
componentType = GeneralName()
sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
class SubjectAltName(univ.SequenceOf):
componentType = GeneralName()
sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
class DistributionPointName(univ.Choice):
componentType = namedtype.NamedTypes(
namedtype.NamedType('fullName', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
namedtype.NamedType('nameRelativeToCRLIssuer', RelativeDistinguishedName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))),
)
class DistributionPoint(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.OptionalNamedType('distributionPoint', DistributionPointName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
namedtype.OptionalNamedType('reasons', univ.BitString().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), # FIXME
namedtype.OptionalNamedType('cRLIssuer', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))),
)
class cRLDistributionPoints(univ.SequenceOf):
componentType = DistributionPoint()
sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
class basicConstraints(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.DefaultedNamedType('cA', univ.Boolean('False')),
namedtype.OptionalNamedType('pathLenConstraint', univ.Integer()),
)
class AlgorithmIdentifier(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('algorithm', univ.ObjectIdentifier()),
namedtype.OptionalNamedType('parameters', univ.Any())
)
class SubjectPublicKeyInfo(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('algorithm', AlgorithmIdentifier()),
namedtype.NamedType('subjectPublicKey', univ.BitString())
)
class Version(univ.Integer): pass
class CertificationRequestInfo(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('version', Version()),
namedtype.NamedType('subject', Name()),
namedtype.NamedType('subjectPublicKeyInfo', SubjectPublicKeyInfo()),
namedtype.OptionalNamedType('attributes', Attributes().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)))
)
class CertificationRequest(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('certificationRequestInfo', CertificationRequestInfo()),
namedtype.NamedType('signatureAlgorithm', AlgorithmIdentifier()),
namedtype.NamedType('signatureValue', univ.BitString())
)
def get_version(self):
info = self.getComponentByName('certificationRequestInfo')
version = info.getComponentByName('version')
return version._value
def get_subject(self):
info = self.getComponentByName('certificationRequestInfo')
return info.getComponentByName('subject')
def get_subjectaltname(self):
attrs = self.get_attributes()
attrdict = dict(attrs)
if EXTENSIONREQUEST in attrdict:
# Extensions are a 3 position tuple
for ext in attrdict[EXTENSIONREQUEST]:
if ext[0] == '2.5.29.17':
# alt name is in the dNSName position
return ext[2][2]
def get_attributes(self):
info = self.getComponentByName('certificationRequestInfo')
attrs = info.getComponentByName('attributes')
attributes = []
for idx in range(len(attrs)):
atype = attrs[idx].getComponentByPosition(0)
aval = attrs[idx].getComponentByPosition(1)
# The attribute list is of type Any, need to re-encode
aenc = encoder.encode(aval, maxChunkSize=1024)
decoded = decoder.decode(aenc)[0]
oid = atype.prettyOut(atype)
if oid == "1.2.840.113549.1.9.20": # PKCS#9 Friendly Name
value = decoded.getComponentByPosition(0)
t = (oid, value.prettyOut(value).decode('utf-8'))
attributes.append(t)
elif oid == "1.2.840.113549.1.9.14": # PKCS#9 Extension Req
extensions = []
extlist = decoded.getComponentByPosition(0)
for jdx in range(len(extlist)):
ext = extlist.getComponentByPosition(jdx)
# An extension has 3 elements:
# oid
# bool - critical
# value
if len(ext) == 2: # If no critical, default to False
extoid = atype.prettyOut(ext.getComponentByPosition(0))
critical = False
extvalue = ext.getComponentByPosition(1)
else:
extoid = atype.prettyOut(ext.getComponentByPosition(0))
critical = bool(ext.getComponentByPosition(1)._value)
extvalue = ext.getComponentByPosition(2)
if extoid == '2.5.29.19': # basicConstraints
extdecoded = decoder.decode(extvalue._value, asn1Spec=basicConstraints())[0]
ca = bool(extdecoded[0])
if len(extdecoded) == 2: # path length is optional
pathlen = extdecoded[1]._value
else:
pathlen = None
constraint = (ca, pathlen)
e = (extoid, critical, constraint)
extensions.append(e)
continue
elif extoid == '2.5.29.31': # cRLDistributionPoints
extdecoded = decoder.decode(extvalue._value, asn1Spec=cRLDistributionPoints())[0]
distpoints = []
for elem in range(len(extdecoded)):
name = extdecoded[elem]
# DistributionPoint is position 0
distpoint = name.getComponentByPosition(0)
# fullName is position 0
fullname = distpoint.getComponentByPosition(0)
for crl in range(len(fullname)):
# Get the GeneralName, URI type
uri = fullname.getComponentByPosition(crl).getComponentByPosition(5)
distpoints.append(uri.prettyOut(uri).decode('utf-8'))
e = (extoid, critical, tuple(distpoints))
extensions.append(e)
continue
# The data is is encoded as "Any". Pull the raw data out
# and re-decode it using a different specification.
try:
extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralNames())[0]
except error.PyAsn1Error:
# I've seen CSRs where this isn't a sequence of names
# but is a single name, try to handle that too.
try:
extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralName())[0]
extdecoded = [extdecoded]
except error.PyAsn1Error, e:
# skip for now
generalnames = 9*["Error"]
e = (extoid, critical, tuple(generalnames))
extensions.append(e)
continue
# We now have a list of extensions in the order they
# are in the request as GeneralNames. We iterate through
# each of those to get a GeneralName. We then have to
# iterate through that to find the position set in it.
# Note that not every type will be returned. Those that
# are handled are returned in a tuple in the position
# which they are in the request.
generalnames = 9*[None]
for elem in range(len(extdecoded)):
name = extdecoded[elem]
for n in range(len(name)):
if name[n] is None:
continue
if generalnames[n] is None:
generalnames[n] = []
if n == 3: # OctetString
generalnames[n].append(name[n]._value)
if n in [1, 2, 6]: # IA5String
if n == 6 and extoid == "2.5.29.37":
# Extended key usage
v = copy.deepcopy(extvalue._value)
othername = decoder.decode(v, asn1Spec=ExtKeyUsageSyntax())[0]
keyusage = []
for l in range(len(othername)):
keyusage.append(othername[l].prettyOut(othername[l]))
generalnames[n] = tuple(keyusage)
else:
generalnames[n].append(name[n].prettyOut(name[n]).decode('utf-8'))
if n == 0: # AnotherName
nameoid = name[n].getComponentByPosition(0)
nameoid = nameoid.prettyOut(nameoid)
val = name[n].getComponentByPosition(1)
if nameoid == "1.3.6.1.4.1.311.20.2.3": # UPN
v = copy.deepcopy(val._value)
othername = decoder.decode(v, asn1Spec=UPN())[0]
generalnames[0].append(othername.prettyOut(othername).decode('utf-8'))
e = (extoid, critical, tuple(generalnames))
extensions.append(e)
t = (oid, tuple(extensions))
attributes.append(t)
return tuple(attributes)
This returns an nss.DN object.
"""
return request.subject
def strip_header(csr):
"""
@@ -392,50 +70,26 @@ def load_certificate_request(csr):
substrate = base64.b64decode(csr)
return decoder.decode(substrate, asn1Spec=CertificationRequest())[0]
# A fail-safe so we can always read a CSR. python-nss/NSS will segfault
# otherwise
if not nss.nss_is_initialized():
nss.nss_init_nodb()
return nss.CertificateRequest(substrate)
if __name__ == '__main__':
# Read PEM certs from stdin and print them out in plain text
nss.nss_init_nodb()
stSpam, stHam, stDump = 0, 1, 2
state = stSpam
# Read PEM request from stdin and print out its components
for certLine in sys.stdin.readlines():
certLine = string.strip(certLine)
if state == stSpam:
if state == stSpam:
if certLine == '-----BEGIN NEW CERTIFICATE REQUEST-----':
certLines = []
state = stHam
continue
if state == stHam:
if certLine == '-----END NEW CERTIFICATE REQUEST-----':
state = stDump
else:
certLines.append(certLine)
complist = []
if state == stDump:
substrate = ''
for certLine in certLines:
substrate = substrate + base64.b64decode(certLine)
csrlines = sys.stdin.readlines()
csrlines = fp.readlines()
fp.close()
csr = ''.join(csrlines)
request = decoder.decode(substrate, asn1Spec=CertificationRequest())[0]
subject = request.get_subject()
attrs = request.get_attributes()
print "Attributes:"
print attrs
csr = load_certificate_request(csr)
print "Subject:"
complist = subject.get_components()
print complist
out=""
for c in complist:
out = out + "%s=%s," % (c[0], c[1])
print out[:-1]
print csr
print request.get_subjectaltname()
# Re-encode the request just to be sure things are working
assert encoder.encode(request, maxChunkSize=1024) == substrate, 'cert recode fails'
state = stSpam
print get_subject(csr)
print get_subjectaltname(csr)

View File

@@ -69,7 +69,6 @@ from ipalib import x509
from ipalib.plugins.virtual import *
from ipalib.plugins.service import split_principal
import base64
from pyasn1.error import PyAsn1Error
import logging
import traceback
from ipalib.text import _
@@ -77,37 +76,31 @@ from ipalib.request import context
from ipalib.output import Output
from ipalib.plugins.service import validate_principal
import nss.nss as nss
from nss.error import NSPRError
def get_csr_hostname(csr):
"""
Return the value of CN in the subject of the request
Return the value of CN in the subject of the request or None
"""
try:
request = pkcs10.load_certificate_request(csr)
sub = request.get_subject().get_components()
for s in sub:
if s[0].lower() == "cn":
return s[1]
except PyAsn1Error:
# The ASN.1 decoding errors tend to be long and involved and the
# last bit is generally not interesting. We need the whole traceback.
logging.error('Unable to decode CSR\n%s', traceback.format_exc())
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
return None
subject = pkcs10.get_subject(request)
return subject.common_name
except NSPRError, nsprerr:
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request:'))
def get_subjectaltname(csr):
"""
Return the value of the subject alt name, if any
Return the first value of the subject alt name, if any
"""
try:
request = pkcs10.load_certificate_request(csr)
except PyAsn1Error:
# The ASN.1 decoding errors tend to be long and involved and the
# last bit is generally not interesting. We need the whole traceback.
logging.error('Unable to decode CSR\n%s', traceback.format_exc())
for extension in request.extensions:
if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
return nss.x509_alt_name(extension.value)[0]
return None
except NSPRError, nsprerr:
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
return request.get_subjectaltname()
def validate_csr(ugettext, csr):
"""
@@ -116,13 +109,9 @@ def validate_csr(ugettext, csr):
"""
try:
request = pkcs10.load_certificate_request(csr)
# Explicitly request the attributes. This fires off additional
# decoding to get things like the subjectAltName.
attrs = request.get_attributes()
except TypeError, e:
raise errors.Base64DecodeError(reason=str(e))
except PyAsn1Error:
except NSPRError:
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
except Exception, e:
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % str(e))
@@ -290,7 +279,8 @@ class cert_request(VirtualCommand):
raise errors.ACIError(info="Insufficient 'write' privilege to the 'userCertificate' attribute of entry '%s'." % dn)
# Validate the subject alt name, if any
subjectaltname = get_subjectaltname(csr)
request = pkcs10.load_certificate_request(csr)
subjectaltname = pkcs10.get_subjectaltname(request)
if subjectaltname is not None:
for name in subjectaltname:
try:

View File

@@ -122,6 +122,10 @@ class NSSConnection(httplib.HTTPConnection):
raise RuntimeError("dbdir is required")
logging.debug('%s init %s', self.__class__.__name__, host)
if nss.nss_is_initialized():
# close any open NSS database and use the new one
ssl.clear_session_cache()
nss.nss_shutdown()
nss.nss_init(dbdir)
ssl.set_domestic_policy()
nss.set_password_callback(self.password_callback)

View File

@@ -366,7 +366,7 @@ class DsInstance(service.Service):
self._ldap_mod("ipa-winsync-conf.ldif")
def __config_version_module(self):
self._ldap_mod("ipa-version-conf.ldif")
self._ldap_mod("version-conf.ldif")
def __user_private_groups(self):
if has_managed_entries(self.host_name, self.dm_password):

View File

@@ -45,10 +45,9 @@ import re
from ipaserver.plugins import rabase
from ipaserver.install import certs
import tempfile
from pyasn1 import error
from ipalib import _
from pyasn1.codec.der import encoder
from ipalib.plugins.cert import get_csr_hostname
from nss.error import NSPRError
class ra(rabase.rabase):
"""
@@ -87,23 +86,19 @@ class ra(rabase.rabase):
config = api.Command['config_show']()['result']
subject_base = config.get('ipacertificatesubjectbase')[0]
hostname = get_csr_hostname(csr)
request = pkcs10.load_certificate_request(csr)
base = re.split(',\s*(?=\w+=)', subject_base)
base.reverse()
base.append("CN=%s" % hostname)
request_subject = request.get_subject().get_components()
new_request = []
for r in request_subject:
new_request.append("%s=%s" % (r[0], r[1]))
base.insert(0,'CN=%s' % hostname)
subject_base = ",".join(base)
request = pkcs10.load_certificate_request(csr)
# python-nss normalizes the request subject
request_subject = str(pkcs10.get_subject(request))
if str(base).lower() != str(new_request).lower():
subject_base='CN=%s, %s' % (hostname, subject_base)
new_request.reverse()
if str(subject_base).lower() != request_subject.lower():
raise errors.CertificateOperationError(error=_('Request subject "%(request_subject)s" does not match the form "%(subject_base)s"') % \
{'request_subject' : ', '.join(new_request), 'subject_base' : subject_base})
{'request_subject' : request_subject, 'subject_base' : subject_base})
except errors.CertificateOperationError, e:
raise e
except Exception, e:
except NSPRError, e:
raise errors.CertificateOperationError(error=_('unable to decode csr: %s' % e))
# certutil wants the CSR to have have a header and footer. Add one
@@ -207,11 +202,10 @@ class ra(rabase.rabase):
pass
try:
# Grab the subject, reverse it, combine it and return it
subject = x509.get_subject(cert)
serial = x509.get_serial_number(cert)
except error.PyAsn1Error, e:
except NSPRError, e:
self.log.error('Unable to decode certificate in entry: %s' % str(e))
raise errors.CertificateOperationError(error='Unable to decode certificate in entry: %s' % str(e))

View File

@@ -92,18 +92,18 @@ class test_x509(object):
Test retrieving the subject
"""
subject = x509.get_subject(goodcert)
assert subject == 'CN=ipa.example.com,O=IPA'
assert str(subject) == 'CN=ipa.example.com,O=IPA'
der = base64.b64decode(goodcert)
subject = x509.get_subject(der, x509.DER)
assert subject == 'CN=ipa.example.com,O=IPA'
assert str(subject) == 'CN=ipa.example.com,O=IPA'
# We should be able to pass in a tuple/list of certs too
subject = x509.get_subject((goodcert))
assert subject == 'CN=ipa.example.com,O=IPA'
assert str(subject) == 'CN=ipa.example.com,O=IPA'
subject = x509.get_subject([goodcert])
assert subject == 'CN=ipa.example.com,O=IPA'
assert str(subject) == 'CN=ipa.example.com,O=IPA'
def test_2_get_serial_number(self):
"""
@@ -132,8 +132,8 @@ class test_x509(object):
cert = x509.load_certificate(goodcert)
assert cert.subject == 'CN=ipa.example.com,O=IPA'
assert cert.issuer == 'CN=IPA Test Certificate Authority'
assert str(cert.subject) == 'CN=ipa.example.com,O=IPA'
assert str(cert.issuer) == 'CN=IPA Test Certificate Authority'
assert cert.serial_number == 1093
assert cert.valid_not_before_str == 'Fri Jun 25 13:00:42 2010 UTC'
assert cert.valid_not_after_str == 'Thu Jun 25 13:00:42 2015 UTC'

View File

@@ -0,0 +1,3 @@
-----BEGIN NEW CERTIFICATE REQUEST-----
VGhpcyBpcyBhbiBpbnZhbGlkIENTUg==
-----END NEW CERTIFICATE REQUEST-----

View File

@@ -0,0 +1,4 @@
-----BEGIN NEW CERTIFICATE REQUEST-----
Invalidate data
-----END NEW CERTIFICATE REQUEST-----

View File

@@ -0,0 +1,20 @@
Certificate request generated by Netscape certutil
Phone: (not specified)
Common Name: test.example.com
Email: (not specified)
Organization: IPA
State: (not specified)
Country: (not specified)
-----BEGIN NEW CERTIFICATE REQUEST-----
MIIBaDCB0gIBADApMQwwCgYDVQQKEwNJUEExGTAXBgNVBAMTEHRlc3QuZXhhbXBs
ZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAPnSCLwl7IytP2HC7+zv
nI2fe6oRCE/J8K1jIoiqS9engx3Yfe4kaXWWzcwmuUV57VhUmWDEQIbSREPdrVSi
tWC55ilGmPOAEw+mP4qg6Ctb+d8Egmy1JVrpIYCLNXvEd3dAaimB0J+K3hKFRyHI
2MzrIuFqqohRijkDLwB8oVVdAgMBAAGgADANBgkqhkiG9w0BAQUFAAOBgQACt37K
j+RMEbqG8s0Uxs3FhcfiAx8Do99CDizY/b7hZEgMyG4dLmm+vSCBbxBrG5oMlxJD
dxnpk0PQSknNkJVrCS/J1OTpOPRTi4VKATT3tHJAfDbWZTwcSelUCLQ4lREiuT3D
WP4vKrLIxDJDb+/mwuV7WWo34E6MD9iTB1xINg==
-----END NEW CERTIFICATE REQUEST-----

View File

@@ -26,6 +26,8 @@ import nose
from tests.util import raises, PluginTester
from ipalib import pkcs10
from ipapython import ipautil
import nss.nss as nss
from nss.error import NSPRError
class test_update(object):
"""
@@ -33,6 +35,7 @@ class test_update(object):
"""
def setUp(self):
nss.nss_init_nodb()
if ipautil.file_exists("test0.csr"):
self.testdir="./"
elif ipautil.file_exists("tests/test_pkcs10/test0.csr"):
@@ -53,15 +56,11 @@ class test_update(object):
csr = self.read_file("test0.csr")
request = pkcs10.load_certificate_request(csr)
attributes = request.get_attributes()
subject = request.get_subject()
components = subject.get_components()
compdict = dict(components)
subject = pkcs10.get_subject(request)
assert(attributes == ())
assert(compdict['CN'] == u'test.example.com')
assert(compdict['ST'] == u'California')
assert(compdict['C'] == u'US')
assert(subject.common_name == 'test.example.com')
assert(subject.state_name == 'California')
assert(subject.country_name == 'US')
def test_1(self):
"""
@@ -70,23 +69,15 @@ class test_update(object):
csr = self.read_file("test1.csr")
request = pkcs10.load_certificate_request(csr)
attributes = request.get_attributes()
subject = request.get_subject()
components = subject.get_components()
compdict = dict(components)
attrdict = dict(attributes)
subject = pkcs10.get_subject(request)
assert(compdict['CN'] == u'test.example.com')
assert(compdict['ST'] == u'California')
assert(compdict['C'] == u'US')
assert(subject.common_name == 'test.example.com')
assert(subject.state_name == 'California')
assert(subject.country_name == 'US')
extensions = attrdict['1.2.840.113549.1.9.14']
for ext in range(len(extensions)):
if extensions[ext][0] == '2.5.29.17':
names = extensions[ext][2]
# check the dNSName field
assert(names[2] == [u'testlow.example.com'])
for extension in request.extensions:
if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
assert nss.x509_alt_name(extension.value)[0] == 'testlow.example.com'
def test_2(self):
"""
@@ -95,25 +86,39 @@ class test_update(object):
csr = self.read_file("test2.csr")
request = pkcs10.load_certificate_request(csr)
attributes = request.get_attributes()
subject = request.get_subject()
components = subject.get_components()
compdict = dict(components)
attrdict = dict(attributes)
subject = pkcs10.get_subject(request)
assert(compdict['CN'] == u'test.example.com')
assert(compdict['ST'] == u'California')
assert(compdict['C'] == u'US')
assert(subject.common_name == 'test.example.com')
assert(subject.state_name == 'California')
assert(subject.country_name == 'US')
extensions = attrdict['1.2.840.113549.1.9.14']
for extension in request.extensions:
if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
assert nss.x509_alt_name(extension.value)[0] == 'testlow.example.com'
if extension.oid_tag == nss.SEC_OID_X509_CRL_DIST_POINTS:
pts = nss.CRLDistributionPts(extension.value)
urls = pts[0].get_general_names()
assert('http://ca.example.com/my.crl' in urls)
assert('http://other.example.com/my.crl' in urls)
for ext in range(len(extensions)):
if extensions[ext][0] == '2.5.29.17':
names = extensions[ext][2]
# check the dNSName field
assert(names[2] == [u'testlow.example.com'])
if extensions[ext][0] == '2.5.29.31':
urls = extensions[ext][2]
assert(len(urls) == 2)
assert(urls[0] == u'http://ca.example.com/my.crl')
assert(urls[1] == u'http://other.example.com/my.crl')
def test_3(self):
"""
Test CSR with base64-encoded bogus data
"""
csr = self.read_file("test3.csr")
try:
request = pkcs10.load_certificate_request(csr)
except NSPRError, nsprerr:
# (SEC_ERROR_BAD_DER) security library: improperly formatted DER-encoded message.
assert(nsprerr. errno== -8183)
def test_4(self):
"""
Test CSR with badly formatted base64-encoded data
"""
csr = self.read_file("test4.csr")
try:
request = pkcs10.load_certificate_request(csr)
except TypeError, typeerr:
assert(str(typeerr) == 'Incorrect padding')