mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2024-12-27 01:11:32 -06:00
8661611d3e
Refactor convertHashName() method to accept hash names prefixed with HMAC- or any other prefix. Extending the method should be easier in future. Add tests proposed by Rob Crittenden to make sure we don't regress with expected behavior of convertHashName(). Fixes https://pagure.io/freeipa/issue/7146 Reviewed-By: Rob Crittenden <rcritten@redhat.com>
569 lines
20 KiB
Python
569 lines
20 KiB
Python
# Authors: Nathaniel McCallum <npmccallum@redhat.com>
|
|
#
|
|
# Copyright (C) 2014 Red Hat
|
|
# see file 'COPYING' for use and warranty information
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
import abc
|
|
import base64
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import uuid
|
|
|
|
from lxml import etree
|
|
import dateutil.parser
|
|
import dateutil.tz
|
|
import gssapi
|
|
import six
|
|
|
|
from cryptography.exceptions import InvalidSignature
|
|
from cryptography.hazmat.primitives import hashes, hmac
|
|
from cryptography.hazmat.primitives.padding import PKCS7
|
|
from cryptography.hazmat.primitives.kdf import pbkdf2
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
from ipaplatform.paths import paths
|
|
from ipapython import admintool
|
|
from ipalib import api, errors
|
|
from ipaserver.plugins.ldap2 import AUTOBIND_DISABLED
|
|
|
|
# Python 3 has no ``unicode`` or ``long`` builtins; alias them so the rest of
# this module can use the Python 2 names on either interpreter.
if six.PY3:
    unicode = str
    long = int

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ValidationError(Exception):
    """Raised by this module when PSKC data fails a validation check."""
|
|
|
|
|
|
def fetchAll(element, xpath, conv=lambda x: x):
    """Run *xpath* against *element* and convert every match.

    The query is evaluated through ``element.xpath()`` with the prefixes
    ``pkcs5``, ``pskc``, ``xenc11``, ``xenc`` and ``ds`` bound to their
    namespace URIs.  Returns a list holding ``conv(match)`` for each match,
    in the order the query produced them.
    """
    namespaces = {
        "pkcs5": "http://www.rsasecurity.com/rsalabs/pkcs/schemas/pkcs-5v2-0#",
        "pskc": "urn:ietf:params:xml:ns:keyprov:pskc",
        "xenc11": "http://www.w3.org/2009/xmlenc11#",
        "xenc": "http://www.w3.org/2001/04/xmlenc#",
        "ds": "http://www.w3.org/2000/09/xmldsig#",
    }

    converted = []
    for match in element.xpath(xpath, namespaces=namespaces):
        converted.append(conv(match))
    return converted
|
|
|
|
|
|
def fetch(element, xpath, conv=lambda x: x, default=None):
    """Return the first converted match of *xpath*, or *default* if none.

    All matches are converted (via ``fetchAll``) before the first one is
    picked, so *conv* runs on every match, not only the returned one.
    """
    matches = fetchAll(element, xpath, conv)
    if matches:
        return matches[0]
    return default
|
|
|
|
|
|
def convertDate(value):
    """Parse a date string and return it as a timezone-aware UTC datetime.

    A parsed value that carries no timezone is taken to be in the local
    timezone.  In that case it is rebuilt from its year-to-second fields,
    so any sub-second part is dropped.
    """
    parsed = dateutil.parser.parse(value)

    # pylint: disable=E1101
    if parsed.tzinfo is None:
        parsed = datetime.datetime(
            parsed.year, parsed.month, parsed.day,
            parsed.hour, parsed.minute, parsed.second,
            tzinfo=dateutil.tz.tzlocal())

    utc = dateutil.tz.tzutc()
    return parsed.astimezone(utc)
|
|
|
|
|
|
def convertTokenType(value):
    """Map a PSKC token algorithm URI to ``u"hotp"`` or ``u"totp"``.

    The lookup is case-insensitive and accepts both the ``:`` and the ``#``
    separator before the type name.  Returns None for any other URI.
    """
    token_types = {
        "urn:ietf:params:xml:ns:keyprov:pskc:hotp": u"hotp",
        "urn:ietf:params:xml:ns:keyprov:pskc#hotp": u"hotp",
        "urn:ietf:params:xml:ns:keyprov:pskc:totp": u"totp",
        "urn:ietf:params:xml:ns:keyprov:pskc#totp": u"totp",
    }
    uri = value.lower()
    if uri in token_types:
        return token_types[uri]
    return None
|
|
|
|
|
|
def convertHashName(value):
    """Convert a hash name to its canonical form (e.g. ``u"sha256"``).

    Matching is case-insensitive and accepts the SHA-1/224/256/384/512
    names with or without a dash after "sha".  The name is tried as given
    and then with a leading "hmac-" removed.  ``None`` and unrecognised
    names yield the default, ``u"sha1"``.

    Note that a non-empty prefix is removed purely by length: the first
    ``len(prefix)`` characters are dropped without checking that the value
    actually starts with the prefix.
    """
    fallback = u"sha1"
    prefixes = ("", "hmac-",)
    canonical = {}
    for bits in ("1", "224", "256", "384", "512"):
        # Register both spellings, e.g. "sha256" and "sha-256".
        canonical["sha" + bits] = u"sha" + bits
        canonical["sha-" + bits] = u"sha" + bits

    if value is None:
        return fallback

    lowered = value.lower()
    for prefix in prefixes:
        candidate = lowered[len(prefix):]
        if candidate in canonical:
            return canonical[candidate]

    return fallback
|
|
|
|
|
|
def convertHMACType(value):
    """Map an XML-DSig HMAC algorithm URI to a ``hashes`` algorithm class.

    The lookup is case-insensitive.  The class itself is returned, not an
    instance; URIs that are not recognised fall back to ``hashes.SHA1``.
    """
    more = "http://www.w3.org/2001/04/xmldsig-more#hmac-"
    by_uri = {
        "http://www.w3.org/2000/09/xmldsig#hmac-sha1": hashes.SHA1,
        more + "sha224": hashes.SHA224,
        more + "sha256": hashes.SHA256,
        more + "sha384": hashes.SHA384,
        more + "sha512": hashes.SHA512,
    }
    uri = value.lower()
    if uri in by_uri:
        return by_uri[uri]
    return hashes.SHA1
|
|
|
|
|
|
def convertAlgorithm(value):
    """Map an encryption algorithm URI to ``(algorithm, mode, bits)``.

    ``algorithm`` and ``mode`` are classes from ``algorithms`` and
    ``modes``; ``bits`` is the number XMLDecryptor compares against the
    key size in bits.  The lookup is case-insensitive and unknown URIs
    yield ``(None, None, None)``.
    """
    xmlenc = "http://www.w3.org/2001/04/xmlenc#"
    dsig_more = "http://www.w3.org/2001/04/xmldsig-more#"
    cbc = modes.CBC

    supported = {
        xmlenc + "aes128-cbc": (algorithms.AES, cbc, 128),
        xmlenc + "aes192-cbc": (algorithms.AES, cbc, 192),
        xmlenc + "aes256-cbc": (algorithms.AES, cbc, 256),
        # NOTE(review): 64 looks like the triple-DES block size rather than
        # a key size, yet XMLDecryptor checks it against the key length --
        # confirm whether this entry can ever pass that check as intended.
        xmlenc + "tripledes-cbc": (algorithms.TripleDES, cbc, 64),
        dsig_more + "camellia128": (algorithms.Camellia, cbc, 128),
        dsig_more + "camellia192": (algorithms.Camellia, cbc, 192),
        dsig_more + "camellia256": (algorithms.Camellia, cbc, 256),

        # TODO: add support for these formats.
        # "http://www.w3.org/2001/04/xmlenc#kw-aes128": "kw-aes128",
        # "http://www.w3.org/2001/04/xmlenc#kw-aes192": "kw-aes192",
        # "http://www.w3.org/2001/04/xmlenc#kw-aes256": "kw-aes256",
        # "http://www.w3.org/2001/04/xmlenc#kw-tripledes": "kw-tripledes",
        # "http://www.w3.org/2001/04/xmldsig-more#kw-camellia128": "kw-camellia128",
        # "http://www.w3.org/2001/04/xmldsig-more#kw-camellia192": "kw-camellia192",
        # "http://www.w3.org/2001/04/xmldsig-more#kw-camellia256": "kw-camellia256",
    }

    unsupported = (None, None, None)
    return supported.get(value.lower(), unsupported)
|
|
|
|
|
|
def convertEncrypted(value, decryptor=None, pconv=base64.b64decode, econv=lambda x: x):
    """Extract the content of a PSKC value element.  See RFC 6030.

    If the element has a ``pskc:PlainValue`` child, its text converted
    with *pconv* is returned.  Otherwise, if it has a
    ``pskc:EncryptedValue`` child and a *decryptor* was supplied, the
    decryptor is called with that child and the base64-decoded
    ``pskc:ValueMAC`` (or None), and its result is passed through *econv*.
    Returns None when neither route yields a value.
    """
    plain = fetch(value, "./pskc:PlainValue/text()", pconv)
    if plain is not None:
        return plain

    mac = fetch(value, "./pskc:ValueMAC/text()", base64.b64decode)
    encrypted = fetch(value, "./pskc:EncryptedValue")
    if encrypted is None or decryptor is None:
        return None

    decrypted = decryptor(encrypted, mac)
    return econv(decrypted)
|
|
|
|
|
|
class XMLKeyDerivation(six.with_metaclass(abc.ABCMeta, object)):
    """Abstract interface for XML Encryption 1.1 key derivation.

    Concrete subclasses must implement both ``__init__`` and ``derive``.
    """

    @abc.abstractmethod
    def __init__(self, enckey):
        """Read the key derivation parameters from the parent XML entity."""

    @abc.abstractmethod
    def derive(self, masterkey):
        """Return a key derived from *masterkey*."""
|
|
|
|
|
|
class PBKDF2KeyDerivation(XMLKeyDerivation):
    """PBKDF2 key derivation configured from an encryption key element."""

    def __init__(self, enckey):
        """Read the PBKDF2 parameters found below *enckey*.

        Raises ValueError when the ``pkcs5:PBKDF2-params`` element, the
        salt, the iteration count or the key length is missing.  The PRF
        defaults to HMAC with SHA-1 when no ``PRF/@Algorithm`` is given.
        """
        params = fetch(enckey, "./xenc11:DerivedKey/xenc11:KeyDerivationMethod/pkcs5:PBKDF2-params")
        if params is None:
            raise ValueError("XML file is missing PBKDF2 parameters!")

        salt = fetch(params, "./Salt/Specified/text()", base64.b64decode)
        itrs = fetch(params, "./IterationCount/text()", int)
        klen = fetch(params, "./KeyLength/text()", int)
        hmod = fetch(params, "./PRF/@Algorithm", convertHMACType, hashes.SHA1)

        # Every one of these is mandatory; report the first one missing.
        required = (
            (salt, "XML file is missing PBKDF2 salt!"),
            (itrs, "XML file is missing PBKDF2 iteration count!"),
            (klen, "XML file is missing PBKDF2 key length!"),
        )
        for found, complaint in required:
            if found is None:
                raise ValueError(complaint)

        self.kdf = pbkdf2.PBKDF2HMAC(
            algorithm=hmod(),
            length=klen,
            salt=salt,
            iterations=itrs,
            backend=default_backend(),
        )

    def derive(self, masterkey):
        """Return the key that the configured KDF derives from *masterkey*."""
        derived = self.kdf.derive(masterkey)
        return derived
|
|
|
|
|
|
def convertKeyDerivation(value):
    """Map a key derivation algorithm URI to its implementing class.

    The lookup is case-insensitive.  Only PBKDF2 is known, which maps to
    PBKDF2KeyDerivation; any other URI yields None.
    """
    derivations = {
        "http://www.rsasecurity.com/rsalabs/pkcs/schemas/pkcs-5v2-0#pbkdf2": PBKDF2KeyDerivation,
    }
    uri = value.lower()
    if uri in derivations:
        return derivations[uri]
    return None
|
|
|
|
|
|
class XMLDecryptor(object):
    """This decrypts values from XML as specified in:
        * http://www.w3.org/TR/xmlenc-core/
        * RFC 6931"""

    def __init__(self, key, hmac=None):
        """Store the decryption *key* and an optional MAC context.

        *hmac*, when given, must offer ``copy()``, ``update()`` and
        ``verify()``; it is copied for every MAC check so the stored
        context is never consumed.
        """
        self.__key = key
        self.__hmac = hmac

    def __call__(self, element, mac=None):
        """Decrypt the encrypted-data *element* and return the plaintext.

        The decoded cipher value is expected to start with the IV (one
        cipher block), followed by the ciphertext, which is PKCS7 padded.
        When *mac* is given it is verified over the complete decoded
        cipher value (IV included) before decrypting.

        Raises ValidationError when the algorithm is missing or not
        supported, when there is no cipher value, when the key size does
        not match the algorithm, when a MAC is supplied but no MAC key is
        available, or when MAC verification fails.
        """
        # Default to the "unsupported" triple so that a missing
        # EncryptionMethod does not blow up in the tuple unpacking.
        algo, mode, klen = fetch(
            element, "./xenc:EncryptionMethod/@Algorithm", convertAlgorithm,
            (None, None, None))
        if algo is None:
            raise ValidationError(
                "Missing or unsupported encryption algorithm!")

        data = fetch(
            element,
            "./xenc:CipherData/xenc:CipherValue/text()",
            base64.b64decode
        )
        if data is None:
            raise ValidationError("Encrypted value has no cipher data!")

        # Make sure the key is the right length.
        if len(self.__key) * 8 != klen:
            raise ValidationError("Invalid key length!")

        # If a MAC is present, perform validation.
        if mac:
            if self.__hmac is None:
                # A MAC was shipped but we have no key to check it with.
                raise ValidationError(
                    "MAC present but no MAC key is available!")
            tmp = self.__hmac.copy()
            tmp.update(data)
            try:
                tmp.verify(mac)
            except InvalidSignature as e:
                raise ValidationError("MAC validation failed!", e)

        # The IV is prepended to the ciphertext; block_size is in bits.
        iv = data[:algo.block_size // 8]
        data = data[len(iv):]

        algorithm = algo(self.__key)
        cipher = Cipher(algorithm, mode(iv), default_backend())
        decryptor = cipher.decryptor()
        padded = decryptor.update(data)
        padded += decryptor.finalize()

        unpadder = PKCS7(algorithm.block_size).unpadder()
        out = unpadder.update(padded)
        out += unpadder.finalize()

        return out
|
|
|
|
|
|
class PSKCKeyPackage(object):
    """One ``pskc:KeyPackage`` element, parsed lazily into token options.

    The element is parsed and validated on first access to ``id`` or
    ``options``; a failed validation raises ValidationError from that
    access.
    """

    # XPath (relative to the key package) -> (data key, converter).  A nested
    # dict extends the path of its parent.  Converters whose code object
    # takes more than one argument are also handed the decryptor.
    _XML = {
        'pskc:DeviceInfo': {
            'pskc:IssueNo/text()': ('issueno', unicode),
            'pskc:ExpiryDate/text()': ('notafter.hw', convertDate),
            'pskc:Manufacturer/text()': ('vendor', unicode),
            'pskc:Model/text()': ('model', unicode),
            'pskc:SerialNo/text()': ('serial', unicode),
            'pskc:StartDate/text()': ('notbefore.hw', convertDate),
            'pskc:UserId/text()': ('owner', unicode),
        },

        'pskc:Key': {
            '@Algorithm': ('type', convertTokenType),
            '@Id': ('id', unicode),
            'pskc:FriendlyName/text()': ('description', unicode),
            'pskc:Issuer/text()': ('issuer', unicode),
            'pskc:KeyReference/text()': ('keyref', unicode),

            'pskc:AlgorithmParameters': {
                'pskc:Suite/text()': ('algorithm', convertHashName),
                'pskc:ResponseFormat/@CheckDigit': ('checkdigit', unicode),
                'pskc:ResponseFormat/@Encoding': ('encoding', unicode),
                'pskc:ResponseFormat/@Length': ('digits', int),
            },

            'pskc:Data': {
                'pskc:Counter': ('counter', lambda v, d: convertEncrypted(v, d, long, long)),
                'pskc:Secret': ('key', convertEncrypted),
                'pskc:Time': ('time', lambda v, d: convertEncrypted(v, d, int, int)),
                'pskc:TimeDrift': ('offset', lambda v, d: convertEncrypted(v, d, int, int)),
                'pskc:TimeInterval': ('interval', lambda v, d: convertEncrypted(v, d, int, int)),
            },

            'pskc:Policy': {
                'pskc:ExpiryDate/text()': ('notafter.sw', convertDate),
                'pskc:KeyUsage/text()': ('keyusage', unicode),
                'pskc:NumberOfTransactions': ('maxtransact', lambda v: v),
                'pskc:PINPolicy': ('pinpolicy', lambda v: v),
                'pskc:StartDate/text()': ('notbefore.sw', convertDate),
            },
        },
    }

    # (data key, option name, function(value, options built so far)).
    # Order matters: 'issueno' appends to the 'ipatokenserial' written by
    # 'serial', and 'offset' reads the 'ipatokentotptimestep' written by
    # 'interval'.
    _MAP = (
        ('type', 'type', lambda v, o: v.strip()),
        ('description', 'description', lambda v, o: v.strip()),
        ('vendor', 'ipatokenvendor', lambda v, o: v.strip()),
        ('model', 'ipatokenmodel', lambda v, o: v.strip()),
        ('serial', 'ipatokenserial', lambda v, o: v.strip()),
        ('issueno', 'ipatokenserial', lambda v, o: o.get('ipatokenserial', '') + '-' + v.strip()),
        (
            'key',
            'ipatokenotpkey',
            lambda v, o: base64.b32encode(v).decode('ascii')
        ),
        ('digits', 'ipatokenotpdigits', lambda v, o: v),
        ('algorithm', 'ipatokenotpalgorithm', lambda v, o: v),
        ('counter', 'ipatokenhotpcounter', lambda v, o: v),
        ('interval', 'ipatokentotptimestep', lambda v, o: v),
        ('offset', 'ipatokentotpclockoffset', lambda v, o: o.get('ipatokentotptimestep', 30) * v),
    )

    def __init__(self, element, decryptor):
        """Remember the XML *element* and the *decryptor* (may be None)."""
        self.__element = element
        self.__decryptor = decryptor
        self.__id = None
        self.__options = None

    @property
    def id(self):
        """The key's ``Id`` attribute, or a generated UUID string if absent."""
        if self.__id is None:
            self.__process()

        return self.__id

    @property
    def options(self):
        """Dict of token options built from the parsed data via ``_MAP``."""
        if self.__options is None:
            self.__process()

        return self.__options

    def remove(self):
        """Detach this key package's element from its parent element."""
        self.__element.getparent().remove(self.__element)

    def __process(self):
        """Parse, validate and convert the element; cache id and options."""
        # Parse and validate.
        data = self.__parse(self.__decryptor, self.__element, ".", self._XML)
        self.__validate(data)

        # Copy values into output.  Entries whose converter produced None
        # (e.g. an encrypted value without a decryptor) are skipped instead
        # of being fed to the mapping functions, which cannot handle None.
        options = {}
        for (dk, ok, f) in self._MAP:
            if data.get(dk) is not None:
                options[ok] = f(data[dk], options)

        # Copy validity dates.
        self.__dates(options, data, 'notbefore', max)
        self.__dates(options, data, 'notafter', min)

        # Save attributes.  The fallback id is converted to text so that it
        # has the same type as an id read from the XML.
        self.__options = options
        self.__id = data.get('id')
        if self.__id is None:
            self.__id = unicode(uuid.uuid4())

    def __parse(self, decryptor, element, prefix, table):
        "Recursively parses the xml from a table."

        data = {}
        for k, v in table.items():
            path = prefix + "/" + k

            if isinstance(v, dict):
                data.update(self.__parse(decryptor, element, path, v))
                continue

            result = fetch(element, path)
            if result is not None:
                # Python functions taking more than one argument get the
                # decryptor as well; builtins such as int have no code
                # object and therefore count as single-argument.
                lambda_code_attr = "__code__" if six.PY3 else "func_code"
                if getattr(
                        getattr(v[1], lambda_code_attr, None),
                        "co_argcount", 0) > 1:
                    data[v[0]] = v[1](result, decryptor)
                else:
                    data[v[0]] = v[1](result)

        return data

    def __validate(self, data):
        "Validates the parsed data; raises ValidationError on failure."

        if 'type' not in data or data['type'] not in ('totp', 'hotp'):
            raise ValidationError("Unsupported token type!")

        # A Secret element that could not be converted (None) is treated
        # like a missing one rather than crashing later in base32 encoding.
        if data.get('key') is None:
            if 'keyref' in data:
                raise ValidationError("Referenced keys are not supported!")
            raise ValidationError("Key not found in token!")

        if data.get('checkdigit', 'FALSE').upper() != 'FALSE':
            raise ValidationError("CheckDigit not supported!")

        if data.get('maxtransact', None) is not None:
            raise ValidationError('NumberOfTransactions policy not supported!')

        if data.get('pinpolicy', None) is not None:
            raise ValidationError('PINPolicy policy not supported!')

        if data.get('time', 0) != 0:
            raise ValidationError('Specified time is not supported!')

        encoding = data.get('encoding', 'DECIMAL').upper()
        if encoding != 'DECIMAL':
            raise ValidationError('Unsupported encoding: %s!' % encoding)

        usage = data.get('keyusage', 'OTP')
        if usage != 'OTP':
            raise ValidationError('Unsupported key usage: %s' % usage)

    def __dates(self, out, data, key, reducer):
        """Combine the ``.sw`` and ``.hw`` dates for *key* into one option.

        *reducer* (``max`` or ``min`` at the call sites) picks among the
        dates that are present; nothing is written when both are absent.
        """
        dates = (data.get(key + '.sw', None), data.get(key + '.hw', None))
        dates = [x for x in dates if x is not None]
        if dates:
            out['ipatoken' + key] = unicode(reducer(dates).strftime("%Y%m%d%H%M%SZ"))
|
|
|
|
|
|
class PSKCDocument(object):
    """A PSKC XML file: its key packages plus encryption and MAC settings."""

    @property
    def keyname(self):
        """Name of the key needed for decryption, or None if none is named."""
        return self.__keyname

    def __init__(self, filename):
        """Parse *filename* and locate key packages and key information.

        Raises ValueError when the document contains no ``pskc:KeyPackage``
        and NotImplementedError when the encryption key is an X.509 key.
        """
        self.__keyname = None
        self.__decryptor = None
        self.__doc = etree.parse(filename)
        self.__mkey = fetch(self.__doc, "./pskc:MACMethod/pskc:MACKey")
        self.__algo = fetch(self.__doc, "./pskc:MACMethod/@Algorithm", convertHMACType)

        self.__keypackages = fetchAll(self.__doc, "./pskc:KeyPackage")
        if not self.__keypackages:
            raise ValueError("PSKC file is invalid!")

        self.__enckey = fetch(self.__doc, "./pskc:EncryptionKey")
        if self.__enckey is not None:
            # Check for x509 key.
            x509key = fetch(self.__enckey, "./ds:X509Data")
            if x509key is not None:
                raise NotImplementedError("X.509 keys are not currently supported!")

            # Get the keyname.
            self.__keyname = fetch(self.__enckey, "./ds:KeyName/text()")
            if self.__keyname is None:
                self.__keyname = fetch(self.__enckey,
                    "./xenc11:DerivedKey/xenc11:MasterKeyName/text()")

    def setKey(self, key):
        """Install *key* as the decryption key for the key packages.

        If the document names a supported key derivation method, *key* is
        treated as the master key and the real key is derived from it.
        If the document carries a MAC key, it is decrypted with the
        resulting key and used to verify value MACs during decryption.
        """
        # Derive the enckey if required.  A document without an
        # EncryptionKey element has nothing to derive from; use the key as
        # given instead of querying a missing element.
        kd = None
        if self.__enckey is not None:
            kd = fetch(self.__enckey,
                       "./xenc11:DerivedKey/xenc11:KeyDerivationMethod/@Algorithm",
                       convertKeyDerivation)
        if kd is not None:
            key = kd(self.__enckey).derive(key)

        # Load the decryptor.
        self.__decryptor = XMLDecryptor(key)
        if self.__mkey is not None and self.__algo is not None:
            # The MAC key itself is stored encrypted; decrypt it with the
            # plain decryptor, then rebuild the decryptor with MAC support.
            tmp = hmac.HMAC(
                self.__decryptor(self.__mkey),
                self.__algo(),
                backend=default_backend()
            )
            self.__decryptor = XMLDecryptor(key, tmp)

    def getKeyPackages(self):
        """Yield a PSKCKeyPackage for every key package in the document."""
        for kp in self.__keypackages:
            yield PSKCKeyPackage(kp, self.__decryptor)

    def save(self, dest):
        """Write the document, in its current state, to *dest*."""
        self.__doc.write(dest)
|
|
|
|
|
|
class OTPTokenImport(admintool.AdminTool):
    """Admin tool that imports the OTP tokens of a PSKC file.

    Each key package is added with the ``otptoken_add`` command.  Packages
    that were added are removed from the document, and what remains is
    written to the output file.
    """

    command_name = 'ipa-otptoken-import'
    description = "Import OTP tokens."
    usage = "%prog [options] <PSKC file> <output file>"

    @classmethod
    def add_options(cls, parser):
        """Register the ``-k``/``--keyfile`` option on top of the defaults."""
        super(OTPTokenImport, cls).add_options(parser)

        parser.add_option("-k", "--keyfile", dest="keyfile",
                          help="File containing the key used to decrypt token secrets")

    def validate_options(self):
        """Check the arguments, parse the PSKC file and load the key.

        Raises admintool.ScriptError when the input or output argument is
        missing, when the output file already exists, or when the document
        names an encryption key but no keyfile was given.
        """
        super(OTPTokenImport, self).validate_options()

        # Parse the file.
        if len(self.args) < 1:
            raise admintool.ScriptError("Import file required!")
        self.doc = PSKCDocument(self.args[0])

        # Get the output file.
        if len(self.args) < 2:
            raise admintool.ScriptError("Output file required!")
        self.output = self.args[1]
        if os.path.exists(self.output):
            raise admintool.ScriptError("Output file already exists!")

        # Verify a key is provided if one is needed.
        if self.doc.keyname is not None:
            if self.safe_options.keyfile is None:  # pylint: disable=no-member
                raise admintool.ScriptError("Encryption key required: %s!" % self.doc.keyname)

            # Load the keyfile.  The key is raw key material, so read it as
            # bytes; text mode would hand a str to the crypto code on
            # Python 3.
            keyfile = self.safe_options.keyfile  # pylint: disable=no-member
            with open(keyfile, 'rb') as f:
                self.doc.setKey(f.read())

    def run(self):
        """Connect to LDAP, add every token and save the leftovers.

        Raises admintool.ScriptError when the LDAP connection cannot be
        established.  A failure to add a single token is logged as a
        warning and does not stop the import.
        """
        api.bootstrap(in_server=True, confdir=paths.ETC_IPA)
        api.finalize()

        try:
            api.Backend.ldap2.connect(ccache=os.environ.get('KRB5CCNAME'),
                                      autobind=AUTOBIND_DISABLED)
        except (gssapi.exceptions.GSSError, errors.ACIError):
            raise admintool.ScriptError("Unable to connect to LDAP! Did you kinit?")

        try:
            # Parse tokens
            for keypkg in self.doc.getKeyPackages():
                try:
                    api.Command.otptoken_add(keypkg.id, no_qrcode=True, **keypkg.options)
                except Exception as e:
                    # Best effort: keep the failed package in the document
                    # and carry on with the next one.
                    logger.warning("Error adding token: %s", e)
                else:
                    logger.info("Added token: %s", keypkg.id)
                    keypkg.remove()
        finally:
            api.Backend.ldap2.disconnect()

        # Write out the XML file without the tokens that succeeded.
        self.doc.save(self.output)
|