pgadmin4/web/pgadmin/misc/cloud/azure/azure_cache.py
2022-06-28 11:28:55 +05:30

140 lines
5.5 KiB
Python

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import logging
import os
import sys
from typing import TYPE_CHECKING
import config
import six
if TYPE_CHECKING:
from typing import Any
import msal_extensions
_LOGGER = logging.getLogger(__name__)
class TokenCachePersistenceOptions(object):
"""Options for persistent token caching.
Most credentials accept an instance of this class to configure persistent
token caching. The default values configure a credential to use a cache
shared with Microsoft developer tools and
:class:`~azure.identity.SharedTokenCacheCredential`.
To isolate a credential's data from other applications,
specify a `name` for the cache.
By default, the cache is encrypted with the current platform's user data
protection API, and will raise an error when this is not available.
To configure the cache to fall back to an unencrypted file instead
of raising an error, specify `allow_unencrypted_storage=True`.
.. warning:: The cache contains authentication secrets. If the cache is
not encrypted, protecting it is the application's responsibility.
A breach of its contents will fully compromise accounts.
.. literalinclude:: ../tests/test_persistent_cache.py
:start-after: [START snippet]
:end-before: [END snippet]
:language: python
:caption: Configuring a credential for persistent caching
:dedent: 8
:keyword str name: name of the cache, used to isolate its data from other
applications. Defaults to the name of the cache shared by Microsoft
dev tools and :class:`~azure.identity.SharedTokenCacheCredential`.
:keyword bool allow_unencrypted_storage: whether the cache should fall
back to storing its data in plain text when
encryption isn't possible. False by default. Setting this to
True does not disable encryption. The cache will
always try to encrypt its data.
"""
def __init__(self, **kwargs):
# type: (**Any) -> None
self.allow_unencrypted_storage = \
kwargs.get("allow_unencrypted_storage", False)
self.name = kwargs.get("name", "msal.cache")
self.cache_location = kwargs.get("cache_location", None)
def load_persistent_cache(options):
# type:
# (TokenCachePersistenceOptions) -> msal_extensions.PersistedTokenCache
import msal_extensions
persistence = _get_persistence(
allow_unencrypted=options.allow_unencrypted_storage,
account_name="MSALCache",
cache_name=options.name,
cache_location=options.cache_location
)
return msal_extensions.PersistedTokenCache(persistence)
def _get_persistence(
allow_unencrypted, account_name, cache_name, cache_location):
"""Get an msal_extensions persistence instance for the current platform.
On Windows the cache is a file protected by the Data Protection API.
On Linux and macOS the cache is stored by
libsecret and Keychain, respectively. On those platforms the cache uses
the modified timestamp of a file on disk to
decide whether to reload the cache.
:param bool allow_unencrypted: when True, the cache will be kept in
plaintext should encryption be impossible in the
current environment
"""
import msal_extensions
if cache_location is None:
cache_location = config.AZURE_CREDENTIAL_CACHE_DIR + '/'
cache_file_path = os.path.join(cache_location, cache_name)
if sys.platform.startswith("win") and "LOCALAPPDATA" in os.environ:
return \
msal_extensions.FilePersistenceWithDataProtection(cache_file_path)
if sys.platform.startswith("darwin"):
# the cache uses this file's modified timestamp
# to decide whether to reload
return msal_extensions.KeychainPersistence(
cache_file_path,
"Microsoft.Developer.IdentityService",
account_name)
if sys.platform.startswith("linux"):
# The cache uses this file's modified timestamp to decide whether
# to reload. Note this path is the same as that of the plaintext
# fallback: a new encrypted cache will stomp an unencrypted cache.
try:
return msal_extensions.LibsecretPersistence(
cache_location, cache_name,
{"MsalClientID": "Microsoft.Developer.IdentityService"},
label=account_name
)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.debug(
'msal-extensions is unable to encrypt '
'a persistent cache: "%s"', ex, exc_info=True)
if not allow_unencrypted:
error = ValueError(
"Cache encryption is impossible because libsecret "
"dependencies are not installed or are unusable,"
" for example because no display is available "
"(as in an SSH session). The chained exception has"
' more information. Specify '
'"allow_unencrypted_storage=True" to store'
' the cache unencrypted'
" instead of raising this exception."
)
six.raise_from(error, ex)
return msal_extensions.FilePersistence(cache_file_path)
raise NotImplementedError("A persistent cache is not "
"available in this environment.")