Files
freeipa/ipaserver/plugins/dogtag.py
Rob Crittenden e4877c946f Only initialize the API once in the installer
Make the ldap2 plugin schema loader ignore SERVER_DOWN errors

525303
2009-09-28 22:17:01 -06:00

325 lines
12 KiB
Python

# Authors:
# Andrew Wnuk <awnuk@redhat.com>
# Jason Gerard DeRose <jderose@redhat.com>
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Backend plugin for RA using Dogtag.
"""
from ipalib import api, SkipPluginModule
if api.env.ra_plugin != 'dogtag':
    # The environment selects a different RA plugin, so abort loading this
    # plugin module: raising here stops execution before anything below
    # (the remaining imports, the ``ra`` class and its registration) runs.
    raise SkipPluginModule(reason='dogtag not selected as RA plugin')
import os
from httplib import HTTPConnection
from urllib import urlencode
from ipaserver.plugins import rabase
import socket
from ipalib.errors import NetworkError
from ipalib.constants import TYPE_ERROR
from ipapython import nsslib
import nss.nss as nss
import xml.dom.minidom
def get_xml_value(doc, tagname):
    """
    Return the text of the first ``tagname`` element found in ``doc``.

    :param doc: A parsed DOM document (or node) that provides
        ``getElementsByTagName()``.
    :param tagname: Name of the element to look up.
    :return: The ``data`` of the element's first child node, or ``None``
        when no such element exists, the element has no children, or its
        first child carries no character data (e.g. it is a nested element).
    """
    item_nodes = doc.getElementsByTagName(tagname)
    if not item_nodes:
        return None
    children = item_nodes[0].childNodes
    if not children:
        return None
    # Only character-data nodes (such as text nodes) have a ``data``
    # attribute; a nested element as first child would otherwise raise
    # AttributeError instead of being reported as "no value".
    return getattr(children[0], 'data', None)
class ra(rabase.rabase):
    """
    Request Authority backend plugin.

    Talks to a Dogtag CA either over plain HTTP (`_request()`) or over HTTPS
    through an NSS connection for which a client-auth callback is registered
    (`_sslget()`).  Every public method returns a ``dict`` whose ``'status'``
    entry is a string.
    """

    def __init__(self):
        """
        Select the NSS database directory and load its password.

        When ``api.env.home`` is set, the ``alias`` directory below
        ``api.env.dot_ipa`` is used; otherwise ``/etc/httpd/alias``.  If the
        password file cannot be read, ``self.password`` is left as ``''``.
        """
        if api.env.home:
            self.sec_dir = os.path.join(api.env.dot_ipa, 'alias')
            self.pwd_file = os.path.join(self.sec_dir, '.pwd')
        else:
            self.sec_dir = "/etc/httpd/alias"
            self.pwd_file = "/etc/httpd/alias/pwdfile.txt"
        self.noise_file = os.path.join(self.sec_dir, '.noise')
        self.ipa_key_size = "2048"
        self.ipa_certificate_nickname = "ipaCert"
        self.ca_certificate_nickname = "caCert"
        try:
            # ``with`` guarantees the handle is closed even if readline()
            # itself fails.
            with open(self.pwd_file, "r") as f:
                self.password = f.readline().strip()
        except IOError:
            self.password = ''
        super(ra, self).__init__()

    def _request(self, url, **kw):
        """
        Perform an HTTP request.

        :param url: The URL to post to.
        :param kw: Keyword arguments to encode into POST body.
        :return: Tuple ``(status, reason, data)`` taken from the response.
        :raises NetworkError: If sending the request fails with a socket
            error.
        """
        uri = 'http://%s:%s%s' % (self.env.ca_host, self.env.ca_port, url)
        post = urlencode(kw)
        self.info('request %r', uri)
        self.debug('request post %r', post)
        conn = HTTPConnection(self.env.ca_host, self.env.ca_port)
        try:
            try:
                conn.request(
                    'POST', url,
                    body=post,
                    headers={
                        'Content-type': 'application/x-www-form-urlencoded',
                    },
                )
            except socket.error as e:
                # Not every socket error carries an (errno, message) pair;
                # fall back to the string form when there is no second arg.
                error = e.args[1] if len(e.args) > 1 else str(e)
                raise NetworkError(uri=uri, error=error)
            response = conn.getresponse()
            (status, reason) = (response.status, response.reason)
            data = response.read()
        finally:
            # Release the connection on every path, including failures.
            conn.close()
        self.debug('request status %r', status)
        self.debug('request reason %s', reason)
        self.debug('request data %s', data)
        return (status, reason, data)

    def _sslget(self, url, port, **kw):
        """
        Perform an HTTPS request

        :param url: The URL to post to.
        :param port: The port on ``self.env.ca_host`` to connect to.
        :param kw: Keyword arguments to encode into POST body.
        :return: Tuple ``(status, reason, data)`` taken from the response.
        """
        uri = 'https://%s:%d%s' % (self.env.ca_host, port, url)
        post = urlencode(kw)
        self.info('sslget %r', uri)
        self.debug('sslget post %r', post)
        headers = {"Content-type": "application/x-www-form-urlencoded",
                   "Accept": "text/plain"}
        conn = nsslib.NSSConnection(self.env.ca_host, port,
                                    dbdir=self.sec_dir)
        try:
            conn.sslsock.set_client_auth_data_callback(
                nsslib.client_auth_data_callback,
                self.ipa_certificate_nickname,
                self.password,
                nss.get_default_certdb(),
            )
            # NOTE(review): debug level 10 looks like leftover debugging
            # output -- confirm whether it should stay enabled.
            conn.set_debuglevel(10)
            conn.request("POST", url, post, headers)
            res = conn.getresponse()
            (status, reason) = (res.status, res.reason)
            data = res.read()
        finally:
            # Release the connection on every path, including failures.
            conn.close()
        self.debug('sslget status %r', status)
        self.debug('sslget reason %s', reason)
        self.debug('sslget output %s', data)
        return (status, reason, data)

    def check_request_status(self, request_id):
        """
        Check status of a certificate signing request.

        :param request_id: request ID
        :return: ``dict`` with ``'status'`` set to ``'0'`` when the reply
            contains a ``header.status`` value and ``'2'`` otherwise, plus
            whichever of ``'request_status'``, ``'serial_number'``,
            ``'request_id'`` and ``'error'`` were found in the reply.
        """
        self.debug('%s.check_request_status()', self.fullname)
        (http_status, http_reason, data) = self._request(
            '/ca/ee/ca/checkRequest',
            requestId=request_id,
            xmlOutput='true',
        )
        response = {'status': '2'}
        if data is None:
            return response
        request_status = self.__find_substring(
            data, 'header.status = "', '"'
        )
        if request_status is not None:
            response['status'] = '0'
            response['request_status'] = request_status
        serial_number = self.__find_substring(
            data, 'record.serialNumber="', '"'
        )
        if serial_number is not None:
            # This was "0x"+serial_number, but we should return it in
            # the same form used as arg to get_certificate(), etc.
            response['serial_number'] = serial_number
        found_request_id = self.__find_substring(
            data, 'header.requestId = "', '"'
        )
        if found_request_id is not None:
            response['request_id'] = found_request_id
        error = self.__find_substring(
            data, 'fixed.unexpectedError = "', '"'
        )
        if error is not None:
            response['error'] = error
        return response

    def get_certificate(self, serial_number=None):
        """
        Retrieve an existing certificate.

        :param serial_number: certificate serial number
        :return: ``dict`` with ``'status'`` set to ``'0'`` and a
            ``'certificate'`` entry when the reply contains a certificate
            chain, ``'1'`` when it does not, or the HTTP status as a string
            when the request did not return 200.  ``'revocation_reason'`` is
            added when present in a 200 reply.
        """
        self.debug('%s.get_certificate()', self.fullname)
        (status, reason, stdout) = self._sslget(
            '/ca/agent/ca/displayBySerial',
            self.env.ca_agent_port,
            serialNumber=serial_number,
            xmlOutput='true',
        )
        response = {}
        if status != 200:
            response['status'] = str(status)
            return response
        issued_certificate = self.__find_substring(
            stdout, 'header.certChainBase64 = "', '"'
        )
        if issued_certificate is not None:
            response['status'] = '0'
            # Strip the literal backslash-r / backslash-n escape sequences
            # embedded in the value.
            issued_certificate = issued_certificate.replace('\\r', '')
            issued_certificate = issued_certificate.replace('\\n', '')
            self.debug('IPA-RA: issued_certificate: %s', issued_certificate)
            response['certificate'] = issued_certificate
        else:
            response['status'] = '1'
        revocation_reason = self.__find_substring(
            stdout, 'header.revocationReason = ', ';'
        )
        if revocation_reason is not None:
            response['revocation_reason'] = revocation_reason
        return response

    def request_certificate(self, csr, request_type='pkcs10'):
        """
        Submit certificate signing request.

        :param csr: The certificate signing request.
        :param request_type: The request type (defaults to ``'pkcs10'``).
        :return: ``dict`` with ``'status'`` taken from the ``Status`` element
            of the XML reply (``"2"`` if that element is missing), or the
            HTTP status as a string when the request did not return 200.
            ``'request_id'``, ``'serial_number'`` (prefixed with ``0x``),
            ``'subject'`` and ``'certificate'`` are added when present.
        """
        self.debug('%s.request_certificate()', self.fullname)
        (status, reason, stdout) = self._sslget(
            '/ca/ee/ca/profileSubmit',
            self.env.ca_ee_port,
            profileId='caRAserverCert',
            cert_request_type=request_type,
            cert_request=csr,
            xmlOutput='true',
        )
        response = {}
        if status != 200:
            response["status"] = str(status)
            return response
        doc = xml.dom.minidom.parseString(stdout)
        try:
            # Kept separate from the HTTP ``status`` so the two cannot be
            # confused.
            xml_status = get_xml_value(doc, "Status")
            if xml_status is not None:
                response["status"] = xml_status
            request_id = get_xml_value(doc, "Id")
            if request_id is not None:
                response["request_id"] = request_id
            serial_number = get_xml_value(doc, "serialno")
            if serial_number is not None:
                response["serial_number"] = ("0x%s" % serial_number)
            subject = get_xml_value(doc, "SubjectDN")
            if subject is not None:
                response["subject"] = subject
            certificate = get_xml_value(doc, "b64")
            if certificate is not None:
                response["certificate"] = certificate
            if "status" not in response:
                response["status"] = "2"
        finally:
            # Free the DOM tree even if extracting a value fails.
            doc.unlink()
        return response

    def revoke_certificate(self, serial_number, revocation_reason=0):
        """
        Revoke a certificate.

        The integer ``revocation_reason`` code must have one of these values:

            * ``0`` - unspecified
            * ``1`` - keyCompromise
            * ``2`` - cACompromise
            * ``3`` - affiliationChanged
            * ``4`` - superseded
            * ``5`` - cessationOfOperation
            * ``6`` - certificateHold
            * ``8`` - removeFromCRL
            * ``9`` - privilegeWithdrawn
            * ``10`` - aACompromise

        Note that reason code ``7`` is not used.  See RFC 5280 for more
        details:

            http://www.ietf.org/rfc/rfc5280.txt

        :param serial_number: Certificate serial number.
        :param revocation_reason: Integer code of revocation reason.
        :return: ``dict`` with ``'status'`` set to ``'0'`` and a boolean
            ``'revoked'`` entry on HTTP 200, otherwise ``'status'`` set to
            the HTTP status as a string.
        :raises TypeError: If ``revocation_reason`` is not an ``int``.
        """
        self.debug('%s.revoke_certificate()', self.fullname)
        if type(revocation_reason) is not int:
            # TYPE_ERROR is a message template, not an exception class, so
            # it has to be interpolated into a real TypeError.
            raise TypeError(
                TYPE_ERROR % ('revocation_reason', int, revocation_reason,
                    type(revocation_reason))
            )
        response = {}
        (status, reason, stdout) = self._sslget(
            '/ca/agent/ca/doRevoke',
            self.env.ca_agent_port,
            op='revoke',
            revocationReason=revocation_reason,
            revokeAll='(certRecordId=%s)' % serial_number,
            totalRecordCount=1,
        )
        if status == 200:
            response['status'] = '0'
            response['revoked'] = (stdout.find('revoked = "yes"') > -1)
        else:
            response['status'] = str(status)
        return response

    def take_certificate_off_hold(self, serial_number):
        """
        Take revoked certificate off hold.

        :param serial_number: Certificate serial number.
        :return: ``dict`` with ``'status'`` set to ``'0'`` and a boolean
            ``'taken_off_hold'`` entry on HTTP 200, otherwise ``'status'``
            set to the HTTP status as a string.
        """
        response = {}
        self.debug('%s.take_certificate_off_hold()', self.fullname)
        (status, reason, stdout) = self._sslget(
            '/ca/agent/ca/doUnrevoke',
            self.env.ca_agent_port,
            serialNumber=serial_number,
        )
        # ``status`` is the HTTP status code, so success is 200 (it is never
        # 0); report it the same way revoke_certificate() does.
        if status == 200:
            response['status'] = '0'
            response['taken_off_hold'] = (
                stdout.find('unrevoked = "yes"') > -1
            )
        else:
            response['status'] = str(status)
        return response

    def __find_substring(self, text, start_marker, end_marker):
        """
        Extract the text that follows ``start_marker`` inside ``text``.

        :param text: The string to search.
        :param start_marker: Marker preceding the wanted value.
        :param end_marker: Marker terminating the wanted value.
        :return: The text between the first ``start_marker`` and the next
            ``end_marker``; everything after ``start_marker`` when
            ``end_marker`` does not follow it; ``None`` when ``text`` or
            ``start_marker`` is empty, ``start_marker`` is not found, or
            nothing follows it.
        """
        if not text or not start_marker:
            return None
        start = text.find(start_marker)
        if start < 0:
            return None
        value = text[start + len(start_marker):]
        if not value:
            return None
        end = value.find(end_marker)
        if end > -1:
            value = value[:end]
        return value
# Register the ``ra`` class with the ipalib ``api`` object.
api.register(ra)