freeipa/ipaclient/plugins/vault.py
Jan Cholasta 98bb5397c5 vault: cache the transport certificate on client
Cache the KRA transport certificate on disk (in ~/.cache/ipa) as well as
in memory.

https://fedorahosted.org/freeipa/ticket/6652

Reviewed-By: Martin Basti <mbasti@redhat.com>
Reviewed-By: Christian Heimes <cheimes@redhat.com>
2017-03-13 16:02:16 +01:00

1155 lines
36 KiB
Python

# Authors:
# Endi S. Dewata <edewata@redhat.com>
#
# Copyright (C) 2015 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import base64
import collections
import errno
import getpass
import io
import json
import os
import sys
import tempfile
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.primitives.serialization import (
load_pem_public_key, load_pem_private_key)
from ipaclient.frontend import MethodOverride
from ipalib import x509
from ipalib.constants import USER_CACHE_PATH
from ipalib.frontend import Local, Method, Object
from ipalib.util import classproperty
from ipalib import api, errors
from ipalib import Bytes, Flag, Str
from ipalib.plugable import Registry
from ipalib import _
from ipapython.dnsutil import DNSName
from ipapython.ipa_log_manager import log_mgr
# Module-level logger for this plugin, obtained from the IPA log manager.
logger = log_mgr.get_logger(__name__)
def validated_read(argname, filename, mode='r', encoding=None):
    """Read a whole file, reporting failures as a ValidationError.

    :param argname: name of the command argument the file was given for;
        used as the ``name`` of the raised ValidationError
    :param filename: path of the file to read
    :param mode: mode passed to ``io.open()`` ('r' for text, 'rb' for bytes)
    :param encoding: text encoding passed to ``io.open()``
    :return: the complete content of the file
    :raises errors.ValidationError: if reading fails with IOError or, for
        text files, decoding fails with UnicodeError
    """
    try:
        with io.open(filename, mode=mode, encoding=encoding) as f:
            data = f.read()
    except IOError as exc:
        # Not every IOError carries (errno, strerror) in its args, so use
        # strerror when present and fall back to the exception text rather
        # than indexing args[1], which could raise IndexError.
        reason = exc.strerror if exc.strerror else exc
        raise errors.ValidationError(
            name=argname,
            error=_("Cannot read file '%(filename)s': %(exc)s") % {
                'filename': filename, 'exc': reason
            }
        )
    except UnicodeError as exc:
        raise errors.ValidationError(
            name=argname,
            error=_("Cannot decode file '%(filename)s': %(exc)s") % {
                'filename': filename, 'exc': exc
            }
        )
    return data
# Plugin registry; the @register(...) decorators in this module use it.
register = Registry()
# Upper bound, in bytes, on the data accepted by vault_archive.
MAX_VAULT_DATA_SIZE = 2**20  # = 1 MiB (1048576 bytes)
def _read_password(prompt):
    """Prompt for a password without echo and return it as text.

    ``getpass.getpass()`` returns bytes on Python 2 but text on Python 3,
    so the value is decoded only when it is actually bytes.  When
    ``sys.stdin`` reports no encoding (e.g. input is not a terminal),
    UTF-8 is used.

    :param prompt: prompt string shown to the user
    :return: the password as a text string
    """
    password = getpass.getpass(prompt)
    if isinstance(password, bytes):
        encoding = getattr(sys.stdin, 'encoding', None) or 'utf-8'
        password = password.decode(encoding)
    return password


def get_new_password():
    """
    Gets new password from user and verify it.

    The user is asked twice; the prompts repeat until both entries match.

    :return: the new password as a text string
    """
    while True:
        password = _read_password('New password: ')
        password2 = _read_password('Verify password: ')
        if password == password2:
            return password
        print(' ** Passwords do not match! **')


def get_existing_password():
    """
    Gets existing password from user.

    :return: the password as a text string
    """
    return _read_password('Password: ')
def generate_symmetric_key(password, salt):
    """
    Derives a symmetric key from a password and a salt.

    The key is computed with PBKDF2-HMAC-SHA256 (100000 iterations,
    32 bytes of output) over the UTF-8 encoded password and returned
    base64-encoded.

    :param password: vault password (text)
    :param salt: salt bytes for the key derivation
    :return: base64-encoded derived key
    """
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
        backend=default_backend()
    )
    raw_key = derivation.derive(password.encode('utf-8'))
    return base64.b64encode(raw_key)
def encrypt(data, symmetric_key=None, public_key=None):
    """
    Encrypts data with symmetric key or public key.

    A symmetric key takes precedence and is used with Fernet; otherwise
    the PEM public key is used with OAEP padding (SHA-1 for both the
    digest and MGF1).

    :param data: bytes to encrypt
    :param symmetric_key: Fernet key
    :param public_key: PEM-encoded public key
    :return: the encrypted bytes, or None when neither key is given
    """
    if symmetric_key:
        return Fernet(symmetric_key).encrypt(data)

    if not public_key:
        # no key of either kind was supplied
        return None

    key = load_pem_public_key(
        data=public_key,
        backend=default_backend()
    )
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA1()),
        algorithm=hashes.SHA1(),
        label=None
    )
    return key.encrypt(data, oaep)
def decrypt(data, symmetric_key=None, private_key=None):
    """
    Decrypts data with symmetric key or private key.

    A symmetric key takes precedence and is used with Fernet; otherwise
    the PEM private key (loaded without a passphrase) is used with OAEP
    padding (SHA-1 for both the digest and MGF1).

    :param data: bytes to decrypt
    :param symmetric_key: Fernet key
    :param private_key: PEM-encoded private key
    :return: the decrypted bytes, or None when neither key is given
    :raises errors.AuthenticationError: when Fernet rejects the token, or
        when loading the private key or decrypting raises ValueError
    """
    if symmetric_key:
        try:
            return Fernet(symmetric_key).decrypt(data)
        except InvalidToken:
            raise errors.AuthenticationError(
                message=_('Invalid credentials'))

    if not private_key:
        # no key of either kind was supplied
        return None

    try:
        key = load_pem_private_key(
            data=private_key,
            password=None,
            backend=default_backend()
        )
        oaep = padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA1()),
            algorithm=hashes.SHA1(),
            label=None
        )
        return key.decrypt(data, oaep)
    except ValueError:
        raise errors.AuthenticationError(
            message=_('Invalid credentials'))
# Stand-in Object plugin registered under the name 'vault'.
# NOTE(review): no_fail=True presumably keeps registration from failing
# when a plugin of that name already exists -- confirm against
# ipalib.plugable.Registry.
@register(no_fail=True)
class _fake_vault(Object):
    name = 'vault'
# Stand-in Method plugin named 'vault_add_internal'.  vault_add compares
# the registered 'vault_add_internal' plugin against this class to decide
# whether vault_add itself is hidden from the CLI.
@register(no_fail=True)
class _fake_vault_add_internal(Method):
    name = 'vault_add_internal'
    # never exposed on the command line
    NO_CLI = True
# Client-side command that creates a vault through vault_add_internal and
# then archives blank data into it through vault_archive.
@register()
class vault_add(Local):
    __doc__ = _('Create a new vault.')

    # options handled on the client only; forward() strips them before
    # calling the internal command
    takes_options = (
        Str(
            'password?',
            cli_name='password',
            doc=_('Vault password'),
        ),
        Str(  # TODO: use File parameter
            'password_file?',
            cli_name='password_file',
            doc=_('File containing the vault password'),
        ),
        Str(  # TODO: use File parameter
            'public_key_file?',
            cli_name='public_key_file',
            doc=_('File containing the vault public key'),
        ),
    )

    @classmethod
    def __NO_CLI_getter(cls):
        # hidden from the CLI while only the stand-in internal command
        # is registered
        registered = api.Command.get_plugin('vault_add_internal')
        return registered is _fake_vault_add_internal

    NO_CLI = classproperty(__NO_CLI_getter)

    @property
    def api_version(self):
        """API version, taken from vault_add_internal."""
        return self.api.Command.vault_add_internal.api_version

    def get_args(self):
        """Yield the internal command's arguments, then the inherited ones."""
        for internal_arg in self.api.Command.vault_add_internal.args():
            yield internal_arg
        for own_arg in super(vault_add, self).get_args():
            yield own_arg

    def get_options(self):
        """Yield the internal command's options, then the inherited ones.

        'ipavaultsalt' and 'version' of the internal command are left out.
        """
        hidden = ('ipavaultsalt', 'version')
        for internal_opt in self.api.Command.vault_add_internal.options():
            if internal_opt.name not in hidden:
                yield internal_opt
        for own_opt in super(vault_add, self).get_options():
            yield own_opt

    def get_output_params(self):
        """Yield the internal command's output params, then inherited ones."""
        internal_cmd = self.api.Command.vault_add_internal
        for internal_param in internal_cmd.output_params():
            yield internal_param
        for own_param in super(vault_add, self).get_output_params():
            yield own_param

    def _iter_output(self):
        return self.api.Command.vault_add_internal.output()

    def forward(self, *args, **options):
        """Create the vault, then archive blank data into it.

        :return: the response of vault_add_internal
        :raises errors.MutuallyExclusiveError: when a password or public
            key is given for a vault type that does not use it, or is
            given both directly and through a file
        :raises errors.ValidationError: when an asymmetric vault has no
            public key or the key cannot be loaded
        """
        vault_type = options.get('ipavaulttype')
        if vault_type is None:
            # fall back to the default declared by the internal command
            vault_type = (
                self.api.Command.vault_add_internal
                .params.ipavaulttype.default)

        public_key = options.get('ipavaultpublickey')

        # don't send these parameters to server
        password = options.pop('password', None)
        password_file = options.pop('password_file', None)
        public_key_file = options.pop('public_key_file', None)

        if (password or password_file) and vault_type != u'symmetric':
            raise errors.MutuallyExclusiveError(
                reason=_('Password can be specified only for '
                         'symmetric vault')
            )

        if (public_key or public_key_file) and vault_type != u'asymmetric':
            raise errors.MutuallyExclusiveError(
                reason=_('Public key can be specified only for '
                         'asymmetric vault')
            )

        backend = (self.api.Backend.ldap2 if self.api.env.in_server
                   else self.api.Backend.rpcclient)
        if not backend.isconnected():
            backend.connect()

        if vault_type == u'symmetric':
            # get password
            if password and password_file:
                raise errors.MutuallyExclusiveError(
                    reason=_('Password specified multiple times'))
            if not password:
                if password_file:
                    password = validated_read('password-file',
                                              password_file,
                                              encoding='utf-8')
                    password = password.rstrip('\n')
                else:
                    password = get_new_password()

            # generate vault salt
            options['ipavaultsalt'] = os.urandom(16)

        elif vault_type == u'asymmetric':
            # get new vault public key
            if public_key and public_key_file:
                raise errors.MutuallyExclusiveError(
                    reason=_('Public key specified multiple times'))
            if not public_key:
                if not public_key_file:
                    raise errors.ValidationError(
                        name='ipavaultpublickey',
                        error=_('Missing vault public key'))
                public_key = validated_read('public-key-file',
                                            public_key_file,
                                            mode='rb')
                # store vault public key
                options['ipavaultpublickey'] = public_key

            # validate public key and prevent users from accidentally
            # sending a private key to the server.
            try:
                load_pem_public_key(
                    data=public_key,
                    backend=default_backend()
                )
            except ValueError as e:
                raise errors.ValidationError(
                    name='ipavaultpublickey',
                    error=_('Invalid or unsupported vault public key: %s') % e,
                )

        # create vault
        response = self.api.Command.vault_add_internal(*args, **options)

        # prepare parameters for archival
        archive_opts = options.copy()
        archive_opts.pop('description', None)
        archive_opts.pop('ipavaulttype', None)
        if vault_type == u'symmetric':
            archive_opts['password'] = password
            del archive_opts['ipavaultsalt']
        elif vault_type == u'asymmetric':
            del archive_opts['ipavaultpublickey']

        # archive blank data
        self.api.Command.vault_archive(*args, **archive_opts)

        return response
# Stand-in Method plugin named 'vault_mod_internal'.  vault_mod compares
# the registered 'vault_mod_internal' plugin against this class to decide
# whether vault_mod itself is hidden from the CLI.
@register(no_fail=True)
class _fake_vault_mod_internal(Method):
    name = 'vault_mod_internal'
    # never exposed on the command line
    NO_CLI = True
# Client-side command that modifies a vault through vault_mod_internal.
# When the vault type or its crypto parameters change, the existing secret
# is retrieved first and archived again afterwards.
@register()
class vault_mod(Local):
    __doc__ = _('Modify a vault.')

    # options handled on the client only; forward() strips them before
    # calling the internal command
    takes_options = (
        Flag(
            'change_password?',
            doc=_('Change password'),
        ),
        Str(
            'old_password?',
            cli_name='old_password',
            doc=_('Old vault password'),
        ),
        Str(  # TODO: use File parameter
            'old_password_file?',
            cli_name='old_password_file',
            doc=_('File containing the old vault password'),
        ),
        Str(
            'new_password?',
            cli_name='new_password',
            doc=_('New vault password'),
        ),
        Str(  # TODO: use File parameter
            'new_password_file?',
            cli_name='new_password_file',
            doc=_('File containing the new vault password'),
        ),
        Bytes(
            'private_key?',
            cli_name='private_key',
            doc=_('Old vault private key'),
        ),
        Str(  # TODO: use File parameter
            'private_key_file?',
            cli_name='private_key_file',
            doc=_('File containing the old vault private key'),
        ),
        Str(  # TODO: use File parameter
            'public_key_file?',
            cli_name='public_key_file',
            doc=_('File containing the new vault public key'),
        ),
    )

    @classmethod
    def __NO_CLI_getter(cls):
        # hidden from the CLI while only the stand-in internal command
        # is registered
        registered = api.Command.get_plugin('vault_mod_internal')
        return registered is _fake_vault_mod_internal

    NO_CLI = classproperty(__NO_CLI_getter)

    @property
    def api_version(self):
        """API version, taken from vault_mod_internal."""
        return self.api.Command.vault_mod_internal.api_version

    def get_args(self):
        """Yield the internal command's arguments, then the inherited ones."""
        for internal_arg in self.api.Command.vault_mod_internal.args():
            yield internal_arg
        for own_arg in super(vault_mod, self).get_args():
            yield own_arg

    def get_options(self):
        """Yield the internal command's options, then the inherited ones.

        The internal command's 'version' option is left out.
        """
        for internal_opt in self.api.Command.vault_mod_internal.options():
            if internal_opt.name != 'version':
                yield internal_opt
        for own_opt in super(vault_mod, self).get_options():
            yield own_opt

    def get_output_params(self):
        """Yield the internal command's output params, then inherited ones."""
        internal_cmd = self.api.Command.vault_mod_internal
        for internal_param in internal_cmd.output_params():
            yield internal_param
        for own_param in super(vault_mod, self).get_output_params():
            yield own_param

    def _iter_output(self):
        return self.api.Command.vault_mod_internal.output()

    def forward(self, *args, **options):
        """Modify the vault, re-archiving its secret when crypto changes.

        :return: the response of vault_mod_internal
        :raises errors.MutuallyExclusiveError: when the new public key is
            given both directly and through a file
        :raises errors.ValidationError: when an asymmetric vault is
            requested without a new public key
        """
        # pull every crypto-related parameter out of the options; what is
        # left is passed on to the other commands
        vault_type = options.pop('ipavaulttype', False)
        salt = options.pop('ipavaultsalt', False)
        change_password = options.pop('change_password', False)
        old_password = options.pop('old_password', None)
        old_password_file = options.pop('old_password_file', None)
        new_password = options.pop('new_password', None)
        new_password_file = options.pop('new_password_file', None)
        old_private_key = options.pop('private_key', None)
        old_private_key_file = options.pop('private_key_file', None)
        new_public_key = options.pop('ipavaultpublickey', None)
        new_public_key_file = options.pop('public_key_file', None)

        backend = (self.api.Backend.ldap2 if self.api.env.in_server
                   else self.api.Backend.rpcclient)
        if not backend.isconnected():
            backend.connect()

        # determine the vault type based on parameters specified
        if not vault_type:
            if change_password or new_password or new_password_file or salt:
                vault_type = u'symmetric'
            elif new_public_key or new_public_key_file:
                vault_type = u'asymmetric'

        # if vault type is specified, retrieve existing secret
        if vault_type:
            retrieve_opts = options.copy()
            retrieve_opts.pop('description', None)
            retrieve_opts['password'] = old_password
            retrieve_opts['password_file'] = old_password_file
            retrieve_opts['private_key'] = old_private_key
            retrieve_opts['private_key_file'] = old_private_key_file
            retrieved = self.api.Command.vault_retrieve(
                *args, **retrieve_opts)
            data = retrieved['result']['data']

        mod_opts = options.copy()

        # if vault type is specified, update crypto attributes
        if vault_type:
            mod_opts['ipavaulttype'] = vault_type

            if vault_type == u'standard':
                mod_opts['ipavaultsalt'] = None
                mod_opts['ipavaultpublickey'] = None

            elif vault_type == u'symmetric':
                # keep a caller-supplied salt, otherwise generate one
                mod_opts['ipavaultsalt'] = salt or os.urandom(16)
                mod_opts['ipavaultpublickey'] = None

            elif vault_type == u'asymmetric':
                # get new vault public key
                if new_public_key and new_public_key_file:
                    raise errors.MutuallyExclusiveError(
                        reason=_('New public key specified multiple times'))
                if not new_public_key:
                    if not new_public_key_file:
                        raise errors.ValidationError(
                            name='ipavaultpublickey',
                            error=_('Missing new vault public key'))
                    new_public_key = validated_read('public_key_file',
                                                    new_public_key_file,
                                                    mode='rb')

                mod_opts['ipavaultsalt'] = None
                mod_opts['ipavaultpublickey'] = new_public_key

        response = self.api.Command.vault_mod_internal(*args, **mod_opts)

        # if vault type is specified, rearchive existing secret
        if vault_type:
            archive_opts = options.copy()
            archive_opts.pop('description', None)
            archive_opts['data'] = data
            archive_opts['password'] = new_password
            archive_opts['password_file'] = new_password_file
            archive_opts['override_password'] = True
            self.api.Command.vault_archive(*args, **archive_opts)

        return response
class _TransportCertCache(collections.MutableMapping):
def __init__(self):
self._dirname = os.path.join(
USER_CACHE_PATH, 'ipa', 'kra-transport-certs')
self._transport_certs = {}
def _get_filename(self, domain):
basename = DNSName(domain).ToASCII() + '.pem'
return os.path.join(self._dirname, basename)
def __getitem__(self, domain):
try:
transport_cert = self._transport_certs[domain]
except KeyError:
transport_cert = None
filename = self._get_filename(domain)
try:
try:
transport_cert = x509.load_certificate_from_file(filename)
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise
except Exception:
logger.warning("Failed to load %s: %s", filename,
exc_info=True)
if transport_cert is None:
raise KeyError(domain)
self._transport_certs[domain] = transport_cert
return transport_cert
def __setitem__(self, domain, transport_cert):
filename = self._get_filename(domain)
transport_cert_der = (
transport_cert.public_bytes(serialization.Encoding.DER))
try:
try:
os.makedirs(self._dirname)
except EnvironmentError as e:
if e.errno != errno.EEXIST:
raise
fd, tmpfilename = tempfile.mkstemp(dir=self._dirname)
os.close(fd)
x509.write_certificate(transport_cert_der, tmpfilename)
os.rename(tmpfilename, filename)
except Exception:
logger.warning("Failed to save %s", filename, exc_info=True)
self._transport_certs[domain] = transport_cert
def __delitem__(self, domain):
filename = self._get_filename(domain)
try:
os.unlink(filename)
except EnvironmentError as e:
if e.errno != errno.ENOENT:
logger.warning("Failed to remove %s", filename, exc_info=True)
del self._transport_certs[domain]
def __len__(self):
return len(self._transport_certs)
def __iter__(self):
return iter(self._transport_certs)
# Module-wide cache instance used by vaultconfig_show and ModVaultData.
_transport_cert_cache = _TransportCertCache()
# Client-side override of vaultconfig_show: caches the transport
# certificate returned by the server and can save it to a file.
@register(override=True, no_fail=True)
class vaultconfig_show(MethodOverride):
    def forward(self, *args, **options):
        """Forward the command, then cache the transport certificate.

        The 'transport_out' option is consumed here and not sent to the
        server.  When given, the DER-encoded transport certificate is
        written to that file.

        :return: the response of the overridden command
        """
        # don't send these parameters to server
        transport_out = options.pop('transport_out', None)

        response = super(vaultconfig_show, self).forward(*args, **options)
        transport_cert_der = response['result']['transport_cert']

        # cache transport certificate
        transport_cert = x509.load_certificate(transport_cert_der, x509.DER)
        _transport_cert_cache[self.api.env.domain] = transport_cert

        if transport_out:
            # the certificate is binary DER data, so the file must be
            # opened in binary mode
            with open(transport_out, 'wb') as f:
                f.write(transport_cert_der)

        return response
# Shared base of vault_archive and vault_retrieve: generates the session
# key and calls the '<name>_internal' command with that key wrapped by the
# KRA transport certificate.
class ModVaultData(Local):
    def _generate_session_key(self):
        """Return a TripleDES algorithm object holding a random key.

        The largest key size TripleDES supports is used.
        """
        key_bits = max(algorithms.TripleDES.key_sizes)
        return algorithms.TripleDES(os.urandom(key_bits // 8))

    def _do_internal(self, algo, transport_cert, raise_unexpected,
                     *args, **options):
        """Call the internal command with the wrapped session key.

        :param algo: algorithm object whose ``key`` is the session key
        :param transport_cert: certificate whose public key wraps the key
        :param raise_unexpected: whether errors other than NotFound are
            re-raised after the cached certificate has been dropped
        :return: the internal command's result, or None when an error was
            suppressed
        """
        # wrap session key with transport certificate
        options['session_key'] = transport_cert.public_key().encrypt(
            algo.key,
            padding.PKCS1v15()
        )

        internal_name = self.name + '_internal'
        try:
            return self.api.Command[internal_name](*args, **options)
        except errors.NotFound:
            # always propagated; the cached certificate is kept
            raise
        except (errors.InternalError,
                errors.ExecutionError,
                errors.GenericError):
            # drop the cached certificate before reporting or retrying
            _transport_cert_cache.pop(self.api.env.domain, None)
            if raise_unexpected:
                raise
        return None

    def internal(self, algo, *args, **options):
        """
        Calls the internal counterpart of the command.

        A cached transport certificate is tried first.  If that call gives
        no result, the certificate is fetched again with vaultconfig_show
        and the call is repeated, this time letting errors propagate.
        """
        domain = self.api.env.domain

        # try call with cached transport certificate
        cached_cert = _transport_cert_cache.get(domain)
        if cached_cert is not None:
            result = self._do_internal(algo, cached_cert, False,
                                       *args, **options)
            if result is not None:
                return result

        # retrieve and cache transport certificate
        self.api.Command.vaultconfig_show()
        fresh_cert = _transport_cert_cache[domain]

        # call with the retrieved transport certificate
        return self._do_internal(algo, fresh_cert, True,
                                 *args, **options)
# Stand-in Method plugin named 'vault_archive_internal'.  vault_archive
# compares the registered 'vault_archive_internal' plugin against this
# class to decide whether vault_archive itself is hidden from the CLI.
@register(no_fail=True)
class _fake_vault_archive_internal(Method):
    name = 'vault_archive_internal'
    # never exposed on the command line
    NO_CLI = True
# Client-side command that encrypts data according to the vault type,
# wraps it with a session key and sends it to vault_archive_internal.
@register()
class vault_archive(ModVaultData):
    __doc__ = _('Archive data into a vault.')

    # options handled on the client only; forward() strips them before
    # calling the server
    takes_options = (
        Bytes(
            'data?',
            doc=_('Binary data to archive'),
        ),
        Str(  # TODO: use File parameter
            'in?',
            doc=_('File containing data to archive'),
        ),
        Str(
            'password?',
            cli_name='password',
            doc=_('Vault password'),
        ),
        Str(  # TODO: use File parameter
            'password_file?',
            cli_name='password_file',
            doc=_('File containing the vault password'),
        ),
        Flag(
            'override_password?',
            doc=_('Override existing password'),
        ),
    )

    @classmethod
    def __NO_CLI_getter(cls):
        # hidden from the CLI while only the stand-in internal command
        # is registered
        return (api.Command.get_plugin('vault_archive_internal') is
                _fake_vault_archive_internal)

    NO_CLI = classproperty(__NO_CLI_getter)

    @property
    def api_version(self):
        """API version, taken from vault_archive_internal."""
        return self.api.Command.vault_archive_internal.api_version

    def get_args(self):
        """Yield the internal command's arguments, then the inherited ones."""
        for arg in self.api.Command.vault_archive_internal.args():
            yield arg
        for arg in super(vault_archive, self).get_args():
            yield arg

    def get_options(self):
        """Yield the internal command's options, then the inherited ones.

        Options that forward() computes itself are left out.
        """
        for option in self.api.Command.vault_archive_internal.options():
            if option.name not in ('nonce',
                                   'session_key',
                                   'vault_data',
                                   'version'):
                yield option
        for option in super(vault_archive, self).get_options():
            yield option

    def get_output_params(self):
        """Yield the internal command's output params, then inherited ones."""
        for param in self.api.Command.vault_archive_internal.output_params():
            yield param
        for param in super(vault_archive, self).get_output_params():
            yield param

    def _iter_output(self):
        return self.api.Command.vault_archive_internal.output()

    def _wrap_data(self, algo, json_vault_data):
        """Encrypt the vault data with the session key.

        The data is PKCS7-padded and encrypted in CBC mode with a random
        nonce of one cipher block as the initialization vector.

        :param algo: cipher algorithm object holding the session key
        :param bytes json_vault_data: dumped vault data
        :return: tuple ``(nonce, wrapped_vault_data)``
        """
        nonce = os.urandom(algo.block_size // 8)

        # wrap vault_data with session key
        padder = PKCS7(algo.block_size).padder()
        padded_data = padder.update(json_vault_data)
        padded_data += padder.finalize()

        cipher = Cipher(algo, modes.CBC(nonce), backend=default_backend())
        encryptor = cipher.encryptor()
        wrapped_vault_data = encryptor.update(padded_data) + encryptor.finalize()

        return nonce, wrapped_vault_data

    def forward(self, *args, **options):
        """Encrypt the data for the vault type and archive it.

        :return: the result of vault_archive_internal
        :raises errors.MutuallyExclusiveError: when data or the password
            is given both directly and through a file
        :raises errors.ValidationError: when the data is larger than
            MAX_VAULT_DATA_SIZE, the input file cannot be read, or the
            vault type is not recognized
        """
        # don't send these parameters to server
        data = options.pop('data', None)
        input_file = options.pop('in', None)
        password = options.pop('password', None)
        password_file = options.pop('password_file', None)
        override_password = options.pop('override_password', False)

        # get data
        if data and input_file:
            raise errors.MutuallyExclusiveError(
                reason=_('Input data specified multiple times'))

        elif data:
            if len(data) > MAX_VAULT_DATA_SIZE:
                raise errors.ValidationError(name="data", error=_(
                    "Size of data exceeds the limit. Current vault data size "
                    "limit is %(limit)d B")
                    % {'limit': MAX_VAULT_DATA_SIZE})

        elif input_file:
            # check the size before reading the file into memory
            try:
                stat = os.stat(input_file)
            except OSError as exc:
                raise errors.ValidationError(name="in", error=_(
                    "Cannot read file '%(filename)s': %(exc)s")
                    % {'filename': input_file, 'exc': exc.args[1]})
            if stat.st_size > MAX_VAULT_DATA_SIZE:
                raise errors.ValidationError(name="in", error=_(
                    "Size of data exceeds the limit. Current vault data size "
                    "limit is %(limit)d B")
                    % {'limit': MAX_VAULT_DATA_SIZE})
            data = validated_read('in', input_file, mode='rb')

        else:
            # blank data must be bytes: it is passed to Fernet and
            # base64.b64encode() below, which reject text on Python 3
            data = b''

        if self.api.env.in_server:
            backend = self.api.Backend.ldap2
        else:
            backend = self.api.Backend.rpcclient
        if not backend.isconnected():
            backend.connect()

        # retrieve vault info
        vault = self.api.Command.vault_show(*args, **options)['result']
        vault_type = vault['ipavaulttype'][0]

        if vault_type == u'standard':
            encrypted_key = None

        elif vault_type == u'symmetric':
            # get password
            if password and password_file:
                raise errors.MutuallyExclusiveError(
                    reason=_('Password specified multiple times'))
            elif password:
                pass
            elif password_file:
                password = validated_read('password-file',
                                          password_file,
                                          encoding='utf-8')
                password = password.rstrip('\n')
            else:
                if override_password:
                    password = get_new_password()
                else:
                    password = get_existing_password()

            if not override_password:
                # verify password by retrieving existing data
                opts = options.copy()
                opts['password'] = password
                try:
                    self.api.Command.vault_retrieve(*args, **opts)
                except errors.NotFound:
                    pass

            salt = vault['ipavaultsalt'][0]

            # generate encryption key from vault password
            encryption_key = generate_symmetric_key(password, salt)

            # encrypt data with encryption key
            data = encrypt(data, symmetric_key=encryption_key)

            encrypted_key = None

        elif vault_type == u'asymmetric':
            public_key = vault['ipavaultpublickey'][0].encode('utf-8')

            # generate encryption key
            encryption_key = base64.b64encode(os.urandom(32))

            # encrypt data with encryption key
            data = encrypt(data, symmetric_key=encryption_key)

            # encrypt encryption key with public key
            encrypted_key = encrypt(encryption_key, public_key=public_key)

        else:
            raise errors.ValidationError(
                name='vault_type',
                error=_('Invalid vault type'))

        vault_data = {
            'data': base64.b64encode(data).decode('utf-8')
        }

        if encrypted_key:
            vault_data[u'encrypted_key'] = base64.b64encode(encrypted_key)\
                .decode('utf-8')

        json_vault_data = json.dumps(vault_data).encode('utf-8')

        # generate session key
        algo = self._generate_session_key()

        # wrap vault data
        nonce, wrapped_vault_data = self._wrap_data(algo, json_vault_data)
        options.update(
            nonce=nonce,
            vault_data=wrapped_vault_data
        )

        return self.internal(algo, *args, **options)
# Stand-in Method plugin named 'vault_retrieve_internal'.  vault_retrieve
# compares the registered 'vault_retrieve_internal' plugin against this
# class to decide whether vault_retrieve itself is hidden from the CLI.
@register(no_fail=True)
class _fake_vault_retrieve_internal(Method):
    name = 'vault_retrieve_internal'
    # never exposed on the command line
    NO_CLI = True
# Client-side command that fetches wrapped vault data through
# vault_retrieve_internal, unwraps it with the session key and decrypts it
# according to the vault type.
@register()
class vault_retrieve(ModVaultData):
    __doc__ = _('Retrieve a data from a vault.')

    # options handled on the client only; forward() strips them before
    # calling the server
    takes_options = (
        Str(
            'out?',
            doc=_('File to store retrieved data'),
        ),
        Str(
            'password?',
            cli_name='password',
            doc=_('Vault password'),
        ),
        Str(  # TODO: use File parameter
            'password_file?',
            cli_name='password_file',
            doc=_('File containing the vault password'),
        ),
        Bytes(
            'private_key?',
            cli_name='private_key',
            doc=_('Vault private key'),
        ),
        Str(  # TODO: use File parameter
            'private_key_file?',
            cli_name='private_key_file',
            doc=_('File containing the vault private key'),
        ),
    )

    has_output_params = (
        Bytes(
            'data',
            label=_('Data'),
        ),
    )

    @classmethod
    def __NO_CLI_getter(cls):
        # hidden from the CLI while only the stand-in internal command
        # is registered
        return (api.Command.get_plugin('vault_retrieve_internal') is
                _fake_vault_retrieve_internal)

    NO_CLI = classproperty(__NO_CLI_getter)

    @property
    def api_version(self):
        """API version, taken from vault_retrieve_internal."""
        return self.api.Command.vault_retrieve_internal.api_version

    def get_args(self):
        """Yield the internal command's arguments, then the inherited ones."""
        for arg in self.api.Command.vault_retrieve_internal.args():
            yield arg
        for arg in super(vault_retrieve, self).get_args():
            yield arg

    def get_options(self):
        """Yield the internal command's options, then the inherited ones.

        The internal 'session_key' and 'version' options are left out.
        """
        for option in self.api.Command.vault_retrieve_internal.options():
            if option.name not in ('session_key', 'version'):
                yield option
        for option in super(vault_retrieve, self).get_options():
            yield option

    def get_output_params(self):
        """Yield the internal command's output params, then inherited ones."""
        for param in self.api.Command.vault_retrieve_internal.output_params():
            yield param
        for param in super(vault_retrieve, self).get_output_params():
            yield param

    def _iter_output(self):
        return self.api.Command.vault_retrieve_internal.output()

    def _unwrap_response(self, algo, nonce, vault_data):
        """Decrypt the wrapped vault data and parse it as JSON.

        :param algo: cipher algorithm object holding the session key
        :param nonce: initialization vector used for CBC mode
        :param vault_data: wrapped (encrypted) vault data
        :return: the object decoded from the JSON vault data
        """
        cipher = Cipher(algo, modes.CBC(nonce), backend=default_backend())

        # decrypt
        decryptor = cipher.decryptor()
        padded_data = decryptor.update(vault_data)
        padded_data += decryptor.finalize()

        # remove padding
        unpadder = PKCS7(algo.block_size).unpadder()
        json_vault_data = unpadder.update(padded_data)
        json_vault_data += unpadder.finalize()

        # load JSON
        return json.loads(json_vault_data.decode('utf-8'))

    def forward(self, *args, **options):
        """Retrieve the vault data and decrypt it for the vault type.

        With the 'out' option the decrypted data is written to that file;
        otherwise it replaces the response's 'result' as ``{'data': ...}``.

        :return: the response of vault_retrieve_internal
        :raises errors.MutuallyExclusiveError: when the password or the
            private key is given both directly and through a file
        :raises errors.ValidationError: when an asymmetric vault has no
            private key or the vault type is not recognized
        """
        # don't send these parameters to server
        output_file = options.pop('out', None)
        password = options.pop('password', None)
        password_file = options.pop('password_file', None)
        private_key = options.pop('private_key', None)
        private_key_file = options.pop('private_key_file', None)

        if self.api.env.in_server:
            backend = self.api.Backend.ldap2
        else:
            backend = self.api.Backend.rpcclient
        if not backend.isconnected():
            backend.connect()

        # retrieve vault info
        vault = self.api.Command.vault_show(*args, **options)['result']
        vault_type = vault['ipavaulttype'][0]

        # generate session key
        algo = self._generate_session_key()

        # send retrieval request to server
        response = self.internal(algo, *args, **options)

        # unwrap data with session key
        vault_data = self._unwrap_response(
            algo,
            response['result']['nonce'],
            response['result']['vault_data']
        )
        # the session key is not needed any more
        del algo

        data = base64.b64decode(vault_data[u'data'].encode('utf-8'))

        encrypted_key = None
        if 'encrypted_key' in vault_data:
            encrypted_key = base64.b64decode(vault_data[u'encrypted_key']
                                             .encode('utf-8'))

        if vault_type == u'standard':
            pass

        elif vault_type == u'symmetric':
            salt = vault['ipavaultsalt'][0]

            # get encryption key from vault password
            if password and password_file:
                raise errors.MutuallyExclusiveError(
                    reason=_('Password specified multiple times'))
            elif password:
                pass
            elif password_file:
                password = validated_read('password-file',
                                          password_file,
                                          encoding='utf-8')
                password = password.rstrip('\n')
            else:
                password = get_existing_password()

            # generate encryption key from password
            encryption_key = generate_symmetric_key(password, salt)

            # decrypt data with encryption key
            data = decrypt(data, symmetric_key=encryption_key)

        elif vault_type == u'asymmetric':
            # get encryption key with vault private key
            if private_key and private_key_file:
                raise errors.MutuallyExclusiveError(
                    reason=_('Private key specified multiple times'))
            elif private_key:
                pass
            elif private_key_file:
                private_key = validated_read('private-key-file',
                                             private_key_file,
                                             mode='rb')
            else:
                raise errors.ValidationError(
                    name='private_key',
                    error=_('Missing vault private key'))

            # decrypt encryption key with private key
            encryption_key = decrypt(encrypted_key, private_key=private_key)

            # decrypt data with encryption key
            data = decrypt(data, symmetric_key=encryption_key)

        else:
            raise errors.ValidationError(
                name='vault_type',
                error=_('Invalid vault type'))

        if output_file:
            # the data is bytes, so the file must be opened in binary mode
            with open(output_file, 'wb') as f:
                f.write(data)
        else:
            response['result'] = {'data': data}

        return response