Vault: port key wrapping to python-cryptography

https://fedorahosted.org/freeipa/ticket/6650

Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Stanislav Laznicka <slaznick@redhat.com>
This commit is contained in:
Christian Heimes 2017-02-25 13:09:11 +01:00 committed by Martin Basti
parent ba3c201a03
commit ed7a03a1af
2 changed files with 92 additions and 90 deletions

View File

@ -31,10 +31,11 @@ from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import load_pem_public_key,\
load_pem_private_key
import nss.nss as nss
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.primitives.serialization import (
load_pem_public_key, load_pem_private_key)
from cryptography.x509 import load_der_x509_certificate
from ipaclient.frontend import MethodOverride
from ipalib.frontend import Local, Method, Object
@ -639,6 +640,39 @@ class vault_archive(Local):
def _iter_output(self):
return self.api.Command.vault_archive_internal.output()
def _wrap_data(self, transport_cert_der, json_vault_data):
"""Encrypt data with wrapped session key and transport cert
:param bytes transport_cert_der: transport cert in DER encoding
:param bytes json_vault_data: dumped vault data
:return:
"""
transport_cert = load_der_x509_certificate(
transport_cert_der, default_backend())
public_key = transport_cert.public_key()
# generate session key
key_length = max(algorithms.TripleDES.key_sizes)
algo = algorithms.TripleDES(os.urandom(key_length // 8))
nonce = os.urandom(algo.block_size // 8)
# wrap session key with transport certificate
wrapped_session_key = public_key.encrypt(
algo.key,
padding.PKCS1v15()
)
# wrap vault_data with session key
padder = PKCS7(algo.block_size).padder()
padded_data = padder.update(json_vault_data)
padded_data += padder.finalize()
cipher = Cipher(algo, modes.CBC(nonce), backend=default_backend())
encryptor = cipher.encryptor()
wrapped_vault_data = encryptor.update(padded_data) + encryptor.finalize()
return wrapped_session_key, nonce, wrapped_vault_data
def forward(self, *args, **options):
data = options.get('data')
input_file = options.get('in')
@ -762,57 +796,29 @@ class vault_archive(Local):
name='vault_type',
error=_('Invalid vault type'))
# initialize NSS database
nss.nss_init(api.env.nss_dir)
# retrieve transport certificate
config = self.api.Command.vaultconfig_show()['result']
transport_cert_der = config['transport_cert']
nss_transport_cert = nss.Certificate(transport_cert_der)
# generate session key
mechanism = nss.CKM_DES3_CBC_PAD
slot = nss.get_best_slot(mechanism)
key_length = slot.get_best_key_length(mechanism)
session_key = slot.key_gen(mechanism, None, key_length)
# wrap session key with transport certificate
# pylint: disable=no-member
public_key = nss_transport_cert.subject_public_key_info.public_key
# pylint: enable=no-member
wrapped_session_key = nss.pub_wrap_sym_key(mechanism,
public_key,
session_key)
options['session_key'] = wrapped_session_key.data
nonce_length = nss.get_iv_length(mechanism)
nonce = nss.generate_random(nonce_length)
options['nonce'] = nonce
vault_data = {}
vault_data[u'data'] = base64.b64encode(data).decode('utf-8')
vault_data = {
'data': base64.b64encode(data).decode('utf-8')
}
if encrypted_key:
vault_data[u'encrypted_key'] = base64.b64encode(encrypted_key)\
.decode('utf-8')
json_vault_data = json.dumps(vault_data)
json_vault_data = json.dumps(vault_data).encode('utf-8')
# wrap vault_data with session key
iv_si = nss.SecItem(nonce)
iv_param = nss.param_from_iv(mechanism, iv_si)
encoding_ctx = nss.create_context_by_sym_key(mechanism,
nss.CKA_ENCRYPT,
session_key,
iv_param)
wrapped_vault_data = encoding_ctx.cipher_op(json_vault_data)\
+ encoding_ctx.digest_final()
options['vault_data'] = wrapped_vault_data
# retrieve transport certificate
config = self.api.Command.vaultconfig_show()['result']
transport_cert_der = config['transport_cert']
# created wrapped session key and wrap vault data
wrapped_session_key, nonce, wrapped_vault_data = self._wrap_data(
transport_cert_der, json_vault_data
)
options.update(
session_key=wrapped_session_key,
nonce=nonce,
vault_data=wrapped_vault_data
)
return self.api.Command.vault_archive_internal(*args, **options)
@ -893,6 +899,33 @@ class vault_retrieve(Local):
def _iter_output(self):
return self.api.Command.vault_retrieve_internal.output()
def _wrap_session_key(self, transport_cert_der):
transport_cert = load_der_x509_certificate(
transport_cert_der, default_backend())
public_key = transport_cert.public_key()
# generate session key
key_length = max(algorithms.TripleDES.key_sizes)
algo = algorithms.TripleDES(os.urandom(key_length // 8))
# wrap session key with transport certificate
wrapped_session_key = public_key.encrypt(
algo.key,
padding.PKCS1v15()
)
return algo, wrapped_session_key
def _unwrap_response(self, algo, nonce, vault_data):
cipher = Cipher(algo, modes.CBC(nonce), backend=default_backend())
# decrypt
decryptor = cipher.decryptor()
padded_data = decryptor.update(vault_data)
padded_data += decryptor.finalize()
# remove padding
unpadder = PKCS7(algo.block_size).unpadder()
json_vault_data = unpadder.update(padded_data)
json_vault_data += unpadder.finalize()
# load JSON
return json.loads(json_vault_data.decode('utf-8'))
def forward(self, *args, **options):
output_file = options.get('out')
@ -922,56 +955,26 @@ class vault_retrieve(Local):
# retrieve vault info
vault = self.api.Command.vault_show(*args, **options)['result']
vault_type = vault['ipavaulttype'][0]
# initialize NSS database
nss.nss_init(api.env.nss_dir)
# retrieve transport certificate
config = self.api.Command.vaultconfig_show()['result']
transport_cert_der = config['transport_cert']
nss_transport_cert = nss.Certificate(transport_cert_der)
# generate session key
mechanism = nss.CKM_DES3_CBC_PAD
slot = nss.get_best_slot(mechanism)
key_length = slot.get_best_key_length(mechanism)
session_key = slot.key_gen(mechanism, None, key_length)
# wrap session key with transport certificate
# pylint: disable=no-member
public_key = nss_transport_cert.subject_public_key_info.public_key
# pylint: enable=no-member
wrapped_session_key = nss.pub_wrap_sym_key(mechanism,
public_key,
session_key)
# create algo and wrap session key with transport cert
algo, wrapped_session_key = self._wrap_session_key(
config['transport_cert']
)
# send retrieval request to server
options['session_key'] = wrapped_session_key.data
options['session_key'] = wrapped_session_key
response = self.api.Command.vault_retrieve_internal(*args, **options)
result = response['result']
nonce = result['nonce']
# unwrap data with session key
wrapped_vault_data = result['vault_data']
vault_data = self._unwrap_response(
algo,
response['result']['nonce'],
response['result']['vault_data']
)
del algo
iv_si = nss.SecItem(nonce)
iv_param = nss.param_from_iv(mechanism, iv_si)
decoding_ctx = nss.create_context_by_sym_key(mechanism,
nss.CKA_DECRYPT,
session_key,
iv_param)
json_vault_data = decoding_ctx.cipher_op(wrapped_vault_data)\
+ decoding_ctx.digest_final()
vault_data = json.loads(json_vault_data.decode('utf-8'))
data = base64.b64decode(vault_data[u'data'].encode('utf-8'))
encrypted_key = None
if 'encrypted_key' in vault_data:

View File

@ -48,7 +48,6 @@ if __name__ == '__main__':
"ipalib",
"ipapython",
"jinja2",
"python-nss",
"python-yubico",
"pyusb",
"qrcode",