freeipa/install/tools/ipa-compliance
Pavel Zuna 64575a411b Use ldapi: instead of unsecured ldap: in ipa core tools.
The patch also corrects exception handling in some of the tools.

Fix #874
2011-03-03 14:04:34 -05:00

193 lines
6.1 KiB
Python

#!/usr/bin/env python
#
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# An LDAP client to count entitlements and log to syslog if the number is
# exceeded.
# All imports are guarded together so that a missing dependency produces a
# readable message (or, for python-rhsm, a silent exit) instead of a traceback.
try:
    import sys
    import os
    import syslog
    import tempfile
    import krbV
    import base64
    import shutil
    from rhsm.certificate import EntitlementCertificate
    from ipaserver.plugins.ldap2 import ldap2
    from ipalib import api, errors, backend
except ImportError as e:
    # If python-rhsm isn't installed exit gracefully and quietly.
    if e.args[0] == 'No module named rhsm.certificate':
        sys.exit(0)
    # Format the exception bound by the except clause rather than reading the
    # deprecated, process-global sys.exc_value. The literal ends with an
    # extra newline to match what the former print statement emitted.
    sys.stderr.write("""\
There was a problem importing one of the required Python modules. The
error was:
%s

""" % (e,))
    sys.exit(1)
# Each IPA server comes with this many entitlements. check_compliance() adds
# this baseline to the quantities read from the stored entitlement
# certificates before comparing the total against the host count.
DEFAULT_ENTITLEMENTS = 25
class client(backend.Executioner):
    """
    Minimal IPA client plugin used by this tool to execute a single command.
    """

    def run(self, method, **kw):
        """
        Create an execution context, then execute ``method`` with ``kw``.

        Returns whatever ``self.execute`` returns for the command.
        """
        self.create_context()
        return self.execute(method, **kw)
def parse_options():
    """
    Parse the command line of this tool.

    Returns the ``(options, args)`` pair produced by optparse;
    ``options.debug`` is True only when ``--debug`` was given.
    """
    from optparse import OptionParser

    option_parser = OptionParser()
    option_parser.add_option(
        "--debug",
        action="store_true",
        dest="debug",
        default=False,
        help="enable debugging",
    )
    # parse_args() already returns the (options, args) tuple.
    return option_parser.parse_args()
def _log_error(msg):
    """
    Log ``msg`` to syslog at LOG_ERR and echo it when stdin is a terminal.
    """
    syslog.syslog(syslog.LOG_ERR, msg)
    if sys.stdin.isatty():
        print(msg)


def check_compliance(tmpdir, debug=False):
    """
    Compare the number of enrolled hosts with the available entitlements.

    The IPA API is bootstrapped in the 'cli' context, a credentials cache for
    the host principal is created under ``tmpdir`` from /etc/krb5.keytab, the
    'entitle_sync' command is run, and LDAP is then searched for host entries
    and entitlement certificates. When more hosts than entitlements are found
    an error is sent to syslog; results are also printed when stdin is a
    terminal.

    tmpdir -- existing directory in which the credentials cache is created
    debug  -- passed through to api.bootstrap() as the ``debug`` setting

    Raises StandardError when the host credentials cannot be initialized.
    Errors raised by the IPA API or the LDAP backend, other than the ones
    handled explicitly below, propagate to the caller.
    """
    cfg = dict(
        context='cli',
        in_server=False,
        debug=debug,
        verbose=0,
    )
    api.bootstrap(**cfg)
    api.register(client)
    api.finalize()

    # Imported here because ipalib plugins are only usable once the API has
    # been bootstrapped -- NOTE(review): presumed reason, confirm.
    from ipalib.plugins.service import normalize_certificate, make_pem

    # These are computed before the try block so the error handler below can
    # always reference them, whichever krbV call fails.
    keytab_path = '/etc/krb5.keytab'
    ccache_file = 'FILE:%s/ccache' % tmpdir
    principal_name = str('host/%s@%s' % (api.env.host, api.env.realm))

    try:
        # Create a new credentials cache for this tool. This executes
        # using the systems host principal.
        krbcontext = krbV.default_context()
        keytab = krbV.Keytab(name=keytab_path, context=krbcontext)
        principal = krbV.Principal(name=principal_name, context=krbcontext)
        os.environ['KRB5CCNAME'] = ccache_file
        ccache = krbV.CCache(name=ccache_file, context=krbcontext,
                             primary_principal=principal)
        ccache.init(principal)
        ccache.init_creds_keytab(keytab=keytab, principal=principal)
    except krbV.Krb5Error as e:
        raise StandardError('Error initializing principal %s in %s: %s' %
                            (principal_name, keytab_path, str(e)))

    # entitle-sync doesn't return any information we want to see, it just
    # needs to be done so the LDAP data is correct.
    try:
        api.Backend.client.run('entitle_sync')
    except errors.NotRegisteredError:
        # Even if not registered they have some default entitlements
        pass

    conn = ldap2(shared_instance=False)

    # Bind using GSSAPI
    conn.connect(ccache=ccache_file)

    # From here on the connection is released even if a search or the
    # certificate parsing fails.
    try:
        # Get the hosts first. The defaults cover the "no hosts" case, in
        # which find_entries raises NotFound and assigns nothing.
        entries = []
        truncated = False
        try:
            (entries, truncated) = conn.find_entries(
                '(krblastpwdchange=*)', ['dn'],
                '%s,%s' % (api.env.container_host, api.env.basedn),
                conn.SCOPE_ONELEVEL,
                size_limit=-1)
        except errors.NotFound:
            # No hosts
            pass

        # Count whatever was returned; a truncated result is a lower bound,
        # which is what the warning below tells the administrator.
        hostcount = len(entries)
        if truncated:
            # This will not happen unless we bump into a server-side limit.
            _log_error('The host count result was truncated, they will be underreported')

        # Every server starts with the default allotment; each stored
        # entitlement certificate adds to it.
        available = DEFAULT_ENTITLEMENTS
        try:
            (entries, truncated) = conn.find_entries(
                '(objectclass=ipaentitlement)',
                ['dn', 'userCertificate'],
                '%s,%s' % (api.env.container_entitlements, api.env.basedn),
                conn.SCOPE_ONELEVEL,
                size_limit=-1)
        except errors.NotFound:
            # No entitlement entries stored
            entries = []

        for (_dn, attrs) in entries:
            if 'usercertificate' not in attrs:
                continue
            rawcert = normalize_certificate(attrs['usercertificate'][0])
            cert = EntitlementCertificate(make_pem(base64.b64encode(rawcert)))
            # NOTE(review): getQuantityUsed() is presumably the quantity this
            # certificate grants -- confirm against python-rhsm.
            available += int(cert.getOrder().getQuantityUsed())
    finally:
        conn.disconnect()

    usage = '%d of %d entitlements used.' % (hostcount, available)
    if hostcount > available:
        _log_error('IPA is out of compliance: ' + usage)
    elif sys.stdin.isatty():
        # If run from the command-line display some info
        print('IPA is in compliance: ' + usage)
def main():
    """
    Entry point: run the compliance check and return a process exit status.

    Exits with an error message unless run with root privileges. Returns 0
    when /etc/ipa/default.conf does not exist (nothing to check) or when the
    check completes, and 1 when the check is interrupted or fails; failures
    are logged to syslog and also printed when stdin is a terminal.
    """
    # Root is identified by the effective *user* id. The effective group id
    # being 0 does not grant access to the host keytab.
    if os.geteuid() != 0:
        sys.exit("Must be root to check compliance")

    if not os.path.exists('/etc/ipa/default.conf'):
        return 0

    # Positional arguments are accepted by the parser but not used.
    options = parse_options()[0]

    try:
        tmpdir = tempfile.mkdtemp(prefix="tmp-")
        try:
            check_compliance(tmpdir, options.debug)
        finally:
            # The directory only holds the temporary credentials cache.
            shutil.rmtree(tmpdir)
    except KeyboardInterrupt:
        return 1
    except (StandardError, errors.PublicError) as e:
        syslog.syslog(syslog.LOG_ERR, 'IPA compliance checking failed: %s' % str(e))
        if sys.stdin.isatty():
            print('IPA compliance checking failed: %s' % str(e))
        return 1

    return 0
# Run the check and hand main()'s return value to the caller as the process
# exit status.
exit_status = main()
sys.exit(exit_status)