Migrate OTP import script to python-cryptography

https://fedorahosted.org/freeipa/ticket/5192

Reviewed-By: Stanislav Laznicka <slaznick@redhat.com>
This commit is contained in:
Nathaniel McCallum 2015-08-31 10:46:19 -04:00 committed by Martin Basti
parent 3be696c92f
commit d00ae870dd
2 changed files with 80 additions and 124 deletions

View File

@ -29,11 +29,15 @@ import struct
from lxml import etree
import dateutil.parser
import dateutil.tz
import nss.nss as nss
import gssapi
import six
from six.moves import xrange
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf import pbkdf2
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from ipaplatform.paths import paths
from ipapython import admintool
from ipalib import api, errors
@ -119,13 +123,13 @@ def convertAlgorithm(value):
"Converts encryption URI to (mech, ivlen)."
return {
"http://www.w3.org/2001/04/xmlenc#aes128-cbc": (nss.CKM_AES_CBC_PAD, 128),
"http://www.w3.org/2001/04/xmlenc#aes192-cbc": (nss.CKM_AES_CBC_PAD, 192),
"http://www.w3.org/2001/04/xmlenc#aes256-cbc": (nss.CKM_AES_CBC_PAD, 256),
"http://www.w3.org/2001/04/xmlenc#tripledes-cbc": (nss.CKM_DES3_CBC_PAD, 64),
"http://www.w3.org/2001/04/xmldsig-more#camellia128": (nss.CKM_CAMELLIA_CBC_PAD, 128),
"http://www.w3.org/2001/04/xmldsig-more#camellia192": (nss.CKM_CAMELLIA_CBC_PAD, 192),
"http://www.w3.org/2001/04/xmldsig-more#camellia256": (nss.CKM_CAMELLIA_CBC_PAD, 256),
"http://www.w3.org/2001/04/xmlenc#aes128-cbc": (algorithms.AES, modes.CBC, 128),
"http://www.w3.org/2001/04/xmlenc#aes192-cbc": (algorithms.AES, modes.CBC, 192),
"http://www.w3.org/2001/04/xmlenc#aes256-cbc": (algorithms.AES, modes.CBC, 256),
"http://www.w3.org/2001/04/xmlenc#tripledes-cbc": (algorithms.TripleDES, modes.CBC, 64),
"http://www.w3.org/2001/04/xmldsig-more#camellia128": (algorithms.Camellia, modes.CBC, 128),
"http://www.w3.org/2001/04/xmldsig-more#camellia192": (algorithms.Camellia, modes.CBC, 192),
"http://www.w3.org/2001/04/xmldsig-more#camellia256": (algorithms.Camellia, modes.CBC, 256),
# TODO: add support for these formats.
# "http://www.w3.org/2001/04/xmlenc#kw-aes128": "kw-aes128",
@ -135,7 +139,7 @@ def convertAlgorithm(value):
# "http://www.w3.org/2001/04/xmldsig-more#kw-camellia128": "kw-camellia128",
# "http://www.w3.org/2001/04/xmldsig-more#kw-camellia192": "kw-camellia192",
# "http://www.w3.org/2001/04/xmldsig-more#kw-camellia256": "kw-camellia256",
}.get(value.lower(), (None, None))
}.get(value.lower(), (None, None, None))
def convertEncrypted(value, decryptor=None, pconv=base64.b64decode, econv=lambda x: x):
@ -170,50 +174,29 @@ class PBKDF2KeyDerivation(XMLKeyDerivation):
if params is None:
raise ValueError("XML file is missing PBKDF2 parameters!")
self.salt = fetch(params, "./xenc11:Salt/xenc11:Specified/text()", base64.b64decode)
self.iter = fetch(params, "./xenc11:IterationCount/text()", int)
self.klen = fetch(params, "./xenc11:KeyLength/text()", int)
self.hmod = fetch(params, "./xenc11:PRF/@Algorithm", convertHMACType, hashlib.sha1)
salt = fetch(params, "./xenc11:Salt/xenc11:Specified/text()", base64.b64decode)
itrs = fetch(params, "./xenc11:IterationCount/text()", int)
klen = fetch(params, "./xenc11:KeyLength/text()", int)
hmod = fetch(params, "./xenc11:PRF/@Algorithm", convertHMACType, hashlib.sha1)
if self.salt is None:
if salt is None:
raise ValueError("XML file is missing PBKDF2 salt!")
if self.iter is None:
if itrs is None:
raise ValueError("XML file is missing PBKDF2 iteration count!")
if self.klen is None:
if klen is None:
raise ValueError("XML file is missing PBKDF2 key length!")
self.kdf = pbkdf2.PBKDF2HMAC(
algorithm=hmod,
length=klen,
salt=salt,
iterations=itrs
)
def derive(self, masterkey):
mac = hmac.HMAC(masterkey, None, self.hmod)
# Figure out how many blocks we will have to combine
# to expand the master key to the desired length.
blocks = self.klen // mac.digest_size
if self.klen % mac.digest_size != 0:
blocks += 1
# Loop through each block adding it to the derived key.
dk = []
for i in range(1, blocks + 1):
# Set initial values.
last = self.salt + struct.pack('>I', i)
hash = [0] * mac.digest_size
# Perform n iterations.
for _j in xrange(self.iter):
tmp = mac.copy()
tmp.update(last)
last = tmp.digest()
# XOR the previous hash with the new hash.
for k in range(mac.digest_size):
hash[k] ^= ord(last[k])
# Add block to derived key.
dk.extend(hash)
return ''.join([chr(c) for c in dk])[:self.klen]
return self.kdf.derive(masterkey)
def convertKeyDerivation(value):
@ -230,13 +213,17 @@ class XMLDecryptor(object):
* RFC 6931"""
def __init__(self, key, hmac=None):
self.__key = nss.SecItem(key)
self.__key = key
self.__hmac = hmac
def __call__(self, element, mac=None):
(mech, ivlen) = fetch(element, "./xenc:EncryptionMethod/@Algorithm", convertAlgorithm)
(algo, mode, klen) = fetch(element, "./xenc:EncryptionMethod/@Algorithm", convertAlgorithm)
data = fetch(element, "./xenc:CipherData/xenc:CipherValue/text()", base64.b64decode)
# Make sure the key is the right length.
if len(self.__key) * 8 != klen:
raise ValidationError("Invalid key length!")
# If a MAC is present, perform validation.
if mac:
tmp = self.__hmac.copy()
@ -244,13 +231,14 @@ class XMLDecryptor(object):
if tmp.digest() != mac:
raise ValidationError("MAC validation failed!")
# Decrypt the data.
slot = nss.get_best_slot(mech)
key = nss.import_sym_key(slot, mech, nss.PK11_OriginUnwrap, nss.CKA_ENCRYPT, self.__key)
iv = nss.param_from_iv(mech, nss.SecItem(data[0:ivlen//8]))
ctx = nss.create_context_by_sym_key(mech, nss.CKA_DECRYPT, key, iv)
out = ctx.cipher_op(data[ivlen // 8:])
out += ctx.digest_final()
iv = data[:algo.block_size / 8]
data = data[len(iv):]
cipher = Cipher(algo(self.__key), mode(iv))
decryptor = cipher.decryptor()
out = decryptor.update(data)
out += decryptor.finalize()
return out
@ -469,14 +457,6 @@ class OTPTokenImport(admintool.AdminTool):
description = "Import OTP tokens."
usage = "%prog [options] <PSKC file> <output file>"
@classmethod
def main(cls, argv):
nss.nss_init_nodb()
try:
super(OTPTokenImport, cls).main(argv)
finally:
nss.nss_shutdown()
@classmethod
def add_options(cls, parser):
super(OTPTokenImport, cls).add_options(parser)

View File

@ -19,16 +19,13 @@
import os
import pytest
from nss import nss
from ipaserver.install.ipa_otptoken_import import PSKCDocument, ValidationError
basename = os.path.join(os.path.dirname(__file__), "data")
@pytest.mark.skipif(True, reason="Causes NSS errors. Ticket 5192")
@pytest.mark.tier1
class test_otptoken_import(object):
def test_figure3(self):
doc = PSKCDocument(os.path.join(basename, "pskc-figure3.xml"))
assert doc.keyname is None
@ -63,8 +60,6 @@ class test_otptoken_import(object):
assert False
def test_figure6(self):
nss.nss_init_nodb()
try:
doc = PSKCDocument(os.path.join(basename, "pskc-figure6.xml"))
assert doc.keyname == 'Pre-shared-key'
doc.setKey('12345678901234567890123456789012'.decode('hex'))
@ -76,12 +71,8 @@ class test_otptoken_import(object):
'ipatokenhotpcounter': 0,
'ipatokenotpdigits': 8,
'type': u'hotp'})]
finally:
nss.nss_shutdown()
def test_figure7(self):
nss.nss_init_nodb()
try:
doc = PSKCDocument(os.path.join(basename, "pskc-figure7.xml"))
assert doc.keyname == 'My Password 1'
doc.setKey('qwerty')
@ -92,33 +83,24 @@ class test_otptoken_import(object):
'ipatokenserial': u'987654321',
'ipatokenotpdigits': 8,
'type': u'hotp'})]
finally:
nss.nss_shutdown()
def test_figure8(self):
nss.nss_init_nodb()
try:
PSKCDocument(os.path.join(basename, "pskc-figure8.xml"))
except NotImplementedError: # X.509 is not supported.
pass
else:
assert False
finally:
nss.nss_shutdown()
def test_invalid(self):
nss.nss_init_nodb()
try:
PSKCDocument(os.path.join(basename, "pskc-invalid.xml"))
except ValueError: # File is invalid.
pass
else:
assert False
finally:
nss.nss_shutdown()
def test_mini(self):
nss.nss_init_nodb()
try:
doc = PSKCDocument(os.path.join(basename, "pskc-mini.xml"))
for t in doc.getKeyPackages():
@ -127,12 +109,8 @@ class test_otptoken_import(object):
pass
else:
assert False
finally:
nss.nss_shutdown()
def test_full(self):
nss.nss_init_nodb()
try:
doc = PSKCDocument(os.path.join(basename, "full.xml"))
assert [(t.id, t.options) for t in doc.getKeyPackages()] == \
[(u'KID1', {
@ -150,5 +128,3 @@ class test_otptoken_import(object):
'ipatokenotpdigits': 8,
'type': u'hotp',
})]
finally:
nss.nss_shutdown()