mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Make data type of certificates more obvious/predictable internally.
For the most part certificates will be treated as being in DER format. When we load a certificate we will generally accept it in any format but will convert it to DER before proceeding in normalize_certificate(). This also re-arranges a bit of code to pull some certificate-specific functions out of ipalib/plugins/service.py into ipalib/x509.py. This also tries to use variable names to indicate what format the certificate is in at any given point: dercert: DER cert: PEM nsscert: a python-nss Certificate object rawcert: unknown format ticket 32
This commit is contained in:
@@ -81,7 +81,7 @@ def check_compliance(tmpdir, debug=False):
|
||||
api.bootstrap(**cfg)
|
||||
api.register(client)
|
||||
api.finalize()
|
||||
from ipalib.plugins.service import normalize_certificate, make_pem
|
||||
from ipalib.x509 import normalize_certificate, make_pem
|
||||
|
||||
try:
|
||||
# Create a new credentials cache for this tool. This executes
|
||||
|
@@ -87,10 +87,9 @@ from ipalib import Command, Str, Int, Bytes, Flag, File
|
||||
from ipalib import errors
|
||||
from ipalib import pkcs10
|
||||
from ipalib import x509
|
||||
from ipalib import util
|
||||
from ipalib.plugins.virtual import *
|
||||
from ipalib.plugins.service import split_principal
|
||||
from ipalib.plugins.service import make_pem, check_writable_file
|
||||
from ipalib.plugins.service import write_certificate
|
||||
import base64
|
||||
import logging
|
||||
import traceback
|
||||
@@ -501,10 +500,10 @@ class cert_show(VirtualCommand):
|
||||
|
||||
def forward(self, *keys, **options):
|
||||
if 'out' in options:
|
||||
check_writable_file(options['out'])
|
||||
util.check_writable_file(options['out'])
|
||||
result = super(cert_show, self).forward(*keys, **options)
|
||||
if 'certificate' in result['result']:
|
||||
write_certificate(result['result']['certificate'], options['out'])
|
||||
x509.write_certificate(result['result']['certificate'], options['out'])
|
||||
return result
|
||||
else:
|
||||
raise errors.NoCertificateError(entry=keys[-1])
|
||||
|
@@ -78,7 +78,8 @@ import base64
|
||||
from OpenSSL import crypto
|
||||
from ipapython.ipautil import run
|
||||
from ipalib.request import context
|
||||
from ipalib.plugins.service import validate_certificate, normalize_certificate
|
||||
from ipalib.plugins.service import validate_certificate
|
||||
from ipalib import x509
|
||||
|
||||
import locale
|
||||
|
||||
@@ -101,16 +102,6 @@ def read_pkcs12_pin():
|
||||
fp.close()
|
||||
return pwd
|
||||
|
||||
def make_pem(data):
|
||||
"""
|
||||
The M2Crypto/openSSL modules are very picky about PEM format and
|
||||
require lines split to 64 characters with proper headers.
|
||||
"""
|
||||
cert = '\n'.join([data[x:x+64] for x in range(0, len(data), 64)])
|
||||
return '-----BEGIN CERTIFICATE-----\n' + \
|
||||
cert + \
|
||||
'\n-----END CERTIFICATE-----'
|
||||
|
||||
def get_pool(ldap):
|
||||
"""
|
||||
Get our entitlement pool. Assume there is only one pool.
|
||||
@@ -256,7 +247,7 @@ class entitle_status(VirtualCommand):
|
||||
if u'usercertificate' in registrations:
|
||||
certs = registrations['usercertificate']
|
||||
for cert in certs:
|
||||
cert = make_pem(base64.b64encode(cert))
|
||||
cert = x509.make_pem(base64.b64encode(cert))
|
||||
try:
|
||||
pc = EntitlementCertificate(cert)
|
||||
o = pc.getOrder()
|
||||
@@ -358,7 +349,7 @@ class entitle_consume(LDAPUpdate):
|
||||
results = cp.getCertificates(uuid)
|
||||
usercertificate = []
|
||||
for cert in results:
|
||||
usercertificate.append(normalize_certificate(cert['cert']))
|
||||
usercertificate.append(x509.normalize_certificate(cert['cert']))
|
||||
entry_attrs['usercertificate'] = usercertificate
|
||||
entry_attrs['ipaentitlementid'] = uuid
|
||||
finally:
|
||||
@@ -427,7 +418,7 @@ class entitle_get(VirtualCommand):
|
||||
if u'usercertificate' in registrations:
|
||||
# make it look like a UEP cert
|
||||
for cert in registrations['usercertificate']:
|
||||
certs.append(dict(cert = make_pem(base64.b64encode(cert))))
|
||||
certs.append(dict(cert = x509.make_pem(base64.b64encode(cert))))
|
||||
else:
|
||||
try:
|
||||
cp = UEPConnection(handler='/candlepin', cert_file=certfile, key_file=keyfile)
|
||||
@@ -626,8 +617,8 @@ class entitle_import(LDAPUpdate):
|
||||
|
||||
try:
|
||||
entry_attrs['ipaentitlementid'] = unicode('IMPORTED')
|
||||
newcert = normalize_certificate(keys[-1][0])
|
||||
cert = make_pem(base64.b64encode(newcert))
|
||||
newcert = x509.normalize_certificate(keys[-1][0])
|
||||
cert = x509.make_pem(base64.b64encode(newcert))
|
||||
try:
|
||||
pc = EntitlementCertificate(cert)
|
||||
o = pc.getOrder()
|
||||
@@ -645,7 +636,7 @@ class entitle_import(LDAPUpdate):
|
||||
# First import, create the entry
|
||||
entry_attrs['ipaentitlementid'] = unicode('IMPORTED')
|
||||
entry_attrs['objectclass'] = self.obj.object_class
|
||||
entry_attrs['usercertificate'] = normalize_certificate(keys[-1][0])
|
||||
entry_attrs['usercertificate'] = x509.normalize_certificate(keys[-1][0])
|
||||
ldap.add_entry(dn, entry_attrs)
|
||||
setattr(context, 'entitle_import', True)
|
||||
|
||||
@@ -717,7 +708,7 @@ class entitle_sync(LDAPUpdate):
|
||||
results = cp.getCertificates(uuid)
|
||||
usercertificate = []
|
||||
for cert in results:
|
||||
usercertificate.append(normalize_certificate(cert['cert']))
|
||||
usercertificate.append(x509.normalize_certificate(cert['cert']))
|
||||
entry_attrs['usercertificate'] = usercertificate
|
||||
entry_attrs['ipaentitlementid'] = uuid
|
||||
finally:
|
||||
|
@@ -81,11 +81,7 @@ from ipalib import Str, Flag, Bytes
|
||||
from ipalib.plugins.baseldap import *
|
||||
from ipalib.plugins.service import split_principal
|
||||
from ipalib.plugins.service import validate_certificate
|
||||
from ipalib.plugins.service import normalize_certificate
|
||||
from ipalib.plugins.service import set_certificate_attrs
|
||||
from ipalib.plugins.service import make_pem, check_writable_file
|
||||
from ipalib.plugins.service import write_certificate
|
||||
from ipalib.plugins.service import verify_cert_subject
|
||||
from ipalib.plugins.dns import dns_container_exists, _record_types
|
||||
from ipalib.plugins.dns import add_forward_record
|
||||
from ipalib import _, ngettext
|
||||
@@ -423,8 +419,8 @@ class host_add(LDAPCreate):
|
||||
del entry_attrs['random']
|
||||
cert = options.get('usercertificate')
|
||||
if cert:
|
||||
cert = normalize_certificate(cert)
|
||||
verify_cert_subject(ldap, keys[-1], cert)
|
||||
cert = x509.normalize_certificate(cert)
|
||||
x509.verify_cert_subject(ldap, keys[-1], cert)
|
||||
entry_attrs['usercertificate'] = cert
|
||||
entry_attrs['managedby'] = dn
|
||||
return dn
|
||||
@@ -562,7 +558,7 @@ class host_del(LDAPDelete):
|
||||
self.obj.handle_not_found(*keys)
|
||||
|
||||
if 'usercertificate' in entry_attrs:
|
||||
cert = normalize_certificate(entry_attrs.get('usercertificate')[0])
|
||||
cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0])
|
||||
try:
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
try:
|
||||
@@ -626,12 +622,12 @@ class host_mod(LDAPUpdate):
|
||||
if 'krbprincipalaux' not in obj_classes:
|
||||
obj_classes.append('krbprincipalaux')
|
||||
entry_attrs['objectclass'] = obj_classes
|
||||
cert = normalize_certificate(entry_attrs.get('usercertificate'))
|
||||
cert = x509.normalize_certificate(entry_attrs.get('usercertificate'))
|
||||
if cert:
|
||||
verify_cert_subject(ldap, keys[-1], cert)
|
||||
x509.verify_cert_subject(ldap, keys[-1], cert)
|
||||
(dn, entry_attrs_old) = ldap.get_entry(dn, ['usercertificate'])
|
||||
if 'usercertificate' in entry_attrs_old:
|
||||
oldcert = normalize_certificate(entry_attrs_old.get('usercertificate')[0])
|
||||
oldcert = x509.normalize_certificate(entry_attrs_old.get('usercertificate')[0])
|
||||
try:
|
||||
serial = unicode(x509.get_serial_number(oldcert, x509.DER))
|
||||
try:
|
||||
@@ -733,10 +729,10 @@ class host_show(LDAPRetrieve):
|
||||
|
||||
def forward(self, *keys, **options):
|
||||
if 'out' in options:
|
||||
check_writable_file(options['out'])
|
||||
util.check_writable_file(options['out'])
|
||||
result = super(host_show, self).forward(*keys, **options)
|
||||
if 'usercertificate' in result['result']:
|
||||
write_certificate(result['result']['usercertificate'][0], options['out'])
|
||||
x509.write_certificate(result['result']['usercertificate'][0], options['out'])
|
||||
result['summary'] = _('Certificate stored in file \'%(file)s\'') % dict(file=options['out'])
|
||||
return result
|
||||
else:
|
||||
@@ -792,7 +788,7 @@ class host_disable(LDAPQuery):
|
||||
except errors.AlreadyInactive:
|
||||
pass
|
||||
if 'usercertificate' in entry_attrs:
|
||||
cert = normalize_certificate(entry_attrs.get('usercertificate')[0])
|
||||
cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0])
|
||||
try:
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
try:
|
||||
|
@@ -171,60 +171,6 @@ def validate_certificate(ugettext, cert):
|
||||
# We'll assume this is DER data
|
||||
pass
|
||||
|
||||
def normalize_certificate(cert):
|
||||
"""
|
||||
Incoming certificates should be DER-encoded.
|
||||
|
||||
Note that this can't be a normalizer on the Param because only unicode
|
||||
variables are normalized.
|
||||
"""
|
||||
if not cert:
|
||||
return cert
|
||||
|
||||
s = cert.find('-----BEGIN CERTIFICATE-----')
|
||||
if s > -1:
|
||||
e = cert.find('-----END CERTIFICATE-----')
|
||||
cert = cert[s+27:e]
|
||||
|
||||
if util.isvalid_base64(cert):
|
||||
try:
|
||||
cert = base64.b64decode(cert)
|
||||
except Exception, e:
|
||||
raise errors.Base64DecodeError(reason=str(e))
|
||||
|
||||
# At this point we should have a certificate, either because the data
|
||||
# was base64-encoded and now its not or it came in as DER format.
|
||||
# Let's decode it and see. Fetching the serial number will pass the
|
||||
# certificate through the NSS DER parser.
|
||||
try:
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
except NSPRError, nsprerr:
|
||||
if nsprerr.errno == -8183: # SEC_ERROR_BAD_DER
|
||||
raise errors.CertificateFormatError(error='improperly formatted DER-encoded certificate')
|
||||
else:
|
||||
raise errors.CertificateFormatError(error=str(nsprerr))
|
||||
|
||||
return cert
|
||||
|
||||
def verify_cert_subject(ldap, hostname, cert):
|
||||
"""
|
||||
Verify that the certificate issuer we're adding matches the issuer
|
||||
base of our installation.
|
||||
|
||||
This assumes the certificate has already been normalized.
|
||||
|
||||
This raises an exception on errors and returns nothing otherwise.
|
||||
"""
|
||||
cert = x509.load_certificate(cert, datatype=x509.DER)
|
||||
subject = str(cert.subject)
|
||||
issuer = str(cert.issuer)
|
||||
|
||||
# Handle both supported forms of issuer, from selfsign and dogtag.
|
||||
if ((issuer != 'CN=%s Certificate Authority' % api.env.realm) and
|
||||
(issuer != 'CN=Certificate Authority,O=%s' % api.env.realm)):
|
||||
raise errors.CertificateOperationError(error=_('Issuer "%(issuer)s" does not match the expected issuer') % \
|
||||
{'issuer' : issuer})
|
||||
|
||||
def set_certificate_attrs(entry_attrs):
|
||||
"""
|
||||
Set individual attributes from some values from a certificate.
|
||||
@@ -239,7 +185,7 @@ def set_certificate_attrs(entry_attrs):
|
||||
cert = entry_attrs['usercertificate'][0]
|
||||
else:
|
||||
cert = entry_attrs['usercertificate']
|
||||
cert = normalize_certificate(cert)
|
||||
cert = x509.normalize_certificate(cert)
|
||||
cert = x509.load_certificate(cert, datatype=x509.DER)
|
||||
entry_attrs['subject'] = unicode(cert.subject)
|
||||
entry_attrs['serial_number'] = unicode(cert.serial_number)
|
||||
@@ -249,50 +195,6 @@ def set_certificate_attrs(entry_attrs):
|
||||
entry_attrs['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
|
||||
entry_attrs['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
|
||||
|
||||
def check_writable_file(filename):
|
||||
"""
|
||||
Determine if the file is writable. If the file doesn't exist then
|
||||
open the file to test writability.
|
||||
"""
|
||||
if filename is None:
|
||||
raise errors.FileError(reason='Filename is empty')
|
||||
try:
|
||||
if file_exists(filename):
|
||||
if not os.access(filename, os.W_OK):
|
||||
raise errors.FileError(reason=_('Permission denied: %(file)s') % dict(file=filename))
|
||||
else:
|
||||
fp = open(filename, 'w')
|
||||
fp.close()
|
||||
except (IOError, OSError), e:
|
||||
raise errors.FileError(reason=str(e))
|
||||
|
||||
def make_pem(data):
|
||||
"""
|
||||
Convert a raw base64-encoded blob into something that looks like a PE
|
||||
file with lines split to 64 characters and proper headers.
|
||||
"""
|
||||
cert = '\n'.join([data[x:x+64] for x in range(0, len(data), 64)])
|
||||
return '-----BEGIN CERTIFICATE-----\n' + \
|
||||
cert + \
|
||||
'\n-----END CERTIFICATE-----'
|
||||
|
||||
def write_certificate(cert, filename):
|
||||
"""
|
||||
Check to see if the certificate should be written to a file and do so.
|
||||
"""
|
||||
if cert and util.isvalid_base64(cert):
|
||||
try:
|
||||
cert = base64.b64decode(cert)
|
||||
except Exception, e:
|
||||
raise errors.Base64DecodeError(reason=str(e))
|
||||
|
||||
try:
|
||||
fp = open(filename, 'w')
|
||||
fp.write(make_pem(base64.b64encode(cert)))
|
||||
fp.close()
|
||||
except (IOError, OSError), e:
|
||||
raise errors.FileError(reason=str(e))
|
||||
|
||||
class service(LDAPObject):
|
||||
"""
|
||||
Service object.
|
||||
@@ -361,9 +263,9 @@ class service_add(LDAPCreate):
|
||||
|
||||
cert = options.get('usercertificate')
|
||||
if cert:
|
||||
cert = normalize_certificate(cert)
|
||||
verify_cert_subject(ldap, hostname, cert)
|
||||
entry_attrs['usercertificate'] = cert
|
||||
dercert = x509.normalize_certificate(cert)
|
||||
x509.verify_cert_subject(ldap, hostname, dercert)
|
||||
entry_attrs['usercertificate'] = dercert
|
||||
|
||||
if not options.get('force', False):
|
||||
# We know the host exists if we've gotten this far but we
|
||||
@@ -430,8 +332,8 @@ class service_mod(LDAPUpdate):
|
||||
(service, hostname, realm) = split_principal(keys[-1])
|
||||
cert = options.get('usercertificate')
|
||||
if cert:
|
||||
cert = normalize_certificate(cert)
|
||||
verify_cert_subject(ldap, hostname, cert)
|
||||
dercert = x509.normalize_certificate(cert)
|
||||
x509.verify_cert_subject(ldap, hostname, dercert)
|
||||
(dn, entry_attrs_old) = ldap.get_entry(dn, ['usercertificate'])
|
||||
if 'usercertificate' in entry_attrs_old:
|
||||
# FIXME: what to do here? do we revoke the old cert?
|
||||
@@ -439,7 +341,7 @@ class service_mod(LDAPUpdate):
|
||||
x509.get_serial_number(entry_attrs_old['usercertificate'][0], x509.DER)
|
||||
)
|
||||
raise errors.GenericError(format=fmt)
|
||||
entry_attrs['usercertificate'] = cert
|
||||
entry_attrs['usercertificate'] = dercert
|
||||
else:
|
||||
entry_attrs['usercertificate'] = None
|
||||
return dn
|
||||
@@ -513,10 +415,10 @@ class service_show(LDAPRetrieve):
|
||||
|
||||
def forward(self, *keys, **options):
|
||||
if 'out' in options:
|
||||
check_writable_file(options['out'])
|
||||
util.check_writable_file(options['out'])
|
||||
result = super(service_show, self).forward(*keys, **options)
|
||||
if 'usercertificate' in result['result']:
|
||||
write_certificate(result['result']['usercertificate'][0], options['out'])
|
||||
x509.write_certificate(result['result']['usercertificate'][0], options['out'])
|
||||
result['summary'] = _('Certificate stored in file \'%(file)s\'') % dict(file=options['out'])
|
||||
return result
|
||||
else:
|
||||
@@ -564,7 +466,7 @@ class service_disable(LDAPQuery):
|
||||
done_work = False
|
||||
|
||||
if 'usercertificate' in entry_attrs:
|
||||
cert = normalize_certificate(entry_attrs.get('usercertificate')[0])
|
||||
cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0])
|
||||
try:
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
try:
|
||||
|
@@ -30,6 +30,7 @@ import re
|
||||
from types import NoneType
|
||||
|
||||
from ipalib import errors
|
||||
from ipalib.text import _
|
||||
from ipapython import dnsclient
|
||||
|
||||
|
||||
@@ -185,3 +186,20 @@ def validate_ipaddr(ipaddr):
|
||||
except socket.error:
|
||||
return False
|
||||
return True
|
||||
|
||||
def check_writable_file(filename):
|
||||
"""
|
||||
Determine if the file is writable. If the file doesn't exist then
|
||||
open the file to test writability.
|
||||
"""
|
||||
if filename is None:
|
||||
raise errors.FileError(reason='Filename is empty')
|
||||
try:
|
||||
if os.path.exists(filename):
|
||||
if not os.access(filename, os.W_OK):
|
||||
raise errors.FileError(reason=_('Permission denied: %(file)s') % dict(file=filename))
|
||||
else:
|
||||
fp = open(filename, 'w')
|
||||
fp.close()
|
||||
except (IOError, OSError), e:
|
||||
raise errors.FileError(reason=str(e))
|
||||
|
116
ipalib/x509.py
116
ipalib/x509.py
@@ -17,12 +17,30 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
# Certificates should be stored internally DER-encoded. We can be passed
|
||||
# a certificate several ways: read if from LDAP, read it from a 3rd party
|
||||
# app (dogtag, candlepin, etc) or as user input. The normalize_certificate()
|
||||
# function will convert an incoming certificate to DER-encoding.
|
||||
|
||||
# Conventions
|
||||
#
|
||||
# Where possible the following naming conventions are used:
|
||||
#
|
||||
# cert: the certificate is a PEM-encoded certificate
|
||||
# dercert: the certificate is DER-encoded
|
||||
# nsscert: the certificate is an NSS Certificate object
|
||||
# rawcert: the cert is in an unknown format
|
||||
|
||||
import os
|
||||
import sys
|
||||
import base64
|
||||
import nss.nss as nss
|
||||
from nss.error import NSPRError
|
||||
from ipapython import ipautil
|
||||
from ipalib import api
|
||||
from ipalib import _
|
||||
from ipalib import util
|
||||
from ipalib import errors
|
||||
|
||||
PEM = 0
|
||||
DER = 1
|
||||
@@ -66,17 +84,103 @@ def get_subject(certificate, datatype=PEM):
|
||||
Load an X509.3 certificate and get the subject.
|
||||
"""
|
||||
|
||||
cert = load_certificate(certificate, datatype)
|
||||
return cert.subject
|
||||
nsscert = load_certificate(certificate, datatype)
|
||||
return nsscert.subject
|
||||
|
||||
def get_serial_number(certificate, datatype=PEM):
|
||||
"""
|
||||
Return the decimal value of the serial number.
|
||||
"""
|
||||
cert = load_certificate(certificate, datatype)
|
||||
return cert.serial_number
|
||||
nsscert = load_certificate(certificate, datatype)
|
||||
return nsscert.serial_number
|
||||
|
||||
def make_pem(data):
|
||||
"""
|
||||
Convert a raw base64-encoded blob into something that looks like a PE
|
||||
file with lines split to 64 characters and proper headers.
|
||||
"""
|
||||
pemcert = '\n'.join([data[x:x+64] for x in range(0, len(data), 64)])
|
||||
return '-----BEGIN CERTIFICATE-----\n' + \
|
||||
pemcert + \
|
||||
'\n-----END CERTIFICATE-----'
|
||||
|
||||
def normalize_certificate(rawcert):
|
||||
"""
|
||||
Incoming certificates should be DER-encoded. If not it is converted to
|
||||
DER-format.
|
||||
|
||||
Note that this can't be a normalizer on a Param because only unicode
|
||||
variables are normalized.
|
||||
"""
|
||||
if not rawcert:
|
||||
return None
|
||||
|
||||
rawcert = strip_header(rawcert)
|
||||
|
||||
if util.isvalid_base64(rawcert):
|
||||
try:
|
||||
dercert = base64.b64decode(rawcert)
|
||||
except Exception, e:
|
||||
raise errors.Base64DecodeError(reason=str(e))
|
||||
else:
|
||||
dercert = rawcert
|
||||
|
||||
# At this point we should have a certificate, either because the data
|
||||
# was base64-encoded and now its not or it came in as DER format.
|
||||
# Let's decode it and see. Fetching the serial number will pass the
|
||||
# certificate through the NSS DER parser.
|
||||
try:
|
||||
serial = unicode(get_serial_number(dercert, DER))
|
||||
except NSPRError, nsprerr:
|
||||
if nsprerr.errno == -8183: # SEC_ERROR_BAD_DER
|
||||
raise errors.CertificateFormatError(error='improperly formatted DER-encoded certificate')
|
||||
else:
|
||||
raise errors.CertificateFormatError(error=str(nsprerr))
|
||||
|
||||
return dercert
|
||||
|
||||
def write_certificate(rawcert, filename):
|
||||
"""
|
||||
Write the certificate to a file in PEM format.
|
||||
|
||||
The cert value can be either DER or PEM-encoded, it will be normalized
|
||||
to DER regardless, then back out to PEM.
|
||||
"""
|
||||
dercert = normalize_certificate(rawcert)
|
||||
|
||||
try:
|
||||
fp = open(filename, 'w')
|
||||
fp.write(make_pem(base64.b64encode(dercert)))
|
||||
fp.close()
|
||||
except (IOError, OSError), e:
|
||||
raise errors.FileError(reason=str(e))
|
||||
|
||||
def verify_cert_subject(ldap, hostname, dercert):
|
||||
"""
|
||||
Verify that the certificate issuer we're adding matches the issuer
|
||||
base of our installation.
|
||||
|
||||
This assumes the certificate has already been normalized.
|
||||
|
||||
This raises an exception on errors and returns nothing otherwise.
|
||||
"""
|
||||
nsscert = load_certificate(dercert, datatype=DER)
|
||||
subject = str(nsscert.subject)
|
||||
issuer = str(nsscert.issuer)
|
||||
|
||||
# Handle both supported forms of issuer, from selfsign and dogtag.
|
||||
if ((issuer != 'CN=%s Certificate Authority' % api.env.realm) and
|
||||
(issuer != 'CN=Certificate Authority,O=%s' % api.env.realm)):
|
||||
raise errors.CertificateOperationError(error=_('Issuer "%(issuer)s" does not match the expected issuer') % \
|
||||
{'issuer' : issuer})
|
||||
|
||||
if __name__ == '__main__':
|
||||
# this can be run with:
|
||||
# python ipalib/x509.py < /etc/ipa/ca.crt
|
||||
|
||||
from ipalib import api
|
||||
api.bootstrap()
|
||||
api.finalize()
|
||||
|
||||
nss.nss_init_nodb()
|
||||
|
||||
@@ -85,6 +189,6 @@ if __name__ == '__main__':
|
||||
certlines = sys.stdin.readlines()
|
||||
cert = ''.join(certlines)
|
||||
|
||||
cert = load_certificate(cert)
|
||||
nsscert = load_certificate(cert)
|
||||
|
||||
print cert
|
||||
print nsscert
|
||||
|
@@ -38,7 +38,7 @@ import stat
|
||||
import socket
|
||||
from ipapython import dogtag
|
||||
from ipapython.certdb import get_ca_nickname
|
||||
from ipalib import pkcs10
|
||||
from ipalib import pkcs10, x509
|
||||
import subprocess
|
||||
|
||||
from nss.error import NSPRError
|
||||
@@ -322,7 +322,7 @@ class CADSInstance(service.Service):
|
||||
|
||||
# We only handle one server cert
|
||||
self.nickname = server_certs[0][0]
|
||||
self.dercert = dsdb.get_cert_from_db(self.nickname)
|
||||
self.dercert = dsdb.get_cert_from_db(self.nickname, pem=False)
|
||||
dsdb.track_server_cert(self.nickname, self.principal, dsdb.passwd_fname)
|
||||
|
||||
def create_certdb(self):
|
||||
@@ -721,13 +721,6 @@ class CAInstance(service.Service):
|
||||
# TODO: roll back here?
|
||||
logging.critical("Failed to restart the certificate server. See the installation log for details.")
|
||||
|
||||
def __get_agent_cert(self, nickname):
|
||||
args = ["/usr/bin/certutil", "-L", "-d", self.ca_agent_db, "-n", nickname, "-a"]
|
||||
(out, err, returncode) = ipautil.run(args)
|
||||
out = out.replace('-----BEGIN CERTIFICATE-----', '')
|
||||
out = out.replace('-----END CERTIFICATE-----', '')
|
||||
return out
|
||||
|
||||
def __issue_ra_cert(self):
|
||||
# The CA certificate is in the agent DB but isn't trusted
|
||||
(admin_fd, admin_name) = tempfile.mkstemp()
|
||||
@@ -801,8 +794,7 @@ class CAInstance(service.Service):
|
||||
|
||||
self.ra_cert = outputList['b64_cert']
|
||||
self.ra_cert = self.ra_cert.replace('\\n','')
|
||||
self.ra_cert = self.ra_cert.replace('-----BEGIN CERTIFICATE-----','')
|
||||
self.ra_cert = self.ra_cert.replace('-----END CERTIFICATE-----','')
|
||||
self.ra_cert = x509.strip_header(self.ra_cert)
|
||||
|
||||
# Add the new RA cert to the database in /etc/httpd/alias
|
||||
(agent_fd, agent_name) = tempfile.mkstemp()
|
||||
|
@@ -432,11 +432,22 @@ class CertDB(object):
|
||||
except RuntimeError:
|
||||
break
|
||||
|
||||
def get_cert_from_db(self, nickname):
|
||||
def get_cert_from_db(self, nickname, pem=True):
|
||||
"""
|
||||
Retrieve a certificate from the current NSS database for nickname.
|
||||
|
||||
pem controls whether the value returned PEM or DER-encoded. The
|
||||
default is the data straight from certutil -a.
|
||||
"""
|
||||
try:
|
||||
args = ["-L", "-n", nickname, "-a"]
|
||||
(cert, err, returncode) = self.run_certutil(args)
|
||||
return cert
|
||||
if pem:
|
||||
return cert
|
||||
else:
|
||||
(cert, start) = find_cert_from_txt(cert, start=0)
|
||||
dercert = base64.b64decode(cert)
|
||||
return dercert
|
||||
except ipautil.CalledProcessError:
|
||||
return ''
|
||||
|
||||
@@ -501,6 +512,8 @@ class CertDB(object):
|
||||
that will issue our cert.
|
||||
|
||||
You can override the certificate Subject by specifying a subject.
|
||||
|
||||
Returns a certificate in DER format.
|
||||
"""
|
||||
cdb = other_certdb
|
||||
if not cdb:
|
||||
|
@@ -379,7 +379,7 @@ class DsInstance(service.Service):
|
||||
logging.debug("completed creating ds instance")
|
||||
except ipautil.CalledProcessError, e:
|
||||
logging.critical("failed to restart ds instance %s" % e)
|
||||
|
||||
|
||||
# check for open port 389 from now on
|
||||
self.open_ports.append(389)
|
||||
|
||||
@@ -517,7 +517,7 @@ class DsInstance(service.Service):
|
||||
|
||||
# We only handle one server cert
|
||||
nickname = server_certs[0][0]
|
||||
self.dercert = dsdb.get_cert_from_db(nickname)
|
||||
self.dercert = dsdb.get_cert_from_db(nickname, pem=False)
|
||||
dsdb.track_server_cert(nickname, self.principal, dsdb.passwd_fname)
|
||||
else:
|
||||
nickname = "Server-Cert"
|
||||
|
@@ -185,7 +185,7 @@ class HTTPInstance(service.Service):
|
||||
db.create_password_conf()
|
||||
# We only handle one server cert
|
||||
nickname = server_certs[0][0]
|
||||
self.dercert = db.get_cert_from_db(nickname)
|
||||
self.dercert = db.get_cert_from_db(nickname, pem=False)
|
||||
db.track_server_cert(nickname, self.principal, db.passwd_fname)
|
||||
|
||||
self.__set_mod_nss_nickname(nickname)
|
||||
|
@@ -94,6 +94,7 @@ class Service(object):
|
||||
self.realm = None
|
||||
self.suffix = None
|
||||
self.principal = None
|
||||
self.dercert = None
|
||||
|
||||
def ldap_connect(self):
|
||||
self.admin_conn = self.__get_conn(self.fqdn, self.dm_password)
|
||||
@@ -192,23 +193,12 @@ class Service(object):
|
||||
"""
|
||||
Add a certificate to a service
|
||||
|
||||
This should be passed in DER format but we'll be nice and convert
|
||||
a base64-encoded cert if needed (like when we add certs that come
|
||||
from PKCS#12 files.)
|
||||
This server cert should be in DER format.
|
||||
"""
|
||||
|
||||
if not self.admin_conn:
|
||||
self.ldap_connect()
|
||||
|
||||
try:
|
||||
s = self.dercert.find('-----BEGIN CERTIFICATE-----')
|
||||
if s > -1:
|
||||
e = self.dercert.find('-----END CERTIFICATE-----')
|
||||
s = s + 27
|
||||
self.dercert = self.dercert[s:e]
|
||||
self.dercert = base64.b64decode(self.dercert)
|
||||
except Exception:
|
||||
pass
|
||||
dn = "krbprincipalname=%s,cn=services,cn=accounts,%s" % (self.principal, self.suffix)
|
||||
mod = [(ldap.MOD_ADD, 'userCertificate', self.dercert)]
|
||||
try:
|
||||
|
@@ -83,9 +83,6 @@ def makecert(reqdir):
|
||||
api.register(client)
|
||||
api.finalize()
|
||||
|
||||
# This needs to be imported after the API is initialized
|
||||
from ipalib.plugins.service import make_pem
|
||||
|
||||
ra = rabase.rabase()
|
||||
if not os.path.exists(ra.sec_dir) and api.env.xmlrpc_uri == 'http://localhost:8888/ipa/xml':
|
||||
sys.exit('The in-tree self-signed CA is not configured, see tests/test_xmlrpc/test_cert.py')
|
||||
@@ -108,7 +105,7 @@ def makecert(reqdir):
|
||||
try:
|
||||
res = api.Backend.client.run('cert_request', csr, principal=princ,
|
||||
add=True)
|
||||
cert = make_pem(res['result']['certificate'])
|
||||
cert = x509.make_pem(res['result']['certificate'])
|
||||
fd = open(CERTPATH, 'w')
|
||||
fd.write(cert)
|
||||
fd.close()
|
||||
|
Reference in New Issue
Block a user