mirror of
https://github.com/pgadmin-org/pgadmin4.git
synced 2025-01-24 23:36:48 -06:00
Fixed cognitive complexity issues reported by SonarQube.
This commit is contained in:
parent
43d97b0e6f
commit
f3462e36a4
@ -95,7 +95,8 @@ def is_folder_hidden(filepath):
|
||||
except (AttributeError, AssertionError):
|
||||
result = False
|
||||
return result
|
||||
return False
|
||||
else:
|
||||
return os.path.basename(filepath).startswith('.')
|
||||
|
||||
|
||||
class FileManagerModule(PgAdminModule):
|
||||
@ -338,6 +339,35 @@ class Filemanager(object):
|
||||
if self.dir is not None and isinstance(self.dir, list):
|
||||
self.dir = ""
|
||||
|
||||
@staticmethod
|
||||
def get_closest_parent(storage_dir, last_dir):
|
||||
"""
|
||||
Check if path exists and if not then get closest parent which exists
|
||||
:param storage_dir: Base dir
|
||||
:param last_dir: check dir
|
||||
:return: exist dir
|
||||
"""
|
||||
if len(last_dir) > 1 and \
|
||||
(last_dir.endswith('/') or last_dir.endswith('\\')):
|
||||
last_dir = last_dir[:-1]
|
||||
while last_dir:
|
||||
if os.path.exists(storage_dir or '' + last_dir):
|
||||
break
|
||||
index = max(last_dir.rfind('\\'), last_dir.rfind('/')) \
|
||||
if _platform == 'win32' else last_dir.rfind('/')
|
||||
last_dir = last_dir[0:index]
|
||||
|
||||
if _platform == 'win32':
|
||||
if not last_dir.endswith('\\'):
|
||||
last_dir += "\\"
|
||||
|
||||
return last_dir
|
||||
|
||||
if not last_dir.endswith('/'):
|
||||
last_dir += "/"
|
||||
|
||||
return last_dir
|
||||
|
||||
@staticmethod
|
||||
def create_new_transaction(params):
|
||||
"""
|
||||
@ -357,29 +387,38 @@ class Filemanager(object):
|
||||
# It is used in utitlity js to decide to
|
||||
# show or hide select file type options
|
||||
show_volumes = isinstance(storage_dir, list) or not storage_dir
|
||||
supp_types = allow_upload_files = params['supported_types'] \
|
||||
if 'supported_types' in params else []
|
||||
if fm_type == 'select_file':
|
||||
capabilities = ['select_file', 'rename', 'upload', 'create']
|
||||
files_only = True
|
||||
folders_only = False
|
||||
title = gettext("Select File")
|
||||
elif fm_type == 'select_folder':
|
||||
capabilities = ['select_folder', 'rename', 'create']
|
||||
files_only = False
|
||||
folders_only = True
|
||||
title = gettext("Select Folder")
|
||||
elif fm_type == 'create_file':
|
||||
capabilities = ['select_file', 'rename', 'create']
|
||||
files_only = True
|
||||
folders_only = False
|
||||
title = gettext("Create File")
|
||||
elif fm_type == 'storage_dialog':
|
||||
capabilities = ['select_folder', 'select_file', 'download',
|
||||
'rename', 'delete', 'upload', 'create']
|
||||
files_only = True
|
||||
folders_only = False
|
||||
title = gettext("Storage Manager")
|
||||
supp_types = allow_upload_files = params.get('supported_types', [])
|
||||
|
||||
# tuples with (capabilities, files_only, folders_only, title)
|
||||
capability_map = {
|
||||
'select_file': (
|
||||
['select_file', 'rename', 'upload', 'create'],
|
||||
True,
|
||||
False,
|
||||
gettext("Select File")
|
||||
),
|
||||
'select_folder': (
|
||||
['select_folder', 'rename', 'create'],
|
||||
False,
|
||||
True,
|
||||
gettext("Select Folder")
|
||||
),
|
||||
'create_file': (
|
||||
['select_file', 'rename', 'create'],
|
||||
True,
|
||||
False,
|
||||
gettext("Create File")
|
||||
),
|
||||
'storage_dialog': (
|
||||
['select_folder', 'select_file', 'download',
|
||||
'rename', 'delete', 'upload', 'create'],
|
||||
True,
|
||||
False,
|
||||
gettext("Storage Manager")
|
||||
),
|
||||
}
|
||||
|
||||
capabilities, files_only, folders_only, title = capability_map[fm_type]
|
||||
|
||||
# Using os.path.join to make sure we have trailing '/' or '\'
|
||||
homedir = '/' if (config.SERVER_MODE) \
|
||||
@ -389,43 +428,16 @@ class Filemanager(object):
|
||||
# order to find closest parent directory
|
||||
last_dir = blueprint.last_directory_visited.get()
|
||||
check_dir_exists = False
|
||||
if storage_dir is None:
|
||||
if last_dir is None:
|
||||
last_dir = "/"
|
||||
else:
|
||||
check_dir_exists = True
|
||||
if last_dir is None:
|
||||
last_dir = "/"
|
||||
else:
|
||||
if last_dir is not None:
|
||||
check_dir_exists = True
|
||||
else:
|
||||
last_dir = "/"
|
||||
check_dir_exists = True
|
||||
|
||||
if not config.SERVER_MODE and last_dir == "/" or last_dir == "/":
|
||||
last_dir = homedir
|
||||
|
||||
if check_dir_exists:
|
||||
if len(last_dir) > 1 and \
|
||||
(last_dir.endswith('/') or last_dir.endswith('\\')):
|
||||
last_dir = last_dir[:-1]
|
||||
while last_dir:
|
||||
if os.path.exists(
|
||||
storage_dir
|
||||
if storage_dir is not None else '' + last_dir):
|
||||
break
|
||||
if _platform == 'win32':
|
||||
index = max(last_dir.rfind('\\'), last_dir.rfind('/'))
|
||||
else:
|
||||
index = last_dir.rfind('/')
|
||||
last_dir = last_dir[0:index]
|
||||
if not last_dir:
|
||||
last_dir = "/"
|
||||
|
||||
if _platform == 'win32':
|
||||
if not (last_dir.endswith('\\') or last_dir.endswith('/')):
|
||||
last_dir += "\\"
|
||||
else:
|
||||
if not last_dir.endswith('/'):
|
||||
last_dir += "/"
|
||||
last_dir = Filemanager.get_closest_parent(storage_dir, last_dir)
|
||||
|
||||
# create configs using above configs
|
||||
configs = {
|
||||
@ -502,7 +514,7 @@ class Filemanager(object):
|
||||
return make_json_response(data={'status': True})
|
||||
|
||||
@staticmethod
|
||||
def _get_drives(drive_name=None):
|
||||
def _get_drives_with_size(drive_name=None):
|
||||
"""
|
||||
This is a generic function which returns the default path for storage
|
||||
manager dialog irrespective of any Platform type to list all
|
||||
@ -512,21 +524,29 @@ class Filemanager(object):
|
||||
Platform unix:
|
||||
it returns path to root directory if no path is specified.
|
||||
"""
|
||||
def _get_drive_size(path):
|
||||
try:
|
||||
drive_size = getdrivesize(path)
|
||||
return sizeof_fmt(drive_size)
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
if _platform == "win32":
|
||||
try:
|
||||
drives = []
|
||||
bitmask = ctypes.windll.kernel32.GetLogicalDrives()
|
||||
for letter in string.ascii_uppercase:
|
||||
if bitmask & 1:
|
||||
drives.append(letter)
|
||||
drives.append((letter, _get_drive_size(letter)))
|
||||
bitmask >>= 1
|
||||
if (drive_name != '' and drive_name is not None and
|
||||
drive_name in drives):
|
||||
return "{0}{1}".format(drive_name, ':')
|
||||
letter = "{0}{1}".format(drive_name, ':')
|
||||
return (letter, _get_drive_size(letter))
|
||||
else:
|
||||
return drives # return drives if no argument is passed
|
||||
except Exception:
|
||||
return ['C:']
|
||||
return [('C:', _get_drive_size('C:'))]
|
||||
else:
|
||||
return '/'
|
||||
|
||||
@ -548,6 +568,79 @@ class Filemanager(object):
|
||||
# Resume windows error
|
||||
kernel32.SetThreadErrorMode(oldmode, ctypes.byref(oldmode))
|
||||
|
||||
@staticmethod
|
||||
def _skip_file_extension(
|
||||
file_type, supported_types, folders_only, file_extension):
|
||||
"""
|
||||
Used internally by get_files_in_path to check if
|
||||
the file extn to be skipped
|
||||
"""
|
||||
return file_type is not None and file_type != "*" and (
|
||||
folders_only or len(supported_types) > 0 and
|
||||
file_extension not in supported_types or
|
||||
file_type != file_extension)
|
||||
|
||||
@staticmethod
|
||||
def get_files_in_path(
|
||||
show_hidden_files, files_only, folders_only, supported_types,
|
||||
file_type, user_dir, orig_path):
|
||||
"""
|
||||
Get list of files and dirs in the path
|
||||
:param show_hidden_files: boolean
|
||||
:param files_only: boolean
|
||||
:param folders_only: boolean
|
||||
:param supported_types: array of supported types
|
||||
:param file_type: file type
|
||||
:param user_dir: base user dir
|
||||
:param orig_path: path after user dir
|
||||
:return:
|
||||
"""
|
||||
files = {}
|
||||
|
||||
for f in sorted(os.listdir(orig_path)):
|
||||
system_path = os.path.join(os.path.join(orig_path, f))
|
||||
|
||||
# continue if file/folder is hidden (based on user preference)
|
||||
if not show_hidden_files and is_folder_hidden(system_path):
|
||||
continue
|
||||
|
||||
user_path = os.path.join(os.path.join(user_dir, f))
|
||||
created = time.ctime(os.path.getctime(system_path))
|
||||
modified = time.ctime(os.path.getmtime(system_path))
|
||||
file_extension = str(splitext(system_path))
|
||||
|
||||
# set protected to 1 if no write or read permission
|
||||
protected = 0
|
||||
if (not os.access(system_path, os.R_OK) or
|
||||
not os.access(system_path, os.W_OK)):
|
||||
protected = 1
|
||||
|
||||
# list files only or folders only
|
||||
if os.path.isdir(system_path):
|
||||
if files_only == 'true':
|
||||
continue
|
||||
file_extension = "dir"
|
||||
user_path = "{0}/".format(user_path)
|
||||
# filter files based on file_type
|
||||
elif Filemanager._skip_file_extension(
|
||||
file_type, supported_types, folders_only, file_extension):
|
||||
continue
|
||||
|
||||
# create a list of files and folders
|
||||
files[f] = {
|
||||
"Filename": f,
|
||||
"Path": user_path,
|
||||
"file_type": file_extension,
|
||||
"Protected": protected,
|
||||
"Properties": {
|
||||
"Date Created": created,
|
||||
"Date Modified": modified,
|
||||
"Size": sizeof_fmt(getsize(system_path))
|
||||
}
|
||||
}
|
||||
|
||||
return files
|
||||
|
||||
@staticmethod
|
||||
def list_filesystem(in_dir, path, trans_data, file_type, show_hidden):
|
||||
"""
|
||||
@ -572,25 +665,18 @@ class Filemanager(object):
|
||||
files = {}
|
||||
if (_platform == "win32" and (path == '/' or path == '\\'))\
|
||||
and in_dir is None:
|
||||
drives = Filemanager._get_drives()
|
||||
for drive in drives:
|
||||
protected = 0
|
||||
drives = Filemanager._get_drives_with_size()
|
||||
for drive, drive_size in drives:
|
||||
path = file_name = "{0}:".format(drive)
|
||||
try:
|
||||
drive_size = getdrivesize(path)
|
||||
drive_size_in_units = sizeof_fmt(drive_size)
|
||||
except Exception:
|
||||
drive_size = 0
|
||||
protected = 1 if drive_size == 0 else 0
|
||||
files[file_name] = {
|
||||
"Filename": file_name,
|
||||
"Path": path,
|
||||
"file_type": 'drive',
|
||||
"Protected": protected,
|
||||
"Protected": 1 if drive_size == 0 else 0,
|
||||
"Properties": {
|
||||
"Date Created": "",
|
||||
"Date Modified": "",
|
||||
"Size": drive_size_in_units
|
||||
"Size": drive_size
|
||||
}
|
||||
}
|
||||
Filemanager.resume_windows_warning()
|
||||
@ -606,68 +692,23 @@ class Filemanager(object):
|
||||
}
|
||||
|
||||
user_dir = path
|
||||
folders_only = trans_data['folders_only'] \
|
||||
if 'folders_only' in trans_data else ''
|
||||
files_only = trans_data['files_only'] \
|
||||
if 'files_only' in trans_data else ''
|
||||
supported_types = trans_data['supported_types'] \
|
||||
if 'supported_types' in trans_data else []
|
||||
folders_only = trans_data.get('folders_only', '')
|
||||
files_only = trans_data.get('files_only', '')
|
||||
supported_types = trans_data.get('supported_types', [])
|
||||
|
||||
orig_path = unquote(orig_path)
|
||||
try:
|
||||
mylist = [x for x in sorted(os.listdir(orig_path))]
|
||||
for f in mylist:
|
||||
protected = 0
|
||||
system_path = os.path.join(os.path.join(orig_path, f))
|
||||
|
||||
# continue if file/folder is hidden (based on user preference)
|
||||
if not is_show_hidden_files and \
|
||||
(is_folder_hidden(system_path) or f.startswith('.')):
|
||||
continue
|
||||
|
||||
user_path = os.path.join(os.path.join(user_dir, f))
|
||||
created = time.ctime(os.path.getctime(system_path))
|
||||
modified = time.ctime(os.path.getmtime(system_path))
|
||||
file_extension = str(splitext(system_path))
|
||||
|
||||
# set protected to 1 if no write or read permission
|
||||
if (not os.access(system_path, os.R_OK) or
|
||||
not os.access(system_path, os.W_OK)):
|
||||
protected = 1
|
||||
|
||||
# list files only or folders only
|
||||
if os.path.isdir(system_path):
|
||||
if files_only == 'true':
|
||||
continue
|
||||
file_extension = "dir"
|
||||
user_path = "{0}/".format(user_path)
|
||||
else:
|
||||
# filter files based on file_type
|
||||
if file_type is not None and file_type != "*" and \
|
||||
(folders_only or len(supported_types) > 0 and
|
||||
file_extension not in supported_types or
|
||||
file_type != file_extension):
|
||||
continue
|
||||
|
||||
# create a list of files and folders
|
||||
files[f] = {
|
||||
"Filename": f,
|
||||
"Path": user_path,
|
||||
"file_type": file_extension,
|
||||
"Protected": protected,
|
||||
"Properties": {
|
||||
"Date Created": created,
|
||||
"Date Modified": modified,
|
||||
"Size": sizeof_fmt(getsize(system_path))
|
||||
}
|
||||
}
|
||||
files = Filemanager.get_files_in_path(
|
||||
is_show_hidden_files, files_only, folders_only,
|
||||
supported_types, file_type, user_dir, orig_path
|
||||
)
|
||||
except Exception as e:
|
||||
Filemanager.resume_windows_warning()
|
||||
err_msg = str(e)
|
||||
if (hasattr(e, 'strerror') and
|
||||
e.strerror == gettext('Permission denied')):
|
||||
err_msg = str(e.strerror)
|
||||
else:
|
||||
err_msg = str(e)
|
||||
|
||||
files = {
|
||||
'Code': 0,
|
||||
'Error': err_msg
|
||||
|
Loading…
Reference in New Issue
Block a user