mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Clean up crypto code, take advantage of new nss-python capabilities
This patch does the following: - drops our in-tree x509v3 parser to use the python-nss one - return more information on certificates - make an API change, renaming cert-get to cert-show - Drop a lot of duplicated code
This commit is contained in:
parent
1e1985b17c
commit
8d2d7429be
@ -44,7 +44,7 @@ EXAMPLES:
|
||||
ipa cert-request --add --principal=HTTP/lion.example.com example.csr
|
||||
|
||||
Retrieve an existing certificate:
|
||||
ipa cert-request 1032
|
||||
ipa cert-show 1032
|
||||
|
||||
Revoke a certificate (see RFC 5280 for reason details):
|
||||
ipa cert-revoke --revocation-reason=6 1032
|
||||
@ -75,53 +75,8 @@ import traceback
|
||||
from ipalib.text import _
|
||||
from ipalib.request import context
|
||||
from ipalib.output import Output
|
||||
|
||||
def get_serial(certificate):
|
||||
"""
|
||||
Given a certificate, return the serial number in that cert
|
||||
as a Python long object.
|
||||
|
||||
In theory there should be only one cert per object so even if we get
|
||||
passed in a list/tuple only return the first one.
|
||||
"""
|
||||
if type(certificate) in (list, tuple):
|
||||
certificate = certificate[0]
|
||||
try:
|
||||
certificate = base64.b64decode(certificate)
|
||||
except Exception, e:
|
||||
pass
|
||||
try:
|
||||
|
||||
serial = x509.get_serial_number(certificate, x509.DER)
|
||||
except PyAsn1Error:
|
||||
raise errors.CertificateOperationError(error=_('Unable to decode certificate in entry'))
|
||||
|
||||
return serial
|
||||
|
||||
def get_subject(certificate):
|
||||
"""
|
||||
Given a certificate, return the subject
|
||||
|
||||
In theory there should be only one cert per object so even if we get
|
||||
passed in a list/tuple only return the first one.
|
||||
"""
|
||||
if type(certificate) in (list, tuple):
|
||||
certificate = certificate[0]
|
||||
try:
|
||||
certificate = base64.b64decode(certificate)
|
||||
except Exception, e:
|
||||
pass
|
||||
try:
|
||||
sub = list(x509.get_subject_components(certificate, type=x509.DER))
|
||||
sub.reverse()
|
||||
except PyAsn1Error:
|
||||
raise errors.CertificateOperationError(error=_('Unable to decode certificate in entry'))
|
||||
|
||||
subject = ""
|
||||
for s in sub:
|
||||
subject = subject + "%s=%s," % (s[0], s[1])
|
||||
|
||||
return subject[:-1]
|
||||
from ipalib.plugins.service import validate_principal
|
||||
import nss.nss as nss
|
||||
|
||||
def get_csr_hostname(csr):
|
||||
"""
|
||||
@ -192,6 +147,20 @@ def normalize_csr(csr):
|
||||
|
||||
return csr
|
||||
|
||||
def get_host_from_principal(principal):
|
||||
"""
|
||||
Given a principal with or without a realm return the
|
||||
host portion.
|
||||
"""
|
||||
validate_principal(None, principal)
|
||||
realm = principal.find('@')
|
||||
slash = principal.find('/')
|
||||
if realm == -1:
|
||||
realm = len(principal)
|
||||
hostname = principal[slash+1:realm]
|
||||
|
||||
return hostname
|
||||
|
||||
class cert_request(VirtualCommand):
|
||||
"""
|
||||
Submit a certificate signing request.
|
||||
@ -219,6 +188,9 @@ class cert_request(VirtualCommand):
|
||||
default=False,
|
||||
autofill=True
|
||||
),
|
||||
)
|
||||
|
||||
has_output_params = (
|
||||
Str('certificate?',
|
||||
label=_('Certificate'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
@ -227,6 +199,26 @@ class cert_request(VirtualCommand):
|
||||
label=_('Subject'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
),
|
||||
Str('issuer?',
|
||||
label=_('Issuer'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
),
|
||||
Str('valid_not_before?',
|
||||
label=_('Not Before'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
),
|
||||
Str('valid_not_after?',
|
||||
label=_('Not After'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
),
|
||||
Str('md5_fingerprint?',
|
||||
label=_('Fingerprint (MD5)'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
),
|
||||
Str('sha1_fingerprint?',
|
||||
label=_('Fingerprint (SHA1)'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
),
|
||||
Str('serial_number?',
|
||||
label=_('Serial number'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
@ -281,11 +273,7 @@ class cert_request(VirtualCommand):
|
||||
service = api.Command['service_show'](principal, all=True, raw=True)['result']
|
||||
dn = service['dn']
|
||||
else:
|
||||
realm = principal.find('@')
|
||||
if realm == -1:
|
||||
realm = len(principal)
|
||||
hostname = principal[5:realm]
|
||||
|
||||
hostname = get_host_from_principal(principal)
|
||||
service = api.Command['host_show'](hostname, all=True, raw=True)['result']
|
||||
dn = service['dn']
|
||||
except errors.NotFound, e:
|
||||
@ -319,12 +307,12 @@ class cert_request(VirtualCommand):
|
||||
raise errors.ACIError(info="Insufficient privilege to create a certificate with subject alt name '%s'." % name)
|
||||
|
||||
if 'usercertificate' in service:
|
||||
serial = get_serial(base64.b64encode(service['usercertificate'][0]))
|
||||
serial = x509.get_serial_number(service['usercertificate'][0], datatype=x509.DER)
|
||||
# revoke the certificate and remove it from the service
|
||||
# entry before proceeding. First we retrieve the certificate to
|
||||
# see if it is already revoked, if not then we revoke it.
|
||||
try:
|
||||
result = api.Command['cert_get'](unicode(serial))['result']
|
||||
result = api.Command['cert_show'](unicode(serial))['result']
|
||||
if 'revocation_reason' not in result:
|
||||
try:
|
||||
api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
|
||||
@ -334,10 +322,20 @@ class cert_request(VirtualCommand):
|
||||
except errors.NotImplementedError:
|
||||
# some CA's might not implement get
|
||||
pass
|
||||
api.Command['service_mod'](principal, usercertificate=None)
|
||||
if not principal.startswith('host/'):
|
||||
api.Command['service_mod'](principal, usercertificate=None)
|
||||
else:
|
||||
hostname = get_host_from_principal(principal)
|
||||
api.Command['host_mod'](hostname, usercertificate=None)
|
||||
|
||||
# Request the certificate
|
||||
result = self.Backend.ra.request_certificate(csr, **kw)
|
||||
cert = x509.load_certificate(result['certificate'])
|
||||
result['issuer'] = unicode(cert.issuer)
|
||||
result['valid_not_before'] = unicode(cert.valid_not_before_str)
|
||||
result['valid_not_after'] = unicode(cert.valid_not_after_str)
|
||||
result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
|
||||
result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
|
||||
|
||||
# Success? Then add it to the service entry.
|
||||
if 'certificate' in result:
|
||||
@ -345,10 +343,7 @@ class cert_request(VirtualCommand):
|
||||
skw = {"usercertificate": str(result.get('certificate'))}
|
||||
api.Command['service_mod'](principal, **skw)
|
||||
else:
|
||||
realm = principal.find('@')
|
||||
if realm == -1:
|
||||
realm = len(principal)
|
||||
hostname = principal[5:realm]
|
||||
hostname = get_host_from_principal(principal)
|
||||
skw = {"usercertificate": str(result.get('certificate'))}
|
||||
api.Command['host_mod'](hostname, **skw)
|
||||
|
||||
@ -370,10 +365,9 @@ class cert_status(VirtualCommand):
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
),
|
||||
)
|
||||
takes_options = (
|
||||
Str('cert_request_status?',
|
||||
has_output_params = (
|
||||
Str('cert_request_status',
|
||||
label=_('Request status'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
),
|
||||
)
|
||||
operation = "certificate status"
|
||||
@ -393,25 +387,37 @@ _serial_number = Str('serial_number',
|
||||
doc=_('Serial number in decimal or if prefixed with 0x in hexadecimal'),
|
||||
)
|
||||
|
||||
class cert_get(VirtualCommand):
|
||||
class cert_show(VirtualCommand):
|
||||
"""
|
||||
Retrieve an existing certificate.
|
||||
"""
|
||||
|
||||
takes_args = _serial_number
|
||||
|
||||
takes_options = (
|
||||
Str('certificate?',
|
||||
has_output_params = (
|
||||
Str('certificate',
|
||||
label=_('Certificate'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
),
|
||||
Str('subject?',
|
||||
Str('subject',
|
||||
label=_('Subject'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
),
|
||||
Str('issuer',
|
||||
label=_('Issuer'),
|
||||
),
|
||||
Str('valid_not_before',
|
||||
label=_('Not Before'),
|
||||
),
|
||||
Str('valid_not_after',
|
||||
label=_('Not After'),
|
||||
),
|
||||
Str('md5_fingerprint',
|
||||
label=_('Fingerprint (MD5)'),
|
||||
),
|
||||
Str('sha1_fingerprint',
|
||||
label=_('Fingerprint (SHA1)'),
|
||||
),
|
||||
Str('revocation_reason?',
|
||||
label=_('Revocation reason'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
),
|
||||
)
|
||||
|
||||
@ -420,10 +426,16 @@ class cert_get(VirtualCommand):
|
||||
def execute(self, serial_number):
|
||||
self.check_access()
|
||||
result=self.Backend.ra.get_certificate(serial_number)
|
||||
result['subject'] = get_subject(result['certificate'])
|
||||
cert = x509.load_certificate(result['certificate'])
|
||||
result['subject'] = unicode(cert.subject)
|
||||
result['issuer'] = unicode(cert.issuer)
|
||||
result['valid_not_before'] = unicode(cert.valid_not_before_str)
|
||||
result['valid_not_after'] = unicode(cert.valid_not_after_str)
|
||||
result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
|
||||
result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
|
||||
return dict(result=result)
|
||||
|
||||
api.register(cert_get)
|
||||
api.register(cert_show)
|
||||
|
||||
|
||||
class cert_revoke(VirtualCommand):
|
||||
@ -433,10 +445,9 @@ class cert_revoke(VirtualCommand):
|
||||
|
||||
takes_args = _serial_number
|
||||
|
||||
takes_options = (
|
||||
Flag('revoked?',
|
||||
has_output_params = (
|
||||
Flag('revoked',
|
||||
label=_('Revoked'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
),
|
||||
)
|
||||
operation = "revoke certificate"
|
||||
@ -468,14 +479,12 @@ class cert_remove_hold(VirtualCommand):
|
||||
|
||||
takes_args = _serial_number
|
||||
|
||||
takes_options = (
|
||||
has_output_params = (
|
||||
Flag('unrevoked?',
|
||||
label=_('Unrevoked'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
),
|
||||
Str('error_string?',
|
||||
label=_('Error'),
|
||||
flags=['no_create', 'no_update', 'no_search'],
|
||||
),
|
||||
)
|
||||
operation = "certificate remove hold"
|
||||
|
@ -71,8 +71,8 @@ from ipalib import Str, Flag, Bytes
|
||||
from ipalib.plugins.baseldap import *
|
||||
from ipalib.plugins.service import split_principal
|
||||
from ipalib.plugins.service import validate_certificate
|
||||
from ipalib.plugins.service import get_serial
|
||||
from ipalib import _, ngettext
|
||||
from ipalib import x509
|
||||
import base64
|
||||
|
||||
|
||||
@ -291,10 +291,10 @@ class host_mod(LDAPUpdate):
|
||||
if 'usercertificate' in entry_attrs_old:
|
||||
# FIXME: what to do here? do we revoke the old cert?
|
||||
fmt = 'entry already has a certificate, serial number: %s' % (
|
||||
get_serial(entry_attrs_old['usercertificate'])
|
||||
x509.get_serial_number(entry_attrs_old['usercertificate'][0], x509.DER)
|
||||
)
|
||||
raise errors.GenericError(format=fmt)
|
||||
# FIXME: should be in normalizer; see service_add
|
||||
# FIXME: decoding should be in normalizer; see service_add
|
||||
entry_attrs['usercertificate'] = base64.b64decode(cert)
|
||||
|
||||
return dn
|
||||
|
@ -64,26 +64,9 @@ from ipalib import api, errors
|
||||
from ipalib import Str, Flag, Bytes
|
||||
from ipalib.plugins.baseldap import *
|
||||
from ipalib import x509
|
||||
from pyasn1.error import PyAsn1Error
|
||||
from ipalib import _, ngettext
|
||||
|
||||
|
||||
def get_serial(certificate):
|
||||
"""
|
||||
Given a certificate, return the serial number in that
|
||||
cert as a Python long object.
|
||||
"""
|
||||
if type(certificate) in (list, tuple):
|
||||
certificate = certificate[0]
|
||||
|
||||
try:
|
||||
serial = x509.get_serial_number(certificate, type=x509.DER)
|
||||
except PyAsn1Error, e:
|
||||
raise errors.GenericError(
|
||||
format='Unable to decode certificate in entry: %s' % e
|
||||
)
|
||||
return serial
|
||||
|
||||
def split_principal(principal):
|
||||
service = hostname = realm = None
|
||||
|
||||
@ -194,6 +177,7 @@ class service_add(LDAPCreate):
|
||||
|
||||
cert = entry_attrs.get('usercertificate')
|
||||
if cert:
|
||||
cert = cert[0]
|
||||
# FIXME: should be in a normalizer: need to fix normalizers
|
||||
# to work on non-unicode data
|
||||
entry_attrs['usercertificate'] = base64.b64decode(cert)
|
||||
@ -229,9 +213,10 @@ class service_del(LDAPDelete):
|
||||
(dn, entry_attrs) = ldap.get_entry(dn, ['usercertificate'])
|
||||
cert = entry_attrs.get('usercertificate')
|
||||
if cert:
|
||||
serial = unicode(get_serial(cert))
|
||||
cert = cert[0]
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
try:
|
||||
result = api.Command['cert_get'](unicode(serial))['result']
|
||||
result = api.Command['cert_show'](unicode(serial))['result']
|
||||
if 'revocation_reason' not in result:
|
||||
try:
|
||||
api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
|
||||
@ -267,7 +252,7 @@ class service_mod(LDAPUpdate):
|
||||
if 'usercertificate' in entry_attrs_old:
|
||||
# FIXME: what to do here? do we revoke the old cert?
|
||||
fmt = 'entry already has a certificate, serial number: %s' % (
|
||||
get_serial(entry_attrs_old['usercertificate'])
|
||||
x509.get_serial_number(entry_attrs_old['usercertificate'][0], x509.DER)
|
||||
)
|
||||
raise errors.GenericError(format=fmt)
|
||||
# FIXME: should be in normalizer; see service_add
|
||||
|
286
ipalib/x509.py
286
ipalib/x509.py
@ -1,199 +1,31 @@
|
||||
"""
|
||||
Imported from pyasn1 project:
|
||||
# Authors:
|
||||
# Rob Crittenden <rcritten@redhat.com>
|
||||
#
|
||||
# Copyright (C) 2010 Red Hat
|
||||
# see file 'COPYING' for use and warranty information
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License as
|
||||
# published by the Free Software Foundation; version 2 only
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
|
||||
Copyright (c) 2005-2009 Ilya Etingof <ilya@glas.net>, all rights reserved.
|
||||
|
||||
THIS SOFTWARE IS NOT FAULT TOLERANT AND SHOULD NOT BE USED IN ANY SITUATION
|
||||
ENDANGERING HUMAN LIFE OR PROPERTY.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright notice,
|
||||
this list of conditions and the following disclaimer.
|
||||
|
||||
* Redistributions in binary form must reproduce the above copyright notice,
|
||||
this list of conditions and the following disclaimer in the documentation
|
||||
and/or other materials provided with the distribution.
|
||||
|
||||
* The name of the authors may not be used to endorse or promote products
|
||||
derived from this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR
|
||||
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
||||
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
||||
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
|
||||
THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
"""
|
||||
|
||||
"""
|
||||
Enhancements released under IPA GPLv2 only license
|
||||
"""
|
||||
|
||||
# Read ASN.1/PEM X.509 certificates on stdin, parse each into plain text,
|
||||
# then build substrate from it
|
||||
import sys, string, base64
|
||||
from pyasn1.type import tag,namedtype,namedval,univ,constraint,char,useful
|
||||
from pyasn1.codec.der import decoder, encoder
|
||||
from pyasn1 import error
|
||||
|
||||
# Would be autogenerated from ASN.1 source by a ASN.1 parser
|
||||
# X.509 spec (rfc2459)
|
||||
import sys
|
||||
import base64
|
||||
import nss.nss as nss
|
||||
from ipapython import ipautil
|
||||
from ipalib import api
|
||||
|
||||
PEM = 0
|
||||
DER = 1
|
||||
|
||||
# Common OIDs found in a subject
|
||||
oidtable = { "2.5.4.3": "CN",
|
||||
"2.5.4.6": "C",
|
||||
"2.5.4.7": "L",
|
||||
"2.5.4.8": "ST",
|
||||
"2.5.4.10": "O",
|
||||
"2.5.4.11": "OU",
|
||||
"1.2.840.113549.1.9.1": "E",
|
||||
"0.9.2342.19200300.100.1.25": "DC",
|
||||
}
|
||||
|
||||
MAX = 64 # XXX ?
|
||||
|
||||
class DirectoryString(univ.Choice):
|
||||
componentType = namedtype.NamedTypes(
|
||||
namedtype.NamedType('teletexString', char.TeletexString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
||||
namedtype.NamedType('printableString', char.PrintableString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
||||
namedtype.NamedType('universalString', char.UniversalString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
||||
namedtype.NamedType('utf8String', char.UTF8String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
||||
namedtype.NamedType('bmpString', char.BMPString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
||||
namedtype.NamedType('ia5String', char.IA5String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))) # hm, this should not be here!? XXX
|
||||
)
|
||||
|
||||
class AttributeValue(DirectoryString): pass
|
||||
|
||||
class AttributeType(univ.ObjectIdentifier): pass
|
||||
|
||||
class AttributeTypeAndValue(univ.Sequence):
|
||||
componentType = namedtype.NamedTypes(
|
||||
namedtype.NamedType('type', AttributeType()),
|
||||
namedtype.NamedType('value', AttributeValue())
|
||||
)
|
||||
|
||||
class RelativeDistinguishedName(univ.SetOf):
|
||||
componentType = AttributeTypeAndValue()
|
||||
|
||||
class RDNSequence(univ.SequenceOf):
|
||||
componentType = RelativeDistinguishedName()
|
||||
|
||||
class Name(univ.Choice):
|
||||
componentType = namedtype.NamedTypes(
|
||||
namedtype.NamedType('', RDNSequence())
|
||||
)
|
||||
|
||||
def get_components(self):
|
||||
components = self.getComponentByPosition(0)
|
||||
complist = []
|
||||
for idx in range(len(components)):
|
||||
attrandvalue = components[idx].getComponentByPosition(0)
|
||||
oid = attrandvalue.getComponentByPosition(0)
|
||||
# FIXME, should handle any string type
|
||||
value = attrandvalue.getComponentByPosition(1).getComponentByType(char.PrintableString.tagSet)
|
||||
if value is None:
|
||||
value = attrandvalue.getComponentByPosition(1).getComponentByType(char.UTF8String.tagSet)
|
||||
if value is None:
|
||||
value = attrandvalue.getComponentByPosition(1).getComponentByType(char.IA5String.tagSet)
|
||||
vout = value.prettyOut(value).decode('utf-8')
|
||||
oidout = oid.prettyOut(oid).decode('utf-8')
|
||||
c = ((oidtable.get(oidout, oidout), vout))
|
||||
complist.append(c)
|
||||
|
||||
return tuple(complist)
|
||||
|
||||
class AlgorithmIdentifier(univ.Sequence):
|
||||
componentType = namedtype.NamedTypes(
|
||||
namedtype.NamedType('algorithm', univ.ObjectIdentifier()),
|
||||
namedtype.OptionalNamedType('parameters', univ.Null())
|
||||
# XXX syntax screwed?
|
||||
# namedtype.OptionalNamedType('parameters', univ.ObjectIdentifier())
|
||||
)
|
||||
|
||||
class Extension(univ.Sequence):
|
||||
componentType = namedtype.NamedTypes(
|
||||
namedtype.NamedType('extnID', univ.ObjectIdentifier()),
|
||||
namedtype.DefaultedNamedType('critical', univ.Boolean('False')),
|
||||
namedtype.NamedType('extnValue', univ.OctetString())
|
||||
)
|
||||
|
||||
class Extensions(univ.SequenceOf):
|
||||
componentType = Extension()
|
||||
sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
|
||||
|
||||
class SubjectPublicKeyInfo(univ.Sequence):
|
||||
componentType = namedtype.NamedTypes(
|
||||
namedtype.NamedType('algorithm', AlgorithmIdentifier()),
|
||||
namedtype.NamedType('subjectPublicKey', univ.BitString())
|
||||
)
|
||||
|
||||
class UniqueIdentifier(univ.BitString): pass
|
||||
|
||||
class Time(univ.Choice):
|
||||
componentType = namedtype.NamedTypes(
|
||||
namedtype.NamedType('utcTime', useful.UTCTime()),
|
||||
namedtype.NamedType('generalTime', useful.GeneralizedTime())
|
||||
)
|
||||
|
||||
class Validity(univ.Sequence):
|
||||
componentType = namedtype.NamedTypes(
|
||||
namedtype.NamedType('notBefore', Time()),
|
||||
namedtype.NamedType('notAfter', Time())
|
||||
)
|
||||
|
||||
class CertificateSerialNumber(univ.Integer): pass
|
||||
|
||||
class Version(univ.Integer):
|
||||
namedValues = namedval.NamedValues(
|
||||
('v1', 0), ('v2', 1), ('v3', 2)
|
||||
)
|
||||
|
||||
class TBSCertificate(univ.Sequence):
|
||||
componentType = namedtype.NamedTypes(
|
||||
namedtype.DefaultedNamedType('version', Version('v1', tagSet=Version.tagSet.tagExplicitly(tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)))),
|
||||
namedtype.NamedType('serialNumber', CertificateSerialNumber()),
|
||||
namedtype.NamedType('signature', AlgorithmIdentifier()),
|
||||
namedtype.NamedType('issuer', Name()),
|
||||
namedtype.NamedType('validity', Validity()),
|
||||
namedtype.NamedType('subject', Name()),
|
||||
namedtype.NamedType('subjectPublicKeyInfo', SubjectPublicKeyInfo()),
|
||||
namedtype.OptionalNamedType('issuerUniqueID', UniqueIdentifier().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))),
|
||||
namedtype.OptionalNamedType('subjectUniqueID', UniqueIdentifier().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))),
|
||||
namedtype.OptionalNamedType('extensions', Extensions().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3)))
|
||||
)
|
||||
|
||||
class Certificate(univ.Sequence):
|
||||
componentType = namedtype.NamedTypes(
|
||||
namedtype.NamedType('tbsCertificate', TBSCertificate()),
|
||||
namedtype.NamedType('signatureAlgorithm', AlgorithmIdentifier()),
|
||||
namedtype.NamedType('signatureValue', univ.BitString())
|
||||
)
|
||||
|
||||
def get_version(self):
|
||||
info = self.getComponentByName('tbsCertificate')
|
||||
version = info.getComponentByName('version')
|
||||
return version._value
|
||||
|
||||
def get_subject(self):
|
||||
info = self.getComponentByName('tbsCertificate')
|
||||
return info.getComponentByName('subject')
|
||||
|
||||
def get_serial_number(self):
|
||||
'return the serial number as a Python long object'
|
||||
info = self.getComponentByName('tbsCertificate')
|
||||
return long(info.getComponentByName('serialNumber'))
|
||||
|
||||
# end of ASN.1 data structures
|
||||
|
||||
def strip_header(pem):
|
||||
"""
|
||||
Remove the header and footer from a certificate.
|
||||
@ -205,69 +37,53 @@ def strip_header(pem):
|
||||
|
||||
return pem
|
||||
|
||||
|
||||
def load_certificate(data, type=PEM):
|
||||
def load_certificate(data, datatype=PEM, dbdir=None):
|
||||
"""
|
||||
Given a base64-encoded certificate, with or without the
|
||||
header/footer, return a request object.
|
||||
|
||||
Returns a nss.Certificate type
|
||||
"""
|
||||
if (type == PEM):
|
||||
if type(data) in (tuple, list):
|
||||
data = data[0]
|
||||
|
||||
if (datatype == PEM):
|
||||
data = strip_header(data)
|
||||
data = base64.b64decode(data)
|
||||
|
||||
return decoder.decode(data, asn1Spec=Certificate())[0]
|
||||
if dbdir is None:
|
||||
if api.env.in_tree:
|
||||
dbdir = api.env.dot_ipa + os.sep + 'alias'
|
||||
else:
|
||||
dbdir = "/etc/httpd/alias"
|
||||
|
||||
def get_subject_components(certificate, type=PEM):
|
||||
nss.nss_init(dbdir)
|
||||
return nss.Certificate(buffer(data))
|
||||
|
||||
def get_subject(certificate, datatype=PEM):
|
||||
"""
|
||||
Load an X509.3 certificate and get the subject.
|
||||
|
||||
Return a tuple of a certificate subject.
|
||||
(('CN', u'www.example.com'), ('O', u'IPA'))
|
||||
"""
|
||||
|
||||
x509cert = load_certificate(certificate, type)
|
||||
return x509cert.get_subject().get_components()
|
||||
cert = load_certificate(certificate, datatype)
|
||||
return cert.subject
|
||||
|
||||
def get_serial_number(certificate, type=PEM):
|
||||
def get_serial_number(certificate, datatype=PEM):
|
||||
"""
|
||||
Return the serial number of a certificate as a Python long object.
|
||||
Return the decimal value of the serial number.
|
||||
"""
|
||||
x509cert = load_certificate(certificate, type)
|
||||
return x509cert.get_serial_number()
|
||||
cert = load_certificate(certificate, datatype)
|
||||
return cert.serial_number
|
||||
|
||||
if __name__ == '__main__':
|
||||
certType = Certificate()
|
||||
|
||||
# Read PEM certs from stdin and print them out in plain text
|
||||
nss.nss_init_nodb()
|
||||
|
||||
stSpam, stHam, stDump = 0, 1, 2
|
||||
state = stSpam
|
||||
certCnt = 0
|
||||
# Read PEM certs from stdin and print out its components
|
||||
|
||||
for certLine in sys.stdin.readlines():
|
||||
certLine = string.strip(certLine)
|
||||
if state == stSpam:
|
||||
if state == stSpam:
|
||||
if certLine == '-----BEGIN CERTIFICATE-----':
|
||||
certLines = []
|
||||
state = stHam
|
||||
continue
|
||||
if state == stHam:
|
||||
if certLine == '-----END CERTIFICATE-----':
|
||||
state = stDump
|
||||
else:
|
||||
certLines.append(certLine)
|
||||
if state == stDump:
|
||||
substrate = ''
|
||||
for certLine in certLines:
|
||||
substrate = substrate + base64.b64decode(certLine)
|
||||
certlines = sys.stdin.readlines()
|
||||
cert = ''.join(certlines)
|
||||
|
||||
cert = decoder.decode(substrate, asn1Spec=certType)[0]
|
||||
print cert.prettyPrint()
|
||||
cert = load_certificate(cert)
|
||||
|
||||
assert encoder.encode(cert) == substrate, 'cert recode fails'
|
||||
|
||||
certCnt = certCnt + 1
|
||||
state = stSpam
|
||||
|
||||
print '*** %s PEM cert(s) de/serialized' % certCnt
|
||||
print cert
|
||||
|
@ -188,7 +188,6 @@ class NSPRConnection(httplib.HTTPConnection):
|
||||
httplib.HTTPConnection.__init__(self, host, port, strict)
|
||||
|
||||
logging.debug('%s init %s', self.__class__.__name__, host)
|
||||
nss.nss_init_nodb()
|
||||
|
||||
self.sock = io.Socket()
|
||||
def connect(self):
|
||||
|
Loading…
Reference in New Issue
Block a user