Enable a host to retrieve a keytab for all its services.

Using the host service principal one should be able to retrieve a keytab
for other services for the host using ipa-getkeytab. This required a number
of changes:

- allow hosts in the service's managedby to write krbPrincipalKey
- automatically add the host to managedby when a service is created
- fix ipa-getkeytab to return the entire prinicpal and not just the
  first data element. It was returning "host" from the service tgt
  and not host/ipa.example.com
- fix the display of the managedby attribute in the service plugin

This led to a number of changes in the service unit tests. I took the
opportunity to switch to the Declarative scheme and tripled the number
of tests we were doing. This shed some light on a few bugs in the plugin:

- if a service had a bad usercertificate it was impossible to delete the
  service. I made it a bit more flexible.
- I added a summary for the mod and find commands
- has_keytab wasn't being set in the find output

ticket 68
This commit is contained in:
Rob Crittenden 2010-08-05 22:41:32 -04:00
parent 81ae7c3a60
commit 2f4f9054aa
5 changed files with 353 additions and 114 deletions

View File

@ -38,10 +38,12 @@ add: aci
aci: (targetattr="krbPrincipalName || krbCanonicalName || krbUPEnabled || krbPrincipalKey || krbTicketPolicyReference || krbPrincipalExpiration || krbPasswordExpiration || krbPwdPolicyReference || krbPrincipalType || krbPwdHistory || krbLastPwdChange || krbPrincipalAliases || krbExtraData")(version 3.0; acl "KDC System Account"; allow (read, search, compare, write) userdn="ldap:///uid=kdc,cn=sysaccounts,cn=etc,$SUFFIX";)
# Define which hosts can edit services
# The managedby attribute stores the DN of hosts that are allowed to manage
# a service. Use service-add-host to add hosts to a service.
dn: cn=services,cn=accounts,$SUFFIX
changetype: modify
add: aci
aci: (targetattr=userCertificate)(version 3.0; aci "Hosts can modify service userCertificate"; allow(write) userattr = "parent[0,1].managedby#USERDN";)
aci: (targetattr="userCertificate || krbPrincipalKey")(version 3.0; aci "Hosts can manage service Certificates and kerberos keys"; allow(write) userattr = "parent[0,1].managedby#USERDN";)
# Allow hosts to update their own certificate in host/
dn: cn=computers,cn=accounts,$SUFFIX

View File

@ -69,14 +69,19 @@ static int ldap_sasl_interact(LDAP *ld, unsigned flags, void *priv_data, void *s
sasl_interact_t *in = NULL;
int ret = LDAP_OTHER;
krb5_principal princ = (krb5_principal)priv_data;
krb5_context krbctx;
char *outname = NULL;
if (!ld) return LDAP_PARAM_ERROR;
krb5_init_context(&krbctx);
for (in = sit; in && in->id != SASL_CB_LIST_END; in++) {
switch(in->id) {
case SASL_CB_USER:
in->result = princ->data[0].data;
in->len = princ->data[0].length;
krb5_unparse_name(krbctx, princ, &outname);
in->result = outname;
in->len = strlen(outname);
ret = LDAP_SUCCESS;
break;
case SASL_CB_GETREALM:
@ -90,7 +95,8 @@ static int ldap_sasl_interact(LDAP *ld, unsigned flags, void *priv_data, void *s
ret = LDAP_OTHER;
}
}
return ret;
krb5_free_context(krbctx);
return ret;
}
static void free_keys_contents(krb5_context krbctx, struct keys_container *keys)
@ -809,19 +815,19 @@ int main(int argc, char *argv[])
}
if (NULL == bindpw) {
krberr = krb5_cc_default(krbctx, &ccache);
if (krberr) {
fprintf(stderr, "Kerberos Credential Cache not found\n"
"Do you have a Kerberos Ticket?\n");
exit(5);
}
krberr = krb5_cc_default(krbctx, &ccache);
if (krberr) {
fprintf(stderr, "Kerberos Credential Cache not found\n"
"Do you have a Kerberos Ticket?\n");
exit(5);
}
krberr = krb5_cc_get_principal(krbctx, ccache, &uprinc);
if (krberr) {
fprintf(stderr, "Kerberos User Principal not found\n"
"Do you have a valid Credential Cache?\n");
exit(6);
}
krberr = krb5_cc_get_principal(krbctx, ccache, &uprinc);
if (krberr) {
fprintf(stderr, "Kerberos User Principal not found\n"
"Do you have a valid Credential Cache?\n");
exit(6);
}
}
krberr = krb5_kt_resolve(krbctx, ktname, &kt);

View File

@ -65,8 +65,18 @@ from ipalib import Str, Flag, Bytes
from ipalib.plugins.baseldap import *
from ipalib import x509
from ipalib import _, ngettext
from nss.error import NSPRError
output_params = (
Flag('has_keytab',
label=_('Keytab'),
),
Str('managedby_host',
label='Managed by',
),
)
def split_principal(principal):
service = hostname = realm = None
@ -155,6 +165,7 @@ class service_add(LDAPCreate):
"""
msg_summary = _('Added service "%(value)s"')
member_attributes = ['managedby']
has_output_params = LDAPCreate.has_output_params + output_params
takes_options = (
Flag('force',
doc=_('force principal name even if not in DNS'),
@ -171,7 +182,7 @@ class service_add(LDAPCreate):
raise errors.HostService()
try:
api.Command['host_show'](hostname)
hostresult = api.Command['host_show'](hostname)['result']
except errors.NotFound:
raise errors.NotFound(reason="The host '%s' does not exist to add a service to." % hostname)
@ -188,6 +199,8 @@ class service_add(LDAPCreate):
# really want to discourage creating services for hosts that
# don't exist in DNS.
util.validate_host_dns(self.log, hostname)
if not 'managedby' in entry_attrs:
entry_attrs['managedby'] = hostresult['dn']
return dn
@ -206,18 +219,26 @@ class service_del(LDAPDelete):
cert = entry_attrs.get('usercertificate')
if cert:
cert = cert[0]
serial = unicode(x509.get_serial_number(cert, x509.DER))
try:
result = api.Command['cert_show'](unicode(serial))['result']
if 'revocation_reason' not in result:
try:
api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
except errors.NotImplementedError:
# some CA's might not implement revoke
pass
except errors.NotImplementedError:
# some CA's might not implement revoke
pass
serial = unicode(x509.get_serial_number(cert, x509.DER))
try:
result = api.Command['cert_show'](unicode(serial))['result']
if 'revocation_reason' not in result:
try:
api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
except errors.NotImplementedError:
# some CA's might not implement revoke
pass
except errors.NotImplementedError:
# some CA's might not implement revoke
pass
except NSPRError, nsprerr:
if nsprerr.errno == -8183:
# If we can't decode the cert them proceed with
# removing the service.
self.log.info("Problem decoding certificate %s" % nsprerr.args[1])
else:
raise nsprerr
return dn
api.register(service_del)
@ -227,6 +248,7 @@ class service_mod(LDAPUpdate):
"""
Modify service.
"""
msg_summary = _('Modified service "%(value)s"')
takes_options = LDAPUpdate.takes_options + (
Bytes('usercertificate?', validate_certificate,
cli_name='certificate',
@ -234,6 +256,7 @@ class service_mod(LDAPUpdate):
doc=_('Base-64 encoded server certificate'),
),
)
has_output_params = LDAPUpdate.has_output_params + output_params
member_attributes = ['managedby']
@ -261,6 +284,9 @@ class service_find(LDAPSearch):
"""
Search for services.
"""
msg_summary = ngettext(
'%(count)d service matched', '%(count)d services matched'
)
member_attributes = ['managedby']
takes_options = LDAPSearch.takes_options + (
Bytes('usercertificate?', validate_certificate,
@ -269,6 +295,7 @@ class service_find(LDAPSearch):
doc=_('Base-64 encoded server certificate'),
),
)
has_output_params = LDAPSearch.has_output_params + output_params
def pre_callback(self, ldap, filter, attrs_list, base_dn, *args, **options):
# lisp style!
custom_filter = '(&(objectclass=ipaService)' \
@ -282,6 +309,16 @@ class service_find(LDAPSearch):
(custom_filter, filter), rules=ldap.MATCH_ALL
)
def post_callback(self, ldap, entries, truncated, *args, **options):
for entry in entries:
entry_attrs = entry[1]
if 'krblastpwdchange' in entry_attrs:
entry_attrs['has_keytab'] = True
if not options.get('all', False):
del entry_attrs['krblastpwdchange']
else:
entry_attrs['has_keytab'] = False
api.register(service_find)
@ -297,11 +334,7 @@ class service_show(LDAPRetrieve):
doc=_('Base-64 encoded server certificate'),
),
)
has_output_params = (
Flag('has_keytab',
label=_('Keytab'),
)
)
has_output_params = LDAPRetrieve.has_output_params + output_params
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
if 'krblastpwdchange' in entry_attrs:
@ -320,6 +353,7 @@ class service_add_host(LDAPAddMember):
Add hosts that can manage this service.
"""
member_attributes = ['managedby']
has_output_params = LDAPAddMember.has_output_params + output_params
api.register(service_add_host)
@ -329,6 +363,7 @@ class service_remove_host(LDAPRemoveMember):
Remove hosts that can manage this service.
"""
member_attributes = ['managedby']
has_output_params = LDAPRemoveMember.has_output_params + output_params
api.register(service_remove_host)
@ -339,6 +374,7 @@ class service_disable(LDAPQuery):
"""
has_output = output.standard_value
msg_summary = _('Removed kerberos key from "%(value)s"')
has_output_params = LDAPQuery.has_output_params + output_params
def execute(self, *keys, **options):
ldap = self.obj.backend

View File

@ -300,6 +300,7 @@ class test_host(Declarative):
dn=service1dn,
krbprincipalname=[service1],
objectclass=objectclasses.service,
managedby_host=[fqdn1],
ipauniqueid=[fuzzy_uuid],
),
),
@ -321,7 +322,7 @@ class test_host(Declarative):
expected=dict(
count=0,
truncated=False,
summary=None,
summary=u'0 services matched',
result=[
],
),

View File

@ -21,94 +21,288 @@
Test the `ipalib/plugins/service.py` module.
"""
import sys
from xmlrpc_test import XMLRPC_test, assert_attr_equal
from ipalib import api
from ipalib import errors
from ipalib import api, errors
from tests.test_xmlrpc.xmlrpc_test import Declarative, fuzzy_uuid
from tests.test_xmlrpc import objectclasses
class test_service(XMLRPC_test):
"""
Test the `service` plugin.
"""
host = u'ipatest.%s' % api.env.domain
principal = u'HTTP/ipatest.%s@%s' % (api.env.domain, api.env.realm)
hostprincipal = u'host/ipatest.%s@%s' % (api.env.domain, api.env.realm)
kw = {'krbprincipalname': principal}
fqdn1 = u'testhost1.%s' % api.env.domain
fqdn2 = u'testhost2.%s' % api.env.domain
service1 = u'HTTP/%s@%s' % (fqdn1, api.env.realm)
hostprincipal1 = u'host/%s@%s' % (fqdn1, api.env.realm)
service1dn = u'krbprincipalname=%s,cn=services,cn=accounts,%s' % (service1.lower(), api.env.basedn)
host1dn = u'fqdn=%s,cn=computers,cn=accounts,%s' % (fqdn1, api.env.basedn)
host2dn = u'fqdn=%s,cn=computers,cn=accounts,%s' % (fqdn2, api.env.basedn)
def test_1_service_add(self):
"""
Test adding a HTTP principal using the `xmlrpc.service_add` method.
"""
self.failsafe_add(api.Object.host, self.host, force=True)
entry = self.failsafe_add(api.Object.service, self.principal, force=True)['result']
assert_attr_equal(entry, 'krbprincipalname', self.principal)
assert_attr_equal(entry, 'objectclass', 'ipaobject')
def test_2_service_add(self):
"""
Test adding a host principal using `xmlrpc.service_add`. Host
services are not allowed.
"""
kw = {'krbprincipalname': self.hostprincipal}
try:
api.Command['service_add'](**kw)
except errors.HostService:
pass
else:
assert False
class test_host(Declarative):
def test_3_service_add(self):
"""
Test adding a malformed principal ('foo').
"""
kw = {'krbprincipalname': u'foo', 'force': True}
try:
api.Command['service_add'](**kw)
except errors.MalformedServicePrincipal:
pass
else:
assert False
cleanup_commands = [
('host_del', [fqdn1], {}),
('host_del', [fqdn2], {}),
('service_del', [service1], {}),
]
def test_4_service_add(self):
"""
Test adding a malformed principal ('HTTP/foo@FOO.NET').
"""
kw = {'krbprincipalname': u'HTTP/foo@FOO.NET', 'force': True}
try:
api.Command['service_add'](**kw)
except errors.RealmMismatch:
pass
else:
assert False
tests = [
dict(
desc='Try to retrieve non-existent %r' % service1,
command=('service_show', [service1], {}),
expected=errors.NotFound(reason='no such entry'),
),
def test_5_service_show(self):
"""
Test the `xmlrpc.service_show` method.
"""
entry = api.Command['service_show'](self.principal)['result']
assert_attr_equal(entry, 'krbprincipalname', self.principal)
assert(entry['has_keytab'] == False)
def test_6_service_find(self):
"""
Test the `xmlrpc.service_find` method.
"""
entries = api.Command['service_find'](self.principal)['result']
assert_attr_equal(entries[0], 'krbprincipalname', self.principal)
dict(
desc='Try to update non-existent %r' % service1,
command=('service_mod', [service1], dict(usercertificate='Nope')),
expected=errors.NotFound(reason='no such entry'),
),
def test_7_service_del(self):
"""
Test the `xmlrpc.service_del` method.
"""
assert api.Command['service_del'](self.principal)['result'] is True
# Verify that it is gone
try:
api.Command['service_show'](self.principal)
except errors.NotFound:
pass
else:
assert False
dict(
desc='Try to delete non-existent %r' % service1,
command=('service_del', [service1], {}),
expected=errors.NotFound(reason='no such entry'),
),
api.Command['host_del'](self.host)
dict(
desc='Create %r' % fqdn1,
command=('host_add', [fqdn1],
dict(
description=u'Test host 1',
l=u'Undisclosed location 1',
force=True,
),
),
expected=dict(
value=fqdn1,
summary=u'Added host "%s"' % fqdn1,
result=dict(
dn=host1dn,
fqdn=[fqdn1],
description=[u'Test host 1'],
l=[u'Undisclosed location 1'],
krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)],
objectclass=objectclasses.host,
ipauniqueid=[fuzzy_uuid],
),
),
),
dict(
desc='Create %r' % fqdn2,
command=('host_add', [fqdn2],
dict(
description=u'Test host 2',
l=u'Undisclosed location 2',
force=True,
),
),
expected=dict(
value=fqdn2,
summary=u'Added host "%s"' % fqdn2,
result=dict(
dn=host2dn,
fqdn=[fqdn2],
description=[u'Test host 2'],
l=[u'Undisclosed location 2'],
krbprincipalname=[u'host/%s@%s' % (fqdn2, api.env.realm)],
objectclass=objectclasses.host,
ipauniqueid=[fuzzy_uuid],
),
),
),
dict(
desc='Create %r' % service1,
command=('service_add', [service1],
dict(
force=True,
),
),
expected=dict(
value=service1,
summary=u'Added service "%s"' % service1,
result=dict(
dn=service1dn,
krbprincipalname=[service1],
objectclass=objectclasses.service,
ipauniqueid=[fuzzy_uuid],
managedby_host=[fqdn1],
),
),
),
dict(
desc='Try to create duplicate %r' % service1,
command=('service_add', [service1],
dict(
force=True,
),
),
expected=errors.DuplicateEntry(),
),
dict(
desc='Retrieve %r' % service1,
command=('service_show', [service1], {}),
expected=dict(
value=service1,
summary=None,
result=dict(
dn=service1dn,
krbprincipalname=[service1],
has_keytab=False,
managedby_host=[fqdn1],
),
),
),
dict(
desc='Retrieve %r with all=True' % service1,
command=('service_show', [service1], dict(all=True)),
expected=dict(
value=service1,
summary=None,
result=dict(
dn=service1dn,
krbprincipalname=[service1],
objectclass=objectclasses.service,
ipauniqueid=[fuzzy_uuid],
managedby_host=[fqdn1],
has_keytab=False
),
),
),
dict(
desc='Search for %r' % service1,
command=('service_find', [service1], {}),
expected=dict(
count=1,
truncated=False,
summary=u'1 service matched',
result=[
dict(
dn=service1dn,
krbprincipalname=[service1],
managedby_host=[fqdn1],
has_keytab=False,
),
],
),
),
dict(
desc='Search for %r with all=True' % service1,
command=('service_find', [service1], dict(all=True)),
expected=dict(
count=1,
truncated=False,
summary=u'1 service matched',
result=[
dict(
dn=service1dn,
krbprincipalname=[service1],
objectclass=objectclasses.service,
ipauniqueid=[fuzzy_uuid],
has_keytab=False,
managedby_host=[fqdn1],
),
],
),
),
dict(
desc='Update %r' % service1,
command=('service_mod', [service1], dict(usercertificate='aGVsbG8=')),
expected=dict(
value=service1,
summary=u'Modified service "%s"' % service1,
result=dict(
usercertificate=['hello'],
krbprincipalname=[service1],
managedby_host=[fqdn1],
),
),
),
dict(
desc='Retrieve %r to verify update' % service1,
command=('service_show', [service1], {}),
expected=dict(
value=service1,
summary=None,
result=dict(
dn=service1dn,
usercertificate=['hello'],
krbprincipalname=[service1],
has_keytab=False,
managedby_host=[fqdn1],
),
),
),
dict(
desc='Delete %r' % service1,
command=('service_del', [service1], {}),
expected=dict(
value=service1,
summary=u'Deleted service "%s"' % service1,
result=True,
),
),
dict(
desc='Try to retrieve non-existent %r' % service1,
command=('service_show', [service1], {}),
expected=errors.NotFound(reason='no such entry'),
),
dict(
desc='Try to update non-existent %r' % service1,
command=('service_mod', [service1], dict(usercertificate='Nope')),
expected=errors.NotFound(reason='no such entry'),
),
dict(
desc='Try to delete non-existent %r' % service1,
command=('service_del', [service1], {}),
expected=errors.NotFound(reason='no such entry'),
),
dict(
desc='Create service with malformed principal "foo"',
command=('service_add', [u'foo'], {}),
expected=errors.MalformedServicePrincipal(reason='missing service')
),
dict(
desc='Create service with bad realm "HTTP/foo@FOO.NET"',
command=('service_add', [u'HTTP/foo@FOO.NET'], {}),
expected=errors.RealmMismatch(),
),
dict(
desc='Create a host service %r' % hostprincipal1,
command=('service_add', [hostprincipal1], {}),
expected=errors.HostService()
),
]