mirror of
https://github.com/pgadmin-org/pgadmin4.git
synced 2025-01-09 23:54:09 -06:00
8857f0d179
1) String literals should not be duplicated. 2) Prefer using an optional chain expression instead, as it's more concise and easier to read. 3) Expected the Promise rejection reason to be an Error.
1163 lines
38 KiB
Python
1163 lines
38 KiB
Python
##########################################################################
|
|
#
|
|
# pgAdmin 4 - PostgreSQL Tools
|
|
#
|
|
# Copyright (C) 2013 - 2024, The pgAdmin Development Team
|
|
# This software is released under the PostgreSQL Licence
|
|
#
|
|
##########################################################################
|
|
|
|
"""Implements File Manager"""
|
|
|
|
import os
|
|
import os.path
|
|
import secrets
|
|
import string
|
|
import time
|
|
from urllib.parse import unquote
|
|
from sys import platform as _platform
|
|
from flask_security import current_user
|
|
from pgadmin.utils.constants import ACCESS_DENIED_MESSAGE, TWO_PARAM_STRING
|
|
import config
|
|
import codecs
|
|
import pathlib
|
|
|
|
import json
|
|
from flask import render_template, Response, session, request as req, \
|
|
url_for, current_app, send_from_directory
|
|
from flask_babel import gettext
|
|
from pgadmin.user_login_check import pga_login_required
|
|
from pgadmin.utils import PgAdminModule
|
|
from pgadmin.utils import get_storage_directory
|
|
from pgadmin.utils.ajax import make_json_response, unauthorized, \
|
|
internal_server_error
|
|
from pgadmin.utils.preferences import Preferences
|
|
from pgadmin.utils.constants import PREF_LABEL_OPTIONS, MIMETYPE_APP_JS, \
|
|
MY_STORAGE
|
|
from pgadmin.settings.utils import get_file_type_setting
|
|
|
|
# Checks if platform is Windows
|
|
if _platform == "win32":
|
|
import ctypes
|
|
oldmode = ctypes.c_uint()
|
|
kernel32 = ctypes.WinDLL('kernel32')
|
|
SEM_FAILCRITICALERRORS = 1
|
|
SEM_NOOPENFILEERRORBOX = 0x8000
|
|
SEM_FAIL = SEM_NOOPENFILEERRORBOX | SEM_FAILCRITICALERRORS
|
|
file_root = ""
|
|
|
|
MODULE_NAME = 'file_manager'
|
|
global transid
|
|
|
|
path_exists = os.path.exists
|
|
split_path = os.path.split
|
|
encode_json = json.JSONEncoder().encode
|
|
|
|
|
|
# utility functions
|
|
# convert bytes type to human readable format
|
|
def sizeof_fmt(num, suffix='B'):
|
|
for unit in ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z']:
|
|
if abs(num) < 1024.0:
|
|
return "%3.1f %s%s" % (num, unit, suffix)
|
|
num /= 1024.0
|
|
return "%.1f %s%s" % (num, 'Y', suffix)
|
|
|
|
|
|
# return size of file
|
|
def getsize(path):
|
|
st = os.stat(path)
|
|
return st.st_size
|
|
|
|
|
|
def getdrivesize(path):
|
|
if _platform == "win32":
|
|
free_bytes = ctypes.c_ulonglong(0)
|
|
ctypes.windll.kernel32.GetDiskFreeSpaceExW(
|
|
ctypes.c_wchar_p(path), None, None, ctypes.pointer(free_bytes))
|
|
return free_bytes.value
|
|
|
|
|
|
# split extension for files
|
|
def splitext(path):
|
|
for ext in ['.tar.gz', '.tar.bz2']:
|
|
if path.endswith(ext):
|
|
path, ext = path[:-len(ext)], path[-len(ext):]
|
|
break
|
|
else:
|
|
path, ext = os.path.splitext(path)
|
|
return ext[1:]
|
|
|
|
|
|
# check if file is hidden in windows platform
|
|
def is_folder_hidden(filepath):
|
|
if _platform == "win32":
|
|
try:
|
|
attrs = ctypes.windll.kernel32.GetFileAttributesW(filepath)
|
|
assert attrs != -1
|
|
result = bool(attrs & 2)
|
|
except (AttributeError, AssertionError):
|
|
result = False
|
|
return result
|
|
else:
|
|
return os.path.basename(filepath).startswith('.')
|
|
|
|
|
|
class FileManagerModule(PgAdminModule):
|
|
"""
|
|
FileManager lists files and folders and does
|
|
following operations:
|
|
- File selection
|
|
- Folder selection
|
|
- Open file
|
|
- Create File
|
|
and also supports:
|
|
- Rename file
|
|
- Delete file
|
|
- Upload file
|
|
- Create folder
|
|
"""
|
|
|
|
LABEL = gettext("Storage")
|
|
|
|
def get_own_menuitems(self):
|
|
return {
|
|
'file_items': []
|
|
}
|
|
|
|
def get_exposed_url_endpoints(self):
|
|
"""
|
|
Returns:
|
|
list: a list of url endpoints exposed to the client.
|
|
"""
|
|
return [
|
|
'file_manager.init',
|
|
'file_manager.filemanager',
|
|
'file_manager.index',
|
|
'file_manager.delete_trans_id',
|
|
'file_manager.save_last_dir',
|
|
'file_manager.save_file_dialog_view',
|
|
'file_manager.save_show_hidden_file_option'
|
|
]
|
|
|
|
def get_file_size_preference(self):
|
|
return self.file_upload_size
|
|
|
|
def register_preferences(self):
|
|
# Register 'file upload size' preference
|
|
self.file_upload_size = self.preference.register(
|
|
'options', 'file_upload_size',
|
|
gettext("Maximum file upload size (MB)"), 'integer', 50,
|
|
category_label=PREF_LABEL_OPTIONS
|
|
)
|
|
self.last_directory_visited = self.preference.register(
|
|
'options', 'last_directory_visited',
|
|
gettext("Last directory visited"), 'text', '/',
|
|
category_label=PREF_LABEL_OPTIONS
|
|
)
|
|
self.last_storage = self.preference.register(
|
|
'options', 'last_storage',
|
|
gettext("Last storage"), 'text', '',
|
|
category_label=PREF_LABEL_OPTIONS,
|
|
hidden=True
|
|
)
|
|
self.file_dialog_view = self.preference.register(
|
|
'options', 'file_dialog_view',
|
|
gettext("File dialog view"), 'select', 'list',
|
|
category_label=PREF_LABEL_OPTIONS,
|
|
options=[{'label': gettext('List'), 'value': 'list'},
|
|
{'label': gettext('Grid'), 'value': 'grid'}],
|
|
control_props={
|
|
'allowClear': False,
|
|
'tags': False
|
|
},
|
|
)
|
|
self.show_hidden_files = self.preference.register(
|
|
'options', 'show_hidden_files',
|
|
gettext("Show hidden files and folders?"), 'boolean', False,
|
|
category_label=PREF_LABEL_OPTIONS
|
|
)
|
|
|
|
|
|
# Initialise the module
|
|
blueprint = FileManagerModule(MODULE_NAME, __name__)
|
|
|
|
|
|
@blueprint.route("/", endpoint='index')
|
|
@pga_login_required
|
|
def index():
|
|
return bad_request(
|
|
errormsg=gettext("This URL cannot be called directly.")
|
|
)
|
|
|
|
|
|
@blueprint.route("/utility.js")
|
|
@pga_login_required
|
|
def utility():
|
|
"""render the required javascript"""
|
|
return Response(response=render_template(
|
|
"file_manager/js/utility.js", _=gettext),
|
|
status=200,
|
|
mimetype=MIMETYPE_APP_JS)
|
|
|
|
|
|
@blueprint.route(
|
|
"/init", methods=["POST"], endpoint='init'
|
|
)
|
|
@pga_login_required
|
|
def init_filemanager():
|
|
if len(req.data) != 0:
|
|
configs = json.loads(req.data)
|
|
trans_id = Filemanager.create_new_transaction(configs)
|
|
data = Filemanager.get_trasaction_selection(trans_id)
|
|
pref = Preferences.module('file_manager')
|
|
file_dialog_view = pref.preference('file_dialog_view').get()
|
|
if isinstance(file_dialog_view, list):
|
|
file_dialog_view = file_dialog_view[0]
|
|
|
|
last_selected_format = get_file_type_setting(data['supported_types'])
|
|
# in some cases, the setting may not match with available types
|
|
if last_selected_format not in data['supported_types']:
|
|
last_selected_format = data['supported_types'][0]
|
|
|
|
res_data = {
|
|
'transId': trans_id,
|
|
"options": {
|
|
"culture": "en",
|
|
"lang": "py",
|
|
"defaultViewMode": file_dialog_view,
|
|
"autoload": True,
|
|
"showFullPath": False,
|
|
"dialog_type": data['dialog_type'],
|
|
"show_hidden_files":
|
|
pref.preference('show_hidden_files').get(),
|
|
"fileRoot": data['fileroot'],
|
|
"capabilities": data['capabilities'],
|
|
"allowed_file_types": data['supported_types'],
|
|
"platform_type": data['platform_type'],
|
|
"show_volumes": data['show_volumes'],
|
|
"homedir": data['homedir'],
|
|
'storage_folder': data['storage_folder'],
|
|
"last_selected_format": last_selected_format
|
|
},
|
|
"security": {
|
|
"uploadPolicy": data['security']['uploadPolicy'],
|
|
"uploadRestrictions": data['security']['uploadRestrictions'],
|
|
},
|
|
"upload": {
|
|
"multiple": data['upload']['multiple'],
|
|
"number": 20,
|
|
"fileSizeLimit": data['upload']['fileSizeLimit'],
|
|
"imagesOnly": False
|
|
}
|
|
}
|
|
|
|
return make_json_response(data=res_data)
|
|
|
|
|
|
@blueprint.route(
|
|
"/delete_trans_id/<int:trans_id>",
|
|
methods=["DELETE"], endpoint='delete_trans_id'
|
|
)
|
|
@pga_login_required
|
|
def delete_trans_id(trans_id):
|
|
Filemanager.release_transaction(trans_id)
|
|
return make_json_response(
|
|
data={'status': True}
|
|
)
|
|
|
|
|
|
@blueprint.route(
|
|
"/save_last_dir/<int:trans_id>", methods=["POST"], endpoint='save_last_dir'
|
|
)
|
|
@pga_login_required
|
|
def save_last_directory_visited(trans_id):
|
|
blueprint.last_directory_visited.set(req.json['path'])
|
|
blueprint.last_storage.set(req.json['storage_folder'])
|
|
return make_json_response(status=200)
|
|
|
|
|
|
@blueprint.route(
|
|
"/save_file_dialog_view/<int:trans_id>", methods=["POST"],
|
|
endpoint='save_file_dialog_view'
|
|
)
|
|
@pga_login_required
|
|
def save_file_dialog_view(trans_id):
|
|
blueprint.file_dialog_view.set(req.json['view'])
|
|
return make_json_response(status=200)
|
|
|
|
|
|
@blueprint.route(
|
|
"/save_show_hidden_file_option/<int:trans_id>", methods=["PUT"],
|
|
endpoint='save_show_hidden_file_option'
|
|
)
|
|
@pga_login_required
|
|
def save_show_hidden_file_option(trans_id):
|
|
blueprint.show_hidden_files.set(req.json['show_hidden'])
|
|
return make_json_response(status=200)
|
|
|
|
|
|
class Filemanager():
|
|
"""FileManager Class."""
|
|
|
|
# Stores list of dict for filename & its encoding
|
|
loaded_file_encoding_list = []
|
|
|
|
ERROR_NOT_ALLOWED = {
|
|
'Error': gettext('Not allowed'),
|
|
'Code': 0
|
|
}
|
|
|
|
def __init__(self, trans_id, ss=''):
|
|
self.trans_id = trans_id
|
|
self.dir = get_storage_directory()
|
|
self.shared_dir = get_storage_directory(shared_storage=ss)
|
|
|
|
if self.dir is not None and isinstance(self.dir, list):
|
|
self.dir = ""
|
|
|
|
@staticmethod
|
|
def get_closest_parent(storage_dir, last_dir):
|
|
"""
|
|
Check if path exists and if not then get closest parent which exists
|
|
:param storage_dir: Base dir
|
|
:param last_dir: check dir
|
|
:return: exist dir
|
|
"""
|
|
if len(last_dir) > 1 and \
|
|
(last_dir.endswith('/') or last_dir.endswith('\\')):
|
|
last_dir = last_dir[:-1]
|
|
while last_dir:
|
|
if os.path.exists(storage_dir or '' + last_dir):
|
|
break
|
|
index = max(last_dir.rfind('\\'), last_dir.rfind('/')) \
|
|
if _platform == 'win32' else last_dir.rfind('/')
|
|
last_dir = last_dir[0:index]
|
|
|
|
if _platform == 'win32':
|
|
if not last_dir.endswith('\\'):
|
|
last_dir += "\\"
|
|
|
|
return last_dir
|
|
|
|
if not last_dir.endswith('/'):
|
|
last_dir += "/"
|
|
|
|
return last_dir
|
|
|
|
@staticmethod
|
|
def create_new_transaction(params):
|
|
"""
|
|
It will also create a unique transaction id and
|
|
store the information into session variable.
|
|
Args:
|
|
capabilities: Allow/Disallow user to perform
|
|
selection, rename, delete etc.
|
|
"""
|
|
|
|
# Define configs for dialog types
|
|
# select file, select folder, create mode
|
|
Filemanager.suspend_windows_warning()
|
|
fm_type = params['dialog_type']
|
|
storage_dir = get_storage_directory()
|
|
|
|
# It is used in utitlity js to decide to
|
|
# show or hide select file type options
|
|
show_volumes = isinstance(storage_dir, list) or not storage_dir
|
|
supp_types = allow_upload_files = params.get('supported_types', [])
|
|
|
|
# tuples with (capabilities, files_only, folders_only, title)
|
|
capability_map = {
|
|
'select_file': (
|
|
['select_file', 'rename', 'upload', 'delete'],
|
|
True,
|
|
False,
|
|
gettext("Select File")
|
|
),
|
|
'select_folder': (
|
|
['select_folder', 'rename', 'create'],
|
|
False,
|
|
True,
|
|
gettext("Select Folder")
|
|
),
|
|
'create_file': (
|
|
['select_file', 'rename', 'create'],
|
|
True,
|
|
False,
|
|
gettext("Create File")
|
|
),
|
|
'storage_dialog': (
|
|
['select_folder', 'select_file', 'download',
|
|
'rename', 'delete', 'upload', 'create'],
|
|
True,
|
|
False,
|
|
gettext("Storage Manager")
|
|
),
|
|
}
|
|
|
|
capabilities, files_only, folders_only, title = capability_map[fm_type]
|
|
|
|
# Using os.path.join to make sure we have trailing '/' or '\'
|
|
homedir = '/' if (config.SERVER_MODE) \
|
|
else os.path.join(os.path.expanduser('~'), '')
|
|
|
|
# get last visited directory, if not present then traverse in reverse
|
|
# order to find closest parent directory
|
|
if 'init_path' in params:
|
|
blueprint.last_directory_visited.get(params['init_path'])
|
|
last_dir = blueprint.last_directory_visited.get()
|
|
last_ss_name = blueprint.last_storage.get()
|
|
if last_ss_name and last_ss_name != MY_STORAGE \
|
|
and len(config.SHARED_STORAGE) > 0:
|
|
selected_dir = [sdir for sdir in config.SHARED_STORAGE if
|
|
sdir['name'] == last_ss_name]
|
|
last_ss = selected_dir[0]['path'] if len(
|
|
selected_dir) == 1 else storage_dir
|
|
else:
|
|
if last_ss_name != MY_STORAGE:
|
|
last_dir = '/'
|
|
blueprint.last_storage.set(MY_STORAGE)
|
|
|
|
last_ss = storage_dir
|
|
|
|
check_dir_exists = False
|
|
if last_dir is None:
|
|
last_dir = "/"
|
|
else:
|
|
check_dir_exists = True
|
|
|
|
if not config.SERVER_MODE and last_dir == "/" or last_dir == "/":
|
|
last_dir = homedir
|
|
|
|
if check_dir_exists:
|
|
last_dir = Filemanager.get_closest_parent(last_ss, last_dir)
|
|
|
|
# create configs using above configs
|
|
configs = {
|
|
"fileroot": last_dir,
|
|
"homedir": homedir,
|
|
'storage_folder': last_ss_name,
|
|
"dialog_type": fm_type,
|
|
"title": title,
|
|
"upload": {
|
|
"multiple": True
|
|
},
|
|
"capabilities": capabilities,
|
|
"security": {
|
|
"uploadPolicy": "",
|
|
"uploadRestrictions": allow_upload_files
|
|
},
|
|
"files_only": files_only,
|
|
"folders_only": folders_only,
|
|
"supported_types": supp_types,
|
|
"platform_type": _platform,
|
|
"show_volumes": show_volumes
|
|
}
|
|
|
|
# Create a unique id for the transaction
|
|
trans_id = str(secrets.choice(range(1, 9999999)))
|
|
|
|
if 'fileManagerData' not in session:
|
|
file_manager_data = dict()
|
|
else:
|
|
file_manager_data = session['fileManagerData']
|
|
|
|
file_upload_size = blueprint.get_file_size_preference().get()
|
|
configs['upload']['fileSizeLimit'] = file_upload_size
|
|
file_manager_data[trans_id] = configs
|
|
session['fileManagerData'] = file_manager_data
|
|
Filemanager.resume_windows_warning()
|
|
|
|
return trans_id
|
|
|
|
@staticmethod
|
|
def get_trasaction_selection(trans_id):
|
|
"""
|
|
This method returns the information of unique transaction
|
|
id from the session variable.
|
|
|
|
Args:
|
|
trans_id: unique transaction id
|
|
"""
|
|
file_manager_data = session['fileManagerData']
|
|
|
|
# Return from the function if transaction id not found
|
|
if str(trans_id) in file_manager_data:
|
|
return file_manager_data[str(trans_id)]
|
|
|
|
@staticmethod
|
|
def release_transaction(trans_id):
|
|
"""
|
|
This method is to remove the information of unique transaction
|
|
id from the session variable.
|
|
|
|
Args:
|
|
trans_id: unique transaction id
|
|
"""
|
|
file_manager_data = session['fileManagerData']
|
|
# Return from the function if transaction id not found
|
|
if str(trans_id) not in file_manager_data:
|
|
return make_json_response(status=200)
|
|
|
|
# Remove the information of unique transaction id
|
|
# from the session variable.
|
|
file_manager_data.pop(str(trans_id), None)
|
|
session['fileManagerData'] = file_manager_data
|
|
|
|
return make_json_response(status=200)
|
|
|
|
@staticmethod
|
|
def _get_drives_with_size(drive_name=None):
|
|
"""
|
|
This is a generic function which returns the default path for storage
|
|
manager dialog irrespective of any Platform type to list all
|
|
files and directories.
|
|
Platform windows:
|
|
if no path is given, it will list volumes, else list directory
|
|
Platform unix:
|
|
it returns path to root directory if no path is specified.
|
|
"""
|
|
def _get_drive_size(path):
|
|
try:
|
|
drive_size = getdrivesize(path)
|
|
return sizeof_fmt(drive_size)
|
|
except Exception:
|
|
return 0
|
|
|
|
if _platform == "win32":
|
|
try:
|
|
drives = []
|
|
bitmask = ctypes.windll.kernel32.GetLogicalDrives()
|
|
for letter in string.ascii_uppercase:
|
|
if bitmask & 1:
|
|
drives.append((letter, _get_drive_size(letter)))
|
|
bitmask >>= 1
|
|
if (drive_name != '' and drive_name is not None and
|
|
drive_name in drives):
|
|
letter = "{0}{1}".format(drive_name, ':')
|
|
return (letter, _get_drive_size(letter))
|
|
else:
|
|
return drives # return drives if no argument is passed
|
|
except Exception:
|
|
return [('C:', _get_drive_size('C:'))]
|
|
else:
|
|
return '/'
|
|
|
|
@staticmethod
|
|
def suspend_windows_warning():
|
|
"""
|
|
Prevents 'there is no disk in drive' waning on windows
|
|
"""
|
|
# StackOverflow Ref: https://goo.gl/9gYdef
|
|
if _platform == "win32":
|
|
kernel32.SetThreadErrorMode(SEM_FAIL, ctypes.byref(oldmode))
|
|
|
|
@staticmethod
|
|
def resume_windows_warning():
|
|
"""
|
|
Resumes waning on windows
|
|
"""
|
|
if _platform == "win32":
|
|
# Resume windows error
|
|
kernel32.SetThreadErrorMode(oldmode, ctypes.byref(oldmode))
|
|
|
|
@staticmethod
|
|
def _skip_file_extension(
|
|
file_type, supported_types, folders_only, file_extension):
|
|
"""
|
|
Used internally by get_files_in_path to check if
|
|
the file extn to be skipped
|
|
"""
|
|
return file_type is not None and file_type != "*" and (
|
|
folders_only or len(supported_types) > 0 and
|
|
file_extension not in supported_types or
|
|
file_type != file_extension)
|
|
|
|
@staticmethod
|
|
def get_files_in_path(
|
|
show_hidden_files, files_only, folders_only, supported_types,
|
|
file_type, user_dir, orig_path):
|
|
"""
|
|
Get list of files and dirs in the path
|
|
:param show_hidden_files: boolean
|
|
:param files_only: boolean
|
|
:param folders_only: boolean
|
|
:param supported_types: array of supported types
|
|
:param file_type: file type
|
|
:param user_dir: base user dir
|
|
:param orig_path: path after user dir
|
|
:return:
|
|
"""
|
|
files = []
|
|
|
|
for f in sorted(os.listdir(orig_path)):
|
|
system_path = os.path.join(os.path.join(orig_path, f))
|
|
|
|
# continue if file/folder is hidden (based on user preference)
|
|
if not show_hidden_files and is_folder_hidden(system_path):
|
|
continue
|
|
try:
|
|
user_path = os.path.join(os.path.join(user_dir, f))
|
|
created = time.ctime(os.path.getctime(system_path))
|
|
modified = time.ctime(os.path.getmtime(system_path))
|
|
file_extension = str(splitext(system_path))
|
|
except Exception:
|
|
continue
|
|
|
|
# set protected to 1 if no write or read permission
|
|
protected = 0
|
|
if (not os.access(system_path, os.R_OK) or
|
|
not os.access(system_path, os.W_OK)):
|
|
protected = 1
|
|
|
|
# list files only or folders only
|
|
if os.path.isdir(system_path):
|
|
if files_only == 'true':
|
|
continue
|
|
file_extension = "dir"
|
|
# filter files based on file_type
|
|
elif Filemanager._skip_file_extension(
|
|
file_type, supported_types, folders_only, file_extension):
|
|
continue
|
|
|
|
# create a list of files and folders
|
|
files.append({
|
|
"Filename": f,
|
|
"Path": user_path,
|
|
"file_type": file_extension,
|
|
"Protected": protected,
|
|
"Properties": {
|
|
"Date Created": created,
|
|
"Date Modified": modified,
|
|
"Size": sizeof_fmt(getsize(system_path))
|
|
}
|
|
})
|
|
|
|
return files
|
|
|
|
@staticmethod
|
|
def list_filesystem(in_dir, path, trans_data, file_type, show_hidden):
|
|
"""
|
|
It lists all file and folders within the given
|
|
directory.
|
|
"""
|
|
Filemanager.suspend_windows_warning()
|
|
is_show_hidden_files = show_hidden
|
|
|
|
path = unquote(path)
|
|
|
|
Filemanager.check_access_permission(in_dir, path)
|
|
Filemanager.resume_windows_warning()
|
|
|
|
files = []
|
|
if (_platform == "win32" and (path == '/' or path == '\\'))\
|
|
and in_dir is None:
|
|
drives = Filemanager._get_drives_with_size()
|
|
for drive, drive_size in drives:
|
|
path = file_name = "{0}:".format(drive)
|
|
files.append({
|
|
"Filename": file_name,
|
|
"Path": path,
|
|
"file_type": 'drive',
|
|
"Protected": 1 if drive_size == 0 else 0,
|
|
"Properties": {
|
|
"Date Created": "",
|
|
"Date Modified": "",
|
|
"Size": drive_size
|
|
}
|
|
})
|
|
Filemanager.resume_windows_warning()
|
|
return files
|
|
|
|
orig_path = Filemanager.get_abs_path(in_dir, path)
|
|
|
|
if not path_exists(orig_path):
|
|
Filemanager.resume_windows_warning()
|
|
return make_json_response(
|
|
status=404,
|
|
errormsg=gettext("'{0}' file does not exist.").format(path))
|
|
|
|
user_dir = path
|
|
folders_only = trans_data.get('folders_only', '')
|
|
files_only = trans_data.get('files_only', '')
|
|
supported_types = trans_data.get('supported_types', [])
|
|
|
|
orig_path = unquote(orig_path)
|
|
try:
|
|
files = Filemanager.get_files_in_path(
|
|
is_show_hidden_files, files_only, folders_only,
|
|
supported_types, file_type, user_dir, orig_path
|
|
)
|
|
except Exception as e:
|
|
Filemanager.resume_windows_warning()
|
|
err_msg = str(e)
|
|
if (hasattr(e, 'strerror') and
|
|
e.strerror == gettext('Permission denied')):
|
|
err_msg = str(e.strerror)
|
|
return unauthorized(err_msg)
|
|
Filemanager.resume_windows_warning()
|
|
return files
|
|
|
|
@staticmethod
|
|
def check_access_permission(in_dir, path, skip_permission_check=False):
|
|
if not config.SERVER_MODE or skip_permission_check:
|
|
return
|
|
|
|
in_dir = '' if in_dir is None else in_dir
|
|
orig_path = Filemanager.get_abs_path(in_dir, path)
|
|
|
|
# This translates path with relative path notations
|
|
# like ./ and ../ to absolute path.
|
|
orig_path = os.path.abspath(orig_path)
|
|
|
|
if in_dir:
|
|
if _platform == 'win32':
|
|
if in_dir[-1] == '\\' or in_dir[-1] == '/':
|
|
in_dir = in_dir[:-1]
|
|
else:
|
|
if in_dir[-1] == '/':
|
|
in_dir = in_dir[:-1]
|
|
|
|
# Do not allow user to access outside his storage dir
|
|
# in server mode.
|
|
try:
|
|
pathlib.Path(orig_path).relative_to(in_dir)
|
|
except ValueError:
|
|
raise PermissionError(gettext("Access denied ({0})").format(path))
|
|
|
|
@staticmethod
|
|
def get_abs_path(in_dir, path):
|
|
|
|
if (path.startswith('\\\\') and _platform == 'win32')\
|
|
or config.SERVER_MODE is False or in_dir is None:
|
|
return "{}".format(path)
|
|
|
|
if path == '/' or path == '\\':
|
|
if _platform == 'win32':
|
|
if in_dir.endswith('\\') or in_dir.endswith('/'):
|
|
return "{}".format(in_dir)
|
|
else:
|
|
return "{}{}".format(in_dir, '\\')
|
|
else:
|
|
if in_dir.endswith('/'):
|
|
return "{}".format(in_dir)
|
|
else:
|
|
return "{}{}".format(in_dir, '/')
|
|
|
|
if in_dir.endswith('/') or in_dir.endswith('\\'):
|
|
if path.startswith('/') or path.startswith('\\'):
|
|
return "{}{}".format(in_dir[:-1], path)
|
|
else:
|
|
return TWO_PARAM_STRING.format(in_dir, path)
|
|
else:
|
|
if path.startswith('/') or path.startswith('\\'):
|
|
return "{}{}".format(in_dir, path)
|
|
else:
|
|
return TWO_PARAM_STRING.format(in_dir, path)
|
|
|
|
def validate_request(self, capability):
|
|
"""
|
|
It validates the capability with the capabilities
|
|
stored in the session
|
|
"""
|
|
trans_data = Filemanager.get_trasaction_selection(self.trans_id)
|
|
return False if capability not in trans_data['capabilities'] else True
|
|
|
|
def getfolder(self, path=None, file_type="", show_hidden=False):
|
|
"""
|
|
Returns files and folders in give path
|
|
"""
|
|
trans_data = Filemanager.get_trasaction_selection(self.trans_id)
|
|
the_dir = None
|
|
if config.SERVER_MODE:
|
|
if self.shared_dir and len(config.SHARED_STORAGE) > 0:
|
|
the_dir = self.shared_dir
|
|
else:
|
|
the_dir = self.dir
|
|
|
|
if the_dir is not None and not the_dir.endswith('/'):
|
|
the_dir += '/'
|
|
|
|
filelist = self.list_filesystem(
|
|
the_dir, path, trans_data, file_type, show_hidden)
|
|
return filelist
|
|
|
|
def check_access(self, ss):
|
|
if self.shared_dir:
|
|
selected_dir_list = [sdir for sdir in config.SHARED_STORAGE if
|
|
sdir['name'] == ss]
|
|
selected_dir = selected_dir_list[0] if len(
|
|
selected_dir_list) == 1 else None
|
|
|
|
if selected_dir and selected_dir['restricted_access'] and \
|
|
not current_user.has_role("Administrator"):
|
|
raise PermissionError(ACCESS_DENIED_MESSAGE)
|
|
|
|
def rename(self, old=None, new=None):
|
|
"""
|
|
Rename file or folder
|
|
"""
|
|
if not self.validate_request('rename'):
|
|
return unauthorized(self.ERROR_NOT_ALLOWED['Error'])
|
|
|
|
if self.shared_dir:
|
|
the_dir = self.shared_dir
|
|
else:
|
|
the_dir = self.dir if self.dir is not None else ''
|
|
|
|
Filemanager.check_access_permission(the_dir, old)
|
|
Filemanager.check_access_permission(the_dir, new)
|
|
|
|
# check if it's dir
|
|
if old[-1] == '/':
|
|
old = old[:-1]
|
|
|
|
# extract filename
|
|
oldname = split_path(old)[-1]
|
|
path = old
|
|
path = split_path(path)[0] # extract path
|
|
|
|
if path[-1] != '/':
|
|
path += '/'
|
|
|
|
newname = new
|
|
newpath = path + newname
|
|
|
|
# make system old path
|
|
oldpath_sys = "{0}{1}".format(the_dir, old)
|
|
newpath_sys = "{0}{1}".format(the_dir, newpath)
|
|
|
|
try:
|
|
os.rename(oldpath_sys, newpath_sys)
|
|
except OSError as e:
|
|
return internal_server_error("{0} {1}".format(
|
|
gettext('There was an error renaming the file:'), e.strerror))
|
|
|
|
return {
|
|
'Old Path': old,
|
|
'Old Name': oldname,
|
|
'New Path': newpath,
|
|
'New Name': newname,
|
|
}
|
|
|
|
def delete(self, path=None):
|
|
"""
|
|
Delete file or folder
|
|
"""
|
|
if not self.validate_request('delete'):
|
|
return unauthorized(self.ERROR_NOT_ALLOWED['Error'])
|
|
if self.shared_dir:
|
|
the_dir = self.shared_dir
|
|
else:
|
|
the_dir = self.dir if self.dir is not None else ''
|
|
orig_path = "{0}{1}".format(the_dir, path)
|
|
|
|
Filemanager.check_access_permission(the_dir, path)
|
|
|
|
try:
|
|
if os.path.isdir(orig_path):
|
|
os.rmdir(orig_path)
|
|
else:
|
|
os.remove(orig_path)
|
|
except OSError as e:
|
|
return internal_server_error("{0} {1}".format(
|
|
gettext('There was an error deleting the file:'), e.strerror))
|
|
|
|
return make_json_response(status=200)
|
|
|
|
def add(self, req=None):
|
|
"""
|
|
File upload functionality
|
|
"""
|
|
if not self.validate_request('upload'):
|
|
return unauthorized(self.ERROR_NOT_ALLOWED['Error'])
|
|
|
|
if self.shared_dir:
|
|
the_dir = self.shared_dir
|
|
else:
|
|
the_dir = self.dir if self.dir is not None else ''
|
|
|
|
try:
|
|
path = req.form.get('currentpath')
|
|
|
|
file_obj = req.files['newfile']
|
|
file_name = file_obj.filename
|
|
orig_path = "{0}{1}".format(the_dir, path)
|
|
new_name = "{0}{1}".format(orig_path, file_name)
|
|
|
|
try:
|
|
# Check if the new file is inside the users directory
|
|
if config.SERVER_MODE:
|
|
pathlib.Path(
|
|
os.path.abspath(
|
|
os.path.join(the_dir, new_name)
|
|
)
|
|
).relative_to(the_dir)
|
|
except ValueError:
|
|
return unauthorized(self.ERROR_NOT_ALLOWED['Error'])
|
|
|
|
with open(new_name, 'wb') as f:
|
|
while True:
|
|
# 4MB chunk (4 * 1024 * 1024 Bytes)
|
|
data = file_obj.read(4194304)
|
|
if not data:
|
|
break
|
|
f.write(data)
|
|
except OSError as e:
|
|
return internal_server_error("{0} {1}".format(
|
|
gettext('There was an error adding the file:'), e.strerror))
|
|
|
|
Filemanager.check_access_permission(the_dir, path)
|
|
|
|
return {
|
|
'Path': path,
|
|
'Name': new_name,
|
|
}
|
|
|
|
def is_file_exist(self, path, name):
|
|
"""
|
|
Checks whether given file exists or not
|
|
"""
|
|
the_dir = self.dir if self.dir is not None else ''
|
|
code = 1
|
|
|
|
name = unquote(name)
|
|
path = unquote(path)
|
|
|
|
orig_path = "{0}{1}".format(the_dir, path)
|
|
Filemanager.check_access_permission(
|
|
the_dir, "{}{}".format(path, name))
|
|
|
|
new_name = "{0}{1}".format(orig_path, name)
|
|
if not os.path.exists(new_name):
|
|
code = 0
|
|
|
|
return {
|
|
'Path': path,
|
|
'Name': name,
|
|
'Code': code,
|
|
}
|
|
|
|
@staticmethod
|
|
def get_new_name(in_dir, path, name):
|
|
"""
|
|
Utility to provide new name for folder if file
|
|
with same name already exists
|
|
"""
|
|
new_name = name
|
|
count = 0
|
|
while True:
|
|
if not (path.endswith("/") or name.startswith("/")):
|
|
path = path + "/"
|
|
file_path = "{}{}/".format(path, new_name)
|
|
create_path = file_path
|
|
if in_dir != "":
|
|
create_path = TWO_PARAM_STRING.format(in_dir, file_path)
|
|
|
|
if not path_exists(create_path):
|
|
return create_path, file_path, new_name
|
|
else:
|
|
count += 1
|
|
new_name = "{}_{}".format(name, count)
|
|
|
|
@staticmethod
|
|
def check_file_for_bom_and_binary(filename, enc="utf-8"):
|
|
"""
|
|
This utility function will check if file is Binary file
|
|
and/or if it startswith BOM character
|
|
|
|
Args:
|
|
filename: File
|
|
enc: Encoding for the file
|
|
|
|
Returns:
|
|
Status(Error?), Error message, Binary file flag,
|
|
BOM character flag and Encoding to open file
|
|
"""
|
|
status = True
|
|
err_msg = None
|
|
is_startswith_bom = False
|
|
is_binary = False
|
|
|
|
# check if file type is text or binary
|
|
text_chars = bytearray([7, 8, 9, 10, 12, 13, 27]) \
|
|
+ bytearray(range(0x20, 0x7f)) \
|
|
+ bytearray(range(0x80, 0x100))
|
|
|
|
def is_binary_string(bytes_data):
|
|
"""Checks if string data is binary"""
|
|
return bool(
|
|
bytes_data.translate(None, text_chars)
|
|
)
|
|
|
|
# read the file
|
|
try:
|
|
|
|
with open(filename, 'rb') as f:
|
|
file_data = f.read(1024)
|
|
|
|
# Check for BOM in file data
|
|
for encoding, boms in \
|
|
('utf-8-sig', (codecs.BOM_UTF8,)), \
|
|
('utf-16', (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE)), \
|
|
('utf-32', (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE)):
|
|
if any(file_data.startswith(bom) for bom in boms):
|
|
is_startswith_bom = True
|
|
enc = encoding
|
|
|
|
# No need to check for binary file, a BOM marker already
|
|
# indicates that text stream afterwards
|
|
if not is_startswith_bom:
|
|
# Check if string is binary
|
|
is_binary = is_binary_string(file_data)
|
|
|
|
# Store encoding for future use
|
|
Filemanager.loaded_file_encoding_list.\
|
|
append({os.path.basename(filename): enc})
|
|
|
|
except IOError as ex:
|
|
# we don't want to expose real path of file
|
|
# so only show error message.
|
|
if ex.strerror == 'Permission denied':
|
|
return unauthorized(str(ex.strerror))
|
|
else:
|
|
return internal_server_error(str(ex.strerror))
|
|
|
|
except Exception as ex:
|
|
return internal_server_error(str(ex.strerror))
|
|
|
|
# Remove root storage path from error message
|
|
# when running in Server mode
|
|
if not status and not current_app.PGADMIN_RUNTIME:
|
|
storage_directory = get_storage_directory()
|
|
if storage_directory:
|
|
err_msg = err_msg.replace(storage_directory, '')
|
|
|
|
return status, err_msg, is_binary, is_startswith_bom, enc
|
|
|
|
def addfolder(self, path, name):
|
|
"""
|
|
Functionality to create new folder
|
|
"""
|
|
if not self.validate_request('create'):
|
|
return unauthorized(self.ERROR_NOT_ALLOWED['Error'])
|
|
|
|
if self.shared_dir and len(config.SHARED_STORAGE) > 0:
|
|
user_dir = self.shared_dir
|
|
else:
|
|
user_dir = self.dir if self.dir is not None else ''
|
|
|
|
Filemanager.check_access_permission(user_dir, "{}{}".format(
|
|
path, name))
|
|
|
|
create_path, new_path, new_name = \
|
|
self.get_new_name(user_dir, path, name)
|
|
try:
|
|
os.mkdir(create_path)
|
|
except OSError as e:
|
|
return internal_server_error(str(e.strerror))
|
|
|
|
result = {
|
|
'Parent': path,
|
|
'Path': new_path,
|
|
'Name': new_name,
|
|
'Date Modified': time.ctime(time.time())
|
|
}
|
|
|
|
return result
|
|
|
|
def download(self, path=None):
|
|
"""
|
|
Functionality to download file
|
|
"""
|
|
if not self.validate_request('download'):
|
|
return unauthorized(self.ERROR_NOT_ALLOWED['Error'])
|
|
|
|
if self.shared_dir and len(config.SHARED_STORAGE) > 0:
|
|
the_dir = self.shared_dir
|
|
else:
|
|
the_dir = self.dir if self.dir is not None else ''
|
|
|
|
orig_path = "{0}{1}".format(the_dir, path)
|
|
|
|
Filemanager.check_access_permission(
|
|
the_dir, "{}{}".format(path, path)
|
|
)
|
|
|
|
filename = os.path.basename(path)
|
|
if orig_path and len(orig_path) > 0:
|
|
dir_path = os.path.dirname(orig_path)
|
|
else:
|
|
dir_path = os.path.dirname(path)
|
|
|
|
response = send_from_directory(dir_path, filename,
|
|
mimetype='application/octet-stream',
|
|
as_attachment=True)
|
|
response.headers["filename"] = filename
|
|
|
|
return response
|
|
|
|
def permission(self, path=None):
|
|
the_dir = self.dir if self.dir is not None else ''
|
|
res = {'Code': 1}
|
|
Filemanager.check_access_permission(the_dir, path)
|
|
return res
|
|
|
|
|
|
@blueprint.route(
|
|
"/filemanager/<int:trans_id>/",
|
|
methods=["POST"], endpoint='filemanager'
|
|
)
|
|
@pga_login_required
|
|
def file_manager(trans_id):
|
|
"""
|
|
It is the common function for every call which is made
|
|
and takes function name from post request and calls it.
|
|
It gets unique transaction id from post request and
|
|
rotate it into Filemanager class.
|
|
"""
|
|
mode = ''
|
|
kwargs = {}
|
|
if req.method == 'POST':
|
|
if req.files:
|
|
mode = 'add'
|
|
kwargs = {'req': req,
|
|
'storage_folder': req.form.get('storage_folder', None)}
|
|
else:
|
|
kwargs = json.loads(req.data)
|
|
kwargs['req'] = req
|
|
mode = kwargs['mode']
|
|
del kwargs['mode']
|
|
elif req.method == 'GET':
|
|
kwargs = {
|
|
'path': req.args['path'],
|
|
'name': req.args['name'] if 'name' in req.args else ''
|
|
}
|
|
mode = req.args['mode']
|
|
ss = kwargs['storage_folder'] if 'storage_folder' in kwargs else None
|
|
my_fm = Filemanager(trans_id, ss)
|
|
|
|
if ss and mode in ['upload', 'rename', 'delete', 'addfolder', 'add',
|
|
'permission']:
|
|
my_fm.check_access(ss)
|
|
func = getattr(my_fm, mode)
|
|
try:
|
|
if mode in ['getfolder', 'download']:
|
|
kwargs.pop('name', None)
|
|
|
|
if mode in ['add']:
|
|
kwargs.pop('storage_folder', None)
|
|
|
|
if mode in ['addfolder', 'getfolder', 'rename', 'delete',
|
|
'is_file_exist', 'req', 'permission', 'download']:
|
|
kwargs.pop('req', None)
|
|
kwargs.pop('storage_folder', None)
|
|
|
|
res = func(**kwargs)
|
|
except PermissionError as e:
|
|
return unauthorized(str(e))
|
|
|
|
if isinstance(res, Response):
|
|
return res
|
|
return make_json_response(data={'result': res, 'status': True})
|