Store session cookie in ccache for cli users

Try to use the URI /ipa/session/xml if there is a key in the kernel
keyring. If there is no cookie or it turns out to be invalid (expired,
whatever) then use the standard URI /ipa/xml. This in turn will create
a session that the user can then use later.

https://fedorahosted.org/freeipa/ticket/2331
This commit is contained in:
Rob Crittenden
2012-06-06 22:54:16 -04:00
committed by Martin Kosek
parent 0c96f59356
commit 54135ecd9a
7 changed files with 499 additions and 79 deletions

View File

@@ -154,6 +154,7 @@ Requires(preun): python initscripts chkconfig
Requires(postun): python initscripts chkconfig Requires(postun): python initscripts chkconfig
%endif %endif
Requires: python-dns Requires: python-dns
Requires: keyutils
# We have a soft-requires on bind. It is an optional part of # We have a soft-requires on bind. It is an optional part of
# IPA but if it is configured we need a way to require versions # IPA but if it is configured we need a way to require versions

View File

@@ -1,5 +1,7 @@
# #
# VERSION 5 - DO NOT REMOVE THIS LINE # VERSION 6 - DO NOT REMOVE THIS LINE
#
# This file may be overwritten on upgrades.
# #
# LoadModule auth_kerb_module modules/mod_auth_kerb.so # LoadModule auth_kerb_module modules/mod_auth_kerb.so
@@ -66,6 +68,12 @@ KrbConstrainedDelegationLock ipa
Allow from all Allow from all
</Location> </Location>
<Location "/ipa/session/xml">
Satisfy Any
Order Deny,Allow
Allow from all
</Location>
<Location "/ipa/session/login_password"> <Location "/ipa/session/login_password">
Satisfy Any Satisfy Any
Order Deny,Allow Order Deny,Allow

View File

@@ -47,6 +47,7 @@ from ipalib.errors import public_errors, PublicError, UnknownError, NetworkError
from ipalib import errors from ipalib import errors
from ipalib.request import context, Connection from ipalib.request import context, Connection
from ipapython import ipautil from ipapython import ipautil
from ipapython import kernel_keyring
import httplib import httplib
import socket import socket
@@ -257,6 +258,13 @@ class SSLTransport(LanguageAwareTransport):
conn.connect() conn.connect()
return conn return conn
def parse_response(self, response):
session_cookie = response.getheader('Set-Cookie')
if session_cookie:
kernel_keyring.update_key('ipa_session_cookie', session_cookie)
return LanguageAwareTransport.parse_response(self, response)
class KerbTransport(SSLTransport): class KerbTransport(SSLTransport):
""" """
Handles Kerberos Negotiation authentication to an XML-RPC server. Handles Kerberos Negotiation authentication to an XML-RPC server.
@@ -281,8 +289,20 @@ class KerbTransport(SSLTransport):
raise errors.KerberosError(major=major, minor=minor) raise errors.KerberosError(major=major, minor=minor)
def get_host_info(self, host): def get_host_info(self, host):
"""
Two things can happen here. If we have a session we will add
a cookie for that. If not we will set an Authorization header.
"""
(host, extra_headers, x509) = SSLTransport.get_host_info(self, host) (host, extra_headers, x509) = SSLTransport.get_host_info(self, host)
if not isinstance(extra_headers, list):
extra_headers = []
session_data = getattr(context, 'session_data', None)
if session_data:
extra_headers.append(('Cookie', session_data))
return (host, extra_headers, x509)
# Set the remote host principal # Set the remote host principal
service = "HTTP@" + host.split(':')[0] service = "HTTP@" + host.split(':')[0]
@@ -296,9 +316,6 @@ class KerbTransport(SSLTransport):
except kerberos.GSSError, e: except kerberos.GSSError, e:
self._handle_exception(e, service=service) self._handle_exception(e, service=service)
if not isinstance(extra_headers, list):
extra_headers = []
for (h, v) in extra_headers: for (h, v) in extra_headers:
if h == 'Authorization': if h == 'Authorization':
extra_headers.remove((h, v)) extra_headers.remove((h, v))
@@ -345,12 +362,12 @@ class xmlclient(Connectible):
server = '%s://%s%s' % (scheme, ipautil.format_netloc(self.conn._ServerProxy__host), self.conn._ServerProxy__handler) server = '%s://%s%s' % (scheme, ipautil.format_netloc(self.conn._ServerProxy__host), self.conn._ServerProxy__handler)
return server return server
def get_url_list(self): def get_url_list(self, xmlrpc_uri):
""" """
Create a list of urls consisting of the available IPA servers. Create a list of urls consisting of the available IPA servers.
""" """
# the configured URL defines what we use for the discovered servers # the configured URL defines what we use for the discovered servers
(scheme, netloc, path, params, query, fragment) = urlparse.urlparse(self.env.xmlrpc_uri) (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(xmlrpc_uri)
servers = [] servers = []
name = '_ldap._tcp.%s.' % self.env.domain name = '_ldap._tcp.%s.' % self.env.domain
@@ -366,7 +383,7 @@ class xmlclient(Connectible):
servers = list(set(servers)) servers = list(set(servers))
# the list/set conversion won't preserve order so stick in the # the list/set conversion won't preserve order so stick in the
# local config file version here. # local config file version here.
cfg_server = self.env.xmlrpc_uri cfg_server = xmlrpc_uri
if cfg_server in servers: if cfg_server in servers:
# make sure the configured master server is there just once and # make sure the configured master server is there just once and
# it is the first one # it is the first one
@@ -379,7 +396,22 @@ class xmlclient(Connectible):
def create_connection(self, ccache=None, verbose=False, fallback=True, def create_connection(self, ccache=None, verbose=False, fallback=True,
delegate=False): delegate=False):
servers = self.get_url_list() try:
session = False
session_data = None
xmlrpc_uri = self.env.xmlrpc_uri
# We have a session cookie, try using the session URI to see if it
# is still valid
if not delegate:
session_data = kernel_keyring.read_key('ipa_session_cookie')
setattr(context, 'session_data', session_data)
(scheme, netloc, path, params, query, fragment) = urlparse.urlparse(self.env.xmlrpc_uri)
xmlrpc_uri = urlparse.urlunparse((scheme, netloc, '/ipa/session/xml', params, query, fragment))
session = True
except ValueError:
# No session key, do full Kerberos auth
pass
servers = self.get_url_list(xmlrpc_uri)
serverproxy = None serverproxy = None
for server in servers: for server in servers:
kw = dict(allow_none=True, encoding='UTF-8') kw = dict(allow_none=True, encoding='UTF-8')
@@ -393,9 +425,10 @@ class xmlclient(Connectible):
kw['transport'] = LanguageAwareTransport() kw['transport'] = LanguageAwareTransport()
self.log.info('trying %s' % server) self.log.info('trying %s' % server)
serverproxy = ServerProxy(server, **kw) serverproxy = ServerProxy(server, **kw)
if len(servers) == 1 or not fallback: if len(servers) == 1:
# if we have only 1 server to try then let the main # if we have only 1 server and then let the
# requester handle any errors # main requester handle any errors. This also means it
# must handle a 401 but we save a ping.
return serverproxy return serverproxy
try: try:
command = getattr(serverproxy, 'ping') command = getattr(serverproxy, 'ping')
@@ -417,9 +450,23 @@ class xmlclient(Connectible):
except KerberosError, krberr: except KerberosError, krberr:
# kerberos error on one server is likely on all # kerberos error on one server is likely on all
raise errors.KerberosError(major=str(krberr), minor='') raise errors.KerberosError(major=str(krberr), minor='')
except ProtocolError, e:
if session_data and e.errcode == 401:
# Unauthorized. Remove the session and try again.
try:
kernel_keyring.del_key('ipa_session_cookie')
delattr(context, 'session_data')
except ValueError:
# This shouldn't happen if we have a session but
# it isn't fatal.
pass
return self.create_connection(ccache, verbose, fallback, delegate)
if not fallback:
raise
serverproxy = None
except Exception, e: except Exception, e:
if not fallback: if not fallback:
raise e raise
serverproxy = None serverproxy = None
if serverproxy is None: if serverproxy is None:
@@ -466,6 +513,22 @@ class xmlclient(Connectible):
except NSPRError, e: except NSPRError, e:
raise NetworkError(uri=server, error=str(e)) raise NetworkError(uri=server, error=str(e))
except ProtocolError, e: except ProtocolError, e:
# By catching a 401 here we can detect the case where we have
# a single IPA server and the session is invalid. Otherwise
# we always have to do a ping().
session_data = getattr(context, 'session_data', None)
if session_data and e.errcode == 401:
# Unauthorized. Remove the session and try again.
try:
kernel_keyring.del_key('ipa_session_cookie')
delattr(context, 'session_data')
except ValueError:
# This shouldn't happen if we have a session but
# it isn't fatal.
pass
serverproxy = self.create_connection(os.environ.get('KRB5CCNAME'), self.env.verbose, self.env.fallback, self.env.delegate)
setattr(context, self.id, Connection(serverproxy, self.disconnect))
return self.forward(name, *args, **kw)
raise NetworkError(uri=server, error=e.errmsg) raise NetworkError(uri=server, error=e.errmsg)
except socket.error, e: except socket.error, e:
raise NetworkError(uri=server, error=str(e)) raise NetworkError(uri=server, error=str(e))

102
ipapython/kernel_keyring.py Normal file
View File

@@ -0,0 +1,102 @@
# Authors: Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2012 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from ipapython.ipautil import run
# NOTE: Absolute path not required for keyctl since we reset the environment
# in ipautil.run.
# Use the session keyring so the same user can have a different principal
# in different shells. This was explicitly chosen over @us because then
# it is not possible to use KRB5CCNAME to have a different user principal.
# The same session would always be used and the first principal would
# always win.
KEYRING = '@s'
KEYTYPE = 'user'
def dump_keys():
"""
Dump all keys
"""
(stdout, stderr, rc) = run(['keyctl', 'list', KEYRING], raiseonerr=False)
return stdout
def get_real_key(key):
"""
One cannot request a key based on the description it was created with
so find the one we're looking for.
"""
(stdout, stderr, rc) = run(['keyctl', 'search', KEYRING, KEYTYPE, key], raiseonerr=False)
if rc:
raise ValueError('key %s not found' % key)
return stdout.rstrip()
def has_key(key):
"""
Returns True/False whether the key exists in the keyring.
"""
try:
get_real_key(key)
return True
except ValueError:
return False
def read_key(key):
"""
Read the keyring and return the value for key.
Use pipe instead of print here to ensure we always get the raw data.
"""
real_key = get_real_key(key)
(stdout, stderr, rc) = run(['keyctl', 'pipe', real_key], raiseonerr=False)
if rc:
raise ValueError('keyctl pipe failed: %s' % stderr)
return stdout
def update_key(key, value):
"""
Update the keyring data. If they key doesn't exist it is created.
"""
if has_key(key):
real_key = get_real_key(key)
(stdout, stderr, rc) = run(['keyctl', 'pupdate', real_key], stdin=value, raiseonerr=False)
if rc:
raise ValueError('keyctl pupdate failed: %s' % stderr)
else:
add_key(key, value)
def add_key(key, value):
"""
Add a key to the kernel keyring.
"""
if has_key(key):
raise ValueError('key %s already exists' % key)
(stdout, stderr, rc) = run(['keyctl', 'padd', KEYTYPE, key, KEYRING], stdin=value, raiseonerr=False)
if rc:
raise ValueError('keyctl padd failed: %s' % stderr)
def del_key(key):
"""
Remove a key from the keyring
"""
real_key = get_real_key(key)
(stdout, stderr, rc) = run(['keyctl', 'unlink', real_key, KEYRING], raiseonerr=False)
if rc:
raise ValueError('keyctl unlink failed: %s' % stderr)

View File

@@ -25,7 +25,7 @@ Loads WSGI server plugins.
from ipalib import api from ipalib import api
if 'in_server' in api.env and api.env.in_server is True: if 'in_server' in api.env and api.env.in_server is True:
from ipaserver.rpcserver import wsgi_dispatch, xmlserver, jsonserver_kerb, jsonserver_session, login_kerberos, login_password, change_password from ipaserver.rpcserver import wsgi_dispatch, xmlserver, jsonserver_kerb, jsonserver_session, login_kerberos, login_password, change_password, xmlserver_session
api.register(wsgi_dispatch) api.register(wsgi_dispatch)
api.register(xmlserver) api.register(xmlserver)
api.register(jsonserver_kerb) api.register(jsonserver_kerb)
@@ -33,3 +33,4 @@ if 'in_server' in api.env and api.env.in_server is True:
api.register(login_kerberos) api.register(login_kerberos)
api.register(login_password) api.register(login_password)
api.register(change_password) api.register(change_password)
api.register(xmlserver_session)

View File

@@ -395,72 +395,6 @@ class WSGIExecutioner(Executioner):
raise NotImplementedError('%s.marshal()' % self.fullname) raise NotImplementedError('%s.marshal()' % self.fullname)
class xmlserver(WSGIExecutioner, HTTP_Status):
"""
Execution backend plugin for XML-RPC server.
Also see the `ipalib.rpc.xmlclient` plugin.
"""
content_type = 'text/xml'
key = '/xml'
def _on_finalize(self):
self.__system = {
'system.listMethods': self.listMethods,
'system.methodSignature': self.methodSignature,
'system.methodHelp': self.methodHelp,
}
super(xmlserver, self)._on_finalize()
def __call__(self, environ, start_response):
'''
'''
self.debug('WSGI xmlserver.__call__:')
user_ccache=environ.get('KRB5CCNAME')
if user_ccache is None:
self.internal_error(environ, start_response,
'xmlserver.__call__: KRB5CCNAME not defined in HTTP request environment')
return self.marshal(None, CCacheError())
try:
self.create_context(ccache=user_ccache)
response = super(xmlserver, self).__call__(environ, start_response)
except PublicError, e:
status = HTTP_STATUS_SUCCESS
response = status
headers = [('Content-Type', 'text/plain; charset=utf-8')]
start_response(status, headers)
return self.marshal(None, e)
finally:
destroy_context()
return response
def listMethods(self, *params):
return tuple(name.decode('UTF-8') for name in self.Command)
def methodSignature(self, *params):
return u'methodSignature not implemented'
def methodHelp(self, *params):
return u'methodHelp not implemented'
def unmarshal(self, data):
(params, name) = xml_loads(data)
(args, options) = params_2_args_options(params)
return (name, args, options, None)
def marshal(self, result, error, _id=None):
if error:
self.debug('response: %s: %s', error.__class__.__name__, str(error))
response = Fault(error.errno, error.strerror)
else:
if isinstance(result, dict):
self.debug('response: entries returned %d', result.get('count', 1))
response = (result,)
return xml_dumps(response, methodresponse=True)
def json_encode_binary(val): def json_encode_binary(val):
''' '''
JSON cannot encode binary values. We encode binary values in Python str JSON cannot encode binary values. We encode binary values in Python str
@@ -757,6 +691,76 @@ class KerberosSession(object):
return [''] return ['']
class xmlserver(WSGIExecutioner, HTTP_Status, KerberosSession):
"""
Execution backend plugin for XML-RPC server.
Also see the `ipalib.rpc.xmlclient` plugin.
"""
content_type = 'text/xml'
key = '/xml'
def _on_finalize(self):
self.__system = {
'system.listMethods': self.listMethods,
'system.methodSignature': self.methodSignature,
'system.methodHelp': self.methodHelp,
}
super(xmlserver, self)._on_finalize()
self.kerb_session_on_finalize()
def __call__(self, environ, start_response):
'''
'''
self.debug('WSGI xmlserver.__call__:')
user_ccache=environ.get('KRB5CCNAME')
if user_ccache is None:
self.internal_error(environ, start_response,
'xmlserver.__call__: KRB5CCNAME not defined in HTTP request environment')
return self.marshal(None, CCacheError())
try:
self.create_context(ccache=user_ccache)
response = super(xmlserver, self).__call__(environ, start_response)
if getattr(context, 'session_data', None) is None and \
self.env.context != 'lite':
self.finalize_kerberos_acquisition('xmlserver', user_ccache, environ, start_response)
except PublicError, e:
status = HTTP_STATUS_SUCCESS
response = status
headers = [('Content-Type', 'text/plain; charset=utf-8')]
start_response(status, headers)
return self.marshal(None, e)
finally:
destroy_context()
return response
def listMethods(self, *params):
return tuple(name.decode('UTF-8') for name in self.Command)
def methodSignature(self, *params):
return u'methodSignature not implemented'
def methodHelp(self, *params):
return u'methodHelp not implemented'
def unmarshal(self, data):
(params, name) = xml_loads(data)
(args, options) = params_2_args_options(params)
return (name, args, options, None)
def marshal(self, result, error, _id=None):
if error:
self.debug('response: %s: %s', error.__class__.__name__, str(error))
response = Fault(error.errno, error.strerror)
else:
if isinstance(result, dict):
self.debug('response: entries returned %d', result.get('count', 1))
response = (result,)
return xml_dumps(response, methodresponse=True)
class jsonserver_session(jsonserver, KerberosSession): class jsonserver_session(jsonserver, KerberosSession):
""" """
JSON RPC server protected with session auth. JSON RPC server protected with session auth.
@@ -1098,3 +1102,97 @@ class change_password(Backend, HTTP_Status):
output = _pwchange_template % dict(title=str(title), output = _pwchange_template % dict(title=str(title),
message=str(message)) message=str(message))
return [output] return [output]
class xmlserver_session(xmlserver, KerberosSession):
"""
XML RPC server protected with session auth.
"""
key = '/session/xml'
def __init__(self):
super(xmlserver_session, self).__init__()
auth_mgr = AuthManagerKerb(self.__class__.__name__)
session_mgr.auth_mgr.register(auth_mgr.name, auth_mgr)
def _on_finalize(self):
super(xmlserver_session, self)._on_finalize()
self.kerb_session_on_finalize()
def need_login(self, start_response):
status = '401 Unauthorized'
headers = []
response = ''
self.debug('xmlserver_session: %s need login', status)
start_response(status, headers)
return [response]
def __call__(self, environ, start_response):
'''
'''
self.debug('WSGI xmlserver_session.__call__:')
# Load the session data
session_data = session_mgr.load_session_data(environ.get('HTTP_COOKIE'))
session_id = session_data['session_id']
self.debug('xmlserver_session.__call__: session_id=%s start_timestamp=%s access_timestamp=%s expiration_timestamp=%s',
session_id,
fmt_time(session_data['session_start_timestamp']),
fmt_time(session_data['session_access_timestamp']),
fmt_time(session_data['session_expiration_timestamp']))
ccache_data = session_data.get('ccache_data')
# Redirect to /ipa/xml if no Kerberos credentials
if ccache_data is None:
self.debug('xmlserver_session.__call_: no ccache, need TGT')
return self.need_login(start_response)
ipa_ccache_name = bind_ipa_ccache(ccache_data)
# Redirect to /ipa/xml if Kerberos credentials are expired
cc = KRB5_CCache(ipa_ccache_name)
if not cc.valid(self.api.env.host, self.api.env.realm):
self.debug('xmlserver_session.__call_: ccache expired, deleting session, need login')
# The request is finished with the ccache, destroy it.
release_ipa_ccache(ipa_ccache_name)
return self.need_login(start_response)
# Update the session expiration based on the Kerberos expiration
endtime = cc.endtime(self.api.env.host, self.api.env.realm)
self.update_session_expiration(session_data, endtime)
# Store the session data in the per-thread context
setattr(context, 'session_data', session_data)
environ['KRB5CCNAME'] = ipa_ccache_name
try:
response = super(xmlserver_session, self).__call__(environ, start_response)
finally:
# Kerberos may have updated the ccache data during the
# execution of the command therefore we need refresh our
# copy of it in the session data so the next command sees
# the same state of the ccache.
#
# However we must be careful not to restore the ccache
# data in the session data if it was explicitly deleted
# during the execution of the command. For example the
# logout command removes the ccache data from the session
# data to invalidate the session credentials.
if session_data.has_key('ccache_data'):
session_data['ccache_data'] = load_ccache_data(ipa_ccache_name)
# The request is finished with the ccache, destroy it.
release_ipa_ccache(ipa_ccache_name)
# Store the session data.
session_mgr.store_session_data(session_data)
destroy_context()
return response

View File

@@ -0,0 +1,147 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2012 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Test the `kernel_keyring.py` module.
"""
from nose.tools import raises, assert_raises # pylint: disable=E0611
from ipapython import kernel_keyring
TEST_KEY = 'ipa_test'
TEST_VALUE = 'abc123'
UPDATE_VALUE = '123abc'
SIZE_256 = 'abcdefgh' * 32
SIZE_512 = 'abcdefgh' * 64
SIZE_1024 = 'abcdefgh' * 128
class test_keyring(object):
"""
Test the kernel keyring interface
"""
def setUp(self):
try:
kernel_keyring.del_key(TEST_KEY)
except ValueError:
pass
try:
kernel_keyring.del_key(SIZE_256)
except ValueError:
pass
def test_01(self):
"""
Add a new key and value, then remove it
"""
kernel_keyring.add_key(TEST_KEY, TEST_VALUE)
result = kernel_keyring.read_key(TEST_KEY)
assert(result == TEST_VALUE)
kernel_keyring.del_key(TEST_KEY)
# Make sure it is gone
try:
result = kernel_keyring.read_key(TEST_KEY)
except ValueError, e:
assert e.message == 'key %s not found' % TEST_KEY
def test_02(self):
"""
Delete a non_existent key
"""
try:
kernel_keyring.del_key(TEST_KEY)
raise AssertionError('key should not have been deleted')
except ValueError:
pass
@raises(ValueError)
def test_03(self):
"""
Add a duplicate key
"""
kernel_keyring.add_key(TEST_KEY, TEST_VALUE)
kernel_keyring.add_key(TEST_KEY, TEST_VALUE)
def test_04(self):
"""
Update the value in a key
"""
kernel_keyring.update_key(TEST_KEY, UPDATE_VALUE)
result = kernel_keyring.read_key(TEST_KEY)
assert(result == UPDATE_VALUE)
# Now update it 10 times
for i in xrange(10):
kernel_keyring.update_key(TEST_KEY, 'test %d' % i)
result = kernel_keyring.read_key(TEST_KEY)
assert(result == 'test %d' % i)
kernel_keyring.del_key(TEST_KEY)
@raises(ValueError)
def test_05(self):
"""
Read a non-existent key
"""
result = kernel_keyring.read_key(TEST_KEY)
def test_06(self):
"""
See if a key is available
"""
kernel_keyring.add_key(TEST_KEY, TEST_VALUE)
result = kernel_keyring.has_key(TEST_KEY)
assert(result == True)
kernel_keyring.del_key(TEST_KEY)
result = kernel_keyring.has_key(TEST_KEY)
assert(result == False)
def test_07(self):
"""
Test a 256-byte key
"""
kernel_keyring.add_key(SIZE_256, TEST_VALUE)
result = kernel_keyring.read_key(SIZE_256)
assert(result == TEST_VALUE)
kernel_keyring.del_key(SIZE_256)
def test_08(self):
"""
Test 512-bytes of data
"""
kernel_keyring.add_key(TEST_KEY, SIZE_512)
result = kernel_keyring.read_key(TEST_KEY)
assert(result == SIZE_512)
kernel_keyring.del_key(TEST_KEY)
def test_09(self):
"""
Test 1k bytes of data
"""
kernel_keyring.add_key(TEST_KEY, SIZE_1024)
result = kernel_keyring.read_key(TEST_KEY)
assert(result == SIZE_1024)
kernel_keyring.del_key(TEST_KEY)