Use OpenSSH-style public keys as the preferred format of SSH public keys.

Public keys in the old format (raw RFC 4253 blob) are automatically
converted to OpenSSH-style public keys. OpenSSH-style public keys are now
stored in LDAP.

Changed sshpubkeyfp to be an output parameter, as that is what it actually
is.

Allow parameter normalizers to be used on values of any type, not just
unicode, so that public key blobs (which are str) can be normalized to
OpenSSH-style public keys.

ticket 2932, 2935
This commit is contained in:
Jan Cholasta
2012-09-03 09:33:30 -04:00
committed by Rob Crittenden
parent 0f81268ec4
commit 46ad724301
12 changed files with 464 additions and 90 deletions

View File

@@ -0,0 +1,76 @@
# Authors:
# Jan Cholasta <jcholast@redhat.com>
#
# Copyright (C) 2011 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Test the `ipapython/ssh.py` module.
"""
import base64
import nose
from ipapython import ssh
class CheckPublicKey:
def __init__(self, pk):
self.description = "Test SSH public key parsing (%s)" % repr(pk)
def __call__(self, pk, out):
try:
parsed = ssh.SSHPublicKey(pk)
assert parsed.openssh() == out
except Exception, e:
assert type(e) is out
def test_public_key_parsing():
b64 = 'AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6XHBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGIwA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNmcSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM019Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF0L'
raw = base64.b64decode(b64)
openssh = 'ssh-rsa %s' % b64
pks = [
('\xff', UnicodeDecodeError),
(raw, openssh),
('\0\0\0\x04none', u'none AAAABG5vbmU='),
('\0\0\0', ValueError),
('\0\0\0\0', ValueError),
('\0\0\0\x01', ValueError),
('\0\0\0\x01\xff', ValueError),
(b64, openssh),
(unicode(b64), openssh),
(u'\n%s\n\n' % b64, openssh),
(u'AAAABG5vbmU=', u'none AAAABG5vbmU='),
(u'AAAAB', ValueError),
(openssh, openssh),
(unicode(openssh), openssh),
(u'none AAAABG5vbmU=', u'none AAAABG5vbmU='),
(u'\t \t ssh-rsa \t \t%s\t \tthis is a comment\t \t ' % b64,
u'%s this is a comment' % openssh),
(u'opt3,opt2="\tx ",opt1,opt2="\\"x " %s comment ' % openssh,
u'opt1,opt2="\\"x ",opt3 %s comment' % openssh),
(u'ssh-rsa\n%s' % b64, ValueError),
(u'ssh-rsa\t%s' % b64, ValueError),
(u'vanitas %s' % b64, ValueError),
(u'@opt %s' % openssh, ValueError),
(u'opt=val %s' % openssh, ValueError),
(u'opt, %s' % openssh, ValueError),
]
for pk in pks:
yield (CheckPublicKey(pk[0]),) + pk

View File

@@ -62,6 +62,9 @@ servercert = ''.join(servercert)
servercert = x509.strip_header(servercert)
fd.close()
sshpubkey = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6XHBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGIwA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNmcSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM019Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF0L public key test'
sshpubkeyfp = u'13:67:6B:BF:4E:A2:05:8E:AE:25:8B:A1:31:DE:6F:1B public key test (ssh-rsa)'
class test_host(Declarative):
cleanup_commands = [
@@ -541,6 +544,45 @@ class test_host(Declarative):
),
dict(
desc='Add SSH public key to %r' % fqdn1,
command=('host_mod', [fqdn1], dict(ipasshpubkey=[sshpubkey])),
expected=dict(
value=fqdn1,
summary=u'Modified host "%s"' % fqdn1,
result=dict(
description=[u'Updated host 1'],
fqdn=[fqdn1],
l=[u'Undisclosed location 1'],
krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)],
managedby_host=[u'%s' % fqdn1],
usercertificate=[base64.b64decode(servercert)],
valid_not_before=fuzzy_date,
valid_not_after=fuzzy_date,
subject=DN(('CN',api.env.host),x509.subject_base()),
serial_number=fuzzy_digits,
serial_number_hex=fuzzy_hex,
md5_fingerprint=fuzzy_hash,
sha1_fingerprint=fuzzy_hash,
issuer=fuzzy_issuer,
macaddress=[u'00:50:56:30:F6:5F', u'00:50:56:2C:8D:82'],
ipasshpubkey=[sshpubkey],
sshpubkeyfp=[sshpubkeyfp],
has_keytab=False,
has_password=False,
),
),
),
dict(
desc='Add an illegal SSH public key to %r' % fqdn1,
command=('host_mod', [fqdn1], dict(ipasshpubkey=[u'no-pty %s' % sshpubkey])),
expected=errors.ValidationError(name='sshpubkey',
error=u'options are not allowed'),
),
dict(
desc='Delete %r' % fqdn1,
command=('host_del', [fqdn1], {}),

View File

@@ -40,6 +40,9 @@ admins_group=u'admins'
invaliduser1=u'+tuser1'
invaliduser2=u'tuser1234567890123456789012345678901234567890'
sshpubkey = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6XHBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGIwA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNmcSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM019Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF0L public key test'
sshpubkeyfp = u'13:67:6B:BF:4E:A2:05:8E:AE:25:8B:A1:31:DE:6F:1B public key test (ssh-rsa)'
def get_user_dn(uid):
return DN(('uid', uid), api.env.container_user, api.env.basedn)
@@ -562,6 +565,64 @@ class test_user(Declarative):
),
dict(
desc='Create "%s" with SSH public key' % user1,
command=(
'user_add', [user1], dict(givenname=u'Test', sn=u'User1', ipasshpubkey=[sshpubkey])
),
expected=dict(
value=user1,
summary=u'Added user "%s"' % user1,
result=dict(
gecos=[u'Test User1'],
givenname=[u'Test'],
homedirectory=[u'/home/tuser1'],
krbprincipalname=[u'tuser1@' + api.env.realm],
loginshell=[u'/bin/sh'],
objectclass=objectclasses.user,
sn=[u'User1'],
uid=[user1],
uidnumber=[fuzzy_digits],
gidnumber=[fuzzy_digits],
displayname=[u'Test User1'],
cn=[u'Test User1'],
initials=[u'TU'],
mail=[u'%s@%s' % (user1, api.env.domain)],
ipasshpubkey=[sshpubkey],
sshpubkeyfp=[sshpubkeyfp],
ipauniqueid=[fuzzy_uuid],
krbpwdpolicyreference=[DN(('cn','global_policy'),('cn',api.env.realm),
('cn','kerberos'),api.env.basedn)],
mepmanagedentry=[get_group_dn(user1)],
memberof_group=[u'ipausers'],
has_keytab=False,
has_password=False,
dn=get_user_dn(user1),
),
),
extra_check = upg_check,
),
dict(
desc='Add an illegal SSH public key to "%r"' % user1,
command=('user_mod', [user1], dict(ipasshpubkey=[u"anal nathrach orth' bhais's bethad do che'l de'nmha"])),
expected=errors.ValidationError(name='sshpubkey',
error=u'invalid SSH public key'),
),
dict(
desc='Delete "%s"' % user1,
command=('user_del', [user1], {}),
expected=dict(
result=dict(failed=u''),
summary=u'Deleted user "%s"' % user1,
value=user1,
),
),
dict(
desc='Create "%s"' % user1,
command=(