pgadmin4/web/pgadmin/utils/paths.py
2024-01-01 14:13:48 +05:30

179 lines
5.1 KiB
Python

##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2024, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
#########################################################################
"""This file contains functions fetching different utility paths."""
import os
from flask import current_app, url_for
from flask_security import current_user
from werkzeug.exceptions import InternalServerError
from pgadmin.utils.constants import MY_STORAGE
from pgadmin.model import User
PGADMIN_PATH = '~/.pgadmin/'
def preprocess_username(un):
ret_un = un
if len(ret_un) == 0 or ret_un[0].isdigit():
ret_un = 'pga_user_' + un
ret_un = ret_un.replace('@', '_') \
.replace('/', 'slash') \
.replace('\\', 'slash')
return ret_un
def get_storage_directory(user=current_user, shared_storage=''):
# Don't move this import statement to the top of the file,
# it throws circular import error.
import config
if config.SERVER_MODE is not True:
return None
is_shared_storage = False
if shared_storage != MY_STORAGE and shared_storage:
is_shared_storage = True
selected_dir = [sdir for sdir in config.SHARED_STORAGE if
sdir['name'] == shared_storage]
storage_dir = None
if len(selected_dir) > 0:
the_dir = selected_dir[0]['path']
storage_dir = the_dir
else:
storage_dir = getattr(
config, 'STORAGE_DIR',
os.path.join(
os.path.realpath(
os.path.expanduser(PGADMIN_PATH)
), 'storage'
)
)
if storage_dir is None:
return None
username = preprocess_username(user.username.split('@')[0])
# Figure out the old-style storage directory name
old_storage_dir = os.path.join(
storage_dir.decode('utf-8') if hasattr(storage_dir, 'decode')
else storage_dir,
username
)
username = preprocess_username(user.username)
if is_shared_storage:
# Figure out the new style storage directory name
storage_dir = os.path.join(
storage_dir.decode('utf-8') if hasattr(storage_dir, 'decode')
else storage_dir
)
else:
# Figure out the new style storage directory name
storage_dir = os.path.join(
storage_dir.decode('utf-8') if hasattr(storage_dir, 'decode')
else storage_dir,
username
)
# Rename an old-style storage directory, if the new style doesn't exist
if os.path.exists(old_storage_dir) and not os.path.exists(storage_dir):
current_app.logger.warning(
'Renaming storage directory %s to %s.',
old_storage_dir, storage_dir
)
os.rename(old_storage_dir, storage_dir)
if not os.path.exists(storage_dir):
os.makedirs(storage_dir, int('700', 8))
return storage_dir
def init_app():
# Don't move this import statement to the top of the file,
# it throws circular import error.
import config
if config.SERVER_MODE is not True:
return None
storage_dir = getattr(
config, 'STORAGE_DIR',
os.path.join(
os.path.realpath(
os.path.expanduser(PGADMIN_PATH)
), 'storage'
)
)
if storage_dir and not os.path.isdir(storage_dir):
if os.path.exists(storage_dir):
raise InternalServerError(
'The path specified for the storage directory is not a '
'directory.'
)
os.makedirs(storage_dir, int('700', 8))
if storage_dir and not os.access(storage_dir, os.W_OK | os.R_OK):
raise InternalServerError(
'The user does not have permission to read and write to the '
'specified storage directory.'
)
def get_cookie_path():
cookie_root_path = '/'
pgadmin_root_path = url_for('browser.index')
if pgadmin_root_path != '/browser/':
cookie_root_path = pgadmin_root_path.replace(
'/browser/', ''
)
return cookie_root_path
def create_users_storage_directory():
"""
This function is used to iterate through all the users and
create users directory if not already created.
"""
# Don't move this import statement to the top of the file,
# it throws circular import error.
import config
if not config.SERVER_MODE:
return None
users = User.query.all()
for usr in users:
username = preprocess_username(usr.username)
storage_dir = getattr(
config, 'STORAGE_DIR',
os.path.join(
os.path.realpath(
os.path.expanduser(PGADMIN_PATH)
), 'storage'
)
)
if storage_dir is None:
return None
storage_dir = os.path.join(
storage_dir.decode('utf-8') if hasattr(storage_dir, 'decode')
else storage_dir, username
)
if not os.path.exists(storage_dir):
os.makedirs(storage_dir, int('700', 8))