Add API to delete a service principal key, service-disable.

I have to do some pretty low-level LDAP work to achieve this. Since
we can't read the key using our modlist generator won't work and lots of
tricks would be needed to use the LDAPUpdate object in any case.

I pulled usercertificate out of the global params and put into each
appropriate function because it makes no sense for service-disable.

This also adds a new variable, has_keytab, to service/host_show output.
This flag tells us whether there is a krbprincipalkey.
This commit is contained in:
Rob Crittenden 2010-07-12 17:45:06 -04:00
parent c9e0b43d53
commit 1e1985b17c
6 changed files with 149 additions and 7 deletions

View File

@ -57,6 +57,9 @@ EXAMPLES:
Update information about a host
ipa host-mod --os='Fedora 12' test.example.com
Disable the host kerberos key
ipa host-disable test.example.com
"""
import platform
@ -91,9 +94,14 @@ class host(LDAPObject):
object_name_plural = 'hosts'
object_class = ['ipaobject', 'nshost', 'ipahost', 'pkiuser', 'ipaservice']
# object_class_config = 'ipahostobjectclasses'
search_attributes = [
'fqdn', 'description', 'l', 'nshostlocation', 'krbprincipalname',
'nshardwareplatform', 'nsosversion',
]
default_attributes = [
'fqdn', 'description', 'l', 'nshostlocation', 'krbprincipalname',
'nshardwareplatform', 'nsosversion', 'usercertificate', 'memberof',
'krblastpwdchange',
]
uuid_attribute = 'ipauniqueid'
attribute_members = {
@ -316,5 +324,47 @@ class host_show(LDAPRetrieve):
"""
Display host.
"""
has_output_params = (
Flag('has_keytab',
label=_('Keytab'),
)
)
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
if 'krblastpwdchange' in entry_attrs:
entry_attrs['has_keytab'] = True
if not options.get('all', False):
del entry_attrs['krblastpwdchange']
else:
entry_attrs['has_keytab'] = False
return dn
api.register(host_show)
class host_disable(LDAPQuery):
"""
Disable the kerberos key of this host.
"""
has_output = output.standard_value
msg_summary = _('Removed kerberos key from "%(value)s"')
def execute(self, *keys, **options):
ldap = self.obj.backend
dn = self.obj.get_dn(*keys, **options)
(dn, entry_attrs) = ldap.get_entry(dn, ['krblastpwdchange'])
if 'krblastpwdchange' not in entry_attrs:
error_msg = _('Host principal has no kerberos key')
raise errors.NotFound(reason=error_msg)
ldap.remove_principal_key(dn)
return dict(
result=True,
value=keys[0],
)
api.register(host_disable)

View File

@ -53,6 +53,10 @@ EXAMPLES:
Find all HTTP services:
ipa service-find HTTP
Disable a service kerberos key:
ipa service-disable HTTP/web.example.com
"""
import base64
@ -140,7 +144,8 @@ class service(LDAPObject):
'krbprincipal', 'krbprincipalaux', 'krbticketpolicyaux', 'ipaobject',
'ipaservice', 'pkiuser'
]
default_attributes = ['krbprincipalname', 'usercertificate', 'managedby']
search_attributes = ['krbprincipalname', 'managedby']
default_attributes = ['krbprincipalname', 'usercertificate', 'managedby', 'krblastpwdchange']
uuid_attribute = 'ipauniqueid'
attribute_members = {
'managedby': ['host'],
@ -156,11 +161,6 @@ class service(LDAPObject):
primary_key=True,
normalizer=lambda value: normalize_principal(value),
),
Bytes('usercertificate?', validate_certificate,
cli_name='certificate',
label=_('Certificate'),
doc=_('Base-64 encoded server certificate'),
),
)
api.register(service)
@ -176,6 +176,11 @@ class service_add(LDAPCreate):
Flag('force',
doc=_('force principal name even if not in DNS'),
),
Bytes('usercertificate?', validate_certificate,
cli_name='certificate',
label=_('Certificate'),
doc=_('Base-64 encoded server certificate'),
),
)
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
(service, hostname, realm) = split_principal(keys[-1])
@ -245,9 +250,18 @@ class service_mod(LDAPUpdate):
"""
Modify service.
"""
takes_options = LDAPUpdate.takes_options + (
Bytes('usercertificate?', validate_certificate,
cli_name='certificate',
label=_('Certificate'),
doc=_('Base-64 encoded server certificate'),
),
)
member_attributes = ['managedby']
def pre_callback(self, ldap, dn, entry_attrs, *keys, **options):
cert = entry_attrs.get('usercertificate')
cert = options.get('usercertificate')
if cert:
(dn, entry_attrs_old) = ldap.get_entry(dn, ['usercertificate'])
if 'usercertificate' in entry_attrs_old:
@ -268,6 +282,13 @@ class service_find(LDAPSearch):
Search for services.
"""
member_attributes = ['managedby']
takes_options = LDAPSearch.takes_options + (
Bytes('usercertificate?', validate_certificate,
cli_name='certificate',
label=_('Certificate'),
doc=_('Base-64 encoded server certificate'),
),
)
def pre_callback(self, ldap, filter, attrs_list, base_dn, *args, **options):
# lisp style!
custom_filter = '(&(objectclass=ipaService)' \
@ -289,6 +310,28 @@ class service_show(LDAPRetrieve):
Display service.
"""
member_attributes = ['managedby']
takes_options = LDAPRetrieve.takes_options + (
Bytes('usercertificate?', validate_certificate,
cli_name='certificate',
label=_('Certificate'),
doc=_('Base-64 encoded server certificate'),
),
)
has_output_params = (
Flag('has_keytab',
label=_('Keytab'),
)
)
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
if 'krblastpwdchange' in entry_attrs:
entry_attrs['has_keytab'] = True
if not options.get('all', False):
del entry_attrs['krblastpwdchange']
else:
entry_attrs['has_keytab'] = False
return dn
api.register(service_show)
@ -308,3 +351,30 @@ class service_remove_host(LDAPRemoveMember):
member_attributes = ['managedby']
api.register(service_remove_host)
class service_disable(LDAPQuery):
"""
Disable the kerberos key of this service.
"""
has_output = output.standard_value
msg_summary = _('Removed kerberos key from "%(value)s"')
def execute(self, *keys, **options):
ldap = self.obj.backend
dn = self.obj.get_dn(*keys, **options)
(dn, entry_attrs) = ldap.get_entry(dn, ['krblastpwdchange'])
if 'krblastpwdchange' not in entry_attrs:
error_msg = _('Service principal has no kerberos key')
raise errors.NotFound(reason=error_msg)
ldap.remove_principal_key(dn)
return dict(
result=True,
value=keys[0],
)
api.register(service_disable)

View File

@ -825,6 +825,22 @@ class ldap2(CrudBackend, Encoder):
"""Mark entry inactive."""
self.set_entry_active(dn, False)
def remove_principal_key(self, dn):
"""Remove a kerberos principal key."""
dn = self.normalize_dn(dn)
# We need to do this directly using the LDAP library because we
# don't have read access to krbprincipalkey so we need to delete
# it in the blind.
mod = [(_ldap.MOD_REPLACE, 'krbprincipalkey', None),
(_ldap.MOD_REPLACE, 'krblastpwdchange', None)]
try:
self.conn.modify_s(dn, mod)
except _ldap.LDAPError, e:
self._handle_errors(e, **{})
# CrudBackend methods
def _get_normalized_entry_for_crud(self, dn, attrs_list=None):

View File

@ -112,6 +112,7 @@ class test_host(Declarative):
description=[u'Test host 1'],
l=[u'Undisclosed location 1'],
krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)],
has_keytab=False
),
),
),
@ -138,6 +139,7 @@ class test_host(Declarative):
objectclass=objectclasses.host,
managedby=[dn1],
ipauniqueid=[fuzzy_uuid],
has_keytab=False
),
),
),
@ -220,6 +222,7 @@ class test_host(Declarative):
description=[u'Updated host 1'],
l=[u'Undisclosed location 1'],
krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)],
has_keytab=False
),
),
),

View File

@ -93,6 +93,7 @@ class test_service(XMLRPC_test):
"""
entry = api.Command['service_show'](self.principal)['result']
assert_attr_equal(entry, 'krbprincipalname', self.principal)
assert(entry['has_keytab'] == False)
def test_6_service_find(self):
"""

View File

@ -46,6 +46,8 @@ try:
res = api.Command['user_show'](u'notfound')
except errors.NetworkError:
server_available = False
except IOError:
server_available = False
except errors.NotFound:
server_available = True